/* lustre/llite/file.c @ revision 0a3bdb00710bf253ba8ba8f645645f22297c7a04 */
1/*
2 * GPL HEADER START
3 *
4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5 *
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
9 *
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
15 *
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19 *
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
22 * have any questions.
23 *
24 * GPL HEADER END
25 */
26/*
27 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
29 *
30 * Copyright (c) 2011, 2012, Intel Corporation.
31 */
32/*
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
35 *
36 * lustre/llite/file.c
37 *
38 * Author: Peter Braam <braam@clusterfs.com>
39 * Author: Phil Schwan <phil@clusterfs.com>
40 * Author: Andreas Dilger <adilger@clusterfs.com>
41 */
42
43#define DEBUG_SUBSYSTEM S_LLITE
44#include <lustre_dlm.h>
45#include <lustre_lite.h>
46#include <linux/pagemap.h>
47#include <linux/file.h>
48#include "llite_internal.h"
49#include <lustre/ll_fiemap.h>
50
51#include "cl_object.h"
52
53struct ll_file_data *ll_file_data_get(void)
54{
55	struct ll_file_data *fd;
56
57	OBD_SLAB_ALLOC_PTR_GFP(fd, ll_file_data_slab, __GFP_IO);
58	if (fd == NULL)
59		return NULL;
60	fd->fd_write_failed = false;
61	return fd;
62}
63
64static void ll_file_data_put(struct ll_file_data *fd)
65{
66	if (fd != NULL)
67		OBD_SLAB_FREE_PTR(fd, ll_file_data_slab);
68}
69
/*
 * Pack @inode's current attributes (mode, times, size, blocks, flags),
 * FID, IO epoch and MDS capability into @op_data for an RPC to the MDS.
 * If @fh is non-NULL it becomes the operation's open handle.
 */
void ll_pack_inode2opdata(struct inode *inode, struct md_op_data *op_data,
			  struct lustre_handle *fh)
{
	op_data->op_fid1 = ll_i2info(inode)->lli_fid;
	op_data->op_attr.ia_mode = inode->i_mode;
	op_data->op_attr.ia_atime = inode->i_atime;
	op_data->op_attr.ia_mtime = inode->i_mtime;
	op_data->op_attr.ia_ctime = inode->i_ctime;
	op_data->op_attr.ia_size = i_size_read(inode);
	op_data->op_attr_blocks = inode->i_blocks;
	/* VFS inode flags are translated to the ext2-style on-wire form. */
	((struct ll_iattr *)&op_data->op_attr)->ia_attr_flags =
					ll_inode_to_ext_flags(inode->i_flags);
	op_data->op_ioepoch = ll_i2info(inode)->lli_ioepoch;
	if (fh)
		op_data->op_handle = *fh;
	op_data->op_capa1 = ll_mdscapa_get(inode);

	/* Propagate the locally recorded data-modified state to the MDS. */
	if (LLIF_DATA_MODIFIED & ll_i2info(inode)->lli_flags)
		op_data->op_bias |= MDS_DATA_MODIFIED;
}
90
91/**
92 * Closes the IO epoch and packs all the attributes into @op_data for
93 * the CLOSE rpc.
94 */
static void ll_prepare_close(struct inode *inode, struct md_op_data *op_data,
			     struct obd_client_handle *och)
{
	op_data->op_attr.ia_valid = ATTR_MODE | ATTR_ATIME | ATTR_ATIME_SET |
					ATTR_MTIME | ATTR_MTIME_SET |
					ATTR_CTIME | ATTR_CTIME_SET;

	/* Read-only opens carry no size/epoch state to the MDS. */
	if (!(och->och_flags & FMODE_WRITE))
		goto out;

	/* Without Size-on-MDS support (or for non-regular files) the size
	 * and block count are sent directly; otherwise the IO epoch is
	 * closed and SOM machinery takes over. */
	if (!exp_connect_som(ll_i2mdexp(inode)) || !S_ISREG(inode->i_mode))
		op_data->op_attr.ia_valid |= ATTR_SIZE | ATTR_BLOCKS;
	else
		ll_ioepoch_close(inode, op_data, &och, 0);

out:
	ll_pack_inode2opdata(inode, op_data, &och->och_fh);
	ll_prep_md_op_data(op_data, inode, NULL, NULL,
			   0, 0, LUSTRE_OPC_ANY, NULL);
}
115
/*
 * Send the CLOSE RPC for @och on @inode to the MDS.  Handles the
 * Size-on-MDS -EAGAIN retry (attributes fetched from OSTs, then a
 * setattr back to the MDS) and frees @och unless it must stay around
 * waiting for DONE_WRITING.
 */
static int ll_close_inode_openhandle(struct obd_export *md_exp,
				     struct inode *inode,
				     struct obd_client_handle *och)
{
	struct obd_export *exp = ll_i2mdexp(inode);
	struct md_op_data *op_data;
	struct ptlrpc_request *req = NULL;
	struct obd_device *obd = class_exp2obd(exp);
	int epoch_close = 1;
	int rc;

	if (obd == NULL) {
		/*
		 * XXX: in case of LMV, is this correct to access
		 * ->exp_handle?
		 */
		CERROR("Invalid MDC connection handle "LPX64"\n",
		       ll_i2mdexp(inode)->exp_handle.h_cookie);
		GOTO(out, rc = 0);
	}

	OBD_ALLOC_PTR(op_data);
	if (op_data == NULL)
		GOTO(out, rc = -ENOMEM); // XXX We leak openhandle and request here.

	ll_prepare_close(inode, op_data, och);
	epoch_close = (op_data->op_flags & MF_EPOCH_CLOSE);
	rc = md_close(md_exp, op_data, och->och_mod, &req);
	if (rc == -EAGAIN) {
		/* This close must have the epoch closed. */
		LASSERT(epoch_close);
		/* MDS has instructed us to obtain Size-on-MDS attribute from
		 * OSTs and send setattr to back to MDS. */
		rc = ll_som_update(inode, op_data);
		if (rc) {
			CERROR("inode %lu mdc Size-on-MDS update failed: "
			       "rc = %d\n", inode->i_ino, rc);
			rc = 0;
		}
	} else if (rc) {
		CERROR("inode %lu mdc close failed: rc = %d\n",
		       inode->i_ino, rc);
	}

	/* DATA_MODIFIED flag was successfully sent on close, cancel data
	 * modification flag. */
	if (rc == 0 && (op_data->op_bias & MDS_DATA_MODIFIED)) {
		struct ll_inode_info *lli = ll_i2info(inode);

		spin_lock(&lli->lli_lock);
		lli->lli_flags &= ~LLIF_DATA_MODIFIED;
		spin_unlock(&lli->lli_lock);
	}

	ll_finish_md_op_data(op_data);

	if (rc == 0) {
		/* Destroy OST objects the close reply says are orphaned. */
		rc = ll_objects_destroy(req, inode);
		if (rc)
			CERROR("inode %lu ll_objects destroy: rc = %d\n",
			       inode->i_ino, rc);
	}

out:
	/* Keep @och alive if SOM needs a later DONE_WRITING for this
	 * write open; otherwise drop replay data and free it. */
	if (exp_connect_som(exp) && !epoch_close &&
	    S_ISREG(inode->i_mode) && (och->och_flags & FMODE_WRITE)) {
		ll_queue_done_writing(inode, LLIF_DONE_WRITING);
	} else {
		md_clear_open_replay_data(md_exp, och);
		/* Free @och if it is not waiting for DONE_WRITING. */
		och->och_fh.cookie = DEAD_HANDLE_MAGIC;
		OBD_FREE_PTR(och);
	}
	if (req) /* This is close request */
		ptlrpc_req_finished(req);
	return rc;
}
193
194int ll_md_real_close(struct inode *inode, int flags)
195{
196	struct ll_inode_info *lli = ll_i2info(inode);
197	struct obd_client_handle **och_p;
198	struct obd_client_handle *och;
199	__u64 *och_usecount;
200	int rc = 0;
201
202	if (flags & FMODE_WRITE) {
203		och_p = &lli->lli_mds_write_och;
204		och_usecount = &lli->lli_open_fd_write_count;
205	} else if (flags & FMODE_EXEC) {
206		och_p = &lli->lli_mds_exec_och;
207		och_usecount = &lli->lli_open_fd_exec_count;
208	} else {
209		LASSERT(flags & FMODE_READ);
210		och_p = &lli->lli_mds_read_och;
211		och_usecount = &lli->lli_open_fd_read_count;
212	}
213
214	mutex_lock(&lli->lli_och_mutex);
215	if (*och_usecount) { /* There are still users of this handle, so
216				skip freeing it. */
217		mutex_unlock(&lli->lli_och_mutex);
218		return 0;
219	}
220	och=*och_p;
221	*och_p = NULL;
222	mutex_unlock(&lli->lli_och_mutex);
223
224	if (och) { /* There might be a race and somebody have freed this och
225		      already */
226		rc = ll_close_inode_openhandle(ll_i2sbi(inode)->ll_md_exp,
227					       inode, och);
228	}
229
230	return rc;
231}
232
233int ll_md_close(struct obd_export *md_exp, struct inode *inode,
234		struct file *file)
235{
236	struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
237	struct ll_inode_info *lli = ll_i2info(inode);
238	int rc = 0;
239
240	/* clear group lock, if present */
241	if (unlikely(fd->fd_flags & LL_FILE_GROUP_LOCKED))
242		ll_put_grouplock(inode, file, fd->fd_grouplock.cg_gid);
243
244	/* Let's see if we have good enough OPEN lock on the file and if
245	   we can skip talking to MDS */
246	if (file->f_dentry->d_inode) { /* Can this ever be false? */
247		int lockmode;
248		int flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_TEST_LOCK;
249		struct lustre_handle lockh;
250		struct inode *inode = file->f_dentry->d_inode;
251		ldlm_policy_data_t policy = {.l_inodebits={MDS_INODELOCK_OPEN}};
252
253		mutex_lock(&lli->lli_och_mutex);
254		if (fd->fd_omode & FMODE_WRITE) {
255			lockmode = LCK_CW;
256			LASSERT(lli->lli_open_fd_write_count);
257			lli->lli_open_fd_write_count--;
258		} else if (fd->fd_omode & FMODE_EXEC) {
259			lockmode = LCK_PR;
260			LASSERT(lli->lli_open_fd_exec_count);
261			lli->lli_open_fd_exec_count--;
262		} else {
263			lockmode = LCK_CR;
264			LASSERT(lli->lli_open_fd_read_count);
265			lli->lli_open_fd_read_count--;
266		}
267		mutex_unlock(&lli->lli_och_mutex);
268
269		if (!md_lock_match(md_exp, flags, ll_inode2fid(inode),
270				   LDLM_IBITS, &policy, lockmode,
271				   &lockh)) {
272			rc = ll_md_real_close(file->f_dentry->d_inode,
273					      fd->fd_omode);
274		}
275	} else {
276		CERROR("Releasing a file %p with negative dentry %p. Name %s",
277		       file, file->f_dentry, file->f_dentry->d_name.name);
278	}
279
280	LUSTRE_FPRIVATE(file) = NULL;
281	ll_file_data_put(fd);
282	ll_capa_close(inode);
283
284	return rc;
285}
286
287/* While this returns an error code, fput() the caller does not, so we need
288 * to make every effort to clean up all of our state here.  Also, applications
289 * rarely check close errors and even if an error is returned they will not
290 * re-try the close call.
291 */
int ll_file_release(struct inode *inode, struct file *file)
{
	struct ll_file_data *fd;
	struct ll_sb_info *sbi = ll_i2sbi(inode);
	struct ll_inode_info *lli = ll_i2info(inode);
	int rc;

	CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p)\n", inode->i_ino,
	       inode->i_generation, inode);

#ifdef CONFIG_FS_POSIX_ACL
	/* Remote-client ACL state is torn down when the root inode of a
	 * remote-client mount is released.  Note: this inner @fd shadows
	 * the outer one deliberately (scoped to the ifdef block). */
	if (sbi->ll_flags & LL_SBI_RMT_CLIENT &&
	    inode == inode->i_sb->s_root->d_inode) {
		struct ll_file_data *fd = LUSTRE_FPRIVATE(file);

		LASSERT(fd != NULL);
		if (unlikely(fd->fd_flags & LL_FILE_RMTACL)) {
			fd->fd_flags &= ~LL_FILE_RMTACL;
			rct_del(&sbi->ll_rct, current_pid());
			et_search_free(&sbi->ll_et, current_pid());
		}
	}
#endif

	/* The root dentry release is not counted in the stats. */
	if (inode->i_sb->s_root != file->f_dentry)
		ll_stats_ops_tally(sbi, LPROC_LL_RELEASE, 1);
	fd = LUSTRE_FPRIVATE(file);
	LASSERT(fd != NULL);

	/* The last ref on @file, maybe not the the owner pid of statahead.
	 * Different processes can open the same dir, "ll_opendir_key" means:
	 * it is me that should stop the statahead thread. */
	if (S_ISDIR(inode->i_mode) && lli->lli_opendir_key == fd &&
	    lli->lli_opendir_pid != 0)
		ll_stop_statahead(inode, lli->lli_opendir_key);

	/* Root has no MD open handle; just drop the private data. */
	if (inode->i_sb->s_root == file->f_dentry) {
		LUSTRE_FPRIVATE(file) = NULL;
		ll_file_data_put(fd);
		return 0;
	}

	/* Collect any async write error recorded on the cl_object so the
	 * close can report it, then clear the per-inode async rc. */
	if (!S_ISDIR(inode->i_mode)) {
		lov_read_and_clear_async_rc(lli->lli_clob);
		lli->lli_async_rc = 0;
	}

	rc = ll_md_close(sbi->ll_md_exp, inode, file);

	if (CFS_FAIL_TIMEOUT_MS(OBD_FAIL_PTLRPC_DUMP_LOG, cfs_fail_val))
		libcfs_debug_dumplog();

	return rc;
}
346
/*
 * Perform an intent open RPC for @file (open-by-FID on the MDS),
 * optionally carrying striping parameters in @lmm/@lmmsize.  On
 * success the inode is refreshed from the reply and DLM lock data
 * is attached.  Returns 0 or a negative errno.
 */
static int ll_intent_file_open(struct file *file, void *lmm,
			       int lmmsize, struct lookup_intent *itp)
{
	struct ll_sb_info *sbi = ll_i2sbi(file->f_dentry->d_inode);
	struct dentry *parent = file->f_dentry->d_parent;
	const char *name = file->f_dentry->d_name.name;
	const int len = file->f_dentry->d_name.len;
	struct md_op_data *op_data;
	struct ptlrpc_request *req;
	__u32 opc = LUSTRE_OPC_ANY;
	int rc;

	if (!parent)
		return -ENOENT;

	/* Usually we come here only for NFSD, and we want open lock.
	   But we can also get here with pre 2.6.15 patchless kernels, and in
	   that case that lock is also ok */
	/* We can also get here if there was cached open handle in revalidate_it
	 * but it disappeared while we were getting from there to ll_file_open.
	 * But this means this file was closed and immediatelly opened which
	 * makes a good candidate for using OPEN lock */
	/* If lmmsize & lmm are not 0, we are just setting stripe info
	 * parameters. No need for the open lock */
	if (lmm == NULL && lmmsize == 0) {
		itp->it_flags |= MDS_OPEN_LOCK;
		if (itp->it_flags & FMODE_WRITE)
			opc = LUSTRE_OPC_CREATE;
	}

	op_data  = ll_prep_md_op_data(NULL, parent->d_inode,
				      file->f_dentry->d_inode, name, len,
				      O_RDWR, opc, NULL);
	if (IS_ERR(op_data))
		return PTR_ERR(op_data);

	itp->it_flags |= MDS_OPEN_BY_FID;
	rc = md_intent_lock(sbi->ll_md_exp, op_data, lmm, lmmsize, itp,
			    0 /*unused */, &req, ll_md_blocking_ast, 0);
	ll_finish_md_op_data(op_data);
	if (rc == -ESTALE) {
		/* reason for keep own exit path - don`t flood log
		* with messages with -ESTALE errors.
		*/
		if (!it_disposition(itp, DISP_OPEN_OPEN) ||
		     it_open_error(DISP_OPEN_OPEN, itp))
			GOTO(out, rc);
		ll_release_openhandle(file->f_dentry, itp);
		GOTO(out, rc);
	}

	if (it_disposition(itp, DISP_LOOKUP_NEG))
		GOTO(out, rc = -ENOENT);

	if (rc != 0 || it_open_error(DISP_OPEN_OPEN, itp)) {
		rc = rc ? rc : it_open_error(DISP_OPEN_OPEN, itp);
		CDEBUG(D_VFSTRACE, "lock enqueue: err: %d\n", rc);
		GOTO(out, rc);
	}

	rc = ll_prep_inode(&file->f_dentry->d_inode, req, NULL, itp);
	if (!rc && itp->d.lustre.it_lock_mode)
		ll_set_lock_data(sbi->ll_md_exp, file->f_dentry->d_inode,
				 itp, NULL);

out:
	/* Drop the request ref and any lock held by the intent; callers
	 * only consume the rc. */
	ptlrpc_req_finished(itp->d.lustre.it_data);
	it_clear_disposition(itp, DISP_ENQ_COMPLETE);
	ll_intent_drop_lock(itp);

	return rc;
}
419
420/**
421 * Assign an obtained @ioepoch to client's inode. No lock is needed, MDS does
422 * not believe attributes if a few ioepoch holders exist. Attributes for
423 * previous ioepoch if new one is opened are also skipped by MDS.
424 */
425void ll_ioepoch_open(struct ll_inode_info *lli, __u64 ioepoch)
426{
427	if (ioepoch && lli->lli_ioepoch != ioepoch) {
428		lli->lli_ioepoch = ioepoch;
429		CDEBUG(D_INODE, "Epoch "LPU64" opened on "DFID"\n",
430		       ioepoch, PFID(&lli->lli_fid));
431	}
432}
433
/*
 * Initialise @och from the open reply carried by @it: copy the MDS
 * open file handle, FID, open flags and IO epoch, then register the
 * request for open replay.  Returns md_set_open_replay_data() result.
 */
