/* lustre/llite/file.c — revision 79a8726a8453a2350f463fc3182bae43a5417181 */
/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
 * Copyright (c) 2011, 2012, Intel Corporation.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 *
 * lustre/llite/file.c
 *
 * Author: Peter Braam <braam@clusterfs.com>
 * Author: Phil Schwan <phil@clusterfs.com>
 * Author: Andreas Dilger <adilger@clusterfs.com>
 */
42
43#define DEBUG_SUBSYSTEM S_LLITE
44#include <lustre_dlm.h>
45#include <lustre_lite.h>
46#include <linux/pagemap.h>
47#include <linux/file.h>
48#include "llite_internal.h"
49#include <lustre/ll_fiemap.h>
50
51#include "cl_object.h"
52
53struct ll_file_data *ll_file_data_get(void)
54{
55	struct ll_file_data *fd;
56
57	OBD_SLAB_ALLOC_PTR_GFP(fd, ll_file_data_slab, __GFP_IO);
58	if (fd == NULL)
59		return NULL;
60	fd->fd_write_failed = false;
61	return fd;
62}
63
64static void ll_file_data_put(struct ll_file_data *fd)
65{
66	if (fd != NULL)
67		OBD_SLAB_FREE_PTR(fd, ll_file_data_slab);
68}
69
70void ll_pack_inode2opdata(struct inode *inode, struct md_op_data *op_data,
71			  struct lustre_handle *fh)
72{
73	op_data->op_fid1 = ll_i2info(inode)->lli_fid;
74	op_data->op_attr.ia_mode = inode->i_mode;
75	op_data->op_attr.ia_atime = inode->i_atime;
76	op_data->op_attr.ia_mtime = inode->i_mtime;
77	op_data->op_attr.ia_ctime = inode->i_ctime;
78	op_data->op_attr.ia_size = i_size_read(inode);
79	op_data->op_attr_blocks = inode->i_blocks;
80	((struct ll_iattr *)&op_data->op_attr)->ia_attr_flags =
81					ll_inode_to_ext_flags(inode->i_flags);
82	op_data->op_ioepoch = ll_i2info(inode)->lli_ioepoch;
83	if (fh)
84		op_data->op_handle = *fh;
85	op_data->op_capa1 = ll_mdscapa_get(inode);
86
87	if (LLIF_DATA_MODIFIED & ll_i2info(inode)->lli_flags)
88		op_data->op_bias |= MDS_DATA_MODIFIED;
89}
90
/**
 * Closes the IO epoch and packs all the attributes into @op_data for
 * the CLOSE rpc.
 *
 * Always requests the mode and the three timestamps; for write opens
 * it either adds size/blocks (when SOM is unsupported or the file is
 * not regular) or closes the IO epoch via ll_ioepoch_close().
 */
static void ll_prepare_close(struct inode *inode, struct md_op_data *op_data,
			     struct obd_client_handle *och)
{
	ENTRY;

	op_data->op_attr.ia_valid = ATTR_MODE | ATTR_ATIME | ATTR_ATIME_SET |
					ATTR_MTIME | ATTR_MTIME_SET |
					ATTR_CTIME | ATTR_CTIME_SET;

	/* Read-only opens carry no size information to the MDS. */
	if (!(och->och_flags & FMODE_WRITE))
		goto out;

	/* Without Size-on-MDS support, or for non-regular files, size and
	 * blocks are sent directly; otherwise the IO epoch is closed so the
	 * MDS can decide whether to trust the attributes. */
	if (!exp_connect_som(ll_i2mdexp(inode)) || !S_ISREG(inode->i_mode))
		op_data->op_attr.ia_valid |= ATTR_SIZE | ATTR_BLOCKS;
	else
		ll_ioepoch_close(inode, op_data, &och, 0);

out:
	ll_pack_inode2opdata(inode, op_data, &och->och_fh);
	ll_prep_md_op_data(op_data, inode, NULL, NULL,
			   0, 0, LUSTRE_OPC_ANY, NULL);
	EXIT;
}
118
/**
 * Send a close RPC for @och to the MDS and release the open handle.
 *
 * On success the close reply may also be used to destroy the OST
 * objects of an unlinked file.  @och is freed here unless the IO epoch
 * is still open and a later DONE_WRITING needs to reuse it.
 */
static int ll_close_inode_openhandle(struct obd_export *md_exp,
				     struct inode *inode,
				     struct obd_client_handle *och)
{
	struct obd_export *exp = ll_i2mdexp(inode);
	struct md_op_data *op_data;
	struct ptlrpc_request *req = NULL;
	struct obd_device *obd = class_exp2obd(exp);
	/* Default to "epoch closed" so the out: path frees @och when we
	 * bail out before ll_prepare_close() could run. */
	int epoch_close = 1;
	int rc;
	ENTRY;

	if (obd == NULL) {
		/*
		 * XXX: in case of LMV, is this correct to access
		 * ->exp_handle?
		 */
		CERROR("Invalid MDC connection handle "LPX64"\n",
		       ll_i2mdexp(inode)->exp_handle.h_cookie);
		GOTO(out, rc = 0);
	}

	OBD_ALLOC_PTR(op_data);
	if (op_data == NULL)
		GOTO(out, rc = -ENOMEM); // XXX We leak openhandle and request here.

	/* Pack close attributes and (possibly) close the IO epoch. */
	ll_prepare_close(inode, op_data, och);
	epoch_close = (op_data->op_flags & MF_EPOCH_CLOSE);
	rc = md_close(md_exp, op_data, och->och_mod, &req);
	if (rc == -EAGAIN) {
		/* This close must have the epoch closed. */
		LASSERT(epoch_close);
		/* MDS has instructed us to obtain Size-on-MDS attribute from
		 * OSTs and send setattr to back to MDS. */
		rc = ll_som_update(inode, op_data);
		if (rc) {
			CERROR("inode %lu mdc Size-on-MDS update failed: "
			       "rc = %d\n", inode->i_ino, rc);
			/* A failed SOM update is not fatal for the caller. */
			rc = 0;
		}
	} else if (rc) {
		CERROR("inode %lu mdc close failed: rc = %d\n",
		       inode->i_ino, rc);
	}

	/* DATA_MODIFIED flag was successfully sent on close, cancel data
	 * modification flag. */
	if (rc == 0 && (op_data->op_bias & MDS_DATA_MODIFIED)) {
		struct ll_inode_info *lli = ll_i2info(inode);

		spin_lock(&lli->lli_lock);
		lli->lli_flags &= ~LLIF_DATA_MODIFIED;
		spin_unlock(&lli->lli_lock);
	}

	ll_finish_md_op_data(op_data);

	if (rc == 0) {
		/* Destroy OST objects reported back in the close reply. */
		rc = ll_objects_destroy(req, inode);
		if (rc)
			CERROR("inode %lu ll_objects destroy: rc = %d\n",
			       inode->i_ino, rc);
	}

	EXIT;
out:

	if (exp_connect_som(exp) && !epoch_close &&
	    S_ISREG(inode->i_mode) && (och->och_flags & FMODE_WRITE)) {
		/* Epoch still open: keep @och alive for DONE_WRITING. */
		ll_queue_done_writing(inode, LLIF_DONE_WRITING);
	} else {
		md_clear_open_replay_data(md_exp, och);
		/* Free @och if it is not waiting for DONE_WRITING. */
		och->och_fh.cookie = DEAD_HANDLE_MAGIC;
		OBD_FREE_PTR(och);
	}
	if (req) /* This is close request */
		ptlrpc_req_finished(req);
	return rc;
}
199
200int ll_md_real_close(struct inode *inode, int flags)
201{
202	struct ll_inode_info *lli = ll_i2info(inode);
203	struct obd_client_handle **och_p;
204	struct obd_client_handle *och;
205	__u64 *och_usecount;
206	int rc = 0;
207	ENTRY;
208
209	if (flags & FMODE_WRITE) {
210		och_p = &lli->lli_mds_write_och;
211		och_usecount = &lli->lli_open_fd_write_count;
212	} else if (flags & FMODE_EXEC) {
213		och_p = &lli->lli_mds_exec_och;
214		och_usecount = &lli->lli_open_fd_exec_count;
215	} else {
216		LASSERT(flags & FMODE_READ);
217		och_p = &lli->lli_mds_read_och;
218		och_usecount = &lli->lli_open_fd_read_count;
219	}
220
221	mutex_lock(&lli->lli_och_mutex);
222	if (*och_usecount) { /* There are still users of this handle, so
223				skip freeing it. */
224		mutex_unlock(&lli->lli_och_mutex);
225		RETURN(0);
226	}
227	och=*och_p;
228	*och_p = NULL;
229	mutex_unlock(&lli->lli_och_mutex);
230
231	if (och) { /* There might be a race and somebody have freed this och
232		      already */
233		rc = ll_close_inode_openhandle(ll_i2sbi(inode)->ll_md_exp,
234					       inode, och);
235	}
236
237	RETURN(rc);
238}
239
240int ll_md_close(struct obd_export *md_exp, struct inode *inode,
241		struct file *file)
242{
243	struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
244	struct ll_inode_info *lli = ll_i2info(inode);
245	int rc = 0;
246	ENTRY;
247
248	/* clear group lock, if present */
249	if (unlikely(fd->fd_flags & LL_FILE_GROUP_LOCKED))
250		ll_put_grouplock(inode, file, fd->fd_grouplock.cg_gid);
251
252	/* Let's see if we have good enough OPEN lock on the file and if
253	   we can skip talking to MDS */
254	if (file->f_dentry->d_inode) { /* Can this ever be false? */
255		int lockmode;
256		int flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_TEST_LOCK;
257		struct lustre_handle lockh;
258		struct inode *inode = file->f_dentry->d_inode;
259		ldlm_policy_data_t policy = {.l_inodebits={MDS_INODELOCK_OPEN}};
260
261		mutex_lock(&lli->lli_och_mutex);
262		if (fd->fd_omode & FMODE_WRITE) {
263			lockmode = LCK_CW;
264			LASSERT(lli->lli_open_fd_write_count);
265			lli->lli_open_fd_write_count--;
266		} else if (fd->fd_omode & FMODE_EXEC) {
267			lockmode = LCK_PR;
268			LASSERT(lli->lli_open_fd_exec_count);
269			lli->lli_open_fd_exec_count--;
270		} else {
271			lockmode = LCK_CR;
272			LASSERT(lli->lli_open_fd_read_count);
273			lli->lli_open_fd_read_count--;
274		}
275		mutex_unlock(&lli->lli_och_mutex);
276
277		if (!md_lock_match(md_exp, flags, ll_inode2fid(inode),
278				   LDLM_IBITS, &policy, lockmode,
279				   &lockh)) {
280			rc = ll_md_real_close(file->f_dentry->d_inode,
281					      fd->fd_omode);
282		}
283	} else {
284		CERROR("Releasing a file %p with negative dentry %p. Name %s",
285		       file, file->f_dentry, file->f_dentry->d_name.name);
286	}
287
288	LUSTRE_FPRIVATE(file) = NULL;
289	ll_file_data_put(fd);
290	ll_capa_close(inode);
291
292	RETURN(rc);
293}
294
/* While this returns an error code, fput() the caller does not, so we need
 * to make every effort to clean up all of our state here.  Also, applications
 * rarely check close errors and even if an error is returned they will not
 * re-try the close call.
 */
int ll_file_release(struct inode *inode, struct file *file)
{
	struct ll_file_data *fd;
	struct ll_sb_info *sbi = ll_i2sbi(inode);
	struct ll_inode_info *lli = ll_i2info(inode);
	int rc;
	ENTRY;

	CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p)\n", inode->i_ino,
	       inode->i_generation, inode);

#ifdef CONFIG_FS_POSIX_ACL
	/* Remote-client ACL state is keyed by pid and tracked via the root
	 * inode; drop it when the root is released. */
	if (sbi->ll_flags & LL_SBI_RMT_CLIENT &&
	    inode == inode->i_sb->s_root->d_inode) {
		struct ll_file_data *fd = LUSTRE_FPRIVATE(file);

		LASSERT(fd != NULL);
		if (unlikely(fd->fd_flags & LL_FILE_RMTACL)) {
			fd->fd_flags &= ~LL_FILE_RMTACL;
			rct_del(&sbi->ll_rct, current_pid());
			et_search_free(&sbi->ll_et, current_pid());
		}
	}
#endif

	/* Releasing the root dentry is not counted in RELEASE stats. */
	if (inode->i_sb->s_root != file->f_dentry)
		ll_stats_ops_tally(sbi, LPROC_LL_RELEASE, 1);
	fd = LUSTRE_FPRIVATE(file);
	LASSERT(fd != NULL);

	/* The last ref on @file, maybe not the owner pid of statahead.
	 * Different processes can open the same dir, "ll_opendir_key" means:
	 * it is me that should stop the statahead thread. */
	if (S_ISDIR(inode->i_mode) && lli->lli_opendir_key == fd &&
	    lli->lli_opendir_pid != 0)
		ll_stop_statahead(inode, lli->lli_opendir_key);

	if (inode->i_sb->s_root == file->f_dentry) {
		/* The root has no MDS open handle to close. */
		LUSTRE_FPRIVATE(file) = NULL;
		ll_file_data_put(fd);
		RETURN(0);
	}

	if (!S_ISDIR(inode->i_mode)) {
		/* Pick up (and clear) any async write error recorded on
		 * the cl_object so later calls do not see it again. */
		lov_read_and_clear_async_rc(lli->lli_clob);
		lli->lli_async_rc = 0;
	}

	rc = ll_md_close(sbi->ll_md_exp, inode, file);

	/* Fault-injection hook: dump the debug log on request. */
	if (CFS_FAIL_TIMEOUT_MS(OBD_FAIL_PTLRPC_DUMP_LOG, cfs_fail_val))
		libcfs_debug_dumplog();

	RETURN(rc);
}
355
/**
 * Re-issue an open intent RPC to the MDS for @file (used when no cached
 * open disposition exists, e.g. the NFSD path).  On success, refreshes
 * the inode from the reply and attaches the returned DLM lock data.
 * The request reference held in @itp is dropped before returning.
 */
