[go: nahoru, domu]

file.c revision 6f014339718fac7597a53d155a2aa1714091a9af
1/*
2 * GPL HEADER START
3 *
4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5 *
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
9 *
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
15 *
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19 *
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
22 * have any questions.
23 *
24 * GPL HEADER END
25 */
26/*
27 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
29 *
30 * Copyright (c) 2011, 2012, Intel Corporation.
31 */
32/*
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
35 *
36 * lustre/llite/file.c
37 *
38 * Author: Peter Braam <braam@clusterfs.com>
39 * Author: Phil Schwan <phil@clusterfs.com>
40 * Author: Andreas Dilger <adilger@clusterfs.com>
41 */
42
43#define DEBUG_SUBSYSTEM S_LLITE
44#include <lustre_dlm.h>
45#include <lustre_lite.h>
46#include <linux/pagemap.h>
47#include <linux/file.h>
48#include "llite_internal.h"
49#include <lustre/ll_fiemap.h>
50
51#include "cl_object.h"
52
53struct ll_file_data *ll_file_data_get(void)
54{
55	struct ll_file_data *fd;
56
57	OBD_SLAB_ALLOC_PTR_GFP(fd, ll_file_data_slab, __GFP_IO);
58	if (fd == NULL)
59		return NULL;
60	fd->fd_write_failed = false;
61	return fd;
62}
63
64static void ll_file_data_put(struct ll_file_data *fd)
65{
66	if (fd != NULL)
67		OBD_SLAB_FREE_PTR(fd, ll_file_data_slab);
68}
69
/*
 * Pack the current inode attributes (FID, mode, times, size, blocks,
 * flags) and the IO epoch into @op_data for an MD operation.  When
 * @fh is non-NULL, the open handle being operated on is recorded too.
 */
void ll_pack_inode2opdata(struct inode *inode, struct md_op_data *op_data,
			  struct lustre_handle *fh)
{
	op_data->op_fid1 = ll_i2info(inode)->lli_fid;
	op_data->op_attr.ia_mode = inode->i_mode;
	op_data->op_attr.ia_atime = inode->i_atime;
	op_data->op_attr.ia_mtime = inode->i_mtime;
	op_data->op_attr.ia_ctime = inode->i_ctime;
	op_data->op_attr.ia_size = i_size_read(inode);
	op_data->op_attr_blocks = inode->i_blocks;
	/* Inode flags are converted to their wire (ext) representation. */
	((struct ll_iattr *)&op_data->op_attr)->ia_attr_flags =
					ll_inode_to_ext_flags(inode->i_flags);
	op_data->op_ioepoch = ll_i2info(inode)->lli_ioepoch;
	if (fh)
		op_data->op_handle = *fh;
	op_data->op_capa1 = ll_mdscapa_get(inode);

	/* Propagate the locally recorded data-modified state as a bias. */
	if (LLIF_DATA_MODIFIED & ll_i2info(inode)->lli_flags)
		op_data->op_bias |= MDS_DATA_MODIFIED;
}
90
/**
 * Closes the IO epoch and packs all the attributes into @op_data for
 * the CLOSE rpc.
 *
 * Mode and timestamps are always marked valid.  For a write handle,
 * size/blocks are added only when Size-on-MDS is not in effect (or the
 * inode is not a regular file); otherwise the IO epoch is closed via
 * ll_ioepoch_close() instead.
 */
static void ll_prepare_close(struct inode *inode, struct md_op_data *op_data,
			     struct obd_client_handle *och)
{
	ENTRY;

	op_data->op_attr.ia_valid = ATTR_MODE | ATTR_ATIME | ATTR_ATIME_SET |
					ATTR_MTIME | ATTR_MTIME_SET |
					ATTR_CTIME | ATTR_CTIME_SET;

	/* Read/exec handles carry no size information to the MDS. */
	if (!(och->och_flags & FMODE_WRITE))
		goto out;

	if (!exp_connect_som(ll_i2mdexp(inode)) || !S_ISREG(inode->i_mode))
		op_data->op_attr.ia_valid |= ATTR_SIZE | ATTR_BLOCKS;
	else
		/* NOTE(review): the handle pointer is passed by reference
		 * (&och) — presumably ll_ioepoch_close() may consume or
		 * replace it; confirm against its prototype. */
		ll_ioepoch_close(inode, op_data, &och, 0);

out:
	ll_pack_inode2opdata(inode, op_data, &och->och_fh);
	ll_prep_md_op_data(op_data, inode, NULL, NULL,
			   0, 0, LUSTRE_OPC_ANY, NULL);
	EXIT;
}
118
/*
 * Send the MDS_CLOSE rpc for open handle @och on @inode via @md_exp.
 *
 * -EAGAIN from md_close() means the MDS wants a Size-on-MDS update
 * gathered from the OSTs; that is performed here and treated as
 * success.  On exit, @och is either queued for a pending DONE_WRITING
 * (SOM epoch still open) or unhooked from replay and freed.
 * Returns 0 or a negative errno.
 */
static int ll_close_inode_openhandle(struct obd_export *md_exp,
				     struct inode *inode,
				     struct obd_client_handle *och)
{
	struct obd_export *exp = ll_i2mdexp(inode);
	struct md_op_data *op_data;
	struct ptlrpc_request *req = NULL;
	struct obd_device *obd = class_exp2obd(exp);
	int epoch_close = 1;
	int rc;
	ENTRY;

	if (obd == NULL) {
		/*
		 * XXX: in case of LMV, is this correct to access
		 * ->exp_handle?
		 */
		CERROR("Invalid MDC connection handle "LPX64"\n",
		       ll_i2mdexp(inode)->exp_handle.h_cookie);
		GOTO(out, rc = 0);
	}

	OBD_ALLOC_PTR(op_data);
	if (op_data == NULL)
		GOTO(out, rc = -ENOMEM); // XXX We leak openhandle and request here.

	ll_prepare_close(inode, op_data, och);
	epoch_close = (op_data->op_flags & MF_EPOCH_CLOSE);
	rc = md_close(md_exp, op_data, och->och_mod, &req);
	if (rc == -EAGAIN) {
		/* This close must have the epoch closed. */
		LASSERT(epoch_close);
		/* MDS has instructed us to obtain Size-on-MDS attribute from
		 * OSTs and send setattr to back to MDS. */
		rc = ll_som_update(inode, op_data);
		if (rc) {
			/* SOM update failure is logged but not fatal. */
			CERROR("inode %lu mdc Size-on-MDS update failed: "
			       "rc = %d\n", inode->i_ino, rc);
			rc = 0;
		}
	} else if (rc) {
		CERROR("inode %lu mdc close failed: rc = %d\n",
		       inode->i_ino, rc);
	}

	/* DATA_MODIFIED flag was successfully sent on close, cancel data
	 * modification flag. */
	if (rc == 0 && (op_data->op_bias & MDS_DATA_MODIFIED)) {
		struct ll_inode_info *lli = ll_i2info(inode);

		spin_lock(&lli->lli_lock);
		lli->lli_flags &= ~LLIF_DATA_MODIFIED;
		spin_unlock(&lli->lli_lock);
	}

	ll_finish_md_op_data(op_data);

	if (rc == 0) {
		rc = ll_objects_destroy(req, inode);
		if (rc)
			CERROR("inode %lu ll_objects destroy: rc = %d\n",
			       inode->i_ino, rc);
	}

	EXIT;
out:

	/* With SOM connected and the epoch still open on a regular file's
	 * write handle, @och must stay alive for the pending DONE_WRITING;
	 * otherwise it is released now. */
	if (exp_connect_som(exp) && !epoch_close &&
	    S_ISREG(inode->i_mode) && (och->och_flags & FMODE_WRITE)) {
		ll_queue_done_writing(inode, LLIF_DONE_WRITING);
	} else {
		md_clear_open_replay_data(md_exp, och);
		/* Free @och if it is not waiting for DONE_WRITING. */
		och->och_fh.cookie = DEAD_HANDLE_MAGIC;
		OBD_FREE_PTR(och);
	}
	if (req) /* This is close request */
		ptlrpc_req_finished(req);
	return rc;
}
199
200int ll_md_real_close(struct inode *inode, int flags)
201{
202	struct ll_inode_info *lli = ll_i2info(inode);
203	struct obd_client_handle **och_p;
204	struct obd_client_handle *och;
205	__u64 *och_usecount;
206	int rc = 0;
207	ENTRY;
208
209	if (flags & FMODE_WRITE) {
210		och_p = &lli->lli_mds_write_och;
211		och_usecount = &lli->lli_open_fd_write_count;
212	} else if (flags & FMODE_EXEC) {
213		och_p = &lli->lli_mds_exec_och;
214		och_usecount = &lli->lli_open_fd_exec_count;
215	} else {
216		LASSERT(flags & FMODE_READ);
217		och_p = &lli->lli_mds_read_och;
218		och_usecount = &lli->lli_open_fd_read_count;
219	}
220
221	mutex_lock(&lli->lli_och_mutex);
222	if (*och_usecount) { /* There are still users of this handle, so
223				skip freeing it. */
224		mutex_unlock(&lli->lli_och_mutex);
225		RETURN(0);
226	}
227	och=*och_p;
228	*och_p = NULL;
229	mutex_unlock(&lli->lli_och_mutex);
230
231	if (och) { /* There might be a race and somebody have freed this och
232		      already */
233		rc = ll_close_inode_openhandle(ll_i2sbi(inode)->ll_md_exp,
234					       inode, och);
235	}
236
237	RETURN(rc);
238}
239
240int ll_md_close(struct obd_export *md_exp, struct inode *inode,
241		struct file *file)
242{
243	struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
244	struct ll_inode_info *lli = ll_i2info(inode);
245	int rc = 0;
246	ENTRY;
247
248	/* clear group lock, if present */
249	if (unlikely(fd->fd_flags & LL_FILE_GROUP_LOCKED))
250		ll_put_grouplock(inode, file, fd->fd_grouplock.cg_gid);
251
252	/* Let's see if we have good enough OPEN lock on the file and if
253	   we can skip talking to MDS */
254	if (file->f_dentry->d_inode) { /* Can this ever be false? */
255		int lockmode;
256		int flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_TEST_LOCK;
257		struct lustre_handle lockh;
258		struct inode *inode = file->f_dentry->d_inode;
259		ldlm_policy_data_t policy = {.l_inodebits={MDS_INODELOCK_OPEN}};
260
261		mutex_lock(&lli->lli_och_mutex);
262		if (fd->fd_omode & FMODE_WRITE) {
263			lockmode = LCK_CW;
264			LASSERT(lli->lli_open_fd_write_count);
265			lli->lli_open_fd_write_count--;
266		} else if (fd->fd_omode & FMODE_EXEC) {
267			lockmode = LCK_PR;
268			LASSERT(lli->lli_open_fd_exec_count);
269			lli->lli_open_fd_exec_count--;
270		} else {
271			lockmode = LCK_CR;
272			LASSERT(lli->lli_open_fd_read_count);
273			lli->lli_open_fd_read_count--;
274		}
275		mutex_unlock(&lli->lli_och_mutex);
276
277		if (!md_lock_match(md_exp, flags, ll_inode2fid(inode),
278				   LDLM_IBITS, &policy, lockmode,
279				   &lockh)) {
280			rc = ll_md_real_close(file->f_dentry->d_inode,
281					      fd->fd_omode);
282		}
283	} else {
284		CERROR("Releasing a file %p with negative dentry %p. Name %s",
285		       file, file->f_dentry, file->f_dentry->d_name.name);
286	}
287
288	LUSTRE_FPRIVATE(file) = NULL;
289	ll_file_data_put(fd);
290	ll_capa_close(inode);
291
292	RETURN(rc);
293}
294
/* While this returns an error code, fput() the caller does not, so we need
 * to make every effort to clean up all of our state here.  Also, applications
 * rarely check close errors and even if an error is returned they will not
 * re-try the close call.
 */
int ll_file_release(struct inode *inode, struct file *file)
{
	struct ll_file_data *fd;
	struct ll_sb_info *sbi = ll_i2sbi(inode);
	struct ll_inode_info *lli = ll_i2info(inode);
	int rc;
	ENTRY;

	CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p)\n", inode->i_ino,
	       inode->i_generation, inode);

#ifdef CONFIG_FS_POSIX_ACL
	/* NOTE(review): presumably per-pid remote-client ACL state on the
	 * root inode — rct_del/et_search_free drop this pid's entries. */
	if (sbi->ll_flags & LL_SBI_RMT_CLIENT &&
	    inode == inode->i_sb->s_root->d_inode) {
		struct ll_file_data *fd = LUSTRE_FPRIVATE(file);

		LASSERT(fd != NULL);
		if (unlikely(fd->fd_flags & LL_FILE_RMTACL)) {
			fd->fd_flags &= ~LL_FILE_RMTACL;
			rct_del(&sbi->ll_rct, current_pid());
			et_search_free(&sbi->ll_et, current_pid());
		}
	}
#endif

	if (inode->i_sb->s_root != file->f_dentry)
		ll_stats_ops_tally(sbi, LPROC_LL_RELEASE, 1);
	fd = LUSTRE_FPRIVATE(file);
	LASSERT(fd != NULL);

	/* The last ref on @file, maybe not the the owner pid of statahead.
	 * Different processes can open the same dir, "ll_opendir_key" means:
	 * it is me that should stop the statahead thread. */
	if (S_ISDIR(inode->i_mode) && lli->lli_opendir_key == fd &&
	    lli->lli_opendir_pid != 0)
		ll_stop_statahead(inode, lli->lli_opendir_key);

	/* The root dentry was opened without an MDS handle (see
	 * ll_file_open); just drop the fd. */
	if (inode->i_sb->s_root == file->f_dentry) {
		LUSTRE_FPRIVATE(file) = NULL;
		ll_file_data_put(fd);
		RETURN(0);
	}

	if (!S_ISDIR(inode->i_mode)) {
		/* Clear any recorded async write error state. */
		lov_read_and_clear_async_rc(lli->lli_clob);
		lli->lli_async_rc = 0;
	}

	rc = ll_md_close(sbi->ll_md_exp, inode, file);

	if (CFS_FAIL_TIMEOUT_MS(OBD_FAIL_PTLRPC_DUMP_LOG, cfs_fail_val))
		libcfs_debug_dumplog();

	RETURN(rc);
}
355
/*
 * Send an intent OPEN rpc to the MDS for @file.  When @lmm/@lmmsize
 * are set, only stripe info is being set and no OPEN lock is asked
 * for; otherwise MDS_OPEN_LOCK is requested.  On success the reply is
 * used to refresh the inode and install the lock data.  Returns 0 or
 * a negative errno.
 */