static int ll_och_fill(struct obd_export *md_exp, struct ll_inode_info *lli,
		       struct lookup_intent *it, struct obd_client_handle *och)
{
	struct ptlrpc_request *req = it->d.lustre.it_data;
	struct mdt_body *body;

	LASSERT(och);

	body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
	LASSERT(body != NULL);		      /* reply already checked out */

	memcpy(&och->och_fh, &body->handle, sizeof(body->handle));
	och->och_magic = OBD_CLIENT_HANDLE_MAGIC;
	och->och_fid = lli->lli_fid;
	och->och_flags = it->it_flags;
	ll_ioepoch_open(lli, body->ioepoch);

	return md_set_open_replay_data(md_exp, och, req);
}
453
/*
 * Finish the client-side part of an open: optionally fill @och from
 * the intent's open reply, then attach @fd as the file's private data
 * and initialise its readahead state.  Returns 0 or ll_och_fill()'s
 * error; on success ownership of @fd passes to @file.
 */
int ll_local_open(struct file *file, struct lookup_intent *it,
		  struct ll_file_data *fd, struct obd_client_handle *och)
{
	struct inode *inode = file->f_dentry->d_inode;
	struct ll_inode_info *lli = ll_i2info(inode);

	LASSERT(!LUSTRE_FPRIVATE(file));

	LASSERT(fd != NULL);

	if (och) {
		struct ptlrpc_request *req = it->d.lustre.it_data;
		struct mdt_body *body;
		int rc;

		rc = ll_och_fill(ll_i2sbi(inode)->ll_md_exp, lli, it, och);
		if (rc)
			return rc;

		/* NOTE(review): debug-logs write opens for which the MDS
		 * reply already carries a valid size — confirm intent. */
		body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
		if ((it->it_flags & FMODE_WRITE) &&
		    (body->valid & OBD_MD_FLSIZE))
			CDEBUG(D_INODE, "Epoch "LPU64" opened on "DFID"\n",
			       lli->lli_ioepoch, PFID(&lli->lli_fid));
	}

	LUSTRE_FPRIVATE(file) = fd;
	ll_readahead_init(inode, &fd->fd_ras);
	fd->fd_omode = it->it_flags;
	return 0;
}
485
486/* Open a file, and (for the very first open) create objects on the OSTs at
487 * this time.  If opened with O_LOV_DELAY_CREATE, then we don't do the object
488 * creation or open until ll_lov_setstripe() ioctl is called.
489 *
490 * If we already have the stripe MD locally then we don't request it in
491 * md_open(), by passing a lmm_size = 0.
492 *
493 * It is up to the application to ensure no other processes open this file
494 * in the O_LOV_DELAY_CREATE case, or the default striping pattern will be
495 * used.  We might be able to avoid races of that sort by getting lli_open_sem
496 * before returning in the O_LOV_DELAY_CREATE case and dropping it here
497 * or in ll_file_release(), but I'm not sure that is desirable/necessary.
498 */
/*
 * VFS ->open() handler: reuse a cached MDS open handle for this open
 * mode if one exists, otherwise perform (or consume) an intent open
 * and record the new handle.  On success @fd becomes the file's
 * private data; on failure all partially-created state is undone.
 */
int ll_file_open(struct inode *inode, struct file *file)
{
	struct ll_inode_info *lli = ll_i2info(inode);
	struct lookup_intent *it, oit = { .it_op = IT_OPEN,
					  .it_flags = file->f_flags };
	struct obd_client_handle **och_p = NULL;
	__u64 *och_usecount = NULL;
	struct ll_file_data *fd;
	int rc = 0, opendir_set = 0;

	CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), flags %o\n", inode->i_ino,
	       inode->i_generation, inode, file->f_flags);

	it = file->private_data; /* XXX: compat macro */
	file->private_data = NULL; /* prevent ll_local_open assertion */

	fd = ll_file_data_get();
	if (fd == NULL)
		GOTO(out_openerr, rc = -ENOMEM);

	fd->fd_file = file;
	/* First opener of a directory becomes the statahead owner. */
	if (S_ISDIR(inode->i_mode)) {
		spin_lock(&lli->lli_sa_lock);
		if (lli->lli_opendir_key == NULL && lli->lli_sai == NULL &&
		    lli->lli_opendir_pid == 0) {
			lli->lli_opendir_key = fd;
			lli->lli_opendir_pid = current_pid();
			opendir_set = 1;
		}
		spin_unlock(&lli->lli_sa_lock);
	}

	/* The mount root needs no MDS open handle. */
	if (inode->i_sb->s_root == file->f_dentry) {
		LUSTRE_FPRIVATE(file) = fd;
		return 0;
	}

	if (!it || !it->d.lustre.it_disposition) {
		/* Convert f_flags into access mode. We cannot use file->f_mode,
		 * because everything but O_ACCMODE mask was stripped from
		 * there */
		if ((oit.it_flags + 1) & O_ACCMODE)
			oit.it_flags++;
		if (file->f_flags & O_TRUNC)
			oit.it_flags |= FMODE_WRITE;

		/* kernel only call f_op->open in dentry_open.  filp_open calls
		 * dentry_open after call to open_namei that checks permissions.
		 * Only nfsd_open call dentry_open directly without checking
		 * permissions and because of that this code below is safe. */
		if (oit.it_flags & (FMODE_WRITE | FMODE_READ))
			oit.it_flags |= MDS_OPEN_OWNEROVERRIDE;

		/* We do not want O_EXCL here, presumably we opened the file
		 * already? XXX - NFS implications? */
		oit.it_flags &= ~O_EXCL;

		/* bug20584, if "it_flags" contains O_CREAT, the file will be
		 * created if necessary, then "IT_CREAT" should be set to keep
		 * consistent with it */
		if (oit.it_flags & O_CREAT)
			oit.it_op |= IT_CREAT;

		it = &oit;
	}

restart:
	/* Let's see if we have file open on MDS already. */
	if (it->it_flags & FMODE_WRITE) {
		och_p = &lli->lli_mds_write_och;
		och_usecount = &lli->lli_open_fd_write_count;
	} else if (it->it_flags & FMODE_EXEC) {
		och_p = &lli->lli_mds_exec_och;
		och_usecount = &lli->lli_open_fd_exec_count;
	 } else {
		och_p = &lli->lli_mds_read_och;
		och_usecount = &lli->lli_open_fd_read_count;
	}

	mutex_lock(&lli->lli_och_mutex);
	if (*och_p) { /* Open handle is present */
		if (it_disposition(it, DISP_OPEN_OPEN)) {
			/* Well, there's extra open request that we do not need,
			   let's close it somehow. This will decref request. */
			rc = it_open_error(DISP_OPEN_OPEN, it);
			if (rc) {
				mutex_unlock(&lli->lli_och_mutex);
				GOTO(out_openerr, rc);
			}

			ll_release_openhandle(file->f_dentry, it);
		}
		(*och_usecount)++;

		rc = ll_local_open(file, it, fd, NULL);
		if (rc) {
			(*och_usecount)--;
			mutex_unlock(&lli->lli_och_mutex);
			GOTO(out_openerr, rc);
		}
	} else {
		LASSERT(*och_usecount == 0);
		if (!it->d.lustre.it_disposition) {
			/* We cannot just request lock handle now, new ELC code
			   means that one of other OPEN locks for this file
			   could be cancelled, and since blocking ast handler
			   would attempt to grab och_mutex as well, that would
			   result in a deadlock */
			mutex_unlock(&lli->lli_och_mutex);
			it->it_create_mode |= M_CHECK_STALE;
			rc = ll_intent_file_open(file, NULL, 0, it);
			it->it_create_mode &= ~M_CHECK_STALE;
			if (rc)
				GOTO(out_openerr, rc);

			goto restart;
		}
		OBD_ALLOC(*och_p, sizeof (struct obd_client_handle));
		if (!*och_p)
			GOTO(out_och_free, rc = -ENOMEM);

		(*och_usecount)++;

		/* md_intent_lock() didn't get a request ref if there was an
		 * open error, so don't do cleanup on the request here
		 * (bug 3430) */
		/* XXX (green): Should not we bail out on any error here, not
		 * just open error? */
		rc = it_open_error(DISP_OPEN_OPEN, it);
		if (rc)
			GOTO(out_och_free, rc);

		LASSERT(it_disposition(it, DISP_ENQ_OPEN_REF));

		rc = ll_local_open(file, it, fd, *och_p);
		if (rc)
			GOTO(out_och_free, rc);
	}
	mutex_unlock(&lli->lli_och_mutex);
	/* Ownership of @fd has moved to the file's private data. */
	fd = NULL;

	/* Must do this outside lli_och_mutex lock to prevent deadlock where
	   different kind of OPEN lock for this same inode gets cancelled
	   by ldlm_cancel_lru */
	if (!S_ISREG(inode->i_mode))
		GOTO(out_och_free, rc);

	ll_capa_open(inode);

	if (!lli->lli_has_smd) {
		if (file->f_flags & O_LOV_DELAY_CREATE ||
		    !(file->f_mode & FMODE_WRITE)) {
			CDEBUG(D_INODE, "object creation was delayed\n");
			GOTO(out_och_free, rc);
		}
	}
	file->f_flags &= ~O_LOV_DELAY_CREATE;
	GOTO(out_och_free, rc);

out_och_free:
	if (rc) {
		if (och_p && *och_p) {
			OBD_FREE(*och_p, sizeof (struct obd_client_handle));
			*och_p = NULL; /* OBD_FREE writes some magic there */
			(*och_usecount)--;
		}
		mutex_unlock(&lli->lli_och_mutex);

out_openerr:
		if (opendir_set != 0)
			ll_stop_statahead(inode, lli->lli_opendir_key);
		if (fd != NULL)
			ll_file_data_put(fd);
	} else {
		ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_OPEN, 1);
	}

	if (it && it_disposition(it, DISP_ENQ_OPEN_REF)) {
		ptlrpc_req_finished(it->d.lustre.it_data);
		it_clear_disposition(it, DISP_ENQ_OPEN_REF);
	}

	return rc;
}
683
/*
 * Fills the obdo with the attributes for the lsm: issues an async
 * getattr to the data export @exp and waits for completion.  If @sync
 * the getattr is requested under a server-side lock (OBD_FL_SRVLOCK).
 * On success only the merged size/block/time/version bits are left
 * set in o_valid.
 */
static int ll_lsm_getattr(struct lov_stripe_md *lsm, struct obd_export *exp,
			  struct obd_capa *capa, struct obdo *obdo,
			  __u64 ioepoch, int sync)
{
	struct ptlrpc_request_set *set;
	struct obd_info	    oinfo = { { { 0 } } };
	int			rc;

	LASSERT(lsm != NULL);

	oinfo.oi_md = lsm;
	oinfo.oi_oa = obdo;
	oinfo.oi_oa->o_oi = lsm->lsm_oi;
	oinfo.oi_oa->o_mode = S_IFREG;
	oinfo.oi_oa->o_ioepoch = ioepoch;
	oinfo.oi_oa->o_valid = OBD_MD_FLID | OBD_MD_FLTYPE |
			       OBD_MD_FLSIZE | OBD_MD_FLBLOCKS |
			       OBD_MD_FLBLKSZ | OBD_MD_FLATIME |
			       OBD_MD_FLMTIME | OBD_MD_FLCTIME |
			       OBD_MD_FLGROUP | OBD_MD_FLEPOCH |
			       OBD_MD_FLDATAVERSION;
	oinfo.oi_capa = capa;
	if (sync) {
		oinfo.oi_oa->o_valid |= OBD_MD_FLFLAGS;
		oinfo.oi_oa->o_flags |= OBD_FL_SRVLOCK;
	}

	set = ptlrpc_prep_set();
	if (set == NULL) {
		CERROR("can't allocate ptlrpc set\n");
		rc = -ENOMEM;
	} else {
		rc = obd_getattr_async(exp, &oinfo, set);
		if (rc == 0)
			rc = ptlrpc_set_wait(set);
		ptlrpc_set_destroy(set);
	}
	/* Strip everything except the attributes the caller may consume. */
	if (rc == 0)
		oinfo.oi_oa->o_valid &= (OBD_MD_FLBLOCKS | OBD_MD_FLBLKSZ |
					 OBD_MD_FLATIME | OBD_MD_FLMTIME |
					 OBD_MD_FLCTIME | OBD_MD_FLSIZE |
					 OBD_MD_FLDATAVERSION);
	return rc;
}
729
/**
  * Performs the getattr on the inode and updates its fields.
  * If @sync != 0, perform the getattr under the server-side lock.
  *
  * \retval 0 on success (with @inode refreshed from @obdo), negative
  * errno from ll_lsm_getattr() otherwise.
  */
int ll_inode_getattr(struct inode *inode, struct obdo *obdo,
		     __u64 ioepoch, int sync)
{
	struct obd_capa      *capa = ll_mdscapa_get(inode);
	struct lov_stripe_md *lsm;
	int rc;

	/* NOTE(review): ll_lsm_getattr() LASSERTs lsm != NULL, yet the
	 * "lsm ? :" fallback below allows for NULL — confirm lsm is
	 * always set on this path. */
	lsm = ccc_inode_lsm_get(inode);
	rc = ll_lsm_getattr(lsm, ll_i2dtexp(inode),
			    capa, obdo, ioepoch, sync);
	capa_put(capa);
	if (rc == 0) {
		struct ost_id *oi = lsm ? &lsm->lsm_oi : &obdo->o_oi;

		obdo_refresh_inode(inode, obdo, obdo->o_valid);
		CDEBUG(D_INODE, "objid "DOSTID" size %llu, blocks %llu,"
		       " blksize %lu\n", POSTID(oi), i_size_read(inode),
		       (unsigned long long)inode->i_blocks,
		       (unsigned long)ll_inode_blksize(inode));
	}
	ccc_inode_lsm_put(inode, lsm);
	return rc;
}
757
/*
 * Merge size/time attributes cached from the MDS with those obtained
 * from the OSTs (via the cl_object attributes) into @inode, under the
 * inode size lock.  Returns 0 or the cl_object_attr_get() error.
 */