static int ll_intent_file_open(struct file *file, void *lmm,
			       int lmmsize, struct lookup_intent *itp)
{
	struct ll_sb_info *sbi = ll_i2sbi(file->f_dentry->d_inode);
	struct dentry *parent = file->f_dentry->d_parent;
	const char *name = file->f_dentry->d_name.name;
	const int len = file->f_dentry->d_name.len;
	struct md_op_data *op_data;
	struct ptlrpc_request *req;
	__u32 opc = LUSTRE_OPC_ANY;
	int rc;
	ENTRY;

	if (!parent)
		RETURN(-ENOENT);

	/* Usually we come here only for NFSD, and we want open lock.
	   But we can also get here with pre 2.6.15 patchless kernels, and in
	   that case that lock is also ok */
	/* We can also get here if there was cached open handle in revalidate_it
	 * but it disappeared while we were getting from there to ll_file_open.
	 * But this means this file was closed and immediately opened which
	 * makes a good candidate for using OPEN lock */
	/* If lmmsize & lmm are not 0, we are just setting stripe info
	 * parameters. No need for the open lock */
	if (lmm == NULL && lmmsize == 0) {
		itp->it_flags |= MDS_OPEN_LOCK;
		if (itp->it_flags & FMODE_WRITE)
			opc = LUSTRE_OPC_CREATE;
	}

	op_data  = ll_prep_md_op_data(NULL, parent->d_inode,
				      file->f_dentry->d_inode, name, len,
				      O_RDWR, opc, NULL);
	if (IS_ERR(op_data))
		RETURN(PTR_ERR(op_data));

	itp->it_flags |= MDS_OPEN_BY_FID;
	rc = md_intent_lock(sbi->ll_md_exp, op_data, lmm, lmmsize, itp,
			    0 /*unused */, &req, ll_md_blocking_ast, 0);
	ll_finish_md_op_data(op_data);
	if (rc == -ESTALE) {
		/* reason for keep own exit path - don't flood log
		* with messages with -ESTALE errors.
		*/
		if (!it_disposition(itp, DISP_OPEN_OPEN) ||
		     it_open_error(DISP_OPEN_OPEN, itp))
			GOTO(out, rc);
		/* The open did succeed server-side; release the handle
		 * before returning the -ESTALE to the caller. */
		ll_release_openhandle(file->f_dentry, itp);
		GOTO(out, rc);
	}

	if (it_disposition(itp, DISP_LOOKUP_NEG))
		GOTO(out, rc = -ENOENT);

	if (rc != 0 || it_open_error(DISP_OPEN_OPEN, itp)) {
		rc = rc ? rc : it_open_error(DISP_OPEN_OPEN, itp);
		CDEBUG(D_VFSTRACE, "lock enqueue: err: %d\n", rc);
		GOTO(out, rc);
	}

	rc = ll_prep_inode(&file->f_dentry->d_inode, req, NULL, itp);
	if (!rc && itp->d.lustre.it_lock_mode)
		ll_set_lock_data(sbi->ll_md_exp, file->f_dentry->d_inode,
				 itp, NULL);

out:
	/* Drop the request reference and any lock held by the intent. */
	ptlrpc_req_finished(itp->d.lustre.it_data);
	it_clear_disposition(itp, DISP_ENQ_COMPLETE);
	ll_intent_drop_lock(itp);

	RETURN(rc);
}
429
430/**
431 * Assign an obtained @ioepoch to client's inode. No lock is needed, MDS does
432 * not believe attributes if a few ioepoch holders exist. Attributes for
433 * previous ioepoch if new one is opened are also skipped by MDS.
434 */
435void ll_ioepoch_open(struct ll_inode_info *lli, __u64 ioepoch)
436{
437	if (ioepoch && lli->lli_ioepoch != ioepoch) {
438		lli->lli_ioepoch = ioepoch;
439		CDEBUG(D_INODE, "Epoch "LPU64" opened on "DFID"\n",
440		       ioepoch, PFID(&lli->lli_fid));
441	}
442}
443
444static int ll_och_fill(struct obd_export *md_exp, struct ll_inode_info *lli,
445		       struct lookup_intent *it, struct obd_client_handle *och)
446{
447	struct ptlrpc_request *req = it->d.lustre.it_data;
448	struct mdt_body *body;
449
450	LASSERT(och);
451
452	body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
453	LASSERT(body != NULL);		      /* reply already checked out */
454
455	memcpy(&och->och_fh, &body->handle, sizeof(body->handle));
456	och->och_magic = OBD_CLIENT_HANDLE_MAGIC;
457	och->och_fid = lli->lli_fid;
458	och->och_flags = it->it_flags;
459	ll_ioepoch_open(lli, body->ioepoch);
460
461	return md_set_open_replay_data(md_exp, och, req);
462}
463
464int ll_local_open(struct file *file, struct lookup_intent *it,
465		  struct ll_file_data *fd, struct obd_client_handle *och)
466{
467	struct inode *inode = file->f_dentry->d_inode;
468	struct ll_inode_info *lli = ll_i2info(inode);
469	ENTRY;
470
471	LASSERT(!LUSTRE_FPRIVATE(file));
472
473	LASSERT(fd != NULL);
474
475	if (och) {
476		struct ptlrpc_request *req = it->d.lustre.it_data;
477		struct mdt_body *body;
478		int rc;
479
480		rc = ll_och_fill(ll_i2sbi(inode)->ll_md_exp, lli, it, och);
481		if (rc)
482			RETURN(rc);
483
484		body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
485		if ((it->it_flags & FMODE_WRITE) &&
486		    (body->valid & OBD_MD_FLSIZE))
487			CDEBUG(D_INODE, "Epoch "LPU64" opened on "DFID"\n",
488			       lli->lli_ioepoch, PFID(&lli->lli_fid));
489	}
490
491	LUSTRE_FPRIVATE(file) = fd;
492	ll_readahead_init(inode, &fd->fd_ras);
493	fd->fd_omode = it->it_flags;
494	RETURN(0);
495}
496
/* Open a file, and (for the very first open) create objects on the OSTs at
 * this time.  If opened with O_LOV_DELAY_CREATE, then we don't do the object
 * creation or open until ll_lov_setstripe() ioctl is called.
 *
 * If we already have the stripe MD locally then we don't request it in
 * md_open(), by passing a lmm_size = 0.
 *
 * It is up to the application to ensure no other processes open this file
 * in the O_LOV_DELAY_CREATE case, or the default striping pattern will be
 * used.  We might be able to avoid races of that sort by getting lli_open_sem
 * before returning in the O_LOV_DELAY_CREATE case and dropping it here
 * or in ll_file_release(), but I'm not sure that is desirable/necessary.
 */
int ll_file_open(struct inode *inode, struct file *file)
{
	struct ll_inode_info *lli = ll_i2info(inode);
	struct lookup_intent *it, oit = { .it_op = IT_OPEN,
					  .it_flags = file->f_flags };
	struct obd_client_handle **och_p = NULL;
	__u64 *och_usecount = NULL;
	struct ll_file_data *fd;
	int rc = 0, opendir_set = 0;
	ENTRY;

	CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), flags %o\n", inode->i_ino,
	       inode->i_generation, inode, file->f_flags);

	it = file->private_data; /* XXX: compat macro */
	file->private_data = NULL; /* prevent ll_local_open assertion */

	fd = ll_file_data_get();
	if (fd == NULL)
		GOTO(out_openerr, rc = -ENOMEM);

	fd->fd_file = file;
	if (S_ISDIR(inode->i_mode)) {
		/* Claim statahead ownership for this directory if nobody
		 * has it yet; the winner is recorded by lli_opendir_key. */
		spin_lock(&lli->lli_sa_lock);
		if (lli->lli_opendir_key == NULL && lli->lli_sai == NULL &&
		    lli->lli_opendir_pid == 0) {
			lli->lli_opendir_key = fd;
			lli->lli_opendir_pid = current_pid();
			opendir_set = 1;
		}
		spin_unlock(&lli->lli_sa_lock);
	}

	if (inode->i_sb->s_root == file->f_dentry) {
		/* The root needs no MDS open; just attach private data. */
		LUSTRE_FPRIVATE(file) = fd;
		RETURN(0);
	}

	if (!it || !it->d.lustre.it_disposition) {
		/* Convert f_flags into access mode. We cannot use file->f_mode,
		 * because everything but O_ACCMODE mask was stripped from
		 * there */
		if ((oit.it_flags + 1) & O_ACCMODE)
			oit.it_flags++;
		if (file->f_flags & O_TRUNC)
			oit.it_flags |= FMODE_WRITE;

		/* kernel only call f_op->open in dentry_open.  filp_open calls
		 * dentry_open after call to open_namei that checks permissions.
		 * Only nfsd_open call dentry_open directly without checking
		 * permissions and because of that this code below is safe. */
		if (oit.it_flags & (FMODE_WRITE | FMODE_READ))
			oit.it_flags |= MDS_OPEN_OWNEROVERRIDE;

		/* We do not want O_EXCL here, presumably we opened the file
		 * already? XXX - NFS implications? */
		oit.it_flags &= ~O_EXCL;

		/* bug20584, if "it_flags" contains O_CREAT, the file will be
		 * created if necessary, then "IT_CREAT" should be set to keep
		 * consistent with it */
		if (oit.it_flags & O_CREAT)
			oit.it_op |= IT_CREAT;

		it = &oit;
	}

restart:
	/* Let's see if we have file open on MDS already. */
	if (it->it_flags & FMODE_WRITE) {
		och_p = &lli->lli_mds_write_och;
		och_usecount = &lli->lli_open_fd_write_count;
	} else if (it->it_flags & FMODE_EXEC) {
		och_p = &lli->lli_mds_exec_och;
		och_usecount = &lli->lli_open_fd_exec_count;
	 } else {
		och_p = &lli->lli_mds_read_och;
		och_usecount = &lli->lli_open_fd_read_count;
	}

	mutex_lock(&lli->lli_och_mutex);
	if (*och_p) { /* Open handle is present */
		if (it_disposition(it, DISP_OPEN_OPEN)) {
			/* Well, there's extra open request that we do not need,
			   let's close it somehow. This will decref request. */
			rc = it_open_error(DISP_OPEN_OPEN, it);
			if (rc) {
				mutex_unlock(&lli->lli_och_mutex);
				GOTO(out_openerr, rc);
			}

			ll_release_openhandle(file->f_dentry, it);
		}
		(*och_usecount)++;

		rc = ll_local_open(file, it, fd, NULL);
		if (rc) {
			(*och_usecount)--;
			mutex_unlock(&lli->lli_och_mutex);
			GOTO(out_openerr, rc);
		}
	} else {
		LASSERT(*och_usecount == 0);
		if (!it->d.lustre.it_disposition) {
			/* We cannot just request lock handle now, new ELC code
			   means that one of other OPEN locks for this file
			   could be cancelled, and since blocking ast handler
			   would attempt to grab och_mutex as well, that would
			   result in a deadlock */
			mutex_unlock(&lli->lli_och_mutex);
			it->it_create_mode |= M_CHECK_STALE;
			rc = ll_intent_file_open(file, NULL, 0, it);
			it->it_create_mode &= ~M_CHECK_STALE;
			if (rc)
				GOTO(out_openerr, rc);

			/* Re-evaluate: the mode handle may have appeared
			 * while the mutex was dropped. */
			goto restart;
		}
		OBD_ALLOC(*och_p, sizeof (struct obd_client_handle));
		if (!*och_p)
			GOTO(out_och_free, rc = -ENOMEM);

		(*och_usecount)++;

		/* md_intent_lock() didn't get a request ref if there was an
		 * open error, so don't do cleanup on the request here
		 * (bug 3430) */
		/* XXX (green): Should not we bail out on any error here, not
		 * just open error? */
		rc = it_open_error(DISP_OPEN_OPEN, it);
		if (rc)
			GOTO(out_och_free, rc);

		LASSERT(it_disposition(it, DISP_ENQ_OPEN_REF));

		rc = ll_local_open(file, it, fd, *och_p);
		if (rc)
			GOTO(out_och_free, rc);
	}
	mutex_unlock(&lli->lli_och_mutex);
	/* Ownership of fd passed to the file; don't free it below. */
	fd = NULL;

	/* Must do this outside lli_och_mutex lock to prevent deadlock where
	   different kind of OPEN lock for this same inode gets cancelled
	   by ldlm_cancel_lru */
	if (!S_ISREG(inode->i_mode))
		GOTO(out_och_free, rc);

	ll_capa_open(inode);

	if (!lli->lli_has_smd) {
		if (file->f_flags & O_LOV_DELAY_CREATE ||
		    !(file->f_mode & FMODE_WRITE)) {
			CDEBUG(D_INODE, "object creation was delayed\n");
			GOTO(out_och_free, rc);
		}
	}
	file->f_flags &= ~O_LOV_DELAY_CREATE;
	GOTO(out_och_free, rc);

out_och_free:
	if (rc) {
		if (och_p && *och_p) {
			OBD_FREE(*och_p, sizeof (struct obd_client_handle));
			*och_p = NULL; /* OBD_FREE writes some magic there */
			(*och_usecount)--;
		}
		mutex_unlock(&lli->lli_och_mutex);

out_openerr:
		if (opendir_set != 0)
			ll_stop_statahead(inode, lli->lli_opendir_key);
		if (fd != NULL)
			ll_file_data_put(fd);
	} else {
		ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_OPEN, 1);
	}

	if (it && it_disposition(it, DISP_ENQ_OPEN_REF)) {
		/* Drop the intent's request reference taken by the open. */
		ptlrpc_req_finished(it->d.lustre.it_data);
		it_clear_disposition(it, DISP_ENQ_OPEN_REF);
	}

	return rc;
}
695
696/* Fills the obdo with the attributes for the lsm */
697static int ll_lsm_getattr(struct lov_stripe_md *lsm, struct obd_export *exp,
698			  struct obd_capa *capa, struct obdo *obdo,
699			  __u64 ioepoch, int sync)
700{
701	struct ptlrpc_request_set *set;
702	struct obd_info	    oinfo = { { { 0 } } };
703	int			rc;
704
705	ENTRY;
706
707	LASSERT(lsm != NULL);
708
709	oinfo.oi_md = lsm;
710	oinfo.oi_oa = obdo;
711	oinfo.oi_oa->o_oi = lsm->lsm_oi;
712	oinfo.oi_oa->o_mode = S_IFREG;
713	oinfo.oi_oa->o_ioepoch = ioepoch;
714	oinfo.oi_oa->o_valid = OBD_MD_FLID | OBD_MD_FLTYPE |
715			       OBD_MD_FLSIZE | OBD_MD_FLBLOCKS |
716			       OBD_MD_FLBLKSZ | OBD_MD_FLATIME |
717			       OBD_MD_FLMTIME | OBD_MD_FLCTIME |
718			       OBD_MD_FLGROUP | OBD_MD_FLEPOCH |
719			       OBD_MD_FLDATAVERSION;
720	oinfo.oi_capa = capa;
721	if (sync) {
722		oinfo.oi_oa->o_valid |= OBD_MD_FLFLAGS;
723		oinfo.oi_oa->o_flags |= OBD_FL_SRVLOCK;
724	}
725
726	set = ptlrpc_prep_set();
727	if (set == NULL) {
728		CERROR("can't allocate ptlrpc set\n");
729		rc = -ENOMEM;
730	} else {
731		rc = obd_getattr_async(exp, &oinfo, set);
732		if (rc == 0)
733			rc = ptlrpc_set_wait(set);
734		ptlrpc_set_destroy(set);
735	}
736	if (rc == 0)
737		oinfo.oi_oa->o_valid &= (OBD_MD_FLBLOCKS | OBD_MD_FLBLKSZ |
738					 OBD_MD_FLATIME | OBD_MD_FLMTIME |
739					 OBD_MD_FLCTIME | OBD_MD_FLSIZE |
740					 OBD_MD_FLDATAVERSION);
741	RETURN(rc);
742}
743
744/**
745  * Performs the getattr on the inode and updates its fields.
746  * If @sync != 0, perform the getattr under the server-side lock.
747  */
748int ll_inode_getattr(struct inode *inode, struct obdo *obdo,
749		     __u64 ioepoch, int sync)
750{
751	struct obd_capa      *capa = ll_mdscapa_get(inode);
752	struct lov_stripe_md *lsm;
753	int rc;
754	ENTRY;
755
756	lsm = ccc_inode_lsm_get(inode);
757	rc = ll_lsm_getattr(lsm, ll_i2dtexp(inode),
758			    capa, obdo, ioepoch, sync);
759	capa_put(capa);
760	if (rc == 0) {
761		struct ost_id *oi = lsm ? &lsm->lsm_oi : &obdo->o_oi;
762
763		obdo_refresh_inode(inode, obdo, obdo->o_valid);
764		CDEBUG(D_INODE, "objid "DOSTID" size %llu, blocks %llu,"
765		       " blksize %lu\n", POSTID(oi), i_size_read(inode),
766		       (unsigned long long)inode->i_blocks,
767		       (unsigned long)ll_inode_blksize(inode));
768	}
769	ccc_inode_lsm_put(inode, lsm);
770	RETURN(rc);
771}
772
/**
 * Merge the OST-side attributes (size, blocks, timestamps) obtained
 * from the cl_object with the MDS-side timestamps cached in lli_lvb,
 * and write the result into @inode under the inode size lock.
 */