static int ll_intent_file_open(struct file *file, void *lmm,
			       int lmmsize, struct lookup_intent *itp)
{
	struct ll_sb_info *sbi = ll_i2sbi(file->f_dentry->d_inode);
	struct dentry *parent = file->f_dentry->d_parent;
	const char *name = file->f_dentry->d_name.name;
	const int len = file->f_dentry->d_name.len;
	struct md_op_data *op_data;
	struct ptlrpc_request *req;
	__u32 opc = LUSTRE_OPC_ANY;
	int rc;
	ENTRY;

	if (!parent)
		RETURN(-ENOENT);

	/* Usually we come here only for NFSD, and we want open lock.
	   But we can also get here with pre 2.6.15 patchless kernels, and in
	   that case that lock is also ok */
	/* We can also get here if there was cached open handle in revalidate_it
	 * but it disappeared while we were getting from there to ll_file_open.
	 * But this means this file was closed and immediatelly opened which
	 * makes a good candidate for using OPEN lock */
	/* If lmmsize & lmm are not 0, we are just setting stripe info
	 * parameters. No need for the open lock */
	if (lmm == NULL && lmmsize == 0) {
		itp->it_flags |= MDS_OPEN_LOCK;
		if (itp->it_flags & FMODE_WRITE)
			opc = LUSTRE_OPC_CREATE;
	}

	op_data  = ll_prep_md_op_data(NULL, parent->d_inode,
				      file->f_dentry->d_inode, name, len,
				      O_RDWR, opc, NULL);
	if (IS_ERR(op_data))
		RETURN(PTR_ERR(op_data));

	itp->it_flags |= MDS_OPEN_BY_FID;
	rc = md_intent_lock(sbi->ll_md_exp, op_data, lmm, lmmsize, itp,
			    0 /*unused */, &req, ll_md_blocking_ast, 0);
	ll_finish_md_op_data(op_data);
	if (rc == -ESTALE) {
		/* reason for keep own exit path - don`t flood log
		* with messages with -ESTALE errors.
		*/
		if (!it_disposition(itp, DISP_OPEN_OPEN) ||
		     it_open_error(DISP_OPEN_OPEN, itp))
			GOTO(out, rc);
		/* Open succeeded but the entry is stale: quietly release
		 * the obtained handle. */
		ll_release_openhandle(file->f_dentry, itp);
		GOTO(out, rc);
	}

	if (it_disposition(itp, DISP_LOOKUP_NEG))
		GOTO(out, rc = -ENOENT);

	if (rc != 0 || it_open_error(DISP_OPEN_OPEN, itp)) {
		rc = rc ? rc : it_open_error(DISP_OPEN_OPEN, itp);
		CDEBUG(D_VFSTRACE, "lock enqueue: err: %d\n", rc);
		GOTO(out, rc);
	}

	rc = ll_prep_inode(&file->f_dentry->d_inode, req, NULL, itp);
	if (!rc && itp->d.lustre.it_lock_mode)
		ll_set_lock_data(sbi->ll_md_exp, file->f_dentry->d_inode,
				 itp, NULL);

out:
	/* Drop the intent's request reference and any lock it took. */
	ptlrpc_req_finished(itp->d.lustre.it_data);
	it_clear_disposition(itp, DISP_ENQ_COMPLETE);
	ll_intent_drop_lock(itp);

	RETURN(rc);
}
429
430/**
431 * Assign an obtained @ioepoch to client's inode. No lock is needed, MDS does
432 * not believe attributes if a few ioepoch holders exist. Attributes for
433 * previous ioepoch if new one is opened are also skipped by MDS.
434 */
435void ll_ioepoch_open(struct ll_inode_info *lli, __u64 ioepoch)
436{
437	if (ioepoch && lli->lli_ioepoch != ioepoch) {
438		lli->lli_ioepoch = ioepoch;
439		CDEBUG(D_INODE, "Epoch "LPU64" opened on "DFID"\n",
440		       ioepoch, PFID(&lli->lli_fid));
441	}
442}
443
444static int ll_och_fill(struct obd_export *md_exp, struct ll_inode_info *lli,
445		       struct lookup_intent *it, struct obd_client_handle *och)
446{
447	struct ptlrpc_request *req = it->d.lustre.it_data;
448	struct mdt_body *body;
449
450	LASSERT(och);
451
452	body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
453	LASSERT(body != NULL);		      /* reply already checked out */
454
455	memcpy(&och->och_fh, &body->handle, sizeof(body->handle));
456	och->och_magic = OBD_CLIENT_HANDLE_MAGIC;
457	och->och_fid = lli->lli_fid;
458	och->och_flags = it->it_flags;
459	ll_ioepoch_open(lli, body->ioepoch);
460
461	return md_set_open_replay_data(md_exp, och, req);
462}
463
/*
 * Complete the client side of an open: when @och is given, fill it
 * from the intent open reply and register it for replay; then attach
 * @fd as the file's private data and initialize readahead state.
 * Returns 0 on success or a negative errno from ll_och_fill().
 */
int ll_local_open(struct file *file, struct lookup_intent *it,
		  struct ll_file_data *fd, struct obd_client_handle *och)
{
	struct inode *inode = file->f_dentry->d_inode;
	struct ll_inode_info *lli = ll_i2info(inode);
	ENTRY;

	LASSERT(!LUSTRE_FPRIVATE(file));

	LASSERT(fd != NULL);

	if (och) {
		struct ptlrpc_request *req = it->d.lustre.it_data;
		struct mdt_body *body;
		int rc;

		rc = ll_och_fill(ll_i2sbi(inode)->ll_md_exp, lli, it, och);
		if (rc)
			RETURN(rc);

		/* Debug-log when a write open carried a valid size. */
		body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
		if ((it->it_flags & FMODE_WRITE) &&
		    (body->valid & OBD_MD_FLSIZE))
			CDEBUG(D_INODE, "Epoch "LPU64" opened on "DFID"\n",
			       lli->lli_ioepoch, PFID(&lli->lli_fid));
	}

	LUSTRE_FPRIVATE(file) = fd;
	ll_readahead_init(inode, &fd->fd_ras);
	/* Remember the open mode for the matching close. */
	fd->fd_omode = it->it_flags;
	RETURN(0);
}
496
/* Open a file, and (for the very first open) create objects on the OSTs at
 * this time.  If opened with O_LOV_DELAY_CREATE, then we don't do the object
 * creation or open until ll_lov_setstripe() ioctl is called.
 *
 * If we already have the stripe MD locally then we don't request it in
 * md_open(), by passing a lmm_size = 0.
 *
 * It is up to the application to ensure no other processes open this file
 * in the O_LOV_DELAY_CREATE case, or the default striping pattern will be
 * used.  We might be able to avoid races of that sort by getting lli_open_sem
 * before returning in the O_LOV_DELAY_CREATE case and dropping it here
 * or in ll_file_release(), but I'm not sure that is desirable/necessary.
 */
int ll_file_open(struct inode *inode, struct file *file)
{
	struct ll_inode_info *lli = ll_i2info(inode);
	struct lookup_intent *it, oit = { .it_op = IT_OPEN,
					  .it_flags = file->f_flags };
	struct obd_client_handle **och_p = NULL;
	__u64 *och_usecount = NULL;
	struct ll_file_data *fd;
	int rc = 0, opendir_set = 0;
	ENTRY;

	CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), flags %o\n", inode->i_ino,
	       inode->i_generation, inode, file->f_flags);

	it = file->private_data; /* XXX: compat macro */
	file->private_data = NULL; /* prevent ll_local_open assertion */

	fd = ll_file_data_get();
	if (fd == NULL)
		GOTO(out_openerr, rc = -ENOMEM);

	fd->fd_file = file;
	if (S_ISDIR(inode->i_mode)) {
		/* Claim the statahead key if nobody holds it yet; the
		 * holder is the process that must stop the statahead
		 * thread on release. */
		spin_lock(&lli->lli_sa_lock);
		if (lli->lli_opendir_key == NULL && lli->lli_sai == NULL &&
		    lli->lli_opendir_pid == 0) {
			lli->lli_opendir_key = fd;
			lli->lli_opendir_pid = current_pid();
			opendir_set = 1;
		}
		spin_unlock(&lli->lli_sa_lock);
	}

	/* The root dentry gets no MDS open; just attach the fd. */
	if (inode->i_sb->s_root == file->f_dentry) {
		LUSTRE_FPRIVATE(file) = fd;
		RETURN(0);
	}

	if (!it || !it->d.lustre.it_disposition) {
		/* Convert f_flags into access mode. We cannot use file->f_mode,
		 * because everything but O_ACCMODE mask was stripped from
		 * there */
		if ((oit.it_flags + 1) & O_ACCMODE)
			oit.it_flags++;
		if (file->f_flags & O_TRUNC)
			oit.it_flags |= FMODE_WRITE;

		/* kernel only call f_op->open in dentry_open.  filp_open calls
		 * dentry_open after call to open_namei that checks permissions.
		 * Only nfsd_open call dentry_open directly without checking
		 * permissions and because of that this code below is safe. */
		if (oit.it_flags & (FMODE_WRITE | FMODE_READ))
			oit.it_flags |= MDS_OPEN_OWNEROVERRIDE;

		/* We do not want O_EXCL here, presumably we opened the file
		 * already? XXX - NFS implications? */
		oit.it_flags &= ~O_EXCL;

		/* bug20584, if "it_flags" contains O_CREAT, the file will be
		 * created if necessary, then "IT_CREAT" should be set to keep
		 * consistent with it */
		if (oit.it_flags & O_CREAT)
			oit.it_op |= IT_CREAT;

		it = &oit;
	}

restart:
	/* Let's see if we have file open on MDS already. */
	if (it->it_flags & FMODE_WRITE) {
		och_p = &lli->lli_mds_write_och;
		och_usecount = &lli->lli_open_fd_write_count;
	} else if (it->it_flags & FMODE_EXEC) {
		och_p = &lli->lli_mds_exec_och;
		och_usecount = &lli->lli_open_fd_exec_count;
	 } else {
		och_p = &lli->lli_mds_read_och;
		och_usecount = &lli->lli_open_fd_read_count;
	}

	mutex_lock(&lli->lli_och_mutex);
	if (*och_p) { /* Open handle is present */
		if (it_disposition(it, DISP_OPEN_OPEN)) {
			/* Well, there's extra open request that we do not need,
			   let's close it somehow. This will decref request. */
			rc = it_open_error(DISP_OPEN_OPEN, it);
			if (rc) {
				mutex_unlock(&lli->lli_och_mutex);
				GOTO(out_openerr, rc);
			}

			ll_release_openhandle(file->f_dentry, it);
		}
		(*och_usecount)++;

		rc = ll_local_open(file, it, fd, NULL);
		if (rc) {
			(*och_usecount)--;
			mutex_unlock(&lli->lli_och_mutex);
			GOTO(out_openerr, rc);
		}
	} else {
		LASSERT(*och_usecount == 0);
		if (!it->d.lustre.it_disposition) {
			/* We cannot just request lock handle now, new ELC code
			   means that one of other OPEN locks for this file
			   could be cancelled, and since blocking ast handler
			   would attempt to grab och_mutex as well, that would
			   result in a deadlock */
			mutex_unlock(&lli->lli_och_mutex);
			it->it_create_mode |= M_CHECK_STALE;
			rc = ll_intent_file_open(file, NULL, 0, it);
			it->it_create_mode &= ~M_CHECK_STALE;
			if (rc)
				GOTO(out_openerr, rc);

			/* The intent filled a disposition; retry the open. */
			goto restart;
		}
		OBD_ALLOC(*och_p, sizeof (struct obd_client_handle));
		if (!*och_p)
			GOTO(out_och_free, rc = -ENOMEM);

		(*och_usecount)++;

		/* md_intent_lock() didn't get a request ref if there was an
		 * open error, so don't do cleanup on the request here
		 * (bug 3430) */
		/* XXX (green): Should not we bail out on any error here, not
		 * just open error? */
		rc = it_open_error(DISP_OPEN_OPEN, it);
		if (rc)
			GOTO(out_och_free, rc);

		LASSERT(it_disposition(it, DISP_ENQ_OPEN_REF));

		rc = ll_local_open(file, it, fd, *och_p);
		if (rc)
			GOTO(out_och_free, rc);
	}
	mutex_unlock(&lli->lli_och_mutex);
	/* Ownership of @fd was passed to ll_local_open() above. */
	fd = NULL;

	/* Must do this outside lli_och_mutex lock to prevent deadlock where
	   different kind of OPEN lock for this same inode gets cancelled
	   by ldlm_cancel_lru */
	if (!S_ISREG(inode->i_mode))
		GOTO(out_och_free, rc);

	ll_capa_open(inode);

	if (!lli->lli_has_smd) {
		if (file->f_flags & O_LOV_DELAY_CREATE ||
		    !(file->f_mode & FMODE_WRITE)) {
			CDEBUG(D_INODE, "object creation was delayed\n");
			GOTO(out_och_free, rc);
		}
	}
	file->f_flags &= ~O_LOV_DELAY_CREATE;
	GOTO(out_och_free, rc);

out_och_free:
	if (rc) {
		if (och_p && *och_p) {
			OBD_FREE(*och_p, sizeof (struct obd_client_handle));
			*och_p = NULL; /* OBD_FREE writes some magic there */
			(*och_usecount)--;
		}
		mutex_unlock(&lli->lli_och_mutex);

out_openerr:
		if (opendir_set != 0)
			ll_stop_statahead(inode, lli->lli_opendir_key);
		if (fd != NULL)
			ll_file_data_put(fd);
	} else {
		ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_OPEN, 1);
	}

	if (it && it_disposition(it, DISP_ENQ_OPEN_REF)) {
		ptlrpc_req_finished(it->d.lustre.it_data);
		it_clear_disposition(it, DISP_ENQ_OPEN_REF);
	}

	return rc;
}
695
/* Fills the obdo with the attributes for the lsm.
 *
 * Sends an async getattr for @lsm to the OSTs (under a server-side
 * lock when @sync is set) and waits for completion.  On success only
 * the OST-authoritative bits (blocks, blksize, times, size, data
 * version) are left set in o_valid.  Returns 0 or a negative errno.
 */
static int ll_lsm_getattr(struct lov_stripe_md *lsm, struct obd_export *exp,
			  struct obd_capa *capa, struct obdo *obdo,
			  __u64 ioepoch, int sync)
{
	struct ptlrpc_request_set *set;
	struct obd_info	    oinfo = { { { 0 } } };
	int			rc;

	ENTRY;

	LASSERT(lsm != NULL);

	oinfo.oi_md = lsm;
	oinfo.oi_oa = obdo;
	oinfo.oi_oa->o_oi = lsm->lsm_oi;
	oinfo.oi_oa->o_mode = S_IFREG;
	oinfo.oi_oa->o_ioepoch = ioepoch;
	oinfo.oi_oa->o_valid = OBD_MD_FLID | OBD_MD_FLTYPE |
			       OBD_MD_FLSIZE | OBD_MD_FLBLOCKS |
			       OBD_MD_FLBLKSZ | OBD_MD_FLATIME |
			       OBD_MD_FLMTIME | OBD_MD_FLCTIME |
			       OBD_MD_FLGROUP | OBD_MD_FLEPOCH |
			       OBD_MD_FLDATAVERSION;
	oinfo.oi_capa = capa;
	if (sync) {
		/* Ask the OSTs to perform the getattr under their lock. */
		oinfo.oi_oa->o_valid |= OBD_MD_FLFLAGS;
		oinfo.oi_oa->o_flags |= OBD_FL_SRVLOCK;
	}

	set = ptlrpc_prep_set();
	if (set == NULL) {
		CERROR("can't allocate ptlrpc set\n");
		rc = -ENOMEM;
	} else {
		rc = obd_getattr_async(exp, &oinfo, set);
		if (rc == 0)
			rc = ptlrpc_set_wait(set);
		ptlrpc_set_destroy(set);
	}
	if (rc == 0)
		/* Keep only the attributes the OSTs are authoritative for. */
		oinfo.oi_oa->o_valid &= (OBD_MD_FLBLOCKS | OBD_MD_FLBLKSZ |
					 OBD_MD_FLATIME | OBD_MD_FLMTIME |
					 OBD_MD_FLCTIME | OBD_MD_FLSIZE |
					 OBD_MD_FLDATAVERSION);
	RETURN(rc);
}
743
/**
  * Performs the getattr on the inode and updates its fields.
  * If @sync != 0, perform the getattr under the server-side lock.
  *
  * On success the inode is refreshed from the returned obdo.  Returns
  * 0 or a negative errno from ll_lsm_getattr().
  */
int ll_inode_getattr(struct inode *inode, struct obdo *obdo,
		     __u64 ioepoch, int sync)
{
	struct obd_capa      *capa = ll_mdscapa_get(inode);
	struct lov_stripe_md *lsm;
	int rc;
	ENTRY;

	lsm = ccc_inode_lsm_get(inode);
	rc = ll_lsm_getattr(lsm, ll_i2dtexp(inode),
			    capa, obdo, ioepoch, sync);
	capa_put(capa);
	if (rc == 0) {
		struct ost_id *oi = lsm ? &lsm->lsm_oi : &obdo->o_oi;

		obdo_refresh_inode(inode, obdo, obdo->o_valid);
		CDEBUG(D_INODE, "objid "DOSTID" size %llu, blocks %llu,"
		       " blksize %lu\n", POSTID(oi), i_size_read(inode),
		       (unsigned long long)inode->i_blocks,
		       (unsigned long)ll_inode_blksize(inode));
	}
	ccc_inode_lsm_put(inode, lsm);
	RETURN(rc);
}
772
/*
 * Merge timestamps cached from the MDS (lli_lvb) with attributes
 * obtained from the OSTs, keeping the newest of each, and update the
 * inode's size, blocks and times under the inode size lock.
 * Returns 0 or a negative errno from cl_object_attr_get().
 */