int ll_merge_lvb(const struct lu_env *env, struct inode *inode)
{
	struct ll_inode_info *lli = ll_i2info(inode);
	struct cl_object *obj = lli->lli_clob;
	struct cl_attr *attr = ccc_env_thread_attr(env);
	struct ost_lvb lvb;
	int rc = 0;

	ll_inode_size_lock(inode);
	/* merge timestamps the most recently obtained from mds with
	   timestamps obtained from osts */
	LTIME_S(inode->i_atime) = lli->lli_lvb.lvb_atime;
	LTIME_S(inode->i_mtime) = lli->lli_lvb.lvb_mtime;
	LTIME_S(inode->i_ctime) = lli->lli_lvb.lvb_ctime;
	inode_init_lvb(inode, &lvb);

	cl_object_attr_lock(obj);
	rc = cl_object_attr_get(env, obj, attr);
	cl_object_attr_unlock(obj);

	if (rc == 0) {
		/* Keep whichever timestamp is newer, MDS or OST. */
		if (lvb.lvb_atime < attr->cat_atime)
			lvb.lvb_atime = attr->cat_atime;
		if (lvb.lvb_ctime < attr->cat_ctime)
			lvb.lvb_ctime = attr->cat_ctime;
		if (lvb.lvb_mtime < attr->cat_mtime)
			lvb.lvb_mtime = attr->cat_mtime;

		CDEBUG(D_VFSTRACE, DFID" updating i_size "LPU64"\n",
				PFID(&lli->lli_fid), attr->cat_size);
		cl_isize_write_nolock(inode, attr->cat_size);

		inode->i_blocks = attr->cat_blocks;

		LTIME_S(inode->i_mtime) = lvb.lvb_mtime;
		LTIME_S(inode->i_atime) = lvb.lvb_atime;
		LTIME_S(inode->i_ctime) = lvb.lvb_ctime;
	}
	ll_inode_size_unlock(inode);

	return rc;
}
800
801int ll_glimpse_ioctl(struct ll_sb_info *sbi, struct lov_stripe_md *lsm,
802		     lstat_t *st)
803{
804	struct obdo obdo = { 0 };
805	int rc;
806
807	rc = ll_lsm_getattr(lsm, sbi->ll_dt_exp, NULL, &obdo, 0, 0);
808	if (rc == 0) {
809		st->st_size   = obdo.o_size;
810		st->st_blocks = obdo.o_blocks;
811		st->st_mtime  = obdo.o_mtime;
812		st->st_atime  = obdo.o_atime;
813		st->st_ctime  = obdo.o_ctime;
814	}
815	return rc;
816}
817
818void ll_io_init(struct cl_io *io, const struct file *file, int write)
819{
820	struct inode *inode = file->f_dentry->d_inode;
821
822	io->u.ci_rw.crw_nonblock = file->f_flags & O_NONBLOCK;
823	if (write) {
824		io->u.ci_wr.wr_append = !!(file->f_flags & O_APPEND);
825		io->u.ci_wr.wr_sync = file->f_flags & O_SYNC ||
826				      file->f_flags & O_DIRECT ||
827				      IS_SYNC(inode);
828	}
829	io->ci_obj     = ll_i2info(inode)->lli_clob;
830	io->ci_lockreq = CILR_MAYBE;
831	if (ll_file_nolock(file)) {
832		io->ci_lockreq = CILR_NEVER;
833		io->ci_no_srvlock = 1;
834	} else if (file->f_flags & O_APPEND) {
835		io->ci_lockreq = CILR_MANDATORY;
836	}
837}
838
839static ssize_t
840ll_file_io_generic(const struct lu_env *env, struct vvp_io_args *args,
841		   struct file *file, enum cl_io_type iot,
842		   loff_t *ppos, size_t count)
843{
844	struct ll_inode_info *lli = ll_i2info(file->f_dentry->d_inode);
845	struct ll_file_data  *fd  = LUSTRE_FPRIVATE(file);
846	struct cl_io	 *io;
847	ssize_t	       result;
848
849restart:
850	io = ccc_env_thread_io(env);
851	ll_io_init(io, file, iot == CIT_WRITE);
852
853	if (cl_io_rw_init(env, io, iot, *ppos, count) == 0) {
854		struct vvp_io *vio = vvp_env_io(env);
855		struct ccc_io *cio = ccc_env_io(env);
856		int write_mutex_locked = 0;
857
858		cio->cui_fd  = LUSTRE_FPRIVATE(file);
859		vio->cui_io_subtype = args->via_io_subtype;
860
861		switch (vio->cui_io_subtype) {
862		case IO_NORMAL:
863			cio->cui_iov = args->u.normal.via_iov;
864			cio->cui_nrsegs = args->u.normal.via_nrsegs;
865			cio->cui_tot_nrsegs = cio->cui_nrsegs;
866			cio->cui_iocb = args->u.normal.via_iocb;
867			if ((iot == CIT_WRITE) &&
868			    !(cio->cui_fd->fd_flags & LL_FILE_GROUP_LOCKED)) {
869				if (mutex_lock_interruptible(&lli->
870							       lli_write_mutex))
871					GOTO(out, result = -ERESTARTSYS);
872				write_mutex_locked = 1;
873			} else if (iot == CIT_READ) {
874				down_read(&lli->lli_trunc_sem);
875			}
876			break;
877		case IO_SENDFILE:
878			vio->u.sendfile.cui_actor = args->u.sendfile.via_actor;
879			vio->u.sendfile.cui_target = args->u.sendfile.via_target;
880			break;
881		case IO_SPLICE:
882			vio->u.splice.cui_pipe = args->u.splice.via_pipe;
883			vio->u.splice.cui_flags = args->u.splice.via_flags;
884			break;
885		default:
886			CERROR("Unknow IO type - %u\n", vio->cui_io_subtype);
887			LBUG();
888		}
889		result = cl_io_loop(env, io);
890		if (write_mutex_locked)
891			mutex_unlock(&lli->lli_write_mutex);
892		else if (args->via_io_subtype == IO_NORMAL && iot == CIT_READ)
893			up_read(&lli->lli_trunc_sem);
894	} else {
895		/* cl_io_rw_init() handled IO */
896		result = io->ci_result;
897	}
898
899	if (io->ci_nob > 0) {
900		result = io->ci_nob;
901		*ppos = io->u.ci_wr.wr.crw_pos;
902	}
903	GOTO(out, result);
904out:
905	cl_io_fini(env, io);
906	/* If any bit been read/written (result != 0), we just return
907	 * short read/write instead of restart io. */
908	if (result == 0 && io->ci_need_restart) {
909		CDEBUG(D_VFSTRACE, "Restart %s on %s from %lld, count:%zd\n",
910		       iot == CIT_READ ? "read" : "write",
911		       file->f_dentry->d_name.name, *ppos, count);
912		LASSERTF(io->ci_nob == 0, "%zd", io->ci_nob);
913		goto restart;
914	}
915
916	if (iot == CIT_READ) {
917		if (result >= 0)
918			ll_stats_ops_tally(ll_i2sbi(file->f_dentry->d_inode),
919					   LPROC_LL_READ_BYTES, result);
920	} else if (iot == CIT_WRITE) {
921		if (result >= 0) {
922			ll_stats_ops_tally(ll_i2sbi(file->f_dentry->d_inode),
923					   LPROC_LL_WRITE_BYTES, result);
924			fd->fd_write_failed = false;
925		} else if (result != -ERESTARTSYS) {
926			fd->fd_write_failed = true;
927		}
928	}
929
930	return result;
931}
932
933
934/*
935 * XXX: exact copy from kernel code (__generic_file_aio_write_nolock)
936 */
937static int ll_file_get_iov_count(const struct iovec *iov,
938				 unsigned long *nr_segs, size_t *count)
939{
940	size_t cnt = 0;
941	unsigned long seg;
942
943	for (seg = 0; seg < *nr_segs; seg++) {
944		const struct iovec *iv = &iov[seg];
945
946		/*
947		 * If any segment has a negative length, or the cumulative
948		 * length ever wraps negative then return -EINVAL.
949		 */
950		cnt += iv->iov_len;
951		if (unlikely((ssize_t)(cnt|iv->iov_len) < 0))
952			return -EINVAL;
953		if (access_ok(VERIFY_READ, iv->iov_base, iv->iov_len))
954			continue;
955		if (seg == 0)
956			return -EFAULT;
957		*nr_segs = seg;
958		cnt -= iv->iov_len;   /* This segment is no good */
959		break;
960	}
961	*count = cnt;
962	return 0;
963}
964
965static ssize_t ll_file_aio_read(struct kiocb *iocb, const struct iovec *iov,
966				unsigned long nr_segs, loff_t pos)
967{
968	struct lu_env      *env;
969	struct vvp_io_args *args;
970	size_t	      count;
971	ssize_t	     result;
972	int		 refcheck;
973
974	result = ll_file_get_iov_count(iov, &nr_segs, &count);
975	if (result)
976		return result;
977
978	env = cl_env_get(&refcheck);
979	if (IS_ERR(env))
980		return PTR_ERR(env);
981
982	args = vvp_env_args(env, IO_NORMAL);
983	args->u.normal.via_iov = (struct iovec *)iov;
984	args->u.normal.via_nrsegs = nr_segs;
985	args->u.normal.via_iocb = iocb;
986
987	result = ll_file_io_generic(env, args, iocb->ki_filp, CIT_READ,
988				    &iocb->ki_pos, count);
989	cl_env_put(env, &refcheck);
990	return result;
991}
992
993static ssize_t ll_file_read(struct file *file, char *buf, size_t count,
994			    loff_t *ppos)
995{
996	struct lu_env *env;
997	struct iovec  *local_iov;
998	struct kiocb  *kiocb;
999	ssize_t	result;
1000	int	    refcheck;
1001
1002	env = cl_env_get(&refcheck);
1003	if (IS_ERR(env))
1004		return PTR_ERR(env);
1005
1006	local_iov = &vvp_env_info(env)->vti_local_iov;
1007	kiocb = &vvp_env_info(env)->vti_kiocb;
1008	local_iov->iov_base = (void __user *)buf;
1009	local_iov->iov_len = count;
1010	init_sync_kiocb(kiocb, file);
1011	kiocb->ki_pos = *ppos;
1012	kiocb->ki_left = count;
1013
1014	result = ll_file_aio_read(kiocb, local_iov, 1, kiocb->ki_pos);
1015	*ppos = kiocb->ki_pos;
1016
1017	cl_env_put(env, &refcheck);
1018	return result;
1019}
1020
1021/*
1022 * Write to a file (through the page cache).
1023 */
1024static ssize_t ll_file_aio_write(struct kiocb *iocb, const struct iovec *iov,
1025				 unsigned long nr_segs, loff_t pos)
1026{
1027	struct lu_env      *env;
1028	struct vvp_io_args *args;
1029	size_t	      count;
1030	ssize_t	     result;
1031	int		 refcheck;
1032
1033	result = ll_file_get_iov_count(iov, &nr_segs, &count);
1034	if (result)
1035		return result;
1036
1037	env = cl_env_get(&refcheck);
1038	if (IS_ERR(env))
1039		return PTR_ERR(env);
1040
1041	args = vvp_env_args(env, IO_NORMAL);
1042	args->u.normal.via_iov = (struct iovec *)iov;
1043	args->u.normal.via_nrsegs = nr_segs;
1044	args->u.normal.via_iocb = iocb;
1045
1046	result = ll_file_io_generic(env, args, iocb->ki_filp, CIT_WRITE,
1047				  &iocb->ki_pos, count);
1048	cl_env_put(env, &refcheck);
1049	return result;
1050}
1051
1052static ssize_t ll_file_write(struct file *file, const char *buf, size_t count,
1053			     loff_t *ppos)
1054{
1055	struct lu_env *env;
1056	struct iovec  *local_iov;
1057	struct kiocb  *kiocb;
1058	ssize_t	result;
1059	int	    refcheck;
1060
1061	env = cl_env_get(&refcheck);
1062	if (IS_ERR(env))
1063		return PTR_ERR(env);
1064
1065	local_iov = &vvp_env_info(env)->vti_local_iov;
1066	kiocb = &vvp_env_info(env)->vti_kiocb;
1067	local_iov->iov_base = (void __user *)buf;
1068	local_iov->iov_len = count;
1069	init_sync_kiocb(kiocb, file);
1070	kiocb->ki_pos = *ppos;
1071	kiocb->ki_left = count;
1072
1073	result = ll_file_aio_write(kiocb, local_iov, 1, kiocb->ki_pos);
1074	*ppos = kiocb->ki_pos;
1075
1076	cl_env_put(env, &refcheck);
1077	return result;
1078}
1079
1080
1081
1082/*
1083 * Send file content (through pagecache) somewhere with helper
1084 */
1085static ssize_t ll_file_splice_read(struct file *in_file, loff_t *ppos,
1086				   struct pipe_inode_info *pipe, size_t count,
1087				   unsigned int flags)
1088{
1089	struct lu_env      *env;
1090	struct vvp_io_args *args;
1091	ssize_t	     result;
1092	int		 refcheck;
1093
1094	env = cl_env_get(&refcheck);
1095	if (IS_ERR(env))
1096		return PTR_ERR(env);
1097
1098	args = vvp_env_args(env, IO_SPLICE);
1099	args->u.splice.via_pipe = pipe;
1100	args->u.splice.via_flags = flags;
1101
1102	result = ll_file_io_generic(env, args, in_file, CIT_READ, ppos, count);
1103	cl_env_put(env, &refcheck);
1104	return result;
1105}
1106
/*
 * Ask the OSTs to recreate the object identified by \a oi on OST
 * \a ost_idx for \a inode.  Backend for the LL_IOC_RECREATE_{OBJ,FID}
 * ioctls.
 *
 * \retval 0        success
 * \retval -ENOENT  the file has no objects to recreate
 * \retval <0       allocation or obd_create() failure
 */
static int ll_lov_recreate(struct inode *inode, struct ost_id *oi,
			   obd_count ost_idx)
{
	struct obd_export *exp = ll_i2dtexp(inode);
	struct obd_trans_info oti = { 0 };
	struct obdo *oa = NULL;
	int lsm_size;
	int rc = 0;
	struct lov_stripe_md *lsm = NULL, *lsm2;

	OBDO_ALLOC(oa);
	if (oa == NULL)
		return -ENOMEM;

	lsm = ccc_inode_lsm_get(inode);
	/* Nothing to recreate if the file has no objects. */
	if (!lsm_has_objects(lsm))
		GOTO(out, rc = -ENOENT);

	lsm_size = sizeof(*lsm) + (sizeof(struct lov_oinfo) *
		   (lsm->lsm_stripe_count));

	OBD_ALLOC_LARGE(lsm2, lsm_size);
	if (lsm2 == NULL)
		GOTO(out, rc = -ENOMEM);

	/* o_nlink carries the target OST index when OBD_FL_RECREATE_OBJS
	 * is set. */
	oa->o_oi = *oi;
	oa->o_nlink = ost_idx;
	oa->o_flags |= OBD_FL_RECREATE_OBJS;
	oa->o_valid = OBD_MD_FLID | OBD_MD_FLFLAGS | OBD_MD_FLGROUP;
	obdo_from_inode(oa, inode, OBD_MD_FLTYPE | OBD_MD_FLATIME |
				   OBD_MD_FLMTIME | OBD_MD_FLCTIME);
	obdo_set_parent_fid(oa, &ll_i2info(inode)->lli_fid);
	/* Hand a scratch copy of the stripe md to obd_create() so the
	 * cached one is left untouched. */
	memcpy(lsm2, lsm, lsm_size);
	ll_inode_size_lock(inode);
	rc = obd_create(NULL, exp, oa, &lsm2, &oti);
	ll_inode_size_unlock(inode);

	OBD_FREE_LARGE(lsm2, lsm_size);
	GOTO(out, rc);
out:
	ccc_inode_lsm_put(inode, lsm);
	OBDO_FREE(oa);
	return rc;
}
1151
1152static int ll_lov_recreate_obj(struct inode *inode, unsigned long arg)
1153{
1154	struct ll_recreate_obj ucreat;
1155	struct ost_id		oi;
1156
1157	if (!cfs_capable(CFS_CAP_SYS_ADMIN))
1158		return -EPERM;
1159
1160	if (copy_from_user(&ucreat, (struct ll_recreate_obj *)arg,
1161			   sizeof(ucreat)))
1162		return -EFAULT;
1163
1164	ostid_set_seq_mdt0(&oi);
1165	ostid_set_id(&oi, ucreat.lrc_id);
1166	return ll_lov_recreate(inode, &oi, ucreat.lrc_ost_idx);
1167}
1168
1169static int ll_lov_recreate_fid(struct inode *inode, unsigned long arg)
1170{
1171	struct lu_fid	fid;
1172	struct ost_id	oi;
1173	obd_count	ost_idx;
1174
1175	if (!cfs_capable(CFS_CAP_SYS_ADMIN))
1176		return -EPERM;
1177
1178	if (copy_from_user(&fid, (struct lu_fid *)arg, sizeof(fid)))
1179		return -EFAULT;
1180
1181	fid_to_ostid(&fid, &oi);
1182	ost_idx = (fid_seq(&fid) >> 16) & 0xffff;
1183	return ll_lov_recreate(inode, &oi, ost_idx);
1184}
1185
/**
 * Set the striping EA on \a inode by replaying an intent open that
 * carries the user's lov_user_md.
 *
 * \param inode    [in] inode to stripe
 * \param file     [in] open file used for the intent open
 * \param flags    [in] open flags for the intent
 * \param lum      [in] user-supplied striping descriptor
 * \param lum_size [in] size of \a lum in bytes
 *
 * \retval 0        success
 * \retval -EEXIST  the file already has a stripe md
 * \retval <0       other failure
 */
