[go: nahoru, domu]

file.c revision 1253b2e850850a66a71a512d2f830d4e0205ac72
1/*
2 * GPL HEADER START
3 *
4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5 *
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
9 *
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
15 *
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19 *
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
22 * have any questions.
23 *
24 * GPL HEADER END
25 */
26/*
27 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
29 *
30 * Copyright (c) 2011, 2012, Intel Corporation.
31 */
32/*
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
35 *
36 * lustre/llite/file.c
37 *
38 * Author: Peter Braam <braam@clusterfs.com>
39 * Author: Phil Schwan <phil@clusterfs.com>
40 * Author: Andreas Dilger <adilger@clusterfs.com>
41 */
42
43#define DEBUG_SUBSYSTEM S_LLITE
44#include <lustre_dlm.h>
45#include <lustre_lite.h>
46#include <linux/pagemap.h>
47#include <linux/file.h>
48#include "llite_internal.h"
49#include <lustre/ll_fiemap.h>
50
51#include "cl_object.h"
52
53struct ll_file_data *ll_file_data_get(void)
54{
55	struct ll_file_data *fd;
56
57	OBD_SLAB_ALLOC_PTR_GFP(fd, ll_file_data_slab, __GFP_IO);
58	fd->fd_write_failed = false;
59	return fd;
60}
61
62static void ll_file_data_put(struct ll_file_data *fd)
63{
64	if (fd != NULL)
65		OBD_SLAB_FREE_PTR(fd, ll_file_data_slab);
66}
67
/*
 * Pack the inode's current attributes (mode, times, size, blocks, flags)
 * and IO epoch into @op_data for an MDS request.  @fh, when non-NULL, is
 * the open handle to send along.  Takes an MDS capability reference into
 * op_capa1 (presumably dropped by the request path — confirm with caller),
 * and sets MDS_DATA_MODIFIED in op_bias when local data changes have not
 * yet been reported to the MDS.
 */
void ll_pack_inode2opdata(struct inode *inode, struct md_op_data *op_data,
			  struct lustre_handle *fh)
{
	op_data->op_fid1 = ll_i2info(inode)->lli_fid;
	op_data->op_attr.ia_mode = inode->i_mode;
	op_data->op_attr.ia_atime = inode->i_atime;
	op_data->op_attr.ia_mtime = inode->i_mtime;
	op_data->op_attr.ia_ctime = inode->i_ctime;
	op_data->op_attr.ia_size = i_size_read(inode);
	op_data->op_attr_blocks = inode->i_blocks;
	/* ia_attr_flags lives in the Lustre-extended iattr wrapper, hence
	 * the cast through struct ll_iattr. */
	((struct ll_iattr *)&op_data->op_attr)->ia_attr_flags =
					ll_inode_to_ext_flags(inode->i_flags);
	op_data->op_ioepoch = ll_i2info(inode)->lli_ioepoch;
	if (fh)
		op_data->op_handle = *fh;
	op_data->op_capa1 = ll_mdscapa_get(inode);

	if (LLIF_DATA_MODIFIED & ll_i2info(inode)->lli_flags)
		op_data->op_bias |= MDS_DATA_MODIFIED;
}
88
89/**
90 * Closes the IO epoch and packs all the attributes into @op_data for
91 * the CLOSE rpc.
92 */
static void ll_prepare_close(struct inode *inode, struct md_op_data *op_data,
			     struct obd_client_handle *och)
{
	ENTRY;

	/* Mode and all three timestamps are always reported on close. */
	op_data->op_attr.ia_valid = ATTR_MODE | ATTR_ATIME | ATTR_ATIME_SET |
					ATTR_MTIME | ATTR_MTIME_SET |
					ATTR_CTIME | ATTR_CTIME_SET;

	/* Size/blocks/epoch handling only matters for write opens. */
	if (!(och->och_flags & FMODE_WRITE))
		goto out;

	/* Without Size-on-MDS support (or for non-regular files) send size
	 * and block count directly; otherwise close the IO epoch so the MDS
	 * can gather SOM attributes. */
	if (!exp_connect_som(ll_i2mdexp(inode)) || !S_ISREG(inode->i_mode))
		op_data->op_attr.ia_valid |= ATTR_SIZE | ATTR_BLOCKS;
	else
		ll_ioepoch_close(inode, op_data, &och, 0);

out:
	ll_pack_inode2opdata(inode, op_data, &och->och_fh);
	ll_prep_md_op_data(op_data, inode, NULL, NULL,
			   0, 0, LUSTRE_OPC_ANY, NULL);
	EXIT;
}
116
/*
 * Send the close RPC for @och to the MDS and release the handle.
 *
 * Handles the -EAGAIN reply, where the MDS asks the client to fetch
 * Size-on-MDS attributes from the OSTs and resend them via setattr.
 * On exit @och is either queued for a later DONE_WRITING (SOM case with
 * the epoch still open) or freed here; the close request, if any, is
 * always finished.
 */
static int ll_close_inode_openhandle(struct obd_export *md_exp,
				     struct inode *inode,
				     struct obd_client_handle *och)
{
	struct obd_export *exp = ll_i2mdexp(inode);
	struct md_op_data *op_data;
	struct ptlrpc_request *req = NULL;
	struct obd_device *obd = class_exp2obd(exp);
	int epoch_close = 1;
	int rc;
	ENTRY;

	if (obd == NULL) {
		/*
		 * XXX: in case of LMV, is this correct to access
		 * ->exp_handle?
		 */
		CERROR("Invalid MDC connection handle "LPX64"\n",
		       ll_i2mdexp(inode)->exp_handle.h_cookie);
		GOTO(out, rc = 0);
	}

	OBD_ALLOC_PTR(op_data);
	if (op_data == NULL)
		GOTO(out, rc = -ENOMEM); // XXX We leak openhandle and request here.

	ll_prepare_close(inode, op_data, och);
	/* ll_prepare_close() may set MF_EPOCH_CLOSE; remember it because
	 * op_data is freed before the out: label uses epoch_close. */
	epoch_close = (op_data->op_flags & MF_EPOCH_CLOSE);
	rc = md_close(md_exp, op_data, och->och_mod, &req);
	if (rc == -EAGAIN) {
		/* This close must have the epoch closed. */
		LASSERT(epoch_close);
		/* MDS has instructed us to obtain Size-on-MDS attribute from
		 * OSTs and send setattr to back to MDS. */
		rc = ll_som_update(inode, op_data);
		if (rc) {
			CERROR("inode %lu mdc Size-on-MDS update failed: "
			       "rc = %d\n", inode->i_ino, rc);
			rc = 0;
		}
	} else if (rc) {
		CERROR("inode %lu mdc close failed: rc = %d\n",
		       inode->i_ino, rc);
	}

	/* DATA_MODIFIED flag was successfully sent on close, cancel data
	 * modification flag. */
	if (rc == 0 && (op_data->op_bias & MDS_DATA_MODIFIED)) {
		struct ll_inode_info *lli = ll_i2info(inode);

		spin_lock(&lli->lli_lock);
		lli->lli_flags &= ~LLIF_DATA_MODIFIED;
		spin_unlock(&lli->lli_lock);
	}

	ll_finish_md_op_data(op_data);

	if (rc == 0) {
		rc = ll_objects_destroy(req, inode);
		if (rc)
			CERROR("inode %lu ll_objects destroy: rc = %d\n",
			       inode->i_ino, rc);
	}

	EXIT;
out:

	/* If SOM is in use and the epoch is still open, keep @och alive so
	 * DONE_WRITING can use it later; otherwise poison and free it. */
	if (exp_connect_som(exp) && !epoch_close &&
	    S_ISREG(inode->i_mode) && (och->och_flags & FMODE_WRITE)) {
		ll_queue_done_writing(inode, LLIF_DONE_WRITING);
	} else {
		md_clear_open_replay_data(md_exp, och);
		/* Free @och if it is not waiting for DONE_WRITING. */
		och->och_fh.cookie = DEAD_HANDLE_MAGIC;
		OBD_FREE_PTR(och);
	}
	if (req) /* This is close request */
		ptlrpc_req_finished(req);
	return rc;
}
197
198int ll_md_real_close(struct inode *inode, int flags)
199{
200	struct ll_inode_info *lli = ll_i2info(inode);
201	struct obd_client_handle **och_p;
202	struct obd_client_handle *och;
203	__u64 *och_usecount;
204	int rc = 0;
205	ENTRY;
206
207	if (flags & FMODE_WRITE) {
208		och_p = &lli->lli_mds_write_och;
209		och_usecount = &lli->lli_open_fd_write_count;
210	} else if (flags & FMODE_EXEC) {
211		och_p = &lli->lli_mds_exec_och;
212		och_usecount = &lli->lli_open_fd_exec_count;
213	} else {
214		LASSERT(flags & FMODE_READ);
215		och_p = &lli->lli_mds_read_och;
216		och_usecount = &lli->lli_open_fd_read_count;
217	}
218
219	mutex_lock(&lli->lli_och_mutex);
220	if (*och_usecount) { /* There are still users of this handle, so
221				skip freeing it. */
222		mutex_unlock(&lli->lli_och_mutex);
223		RETURN(0);
224	}
225	och=*och_p;
226	*och_p = NULL;
227	mutex_unlock(&lli->lli_och_mutex);
228
229	if (och) { /* There might be a race and somebody have freed this och
230		      already */
231		rc = ll_close_inode_openhandle(ll_i2sbi(inode)->ll_md_exp,
232					       inode, och);
233	}
234
235	RETURN(rc);
236}
237
238int ll_md_close(struct obd_export *md_exp, struct inode *inode,
239		struct file *file)
240{
241	struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
242	struct ll_inode_info *lli = ll_i2info(inode);
243	int rc = 0;
244	ENTRY;
245
246	/* clear group lock, if present */
247	if (unlikely(fd->fd_flags & LL_FILE_GROUP_LOCKED))
248		ll_put_grouplock(inode, file, fd->fd_grouplock.cg_gid);
249
250	/* Let's see if we have good enough OPEN lock on the file and if
251	   we can skip talking to MDS */
252	if (file->f_dentry->d_inode) { /* Can this ever be false? */
253		int lockmode;
254		int flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_TEST_LOCK;
255		struct lustre_handle lockh;
256		struct inode *inode = file->f_dentry->d_inode;
257		ldlm_policy_data_t policy = {.l_inodebits={MDS_INODELOCK_OPEN}};
258
259		mutex_lock(&lli->lli_och_mutex);
260		if (fd->fd_omode & FMODE_WRITE) {
261			lockmode = LCK_CW;
262			LASSERT(lli->lli_open_fd_write_count);
263			lli->lli_open_fd_write_count--;
264		} else if (fd->fd_omode & FMODE_EXEC) {
265			lockmode = LCK_PR;
266			LASSERT(lli->lli_open_fd_exec_count);
267			lli->lli_open_fd_exec_count--;
268		} else {
269			lockmode = LCK_CR;
270			LASSERT(lli->lli_open_fd_read_count);
271			lli->lli_open_fd_read_count--;
272		}
273		mutex_unlock(&lli->lli_och_mutex);
274
275		if (!md_lock_match(md_exp, flags, ll_inode2fid(inode),
276				   LDLM_IBITS, &policy, lockmode,
277				   &lockh)) {
278			rc = ll_md_real_close(file->f_dentry->d_inode,
279					      fd->fd_omode);
280		}
281	} else {
282		CERROR("Releasing a file %p with negative dentry %p. Name %s",
283		       file, file->f_dentry, file->f_dentry->d_name.name);
284	}
285
286	LUSTRE_FPRIVATE(file) = NULL;
287	ll_file_data_put(fd);
288	ll_capa_close(inode);
289
290	RETURN(rc);
291}
292
293/* While this returns an error code, fput() the caller does not, so we need
294 * to make every effort to clean up all of our state here.  Also, applications
295 * rarely check close errors and even if an error is returned they will not
296 * re-try the close call.
297 */
int ll_file_release(struct inode *inode, struct file *file)
{
	struct ll_file_data *fd;
	struct ll_sb_info *sbi = ll_i2sbi(inode);
	struct ll_inode_info *lli = ll_i2info(inode);
	int rc;
	ENTRY;

	CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p)\n", inode->i_ino,
	       inode->i_generation, inode);

#ifdef CONFIG_FS_POSIX_ACL
	/* Remote-client ACL bookkeeping is tied to the root inode; tear
	 * down this process's remote-ACL state on root release. */
	if (sbi->ll_flags & LL_SBI_RMT_CLIENT &&
	    inode == inode->i_sb->s_root->d_inode) {
		struct ll_file_data *fd = LUSTRE_FPRIVATE(file);

		LASSERT(fd != NULL);
		if (unlikely(fd->fd_flags & LL_FILE_RMTACL)) {
			fd->fd_flags &= ~LL_FILE_RMTACL;
			rct_del(&sbi->ll_rct, current_pid());
			et_search_free(&sbi->ll_et, current_pid());
		}
	}
#endif

	/* Root is not counted in the RELEASE stats below. */
	if (inode->i_sb->s_root != file->f_dentry)
		ll_stats_ops_tally(sbi, LPROC_LL_RELEASE, 1);
	fd = LUSTRE_FPRIVATE(file);
	LASSERT(fd != NULL);

	/* The last ref on @file, maybe not the the owner pid of statahead.
	 * Different processes can open the same dir, "ll_opendir_key" means:
	 * it is me that should stop the statahead thread. */
	if (S_ISDIR(inode->i_mode) && lli->lli_opendir_key == fd &&
	    lli->lli_opendir_pid != 0)
		ll_stop_statahead(inode, lli->lli_opendir_key);

	/* Root has no MDS open handle to close; just drop the fd data. */
	if (inode->i_sb->s_root == file->f_dentry) {
		LUSTRE_FPRIVATE(file) = NULL;
		ll_file_data_put(fd);
		RETURN(0);
	}

	/* Fold any async write error recorded at the OSC layer into
	 * lli_async_rc so a later fsync/close can report it. */
	if (!S_ISDIR(inode->i_mode)) {
		lov_read_and_clear_async_rc(lli->lli_clob);
		lli->lli_async_rc = 0;
	}

	rc = ll_md_close(sbi->ll_md_exp, inode, file);

	if (CFS_FAIL_TIMEOUT_MS(OBD_FAIL_PTLRPC_DUMP_LOG, cfs_fail_val))
		libcfs_debug_dumplog();

	RETURN(rc);
}
353
/*
 * Send an open intent to the MDS for @file and, on success, update the
 * inode from the reply and record the granted lock.  @lmm/@lmmsize, when
 * non-zero, carry striping parameters and suppress the OPEN lock request.
 *
 * \retval 0 on success, negative errno on failure.  Always drops the
 * intent's request reference and lock before returning.
 */