int ll_merge_lvb(const struct lu_env *env, struct inode *inode)
{
	struct ll_inode_info *lli = ll_i2info(inode);
	struct cl_object *obj = lli->lli_clob;
	struct cl_attr *attr = ccc_env_thread_attr(env);
	struct ost_lvb lvb;
	int rc = 0;

	ENTRY;

	ll_inode_size_lock(inode);
	/* merge timestamps the most recently obtained from mds with
	   timestamps obtained from osts */
	LTIME_S(inode->i_atime) = lli->lli_lvb.lvb_atime;
	LTIME_S(inode->i_mtime) = lli->lli_lvb.lvb_mtime;
	LTIME_S(inode->i_ctime) = lli->lli_lvb.lvb_ctime;
	inode_init_lvb(inode, &lvb);

	cl_object_attr_lock(obj);
	rc = cl_object_attr_get(env, obj, attr);
	cl_object_attr_unlock(obj);

	if (rc == 0) {
		/* Keep whichever timestamp is newer, MDS or OST. */
		if (lvb.lvb_atime < attr->cat_atime)
			lvb.lvb_atime = attr->cat_atime;
		if (lvb.lvb_ctime < attr->cat_ctime)
			lvb.lvb_ctime = attr->cat_ctime;
		if (lvb.lvb_mtime < attr->cat_mtime)
			lvb.lvb_mtime = attr->cat_mtime;

		CDEBUG(D_VFSTRACE, DFID" updating i_size "LPU64"\n",
				PFID(&lli->lli_fid), attr->cat_size);
		cl_isize_write_nolock(inode, attr->cat_size);

		inode->i_blocks = attr->cat_blocks;

		LTIME_S(inode->i_mtime) = lvb.lvb_mtime;
		LTIME_S(inode->i_atime) = lvb.lvb_atime;
		LTIME_S(inode->i_ctime) = lvb.lvb_ctime;
	}
	ll_inode_size_unlock(inode);

	RETURN(rc);
}
817
818int ll_glimpse_ioctl(struct ll_sb_info *sbi, struct lov_stripe_md *lsm,
819		     lstat_t *st)
820{
821	struct obdo obdo = { 0 };
822	int rc;
823
824	rc = ll_lsm_getattr(lsm, sbi->ll_dt_exp, NULL, &obdo, 0, 0);
825	if (rc == 0) {
826		st->st_size   = obdo.o_size;
827		st->st_blocks = obdo.o_blocks;
828		st->st_mtime  = obdo.o_mtime;
829		st->st_atime  = obdo.o_atime;
830		st->st_ctime  = obdo.o_ctime;
831	}
832	return rc;
833}
834
835void ll_io_init(struct cl_io *io, const struct file *file, int write)
836{
837	struct inode *inode = file->f_dentry->d_inode;
838
839	io->u.ci_rw.crw_nonblock = file->f_flags & O_NONBLOCK;
840	if (write) {
841		io->u.ci_wr.wr_append = !!(file->f_flags & O_APPEND);
842		io->u.ci_wr.wr_sync = file->f_flags & O_SYNC ||
843				      file->f_flags & O_DIRECT ||
844				      IS_SYNC(inode);
845	}
846	io->ci_obj     = ll_i2info(inode)->lli_clob;
847	io->ci_lockreq = CILR_MAYBE;
848	if (ll_file_nolock(file)) {
849		io->ci_lockreq = CILR_NEVER;
850		io->ci_no_srvlock = 1;
851	} else if (file->f_flags & O_APPEND) {
852		io->ci_lockreq = CILR_MANDATORY;
853	}
854}
855
/*
 * Common read/write engine: set up a cl_io of type @iot for @count
 * bytes at *@ppos, take the per-subtype serialization (write mutex or
 * truncate semaphore), run the client IO loop and account the bytes
 * moved.  Restarts from scratch when the IO needs it and nothing was
 * transferred; otherwise returns the (possibly short) byte count or a
 * negative errno.
 */
static ssize_t
ll_file_io_generic(const struct lu_env *env, struct vvp_io_args *args,
		   struct file *file, enum cl_io_type iot,
		   loff_t *ppos, size_t count)
{
	struct ll_inode_info *lli = ll_i2info(file->f_dentry->d_inode);
	struct ll_file_data  *fd  = LUSTRE_FPRIVATE(file);
	struct cl_io	 *io;
	ssize_t	       result;
	ENTRY;

restart:
	io = ccc_env_thread_io(env);
	ll_io_init(io, file, iot == CIT_WRITE);

	if (cl_io_rw_init(env, io, iot, *ppos, count) == 0) {
		struct vvp_io *vio = vvp_env_io(env);
		struct ccc_io *cio = ccc_env_io(env);
		int write_mutex_locked = 0;

		cio->cui_fd  = LUSTRE_FPRIVATE(file);
		vio->cui_io_subtype = args->via_io_subtype;

		switch (vio->cui_io_subtype) {
		case IO_NORMAL:
			cio->cui_iov = args->u.normal.via_iov;
			cio->cui_nrsegs = args->u.normal.via_nrsegs;
			cio->cui_tot_nrsegs = cio->cui_nrsegs;
			cio->cui_iocb = args->u.normal.via_iocb;
			/* Writes are serialized by lli_write_mutex unless a
			 * group lock is held; reads only hold off truncate. */
			if ((iot == CIT_WRITE) &&
			    !(cio->cui_fd->fd_flags & LL_FILE_GROUP_LOCKED)) {
				if (mutex_lock_interruptible(&lli->
							       lli_write_mutex))
					GOTO(out, result = -ERESTARTSYS);
				write_mutex_locked = 1;
			} else if (iot == CIT_READ) {
				down_read(&lli->lli_trunc_sem);
			}
			break;
		case IO_SENDFILE:
			vio->u.sendfile.cui_actor = args->u.sendfile.via_actor;
			vio->u.sendfile.cui_target = args->u.sendfile.via_target;
			break;
		case IO_SPLICE:
			vio->u.splice.cui_pipe = args->u.splice.via_pipe;
			vio->u.splice.cui_flags = args->u.splice.via_flags;
			break;
		default:
			CERROR("Unknow IO type - %u\n", vio->cui_io_subtype);
			LBUG();
		}
		result = cl_io_loop(env, io);
		/* Release exactly what was taken above for IO_NORMAL. */
		if (write_mutex_locked)
			mutex_unlock(&lli->lli_write_mutex);
		else if (args->via_io_subtype == IO_NORMAL && iot == CIT_READ)
			up_read(&lli->lli_trunc_sem);
	} else {
		/* cl_io_rw_init() handled IO */
		result = io->ci_result;
	}

	if (io->ci_nob > 0) {
		/* Bytes were moved: report them and advance the position. */
		result = io->ci_nob;
		*ppos = io->u.ci_wr.wr.crw_pos;
	}
	GOTO(out, result);
out:
	cl_io_fini(env, io);
	/* If any bit been read/written (result != 0), we just return
	 * short read/write instead of restart io. */
	if (result == 0 && io->ci_need_restart) {
		CDEBUG(D_VFSTRACE, "Restart %s on %s from %lld, count:%zd\n",
		       iot == CIT_READ ? "read" : "write",
		       file->f_dentry->d_name.name, *ppos, count);
		LASSERTF(io->ci_nob == 0, "%zd", io->ci_nob);
		goto restart;
	}

	if (iot == CIT_READ) {
		if (result >= 0)
			ll_stats_ops_tally(ll_i2sbi(file->f_dentry->d_inode),
					   LPROC_LL_READ_BYTES, result);
	} else if (iot == CIT_WRITE) {
		if (result >= 0) {
			ll_stats_ops_tally(ll_i2sbi(file->f_dentry->d_inode),
					   LPROC_LL_WRITE_BYTES, result);
			/* NOTE(review): fd_write_failed presumably feeds an
			 * fsync/close error path — confirm its consumer. */
			fd->fd_write_failed = false;
		} else if (result != -ERESTARTSYS) {
			fd->fd_write_failed = true;
		}
	}

	return result;
}
950
951
/*
 * XXX: exact copy from kernel code (__generic_file_aio_write_nolock)
 *
 * Validate an iovec array: returns -EINVAL on a negative segment
 * length or cumulative length overflow, -EFAULT if the very first
 * segment is not readable.  Otherwise *nr_segs and the running total
 * are trimmed at the first inaccessible segment, *count receives the
 * usable byte total, and 0 is returned.
 */
static int ll_file_get_iov_count(const struct iovec *iov,
				 unsigned long *nr_segs, size_t *count)
{
	size_t cnt = 0;
	unsigned long seg;

	for (seg = 0; seg < *nr_segs; seg++) {
		const struct iovec *iv = &iov[seg];

		/*
		 * If any segment has a negative length, or the cumulative
		 * length ever wraps negative then return -EINVAL.
		 */
		cnt += iv->iov_len;
		if (unlikely((ssize_t)(cnt|iv->iov_len) < 0))
			return -EINVAL;
		if (access_ok(VERIFY_READ, iv->iov_base, iv->iov_len))
			continue;
		if (seg == 0)
			return -EFAULT;
		*nr_segs = seg;
		cnt -= iv->iov_len;   /* This segment is no good */
		break;
	}
	*count = cnt;
	return 0;
}
982
983static ssize_t ll_file_aio_read(struct kiocb *iocb, const struct iovec *iov,
984				unsigned long nr_segs, loff_t pos)
985{
986	struct lu_env      *env;
987	struct vvp_io_args *args;
988	size_t	      count;
989	ssize_t	     result;
990	int		 refcheck;
991	ENTRY;
992
993	result = ll_file_get_iov_count(iov, &nr_segs, &count);
994	if (result)
995		RETURN(result);
996
997	env = cl_env_get(&refcheck);
998	if (IS_ERR(env))
999		RETURN(PTR_ERR(env));
1000
1001	args = vvp_env_args(env, IO_NORMAL);
1002	args->u.normal.via_iov = (struct iovec *)iov;
1003	args->u.normal.via_nrsegs = nr_segs;
1004	args->u.normal.via_iocb = iocb;
1005
1006	result = ll_file_io_generic(env, args, iocb->ki_filp, CIT_READ,
1007				    &iocb->ki_pos, count);
1008	cl_env_put(env, &refcheck);
1009	RETURN(result);
1010}
1011
1012static ssize_t ll_file_read(struct file *file, char *buf, size_t count,
1013			    loff_t *ppos)
1014{
1015	struct lu_env *env;
1016	struct iovec  *local_iov;
1017	struct kiocb  *kiocb;
1018	ssize_t	result;
1019	int	    refcheck;
1020	ENTRY;
1021
1022	env = cl_env_get(&refcheck);
1023	if (IS_ERR(env))
1024		RETURN(PTR_ERR(env));
1025
1026	local_iov = &vvp_env_info(env)->vti_local_iov;
1027	kiocb = &vvp_env_info(env)->vti_kiocb;
1028	local_iov->iov_base = (void __user *)buf;
1029	local_iov->iov_len = count;
1030	init_sync_kiocb(kiocb, file);
1031	kiocb->ki_pos = *ppos;
1032	kiocb->ki_left = count;
1033
1034	result = ll_file_aio_read(kiocb, local_iov, 1, kiocb->ki_pos);
1035	*ppos = kiocb->ki_pos;
1036
1037	cl_env_put(env, &refcheck);
1038	RETURN(result);
1039}
1040
1041/*
1042 * Write to a file (through the page cache).
1043 */
1044static ssize_t ll_file_aio_write(struct kiocb *iocb, const struct iovec *iov,
1045				 unsigned long nr_segs, loff_t pos)
1046{
1047	struct lu_env      *env;
1048	struct vvp_io_args *args;
1049	size_t	      count;
1050	ssize_t	     result;
1051	int		 refcheck;
1052	ENTRY;
1053
1054	result = ll_file_get_iov_count(iov, &nr_segs, &count);
1055	if (result)
1056		RETURN(result);
1057
1058	env = cl_env_get(&refcheck);
1059	if (IS_ERR(env))
1060		RETURN(PTR_ERR(env));
1061
1062	args = vvp_env_args(env, IO_NORMAL);
1063	args->u.normal.via_iov = (struct iovec *)iov;
1064	args->u.normal.via_nrsegs = nr_segs;
1065	args->u.normal.via_iocb = iocb;
1066
1067	result = ll_file_io_generic(env, args, iocb->ki_filp, CIT_WRITE,
1068				  &iocb->ki_pos, count);
1069	cl_env_put(env, &refcheck);
1070	RETURN(result);
1071}
1072
1073static ssize_t ll_file_write(struct file *file, const char *buf, size_t count,
1074			     loff_t *ppos)
1075{
1076	struct lu_env *env;
1077	struct iovec  *local_iov;
1078	struct kiocb  *kiocb;
1079	ssize_t	result;
1080	int	    refcheck;
1081	ENTRY;
1082
1083	env = cl_env_get(&refcheck);
1084	if (IS_ERR(env))
1085		RETURN(PTR_ERR(env));
1086
1087	local_iov = &vvp_env_info(env)->vti_local_iov;
1088	kiocb = &vvp_env_info(env)->vti_kiocb;
1089	local_iov->iov_base = (void __user *)buf;
1090	local_iov->iov_len = count;
1091	init_sync_kiocb(kiocb, file);
1092	kiocb->ki_pos = *ppos;
1093	kiocb->ki_left = count;
1094
1095	result = ll_file_aio_write(kiocb, local_iov, 1, kiocb->ki_pos);
1096	*ppos = kiocb->ki_pos;
1097
1098	cl_env_put(env, &refcheck);
1099	RETURN(result);
1100}
1101
1102
1103
1104/*
1105 * Send file content (through pagecache) somewhere with helper
1106 */
1107static ssize_t ll_file_splice_read(struct file *in_file, loff_t *ppos,
1108				   struct pipe_inode_info *pipe, size_t count,
1109				   unsigned int flags)
1110{
1111	struct lu_env      *env;
1112	struct vvp_io_args *args;
1113	ssize_t	     result;
1114	int		 refcheck;
1115	ENTRY;
1116
1117	env = cl_env_get(&refcheck);
1118	if (IS_ERR(env))
1119		RETURN(PTR_ERR(env));
1120
1121	args = vvp_env_args(env, IO_SPLICE);
1122	args->u.splice.via_pipe = pipe;
1123	args->u.splice.via_flags = flags;
1124
1125	result = ll_file_io_generic(env, args, in_file, CIT_READ, ppos, count);
1126	cl_env_put(env, &refcheck);
1127	RETURN(result);
1128}
1129
/*
 * Ask the data export to recreate the objects backing this inode's
 * layout on OST \a ost_idx with object id \a oi.  Used by the
 * LL_IOC_RECREATE_OBJ/LL_IOC_RECREATE_FID ioctls below.
 *
 * Returns 0 on success, -ENOMEM on allocation failure, -ENOENT if the
 * inode has no layout objects, or the obd_create() error.
 */
static int ll_lov_recreate(struct inode *inode, struct ost_id *oi,
			   obd_count ost_idx)
{
	struct obd_export *exp = ll_i2dtexp(inode);
	struct obd_trans_info oti = { 0 };
	struct obdo *oa = NULL;
	int lsm_size;
	int rc = 0;
	struct lov_stripe_md *lsm = NULL, *lsm2;
	ENTRY;

	OBDO_ALLOC(oa);
	if (oa == NULL)
		RETURN(-ENOMEM);

	lsm = ccc_inode_lsm_get(inode);
	if (!lsm_has_objects(lsm))
		GOTO(out, rc = -ENOENT);

	/* Size of the layout including its per-stripe lov_oinfo tail. */
	lsm_size = sizeof(*lsm) + (sizeof(struct lov_oinfo) *
		   (lsm->lsm_stripe_count));

	OBD_ALLOC_LARGE(lsm2, lsm_size);
	if (lsm2 == NULL)
		GOTO(out, rc = -ENOMEM);

	/* o_nlink carries the target OST index for the recreate request. */
	oa->o_oi = *oi;
	oa->o_nlink = ost_idx;
	oa->o_flags |= OBD_FL_RECREATE_OBJS;
	oa->o_valid = OBD_MD_FLID | OBD_MD_FLFLAGS | OBD_MD_FLGROUP;
	obdo_from_inode(oa, inode, OBD_MD_FLTYPE | OBD_MD_FLATIME |
				   OBD_MD_FLMTIME | OBD_MD_FLCTIME);
	obdo_set_parent_fid(oa, &ll_i2info(inode)->lli_fid);
	/* Work on a scratch copy of the layout; the cached one is shared. */
	memcpy(lsm2, lsm, lsm_size);
	ll_inode_size_lock(inode);
	rc = obd_create(NULL, exp, oa, &lsm2, &oti);
	ll_inode_size_unlock(inode);

	OBD_FREE_LARGE(lsm2, lsm_size);
	GOTO(out, rc);
out:
	/* ccc_inode_lsm_put() below is assumed NULL-safe for the early
	 * -ENOENT path — NOTE(review): confirm against its definition. */
	ccc_inode_lsm_put(inode, lsm);
	OBDO_FREE(oa);
	return rc;
}
1175
/*
 * LL_IOC_RECREATE_OBJ handler: recreate an OST object identified by a
 * user-supplied struct ll_recreate_obj (object id + OST index).
 * Root-only (CFS_CAP_SYS_ADMIN).
 */
static int ll_lov_recreate_obj(struct inode *inode, unsigned long arg)
{
	struct ll_recreate_obj ucreat;
	struct ost_id		oi;
	ENTRY;

	if (!cfs_capable(CFS_CAP_SYS_ADMIN))
		RETURN(-EPERM);

	if (copy_from_user(&ucreat, (struct ll_recreate_obj *)arg,
			   sizeof(ucreat)))
		RETURN(-EFAULT);

	/* Build an MDT0-sequence ost_id from the user-supplied object id. */
	ostid_set_seq_mdt0(&oi);
	ostid_set_id(&oi, ucreat.lrc_id);
	RETURN(ll_lov_recreate(inode, &oi, ucreat.lrc_ost_idx));
}
1193
/*
 * LL_IOC_RECREATE_FID handler: recreate an OST object identified by a
 * user-supplied FID; the OST index is encoded in the FID sequence.
 * Root-only (CFS_CAP_SYS_ADMIN).
 */