int ll_merge_lvb(const struct lu_env *env, struct inode *inode)
{
	struct ll_inode_info *lli = ll_i2info(inode);
	struct cl_object *obj = lli->lli_clob;
	struct cl_attr *attr = ccc_env_thread_attr(env);
	struct ost_lvb lvb;
	int rc = 0;

	ENTRY;

	ll_inode_size_lock(inode);
	/* merge timestamps the most recently obtained from mds with
	   timestamps obtained from osts */
	LTIME_S(inode->i_atime) = lli->lli_lvb.lvb_atime;
	LTIME_S(inode->i_mtime) = lli->lli_lvb.lvb_mtime;
	LTIME_S(inode->i_ctime) = lli->lli_lvb.lvb_ctime;
	inode_init_lvb(inode, &lvb);

	cl_object_attr_lock(obj);
	rc = cl_object_attr_get(env, obj, attr);
	cl_object_attr_unlock(obj);

	if (rc == 0) {
		/* Take the newer of the MDS and OST timestamps. */
		if (lvb.lvb_atime < attr->cat_atime)
			lvb.lvb_atime = attr->cat_atime;
		if (lvb.lvb_ctime < attr->cat_ctime)
			lvb.lvb_ctime = attr->cat_ctime;
		if (lvb.lvb_mtime < attr->cat_mtime)
			lvb.lvb_mtime = attr->cat_mtime;

		CDEBUG(D_VFSTRACE, DFID" updating i_size "LPU64"\n",
				PFID(&lli->lli_fid), attr->cat_size);
		cl_isize_write_nolock(inode, attr->cat_size);

		inode->i_blocks = attr->cat_blocks;

		LTIME_S(inode->i_mtime) = lvb.lvb_mtime;
		LTIME_S(inode->i_atime) = lvb.lvb_atime;
		LTIME_S(inode->i_ctime) = lvb.lvb_ctime;
	}
	ll_inode_size_unlock(inode);

	RETURN(rc);
}
817
818int ll_glimpse_ioctl(struct ll_sb_info *sbi, struct lov_stripe_md *lsm,
819		     lstat_t *st)
820{
821	struct obdo obdo = { 0 };
822	int rc;
823
824	rc = ll_lsm_getattr(lsm, sbi->ll_dt_exp, NULL, &obdo, 0, 0);
825	if (rc == 0) {
826		st->st_size   = obdo.o_size;
827		st->st_blocks = obdo.o_blocks;
828		st->st_mtime  = obdo.o_mtime;
829		st->st_atime  = obdo.o_atime;
830		st->st_ctime  = obdo.o_ctime;
831	}
832	return rc;
833}
834
835void ll_io_init(struct cl_io *io, const struct file *file, int write)
836{
837	struct inode *inode = file->f_dentry->d_inode;
838
839	io->u.ci_rw.crw_nonblock = file->f_flags & O_NONBLOCK;
840	if (write) {
841		io->u.ci_wr.wr_append = !!(file->f_flags & O_APPEND);
842		io->u.ci_wr.wr_sync = file->f_flags & O_SYNC ||
843				      file->f_flags & O_DIRECT ||
844				      IS_SYNC(inode);
845	}
846	io->ci_obj     = ll_i2info(inode)->lli_clob;
847	io->ci_lockreq = CILR_MAYBE;
848	if (ll_file_nolock(file)) {
849		io->ci_lockreq = CILR_NEVER;
850		io->ci_no_srvlock = 1;
851	} else if (file->f_flags & O_APPEND) {
852		io->ci_lockreq = CILR_MANDATORY;
853	}
854}
855
856static ssize_t
857ll_file_io_generic(const struct lu_env *env, struct vvp_io_args *args,
858		   struct file *file, enum cl_io_type iot,
859		   loff_t *ppos, size_t count)
860{
861	struct ll_inode_info *lli = ll_i2info(file->f_dentry->d_inode);
862	struct ll_file_data  *fd  = LUSTRE_FPRIVATE(file);
863	struct cl_io	 *io;
864	ssize_t	       result;
865	ENTRY;
866
867restart:
868	io = ccc_env_thread_io(env);
869	ll_io_init(io, file, iot == CIT_WRITE);
870
871	if (cl_io_rw_init(env, io, iot, *ppos, count) == 0) {
872		struct vvp_io *vio = vvp_env_io(env);
873		struct ccc_io *cio = ccc_env_io(env);
874		int write_mutex_locked = 0;
875
876		cio->cui_fd  = LUSTRE_FPRIVATE(file);
877		vio->cui_io_subtype = args->via_io_subtype;
878
879		switch (vio->cui_io_subtype) {
880		case IO_NORMAL:
881			cio->cui_iov = args->u.normal.via_iov;
882			cio->cui_nrsegs = args->u.normal.via_nrsegs;
883			cio->cui_tot_nrsegs = cio->cui_nrsegs;
884			cio->cui_iocb = args->u.normal.via_iocb;
885			if ((iot == CIT_WRITE) &&
886			    !(cio->cui_fd->fd_flags & LL_FILE_GROUP_LOCKED)) {
887				if (mutex_lock_interruptible(&lli->
888							       lli_write_mutex))
889					GOTO(out, result = -ERESTARTSYS);
890				write_mutex_locked = 1;
891			} else if (iot == CIT_READ) {
892				down_read(&lli->lli_trunc_sem);
893			}
894			break;
895		case IO_SENDFILE:
896			vio->u.sendfile.cui_actor = args->u.sendfile.via_actor;
897			vio->u.sendfile.cui_target = args->u.sendfile.via_target;
898			break;
899		case IO_SPLICE:
900			vio->u.splice.cui_pipe = args->u.splice.via_pipe;
901			vio->u.splice.cui_flags = args->u.splice.via_flags;
902			break;
903		default:
904			CERROR("Unknow IO type - %u\n", vio->cui_io_subtype);
905			LBUG();
906		}
907		result = cl_io_loop(env, io);
908		if (write_mutex_locked)
909			mutex_unlock(&lli->lli_write_mutex);
910		else if (args->via_io_subtype == IO_NORMAL && iot == CIT_READ)
911			up_read(&lli->lli_trunc_sem);
912	} else {
913		/* cl_io_rw_init() handled IO */
914		result = io->ci_result;
915	}
916
917	if (io->ci_nob > 0) {
918		result = io->ci_nob;
919		*ppos = io->u.ci_wr.wr.crw_pos;
920	}
921	GOTO(out, result);
922out:
923	cl_io_fini(env, io);
924	/* If any bit been read/written (result != 0), we just return
925	 * short read/write instead of restart io. */
926	if (result == 0 && io->ci_need_restart) {
927		CDEBUG(D_VFSTRACE, "Restart %s on %s from %lld, count:%zd\n",
928		       iot == CIT_READ ? "read" : "write",
929		       file->f_dentry->d_name.name, *ppos, count);
930		LASSERTF(io->ci_nob == 0, "%zd", io->ci_nob);
931		goto restart;
932	}
933
934	if (iot == CIT_READ) {
935		if (result >= 0)
936			ll_stats_ops_tally(ll_i2sbi(file->f_dentry->d_inode),
937					   LPROC_LL_READ_BYTES, result);
938	} else if (iot == CIT_WRITE) {
939		if (result >= 0) {
940			ll_stats_ops_tally(ll_i2sbi(file->f_dentry->d_inode),
941					   LPROC_LL_WRITE_BYTES, result);
942			fd->fd_write_failed = false;
943		} else if (result != -ERESTARTSYS) {
944			fd->fd_write_failed = true;
945		}
946	}
947
948	return result;
949}
950
951
/*
 * XXX: exact copy from kernel code (__generic_file_aio_write_nolock)
 *
 * Validate an iovec array: accumulate the total byte count into *count,
 * return -EINVAL on a negative or wrapping length, and truncate
 * *nr_segs at the first segment whose user buffer fails access_ok()
 * (-EFAULT if that is the very first segment).  Kept byte-identical to
 * the upstream kernel helper on purpose — do not restyle.
 */
static int ll_file_get_iov_count(const struct iovec *iov,
				 unsigned long *nr_segs, size_t *count)
{
	size_t cnt = 0;
	unsigned long seg;

	for (seg = 0; seg < *nr_segs; seg++) {
		const struct iovec *iv = &iov[seg];

		/*
		 * If any segment has a negative length, or the cumulative
		 * length ever wraps negative then return -EINVAL.
		 */
		cnt += iv->iov_len;
		if (unlikely((ssize_t)(cnt|iv->iov_len) < 0))
			return -EINVAL;
		if (access_ok(VERIFY_READ, iv->iov_base, iv->iov_len))
			continue;
		if (seg == 0)
			return -EFAULT;
		*nr_segs = seg;
		cnt -= iv->iov_len;   /* This segment is no good */
		break;
	}
	*count = cnt;
	return 0;
}
982
983static ssize_t ll_file_aio_read(struct kiocb *iocb, const struct iovec *iov,
984				unsigned long nr_segs, loff_t pos)
985{
986	struct lu_env      *env;
987	struct vvp_io_args *args;
988	size_t	      count;
989	ssize_t	     result;
990	int		 refcheck;
991	ENTRY;
992
993	result = ll_file_get_iov_count(iov, &nr_segs, &count);
994	if (result)
995		RETURN(result);
996
997	env = cl_env_get(&refcheck);
998	if (IS_ERR(env))
999		RETURN(PTR_ERR(env));
1000
1001	args = vvp_env_args(env, IO_NORMAL);
1002	args->u.normal.via_iov = (struct iovec *)iov;
1003	args->u.normal.via_nrsegs = nr_segs;
1004	args->u.normal.via_iocb = iocb;
1005
1006	result = ll_file_io_generic(env, args, iocb->ki_filp, CIT_READ,
1007				    &iocb->ki_pos, count);
1008	cl_env_put(env, &refcheck);
1009	RETURN(result);
1010}
1011
1012static ssize_t ll_file_read(struct file *file, char *buf, size_t count,
1013			    loff_t *ppos)
1014{
1015	struct lu_env *env;
1016	struct iovec  *local_iov;
1017	struct kiocb  *kiocb;
1018	ssize_t	result;
1019	int	    refcheck;
1020	ENTRY;
1021
1022	env = cl_env_get(&refcheck);
1023	if (IS_ERR(env))
1024		RETURN(PTR_ERR(env));
1025
1026	local_iov = &vvp_env_info(env)->vti_local_iov;
1027	kiocb = &vvp_env_info(env)->vti_kiocb;
1028	local_iov->iov_base = (void __user *)buf;
1029	local_iov->iov_len = count;
1030	init_sync_kiocb(kiocb, file);
1031	kiocb->ki_pos = *ppos;
1032	kiocb->ki_left = count;
1033
1034	result = ll_file_aio_read(kiocb, local_iov, 1, kiocb->ki_pos);
1035	*ppos = kiocb->ki_pos;
1036
1037	cl_env_put(env, &refcheck);
1038	RETURN(result);
1039}
1040
1041/*
1042 * Write to a file (through the page cache).
1043 */
1044static ssize_t ll_file_aio_write(struct kiocb *iocb, const struct iovec *iov,
1045				 unsigned long nr_segs, loff_t pos)
1046{
1047	struct lu_env      *env;
1048	struct vvp_io_args *args;
1049	size_t	      count;
1050	ssize_t	     result;
1051	int		 refcheck;
1052	ENTRY;
1053
1054	result = ll_file_get_iov_count(iov, &nr_segs, &count);
1055	if (result)
1056		RETURN(result);
1057
1058	env = cl_env_get(&refcheck);
1059	if (IS_ERR(env))
1060		RETURN(PTR_ERR(env));
1061
1062	args = vvp_env_args(env, IO_NORMAL);
1063	args->u.normal.via_iov = (struct iovec *)iov;
1064	args->u.normal.via_nrsegs = nr_segs;
1065	args->u.normal.via_iocb = iocb;
1066
1067	result = ll_file_io_generic(env, args, iocb->ki_filp, CIT_WRITE,
1068				  &iocb->ki_pos, count);
1069	cl_env_put(env, &refcheck);
1070	RETURN(result);
1071}
1072
1073static ssize_t ll_file_write(struct file *file, const char *buf, size_t count,
1074			     loff_t *ppos)
1075{
1076	struct lu_env *env;
1077	struct iovec  *local_iov;
1078	struct kiocb  *kiocb;
1079	ssize_t	result;
1080	int	    refcheck;
1081	ENTRY;
1082
1083	env = cl_env_get(&refcheck);
1084	if (IS_ERR(env))
1085		RETURN(PTR_ERR(env));
1086
1087	local_iov = &vvp_env_info(env)->vti_local_iov;
1088	kiocb = &vvp_env_info(env)->vti_kiocb;
1089	local_iov->iov_base = (void __user *)buf;
1090	local_iov->iov_len = count;
1091	init_sync_kiocb(kiocb, file);
1092	kiocb->ki_pos = *ppos;
1093	kiocb->ki_left = count;
1094
1095	result = ll_file_aio_write(kiocb, local_iov, 1, kiocb->ki_pos);
1096	*ppos = kiocb->ki_pos;
1097
1098	cl_env_put(env, &refcheck);
1099	RETURN(result);
1100}
1101
1102
1103
1104/*
1105 * Send file content (through pagecache) somewhere with helper
1106 */
1107static ssize_t ll_file_splice_read(struct file *in_file, loff_t *ppos,
1108				   struct pipe_inode_info *pipe, size_t count,
1109				   unsigned int flags)
1110{
1111	struct lu_env      *env;
1112	struct vvp_io_args *args;
1113	ssize_t	     result;
1114	int		 refcheck;
1115	ENTRY;
1116
1117	env = cl_env_get(&refcheck);
1118	if (IS_ERR(env))
1119		RETURN(PTR_ERR(env));
1120
1121	args = vvp_env_args(env, IO_SPLICE);
1122	args->u.splice.via_pipe = pipe;
1123	args->u.splice.via_flags = flags;
1124
1125	result = ll_file_io_generic(env, args, in_file, CIT_READ, ppos, count);
1126	cl_env_put(env, &refcheck);
1127	RETURN(result);
1128}
1129
/*
 * Ask the data export to recreate the OST object identified by \a oi on
 * OST index \a ost_idx for \a inode.  Backend for the RECREATE_OBJ and
 * RECREATE_FID ioctls.
 */
static int ll_lov_recreate(struct inode *inode, struct ost_id *oi,
			   obd_count ost_idx)
{
	struct obd_export *exp = ll_i2dtexp(inode);
	struct obd_trans_info oti = { 0 };
	struct obdo *oa = NULL;
	int lsm_size;
	int rc = 0;
	struct lov_stripe_md *lsm = NULL, *lsm2;
	ENTRY;

	OBDO_ALLOC(oa);
	if (oa == NULL)
		RETURN(-ENOMEM);

	lsm = ccc_inode_lsm_get(inode);
	if (lsm == NULL)
		GOTO(out, rc = -ENOENT);

	/* Size of the stripe MD including its trailing per-stripe array. */
	lsm_size = sizeof(*lsm) + (sizeof(struct lov_oinfo) *
		   (lsm->lsm_stripe_count));

	/* obd_create() works on a private copy so the inode's own stripe
	 * MD is never modified. */
	OBD_ALLOC_LARGE(lsm2, lsm_size);
	if (lsm2 == NULL)
		GOTO(out, rc = -ENOMEM);

	oa->o_oi = *oi;
	oa->o_nlink = ost_idx;	/* target OST index is carried in o_nlink */
	oa->o_flags |= OBD_FL_RECREATE_OBJS;
	oa->o_valid = OBD_MD_FLID | OBD_MD_FLFLAGS | OBD_MD_FLGROUP;
	obdo_from_inode(oa, inode, OBD_MD_FLTYPE | OBD_MD_FLATIME |
				   OBD_MD_FLMTIME | OBD_MD_FLCTIME);
	obdo_set_parent_fid(oa, &ll_i2info(inode)->lli_fid);
	memcpy(lsm2, lsm, lsm_size);
	/* Serialize against size changes while the object is recreated. */
	ll_inode_size_lock(inode);
	rc = obd_create(NULL, exp, oa, &lsm2, &oti);
	ll_inode_size_unlock(inode);

	OBD_FREE_LARGE(lsm2, lsm_size);
	GOTO(out, rc);
out:
	ccc_inode_lsm_put(inode, lsm);
	OBDO_FREE(oa);
	return rc;
}
1175
1176static int ll_lov_recreate_obj(struct inode *inode, unsigned long arg)
1177{
1178	struct ll_recreate_obj ucreat;
1179	struct ost_id		oi;
1180	ENTRY;
1181
1182	if (!cfs_capable(CFS_CAP_SYS_ADMIN))
1183		RETURN(-EPERM);
1184
1185	if (copy_from_user(&ucreat, (struct ll_recreate_obj *)arg,
1186			   sizeof(ucreat)))
1187		RETURN(-EFAULT);
1188
1189	ostid_set_seq_mdt0(&oi);
1190	ostid_set_id(&oi, ucreat.lrc_id);
1191	RETURN(ll_lov_recreate(inode, &oi, ucreat.lrc_ost_idx));
1192}
1193
1194static int ll_lov_recreate_fid(struct inode *inode, unsigned long arg)
1195{
1196	struct lu_fid	fid;
1197	struct ost_id	oi;
1198	obd_count	ost_idx;
1199	ENTRY;
1200
1201	if (!cfs_capable(CFS_CAP_SYS_ADMIN))
1202		RETURN(-EPERM);
1203
1204	if (copy_from_user(&fid, (struct lu_fid *)arg, sizeof(fid)))
1205		RETURN(-EFAULT);
1206
1207	fid_to_ostid(&fid, &oi);
1208	ost_idx = (fid_seq(&fid) >> 16) & 0xffff;
1209	RETURN(ll_lov_recreate(inode, &oi, ost_idx));
1210}
1211
/*
 * Install the striping EA \a lum on \a inode by re-opening the file
 * through an IT_OPEN intent carrying the layout, then releasing the
 * open handle again.  Fails with -EEXIST if a layout already exists.
 */