static int ll_intent_file_open(struct file *file, void *lmm,
			       int lmmsize, struct lookup_intent *itp)
{
	struct ll_sb_info *sbi = ll_i2sbi(file->f_dentry->d_inode);
	struct dentry *parent = file->f_dentry->d_parent;
	const char *name = file->f_dentry->d_name.name;
	const int len = file->f_dentry->d_name.len;
	struct md_op_data *op_data;
	struct ptlrpc_request *req;
	__u32 opc = LUSTRE_OPC_ANY;
	int rc;
	ENTRY;

	if (!parent)
		RETURN(-ENOENT);

	/* Usually we come here only for NFSD, and we want open lock.
	   But we can also get here with pre 2.6.15 patchless kernels, and in
	   that case that lock is also ok */
	/* We can also get here if there was cached open handle in revalidate_it
	 * but it disappeared while we were getting from there to ll_file_open.
	 * But this means this file was closed and immediatelly opened which
	 * makes a good candidate for using OPEN lock */
	/* If lmmsize & lmm are not 0, we are just setting stripe info
	 * parameters. No need for the open lock */
	if (lmm == NULL && lmmsize == 0) {
		itp->it_flags |= MDS_OPEN_LOCK;
		if (itp->it_flags & FMODE_WRITE)
			opc = LUSTRE_OPC_CREATE;
	}

	op_data  = ll_prep_md_op_data(NULL, parent->d_inode,
				      file->f_dentry->d_inode, name, len,
				      O_RDWR, opc, NULL);
	if (IS_ERR(op_data))
		RETURN(PTR_ERR(op_data));

	/* We already know the FID, so open by FID rather than by name. */
	itp->it_flags |= MDS_OPEN_BY_FID;
	rc = md_intent_lock(sbi->ll_md_exp, op_data, lmm, lmmsize, itp,
			    0 /*unused */, &req, ll_md_blocking_ast, 0);
	ll_finish_md_op_data(op_data);
	if (rc == -ESTALE) {
		/* reason for keep own exit path - don`t flood log
		* with messages with -ESTALE errors.
		*/
		if (!it_disposition(itp, DISP_OPEN_OPEN) ||
		     it_open_error(DISP_OPEN_OPEN, itp))
			GOTO(out, rc);
		/* The open succeeded on a stale inode; close the handle the
		 * MDS just gave us before bailing out. */
		ll_release_openhandle(file->f_dentry, itp);
		GOTO(out, rc);
	}

	if (it_disposition(itp, DISP_LOOKUP_NEG))
		GOTO(out, rc = -ENOENT);

	if (rc != 0 || it_open_error(DISP_OPEN_OPEN, itp)) {
		rc = rc ? rc : it_open_error(DISP_OPEN_OPEN, itp);
		CDEBUG(D_VFSTRACE, "lock enqueue: err: %d\n", rc);
		GOTO(out, rc);
	}

	/* Refresh the inode from the reply and attach the granted lock. */
	rc = ll_prep_inode(&file->f_dentry->d_inode, req, NULL, itp);
	if (!rc && itp->d.lustre.it_lock_mode)
		ll_set_lock_data(sbi->ll_md_exp, file->f_dentry->d_inode,
				 itp, NULL);

out:
	ptlrpc_req_finished(itp->d.lustre.it_data);
	it_clear_disposition(itp, DISP_ENQ_COMPLETE);
	ll_intent_drop_lock(itp);

	RETURN(rc);
}
427
428/**
429 * Assign an obtained @ioepoch to client's inode. No lock is needed, MDS does
430 * not believe attributes if a few ioepoch holders exist. Attributes for
431 * previous ioepoch if new one is opened are also skipped by MDS.
432 */
433void ll_ioepoch_open(struct ll_inode_info *lli, __u64 ioepoch)
434{
435	if (ioepoch && lli->lli_ioepoch != ioepoch) {
436		lli->lli_ioepoch = ioepoch;
437		CDEBUG(D_INODE, "Epoch "LPU64" opened on "DFID"\n",
438		       ioepoch, PFID(&lli->lli_fid));
439	}
440}
441
/*
 * Fill @och from the MDS open reply carried by intent @it: record the
 * server open handle, FID, open flags and granted IO epoch, then register
 * the handle for open replay.
 *
 * \retval result of md_set_open_replay_data().
 */
static int ll_och_fill(struct obd_export *md_exp, struct ll_inode_info *lli,
		       struct lookup_intent *it, struct obd_client_handle *och)
{
	struct ptlrpc_request *req = it->d.lustre.it_data;
	struct mdt_body *body;

	LASSERT(och);

	body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
	LASSERT(body != NULL);		      /* reply already checked out */

	memcpy(&och->och_fh, &body->handle, sizeof(body->handle));
	och->och_magic = OBD_CLIENT_HANDLE_MAGIC;
	och->och_fid = lli->lli_fid;
	och->och_flags = it->it_flags;
	/* Adopt the IO epoch granted by this open, if it is new. */
	ll_ioepoch_open(lli, body->ioepoch);

	return md_set_open_replay_data(md_exp, och, req);
}
461
/*
 * Complete the client-side part of an open: optionally fill @och from the
 * intent reply, then attach @fd to the file and initialize readahead
 * state.  @och is NULL when an existing MDS open handle is being reused.
 *
 * \retval 0 on success, negative errno from ll_och_fill() otherwise.
 */
int ll_local_open(struct file *file, struct lookup_intent *it,
		  struct ll_file_data *fd, struct obd_client_handle *och)
{
	struct inode *inode = file->f_dentry->d_inode;
	struct ll_inode_info *lli = ll_i2info(inode);
	ENTRY;

	LASSERT(!LUSTRE_FPRIVATE(file));

	LASSERT(fd != NULL);

	if (och) {
		struct ptlrpc_request *req = it->d.lustre.it_data;
		struct mdt_body *body;
		int rc;

		rc = ll_och_fill(ll_i2sbi(inode)->ll_md_exp, lli, it, och);
		if (rc)
			RETURN(rc);

		body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
		/* NOTE(review): logged on write opens where the MDS already
		 * returned a size — presumably a SOM diagnostic; confirm. */
		if ((it->it_flags & FMODE_WRITE) &&
		    (body->valid & OBD_MD_FLSIZE))
			CDEBUG(D_INODE, "Epoch "LPU64" opened on "DFID"\n",
			       lli->lli_ioepoch, PFID(&lli->lli_fid));
	}

	LUSTRE_FPRIVATE(file) = fd;
	ll_readahead_init(inode, &fd->fd_ras);
	/* Remember the access mode for the matching close. */
	fd->fd_omode = it->it_flags;
	RETURN(0);
}
494
495/* Open a file, and (for the very first open) create objects on the OSTs at
496 * this time.  If opened with O_LOV_DELAY_CREATE, then we don't do the object
497 * creation or open until ll_lov_setstripe() ioctl is called.
498 *
499 * If we already have the stripe MD locally then we don't request it in
500 * md_open(), by passing a lmm_size = 0.
501 *
502 * It is up to the application to ensure no other processes open this file
503 * in the O_LOV_DELAY_CREATE case, or the default striping pattern will be
504 * used.  We might be able to avoid races of that sort by getting lli_open_sem
505 * before returning in the O_LOV_DELAY_CREATE case and dropping it here
506 * or in ll_file_release(), but I'm not sure that is desirable/necessary.
507 */
508int ll_file_open(struct inode *inode, struct file *file)
509{
510	struct ll_inode_info *lli = ll_i2info(inode);
511	struct lookup_intent *it, oit = { .it_op = IT_OPEN,
512					  .it_flags = file->f_flags };
513	struct obd_client_handle **och_p = NULL;
514	__u64 *och_usecount = NULL;
515	struct ll_file_data *fd;
516	int rc = 0, opendir_set = 0;
517	ENTRY;
518
519	CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), flags %o\n", inode->i_ino,
520	       inode->i_generation, inode, file->f_flags);
521
522	it = file->private_data; /* XXX: compat macro */
523	file->private_data = NULL; /* prevent ll_local_open assertion */
524
525	fd = ll_file_data_get();
526	if (fd == NULL)
527		GOTO(out_och_free, rc = -ENOMEM);
528
529	fd->fd_file = file;
530	if (S_ISDIR(inode->i_mode)) {
531		spin_lock(&lli->lli_sa_lock);
532		if (lli->lli_opendir_key == NULL && lli->lli_sai == NULL &&
533		    lli->lli_opendir_pid == 0) {
534			lli->lli_opendir_key = fd;
535			lli->lli_opendir_pid = current_pid();
536			opendir_set = 1;
537		}
538		spin_unlock(&lli->lli_sa_lock);
539	}
540
541	if (inode->i_sb->s_root == file->f_dentry) {
542		LUSTRE_FPRIVATE(file) = fd;
543		RETURN(0);
544	}
545
546	if (!it || !it->d.lustre.it_disposition) {
547		/* Convert f_flags into access mode. We cannot use file->f_mode,
548		 * because everything but O_ACCMODE mask was stripped from
549		 * there */
550		if ((oit.it_flags + 1) & O_ACCMODE)
551			oit.it_flags++;
552		if (file->f_flags & O_TRUNC)
553			oit.it_flags |= FMODE_WRITE;
554
555		/* kernel only call f_op->open in dentry_open.  filp_open calls
556		 * dentry_open after call to open_namei that checks permissions.
557		 * Only nfsd_open call dentry_open directly without checking
558		 * permissions and because of that this code below is safe. */
559		if (oit.it_flags & (FMODE_WRITE | FMODE_READ))
560			oit.it_flags |= MDS_OPEN_OWNEROVERRIDE;
561
562		/* We do not want O_EXCL here, presumably we opened the file
563		 * already? XXX - NFS implications? */
564		oit.it_flags &= ~O_EXCL;
565
566		/* bug20584, if "it_flags" contains O_CREAT, the file will be
567		 * created if necessary, then "IT_CREAT" should be set to keep
568		 * consistent with it */
569		if (oit.it_flags & O_CREAT)
570			oit.it_op |= IT_CREAT;
571
572		it = &oit;
573	}
574
575restart:
576	/* Let's see if we have file open on MDS already. */
577	if (it->it_flags & FMODE_WRITE) {
578		och_p = &lli->lli_mds_write_och;
579		och_usecount = &lli->lli_open_fd_write_count;
580	} else if (it->it_flags & FMODE_EXEC) {
581		och_p = &lli->lli_mds_exec_och;
582		och_usecount = &lli->lli_open_fd_exec_count;
583	 } else {
584		och_p = &lli->lli_mds_read_och;
585		och_usecount = &lli->lli_open_fd_read_count;
586	}
587
588	mutex_lock(&lli->lli_och_mutex);
589	if (*och_p) { /* Open handle is present */
590		if (it_disposition(it, DISP_OPEN_OPEN)) {
591			/* Well, there's extra open request that we do not need,
592			   let's close it somehow. This will decref request. */
593			rc = it_open_error(DISP_OPEN_OPEN, it);
594			if (rc) {
595				mutex_unlock(&lli->lli_och_mutex);
596				GOTO(out_openerr, rc);
597			}
598
599			ll_release_openhandle(file->f_dentry, it);
600		}
601		(*och_usecount)++;
602
603		rc = ll_local_open(file, it, fd, NULL);
604		if (rc) {
605			(*och_usecount)--;
606			mutex_unlock(&lli->lli_och_mutex);
607			GOTO(out_openerr, rc);
608		}
609	} else {
610		LASSERT(*och_usecount == 0);
611		if (!it->d.lustre.it_disposition) {
612			/* We cannot just request lock handle now, new ELC code
613			   means that one of other OPEN locks for this file
614			   could be cancelled, and since blocking ast handler
615			   would attempt to grab och_mutex as well, that would
616			   result in a deadlock */
617			mutex_unlock(&lli->lli_och_mutex);
618			it->it_create_mode |= M_CHECK_STALE;
619			rc = ll_intent_file_open(file, NULL, 0, it);
620			it->it_create_mode &= ~M_CHECK_STALE;
621			if (rc)
622				GOTO(out_openerr, rc);
623
624			goto restart;
625		}
626		OBD_ALLOC(*och_p, sizeof (struct obd_client_handle));
627		if (!*och_p)
628			GOTO(out_och_free, rc = -ENOMEM);
629
630		(*och_usecount)++;
631
632		/* md_intent_lock() didn't get a request ref if there was an
633		 * open error, so don't do cleanup on the request here
634		 * (bug 3430) */
635		/* XXX (green): Should not we bail out on any error here, not
636		 * just open error? */
637		rc = it_open_error(DISP_OPEN_OPEN, it);
638		if (rc)
639			GOTO(out_och_free, rc);
640
641		LASSERT(it_disposition(it, DISP_ENQ_OPEN_REF));
642
643		rc = ll_local_open(file, it, fd, *och_p);
644		if (rc)
645			GOTO(out_och_free, rc);
646	}
647	mutex_unlock(&lli->lli_och_mutex);
648	fd = NULL;
649
650	/* Must do this outside lli_och_mutex lock to prevent deadlock where
651	   different kind of OPEN lock for this same inode gets cancelled
652	   by ldlm_cancel_lru */
653	if (!S_ISREG(inode->i_mode))
654		GOTO(out_och_free, rc);
655
656	ll_capa_open(inode);
657
658	if (!lli->lli_has_smd) {
659		if (file->f_flags & O_LOV_DELAY_CREATE ||
660		    !(file->f_mode & FMODE_WRITE)) {
661			CDEBUG(D_INODE, "object creation was delayed\n");
662			GOTO(out_och_free, rc);
663		}
664	}
665	file->f_flags &= ~O_LOV_DELAY_CREATE;
666	GOTO(out_och_free, rc);
667
668out_och_free:
669	if (rc) {
670		if (och_p && *och_p) {
671			OBD_FREE(*och_p, sizeof (struct obd_client_handle));
672			*och_p = NULL; /* OBD_FREE writes some magic there */
673			(*och_usecount)--;
674		}
675		mutex_unlock(&lli->lli_och_mutex);
676
677out_openerr:
678		if (opendir_set != 0)
679			ll_stop_statahead(inode, lli->lli_opendir_key);
680		if (fd != NULL)
681			ll_file_data_put(fd);
682	} else {
683		ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_OPEN, 1);
684	}
685
686	if (it && it_disposition(it, DISP_ENQ_OPEN_REF)) {
687		ptlrpc_req_finished(it->d.lustre.it_data);
688		it_clear_disposition(it, DISP_ENQ_OPEN_REF);
689	}
690
691	return rc;
692}
693
694/* Fills the obdo with the attributes for the lsm */
static int ll_lsm_getattr(struct lov_stripe_md *lsm, struct obd_export *exp,
			  struct obd_capa *capa, struct obdo *obdo,
			  __u64 ioepoch, int sync)
{
	struct ptlrpc_request_set *set;
	struct obd_info	    oinfo = { { { 0 } } };
	int			rc;

	ENTRY;

	LASSERT(lsm != NULL);

	oinfo.oi_md = lsm;
	oinfo.oi_oa = obdo;
	oinfo.oi_oa->o_oi = lsm->lsm_oi;
	oinfo.oi_oa->o_mode = S_IFREG;
	oinfo.oi_oa->o_ioepoch = ioepoch;
	/* Ask the OSTs for the full attribute set. */
	oinfo.oi_oa->o_valid = OBD_MD_FLID | OBD_MD_FLTYPE |
			       OBD_MD_FLSIZE | OBD_MD_FLBLOCKS |
			       OBD_MD_FLBLKSZ | OBD_MD_FLATIME |
			       OBD_MD_FLMTIME | OBD_MD_FLCTIME |
			       OBD_MD_FLGROUP | OBD_MD_FLEPOCH |
			       OBD_MD_FLDATAVERSION;
	oinfo.oi_capa = capa;
	if (sync) {
		/* Force the getattr under a server-side lock. */
		oinfo.oi_oa->o_valid |= OBD_MD_FLFLAGS;
		oinfo.oi_oa->o_flags |= OBD_FL_SRVLOCK;
	}

	set = ptlrpc_prep_set();
	if (set == NULL) {
		CERROR("can't allocate ptlrpc set\n");
		rc = -ENOMEM;
	} else {
		rc = obd_getattr_async(exp, &oinfo, set);
		if (rc == 0)
			rc = ptlrpc_set_wait(set);
		ptlrpc_set_destroy(set);
	}
	/* On success, keep only the attributes callers may trust. */
	if (rc == 0)
		oinfo.oi_oa->o_valid &= (OBD_MD_FLBLOCKS | OBD_MD_FLBLKSZ |
					 OBD_MD_FLATIME | OBD_MD_FLMTIME |
					 OBD_MD_FLCTIME | OBD_MD_FLSIZE |
					 OBD_MD_FLDATAVERSION);
	RETURN(rc);
}
741
742/**
743  * Performs the getattr on the inode and updates its fields.
744  * If @sync != 0, perform the getattr under the server-side lock.
745  */
int ll_inode_getattr(struct inode *inode, struct obdo *obdo,
		     __u64 ioepoch, int sync)
{
	struct obd_capa      *capa = ll_mdscapa_get(inode);
	struct lov_stripe_md *lsm;
	int rc;
	ENTRY;

	lsm = ccc_inode_lsm_get(inode);
	rc = ll_lsm_getattr(lsm, ll_i2dtexp(inode),
			    capa, obdo, ioepoch, sync);
	capa_put(capa);
	if (rc == 0) {
		struct ost_id *oi = lsm ? &lsm->lsm_oi : &obdo->o_oi;

		/* Push the attributes returned by the OSTs into the inode. */
		obdo_refresh_inode(inode, obdo, obdo->o_valid);
		CDEBUG(D_INODE, "objid "DOSTID" size %llu, blocks %llu,"
		       " blksize %lu\n", POSTID(oi), i_size_read(inode),
		       (unsigned long long)inode->i_blocks,
		       (unsigned long)ll_inode_blksize(inode));
	}
	ccc_inode_lsm_put(inode, lsm);
	RETURN(rc);
}
770
/*
 * Merge the MDS-provided timestamps cached in lli_lvb with the attributes
 * currently held by the cl_object (OST side), taking the newest of each
 * timestamp, and update the inode's size/blocks/times under the inode
 * size lock.
 */