static int ll_lov_recreate_fid(struct inode *inode, unsigned long arg)
{
	struct lu_fid	fid;
	struct ost_id	oi;
	obd_count	ost_idx;
	ENTRY;

	if (!cfs_capable(CFS_CAP_SYS_ADMIN))
		RETURN(-EPERM);

	if (copy_from_user(&fid, (struct lu_fid *)arg, sizeof(fid)))
		RETURN(-EFAULT);

	fid_to_ostid(&fid, &oi);
	/* OST index is carried in bits 16..31 of the FID sequence. */
	ost_idx = (fid_seq(&fid) >> 16) & 0xffff;
	RETURN(ll_lov_recreate(inode, &oi, ost_idx));
}
1211
/*
 * Set the striping EA for a file that does not yet have one, by doing an
 * intent-open carrying the requested layout (\a lum of \a lum_size bytes).
 *
 * Returns -EEXIST if the inode already has a layout, the intent-open
 * error, or 0 on success.  Note the label ordering below: the normal
 * path falls through "out"; "out_req_free" additionally drops the open
 * request before rejoining "out" via the trailing goto.
 */
int ll_lov_setstripe_ea_info(struct inode *inode, struct file *file,
			     int flags, struct lov_user_md *lum, int lum_size)
{
	struct lov_stripe_md *lsm = NULL;
	struct lookup_intent oit = {.it_op = IT_OPEN, .it_flags = flags};
	int rc = 0;
	ENTRY;

	/* Striping can only be set once; refuse if a layout exists. */
	lsm = ccc_inode_lsm_get(inode);
	if (lsm != NULL) {
		ccc_inode_lsm_put(inode, lsm);
		CDEBUG(D_IOCTL, "stripe already exists for ino %lu\n",
		       inode->i_ino);
		RETURN(-EEXIST);
	}

	ll_inode_size_lock(inode);
	rc = ll_intent_file_open(file, lum, lum_size, &oit);
	if (rc)
		GOTO(out, rc);
	rc = oit.d.lustre.it_status;
	if (rc < 0)
		GOTO(out_req_free, rc);

	/* The open was only needed to install the layout; close it again. */
	ll_release_openhandle(file->f_dentry, &oit);

 out:
	ll_inode_size_unlock(inode);
	ll_intent_release(&oit);
	/* lsm is NULL here (non-NULL returned early above). */
	ccc_inode_lsm_put(inode, lsm);
	RETURN(rc);
out_req_free:
	ptlrpc_req_finished((struct ptlrpc_request *) oit.d.lustre.it_data);
	goto out;
}
1247
/*
 * Fetch the LOV EA (striping information) of \a filename (a child of
 * directory \a inode) from the MDS.
 *
 * On success *lmmp/*lmm_size point into the reply buffer of *request;
 * ownership of *request transfers to the caller, who must release it
 * with ptlrpc_req_finished() (this also releases the lmm).  On a
 * big-endian host the EA is byte-swapped in place to host order.
 */
int ll_lov_getstripe_ea_info(struct inode *inode, const char *filename,
			     struct lov_mds_md **lmmp, int *lmm_size,
			     struct ptlrpc_request **request)
{
	struct ll_sb_info *sbi = ll_i2sbi(inode);
	struct mdt_body  *body;
	struct lov_mds_md *lmm = NULL;
	struct ptlrpc_request *req = NULL;
	struct md_op_data *op_data;
	int rc, lmmsize;

	rc = ll_get_max_mdsize(sbi, &lmmsize);
	if (rc)
		RETURN(rc);

	op_data = ll_prep_md_op_data(NULL, inode, NULL, filename,
				     strlen(filename), lmmsize,
				     LUSTRE_OPC_ANY, NULL);
	if (IS_ERR(op_data))
		RETURN(PTR_ERR(op_data));

	op_data->op_valid = OBD_MD_FLEASIZE | OBD_MD_FLDIREA;
	rc = md_getattr_name(sbi->ll_md_exp, op_data, &req);
	ll_finish_md_op_data(op_data);
	if (rc < 0) {
		CDEBUG(D_INFO, "md_getattr_name failed "
		       "on %s: rc %d\n", filename, rc);
		GOTO(out, rc);
	}

	body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
	LASSERT(body != NULL); /* checked by mdc_getattr_name */

	lmmsize = body->eadatasize;

	if (!(body->valid & (OBD_MD_FLEASIZE | OBD_MD_FLDIREA)) ||
			lmmsize == 0) {
		GOTO(out, rc = -ENODATA);
	}

	lmm = req_capsule_server_sized_get(&req->rq_pill, &RMF_MDT_MD, lmmsize);
	LASSERT(lmm != NULL);

	/* Only plain V1/V3 layouts are understood here. */
	if ((lmm->lmm_magic != cpu_to_le32(LOV_MAGIC_V1)) &&
	    (lmm->lmm_magic != cpu_to_le32(LOV_MAGIC_V3))) {
		GOTO(out, rc = -EPROTO);
	}

	/*
	 * This is coming from the MDS, so is probably in
	 * little endian.  We convert it to host endian before
	 * passing it to userspace.
	 */
	if (LOV_MAGIC != cpu_to_le32(LOV_MAGIC)) {
		int stripe_count;

		stripe_count = le16_to_cpu(lmm->lmm_stripe_count);
		/* Released (HSM-archived) files have no stripe objects. */
		if (le32_to_cpu(lmm->lmm_pattern) & LOV_PATTERN_F_RELEASED)
			stripe_count = 0;

		/* if function called for directory - we should
		 * avoid swab not existent lsm objects */
		if (lmm->lmm_magic == cpu_to_le32(LOV_MAGIC_V1)) {
			lustre_swab_lov_user_md_v1((struct lov_user_md_v1 *)lmm);
			if (S_ISREG(body->mode))
				lustre_swab_lov_user_md_objects(
				 ((struct lov_user_md_v1 *)lmm)->lmm_objects,
				 stripe_count);
		} else if (lmm->lmm_magic == cpu_to_le32(LOV_MAGIC_V3)) {
			lustre_swab_lov_user_md_v3((struct lov_user_md_v3 *)lmm);
			if (S_ISREG(body->mode))
				lustre_swab_lov_user_md_objects(
				 ((struct lov_user_md_v3 *)lmm)->lmm_objects,
				 stripe_count);
		}
	}

out:
	/* Outputs are set even on error: lmm may be NULL, req is passed
	 * back so the caller can (and must) release it. */
	*lmmp = lmm;
	*lmm_size = lmmsize;
	*request = req;
	return rc;
}
1331
1332static int ll_lov_setea(struct inode *inode, struct file *file,
1333			    unsigned long arg)
1334{
1335	int			 flags = MDS_OPEN_HAS_OBJS | FMODE_WRITE;
1336	struct lov_user_md	*lump;
1337	int			 lum_size = sizeof(struct lov_user_md) +
1338					    sizeof(struct lov_user_ost_data);
1339	int			 rc;
1340	ENTRY;
1341
1342	if (!cfs_capable(CFS_CAP_SYS_ADMIN))
1343		RETURN(-EPERM);
1344
1345	OBD_ALLOC_LARGE(lump, lum_size);
1346	if (lump == NULL)
1347		RETURN(-ENOMEM);
1348
1349	if (copy_from_user(lump, (struct lov_user_md  *)arg, lum_size)) {
1350		OBD_FREE_LARGE(lump, lum_size);
1351		RETURN(-EFAULT);
1352	}
1353
1354	rc = ll_lov_setstripe_ea_info(inode, file, flags, lump, lum_size);
1355
1356	OBD_FREE_LARGE(lump, lum_size);
1357	RETURN(rc);
1358}
1359
1360static int ll_lov_setstripe(struct inode *inode, struct file *file,
1361			    unsigned long arg)
1362{
1363	struct lov_user_md_v3	 lumv3;
1364	struct lov_user_md_v1	*lumv1 = (struct lov_user_md_v1 *)&lumv3;
1365	struct lov_user_md_v1	*lumv1p = (struct lov_user_md_v1 *)arg;
1366	struct lov_user_md_v3	*lumv3p = (struct lov_user_md_v3 *)arg;
1367	int			 lum_size, rc;
1368	int			 flags = FMODE_WRITE;
1369	ENTRY;
1370
1371	/* first try with v1 which is smaller than v3 */
1372	lum_size = sizeof(struct lov_user_md_v1);
1373	if (copy_from_user(lumv1, lumv1p, lum_size))
1374		RETURN(-EFAULT);
1375
1376	if (lumv1->lmm_magic == LOV_USER_MAGIC_V3) {
1377		lum_size = sizeof(struct lov_user_md_v3);
1378		if (copy_from_user(&lumv3, lumv3p, lum_size))
1379			RETURN(-EFAULT);
1380	}
1381
1382	rc = ll_lov_setstripe_ea_info(inode, file, flags, lumv1, lum_size);
1383	if (rc == 0) {
1384		struct lov_stripe_md *lsm;
1385		__u32 gen;
1386
1387		put_user(0, &lumv1p->lmm_stripe_count);
1388
1389		ll_layout_refresh(inode, &gen);
1390		lsm = ccc_inode_lsm_get(inode);
1391		rc = obd_iocontrol(LL_IOC_LOV_GETSTRIPE, ll_i2dtexp(inode),
1392				   0, lsm, (void *)arg);
1393		ccc_inode_lsm_put(inode, lsm);
1394	}
1395	RETURN(rc);
1396}
1397
1398static int ll_lov_getstripe(struct inode *inode, unsigned long arg)
1399{
1400	struct lov_stripe_md *lsm;
1401	int rc = -ENODATA;
1402	ENTRY;
1403
1404	lsm = ccc_inode_lsm_get(inode);
1405	if (lsm != NULL)
1406		rc = obd_iocontrol(LL_IOC_LOV_GETSTRIPE, ll_i2dtexp(inode), 0,
1407				   lsm, (void *)arg);
1408	ccc_inode_lsm_put(inode, lsm);
1409	RETURN(rc);
1410}
1411
/*
 * LL_IOC_GROUP_LOCK handler: take a cluster-wide group lock with group
 * id \a arg on behalf of this file descriptor.  Only one group lock may
 * be held per fd; a second request returns -EINVAL.  The lock itself is
 * acquired outside lli_lock, so the flag is re-checked afterwards to
 * resolve races with a concurrent request on the same fd.
 */
int ll_get_grouplock(struct inode *inode, struct file *file, unsigned long arg)
{
	struct ll_inode_info   *lli = ll_i2info(inode);
	struct ll_file_data    *fd = LUSTRE_FPRIVATE(file);
	struct ccc_grouplock    grouplock;
	int		     rc;
	ENTRY;

	if (ll_file_nolock(file))
		RETURN(-EOPNOTSUPP);

	spin_lock(&lli->lli_lock);
	if (fd->fd_flags & LL_FILE_GROUP_LOCKED) {
		CWARN("group lock already existed with gid %lu\n",
		      fd->fd_grouplock.cg_gid);
		spin_unlock(&lli->lli_lock);
		RETURN(-EINVAL);
	}
	LASSERT(fd->fd_grouplock.cg_lock == NULL);
	spin_unlock(&lli->lli_lock);

	/* cl_get_grouplock() may block (unless O_NONBLOCK), so it must be
	 * called without lli_lock held. */
	rc = cl_get_grouplock(cl_i2info(inode)->lli_clob,
			      arg, (file->f_flags & O_NONBLOCK), &grouplock);
	if (rc)
		RETURN(rc);

	/* Re-check under the lock: another thread may have raced us. */
	spin_lock(&lli->lli_lock);
	if (fd->fd_flags & LL_FILE_GROUP_LOCKED) {
		spin_unlock(&lli->lli_lock);
		CERROR("another thread just won the race\n");
		cl_put_grouplock(&grouplock);
		RETURN(-EINVAL);
	}

	fd->fd_flags |= LL_FILE_GROUP_LOCKED;
	fd->fd_grouplock = grouplock;
	spin_unlock(&lli->lli_lock);

	CDEBUG(D_INFO, "group lock %lu obtained\n", arg);
	RETURN(0);
}
1453
/*
 * LL_IOC_GROUP_UNLOCK handler: release the group lock held by this fd.
 * Fails with -EINVAL if no group lock is held or if \a arg does not
 * match the gid it was taken with.  The lock is released outside
 * lli_lock, after the fd state has been cleared.
 */