int ll_lov_setstripe_ea_info(struct inode *inode, struct file *file,
			     int flags, struct lov_user_md *lum, int lum_size)
{
	struct lov_stripe_md *lsm = NULL;
	struct lookup_intent oit = {.it_op = IT_OPEN, .it_flags = flags};
	int rc = 0;
	ENTRY;

	lsm = ccc_inode_lsm_get(inode);
	if (lsm != NULL) {
		ccc_inode_lsm_put(inode, lsm);
		CDEBUG(D_IOCTL, "stripe already exists for ino %lu\n",
		       inode->i_ino);
		RETURN(-EEXIST);
	}

	ll_inode_size_lock(inode);
	rc = ll_intent_file_open(file, lum, lum_size, &oit);
	if (rc)
		GOTO(out, rc);
	rc = oit.d.lustre.it_status;
	if (rc < 0)
		GOTO(out_req_free, rc);

	/* The open only served to install the layout; drop the handle. */
	ll_release_openhandle(file->f_dentry, &oit);

 out:
	ll_inode_size_unlock(inode);
	ll_intent_release(&oit);
	/* lsm is always NULL here (a non-NULL lsm returned -EEXIST above);
	 * presumably the put is a safe no-op on NULL — kept for symmetry. */
	ccc_inode_lsm_put(inode, lsm);
	RETURN(rc);
out_req_free:
	/* Intent open failed on the server: drop its request before the
	 * common cleanup at out:. */
	ptlrpc_req_finished((struct ptlrpc_request *) oit.d.lustre.it_data);
	goto out;
}
1247
/*
 * Fetch the LOV EA (striping information) of \a filename from the MDS.
 *
 * On success *lmmp points into the reply buffer of *request (byte-swapped
 * to host endian if needed) and *lmm_size is its size; the caller must
 * keep *request alive while using *lmmp and finish it afterwards.  Note
 * that the out-parameters are stored even on error paths, so the caller
 * always sees a consistent (possibly NULL/0) triple.
 */
int ll_lov_getstripe_ea_info(struct inode *inode, const char *filename,
			     struct lov_mds_md **lmmp, int *lmm_size,
			     struct ptlrpc_request **request)
{
	struct ll_sb_info *sbi = ll_i2sbi(inode);
	struct mdt_body  *body;
	struct lov_mds_md *lmm = NULL;
	struct ptlrpc_request *req = NULL;
	struct md_op_data *op_data;
	int rc, lmmsize;

	rc = ll_get_max_mdsize(sbi, &lmmsize);
	if (rc)
		RETURN(rc);

	op_data = ll_prep_md_op_data(NULL, inode, NULL, filename,
				     strlen(filename), lmmsize,
				     LUSTRE_OPC_ANY, NULL);
	if (IS_ERR(op_data))
		RETURN(PTR_ERR(op_data));

	op_data->op_valid = OBD_MD_FLEASIZE | OBD_MD_FLDIREA;
	rc = md_getattr_name(sbi->ll_md_exp, op_data, &req);
	ll_finish_md_op_data(op_data);
	if (rc < 0) {
		CDEBUG(D_INFO, "md_getattr_name failed "
		       "on %s: rc %d\n", filename, rc);
		GOTO(out, rc);
	}

	body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
	LASSERT(body != NULL); /* checked by mdc_getattr_name */

	lmmsize = body->eadatasize;

	/* No EA returned, or an empty one: nothing to hand back. */
	if (!(body->valid & (OBD_MD_FLEASIZE | OBD_MD_FLDIREA)) ||
			lmmsize == 0) {
		GOTO(out, rc = -ENODATA);
	}

	lmm = req_capsule_server_sized_get(&req->rq_pill, &RMF_MDT_MD, lmmsize);
	LASSERT(lmm != NULL);

	/* Only plain v1/v3 layouts are understood here. */
	if ((lmm->lmm_magic != cpu_to_le32(LOV_MAGIC_V1)) &&
	    (lmm->lmm_magic != cpu_to_le32(LOV_MAGIC_V3))) {
		GOTO(out, rc = -EPROTO);
	}

	/*
	 * This is coming from the MDS, so is probably in
	 * little endian.  We convert it to host endian before
	 * passing it to userspace.
	 */
	if (LOV_MAGIC != cpu_to_le32(LOV_MAGIC)) {
		/* if function called for directory - we should
		 * avoid swab not existent lsm objects */
		if (lmm->lmm_magic == cpu_to_le32(LOV_MAGIC_V1)) {
			lustre_swab_lov_user_md_v1((struct lov_user_md_v1 *)lmm);
			if (S_ISREG(body->mode))
				lustre_swab_lov_user_md_objects(
				 ((struct lov_user_md_v1 *)lmm)->lmm_objects,
				 ((struct lov_user_md_v1 *)lmm)->lmm_stripe_count);
		} else if (lmm->lmm_magic == cpu_to_le32(LOV_MAGIC_V3)) {
			lustre_swab_lov_user_md_v3((struct lov_user_md_v3 *)lmm);
			if (S_ISREG(body->mode))
				lustre_swab_lov_user_md_objects(
				 ((struct lov_user_md_v3 *)lmm)->lmm_objects,
				 ((struct lov_user_md_v3 *)lmm)->lmm_stripe_count);
		}
	}

out:
	*lmmp = lmm;
	*lmm_size = lmmsize;
	*request = req;
	return rc;
}
1325
1326static int ll_lov_setea(struct inode *inode, struct file *file,
1327			    unsigned long arg)
1328{
1329	int			 flags = MDS_OPEN_HAS_OBJS | FMODE_WRITE;
1330	struct lov_user_md	*lump;
1331	int			 lum_size = sizeof(struct lov_user_md) +
1332					    sizeof(struct lov_user_ost_data);
1333	int			 rc;
1334	ENTRY;
1335
1336	if (!cfs_capable(CFS_CAP_SYS_ADMIN))
1337		RETURN(-EPERM);
1338
1339	OBD_ALLOC_LARGE(lump, lum_size);
1340	if (lump == NULL)
1341		RETURN(-ENOMEM);
1342
1343	if (copy_from_user(lump, (struct lov_user_md  *)arg, lum_size)) {
1344		OBD_FREE_LARGE(lump, lum_size);
1345		RETURN(-EFAULT);
1346	}
1347
1348	rc = ll_lov_setstripe_ea_info(inode, file, flags, lump, lum_size);
1349
1350	OBD_FREE_LARGE(lump, lum_size);
1351	RETURN(rc);
1352}
1353
/*
 * LL_IOC_LOV_SETSTRIPE: set the file layout from a user lov_user_md
 * (v1 or v3), then read the instantiated layout back into the user
 * buffer via LL_IOC_LOV_GETSTRIPE.
 */
static int ll_lov_setstripe(struct inode *inode, struct file *file,
			    unsigned long arg)
{
	struct lov_user_md_v3	 lumv3;
	struct lov_user_md_v1	*lumv1 = (struct lov_user_md_v1 *)&lumv3;
	struct lov_user_md_v1	*lumv1p = (struct lov_user_md_v1 *)arg;
	struct lov_user_md_v3	*lumv3p = (struct lov_user_md_v3 *)arg;
	int			 lum_size, rc;
	int			 flags = FMODE_WRITE;
	ENTRY;

	/* first try with v1 which is smaller than v3 */
	lum_size = sizeof(struct lov_user_md_v1);
	if (copy_from_user(lumv1, lumv1p, lum_size))
		RETURN(-EFAULT);

	/* v3 magic: re-copy the full v3 structure on top. */
	if (lumv1->lmm_magic == LOV_USER_MAGIC_V3) {
		lum_size = sizeof(struct lov_user_md_v3);
		if (copy_from_user(&lumv3, lumv3p, lum_size))
			RETURN(-EFAULT);
	}

	rc = ll_lov_setstripe_ea_info(inode, file, flags, lumv1, lum_size);
	if (rc == 0) {
		struct lov_stripe_md *lsm;
		__u32 gen;

		/* NOTE(review): put_user() result is ignored — a fault
		 * writing back lmm_stripe_count goes unreported. */
		put_user(0, &lumv1p->lmm_stripe_count);

		ll_layout_refresh(inode, &gen);
		lsm = ccc_inode_lsm_get(inode);
		rc = obd_iocontrol(LL_IOC_LOV_GETSTRIPE, ll_i2dtexp(inode),
				   0, lsm, (void *)arg);
		ccc_inode_lsm_put(inode, lsm);
	}
	RETURN(rc);
}
1391
1392static int ll_lov_getstripe(struct inode *inode, unsigned long arg)
1393{
1394	struct lov_stripe_md *lsm;
1395	int rc = -ENODATA;
1396	ENTRY;
1397
1398	lsm = ccc_inode_lsm_get(inode);
1399	if (lsm != NULL)
1400		rc = obd_iocontrol(LL_IOC_LOV_GETSTRIPE, ll_i2dtexp(inode), 0,
1401				   lsm, (void *)arg);
1402	ccc_inode_lsm_put(inode, lsm);
1403	RETURN(rc);
1404}
1405
/*
 * LL_IOC_GROUP_LOCK: acquire a group lock with gid \a arg on the file's
 * cl object and record it in the per-file-descriptor state.  At most
 * one group lock may be held per file descriptor.
 */
int ll_get_grouplock(struct inode *inode, struct file *file, unsigned long arg)
{
	struct ll_inode_info   *lli = ll_i2info(inode);
	struct ll_file_data    *fd = LUSTRE_FPRIVATE(file);
	struct ccc_grouplock    grouplock;
	int		     rc;
	ENTRY;

	if (ll_file_nolock(file))
		RETURN(-EOPNOTSUPP);

	spin_lock(&lli->lli_lock);
	if (fd->fd_flags & LL_FILE_GROUP_LOCKED) {
		CWARN("group lock already existed with gid %lu\n",
		      fd->fd_grouplock.cg_gid);
		spin_unlock(&lli->lli_lock);
		RETURN(-EINVAL);
	}
	LASSERT(fd->fd_grouplock.cg_lock == NULL);
	spin_unlock(&lli->lli_lock);

	/* The lock is taken outside lli_lock (cl_get_grouplock may block
	 * unless O_NONBLOCK), so re-check for a racing locker below. */
	rc = cl_get_grouplock(cl_i2info(inode)->lli_clob,
			      arg, (file->f_flags & O_NONBLOCK), &grouplock);
	if (rc)
		RETURN(rc);

	spin_lock(&lli->lli_lock);
	if (fd->fd_flags & LL_FILE_GROUP_LOCKED) {
		spin_unlock(&lli->lli_lock);
		CERROR("another thread just won the race\n");
		/* Lost the race: drop the lock we just acquired. */
		cl_put_grouplock(&grouplock);
		RETURN(-EINVAL);
	}

	fd->fd_flags |= LL_FILE_GROUP_LOCKED;
	fd->fd_grouplock = grouplock;
	spin_unlock(&lli->lli_lock);

	CDEBUG(D_INFO, "group lock %lu obtained\n", arg);
	RETURN(0);
}
1447
/*
 * LL_IOC_GROUP_UNLOCK: release the group lock with gid \a arg that was
 * previously recorded on this file descriptor by ll_get_grouplock().
 */