int ll_merge_lvb(const struct lu_env *env, struct inode *inode)
{
	struct ll_inode_info *lli = ll_i2info(inode);
	struct cl_object *obj = lli->lli_clob;
	struct cl_attr *attr = ccc_env_thread_attr(env);
	struct ost_lvb lvb;
	int rc = 0;

	ENTRY;

	ll_inode_size_lock(inode);
	/* merge timestamps the most recently obtained from mds with
	   timestamps obtained from osts */
	LTIME_S(inode->i_atime) = lli->lli_lvb.lvb_atime;
	LTIME_S(inode->i_mtime) = lli->lli_lvb.lvb_mtime;
	LTIME_S(inode->i_ctime) = lli->lli_lvb.lvb_ctime;
	/* Seed @lvb from the inode's current state before comparing. */
	inode_init_lvb(inode, &lvb);

	cl_object_attr_lock(obj);
	rc = cl_object_attr_get(env, obj, attr);
	cl_object_attr_unlock(obj);

	if (rc == 0) {
		/* Keep the newer of the inode's and the object's times. */
		if (lvb.lvb_atime < attr->cat_atime)
			lvb.lvb_atime = attr->cat_atime;
		if (lvb.lvb_ctime < attr->cat_ctime)
			lvb.lvb_ctime = attr->cat_ctime;
		if (lvb.lvb_mtime < attr->cat_mtime)
			lvb.lvb_mtime = attr->cat_mtime;

		CDEBUG(D_VFSTRACE, DFID" updating i_size "LPU64"\n",
				PFID(&lli->lli_fid), attr->cat_size);
		cl_isize_write_nolock(inode, attr->cat_size);

		inode->i_blocks = attr->cat_blocks;

		LTIME_S(inode->i_mtime) = lvb.lvb_mtime;
		LTIME_S(inode->i_atime) = lvb.lvb_atime;
		LTIME_S(inode->i_ctime) = lvb.lvb_ctime;
	}
	ll_inode_size_unlock(inode);

	RETURN(rc);
}
815
816int ll_glimpse_ioctl(struct ll_sb_info *sbi, struct lov_stripe_md *lsm,
817		     lstat_t *st)
818{
819	struct obdo obdo = { 0 };
820	int rc;
821
822	rc = ll_lsm_getattr(lsm, sbi->ll_dt_exp, NULL, &obdo, 0, 0);
823	if (rc == 0) {
824		st->st_size   = obdo.o_size;
825		st->st_blocks = obdo.o_blocks;
826		st->st_mtime  = obdo.o_mtime;
827		st->st_atime  = obdo.o_atime;
828		st->st_ctime  = obdo.o_ctime;
829	}
830	return rc;
831}
832
833void ll_io_init(struct cl_io *io, const struct file *file, int write)
834{
835	struct inode *inode = file->f_dentry->d_inode;
836
837	io->u.ci_rw.crw_nonblock = file->f_flags & O_NONBLOCK;
838	if (write) {
839		io->u.ci_wr.wr_append = !!(file->f_flags & O_APPEND);
840		io->u.ci_wr.wr_sync = file->f_flags & O_SYNC ||
841				      file->f_flags & O_DIRECT ||
842				      IS_SYNC(inode);
843	}
844	io->ci_obj     = ll_i2info(inode)->lli_clob;
845	io->ci_lockreq = CILR_MAYBE;
846	if (ll_file_nolock(file)) {
847		io->ci_lockreq = CILR_NEVER;
848		io->ci_no_srvlock = 1;
849	} else if (file->f_flags & O_APPEND) {
850		io->ci_lockreq = CILR_MANDATORY;
851	}
852}
853
/*
 * Common read/write engine: set up a cl_io of type @iot, hand the iovec /
 * sendfile / splice arguments from @args to the vvp layer, and run the IO
 * loop.  Takes lli_write_mutex for non-grouplock writes and lli_trunc_sem
 * for normal reads.  Restarts the whole IO when the cl layer requests it
 * and nothing has been transferred yet; otherwise returns a short count.
 */
static ssize_t
ll_file_io_generic(const struct lu_env *env, struct vvp_io_args *args,
		   struct file *file, enum cl_io_type iot,
		   loff_t *ppos, size_t count)
{
	struct ll_inode_info *lli = ll_i2info(file->f_dentry->d_inode);
	struct ll_file_data  *fd  = LUSTRE_FPRIVATE(file);
	struct cl_io	 *io;
	ssize_t	       result;
	ENTRY;

restart:
	io = ccc_env_thread_io(env);
	ll_io_init(io, file, iot == CIT_WRITE);

	if (cl_io_rw_init(env, io, iot, *ppos, count) == 0) {
		struct vvp_io *vio = vvp_env_io(env);
		struct ccc_io *cio = ccc_env_io(env);
		int write_mutex_locked = 0;

		cio->cui_fd  = LUSTRE_FPRIVATE(file);
		vio->cui_io_subtype = args->via_io_subtype;

		switch (vio->cui_io_subtype) {
		case IO_NORMAL:
			cio->cui_iov = args->u.normal.via_iov;
			cio->cui_nrsegs = args->u.normal.via_nrsegs;
			cio->cui_tot_nrsegs = cio->cui_nrsegs;
			cio->cui_iocb = args->u.normal.via_iocb;
			/* Serialize writes (unless a group lock already
			 * covers them); reads only exclude truncation. */
			if ((iot == CIT_WRITE) &&
			    !(cio->cui_fd->fd_flags & LL_FILE_GROUP_LOCKED)) {
				if (mutex_lock_interruptible(&lli->
							       lli_write_mutex))
					GOTO(out, result = -ERESTARTSYS);
				write_mutex_locked = 1;
			} else if (iot == CIT_READ) {
				down_read(&lli->lli_trunc_sem);
			}
			break;
		case IO_SENDFILE:
			vio->u.sendfile.cui_actor = args->u.sendfile.via_actor;
			vio->u.sendfile.cui_target = args->u.sendfile.via_target;
			break;
		case IO_SPLICE:
			vio->u.splice.cui_pipe = args->u.splice.via_pipe;
			vio->u.splice.cui_flags = args->u.splice.via_flags;
			break;
		default:
			CERROR("Unknow IO type - %u\n", vio->cui_io_subtype);
			LBUG();
		}
		result = cl_io_loop(env, io);
		if (write_mutex_locked)
			mutex_unlock(&lli->lli_write_mutex);
		else if (args->via_io_subtype == IO_NORMAL && iot == CIT_READ)
			up_read(&lli->lli_trunc_sem);
	} else {
		/* cl_io_rw_init() handled IO */
		result = io->ci_result;
	}

	/* Report bytes actually transferred and advance the position. */
	if (io->ci_nob > 0) {
		result = io->ci_nob;
		*ppos = io->u.ci_wr.wr.crw_pos;
	}
	GOTO(out, result);
out:
	cl_io_fini(env, io);
	/* If any bit been read/written (result != 0), we just return
	 * short read/write instead of restart io. */
	if (result == 0 && io->ci_need_restart) {
		CDEBUG(D_VFSTRACE, "Restart %s on %s from %lld, count:%zd\n",
		       iot == CIT_READ ? "read" : "write",
		       file->f_dentry->d_name.name, *ppos, count);
		LASSERTF(io->ci_nob == 0, "%zd", io->ci_nob);
		goto restart;
	}

	/* Account successful transfers; remember write failures (except
	 * restartable ones) for error reporting on fsync/close. */
	if (iot == CIT_READ) {
		if (result >= 0)
			ll_stats_ops_tally(ll_i2sbi(file->f_dentry->d_inode),
					   LPROC_LL_READ_BYTES, result);
	} else if (iot == CIT_WRITE) {
		if (result >= 0) {
			ll_stats_ops_tally(ll_i2sbi(file->f_dentry->d_inode),
					   LPROC_LL_WRITE_BYTES, result);
			fd->fd_write_failed = false;
		} else if (result != -ERESTARTSYS) {
			fd->fd_write_failed = true;
		}
	}

	return result;
}
948
949
950/*
951 * XXX: exact copy from kernel code (__generic_file_aio_write_nolock)
952 */
953static int ll_file_get_iov_count(const struct iovec *iov,
954				 unsigned long *nr_segs, size_t *count)
955{
956	size_t cnt = 0;
957	unsigned long seg;
958
959	for (seg = 0; seg < *nr_segs; seg++) {
960		const struct iovec *iv = &iov[seg];
961
962		/*
963		 * If any segment has a negative length, or the cumulative
964		 * length ever wraps negative then return -EINVAL.
965		 */
966		cnt += iv->iov_len;
967		if (unlikely((ssize_t)(cnt|iv->iov_len) < 0))
968			return -EINVAL;
969		if (access_ok(VERIFY_READ, iv->iov_base, iv->iov_len))
970			continue;
971		if (seg == 0)
972			return -EFAULT;
973		*nr_segs = seg;
974		cnt -= iv->iov_len;   /* This segment is no good */
975		break;
976	}
977	*count = cnt;
978	return 0;
979}
980
/*
 * aio_read entry point: validate the iovec, then run a normal (iovec)
 * read through ll_file_io_generic() in a cl environment.
 */
static ssize_t ll_file_aio_read(struct kiocb *iocb, const struct iovec *iov,
				unsigned long nr_segs, loff_t pos)
{
	struct lu_env      *env;
	struct vvp_io_args *args;
	size_t	      count;
	ssize_t	     result;
	int		 refcheck;
	ENTRY;

	result = ll_file_get_iov_count(iov, &nr_segs, &count);
	if (result)
		RETURN(result);

	env = cl_env_get(&refcheck);
	if (IS_ERR(env))
		RETURN(PTR_ERR(env));

	args = vvp_env_args(env, IO_NORMAL);
	/* Cast drops const; the vvp layer stores a mutable pointer. */
	args->u.normal.via_iov = (struct iovec *)iov;
	args->u.normal.via_nrsegs = nr_segs;
	args->u.normal.via_iocb = iocb;

	result = ll_file_io_generic(env, args, iocb->ki_filp, CIT_READ,
				    &iocb->ki_pos, count);
	cl_env_put(env, &refcheck);
	RETURN(result);
}
1009
1010static ssize_t ll_file_read(struct file *file, char *buf, size_t count,
1011			    loff_t *ppos)
1012{
1013	struct lu_env *env;
1014	struct iovec  *local_iov;
1015	struct kiocb  *kiocb;
1016	ssize_t	result;
1017	int	    refcheck;
1018	ENTRY;
1019
1020	env = cl_env_get(&refcheck);
1021	if (IS_ERR(env))
1022		RETURN(PTR_ERR(env));
1023
1024	local_iov = &vvp_env_info(env)->vti_local_iov;
1025	kiocb = &vvp_env_info(env)->vti_kiocb;
1026	local_iov->iov_base = (void __user *)buf;
1027	local_iov->iov_len = count;
1028	init_sync_kiocb(kiocb, file);
1029	kiocb->ki_pos = *ppos;
1030	kiocb->ki_left = count;
1031
1032	result = ll_file_aio_read(kiocb, local_iov, 1, kiocb->ki_pos);
1033	*ppos = kiocb->ki_pos;
1034
1035	cl_env_put(env, &refcheck);
1036	RETURN(result);
1037}
1038
1039/*
1040 * Write to a file (through the page cache).
1041 */
1042static ssize_t ll_file_aio_write(struct kiocb *iocb, const struct iovec *iov,
1043				 unsigned long nr_segs, loff_t pos)
1044{
1045	struct lu_env      *env;
1046	struct vvp_io_args *args;
1047	size_t	      count;
1048	ssize_t	     result;
1049	int		 refcheck;
1050	ENTRY;
1051
1052	result = ll_file_get_iov_count(iov, &nr_segs, &count);
1053	if (result)
1054		RETURN(result);
1055
1056	env = cl_env_get(&refcheck);
1057	if (IS_ERR(env))
1058		RETURN(PTR_ERR(env));
1059
1060	args = vvp_env_args(env, IO_NORMAL);
1061	args->u.normal.via_iov = (struct iovec *)iov;
1062	args->u.normal.via_nrsegs = nr_segs;
1063	args->u.normal.via_iocb = iocb;
1064
1065	result = ll_file_io_generic(env, args, iocb->ki_filp, CIT_WRITE,
1066				  &iocb->ki_pos, count);
1067	cl_env_put(env, &refcheck);
1068	RETURN(result);
1069}
1070
1071static ssize_t ll_file_write(struct file *file, const char *buf, size_t count,
1072			     loff_t *ppos)
1073{
1074	struct lu_env *env;
1075	struct iovec  *local_iov;
1076	struct kiocb  *kiocb;
1077	ssize_t	result;
1078	int	    refcheck;
1079	ENTRY;
1080
1081	env = cl_env_get(&refcheck);
1082	if (IS_ERR(env))
1083		RETURN(PTR_ERR(env));
1084
1085	local_iov = &vvp_env_info(env)->vti_local_iov;
1086	kiocb = &vvp_env_info(env)->vti_kiocb;
1087	local_iov->iov_base = (void __user *)buf;
1088	local_iov->iov_len = count;
1089	init_sync_kiocb(kiocb, file);
1090	kiocb->ki_pos = *ppos;
1091	kiocb->ki_left = count;
1092
1093	result = ll_file_aio_write(kiocb, local_iov, 1, kiocb->ki_pos);
1094	*ppos = kiocb->ki_pos;
1095
1096	cl_env_put(env, &refcheck);
1097	RETURN(result);
1098}
1099
1100
1101
1102/*
1103 * Send file content (through pagecache) somewhere with helper
1104 */
1105static ssize_t ll_file_splice_read(struct file *in_file, loff_t *ppos,
1106				   struct pipe_inode_info *pipe, size_t count,
1107				   unsigned int flags)
1108{
1109	struct lu_env      *env;
1110	struct vvp_io_args *args;
1111	ssize_t	     result;
1112	int		 refcheck;
1113	ENTRY;
1114
1115	env = cl_env_get(&refcheck);
1116	if (IS_ERR(env))
1117		RETURN(PTR_ERR(env));
1118
1119	args = vvp_env_args(env, IO_SPLICE);
1120	args->u.splice.via_pipe = pipe;
1121	args->u.splice.via_flags = flags;
1122
1123	result = ll_file_io_generic(env, args, in_file, CIT_READ, ppos, count);
1124	cl_env_put(env, &refcheck);
1125	RETURN(result);
1126}
1127
/*
 * Re-create the OST object identified by @oi on stripe @ost_idx.
 *
 * Builds an obdo carrying the object id and OBD_FL_RECREATE_OBJS, then
 * re-issues obd_create() against a private copy of the inode's stripe
 * descriptor while holding the inode size lock.
 */
static int ll_lov_recreate(struct inode *inode, struct ost_id *oi,
			   obd_count ost_idx)
{
	struct obd_export *exp = ll_i2dtexp(inode);
	struct obd_trans_info oti = { 0 };
	struct obdo *oa = NULL;
	int lsm_size;
	int rc = 0;
	struct lov_stripe_md *lsm = NULL, *lsm2;
	ENTRY;

	OBDO_ALLOC(oa);
	if (oa == NULL)
		RETURN(-ENOMEM);

	/* no layout cached means there is no object to recreate */
	lsm = ccc_inode_lsm_get(inode);
	if (lsm == NULL)
		GOTO(out, rc = -ENOENT);

	/* descriptor plus one lov_oinfo slot per stripe */
	lsm_size = sizeof(*lsm) + (sizeof(struct lov_oinfo) *
		   (lsm->lsm_stripe_count));

	OBD_ALLOC_LARGE(lsm2, lsm_size);
	if (lsm2 == NULL)
		GOTO(out, rc = -ENOMEM);

	/* the target OST index is carried in o_nlink for this request */
	oa->o_oi = *oi;
	oa->o_nlink = ost_idx;
	oa->o_flags |= OBD_FL_RECREATE_OBJS;
	oa->o_valid = OBD_MD_FLID | OBD_MD_FLFLAGS | OBD_MD_FLGROUP;
	obdo_from_inode(oa, inode, OBD_MD_FLTYPE | OBD_MD_FLATIME |
				   OBD_MD_FLMTIME | OBD_MD_FLCTIME);
	obdo_set_parent_fid(oa, &ll_i2info(inode)->lli_fid);
	/* work on a copy so the cached lsm is never modified */
	memcpy(lsm2, lsm, lsm_size);
	ll_inode_size_lock(inode);
	rc = obd_create(NULL, exp, oa, &lsm2, &oti);
	ll_inode_size_unlock(inode);

	OBD_FREE_LARGE(lsm2, lsm_size);
	GOTO(out, rc);
out:
	ccc_inode_lsm_put(inode, lsm);
	OBDO_FREE(oa);
	return rc;
}
1173
1174static int ll_lov_recreate_obj(struct inode *inode, unsigned long arg)
1175{
1176	struct ll_recreate_obj ucreat;
1177	struct ost_id		oi;
1178	ENTRY;
1179
1180	if (!cfs_capable(CFS_CAP_SYS_ADMIN))
1181		RETURN(-EPERM);
1182
1183	if (copy_from_user(&ucreat, (struct ll_recreate_obj *)arg,
1184			   sizeof(ucreat)))
1185		RETURN(-EFAULT);
1186
1187	ostid_set_seq_mdt0(&oi);
1188	ostid_set_id(&oi, ucreat.lrc_id);
1189	RETURN(ll_lov_recreate(inode, &oi, ucreat.lrc_ost_idx));
1190}
1191
1192static int ll_lov_recreate_fid(struct inode *inode, unsigned long arg)
1193{
1194	struct lu_fid	fid;
1195	struct ost_id	oi;
1196	obd_count	ost_idx;
1197	ENTRY;
1198
1199	if (!cfs_capable(CFS_CAP_SYS_ADMIN))
1200		RETURN(-EPERM);
1201
1202	if (copy_from_user(&fid, (struct lu_fid *)arg, sizeof(fid)))
1203		RETURN(-EFAULT);
1204
1205	fid_to_ostid(&fid, &oi);
1206	ost_idx = (fid_seq(&fid) >> 16) & 0xffff;
1207	RETURN(ll_lov_recreate(inode, &oi, ost_idx));
1208}
1209
/*
 * Set the striping EA for @inode by (re)opening the file with the given
 * lov_user_md.  Fails with -EEXIST if a layout is already cached.
 */