int ll_put_grouplock(struct inode *inode, struct file *file, unsigned long arg)
{
	struct ll_inode_info   *lli = ll_i2info(inode);
	struct ll_file_data    *fd = LUSTRE_FPRIVATE(file);
	struct ccc_grouplock    grouplock;
	ENTRY;

	spin_lock(&lli->lli_lock);
	if (!(fd->fd_flags & LL_FILE_GROUP_LOCKED)) {
		spin_unlock(&lli->lli_lock);
		CWARN("no group lock held\n");
		RETURN(-EINVAL);
	}
	LASSERT(fd->fd_grouplock.cg_lock != NULL);

	if (fd->fd_grouplock.cg_gid != arg) {
		CWARN("group lock %lu doesn't match current id %lu\n",
		       arg, fd->fd_grouplock.cg_gid);
		spin_unlock(&lli->lli_lock);
		RETURN(-EINVAL);
	}

	/* Detach the lock from the fd under lli_lock, then drop it
	 * outside the spinlock. */
	grouplock = fd->fd_grouplock;
	memset(&fd->fd_grouplock, 0, sizeof(fd->fd_grouplock));
	fd->fd_flags &= ~LL_FILE_GROUP_LOCKED;
	spin_unlock(&lli->lli_lock);

	cl_put_grouplock(&grouplock);
	CDEBUG(D_INFO, "group lock %lu released\n", arg);
	RETURN(0);
}
1485
1486/**
1487 * Close inode open handle
1488 *
1489 * \param dentry [in]     dentry which contains the inode
1490 * \param it     [in,out] intent which contains open info and result
1491 *
1492 * \retval 0     success
1493 * \retval <0    failure
1494 */
1495int ll_release_openhandle(struct dentry *dentry, struct lookup_intent *it)
1496{
1497	struct inode *inode = dentry->d_inode;
1498	struct obd_client_handle *och;
1499	int rc;
1500	ENTRY;
1501
1502	LASSERT(inode);
1503
1504	/* Root ? Do nothing. */
1505	if (dentry->d_inode->i_sb->s_root == dentry)
1506		RETURN(0);
1507
1508	/* No open handle to close? Move away */
1509	if (!it_disposition(it, DISP_OPEN_OPEN))
1510		RETURN(0);
1511
1512	LASSERT(it_open_error(DISP_OPEN_OPEN, it) == 0);
1513
1514	OBD_ALLOC(och, sizeof(*och));
1515	if (!och)
1516		GOTO(out, rc = -ENOMEM);
1517
1518	ll_och_fill(ll_i2sbi(inode)->ll_md_exp,
1519		    ll_i2info(inode), it, och);
1520
1521	rc = ll_close_inode_openhandle(ll_i2sbi(inode)->ll_md_exp,
1522				       inode, och);
1523 out:
1524	/* this one is in place of ll_file_open */
1525	if (it_disposition(it, DISP_ENQ_OPEN_REF)) {
1526		ptlrpc_req_finished(it->d.lustre.it_data);
1527		it_clear_disposition(it, DISP_ENQ_OPEN_REF);
1528	}
1529	RETURN(rc);
1530}
1531
1532/**
1533 * Get size for inode for which FIEMAP mapping is requested.
1534 * Make the FIEMAP get_info call and returns the result.
1535 */
1536int ll_do_fiemap(struct inode *inode, struct ll_user_fiemap *fiemap,
1537	      int num_bytes)
1538{
1539	struct obd_export *exp = ll_i2dtexp(inode);
1540	struct lov_stripe_md *lsm = NULL;
1541	struct ll_fiemap_info_key fm_key = { .name = KEY_FIEMAP, };
1542	int vallen = num_bytes;
1543	int rc;
1544	ENTRY;
1545
1546	/* Checks for fiemap flags */
1547	if (fiemap->fm_flags & ~LUSTRE_FIEMAP_FLAGS_COMPAT) {
1548		fiemap->fm_flags &= ~LUSTRE_FIEMAP_FLAGS_COMPAT;
1549		return -EBADR;
1550	}
1551
1552	/* Check for FIEMAP_FLAG_SYNC */
1553	if (fiemap->fm_flags & FIEMAP_FLAG_SYNC) {
1554		rc = filemap_fdatawrite(inode->i_mapping);
1555		if (rc)
1556			return rc;
1557	}
1558
1559	lsm = ccc_inode_lsm_get(inode);
1560	if (lsm == NULL)
1561		return -ENOENT;
1562
1563	/* If the stripe_count > 1 and the application does not understand
1564	 * DEVICE_ORDER flag, then it cannot interpret the extents correctly.
1565	 */
1566	if (lsm->lsm_stripe_count > 1 &&
1567	    !(fiemap->fm_flags & FIEMAP_FLAG_DEVICE_ORDER))
1568		GOTO(out, rc = -EOPNOTSUPP);
1569
1570	fm_key.oa.o_oi = lsm->lsm_oi;
1571	fm_key.oa.o_valid = OBD_MD_FLID | OBD_MD_FLGROUP;
1572
1573	obdo_from_inode(&fm_key.oa, inode, OBD_MD_FLSIZE);
1574	obdo_set_parent_fid(&fm_key.oa, &ll_i2info(inode)->lli_fid);
1575	/* If filesize is 0, then there would be no objects for mapping */
1576	if (fm_key.oa.o_size == 0) {
1577		fiemap->fm_mapped_extents = 0;
1578		GOTO(out, rc = 0);
1579	}
1580
1581	memcpy(&fm_key.fiemap, fiemap, sizeof(*fiemap));
1582
1583	rc = obd_get_info(NULL, exp, sizeof(fm_key), &fm_key, &vallen,
1584			  fiemap, lsm);
1585	if (rc)
1586		CERROR("obd_get_info failed: rc = %d\n", rc);
1587
1588out:
1589	ccc_inode_lsm_put(inode, lsm);
1590	RETURN(rc);
1591}
1592
1593int ll_fid2path(struct inode *inode, void *arg)
1594{
1595	struct obd_export	*exp = ll_i2mdexp(inode);
1596	struct getinfo_fid2path	*gfout, *gfin;
1597	int			 outsize, rc;
1598	ENTRY;
1599
1600	if (!cfs_capable(CFS_CAP_DAC_READ_SEARCH) &&
1601	    !(ll_i2sbi(inode)->ll_flags & LL_SBI_USER_FID2PATH))
1602		RETURN(-EPERM);
1603
1604	/* Need to get the buflen */
1605	OBD_ALLOC_PTR(gfin);
1606	if (gfin == NULL)
1607		RETURN(-ENOMEM);
1608	if (copy_from_user(gfin, arg, sizeof(*gfin))) {
1609		OBD_FREE_PTR(gfin);
1610		RETURN(-EFAULT);
1611	}
1612
1613	outsize = sizeof(*gfout) + gfin->gf_pathlen;
1614	OBD_ALLOC(gfout, outsize);
1615	if (gfout == NULL) {
1616		OBD_FREE_PTR(gfin);
1617		RETURN(-ENOMEM);
1618	}
1619	memcpy(gfout, gfin, sizeof(*gfout));
1620	OBD_FREE_PTR(gfin);
1621
1622	/* Call mdc_iocontrol */
1623	rc = obd_iocontrol(OBD_IOC_FID2PATH, exp, outsize, gfout, NULL);
1624	if (rc)
1625		GOTO(gf_free, rc);
1626
1627	if (copy_to_user(arg, gfout, outsize))
1628		rc = -EFAULT;
1629
1630gf_free:
1631	OBD_FREE(gfout, outsize);
1632	RETURN(rc);
1633}
1634
1635static int ll_ioctl_fiemap(struct inode *inode, unsigned long arg)
1636{
1637	struct ll_user_fiemap *fiemap_s;
1638	size_t num_bytes, ret_bytes;
1639	unsigned int extent_count;
1640	int rc = 0;
1641
1642	/* Get the extent count so we can calculate the size of
1643	 * required fiemap buffer */
1644	if (get_user(extent_count,
1645	    &((struct ll_user_fiemap __user *)arg)->fm_extent_count))
1646		RETURN(-EFAULT);
1647	num_bytes = sizeof(*fiemap_s) + (extent_count *
1648					 sizeof(struct ll_fiemap_extent));
1649
1650	OBD_ALLOC_LARGE(fiemap_s, num_bytes);
1651	if (fiemap_s == NULL)
1652		RETURN(-ENOMEM);
1653
1654	/* get the fiemap value */
1655	if (copy_from_user(fiemap_s, (struct ll_user_fiemap __user *)arg,
1656			   sizeof(*fiemap_s)))
1657		GOTO(error, rc = -EFAULT);
1658
1659	/* If fm_extent_count is non-zero, read the first extent since
1660	 * it is used to calculate end_offset and device from previous
1661	 * fiemap call. */
1662	if (extent_count) {
1663		if (copy_from_user(&fiemap_s->fm_extents[0],
1664		    (char __user *)arg + sizeof(*fiemap_s),
1665		    sizeof(struct ll_fiemap_extent)))
1666			GOTO(error, rc = -EFAULT);
1667	}
1668
1669	rc = ll_do_fiemap(inode, fiemap_s, num_bytes);
1670	if (rc)
1671		GOTO(error, rc);
1672
1673	ret_bytes = sizeof(struct ll_user_fiemap);
1674
1675	if (extent_count != 0)
1676		ret_bytes += (fiemap_s->fm_mapped_extents *
1677				 sizeof(struct ll_fiemap_extent));
1678
1679	if (copy_to_user((void *)arg, fiemap_s, ret_bytes))
1680		rc = -EFAULT;
1681
1682error:
1683	OBD_FREE_LARGE(fiemap_s, num_bytes);
1684	RETURN(rc);
1685}
1686
1687/*
1688 * Read the data_version for inode.
1689 *
1690 * This value is computed using stripe object version on OST.
1691 * Version is computed using server side locking.
1692 *
1693 * @param extent_lock  Take extent lock. Not needed if a process is already
1694 *		       holding the OST object group locks.
1695 */
1696int ll_data_version(struct inode *inode, __u64 *data_version,
1697		    int extent_lock)
1698{
1699	struct lov_stripe_md	*lsm = NULL;
1700	struct ll_sb_info	*sbi = ll_i2sbi(inode);
1701	struct obdo		*obdo = NULL;
1702	int			 rc;
1703	ENTRY;
1704
1705	/* If no stripe, we consider version is 0. */
1706	lsm = ccc_inode_lsm_get(inode);
1707	if (!lsm_has_objects(lsm)) {
1708		*data_version = 0;
1709		CDEBUG(D_INODE, "No object for inode\n");
1710		GOTO(out, rc = 0);
1711	}
1712
1713	OBD_ALLOC_PTR(obdo);
1714	if (obdo == NULL)
1715		GOTO(out, rc = -ENOMEM);
1716
1717	rc = ll_lsm_getattr(lsm, sbi->ll_dt_exp, NULL, obdo, 0, extent_lock);
1718	if (rc == 0) {
1719		if (!(obdo->o_valid & OBD_MD_FLDATAVERSION))
1720			rc = -EOPNOTSUPP;
1721		else
1722			*data_version = obdo->o_data_version;
1723	}
1724
1725	OBD_FREE_PTR(obdo);
1726	EXIT;
1727out:
1728	ccc_inode_lsm_put(inode, lsm);
1729	RETURN(rc);
1730}
1731
/* Scratch state for ll_swap_layouts(); kept off-stack (OBD_ALLOC'd)
 * because of its size. */
struct ll_swap_stack {
	struct iattr		 ia1, ia2;	/* saved [am]times to restore */
	__u64			 dv1, dv2;	/* expected data versions */
	struct inode		*inode1, *inode2; /* ordered by FID after swap */
	bool			 check_dv1, check_dv2; /* verify dv before swap */
};
1738
1739static int ll_swap_layouts(struct file *file1, struct file *file2,
1740			   struct lustre_swap_layouts *lsl)
1741{
1742	struct mdc_swap_layouts	 msl;
1743	struct md_op_data	*op_data;
1744	__u32			 gid;
1745	__u64			 dv;
1746	struct ll_swap_stack	*llss = NULL;
1747	int			 rc;
1748
1749	OBD_ALLOC_PTR(llss);
1750	if (llss == NULL)
1751		RETURN(-ENOMEM);
1752
1753	llss->inode1 = file1->f_dentry->d_inode;
1754	llss->inode2 = file2->f_dentry->d_inode;
1755
1756	if (!S_ISREG(llss->inode2->i_mode))
1757		GOTO(free, rc = -EINVAL);
1758
1759	if (ll_permission(llss->inode1, MAY_WRITE, NULL) ||
1760	    ll_permission(llss->inode2, MAY_WRITE, NULL))
1761		GOTO(free, rc = -EPERM);
1762
1763	if (llss->inode2->i_sb != llss->inode1->i_sb)
1764		GOTO(free, rc = -EXDEV);
1765
1766	/* we use 2 bool because it is easier to swap than 2 bits */
1767	if (lsl->sl_flags & SWAP_LAYOUTS_CHECK_DV1)
1768		llss->check_dv1 = true;
1769
1770	if (lsl->sl_flags & SWAP_LAYOUTS_CHECK_DV2)
1771		llss->check_dv2 = true;
1772
1773	/* we cannot use lsl->sl_dvX directly because we may swap them */
1774	llss->dv1 = lsl->sl_dv1;
1775	llss->dv2 = lsl->sl_dv2;
1776
1777	rc = lu_fid_cmp(ll_inode2fid(llss->inode1), ll_inode2fid(llss->inode2));
1778	if (rc == 0) /* same file, done! */
1779		GOTO(free, rc = 0);
1780
1781	if (rc < 0) { /* sequentialize it */
1782		swap(llss->inode1, llss->inode2);
1783		swap(file1, file2);
1784		swap(llss->dv1, llss->dv2);
1785		swap(llss->check_dv1, llss->check_dv2);
1786	}
1787
1788	gid = lsl->sl_gid;
1789	if (gid != 0) { /* application asks to flush dirty cache */
1790		rc = ll_get_grouplock(llss->inode1, file1, gid);
1791		if (rc < 0)
1792			GOTO(free, rc);
1793
1794		rc = ll_get_grouplock(llss->inode2, file2, gid);
1795		if (rc < 0) {
1796			ll_put_grouplock(llss->inode1, file1, gid);
1797			GOTO(free, rc);
1798		}
1799	}
1800
1801	/* to be able to restore mtime and atime after swap
1802	 * we need to first save them */
1803	if (lsl->sl_flags &
1804	    (SWAP_LAYOUTS_KEEP_MTIME | SWAP_LAYOUTS_KEEP_ATIME)) {
1805		llss->ia1.ia_mtime = llss->inode1->i_mtime;
1806		llss->ia1.ia_atime = llss->inode1->i_atime;
1807		llss->ia1.ia_valid = ATTR_MTIME | ATTR_ATIME;
1808		llss->ia2.ia_mtime = llss->inode2->i_mtime;
1809		llss->ia2.ia_atime = llss->inode2->i_atime;
1810		llss->ia2.ia_valid = ATTR_MTIME | ATTR_ATIME;
1811	}
1812
1813	/* ultimate check, before swaping the layouts we check if
1814	 * dataversion has changed (if requested) */
1815	if (llss->check_dv1) {
1816		rc = ll_data_version(llss->inode1, &dv, 0);
1817		if (rc)
1818			GOTO(putgl, rc);
1819		if (dv != llss->dv1)
1820			GOTO(putgl, rc = -EAGAIN);
1821	}
1822
1823	if (llss->check_dv2) {
1824		rc = ll_data_version(llss->inode2, &dv, 0);
1825		if (rc)
1826			GOTO(putgl, rc);
1827		if (dv != llss->dv2)
1828			GOTO(putgl, rc = -EAGAIN);
1829	}
1830
1831	/* struct md_op_data is used to send the swap args to the mdt
1832	 * only flags is missing, so we use struct mdc_swap_layouts
1833	 * through the md_op_data->op_data */
1834	/* flags from user space have to be converted before they are send to
1835	 * server, no flag is sent today, they are only used on the client */
1836	msl.msl_flags = 0;
1837	rc = -ENOMEM;
1838	op_data = ll_prep_md_op_data(NULL, llss->inode1, llss->inode2, NULL, 0,
1839				     0, LUSTRE_OPC_ANY, &msl);
1840	if (IS_ERR(op_data))
1841		GOTO(free, rc = PTR_ERR(op_data));
1842
1843	rc = obd_iocontrol(LL_IOC_LOV_SWAP_LAYOUTS, ll_i2mdexp(llss->inode1),
1844			   sizeof(*op_data), op_data, NULL);
1845	ll_finish_md_op_data(op_data);
1846
1847putgl:
1848	if (gid != 0) {
1849		ll_put_grouplock(llss->inode2, file2, gid);
1850		ll_put_grouplock(llss->inode1, file1, gid);
1851	}
1852
1853	/* rc can be set from obd_iocontrol() or from a GOTO(putgl, ...) */
1854	if (rc != 0)
1855		GOTO(free, rc);
1856
1857	/* clear useless flags */
1858	if (!(lsl->sl_flags & SWAP_LAYOUTS_KEEP_MTIME)) {
1859		llss->ia1.ia_valid &= ~ATTR_MTIME;
1860		llss->ia2.ia_valid &= ~ATTR_MTIME;
1861	}
1862
1863	if (!(lsl->sl_flags & SWAP_LAYOUTS_KEEP_ATIME)) {
1864		llss->ia1.ia_valid &= ~ATTR_ATIME;
1865		llss->ia2.ia_valid &= ~ATTR_ATIME;
1866	}
1867
1868	/* update time if requested */
1869	rc = 0;
1870	if (llss->ia2.ia_valid != 0) {
1871		mutex_lock(&llss->inode1->i_mutex);
1872		rc = ll_setattr(file1->f_dentry, &llss->ia2);
1873		mutex_unlock(&llss->inode1->i_mutex);
1874	}
1875
1876	if (llss->ia1.ia_valid != 0) {
1877		int rc1;
1878
1879		mutex_lock(&llss->inode2->i_mutex);
1880		rc1 = ll_setattr(file2->f_dentry, &llss->ia1);
1881		mutex_unlock(&llss->inode2->i_mutex);
1882		if (rc == 0)
1883			rc = rc1;
1884	}
1885
1886free:
1887	if (llss != NULL)
1888		OBD_FREE_PTR(llss);
1889
1890	RETURN(rc);
1891}
1892
/*
 * Unlocked-ioctl entry point for regular files: dispatch Lustre-specific
 * ioctls (striping, group locks, FID operations, HSM state, layout swap,
 * fiemap, ...).  Unrecognized commands are offered to registered ioctl
 * handlers and finally forwarded to the data export.
 */