int ll_lov_setstripe_ea_info(struct inode *inode, struct file *file,
			     int flags, struct lov_user_md *lum, int lum_size)
{
	struct lov_stripe_md *lsm = NULL;
	struct lookup_intent oit = {.it_op = IT_OPEN, .it_flags = flags};
	int rc = 0;

	/* Striping can only be set once per file. */
	lsm = ccc_inode_lsm_get(inode);
	if (lsm != NULL) {
		ccc_inode_lsm_put(inode, lsm);
		CDEBUG(D_IOCTL, "stripe already exists for ino %lu\n",
		       inode->i_ino);
		return -EEXIST;
	}

	ll_inode_size_lock(inode);
	rc = ll_intent_file_open(file, lum, lum_size, &oit);
	if (rc)
		GOTO(out, rc);
	rc = oit.d.lustre.it_status;
	if (rc < 0)
		GOTO(out_req_free, rc);

	/* The open only existed to carry the EA: close its handle again. */
	ll_release_openhandle(file->f_dentry, &oit);

 out:
	ll_inode_size_unlock(inode);
	ll_intent_release(&oit);
	/* lsm is NULL on every path reaching here (non-NULL returned
	 * -EEXIST above), so this put is a no-op. */
	ccc_inode_lsm_put(inode, lsm);
	return rc;
out_req_free:
	ptlrpc_req_finished((struct ptlrpc_request *) oit.d.lustre.it_data);
	goto out;
}
1220
/**
 * Fetch the LOV EA (striping information) of \a filename, a child of
 * directory \a inode, via an MDS getattr-by-name.
 *
 * On success *lmmp points into the reply buffer of *request; the caller
 * owns the request and must release it with ptlrpc_req_finished(), which
 * also invalidates *lmmp.
 *
 * \retval 0        success
 * \retval -ENODATA the file has no striping EA
 * \retval -EPROTO  the EA magic is neither LOV_MAGIC_V1 nor LOV_MAGIC_V3
 * \retval <0       other failure
 */
int ll_lov_getstripe_ea_info(struct inode *inode, const char *filename,
			     struct lov_mds_md **lmmp, int *lmm_size,
			     struct ptlrpc_request **request)
{
	struct ll_sb_info *sbi = ll_i2sbi(inode);
	struct mdt_body  *body;
	struct lov_mds_md *lmm = NULL;
	struct ptlrpc_request *req = NULL;
	struct md_op_data *op_data;
	int rc, lmmsize;

	/* Size the reply buffer for the largest possible EA. */
	rc = ll_get_max_mdsize(sbi, &lmmsize);
	if (rc)
		return rc;

	op_data = ll_prep_md_op_data(NULL, inode, NULL, filename,
				     strlen(filename), lmmsize,
				     LUSTRE_OPC_ANY, NULL);
	if (IS_ERR(op_data))
		return PTR_ERR(op_data);

	op_data->op_valid = OBD_MD_FLEASIZE | OBD_MD_FLDIREA;
	rc = md_getattr_name(sbi->ll_md_exp, op_data, &req);
	ll_finish_md_op_data(op_data);
	if (rc < 0) {
		CDEBUG(D_INFO, "md_getattr_name failed "
		       "on %s: rc %d\n", filename, rc);
		GOTO(out, rc);
	}

	body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
	LASSERT(body != NULL); /* checked by mdc_getattr_name */

	lmmsize = body->eadatasize;

	if (!(body->valid & (OBD_MD_FLEASIZE | OBD_MD_FLDIREA)) ||
			lmmsize == 0) {
		GOTO(out, rc = -ENODATA);
	}

	lmm = req_capsule_server_sized_get(&req->rq_pill, &RMF_MDT_MD, lmmsize);
	LASSERT(lmm != NULL);

	if ((lmm->lmm_magic != cpu_to_le32(LOV_MAGIC_V1)) &&
	    (lmm->lmm_magic != cpu_to_le32(LOV_MAGIC_V3))) {
		GOTO(out, rc = -EPROTO);
	}

	/*
	 * This is coming from the MDS, so is probably in
	 * little endian.  We convert it to host endian before
	 * passing it to userspace.
	 */
	if (LOV_MAGIC != cpu_to_le32(LOV_MAGIC)) {
		int stripe_count;

		stripe_count = le16_to_cpu(lmm->lmm_stripe_count);
		/* Released files keep no object array. */
		if (le32_to_cpu(lmm->lmm_pattern) & LOV_PATTERN_F_RELEASED)
			stripe_count = 0;

		/* if function called for directory - we should
		 * avoid swab not existent lsm objects */
		if (lmm->lmm_magic == cpu_to_le32(LOV_MAGIC_V1)) {
			lustre_swab_lov_user_md_v1((struct lov_user_md_v1 *)lmm);
			if (S_ISREG(body->mode))
				lustre_swab_lov_user_md_objects(
				 ((struct lov_user_md_v1 *)lmm)->lmm_objects,
				 stripe_count);
		} else if (lmm->lmm_magic == cpu_to_le32(LOV_MAGIC_V3)) {
			lustre_swab_lov_user_md_v3((struct lov_user_md_v3 *)lmm);
			if (S_ISREG(body->mode))
				lustre_swab_lov_user_md_objects(
				 ((struct lov_user_md_v3 *)lmm)->lmm_objects,
				 stripe_count);
		}
	}

out:
	/* On error lmm stays NULL and lmmsize holds the last value
	 * computed; callers must check rc first. */
	*lmmp = lmm;
	*lmm_size = lmmsize;
	*request = req;
	return rc;
}
1304
1305static int ll_lov_setea(struct inode *inode, struct file *file,
1306			    unsigned long arg)
1307{
1308	int			 flags = MDS_OPEN_HAS_OBJS | FMODE_WRITE;
1309	struct lov_user_md	*lump;
1310	int			 lum_size = sizeof(struct lov_user_md) +
1311					    sizeof(struct lov_user_ost_data);
1312	int			 rc;
1313
1314	if (!cfs_capable(CFS_CAP_SYS_ADMIN))
1315		return -EPERM;
1316
1317	OBD_ALLOC_LARGE(lump, lum_size);
1318	if (lump == NULL)
1319		return -ENOMEM;
1320
1321	if (copy_from_user(lump, (struct lov_user_md  *)arg, lum_size)) {
1322		OBD_FREE_LARGE(lump, lum_size);
1323		return -EFAULT;
1324	}
1325
1326	rc = ll_lov_setstripe_ea_info(inode, file, flags, lump, lum_size);
1327
1328	OBD_FREE_LARGE(lump, lum_size);
1329	return rc;
1330}
1331
/*
 * LL_IOC_LOV_SETSTRIPE handler: copy in the user's lov_user_md (v1 or
 * v3), set the striping on the file, then echo the resulting layout
 * back into the user's buffer.
 */
static int ll_lov_setstripe(struct inode *inode, struct file *file,
			    unsigned long arg)
{
	struct lov_user_md_v3	 lumv3;
	/* lumv1 is a v1-shaped view of the same lumv3 buffer; v1 is a
	 * prefix of v3, so copying v1 first is always safe. */
	struct lov_user_md_v1	*lumv1 = (struct lov_user_md_v1 *)&lumv3;
	struct lov_user_md_v1	*lumv1p = (struct lov_user_md_v1 *)arg;
	struct lov_user_md_v3	*lumv3p = (struct lov_user_md_v3 *)arg;
	int			 lum_size, rc;
	int			 flags = FMODE_WRITE;

	/* first try with v1 which is smaller than v3 */
	lum_size = sizeof(struct lov_user_md_v1);
	if (copy_from_user(lumv1, lumv1p, lum_size))
		return -EFAULT;

	/* The magic tells us the caller really passed a v3 descriptor;
	 * re-copy the full structure. */
	if (lumv1->lmm_magic == LOV_USER_MAGIC_V3) {
		lum_size = sizeof(struct lov_user_md_v3);
		if (copy_from_user(&lumv3, lumv3p, lum_size))
			return -EFAULT;
	}

	rc = ll_lov_setstripe_ea_info(inode, file, flags, lumv1, lum_size);
	if (rc == 0) {
		struct lov_stripe_md *lsm;
		__u32 gen;

		/* Clear the user's stripe count before GETSTRIPE fills in
		 * the layout the MDS actually chose. */
		put_user(0, &lumv1p->lmm_stripe_count);

		ll_layout_refresh(inode, &gen);
		lsm = ccc_inode_lsm_get(inode);
		rc = obd_iocontrol(LL_IOC_LOV_GETSTRIPE, ll_i2dtexp(inode),
				   0, lsm, (void *)arg);
		ccc_inode_lsm_put(inode, lsm);
	}
	return rc;
}
1368
1369static int ll_lov_getstripe(struct inode *inode, unsigned long arg)
1370{
1371	struct lov_stripe_md *lsm;
1372	int rc = -ENODATA;
1373
1374	lsm = ccc_inode_lsm_get(inode);
1375	if (lsm != NULL)
1376		rc = obd_iocontrol(LL_IOC_LOV_GETSTRIPE, ll_i2dtexp(inode), 0,
1377				   lsm, (void *)arg);
1378	ccc_inode_lsm_put(inode, lsm);
1379	return rc;
1380}
1381
/*
 * LL_IOC_GROUP_LOCK: take a group lock with gid \a arg on the file and
 * record it in the per-open-file data.  Only one group lock may be held
 * per file descriptor.
 *
 * \retval 0           success
 * \retval -EOPNOTSUPP locking is disabled for this file
 * \retval -EINVAL     a group lock is already held on this descriptor
 * \retval <0          enqueue failure
 */
int ll_get_grouplock(struct inode *inode, struct file *file, unsigned long arg)
{
	struct ll_inode_info   *lli = ll_i2info(inode);
	struct ll_file_data    *fd = LUSTRE_FPRIVATE(file);
	struct ccc_grouplock    grouplock;
	int		     rc;

	if (ll_file_nolock(file))
		return -EOPNOTSUPP;

	spin_lock(&lli->lli_lock);
	if (fd->fd_flags & LL_FILE_GROUP_LOCKED) {
		CWARN("group lock already existed with gid %lu\n",
		      fd->fd_grouplock.cg_gid);
		spin_unlock(&lli->lli_lock);
		return -EINVAL;
	}
	LASSERT(fd->fd_grouplock.cg_lock == NULL);
	spin_unlock(&lli->lli_lock);

	/* The enqueue may block (unless O_NONBLOCK is set), so it is
	 * performed outside lli_lock. */
	rc = cl_get_grouplock(cl_i2info(inode)->lli_clob,
			      arg, (file->f_flags & O_NONBLOCK), &grouplock);
	if (rc)
		return rc;

	/* Re-check under the lock: another thread may have installed its
	 * own group lock while ours was being enqueued. */
	spin_lock(&lli->lli_lock);
	if (fd->fd_flags & LL_FILE_GROUP_LOCKED) {
		spin_unlock(&lli->lli_lock);
		CERROR("another thread just won the race\n");
		cl_put_grouplock(&grouplock);
		return -EINVAL;
	}

	fd->fd_flags |= LL_FILE_GROUP_LOCKED;
	fd->fd_grouplock = grouplock;
	spin_unlock(&lli->lli_lock);

	CDEBUG(D_INFO, "group lock %lu obtained\n", arg);
	return 0;
}
1422
/*
 * LL_IOC_GROUP_UNLOCK: release the group lock with gid \a arg held on
 * this file descriptor.
 *
 * \retval 0        success
 * \retval -EINVAL  no group lock held, or the gid does not match
 */