int ll_lov_setstripe_ea_info(struct inode *inode, struct file *file,
			     int flags, struct lov_user_md *lum, int lum_size)
{
	struct lov_stripe_md *lsm = NULL;
	struct lookup_intent oit = {.it_op = IT_OPEN, .it_flags = flags};
	int rc = 0;
	ENTRY;

	lsm = ccc_inode_lsm_get(inode);
	if (lsm != NULL) {
		ccc_inode_lsm_put(inode, lsm);
		CDEBUG(D_IOCTL, "stripe already exists for ino %lu\n",
		       inode->i_ino);
		RETURN(-EEXIST);
	}

	ll_inode_size_lock(inode);
	rc = ll_intent_file_open(file, lum, lum_size, &oit);
	if (rc)
		GOTO(out, rc);
	rc = oit.d.lustre.it_status;
	if (rc < 0)
		GOTO(out_req_free, rc);

	/* the open handle created above is not kept; close it again */
	ll_release_openhandle(file->f_dentry, &oit);

 out:
	ll_inode_size_unlock(inode);
	ll_intent_release(&oit);
	/* lsm is always NULL here; the put presumably tolerates NULL and is
	 * kept for symmetry -- NOTE(review): confirm */
	ccc_inode_lsm_put(inode, lsm);
	RETURN(rc);
out_req_free:
	/* it_status < 0: drop the enqueued request, then take the common
	 * unlock/release path above */
	ptlrpc_req_finished((struct ptlrpc_request *) oit.d.lustre.it_data);
	goto out;
}
1245
/*
 * Fetch the LOV EA of @filename (looked up relative to @inode) from the
 * MDS.  On success *lmmp points into the reply buffer of *request, so
 * the caller presumably must keep *request alive while using *lmmp and
 * release it with ptlrpc_req_finished() -- NOTE(review): confirm with
 * callers.  Returns -ENODATA when no striping EA exists and -EPROTO on
 * an unrecognized LOV magic.
 */
int ll_lov_getstripe_ea_info(struct inode *inode, const char *filename,
			     struct lov_mds_md **lmmp, int *lmm_size,
			     struct ptlrpc_request **request)
{
	struct ll_sb_info *sbi = ll_i2sbi(inode);
	struct mdt_body  *body;
	struct lov_mds_md *lmm = NULL;
	struct ptlrpc_request *req = NULL;
	struct md_op_data *op_data;
	int rc, lmmsize;

	rc = ll_get_max_mdsize(sbi, &lmmsize);
	if (rc)
		RETURN(rc);

	op_data = ll_prep_md_op_data(NULL, inode, NULL, filename,
				     strlen(filename), lmmsize,
				     LUSTRE_OPC_ANY, NULL);
	if (IS_ERR(op_data))
		RETURN(PTR_ERR(op_data));

	/* ask for the EA of either a file or a directory */
	op_data->op_valid = OBD_MD_FLEASIZE | OBD_MD_FLDIREA;
	rc = md_getattr_name(sbi->ll_md_exp, op_data, &req);
	ll_finish_md_op_data(op_data);
	if (rc < 0) {
		CDEBUG(D_INFO, "md_getattr_name failed "
		       "on %s: rc %d\n", filename, rc);
		GOTO(out, rc);
	}

	body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
	LASSERT(body != NULL); /* checked by mdc_getattr_name */

	lmmsize = body->eadatasize;

	if (!(body->valid & (OBD_MD_FLEASIZE | OBD_MD_FLDIREA)) ||
			lmmsize == 0) {
		GOTO(out, rc = -ENODATA);
	}

	lmm = req_capsule_server_sized_get(&req->rq_pill, &RMF_MDT_MD, lmmsize);
	LASSERT(lmm != NULL);

	if ((lmm->lmm_magic != cpu_to_le32(LOV_MAGIC_V1)) &&
	    (lmm->lmm_magic != cpu_to_le32(LOV_MAGIC_V3))) {
		GOTO(out, rc = -EPROTO);
	}

	/*
	 * This is coming from the MDS, so is probably in
	 * little endian.  We convert it to host endian before
	 * passing it to userspace.
	 */
	/* the condition is true only on big-endian hosts, where the wire
	 * (little-endian) layout must be swabbed */
	if (LOV_MAGIC != cpu_to_le32(LOV_MAGIC)) {
		/* if function called for directory - we should
		 * avoid swab not existent lsm objects */
		if (lmm->lmm_magic == cpu_to_le32(LOV_MAGIC_V1)) {
			lustre_swab_lov_user_md_v1((struct lov_user_md_v1 *)lmm);
			if (S_ISREG(body->mode))
				lustre_swab_lov_user_md_objects(
				 ((struct lov_user_md_v1 *)lmm)->lmm_objects,
				 ((struct lov_user_md_v1 *)lmm)->lmm_stripe_count);
		} else if (lmm->lmm_magic == cpu_to_le32(LOV_MAGIC_V3)) {
			lustre_swab_lov_user_md_v3((struct lov_user_md_v3 *)lmm);
			if (S_ISREG(body->mode))
				lustre_swab_lov_user_md_objects(
				 ((struct lov_user_md_v3 *)lmm)->lmm_objects,
				 ((struct lov_user_md_v3 *)lmm)->lmm_stripe_count);
		}
	}

out:
	/* outputs are filled on every path; req may be NULL on early error */
	*lmmp = lmm;
	*lmm_size = lmmsize;
	*request = req;
	return rc;
}
1323
1324static int ll_lov_setea(struct inode *inode, struct file *file,
1325			    unsigned long arg)
1326{
1327	int			 flags = MDS_OPEN_HAS_OBJS | FMODE_WRITE;
1328	struct lov_user_md	*lump;
1329	int			 lum_size = sizeof(struct lov_user_md) +
1330					    sizeof(struct lov_user_ost_data);
1331	int			 rc;
1332	ENTRY;
1333
1334	if (!cfs_capable(CFS_CAP_SYS_ADMIN))
1335		RETURN(-EPERM);
1336
1337	OBD_ALLOC_LARGE(lump, lum_size);
1338	if (lump == NULL)
1339		RETURN(-ENOMEM);
1340
1341	if (copy_from_user(lump, (struct lov_user_md  *)arg, lum_size)) {
1342		OBD_FREE_LARGE(lump, lum_size);
1343		RETURN(-EFAULT);
1344	}
1345
1346	rc = ll_lov_setstripe_ea_info(inode, file, flags, lump, lum_size);
1347
1348	OBD_FREE_LARGE(lump, lum_size);
1349	RETURN(rc);
1350}
1351
/*
 * LL_IOC_LOV_SETSTRIPE: set the file layout from a user-supplied
 * lov_user_md (v1 or v3), then copy the resulting layout back into the
 * same user buffer via LL_IOC_LOV_GETSTRIPE.
 */
static int ll_lov_setstripe(struct inode *inode, struct file *file,
			    unsigned long arg)
{
	struct lov_user_md_v3	 lumv3;
	struct lov_user_md_v1	*lumv1 = (struct lov_user_md_v1 *)&lumv3;
	struct lov_user_md_v1	*lumv1p = (struct lov_user_md_v1 *)arg;
	struct lov_user_md_v3	*lumv3p = (struct lov_user_md_v3 *)arg;
	int			 lum_size, rc;
	int			 flags = FMODE_WRITE;
	ENTRY;

	/* first try with v1 which is smaller than v3 */
	lum_size = sizeof(struct lov_user_md_v1);
	if (copy_from_user(lumv1, lumv1p, lum_size))
		RETURN(-EFAULT);

	/* the magic says which layout version userspace actually sent;
	 * re-copy the larger v3 structure if needed */
	if (lumv1->lmm_magic == LOV_USER_MAGIC_V3) {
		lum_size = sizeof(struct lov_user_md_v3);
		if (copy_from_user(&lumv3, lumv3p, lum_size))
			RETURN(-EFAULT);
	}

	rc = ll_lov_setstripe_ea_info(inode, file, flags, lumv1, lum_size);
	if (rc == 0) {
		struct lov_stripe_md *lsm;
		__u32 gen;

		/* zero the user's stripe count, presumably so the GETSTRIPE
		 * below fills in all stripes; the put_user() return value is
		 * ignored -- NOTE(review): confirm that is intentional */
		put_user(0, &lumv1p->lmm_stripe_count);

		ll_layout_refresh(inode, &gen);
		lsm = ccc_inode_lsm_get(inode);
		rc = obd_iocontrol(LL_IOC_LOV_GETSTRIPE, ll_i2dtexp(inode),
				   0, lsm, (void *)arg);
		ccc_inode_lsm_put(inode, lsm);
	}
	RETURN(rc);
}
1389
1390static int ll_lov_getstripe(struct inode *inode, unsigned long arg)
1391{
1392	struct lov_stripe_md *lsm;
1393	int rc = -ENODATA;
1394	ENTRY;
1395
1396	lsm = ccc_inode_lsm_get(inode);
1397	if (lsm != NULL)
1398		rc = obd_iocontrol(LL_IOC_LOV_GETSTRIPE, ll_i2dtexp(inode), 0,
1399				   lsm, (void *)arg);
1400	ccc_inode_lsm_put(inode, lsm);
1401	RETURN(rc);
1402}
1403
/*
 * LL_IOC_GROUP_LOCK: take a client-side group lock (gid in @arg) on the
 * file.  Only one group lock may be held per file descriptor.
 */
int ll_get_grouplock(struct inode *inode, struct file *file, unsigned long arg)
{
	struct ll_inode_info   *lli = ll_i2info(inode);
	struct ll_file_data    *fd = LUSTRE_FPRIVATE(file);
	struct ccc_grouplock    grouplock;
	int		     rc;
	ENTRY;

	if (ll_file_nolock(file))
		RETURN(-EOPNOTSUPP);

	spin_lock(&lli->lli_lock);
	if (fd->fd_flags & LL_FILE_GROUP_LOCKED) {
		CWARN("group lock already existed with gid %lu\n",
		      fd->fd_grouplock.cg_gid);
		spin_unlock(&lli->lli_lock);
		RETURN(-EINVAL);
	}
	LASSERT(fd->fd_grouplock.cg_lock == NULL);
	spin_unlock(&lli->lli_lock);

	/* the lock is acquired outside lli_lock (O_NONBLOCK is passed down,
	 * so this call presumably may sleep); a racing locker is re-checked
	 * below after the spinlock is retaken */
	rc = cl_get_grouplock(cl_i2info(inode)->lli_clob,
			      arg, (file->f_flags & O_NONBLOCK), &grouplock);
	if (rc)
		RETURN(rc);

	spin_lock(&lli->lli_lock);
	if (fd->fd_flags & LL_FILE_GROUP_LOCKED) {
		/* lost the race: give back the lock we just acquired */
		spin_unlock(&lli->lli_lock);
		CERROR("another thread just won the race\n");
		cl_put_grouplock(&grouplock);
		RETURN(-EINVAL);
	}

	fd->fd_flags |= LL_FILE_GROUP_LOCKED;
	fd->fd_grouplock = grouplock;
	spin_unlock(&lli->lli_lock);

	CDEBUG(D_INFO, "group lock %lu obtained\n", arg);
	RETURN(0);
}
1445
/*
 * LL_IOC_GROUP_UNLOCK: release the group lock with gid @arg previously
 * taken on this file descriptor by ll_get_grouplock().
 */
int ll_put_grouplock(struct inode *inode, struct file *file, unsigned long arg)
{
	struct ll_inode_info   *lli = ll_i2info(inode);
	struct ll_file_data    *fd = LUSTRE_FPRIVATE(file);
	struct ccc_grouplock    grouplock;
	ENTRY;

	spin_lock(&lli->lli_lock);
	if (!(fd->fd_flags & LL_FILE_GROUP_LOCKED)) {
		spin_unlock(&lli->lli_lock);
		CWARN("no group lock held\n");
		RETURN(-EINVAL);
	}
	LASSERT(fd->fd_grouplock.cg_lock != NULL);

	if (fd->fd_grouplock.cg_gid != arg) {
		CWARN("group lock %lu doesn't match current id %lu\n",
		       arg, fd->fd_grouplock.cg_gid);
		spin_unlock(&lli->lli_lock);
		RETURN(-EINVAL);
	}

	/* detach the lock from the fd while holding lli_lock, then drop
	 * the actual cl lock outside the spinlock */
	grouplock = fd->fd_grouplock;
	memset(&fd->fd_grouplock, 0, sizeof(fd->fd_grouplock));
	fd->fd_flags &= ~LL_FILE_GROUP_LOCKED;
	spin_unlock(&lli->lli_lock);

	cl_put_grouplock(&grouplock);
	CDEBUG(D_INFO, "group lock %lu released\n", arg);
	RETURN(0);
}
1477
/**
 * Close inode open handle
 *
 * \param dentry [in]     dentry which contains the inode
 * \param it     [in,out] intent which contains open info and result
 *
 * \retval 0     success
 * \retval <0    failure
 */
int ll_release_openhandle(struct dentry *dentry, struct lookup_intent *it)
{
	struct inode *inode = dentry->d_inode;
	struct obd_client_handle *och;
	int rc;
	ENTRY;

	LASSERT(inode);

	/* Root ? Do nothing. */
	if (dentry->d_inode->i_sb->s_root == dentry)
		RETURN(0);

	/* No open handle to close? Move away */
	if (!it_disposition(it, DISP_OPEN_OPEN))
		RETURN(0);

	LASSERT(it_open_error(DISP_OPEN_OPEN, it) == 0);

	OBD_ALLOC(och, sizeof(*och));
	if (!och)
		GOTO(out, rc = -ENOMEM);

	/* populate the handle from the intent's open reply */
	ll_och_fill(ll_i2sbi(inode)->ll_md_exp,
		    ll_i2info(inode), it, och);

	/* och ownership passes here; it is presumably freed by
	 * ll_close_inode_openhandle() -- NOTE(review): confirm */
	rc = ll_close_inode_openhandle(ll_i2sbi(inode)->ll_md_exp,
				       inode, och);
 out:
	/* this one is in place of ll_file_open */
	if (it_disposition(it, DISP_ENQ_OPEN_REF)) {
		ptlrpc_req_finished(it->d.lustre.it_data);
		it_clear_disposition(it, DISP_ENQ_OPEN_REF);
	}
	RETURN(rc);
}
1523
/**
 * Get size for inode for which FIEMAP mapping is requested.
 * Make the FIEMAP get_info call and returns the result.
 *
 * \param fiemap    user fiemap buffer; fm_flags may be modified on -EBADR
 * \param num_bytes total size of \a fiemap including its extent array
 */