long ll_file_ioctl(struct file *file, unsigned int cmd, unsigned long arg)
{
	struct inode		*inode = file->f_dentry->d_inode;
	struct ll_file_data	*fd = LUSTRE_FPRIVATE(file);
	int			 flags, rc;
	ENTRY;

	CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),cmd=%x\n", inode->i_ino,
	       inode->i_generation, inode, cmd);
	ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_IOCTL, 1);

	/* asm-ppc{,64} declares TCGETS, et. al. as type 't' not 'T' */
	if (_IOC_TYPE(cmd) == 'T' || _IOC_TYPE(cmd) == 't') /* tty ioctls */
		RETURN(-ENOTTY);

	switch(cmd) {
	case LL_IOC_GETFLAGS:
		/* Get the current value of the file flags */
		return put_user(fd->fd_flags, (int *)arg);
	case LL_IOC_SETFLAGS:
	case LL_IOC_CLRFLAGS:
		/* Set or clear specific file flags */
		/* XXX This probably needs checks to ensure the flags are
		 *     not abused, and to handle any flag side effects.
		 */
		if (get_user(flags, (int *) arg))
			RETURN(-EFAULT);

		if (cmd == LL_IOC_SETFLAGS) {
			if ((flags & LL_FILE_IGNORE_LOCK) &&
			    !(file->f_flags & O_DIRECT)) {
				CERROR("%s: unable to disable locking on "
				       "non-O_DIRECT file\n", current->comm);
				RETURN(-EINVAL);
			}

			fd->fd_flags |= flags;
		} else {
			fd->fd_flags &= ~flags;
		}
		RETURN(0);
	case LL_IOC_LOV_SETSTRIPE:
		RETURN(ll_lov_setstripe(inode, file, arg));
	case LL_IOC_LOV_SETEA:
		RETURN(ll_lov_setea(inode, file, arg));
	case LL_IOC_LOV_SWAP_LAYOUTS: {
		struct file *file2;
		struct lustre_swap_layouts lsl;

		if (copy_from_user(&lsl, (char *)arg,
				       sizeof(struct lustre_swap_layouts)))
			RETURN(-EFAULT);

		/* Both fds must be open for write to swap layouts. */
		if ((file->f_flags & O_ACCMODE) == 0) /* O_RDONLY */
			RETURN(-EPERM);

		file2 = fget(lsl.sl_fd);
		if (file2 == NULL)
			RETURN(-EBADF);

		rc = -EPERM;
		if ((file2->f_flags & O_ACCMODE) != 0) /* O_WRONLY or O_RDWR */
			rc = ll_swap_layouts(file, file2, &lsl);
		fput(file2);
		RETURN(rc);
	}
	case LL_IOC_LOV_GETSTRIPE:
		RETURN(ll_lov_getstripe(inode, arg));
	case LL_IOC_RECREATE_OBJ:
		RETURN(ll_lov_recreate_obj(inode, arg));
	case LL_IOC_RECREATE_FID:
		RETURN(ll_lov_recreate_fid(inode, arg));
	case FSFILT_IOC_FIEMAP:
		RETURN(ll_ioctl_fiemap(inode, arg));
	case FSFILT_IOC_GETFLAGS:
	case FSFILT_IOC_SETFLAGS:
		RETURN(ll_iocontrol(inode, file, cmd, arg));
	case FSFILT_IOC_GETVERSION_OLD:
	case FSFILT_IOC_GETVERSION:
		RETURN(put_user(inode->i_generation, (int *)arg));
	case LL_IOC_GROUP_LOCK:
		RETURN(ll_get_grouplock(inode, file, arg));
	case LL_IOC_GROUP_UNLOCK:
		RETURN(ll_put_grouplock(inode, file, arg));
	case IOC_OBD_STATFS:
		RETURN(ll_obd_statfs(inode, (void *)arg));

	/* We need to special case any other ioctls we want to handle,
	 * to send them to the MDS/OST as appropriate and to properly
	 * network encode the arg field.
	case FSFILT_IOC_SETVERSION_OLD:
	case FSFILT_IOC_SETVERSION:
	*/
	case LL_IOC_FLUSHCTX:
		RETURN(ll_flush_ctx(inode));
	case LL_IOC_PATH2FID: {
		if (copy_to_user((void *)arg, ll_inode2fid(inode),
				 sizeof(struct lu_fid)))
			RETURN(-EFAULT);

		RETURN(0);
	}
	case OBD_IOC_FID2PATH:
		RETURN(ll_fid2path(inode, (void *)arg));
	case LL_IOC_DATA_VERSION: {
		struct ioc_data_version	idv;
		int			rc;

		if (copy_from_user(&idv, (char *)arg, sizeof(idv)))
			RETURN(-EFAULT);

		rc = ll_data_version(inode, &idv.idv_version,
				!(idv.idv_flags & LL_DV_NOFLUSH));

		if (rc == 0 && copy_to_user((char *) arg, &idv, sizeof(idv)))
			RETURN(-EFAULT);

		RETURN(rc);
	}

	case LL_IOC_GET_MDTIDX: {
		int mdtidx;

		mdtidx = ll_get_mdt_idx(inode);
		if (mdtidx < 0)
			RETURN(mdtidx);

		if (put_user((int)mdtidx, (int*)arg))
			RETURN(-EFAULT);

		RETURN(0);
	}
	case OBD_IOC_GETDTNAME:
	case OBD_IOC_GETMDNAME:
		RETURN(ll_get_obd_name(inode, cmd, arg));
	case LL_IOC_HSM_STATE_GET: {
		struct md_op_data	*op_data;
		struct hsm_user_state	*hus;
		int			 rc;

		OBD_ALLOC_PTR(hus);
		if (hus == NULL)
			RETURN(-ENOMEM);

		/* hus is attached as op_data payload and filled by the MDT. */
		op_data = ll_prep_md_op_data(NULL, inode, NULL, NULL, 0, 0,
					     LUSTRE_OPC_ANY, hus);
		if (IS_ERR(op_data)) {
			OBD_FREE_PTR(hus);
			RETURN(PTR_ERR(op_data));
		}

		rc = obd_iocontrol(cmd, ll_i2mdexp(inode), sizeof(*op_data),
				   op_data, NULL);

		if (copy_to_user((void *)arg, hus, sizeof(*hus)))
			rc = -EFAULT;

		ll_finish_md_op_data(op_data);
		OBD_FREE_PTR(hus);
		RETURN(rc);
	}
	case LL_IOC_HSM_STATE_SET: {
		struct md_op_data	*op_data;
		struct hsm_state_set	*hss;
		int			 rc;

		OBD_ALLOC_PTR(hss);
		if (hss == NULL)
			RETURN(-ENOMEM);
		if (copy_from_user(hss, (char *)arg, sizeof(*hss))) {
			OBD_FREE_PTR(hss);
			RETURN(-EFAULT);
		}

		/* Non-root users are forbidden to set or clear flags which are
		 * NOT defined in HSM_USER_MASK. */
		if (((hss->hss_setmask | hss->hss_clearmask) & ~HSM_USER_MASK)
		    && !cfs_capable(CFS_CAP_SYS_ADMIN)) {
			OBD_FREE_PTR(hss);
			RETURN(-EPERM);
		}

		op_data = ll_prep_md_op_data(NULL, inode, NULL, NULL, 0, 0,
					     LUSTRE_OPC_ANY, hss);
		if (IS_ERR(op_data)) {
			OBD_FREE_PTR(hss);
			RETURN(PTR_ERR(op_data));
		}

		rc = obd_iocontrol(cmd, ll_i2mdexp(inode), sizeof(*op_data),
				   op_data, NULL);

		ll_finish_md_op_data(op_data);

		OBD_FREE_PTR(hss);
		RETURN(rc);
	}
	case LL_IOC_HSM_ACTION: {
		struct md_op_data		*op_data;
		struct hsm_current_action	*hca;
		int				 rc;

		OBD_ALLOC_PTR(hca);
		if (hca == NULL)
			RETURN(-ENOMEM);

		/* hca is attached as op_data payload and filled by the MDT. */
		op_data = ll_prep_md_op_data(NULL, inode, NULL, NULL, 0, 0,
					     LUSTRE_OPC_ANY, hca);
		if (IS_ERR(op_data)) {
			OBD_FREE_PTR(hca);
			RETURN(PTR_ERR(op_data));
		}

		rc = obd_iocontrol(cmd, ll_i2mdexp(inode), sizeof(*op_data),
				   op_data, NULL);

		if (copy_to_user((char *)arg, hca, sizeof(*hca)))
			rc = -EFAULT;

		ll_finish_md_op_data(op_data);
		OBD_FREE_PTR(hca);
		RETURN(rc);
	}
	default: {
		int err;

		/* Give dynamically-registered handlers a chance first. */
		if (LLIOC_STOP ==
		     ll_iocontrol_call(inode, file, cmd, arg, &err))
			RETURN(err);

		RETURN(obd_iocontrol(cmd, ll_i2dtexp(inode), 0, NULL,
				     (void *)arg));
	}
	}
}
2128
2129
2130loff_t ll_file_seek(struct file *file, loff_t offset, int origin)
2131{
2132	struct inode *inode = file->f_dentry->d_inode;
2133	loff_t retval, eof = 0;
2134
2135	ENTRY;
2136	retval = offset + ((origin == SEEK_END) ? i_size_read(inode) :
2137			   (origin == SEEK_CUR) ? file->f_pos : 0);
2138	CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), to=%llu=%#llx(%d)\n",
2139	       inode->i_ino, inode->i_generation, inode, retval, retval,
2140	       origin);
2141	ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_LLSEEK, 1);
2142
2143	if (origin == SEEK_END || origin == SEEK_HOLE || origin == SEEK_DATA) {
2144		retval = ll_glimpse_size(inode);
2145		if (retval != 0)
2146			RETURN(retval);
2147		eof = i_size_read(inode);
2148	}
2149
2150	retval = generic_file_llseek_size(file, offset, origin,
2151					  ll_file_maxbytes(inode), eof);
2152	RETURN(retval);
2153}
2154
2155int ll_flush(struct file *file, fl_owner_t id)
2156{
2157	struct inode *inode = file->f_dentry->d_inode;
2158	struct ll_inode_info *lli = ll_i2info(inode);
2159	struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
2160	int rc, err;
2161
2162	LASSERT(!S_ISDIR(inode->i_mode));
2163
2164	/* catch async errors that were recorded back when async writeback
2165	 * failed for pages in this mapping. */
2166	rc = lli->lli_async_rc;
2167	lli->lli_async_rc = 0;
2168	err = lov_read_and_clear_async_rc(lli->lli_clob);
2169	if (rc == 0)
2170		rc = err;
2171
2172	/* The application has been told write failure already.
2173	 * Do not report failure again. */
2174	if (fd->fd_write_failed)
2175		return 0;
2176	return rc ? -EIO : 0;
2177}
2178
2179/**
2180 * Called to make sure a portion of file has been written out.
2181 * if @local_only is not true, it will send OST_SYNC RPCs to ost.
2182 *
2183 * Return how many pages have been written.
2184 */
int cl_sync_file_range(struct inode *inode, loff_t start, loff_t end,
		       enum cl_fsync_mode mode, int ignore_layout)
{
	struct cl_env_nest nest;
	struct lu_env *env;
	struct cl_io *io;
	struct obd_capa *capa = NULL;
	struct cl_fsync_io *fio;
	int result;
	ENTRY;

	/* Only the four known fsync modes are accepted. */
	if (mode != CL_FSYNC_NONE && mode != CL_FSYNC_LOCAL &&
	    mode != CL_FSYNC_DISCARD && mode != CL_FSYNC_ALL)
		RETURN(-EINVAL);

	env = cl_env_nested_get(&nest);
	if (IS_ERR(env))
		RETURN(PTR_ERR(env));

	/* The OSS write capability travels with the sync request;
	 * released below with capa_put(). */
	capa = ll_osscapa_get(inode, CAPA_OPC_OSS_WRITE);

	io = ccc_env_thread_io(env);
	io->ci_obj = cl_i2info(inode)->lli_clob;
	io->ci_ignore_layout = ignore_layout;

	/* initialize parameters for sync */
	fio = &io->u.ci_fsync;
	fio->fi_capa = capa;
	fio->fi_start = start;
	fio->fi_end = end;
	fio->fi_fid = ll_inode2fid(inode);
	fio->fi_mode = mode;
	fio->fi_nr_written = 0;

	if (cl_io_init(env, io, CIT_FSYNC, io->ci_obj) == 0)
		result = cl_io_loop(env, io);
	else
		result = io->ci_result;
	/* On success the return value is the page count accumulated in
	 * fi_nr_written by the layers below, per the header comment. */
	if (result == 0)
		result = fio->fi_nr_written;
	cl_io_fini(env, io);
	cl_env_nested_put(&nest, env);

	capa_put(capa);

	RETURN(result);
}
2232
2233/*
2234 * When dentry is provided (the 'else' case), *file->f_dentry may be
2235 * null and dentry must be used directly rather than pulled from
2236 * *file->f_dentry as is done otherwise.
2237 */
2238
int ll_fsync(struct file *file, loff_t start, loff_t end, int datasync)
{
	struct dentry *dentry = file->f_dentry;
	struct inode *inode = dentry->d_inode;
	struct ll_inode_info *lli = ll_i2info(inode);
	struct ptlrpc_request *req;
	struct obd_capa *oc;
	int rc, err;
	ENTRY;

	CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p)\n", inode->i_ino,
	       inode->i_generation, inode);
	ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_FSYNC, 1);

	/* Flush dirty pages in [start, end] and wait for writeback before
	 * syncing metadata with the MDS below. */
	rc = filemap_write_and_wait_range(inode->i_mapping, start, end);
	mutex_lock(&inode->i_mutex);

	/* catch async errors that were recorded back when async writeback
	 * failed for pages in this mapping. */
	if (!S_ISDIR(inode->i_mode)) {
		err = lli->lli_async_rc;
		lli->lli_async_rc = 0;
		if (rc == 0)
			rc = err;
		err = lov_read_and_clear_async_rc(lli->lli_clob);
		if (rc == 0)
			rc = err;
	}

	/* Sync the metadata on the MDS; the capability accompanies the RPC. */
	oc = ll_mdscapa_get(inode);
	err = md_sync(ll_i2sbi(inode)->ll_md_exp, ll_inode2fid(inode), oc,
		      &req);
	capa_put(oc);
	if (!rc)
		rc = err;
	/* NOTE(review): req is consumed only on md_sync() success — assumes
	 * no request is returned when err != 0; verify md_sync() contract. */
	if (!err)
		ptlrpc_req_finished(req);

	if (datasync && S_ISREG(inode->i_mode)) {
		struct ll_file_data *fd = LUSTRE_FPRIVATE(file);

		/* Force all data to the OSTs and remember the outcome so
		 * ll_flush() does not report the same failure twice. */
		err = cl_sync_file_range(inode, 0, OBD_OBJECT_EOF,
				CL_FSYNC_ALL, 0);
		if (rc == 0 && err < 0)
			rc = err;
		if (rc < 0)
			fd->fd_write_failed = true;
		else
			fd->fd_write_failed = false;
	}

	mutex_unlock(&inode->i_mutex);
	RETURN(rc);
}
2293
/*
 * Handle BSD flock and POSIX record locks by enqueueing LDLM_FLOCK locks
 * on the MDS, then mirror the result into the local VFS lock tables so
 * the kernel's bookkeeping matches the cluster-wide state.
 */
int ll_file_flock(struct file *file, int cmd, struct file_lock *file_lock)
{
	struct inode *inode = file->f_dentry->d_inode;
	struct ll_sb_info *sbi = ll_i2sbi(inode);
	struct ldlm_enqueue_info einfo = {
		.ei_type	= LDLM_FLOCK,
		.ei_cb_cp	= ldlm_flock_completion_ast,
		.ei_cbdata	= file_lock,
	};
	struct md_op_data *op_data;
	struct lustre_handle lockh = {0};
	ldlm_policy_data_t flock = {{0}};
	int flags = 0;
	int rc;
	int rc2 = 0;
	ENTRY;

	CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu file_lock=%p\n",
	       inode->i_ino, file_lock);

	ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_FLOCK, 1);

	/* Translate the VFS lock description into an ldlm flock policy. */
	if (file_lock->fl_flags & FL_FLOCK) {
		LASSERT((cmd == F_SETLKW) || (cmd == F_SETLK));
		/* flocks are whole-file locks (start stays 0 from the
		 * zero-initializer above) */
		flock.l_flock.end = OFFSET_MAX;
		/* For flocks owner is determined by the local file descriptor*/
		flock.l_flock.owner = (unsigned long)file_lock->fl_file;
	} else if (file_lock->fl_flags & FL_POSIX) {
		flock.l_flock.owner = (unsigned long)file_lock->fl_owner;
		flock.l_flock.start = file_lock->fl_start;
		flock.l_flock.end = file_lock->fl_end;
	} else {
		RETURN(-EINVAL);
	}
	flock.l_flock.pid = file_lock->fl_pid;

	/* Somewhat ugly workaround for svc lockd.
	 * lockd installs custom fl_lmops->lm_compare_owner that checks
	 * for the fl_owner to be the same (which it always is on local node
	 * I guess between lockd processes) and then compares pid.
	 * As such we assign pid to the owner field to make it all work,
	 * conflict with normal locks is unlikely since pid space and
	 * pointer space for current->files are not intersecting */
	if (file_lock->fl_lmops && file_lock->fl_lmops->lm_compare_owner)
		flock.l_flock.owner = (unsigned long)file_lock->fl_pid;

	/* Map the fcntl lock type onto an ldlm lock mode. */
	switch (file_lock->fl_type) {
	case F_RDLCK:
		einfo.ei_mode = LCK_PR;
		break;
	case F_UNLCK:
		/* An unlock request may or may not have any relation to
		 * existing locks so we may not be able to pass a lock handle
		 * via a normal ldlm_lock_cancel() request. The request may even
		 * unlock a byte range in the middle of an existing lock. In
		 * order to process an unlock request we need all of the same
		 * information that is given with a normal read or write record
		 * lock request. To avoid creating another ldlm unlock (cancel)
		 * message we'll treat a LCK_NL flock request as an unlock. */
		einfo.ei_mode = LCK_NL;
		break;
	case F_WRLCK:
		einfo.ei_mode = LCK_PW;
		break;
	default:
		CDEBUG(D_INFO, "Unknown fcntl lock type: %d\n",
			file_lock->fl_type);
		RETURN (-ENOTSUPP);
	}

	/* Map the fcntl command onto enqueue flags: blocking wait,
	 * non-blocking attempt, or test-only probe. */
	switch (cmd) {
	case F_SETLKW:
#ifdef F_SETLKW64
	case F_SETLKW64:
#endif
		flags = 0;
		break;
	case F_SETLK:
#ifdef F_SETLK64
	case F_SETLK64:
#endif
		flags = LDLM_FL_BLOCK_NOWAIT;
		break;
	case F_GETLK:
#ifdef F_GETLK64
	case F_GETLK64:
#endif
		flags = LDLM_FL_TEST_LOCK;
		/* Save the old mode so that if the mode in the lock changes we
		 * can decrement the appropriate reader or writer refcount. */
		file_lock->fl_type = einfo.ei_mode;
		break;
	default:
		CERROR("unknown fcntl lock command: %d\n", cmd);
		RETURN (-EINVAL);
	}

	op_data = ll_prep_md_op_data(NULL, inode, NULL, NULL, 0, 0,
				     LUSTRE_OPC_ANY, NULL);
	if (IS_ERR(op_data))
		RETURN(PTR_ERR(op_data));

	CDEBUG(D_DLMTRACE, "inode=%lu, pid=%u, flags=%#x, mode=%u, "
	       "start="LPU64", end="LPU64"\n", inode->i_ino, flock.l_flock.pid,
	       flags, einfo.ei_mode, flock.l_flock.start, flock.l_flock.end);

	/* Enqueue the flock lock on the MDS. */
	rc = md_enqueue(sbi->ll_md_exp, &einfo, NULL,
			op_data, &lockh, &flock, 0, NULL /* req */, flags);

	/* Mirror a successful grant (or any unlock) into the kernel's
	 * local lock tables; test-only probes are not recorded. */
	if ((file_lock->fl_flags & FL_FLOCK) &&
	    (rc == 0 || file_lock->fl_type == F_UNLCK))
		rc2  = flock_lock_file_wait(file, file_lock);
	if ((file_lock->fl_flags & FL_POSIX) &&
	    (rc == 0 || file_lock->fl_type == F_UNLCK) &&
	    !(flags & LDLM_FL_TEST_LOCK))
		rc2  = posix_lock_file_wait(file, file_lock);

	/* Local bookkeeping failed after the cluster lock was granted:
	 * release the server-side lock again (LCK_NL acts as unlock). */
	if (rc2 && file_lock->fl_type != F_UNLCK) {
		einfo.ei_mode = LCK_NL;
		md_enqueue(sbi->ll_md_exp, &einfo, NULL,
			op_data, &lockh, &flock, 0, NULL /* req */, flags);
		rc = rc2;
	}

	ll_finish_md_op_data(op_data);

	RETURN(rc);
}
2423
/* flock/lock entry point for "-o noflock" mounts: userspace file locking
 * is disabled entirely, so report it as not implemented. */