int ll_put_grouplock(struct inode *inode, struct file *file, unsigned long arg)
{
	struct ll_inode_info   *lli = ll_i2info(inode);
	struct ll_file_data    *fd = LUSTRE_FPRIVATE(file);
	struct ccc_grouplock    grouplock;

	spin_lock(&lli->lli_lock);
	if (!(fd->fd_flags & LL_FILE_GROUP_LOCKED)) {
		spin_unlock(&lli->lli_lock);
		CWARN("no group lock held\n");
		return -EINVAL;
	}
	LASSERT(fd->fd_grouplock.cg_lock != NULL);

	if (fd->fd_grouplock.cg_gid != arg) {
		CWARN("group lock %lu doesn't match current id %lu\n",
		       arg, fd->fd_grouplock.cg_gid);
		spin_unlock(&lli->lli_lock);
		return -EINVAL;
	}

	/* Detach the lock from the file data under lli_lock, then drop
	 * the DLM lock outside the spinlock. */
	grouplock = fd->fd_grouplock;
	memset(&fd->fd_grouplock, 0, sizeof(fd->fd_grouplock));
	fd->fd_flags &= ~LL_FILE_GROUP_LOCKED;
	spin_unlock(&lli->lli_lock);

	cl_put_grouplock(&grouplock);
	CDEBUG(D_INFO, "group lock %lu released\n", arg);
	return 0;
}
1453
1454/**
1455 * Close inode open handle
1456 *
1457 * \param dentry [in]     dentry which contains the inode
1458 * \param it     [in,out] intent which contains open info and result
1459 *
1460 * \retval 0     success
1461 * \retval <0    failure
1462 */
1463int ll_release_openhandle(struct dentry *dentry, struct lookup_intent *it)
1464{
1465	struct inode *inode = dentry->d_inode;
1466	struct obd_client_handle *och;
1467	int rc;
1468
1469	LASSERT(inode);
1470
1471	/* Root ? Do nothing. */
1472	if (dentry->d_inode->i_sb->s_root == dentry)
1473		return 0;
1474
1475	/* No open handle to close? Move away */
1476	if (!it_disposition(it, DISP_OPEN_OPEN))
1477		return 0;
1478
1479	LASSERT(it_open_error(DISP_OPEN_OPEN, it) == 0);
1480
1481	OBD_ALLOC(och, sizeof(*och));
1482	if (!och)
1483		GOTO(out, rc = -ENOMEM);
1484
1485	ll_och_fill(ll_i2sbi(inode)->ll_md_exp,
1486		    ll_i2info(inode), it, och);
1487
1488	rc = ll_close_inode_openhandle(ll_i2sbi(inode)->ll_md_exp,
1489				       inode, och);
1490 out:
1491	/* this one is in place of ll_file_open */
1492	if (it_disposition(it, DISP_ENQ_OPEN_REF)) {
1493		ptlrpc_req_finished(it->d.lustre.it_data);
1494		it_clear_disposition(it, DISP_ENQ_OPEN_REF);
1495	}
1496	return rc;
1497}
1498
1499/**
1500 * Get size for inode for which FIEMAP mapping is requested.
1501 * Make the FIEMAP get_info call and returns the result.
1502 */
1503int ll_do_fiemap(struct inode *inode, struct ll_user_fiemap *fiemap,
1504	      int num_bytes)
1505{
1506	struct obd_export *exp = ll_i2dtexp(inode);
1507	struct lov_stripe_md *lsm = NULL;
1508	struct ll_fiemap_info_key fm_key = { .name = KEY_FIEMAP, };
1509	int vallen = num_bytes;
1510	int rc;
1511
1512	/* Checks for fiemap flags */
1513	if (fiemap->fm_flags & ~LUSTRE_FIEMAP_FLAGS_COMPAT) {
1514		fiemap->fm_flags &= ~LUSTRE_FIEMAP_FLAGS_COMPAT;
1515		return -EBADR;
1516	}
1517
1518	/* Check for FIEMAP_FLAG_SYNC */
1519	if (fiemap->fm_flags & FIEMAP_FLAG_SYNC) {
1520		rc = filemap_fdatawrite(inode->i_mapping);
1521		if (rc)
1522			return rc;
1523	}
1524
1525	lsm = ccc_inode_lsm_get(inode);
1526	if (lsm == NULL)
1527		return -ENOENT;
1528
1529	/* If the stripe_count > 1 and the application does not understand
1530	 * DEVICE_ORDER flag, then it cannot interpret the extents correctly.
1531	 */
1532	if (lsm->lsm_stripe_count > 1 &&
1533	    !(fiemap->fm_flags & FIEMAP_FLAG_DEVICE_ORDER))
1534		GOTO(out, rc = -EOPNOTSUPP);
1535
1536	fm_key.oa.o_oi = lsm->lsm_oi;
1537	fm_key.oa.o_valid = OBD_MD_FLID | OBD_MD_FLGROUP;
1538
1539	obdo_from_inode(&fm_key.oa, inode, OBD_MD_FLSIZE);
1540	obdo_set_parent_fid(&fm_key.oa, &ll_i2info(inode)->lli_fid);
1541	/* If filesize is 0, then there would be no objects for mapping */
1542	if (fm_key.oa.o_size == 0) {
1543		fiemap->fm_mapped_extents = 0;
1544		GOTO(out, rc = 0);
1545	}
1546
1547	memcpy(&fm_key.fiemap, fiemap, sizeof(*fiemap));
1548
1549	rc = obd_get_info(NULL, exp, sizeof(fm_key), &fm_key, &vallen,
1550			  fiemap, lsm);
1551	if (rc)
1552		CERROR("obd_get_info failed: rc = %d\n", rc);
1553
1554out:
1555	ccc_inode_lsm_put(inode, lsm);
1556	return rc;
1557}
1558
1559int ll_fid2path(struct inode *inode, void *arg)
1560{
1561	struct obd_export	*exp = ll_i2mdexp(inode);
1562	struct getinfo_fid2path	*gfout, *gfin;
1563	int			 outsize, rc;
1564
1565	if (!cfs_capable(CFS_CAP_DAC_READ_SEARCH) &&
1566	    !(ll_i2sbi(inode)->ll_flags & LL_SBI_USER_FID2PATH))
1567		return -EPERM;
1568
1569	/* Need to get the buflen */
1570	OBD_ALLOC_PTR(gfin);
1571	if (gfin == NULL)
1572		return -ENOMEM;
1573	if (copy_from_user(gfin, arg, sizeof(*gfin))) {
1574		OBD_FREE_PTR(gfin);
1575		return -EFAULT;
1576	}
1577
1578	outsize = sizeof(*gfout) + gfin->gf_pathlen;
1579	OBD_ALLOC(gfout, outsize);
1580	if (gfout == NULL) {
1581		OBD_FREE_PTR(gfin);
1582		return -ENOMEM;
1583	}
1584	memcpy(gfout, gfin, sizeof(*gfout));
1585	OBD_FREE_PTR(gfin);
1586
1587	/* Call mdc_iocontrol */
1588	rc = obd_iocontrol(OBD_IOC_FID2PATH, exp, outsize, gfout, NULL);
1589	if (rc)
1590		GOTO(gf_free, rc);
1591
1592	if (copy_to_user(arg, gfout, outsize))
1593		rc = -EFAULT;
1594
1595gf_free:
1596	OBD_FREE(gfout, outsize);
1597	return rc;
1598}
1599
1600static int ll_ioctl_fiemap(struct inode *inode, unsigned long arg)
1601{
1602	struct ll_user_fiemap *fiemap_s;
1603	size_t num_bytes, ret_bytes;
1604	unsigned int extent_count;
1605	int rc = 0;
1606
1607	/* Get the extent count so we can calculate the size of
1608	 * required fiemap buffer */
1609	if (get_user(extent_count,
1610	    &((struct ll_user_fiemap __user *)arg)->fm_extent_count))
1611		return -EFAULT;
1612	num_bytes = sizeof(*fiemap_s) + (extent_count *
1613					 sizeof(struct ll_fiemap_extent));
1614
1615	OBD_ALLOC_LARGE(fiemap_s, num_bytes);
1616	if (fiemap_s == NULL)
1617		return -ENOMEM;
1618
1619	/* get the fiemap value */
1620	if (copy_from_user(fiemap_s, (struct ll_user_fiemap __user *)arg,
1621			   sizeof(*fiemap_s)))
1622		GOTO(error, rc = -EFAULT);
1623
1624	/* If fm_extent_count is non-zero, read the first extent since
1625	 * it is used to calculate end_offset and device from previous
1626	 * fiemap call. */
1627	if (extent_count) {
1628		if (copy_from_user(&fiemap_s->fm_extents[0],
1629		    (char __user *)arg + sizeof(*fiemap_s),
1630		    sizeof(struct ll_fiemap_extent)))
1631			GOTO(error, rc = -EFAULT);
1632	}
1633
1634	rc = ll_do_fiemap(inode, fiemap_s, num_bytes);
1635	if (rc)
1636		GOTO(error, rc);
1637
1638	ret_bytes = sizeof(struct ll_user_fiemap);
1639
1640	if (extent_count != 0)
1641		ret_bytes += (fiemap_s->fm_mapped_extents *
1642				 sizeof(struct ll_fiemap_extent));
1643
1644	if (copy_to_user((void *)arg, fiemap_s, ret_bytes))
1645		rc = -EFAULT;
1646
1647error:
1648	OBD_FREE_LARGE(fiemap_s, num_bytes);
1649	return rc;
1650}
1651
1652/*
1653 * Read the data_version for inode.
1654 *
1655 * This value is computed using stripe object version on OST.
1656 * Version is computed using server side locking.
1657 *
1658 * @param extent_lock  Take extent lock. Not needed if a process is already
1659 *		       holding the OST object group locks.
1660 */
1661int ll_data_version(struct inode *inode, __u64 *data_version,
1662		    int extent_lock)
1663{
1664	struct lov_stripe_md	*lsm = NULL;
1665	struct ll_sb_info	*sbi = ll_i2sbi(inode);
1666	struct obdo		*obdo = NULL;
1667	int			 rc;
1668
1669	/* If no stripe, we consider version is 0. */
1670	lsm = ccc_inode_lsm_get(inode);
1671	if (!lsm_has_objects(lsm)) {
1672		*data_version = 0;
1673		CDEBUG(D_INODE, "No object for inode\n");
1674		GOTO(out, rc = 0);
1675	}
1676
1677	OBD_ALLOC_PTR(obdo);
1678	if (obdo == NULL)
1679		GOTO(out, rc = -ENOMEM);
1680
1681	rc = ll_lsm_getattr(lsm, sbi->ll_dt_exp, NULL, obdo, 0, extent_lock);
1682	if (rc == 0) {
1683		if (!(obdo->o_valid & OBD_MD_FLDATAVERSION))
1684			rc = -EOPNOTSUPP;
1685		else
1686			*data_version = obdo->o_data_version;
1687	}
1688
1689	OBD_FREE_PTR(obdo);
1690out:
1691	ccc_inode_lsm_put(inode, lsm);
1692	return rc;
1693}
1694
/* Scratch state for ll_swap_layouts(): saved timestamps, expected data
 * versions and (possibly swapped) inode pointers for the two files. */
struct ll_swap_stack {
	/* saved mtime/atime of each file, restored after the swap */
	struct iattr		 ia1, ia2;
	/* expected data versions, checked just before swapping */
	__u64			 dv1, dv2;
	/* the two inodes, ordered by FID comparison */
	struct inode		*inode1, *inode2;
	/* whether dv1/dv2 must be verified */
	bool			 check_dv1, check_dv2;
};
1701
/*
 * LL_IOC_LOV_SWAP_LAYOUTS: atomically exchange the layouts of two open
 * files on the MDT, optionally flushing dirty cache (group lock),
 * verifying data versions, and preserving mtime/atime.
 *
 * \retval 0        layouts swapped
 * \retval -EAGAIN  a requested data version check failed
 * \retval <0       other failure
 */
static int ll_swap_layouts(struct file *file1, struct file *file2,
			   struct lustre_swap_layouts *lsl)
{
	struct mdc_swap_layouts	 msl;
	struct md_op_data	*op_data;
	__u32			 gid;
	__u64			 dv;
	struct ll_swap_stack	*llss = NULL;
	int			 rc;

	OBD_ALLOC_PTR(llss);
	if (llss == NULL)
		return -ENOMEM;

	llss->inode1 = file1->f_dentry->d_inode;
	llss->inode2 = file2->f_dentry->d_inode;

	/* Both files must be regular, writable, and on the same fs. */
	if (!S_ISREG(llss->inode2->i_mode))
		GOTO(free, rc = -EINVAL);

	if (inode_permission(llss->inode1, MAY_WRITE) ||
	    inode_permission(llss->inode2, MAY_WRITE))
		GOTO(free, rc = -EPERM);

	if (llss->inode2->i_sb != llss->inode1->i_sb)
		GOTO(free, rc = -EXDEV);

	/* we use 2 bool because it is easier to swap than 2 bits */
	if (lsl->sl_flags & SWAP_LAYOUTS_CHECK_DV1)
		llss->check_dv1 = true;

	if (lsl->sl_flags & SWAP_LAYOUTS_CHECK_DV2)
		llss->check_dv2 = true;

	/* we cannot use lsl->sl_dvX directly because we may swap them */
	llss->dv1 = lsl->sl_dv1;
	llss->dv2 = lsl->sl_dv2;

	rc = lu_fid_cmp(ll_inode2fid(llss->inode1), ll_inode2fid(llss->inode2));
	if (rc == 0) /* same file, done! */
		GOTO(free, rc = 0);

	/* Order the pair by FID so concurrent swaps take locks in a
	 * consistent order. */
	if (rc < 0) { /* sequentialize it */
		swap(llss->inode1, llss->inode2);
		swap(file1, file2);
		swap(llss->dv1, llss->dv2);
		swap(llss->check_dv1, llss->check_dv2);
	}

	gid = lsl->sl_gid;
	if (gid != 0) { /* application asks to flush dirty cache */
		rc = ll_get_grouplock(llss->inode1, file1, gid);
		if (rc < 0)
			GOTO(free, rc);

		rc = ll_get_grouplock(llss->inode2, file2, gid);
		if (rc < 0) {
			ll_put_grouplock(llss->inode1, file1, gid);
			GOTO(free, rc);
		}
	}

	/* to be able to restore mtime and atime after swap
	 * we need to first save them */
	if (lsl->sl_flags &
	    (SWAP_LAYOUTS_KEEP_MTIME | SWAP_LAYOUTS_KEEP_ATIME)) {
		llss->ia1.ia_mtime = llss->inode1->i_mtime;
		llss->ia1.ia_atime = llss->inode1->i_atime;
		llss->ia1.ia_valid = ATTR_MTIME | ATTR_ATIME;
		llss->ia2.ia_mtime = llss->inode2->i_mtime;
		llss->ia2.ia_atime = llss->inode2->i_atime;
		llss->ia2.ia_valid = ATTR_MTIME | ATTR_ATIME;
	}

	/* ultimate check, before swaping the layouts we check if
	 * dataversion has changed (if requested) */
	if (llss->check_dv1) {
		rc = ll_data_version(llss->inode1, &dv, 0);
		if (rc)
			GOTO(putgl, rc);
		if (dv != llss->dv1)
			GOTO(putgl, rc = -EAGAIN);
	}

	if (llss->check_dv2) {
		rc = ll_data_version(llss->inode2, &dv, 0);
		if (rc)
			GOTO(putgl, rc);
		if (dv != llss->dv2)
			GOTO(putgl, rc = -EAGAIN);
	}

	/* struct md_op_data is used to send the swap args to the mdt
	 * only flags is missing, so we use struct mdc_swap_layouts
	 * through the md_op_data->op_data */
	/* flags from user space have to be converted before they are send to
	 * server, no flag is sent today, they are only used on the client */
	msl.msl_flags = 0;
	rc = -ENOMEM;
	op_data = ll_prep_md_op_data(NULL, llss->inode1, llss->inode2, NULL, 0,
				     0, LUSTRE_OPC_ANY, &msl);
	if (IS_ERR(op_data))
		GOTO(free, rc = PTR_ERR(op_data));

	rc = obd_iocontrol(LL_IOC_LOV_SWAP_LAYOUTS, ll_i2mdexp(llss->inode1),
			   sizeof(*op_data), op_data, NULL);
	ll_finish_md_op_data(op_data);

putgl:
	if (gid != 0) {
		ll_put_grouplock(llss->inode2, file2, gid);
		ll_put_grouplock(llss->inode1, file1, gid);
	}

	/* rc can be set from obd_iocontrol() or from a GOTO(putgl, ...) */
	if (rc != 0)
		GOTO(free, rc);

	/* clear useless flags */
	if (!(lsl->sl_flags & SWAP_LAYOUTS_KEEP_MTIME)) {
		llss->ia1.ia_valid &= ~ATTR_MTIME;
		llss->ia2.ia_valid &= ~ATTR_MTIME;
	}

	if (!(lsl->sl_flags & SWAP_LAYOUTS_KEEP_ATIME)) {
		llss->ia1.ia_valid &= ~ATTR_ATIME;
		llss->ia2.ia_valid &= ~ATTR_ATIME;
	}

	/* update time if requested */
	/* Note the saved attrs are applied crosswise (ia2 to file1, ia1 to
	 * file2) because the layouts - and their data - were exchanged. */
	rc = 0;
	if (llss->ia2.ia_valid != 0) {
		mutex_lock(&llss->inode1->i_mutex);
		rc = ll_setattr(file1->f_dentry, &llss->ia2);
		mutex_unlock(&llss->inode1->i_mutex);
	}

	if (llss->ia1.ia_valid != 0) {
		int rc1;

		mutex_lock(&llss->inode2->i_mutex);
		rc1 = ll_setattr(file2->f_dentry, &llss->ia1);
		mutex_unlock(&llss->inode2->i_mutex);
		if (rc == 0)
			rc = rc1;
	}

free:
	if (llss != NULL)
		OBD_FREE_PTR(llss);

	return rc;
}
1855
/*
 * unlocked_ioctl entry point for regular files: dispatch the Lustre
 * file ioctls, falling back to the data-device obd_iocontrol() for
 * anything unrecognised.
 */