int ll_do_fiemap(struct inode *inode, struct ll_user_fiemap *fiemap,
	      int num_bytes)
{
	struct obd_export *exp = ll_i2dtexp(inode);
	struct lov_stripe_md *lsm = NULL;
	struct ll_fiemap_info_key fm_key = { .name = KEY_FIEMAP, };
	int vallen = num_bytes;
	int rc;
	ENTRY;

	/* Checks for fiemap flags */
	if (fiemap->fm_flags & ~LUSTRE_FIEMAP_FLAGS_COMPAT) {
		/* strip the supported set so the caller can see which
		 * flags were rejected */
		fiemap->fm_flags &= ~LUSTRE_FIEMAP_FLAGS_COMPAT;
		return -EBADR;
	}

	/* Check for FIEMAP_FLAG_SYNC */
	if (fiemap->fm_flags & FIEMAP_FLAG_SYNC) {
		/* flush dirty pages first so the mapping is current */
		rc = filemap_fdatawrite(inode->i_mapping);
		if (rc)
			return rc;
	}

	lsm = ccc_inode_lsm_get(inode);
	if (lsm == NULL)
		return -ENOENT;

	/* If the stripe_count > 1 and the application does not understand
	 * DEVICE_ORDER flag, then it cannot interpret the extents correctly.
	 */
	if (lsm->lsm_stripe_count > 1 &&
	    !(fiemap->fm_flags & FIEMAP_FLAG_DEVICE_ORDER))
		GOTO(out, rc = -EOPNOTSUPP);

	fm_key.oa.o_oi = lsm->lsm_oi;
	fm_key.oa.o_valid = OBD_MD_FLID | OBD_MD_FLGROUP;

	obdo_from_inode(&fm_key.oa, inode, OBD_MD_FLSIZE);
	obdo_set_parent_fid(&fm_key.oa, &ll_i2info(inode)->lli_fid);
	/* If filesize is 0, then there would be no objects for mapping */
	if (fm_key.oa.o_size == 0) {
		fiemap->fm_mapped_extents = 0;
		GOTO(out, rc = 0);
	}

	/* the request parameters travel inside the key; the reply extents
	 * are written back into the caller's fiemap buffer */
	memcpy(&fm_key.fiemap, fiemap, sizeof(*fiemap));

	rc = obd_get_info(NULL, exp, sizeof(fm_key), &fm_key, &vallen,
			  fiemap, lsm);
	if (rc)
		CERROR("obd_get_info failed: rc = %d\n", rc);

out:
	ccc_inode_lsm_put(inode, lsm);
	RETURN(rc);
}
1584
1585int ll_fid2path(struct inode *inode, void *arg)
1586{
1587	struct obd_export	*exp = ll_i2mdexp(inode);
1588	struct getinfo_fid2path	*gfout, *gfin;
1589	int			 outsize, rc;
1590	ENTRY;
1591
1592	if (!cfs_capable(CFS_CAP_DAC_READ_SEARCH) &&
1593	    !(ll_i2sbi(inode)->ll_flags & LL_SBI_USER_FID2PATH))
1594		RETURN(-EPERM);
1595
1596	/* Need to get the buflen */
1597	OBD_ALLOC_PTR(gfin);
1598	if (gfin == NULL)
1599		RETURN(-ENOMEM);
1600	if (copy_from_user(gfin, arg, sizeof(*gfin))) {
1601		OBD_FREE_PTR(gfin);
1602		RETURN(-EFAULT);
1603	}
1604
1605	outsize = sizeof(*gfout) + gfin->gf_pathlen;
1606	OBD_ALLOC(gfout, outsize);
1607	if (gfout == NULL) {
1608		OBD_FREE_PTR(gfin);
1609		RETURN(-ENOMEM);
1610	}
1611	memcpy(gfout, gfin, sizeof(*gfout));
1612	OBD_FREE_PTR(gfin);
1613
1614	/* Call mdc_iocontrol */
1615	rc = obd_iocontrol(OBD_IOC_FID2PATH, exp, outsize, gfout, NULL);
1616	if (rc)
1617		GOTO(gf_free, rc);
1618
1619	if (copy_to_user(arg, gfout, outsize))
1620		rc = -EFAULT;
1621
1622gf_free:
1623	OBD_FREE(gfout, outsize);
1624	RETURN(rc);
1625}
1626
1627static int ll_ioctl_fiemap(struct inode *inode, unsigned long arg)
1628{
1629	struct ll_user_fiemap *fiemap_s;
1630	size_t num_bytes, ret_bytes;
1631	unsigned int extent_count;
1632	int rc = 0;
1633
1634	/* Get the extent count so we can calculate the size of
1635	 * required fiemap buffer */
1636	if (get_user(extent_count,
1637	    &((struct ll_user_fiemap __user *)arg)->fm_extent_count))
1638		RETURN(-EFAULT);
1639	num_bytes = sizeof(*fiemap_s) + (extent_count *
1640					 sizeof(struct ll_fiemap_extent));
1641
1642	OBD_ALLOC_LARGE(fiemap_s, num_bytes);
1643	if (fiemap_s == NULL)
1644		RETURN(-ENOMEM);
1645
1646	/* get the fiemap value */
1647	if (copy_from_user(fiemap_s, (struct ll_user_fiemap __user *)arg,
1648			   sizeof(*fiemap_s)))
1649		GOTO(error, rc = -EFAULT);
1650
1651	/* If fm_extent_count is non-zero, read the first extent since
1652	 * it is used to calculate end_offset and device from previous
1653	 * fiemap call. */
1654	if (extent_count) {
1655		if (copy_from_user(&fiemap_s->fm_extents[0],
1656		    (char __user *)arg + sizeof(*fiemap_s),
1657		    sizeof(struct ll_fiemap_extent)))
1658			GOTO(error, rc = -EFAULT);
1659	}
1660
1661	rc = ll_do_fiemap(inode, fiemap_s, num_bytes);
1662	if (rc)
1663		GOTO(error, rc);
1664
1665	ret_bytes = sizeof(struct ll_user_fiemap);
1666
1667	if (extent_count != 0)
1668		ret_bytes += (fiemap_s->fm_mapped_extents *
1669				 sizeof(struct ll_fiemap_extent));
1670
1671	if (copy_to_user((void *)arg, fiemap_s, ret_bytes))
1672		rc = -EFAULT;
1673
1674error:
1675	OBD_FREE_LARGE(fiemap_s, num_bytes);
1676	RETURN(rc);
1677}
1678
/*
 * Read the data_version for inode.
 *
 * This value is computed using stripe object version on OST.
 * Version is computed using server side locking.
 *
 * \param data_version [out] the version; set to 0 when the file has no
 *			     objects (no layout)
 * \param extent_lock  Take extent lock. Not needed if a process is already
 *		       holding the OST object group locks.
 */
int ll_data_version(struct inode *inode, __u64 *data_version,
		    int extent_lock)
{
	struct lov_stripe_md	*lsm = NULL;
	struct ll_sb_info	*sbi = ll_i2sbi(inode);
	struct obdo		*obdo = NULL;
	int			 rc;
	ENTRY;

	/* If no stripe, we consider version is 0. */
	lsm = ccc_inode_lsm_get(inode);
	if (lsm == NULL) {
		*data_version = 0;
		CDEBUG(D_INODE, "No object for inode\n");
		RETURN(0);
	}

	OBD_ALLOC_PTR(obdo);
	if (obdo == NULL) {
		ccc_inode_lsm_put(inode, lsm);
		RETURN(-ENOMEM);
	}

	rc = ll_lsm_getattr(lsm, sbi->ll_dt_exp, NULL, obdo, 0, extent_lock);
	if (!rc) {
		/* getattr succeeded but returned no data version:
		 * treat as unsupported */
		if (!(obdo->o_valid & OBD_MD_FLDATAVERSION))
			rc = -EOPNOTSUPP;
		else
			*data_version = obdo->o_data_version;
	}

	OBD_FREE_PTR(obdo);
	ccc_inode_lsm_put(inode, lsm);

	RETURN(rc);
}
1724
/* Scratch state for ll_swap_layouts(): saved timestamps to restore,
 * the expected data versions, and the (possibly reordered) inodes. */
struct ll_swap_stack {
	struct iattr		 ia1, ia2;	/* saved mtime/atime per file */
	__u64			 dv1, dv2;	/* expected data versions */
	struct inode		*inode1, *inode2;
	bool			 check_dv1, check_dv2;	/* verify dv before swap */
};
1731
/*
 * LL_IOC_LOV_SWAP_LAYOUTS: atomically exchange the layouts of two files
 * on the MDT, optionally verifying data versions first and restoring
 * mtime/atime afterwards.  Both files must be writable regular files on
 * the same filesystem.
 */
static int ll_swap_layouts(struct file *file1, struct file *file2,
			   struct lustre_swap_layouts *lsl)
{
	struct mdc_swap_layouts	 msl;
	struct md_op_data	*op_data;
	__u32			 gid;
	__u64			 dv;
	struct ll_swap_stack	*llss = NULL;
	int			 rc;

	/* OBD_ALLOC_PTR presumably zero-fills, so check_dv1/check_dv2
	 * start false -- NOTE(review): confirm */
	OBD_ALLOC_PTR(llss);
	if (llss == NULL)
		RETURN(-ENOMEM);

	llss->inode1 = file1->f_dentry->d_inode;
	llss->inode2 = file2->f_dentry->d_inode;

	if (!S_ISREG(llss->inode2->i_mode))
		GOTO(free, rc = -EINVAL);

	if (ll_permission(llss->inode1, MAY_WRITE, NULL) ||
	    ll_permission(llss->inode2, MAY_WRITE, NULL))
		GOTO(free, rc = -EPERM);

	if (llss->inode2->i_sb != llss->inode1->i_sb)
		GOTO(free, rc = -EXDEV);

	/* we use 2 bool because it is easier to swap than 2 bits */
	if (lsl->sl_flags & SWAP_LAYOUTS_CHECK_DV1)
		llss->check_dv1 = true;

	if (lsl->sl_flags & SWAP_LAYOUTS_CHECK_DV2)
		llss->check_dv2 = true;

	/* we cannot use lsl->sl_dvX directly because we may swap them */
	llss->dv1 = lsl->sl_dv1;
	llss->dv2 = lsl->sl_dv2;

	rc = lu_fid_cmp(ll_inode2fid(llss->inode1), ll_inode2fid(llss->inode2));
	if (rc == 0) /* same file, done! */
		GOTO(free, rc = 0);

	/* order the pair by fid so concurrent swaps lock consistently */
	if (rc < 0) { /* sequentialize it */
		swap(llss->inode1, llss->inode2);
		swap(file1, file2);
		swap(llss->dv1, llss->dv2);
		swap(llss->check_dv1, llss->check_dv2);
	}

	gid = lsl->sl_gid;
	if (gid != 0) { /* application asks to flush dirty cache */
		rc = ll_get_grouplock(llss->inode1, file1, gid);
		if (rc < 0)
			GOTO(free, rc);

		rc = ll_get_grouplock(llss->inode2, file2, gid);
		if (rc < 0) {
			ll_put_grouplock(llss->inode1, file1, gid);
			GOTO(free, rc);
		}
	}

	/* to be able to restore mtime and atime after swap
	 * we need to first save them */
	if (lsl->sl_flags &
	    (SWAP_LAYOUTS_KEEP_MTIME | SWAP_LAYOUTS_KEEP_ATIME)) {
		llss->ia1.ia_mtime = llss->inode1->i_mtime;
		llss->ia1.ia_atime = llss->inode1->i_atime;
		llss->ia1.ia_valid = ATTR_MTIME | ATTR_ATIME;
		llss->ia2.ia_mtime = llss->inode2->i_mtime;
		llss->ia2.ia_atime = llss->inode2->i_atime;
		llss->ia2.ia_valid = ATTR_MTIME | ATTR_ATIME;
	}

	/* ultimate check, before swaping the layouts we check if
	 * dataversion has changed (if requested) */
	if (llss->check_dv1) {
		rc = ll_data_version(llss->inode1, &dv, 0);
		if (rc)
			GOTO(putgl, rc);
		if (dv != llss->dv1)
			GOTO(putgl, rc = -EAGAIN);
	}

	if (llss->check_dv2) {
		rc = ll_data_version(llss->inode2, &dv, 0);
		if (rc)
			GOTO(putgl, rc);
		if (dv != llss->dv2)
			GOTO(putgl, rc = -EAGAIN);
	}

	/* struct md_op_data is used to send the swap args to the mdt
	 * only flags is missing, so we use struct mdc_swap_layouts
	 * through the md_op_data->op_data */
	/* flags from user space have to be converted before they are send to
	 * server, no flag is sent today, they are only used on the client */
	msl.msl_flags = 0;
	rc = -ENOMEM;
	op_data = ll_prep_md_op_data(NULL, llss->inode1, llss->inode2, NULL, 0,
				     0, LUSTRE_OPC_ANY, &msl);
	if (op_data != NULL) {
		rc = obd_iocontrol(LL_IOC_LOV_SWAP_LAYOUTS,
				   ll_i2mdexp(llss->inode1),
				   sizeof(*op_data), op_data, NULL);
		ll_finish_md_op_data(op_data);
	}

putgl:
	/* gid is always assigned before any GOTO(putgl, ...) is reachable */
	if (gid != 0) {
		ll_put_grouplock(llss->inode2, file2, gid);
		ll_put_grouplock(llss->inode1, file1, gid);
	}

	/* rc can be set from obd_iocontrol() or from a GOTO(putgl, ...) */
	if (rc != 0)
		GOTO(free, rc);

	/* clear useless flags */
	if (!(lsl->sl_flags & SWAP_LAYOUTS_KEEP_MTIME)) {
		llss->ia1.ia_valid &= ~ATTR_MTIME;
		llss->ia2.ia_valid &= ~ATTR_MTIME;
	}

	if (!(lsl->sl_flags & SWAP_LAYOUTS_KEEP_ATIME)) {
		llss->ia1.ia_valid &= ~ATTR_ATIME;
		llss->ia2.ia_valid &= ~ATTR_ATIME;
	}

	/* update time if requested: note ia1/ia2 were saved from the
	 * pre-swap inodes, so each set is applied to the *other* file */
	rc = 0;
	if (llss->ia2.ia_valid != 0) {
		mutex_lock(&llss->inode1->i_mutex);
		rc = ll_setattr(file1->f_dentry, &llss->ia2);
		mutex_unlock(&llss->inode1->i_mutex);
	}

	if (llss->ia1.ia_valid != 0) {
		int rc1;

		mutex_lock(&llss->inode2->i_mutex);
		rc1 = ll_setattr(file2->f_dentry, &llss->ia1);
		mutex_unlock(&llss->inode2->i_mutex);
		if (rc == 0)
			rc = rc1;
	}

free:
	if (llss != NULL)
		OBD_FREE_PTR(llss);

	RETURN(rc);
}
1885
/*
 * Main ioctl dispatcher for regular files: decode @cmd, copy user
 * arguments in/out, and forward the request to the MDC/OSC layers as
 * appropriate.  Unknown commands fall through to the registered
 * ll_iocontrol_call() handlers and finally to obd_iocontrol() on the
 * data export.
 */