int ll_file_noflock(struct file *file, int cmd, struct file_lock *file_lock)
{
	ENTRY;

	RETURN(-ENOSYS);
}
2430
2431/**
2432 * test if some locks matching bits and l_req_mode are acquired
2433 * - bits can be in different locks
2434 * - if found clear the common lock bits in *bits
2435 * - the bits not found, are kept in *bits
2436 * \param inode [IN]
2437 * \param bits [IN] searched lock bits [IN]
2438 * \param l_req_mode [IN] searched lock mode
2439 * \retval boolean, true iff all bits are found
2440 */
2441int ll_have_md_lock(struct inode *inode, __u64 *bits,  ldlm_mode_t l_req_mode)
2442{
2443	struct lustre_handle lockh;
2444	ldlm_policy_data_t policy;
2445	ldlm_mode_t mode = (l_req_mode == LCK_MINMODE) ?
2446				(LCK_CR|LCK_CW|LCK_PR|LCK_PW) : l_req_mode;
2447	struct lu_fid *fid;
2448	__u64 flags;
2449	int i;
2450	ENTRY;
2451
2452	if (!inode)
2453	       RETURN(0);
2454
2455	fid = &ll_i2info(inode)->lli_fid;
2456	CDEBUG(D_INFO, "trying to match res "DFID" mode %s\n", PFID(fid),
2457	       ldlm_lockname[mode]);
2458
2459	flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_CBPENDING | LDLM_FL_TEST_LOCK;
2460	for (i = 0; i <= MDS_INODELOCK_MAXSHIFT && *bits != 0; i++) {
2461		policy.l_inodebits.bits = *bits & (1 << i);
2462		if (policy.l_inodebits.bits == 0)
2463			continue;
2464
2465		if (md_lock_match(ll_i2mdexp(inode), flags, fid, LDLM_IBITS,
2466				  &policy, mode, &lockh)) {
2467			struct ldlm_lock *lock;
2468
2469			lock = ldlm_handle2lock(&lockh);
2470			if (lock) {
2471				*bits &=
2472				      ~(lock->l_policy_data.l_inodebits.bits);
2473				LDLM_LOCK_PUT(lock);
2474			} else {
2475				*bits &= ~policy.l_inodebits.bits;
2476			}
2477		}
2478	}
2479	RETURN(*bits == 0);
2480}
2481
2482ldlm_mode_t ll_take_md_lock(struct inode *inode, __u64 bits,
2483			    struct lustre_handle *lockh, __u64 flags)
2484{
2485	ldlm_policy_data_t policy = { .l_inodebits = {bits}};
2486	struct lu_fid *fid;
2487	ldlm_mode_t rc;
2488	ENTRY;
2489
2490	fid = &ll_i2info(inode)->lli_fid;
2491	CDEBUG(D_INFO, "trying to match res "DFID"\n", PFID(fid));
2492
2493	rc = md_lock_match(ll_i2mdexp(inode), LDLM_FL_BLOCK_GRANTED|flags,
2494			   fid, LDLM_IBITS, &policy,
2495			   LCK_CR|LCK_CW|LCK_PR|LCK_PW, lockh);
2496	RETURN(rc);
2497}
2498
2499static int ll_inode_revalidate_fini(struct inode *inode, int rc)
2500{
2501	/* Already unlinked. Just update nlink and return success */
2502	if (rc == -ENOENT) {
2503		clear_nlink(inode);
2504		/* This path cannot be hit for regular files unless in
2505		 * case of obscure races, so no need to to validate
2506		 * size. */
2507		if (!S_ISREG(inode->i_mode) && !S_ISDIR(inode->i_mode))
2508			return 0;
2509	} else if (rc != 0) {
2510		CERROR("%s: revalidate FID "DFID" error: rc = %d\n",
2511		       ll_get_fsname(inode->i_sb, NULL, 0),
2512		       PFID(ll_inode2fid(inode)), rc);
2513	}
2514
2515	return rc;
2516}
2517
/*
 * Revalidate the inode's metadata against the MDS. Either performs an
 * intent lock by FID (when the server supports OBD_CONNECT_ATTRFID) or
 * falls back to a plain getattr when no matching ibits lock is cached.
 */
int __ll_inode_revalidate_it(struct dentry *dentry, struct lookup_intent *it,
			     __u64 ibits)
{
	struct inode *inode = dentry->d_inode;
	struct ptlrpc_request *req = NULL;
	struct obd_export *exp;
	int rc = 0;
	ENTRY;

	LASSERT(inode != NULL);

	CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),name=%s\n",
	       inode->i_ino, inode->i_generation, inode, dentry->d_name.name);

	exp = ll_i2mdexp(inode);

	/* XXX: Enable OBD_CONNECT_ATTRFID to reduce unnecessary getattr RPC.
	 *      But under CMD case, it caused some lock issues, should be fixed
	 *      with new CMD ibits lock. See bug 12718 */
	if (exp_connect_flags(exp) & OBD_CONNECT_ATTRFID) {
		struct lookup_intent oit = { .it_op = IT_GETATTR };
		struct md_op_data *op_data;

		if (ibits == MDS_INODELOCK_LOOKUP)
			oit.it_op = IT_LOOKUP;

		/* Call getattr by fid, so do not provide name at all. */
		op_data = ll_prep_md_op_data(NULL, dentry->d_parent->d_inode,
					     dentry->d_inode, NULL, 0, 0,
					     LUSTRE_OPC_ANY, NULL);
		if (IS_ERR(op_data))
			RETURN(PTR_ERR(op_data));

		oit.it_create_mode |= M_CHECK_STALE;
		rc = md_intent_lock(exp, op_data, NULL, 0,
				    /* we are not interested in name
				       based lookup */
				    &oit, 0, &req,
				    ll_md_blocking_ast, 0);
		ll_finish_md_op_data(op_data);
		oit.it_create_mode &= ~M_CHECK_STALE;
		if (rc < 0) {
			rc = ll_inode_revalidate_fini(inode, rc);
			GOTO (out, rc);
		}

		rc = ll_revalidate_it_finish(req, &oit, dentry);
		if (rc != 0) {
			ll_intent_release(&oit);
			GOTO(out, rc);
		}

		/* Unlinked? Unhash dentry, so it is not picked up later by
		   do_lookup() -> ll_revalidate_it(). We cannot use d_drop
		   here to preserve get_cwd functionality on 2.6.
		   Bug 10503 */
		if (!dentry->d_inode->i_nlink)
			d_lustre_invalidate(dentry, 0);

		ll_lookup_finish_locks(&oit, dentry);
	} else if (!ll_have_md_lock(dentry->d_inode, &ibits, LCK_MINMODE)) {
		struct ll_sb_info *sbi = ll_i2sbi(dentry->d_inode);
		obd_valid valid = OBD_MD_FLGETATTR;
		struct md_op_data *op_data;
		int ealen = 0;

		/* Regular files also need room for the striping EA. */
		if (S_ISREG(inode->i_mode)) {
			rc = ll_get_max_mdsize(sbi, &ealen);
			if (rc)
				RETURN(rc);
			valid |= OBD_MD_FLEASIZE | OBD_MD_FLMODEASIZE;
		}

		op_data = ll_prep_md_op_data(NULL, inode, NULL, NULL,
					     0, ealen, LUSTRE_OPC_ANY,
					     NULL);
		if (IS_ERR(op_data))
			RETURN(PTR_ERR(op_data));

		op_data->op_valid = valid;
		/* Once OBD_CONNECT_ATTRFID is not supported, we can't find one
		 * capa for this inode. Because we only keep capas of dirs
		 * fresh. */
		rc = md_getattr(sbi->ll_md_exp, op_data, &req);
		ll_finish_md_op_data(op_data);
		if (rc) {
			/* NOTE(review): this early return assumes md_getattr()
			 * hands back no request on failure — confirm, else
			 * req would leak here (out: below finishes it). */
			rc = ll_inode_revalidate_fini(inode, rc);
			RETURN(rc);
		}

		rc = ll_prep_inode(&inode, req, NULL, NULL);
	}
out:
	ptlrpc_req_finished(req);
	return rc;
}
2614
2615int ll_inode_revalidate_it(struct dentry *dentry, struct lookup_intent *it,
2616			   __u64 ibits)
2617{
2618	struct inode *inode = dentry->d_inode;
2619	int rc;
2620	ENTRY;
2621
2622	rc = __ll_inode_revalidate_it(dentry, it, ibits);
2623	if (rc != 0)
2624		RETURN(rc);
2625
2626	/* if object isn't regular file, don't validate size */
2627	if (!S_ISREG(inode->i_mode)) {
2628		LTIME_S(inode->i_atime) = ll_i2info(inode)->lli_lvb.lvb_atime;
2629		LTIME_S(inode->i_mtime) = ll_i2info(inode)->lli_lvb.lvb_mtime;
2630		LTIME_S(inode->i_ctime) = ll_i2info(inode)->lli_lvb.lvb_ctime;
2631	} else {
2632		rc = ll_glimpse_size(inode);
2633	}
2634	RETURN(rc);
2635}
2636
2637int ll_getattr_it(struct vfsmount *mnt, struct dentry *de,
2638		  struct lookup_intent *it, struct kstat *stat)
2639{
2640	struct inode *inode = de->d_inode;
2641	struct ll_sb_info *sbi = ll_i2sbi(inode);
2642	struct ll_inode_info *lli = ll_i2info(inode);
2643	int res = 0;
2644
2645	res = ll_inode_revalidate_it(de, it, MDS_INODELOCK_UPDATE |
2646					     MDS_INODELOCK_LOOKUP);
2647	ll_stats_ops_tally(sbi, LPROC_LL_GETATTR, 1);
2648
2649	if (res)
2650		return res;
2651
2652	stat->dev = inode->i_sb->s_dev;
2653	if (ll_need_32bit_api(sbi))
2654		stat->ino = cl_fid_build_ino(&lli->lli_fid, 1);
2655	else
2656		stat->ino = inode->i_ino;
2657	stat->mode = inode->i_mode;
2658	stat->nlink = inode->i_nlink;
2659	stat->uid = inode->i_uid;
2660	stat->gid = inode->i_gid;
2661	stat->rdev = inode->i_rdev;
2662	stat->atime = inode->i_atime;
2663	stat->mtime = inode->i_mtime;
2664	stat->ctime = inode->i_ctime;
2665	stat->blksize = 1 << inode->i_blkbits;
2666
2667	stat->size = i_size_read(inode);
2668	stat->blocks = inode->i_blocks;
2669
2670	return 0;
2671}
/* Standard VFS getattr entry point: revalidate with an IT_GETATTR intent
 * and copy the attributes out of the inode. */
int ll_getattr(struct vfsmount *mnt, struct dentry *de, struct kstat *stat)
{
	struct lookup_intent it = { .it_op = IT_GETATTR };

	return ll_getattr_it(mnt, de, &it, stat);
}
2678
2679
2680struct posix_acl * ll_get_acl(struct inode *inode, int type)
2681{
2682	struct ll_inode_info *lli = ll_i2info(inode);
2683	struct posix_acl *acl = NULL;
2684	ENTRY;
2685
2686	spin_lock(&lli->lli_lock);
2687	/* VFS' acl_permission_check->check_acl will release the refcount */
2688	acl = posix_acl_dup(lli->lli_posix_acl);
2689	spin_unlock(&lli->lli_lock);
2690
2691	RETURN(acl);
2692}
2693
2694
int ll_inode_permission(struct inode *inode, int mask)
{
	int rc = 0;
	ENTRY;

#ifdef MAY_NOT_BLOCK
	/* RCU-walk mode cannot block on RPCs; ask the VFS to retry in
	 * ref-walk mode. */
	if (mask & MAY_NOT_BLOCK)
		return -ECHILD;
#endif

	/* As the root inode is NOT validated in the lookup operation,
	 * it must be revalidated here before the permission check. */
	if (inode == inode->i_sb->s_root->d_inode) {
		struct lookup_intent it = { .it_op = IT_LOOKUP };

		rc = __ll_inode_revalidate_it(inode->i_sb->s_root, &it,
					      MDS_INODELOCK_LOOKUP);
		if (rc)
			RETURN(rc);
	}

	CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), inode mode %x mask %o\n",
	       inode->i_ino, inode->i_generation, inode, inode->i_mode, mask);

	/* Remote-client setups defer the check to the server-side ACLs. */
	if (ll_i2sbi(inode)->ll_flags & LL_SBI_RMT_CLIENT)
		return lustre_check_remote_perm(inode, mask);

	ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_INODE_PERM, 1);
	/* NOTE(review): "flags" is not declared in this scope; this can only
	 * build if ll_generic_permission() is a compat macro that drops the
	 * argument on this kernel — confirm against llite_internal.h. */
	rc = ll_generic_permission(inode, mask, flags, ll_check_acl);

	RETURN(rc);
}
2728
/* Indirection for the read/write method slots shared by the three
 * file_operations tables below. */
#define READ_METHOD aio_read
#define READ_FUNCTION ll_file_aio_read
#define WRITE_METHOD aio_write
#define WRITE_FUNCTION ll_file_aio_write

/* -o localflock - only provides locally consistent flock locks */
struct file_operations ll_file_operations = {
	.read	   = ll_file_read,
	.READ_METHOD    = READ_FUNCTION,
	.write	  = ll_file_write,
	.WRITE_METHOD   = WRITE_FUNCTION,
	.unlocked_ioctl = ll_file_ioctl,
	.open	   = ll_file_open,
	.release	= ll_file_release,
	.mmap	   = ll_file_mmap,
	.llseek	 = ll_file_seek,
	.splice_read    = ll_file_splice_read,
	.fsync	  = ll_fsync,
	.flush	  = ll_flush
};
2749
/* Default operations table: .flock and .lock requests are routed through
 * ll_file_flock() for cluster-coherent locking. */
struct file_operations ll_file_operations_flock = {
	.read	   = ll_file_read,
	.READ_METHOD    = READ_FUNCTION,
	.write	  = ll_file_write,
	.WRITE_METHOD   = WRITE_FUNCTION,
	.unlocked_ioctl = ll_file_ioctl,
	.open	   = ll_file_open,
	.release	= ll_file_release,
	.mmap	   = ll_file_mmap,
	.llseek	 = ll_file_seek,
	.splice_read    = ll_file_splice_read,
	.fsync	  = ll_fsync,
	.flush	  = ll_flush,
	.flock	  = ll_file_flock,
	.lock	   = ll_file_flock
};
2766
/* These are for -o noflock - to return ENOSYS on flock calls */
struct file_operations ll_file_operations_noflock = {
	.read	   = ll_file_read,
	.READ_METHOD    = READ_FUNCTION,
	.write	  = ll_file_write,
	.WRITE_METHOD   = WRITE_FUNCTION,
	.unlocked_ioctl = ll_file_ioctl,
	.open	   = ll_file_open,
	.release	= ll_file_release,
	.mmap	   = ll_file_mmap,
	.llseek	 = ll_file_seek,
	.splice_read    = ll_file_splice_read,
	.fsync	  = ll_fsync,
	.flush	  = ll_flush,
	.flock	  = ll_file_noflock,
	.lock	   = ll_file_noflock
};
2784
/* Inode operations for regular files. */
struct inode_operations ll_file_inode_operations = {
	.setattr	= ll_setattr,
	.getattr	= ll_getattr,
	.permission	= ll_inode_permission,
	.setxattr	= ll_setxattr,
	.getxattr	= ll_getxattr,
	.listxattr	= ll_listxattr,
	.removexattr	= ll_removexattr,
	.get_acl	= ll_get_acl,
};
2795
/* Dynamic ioctl number support routines: other modules may register extra
 * ioctl handlers at runtime via ll_iocontrol_register(). */