int ll_put_grouplock(struct inode *inode, struct file *file, unsigned long arg)
{
	struct ll_inode_info   *lli = ll_i2info(inode);
	struct ll_file_data    *fd = LUSTRE_FPRIVATE(file);
	struct ccc_grouplock    grouplock;
	ENTRY;

	spin_lock(&lli->lli_lock);
	if (!(fd->fd_flags & LL_FILE_GROUP_LOCKED)) {
		spin_unlock(&lli->lli_lock);
		CWARN("no group lock held\n");
		RETURN(-EINVAL);
	}
	LASSERT(fd->fd_grouplock.cg_lock != NULL);

	if (fd->fd_grouplock.cg_gid != arg) {
		CWARN("group lock %lu doesn't match current id %lu\n",
		       arg, fd->fd_grouplock.cg_gid);
		spin_unlock(&lli->lli_lock);
		RETURN(-EINVAL);
	}

	/* Detach the lock from fd state under lli_lock; the actual
	 * release happens after the spinlock is dropped. */
	grouplock = fd->fd_grouplock;
	memset(&fd->fd_grouplock, 0, sizeof(fd->fd_grouplock));
	fd->fd_flags &= ~LL_FILE_GROUP_LOCKED;
	spin_unlock(&lli->lli_lock);

	cl_put_grouplock(&grouplock);
	CDEBUG(D_INFO, "group lock %lu released\n", arg);
	RETURN(0);
}
1479
1480/**
1481 * Close inode open handle
1482 *
1483 * \param dentry [in]     dentry which contains the inode
1484 * \param it     [in,out] intent which contains open info and result
1485 *
1486 * \retval 0     success
1487 * \retval <0    failure
1488 */
1489int ll_release_openhandle(struct dentry *dentry, struct lookup_intent *it)
1490{
1491	struct inode *inode = dentry->d_inode;
1492	struct obd_client_handle *och;
1493	int rc;
1494	ENTRY;
1495
1496	LASSERT(inode);
1497
1498	/* Root ? Do nothing. */
1499	if (dentry->d_inode->i_sb->s_root == dentry)
1500		RETURN(0);
1501
1502	/* No open handle to close? Move away */
1503	if (!it_disposition(it, DISP_OPEN_OPEN))
1504		RETURN(0);
1505
1506	LASSERT(it_open_error(DISP_OPEN_OPEN, it) == 0);
1507
1508	OBD_ALLOC(och, sizeof(*och));
1509	if (!och)
1510		GOTO(out, rc = -ENOMEM);
1511
1512	ll_och_fill(ll_i2sbi(inode)->ll_md_exp,
1513		    ll_i2info(inode), it, och);
1514
1515	rc = ll_close_inode_openhandle(ll_i2sbi(inode)->ll_md_exp,
1516				       inode, och);
1517 out:
1518	/* this one is in place of ll_file_open */
1519	if (it_disposition(it, DISP_ENQ_OPEN_REF)) {
1520		ptlrpc_req_finished(it->d.lustre.it_data);
1521		it_clear_disposition(it, DISP_ENQ_OPEN_REF);
1522	}
1523	RETURN(rc);
1524}
1525
1526/**
1527 * Get size for inode for which FIEMAP mapping is requested.
1528 * Make the FIEMAP get_info call and returns the result.
1529 */
1530int ll_do_fiemap(struct inode *inode, struct ll_user_fiemap *fiemap,
1531	      int num_bytes)
1532{
1533	struct obd_export *exp = ll_i2dtexp(inode);
1534	struct lov_stripe_md *lsm = NULL;
1535	struct ll_fiemap_info_key fm_key = { .name = KEY_FIEMAP, };
1536	int vallen = num_bytes;
1537	int rc;
1538	ENTRY;
1539
1540	/* Checks for fiemap flags */
1541	if (fiemap->fm_flags & ~LUSTRE_FIEMAP_FLAGS_COMPAT) {
1542		fiemap->fm_flags &= ~LUSTRE_FIEMAP_FLAGS_COMPAT;
1543		return -EBADR;
1544	}
1545
1546	/* Check for FIEMAP_FLAG_SYNC */
1547	if (fiemap->fm_flags & FIEMAP_FLAG_SYNC) {
1548		rc = filemap_fdatawrite(inode->i_mapping);
1549		if (rc)
1550			return rc;
1551	}
1552
1553	lsm = ccc_inode_lsm_get(inode);
1554	if (lsm == NULL)
1555		return -ENOENT;
1556
1557	/* If the stripe_count > 1 and the application does not understand
1558	 * DEVICE_ORDER flag, then it cannot interpret the extents correctly.
1559	 */
1560	if (lsm->lsm_stripe_count > 1 &&
1561	    !(fiemap->fm_flags & FIEMAP_FLAG_DEVICE_ORDER))
1562		GOTO(out, rc = -EOPNOTSUPP);
1563
1564	fm_key.oa.o_oi = lsm->lsm_oi;
1565	fm_key.oa.o_valid = OBD_MD_FLID | OBD_MD_FLGROUP;
1566
1567	obdo_from_inode(&fm_key.oa, inode, OBD_MD_FLSIZE);
1568	obdo_set_parent_fid(&fm_key.oa, &ll_i2info(inode)->lli_fid);
1569	/* If filesize is 0, then there would be no objects for mapping */
1570	if (fm_key.oa.o_size == 0) {
1571		fiemap->fm_mapped_extents = 0;
1572		GOTO(out, rc = 0);
1573	}
1574
1575	memcpy(&fm_key.fiemap, fiemap, sizeof(*fiemap));
1576
1577	rc = obd_get_info(NULL, exp, sizeof(fm_key), &fm_key, &vallen,
1578			  fiemap, lsm);
1579	if (rc)
1580		CERROR("obd_get_info failed: rc = %d\n", rc);
1581
1582out:
1583	ccc_inode_lsm_put(inode, lsm);
1584	RETURN(rc);
1585}
1586
1587int ll_fid2path(struct inode *inode, void *arg)
1588{
1589	struct obd_export	*exp = ll_i2mdexp(inode);
1590	struct getinfo_fid2path	*gfout, *gfin;
1591	int			 outsize, rc;
1592	ENTRY;
1593
1594	if (!cfs_capable(CFS_CAP_DAC_READ_SEARCH) &&
1595	    !(ll_i2sbi(inode)->ll_flags & LL_SBI_USER_FID2PATH))
1596		RETURN(-EPERM);
1597
1598	/* Need to get the buflen */
1599	OBD_ALLOC_PTR(gfin);
1600	if (gfin == NULL)
1601		RETURN(-ENOMEM);
1602	if (copy_from_user(gfin, arg, sizeof(*gfin))) {
1603		OBD_FREE_PTR(gfin);
1604		RETURN(-EFAULT);
1605	}
1606
1607	outsize = sizeof(*gfout) + gfin->gf_pathlen;
1608	OBD_ALLOC(gfout, outsize);
1609	if (gfout == NULL) {
1610		OBD_FREE_PTR(gfin);
1611		RETURN(-ENOMEM);
1612	}
1613	memcpy(gfout, gfin, sizeof(*gfout));
1614	OBD_FREE_PTR(gfin);
1615
1616	/* Call mdc_iocontrol */
1617	rc = obd_iocontrol(OBD_IOC_FID2PATH, exp, outsize, gfout, NULL);
1618	if (rc)
1619		GOTO(gf_free, rc);
1620
1621	if (copy_to_user(arg, gfout, outsize))
1622		rc = -EFAULT;
1623
1624gf_free:
1625	OBD_FREE(gfout, outsize);
1626	RETURN(rc);
1627}
1628
1629static int ll_ioctl_fiemap(struct inode *inode, unsigned long arg)
1630{
1631	struct ll_user_fiemap *fiemap_s;
1632	size_t num_bytes, ret_bytes;
1633	unsigned int extent_count;
1634	int rc = 0;
1635
1636	/* Get the extent count so we can calculate the size of
1637	 * required fiemap buffer */
1638	if (get_user(extent_count,
1639	    &((struct ll_user_fiemap __user *)arg)->fm_extent_count))
1640		RETURN(-EFAULT);
1641	num_bytes = sizeof(*fiemap_s) + (extent_count *
1642					 sizeof(struct ll_fiemap_extent));
1643
1644	OBD_ALLOC_LARGE(fiemap_s, num_bytes);
1645	if (fiemap_s == NULL)
1646		RETURN(-ENOMEM);
1647
1648	/* get the fiemap value */
1649	if (copy_from_user(fiemap_s, (struct ll_user_fiemap __user *)arg,
1650			   sizeof(*fiemap_s)))
1651		GOTO(error, rc = -EFAULT);
1652
1653	/* If fm_extent_count is non-zero, read the first extent since
1654	 * it is used to calculate end_offset and device from previous
1655	 * fiemap call. */
1656	if (extent_count) {
1657		if (copy_from_user(&fiemap_s->fm_extents[0],
1658		    (char __user *)arg + sizeof(*fiemap_s),
1659		    sizeof(struct ll_fiemap_extent)))
1660			GOTO(error, rc = -EFAULT);
1661	}
1662
1663	rc = ll_do_fiemap(inode, fiemap_s, num_bytes);
1664	if (rc)
1665		GOTO(error, rc);
1666
1667	ret_bytes = sizeof(struct ll_user_fiemap);
1668
1669	if (extent_count != 0)
1670		ret_bytes += (fiemap_s->fm_mapped_extents *
1671				 sizeof(struct ll_fiemap_extent));
1672
1673	if (copy_to_user((void *)arg, fiemap_s, ret_bytes))
1674		rc = -EFAULT;
1675
1676error:
1677	OBD_FREE_LARGE(fiemap_s, num_bytes);
1678	RETURN(rc);
1679}
1680
1681/*
1682 * Read the data_version for inode.
1683 *
1684 * This value is computed using stripe object version on OST.
1685 * Version is computed using server side locking.
1686 *
1687 * @param extent_lock  Take extent lock. Not needed if a process is already
1688 *		       holding the OST object group locks.
1689 */
1690int ll_data_version(struct inode *inode, __u64 *data_version,
1691		    int extent_lock)
1692{
1693	struct lov_stripe_md	*lsm = NULL;
1694	struct ll_sb_info	*sbi = ll_i2sbi(inode);
1695	struct obdo		*obdo = NULL;
1696	int			 rc;
1697	ENTRY;
1698
1699	/* If no stripe, we consider version is 0. */
1700	lsm = ccc_inode_lsm_get(inode);
1701	if (lsm == NULL) {
1702		*data_version = 0;
1703		CDEBUG(D_INODE, "No object for inode\n");
1704		RETURN(0);
1705	}
1706
1707	OBD_ALLOC_PTR(obdo);
1708	if (obdo == NULL) {
1709		ccc_inode_lsm_put(inode, lsm);
1710		RETURN(-ENOMEM);
1711	}
1712
1713	rc = ll_lsm_getattr(lsm, sbi->ll_dt_exp, NULL, obdo, 0, extent_lock);
1714	if (!rc) {
1715		if (!(obdo->o_valid & OBD_MD_FLDATAVERSION))
1716			rc = -EOPNOTSUPP;
1717		else
1718			*data_version = obdo->o_data_version;
1719	}
1720
1721	OBD_FREE_PTR(obdo);
1722	ccc_inode_lsm_put(inode, lsm);
1723
1724	RETURN(rc);
1725}
1726
/*
 * Scratch state for ll_swap_layouts(): saved times to restore, expected
 * data versions, and the inode pair (possibly reordered by FID).
 */
struct ll_swap_stack {
	struct iattr		 ia1, ia2;	/* mtime/atime to restore */
	__u64			 dv1, dv2;	/* expected data versions */
	struct inode		*inode1, *inode2;
	bool			 check_dv1, check_dv2;	/* verify dv1/dv2? */
};
1733
/*
 * LL_IOC_LOV_SWAP_LAYOUTS backend: swap the layouts of two files on the
 * MDT.  Optionally verifies each file's data version first and restores
 * mtime/atime afterwards, as requested by lsl->sl_flags.  A non-zero
 * lsl->sl_gid makes both files take a group lock around the swap.
 */
static int ll_swap_layouts(struct file *file1, struct file *file2,
			   struct lustre_swap_layouts *lsl)
{
	struct mdc_swap_layouts	 msl;
	struct md_op_data	*op_data;
	__u32			 gid;
	__u64			 dv;
	struct ll_swap_stack	*llss = NULL;
	int			 rc;

	OBD_ALLOC_PTR(llss);
	if (llss == NULL)
		RETURN(-ENOMEM);

	llss->inode1 = file1->f_dentry->d_inode;
	llss->inode2 = file2->f_dentry->d_inode;

	if (!S_ISREG(llss->inode2->i_mode))
		GOTO(free, rc = -EINVAL);

	if (ll_permission(llss->inode1, MAY_WRITE, NULL) ||
	    ll_permission(llss->inode2, MAY_WRITE, NULL))
		GOTO(free, rc = -EPERM);

	if (llss->inode2->i_sb != llss->inode1->i_sb)
		GOTO(free, rc = -EXDEV);

	/* we use 2 bool because it is easier to swap than 2 bits */
	if (lsl->sl_flags & SWAP_LAYOUTS_CHECK_DV1)
		llss->check_dv1 = true;

	if (lsl->sl_flags & SWAP_LAYOUTS_CHECK_DV2)
		llss->check_dv2 = true;

	/* we cannot use lsl->sl_dvX directly because we may swap them */
	llss->dv1 = lsl->sl_dv1;
	llss->dv2 = lsl->sl_dv2;

	rc = lu_fid_cmp(ll_inode2fid(llss->inode1), ll_inode2fid(llss->inode2));
	if (rc == 0) /* same file, done! */
		GOTO(free, rc = 0);

	/* Order the pair by FID so concurrent swaps always lock in the
	 * same order. */
	if (rc < 0) { /* sequentialize it */
		swap(llss->inode1, llss->inode2);
		swap(file1, file2);
		swap(llss->dv1, llss->dv2);
		swap(llss->check_dv1, llss->check_dv2);
	}

	gid = lsl->sl_gid;
	if (gid != 0) { /* application asks to flush dirty cache */
		rc = ll_get_grouplock(llss->inode1, file1, gid);
		if (rc < 0)
			GOTO(free, rc);

		rc = ll_get_grouplock(llss->inode2, file2, gid);
		if (rc < 0) {
			ll_put_grouplock(llss->inode1, file1, gid);
			GOTO(free, rc);
		}
	}

	/* to be able to restore mtime and atime after swap
	 * we need to first save them */
	if (lsl->sl_flags &
	    (SWAP_LAYOUTS_KEEP_MTIME | SWAP_LAYOUTS_KEEP_ATIME)) {
		llss->ia1.ia_mtime = llss->inode1->i_mtime;
		llss->ia1.ia_atime = llss->inode1->i_atime;
		llss->ia1.ia_valid = ATTR_MTIME | ATTR_ATIME;
		llss->ia2.ia_mtime = llss->inode2->i_mtime;
		llss->ia2.ia_atime = llss->inode2->i_atime;
		llss->ia2.ia_valid = ATTR_MTIME | ATTR_ATIME;
	}

	/* ultimate check, before swaping the layouts we check if
	 * dataversion has changed (if requested) */
	if (llss->check_dv1) {
		rc = ll_data_version(llss->inode1, &dv, 0);
		if (rc)
			GOTO(putgl, rc);
		if (dv != llss->dv1)
			GOTO(putgl, rc = -EAGAIN);
	}

	if (llss->check_dv2) {
		rc = ll_data_version(llss->inode2, &dv, 0);
		if (rc)
			GOTO(putgl, rc);
		if (dv != llss->dv2)
			GOTO(putgl, rc = -EAGAIN);
	}

	/* struct md_op_data is used to send the swap args to the mdt
	 * only flags is missing, so we use struct mdc_swap_layouts
	 * through the md_op_data->op_data */
	/* flags from user space have to be converted before they are send to
	 * server, no flag is sent today, they are only used on the client */
	msl.msl_flags = 0;
	/* NOTE(review): this -ENOMEM preset is dead — rc is overwritten on
	 * both branches below. */
	rc = -ENOMEM;
	op_data = ll_prep_md_op_data(NULL, llss->inode1, llss->inode2, NULL, 0,
				     0, LUSTRE_OPC_ANY, &msl);
	if (IS_ERR(op_data))
		GOTO(free, rc = PTR_ERR(op_data));

	rc = obd_iocontrol(LL_IOC_LOV_SWAP_LAYOUTS, ll_i2mdexp(llss->inode1),
			   sizeof(*op_data), op_data, NULL);
	ll_finish_md_op_data(op_data);

putgl:
	if (gid != 0) {
		ll_put_grouplock(llss->inode2, file2, gid);
		ll_put_grouplock(llss->inode1, file1, gid);
	}

	/* rc can be set from obd_iocontrol() or from a GOTO(putgl, ...) */
	if (rc != 0)
		GOTO(free, rc);

	/* clear useless flags */
	if (!(lsl->sl_flags & SWAP_LAYOUTS_KEEP_MTIME)) {
		llss->ia1.ia_valid &= ~ATTR_MTIME;
		llss->ia2.ia_valid &= ~ATTR_MTIME;
	}

	if (!(lsl->sl_flags & SWAP_LAYOUTS_KEEP_ATIME)) {
		llss->ia1.ia_valid &= ~ATTR_ATIME;
		llss->ia2.ia_valid &= ~ATTR_ATIME;
	}

	/* update time if requested */
	/* Note the cross application: ia2 (inode2's saved times) goes to
	 * file1 and ia1 to file2, since the contents were swapped. */
	rc = 0;
	if (llss->ia2.ia_valid != 0) {
		mutex_lock(&llss->inode1->i_mutex);
		rc = ll_setattr(file1->f_dentry, &llss->ia2);
		mutex_unlock(&llss->inode1->i_mutex);
	}

	if (llss->ia1.ia_valid != 0) {
		int rc1;

		mutex_lock(&llss->inode2->i_mutex);
		rc1 = ll_setattr(file2->f_dentry, &llss->ia1);
		mutex_unlock(&llss->inode2->i_mutex);
		if (rc == 0)
			rc = rc1;
	}

free:
	/* llss is always non-NULL here (allocation failure returned
	 * early), so the check is redundant but harmless. */
	if (llss != NULL)
		OBD_FREE_PTR(llss);

	RETURN(rc);
}
1887
/*
 * Main ioctl entry point for regular files.  Handles llite-specific
 * commands locally and forwards anything unrecognized first to any
 * registered ioctl handlers and finally to the data export.
 */