long ll_file_ioctl(struct file *file, unsigned int cmd, unsigned long arg)
{
	struct inode		*inode = file->f_dentry->d_inode;
	struct ll_file_data	*fd = LUSTRE_FPRIVATE(file);
	int			 flags, rc;
	ENTRY;

	CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),cmd=%x\n", inode->i_ino,
	       inode->i_generation, inode, cmd);
	ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_IOCTL, 1);

	/* asm-ppc{,64} declares TCGETS, et. al. as type 't' not 'T' */
	if (_IOC_TYPE(cmd) == 'T' || _IOC_TYPE(cmd) == 't') /* tty ioctls */
		RETURN(-ENOTTY);

	switch(cmd) {
	case LL_IOC_GETFLAGS:
		/* Get the current value of the file flags */
		return put_user(fd->fd_flags, (int *)arg);
	case LL_IOC_SETFLAGS:
	case LL_IOC_CLRFLAGS:
		/* Set or clear specific file flags */
		/* XXX This probably needs checks to ensure the flags are
		 *     not abused, and to handle any flag side effects.
		 */
		if (get_user(flags, (int *) arg))
			RETURN(-EFAULT);

		if (cmd == LL_IOC_SETFLAGS) {
			if ((flags & LL_FILE_IGNORE_LOCK) &&
			    !(file->f_flags & O_DIRECT)) {
				CERROR("%s: unable to disable locking on "
				       "non-O_DIRECT file\n", current->comm);
				RETURN(-EINVAL);
			}

			fd->fd_flags |= flags;
		} else {
			fd->fd_flags &= ~flags;
		}
		RETURN(0);
	case LL_IOC_LOV_SETSTRIPE:
		RETURN(ll_lov_setstripe(inode, file, arg));
	case LL_IOC_LOV_SETEA:
		RETURN(ll_lov_setea(inode, file, arg));
	case LL_IOC_LOV_SWAP_LAYOUTS: {
		struct file *file2;
		struct lustre_swap_layouts lsl;

		if (copy_from_user(&lsl, (char *)arg,
				       sizeof(struct lustre_swap_layouts)))
			RETURN(-EFAULT);

		/* both fds must be open for writing */
		if ((file->f_flags & O_ACCMODE) == 0) /* O_RDONLY */
			RETURN(-EPERM);

		file2 = fget(lsl.sl_fd);
		if (file2 == NULL)
			RETURN(-EBADF);

		rc = -EPERM;
		if ((file2->f_flags & O_ACCMODE) != 0) /* O_WRONLY or O_RDWR */
			rc = ll_swap_layouts(file, file2, &lsl);
		fput(file2);
		RETURN(rc);
	}
	case LL_IOC_LOV_GETSTRIPE:
		RETURN(ll_lov_getstripe(inode, arg));
	case LL_IOC_RECREATE_OBJ:
		RETURN(ll_lov_recreate_obj(inode, arg));
	case LL_IOC_RECREATE_FID:
		RETURN(ll_lov_recreate_fid(inode, arg));
	case FSFILT_IOC_FIEMAP:
		RETURN(ll_ioctl_fiemap(inode, arg));
	case FSFILT_IOC_GETFLAGS:
	case FSFILT_IOC_SETFLAGS:
		RETURN(ll_iocontrol(inode, file, cmd, arg));
	case FSFILT_IOC_GETVERSION_OLD:
	case FSFILT_IOC_GETVERSION:
		RETURN(put_user(inode->i_generation, (int *)arg));
	case LL_IOC_GROUP_LOCK:
		RETURN(ll_get_grouplock(inode, file, arg));
	case LL_IOC_GROUP_UNLOCK:
		RETURN(ll_put_grouplock(inode, file, arg));
	case IOC_OBD_STATFS:
		RETURN(ll_obd_statfs(inode, (void *)arg));

	/* We need to special case any other ioctls we want to handle,
	 * to send them to the MDS/OST as appropriate and to properly
	 * network encode the arg field.
	case FSFILT_IOC_SETVERSION_OLD:
	case FSFILT_IOC_SETVERSION:
	*/
	case LL_IOC_FLUSHCTX:
		RETURN(ll_flush_ctx(inode));
	case LL_IOC_PATH2FID: {
		if (copy_to_user((void *)arg, ll_inode2fid(inode),
				 sizeof(struct lu_fid)))
			RETURN(-EFAULT);

		RETURN(0);
	}
	case OBD_IOC_FID2PATH:
		RETURN(ll_fid2path(inode, (void *)arg));
	case LL_IOC_DATA_VERSION: {
		struct ioc_data_version	idv;
		int			rc;

		if (copy_from_user(&idv, (char *)arg, sizeof(idv)))
			RETURN(-EFAULT);

		rc = ll_data_version(inode, &idv.idv_version,
				!(idv.idv_flags & LL_DV_NOFLUSH));

		if (rc == 0 && copy_to_user((char *) arg, &idv, sizeof(idv)))
			RETURN(-EFAULT);

		RETURN(rc);
	}

	case LL_IOC_GET_MDTIDX: {
		int mdtidx;

		mdtidx = ll_get_mdt_idx(inode);
		if (mdtidx < 0)
			RETURN(mdtidx);

		if (put_user((int)mdtidx, (int*)arg))
			RETURN(-EFAULT);

		RETURN(0);
	}
	case OBD_IOC_GETDTNAME:
	case OBD_IOC_GETMDNAME:
		RETURN(ll_get_obd_name(inode, cmd, arg));
	case LL_IOC_HSM_STATE_GET: {
		struct md_op_data	*op_data;
		struct hsm_user_state	*hus;
		int			 rc;

		OBD_ALLOC_PTR(hus);
		if (hus == NULL)
			RETURN(-ENOMEM);

		/* hus is filled by the MDC through op_data */
		op_data = ll_prep_md_op_data(NULL, inode, NULL, NULL, 0, 0,
					     LUSTRE_OPC_ANY, hus);
		if (op_data == NULL) {
			OBD_FREE_PTR(hus);
			RETURN(-ENOMEM);
		}

		rc = obd_iocontrol(cmd, ll_i2mdexp(inode), sizeof(*op_data),
				   op_data, NULL);

		if (copy_to_user((void *)arg, hus, sizeof(*hus)))
			rc = -EFAULT;

		ll_finish_md_op_data(op_data);
		OBD_FREE_PTR(hus);
		RETURN(rc);
	}
	case LL_IOC_HSM_STATE_SET: {
		struct md_op_data	*op_data;
		struct hsm_state_set	*hss;
		int			 rc;

		OBD_ALLOC_PTR(hss);
		if (hss == NULL)
			RETURN(-ENOMEM);
		if (copy_from_user(hss, (char *)arg, sizeof(*hss))) {
			OBD_FREE_PTR(hss);
			RETURN(-EFAULT);
		}

		/* Non-root users are forbidden to set or clear flags which are
		 * NOT defined in HSM_USER_MASK. */
		if (((hss->hss_setmask | hss->hss_clearmask) & ~HSM_USER_MASK)
		    && !cfs_capable(CFS_CAP_SYS_ADMIN)) {
			OBD_FREE_PTR(hss);
			RETURN(-EPERM);
		}

		op_data = ll_prep_md_op_data(NULL, inode, NULL, NULL, 0, 0,
					     LUSTRE_OPC_ANY, hss);
		if (op_data == NULL) {
			OBD_FREE_PTR(hss);
			RETURN(-ENOMEM);
		}

		rc = obd_iocontrol(cmd, ll_i2mdexp(inode), sizeof(*op_data),
				   op_data, NULL);

		ll_finish_md_op_data(op_data);

		OBD_FREE_PTR(hss);
		RETURN(rc);
	}
	case LL_IOC_HSM_ACTION: {
		struct md_op_data		*op_data;
		struct hsm_current_action	*hca;
		int				 rc;

		OBD_ALLOC_PTR(hca);
		if (hca == NULL)
			RETURN(-ENOMEM);

		op_data = ll_prep_md_op_data(NULL, inode, NULL, NULL, 0, 0,
					     LUSTRE_OPC_ANY, hca);
		if (op_data == NULL) {
			OBD_FREE_PTR(hca);
			RETURN(-ENOMEM);
		}

		rc = obd_iocontrol(cmd, ll_i2mdexp(inode), sizeof(*op_data),
				   op_data, NULL);

		if (copy_to_user((char *)arg, hca, sizeof(*hca)))
			rc = -EFAULT;

		ll_finish_md_op_data(op_data);
		OBD_FREE_PTR(hca);
		RETURN(rc);
	}
	default: {
		int err;

		/* give registered ioctl handlers first refusal, then
		 * pass the command through to the data export */
		if (LLIOC_STOP ==
		     ll_iocontrol_call(inode, file, cmd, arg, &err))
			RETURN(err);

		RETURN(obd_iocontrol(cmd, ll_i2dtexp(inode), 0, NULL,
				     (void *)arg));
	}
	}
}
2121
2122
2123loff_t ll_file_seek(struct file *file, loff_t offset, int origin)
2124{
2125	struct inode *inode = file->f_dentry->d_inode;
2126	loff_t retval, eof = 0;
2127
2128	ENTRY;
2129	retval = offset + ((origin == SEEK_END) ? i_size_read(inode) :
2130			   (origin == SEEK_CUR) ? file->f_pos : 0);
2131	CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), to=%llu=%#llx(%d)\n",
2132	       inode->i_ino, inode->i_generation, inode, retval, retval,
2133	       origin);
2134	ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_LLSEEK, 1);
2135
2136	if (origin == SEEK_END || origin == SEEK_HOLE || origin == SEEK_DATA) {
2137		retval = ll_glimpse_size(inode);
2138		if (retval != 0)
2139			RETURN(retval);
2140		eof = i_size_read(inode);
2141	}
2142
2143	retval = ll_generic_file_llseek_size(file, offset, origin,
2144					  ll_file_maxbytes(inode), eof);
2145	RETURN(retval);
2146}
2147
2148int ll_flush(struct file *file, fl_owner_t id)
2149{
2150	struct inode *inode = file->f_dentry->d_inode;
2151	struct ll_inode_info *lli = ll_i2info(inode);
2152	struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
2153	int rc, err;
2154
2155	LASSERT(!S_ISDIR(inode->i_mode));
2156
2157	/* catch async errors that were recorded back when async writeback
2158	 * failed for pages in this mapping. */
2159	rc = lli->lli_async_rc;
2160	lli->lli_async_rc = 0;
2161	err = lov_read_and_clear_async_rc(lli->lli_clob);
2162	if (rc == 0)
2163		rc = err;
2164
2165	/* The application has been told write failure already.
2166	 * Do not report failure again. */
2167	if (fd->fd_write_failed)
2168		return 0;
2169	return rc ? -EIO : 0;
2170}
2171
2172/**
2173 * Called to make sure a portion of file has been written out.
2174 * if @local_only is not true, it will send OST_SYNC RPCs to ost.
2175 *
2176 * Return how many pages have been written.
2177 */
2178int cl_sync_file_range(struct inode *inode, loff_t start, loff_t end,
2179		       enum cl_fsync_mode mode, int ignore_layout)
2180{
2181	struct cl_env_nest nest;
2182	struct lu_env *env;
2183	struct cl_io *io;
2184	struct obd_capa *capa = NULL;
2185	struct cl_fsync_io *fio;
2186	int result;
2187	ENTRY;
2188
2189	if (mode != CL_FSYNC_NONE && mode != CL_FSYNC_LOCAL &&
2190	    mode != CL_FSYNC_DISCARD && mode != CL_FSYNC_ALL)
2191		RETURN(-EINVAL);
2192
2193	env = cl_env_nested_get(&nest);
2194	if (IS_ERR(env))
2195		RETURN(PTR_ERR(env));
2196
2197	capa = ll_osscapa_get(inode, CAPA_OPC_OSS_WRITE);
2198
2199	io = ccc_env_thread_io(env);
2200	io->ci_obj = cl_i2info(inode)->lli_clob;
2201	io->ci_ignore_layout = ignore_layout;
2202
2203	/* initialize parameters for sync */
2204	fio = &io->u.ci_fsync;
2205	fio->fi_capa = capa;
2206	fio->fi_start = start;
2207	fio->fi_end = end;
2208	fio->fi_fid = ll_inode2fid(inode);
2209	fio->fi_mode = mode;
2210	fio->fi_nr_written = 0;
2211
2212	if (cl_io_init(env, io, CIT_FSYNC, io->ci_obj) == 0)
2213		result = cl_io_loop(env, io);
2214	else
2215		result = io->ci_result;
2216	if (result == 0)
2217		result = fio->fi_nr_written;
2218	cl_io_fini(env, io);
2219	cl_env_nested_put(&nest, env);
2220
2221	capa_put(capa);
2222
2223	RETURN(result);
2224}
2225
2226/*
2227 * When dentry is provided (the 'else' case), *file->f_dentry may be
2228 * null and dentry must be used directly rather than pulled from
2229 * *file->f_dentry as is done otherwise.
2230 */
2231
2232int ll_fsync(struct file *file, loff_t start, loff_t end, int datasync)
2233{
2234	struct dentry *dentry = file->f_dentry;
2235	struct inode *inode = dentry->d_inode;
2236	struct ll_inode_info *lli = ll_i2info(inode);
2237	struct ptlrpc_request *req;
2238	struct obd_capa *oc;
2239	int rc, err;
2240	ENTRY;
2241
2242	CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p)\n", inode->i_ino,
2243	       inode->i_generation, inode);
2244	ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_FSYNC, 1);
2245
2246	rc = filemap_write_and_wait_range(inode->i_mapping, start, end);
2247	mutex_lock(&inode->i_mutex);
2248
2249	/* catch async errors that were recorded back when async writeback
2250	 * failed for pages in this mapping. */
2251	if (!S_ISDIR(inode->i_mode)) {
2252		err = lli->lli_async_rc;
2253		lli->lli_async_rc = 0;
2254		if (rc == 0)
2255			rc = err;
2256		err = lov_read_and_clear_async_rc(lli->lli_clob);
2257		if (rc == 0)
2258			rc = err;
2259	}
2260
2261	oc = ll_mdscapa_get(inode);
2262	err = md_sync(ll_i2sbi(inode)->ll_md_exp, ll_inode2fid(inode), oc,
2263		      &req);
2264	capa_put(oc);
2265	if (!rc)
2266		rc = err;
2267	if (!err)
2268		ptlrpc_req_finished(req);
2269
2270	if (datasync && S_ISREG(inode->i_mode)) {
2271		struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
2272
2273		err = cl_sync_file_range(inode, 0, OBD_OBJECT_EOF,
2274				CL_FSYNC_ALL, 0);
2275		if (rc == 0 && err < 0)
2276			rc = err;
2277		if (rc < 0)
2278			fd->fd_write_failed = true;
2279		else
2280			fd->fd_write_failed = false;
2281	}
2282
2283	mutex_unlock(&inode->i_mutex);
2284	RETURN(rc);
2285}
2286
/* Handle flock(2) and fcntl(2) byte-range lock requests by enqueueing an
 * LDLM_FLOCK lock on the MDS, then mirroring the result into the local
 * kernel lock tables so the VFS bookkeeping stays consistent. */