long ll_file_ioctl(struct file *file, unsigned int cmd, unsigned long arg)
{
	struct inode		*inode = file->f_dentry->d_inode;
	struct ll_file_data	*fd = LUSTRE_FPRIVATE(file);
	int			 flags, rc;

	CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),cmd=%x\n", inode->i_ino,
	       inode->i_generation, inode, cmd);
	ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_IOCTL, 1);

	/* asm-ppc{,64} declares TCGETS, et. al. as type 't' not 'T' */
	if (_IOC_TYPE(cmd) == 'T' || _IOC_TYPE(cmd) == 't') /* tty ioctls */
		return -ENOTTY;

	switch(cmd) {
	case LL_IOC_GETFLAGS:
		/* Get the current value of the file flags */
		return put_user(fd->fd_flags, (int *)arg);
	case LL_IOC_SETFLAGS:
	case LL_IOC_CLRFLAGS:
		/* Set or clear specific file flags */
		/* XXX This probably needs checks to ensure the flags are
		 *     not abused, and to handle any flag side effects.
		 */
		if (get_user(flags, (int *) arg))
			return -EFAULT;

		if (cmd == LL_IOC_SETFLAGS) {
			if ((flags & LL_FILE_IGNORE_LOCK) &&
			    !(file->f_flags & O_DIRECT)) {
				CERROR("%s: unable to disable locking on "
				       "non-O_DIRECT file\n", current->comm);
				return -EINVAL;
			}

			fd->fd_flags |= flags;
		} else {
			fd->fd_flags &= ~flags;
		}
		return 0;
	case LL_IOC_LOV_SETSTRIPE:
		return ll_lov_setstripe(inode, file, arg);
	case LL_IOC_LOV_SETEA:
		return ll_lov_setea(inode, file, arg);
	case LL_IOC_LOV_SWAP_LAYOUTS: {
		struct file *file2;
		struct lustre_swap_layouts lsl;

		if (copy_from_user(&lsl, (char *)arg,
				       sizeof(struct lustre_swap_layouts)))
			return -EFAULT;

		/* This fd must be open for write... */
		if ((file->f_flags & O_ACCMODE) == 0) /* O_RDONLY */
			return -EPERM;

		file2 = fget(lsl.sl_fd);
		if (file2 == NULL)
			return -EBADF;

		/* ...and so must the other fd. */
		rc = -EPERM;
		if ((file2->f_flags & O_ACCMODE) != 0) /* O_WRONLY or O_RDWR */
			rc = ll_swap_layouts(file, file2, &lsl);
		fput(file2);
		return rc;
	}
	case LL_IOC_LOV_GETSTRIPE:
		return ll_lov_getstripe(inode, arg);
	case LL_IOC_RECREATE_OBJ:
		return ll_lov_recreate_obj(inode, arg);
	case LL_IOC_RECREATE_FID:
		return ll_lov_recreate_fid(inode, arg);
	case FSFILT_IOC_FIEMAP:
		return ll_ioctl_fiemap(inode, arg);
	case FSFILT_IOC_GETFLAGS:
	case FSFILT_IOC_SETFLAGS:
		return ll_iocontrol(inode, file, cmd, arg);
	case FSFILT_IOC_GETVERSION_OLD:
	case FSFILT_IOC_GETVERSION:
		return put_user(inode->i_generation, (int *)arg);
	case LL_IOC_GROUP_LOCK:
		return ll_get_grouplock(inode, file, arg);
	case LL_IOC_GROUP_UNLOCK:
		return ll_put_grouplock(inode, file, arg);
	case IOC_OBD_STATFS:
		return ll_obd_statfs(inode, (void *)arg);

	/* We need to special case any other ioctls we want to handle,
	 * to send them to the MDS/OST as appropriate and to properly
	 * network encode the arg field.
	case FSFILT_IOC_SETVERSION_OLD:
	case FSFILT_IOC_SETVERSION:
	*/
	case LL_IOC_FLUSHCTX:
		return ll_flush_ctx(inode);
	case LL_IOC_PATH2FID: {
		if (copy_to_user((void *)arg, ll_inode2fid(inode),
				 sizeof(struct lu_fid)))
			return -EFAULT;

		return 0;
	}
	case OBD_IOC_FID2PATH:
		return ll_fid2path(inode, (void *)arg);
	case LL_IOC_DATA_VERSION: {
		struct ioc_data_version	idv;
		int			rc;

		if (copy_from_user(&idv, (char *)arg, sizeof(idv)))
			return -EFAULT;

		rc = ll_data_version(inode, &idv.idv_version,
				!(idv.idv_flags & LL_DV_NOFLUSH));

		if (rc == 0 && copy_to_user((char *) arg, &idv, sizeof(idv)))
			return -EFAULT;

		return rc;
	}

	case LL_IOC_GET_MDTIDX: {
		int mdtidx;

		mdtidx = ll_get_mdt_idx(inode);
		if (mdtidx < 0)
			return mdtidx;

		if (put_user((int)mdtidx, (int*)arg))
			return -EFAULT;

		return 0;
	}
	case OBD_IOC_GETDTNAME:
	case OBD_IOC_GETMDNAME:
		return ll_get_obd_name(inode, cmd, arg);
	/* HSM ioctls: bounce the user's struct through a md_op_data to
	 * the MDT via obd_iocontrol(). */
	case LL_IOC_HSM_STATE_GET: {
		struct md_op_data	*op_data;
		struct hsm_user_state	*hus;
		int			 rc;

		OBD_ALLOC_PTR(hus);
		if (hus == NULL)
			return -ENOMEM;

		op_data = ll_prep_md_op_data(NULL, inode, NULL, NULL, 0, 0,
					     LUSTRE_OPC_ANY, hus);
		if (IS_ERR(op_data)) {
			OBD_FREE_PTR(hus);
			return PTR_ERR(op_data);
		}

		rc = obd_iocontrol(cmd, ll_i2mdexp(inode), sizeof(*op_data),
				   op_data, NULL);

		if (copy_to_user((void *)arg, hus, sizeof(*hus)))
			rc = -EFAULT;

		ll_finish_md_op_data(op_data);
		OBD_FREE_PTR(hus);
		return rc;
	}
	case LL_IOC_HSM_STATE_SET: {
		struct md_op_data	*op_data;
		struct hsm_state_set	*hss;
		int			 rc;

		OBD_ALLOC_PTR(hss);
		if (hss == NULL)
			return -ENOMEM;
		if (copy_from_user(hss, (char *)arg, sizeof(*hss))) {
			OBD_FREE_PTR(hss);
			return -EFAULT;
		}

		/* Non-root users are forbidden to set or clear flags which are
		 * NOT defined in HSM_USER_MASK. */
		if (((hss->hss_setmask | hss->hss_clearmask) & ~HSM_USER_MASK)
		    && !cfs_capable(CFS_CAP_SYS_ADMIN)) {
			OBD_FREE_PTR(hss);
			return -EPERM;
		}

		op_data = ll_prep_md_op_data(NULL, inode, NULL, NULL, 0, 0,
					     LUSTRE_OPC_ANY, hss);
		if (IS_ERR(op_data)) {
			OBD_FREE_PTR(hss);
			return PTR_ERR(op_data);
		}

		rc = obd_iocontrol(cmd, ll_i2mdexp(inode), sizeof(*op_data),
				   op_data, NULL);

		ll_finish_md_op_data(op_data);

		OBD_FREE_PTR(hss);
		return rc;
	}
	case LL_IOC_HSM_ACTION: {
		struct md_op_data		*op_data;
		struct hsm_current_action	*hca;
		int				 rc;

		OBD_ALLOC_PTR(hca);
		if (hca == NULL)
			return -ENOMEM;

		op_data = ll_prep_md_op_data(NULL, inode, NULL, NULL, 0, 0,
					     LUSTRE_OPC_ANY, hca);
		if (IS_ERR(op_data)) {
			OBD_FREE_PTR(hca);
			return PTR_ERR(op_data);
		}

		rc = obd_iocontrol(cmd, ll_i2mdexp(inode), sizeof(*op_data),
				   op_data, NULL);

		if (copy_to_user((char *)arg, hca, sizeof(*hca)))
			rc = -EFAULT;

		ll_finish_md_op_data(op_data);
		OBD_FREE_PTR(hca);
		return rc;
	}
	default: {
		int err;

		/* Give registered ioctl handlers a chance first, then
		 * forward to the data device. */
		if (LLIOC_STOP ==
		     ll_iocontrol_call(inode, file, cmd, arg, &err))
			return err;

		return obd_iocontrol(cmd, ll_i2dtexp(inode), 0, NULL,
				     (void *)arg);
	}
	}
}
2090
2091
2092loff_t ll_file_seek(struct file *file, loff_t offset, int origin)
2093{
2094	struct inode *inode = file->f_dentry->d_inode;
2095	loff_t retval, eof = 0;
2096
2097	retval = offset + ((origin == SEEK_END) ? i_size_read(inode) :
2098			   (origin == SEEK_CUR) ? file->f_pos : 0);
2099	CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), to=%llu=%#llx(%d)\n",
2100	       inode->i_ino, inode->i_generation, inode, retval, retval,
2101	       origin);
2102	ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_LLSEEK, 1);
2103
2104	if (origin == SEEK_END || origin == SEEK_HOLE || origin == SEEK_DATA) {
2105		retval = ll_glimpse_size(inode);
2106		if (retval != 0)
2107			return retval;
2108		eof = i_size_read(inode);
2109	}
2110
2111	retval = generic_file_llseek_size(file, offset, origin,
2112					  ll_file_maxbytes(inode), eof);
2113	return retval;
2114}
2115
2116int ll_flush(struct file *file, fl_owner_t id)
2117{
2118	struct inode *inode = file->f_dentry->d_inode;
2119	struct ll_inode_info *lli = ll_i2info(inode);
2120	struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
2121	int rc, err;
2122
2123	LASSERT(!S_ISDIR(inode->i_mode));
2124
2125	/* catch async errors that were recorded back when async writeback
2126	 * failed for pages in this mapping. */
2127	rc = lli->lli_async_rc;
2128	lli->lli_async_rc = 0;
2129	err = lov_read_and_clear_async_rc(lli->lli_clob);
2130	if (rc == 0)
2131		rc = err;
2132
2133	/* The application has been told write failure already.
2134	 * Do not report failure again. */
2135	if (fd->fd_write_failed)
2136		return 0;
2137	return rc ? -EIO : 0;
2138}
2139
2140/**
2141 * Called to make sure a portion of file has been written out.
2142 * if @local_only is not true, it will send OST_SYNC RPCs to ost.
2143 *
2144 * Return how many pages have been written.
2145 */
2146int cl_sync_file_range(struct inode *inode, loff_t start, loff_t end,
2147		       enum cl_fsync_mode mode, int ignore_layout)
2148{
2149	struct cl_env_nest nest;
2150	struct lu_env *env;
2151	struct cl_io *io;
2152	struct obd_capa *capa = NULL;
2153	struct cl_fsync_io *fio;
2154	int result;
2155
2156	if (mode != CL_FSYNC_NONE && mode != CL_FSYNC_LOCAL &&
2157	    mode != CL_FSYNC_DISCARD && mode != CL_FSYNC_ALL)
2158		return -EINVAL;
2159
2160	env = cl_env_nested_get(&nest);
2161	if (IS_ERR(env))
2162		return PTR_ERR(env);
2163
2164	capa = ll_osscapa_get(inode, CAPA_OPC_OSS_WRITE);
2165
2166	io = ccc_env_thread_io(env);
2167	io->ci_obj = cl_i2info(inode)->lli_clob;
2168	io->ci_ignore_layout = ignore_layout;
2169
2170	/* initialize parameters for sync */
2171	fio = &io->u.ci_fsync;
2172	fio->fi_capa = capa;
2173	fio->fi_start = start;
2174	fio->fi_end = end;
2175	fio->fi_fid = ll_inode2fid(inode);
2176	fio->fi_mode = mode;
2177	fio->fi_nr_written = 0;
2178
2179	if (cl_io_init(env, io, CIT_FSYNC, io->ci_obj) == 0)
2180		result = cl_io_loop(env, io);
2181	else
2182		result = io->ci_result;
2183	if (result == 0)
2184		result = fio->fi_nr_written;
2185	cl_io_fini(env, io);
2186	cl_env_nested_put(&nest, env);
2187
2188	capa_put(capa);
2189
2190	return result;
2191}
2192
2193/*
2194 * When dentry is provided (the 'else' case), *file->f_dentry may be
2195 * null and dentry must be used directly rather than pulled from
2196 * *file->f_dentry as is done otherwise.
2197 */
2198
2199int ll_fsync(struct file *file, loff_t start, loff_t end, int datasync)
2200{
2201	struct dentry *dentry = file->f_dentry;
2202	struct inode *inode = dentry->d_inode;
2203	struct ll_inode_info *lli = ll_i2info(inode);
2204	struct ptlrpc_request *req;
2205	struct obd_capa *oc;
2206	int rc, err;
2207
2208	CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p)\n", inode->i_ino,
2209	       inode->i_generation, inode);
2210	ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_FSYNC, 1);
2211
2212	rc = filemap_write_and_wait_range(inode->i_mapping, start, end);
2213	mutex_lock(&inode->i_mutex);
2214
2215	/* catch async errors that were recorded back when async writeback
2216	 * failed for pages in this mapping. */
2217	if (!S_ISDIR(inode->i_mode)) {
2218		err = lli->lli_async_rc;
2219		lli->lli_async_rc = 0;
2220		if (rc == 0)
2221			rc = err;
2222		err = lov_read_and_clear_async_rc(lli->lli_clob);
2223		if (rc == 0)
2224			rc = err;
2225	}
2226
2227	oc = ll_mdscapa_get(inode);
2228	err = md_sync(ll_i2sbi(inode)->ll_md_exp, ll_inode2fid(inode), oc,
2229		      &req);
2230	capa_put(oc);
2231	if (!rc)
2232		rc = err;
2233	if (!err)
2234		ptlrpc_req_finished(req);
2235
2236	if (datasync && S_ISREG(inode->i_mode)) {
2237		struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
2238
2239		err = cl_sync_file_range(inode, 0, OBD_OBJECT_EOF,
2240				CL_FSYNC_ALL, 0);
2241		if (rc == 0 && err < 0)
2242			rc = err;
2243		if (rc < 0)
2244			fd->fd_write_failed = true;
2245		else
2246			fd->fd_write_failed = false;
2247	}
2248
2249	mutex_unlock(&inode->i_mutex);
2250	return rc;
2251}
2252
/*
 * flock(2)/fcntl(2) locking entry point: translate the VFS file_lock
 * into an LDLM flock enqueue on the MDS, then mirror the result into
 * the local VFS lock tables so both stay consistent.
 */