long ll_file_ioctl(struct file *file, unsigned int cmd, unsigned long arg)
{
	struct inode		*inode = file->f_dentry->d_inode;
	struct ll_file_data	*fd = LUSTRE_FPRIVATE(file);
	int			 flags, rc;
	ENTRY;

	CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),cmd=%x\n", inode->i_ino,
	       inode->i_generation, inode, cmd);
	ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_IOCTL, 1);

	/* asm-ppc{,64} declares TCGETS, et. al. as type 't' not 'T' */
	if (_IOC_TYPE(cmd) == 'T' || _IOC_TYPE(cmd) == 't') /* tty ioctls */
		RETURN(-ENOTTY);

	switch(cmd) {
	case LL_IOC_GETFLAGS:
		/* Get the current value of the file flags */
		return put_user(fd->fd_flags, (int *)arg);
	case LL_IOC_SETFLAGS:
	case LL_IOC_CLRFLAGS:
		/* Set or clear specific file flags */
		/* XXX This probably needs checks to ensure the flags are
		 *     not abused, and to handle any flag side effects.
		 */
		if (get_user(flags, (int *) arg))
			RETURN(-EFAULT);

		if (cmd == LL_IOC_SETFLAGS) {
			if ((flags & LL_FILE_IGNORE_LOCK) &&
			    !(file->f_flags & O_DIRECT)) {
				CERROR("%s: unable to disable locking on "
				       "non-O_DIRECT file\n", current->comm);
				RETURN(-EINVAL);
			}

			fd->fd_flags |= flags;
		} else {
			fd->fd_flags &= ~flags;
		}
		RETURN(0);
	case LL_IOC_LOV_SETSTRIPE:
		RETURN(ll_lov_setstripe(inode, file, arg));
	case LL_IOC_LOV_SETEA:
		RETURN(ll_lov_setea(inode, file, arg));
	case LL_IOC_LOV_SWAP_LAYOUTS: {
		struct file *file2;
		struct lustre_swap_layouts lsl;

		if (copy_from_user(&lsl, (char *)arg,
				       sizeof(struct lustre_swap_layouts)))
			RETURN(-EFAULT);

		/* Both file descriptors must be open for write. */
		if ((file->f_flags & O_ACCMODE) == 0) /* O_RDONLY */
			RETURN(-EPERM);

		file2 = fget(lsl.sl_fd);
		if (file2 == NULL)
			RETURN(-EBADF);

		rc = -EPERM;
		if ((file2->f_flags & O_ACCMODE) != 0) /* O_WRONLY or O_RDWR */
			rc = ll_swap_layouts(file, file2, &lsl);
		fput(file2);
		RETURN(rc);
	}
	case LL_IOC_LOV_GETSTRIPE:
		RETURN(ll_lov_getstripe(inode, arg));
	case LL_IOC_RECREATE_OBJ:
		RETURN(ll_lov_recreate_obj(inode, arg));
	case LL_IOC_RECREATE_FID:
		RETURN(ll_lov_recreate_fid(inode, arg));
	case FSFILT_IOC_FIEMAP:
		RETURN(ll_ioctl_fiemap(inode, arg));
	case FSFILT_IOC_GETFLAGS:
	case FSFILT_IOC_SETFLAGS:
		RETURN(ll_iocontrol(inode, file, cmd, arg));
	case FSFILT_IOC_GETVERSION_OLD:
	case FSFILT_IOC_GETVERSION:
		RETURN(put_user(inode->i_generation, (int *)arg));
	case LL_IOC_GROUP_LOCK:
		RETURN(ll_get_grouplock(inode, file, arg));
	case LL_IOC_GROUP_UNLOCK:
		RETURN(ll_put_grouplock(inode, file, arg));
	case IOC_OBD_STATFS:
		RETURN(ll_obd_statfs(inode, (void *)arg));

	/* We need to special case any other ioctls we want to handle,
	 * to send them to the MDS/OST as appropriate and to properly
	 * network encode the arg field.
	case FSFILT_IOC_SETVERSION_OLD:
	case FSFILT_IOC_SETVERSION:
	*/
	case LL_IOC_FLUSHCTX:
		RETURN(ll_flush_ctx(inode));
	case LL_IOC_PATH2FID: {
		if (copy_to_user((void *)arg, ll_inode2fid(inode),
				 sizeof(struct lu_fid)))
			RETURN(-EFAULT);

		RETURN(0);
	}
	case OBD_IOC_FID2PATH:
		RETURN(ll_fid2path(inode, (void *)arg));
	case LL_IOC_DATA_VERSION: {
		struct ioc_data_version	idv;
		int			rc;

		if (copy_from_user(&idv, (char *)arg, sizeof(idv)))
			RETURN(-EFAULT);

		rc = ll_data_version(inode, &idv.idv_version,
				!(idv.idv_flags & LL_DV_NOFLUSH));

		if (rc == 0 && copy_to_user((char *) arg, &idv, sizeof(idv)))
			RETURN(-EFAULT);

		RETURN(rc);
	}

	case LL_IOC_GET_MDTIDX: {
		int mdtidx;

		mdtidx = ll_get_mdt_idx(inode);
		if (mdtidx < 0)
			RETURN(mdtidx);

		if (put_user((int)mdtidx, (int*)arg))
			RETURN(-EFAULT);

		RETURN(0);
	}
	case OBD_IOC_GETDTNAME:
	case OBD_IOC_GETMDNAME:
		RETURN(ll_get_obd_name(inode, cmd, arg));
	case LL_IOC_HSM_STATE_GET: {
		struct md_op_data	*op_data;
		struct hsm_user_state	*hus;
		int			 rc;

		OBD_ALLOC_PTR(hus);
		if (hus == NULL)
			RETURN(-ENOMEM);

		op_data = ll_prep_md_op_data(NULL, inode, NULL, NULL, 0, 0,
					     LUSTRE_OPC_ANY, hus);
		if (IS_ERR(op_data)) {
			OBD_FREE_PTR(hus);
			RETURN(PTR_ERR(op_data));
		}

		rc = obd_iocontrol(cmd, ll_i2mdexp(inode), sizeof(*op_data),
				   op_data, NULL);

		if (copy_to_user((void *)arg, hus, sizeof(*hus)))
			rc = -EFAULT;

		ll_finish_md_op_data(op_data);
		OBD_FREE_PTR(hus);
		RETURN(rc);
	}
	case LL_IOC_HSM_STATE_SET: {
		struct md_op_data	*op_data;
		struct hsm_state_set	*hss;
		int			 rc;

		OBD_ALLOC_PTR(hss);
		if (hss == NULL)
			RETURN(-ENOMEM);
		if (copy_from_user(hss, (char *)arg, sizeof(*hss))) {
			OBD_FREE_PTR(hss);
			RETURN(-EFAULT);
		}

		/* Non-root users are forbidden to set or clear flags which are
		 * NOT defined in HSM_USER_MASK. */
		if (((hss->hss_setmask | hss->hss_clearmask) & ~HSM_USER_MASK)
		    && !cfs_capable(CFS_CAP_SYS_ADMIN)) {
			OBD_FREE_PTR(hss);
			RETURN(-EPERM);
		}

		op_data = ll_prep_md_op_data(NULL, inode, NULL, NULL, 0, 0,
					     LUSTRE_OPC_ANY, hss);
		if (IS_ERR(op_data)) {
			OBD_FREE_PTR(hss);
			RETURN(PTR_ERR(op_data));
		}

		rc = obd_iocontrol(cmd, ll_i2mdexp(inode), sizeof(*op_data),
				   op_data, NULL);

		ll_finish_md_op_data(op_data);

		OBD_FREE_PTR(hss);
		RETURN(rc);
	}
	case LL_IOC_HSM_ACTION: {
		struct md_op_data		*op_data;
		struct hsm_current_action	*hca;
		int				 rc;

		OBD_ALLOC_PTR(hca);
		if (hca == NULL)
			RETURN(-ENOMEM);

		op_data = ll_prep_md_op_data(NULL, inode, NULL, NULL, 0, 0,
					     LUSTRE_OPC_ANY, hca);
		if (IS_ERR(op_data)) {
			OBD_FREE_PTR(hca);
			RETURN(PTR_ERR(op_data));
		}

		rc = obd_iocontrol(cmd, ll_i2mdexp(inode), sizeof(*op_data),
				   op_data, NULL);

		if (copy_to_user((char *)arg, hca, sizeof(*hca)))
			rc = -EFAULT;

		ll_finish_md_op_data(op_data);
		OBD_FREE_PTR(hca);
		RETURN(rc);
	}
	default: {
		int err;

		/* Give registered handlers first refusal, then fall back
		 * to the data export. */
		if (LLIOC_STOP ==
		     ll_iocontrol_call(inode, file, cmd, arg, &err))
			RETURN(err);

		RETURN(obd_iocontrol(cmd, ll_i2dtexp(inode), 0, NULL,
				     (void *)arg));
	}
	}
}
2123
2124
/* VFS ->llseek: seek within a Lustre file, refreshing the file size from
 * the OSTs first for origins that depend on it. */
loff_t ll_file_seek(struct file *file, loff_t offset, int origin)
{
	struct inode *inode = file->f_dentry->d_inode;
	loff_t retval, eof = 0;

	ENTRY;
	/* Prospective position computed here only for the trace message;
	 * the authoritative arithmetic happens in
	 * ll_generic_file_llseek_size() below. */
	retval = offset + ((origin == SEEK_END) ? i_size_read(inode) :
			   (origin == SEEK_CUR) ? file->f_pos : 0);
	CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), to=%llu=%#llx(%d)\n",
	       inode->i_ino, inode->i_generation, inode, retval, retval,
	       origin);
	ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_LLSEEK, 1);

	/* SEEK_END/SEEK_HOLE/SEEK_DATA need an up-to-date size, so glimpse
	 * it from the OSTs before seeking. */
	if (origin == SEEK_END || origin == SEEK_HOLE || origin == SEEK_DATA) {
		retval = ll_glimpse_size(inode);
		if (retval != 0)
			RETURN(retval);
		eof = i_size_read(inode);
	}

	retval = ll_generic_file_llseek_size(file, offset, origin,
					  ll_file_maxbytes(inode), eof);
	RETURN(retval);
}
2149
2150int ll_flush(struct file *file, fl_owner_t id)
2151{
2152	struct inode *inode = file->f_dentry->d_inode;
2153	struct ll_inode_info *lli = ll_i2info(inode);
2154	struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
2155	int rc, err;
2156
2157	LASSERT(!S_ISDIR(inode->i_mode));
2158
2159	/* catch async errors that were recorded back when async writeback
2160	 * failed for pages in this mapping. */
2161	rc = lli->lli_async_rc;
2162	lli->lli_async_rc = 0;
2163	err = lov_read_and_clear_async_rc(lli->lli_clob);
2164	if (rc == 0)
2165		rc = err;
2166
2167	/* The application has been told write failure already.
2168	 * Do not report failure again. */
2169	if (fd->fd_write_failed)
2170		return 0;
2171	return rc ? -EIO : 0;
2172}
2173
2174/**
2175 * Called to make sure a portion of file has been written out.
2176 * if @local_only is not true, it will send OST_SYNC RPCs to ost.
2177 *
2178 * Return how many pages have been written.
2179 */
2180int cl_sync_file_range(struct inode *inode, loff_t start, loff_t end,
2181		       enum cl_fsync_mode mode, int ignore_layout)
2182{
2183	struct cl_env_nest nest;
2184	struct lu_env *env;
2185	struct cl_io *io;
2186	struct obd_capa *capa = NULL;
2187	struct cl_fsync_io *fio;
2188	int result;
2189	ENTRY;
2190
2191	if (mode != CL_FSYNC_NONE && mode != CL_FSYNC_LOCAL &&
2192	    mode != CL_FSYNC_DISCARD && mode != CL_FSYNC_ALL)
2193		RETURN(-EINVAL);
2194
2195	env = cl_env_nested_get(&nest);
2196	if (IS_ERR(env))
2197		RETURN(PTR_ERR(env));
2198
2199	capa = ll_osscapa_get(inode, CAPA_OPC_OSS_WRITE);
2200
2201	io = ccc_env_thread_io(env);
2202	io->ci_obj = cl_i2info(inode)->lli_clob;
2203	io->ci_ignore_layout = ignore_layout;
2204
2205	/* initialize parameters for sync */
2206	fio = &io->u.ci_fsync;
2207	fio->fi_capa = capa;
2208	fio->fi_start = start;
2209	fio->fi_end = end;
2210	fio->fi_fid = ll_inode2fid(inode);
2211	fio->fi_mode = mode;
2212	fio->fi_nr_written = 0;
2213
2214	if (cl_io_init(env, io, CIT_FSYNC, io->ci_obj) == 0)
2215		result = cl_io_loop(env, io);
2216	else
2217		result = io->ci_result;
2218	if (result == 0)
2219		result = fio->fi_nr_written;
2220	cl_io_fini(env, io);
2221	cl_env_nested_put(&nest, env);
2222
2223	capa_put(capa);
2224
2225	RETURN(result);
2226}
2227
2228/*
2229 * When dentry is provided (the 'else' case), *file->f_dentry may be
2230 * null and dentry must be used directly rather than pulled from
2231 * *file->f_dentry as is done otherwise.
2232 */
2233
2234int ll_fsync(struct file *file, loff_t start, loff_t end, int datasync)
2235{
2236	struct dentry *dentry = file->f_dentry;
2237	struct inode *inode = dentry->d_inode;
2238	struct ll_inode_info *lli = ll_i2info(inode);
2239	struct ptlrpc_request *req;
2240	struct obd_capa *oc;
2241	int rc, err;
2242	ENTRY;
2243
2244	CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p)\n", inode->i_ino,
2245	       inode->i_generation, inode);
2246	ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_FSYNC, 1);
2247
2248	rc = filemap_write_and_wait_range(inode->i_mapping, start, end);
2249	mutex_lock(&inode->i_mutex);
2250
2251	/* catch async errors that were recorded back when async writeback
2252	 * failed for pages in this mapping. */
2253	if (!S_ISDIR(inode->i_mode)) {
2254		err = lli->lli_async_rc;
2255		lli->lli_async_rc = 0;
2256		if (rc == 0)
2257			rc = err;
2258		err = lov_read_and_clear_async_rc(lli->lli_clob);
2259		if (rc == 0)
2260			rc = err;
2261	}
2262
2263	oc = ll_mdscapa_get(inode);
2264	err = md_sync(ll_i2sbi(inode)->ll_md_exp, ll_inode2fid(inode), oc,
2265		      &req);
2266	capa_put(oc);
2267	if (!rc)
2268		rc = err;
2269	if (!err)
2270		ptlrpc_req_finished(req);
2271
2272	if (datasync && S_ISREG(inode->i_mode)) {
2273		struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
2274
2275		err = cl_sync_file_range(inode, 0, OBD_OBJECT_EOF,
2276				CL_FSYNC_ALL, 0);
2277		if (rc == 0 && err < 0)
2278			rc = err;
2279		if (rc < 0)
2280			fd->fd_write_failed = true;
2281		else
2282			fd->fd_write_failed = false;
2283	}
2284
2285	mutex_unlock(&inode->i_mutex);
2286	RETURN(rc);
2287}
2288
/*
 * VFS lock handler for both BSD flock and POSIX fcntl locks: translate
 * the request into an LDLM flock enqueue on the MDS, then mirror the
 * result in the kernel's local lock tables.
 */
int ll_file_flock(struct file *file, int cmd, struct file_lock *file_lock)
{
	struct inode *inode = file->f_dentry->d_inode;
	struct ll_sb_info *sbi = ll_i2sbi(inode);
	struct ldlm_enqueue_info einfo = { .ei_type = LDLM_FLOCK,
					   .ei_cb_cp =ldlm_flock_completion_ast,
					   .ei_cbdata = file_lock };
	struct md_op_data *op_data;
	struct lustre_handle lockh = {0};
	ldlm_policy_data_t flock = {{0}};
	int flags = 0;
	int rc;
	int rc2 = 0;
	ENTRY;

	CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu file_lock=%p\n",
	       inode->i_ino, file_lock);

	ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_FLOCK, 1);

	if (file_lock->fl_flags & FL_FLOCK) {
		LASSERT((cmd == F_SETLKW) || (cmd == F_SETLK));
		/* flocks are whole-file locks */
		flock.l_flock.end = OFFSET_MAX;
		/* For flocks owner is determined by the local file desctiptor*/
		flock.l_flock.owner = (unsigned long)file_lock->fl_file;
	} else if (file_lock->fl_flags & FL_POSIX) {
		flock.l_flock.owner = (unsigned long)file_lock->fl_owner;
		flock.l_flock.start = file_lock->fl_start;
		flock.l_flock.end = file_lock->fl_end;
	} else {
		RETURN(-EINVAL);
	}
	flock.l_flock.pid = file_lock->fl_pid;

	/* Somewhat ugly workaround for svc lockd.
	 * lockd installs custom fl_lmops->lm_compare_owner that checks
	 * for the fl_owner to be the same (which it always is on local node
	 * I guess between lockd processes) and then compares pid.
	 * As such we assign pid to the owner field to make it all work,
	 * conflict with normal locks is unlikely since pid space and
	 * pointer space for current->files are not intersecting */
	if (file_lock->fl_lmops && file_lock->fl_lmops->lm_compare_owner)
		flock.l_flock.owner = (unsigned long)file_lock->fl_pid;

	switch (file_lock->fl_type) {
	case F_RDLCK:
		einfo.ei_mode = LCK_PR;
		break;
	case F_UNLCK:
		/* An unlock request may or may not have any relation to
		 * existing locks so we may not be able to pass a lock handle
		 * via a normal ldlm_lock_cancel() request. The request may even
		 * unlock a byte range in the middle of an existing lock. In
		 * order to process an unlock request we need all of the same
		 * information that is given with a normal read or write record
		 * lock request. To avoid creating another ldlm unlock (cancel)
		 * message we'll treat a LCK_NL flock request as an unlock. */
		einfo.ei_mode = LCK_NL;
		break;
	case F_WRLCK:
		einfo.ei_mode = LCK_PW;
		break;
	default:
		CDEBUG(D_INFO, "Unknown fcntl lock type: %d\n",
			file_lock->fl_type);
		RETURN (-ENOTSUPP);
	}

	switch (cmd) {
	case F_SETLKW:
#ifdef F_SETLKW64
	case F_SETLKW64:
#endif
		flags = 0;
		break;
	case F_SETLK:
#ifdef F_SETLK64
	case F_SETLK64:
#endif
		flags = LDLM_FL_BLOCK_NOWAIT;
		break;
	case F_GETLK:
#ifdef F_GETLK64
	case F_GETLK64:
#endif
		flags = LDLM_FL_TEST_LOCK;
		/* Save the old mode so that if the mode in the lock changes we
		 * can decrement the appropriate reader or writer refcount. */
		file_lock->fl_type = einfo.ei_mode;
		break;
	default:
		CERROR("unknown fcntl lock command: %d\n", cmd);
		RETURN (-EINVAL);
	}

	op_data = ll_prep_md_op_data(NULL, inode, NULL, NULL, 0, 0,
				     LUSTRE_OPC_ANY, NULL);
	if (IS_ERR(op_data))
		RETURN(PTR_ERR(op_data));

	CDEBUG(D_DLMTRACE, "inode=%lu, pid=%u, flags=%#x, mode=%u, "
	       "start="LPU64", end="LPU64"\n", inode->i_ino, flock.l_flock.pid,
	       flags, einfo.ei_mode, flock.l_flock.start, flock.l_flock.end);

	/* take (or test/release) the lock on the MDS first */
	rc = md_enqueue(sbi->ll_md_exp, &einfo, NULL,
			op_data, &lockh, &flock, 0, NULL /* req */, flags);

	/* then mirror the result in the kernel's local lock tables */
	if ((file_lock->fl_flags & FL_FLOCK) &&
	    (rc == 0 || file_lock->fl_type == F_UNLCK))
		rc2  = flock_lock_file_wait(file, file_lock);
	if ((file_lock->fl_flags & FL_POSIX) &&
	    (rc == 0 || file_lock->fl_type == F_UNLCK) &&
	    !(flags & LDLM_FL_TEST_LOCK))
		rc2  = posix_lock_file_wait(file, file_lock);

	if (rc2 && file_lock->fl_type != F_UNLCK) {
		/* the local tables rejected the lock: back out the server
		 * lock with an LCK_NL (i.e. unlock) enqueue and report the
		 * local failure */
		einfo.ei_mode = LCK_NL;
		md_enqueue(sbi->ll_md_exp, &einfo, NULL,
			op_data, &lockh, &flock, 0, NULL /* req */, flags);
		rc = rc2;
	}

	ll_finish_md_op_data(op_data);

	RETURN(rc);
}
2416
/* flock/lock handler for -o noflock mounts: file locking is disabled. */
int ll_file_noflock(struct file *file, int cmd, struct file_lock *file_lock)
{
	ENTRY;

	RETURN(-ENOSYS);
}
2423
2424/**
2425 * test if some locks matching bits and l_req_mode are acquired
2426 * - bits can be in different locks
2427 * - if found clear the common lock bits in *bits
2428 * - the bits not found, are kept in *bits
2429 * \param inode [IN]
2430 * \param bits [IN] searched lock bits [IN]
2431 * \param l_req_mode [IN] searched lock mode
2432 * \retval boolean, true iff all bits are found
2433 */
2434int ll_have_md_lock(struct inode *inode, __u64 *bits,  ldlm_mode_t l_req_mode)
2435{
2436	struct lustre_handle lockh;
2437	ldlm_policy_data_t policy;
2438	ldlm_mode_t mode = (l_req_mode == LCK_MINMODE) ?
2439				(LCK_CR|LCK_CW|LCK_PR|LCK_PW) : l_req_mode;
2440	struct lu_fid *fid;
2441	__u64 flags;
2442	int i;
2443	ENTRY;
2444
2445	if (!inode)
2446	       RETURN(0);
2447
2448	fid = &ll_i2info(inode)->lli_fid;
2449	CDEBUG(D_INFO, "trying to match res "DFID" mode %s\n", PFID(fid),
2450	       ldlm_lockname[mode]);
2451
2452	flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_CBPENDING | LDLM_FL_TEST_LOCK;
2453	for (i = 0; i <= MDS_INODELOCK_MAXSHIFT && *bits != 0; i++) {
2454		policy.l_inodebits.bits = *bits & (1 << i);
2455		if (policy.l_inodebits.bits == 0)
2456			continue;
2457
2458		if (md_lock_match(ll_i2mdexp(inode), flags, fid, LDLM_IBITS,
2459				  &policy, mode, &lockh)) {
2460			struct ldlm_lock *lock;
2461
2462			lock = ldlm_handle2lock(&lockh);
2463			if (lock) {
2464				*bits &=
2465				      ~(lock->l_policy_data.l_inodebits.bits);
2466				LDLM_LOCK_PUT(lock);
2467			} else {
2468				*bits &= ~policy.l_inodebits.bits;
2469			}
2470		}
2471	}
2472	RETURN(*bits == 0);
2473}
2474
2475ldlm_mode_t ll_take_md_lock(struct inode *inode, __u64 bits,
2476			    struct lustre_handle *lockh, __u64 flags)
2477{
2478	ldlm_policy_data_t policy = { .l_inodebits = {bits}};
2479	struct lu_fid *fid;
2480	ldlm_mode_t rc;
2481	ENTRY;
2482
2483	fid = &ll_i2info(inode)->lli_fid;
2484	CDEBUG(D_INFO, "trying to match res "DFID"\n", PFID(fid));
2485
2486	rc = md_lock_match(ll_i2mdexp(inode), LDLM_FL_BLOCK_GRANTED|flags,
2487			   fid, LDLM_IBITS, &policy,
2488			   LCK_CR|LCK_CW|LCK_PR|LCK_PW, lockh);
2489	RETURN(rc);
2490}
2491
2492static int ll_inode_revalidate_fini(struct inode *inode, int rc)
2493{
2494	/* Already unlinked. Just update nlink and return success */
2495	if (rc == -ENOENT) {
2496		clear_nlink(inode);
2497		/* This path cannot be hit for regular files unless in
2498		 * case of obscure races, so no need to to validate
2499		 * size. */
2500		if (!S_ISREG(inode->i_mode) && !S_ISDIR(inode->i_mode))
2501			return 0;
2502	} else if (rc != 0) {
2503		CERROR("%s: revalidate FID "DFID" error: rc = %d\n",
2504		       ll_get_fsname(inode->i_sb, NULL, 0),
2505		       PFID(ll_inode2fid(inode)), rc);
2506	}
2507
2508	return rc;
2509}
2510
/*
 * Revalidate the inode's attributes from the MDS: either via an intent
 * getattr-by-FID when the server supports OBD_CONNECT_ATTRFID, or via a
 * plain md_getattr() when the needed ibits locks are not already cached.
 */