int ll_file_flock(struct file *file, int cmd, struct file_lock *file_lock)
{
	struct inode *inode = file->f_dentry->d_inode;
	struct ll_sb_info *sbi = ll_i2sbi(inode);
	struct ldlm_enqueue_info einfo = { .ei_type = LDLM_FLOCK,
					   .ei_cb_cp =ldlm_flock_completion_ast,
					   .ei_cbdata = file_lock };
	struct md_op_data *op_data;
	struct lustre_handle lockh = {0};
	ldlm_policy_data_t flock = {{0}};
	int flags = 0;
	int rc;
	int rc2 = 0;
	ENTRY;

	CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu file_lock=%p\n",
	       inode->i_ino, file_lock);

	ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_FLOCK, 1);

	if (file_lock->fl_flags & FL_FLOCK) {
		LASSERT((cmd == F_SETLKW) || (cmd == F_SETLK));
		/* flocks are whole-file locks */
		flock.l_flock.end = OFFSET_MAX;
		/* For flocks owner is determined by the local file desctiptor*/
		flock.l_flock.owner = (unsigned long)file_lock->fl_file;
	} else if (file_lock->fl_flags & FL_POSIX) {
		flock.l_flock.owner = (unsigned long)file_lock->fl_owner;
		flock.l_flock.start = file_lock->fl_start;
		flock.l_flock.end = file_lock->fl_end;
	} else {
		RETURN(-EINVAL);
	}
	flock.l_flock.pid = file_lock->fl_pid;

	/* Somewhat ugly workaround for svc lockd.
	 * lockd installs custom fl_lmops->lm_compare_owner that checks
	 * for the fl_owner to be the same (which it always is on local node
	 * I guess between lockd processes) and then compares pid.
	 * As such we assign pid to the owner field to make it all work,
	 * conflict with normal locks is unlikely since pid space and
	 * pointer space for current->files are not intersecting */
	if (file_lock->fl_lmops && file_lock->fl_lmops->lm_compare_owner)
		flock.l_flock.owner = (unsigned long)file_lock->fl_pid;

	/* map the VFS lock type onto an LDLM lock mode */
	switch (file_lock->fl_type) {
	case F_RDLCK:
		einfo.ei_mode = LCK_PR;
		break;
	case F_UNLCK:
		/* An unlock request may or may not have any relation to
		 * existing locks so we may not be able to pass a lock handle
		 * via a normal ldlm_lock_cancel() request. The request may even
		 * unlock a byte range in the middle of an existing lock. In
		 * order to process an unlock request we need all of the same
		 * information that is given with a normal read or write record
		 * lock request. To avoid creating another ldlm unlock (cancel)
		 * message we'll treat a LCK_NL flock request as an unlock. */
		einfo.ei_mode = LCK_NL;
		break;
	case F_WRLCK:
		einfo.ei_mode = LCK_PW;
		break;
	default:
		CDEBUG(D_INFO, "Unknown fcntl lock type: %d\n",
			file_lock->fl_type);
		RETURN (-ENOTSUPP);
	}

	/* map the fcntl command onto enqueue flags */
	switch (cmd) {
	case F_SETLKW:
#ifdef F_SETLKW64
	case F_SETLKW64:
#endif
		flags = 0;
		break;
	case F_SETLK:
#ifdef F_SETLK64
	case F_SETLK64:
#endif
		flags = LDLM_FL_BLOCK_NOWAIT;
		break;
	case F_GETLK:
#ifdef F_GETLK64
	case F_GETLK64:
#endif
		flags = LDLM_FL_TEST_LOCK;
		/* Save the old mode so that if the mode in the lock changes we
		 * can decrement the appropriate reader or writer refcount. */
		file_lock->fl_type = einfo.ei_mode;
		break;
	default:
		CERROR("unknown fcntl lock command: %d\n", cmd);
		RETURN (-EINVAL);
	}

	op_data = ll_prep_md_op_data(NULL, inode, NULL, NULL, 0, 0,
				     LUSTRE_OPC_ANY, NULL);
	if (IS_ERR(op_data))
		RETURN(PTR_ERR(op_data));

	CDEBUG(D_DLMTRACE, "inode=%lu, pid=%u, flags=%#x, mode=%u, "
	       "start="LPU64", end="LPU64"\n", inode->i_ino, flock.l_flock.pid,
	       flags, einfo.ei_mode, flock.l_flock.start, flock.l_flock.end);

	rc = md_enqueue(sbi->ll_md_exp, &einfo, NULL,
			op_data, &lockh, &flock, 0, NULL /* req */, flags);

	/* mirror the server result into the local kernel lock tables
	 * (skipped for F_GETLK, which only tests) */
	if ((file_lock->fl_flags & FL_FLOCK) &&
	    (rc == 0 || file_lock->fl_type == F_UNLCK))
		rc2  = flock_lock_file_wait(file, file_lock);
	if ((file_lock->fl_flags & FL_POSIX) &&
	    (rc == 0 || file_lock->fl_type == F_UNLCK) &&
	    !(flags & LDLM_FL_TEST_LOCK))
		rc2  = posix_lock_file_wait(file, file_lock);

	if (rc2 && file_lock->fl_type != F_UNLCK) {
		/* local bookkeeping failed: release the server-side lock we
		 * just acquired by enqueueing it again as LCK_NL (unlock) */
		einfo.ei_mode = LCK_NL;
		md_enqueue(sbi->ll_md_exp, &einfo, NULL,
			op_data, &lockh, &flock, 0, NULL /* req */, flags);
		rc = rc2;
	}

	ll_finish_md_op_data(op_data);

	RETURN(rc);
}
2414
/* .flock/.lock stub for "-o noflock" mounts: always refuse with -ENOSYS */
int ll_file_noflock(struct file *file, int cmd, struct file_lock *file_lock)
{
	ENTRY;

	RETURN(-ENOSYS);
}
2421
2422/**
2423 * test if some locks matching bits and l_req_mode are acquired
2424 * - bits can be in different locks
2425 * - if found clear the common lock bits in *bits
2426 * - the bits not found, are kept in *bits
2427 * \param inode [IN]
2428 * \param bits [IN] searched lock bits [IN]
2429 * \param l_req_mode [IN] searched lock mode
2430 * \retval boolean, true iff all bits are found
2431 */
2432int ll_have_md_lock(struct inode *inode, __u64 *bits,  ldlm_mode_t l_req_mode)
2433{
2434	struct lustre_handle lockh;
2435	ldlm_policy_data_t policy;
2436	ldlm_mode_t mode = (l_req_mode == LCK_MINMODE) ?
2437				(LCK_CR|LCK_CW|LCK_PR|LCK_PW) : l_req_mode;
2438	struct lu_fid *fid;
2439	__u64 flags;
2440	int i;
2441	ENTRY;
2442
2443	if (!inode)
2444	       RETURN(0);
2445
2446	fid = &ll_i2info(inode)->lli_fid;
2447	CDEBUG(D_INFO, "trying to match res "DFID" mode %s\n", PFID(fid),
2448	       ldlm_lockname[mode]);
2449
2450	flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_CBPENDING | LDLM_FL_TEST_LOCK;
2451	for (i = 0; i <= MDS_INODELOCK_MAXSHIFT && *bits != 0; i++) {
2452		policy.l_inodebits.bits = *bits & (1 << i);
2453		if (policy.l_inodebits.bits == 0)
2454			continue;
2455
2456		if (md_lock_match(ll_i2mdexp(inode), flags, fid, LDLM_IBITS,
2457				  &policy, mode, &lockh)) {
2458			struct ldlm_lock *lock;
2459
2460			lock = ldlm_handle2lock(&lockh);
2461			if (lock) {
2462				*bits &=
2463				      ~(lock->l_policy_data.l_inodebits.bits);
2464				LDLM_LOCK_PUT(lock);
2465			} else {
2466				*bits &= ~policy.l_inodebits.bits;
2467			}
2468		}
2469	}
2470	RETURN(*bits == 0);
2471}
2472
2473ldlm_mode_t ll_take_md_lock(struct inode *inode, __u64 bits,
2474			    struct lustre_handle *lockh, __u64 flags)
2475{
2476	ldlm_policy_data_t policy = { .l_inodebits = {bits}};
2477	struct lu_fid *fid;
2478	ldlm_mode_t rc;
2479	ENTRY;
2480
2481	fid = &ll_i2info(inode)->lli_fid;
2482	CDEBUG(D_INFO, "trying to match res "DFID"\n", PFID(fid));
2483
2484	rc = md_lock_match(ll_i2mdexp(inode), LDLM_FL_BLOCK_GRANTED|flags,
2485			   fid, LDLM_IBITS, &policy,
2486			   LCK_CR|LCK_CW|LCK_PR|LCK_PW, lockh);
2487	RETURN(rc);
2488}
2489
2490static int ll_inode_revalidate_fini(struct inode *inode, int rc)
2491{
2492	/* Already unlinked. Just update nlink and return success */
2493	if (rc == -ENOENT) {
2494		clear_nlink(inode);
2495		/* This path cannot be hit for regular files unless in
2496		 * case of obscure races, so no need to to validate
2497		 * size. */
2498		if (!S_ISREG(inode->i_mode) && !S_ISDIR(inode->i_mode))
2499			return 0;
2500	} else if (rc != 0) {
2501		CERROR("%s: revalidate FID "DFID" error: rc = %d\n",
2502		       ll_get_fsname(inode->i_sb, NULL, 0),
2503		       PFID(ll_inode2fid(inode)), rc);
2504	}
2505
2506	return rc;
2507}
2508
/* Revalidate the attributes of \a dentry's inode against the MDS, either
 * via a fid-based intent lock (when the server supports OBD_CONNECT_ATTRFID)
 * or via a plain getattr RPC when no matching ibits lock is cached. */
int __ll_inode_revalidate_it(struct dentry *dentry, struct lookup_intent *it,
			     __u64 ibits)
{
	struct inode *inode = dentry->d_inode;
	struct ptlrpc_request *req = NULL;
	struct obd_export *exp;
	int rc = 0;
	ENTRY;

	LASSERT(inode != NULL);

	CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),name=%s\n",
	       inode->i_ino, inode->i_generation, inode, dentry->d_name.name);

	exp = ll_i2mdexp(inode);

	/* XXX: Enable OBD_CONNECT_ATTRFID to reduce unnecessary getattr RPC.
	 *      But under CMD case, it caused some lock issues, should be fixed
	 *      with new CMD ibits lock. See bug 12718 */
	if (exp_connect_flags(exp) & OBD_CONNECT_ATTRFID) {
		struct lookup_intent oit = { .it_op = IT_GETATTR };
		struct md_op_data *op_data;

		if (ibits == MDS_INODELOCK_LOOKUP)
			oit.it_op = IT_LOOKUP;

		/* Call getattr by fid, so do not provide name at all. */
		op_data = ll_prep_md_op_data(NULL, dentry->d_parent->d_inode,
					     dentry->d_inode, NULL, 0, 0,
					     LUSTRE_OPC_ANY, NULL);
		if (IS_ERR(op_data))
			RETURN(PTR_ERR(op_data));

		oit.it_create_mode |= M_CHECK_STALE;
		rc = md_intent_lock(exp, op_data, NULL, 0,
				    /* we are not interested in name
				       based lookup */
				    &oit, 0, &req,
				    ll_md_blocking_ast, 0);
		ll_finish_md_op_data(op_data);
		oit.it_create_mode &= ~M_CHECK_STALE;
		if (rc < 0) {
			rc = ll_inode_revalidate_fini(inode, rc);
			GOTO (out, rc);
		}

		rc = ll_revalidate_it_finish(req, &oit, dentry);
		if (rc != 0) {
			ll_intent_release(&oit);
			GOTO(out, rc);
		}

		/* Unlinked? Unhash dentry, so it is not picked up later by
		   do_lookup() -> ll_revalidate_it(). We cannot use d_drop
		   here to preserve get_cwd functionality on 2.6.
		   Bug 10503 */
		if (!dentry->d_inode->i_nlink)
			d_lustre_invalidate(dentry, 0);

		ll_lookup_finish_locks(&oit, dentry);
	} else if (!ll_have_md_lock(dentry->d_inode, &ibits, LCK_MINMODE)) {
		/* no cached ibits lock covers the request: do a getattr RPC.
		 * NOTE(review): the early RETURNs in this branch bypass the
		 * out: label; req is still NULL on those paths so nothing
		 * leaks, but keep that invariant in mind when editing. */
		struct ll_sb_info *sbi = ll_i2sbi(dentry->d_inode);
		obd_valid valid = OBD_MD_FLGETATTR;
		struct md_op_data *op_data;
		int ealen = 0;

		if (S_ISREG(inode->i_mode)) {
			rc = ll_get_max_mdsize(sbi, &ealen);
			if (rc)
				RETURN(rc);
			valid |= OBD_MD_FLEASIZE | OBD_MD_FLMODEASIZE;
		}

		op_data = ll_prep_md_op_data(NULL, inode, NULL, NULL,
					     0, ealen, LUSTRE_OPC_ANY,
					     NULL);
		if (IS_ERR(op_data))
			RETURN(PTR_ERR(op_data));

		op_data->op_valid = valid;
		/* Once OBD_CONNECT_ATTRFID is not supported, we can't find one
		 * capa for this inode. Because we only keep capas of dirs
		 * fresh. */
		rc = md_getattr(sbi->ll_md_exp, op_data, &req);
		ll_finish_md_op_data(op_data);
		if (rc) {
			rc = ll_inode_revalidate_fini(inode, rc);
			RETURN(rc);
		}

		rc = ll_prep_inode(&inode, req, NULL, NULL);
	}
out:
	ptlrpc_req_finished(req);
	return rc;
}
2605
2606int ll_inode_revalidate_it(struct dentry *dentry, struct lookup_intent *it,
2607			   __u64 ibits)
2608{
2609	struct inode *inode = dentry->d_inode;
2610	int rc;
2611	ENTRY;
2612
2613	rc = __ll_inode_revalidate_it(dentry, it, ibits);
2614	if (rc != 0)
2615		RETURN(rc);
2616
2617	/* if object isn't regular file, don't validate size */
2618	if (!S_ISREG(inode->i_mode)) {
2619		LTIME_S(inode->i_atime) = ll_i2info(inode)->lli_lvb.lvb_atime;
2620		LTIME_S(inode->i_mtime) = ll_i2info(inode)->lli_lvb.lvb_mtime;
2621		LTIME_S(inode->i_ctime) = ll_i2info(inode)->lli_lvb.lvb_ctime;
2622	} else {
2623		rc = ll_glimpse_size(inode);
2624	}
2625	RETURN(rc);
2626}
2627
2628int ll_getattr_it(struct vfsmount *mnt, struct dentry *de,
2629		  struct lookup_intent *it, struct kstat *stat)
2630{
2631	struct inode *inode = de->d_inode;
2632	struct ll_sb_info *sbi = ll_i2sbi(inode);
2633	struct ll_inode_info *lli = ll_i2info(inode);
2634	int res = 0;
2635
2636	res = ll_inode_revalidate_it(de, it, MDS_INODELOCK_UPDATE |
2637					     MDS_INODELOCK_LOOKUP);
2638	ll_stats_ops_tally(sbi, LPROC_LL_GETATTR, 1);
2639
2640	if (res)
2641		return res;
2642
2643	stat->dev = inode->i_sb->s_dev;
2644	if (ll_need_32bit_api(sbi))
2645		stat->ino = cl_fid_build_ino(&lli->lli_fid, 1);
2646	else
2647		stat->ino = inode->i_ino;
2648	stat->mode = inode->i_mode;
2649	stat->nlink = inode->i_nlink;
2650	stat->uid = inode->i_uid;
2651	stat->gid = inode->i_gid;
2652	stat->rdev = inode->i_rdev;
2653	stat->atime = inode->i_atime;
2654	stat->mtime = inode->i_mtime;
2655	stat->ctime = inode->i_ctime;
2656	stat->blksize = 1 << inode->i_blkbits;
2657
2658	stat->size = i_size_read(inode);
2659	stat->blocks = inode->i_blocks;
2660
2661	return 0;
2662}
2663int ll_getattr(struct vfsmount *mnt, struct dentry *de, struct kstat *stat)
2664{
2665	struct lookup_intent it = { .it_op = IT_GETATTR };
2666
2667	return ll_getattr_it(mnt, de, &it, stat);
2668}
2669
2670
2671struct posix_acl * ll_get_acl(struct inode *inode, int type)
2672{
2673	struct ll_inode_info *lli = ll_i2info(inode);
2674	struct posix_acl *acl = NULL;
2675	ENTRY;
2676
2677	spin_lock(&lli->lli_lock);
2678	/* VFS' acl_permission_check->check_acl will release the refcount */
2679	acl = posix_acl_dup(lli->lli_posix_acl);
2680	spin_unlock(&lli->lli_lock);
2681
2682	RETURN(acl);
2683}
2684
2685
/* VFS ->permission entry point: revalidate the root inode if needed, defer
 * to the MDS for remote clients, otherwise do a generic permission check. */
int ll_inode_permission(struct inode *inode, int mask)
{
	int rc = 0;
	ENTRY;

#ifdef MAY_NOT_BLOCK
	/* RCU-walk may not block; tell the VFS to retry in ref-walk mode */
	if (mask & MAY_NOT_BLOCK)
		return -ECHILD;
#endif

	/* as the root inode is NOT getting validated in lookup operation,
	 * need to do it before permission check. */
	if (inode == inode->i_sb->s_root->d_inode) {
		struct lookup_intent it = { .it_op = IT_LOOKUP };

		rc = __ll_inode_revalidate_it(inode->i_sb->s_root, &it,
					      MDS_INODELOCK_LOOKUP);
		if (rc)
			RETURN(rc);
	}

	CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), inode mode %x mask %o\n",
	       inode->i_ino, inode->i_generation, inode, inode->i_mode, mask);

	/* remote clients have their permissions checked on the MDS */
	if (ll_i2sbi(inode)->ll_flags & LL_SBI_RMT_CLIENT)
		return lustre_check_remote_perm(inode, mask);

	ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_INODE_PERM, 1);
	/* NOTE(review): 'flags' is not declared in this function; it can only
	 * resolve via a macro in a compat header -- confirm against the
	 * kernel-version wrappers for ll_generic_permission. */
	rc = ll_generic_permission(inode, mask, flags, ll_check_acl);

	RETURN(rc);
}
2719
/* Names of the file_operations members and the llite methods backing the
 * read/write paths; used by the three operation tables below. */
#define READ_METHOD aio_read
#define READ_FUNCTION ll_file_aio_read
#define WRITE_METHOD aio_write
#define WRITE_FUNCTION ll_file_aio_write
2724
/* -o localflock - only provides locally consistent flock locks: no
 * .flock/.lock methods, so the VFS falls back to node-local semantics */
struct file_operations ll_file_operations = {
	.read	   = ll_file_read,
	.READ_METHOD    = READ_FUNCTION,
	.write	  = ll_file_write,
	.WRITE_METHOD   = WRITE_FUNCTION,
	.unlocked_ioctl = ll_file_ioctl,
	.open	   = ll_file_open,
	.release	= ll_file_release,
	.mmap	   = ll_file_mmap,
	.llseek	 = ll_file_seek,
	.splice_read    = ll_file_splice_read,
	.fsync	  = ll_fsync,
	.flush	  = ll_flush
};
2740
/* default (-o flock): cluster-coherent flock/posix locks routed through
 * the MDS via ll_file_flock */