int ll_file_flock(struct file *file, int cmd, struct file_lock *file_lock)
{
	struct inode *inode = file->f_dentry->d_inode;
	struct ll_sb_info *sbi = ll_i2sbi(inode);
	struct ldlm_enqueue_info einfo = {
		.ei_type	= LDLM_FLOCK,
		.ei_cb_cp	= ldlm_flock_completion_ast,
		.ei_cbdata	= file_lock,
	};
	struct md_op_data *op_data;
	struct lustre_handle lockh = {0};
	ldlm_policy_data_t flock = {{0}};
	int flags = 0;
	int rc;
	int rc2 = 0;

	CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu file_lock=%p\n",
	       inode->i_ino, file_lock);

	ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_FLOCK, 1);

	if (file_lock->fl_flags & FL_FLOCK) {
		LASSERT((cmd == F_SETLKW) || (cmd == F_SETLK));
		/* flocks are whole-file locks */
		flock.l_flock.end = OFFSET_MAX;
		/* For flocks owner is determined by the local file descriptor*/
		flock.l_flock.owner = (unsigned long)file_lock->fl_file;
	} else if (file_lock->fl_flags & FL_POSIX) {
		flock.l_flock.owner = (unsigned long)file_lock->fl_owner;
		flock.l_flock.start = file_lock->fl_start;
		flock.l_flock.end = file_lock->fl_end;
	} else {
		/* neither FL_FLOCK nor FL_POSIX: unsupported lock class */
		return -EINVAL;
	}
	flock.l_flock.pid = file_lock->fl_pid;

	/* Somewhat ugly workaround for svc lockd.
	 * lockd installs custom fl_lmops->lm_compare_owner that checks
	 * for the fl_owner to be the same (which it always is on local node
	 * I guess between lockd processes) and then compares pid.
	 * As such we assign pid to the owner field to make it all work,
	 * conflict with normal locks is unlikely since pid space and
	 * pointer space for current->files are not intersecting */
	if (file_lock->fl_lmops && file_lock->fl_lmops->lm_compare_owner)
		flock.l_flock.owner = (unsigned long)file_lock->fl_pid;

	/* map the VFS lock type onto an LDLM lock mode */
	switch (file_lock->fl_type) {
	case F_RDLCK:
		einfo.ei_mode = LCK_PR;
		break;
	case F_UNLCK:
		/* An unlock request may or may not have any relation to
		 * existing locks so we may not be able to pass a lock handle
		 * via a normal ldlm_lock_cancel() request. The request may even
		 * unlock a byte range in the middle of an existing lock. In
		 * order to process an unlock request we need all of the same
		 * information that is given with a normal read or write record
		 * lock request. To avoid creating another ldlm unlock (cancel)
		 * message we'll treat a LCK_NL flock request as an unlock. */
		einfo.ei_mode = LCK_NL;
		break;
	case F_WRLCK:
		einfo.ei_mode = LCK_PW;
		break;
	default:
		CDEBUG(D_INFO, "Unknown fcntl lock type: %d\n",
			file_lock->fl_type);
		return -ENOTSUPP;
	}

	/* map the fcntl command onto LDLM enqueue flags */
	switch (cmd) {
	case F_SETLKW:
#ifdef F_SETLKW64
	case F_SETLKW64:
#endif
		flags = 0;
		break;
	case F_SETLK:
#ifdef F_SETLK64
	case F_SETLK64:
#endif
		flags = LDLM_FL_BLOCK_NOWAIT;
		break;
	case F_GETLK:
#ifdef F_GETLK64
	case F_GETLK64:
#endif
		flags = LDLM_FL_TEST_LOCK;
		/* Save the old mode so that if the mode in the lock changes we
		 * can decrement the appropriate reader or writer refcount. */
		file_lock->fl_type = einfo.ei_mode;
		break;
	default:
		CERROR("unknown fcntl lock command: %d\n", cmd);
		return -EINVAL;
	}

	op_data = ll_prep_md_op_data(NULL, inode, NULL, NULL, 0, 0,
				     LUSTRE_OPC_ANY, NULL);
	if (IS_ERR(op_data))
		return PTR_ERR(op_data);

	CDEBUG(D_DLMTRACE, "inode=%lu, pid=%u, flags=%#x, mode=%u, "
	       "start="LPU64", end="LPU64"\n", inode->i_ino, flock.l_flock.pid,
	       flags, einfo.ei_mode, flock.l_flock.start, flock.l_flock.end);

	rc = md_enqueue(sbi->ll_md_exp, &einfo, NULL,
			op_data, &lockh, &flock, 0, NULL /* req */, flags);

	/* mirror a successful server-side result (or any unlock) into
	 * the local VFS lock tables */
	if ((file_lock->fl_flags & FL_FLOCK) &&
	    (rc == 0 || file_lock->fl_type == F_UNLCK))
		rc2  = flock_lock_file_wait(file, file_lock);
	if ((file_lock->fl_flags & FL_POSIX) &&
	    (rc == 0 || file_lock->fl_type == F_UNLCK) &&
	    !(flags & LDLM_FL_TEST_LOCK))
		rc2  = posix_lock_file_wait(file, file_lock);

	/* local bookkeeping failed: undo the server-side lock by
	 * enqueueing it again as LCK_NL (i.e. an unlock, see above) */
	if (rc2 && file_lock->fl_type != F_UNLCK) {
		einfo.ei_mode = LCK_NL;
		md_enqueue(sbi->ll_md_exp, &einfo, NULL,
			op_data, &lockh, &flock, 0, NULL /* req */, flags);
		rc = rc2;
	}

	ll_finish_md_op_data(op_data);

	return rc;
}
2381
2382int ll_file_noflock(struct file *file, int cmd, struct file_lock *file_lock)
2383{
2384	return -ENOSYS;
2385}
2386
2387/**
2388 * test if some locks matching bits and l_req_mode are acquired
2389 * - bits can be in different locks
2390 * - if found clear the common lock bits in *bits
2391 * - the bits not found, are kept in *bits
2392 * \param inode [IN]
2393 * \param bits [IN] searched lock bits [IN]
2394 * \param l_req_mode [IN] searched lock mode
2395 * \retval boolean, true iff all bits are found
2396 */
2397int ll_have_md_lock(struct inode *inode, __u64 *bits,  ldlm_mode_t l_req_mode)
2398{
2399	struct lustre_handle lockh;
2400	ldlm_policy_data_t policy;
2401	ldlm_mode_t mode = (l_req_mode == LCK_MINMODE) ?
2402				(LCK_CR|LCK_CW|LCK_PR|LCK_PW) : l_req_mode;
2403	struct lu_fid *fid;
2404	__u64 flags;
2405	int i;
2406
2407	if (!inode)
2408	       return 0;
2409
2410	fid = &ll_i2info(inode)->lli_fid;
2411	CDEBUG(D_INFO, "trying to match res "DFID" mode %s\n", PFID(fid),
2412	       ldlm_lockname[mode]);
2413
2414	flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_CBPENDING | LDLM_FL_TEST_LOCK;
2415	for (i = 0; i <= MDS_INODELOCK_MAXSHIFT && *bits != 0; i++) {
2416		policy.l_inodebits.bits = *bits & (1 << i);
2417		if (policy.l_inodebits.bits == 0)
2418			continue;
2419
2420		if (md_lock_match(ll_i2mdexp(inode), flags, fid, LDLM_IBITS,
2421				  &policy, mode, &lockh)) {
2422			struct ldlm_lock *lock;
2423
2424			lock = ldlm_handle2lock(&lockh);
2425			if (lock) {
2426				*bits &=
2427				      ~(lock->l_policy_data.l_inodebits.bits);
2428				LDLM_LOCK_PUT(lock);
2429			} else {
2430				*bits &= ~policy.l_inodebits.bits;
2431			}
2432		}
2433	}
2434	return *bits == 0;
2435}
2436
2437ldlm_mode_t ll_take_md_lock(struct inode *inode, __u64 bits,
2438			    struct lustre_handle *lockh, __u64 flags)
2439{
2440	ldlm_policy_data_t policy = { .l_inodebits = {bits}};
2441	struct lu_fid *fid;
2442	ldlm_mode_t rc;
2443
2444	fid = &ll_i2info(inode)->lli_fid;
2445	CDEBUG(D_INFO, "trying to match res "DFID"\n", PFID(fid));
2446
2447	rc = md_lock_match(ll_i2mdexp(inode), LDLM_FL_BLOCK_GRANTED|flags,
2448			   fid, LDLM_IBITS, &policy,
2449			   LCK_CR|LCK_CW|LCK_PR|LCK_PW, lockh);
2450	return rc;
2451}
2452
2453static int ll_inode_revalidate_fini(struct inode *inode, int rc)
2454{
2455	/* Already unlinked. Just update nlink and return success */
2456	if (rc == -ENOENT) {
2457		clear_nlink(inode);
2458		/* This path cannot be hit for regular files unless in
2459		 * case of obscure races, so no need to to validate
2460		 * size. */
2461		if (!S_ISREG(inode->i_mode) && !S_ISDIR(inode->i_mode))
2462			return 0;
2463	} else if (rc != 0) {
2464		CERROR("%s: revalidate FID "DFID" error: rc = %d\n",
2465		       ll_get_fsname(inode->i_sb, NULL, 0),
2466		       PFID(ll_inode2fid(inode)), rc);
2467	}
2468
2469	return rc;
2470}
2471
/*
 * Revalidate the dentry's inode against the MDS.
 *
 * If the server supports OBD_CONNECT_ATTRFID, a getattr-by-FID intent
 * lock is enqueued; otherwise, unless a matching MD lock is already
 * cached locally (ll_have_md_lock), a plain md_getattr RPC refreshes
 * the inode attributes.
 */
int __ll_inode_revalidate_it(struct dentry *dentry, struct lookup_intent *it,
			     __u64 ibits)
{
	struct inode *inode = dentry->d_inode;
	struct ptlrpc_request *req = NULL;
	struct obd_export *exp;
	int rc = 0;

	LASSERT(inode != NULL);

	CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),name=%s\n",
	       inode->i_ino, inode->i_generation, inode, dentry->d_name.name);

	exp = ll_i2mdexp(inode);

	/* XXX: Enable OBD_CONNECT_ATTRFID to reduce unnecessary getattr RPC.
	 *      But under CMD case, it caused some lock issues, should be fixed
	 *      with new CMD ibits lock. See bug 12718 */
	if (exp_connect_flags(exp) & OBD_CONNECT_ATTRFID) {
		struct lookup_intent oit = { .it_op = IT_GETATTR };
		struct md_op_data *op_data;

		if (ibits == MDS_INODELOCK_LOOKUP)
			oit.it_op = IT_LOOKUP;

		/* Call getattr by fid, so do not provide name at all. */
		op_data = ll_prep_md_op_data(NULL, dentry->d_parent->d_inode,
					     dentry->d_inode, NULL, 0, 0,
					     LUSTRE_OPC_ANY, NULL);
		if (IS_ERR(op_data))
			return PTR_ERR(op_data);

		oit.it_create_mode |= M_CHECK_STALE;
		rc = md_intent_lock(exp, op_data, NULL, 0,
				    /* we are not interested in name
				       based lookup */
				    &oit, 0, &req,
				    ll_md_blocking_ast, 0);
		ll_finish_md_op_data(op_data);
		oit.it_create_mode &= ~M_CHECK_STALE;
		if (rc < 0) {
			rc = ll_inode_revalidate_fini(inode, rc);
			GOTO (out, rc);
		}

		rc = ll_revalidate_it_finish(req, &oit, dentry);
		if (rc != 0) {
			ll_intent_release(&oit);
			GOTO(out, rc);
		}

		/* Unlinked? Unhash dentry, so it is not picked up later by
		   do_lookup() -> ll_revalidate_it(). We cannot use d_drop
		   here to preserve get_cwd functionality on 2.6.
		   Bug 10503 */
		if (!dentry->d_inode->i_nlink)
			d_lustre_invalidate(dentry, 0);

		ll_lookup_finish_locks(&oit, dentry);
	} else if (!ll_have_md_lock(dentry->d_inode, &ibits, LCK_MINMODE)) {
		/* no cached lock covering the requested ibits: do a
		 * plain getattr RPC instead of an intent lock */
		struct ll_sb_info *sbi = ll_i2sbi(dentry->d_inode);
		obd_valid valid = OBD_MD_FLGETATTR;
		struct md_op_data *op_data;
		int ealen = 0;

		/* for regular files also fetch the striping EA */
		if (S_ISREG(inode->i_mode)) {
			rc = ll_get_max_mdsize(sbi, &ealen);
			if (rc)
				return rc;
			valid |= OBD_MD_FLEASIZE | OBD_MD_FLMODEASIZE;
		}

		op_data = ll_prep_md_op_data(NULL, inode, NULL, NULL,
					     0, ealen, LUSTRE_OPC_ANY,
					     NULL);
		if (IS_ERR(op_data))
			return PTR_ERR(op_data);

		op_data->op_valid = valid;
		/* Once OBD_CONNECT_ATTRFID is not supported, we can't find one
		 * capa for this inode. Because we only keep capas of dirs
		 * fresh. */
		rc = md_getattr(sbi->ll_md_exp, op_data, &req);
		ll_finish_md_op_data(op_data);
		if (rc) {
			rc = ll_inode_revalidate_fini(inode, rc);
			return rc;
		}

		rc = ll_prep_inode(&inode, req, NULL, NULL);
	}
out:
	/* release the reply from either path (NULL is a no-op here) */
	ptlrpc_req_finished(req);
	return rc;
}
2567
2568int ll_inode_revalidate_it(struct dentry *dentry, struct lookup_intent *it,
2569			   __u64 ibits)
2570{
2571	struct inode *inode = dentry->d_inode;
2572	int rc;
2573
2574	rc = __ll_inode_revalidate_it(dentry, it, ibits);
2575	if (rc != 0)
2576		return rc;
2577
2578	/* if object isn't regular file, don't validate size */
2579	if (!S_ISREG(inode->i_mode)) {
2580		LTIME_S(inode->i_atime) = ll_i2info(inode)->lli_lvb.lvb_atime;
2581		LTIME_S(inode->i_mtime) = ll_i2info(inode)->lli_lvb.lvb_mtime;
2582		LTIME_S(inode->i_ctime) = ll_i2info(inode)->lli_lvb.lvb_ctime;
2583	} else {
2584		rc = ll_glimpse_size(inode);
2585	}
2586	return rc;
2587}
2588
2589int ll_getattr_it(struct vfsmount *mnt, struct dentry *de,
2590		  struct lookup_intent *it, struct kstat *stat)
2591{
2592	struct inode *inode = de->d_inode;
2593	struct ll_sb_info *sbi = ll_i2sbi(inode);
2594	struct ll_inode_info *lli = ll_i2info(inode);
2595	int res = 0;
2596
2597	res = ll_inode_revalidate_it(de, it, MDS_INODELOCK_UPDATE |
2598					     MDS_INODELOCK_LOOKUP);
2599	ll_stats_ops_tally(sbi, LPROC_LL_GETATTR, 1);
2600
2601	if (res)
2602		return res;
2603
2604	stat->dev = inode->i_sb->s_dev;
2605	if (ll_need_32bit_api(sbi))
2606		stat->ino = cl_fid_build_ino(&lli->lli_fid, 1);
2607	else
2608		stat->ino = inode->i_ino;
2609	stat->mode = inode->i_mode;
2610	stat->nlink = inode->i_nlink;
2611	stat->uid = inode->i_uid;
2612	stat->gid = inode->i_gid;
2613	stat->rdev = inode->i_rdev;
2614	stat->atime = inode->i_atime;
2615	stat->mtime = inode->i_mtime;
2616	stat->ctime = inode->i_ctime;
2617	stat->blksize = 1 << inode->i_blkbits;
2618
2619	stat->size = i_size_read(inode);
2620	stat->blocks = inode->i_blocks;
2621
2622	return 0;
2623}
2624int ll_getattr(struct vfsmount *mnt, struct dentry *de, struct kstat *stat)
2625{
2626	struct lookup_intent it = { .it_op = IT_GETATTR };
2627
2628	return ll_getattr_it(mnt, de, &it, stat);
2629}
2630
2631
2632struct posix_acl * ll_get_acl(struct inode *inode, int type)
2633{
2634	struct ll_inode_info *lli = ll_i2info(inode);
2635	struct posix_acl *acl = NULL;
2636
2637	spin_lock(&lli->lli_lock);
2638	/* VFS' acl_permission_check->check_acl will release the refcount */
2639	acl = posix_acl_dup(lli->lli_posix_acl);
2640	spin_unlock(&lli->lli_lock);
2641
2642	return acl;
2643}
2644
2645
2646int ll_inode_permission(struct inode *inode, int mask)
2647{
2648	int rc = 0;
2649
2650#ifdef MAY_NOT_BLOCK
2651	if (mask & MAY_NOT_BLOCK)
2652		return -ECHILD;
2653#endif
2654
2655       /* as root inode are NOT getting validated in lookup operation,
2656	* need to do it before permission check. */
2657
2658	if (inode == inode->i_sb->s_root->d_inode) {
2659		struct lookup_intent it = { .it_op = IT_LOOKUP };
2660
2661		rc = __ll_inode_revalidate_it(inode->i_sb->s_root, &it,
2662					      MDS_INODELOCK_LOOKUP);
2663		if (rc)
2664			return rc;
2665	}
2666
2667	CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), inode mode %x mask %o\n",
2668	       inode->i_ino, inode->i_generation, inode, inode->i_mode, mask);
2669
2670	if (ll_i2sbi(inode)->ll_flags & LL_SBI_RMT_CLIENT)
2671		return lustre_check_remote_perm(inode, mask);
2672
2673	ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_INODE_PERM, 1);
2674	rc = generic_permission(inode, mask);
2675
2676	return rc;
2677}
2678
/* Map the generic read/write slots used in the file_operations tables
 * below onto the aio entry points implemented elsewhere in this file. */
#define READ_METHOD aio_read
#define READ_FUNCTION ll_file_aio_read
#define WRITE_METHOD aio_write
#define WRITE_FUNCTION ll_file_aio_write
2683
/* -o localflock - only provides locally consistent flock locks;
 * no .flock/.lock methods are installed, so presumably the VFS falls
 * back to purely local locking for this table. */
struct file_operations ll_file_operations = {
	.read	   = ll_file_read,
	.READ_METHOD    = READ_FUNCTION,
	.write	  = ll_file_write,
	.WRITE_METHOD   = WRITE_FUNCTION,
	.unlocked_ioctl = ll_file_ioctl,
	.open	   = ll_file_open,
	.release	= ll_file_release,
	.mmap	   = ll_file_mmap,
	.llseek	 = ll_file_seek,
	.splice_read    = ll_file_splice_read,
	.fsync	  = ll_fsync,
	.flush	  = ll_flush
};
2699
/* default table: cluster-coherent flock/fcntl locking via the MDS
 * (ll_file_flock handles both .flock and .lock) */