int __ll_inode_revalidate_it(struct dentry *dentry, struct lookup_intent *it,
			     __u64 ibits)
{
	struct inode *inode = dentry->d_inode;
	struct ptlrpc_request *req = NULL;
	struct obd_export *exp;
	int rc = 0;
	ENTRY;

	LASSERT(inode != NULL);

	CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),name=%s\n",
	       inode->i_ino, inode->i_generation, inode, dentry->d_name.name);

	exp = ll_i2mdexp(inode);

	/* XXX: Enable OBD_CONNECT_ATTRFID to reduce unnecessary getattr RPC.
	 *      But under CMD case, it caused some lock issues, should be fixed
	 *      with new CMD ibits lock. See bug 12718 */
	if (exp_connect_flags(exp) & OBD_CONNECT_ATTRFID) {
		struct lookup_intent oit = { .it_op = IT_GETATTR };
		struct md_op_data *op_data;

		if (ibits == MDS_INODELOCK_LOOKUP)
			oit.it_op = IT_LOOKUP;

		/* Call getattr by fid, so do not provide name at all. */
		op_data = ll_prep_md_op_data(NULL, dentry->d_parent->d_inode,
					     dentry->d_inode, NULL, 0, 0,
					     LUSTRE_OPC_ANY, NULL);
		if (IS_ERR(op_data))
			RETURN(PTR_ERR(op_data));

		oit.it_create_mode |= M_CHECK_STALE;
		rc = md_intent_lock(exp, op_data, NULL, 0,
				    /* we are not interested in name
				       based lookup */
				    &oit, 0, &req,
				    ll_md_blocking_ast, 0);
		ll_finish_md_op_data(op_data);
		oit.it_create_mode &= ~M_CHECK_STALE;
		if (rc < 0) {
			rc = ll_inode_revalidate_fini(inode, rc);
			GOTO (out, rc);
		}

		rc = ll_revalidate_it_finish(req, &oit, dentry);
		if (rc != 0) {
			ll_intent_release(&oit);
			GOTO(out, rc);
		}

		/* Unlinked? Unhash dentry, so it is not picked up later by
		   do_lookup() -> ll_revalidate_it(). We cannot use d_drop
		   here to preserve get_cwd functionality on 2.6.
		   Bug 10503 */
		if (!dentry->d_inode->i_nlink)
			d_lustre_invalidate(dentry, 0);

		ll_lookup_finish_locks(&oit, dentry);
	} else if (!ll_have_md_lock(dentry->d_inode, &ibits, LCK_MINMODE)) {
		/* no cached lock covering the needed ibits: do an explicit
		 * getattr RPC and refresh the inode from the reply */
		struct ll_sb_info *sbi = ll_i2sbi(dentry->d_inode);
		obd_valid valid = OBD_MD_FLGETATTR;
		struct md_op_data *op_data;
		int ealen = 0;

		if (S_ISREG(inode->i_mode)) {
			rc = ll_get_max_mdsize(sbi, &ealen);
			if (rc)
				RETURN(rc);
			valid |= OBD_MD_FLEASIZE | OBD_MD_FLMODEASIZE;
		}

		op_data = ll_prep_md_op_data(NULL, inode, NULL, NULL,
					     0, ealen, LUSTRE_OPC_ANY,
					     NULL);
		if (IS_ERR(op_data))
			RETURN(PTR_ERR(op_data));

		op_data->op_valid = valid;
		/* Once OBD_CONNECT_ATTRFID is not supported, we can't find one
		 * capa for this inode. Because we only keep capas of dirs
		 * fresh. */
		rc = md_getattr(sbi->ll_md_exp, op_data, &req);
		ll_finish_md_op_data(op_data);
		if (rc) {
			rc = ll_inode_revalidate_fini(inode, rc);
			RETURN(rc);
		}

		rc = ll_prep_inode(&inode, req, NULL, NULL);
	}
out:
	ptlrpc_req_finished(req);
	return rc;
}
2607
2608int ll_inode_revalidate_it(struct dentry *dentry, struct lookup_intent *it,
2609			   __u64 ibits)
2610{
2611	struct inode *inode = dentry->d_inode;
2612	int rc;
2613	ENTRY;
2614
2615	rc = __ll_inode_revalidate_it(dentry, it, ibits);
2616	if (rc != 0)
2617		RETURN(rc);
2618
2619	/* if object isn't regular file, don't validate size */
2620	if (!S_ISREG(inode->i_mode)) {
2621		LTIME_S(inode->i_atime) = ll_i2info(inode)->lli_lvb.lvb_atime;
2622		LTIME_S(inode->i_mtime) = ll_i2info(inode)->lli_lvb.lvb_mtime;
2623		LTIME_S(inode->i_ctime) = ll_i2info(inode)->lli_lvb.lvb_ctime;
2624	} else {
2625		rc = ll_glimpse_size(inode);
2626	}
2627	RETURN(rc);
2628}
2629
2630int ll_getattr_it(struct vfsmount *mnt, struct dentry *de,
2631		  struct lookup_intent *it, struct kstat *stat)
2632{
2633	struct inode *inode = de->d_inode;
2634	struct ll_sb_info *sbi = ll_i2sbi(inode);
2635	struct ll_inode_info *lli = ll_i2info(inode);
2636	int res = 0;
2637
2638	res = ll_inode_revalidate_it(de, it, MDS_INODELOCK_UPDATE |
2639					     MDS_INODELOCK_LOOKUP);
2640	ll_stats_ops_tally(sbi, LPROC_LL_GETATTR, 1);
2641
2642	if (res)
2643		return res;
2644
2645	stat->dev = inode->i_sb->s_dev;
2646	if (ll_need_32bit_api(sbi))
2647		stat->ino = cl_fid_build_ino(&lli->lli_fid, 1);
2648	else
2649		stat->ino = inode->i_ino;
2650	stat->mode = inode->i_mode;
2651	stat->nlink = inode->i_nlink;
2652	stat->uid = inode->i_uid;
2653	stat->gid = inode->i_gid;
2654	stat->rdev = inode->i_rdev;
2655	stat->atime = inode->i_atime;
2656	stat->mtime = inode->i_mtime;
2657	stat->ctime = inode->i_ctime;
2658	stat->blksize = 1 << inode->i_blkbits;
2659
2660	stat->size = i_size_read(inode);
2661	stat->blocks = inode->i_blocks;
2662
2663	return 0;
2664}
/* VFS ->getattr: stat via a fresh IT_GETATTR intent. */
int ll_getattr(struct vfsmount *mnt, struct dentry *de, struct kstat *stat)
{
	struct lookup_intent it = { .it_op = IT_GETATTR };

	return ll_getattr_it(mnt, de, &it, stat);
}
2671
2672
2673struct posix_acl * ll_get_acl(struct inode *inode, int type)
2674{
2675	struct ll_inode_info *lli = ll_i2info(inode);
2676	struct posix_acl *acl = NULL;
2677	ENTRY;
2678
2679	spin_lock(&lli->lli_lock);
2680	/* VFS' acl_permission_check->check_acl will release the refcount */
2681	acl = posix_acl_dup(lli->lli_posix_acl);
2682	spin_unlock(&lli->lli_lock);
2683
2684	RETURN(acl);
2685}
2686
2687
/* VFS ->permission: check access rights, revalidating the root inode
 * first and delegating to the MDT for remote-client mounts. */
int ll_inode_permission(struct inode *inode, int mask)
{
	int rc = 0;
	ENTRY;

#ifdef MAY_NOT_BLOCK
	/* RCU-walk lookup must not block; ask the VFS to retry in
	 * ref-walk mode. */
	if (mask & MAY_NOT_BLOCK)
		return -ECHILD;
#endif

       /* as root inode are NOT getting validated in lookup operation,
	* need to do it before permission check. */

	if (inode == inode->i_sb->s_root->d_inode) {
		struct lookup_intent it = { .it_op = IT_LOOKUP };

		rc = __ll_inode_revalidate_it(inode->i_sb->s_root, &it,
					      MDS_INODELOCK_LOOKUP);
		if (rc)
			RETURN(rc);
	}

	CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), inode mode %x mask %o\n",
	       inode->i_ino, inode->i_generation, inode, inode->i_mode, mask);

	/* remote-client mounts delegate the check to the MDT */
	if (ll_i2sbi(inode)->ll_flags & LL_SBI_RMT_CLIENT)
		return lustre_check_remote_perm(inode, mask);

	ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_INODE_PERM, 1);
	/* NOTE(review): 'flags' is not declared in this function; this
	 * only compiles if the ll_generic_permission() compat macro for
	 * this kernel discards its third argument — confirm against the
	 * per-kernel-version wrappers. */
	rc = ll_generic_permission(inode, mask, flags, ll_check_acl);

	RETURN(rc);
}
2721
/* Kernel-compat aliases: on this kernel the vectored file methods are
 * the aio_read/aio_write entry points, implemented by ll_file_aio_*. */
#define READ_METHOD aio_read
#define READ_FUNCTION ll_file_aio_read
#define WRITE_METHOD aio_write
#define WRITE_FUNCTION ll_file_aio_write

/* -o localflock - only provides locally consistent flock locks */
struct file_operations ll_file_operations = {
	.read	   = ll_file_read,
	.READ_METHOD    = READ_FUNCTION,
	.write	  = ll_file_write,
	.WRITE_METHOD   = WRITE_FUNCTION,
	.unlocked_ioctl = ll_file_ioctl,
	.open	   = ll_file_open,
	.release	= ll_file_release,
	.mmap	   = ll_file_mmap,
	.llseek	 = ll_file_seek,
	.splice_read    = ll_file_splice_read,
	.fsync	  = ll_fsync,
	.flush	  = ll_flush
};
2742
/* Default table for -o flock mounts: flock and POSIX locks are made
 * cluster-coherent through the MDS (see ll_file_flock()). */
struct file_operations ll_file_operations_flock = {
	.read	   = ll_file_read,
	.READ_METHOD    = READ_FUNCTION,
	.write	  = ll_file_write,
	.WRITE_METHOD   = WRITE_FUNCTION,
	.unlocked_ioctl = ll_file_ioctl,
	.open	   = ll_file_open,
	.release	= ll_file_release,
	.mmap	   = ll_file_mmap,
	.llseek	 = ll_file_seek,
	.splice_read    = ll_file_splice_read,
	.fsync	  = ll_fsync,
	.flush	  = ll_flush,
	.flock	  = ll_file_flock,
	.lock	   = ll_file_flock
};
2759
/* These are for -o noflock - to return ENOSYS on flock calls
 * (see ll_file_noflock()) */
struct file_operations ll_file_operations_noflock = {
	.read	   = ll_file_read,
	.READ_METHOD    = READ_FUNCTION,
	.write	  = ll_file_write,
	.WRITE_METHOD   = WRITE_FUNCTION,
	.unlocked_ioctl = ll_file_ioctl,
	.open	   = ll_file_open,
	.release	= ll_file_release,
	.mmap	   = ll_file_mmap,
	.llseek	 = ll_file_seek,
	.splice_read    = ll_file_splice_read,
	.fsync	  = ll_fsync,
	.flush	  = ll_flush,
	.flock	  = ll_file_noflock,
	.lock	   = ll_file_noflock
};
2777
/* Inode operations shared by all regular-file mount flavours above. */
struct inode_operations ll_file_inode_operations = {
	.setattr	= ll_setattr,
	.getattr	= ll_getattr,
	.permission	= ll_inode_permission,
	.setxattr	= ll_setxattr,
	.getxattr	= ll_getxattr,
	.listxattr	= ll_listxattr,
	.removexattr	= ll_removexattr,
	.get_acl	= ll_get_acl,
};
2788
/* dynamic ioctl number support routines */
static struct llioc_ctl_data {
	struct rw_semaphore	ioc_sem;	/* protects ioc_head */
	struct list_head	      ioc_head;	/* list of struct llioc_data */
} llioc = {
	__RWSEM_INITIALIZER(llioc.ioc_sem),
	LIST_HEAD_INIT(llioc.ioc_head)
};