struct file_operations ll_file_operations_flock = {
	.read	   = ll_file_read,
	.READ_METHOD    = READ_FUNCTION,
	.write	  = ll_file_write,
	.WRITE_METHOD   = WRITE_FUNCTION,
	.unlocked_ioctl = ll_file_ioctl,
	.open	   = ll_file_open,
	.release	= ll_file_release,
	.mmap	   = ll_file_mmap,
	.llseek	 = ll_file_seek,
	.splice_read    = ll_file_splice_read,
	.fsync	  = ll_fsync,
	.flush	  = ll_flush,
	.flock	  = ll_file_flock,
	.lock	   = ll_file_flock
};
2757
/* These are for -o noflock - to return ENOSYS on flock calls */
struct file_operations ll_file_operations_noflock = {
	.read	   = ll_file_read,
	.READ_METHOD    = READ_FUNCTION,
	.write	  = ll_file_write,
	.WRITE_METHOD   = WRITE_FUNCTION,
	.unlocked_ioctl = ll_file_ioctl,
	.open	   = ll_file_open,
	.release	= ll_file_release,
	.mmap	   = ll_file_mmap,
	.llseek	 = ll_file_seek,
	.splice_read    = ll_file_splice_read,
	.fsync	  = ll_fsync,
	.flush	  = ll_flush,
	.flock	  = ll_file_noflock,
	.lock	   = ll_file_noflock
};
2775
/* inode operations shared by all regular files on this client */
struct inode_operations ll_file_inode_operations = {
	.setattr	= ll_setattr,
	.getattr	= ll_getattr,
	.permission	= ll_inode_permission,
	.setxattr	= ll_setxattr,
	.getxattr	= ll_getxattr,
	.listxattr	= ll_listxattr,
	.removexattr	= ll_removexattr,
	.get_acl	= ll_get_acl,
};
2786
/* dynamic ioctl number support routines */
static struct llioc_ctl_data {
	struct rw_semaphore	ioc_sem;	/* protects ioc_head */
	struct list_head	      ioc_head;	/* registered llioc_data blocks */
} llioc = {
	__RWSEM_INITIALIZER(llioc.ioc_sem),
	LIST_HEAD_INIT(llioc.ioc_head)
};
2795
2796
2797struct llioc_data {
2798	struct list_head	      iocd_list;
2799	unsigned int	    iocd_size;
2800	llioc_callback_t	iocd_cb;
2801	unsigned int	    iocd_count;
2802	unsigned int	    iocd_cmd[0];
2803};
2804
2805void *ll_iocontrol_register(llioc_callback_t cb, int count, unsigned int *cmd)
2806{
2807	unsigned int size;
2808	struct llioc_data *in_data = NULL;
2809	ENTRY;
2810
2811	if (cb == NULL || cmd == NULL ||
2812	    count > LLIOC_MAX_CMD || count < 0)
2813		RETURN(NULL);
2814
2815	size = sizeof(*in_data) + count * sizeof(unsigned int);
2816	OBD_ALLOC(in_data, size);
2817	if (in_data == NULL)
2818		RETURN(NULL);
2819
2820	memset(in_data, 0, sizeof(*in_data));
2821	in_data->iocd_size = size;
2822	in_data->iocd_cb = cb;
2823	in_data->iocd_count = count;
2824	memcpy(in_data->iocd_cmd, cmd, sizeof(unsigned int) * count);
2825
2826	down_write(&llioc.ioc_sem);
2827	list_add_tail(&in_data->iocd_list, &llioc.ioc_head);
2828	up_write(&llioc.ioc_sem);
2829
2830	RETURN(in_data);
2831}
2832
2833void ll_iocontrol_unregister(void *magic)
2834{
2835	struct llioc_data *tmp;
2836
2837	if (magic == NULL)
2838		return;
2839
2840	down_write(&llioc.ioc_sem);
2841	list_for_each_entry(tmp, &llioc.ioc_head, iocd_list) {
2842		if (tmp == magic) {
2843			unsigned int size = tmp->iocd_size;
2844
2845			list_del(&tmp->iocd_list);
2846			up_write(&llioc.ioc_sem);
2847
2848			OBD_FREE(tmp, size);
2849			return;
2850		}
2851	}
2852	up_write(&llioc.ioc_sem);
2853
2854	CWARN("didn't find iocontrol register block with magic: %p\n", magic);
2855}
2856
2857EXPORT_SYMBOL(ll_iocontrol_register);
2858EXPORT_SYMBOL(ll_iocontrol_unregister);
2859
2860enum llioc_iter ll_iocontrol_call(struct inode *inode, struct file *file,
2861			unsigned int cmd, unsigned long arg, int *rcp)
2862{
2863	enum llioc_iter ret = LLIOC_CONT;
2864	struct llioc_data *data;
2865	int rc = -EINVAL, i;
2866
2867	down_read(&llioc.ioc_sem);
2868	list_for_each_entry(data, &llioc.ioc_head, iocd_list) {
2869		for (i = 0; i < data->iocd_count; i++) {
2870			if (cmd != data->iocd_cmd[i])
2871				continue;
2872
2873			ret = data->iocd_cb(inode, file, cmd, arg, data, &rc);
2874			break;
2875		}
2876
2877		if (ret == LLIOC_STOP)
2878			break;
2879	}
2880	up_read(&llioc.ioc_sem);
2881
2882	if (rcp)
2883		*rcp = rc;
2884	return ret;
2885}
2886
/* Push a layout configuration change down to the cl_object of \a inode.
 * For OBJECT_CONF_SET, conf->coc_lock must be a held layout lock; on
 * success the lock is made matchable. Returns 0 or a negative errno. */
int ll_layout_conf(struct inode *inode, const struct cl_object_conf *conf)
{
	struct ll_inode_info *lli = ll_i2info(inode);
	struct cl_env_nest nest;
	struct lu_env *env;
	int result;
	ENTRY;

	/* nothing to configure if the cl_object was never instantiated */
	if (lli->lli_clob == NULL)
		RETURN(0);

	env = cl_env_nested_get(&nest);
	if (IS_ERR(env))
		RETURN(PTR_ERR(env));

	result = cl_conf_set(env, lli->lli_clob, conf);
	cl_env_nested_put(&nest, env);

	if (conf->coc_opc == OBJECT_CONF_SET) {
		struct ldlm_lock *lock = conf->coc_lock;

		LASSERT(lock != NULL);
		LASSERT(ldlm_has_layout(lock));
		if (result == 0) {
			/* it can only be allowed to match after layout is
			 * applied to inode otherwise false layout would be
			 * seen. Applying layout should happen before dropping
			 * the intent lock. */
			ldlm_lock_allow_match(lock);
		}
	}
	RETURN(result);
}
2920
2921/* Fetch layout from MDT with getxattr request, if it's not ready yet */
2922static int ll_layout_fetch(struct inode *inode, struct ldlm_lock *lock)
2923
2924{
2925	struct ll_sb_info *sbi = ll_i2sbi(inode);
2926	struct obd_capa *oc;
2927	struct ptlrpc_request *req;
2928	struct mdt_body *body;
2929	void *lvbdata;
2930	void *lmm;
2931	int lmmsize;
2932	int rc;
2933	ENTRY;
2934
2935	if (lock->l_lvb_data != NULL)
2936		RETURN(0);
2937
2938	/* if layout lock was granted right away, the layout is returned
2939	 * within DLM_LVB of dlm reply; otherwise if the lock was ever
2940	 * blocked and then granted via completion ast, we have to fetch
2941	 * layout here. Please note that we can't use the LVB buffer in
2942	 * completion AST because it doesn't have a large enough buffer */
2943	oc = ll_mdscapa_get(inode);
2944	rc = ll_get_max_mdsize(sbi, &lmmsize);
2945	if (rc == 0)
2946		rc = md_getxattr(sbi->ll_md_exp, ll_inode2fid(inode), oc,
2947				OBD_MD_FLXATTR, XATTR_NAME_LOV, NULL, 0,
2948				lmmsize, 0, &req);
2949	capa_put(oc);
2950	if (rc < 0)
2951		RETURN(rc);
2952
2953	body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
2954	if (body == NULL || body->eadatasize > lmmsize)
2955		GOTO(out, rc = -EPROTO);
2956
2957	lmmsize = body->eadatasize;
2958	if (lmmsize == 0) /* empty layout */
2959		GOTO(out, rc = 0);
2960
2961	lmm = req_capsule_server_sized_get(&req->rq_pill, &RMF_EADATA, lmmsize);
2962	if (lmm == NULL)
2963		GOTO(out, rc = -EFAULT);
2964
2965	OBD_ALLOC_LARGE(lvbdata, lmmsize);
2966	if (lvbdata == NULL)
2967		GOTO(out, rc = -ENOMEM);
2968
2969	memcpy(lvbdata, lmm, lmmsize);
2970	lock_res_and_lock(lock);
2971	if (lock->l_lvb_data == NULL) {
2972		lock->l_lvb_data = lvbdata;
2973		lock->l_lvb_len = lmmsize;
2974		lvbdata = NULL;
2975	}
2976	unlock_res_and_lock(lock);
2977
2978	if (lvbdata != NULL)
2979		OBD_FREE_LARGE(lvbdata, lmmsize);
2980	EXIT;
2981
2982out:
2983	ptlrpc_req_finished(req);
2984	return rc;
2985}
2986
2987/**
2988 * Apply the layout to the inode. Layout lock is held and will be released
2989 * in this function.
2990 */
2991static int ll_layout_lock_set(struct lustre_handle *lockh, ldlm_mode_t mode,
2992				struct inode *inode, __u32 *gen, bool reconf)
2993{
2994	struct ll_inode_info *lli = ll_i2info(inode);
2995	struct ll_sb_info    *sbi = ll_i2sbi(inode);
2996	struct ldlm_lock *lock;
2997	struct lustre_md md = { NULL };
2998	struct cl_object_conf conf;
2999	int rc = 0;
3000	bool lvb_ready;
3001	bool wait_layout = false;
3002	ENTRY;
3003
3004	LASSERT(lustre_handle_is_used(lockh));
3005
3006	lock = ldlm_handle2lock(lockh);
3007	LASSERT(lock != NULL);
3008	LASSERT(ldlm_has_layout(lock));
3009
3010	LDLM_DEBUG(lock, "File %p/"DFID" being reconfigured: %d.\n",
3011		inode, PFID(&lli->lli_fid), reconf);
3012
3013	/* in case this is a caching lock and reinstate with new inode */
3014	md_set_lock_data(sbi->ll_md_exp, &lockh->cookie, inode, NULL);
3015
3016	lock_res_and_lock(lock);
3017	lvb_ready = !!(lock->l_flags & LDLM_FL_LVB_READY);
3018	unlock_res_and_lock(lock);
3019	/* checking lvb_ready is racy but this is okay. The worst case is
3020	 * that multi processes may configure the file on the same time. */
3021	if (lvb_ready || !reconf) {
3022		rc = -ENODATA;
3023		if (lvb_ready) {
3024			/* layout_gen must be valid if layout lock is not
3025			 * cancelled and stripe has already set */
3026			*gen = lli->lli_layout_gen;
3027			rc = 0;
3028		}
3029		GOTO(out, rc);
3030	}
3031
3032	rc = ll_layout_fetch(inode, lock);
3033	if (rc < 0)
3034		GOTO(out, rc);
3035
3036	/* for layout lock, lmm is returned in lock's lvb.
3037	 * lvb_data is immutable if the lock is held so it's safe to access it
3038	 * without res lock. See the description in ldlm_lock_decref_internal()
3039	 * for the condition to free lvb_data of layout lock */
3040	if (lock->l_lvb_data != NULL) {
3041		rc = obd_unpackmd(sbi->ll_dt_exp, &md.lsm,
3042				  lock->l_lvb_data, lock->l_lvb_len);
3043		if (rc >= 0) {
3044			*gen = LL_LAYOUT_GEN_EMPTY;
3045			if (md.lsm != NULL)
3046				*gen = md.lsm->lsm_layout_gen;
3047			rc = 0;
3048		} else {
3049			CERROR("%s: file "DFID" unpackmd error: %d\n",
3050				ll_get_fsname(inode->i_sb, NULL, 0),
3051				PFID(&lli->lli_fid), rc);
3052		}
3053	}
3054	if (rc < 0)
3055		GOTO(out, rc);
3056
3057	/* set layout to file. Unlikely this will fail as old layout was
3058	 * surely eliminated */
3059	memset(&conf, 0, sizeof conf);
3060	conf.coc_opc = OBJECT_CONF_SET;
3061	conf.coc_inode = inode;
3062	conf.coc_lock = lock;
3063	conf.u.coc_md = &md;
3064	rc = ll_layout_conf(inode, &conf);
3065
3066	if (md.lsm != NULL)
3067		obd_free_memmd(sbi->ll_dt_exp, &md.lsm);
3068
3069	/* refresh layout failed, need to wait */
3070	wait_layout = rc == -EBUSY;
3071	EXIT;
3072
3073out:
3074	LDLM_LOCK_PUT(lock);
3075	ldlm_lock_decref(lockh, mode);
3076
3077	/* wait for IO to complete if it's still being used. */
3078	if (wait_layout) {
3079		CDEBUG(D_INODE, "%s: %p/"DFID" wait for layout reconf.\n",
3080			ll_get_fsname(inode->i_sb, NULL, 0),
3081			inode, PFID(&lli->lli_fid));
3082
3083		memset(&conf, 0, sizeof conf);
3084		conf.coc_opc = OBJECT_CONF_WAIT;
3085		conf.coc_inode = inode;
3086		rc = ll_layout_conf(inode, &conf);
3087		if (rc == 0)
3088			rc = -EAGAIN;
3089
3090		CDEBUG(D_INODE, "file: "DFID" waiting layout return: %d.\n",
3091			PFID(&lli->lli_fid), rc);
3092	}
3093	RETURN(rc);
3094}
3095
3096/**
3097 * This function checks if there exists a LAYOUT lock on the client side,
3098 * or enqueues it if it doesn't have one in cache.
3099 *
3100 * This function will not hold layout lock so it may be revoked any time after
3101 * this function returns. Any operations depend on layout should be redone
3102 * in that case.
3103 *
3104 * This function should be called before lov_io_init() to get an uptodate
3105 * layout version, the caller should save the version number and after IO
3106 * is finished, this function should be called again to verify that layout
3107 * is not changed during IO time.
3108 */
3109int ll_layout_refresh(struct inode *inode, __u32 *gen)
3110{
3111	struct ll_inode_info  *lli = ll_i2info(inode);
3112	struct ll_sb_info     *sbi = ll_i2sbi(inode);
3113	struct md_op_data     *op_data;
3114	struct lookup_intent   it;
3115	struct lustre_handle   lockh;
3116	ldlm_mode_t	       mode;
3117	struct ldlm_enqueue_info einfo = { .ei_type = LDLM_IBITS,
3118					   .ei_mode = LCK_CR,
3119					   .ei_cb_bl = ll_md_blocking_ast,
3120					   .ei_cb_cp = ldlm_completion_ast,
3121					   .ei_cbdata = NULL };
3122	int rc;
3123	ENTRY;
3124
3125	*gen = lli->lli_layout_gen;
3126	if (!(sbi->ll_flags & LL_SBI_LAYOUT_LOCK))
3127		RETURN(0);
3128
3129	/* sanity checks */
3130	LASSERT(fid_is_sane(ll_inode2fid(inode)));
3131	LASSERT(S_ISREG(inode->i_mode));
3132
3133	/* mostly layout lock is caching on the local side, so try to match
3134	 * it before grabbing layout lock mutex. */
3135	mode = ll_take_md_lock(inode, MDS_INODELOCK_LAYOUT, &lockh, 0);
3136	if (mode != 0) { /* hit cached lock */
3137		rc = ll_layout_lock_set(&lockh, mode, inode, gen, false);
3138		if (rc == 0)
3139			RETURN(0);
3140
3141		/* better hold lli_layout_mutex to try again otherwise
3142		 * it will have starvation problem. */
3143	}
3144
3145	/* take layout lock mutex to enqueue layout lock exclusively. */
3146	mutex_lock(&lli->lli_layout_mutex);
3147
3148again:
3149	/* try again. Maybe somebody else has done this. */
3150	mode = ll_take_md_lock(inode, MDS_INODELOCK_LAYOUT, &lockh, 0);
3151	if (mode != 0) { /* hit cached lock */
3152		rc = ll_layout_lock_set(&lockh, mode, inode, gen, true);
3153		if (rc == -EAGAIN)
3154			goto again;
3155
3156		mutex_unlock(&lli->lli_layout_mutex);
3157		RETURN(rc);
3158	}
3159
3160	op_data = ll_prep_md_op_data(NULL, inode, inode, NULL,
3161			0, 0, LUSTRE_OPC_ANY, NULL);
3162	if (IS_ERR(op_data)) {
3163		mutex_unlock(&lli->lli_layout_mutex);
3164		RETURN(PTR_ERR(op_data));
3165	}
3166
3167	/* have to enqueue one */
3168	memset(&it, 0, sizeof(it));
3169	it.it_op = IT_LAYOUT;
3170	lockh.cookie = 0ULL;
3171
3172	LDLM_DEBUG_NOLOCK("%s: requeue layout lock for file %p/"DFID".\n",
3173			ll_get_fsname(inode->i_sb, NULL, 0), inode,
3174			PFID(&lli->lli_fid));
3175
3176	rc = md_enqueue(sbi->ll_md_exp, &einfo, &it, op_data, &lockh,
3177			NULL, 0, NULL, 0);
3178	if (it.d.lustre.it_data != NULL)
3179		ptlrpc_req_finished(it.d.lustre.it_data);
3180	it.d.lustre.it_data = NULL;
3181
3182	ll_finish_md_op_data(op_data);
3183
3184	mode = it.d.lustre.it_lock_mode;
3185	it.d.lustre.it_lock_mode = 0;
3186	ll_intent_drop_lock(&it);
3187
3188	if (rc == 0) {
3189		/* set lock data in case this is a new lock */
3190		ll_set_lock_data(sbi->ll_md_exp, inode, &it, NULL);
3191		rc = ll_layout_lock_set(&lockh, mode, inode, gen, true);
3192		if (rc == -EAGAIN)
3193			goto again;
3194	}
3195	mutex_unlock(&lli->lli_layout_mutex);
3196
3197	RETURN(rc);
3198}
3199