struct file_operations ll_file_operations_flock = {
	.read	   = ll_file_read,
	.READ_METHOD    = READ_FUNCTION,
	.write	  = ll_file_write,
	.WRITE_METHOD   = WRITE_FUNCTION,
	.unlocked_ioctl = ll_file_ioctl,
	.open	   = ll_file_open,
	.release	= ll_file_release,
	.mmap	   = ll_file_mmap,
	.llseek	 = ll_file_seek,
	.splice_read    = ll_file_splice_read,
	.fsync	  = ll_fsync,
	.flush	  = ll_flush,
	.flock	  = ll_file_flock,
	.lock	   = ll_file_flock
};
2716
/* These are for -o noflock - to return ENOSYS on flock calls */
struct file_operations ll_file_operations_noflock = {
	.read	   = ll_file_read,
	.READ_METHOD    = READ_FUNCTION,
	.write	  = ll_file_write,
	.WRITE_METHOD   = WRITE_FUNCTION,
	.unlocked_ioctl = ll_file_ioctl,
	.open	   = ll_file_open,
	.release	= ll_file_release,
	.mmap	   = ll_file_mmap,
	.llseek	 = ll_file_seek,
	.splice_read    = ll_file_splice_read,
	.fsync	  = ll_fsync,
	.flush	  = ll_flush,
	.flock	  = ll_file_noflock,
	.lock	   = ll_file_noflock
};
2734
/* inode operations shared by all regular-file inodes on this client */
struct inode_operations ll_file_inode_operations = {
	.setattr	= ll_setattr,
	.getattr	= ll_getattr,
	.permission	= ll_inode_permission,
	.setxattr	= ll_setxattr,
	.getxattr	= ll_getxattr,
	.listxattr	= ll_listxattr,
	.removexattr	= ll_removexattr,
	.get_acl	= ll_get_acl,
};
2745
/* dynamic ioctl number support routines */
static struct llioc_ctl_data {
	struct rw_semaphore	ioc_sem;	/* protects ioc_head */
	struct list_head	      ioc_head;	/* registered llioc_data blocks */
} llioc = {
	__RWSEM_INITIALIZER(llioc.ioc_sem),
	LIST_HEAD_INIT(llioc.ioc_head)
};
2754
2755
/* one dynamically registered ioctl handler and the commands it serves */
struct llioc_data {
	struct list_head	      iocd_list;	/* linked on llioc.ioc_head */
	unsigned int	    iocd_size;	/* total allocation size in bytes */
	llioc_callback_t	iocd_cb;	/* callback invoked for a match */
	unsigned int	    iocd_count;	/* entries in iocd_cmd[] */
	unsigned int	    iocd_cmd[0];	/* ioctl command numbers handled */
};
2763
2764void *ll_iocontrol_register(llioc_callback_t cb, int count, unsigned int *cmd)
2765{
2766	unsigned int size;
2767	struct llioc_data *in_data = NULL;
2768
2769	if (cb == NULL || cmd == NULL ||
2770	    count > LLIOC_MAX_CMD || count < 0)
2771		return NULL;
2772
2773	size = sizeof(*in_data) + count * sizeof(unsigned int);
2774	OBD_ALLOC(in_data, size);
2775	if (in_data == NULL)
2776		return NULL;
2777
2778	memset(in_data, 0, sizeof(*in_data));
2779	in_data->iocd_size = size;
2780	in_data->iocd_cb = cb;
2781	in_data->iocd_count = count;
2782	memcpy(in_data->iocd_cmd, cmd, sizeof(unsigned int) * count);
2783
2784	down_write(&llioc.ioc_sem);
2785	list_add_tail(&in_data->iocd_list, &llioc.ioc_head);
2786	up_write(&llioc.ioc_sem);
2787
2788	return in_data;
2789}
2790
2791void ll_iocontrol_unregister(void *magic)
2792{
2793	struct llioc_data *tmp;
2794
2795	if (magic == NULL)
2796		return;
2797
2798	down_write(&llioc.ioc_sem);
2799	list_for_each_entry(tmp, &llioc.ioc_head, iocd_list) {
2800		if (tmp == magic) {
2801			unsigned int size = tmp->iocd_size;
2802
2803			list_del(&tmp->iocd_list);
2804			up_write(&llioc.ioc_sem);
2805
2806			OBD_FREE(tmp, size);
2807			return;
2808		}
2809	}
2810	up_write(&llioc.ioc_sem);
2811
2812	CWARN("didn't find iocontrol register block with magic: %p\n", magic);
2813}
2814
/* exported so other modules can plug in dynamic ioctl handlers */
EXPORT_SYMBOL(ll_iocontrol_register);
EXPORT_SYMBOL(ll_iocontrol_unregister);
2817
2818enum llioc_iter ll_iocontrol_call(struct inode *inode, struct file *file,
2819			unsigned int cmd, unsigned long arg, int *rcp)
2820{
2821	enum llioc_iter ret = LLIOC_CONT;
2822	struct llioc_data *data;
2823	int rc = -EINVAL, i;
2824
2825	down_read(&llioc.ioc_sem);
2826	list_for_each_entry(data, &llioc.ioc_head, iocd_list) {
2827		for (i = 0; i < data->iocd_count; i++) {
2828			if (cmd != data->iocd_cmd[i])
2829				continue;
2830
2831			ret = data->iocd_cb(inode, file, cmd, arg, data, &rc);
2832			break;
2833		}
2834
2835		if (ret == LLIOC_STOP)
2836			break;
2837	}
2838	up_read(&llioc.ioc_sem);
2839
2840	if (rcp)
2841		*rcp = rc;
2842	return ret;
2843}
2844
2845int ll_layout_conf(struct inode *inode, const struct cl_object_conf *conf)
2846{
2847	struct ll_inode_info *lli = ll_i2info(inode);
2848	struct cl_env_nest nest;
2849	struct lu_env *env;
2850	int result;
2851
2852	if (lli->lli_clob == NULL)
2853		return 0;
2854
2855	env = cl_env_nested_get(&nest);
2856	if (IS_ERR(env))
2857		return PTR_ERR(env);
2858
2859	result = cl_conf_set(env, lli->lli_clob, conf);
2860	cl_env_nested_put(&nest, env);
2861
2862	if (conf->coc_opc == OBJECT_CONF_SET) {
2863		struct ldlm_lock *lock = conf->coc_lock;
2864
2865		LASSERT(lock != NULL);
2866		LASSERT(ldlm_has_layout(lock));
2867		if (result == 0) {
2868			/* it can only be allowed to match after layout is
2869			 * applied to inode otherwise false layout would be
2870			 * seen. Applying layout shoud happen before dropping
2871			 * the intent lock. */
2872			ldlm_lock_allow_match(lock);
2873		}
2874	}
2875	return result;
2876}
2877
/* Fetch layout from MDT with getxattr request, if it's not ready yet.
 *
 * On success the fetched LOV EA is installed as the lock's LVB data
 * (replacing any previous buffer); an empty layout leaves the lock's
 * LVB untouched and returns 0. */
static int ll_layout_fetch(struct inode *inode, struct ldlm_lock *lock)

{
	struct ll_sb_info *sbi = ll_i2sbi(inode);
	struct obd_capa *oc;
	struct ptlrpc_request *req;
	struct mdt_body *body;
	void *lvbdata;
	void *lmm;
	int lmmsize;
	int rc;

	CDEBUG(D_INODE, DFID" LVB_READY=%d l_lvb_data=%p l_lvb_len=%d\n",
	       PFID(ll_inode2fid(inode)), !!(lock->l_flags & LDLM_FL_LVB_READY),
	       lock->l_lvb_data, lock->l_lvb_len);

	/* nothing to do: the layout is already attached and ready */
	if ((lock->l_lvb_data != NULL) && (lock->l_flags & LDLM_FL_LVB_READY))
		return 0;

	/* if layout lock was granted right away, the layout is returned
	 * within DLM_LVB of dlm reply; otherwise if the lock was ever
	 * blocked and then granted via completion ast, we have to fetch
	 * layout here. Please note that we can't use the LVB buffer in
	 * completion AST because it doesn't have a large enough buffer */
	oc = ll_mdscapa_get(inode);
	rc = ll_get_max_mdsize(sbi, &lmmsize);
	if (rc == 0)
		rc = md_getxattr(sbi->ll_md_exp, ll_inode2fid(inode), oc,
				OBD_MD_FLXATTR, XATTR_NAME_LOV, NULL, 0,
				lmmsize, 0, &req);
	capa_put(oc);
	if (rc < 0)
		return rc;

	body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
	if (body == NULL || body->eadatasize > lmmsize)
		GOTO(out, rc = -EPROTO);

	lmmsize = body->eadatasize;
	if (lmmsize == 0) /* empty layout */
		GOTO(out, rc = 0);

	lmm = req_capsule_server_sized_get(&req->rq_pill, &RMF_EADATA, lmmsize);
	if (lmm == NULL)
		GOTO(out, rc = -EFAULT);

	/* copy the EA out of the (soon-to-be-freed) reply buffer */
	OBD_ALLOC_LARGE(lvbdata, lmmsize);
	if (lvbdata == NULL)
		GOTO(out, rc = -ENOMEM);

	memcpy(lvbdata, lmm, lmmsize);
	/* swap in the new LVB under the resource lock */
	lock_res_and_lock(lock);
	if (lock->l_lvb_data != NULL)
		OBD_FREE_LARGE(lock->l_lvb_data, lock->l_lvb_len);

	lock->l_lvb_data = lvbdata;
	lock->l_lvb_len = lmmsize;
	unlock_res_and_lock(lock);

out:
	ptlrpc_req_finished(req);
	return rc;
}
2942
2943/**
2944 * Apply the layout to the inode. Layout lock is held and will be released
2945 * in this function.
2946 */
2947static int ll_layout_lock_set(struct lustre_handle *lockh, ldlm_mode_t mode,
2948				struct inode *inode, __u32 *gen, bool reconf)
2949{
2950	struct ll_inode_info *lli = ll_i2info(inode);
2951	struct ll_sb_info    *sbi = ll_i2sbi(inode);
2952	struct ldlm_lock *lock;
2953	struct lustre_md md = { NULL };
2954	struct cl_object_conf conf;
2955	int rc = 0;
2956	bool lvb_ready;
2957	bool wait_layout = false;
2958
2959	LASSERT(lustre_handle_is_used(lockh));
2960
2961	lock = ldlm_handle2lock(lockh);
2962	LASSERT(lock != NULL);
2963	LASSERT(ldlm_has_layout(lock));
2964
2965	LDLM_DEBUG(lock, "File %p/"DFID" being reconfigured: %d.\n",
2966		   inode, PFID(&lli->lli_fid), reconf);
2967
2968	/* in case this is a caching lock and reinstate with new inode */
2969	md_set_lock_data(sbi->ll_md_exp, &lockh->cookie, inode, NULL);
2970
2971	lock_res_and_lock(lock);
2972	lvb_ready = !!(lock->l_flags & LDLM_FL_LVB_READY);
2973	unlock_res_and_lock(lock);
2974	/* checking lvb_ready is racy but this is okay. The worst case is
2975	 * that multi processes may configure the file on the same time. */
2976	if (lvb_ready || !reconf) {
2977		rc = -ENODATA;
2978		if (lvb_ready) {
2979			/* layout_gen must be valid if layout lock is not
2980			 * cancelled and stripe has already set */
2981			*gen = lli->lli_layout_gen;
2982			rc = 0;
2983		}
2984		GOTO(out, rc);
2985	}
2986
2987	rc = ll_layout_fetch(inode, lock);
2988	if (rc < 0)
2989		GOTO(out, rc);
2990
2991	/* for layout lock, lmm is returned in lock's lvb.
2992	 * lvb_data is immutable if the lock is held so it's safe to access it
2993	 * without res lock. See the description in ldlm_lock_decref_internal()
2994	 * for the condition to free lvb_data of layout lock */
2995	if (lock->l_lvb_data != NULL) {
2996		rc = obd_unpackmd(sbi->ll_dt_exp, &md.lsm,
2997				  lock->l_lvb_data, lock->l_lvb_len);
2998		if (rc >= 0) {
2999			*gen = LL_LAYOUT_GEN_EMPTY;
3000			if (md.lsm != NULL)
3001				*gen = md.lsm->lsm_layout_gen;
3002			rc = 0;
3003		} else {
3004			CERROR("%s: file "DFID" unpackmd error: %d\n",
3005				ll_get_fsname(inode->i_sb, NULL, 0),
3006				PFID(&lli->lli_fid), rc);
3007		}
3008	}
3009	if (rc < 0)
3010		GOTO(out, rc);
3011
3012	/* set layout to file. Unlikely this will fail as old layout was
3013	 * surely eliminated */
3014	memset(&conf, 0, sizeof conf);
3015	conf.coc_opc = OBJECT_CONF_SET;
3016	conf.coc_inode = inode;
3017	conf.coc_lock = lock;
3018	conf.u.coc_md = &md;
3019	rc = ll_layout_conf(inode, &conf);
3020
3021	if (md.lsm != NULL)
3022		obd_free_memmd(sbi->ll_dt_exp, &md.lsm);
3023
3024	/* refresh layout failed, need to wait */
3025	wait_layout = rc == -EBUSY;
3026
3027out:
3028	LDLM_LOCK_PUT(lock);
3029	ldlm_lock_decref(lockh, mode);
3030
3031	/* wait for IO to complete if it's still being used. */
3032	if (wait_layout) {
3033		CDEBUG(D_INODE, "%s: %p/"DFID" wait for layout reconf.\n",
3034			ll_get_fsname(inode->i_sb, NULL, 0),
3035			inode, PFID(&lli->lli_fid));
3036
3037		memset(&conf, 0, sizeof conf);
3038		conf.coc_opc = OBJECT_CONF_WAIT;
3039		conf.coc_inode = inode;
3040		rc = ll_layout_conf(inode, &conf);
3041		if (rc == 0)
3042			rc = -EAGAIN;
3043
3044		CDEBUG(D_INODE, "file: "DFID" waiting layout return: %d.\n",
3045			PFID(&lli->lli_fid), rc);
3046	}
3047	return rc;
3048}
3049
3050/**
3051 * This function checks if there exists a LAYOUT lock on the client side,
3052 * or enqueues it if it doesn't have one in cache.
3053 *
3054 * This function will not hold layout lock so it may be revoked any time after
3055 * this function returns. Any operations depend on layout should be redone
3056 * in that case.
3057 *
3058 * This function should be called before lov_io_init() to get an uptodate
3059 * layout version, the caller should save the version number and after IO
3060 * is finished, this function should be called again to verify that layout
3061 * is not changed during IO time.
3062 */
3063int ll_layout_refresh(struct inode *inode, __u32 *gen)
3064{
3065	struct ll_inode_info  *lli = ll_i2info(inode);
3066	struct ll_sb_info     *sbi = ll_i2sbi(inode);
3067	struct md_op_data     *op_data;
3068	struct lookup_intent   it;
3069	struct lustre_handle   lockh;
3070	ldlm_mode_t	       mode;
3071	struct ldlm_enqueue_info einfo = {
3072		.ei_type = LDLM_IBITS,
3073		.ei_mode = LCK_CR,
3074		.ei_cb_bl = ll_md_blocking_ast,
3075		.ei_cb_cp = ldlm_completion_ast,
3076	};
3077	int rc;
3078
3079	*gen = lli->lli_layout_gen;
3080	if (!(sbi->ll_flags & LL_SBI_LAYOUT_LOCK))
3081		return 0;
3082
3083	/* sanity checks */
3084	LASSERT(fid_is_sane(ll_inode2fid(inode)));
3085	LASSERT(S_ISREG(inode->i_mode));
3086
3087	/* mostly layout lock is caching on the local side, so try to match
3088	 * it before grabbing layout lock mutex. */
3089	mode = ll_take_md_lock(inode, MDS_INODELOCK_LAYOUT, &lockh, 0);
3090	if (mode != 0) { /* hit cached lock */
3091		rc = ll_layout_lock_set(&lockh, mode, inode, gen, false);
3092		if (rc == 0)
3093			return 0;
3094
3095		/* better hold lli_layout_mutex to try again otherwise
3096		 * it will have starvation problem. */
3097	}
3098
3099	/* take layout lock mutex to enqueue layout lock exclusively. */
3100	mutex_lock(&lli->lli_layout_mutex);
3101
3102again:
3103	/* try again. Maybe somebody else has done this. */
3104	mode = ll_take_md_lock(inode, MDS_INODELOCK_LAYOUT, &lockh, 0);
3105	if (mode != 0) { /* hit cached lock */
3106		rc = ll_layout_lock_set(&lockh, mode, inode, gen, true);
3107		if (rc == -EAGAIN)
3108			goto again;
3109
3110		mutex_unlock(&lli->lli_layout_mutex);
3111		return rc;
3112	}
3113
3114	op_data = ll_prep_md_op_data(NULL, inode, inode, NULL,
3115			0, 0, LUSTRE_OPC_ANY, NULL);
3116	if (IS_ERR(op_data)) {
3117		mutex_unlock(&lli->lli_layout_mutex);
3118		return PTR_ERR(op_data);
3119	}
3120
3121	/* have to enqueue one */
3122	memset(&it, 0, sizeof(it));
3123	it.it_op = IT_LAYOUT;
3124	lockh.cookie = 0ULL;
3125
3126	LDLM_DEBUG_NOLOCK("%s: requeue layout lock for file %p/"DFID".\n",
3127			ll_get_fsname(inode->i_sb, NULL, 0), inode,
3128			PFID(&lli->lli_fid));
3129
3130	rc = md_enqueue(sbi->ll_md_exp, &einfo, &it, op_data, &lockh,
3131			NULL, 0, NULL, 0);
3132	if (it.d.lustre.it_data != NULL)
3133		ptlrpc_req_finished(it.d.lustre.it_data);
3134	it.d.lustre.it_data = NULL;
3135
3136	ll_finish_md_op_data(op_data);
3137
3138	mode = it.d.lustre.it_lock_mode;
3139	it.d.lustre.it_lock_mode = 0;
3140	ll_intent_drop_lock(&it);
3141
3142	if (rc == 0) {
3143		/* set lock data in case this is a new lock */
3144		ll_set_lock_data(sbi->ll_md_exp, inode, &it, NULL);
3145		rc = ll_layout_lock_set(&lockh, mode, inode, gen, true);
3146		if (rc == -EAGAIN)
3147			goto again;
3148	}
3149	mutex_unlock(&lli->lli_layout_mutex);
3150
3151	return rc;
3152}
3153