static struct llioc_ctl_data {
	struct rw_semaphore	ioc_sem;	/* protects ioc_head */
	struct list_head	      ioc_head;	/* registered llioc_data blocks */
} llioc = {
	__RWSEM_INITIALIZER(llioc.ioc_sem),
	LIST_HEAD_INIT(llioc.ioc_head)
};
2804
2805
/* One registered dynamic-ioctl handler and the commands it accepts. */
struct llioc_data {
	struct list_head	      iocd_list;	/* chained on llioc.ioc_head */
	unsigned int	    iocd_size;	/* total allocation size in bytes */
	llioc_callback_t	iocd_cb;	/* handler callback */
	unsigned int	    iocd_count;	/* entries in iocd_cmd[] */
	unsigned int	    iocd_cmd[0];	/* supported ioctl commands */
};
2813
2814void *ll_iocontrol_register(llioc_callback_t cb, int count, unsigned int *cmd)
2815{
2816	unsigned int size;
2817	struct llioc_data *in_data = NULL;
2818	ENTRY;
2819
2820	if (cb == NULL || cmd == NULL ||
2821	    count > LLIOC_MAX_CMD || count < 0)
2822		RETURN(NULL);
2823
2824	size = sizeof(*in_data) + count * sizeof(unsigned int);
2825	OBD_ALLOC(in_data, size);
2826	if (in_data == NULL)
2827		RETURN(NULL);
2828
2829	memset(in_data, 0, sizeof(*in_data));
2830	in_data->iocd_size = size;
2831	in_data->iocd_cb = cb;
2832	in_data->iocd_count = count;
2833	memcpy(in_data->iocd_cmd, cmd, sizeof(unsigned int) * count);
2834
2835	down_write(&llioc.ioc_sem);
2836	list_add_tail(&in_data->iocd_list, &llioc.ioc_head);
2837	up_write(&llioc.ioc_sem);
2838
2839	RETURN(in_data);
2840}
2841
/* Remove and free a handler block previously returned by
 * ll_iocontrol_register(); warns if the cookie is not on the list. */
void ll_iocontrol_unregister(void *magic)
{
	struct llioc_data *tmp;

	if (magic == NULL)
		return;

	down_write(&llioc.ioc_sem);
	list_for_each_entry(tmp, &llioc.ioc_head, iocd_list) {
		if (tmp == magic) {
			/* iocd_size is saved before the entry is freed so
			 * OBD_FREE() gets the original allocation size. */
			unsigned int size = tmp->iocd_size;

			list_del(&tmp->iocd_list);
			up_write(&llioc.ioc_sem);

			OBD_FREE(tmp, size);
			return;
		}
	}
	up_write(&llioc.ioc_sem);

	CWARN("didn't find iocontrol register block with magic: %p\n", magic);
}

EXPORT_SYMBOL(ll_iocontrol_register);
EXPORT_SYMBOL(ll_iocontrol_unregister);
2868
2869enum llioc_iter ll_iocontrol_call(struct inode *inode, struct file *file,
2870			unsigned int cmd, unsigned long arg, int *rcp)
2871{
2872	enum llioc_iter ret = LLIOC_CONT;
2873	struct llioc_data *data;
2874	int rc = -EINVAL, i;
2875
2876	down_read(&llioc.ioc_sem);
2877	list_for_each_entry(data, &llioc.ioc_head, iocd_list) {
2878		for (i = 0; i < data->iocd_count; i++) {
2879			if (cmd != data->iocd_cmd[i])
2880				continue;
2881
2882			ret = data->iocd_cb(inode, file, cmd, arg, data, &rc);
2883			break;
2884		}
2885
2886		if (ret == LLIOC_STOP)
2887			break;
2888	}
2889	up_read(&llioc.ioc_sem);
2890
2891	if (rcp)
2892		*rcp = rc;
2893	return ret;
2894}
2895
/* Push a layout configuration change down to the cl_object stack and,
 * for OBJECT_CONF_SET, allow the layout lock to be matched afterwards. */
int ll_layout_conf(struct inode *inode, const struct cl_object_conf *conf)
{
	struct ll_inode_info *lli = ll_i2info(inode);
	struct cl_env_nest nest;
	struct lu_env *env;
	int result;
	ENTRY;

	/* No cl_object attached yet — nothing to configure. */
	if (lli->lli_clob == NULL)
		RETURN(0);

	env = cl_env_nested_get(&nest);
	if (IS_ERR(env))
		RETURN(PTR_ERR(env));

	result = cl_conf_set(env, lli->lli_clob, conf);
	cl_env_nested_put(&nest, env);

	if (conf->coc_opc == OBJECT_CONF_SET) {
		struct ldlm_lock *lock = conf->coc_lock;

		LASSERT(lock != NULL);
		LASSERT(ldlm_has_layout(lock));
		if (result == 0) {
			/* it can only be allowed to match after layout is
			 * applied to inode otherwise false layout would be
			 * seen. Applying layout shoud happen before dropping
			 * the intent lock. */
			ldlm_lock_allow_match(lock);
		}
	}
	RETURN(result);
}
2929
2930/* Fetch layout from MDT with getxattr request, if it's not ready yet */
static int ll_layout_fetch(struct inode *inode, struct ldlm_lock *lock)

{
	struct ll_sb_info *sbi = ll_i2sbi(inode);
	struct obd_capa *oc;
	struct ptlrpc_request *req;
	struct mdt_body *body;
	void *lvbdata;
	void *lmm;
	int lmmsize;
	int rc;
	ENTRY;

	CDEBUG(D_INODE, DFID" LVB_READY=%d l_lvb_data=%p l_lvb_len=%d\n",
	       PFID(ll_inode2fid(inode)), !!(lock->l_flags & LDLM_FL_LVB_READY),
	       lock->l_lvb_data, lock->l_lvb_len);

	/* LVB already populated and valid — nothing to fetch. */
	if ((lock->l_lvb_data != NULL) && (lock->l_flags & LDLM_FL_LVB_READY))
		RETURN(0);

	/* if layout lock was granted right away, the layout is returned
	 * within DLM_LVB of dlm reply; otherwise if the lock was ever
	 * blocked and then granted via completion ast, we have to fetch
	 * layout here. Please note that we can't use the LVB buffer in
	 * completion AST because it doesn't have a large enough buffer */
	oc = ll_mdscapa_get(inode);
	rc = ll_get_max_mdsize(sbi, &lmmsize);
	if (rc == 0)
		rc = md_getxattr(sbi->ll_md_exp, ll_inode2fid(inode), oc,
				OBD_MD_FLXATTR, XATTR_NAME_LOV, NULL, 0,
				lmmsize, 0, &req);
	capa_put(oc);
	/* NOTE(review): this early return assumes md_getxattr() returns no
	 * request on failure (req is uninitialized when ll_get_max_mdsize()
	 * fails) — confirm the md_getxattr() contract. */
	if (rc < 0)
		RETURN(rc);

	body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
	if (body == NULL || body->eadatasize > lmmsize)
		GOTO(out, rc = -EPROTO);

	lmmsize = body->eadatasize;
	if (lmmsize == 0) /* empty layout */
		GOTO(out, rc = 0);

	lmm = req_capsule_server_sized_get(&req->rq_pill, &RMF_EADATA, lmmsize);
	if (lmm == NULL)
		GOTO(out, rc = -EFAULT);

	/* Copy the layout out of the reply buffer into a buffer that can
	 * outlive the request, and attach it as the lock's LVB. */
	OBD_ALLOC_LARGE(lvbdata, lmmsize);
	if (lvbdata == NULL)
		GOTO(out, rc = -ENOMEM);

	memcpy(lvbdata, lmm, lmmsize);
	lock_res_and_lock(lock);
	if (lock->l_lvb_data != NULL)
		OBD_FREE_LARGE(lock->l_lvb_data, lock->l_lvb_len);

	lock->l_lvb_data = lvbdata;
	lock->l_lvb_len = lmmsize;
	unlock_res_and_lock(lock);

	EXIT;

out:
	ptlrpc_req_finished(req);
	return rc;
}
2997
2998/**
2999 * Apply the layout to the inode. Layout lock is held and will be released
3000 * in this function.
3001 */
static int ll_layout_lock_set(struct lustre_handle *lockh, ldlm_mode_t mode,
				struct inode *inode, __u32 *gen, bool reconf)
{
	struct ll_inode_info *lli = ll_i2info(inode);
	struct ll_sb_info    *sbi = ll_i2sbi(inode);
	struct ldlm_lock *lock;
	struct lustre_md md = { NULL };
	struct cl_object_conf conf;
	int rc = 0;
	bool lvb_ready;
	bool wait_layout = false;
	ENTRY;

	LASSERT(lustre_handle_is_used(lockh));

	lock = ldlm_handle2lock(lockh);
	LASSERT(lock != NULL);
	LASSERT(ldlm_has_layout(lock));

	LDLM_DEBUG(lock, "File %p/"DFID" being reconfigured: %d.\n",
		   inode, PFID(&lli->lli_fid), reconf);

	/* in case this is a caching lock and reinstate with new inode */
	md_set_lock_data(sbi->ll_md_exp, &lockh->cookie, inode, NULL);

	lock_res_and_lock(lock);
	lvb_ready = !!(lock->l_flags & LDLM_FL_LVB_READY);
	unlock_res_and_lock(lock);
	/* checking lvb_ready is racy but this is okay. The worst case is
	 * that multi processes may configure the file on the same time. */
	if (lvb_ready || !reconf) {
		/* Not reconfiguring: only report the generation, and only
		 * if the layout is actually ready. */
		rc = -ENODATA;
		if (lvb_ready) {
			/* layout_gen must be valid if layout lock is not
			 * cancelled and stripe has already set */
			*gen = lli->lli_layout_gen;
			rc = 0;
		}
		GOTO(out, rc);
	}

	rc = ll_layout_fetch(inode, lock);
	if (rc < 0)
		GOTO(out, rc);

	/* for layout lock, lmm is returned in lock's lvb.
	 * lvb_data is immutable if the lock is held so it's safe to access it
	 * without res lock. See the description in ldlm_lock_decref_internal()
	 * for the condition to free lvb_data of layout lock */
	if (lock->l_lvb_data != NULL) {
		rc = obd_unpackmd(sbi->ll_dt_exp, &md.lsm,
				  lock->l_lvb_data, lock->l_lvb_len);
		if (rc >= 0) {
			/* An absent stripe md means an empty layout. */
			*gen = LL_LAYOUT_GEN_EMPTY;
			if (md.lsm != NULL)
				*gen = md.lsm->lsm_layout_gen;
			rc = 0;
		} else {
			CERROR("%s: file "DFID" unpackmd error: %d\n",
				ll_get_fsname(inode->i_sb, NULL, 0),
				PFID(&lli->lli_fid), rc);
		}
	}
	if (rc < 0)
		GOTO(out, rc);

	/* set layout to file. Unlikely this will fail as old layout was
	 * surely eliminated */
	memset(&conf, 0, sizeof conf);
	conf.coc_opc = OBJECT_CONF_SET;
	conf.coc_inode = inode;
	conf.coc_lock = lock;
	conf.u.coc_md = &md;
	rc = ll_layout_conf(inode, &conf);

	if (md.lsm != NULL)
		obd_free_memmd(sbi->ll_dt_exp, &md.lsm);

	/* refresh layout failed, need to wait */
	wait_layout = rc == -EBUSY;
	EXIT;

out:
	/* The layout lock reference taken by the caller is dropped here
	 * in every case, per the function's contract (header comment). */
	LDLM_LOCK_PUT(lock);
	ldlm_lock_decref(lockh, mode);

	/* wait for IO to complete if it's still being used. */
	if (wait_layout) {
		CDEBUG(D_INODE, "%s: %p/"DFID" wait for layout reconf.\n",
			ll_get_fsname(inode->i_sb, NULL, 0),
			inode, PFID(&lli->lli_fid));

		memset(&conf, 0, sizeof conf);
		conf.coc_opc = OBJECT_CONF_WAIT;
		conf.coc_inode = inode;
		rc = ll_layout_conf(inode, &conf);
		if (rc == 0)
			rc = -EAGAIN;

		CDEBUG(D_INODE, "file: "DFID" waiting layout return: %d.\n",
			PFID(&lli->lli_fid), rc);
	}
	RETURN(rc);
}
3106
3107/**
3108 * This function checks if there exists a LAYOUT lock on the client side,
3109 * or enqueues it if it doesn't have one in cache.
3110 *
3111 * This function will not hold layout lock so it may be revoked any time after
3112 * this function returns. Any operations depend on layout should be redone
3113 * in that case.
3114 *
3115 * This function should be called before lov_io_init() to get an uptodate
3116 * layout version, the caller should save the version number and after IO
3117 * is finished, this function should be called again to verify that layout
3118 * is not changed during IO time.
3119 */
int ll_layout_refresh(struct inode *inode, __u32 *gen)
{
	struct ll_inode_info  *lli = ll_i2info(inode);
	struct ll_sb_info     *sbi = ll_i2sbi(inode);
	struct md_op_data     *op_data;
	struct lookup_intent   it;
	struct lustre_handle   lockh;
	ldlm_mode_t	       mode;
	/* enqueue an inodebits lock in CR mode; blocking/completion ASTs
	 * are the standard llite metadata callbacks. */
	struct ldlm_enqueue_info einfo = {
		.ei_type = LDLM_IBITS,
		.ei_mode = LCK_CR,
		.ei_cb_bl = ll_md_blocking_ast,
		.ei_cb_cp = ldlm_completion_ast,
	};
	int rc;
	ENTRY;

	/* always report the generation currently cached in the inode; if the
	 * mount does not support layout locks this is all the caller gets. */
	*gen = lli->lli_layout_gen;
	if (!(sbi->ll_flags & LL_SBI_LAYOUT_LOCK))
		RETURN(0);

	/* sanity checks: layout locks only apply to regular files with a
	 * valid FID */
	LASSERT(fid_is_sane(ll_inode2fid(inode)));
	LASSERT(S_ISREG(inode->i_mode));

	/* mostly layout lock is caching on the local side, so try to match
	 * it before grabbing layout lock mutex. */
	mode = ll_take_md_lock(inode, MDS_INODELOCK_LAYOUT, &lockh, 0);
	if (mode != 0) { /* hit cached lock */
		/* lockless == false: no serialization yet on this fast path */
		rc = ll_layout_lock_set(&lockh, mode, inode, gen, false);
		if (rc == 0)
			RETURN(0);

		/* better hold lli_layout_mutex to try again otherwise
		 * it will have starvation problem. */
	}

	/* take layout lock mutex to enqueue layout lock exclusively. */
	mutex_lock(&lli->lli_layout_mutex);

again:
	/* try again. Maybe somebody else has done this. */
	mode = ll_take_md_lock(inode, MDS_INODELOCK_LAYOUT, &lockh, 0);
	if (mode != 0) { /* hit cached lock */
		/* now under the mutex, so retry on -EAGAIN instead of
		 * returning; any other rc (including 0) is final. */
		rc = ll_layout_lock_set(&lockh, mode, inode, gen, true);
		if (rc == -EAGAIN)
			goto again;

		mutex_unlock(&lli->lli_layout_mutex);
		RETURN(rc);
	}

	op_data = ll_prep_md_op_data(NULL, inode, inode, NULL,
			0, 0, LUSTRE_OPC_ANY, NULL);
	if (IS_ERR(op_data)) {
		/* drop the mutex on the error path before returning */
		mutex_unlock(&lli->lli_layout_mutex);
		RETURN(PTR_ERR(op_data));
	}

	/* have to enqueue one */
	memset(&it, 0, sizeof(it));
	it.it_op = IT_LAYOUT;
	lockh.cookie = 0ULL;

	LDLM_DEBUG_NOLOCK("%s: requeue layout lock for file %p/"DFID".\n",
			ll_get_fsname(inode->i_sb, NULL, 0), inode,
			PFID(&lli->lli_fid));

	/* send the LAYOUT intent to the MDS; NOTE(review): even on failure
	 * the intent may carry a request that must be released below. */
	rc = md_enqueue(sbi->ll_md_exp, &einfo, &it, op_data, &lockh,
			NULL, 0, NULL, 0);
	/* release the ptlrpc request attached to the intent, if any, and
	 * clear it so nothing else tries to use it. */
	if (it.d.lustre.it_data != NULL)
		ptlrpc_req_finished(it.d.lustre.it_data);
	it.d.lustre.it_data = NULL;

	ll_finish_md_op_data(op_data);

	/* take ownership of the lock reference held by the intent: save the
	 * granted mode, then zero it before ll_intent_drop_lock() so the
	 * intent teardown does not drop the reference we still need. */
	mode = it.d.lustre.it_lock_mode;
	it.d.lustre.it_lock_mode = 0;
	ll_intent_drop_lock(&it);

	if (rc == 0) {
		/* set lock data in case this is a new lock */
		ll_set_lock_data(sbi->ll_md_exp, inode, &it, NULL);
		/* apply the new layout; -EAGAIN means the layout changed
		 * under us, so restart the whole match/enqueue cycle. */
		rc = ll_layout_lock_set(&lockh, mode, inode, gen, true);
		if (rc == -EAGAIN)
			goto again;
	}
	mutex_unlock(&lli->lli_layout_mutex);

	RETURN(rc);
}
3211