/* One registered handler plus the ioctl numbers it serves; allocated
 * as a single block with the command array appended. */
struct llioc_data {
	struct list_head	      iocd_list;	/* chained on llioc.ioc_head */
	unsigned int	    iocd_size;	/* total allocation size, bytes */
	llioc_callback_t	iocd_cb;	/* handler callback */
	unsigned int	    iocd_count;	/* entries in iocd_cmd[] */
	unsigned int	    iocd_cmd[0];	/* ioctl numbers handled */
};
2806
2807void *ll_iocontrol_register(llioc_callback_t cb, int count, unsigned int *cmd)
2808{
2809	unsigned int size;
2810	struct llioc_data *in_data = NULL;
2811	ENTRY;
2812
2813	if (cb == NULL || cmd == NULL ||
2814	    count > LLIOC_MAX_CMD || count < 0)
2815		RETURN(NULL);
2816
2817	size = sizeof(*in_data) + count * sizeof(unsigned int);
2818	OBD_ALLOC(in_data, size);
2819	if (in_data == NULL)
2820		RETURN(NULL);
2821
2822	memset(in_data, 0, sizeof(*in_data));
2823	in_data->iocd_size = size;
2824	in_data->iocd_cb = cb;
2825	in_data->iocd_count = count;
2826	memcpy(in_data->iocd_cmd, cmd, sizeof(unsigned int) * count);
2827
2828	down_write(&llioc.ioc_sem);
2829	list_add_tail(&in_data->iocd_list, &llioc.ioc_head);
2830	up_write(&llioc.ioc_sem);
2831
2832	RETURN(in_data);
2833}
2834
2835void ll_iocontrol_unregister(void *magic)
2836{
2837	struct llioc_data *tmp;
2838
2839	if (magic == NULL)
2840		return;
2841
2842	down_write(&llioc.ioc_sem);
2843	list_for_each_entry(tmp, &llioc.ioc_head, iocd_list) {
2844		if (tmp == magic) {
2845			unsigned int size = tmp->iocd_size;
2846
2847			list_del(&tmp->iocd_list);
2848			up_write(&llioc.ioc_sem);
2849
2850			OBD_FREE(tmp, size);
2851			return;
2852		}
2853	}
2854	up_write(&llioc.ioc_sem);
2855
2856	CWARN("didn't find iocontrol register block with magic: %p\n", magic);
2857}
2858
2859EXPORT_SYMBOL(ll_iocontrol_register);
2860EXPORT_SYMBOL(ll_iocontrol_unregister);
2861
2862enum llioc_iter ll_iocontrol_call(struct inode *inode, struct file *file,
2863			unsigned int cmd, unsigned long arg, int *rcp)
2864{
2865	enum llioc_iter ret = LLIOC_CONT;
2866	struct llioc_data *data;
2867	int rc = -EINVAL, i;
2868
2869	down_read(&llioc.ioc_sem);
2870	list_for_each_entry(data, &llioc.ioc_head, iocd_list) {
2871		for (i = 0; i < data->iocd_count; i++) {
2872			if (cmd != data->iocd_cmd[i])
2873				continue;
2874
2875			ret = data->iocd_cb(inode, file, cmd, arg, data, &rc);
2876			break;
2877		}
2878
2879		if (ret == LLIOC_STOP)
2880			break;
2881	}
2882	up_read(&llioc.ioc_sem);
2883
2884	if (rcp)
2885		*rcp = rc;
2886	return ret;
2887}
2888
/*
 * Push a layout configuration into the cl_object behind \a inode.
 * For OBJECT_CONF_SET the associated layout lock is only allowed to be
 * matched after the layout has been applied, so a stale layout can
 * never be observed through a matched lock.
 */
int ll_layout_conf(struct inode *inode, const struct cl_object_conf *conf)
{
	struct ll_inode_info *lli = ll_i2info(inode);
	struct cl_env_nest nest;
	struct lu_env *env;
	int result;
	ENTRY;

	/* no cl_object yet: nothing to configure */
	if (lli->lli_clob == NULL)
		RETURN(0);

	env = cl_env_nested_get(&nest);
	if (IS_ERR(env))
		RETURN(PTR_ERR(env));

	result = cl_conf_set(env, lli->lli_clob, conf);
	cl_env_nested_put(&nest, env);

	if (conf->coc_opc == OBJECT_CONF_SET) {
		struct ldlm_lock *lock = conf->coc_lock;

		LASSERT(lock != NULL);
		LASSERT(ldlm_has_layout(lock));
		if (result == 0) {
			/* it can only be allowed to match after layout is
			 * applied to inode otherwise false layout would be
			 * seen. Applying layout shoud happen before dropping
			 * the intent lock. */
			ldlm_lock_allow_match(lock);
		}
	}
	RETURN(result);
}
2922
/* Fetch layout from MDT with getxattr request, if it's not ready yet */
static int ll_layout_fetch(struct inode *inode, struct ldlm_lock *lock)

{
	struct ll_sb_info *sbi = ll_i2sbi(inode);
	struct obd_capa *oc;
	struct ptlrpc_request *req;
	struct mdt_body *body;
	void *lvbdata;
	void *lmm;
	int lmmsize;
	int rc;
	ENTRY;

	/* lvb_data is installed at most once; nothing to do if present */
	if (lock->l_lvb_data != NULL)
		RETURN(0);

	/* if layout lock was granted right away, the layout is returned
	 * within DLM_LVB of dlm reply; otherwise if the lock was ever
	 * blocked and then granted via completion ast, we have to fetch
	 * layout here. Please note that we can't use the LVB buffer in
	 * completion AST because it doesn't have a large enough buffer */
	oc = ll_mdscapa_get(inode);
	rc = ll_get_max_mdsize(sbi, &lmmsize);
	if (rc == 0)
		rc = md_getxattr(sbi->ll_md_exp, ll_inode2fid(inode), oc,
				OBD_MD_FLXATTR, XATTR_NAME_LOV, NULL, 0,
				lmmsize, 0, &req);
	capa_put(oc);
	/* NOTE(review): assumes ll_get_max_mdsize() returns 0 or a
	 * negative errno; a positive return would fall through with
	 * 'req' uninitialized — confirm against its implementation. */
	if (rc < 0)
		RETURN(rc);

	body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
	if (body == NULL || body->eadatasize > lmmsize)
		GOTO(out, rc = -EPROTO);

	lmmsize = body->eadatasize;
	if (lmmsize == 0) /* empty layout */
		GOTO(out, rc = 0);

	lmm = req_capsule_server_sized_get(&req->rq_pill, &RMF_EADATA, lmmsize);
	if (lmm == NULL)
		GOTO(out, rc = -EFAULT);

	OBD_ALLOC_LARGE(lvbdata, lmmsize);
	if (lvbdata == NULL)
		GOTO(out, rc = -ENOMEM);

	/* install the fetched layout as the lock's LVB, unless another
	 * thread beat us to it while we were not holding the res lock */
	memcpy(lvbdata, lmm, lmmsize);
	lock_res_and_lock(lock);
	if (lock->l_lvb_data == NULL) {
		lock->l_lvb_data = lvbdata;
		lock->l_lvb_len = lmmsize;
		lvbdata = NULL;
	}
	unlock_res_and_lock(lock);

	/* lost the race: drop our copy */
	if (lvbdata != NULL)
		OBD_FREE_LARGE(lvbdata, lmmsize);
	EXIT;

out:
	ptlrpc_req_finished(req);
	return rc;
}
2988
2989/**
2990 * Apply the layout to the inode. Layout lock is held and will be released
2991 * in this function.
2992 */
2993static int ll_layout_lock_set(struct lustre_handle *lockh, ldlm_mode_t mode,
2994				struct inode *inode, __u32 *gen, bool reconf)
2995{
2996	struct ll_inode_info *lli = ll_i2info(inode);
2997	struct ll_sb_info    *sbi = ll_i2sbi(inode);
2998	struct ldlm_lock *lock;
2999	struct lustre_md md = { NULL };
3000	struct cl_object_conf conf;
3001	int rc = 0;
3002	bool lvb_ready;
3003	bool wait_layout = false;
3004	ENTRY;
3005
3006	LASSERT(lustre_handle_is_used(lockh));
3007
3008	lock = ldlm_handle2lock(lockh);
3009	LASSERT(lock != NULL);
3010	LASSERT(ldlm_has_layout(lock));
3011
3012	LDLM_DEBUG(lock, "File %p/"DFID" being reconfigured: %d.\n",
3013		inode, PFID(&lli->lli_fid), reconf);
3014
3015	/* in case this is a caching lock and reinstate with new inode */
3016	md_set_lock_data(sbi->ll_md_exp, &lockh->cookie, inode, NULL);
3017
3018	lock_res_and_lock(lock);
3019	lvb_ready = !!(lock->l_flags & LDLM_FL_LVB_READY);
3020	unlock_res_and_lock(lock);
3021	/* checking lvb_ready is racy but this is okay. The worst case is
3022	 * that multi processes may configure the file on the same time. */
3023	if (lvb_ready || !reconf) {
3024		rc = -ENODATA;
3025		if (lvb_ready) {
3026			/* layout_gen must be valid if layout lock is not
3027			 * cancelled and stripe has already set */
3028			*gen = lli->lli_layout_gen;
3029			rc = 0;
3030		}
3031		GOTO(out, rc);
3032	}
3033
3034	rc = ll_layout_fetch(inode, lock);
3035	if (rc < 0)
3036		GOTO(out, rc);
3037
3038	/* for layout lock, lmm is returned in lock's lvb.
3039	 * lvb_data is immutable if the lock is held so it's safe to access it
3040	 * without res lock. See the description in ldlm_lock_decref_internal()
3041	 * for the condition to free lvb_data of layout lock */
3042	if (lock->l_lvb_data != NULL) {
3043		rc = obd_unpackmd(sbi->ll_dt_exp, &md.lsm,
3044				  lock->l_lvb_data, lock->l_lvb_len);
3045		if (rc >= 0) {
3046			*gen = LL_LAYOUT_GEN_EMPTY;
3047			if (md.lsm != NULL)
3048				*gen = md.lsm->lsm_layout_gen;
3049			rc = 0;
3050		} else {
3051			CERROR("%s: file "DFID" unpackmd error: %d\n",
3052				ll_get_fsname(inode->i_sb, NULL, 0),
3053				PFID(&lli->lli_fid), rc);
3054		}
3055	}
3056	if (rc < 0)
3057		GOTO(out, rc);
3058
3059	/* set layout to file. Unlikely this will fail as old layout was
3060	 * surely eliminated */
3061	memset(&conf, 0, sizeof conf);
3062	conf.coc_opc = OBJECT_CONF_SET;
3063	conf.coc_inode = inode;
3064	conf.coc_lock = lock;
3065	conf.u.coc_md = &md;
3066	rc = ll_layout_conf(inode, &conf);
3067
3068	if (md.lsm != NULL)
3069		obd_free_memmd(sbi->ll_dt_exp, &md.lsm);
3070
3071	/* refresh layout failed, need to wait */
3072	wait_layout = rc == -EBUSY;
3073	EXIT;
3074
3075out:
3076	LDLM_LOCK_PUT(lock);
3077	ldlm_lock_decref(lockh, mode);
3078
3079	/* wait for IO to complete if it's still being used. */
3080	if (wait_layout) {
3081		CDEBUG(D_INODE, "%s: %p/"DFID" wait for layout reconf.\n",
3082			ll_get_fsname(inode->i_sb, NULL, 0),
3083			inode, PFID(&lli->lli_fid));
3084
3085		memset(&conf, 0, sizeof conf);
3086		conf.coc_opc = OBJECT_CONF_WAIT;
3087		conf.coc_inode = inode;
3088		rc = ll_layout_conf(inode, &conf);
3089		if (rc == 0)
3090			rc = -EAGAIN;
3091
3092		CDEBUG(D_INODE, "file: "DFID" waiting layout return: %d.\n",
3093			PFID(&lli->lli_fid), rc);
3094	}
3095	RETURN(rc);
3096}
3097
3098/**
3099 * This function checks if there exists a LAYOUT lock on the client side,
3100 * or enqueues it if it doesn't have one in cache.
3101 *
3102 * This function will not hold layout lock so it may be revoked any time after
3103 * this function returns. Any operations depend on layout should be redone
3104 * in that case.
3105 *
3106 * This function should be called before lov_io_init() to get an uptodate
3107 * layout version, the caller should save the version number and after IO
3108 * is finished, this function should be called again to verify that layout
3109 * is not changed during IO time.
3110 */
3111int ll_layout_refresh(struct inode *inode, __u32 *gen)
3112{
3113	struct ll_inode_info  *lli = ll_i2info(inode);
3114	struct ll_sb_info     *sbi = ll_i2sbi(inode);
3115	struct md_op_data     *op_data;
3116	struct lookup_intent   it;
3117	struct lustre_handle   lockh;
3118	ldlm_mode_t	       mode;
3119	struct ldlm_enqueue_info einfo = { .ei_type = LDLM_IBITS,
3120					   .ei_mode = LCK_CR,
3121					   .ei_cb_bl = ll_md_blocking_ast,
3122					   .ei_cb_cp = ldlm_completion_ast,
3123					   .ei_cbdata = NULL };
3124	int rc;
3125	ENTRY;
3126
3127	*gen = lli->lli_layout_gen;
3128	if (!(sbi->ll_flags & LL_SBI_LAYOUT_LOCK))
3129		RETURN(0);
3130
3131	/* sanity checks */
3132	LASSERT(fid_is_sane(ll_inode2fid(inode)));
3133	LASSERT(S_ISREG(inode->i_mode));
3134
3135	/* mostly layout lock is caching on the local side, so try to match
3136	 * it before grabbing layout lock mutex. */
3137	mode = ll_take_md_lock(inode, MDS_INODELOCK_LAYOUT, &lockh, 0);
3138	if (mode != 0) { /* hit cached lock */
3139		rc = ll_layout_lock_set(&lockh, mode, inode, gen, false);
3140		if (rc == 0)
3141			RETURN(0);
3142
3143		/* better hold lli_layout_mutex to try again otherwise
3144		 * it will have starvation problem. */
3145	}
3146
3147	/* take layout lock mutex to enqueue layout lock exclusively. */
3148	mutex_lock(&lli->lli_layout_mutex);
3149
3150again:
3151	/* try again. Maybe somebody else has done this. */
3152	mode = ll_take_md_lock(inode, MDS_INODELOCK_LAYOUT, &lockh, 0);
3153	if (mode != 0) { /* hit cached lock */
3154		rc = ll_layout_lock_set(&lockh, mode, inode, gen, true);
3155		if (rc == -EAGAIN)
3156			goto again;
3157
3158		mutex_unlock(&lli->lli_layout_mutex);
3159		RETURN(rc);
3160	}
3161
3162	op_data = ll_prep_md_op_data(NULL, inode, inode, NULL,
3163			0, 0, LUSTRE_OPC_ANY, NULL);
3164	if (IS_ERR(op_data)) {
3165		mutex_unlock(&lli->lli_layout_mutex);
3166		RETURN(PTR_ERR(op_data));
3167	}
3168
3169	/* have to enqueue one */
3170	memset(&it, 0, sizeof(it));
3171	it.it_op = IT_LAYOUT;
3172	lockh.cookie = 0ULL;
3173
3174	LDLM_DEBUG_NOLOCK("%s: requeue layout lock for file %p/"DFID".\n",
3175			ll_get_fsname(inode->i_sb, NULL, 0), inode,
3176			PFID(&lli->lli_fid));
3177
3178	rc = md_enqueue(sbi->ll_md_exp, &einfo, &it, op_data, &lockh,
3179			NULL, 0, NULL, 0);
3180	if (it.d.lustre.it_data != NULL)
3181		ptlrpc_req_finished(it.d.lustre.it_data);
3182	it.d.lustre.it_data = NULL;
3183
3184	ll_finish_md_op_data(op_data);
3185
3186	mode = it.d.lustre.it_lock_mode;
3187	it.d.lustre.it_lock_mode = 0;
3188	ll_intent_drop_lock(&it);
3189
3190	if (rc == 0) {
3191		/* set lock data in case this is a new lock */
3192		ll_set_lock_data(sbi->ll_md_exp, inode, &it, NULL);
3193		rc = ll_layout_lock_set(&lockh, mode, inode, gen, true);
3194		if (rc == -EAGAIN)
3195			goto again;
3196	}
3197	mutex_unlock(&lli->lli_layout_mutex);
3198
3199	RETURN(rc);
3200}
3201