file.c revision ebdc4fc54d5defaa20417eabeb7a8d7b400fd53c
1/*
2 * GPL HEADER START
3 *
4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5 *
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
9 *
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
15 *
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19 *
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
22 * have any questions.
23 *
24 * GPL HEADER END
25 */
26/*
27 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
29 *
30 * Copyright (c) 2011, 2012, Intel Corporation.
31 */
32/*
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
35 *
36 * lustre/llite/file.c
37 *
38 * Author: Peter Braam <braam@clusterfs.com>
39 * Author: Phil Schwan <phil@clusterfs.com>
40 * Author: Andreas Dilger <adilger@clusterfs.com>
41 */
42
43#define DEBUG_SUBSYSTEM S_LLITE
44#include <lustre_dlm.h>
45#include <lustre_lite.h>
46#include <linux/pagemap.h>
47#include <linux/file.h>
48#include "llite_internal.h"
49#include <lustre/ll_fiemap.h>
50
51#include "cl_object.h"
52
53static int
54ll_put_grouplock(struct inode *inode, struct file *file, unsigned long arg);
55
56static int ll_lease_close(struct obd_client_handle *och, struct inode *inode,
57			  bool *lease_broken);
58
59static enum llioc_iter
60ll_iocontrol_call(struct inode *inode, struct file *file,
61		  unsigned int cmd, unsigned long arg, int *rcp);
62
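/* Allocate per-file-descriptor private data (struct ll_file_data) from its
 * slab cache. */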
63static struct ll_file_data *ll_file_data_get(void)
64{
65	struct ll_file_data *fd;
66
67	OBD_SLAB_ALLOC_PTR_GFP(fd, ll_file_data_slab, GFP_NOFS);
68	if (fd == NULL)
69		return NULL;
70	fd->fd_write_failed = false;
71	return fd;
72}
73
74static void ll_file_data_put(struct ll_file_data *fd)
75{
76	if (fd != NULL)
77		OBD_SLAB_FREE_PTR(fd, ll_file_data_slab);
78}
79
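/* Copy the inode's cached attributes, ioepoch and capability (plus the open
 * handle, if one is given) into @op_data for an RPC to the MDS. */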
80void ll_pack_inode2opdata(struct inode *inode, struct md_op_data *op_data,
81			  struct lustre_handle *fh)
82{
83	op_data->op_fid1 = ll_i2info(inode)->lli_fid;
84	op_data->op_attr.ia_mode = inode->i_mode;
85	op_data->op_attr.ia_atime = inode->i_atime;
86	op_data->op_attr.ia_mtime = inode->i_mtime;
87	op_data->op_attr.ia_ctime = inode->i_ctime;
88	op_data->op_attr.ia_size = i_size_read(inode);
89	op_data->op_attr_blocks = inode->i_blocks;
90	((struct ll_iattr *)&op_data->op_attr)->ia_attr_flags =
91					ll_inode_to_ext_flags(inode->i_flags);
92	op_data->op_ioepoch = ll_i2info(inode)->lli_ioepoch;
93	if (fh)
94		op_data->op_handle = *fh;
95	op_data->op_capa1 = ll_mdscapa_get(inode);
96
97	if (LLIF_DATA_MODIFIED & ll_i2info(inode)->lli_flags)
98		op_data->op_bias |= MDS_DATA_MODIFIED;
99}
100
101/**
102 * Closes the IO epoch and packs all the attributes into @op_data for
103 * the CLOSE rpc.
104 */
105static void ll_prepare_close(struct inode *inode, struct md_op_data *op_data,
106			     struct obd_client_handle *och)
107{
108	op_data->op_attr.ia_valid = ATTR_MODE | ATTR_ATIME | ATTR_ATIME_SET |
109					ATTR_MTIME | ATTR_MTIME_SET |
110					ATTR_CTIME | ATTR_CTIME_SET;
111
112	if (!(och->och_flags & FMODE_WRITE))
113		goto out;
114
115	if (!exp_connect_som(ll_i2mdexp(inode)) || !S_ISREG(inode->i_mode))
116		op_data->op_attr.ia_valid |= ATTR_SIZE | ATTR_BLOCKS;
117	else
118		ll_ioepoch_close(inode, op_data, &och, 0);
119
120out:
121	ll_pack_inode2opdata(inode, op_data, &och->och_fh);
122	ll_prep_md_op_data(op_data, inode, NULL, NULL,
123			   0, 0, LUSTRE_OPC_ANY, NULL);
124}
125
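/* Close the MDS open handle @och with a CLOSE RPC.  If @data_version is
 * non-NULL the close also carries an HSM release request.  @och is freed here
 * unless the inode still has to wait for a DONE_WRITING epoch close. */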
126static int ll_close_inode_openhandle(struct obd_export *md_exp,
127				     struct inode *inode,
128				     struct obd_client_handle *och,
129				     const __u64 *data_version)
130{
131	struct obd_export *exp = ll_i2mdexp(inode);
132	struct md_op_data *op_data;
133	struct ptlrpc_request *req = NULL;
134	struct obd_device *obd = class_exp2obd(exp);
135	int epoch_close = 1;
136	int rc;
137
138	if (obd == NULL) {
139		/*
140		 * XXX: in case of LMV, is this correct to access
141		 * ->exp_handle?
142		 */
143		CERROR("Invalid MDC connection handle "LPX64"\n",
144		       ll_i2mdexp(inode)->exp_handle.h_cookie);
145		GOTO(out, rc = 0);
146	}
147
148	OBD_ALLOC_PTR(op_data);
149	if (op_data == NULL)
150		GOTO(out, rc = -ENOMEM); /* XXX We leak openhandle and request here. */
151
152	ll_prepare_close(inode, op_data, och);
153	if (data_version != NULL) {
154		/* Passing in data_version implies an HSM release. */
155		op_data->op_bias |= MDS_HSM_RELEASE;
156		op_data->op_data_version = *data_version;
157		op_data->op_lease_handle = och->och_lease_handle;
158		op_data->op_attr.ia_valid |= ATTR_SIZE | ATTR_BLOCKS;
159	}
160	epoch_close = (op_data->op_flags & MF_EPOCH_CLOSE);
161	rc = md_close(md_exp, op_data, och->och_mod, &req);
162	if (rc == -EAGAIN) {
163		/* This close must have the epoch closed. */
164		LASSERT(epoch_close);
165		/* MDS has instructed us to obtain the Size-on-MDS attribute
166		 * from the OSTs and send a setattr back to the MDS. */
167		rc = ll_som_update(inode, op_data);
168		if (rc) {
169			CERROR("inode %lu mdc Size-on-MDS update failed: "
170			       "rc = %d\n", inode->i_ino, rc);
171			rc = 0;
172		}
173	} else if (rc) {
174		CERROR("inode %lu mdc close failed: rc = %d\n",
175		       inode->i_ino, rc);
176	}
177
178	/* The DATA_MODIFIED flag was successfully sent on close; clear the
179	 * local data-modification flag. */
180	if (rc == 0 && (op_data->op_bias & MDS_DATA_MODIFIED)) {
181		struct ll_inode_info *lli = ll_i2info(inode);
182
183		spin_lock(&lli->lli_lock);
184		lli->lli_flags &= ~LLIF_DATA_MODIFIED;
185		spin_unlock(&lli->lli_lock);
186	}
187
188	if (rc == 0) {
189		rc = ll_objects_destroy(req, inode);
190		if (rc)
191			CERROR("inode %lu ll_objects destroy: rc = %d\n",
192			       inode->i_ino, rc);
193	}
194	if (rc == 0 && op_data->op_bias & MDS_HSM_RELEASE) {
195		struct mdt_body *body;
196		body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
197		if (!(body->valid & OBD_MD_FLRELEASED))
198			rc = -EBUSY;
199	}
200
201	ll_finish_md_op_data(op_data);
202
203out:
204	if (exp_connect_som(exp) && !epoch_close &&
205	    S_ISREG(inode->i_mode) && (och->och_flags & FMODE_WRITE)) {
206		ll_queue_done_writing(inode, LLIF_DONE_WRITING);
207	} else {
208		md_clear_open_replay_data(md_exp, och);
209		/* Free @och if it is not waiting for DONE_WRITING. */
210		och->och_fh.cookie = DEAD_HANDLE_MAGIC;
211		OBD_FREE_PTR(och);
212	}
213	if (req) /* This is close request */
214		ptlrpc_req_finished(req);
215	return rc;
216}
217
218int ll_md_real_close(struct inode *inode, fmode_t fmode)
219{
220	struct ll_inode_info *lli = ll_i2info(inode);
221	struct obd_client_handle **och_p;
222	struct obd_client_handle *och;
223	__u64 *och_usecount;
224	int rc = 0;
225
226	if (fmode & FMODE_WRITE) {
227		och_p = &lli->lli_mds_write_och;
228		och_usecount = &lli->lli_open_fd_write_count;
229	} else if (fmode & FMODE_EXEC) {
230		och_p = &lli->lli_mds_exec_och;
231		och_usecount = &lli->lli_open_fd_exec_count;
232	} else {
233		LASSERT(fmode & FMODE_READ);
234		och_p = &lli->lli_mds_read_och;
235		och_usecount = &lli->lli_open_fd_read_count;
236	}
237
238	mutex_lock(&lli->lli_och_mutex);
239	if (*och_usecount > 0) {
240		/* There are still users of this handle, so skip
241		 * freeing it. */
242		mutex_unlock(&lli->lli_och_mutex);
243		return 0;
244	}
245
246	och = *och_p;
247	*och_p = NULL;
248	mutex_unlock(&lli->lli_och_mutex);
249
250	if (och != NULL) {
251		/* There might be a race and this handle may already
252		   be closed. */
253		rc = ll_close_inode_openhandle(ll_i2sbi(inode)->ll_md_exp,
254					       inode, och, NULL);
255	}
256
257	return rc;
258}
259
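/* Per-file-descriptor part of close: drop any group lock or lease held through
 * @file, then release this descriptor's reference on the MDS open handle,
 * closing it on the MDS when no cached OPEN lock allows the close to be
 * skipped. */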
260static int ll_md_close(struct obd_export *md_exp, struct inode *inode,
261		       struct file *file)
262{
263	struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
264	struct ll_inode_info *lli = ll_i2info(inode);
265	int rc = 0;
266
267	/* clear group lock, if present */
268	if (unlikely(fd->fd_flags & LL_FILE_GROUP_LOCKED))
269		ll_put_grouplock(inode, file, fd->fd_grouplock.cg_gid);
270
271	if (fd->fd_lease_och != NULL) {
272		bool lease_broken;
273
274		/* Usually the lease is not released when the
275		 * application crashes, so we need to release it here. */
276		rc = ll_lease_close(fd->fd_lease_och, inode, &lease_broken);
277		CDEBUG(rc ? D_ERROR : D_INODE, "Clean up lease "DFID" %d/%d\n",
278			PFID(&lli->lli_fid), rc, lease_broken);
279
280		fd->fd_lease_och = NULL;
281	}
282
283	if (fd->fd_och != NULL) {
284		rc = ll_close_inode_openhandle(md_exp, inode, fd->fd_och, NULL);
285		fd->fd_och = NULL;
286		GOTO(out, rc);
287	}
288
289	/* Let's see if we have a good enough OPEN lock on the file and
290	   whether we can skip talking to the MDS. */
291	if (file->f_dentry->d_inode) { /* Can this ever be false? */
292		int lockmode;
293		int flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_TEST_LOCK;
294		struct lustre_handle lockh;
295		struct inode *inode = file->f_dentry->d_inode;
296		ldlm_policy_data_t policy = {.l_inodebits={MDS_INODELOCK_OPEN}};
297
298		mutex_lock(&lli->lli_och_mutex);
299		if (fd->fd_omode & FMODE_WRITE) {
300			lockmode = LCK_CW;
301			LASSERT(lli->lli_open_fd_write_count);
302			lli->lli_open_fd_write_count--;
303		} else if (fd->fd_omode & FMODE_EXEC) {
304			lockmode = LCK_PR;
305			LASSERT(lli->lli_open_fd_exec_count);
306			lli->lli_open_fd_exec_count--;
307		} else {
308			lockmode = LCK_CR;
309			LASSERT(lli->lli_open_fd_read_count);
310			lli->lli_open_fd_read_count--;
311		}
312		mutex_unlock(&lli->lli_och_mutex);
313
314		if (!md_lock_match(md_exp, flags, ll_inode2fid(inode),
315				   LDLM_IBITS, &policy, lockmode,
316				   &lockh)) {
317			rc = ll_md_real_close(file->f_dentry->d_inode,
318					      fd->fd_omode);
319		}
320	} else {
321		CERROR("Releasing a file %p with negative dentry %p. Name %s\n",
322		       file, file->f_dentry, file->f_dentry->d_name.name);
323	}
324
325out:
326	LUSTRE_FPRIVATE(file) = NULL;
327	ll_file_data_put(fd);
328	ll_capa_close(inode);
329
330	return rc;
331}
332
333/* Although this returns an error code, the caller (fput()) ignores it, so we
334 * need to make every effort to clean up all of our state here.  Also,
335 * applications rarely check close errors, and even if an error is returned
336 * they will not retry the close call.
337 */
338int ll_file_release(struct inode *inode, struct file *file)
339{
340	struct ll_file_data *fd;
341	struct ll_sb_info *sbi = ll_i2sbi(inode);
342	struct ll_inode_info *lli = ll_i2info(inode);
343	int rc;
344
345	CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p)\n", inode->i_ino,
346	       inode->i_generation, inode);
347
348#ifdef CONFIG_FS_POSIX_ACL
349	if (sbi->ll_flags & LL_SBI_RMT_CLIENT &&
350	    inode == inode->i_sb->s_root->d_inode) {
351		struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
352
353		LASSERT(fd != NULL);
354		if (unlikely(fd->fd_flags & LL_FILE_RMTACL)) {
355			fd->fd_flags &= ~LL_FILE_RMTACL;
356			rct_del(&sbi->ll_rct, current_pid());
357			et_search_free(&sbi->ll_et, current_pid());
358		}
359	}
360#endif
361
362	if (inode->i_sb->s_root != file->f_dentry)
363		ll_stats_ops_tally(sbi, LPROC_LL_RELEASE, 1);
364	fd = LUSTRE_FPRIVATE(file);
365	LASSERT(fd != NULL);
366
367	/* The last ref on @file, maybe not the owner pid of statahead.
368	 * Different processes can open the same dir; "ll_opendir_key" means
369	 * it is me who should stop the statahead thread. */
370	if (S_ISDIR(inode->i_mode) && lli->lli_opendir_key == fd &&
371	    lli->lli_opendir_pid != 0)
372		ll_stop_statahead(inode, lli->lli_opendir_key);
373
374	if (inode->i_sb->s_root == file->f_dentry) {
375		LUSTRE_FPRIVATE(file) = NULL;
376		ll_file_data_put(fd);
377		return 0;
378	}
379
380	if (!S_ISDIR(inode->i_mode)) {
381		lov_read_and_clear_async_rc(lli->lli_clob);
382		lli->lli_async_rc = 0;
383	}
384
385	rc = ll_md_close(sbi->ll_md_exp, inode, file);
386
387	if (CFS_FAIL_TIMEOUT_MS(OBD_FAIL_PTLRPC_DUMP_LOG, cfs_fail_val))
388		libcfs_debug_dumplog();
389
390	return rc;
391}
392
393static int ll_intent_file_open(struct file *file, void *lmm,
394			       int lmmsize, struct lookup_intent *itp)
395{
396	struct ll_sb_info *sbi = ll_i2sbi(file->f_dentry->d_inode);
397	struct dentry *parent = file->f_dentry->d_parent;
398	const char *name = file->f_dentry->d_name.name;
399	const int len = file->f_dentry->d_name.len;
400	struct md_op_data *op_data;
401	struct ptlrpc_request *req;
402	__u32 opc = LUSTRE_OPC_ANY;
403	int rc;
404
405	if (!parent)
406		return -ENOENT;
407
408	/* Usually we come here only for NFSD, and we want the open lock.
409	   But we can also get here with pre-2.6.15 patchless kernels, and in
410	   that case that lock is also OK. */
411	/* We can also get here if there was a cached open handle in revalidate_it
412	 * but it disappeared while we were getting from there to ll_file_open.
413	 * But this means the file was closed and immediately opened again, which
414	 * makes it a good candidate for using the OPEN lock. */
415	/* If lmmsize and lmm are not 0, we are just setting stripe info
416	 * parameters.  No need for the open lock. */
417	if (lmm == NULL && lmmsize == 0) {
418		itp->it_flags |= MDS_OPEN_LOCK;
419		if (itp->it_flags & FMODE_WRITE)
420			opc = LUSTRE_OPC_CREATE;
421	}
422
423	op_data  = ll_prep_md_op_data(NULL, parent->d_inode,
424				      file->f_dentry->d_inode, name, len,
425				      O_RDWR, opc, NULL);
426	if (IS_ERR(op_data))
427		return PTR_ERR(op_data);
428
429	itp->it_flags |= MDS_OPEN_BY_FID;
430	rc = md_intent_lock(sbi->ll_md_exp, op_data, lmm, lmmsize, itp,
431			    0 /*unused */, &req, ll_md_blocking_ast, 0);
432	ll_finish_md_op_data(op_data);
433	if (rc == -ESTALE) {
434		/* The reason for keeping our own exit path is to avoid
435		 * flooding the log with -ESTALE error messages.
436		 */
437		if (!it_disposition(itp, DISP_OPEN_OPEN) ||
438		     it_open_error(DISP_OPEN_OPEN, itp))
439			GOTO(out, rc);
440		ll_release_openhandle(file->f_dentry, itp);
441		GOTO(out, rc);
442	}
443
444	if (it_disposition(itp, DISP_LOOKUP_NEG))
445		GOTO(out, rc = -ENOENT);
446
447	if (rc != 0 || it_open_error(DISP_OPEN_OPEN, itp)) {
448		rc = rc ? rc : it_open_error(DISP_OPEN_OPEN, itp);
449		CDEBUG(D_VFSTRACE, "lock enqueue: err: %d\n", rc);
450		GOTO(out, rc);
451	}
452
453	rc = ll_prep_inode(&file->f_dentry->d_inode, req, NULL, itp);
454	if (!rc && itp->d.lustre.it_lock_mode)
455		ll_set_lock_data(sbi->ll_md_exp, file->f_dentry->d_inode,
456				 itp, NULL);
457
458out:
459	ptlrpc_req_finished(req);
460	ll_intent_drop_lock(itp);
461
462	return rc;
463}
464
465/**
466 * Assign an obtained @ioepoch to the client's inode.  No lock is needed; the
467 * MDS does not trust attributes while several ioepoch holders exist, and it
468 * also skips the attributes for a previous ioepoch once a new one is opened.
469 */
470void ll_ioepoch_open(struct ll_inode_info *lli, __u64 ioepoch)
471{
472	if (ioepoch && lli->lli_ioepoch != ioepoch) {
473		lli->lli_ioepoch = ioepoch;
474		CDEBUG(D_INODE, "Epoch "LPU64" opened on "DFID"\n",
475		       ioepoch, PFID(&lli->lli_fid));
476	}
477}
478
479static int ll_och_fill(struct obd_export *md_exp, struct lookup_intent *it,
480		       struct obd_client_handle *och)
481{
482	struct ptlrpc_request *req = it->d.lustre.it_data;
483	struct mdt_body *body;
484
485	body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
486	och->och_fh = body->handle;
487	och->och_fid = body->fid1;
488	och->och_lease_handle.cookie = it->d.lustre.it_lock_handle;
489	och->och_magic = OBD_CLIENT_HANDLE_MAGIC;
490	och->och_flags = it->it_flags;
491
492	return md_set_open_replay_data(md_exp, och, it);
493}
494
495static int ll_local_open(struct file *file, struct lookup_intent *it,
496			 struct ll_file_data *fd, struct obd_client_handle *och)
497{
498	struct inode *inode = file->f_dentry->d_inode;
499	struct ll_inode_info *lli = ll_i2info(inode);
500
501	LASSERT(!LUSTRE_FPRIVATE(file));
502
503	LASSERT(fd != NULL);
504
505	if (och) {
506		struct ptlrpc_request *req = it->d.lustre.it_data;
507		struct mdt_body *body;
508		int rc;
509
510		rc = ll_och_fill(ll_i2sbi(inode)->ll_md_exp, it, och);
511		if (rc != 0)
512			return rc;
513
514		body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
515		ll_ioepoch_open(lli, body->ioepoch);
516	}
517
518	LUSTRE_FPRIVATE(file) = fd;
519	ll_readahead_init(inode, &fd->fd_ras);
520	fd->fd_omode = it->it_flags & (FMODE_READ | FMODE_WRITE | FMODE_EXEC);
521	return 0;
522}
523
524/* Open a file, and (for the very first open) create objects on the OSTs at
525 * this time.  If opened with O_LOV_DELAY_CREATE, then we don't do the object
526 * creation or open until ll_lov_setstripe() ioctl is called.
527 *
528 * If we already have the stripe MD locally then we don't request it in
529 * md_open(), by passing a lmm_size = 0.
530 *
531 * It is up to the application to ensure no other processes open this file
532 * in the O_LOV_DELAY_CREATE case, or the default striping pattern will be
533 * used.  We might be able to avoid races of that sort by getting lli_open_sem
534 * before returning in the O_LOV_DELAY_CREATE case and dropping it here
535 * or in ll_file_release(), but I'm not sure that is desirable/necessary.
536 */
537int ll_file_open(struct inode *inode, struct file *file)
538{
539	struct ll_inode_info *lli = ll_i2info(inode);
540	struct lookup_intent *it, oit = { .it_op = IT_OPEN,
541					  .it_flags = file->f_flags };
542	struct obd_client_handle **och_p = NULL;
543	__u64 *och_usecount = NULL;
544	struct ll_file_data *fd;
545	int rc = 0, opendir_set = 0;
546
547	CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), flags %o\n", inode->i_ino,
548	       inode->i_generation, inode, file->f_flags);
549
550	it = file->private_data; /* XXX: compat macro */
551	file->private_data = NULL; /* prevent ll_local_open assertion */
552
553	fd = ll_file_data_get();
554	if (fd == NULL)
555		GOTO(out_openerr, rc = -ENOMEM);
556
557	fd->fd_file = file;
558	if (S_ISDIR(inode->i_mode)) {
559		spin_lock(&lli->lli_sa_lock);
560		if (lli->lli_opendir_key == NULL && lli->lli_sai == NULL &&
561		    lli->lli_opendir_pid == 0) {
562			lli->lli_opendir_key = fd;
563			lli->lli_opendir_pid = current_pid();
564			opendir_set = 1;
565		}
566		spin_unlock(&lli->lli_sa_lock);
567	}
568
569	if (inode->i_sb->s_root == file->f_dentry) {
570		LUSTRE_FPRIVATE(file) = fd;
571		return 0;
572	}
573
574	if (!it || !it->d.lustre.it_disposition) {
575		/* Convert f_flags into an access mode.  We cannot use
576		 * file->f_mode, because everything but the O_ACCMODE mask
577		 * was stripped from it. */
578		if ((oit.it_flags + 1) & O_ACCMODE)
579			oit.it_flags++;
580		if (file->f_flags & O_TRUNC)
581			oit.it_flags |= FMODE_WRITE;
582
583		/* The kernel only calls f_op->open in dentry_open().  filp_open()
584		 * calls dentry_open() after open_namei() has checked permissions.
585		 * Only nfsd_open() calls dentry_open() directly without checking
586		 * permissions, which is why the code below is safe. */
587		if (oit.it_flags & (FMODE_WRITE | FMODE_READ))
588			oit.it_flags |= MDS_OPEN_OWNEROVERRIDE;
589
590		/* We do not want O_EXCL here, presumably we opened the file
591		 * already? XXX - NFS implications? */
592		oit.it_flags &= ~O_EXCL;
593
594		/* bug 20584: if "it_flags" contains O_CREAT, the file will be
595		 * created if necessary, so "IT_CREAT" should be set to stay
596		 * consistent with it. */
597		if (oit.it_flags & O_CREAT)
598			oit.it_op |= IT_CREAT;
599
600		it = &oit;
601	}
602
603restart:
604	/* Let's see if we have file open on MDS already. */
605	if (it->it_flags & FMODE_WRITE) {
606		och_p = &lli->lli_mds_write_och;
607		och_usecount = &lli->lli_open_fd_write_count;
608	} else if (it->it_flags & FMODE_EXEC) {
609		och_p = &lli->lli_mds_exec_och;
610		och_usecount = &lli->lli_open_fd_exec_count;
611	} else {
612		och_p = &lli->lli_mds_read_och;
613		och_usecount = &lli->lli_open_fd_read_count;
614	}
615
616	mutex_lock(&lli->lli_och_mutex);
617	if (*och_p) { /* Open handle is present */
618		if (it_disposition(it, DISP_OPEN_OPEN)) {
619			/* Well, there's an extra open request that we do not
620			   need; let's close it somehow.  This will decref the request. */
621			rc = it_open_error(DISP_OPEN_OPEN, it);
622			if (rc) {
623				mutex_unlock(&lli->lli_och_mutex);
624				GOTO(out_openerr, rc);
625			}
626
627			ll_release_openhandle(file->f_dentry, it);
628		}
629		(*och_usecount)++;
630
631		rc = ll_local_open(file, it, fd, NULL);
632		if (rc) {
633			(*och_usecount)--;
634			mutex_unlock(&lli->lli_och_mutex);
635			GOTO(out_openerr, rc);
636		}
637	} else {
638		LASSERT(*och_usecount == 0);
639		if (!it->d.lustre.it_disposition) {
640			/* We cannot just request the lock handle now; the new
641			   ELC code means that one of the other OPEN locks for
642			   this file could be cancelled, and since the blocking
643			   AST handler would attempt to grab och_mutex as well,
644			   that would result in a deadlock. */
645			mutex_unlock(&lli->lli_och_mutex);
646			it->it_create_mode |= M_CHECK_STALE;
647			rc = ll_intent_file_open(file, NULL, 0, it);
648			it->it_create_mode &= ~M_CHECK_STALE;
649			if (rc)
650				GOTO(out_openerr, rc);
651
652			goto restart;
653		}
654		OBD_ALLOC(*och_p, sizeof (struct obd_client_handle));
655		if (!*och_p)
656			GOTO(out_och_free, rc = -ENOMEM);
657
658		(*och_usecount)++;
659
660		/* md_intent_lock() didn't get a request ref if there was an
661		 * open error, so don't do cleanup on the request here
662		 * (bug 3430) */
663		/* XXX (green): Shouldn't we bail out on any error here, not
664		 * just an open error? */
665		rc = it_open_error(DISP_OPEN_OPEN, it);
666		if (rc)
667			GOTO(out_och_free, rc);
668
669		LASSERT(it_disposition(it, DISP_ENQ_OPEN_REF));
670
671		rc = ll_local_open(file, it, fd, *och_p);
672		if (rc)
673			GOTO(out_och_free, rc);
674	}
675	mutex_unlock(&lli->lli_och_mutex);
676	fd = NULL;
677
678	/* Must do this outside the lli_och_mutex lock to prevent a deadlock
679	   where a different kind of OPEN lock for this same inode gets
680	   cancelled by ldlm_cancel_lru. */
681	if (!S_ISREG(inode->i_mode))
682		GOTO(out_och_free, rc);
683
684	ll_capa_open(inode);
685
686	if (!lli->lli_has_smd &&
687	    (cl_is_lov_delay_create(file->f_flags) ||
688	     (file->f_mode & FMODE_WRITE) == 0)) {
689		CDEBUG(D_INODE, "object creation was delayed\n");
690		GOTO(out_och_free, rc);
691	}
692	cl_lov_delay_create_clear(&file->f_flags);
693	GOTO(out_och_free, rc);
694
695out_och_free:
696	if (rc) {
697		if (och_p && *och_p) {
698			OBD_FREE(*och_p, sizeof (struct obd_client_handle));
699			*och_p = NULL; /* OBD_FREE writes some magic there */
700			(*och_usecount)--;
701		}
702		mutex_unlock(&lli->lli_och_mutex);
703
704out_openerr:
705		if (opendir_set != 0)
706			ll_stop_statahead(inode, lli->lli_opendir_key);
707		if (fd != NULL)
708			ll_file_data_put(fd);
709	} else {
710		ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_OPEN, 1);
711	}
712
713	if (it && it_disposition(it, DISP_ENQ_OPEN_REF)) {
714		ptlrpc_req_finished(it->d.lustre.it_data);
715		it_clear_disposition(it, DISP_ENQ_OPEN_REF);
716	}
717
718	return rc;
719}
720
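/* Blocking AST for a lease lock: when another client conflicts with the lease,
 * cancel the lock asynchronously; there is nothing to do at cancel time. */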
721static int ll_md_blocking_lease_ast(struct ldlm_lock *lock,
722			struct ldlm_lock_desc *desc, void *data, int flag)
723{
724	int rc;
725	struct lustre_handle lockh;
726
727	switch (flag) {
728	case LDLM_CB_BLOCKING:
729		ldlm_lock2handle(lock, &lockh);
730		rc = ldlm_cli_cancel(&lockh, LCF_ASYNC);
731		if (rc < 0) {
732			CDEBUG(D_INODE, "ldlm_cli_cancel: %d\n", rc);
733			return rc;
734		}
735		break;
736	case LDLM_CB_CANCELING:
737		/* do nothing */
738		break;
739	}
740	return 0;
741}
742
743/**
744 * Acquire a lease and open the file.
745 */
746static struct obd_client_handle *
747ll_lease_open(struct inode *inode, struct file *file, fmode_t fmode,
748	      __u64 open_flags)
749{
750	struct lookup_intent it = { .it_op = IT_OPEN };
751	struct ll_sb_info *sbi = ll_i2sbi(inode);
752	struct md_op_data *op_data;
753	struct ptlrpc_request *req;
754	struct lustre_handle old_handle = { 0 };
755	struct obd_client_handle *och = NULL;
756	int rc;
757	int rc2;
758
759	if (fmode != FMODE_WRITE && fmode != FMODE_READ)
760		return ERR_PTR(-EINVAL);
761
762	if (file != NULL) {
763		struct ll_inode_info *lli = ll_i2info(inode);
764		struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
765		struct obd_client_handle **och_p;
766		__u64 *och_usecount;
767
768		if (!(fmode & file->f_mode) || (file->f_mode & FMODE_EXEC))
769			return ERR_PTR(-EPERM);
770
771		/* Get the openhandle of the file */
772		rc = -EBUSY;
773		mutex_lock(&lli->lli_och_mutex);
774		if (fd->fd_lease_och != NULL) {
775			mutex_unlock(&lli->lli_och_mutex);
776			return ERR_PTR(rc);
777		}
778
779		if (fd->fd_och == NULL) {
780			if (file->f_mode & FMODE_WRITE) {
781				LASSERT(lli->lli_mds_write_och != NULL);
782				och_p = &lli->lli_mds_write_och;
783				och_usecount = &lli->lli_open_fd_write_count;
784			} else {
785				LASSERT(lli->lli_mds_read_och != NULL);
786				och_p = &lli->lli_mds_read_och;
787				och_usecount = &lli->lli_open_fd_read_count;
788			}
789			if (*och_usecount == 1) {
790				fd->fd_och = *och_p;
791				*och_p = NULL;
792				*och_usecount = 0;
793				rc = 0;
794			}
795		}
796		mutex_unlock(&lli->lli_och_mutex);
797		if (rc < 0) /* more than 1 opener */
798			return ERR_PTR(rc);
799
800		LASSERT(fd->fd_och != NULL);
801		old_handle = fd->fd_och->och_fh;
802	}
803
804	OBD_ALLOC_PTR(och);
805	if (och == NULL)
806		return ERR_PTR(-ENOMEM);
807
808	op_data = ll_prep_md_op_data(NULL, inode, inode, NULL, 0, 0,
809					LUSTRE_OPC_ANY, NULL);
810	if (IS_ERR(op_data))
811		GOTO(out, rc = PTR_ERR(op_data));
812
813	/* To tell the MDT this openhandle is from the same owner */
814	op_data->op_handle = old_handle;
815
816	it.it_flags = fmode | open_flags;
817	it.it_flags |= MDS_OPEN_LOCK | MDS_OPEN_BY_FID | MDS_OPEN_LEASE;
818	rc = md_intent_lock(sbi->ll_md_exp, op_data, NULL, 0, &it, 0, &req,
819				ll_md_blocking_lease_ast,
820	/* LDLM_FL_NO_LRU: do not put the lease lock into the LRU list,
821	 * otherwise it can be cancelled, which may mislead applications into
822	 * thinking the lease is broken;
823	 * LDLM_FL_EXCL: set this flag so the lock won't be matched by a normal
824	 * open in ll_md_blocking_ast().  Since ll_md_blocking_lease_ast()
825	 * doesn't deal with the openhandle, a normal openhandle would be leaked. */
826				LDLM_FL_NO_LRU | LDLM_FL_EXCL);
827	ll_finish_md_op_data(op_data);
828	ptlrpc_req_finished(req);
829	if (rc < 0)
830		GOTO(out_release_it, rc);
831
832	if (it_disposition(&it, DISP_LOOKUP_NEG))
833		GOTO(out_release_it, rc = -ENOENT);
834
835	rc = it_open_error(DISP_OPEN_OPEN, &it);
836	if (rc)
837		GOTO(out_release_it, rc);
838
839	LASSERT(it_disposition(&it, DISP_ENQ_OPEN_REF));
840	ll_och_fill(sbi->ll_md_exp, &it, och);
841
842	if (!it_disposition(&it, DISP_OPEN_LEASE)) /* old server? */
843		GOTO(out_close, rc = -EOPNOTSUPP);
844
845	/* lease already granted, handle the lease lock */
846	ll_set_lock_data(sbi->ll_md_exp, inode, &it, NULL);
847	if (it.d.lustre.it_lock_mode == 0 ||
848	    it.d.lustre.it_lock_bits != MDS_INODELOCK_OPEN) {
849		/* an open lock must be returned for a lease */
850		CERROR(DFID "lease granted but no open lock, %d/%llu.\n",
851			PFID(ll_inode2fid(inode)), it.d.lustre.it_lock_mode,
852			it.d.lustre.it_lock_bits);
853		GOTO(out_close, rc = -EPROTO);
854	}
855
856	ll_intent_release(&it);
857	return och;
858
859out_close:
860	rc2 = ll_close_inode_openhandle(sbi->ll_md_exp, inode, och, NULL);
861	if (rc2)
862		CERROR("Close openhandle returned %d\n", rc2);
863
864	/* cancel open lock */
865	if (it.d.lustre.it_lock_mode != 0) {
866		ldlm_lock_decref_and_cancel(&och->och_lease_handle,
867						it.d.lustre.it_lock_mode);
868		it.d.lustre.it_lock_mode = 0;
869	}
870out_release_it:
871	ll_intent_release(&it);
872out:
873	OBD_FREE_PTR(och);
874	return ERR_PTR(rc);
875}
876
877/**
878 * Release lease and close the file.
879 * It also checks whether the lease has ever been broken.
880 */
881static int ll_lease_close(struct obd_client_handle *och, struct inode *inode,
882			  bool *lease_broken)
883{
884	struct ldlm_lock *lock;
885	bool cancelled = true;
886	int rc;
887
888	lock = ldlm_handle2lock(&och->och_lease_handle);
889	if (lock != NULL) {
890		lock_res_and_lock(lock);
891		cancelled = ldlm_is_cancel(lock);
892		unlock_res_and_lock(lock);
893		ldlm_lock_put(lock);
894	}
895
896	CDEBUG(D_INODE, "lease for "DFID" broken? %d\n",
897		PFID(&ll_i2info(inode)->lli_fid), cancelled);
898
899	if (!cancelled)
900		ldlm_cli_cancel(&och->och_lease_handle, 0);
901	if (lease_broken != NULL)
902		*lease_broken = cancelled;
903
904	rc = ll_close_inode_openhandle(ll_i2sbi(inode)->ll_md_exp, inode, och,
905				       NULL);
906	return rc;
907}
908
909/* Fills the obdo with the attributes for the lsm */
910static int ll_lsm_getattr(struct lov_stripe_md *lsm, struct obd_export *exp,
911			  struct obd_capa *capa, struct obdo *obdo,
912			  __u64 ioepoch, int sync)
913{
914	struct ptlrpc_request_set *set;
915	struct obd_info	    oinfo = { { { 0 } } };
916	int			rc;
917
918	LASSERT(lsm != NULL);
919
920	oinfo.oi_md = lsm;
921	oinfo.oi_oa = obdo;
922	oinfo.oi_oa->o_oi = lsm->lsm_oi;
923	oinfo.oi_oa->o_mode = S_IFREG;
924	oinfo.oi_oa->o_ioepoch = ioepoch;
925	oinfo.oi_oa->o_valid = OBD_MD_FLID | OBD_MD_FLTYPE |
926			       OBD_MD_FLSIZE | OBD_MD_FLBLOCKS |
927			       OBD_MD_FLBLKSZ | OBD_MD_FLATIME |
928			       OBD_MD_FLMTIME | OBD_MD_FLCTIME |
929			       OBD_MD_FLGROUP | OBD_MD_FLEPOCH |
930			       OBD_MD_FLDATAVERSION;
931	oinfo.oi_capa = capa;
932	if (sync) {
933		oinfo.oi_oa->o_valid |= OBD_MD_FLFLAGS;
934		oinfo.oi_oa->o_flags |= OBD_FL_SRVLOCK;
935	}
936
937	set = ptlrpc_prep_set();
938	if (set == NULL) {
939		CERROR("can't allocate ptlrpc set\n");
940		rc = -ENOMEM;
941	} else {
942		rc = obd_getattr_async(exp, &oinfo, set);
943		if (rc == 0)
944			rc = ptlrpc_set_wait(set);
945		ptlrpc_set_destroy(set);
946	}
947	if (rc == 0)
948		oinfo.oi_oa->o_valid &= (OBD_MD_FLBLOCKS | OBD_MD_FLBLKSZ |
949					 OBD_MD_FLATIME | OBD_MD_FLMTIME |
950					 OBD_MD_FLCTIME | OBD_MD_FLSIZE |
951					 OBD_MD_FLDATAVERSION);
952	return rc;
953}
954
955/**
956 * Performs the getattr on the inode and updates its fields.
957 * If @sync != 0, perform the getattr under the server-side lock.
958 */
959int ll_inode_getattr(struct inode *inode, struct obdo *obdo,
960		     __u64 ioepoch, int sync)
961{
962	struct obd_capa      *capa = ll_mdscapa_get(inode);
963	struct lov_stripe_md *lsm;
964	int rc;
965
966	lsm = ccc_inode_lsm_get(inode);
967	rc = ll_lsm_getattr(lsm, ll_i2dtexp(inode),
968			    capa, obdo, ioepoch, sync);
969	capa_put(capa);
970	if (rc == 0) {
971		struct ost_id *oi = lsm ? &lsm->lsm_oi : &obdo->o_oi;
972
973		obdo_refresh_inode(inode, obdo, obdo->o_valid);
974		CDEBUG(D_INODE, "objid "DOSTID" size %llu, blocks %llu,"
975		       " blksize %lu\n", POSTID(oi), i_size_read(inode),
976		       (unsigned long long)inode->i_blocks,
977		       (unsigned long)ll_inode_blksize(inode));
978	}
979	ccc_inode_lsm_put(inode, lsm);
980	return rc;
981}
982
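/* Merge the size, block count and timestamps cached in the cl_object (i.e.
 * obtained from the OSTs) with the MDS-provided values into the VFS inode. */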
983int ll_merge_lvb(const struct lu_env *env, struct inode *inode)
984{
985	struct ll_inode_info *lli = ll_i2info(inode);
986	struct cl_object *obj = lli->lli_clob;
987	struct cl_attr *attr = ccc_env_thread_attr(env);
988	struct ost_lvb lvb;
989	int rc = 0;
990
991	ll_inode_size_lock(inode);
992	/* Merge the timestamps most recently obtained from the MDS with
993	   the timestamps obtained from the OSTs. */
994	LTIME_S(inode->i_atime) = lli->lli_lvb.lvb_atime;
995	LTIME_S(inode->i_mtime) = lli->lli_lvb.lvb_mtime;
996	LTIME_S(inode->i_ctime) = lli->lli_lvb.lvb_ctime;
997	inode_init_lvb(inode, &lvb);
998
999	cl_object_attr_lock(obj);
1000	rc = cl_object_attr_get(env, obj, attr);
1001	cl_object_attr_unlock(obj);
1002
1003	if (rc == 0) {
1004		if (lvb.lvb_atime < attr->cat_atime)
1005			lvb.lvb_atime = attr->cat_atime;
1006		if (lvb.lvb_ctime < attr->cat_ctime)
1007			lvb.lvb_ctime = attr->cat_ctime;
1008		if (lvb.lvb_mtime < attr->cat_mtime)
1009			lvb.lvb_mtime = attr->cat_mtime;
1010
1011		CDEBUG(D_VFSTRACE, DFID" updating i_size "LPU64"\n",
1012				PFID(&lli->lli_fid), attr->cat_size);
1013		cl_isize_write_nolock(inode, attr->cat_size);
1014
1015		inode->i_blocks = attr->cat_blocks;
1016
1017		LTIME_S(inode->i_mtime) = lvb.lvb_mtime;
1018		LTIME_S(inode->i_atime) = lvb.lvb_atime;
1019		LTIME_S(inode->i_ctime) = lvb.lvb_ctime;
1020	}
1021	ll_inode_size_unlock(inode);
1022
1023	return rc;
1024}
1025
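/* Fetch size, blocks and timestamps for @lsm directly from the OSTs and copy
 * them into @st. */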
1026int ll_glimpse_ioctl(struct ll_sb_info *sbi, struct lov_stripe_md *lsm,
1027		     lstat_t *st)
1028{
1029	struct obdo obdo = { 0 };
1030	int rc;
1031
1032	rc = ll_lsm_getattr(lsm, sbi->ll_dt_exp, NULL, &obdo, 0, 0);
1033	if (rc == 0) {
1034		st->st_size   = obdo.o_size;
1035		st->st_blocks = obdo.o_blocks;
1036		st->st_mtime  = obdo.o_mtime;
1037		st->st_atime  = obdo.o_atime;
1038		st->st_ctime  = obdo.o_ctime;
1039	}
1040	return rc;
1041}
1042
1043static bool file_is_noatime(const struct file *file)
1044{
1045	const struct vfsmount *mnt = file->f_path.mnt;
1046	const struct inode *inode = file->f_path.dentry->d_inode;
1047
1048	/* Adapted from file_accessed() and touch_atime().*/
1049	if (file->f_flags & O_NOATIME)
1050		return true;
1051
1052	if (inode->i_flags & S_NOATIME)
1053		return true;
1054
1055	if (IS_NOATIME(inode))
1056		return true;
1057
1058	if (mnt->mnt_flags & (MNT_NOATIME | MNT_READONLY))
1059		return true;
1060
1061	if ((mnt->mnt_flags & MNT_NODIRATIME) && S_ISDIR(inode->i_mode))
1062		return true;
1063
1064	if ((inode->i_sb->s_flags & MS_NODIRATIME) && S_ISDIR(inode->i_mode))
1065		return true;
1066
1067	return false;
1068}
1069
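/* Initialize a cl_io from the flags of @file: non-blocking and append/sync
 * behaviour, the lock requirement policy and the noatime setting. */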
1070void ll_io_init(struct cl_io *io, const struct file *file, int write)
1071{
1072	struct inode *inode = file->f_dentry->d_inode;
1073
1074	io->u.ci_rw.crw_nonblock = file->f_flags & O_NONBLOCK;
1075	if (write) {
1076		io->u.ci_wr.wr_append = !!(file->f_flags & O_APPEND);
1077		io->u.ci_wr.wr_sync = file->f_flags & O_SYNC ||
1078				      file->f_flags & O_DIRECT ||
1079				      IS_SYNC(inode);
1080	}
1081	io->ci_obj     = ll_i2info(inode)->lli_clob;
1082	io->ci_lockreq = CILR_MAYBE;
1083	if (ll_file_nolock(file)) {
1084		io->ci_lockreq = CILR_NEVER;
1085		io->ci_no_srvlock = 1;
1086	} else if (file->f_flags & O_APPEND) {
1087		io->ci_lockreq = CILR_MANDATORY;
1088	}
1089
1090	io->ci_noatime = file_is_noatime(file);
1091}
1092
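/* Common body of the read, write and splice paths: set up the cl_io, take
 * lli_write_mutex or lli_trunc_sem as required, run the cl_io loop, and
 * restart the whole IO if it needs to be redone before any byte was
 * transferred (e.g. after a layout change). */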
1093static ssize_t
1094ll_file_io_generic(const struct lu_env *env, struct vvp_io_args *args,
1095		   struct file *file, enum cl_io_type iot,
1096		   loff_t *ppos, size_t count)
1097{
1098	struct ll_inode_info *lli = ll_i2info(file->f_dentry->d_inode);
1099	struct ll_file_data  *fd  = LUSTRE_FPRIVATE(file);
1100	struct cl_io	 *io;
1101	ssize_t	       result;
1102
1103restart:
1104	io = ccc_env_thread_io(env);
1105	ll_io_init(io, file, iot == CIT_WRITE);
1106
1107	if (cl_io_rw_init(env, io, iot, *ppos, count) == 0) {
1108		struct vvp_io *vio = vvp_env_io(env);
1109		struct ccc_io *cio = ccc_env_io(env);
1110		int write_mutex_locked = 0;
1111
1112		cio->cui_fd  = LUSTRE_FPRIVATE(file);
1113		vio->cui_io_subtype = args->via_io_subtype;
1114
1115		switch (vio->cui_io_subtype) {
1116		case IO_NORMAL:
1117			cio->cui_iov = args->u.normal.via_iov;
1118			cio->cui_nrsegs = args->u.normal.via_nrsegs;
1119			cio->cui_tot_nrsegs = cio->cui_nrsegs;
1120			cio->cui_iocb = args->u.normal.via_iocb;
1121			if ((iot == CIT_WRITE) &&
1122			    !(cio->cui_fd->fd_flags & LL_FILE_GROUP_LOCKED)) {
1123				if (mutex_lock_interruptible(&lli->
1124							       lli_write_mutex))
1125					GOTO(out, result = -ERESTARTSYS);
1126				write_mutex_locked = 1;
1127			} else if (iot == CIT_READ) {
1128				down_read(&lli->lli_trunc_sem);
1129			}
1130			break;
1131		case IO_SPLICE:
1132			vio->u.splice.cui_pipe = args->u.splice.via_pipe;
1133			vio->u.splice.cui_flags = args->u.splice.via_flags;
1134			break;
1135		default:
1136			CERROR("Unknown IO type - %u\n", vio->cui_io_subtype);
1137			LBUG();
1138		}
1139		result = cl_io_loop(env, io);
1140		if (write_mutex_locked)
1141			mutex_unlock(&lli->lli_write_mutex);
1142		else if (args->via_io_subtype == IO_NORMAL && iot == CIT_READ)
1143			up_read(&lli->lli_trunc_sem);
1144	} else {
1145		/* cl_io_rw_init() handled IO */
1146		result = io->ci_result;
1147	}
1148
1149	if (io->ci_nob > 0) {
1150		result = io->ci_nob;
1151		*ppos = io->u.ci_wr.wr.crw_pos;
1152	}
1153	GOTO(out, result);
1154out:
1155	cl_io_fini(env, io);
1156	/* If anything has been read/written (result != 0), we just return a
1157	 * short read/write instead of restarting the IO. */
1158	if ((result == 0 || result == -ENODATA) && io->ci_need_restart) {
1159		CDEBUG(D_VFSTRACE, "Restart %s on %s from %lld, count:%zd\n",
1160		       iot == CIT_READ ? "read" : "write",
1161		       file->f_dentry->d_name.name, *ppos, count);
1162		LASSERTF(io->ci_nob == 0, "%zd", io->ci_nob);
1163		goto restart;
1164	}
1165
1166	if (iot == CIT_READ) {
1167		if (result >= 0)
1168			ll_stats_ops_tally(ll_i2sbi(file->f_dentry->d_inode),
1169					   LPROC_LL_READ_BYTES, result);
1170	} else if (iot == CIT_WRITE) {
1171		if (result >= 0) {
1172			ll_stats_ops_tally(ll_i2sbi(file->f_dentry->d_inode),
1173					   LPROC_LL_WRITE_BYTES, result);
1174			fd->fd_write_failed = false;
1175		} else if (result != -ERESTARTSYS) {
1176			fd->fd_write_failed = true;
1177		}
1178	}
1179
1180	return result;
1181}
1182
1183static ssize_t ll_file_aio_read(struct kiocb *iocb, const struct iovec *iov,
1184				unsigned long nr_segs, loff_t pos)
1185{
1186	struct lu_env      *env;
1187	struct vvp_io_args *args;
1188	size_t	      count = 0;
1189	ssize_t	     result;
1190	int		 refcheck;
1191
1192	result = generic_segment_checks(iov, &nr_segs, &count, VERIFY_WRITE);
1193	if (result)
1194		return result;
1195
1196	env = cl_env_get(&refcheck);
1197	if (IS_ERR(env))
1198		return PTR_ERR(env);
1199
1200	args = vvp_env_args(env, IO_NORMAL);
1201	args->u.normal.via_iov = (struct iovec *)iov;
1202	args->u.normal.via_nrsegs = nr_segs;
1203	args->u.normal.via_iocb = iocb;
1204
1205	result = ll_file_io_generic(env, args, iocb->ki_filp, CIT_READ,
1206				    &iocb->ki_pos, count);
1207	cl_env_put(env, &refcheck);
1208	return result;
1209}
1210
1211static ssize_t ll_file_read(struct file *file, char *buf, size_t count,
1212			    loff_t *ppos)
1213{
1214	struct lu_env *env;
1215	struct iovec  *local_iov;
1216	struct kiocb  *kiocb;
1217	ssize_t	result;
1218	int	    refcheck;
1219
1220	env = cl_env_get(&refcheck);
1221	if (IS_ERR(env))
1222		return PTR_ERR(env);
1223
1224	local_iov = &vvp_env_info(env)->vti_local_iov;
1225	kiocb = &vvp_env_info(env)->vti_kiocb;
1226	local_iov->iov_base = (void __user *)buf;
1227	local_iov->iov_len = count;
1228	init_sync_kiocb(kiocb, file);
1229	kiocb->ki_pos = *ppos;
1230	kiocb->ki_nbytes = count;
1231
1232	result = ll_file_aio_read(kiocb, local_iov, 1, kiocb->ki_pos);
1233	*ppos = kiocb->ki_pos;
1234
1235	cl_env_put(env, &refcheck);
1236	return result;
1237}
1238
1239/*
1240 * Write to a file (through the page cache).
1241 */
1242static ssize_t ll_file_aio_write(struct kiocb *iocb, const struct iovec *iov,
1243				 unsigned long nr_segs, loff_t pos)
1244{
1245	struct lu_env      *env;
1246	struct vvp_io_args *args;
1247	size_t	      count = 0;
1248	ssize_t	     result;
1249	int		 refcheck;
1250
1251	result = generic_segment_checks(iov, &nr_segs, &count, VERIFY_READ);
1252	if (result)
1253		return result;
1254
1255	env = cl_env_get(&refcheck);
1256	if (IS_ERR(env))
1257		return PTR_ERR(env);
1258
1259	args = vvp_env_args(env, IO_NORMAL);
1260	args->u.normal.via_iov = (struct iovec *)iov;
1261	args->u.normal.via_nrsegs = nr_segs;
1262	args->u.normal.via_iocb = iocb;
1263
1264	result = ll_file_io_generic(env, args, iocb->ki_filp, CIT_WRITE,
1265				  &iocb->ki_pos, count);
1266	cl_env_put(env, &refcheck);
1267	return result;
1268}
1269
1270static ssize_t ll_file_write(struct file *file, const char *buf, size_t count,
1271			     loff_t *ppos)
1272{
1273	struct lu_env *env;
1274	struct iovec  *local_iov;
1275	struct kiocb  *kiocb;
1276	ssize_t	result;
1277	int	    refcheck;
1278
1279	env = cl_env_get(&refcheck);
1280	if (IS_ERR(env))
1281		return PTR_ERR(env);
1282
1283	local_iov = &vvp_env_info(env)->vti_local_iov;
1284	kiocb = &vvp_env_info(env)->vti_kiocb;
1285	local_iov->iov_base = (void __user *)buf;
1286	local_iov->iov_len = count;
1287	init_sync_kiocb(kiocb, file);
1288	kiocb->ki_pos = *ppos;
1289	kiocb->ki_nbytes = count;
1290
1291	result = ll_file_aio_write(kiocb, local_iov, 1, kiocb->ki_pos);
1292	*ppos = kiocb->ki_pos;
1293
1294	cl_env_put(env, &refcheck);
1295	return result;
1296}
1297
1298
1299
1300/*
1301 * Send file content (through pagecache) somewhere with helper
1302 */
1303static ssize_t ll_file_splice_read(struct file *in_file, loff_t *ppos,
1304				   struct pipe_inode_info *pipe, size_t count,
1305				   unsigned int flags)
1306{
1307	struct lu_env      *env;
1308	struct vvp_io_args *args;
1309	ssize_t	     result;
1310	int		 refcheck;
1311
1312	env = cl_env_get(&refcheck);
1313	if (IS_ERR(env))
1314		return PTR_ERR(env);
1315
1316	args = vvp_env_args(env, IO_SPLICE);
1317	args->u.splice.via_pipe = pipe;
1318	args->u.splice.via_flags = flags;
1319
1320	result = ll_file_io_generic(env, args, in_file, CIT_READ, ppos, count);
1321	cl_env_put(env, &refcheck);
1322	return result;
1323}
1324
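/* Ask an OST to recreate a missing stripe object (@oi on OST index @ost_idx)
 * for this file, reusing the file's existing striping information. */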
1325static int ll_lov_recreate(struct inode *inode, struct ost_id *oi,
1326			   obd_count ost_idx)
1327{
1328	struct obd_export *exp = ll_i2dtexp(inode);
1329	struct obd_trans_info oti = { 0 };
1330	struct obdo *oa = NULL;
1331	int lsm_size;
1332	int rc = 0;
1333	struct lov_stripe_md *lsm = NULL, *lsm2;
1334
1335	OBDO_ALLOC(oa);
1336	if (oa == NULL)
1337		return -ENOMEM;
1338
1339	lsm = ccc_inode_lsm_get(inode);
1340	if (!lsm_has_objects(lsm))
1341		GOTO(out, rc = -ENOENT);
1342
1343	lsm_size = sizeof(*lsm) + (sizeof(struct lov_oinfo) *
1344		   (lsm->lsm_stripe_count));
1345
1346	OBD_ALLOC_LARGE(lsm2, lsm_size);
1347	if (lsm2 == NULL)
1348		GOTO(out, rc = -ENOMEM);
1349
1350	oa->o_oi = *oi;
1351	oa->o_nlink = ost_idx;
1352	oa->o_flags |= OBD_FL_RECREATE_OBJS;
1353	oa->o_valid = OBD_MD_FLID | OBD_MD_FLFLAGS | OBD_MD_FLGROUP;
1354	obdo_from_inode(oa, inode, OBD_MD_FLTYPE | OBD_MD_FLATIME |
1355				   OBD_MD_FLMTIME | OBD_MD_FLCTIME);
1356	obdo_set_parent_fid(oa, &ll_i2info(inode)->lli_fid);
1357	memcpy(lsm2, lsm, lsm_size);
1358	ll_inode_size_lock(inode);
1359	rc = obd_create(NULL, exp, oa, &lsm2, &oti);
1360	ll_inode_size_unlock(inode);
1361
1362	OBD_FREE_LARGE(lsm2, lsm_size);
1363	GOTO(out, rc);
1364out:
1365	ccc_inode_lsm_put(inode, lsm);
1366	OBDO_FREE(oa);
1367	return rc;
1368}
1369
1370static int ll_lov_recreate_obj(struct inode *inode, unsigned long arg)
1371{
1372	struct ll_recreate_obj ucreat;
1373	struct ost_id		oi;
1374
1375	if (!capable(CFS_CAP_SYS_ADMIN))
1376		return -EPERM;
1377
1378	if (copy_from_user(&ucreat, (struct ll_recreate_obj *)arg,
1379			   sizeof(ucreat)))
1380		return -EFAULT;
1381
1382	ostid_set_seq_mdt0(&oi);
1383	ostid_set_id(&oi, ucreat.lrc_id);
1384	return ll_lov_recreate(inode, &oi, ucreat.lrc_ost_idx);
1385}
1386
1387static int ll_lov_recreate_fid(struct inode *inode, unsigned long arg)
1388{
1389	struct lu_fid	fid;
1390	struct ost_id	oi;
1391	obd_count	ost_idx;
1392
1393	if (!capable(CFS_CAP_SYS_ADMIN))
1394		return -EPERM;
1395
1396	if (copy_from_user(&fid, (struct lu_fid *)arg, sizeof(fid)))
1397		return -EFAULT;
1398
1399	fid_to_ostid(&fid, &oi);
1400	ost_idx = (fid_seq(&fid) >> 16) & 0xffff;
1401	return ll_lov_recreate(inode, &oi, ost_idx);
1402}
1403
1404int ll_lov_setstripe_ea_info(struct inode *inode, struct file *file,
1405			     int flags, struct lov_user_md *lum, int lum_size)
1406{
1407	struct lov_stripe_md *lsm = NULL;
1408	struct lookup_intent oit = {.it_op = IT_OPEN, .it_flags = flags};
1409	int rc = 0;
1410
1411	lsm = ccc_inode_lsm_get(inode);
1412	if (lsm != NULL) {
1413		ccc_inode_lsm_put(inode, lsm);
1414		CDEBUG(D_IOCTL, "stripe already exists for ino %lu\n",
1415		       inode->i_ino);
1416		GOTO(out, rc = -EEXIST);
1417	}
1418
1419	ll_inode_size_lock(inode);
1420	rc = ll_intent_file_open(file, lum, lum_size, &oit);
1421	if (rc)
1422		GOTO(out_unlock, rc);
1423	rc = oit.d.lustre.it_status;
1424	if (rc < 0)
1425		GOTO(out_req_free, rc);
1426
1427	ll_release_openhandle(file->f_dentry, &oit);
1428
1429out_unlock:
1430	ll_inode_size_unlock(inode);
1431	ll_intent_release(&oit);
1432	ccc_inode_lsm_put(inode, lsm);
1433out:
1434	cl_lov_delay_create_clear(&file->f_flags);
1435	return rc;
1436out_req_free:
1437	ptlrpc_req_finished((struct ptlrpc_request *) oit.d.lustre.it_data);
1438	goto out;
1439}
1440
1441int ll_lov_getstripe_ea_info(struct inode *inode, const char *filename,
1442			     struct lov_mds_md **lmmp, int *lmm_size,
1443			     struct ptlrpc_request **request)
1444{
1445	struct ll_sb_info *sbi = ll_i2sbi(inode);
1446	struct mdt_body  *body;
1447	struct lov_mds_md *lmm = NULL;
1448	struct ptlrpc_request *req = NULL;
1449	struct md_op_data *op_data;
1450	int rc, lmmsize;
1451
1452	rc = ll_get_default_mdsize(sbi, &lmmsize);
1453	if (rc)
1454		return rc;
1455
1456	op_data = ll_prep_md_op_data(NULL, inode, NULL, filename,
1457				     strlen(filename), lmmsize,
1458				     LUSTRE_OPC_ANY, NULL);
1459	if (IS_ERR(op_data))
1460		return PTR_ERR(op_data);
1461
1462	op_data->op_valid = OBD_MD_FLEASIZE | OBD_MD_FLDIREA;
1463	rc = md_getattr_name(sbi->ll_md_exp, op_data, &req);
1464	ll_finish_md_op_data(op_data);
1465	if (rc < 0) {
1466		CDEBUG(D_INFO, "md_getattr_name failed "
1467		       "on %s: rc %d\n", filename, rc);
1468		GOTO(out, rc);
1469	}
1470
1471	body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
1472	LASSERT(body != NULL); /* checked by mdc_getattr_name */
1473
1474	lmmsize = body->eadatasize;
1475
1476	if (!(body->valid & (OBD_MD_FLEASIZE | OBD_MD_FLDIREA)) ||
1477			lmmsize == 0) {
1478		GOTO(out, rc = -ENODATA);
1479	}
1480
1481	lmm = req_capsule_server_sized_get(&req->rq_pill, &RMF_MDT_MD, lmmsize);
1482	LASSERT(lmm != NULL);
1483
1484	if ((lmm->lmm_magic != cpu_to_le32(LOV_MAGIC_V1)) &&
1485	    (lmm->lmm_magic != cpu_to_le32(LOV_MAGIC_V3))) {
1486		GOTO(out, rc = -EPROTO);
1487	}
1488
1489	/*
1490	 * This is coming from the MDS, so is probably in
1491	 * little endian.  We convert it to host endian before
1492	 * passing it to userspace.
1493	 */
1494	if (LOV_MAGIC != cpu_to_le32(LOV_MAGIC)) {
1495		int stripe_count;
1496
1497		stripe_count = le16_to_cpu(lmm->lmm_stripe_count);
1498		if (le32_to_cpu(lmm->lmm_pattern) & LOV_PATTERN_F_RELEASED)
1499			stripe_count = 0;
1500
1501		/* If the function was called for a directory, we should
1502		 * avoid swabbing non-existent lsm objects. */
1503		if (lmm->lmm_magic == cpu_to_le32(LOV_MAGIC_V1)) {
1504			lustre_swab_lov_user_md_v1((struct lov_user_md_v1 *)lmm);
1505			if (S_ISREG(body->mode))
1506				lustre_swab_lov_user_md_objects(
1507				 ((struct lov_user_md_v1 *)lmm)->lmm_objects,
1508				 stripe_count);
1509		} else if (lmm->lmm_magic == cpu_to_le32(LOV_MAGIC_V3)) {
1510			lustre_swab_lov_user_md_v3((struct lov_user_md_v3 *)lmm);
1511			if (S_ISREG(body->mode))
1512				lustre_swab_lov_user_md_objects(
1513				 ((struct lov_user_md_v3 *)lmm)->lmm_objects,
1514				 stripe_count);
1515		}
1516	}
1517
1518out:
1519	*lmmp = lmm;
1520	*lmm_size = lmmsize;
1521	*request = req;
1522	return rc;
1523}
1524
1525static int ll_lov_setea(struct inode *inode, struct file *file,
1526			    unsigned long arg)
1527{
1528	int			 flags = MDS_OPEN_HAS_OBJS | FMODE_WRITE;
1529	struct lov_user_md	*lump;
1530	int			 lum_size = sizeof(struct lov_user_md) +
1531					    sizeof(struct lov_user_ost_data);
1532	int			 rc;
1533
1534	if (!capable(CFS_CAP_SYS_ADMIN))
1535		return -EPERM;
1536
1537	OBD_ALLOC_LARGE(lump, lum_size);
1538	if (lump == NULL)
1539		return -ENOMEM;
1540
1541	if (copy_from_user(lump, (struct lov_user_md  *)arg, lum_size)) {
1542		OBD_FREE_LARGE(lump, lum_size);
1543		return -EFAULT;
1544	}
1545
1546	rc = ll_lov_setstripe_ea_info(inode, file, flags, lump, lum_size);
1547
1548	OBD_FREE_LARGE(lump, lum_size);
1549	return rc;
1550}
1551
1552static int ll_lov_setstripe(struct inode *inode, struct file *file,
1553			    unsigned long arg)
1554{
1555	struct lov_user_md_v3	 lumv3;
1556	struct lov_user_md_v1	*lumv1 = (struct lov_user_md_v1 *)&lumv3;
1557	struct lov_user_md_v1	*lumv1p = (struct lov_user_md_v1 *)arg;
1558	struct lov_user_md_v3	*lumv3p = (struct lov_user_md_v3 *)arg;
1559	int			 lum_size, rc;
1560	int			 flags = FMODE_WRITE;
1561
1562	/* first try with v1 which is smaller than v3 */
1563	lum_size = sizeof(struct lov_user_md_v1);
1564	if (copy_from_user(lumv1, lumv1p, lum_size))
1565		return -EFAULT;
1566
1567	if (lumv1->lmm_magic == LOV_USER_MAGIC_V3) {
1568		lum_size = sizeof(struct lov_user_md_v3);
1569		if (copy_from_user(&lumv3, lumv3p, lum_size))
1570			return -EFAULT;
1571	}
1572
1573	rc = ll_lov_setstripe_ea_info(inode, file, flags, lumv1, lum_size);
1574	if (rc == 0) {
1575		struct lov_stripe_md *lsm;
1576		__u32 gen;
1577
1578		put_user(0, &lumv1p->lmm_stripe_count);
1579
1580		ll_layout_refresh(inode, &gen);
1581		lsm = ccc_inode_lsm_get(inode);
1582		rc = obd_iocontrol(LL_IOC_LOV_GETSTRIPE, ll_i2dtexp(inode),
1583				   0, lsm, (void *)arg);
1584		ccc_inode_lsm_put(inode, lsm);
1585	}
1586	return rc;
1587}
1588
1589static int ll_lov_getstripe(struct inode *inode, unsigned long arg)
1590{
1591	struct lov_stripe_md *lsm;
1592	int rc = -ENODATA;
1593
1594	lsm = ccc_inode_lsm_get(inode);
1595	if (lsm != NULL)
1596		rc = obd_iocontrol(LL_IOC_LOV_GETSTRIPE, ll_i2dtexp(inode), 0,
1597				   lsm, (void *)arg);
1598	ccc_inode_lsm_put(inode, lsm);
1599	return rc;
1600}
1601
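/* Take a Lustre group lock with group id @arg on behalf of @file and record
 * it in the file descriptor; fails if this descriptor already holds one. */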
1602static int
1603ll_get_grouplock(struct inode *inode, struct file *file, unsigned long arg)
1604{
1605	struct ll_inode_info   *lli = ll_i2info(inode);
1606	struct ll_file_data    *fd = LUSTRE_FPRIVATE(file);
1607	struct ccc_grouplock    grouplock;
1608	int		     rc;
1609
1610	if (ll_file_nolock(file))
1611		return -EOPNOTSUPP;
1612
1613	spin_lock(&lli->lli_lock);
1614	if (fd->fd_flags & LL_FILE_GROUP_LOCKED) {
1615		CWARN("group lock already exists with gid %lu\n",
1616		      fd->fd_grouplock.cg_gid);
1617		spin_unlock(&lli->lli_lock);
1618		return -EINVAL;
1619	}
1620	LASSERT(fd->fd_grouplock.cg_lock == NULL);
1621	spin_unlock(&lli->lli_lock);
1622
1623	rc = cl_get_grouplock(cl_i2info(inode)->lli_clob,
1624			      arg, (file->f_flags & O_NONBLOCK), &grouplock);
1625	if (rc)
1626		return rc;
1627
1628	spin_lock(&lli->lli_lock);
1629	if (fd->fd_flags & LL_FILE_GROUP_LOCKED) {
1630		spin_unlock(&lli->lli_lock);
1631		CERROR("another thread just won the race\n");
1632		cl_put_grouplock(&grouplock);
1633		return -EINVAL;
1634	}
1635
1636	fd->fd_flags |= LL_FILE_GROUP_LOCKED;
1637	fd->fd_grouplock = grouplock;
1638	spin_unlock(&lli->lli_lock);
1639
1640	CDEBUG(D_INFO, "group lock %lu obtained\n", arg);
1641	return 0;
1642}
1643
1644int ll_put_grouplock(struct inode *inode, struct file *file, unsigned long arg)
1645{
1646	struct ll_inode_info   *lli = ll_i2info(inode);
1647	struct ll_file_data    *fd = LUSTRE_FPRIVATE(file);
1648	struct ccc_grouplock    grouplock;
1649
1650	spin_lock(&lli->lli_lock);
1651	if (!(fd->fd_flags & LL_FILE_GROUP_LOCKED)) {
1652		spin_unlock(&lli->lli_lock);
1653		CWARN("no group lock held\n");
1654		return -EINVAL;
1655	}
1656	LASSERT(fd->fd_grouplock.cg_lock != NULL);
1657
1658	if (fd->fd_grouplock.cg_gid != arg) {
1659		CWARN("group lock %lu doesn't match current id %lu\n",
1660		       arg, fd->fd_grouplock.cg_gid);
1661		spin_unlock(&lli->lli_lock);
1662		return -EINVAL;
1663	}
1664
1665	grouplock = fd->fd_grouplock;
1666	memset(&fd->fd_grouplock, 0, sizeof(fd->fd_grouplock));
1667	fd->fd_flags &= ~LL_FILE_GROUP_LOCKED;
1668	spin_unlock(&lli->lli_lock);
1669
1670	cl_put_grouplock(&grouplock);
1671	CDEBUG(D_INFO, "group lock %lu released\n", arg);
1672	return 0;
1673}
1674
1675/**
1676 * Close inode open handle
1677 *
1678 * \param dentry [in]     dentry which contains the inode
1679 * \param it     [in,out] intent which contains open info and result
1680 *
1681 * \retval 0     success
1682 * \retval <0    failure
1683 */
1684int ll_release_openhandle(struct dentry *dentry, struct lookup_intent *it)
1685{
1686	struct inode *inode = dentry->d_inode;
1687	struct obd_client_handle *och;
1688	int rc;
1689
1690	LASSERT(inode);
1691
1692	/* Root ? Do nothing. */
1693	if (dentry->d_inode->i_sb->s_root == dentry)
1694		return 0;
1695
1696	/* No open handle to close? Move away */
1697	if (!it_disposition(it, DISP_OPEN_OPEN))
1698		return 0;
1699
1700	LASSERT(it_open_error(DISP_OPEN_OPEN, it) == 0);
1701
1702	OBD_ALLOC(och, sizeof(*och));
1703	if (!och)
1704		GOTO(out, rc = -ENOMEM);
1705
1706	ll_och_fill(ll_i2sbi(inode)->ll_md_exp, it, och);
1707
1708	rc = ll_close_inode_openhandle(ll_i2sbi(inode)->ll_md_exp,
1709				       inode, och, NULL);
1710out:
1711	/* this one is in place of ll_file_open */
1712	if (it_disposition(it, DISP_ENQ_OPEN_REF)) {
1713		ptlrpc_req_finished(it->d.lustre.it_data);
1714		it_clear_disposition(it, DISP_ENQ_OPEN_REF);
1715	}
1716	return rc;
1717}
1718
1719/**
1720 * Get the size of the inode for which the FIEMAP mapping is requested.
1721 * Make the FIEMAP get_info call and return the result.
1722 */
1723static int ll_do_fiemap(struct inode *inode, struct ll_user_fiemap *fiemap,
1724			size_t num_bytes)
1725{
1726	struct obd_export *exp = ll_i2dtexp(inode);
1727	struct lov_stripe_md *lsm = NULL;
1728	struct ll_fiemap_info_key fm_key = { .name = KEY_FIEMAP, };
1729	__u32 vallen = num_bytes;
1730	int rc;
1731
1732	/* Checks for fiemap flags */
1733	if (fiemap->fm_flags & ~LUSTRE_FIEMAP_FLAGS_COMPAT) {
1734		fiemap->fm_flags &= ~LUSTRE_FIEMAP_FLAGS_COMPAT;
1735		return -EBADR;
1736	}
1737
1738	/* Check for FIEMAP_FLAG_SYNC */
1739	if (fiemap->fm_flags & FIEMAP_FLAG_SYNC) {
1740		rc = filemap_fdatawrite(inode->i_mapping);
1741		if (rc)
1742			return rc;
1743	}
1744
1745	lsm = ccc_inode_lsm_get(inode);
1746	if (lsm == NULL)
1747		return -ENOENT;
1748
1749	/* If the stripe_count > 1 and the application does not understand
1750	 * DEVICE_ORDER flag, then it cannot interpret the extents correctly.
1751	 */
1752	if (lsm->lsm_stripe_count > 1 &&
1753	    !(fiemap->fm_flags & FIEMAP_FLAG_DEVICE_ORDER))
1754		GOTO(out, rc = -EOPNOTSUPP);
1755
1756	fm_key.oa.o_oi = lsm->lsm_oi;
1757	fm_key.oa.o_valid = OBD_MD_FLID | OBD_MD_FLGROUP;
1758
1759	obdo_from_inode(&fm_key.oa, inode, OBD_MD_FLSIZE);
1760	obdo_set_parent_fid(&fm_key.oa, &ll_i2info(inode)->lli_fid);
1761	/* If the file size is 0, then there are no objects to map. */
1762	if (fm_key.oa.o_size == 0) {
1763		fiemap->fm_mapped_extents = 0;
1764		GOTO(out, rc = 0);
1765	}
1766
1767	memcpy(&fm_key.fiemap, fiemap, sizeof(*fiemap));
1768
1769	rc = obd_get_info(NULL, exp, sizeof(fm_key), &fm_key, &vallen,
1770			  fiemap, lsm);
1771	if (rc)
1772		CERROR("obd_get_info failed: rc = %d\n", rc);
1773
1774out:
1775	ccc_inode_lsm_put(inode, lsm);
1776	return rc;
1777}
1778
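/* Handle the OBD_IOC_FID2PATH ioctl: copy the request from user space, have
 * the MDC resolve the FID to a path via obd_iocontrol(), and copy the result
 * back. */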
1779int ll_fid2path(struct inode *inode, void *arg)
1780{
1781	struct obd_export	*exp = ll_i2mdexp(inode);
1782	struct getinfo_fid2path	*gfout, *gfin;
1783	int			 outsize, rc;
1784
1785	if (!capable(CFS_CAP_DAC_READ_SEARCH) &&
1786	    !(ll_i2sbi(inode)->ll_flags & LL_SBI_USER_FID2PATH))
1787		return -EPERM;
1788
1789	/* Need to get the buflen */
1790	OBD_ALLOC_PTR(gfin);
1791	if (gfin == NULL)
1792		return -ENOMEM;
1793	if (copy_from_user(gfin, arg, sizeof(*gfin))) {
1794		OBD_FREE_PTR(gfin);
1795		return -EFAULT;
1796	}
1797
1798	outsize = sizeof(*gfout) + gfin->gf_pathlen;
1799	OBD_ALLOC(gfout, outsize);
1800	if (gfout == NULL) {
1801		OBD_FREE_PTR(gfin);
1802		return -ENOMEM;
1803	}
1804	memcpy(gfout, gfin, sizeof(*gfout));
1805	OBD_FREE_PTR(gfin);
1806
1807	/* Call mdc_iocontrol */
1808	rc = obd_iocontrol(OBD_IOC_FID2PATH, exp, outsize, gfout, NULL);
1809	if (rc)
1810		GOTO(gf_free, rc);
1811
1812	if (copy_to_user(arg, gfout, outsize))
1813		rc = -EFAULT;
1814
1815gf_free:
1816	OBD_FREE(gfout, outsize);
1817	return rc;
1818}
1819
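/* Handle the FIEMAP ioctl: size the buffer from fm_extent_count, copy the
 * request (and the first extent, if any) from user space, run ll_do_fiemap()
 * and copy the mapped extents back to the caller. */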
1820static int ll_ioctl_fiemap(struct inode *inode, unsigned long arg)
1821{
1822	struct ll_user_fiemap *fiemap_s;
1823	size_t num_bytes, ret_bytes;
1824	unsigned int extent_count;
1825	int rc = 0;
1826
1827	/* Get the extent count so we can calculate the size of
1828	 * the required fiemap buffer */
1829	if (get_user(extent_count,
1830	    &((struct ll_user_fiemap __user *)arg)->fm_extent_count))
1831		return -EFAULT;
1832	num_bytes = sizeof(*fiemap_s) + (extent_count *
1833					 sizeof(struct ll_fiemap_extent));
1834
1835	OBD_ALLOC_LARGE(fiemap_s, num_bytes);
1836	if (fiemap_s == NULL)
1837		return -ENOMEM;
1838
1839	/* get the fiemap value */
1840	if (copy_from_user(fiemap_s, (struct ll_user_fiemap __user *)arg,
1841			   sizeof(*fiemap_s)))
1842		GOTO(error, rc = -EFAULT);
1843
1844	/* If fm_extent_count is non-zero, read the first extent since
1845	 * it is used to calculate end_offset and device from the previous
1846	 * fiemap call. */
1847	if (extent_count) {
1848		if (copy_from_user(&fiemap_s->fm_extents[0],
1849		    (char __user *)arg + sizeof(*fiemap_s),
1850		    sizeof(struct ll_fiemap_extent)))
1851			GOTO(error, rc = -EFAULT);
1852	}
1853
1854	rc = ll_do_fiemap(inode, fiemap_s, num_bytes);
1855	if (rc)
1856		GOTO(error, rc);
1857
1858	ret_bytes = sizeof(struct ll_user_fiemap);
1859
1860	if (extent_count != 0)
1861		ret_bytes += (fiemap_s->fm_mapped_extents *
1862				 sizeof(struct ll_fiemap_extent));
1863
1864	if (copy_to_user((void *)arg, fiemap_s, ret_bytes))
1865		rc = -EFAULT;
1866
1867error:
1868	OBD_FREE_LARGE(fiemap_s, num_bytes);
1869	return rc;
1870}
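
/*
 * Usage sketch (illustrative, not compiled here): userspace reaches
 * ll_ioctl_fiemap()/ll_do_fiemap() through the standard fiemap ioctl.
 * This assumes FSFILT_IOC_FIEMAP matches the generic FS_IOC_FIEMAP from
 * <linux/fs.h>, and that FIEMAP_FLAG_DEVICE_ORDER comes from the Lustre
 * user headers; the extent budget of 32 and "fd" are hypothetical.
 *
 *	unsigned int n = 32;
 *	struct fiemap *fm;
 *
 *	fm = calloc(1, sizeof(*fm) + n * sizeof(struct fiemap_extent));
 *	fm->fm_start = 0;
 *	fm->fm_length = FIEMAP_MAX_OFFSET;
 *	fm->fm_extent_count = n;
 *	fm->fm_flags = FIEMAP_FLAG_DEVICE_ORDER; // required if stripe_count > 1
 *	if (ioctl(fd, FS_IOC_FIEMAP, fm) == 0)
 *		printf("%u extents mapped\n", fm->fm_mapped_extents);
 */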
1871
1872/*
1873 * Read the data_version for the inode.
1874 *
1875 * This value is computed using the stripe object version on the OST.
1876 * The version is computed using server-side locking.
1877 *
1878 * @param extent_lock  Take the extent lock. Not needed if a process is
1879 *		       already holding the OST object group locks.
1880 */
1881int ll_data_version(struct inode *inode, __u64 *data_version,
1882		    int extent_lock)
1883{
1884	struct lov_stripe_md	*lsm = NULL;
1885	struct ll_sb_info	*sbi = ll_i2sbi(inode);
1886	struct obdo		*obdo = NULL;
1887	int			 rc;
1888
1889	/* If there is no stripe, we consider the version to be 0. */
1890	lsm = ccc_inode_lsm_get(inode);
1891	if (!lsm_has_objects(lsm)) {
1892		*data_version = 0;
1893		CDEBUG(D_INODE, "No object for inode\n");
1894		GOTO(out, rc = 0);
1895	}
1896
1897	OBD_ALLOC_PTR(obdo);
1898	if (obdo == NULL)
1899		GOTO(out, rc = -ENOMEM);
1900
1901	rc = ll_lsm_getattr(lsm, sbi->ll_dt_exp, NULL, obdo, 0, extent_lock);
1902	if (rc == 0) {
1903		if (!(obdo->o_valid & OBD_MD_FLDATAVERSION))
1904			rc = -EOPNOTSUPP;
1905		else
1906			*data_version = obdo->o_data_version;
1907	}
1908
1909	OBD_FREE_PTR(obdo);
1910out:
1911	ccc_inode_lsm_put(inode, lsm);
1912	return rc;
1913}
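
/*
 * Usage sketch (illustrative, not compiled here): userspace can read the
 * same data version through LL_IOC_DATA_VERSION, handled in
 * ll_file_ioctl() below.  Field and flag names follow the
 * struct ioc_data_version usage in that handler; "fd" is hypothetical.
 *
 *	struct ioc_data_version idv = { .idv_flags = 0 };
 *
 *	// set idv_flags = LL_DV_NOFLUSH to skip taking the extent lock
 *	if (ioctl(fd, LL_IOC_DATA_VERSION, &idv) == 0)
 *		printf("data version: %llu\n",
 *		       (unsigned long long)idv.idv_version);
 */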
1914
1915/*
1916 * Trigger a HSM release request for the provided inode.
1917 */
1918int ll_hsm_release(struct inode *inode)
1919{
1920	struct cl_env_nest nest;
1921	struct lu_env *env;
1922	struct obd_client_handle *och = NULL;
1923	__u64 data_version = 0;
1924	int rc;
1925
1926
1927	CDEBUG(D_INODE, "%s: Releasing file "DFID".\n",
1928	       ll_get_fsname(inode->i_sb, NULL, 0),
1929	       PFID(&ll_i2info(inode)->lli_fid));
1930
1931	och = ll_lease_open(inode, NULL, FMODE_WRITE, MDS_OPEN_RELEASE);
1932	if (IS_ERR(och))
1933		GOTO(out, rc = PTR_ERR(och));
1934
1935	/* Grab latest data_version and [am]time values */
1936	rc = ll_data_version(inode, &data_version, 1);
1937	if (rc != 0)
1938		GOTO(out, rc);
1939
1940	env = cl_env_nested_get(&nest);
1941	if (IS_ERR(env))
1942		GOTO(out, rc = PTR_ERR(env));
1943
1944	ll_merge_lvb(env, inode);
1945	cl_env_nested_put(&nest, env);
1946
1947	/* Release the file.
1948	 * NB: the lease lock handle is released in mdc_hsm_release_pack()
1949	 * because we still need it to pack l_remote_handle to the MDT. */
1950	rc = ll_close_inode_openhandle(ll_i2sbi(inode)->ll_md_exp, inode, och,
1951				       &data_version);
1952	och = NULL;
1953
1954
1955out:
1956	if (och != NULL && !IS_ERR(och)) /* close the file */
1957		ll_lease_close(och, inode, NULL);
1958
1959	return rc;
1960}
1961
1962struct ll_swap_stack {
1963	struct iattr		 ia1, ia2;
1964	__u64			 dv1, dv2;
1965	struct inode		*inode1, *inode2;
1966	bool			 check_dv1, check_dv2;
1967};
1968
1969static int ll_swap_layouts(struct file *file1, struct file *file2,
1970			   struct lustre_swap_layouts *lsl)
1971{
1972	struct mdc_swap_layouts	 msl;
1973	struct md_op_data	*op_data;
1974	__u32			 gid;
1975	__u64			 dv;
1976	struct ll_swap_stack	*llss = NULL;
1977	int			 rc;
1978
1979	OBD_ALLOC_PTR(llss);
1980	if (llss == NULL)
1981		return -ENOMEM;
1982
1983	llss->inode1 = file1->f_dentry->d_inode;
1984	llss->inode2 = file2->f_dentry->d_inode;
1985
1986	if (!S_ISREG(llss->inode2->i_mode))
1987		GOTO(free, rc = -EINVAL);
1988
1989	if (inode_permission(llss->inode1, MAY_WRITE) ||
1990	    inode_permission(llss->inode2, MAY_WRITE))
1991		GOTO(free, rc = -EPERM);
1992
1993	if (llss->inode2->i_sb != llss->inode1->i_sb)
1994		GOTO(free, rc = -EXDEV);
1995
1996	/* we use two bools because they are easier to swap than two bits */
1997	if (lsl->sl_flags & SWAP_LAYOUTS_CHECK_DV1)
1998		llss->check_dv1 = true;
1999
2000	if (lsl->sl_flags & SWAP_LAYOUTS_CHECK_DV2)
2001		llss->check_dv2 = true;
2002
2003	/* we cannot use lsl->sl_dvX directly because we may swap them */
2004	llss->dv1 = lsl->sl_dv1;
2005	llss->dv2 = lsl->sl_dv2;
2006
2007	rc = lu_fid_cmp(ll_inode2fid(llss->inode1), ll_inode2fid(llss->inode2));
2008	if (rc == 0) /* same file, done! */
2009		GOTO(free, rc = 0);
2010
2011	if (rc < 0) { /* order the two files consistently by FID */
2012		swap(llss->inode1, llss->inode2);
2013		swap(file1, file2);
2014		swap(llss->dv1, llss->dv2);
2015		swap(llss->check_dv1, llss->check_dv2);
2016	}
2017
2018	gid = lsl->sl_gid;
2019	if (gid != 0) { /* application asks to flush dirty cache */
2020		rc = ll_get_grouplock(llss->inode1, file1, gid);
2021		if (rc < 0)
2022			GOTO(free, rc);
2023
2024		rc = ll_get_grouplock(llss->inode2, file2, gid);
2025		if (rc < 0) {
2026			ll_put_grouplock(llss->inode1, file1, gid);
2027			GOTO(free, rc);
2028		}
2029	}
2030
2031	/* to be able to restore mtime and atime after the swap
2032	 * we need to save them first */
2033	if (lsl->sl_flags &
2034	    (SWAP_LAYOUTS_KEEP_MTIME | SWAP_LAYOUTS_KEEP_ATIME)) {
2035		llss->ia1.ia_mtime = llss->inode1->i_mtime;
2036		llss->ia1.ia_atime = llss->inode1->i_atime;
2037		llss->ia1.ia_valid = ATTR_MTIME | ATTR_ATIME;
2038		llss->ia2.ia_mtime = llss->inode2->i_mtime;
2039		llss->ia2.ia_atime = llss->inode2->i_atime;
2040		llss->ia2.ia_valid = ATTR_MTIME | ATTR_ATIME;
2041	}
2042
2043	/* final check: before swapping the layouts, verify that the
2044	 * data version has not changed (if requested) */
2045	if (llss->check_dv1) {
2046		rc = ll_data_version(llss->inode1, &dv, 0);
2047		if (rc)
2048			GOTO(putgl, rc);
2049		if (dv != llss->dv1)
2050			GOTO(putgl, rc = -EAGAIN);
2051	}
2052
2053	if (llss->check_dv2) {
2054		rc = ll_data_version(llss->inode2, &dv, 0);
2055		if (rc)
2056			GOTO(putgl, rc);
2057		if (dv != llss->dv2)
2058			GOTO(putgl, rc = -EAGAIN);
2059	}
2060
2061	/* struct md_op_data is used to send the swap args to the MDT;
2062	 * only the flags are missing, so we pass struct mdc_swap_layouts
2063	 * through md_op_data->op_data */
2064	/* flags from user space have to be converted before they are sent to
2065	 * the server; no flag is sent today, they are only used on the client */
2066	msl.msl_flags = 0;
2067	rc = -ENOMEM;
2068	op_data = ll_prep_md_op_data(NULL, llss->inode1, llss->inode2, NULL, 0,
2069				     0, LUSTRE_OPC_ANY, &msl);
2070	if (IS_ERR(op_data))
2071		GOTO(free, rc = PTR_ERR(op_data));
2072
2073	rc = obd_iocontrol(LL_IOC_LOV_SWAP_LAYOUTS, ll_i2mdexp(llss->inode1),
2074			   sizeof(*op_data), op_data, NULL);
2075	ll_finish_md_op_data(op_data);
2076
2077putgl:
2078	if (gid != 0) {
2079		ll_put_grouplock(llss->inode2, file2, gid);
2080		ll_put_grouplock(llss->inode1, file1, gid);
2081	}
2082
2083	/* rc can be set from obd_iocontrol() or from a GOTO(putgl, ...) */
2084	if (rc != 0)
2085		GOTO(free, rc);
2086
2087	/* clear useless flags */
2088	if (!(lsl->sl_flags & SWAP_LAYOUTS_KEEP_MTIME)) {
2089		llss->ia1.ia_valid &= ~ATTR_MTIME;
2090		llss->ia2.ia_valid &= ~ATTR_MTIME;
2091	}
2092
2093	if (!(lsl->sl_flags & SWAP_LAYOUTS_KEEP_ATIME)) {
2094		llss->ia1.ia_valid &= ~ATTR_ATIME;
2095		llss->ia2.ia_valid &= ~ATTR_ATIME;
2096	}
2097
2098	/* update time if requested */
2099	rc = 0;
2100	if (llss->ia2.ia_valid != 0) {
2101		mutex_lock(&llss->inode1->i_mutex);
2102		rc = ll_setattr(file1->f_dentry, &llss->ia2);
2103		mutex_unlock(&llss->inode1->i_mutex);
2104	}
2105
2106	if (llss->ia1.ia_valid != 0) {
2107		int rc1;
2108
2109		mutex_lock(&llss->inode2->i_mutex);
2110		rc1 = ll_setattr(file2->f_dentry, &llss->ia1);
2111		mutex_unlock(&llss->inode2->i_mutex);
2112		if (rc == 0)
2113			rc = rc1;
2114	}
2115
2116free:
2117	if (llss != NULL)
2118		OBD_FREE_PTR(llss);
2119
2120	return rc;
2121}
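
/*
 * Usage sketch (illustrative, not compiled here): the layout swap is
 * driven from userspace via LL_IOC_LOV_SWAP_LAYOUTS, handled in
 * ll_file_ioctl() below.  fd1 must not be opened O_RDONLY and fd2 must
 * be writable; both descriptors and the group id value are hypothetical.
 * To make the swap conditional, also set SWAP_LAYOUTS_CHECK_DV1/DV2 and
 * fill sl_dv1/sl_dv2 (e.g. from LL_IOC_DATA_VERSION).
 *
 *	struct lustre_swap_layouts lsl = {
 *		.sl_fd    = fd2,			// second file
 *		.sl_flags = SWAP_LAYOUTS_KEEP_MTIME,	// preserve mtime
 *		.sl_gid   = 1234,	// non-zero: flush dirty cache first
 *	};
 *
 *	rc = ioctl(fd1, LL_IOC_LOV_SWAP_LAYOUTS, &lsl);
 */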
2122
2123static int ll_hsm_state_set(struct inode *inode, struct hsm_state_set *hss)
2124{
2125	struct md_op_data	*op_data;
2126	int			 rc;
2127
2128	/* Non-root users are forbidden to set or clear flags which are
2129	 * NOT defined in HSM_USER_MASK. */
2130	if (((hss->hss_setmask | hss->hss_clearmask) & ~HSM_USER_MASK) &&
2131	    !capable(CFS_CAP_SYS_ADMIN))
2132		return -EPERM;
2133
2134	op_data = ll_prep_md_op_data(NULL, inode, NULL, NULL, 0, 0,
2135				     LUSTRE_OPC_ANY, hss);
2136	if (IS_ERR(op_data))
2137		return PTR_ERR(op_data);
2138
2139	rc = obd_iocontrol(LL_IOC_HSM_STATE_SET, ll_i2mdexp(inode),
2140			   sizeof(*op_data), op_data, NULL);
2141
2142	ll_finish_md_op_data(op_data);
2143
2144	return rc;
2145}
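
/*
 * Usage sketch (illustrative, not compiled here): non-root callers may
 * only touch bits within HSM_USER_MASK, as enforced above.  HS_DIRTY is
 * assumed to be one of those user-settable bits; "fd" is hypothetical.
 *
 *	struct hsm_state_set hss = {
 *		.hss_valid   = HSS_SETMASK,
 *		.hss_setmask = HS_DIRTY,
 *	};
 *
 *	rc = ioctl(fd, LL_IOC_HSM_STATE_SET, &hss);
 */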
2146
2147static int ll_hsm_import(struct inode *inode, struct file *file,
2148			 struct hsm_user_import *hui)
2149{
2150	struct hsm_state_set	*hss = NULL;
2151	struct iattr		*attr = NULL;
2152	int			 rc;
2153
2154
2155	if (!S_ISREG(inode->i_mode))
2156		return -EINVAL;
2157
2158	/* set HSM flags */
2159	OBD_ALLOC_PTR(hss);
2160	if (hss == NULL)
2161		GOTO(out, rc = -ENOMEM);
2162
2163	hss->hss_valid = HSS_SETMASK | HSS_ARCHIVE_ID;
2164	hss->hss_archive_id = hui->hui_archive_id;
2165	hss->hss_setmask = HS_ARCHIVED | HS_EXISTS | HS_RELEASED;
2166	rc = ll_hsm_state_set(inode, hss);
2167	if (rc != 0)
2168		GOTO(out, rc);
2169
2170	OBD_ALLOC_PTR(attr);
2171	if (attr == NULL)
2172		GOTO(out, rc = -ENOMEM);
2173
2174	attr->ia_mode = hui->hui_mode & (S_IRWXU | S_IRWXG | S_IRWXO);
2175	attr->ia_mode |= S_IFREG;
2176	attr->ia_uid = make_kuid(&init_user_ns, hui->hui_uid);
2177	attr->ia_gid = make_kgid(&init_user_ns, hui->hui_gid);
2178	attr->ia_size = hui->hui_size;
2179	attr->ia_mtime.tv_sec = hui->hui_mtime;
2180	attr->ia_mtime.tv_nsec = hui->hui_mtime_ns;
2181	attr->ia_atime.tv_sec = hui->hui_atime;
2182	attr->ia_atime.tv_nsec = hui->hui_atime_ns;
2183
2184	attr->ia_valid = ATTR_SIZE | ATTR_MODE | ATTR_FORCE |
2185			 ATTR_UID | ATTR_GID |
2186			 ATTR_MTIME | ATTR_MTIME_SET |
2187			 ATTR_ATIME | ATTR_ATIME_SET;
2188
2189	rc = ll_setattr_raw(file->f_dentry, attr, true);
2190	if (rc == -ENODATA)
2191		rc = 0;
2192
2193out:
2194	if (hss != NULL)
2195		OBD_FREE_PTR(hss);
2196
2197	if (attr != NULL)
2198		OBD_FREE_PTR(attr);
2199
2200	return rc;
2201}
2202
2203static long
2204ll_file_ioctl(struct file *file, unsigned int cmd, unsigned long arg)
2205{
2206	struct inode		*inode = file->f_dentry->d_inode;
2207	struct ll_file_data	*fd = LUSTRE_FPRIVATE(file);
2208	int			 flags, rc;
2209
2210	CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),cmd=%x\n", inode->i_ino,
2211	       inode->i_generation, inode, cmd);
2212	ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_IOCTL, 1);
2213
2214	/* asm-ppc{,64} declares TCGETS, et al. as type 't' not 'T' */
2215	if (_IOC_TYPE(cmd) == 'T' || _IOC_TYPE(cmd) == 't') /* tty ioctls */
2216		return -ENOTTY;
2217
2218	switch (cmd) {
2219	case LL_IOC_GETFLAGS:
2220		/* Get the current value of the file flags */
2221		return put_user(fd->fd_flags, (int *)arg);
2222	case LL_IOC_SETFLAGS:
2223	case LL_IOC_CLRFLAGS:
2224		/* Set or clear specific file flags */
2225		/* XXX This probably needs checks to ensure the flags are
2226		 *     not abused, and to handle any flag side effects.
2227		 */
2228		if (get_user(flags, (int *) arg))
2229			return -EFAULT;
2230
2231		if (cmd == LL_IOC_SETFLAGS) {
2232			if ((flags & LL_FILE_IGNORE_LOCK) &&
2233			    !(file->f_flags & O_DIRECT)) {
2234				CERROR("%s: unable to disable locking on "
2235				       "non-O_DIRECT file\n", current->comm);
2236				return -EINVAL;
2237			}
2238
2239			fd->fd_flags |= flags;
2240		} else {
2241			fd->fd_flags &= ~flags;
2242		}
2243		return 0;
2244	case LL_IOC_LOV_SETSTRIPE:
2245		return ll_lov_setstripe(inode, file, arg);
2246	case LL_IOC_LOV_SETEA:
2247		return ll_lov_setea(inode, file, arg);
2248	case LL_IOC_LOV_SWAP_LAYOUTS: {
2249		struct file *file2;
2250		struct lustre_swap_layouts lsl;
2251
2252		if (copy_from_user(&lsl, (char *)arg,
2253				       sizeof(struct lustre_swap_layouts)))
2254			return -EFAULT;
2255
2256		if ((file->f_flags & O_ACCMODE) == 0) /* O_RDONLY */
2257			return -EPERM;
2258
2259		file2 = fget(lsl.sl_fd);
2260		if (file2 == NULL)
2261			return -EBADF;
2262
2263		rc = -EPERM;
2264		if ((file2->f_flags & O_ACCMODE) != 0) /* O_WRONLY or O_RDWR */
2265			rc = ll_swap_layouts(file, file2, &lsl);
2266		fput(file2);
2267		return rc;
2268	}
2269	case LL_IOC_LOV_GETSTRIPE:
2270		return ll_lov_getstripe(inode, arg);
2271	case LL_IOC_RECREATE_OBJ:
2272		return ll_lov_recreate_obj(inode, arg);
2273	case LL_IOC_RECREATE_FID:
2274		return ll_lov_recreate_fid(inode, arg);
2275	case FSFILT_IOC_FIEMAP:
2276		return ll_ioctl_fiemap(inode, arg);
2277	case FSFILT_IOC_GETFLAGS:
2278	case FSFILT_IOC_SETFLAGS:
2279		return ll_iocontrol(inode, file, cmd, arg);
2280	case FSFILT_IOC_GETVERSION_OLD:
2281	case FSFILT_IOC_GETVERSION:
2282		return put_user(inode->i_generation, (int *)arg);
2283	case LL_IOC_GROUP_LOCK:
2284		return ll_get_grouplock(inode, file, arg);
2285	case LL_IOC_GROUP_UNLOCK:
2286		return ll_put_grouplock(inode, file, arg);
2287	case IOC_OBD_STATFS:
2288		return ll_obd_statfs(inode, (void *)arg);
2289
2290	/* We need to special case any other ioctls we want to handle,
2291	 * to send them to the MDS/OST as appropriate and to properly
2292	 * network encode the arg field.
2293	case FSFILT_IOC_SETVERSION_OLD:
2294	case FSFILT_IOC_SETVERSION:
2295	*/
2296	case LL_IOC_FLUSHCTX:
2297		return ll_flush_ctx(inode);
2298	case LL_IOC_PATH2FID: {
2299		if (copy_to_user((void *)arg, ll_inode2fid(inode),
2300				 sizeof(struct lu_fid)))
2301			return -EFAULT;
2302
2303		return 0;
2304	}
2305	case OBD_IOC_FID2PATH:
2306		return ll_fid2path(inode, (void *)arg);
2307	case LL_IOC_DATA_VERSION: {
2308		struct ioc_data_version	idv;
2309		int			rc;
2310
2311		if (copy_from_user(&idv, (char *)arg, sizeof(idv)))
2312			return -EFAULT;
2313
2314		rc = ll_data_version(inode, &idv.idv_version,
2315				!(idv.idv_flags & LL_DV_NOFLUSH));
2316
2317		if (rc == 0 && copy_to_user((char *) arg, &idv, sizeof(idv)))
2318			return -EFAULT;
2319
2320		return rc;
2321	}
2322
2323	case LL_IOC_GET_MDTIDX: {
2324		int mdtidx;
2325
2326		mdtidx = ll_get_mdt_idx(inode);
2327		if (mdtidx < 0)
2328			return mdtidx;
2329
2330		if (put_user((int)mdtidx, (int *)arg))
2331			return -EFAULT;
2332
2333		return 0;
2334	}
2335	case OBD_IOC_GETDTNAME:
2336	case OBD_IOC_GETMDNAME:
2337		return ll_get_obd_name(inode, cmd, arg);
2338	case LL_IOC_HSM_STATE_GET: {
2339		struct md_op_data	*op_data;
2340		struct hsm_user_state	*hus;
2341		int			 rc;
2342
2343		OBD_ALLOC_PTR(hus);
2344		if (hus == NULL)
2345			return -ENOMEM;
2346
2347		op_data = ll_prep_md_op_data(NULL, inode, NULL, NULL, 0, 0,
2348					     LUSTRE_OPC_ANY, hus);
2349		if (IS_ERR(op_data)) {
2350			OBD_FREE_PTR(hus);
2351			return PTR_ERR(op_data);
2352		}
2353
2354		rc = obd_iocontrol(cmd, ll_i2mdexp(inode), sizeof(*op_data),
2355				   op_data, NULL);
2356
2357		if (copy_to_user((void *)arg, hus, sizeof(*hus)))
2358			rc = -EFAULT;
2359
2360		ll_finish_md_op_data(op_data);
2361		OBD_FREE_PTR(hus);
2362		return rc;
2363	}
2364	case LL_IOC_HSM_STATE_SET: {
2365		struct hsm_state_set	*hss;
2366		int			 rc;
2367
2368		OBD_ALLOC_PTR(hss);
2369		if (hss == NULL)
2370			return -ENOMEM;
2371
2372		if (copy_from_user(hss, (char *)arg, sizeof(*hss))) {
2373			OBD_FREE_PTR(hss);
2374			return -EFAULT;
2375		}
2376
2377		rc = ll_hsm_state_set(inode, hss);
2378
2379		OBD_FREE_PTR(hss);
2380		return rc;
2381	}
2382	case LL_IOC_HSM_ACTION: {
2383		struct md_op_data		*op_data;
2384		struct hsm_current_action	*hca;
2385		int				 rc;
2386
2387		OBD_ALLOC_PTR(hca);
2388		if (hca == NULL)
2389			return -ENOMEM;
2390
2391		op_data = ll_prep_md_op_data(NULL, inode, NULL, NULL, 0, 0,
2392					     LUSTRE_OPC_ANY, hca);
2393		if (IS_ERR(op_data)) {
2394			OBD_FREE_PTR(hca);
2395			return PTR_ERR(op_data);
2396		}
2397
2398		rc = obd_iocontrol(cmd, ll_i2mdexp(inode), sizeof(*op_data),
2399				   op_data, NULL);
2400
2401		if (copy_to_user((char *)arg, hca, sizeof(*hca)))
2402			rc = -EFAULT;
2403
2404		ll_finish_md_op_data(op_data);
2405		OBD_FREE_PTR(hca);
2406		return rc;
2407	}
2408	case LL_IOC_SET_LEASE: {
2409		struct ll_inode_info *lli = ll_i2info(inode);
2410		struct obd_client_handle *och = NULL;
2411		bool lease_broken;
2412		fmode_t mode = 0;
2413
2414		switch (arg) {
2415		case F_WRLCK:
2416			if (!(file->f_mode & FMODE_WRITE))
2417				return -EPERM;
2418			mode = FMODE_WRITE;
2419			break;
2420		case F_RDLCK:
2421			if (!(file->f_mode & FMODE_READ))
2422				return -EPERM;
2423			mode = FMODE_READ;
2424			break;
2425		case F_UNLCK:
2426			mutex_lock(&lli->lli_och_mutex);
2427			if (fd->fd_lease_och != NULL) {
2428				och = fd->fd_lease_och;
2429				fd->fd_lease_och = NULL;
2430			}
2431			mutex_unlock(&lli->lli_och_mutex);
2432
2433			if (och != NULL) {
2434				mode = och->och_flags &
2435				       (FMODE_READ|FMODE_WRITE);
2436				rc = ll_lease_close(och, inode, &lease_broken);
2437				if (rc == 0 && lease_broken)
2438					mode = 0;
2439			} else {
2440				rc = -ENOLCK;
2441			}
2442
2443			/* return the type of lease or error */
2444			return rc < 0 ? rc : (int)mode;
2445		default:
2446			return -EINVAL;
2447		}
2448
2449		CDEBUG(D_INODE, "Set lease with mode %d\n", mode);
2450
2451		/* apply for lease */
2452		och = ll_lease_open(inode, file, mode, 0);
2453		if (IS_ERR(och))
2454			return PTR_ERR(och);
2455
2456		rc = 0;
2457		mutex_lock(&lli->lli_och_mutex);
2458		if (fd->fd_lease_och == NULL) {
2459			fd->fd_lease_och = och;
2460			och = NULL;
2461		}
2462		mutex_unlock(&lli->lli_och_mutex);
2463		if (och != NULL) {
2464			/* impossible for now, since only exclusive leases are supported */
2465			ll_lease_close(och, inode, &lease_broken);
2466			rc = -EBUSY;
2467		}
2468		return rc;
2469	}
2470	case LL_IOC_GET_LEASE: {
2471		struct ll_inode_info *lli = ll_i2info(inode);
2472		struct ldlm_lock *lock = NULL;
2473
2474		rc = 0;
2475		mutex_lock(&lli->lli_och_mutex);
2476		if (fd->fd_lease_och != NULL) {
2477			struct obd_client_handle *och = fd->fd_lease_och;
2478
2479			lock = ldlm_handle2lock(&och->och_lease_handle);
2480			if (lock != NULL) {
2481				lock_res_and_lock(lock);
2482				if (!ldlm_is_cancel(lock))
2483					rc = och->och_flags &
2484						(FMODE_READ | FMODE_WRITE);
2485				unlock_res_and_lock(lock);
2486				LDLM_LOCK_PUT(lock);
2487			}
2488		}
2489		mutex_unlock(&lli->lli_och_mutex);
2490		return rc;
2491	}
2492	case LL_IOC_HSM_IMPORT: {
2493		struct hsm_user_import *hui;
2494
2495		OBD_ALLOC_PTR(hui);
2496		if (hui == NULL)
2497			return -ENOMEM;
2498
2499		if (copy_from_user(hui, (void *)arg, sizeof(*hui))) {
2500			OBD_FREE_PTR(hui);
2501			return -EFAULT;
2502		}
2503
2504		rc = ll_hsm_import(inode, file, hui);
2505
2506		OBD_FREE_PTR(hui);
2507		return rc;
2508	}
2509	default: {
2510		int err;
2511
2512		if (LLIOC_STOP ==
2513		     ll_iocontrol_call(inode, file, cmd, arg, &err))
2514			return err;
2515
2516		return obd_iocontrol(cmd, ll_i2dtexp(inode), 0, NULL,
2517				     (void *)arg);
2518	}
2519	}
2520}
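
/*
 * Usage sketch (illustrative, not compiled here) for the lease ioctls
 * handled above: LL_IOC_SET_LEASE takes F_RDLCK/F_WRLCK/F_UNLCK as its
 * argument and LL_IOC_GET_LEASE reports the FMODE_* bits still held
 * (0 once the lease has been broken).  "fd" is hypothetical and must be
 * opened with the matching access mode.
 *
 *	rc = ioctl(fd, LL_IOC_SET_LEASE, F_WRLCK);	// apply for a write lease
 *	...
 *	rc = ioctl(fd, LL_IOC_GET_LEASE, 0);		// > 0 while still valid
 *	...
 *	rc = ioctl(fd, LL_IOC_SET_LEASE, F_UNLCK);	// release; 0 if broken
 */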
2521
2522
2523static loff_t ll_file_seek(struct file *file, loff_t offset, int origin)
2524{
2525	struct inode *inode = file->f_dentry->d_inode;
2526	loff_t retval, eof = 0;
2527
2528	retval = offset + ((origin == SEEK_END) ? i_size_read(inode) :
2529			   (origin == SEEK_CUR) ? file->f_pos : 0);
2530	CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), to=%llu=%#llx(%d)\n",
2531	       inode->i_ino, inode->i_generation, inode, retval, retval,
2532	       origin);
2533	ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_LLSEEK, 1);
2534
2535	if (origin == SEEK_END || origin == SEEK_HOLE || origin == SEEK_DATA) {
2536		retval = ll_glimpse_size(inode);
2537		if (retval != 0)
2538			return retval;
2539		eof = i_size_read(inode);
2540	}
2541
2542	retval = generic_file_llseek_size(file, offset, origin,
2543					  ll_file_maxbytes(inode), eof);
2544	return retval;
2545}
2546
2547static int ll_flush(struct file *file, fl_owner_t id)
2548{
2549	struct inode *inode = file->f_dentry->d_inode;
2550	struct ll_inode_info *lli = ll_i2info(inode);
2551	struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
2552	int rc, err;
2553
2554	LASSERT(!S_ISDIR(inode->i_mode));
2555
2556	/* catch async errors that were recorded back when async writeback
2557	 * failed for pages in this mapping. */
2558	rc = lli->lli_async_rc;
2559	lli->lli_async_rc = 0;
2560	err = lov_read_and_clear_async_rc(lli->lli_clob);
2561	if (rc == 0)
2562		rc = err;
2563
2564	/* The application has been told write failure already.
2565	 * Do not report failure again. */
2566	if (fd->fd_write_failed)
2567		return 0;
2568	return rc ? -EIO : 0;
2569}
2570
2571/**
2572 * Called to make sure a portion of the file has been written out.
2573 * If @mode is not CL_FSYNC_LOCAL, it will send OST_SYNC RPCs to the OST.
2574 *
2575 * Return how many pages have been written.
2576 */
2577int cl_sync_file_range(struct inode *inode, loff_t start, loff_t end,
2578		       enum cl_fsync_mode mode, int ignore_layout)
2579{
2580	struct cl_env_nest nest;
2581	struct lu_env *env;
2582	struct cl_io *io;
2583	struct obd_capa *capa = NULL;
2584	struct cl_fsync_io *fio;
2585	int result;
2586
2587	if (mode != CL_FSYNC_NONE && mode != CL_FSYNC_LOCAL &&
2588	    mode != CL_FSYNC_DISCARD && mode != CL_FSYNC_ALL)
2589		return -EINVAL;
2590
2591	env = cl_env_nested_get(&nest);
2592	if (IS_ERR(env))
2593		return PTR_ERR(env);
2594
2595	capa = ll_osscapa_get(inode, CAPA_OPC_OSS_WRITE);
2596
2597	io = ccc_env_thread_io(env);
2598	io->ci_obj = cl_i2info(inode)->lli_clob;
2599	io->ci_ignore_layout = ignore_layout;
2600
2601	/* initialize parameters for sync */
2602	fio = &io->u.ci_fsync;
2603	fio->fi_capa = capa;
2604	fio->fi_start = start;
2605	fio->fi_end = end;
2606	fio->fi_fid = ll_inode2fid(inode);
2607	fio->fi_mode = mode;
2608	fio->fi_nr_written = 0;
2609
2610	if (cl_io_init(env, io, CIT_FSYNC, io->ci_obj) == 0)
2611		result = cl_io_loop(env, io);
2612	else
2613		result = io->ci_result;
2614	if (result == 0)
2615		result = fio->fi_nr_written;
2616	cl_io_fini(env, io);
2617	cl_env_nested_put(&nest, env);
2618
2619	capa_put(capa);
2620
2621	return result;
2622}
2623
2624/*
2625 * When dentry is provided (the 'else' case), *file->f_dentry may be
2626 * null and dentry must be used directly rather than pulled from
2627 * *file->f_dentry as is done otherwise.
2628 */
2629
2630int ll_fsync(struct file *file, loff_t start, loff_t end, int datasync)
2631{
2632	struct dentry *dentry = file->f_dentry;
2633	struct inode *inode = dentry->d_inode;
2634	struct ll_inode_info *lli = ll_i2info(inode);
2635	struct ptlrpc_request *req;
2636	struct obd_capa *oc;
2637	int rc, err;
2638
2639	CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p)\n", inode->i_ino,
2640	       inode->i_generation, inode);
2641	ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_FSYNC, 1);
2642
2643	rc = filemap_write_and_wait_range(inode->i_mapping, start, end);
2644	mutex_lock(&inode->i_mutex);
2645
2646	/* catch async errors that were recorded back when async writeback
2647	 * failed for pages in this mapping. */
2648	if (!S_ISDIR(inode->i_mode)) {
2649		err = lli->lli_async_rc;
2650		lli->lli_async_rc = 0;
2651		if (rc == 0)
2652			rc = err;
2653		err = lov_read_and_clear_async_rc(lli->lli_clob);
2654		if (rc == 0)
2655			rc = err;
2656	}
2657
2658	oc = ll_mdscapa_get(inode);
2659	err = md_sync(ll_i2sbi(inode)->ll_md_exp, ll_inode2fid(inode), oc,
2660		      &req);
2661	capa_put(oc);
2662	if (!rc)
2663		rc = err;
2664	if (!err)
2665		ptlrpc_req_finished(req);
2666
2667	if (S_ISREG(inode->i_mode)) {
2668		struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
2669
2670		err = cl_sync_file_range(inode, start, end, CL_FSYNC_ALL, 0);
2671		if (rc == 0 && err < 0)
2672			rc = err;
2673		if (rc < 0)
2674			fd->fd_write_failed = true;
2675		else
2676			fd->fd_write_failed = false;
2677	}
2678
2679	mutex_unlock(&inode->i_mutex);
2680	return rc;
2681}
2682
2683static int
2684ll_file_flock(struct file *file, int cmd, struct file_lock *file_lock)
2685{
2686	struct inode *inode = file->f_dentry->d_inode;
2687	struct ll_sb_info *sbi = ll_i2sbi(inode);
2688	struct ldlm_enqueue_info einfo = {
2689		.ei_type	= LDLM_FLOCK,
2690		.ei_cb_cp	= ldlm_flock_completion_ast,
2691		.ei_cbdata	= file_lock,
2692	};
2693	struct md_op_data *op_data;
2694	struct lustre_handle lockh = {0};
2695	ldlm_policy_data_t flock = {{0}};
2696	int flags = 0;
2697	int rc;
2698	int rc2 = 0;
2699
2700	CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu file_lock=%p\n",
2701	       inode->i_ino, file_lock);
2702
2703	ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_FLOCK, 1);
2704
2705	if (file_lock->fl_flags & FL_FLOCK) {
2706		LASSERT((cmd == F_SETLKW) || (cmd == F_SETLK));
2707		/* flocks are whole-file locks */
2708		flock.l_flock.end = OFFSET_MAX;
2709		/* For flocks the owner is determined by the local file descriptor */
2710		flock.l_flock.owner = (unsigned long)file_lock->fl_file;
2711	} else if (file_lock->fl_flags & FL_POSIX) {
2712		flock.l_flock.owner = (unsigned long)file_lock->fl_owner;
2713		flock.l_flock.start = file_lock->fl_start;
2714		flock.l_flock.end = file_lock->fl_end;
2715	} else {
2716		return -EINVAL;
2717	}
2718	flock.l_flock.pid = file_lock->fl_pid;
2719
2720	/* Somewhat ugly workaround for svc lockd.
2721	 * lockd installs a custom fl_lmops->lm_compare_owner that checks
2722	 * that the fl_owner is the same (which it always is on the local node,
2723	 * at least between lockd processes) and then compares the pid.
2724	 * As such we assign the pid to the owner field to make it all work;
2725	 * a conflict with normal locks is unlikely since the pid space and
2726	 * the pointer space for current->files do not intersect */
2727	if (file_lock->fl_lmops && file_lock->fl_lmops->lm_compare_owner)
2728		flock.l_flock.owner = (unsigned long)file_lock->fl_pid;
2729
2730	switch (file_lock->fl_type) {
2731	case F_RDLCK:
2732		einfo.ei_mode = LCK_PR;
2733		break;
2734	case F_UNLCK:
2735		/* An unlock request may or may not have any relation to
2736		 * existing locks so we may not be able to pass a lock handle
2737		 * via a normal ldlm_lock_cancel() request. The request may even
2738		 * unlock a byte range in the middle of an existing lock. In
2739		 * order to process an unlock request we need all of the same
2740		 * information that is given with a normal read or write record
2741		 * lock request. To avoid creating another ldlm unlock (cancel)
2742		 * message we'll treat a LCK_NL flock request as an unlock. */
2743		einfo.ei_mode = LCK_NL;
2744		break;
2745	case F_WRLCK:
2746		einfo.ei_mode = LCK_PW;
2747		break;
2748	default:
2749		CDEBUG(D_INFO, "Unknown fcntl lock type: %d\n",
2750			file_lock->fl_type);
2751		return -ENOTSUPP;
2752	}
2753
2754	switch (cmd) {
2755	case F_SETLKW:
2756#ifdef F_SETLKW64
2757	case F_SETLKW64:
2758#endif
2759		flags = 0;
2760		break;
2761	case F_SETLK:
2762#ifdef F_SETLK64
2763	case F_SETLK64:
2764#endif
2765		flags = LDLM_FL_BLOCK_NOWAIT;
2766		break;
2767	case F_GETLK:
2768#ifdef F_GETLK64
2769	case F_GETLK64:
2770#endif
2771		flags = LDLM_FL_TEST_LOCK;
2772		/* Save the old mode so that if the mode in the lock changes we
2773		 * can decrement the appropriate reader or writer refcount. */
2774		file_lock->fl_type = einfo.ei_mode;
2775		break;
2776	default:
2777		CERROR("unknown fcntl lock command: %d\n", cmd);
2778		return -EINVAL;
2779	}
2780
2781	op_data = ll_prep_md_op_data(NULL, inode, NULL, NULL, 0, 0,
2782				     LUSTRE_OPC_ANY, NULL);
2783	if (IS_ERR(op_data))
2784		return PTR_ERR(op_data);
2785
2786	CDEBUG(D_DLMTRACE, "inode=%lu, pid=%u, flags=%#x, mode=%u, "
2787	       "start="LPU64", end="LPU64"\n", inode->i_ino, flock.l_flock.pid,
2788	       flags, einfo.ei_mode, flock.l_flock.start, flock.l_flock.end);
2789
2790	rc = md_enqueue(sbi->ll_md_exp, &einfo, NULL,
2791			op_data, &lockh, &flock, 0, NULL /* req */, flags);
2792
2793	if ((file_lock->fl_flags & FL_FLOCK) &&
2794	    (rc == 0 || file_lock->fl_type == F_UNLCK))
2795		rc2  = flock_lock_file_wait(file, file_lock);
2796	if ((file_lock->fl_flags & FL_POSIX) &&
2797	    (rc == 0 || file_lock->fl_type == F_UNLCK) &&
2798	    !(flags & LDLM_FL_TEST_LOCK))
2799		rc2  = posix_lock_file_wait(file, file_lock);
2800
2801	if (rc2 && file_lock->fl_type != F_UNLCK) {
2802		einfo.ei_mode = LCK_NL;
2803		md_enqueue(sbi->ll_md_exp, &einfo, NULL,
2804			op_data, &lockh, &flock, 0, NULL /* req */, flags);
2805		rc = rc2;
2806	}
2807
2808	ll_finish_md_op_data(op_data);
2809
2810	return rc;
2811}
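
/*
 * Usage sketch (illustrative, not compiled here): a plain POSIX byte-range
 * lock from userspace ends up in ll_file_flock() above, where it is
 * translated into an LDLM_FLOCK enqueue (F_WRLCK -> LCK_PW, F_RDLCK ->
 * LCK_PR, F_UNLCK -> LCK_NL).  This assumes the client is mounted with
 * the flock option so ll_file_operations_flock is in effect; "fd" is
 * hypothetical.
 *
 *	struct flock fl = {
 *		.l_type   = F_WRLCK,
 *		.l_whence = SEEK_SET,
 *		.l_start  = 0,
 *		.l_len    = 4096,
 *	};
 *
 *	rc = fcntl(fd, F_SETLKW, &fl);	// blocking: flags = 0 on the enqueue
 */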
2812
2813static int
2814ll_file_noflock(struct file *file, int cmd, struct file_lock *file_lock)
2815{
2816	return -ENOSYS;
2817}
2818
2819/**
2820 * Test if some locks matching bits and l_req_mode are acquired
2821 * - bits can be in different locks
2822 * - if found, clear the common lock bits in *bits
2823 * - the bits not found are kept in *bits
2824 * \param inode [IN]
2825 * \param bits [IN] searched lock bits
2826 * \param l_req_mode [IN] searched lock mode
2827 * \retval boolean, true iff all bits are found
2828 */
2829int ll_have_md_lock(struct inode *inode, __u64 *bits,  ldlm_mode_t l_req_mode)
2830{
2831	struct lustre_handle lockh;
2832	ldlm_policy_data_t policy;
2833	ldlm_mode_t mode = (l_req_mode == LCK_MINMODE) ?
2834				(LCK_CR|LCK_CW|LCK_PR|LCK_PW) : l_req_mode;
2835	struct lu_fid *fid;
2836	__u64 flags;
2837	int i;
2838
2839	if (!inode)
2840		return 0;
2841
2842	fid = &ll_i2info(inode)->lli_fid;
2843	CDEBUG(D_INFO, "trying to match res "DFID" mode %s\n", PFID(fid),
2844	       ldlm_lockname[mode]);
2845
2846	flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_CBPENDING | LDLM_FL_TEST_LOCK;
2847	for (i = 0; i <= MDS_INODELOCK_MAXSHIFT && *bits != 0; i++) {
2848		policy.l_inodebits.bits = *bits & (1 << i);
2849		if (policy.l_inodebits.bits == 0)
2850			continue;
2851
2852		if (md_lock_match(ll_i2mdexp(inode), flags, fid, LDLM_IBITS,
2853				  &policy, mode, &lockh)) {
2854			struct ldlm_lock *lock;
2855
2856			lock = ldlm_handle2lock(&lockh);
2857			if (lock) {
2858				*bits &=
2859				      ~(lock->l_policy_data.l_inodebits.bits);
2860				LDLM_LOCK_PUT(lock);
2861			} else {
2862				*bits &= ~policy.l_inodebits.bits;
2863			}
2864		}
2865	}
2866	return *bits == 0;
2867}
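
/*
 * Caller sketch (illustrative): the bits covered by cached locks are
 * cleared from *bits, so the caller can see exactly which bits still
 * need to be fetched, e.g.:
 *
 *	__u64 bits = MDS_INODELOCK_UPDATE | MDS_INODELOCK_LOOKUP;
 *
 *	if (ll_have_md_lock(inode, &bits, LCK_MINMODE))
 *		;	// both bits were found in cached locks
 *	else
 *		;	// 'bits' now holds only the bits NOT found
 */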
2868
2869ldlm_mode_t ll_take_md_lock(struct inode *inode, __u64 bits,
2870			    struct lustre_handle *lockh, __u64 flags,
2871			    ldlm_mode_t mode)
2872{
2873	ldlm_policy_data_t policy = { .l_inodebits = {bits}};
2874	struct lu_fid *fid;
2875	ldlm_mode_t rc;
2876
2877	fid = &ll_i2info(inode)->lli_fid;
2878	CDEBUG(D_INFO, "trying to match res "DFID"\n", PFID(fid));
2879
2880	rc = md_lock_match(ll_i2mdexp(inode), LDLM_FL_BLOCK_GRANTED|flags,
2881			   fid, LDLM_IBITS, &policy, mode, lockh);
2882
2883	return rc;
2884}
2885
2886static int ll_inode_revalidate_fini(struct inode *inode, int rc)
2887{
2888	/* Already unlinked. Just update nlink and return success */
2889	if (rc == -ENOENT) {
2890		clear_nlink(inode);
2891		/* This path cannot be hit for regular files unless in
2892		 * case of obscure races, so no need to validate size.
2893		 */
2894		if (!S_ISREG(inode->i_mode) && !S_ISDIR(inode->i_mode))
2895			return 0;
2896	} else if (rc != 0) {
2897		CDEBUG_LIMIT((rc == -EACCES || rc == -EIDRM) ? D_INFO : D_ERROR,
2898			     "%s: revalidate FID "DFID" error: rc = %d\n",
2899			     ll_get_fsname(inode->i_sb, NULL, 0),
2900			     PFID(ll_inode2fid(inode)), rc);
2901	}
2902
2903	return rc;
2904}
2905
2906static int __ll_inode_revalidate(struct dentry *dentry, __u64 ibits)
2907{
2908	struct inode *inode = dentry->d_inode;
2909	struct ptlrpc_request *req = NULL;
2910	struct obd_export *exp;
2911	int rc = 0;
2912
2913	LASSERT(inode != NULL);
2914
2915	CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),name=%s\n",
2916	       inode->i_ino, inode->i_generation, inode, dentry->d_name.name);
2917
2918	exp = ll_i2mdexp(inode);
2919
2920	/* XXX: Enable OBD_CONNECT_ATTRFID to reduce unnecessary getattr RPCs.
2921	 *      But under the CMD case it caused some lock issues; this should
2922	 *      be fixed with the new CMD ibits lock. See bug 12718 */
2923	if (exp_connect_flags(exp) & OBD_CONNECT_ATTRFID) {
2924		struct lookup_intent oit = { .it_op = IT_GETATTR };
2925		struct md_op_data *op_data;
2926
2927		if (ibits == MDS_INODELOCK_LOOKUP)
2928			oit.it_op = IT_LOOKUP;
2929
2930		/* Call getattr by fid, so do not provide name at all. */
2931		op_data = ll_prep_md_op_data(NULL, dentry->d_inode,
2932					     dentry->d_inode, NULL, 0, 0,
2933					     LUSTRE_OPC_ANY, NULL);
2934		if (IS_ERR(op_data))
2935			return PTR_ERR(op_data);
2936
2937		oit.it_create_mode |= M_CHECK_STALE;
2938		rc = md_intent_lock(exp, op_data, NULL, 0,
2939				    /* we are not interested in name
2940				       based lookup */
2941				    &oit, 0, &req,
2942				    ll_md_blocking_ast, 0);
2943		ll_finish_md_op_data(op_data);
2944		oit.it_create_mode &= ~M_CHECK_STALE;
2945		if (rc < 0) {
2946			rc = ll_inode_revalidate_fini(inode, rc);
2947			GOTO(out, rc);
2948		}
2949
2950		rc = ll_revalidate_it_finish(req, &oit, dentry);
2951		if (rc != 0) {
2952			ll_intent_release(&oit);
2953			GOTO(out, rc);
2954		}
2955
2956		/* Unlinked? Unhash dentry, so it is not picked up later by
2957		   do_lookup() -> ll_revalidate_it(). We cannot use d_drop
2958		   here to preserve get_cwd functionality on 2.6.
2959		   Bug 10503 */
2960		if (!dentry->d_inode->i_nlink)
2961			d_lustre_invalidate(dentry, 0);
2962
2963		ll_lookup_finish_locks(&oit, dentry);
2964	} else if (!ll_have_md_lock(dentry->d_inode, &ibits, LCK_MINMODE)) {
2965		struct ll_sb_info *sbi = ll_i2sbi(dentry->d_inode);
2966		obd_valid valid = OBD_MD_FLGETATTR;
2967		struct md_op_data *op_data;
2968		int ealen = 0;
2969
2970		if (S_ISREG(inode->i_mode)) {
2971			rc = ll_get_default_mdsize(sbi, &ealen);
2972			if (rc)
2973				return rc;
2974			valid |= OBD_MD_FLEASIZE | OBD_MD_FLMODEASIZE;
2975		}
2976
2977		op_data = ll_prep_md_op_data(NULL, inode, NULL, NULL,
2978					     0, ealen, LUSTRE_OPC_ANY,
2979					     NULL);
2980		if (IS_ERR(op_data))
2981			return PTR_ERR(op_data);
2982
2983		op_data->op_valid = valid;
2984		/* When OBD_CONNECT_ATTRFID is not supported, we cannot find a
2985		 * capa for this inode, because we only keep the capas of
2986		 * directories fresh. */
2987		rc = md_getattr(sbi->ll_md_exp, op_data, &req);
2988		ll_finish_md_op_data(op_data);
2989		if (rc) {
2990			rc = ll_inode_revalidate_fini(inode, rc);
2991			return rc;
2992		}
2993
2994		rc = ll_prep_inode(&inode, req, NULL, NULL);
2995	}
2996out:
2997	ptlrpc_req_finished(req);
2998	return rc;
2999}
3000
3001static int ll_inode_revalidate(struct dentry *dentry, __u64 ibits)
3002{
3003	struct inode *inode = dentry->d_inode;
3004	int rc;
3005
3006	rc = __ll_inode_revalidate(dentry, ibits);
3007	if (rc != 0)
3008		return rc;
3009
3010	/* if object isn't regular file, don't validate size */
3011	if (!S_ISREG(inode->i_mode)) {
3012		LTIME_S(inode->i_atime) = ll_i2info(inode)->lli_lvb.lvb_atime;
3013		LTIME_S(inode->i_mtime) = ll_i2info(inode)->lli_lvb.lvb_mtime;
3014		LTIME_S(inode->i_ctime) = ll_i2info(inode)->lli_lvb.lvb_ctime;
3015	} else {
3016		/* In case of a restore, the MDT has the right size and has
3017		 * already sent it back without granting the layout lock;
3018		 * the inode is up-to-date, so a glimpse is useless.
3019		 * Also, to glimpse we need the layout; in case of a running
3020		 * restore the MDT holds the layout lock, so the glimpse will
3021		 * block until the end of the restore (getattr will block)
3022		 */
3023		if (!(ll_i2info(inode)->lli_flags & LLIF_FILE_RESTORING))
3024			rc = ll_glimpse_size(inode);
3025	}
3026	return rc;
3027}
3028
3029int ll_getattr(struct vfsmount *mnt, struct dentry *de, struct kstat *stat)
3030{
3031	struct inode *inode = de->d_inode;
3032	struct ll_sb_info *sbi = ll_i2sbi(inode);
3033	struct ll_inode_info *lli = ll_i2info(inode);
3034	int res = 0;
3035
3036	res = ll_inode_revalidate(de, MDS_INODELOCK_UPDATE |
3037				      MDS_INODELOCK_LOOKUP);
3038	ll_stats_ops_tally(sbi, LPROC_LL_GETATTR, 1);
3039
3040	if (res)
3041		return res;
3042
3043	stat->dev = inode->i_sb->s_dev;
3044	if (ll_need_32bit_api(sbi))
3045		stat->ino = cl_fid_build_ino(&lli->lli_fid, 1);
3046	else
3047		stat->ino = inode->i_ino;
3048	stat->mode = inode->i_mode;
3049	stat->nlink = inode->i_nlink;
3050	stat->uid = inode->i_uid;
3051	stat->gid = inode->i_gid;
3052	stat->rdev = inode->i_rdev;
3053	stat->atime = inode->i_atime;
3054	stat->mtime = inode->i_mtime;
3055	stat->ctime = inode->i_ctime;
3056	stat->blksize = 1 << inode->i_blkbits;
3057
3058	stat->size = i_size_read(inode);
3059	stat->blocks = inode->i_blocks;
3060
3061	return 0;
3062}
3063
3064static int ll_fiemap(struct inode *inode, struct fiemap_extent_info *fieinfo,
3065		     __u64 start, __u64 len)
3066{
3067	int rc;
3068	size_t num_bytes;
3069	struct ll_user_fiemap *fiemap;
3070	unsigned int extent_count = fieinfo->fi_extents_max;
3071
3072	num_bytes = sizeof(*fiemap) + (extent_count *
3073				       sizeof(struct ll_fiemap_extent));
3074	OBD_ALLOC_LARGE(fiemap, num_bytes);
3075
3076	if (fiemap == NULL)
3077		return -ENOMEM;
3078
3079	fiemap->fm_flags = fieinfo->fi_flags;
3080	fiemap->fm_extent_count = fieinfo->fi_extents_max;
3081	fiemap->fm_start = start;
3082	fiemap->fm_length = len;
3083	if (extent_count > 0)
3084		memcpy(&fiemap->fm_extents[0], fieinfo->fi_extents_start,
3085		       sizeof(struct ll_fiemap_extent));
3086
3087	rc = ll_do_fiemap(inode, fiemap, num_bytes);
3088
3089	fieinfo->fi_flags = fiemap->fm_flags;
3090	fieinfo->fi_extents_mapped = fiemap->fm_mapped_extents;
3091	if (extent_count > 0)
3092		memcpy(fieinfo->fi_extents_start, &fiemap->fm_extents[0],
3093		       fiemap->fm_mapped_extents *
3094		       sizeof(struct ll_fiemap_extent));
3095
3096	OBD_FREE_LARGE(fiemap, num_bytes);
3097	return rc;
3098}
3099
3100struct posix_acl *ll_get_acl(struct inode *inode, int type)
3101{
3102	struct ll_inode_info *lli = ll_i2info(inode);
3103	struct posix_acl *acl = NULL;
3104
3105	spin_lock(&lli->lli_lock);
3106	/* VFS' acl_permission_check->check_acl will release the refcount */
3107	acl = posix_acl_dup(lli->lli_posix_acl);
3108	spin_unlock(&lli->lli_lock);
3109
3110	return acl;
3111}
3112
3113
3114int ll_inode_permission(struct inode *inode, int mask)
3115{
3116	int rc = 0;
3117
3118#ifdef MAY_NOT_BLOCK
3119	if (mask & MAY_NOT_BLOCK)
3120		return -ECHILD;
3121#endif
3122
3123	/* As the root inode is NOT validated in the lookup operation,
3124	 * we need to do it before the permission check. */
3125
3126	if (inode == inode->i_sb->s_root->d_inode) {
3127		rc = __ll_inode_revalidate(inode->i_sb->s_root,
3128					   MDS_INODELOCK_LOOKUP);
3129		if (rc)
3130			return rc;
3131	}
3132
3133	CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), inode mode %x mask %o\n",
3134	       inode->i_ino, inode->i_generation, inode, inode->i_mode, mask);
3135
3136	if (ll_i2sbi(inode)->ll_flags & LL_SBI_RMT_CLIENT)
3137		return lustre_check_remote_perm(inode, mask);
3138
3139	ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_INODE_PERM, 1);
3140	rc = generic_permission(inode, mask);
3141
3142	return rc;
3143}
3144
3145/* -o localflock - only provides locally consistent flock locks */
3146struct file_operations ll_file_operations = {
3147	.read	   = ll_file_read,
3148	.aio_read = ll_file_aio_read,
3149	.write	  = ll_file_write,
3150	.aio_write = ll_file_aio_write,
3151	.unlocked_ioctl = ll_file_ioctl,
3152	.open	   = ll_file_open,
3153	.release	= ll_file_release,
3154	.mmap	   = ll_file_mmap,
3155	.llseek	 = ll_file_seek,
3156	.splice_read    = ll_file_splice_read,
3157	.fsync	  = ll_fsync,
3158	.flush	  = ll_flush
3159};
3160
3161struct file_operations ll_file_operations_flock = {
3162	.read	   = ll_file_read,
3163	.aio_read    = ll_file_aio_read,
3164	.write	  = ll_file_write,
3165	.aio_write   = ll_file_aio_write,
3166	.unlocked_ioctl = ll_file_ioctl,
3167	.open	   = ll_file_open,
3168	.release	= ll_file_release,
3169	.mmap	   = ll_file_mmap,
3170	.llseek	 = ll_file_seek,
3171	.splice_read    = ll_file_splice_read,
3172	.fsync	  = ll_fsync,
3173	.flush	  = ll_flush,
3174	.flock	  = ll_file_flock,
3175	.lock	   = ll_file_flock
3176};
3177
3178/* These are for -o noflock - to return ENOSYS on flock calls */
3179struct file_operations ll_file_operations_noflock = {
3180	.read	   = ll_file_read,
3181	.aio_read    = ll_file_aio_read,
3182	.write	  = ll_file_write,
3183	.aio_write   = ll_file_aio_write,
3184	.unlocked_ioctl = ll_file_ioctl,
3185	.open	   = ll_file_open,
3186	.release	= ll_file_release,
3187	.mmap	   = ll_file_mmap,
3188	.llseek	 = ll_file_seek,
3189	.splice_read    = ll_file_splice_read,
3190	.fsync	  = ll_fsync,
3191	.flush	  = ll_flush,
3192	.flock	  = ll_file_noflock,
3193	.lock	   = ll_file_noflock
3194};
3195
3196struct inode_operations ll_file_inode_operations = {
3197	.setattr	= ll_setattr,
3198	.getattr	= ll_getattr,
3199	.permission	= ll_inode_permission,
3200	.setxattr	= ll_setxattr,
3201	.getxattr	= ll_getxattr,
3202	.listxattr	= ll_listxattr,
3203	.removexattr	= ll_removexattr,
3204	.fiemap		= ll_fiemap,
3205	.get_acl	= ll_get_acl,
3206};
3207
3208/* dynamic ioctl number support routines */
3209static struct llioc_ctl_data {
3210	struct rw_semaphore	ioc_sem;
3211	struct list_head	      ioc_head;
3212} llioc = {
3213	__RWSEM_INITIALIZER(llioc.ioc_sem),
3214	LIST_HEAD_INIT(llioc.ioc_head)
3215};
3216
3217
3218struct llioc_data {
3219	struct list_head	      iocd_list;
3220	unsigned int	    iocd_size;
3221	llioc_callback_t	iocd_cb;
3222	unsigned int	    iocd_count;
3223	unsigned int	    iocd_cmd[0];
3224};
3225
3226void *ll_iocontrol_register(llioc_callback_t cb, int count, unsigned int *cmd)
3227{
3228	unsigned int size;
3229	struct llioc_data *in_data = NULL;
3230
3231	if (cb == NULL || cmd == NULL ||
3232	    count > LLIOC_MAX_CMD || count < 0)
3233		return NULL;
3234
3235	size = sizeof(*in_data) + count * sizeof(unsigned int);
3236	OBD_ALLOC(in_data, size);
3237	if (in_data == NULL)
3238		return NULL;
3239
3240	memset(in_data, 0, sizeof(*in_data));
3241	in_data->iocd_size = size;
3242	in_data->iocd_cb = cb;
3243	in_data->iocd_count = count;
3244	memcpy(in_data->iocd_cmd, cmd, sizeof(unsigned int) * count);
3245
3246	down_write(&llioc.ioc_sem);
3247	list_add_tail(&in_data->iocd_list, &llioc.ioc_head);
3248	up_write(&llioc.ioc_sem);
3249
3250	return in_data;
3251}
3252
3253void ll_iocontrol_unregister(void *magic)
3254{
3255	struct llioc_data *tmp;
3256
3257	if (magic == NULL)
3258		return;
3259
3260	down_write(&llioc.ioc_sem);
3261	list_for_each_entry(tmp, &llioc.ioc_head, iocd_list) {
3262		if (tmp == magic) {
3263			unsigned int size = tmp->iocd_size;
3264
3265			list_del(&tmp->iocd_list);
3266			up_write(&llioc.ioc_sem);
3267
3268			OBD_FREE(tmp, size);
3269			return;
3270		}
3271	}
3272	up_write(&llioc.ioc_sem);
3273
3274	CWARN("didn't find iocontrol register block with magic: %p\n", magic);
3275}
3276
3277EXPORT_SYMBOL(ll_iocontrol_register);
3278EXPORT_SYMBOL(ll_iocontrol_unregister);
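
/*
 * Registration sketch (illustrative, not compiled here): an external
 * module can hook extra ioctl numbers into ll_file_ioctl() through this
 * interface.  The callback prototype is assumed to match llioc_callback_t
 * as it is invoked from ll_iocontrol_call() below; "MY_IOC_CMD" is a
 * hypothetical ioctl number.
 *
 *	static enum llioc_iter my_cb(struct inode *inode, struct file *file,
 *				     unsigned int cmd, unsigned long arg,
 *				     void *magic, int *rcp)
 *	{
 *		*rcp = 0;		// result handed back to the caller
 *		return LLIOC_STOP;	// command handled, stop iterating
 *	}
 *
 *	static unsigned int my_cmds[] = { MY_IOC_CMD };
 *	void *magic = ll_iocontrol_register(my_cb, ARRAY_SIZE(my_cmds), my_cmds);
 *	...
 *	ll_iocontrol_unregister(magic);
 */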
3279
3280static enum llioc_iter
3281ll_iocontrol_call(struct inode *inode, struct file *file,
3282		  unsigned int cmd, unsigned long arg, int *rcp)
3283{
3284	enum llioc_iter ret = LLIOC_CONT;
3285	struct llioc_data *data;
3286	int rc = -EINVAL, i;
3287
3288	down_read(&llioc.ioc_sem);
3289	list_for_each_entry(data, &llioc.ioc_head, iocd_list) {
3290		for (i = 0; i < data->iocd_count; i++) {
3291			if (cmd != data->iocd_cmd[i])
3292				continue;
3293
3294			ret = data->iocd_cb(inode, file, cmd, arg, data, &rc);
3295			break;
3296		}
3297
3298		if (ret == LLIOC_STOP)
3299			break;
3300	}
3301	up_read(&llioc.ioc_sem);
3302
3303	if (rcp)
3304		*rcp = rc;
3305	return ret;
3306}
3307
3308int ll_layout_conf(struct inode *inode, const struct cl_object_conf *conf)
3309{
3310	struct ll_inode_info *lli = ll_i2info(inode);
3311	struct cl_env_nest nest;
3312	struct lu_env *env;
3313	int result;
3314
3315	if (lli->lli_clob == NULL)
3316		return 0;
3317
3318	env = cl_env_nested_get(&nest);
3319	if (IS_ERR(env))
3320		return PTR_ERR(env);
3321
3322	result = cl_conf_set(env, lli->lli_clob, conf);
3323	cl_env_nested_put(&nest, env);
3324
3325	if (conf->coc_opc == OBJECT_CONF_SET) {
3326		struct ldlm_lock *lock = conf->coc_lock;
3327
3328		LASSERT(lock != NULL);
3329		LASSERT(ldlm_has_layout(lock));
3330		if (result == 0) {
3331			/* the lock can only be allowed to match after the
3332			 * layout is applied to the inode; otherwise a stale
3333			 * layout would be seen. Applying the layout should
3334			 * happen before dropping the intent lock. */
3335			ldlm_lock_allow_match(lock);
3336		}
3337	}
3338	return result;
3339}
3340
3341/* Fetch layout from MDT with getxattr request, if it's not ready yet */
3342static int ll_layout_fetch(struct inode *inode, struct ldlm_lock *lock)
3344{
3345	struct ll_sb_info *sbi = ll_i2sbi(inode);
3346	struct obd_capa *oc;
3347	struct ptlrpc_request *req;
3348	struct mdt_body *body;
3349	void *lvbdata;
3350	void *lmm;
3351	int lmmsize;
3352	int rc;
3353
3354	CDEBUG(D_INODE, DFID" LVB_READY=%d l_lvb_data=%p l_lvb_len=%d\n",
3355	       PFID(ll_inode2fid(inode)), !!(lock->l_flags & LDLM_FL_LVB_READY),
3356	       lock->l_lvb_data, lock->l_lvb_len);
3357
3358	if ((lock->l_lvb_data != NULL) && (lock->l_flags & LDLM_FL_LVB_READY))
3359		return 0;
3360
3361	/* if the layout lock was granted right away, the layout is returned
3362	 * within the DLM_LVB of the dlm reply; otherwise, if the lock was ever
3363	 * blocked and then granted via a completion ast, we have to fetch the
3364	 * layout here. Note that we can't use the LVB buffer in the
3365	 * completion AST because it is not large enough */
3366	oc = ll_mdscapa_get(inode);
3367	rc = ll_get_default_mdsize(sbi, &lmmsize);
3368	if (rc == 0)
3369		rc = md_getxattr(sbi->ll_md_exp, ll_inode2fid(inode), oc,
3370				OBD_MD_FLXATTR, XATTR_NAME_LOV, NULL, 0,
3371				lmmsize, 0, &req);
3372	capa_put(oc);
3373	if (rc < 0)
3374		return rc;
3375
3376	body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
3377	if (body == NULL)
3378		GOTO(out, rc = -EPROTO);
3379
3380	lmmsize = body->eadatasize;
3381	if (lmmsize == 0) /* empty layout */
3382		GOTO(out, rc = 0);
3383
3384	lmm = req_capsule_server_sized_get(&req->rq_pill, &RMF_EADATA, lmmsize);
3385	if (lmm == NULL)
3386		GOTO(out, rc = -EFAULT);
3387
3388	OBD_ALLOC_LARGE(lvbdata, lmmsize);
3389	if (lvbdata == NULL)
3390		GOTO(out, rc = -ENOMEM);
3391
3392	memcpy(lvbdata, lmm, lmmsize);
3393	lock_res_and_lock(lock);
3394	if (lock->l_lvb_data != NULL)
3395		OBD_FREE_LARGE(lock->l_lvb_data, lock->l_lvb_len);
3396
3397	lock->l_lvb_data = lvbdata;
3398	lock->l_lvb_len = lmmsize;
3399	unlock_res_and_lock(lock);
3400
3401out:
3402	ptlrpc_req_finished(req);
3403	return rc;
3404}
3405
3406/**
3407 * Apply the layout to the inode. Layout lock is held and will be released
3408 * in this function.
3409 */
3410static int ll_layout_lock_set(struct lustre_handle *lockh, ldlm_mode_t mode,
3411				struct inode *inode, __u32 *gen, bool reconf)
3412{
3413	struct ll_inode_info *lli = ll_i2info(inode);
3414	struct ll_sb_info    *sbi = ll_i2sbi(inode);
3415	struct ldlm_lock *lock;
3416	struct lustre_md md = { NULL };
3417	struct cl_object_conf conf;
3418	int rc = 0;
3419	bool lvb_ready;
3420	bool wait_layout = false;
3421
3422	LASSERT(lustre_handle_is_used(lockh));
3423
3424	lock = ldlm_handle2lock(lockh);
3425	LASSERT(lock != NULL);
3426	LASSERT(ldlm_has_layout(lock));
3427
3428	LDLM_DEBUG(lock, "File %p/"DFID" being reconfigured: %d.\n",
3429		   inode, PFID(&lli->lli_fid), reconf);
3430
3431	/* in case this is a cached lock, reinstate it with the new inode */
3432	md_set_lock_data(sbi->ll_md_exp, &lockh->cookie, inode, NULL);
3433
3434	lock_res_and_lock(lock);
3435	lvb_ready = !!(lock->l_flags & LDLM_FL_LVB_READY);
3436	unlock_res_and_lock(lock);
3437	/* checking lvb_ready is racy but this is okay. The worst case is
3438	 * that multiple processes may configure the file at the same time. */
3439	if (lvb_ready || !reconf) {
3440		rc = -ENODATA;
3441		if (lvb_ready) {
3442			/* layout_gen must be valid if the layout lock is not
3443			 * cancelled and the stripe has already been set */
3444			*gen = ll_layout_version_get(lli);
3445			rc = 0;
3446		}
3447		GOTO(out, rc);
3448	}
3449
3450	rc = ll_layout_fetch(inode, lock);
3451	if (rc < 0)
3452		GOTO(out, rc);
3453
3454	/* for layout lock, lmm is returned in lock's lvb.
3455	 * lvb_data is immutable if the lock is held so it's safe to access it
3456	 * without res lock. See the description in ldlm_lock_decref_internal()
3457	 * for the condition to free lvb_data of layout lock */
3458	if (lock->l_lvb_data != NULL) {
3459		rc = obd_unpackmd(sbi->ll_dt_exp, &md.lsm,
3460				  lock->l_lvb_data, lock->l_lvb_len);
3461		if (rc >= 0) {
3462			*gen = LL_LAYOUT_GEN_EMPTY;
3463			if (md.lsm != NULL)
3464				*gen = md.lsm->lsm_layout_gen;
3465			rc = 0;
3466		} else {
3467			CERROR("%s: file "DFID" unpackmd error: %d\n",
3468				ll_get_fsname(inode->i_sb, NULL, 0),
3469				PFID(&lli->lli_fid), rc);
3470		}
3471	}
3472	if (rc < 0)
3473		GOTO(out, rc);
3474
3475	/* set the layout on the file. This is unlikely to fail as the old
3476	 * layout has surely been eliminated */
3477	memset(&conf, 0, sizeof(conf));
3478	conf.coc_opc = OBJECT_CONF_SET;
3479	conf.coc_inode = inode;
3480	conf.coc_lock = lock;
3481	conf.u.coc_md = &md;
3482	rc = ll_layout_conf(inode, &conf);
3483
3484	if (md.lsm != NULL)
3485		obd_free_memmd(sbi->ll_dt_exp, &md.lsm);
3486
3487	/* refresh layout failed, need to wait */
3488	wait_layout = rc == -EBUSY;
3489
3490out:
3491	LDLM_LOCK_PUT(lock);
3492	ldlm_lock_decref(lockh, mode);
3493
3494	/* wait for IO to complete if it's still being used. */
3495	if (wait_layout) {
3496		CDEBUG(D_INODE, "%s: %p/"DFID" wait for layout reconf.\n",
3497			ll_get_fsname(inode->i_sb, NULL, 0),
3498			inode, PFID(&lli->lli_fid));
3499
3500		memset(&conf, 0, sizeof(conf));
3501		conf.coc_opc = OBJECT_CONF_WAIT;
3502		conf.coc_inode = inode;
3503		rc = ll_layout_conf(inode, &conf);
3504		if (rc == 0)
3505			rc = -EAGAIN;
3506
3507		CDEBUG(D_INODE, "file: "DFID" waiting layout return: %d.\n",
3508			PFID(&lli->lli_fid), rc);
3509	}
3510	return rc;
3511}
3512
3513/**
3514 * This function checks if there exists a LAYOUT lock on the client side,
3515 * or enqueues it if it doesn't have one in cache.
3516 *
3517 * This function does not hold the layout lock, so the lock may be revoked
3518 * any time after this function returns. Any operation that depends on the
3519 * layout should be redone in that case.
3520 *
3521 * This function should be called before lov_io_init() to get an up-to-date
3522 * layout version; the caller should save the version number, and after the
3523 * IO is finished call this function again to verify that the layout has
3524 * not changed during the IO.
3525 */
3526int ll_layout_refresh(struct inode *inode, __u32 *gen)
3527{
3528	struct ll_inode_info  *lli = ll_i2info(inode);
3529	struct ll_sb_info     *sbi = ll_i2sbi(inode);
3530	struct md_op_data     *op_data;
3531	struct lookup_intent   it;
3532	struct lustre_handle   lockh;
3533	ldlm_mode_t	       mode;
3534	struct ldlm_enqueue_info einfo = {
3535		.ei_type = LDLM_IBITS,
3536		.ei_mode = LCK_CR,
3537		.ei_cb_bl = ll_md_blocking_ast,
3538		.ei_cb_cp = ldlm_completion_ast,
3539	};
3540	int rc;
3541
3542	*gen = ll_layout_version_get(lli);
3543	if (!(sbi->ll_flags & LL_SBI_LAYOUT_LOCK) || *gen != LL_LAYOUT_GEN_NONE)
3544		return 0;
3545
3546	/* sanity checks */
3547	LASSERT(fid_is_sane(ll_inode2fid(inode)));
3548	LASSERT(S_ISREG(inode->i_mode));
3549
3550	/* take layout lock mutex to enqueue layout lock exclusively. */
3551	mutex_lock(&lli->lli_layout_mutex);
3552
3553again:
3554	/* the layout lock is usually cached on the local side, so try to match
3555	 * it before grabbing the layout lock mutex. */
3556	mode = ll_take_md_lock(inode, MDS_INODELOCK_LAYOUT, &lockh, 0,
3557			       LCK_CR | LCK_CW | LCK_PR | LCK_PW);
3558	if (mode != 0) { /* hit cached lock */
3559		rc = ll_layout_lock_set(&lockh, mode, inode, gen, true);
3560		if (rc == -EAGAIN)
3561			goto again;
3562
3563		mutex_unlock(&lli->lli_layout_mutex);
3564		return rc;
3565	}
3566
3567	op_data = ll_prep_md_op_data(NULL, inode, inode, NULL,
3568			0, 0, LUSTRE_OPC_ANY, NULL);
3569	if (IS_ERR(op_data)) {
3570		mutex_unlock(&lli->lli_layout_mutex);
3571		return PTR_ERR(op_data);
3572	}
3573
3574	/* have to enqueue one */
3575	memset(&it, 0, sizeof(it));
3576	it.it_op = IT_LAYOUT;
3577	lockh.cookie = 0ULL;
3578
3579	LDLM_DEBUG_NOLOCK("%s: requeue layout lock for file %p/"DFID".\n",
3580			ll_get_fsname(inode->i_sb, NULL, 0), inode,
3581			PFID(&lli->lli_fid));
3582
3583	rc = md_enqueue(sbi->ll_md_exp, &einfo, &it, op_data, &lockh,
3584			NULL, 0, NULL, 0);
3585	if (it.d.lustre.it_data != NULL)
3586		ptlrpc_req_finished(it.d.lustre.it_data);
3587	it.d.lustre.it_data = NULL;
3588
3589	ll_finish_md_op_data(op_data);
3590
3591	mode = it.d.lustre.it_lock_mode;
3592	it.d.lustre.it_lock_mode = 0;
3593	ll_intent_drop_lock(&it);
3594
3595	if (rc == 0) {
3596		/* set lock data in case this is a new lock */
3597		ll_set_lock_data(sbi->ll_md_exp, inode, &it, NULL);
3598		rc = ll_layout_lock_set(&lockh, mode, inode, gen, true);
3599		if (rc == -EAGAIN)
3600			goto again;
3601	}
3602	mutex_unlock(&lli->lli_layout_mutex);
3603
3604	return rc;
3605}
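
/*
 * Caller sketch (illustrative): the intended pattern from the comment
 * above is to refresh and record the generation before starting IO, and
 * to refresh again afterwards to detect a concurrent layout change.
 *
 *	__u32 gen, gen2;
 *
 *	rc = ll_layout_refresh(inode, &gen);	// before lov_io_init()
 *	... submit the IO using this layout ...
 *	rc = ll_layout_refresh(inode, &gen2);	// after the IO completes
 *	if (rc == 0 && gen2 != gen)
 *		;	// layout changed during IO, redo the operation
 */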
3606
3607/**
3608 * This function sends a restore request to the MDT
3609 */
3610int ll_layout_restore(struct inode *inode)
3611{
3612	struct hsm_user_request	*hur;
3613	int			 len, rc;
3614
3615	len = sizeof(struct hsm_user_request) +
3616	      sizeof(struct hsm_user_item);
3617	OBD_ALLOC(hur, len);
3618	if (hur == NULL)
3619		return -ENOMEM;
3620
3621	hur->hur_request.hr_action = HUA_RESTORE;
3622	hur->hur_request.hr_archive_id = 0;
3623	hur->hur_request.hr_flags = 0;
3624	memcpy(&hur->hur_user_item[0].hui_fid, &ll_i2info(inode)->lli_fid,
3625	       sizeof(hur->hur_user_item[0].hui_fid));
3626	hur->hur_user_item[0].hui_extent.length = -1;
3627	hur->hur_request.hr_itemcount = 1;
3628	rc = obd_iocontrol(LL_IOC_HSM_REQUEST, cl_i2sbi(inode)->ll_md_exp,
3629			   len, hur, NULL);
3630	OBD_FREE(hur, len);
3631	return rc;
3632}
3633