[go: nahoru, domu]

1/*
2 * GPL HEADER START
3 *
4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5 *
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
9 *
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
15 *
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19 *
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
22 * have any questions.
23 *
24 * GPL HEADER END
25 */
26/*
27 * Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
29 *
30 * Copyright (c) 2011, 2012, Intel Corporation.
31 */
32/*
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
35 *
36 * Implementation of cl_io for OSC layer.
37 *
38 *   Author: Nikita Danilov <nikita.danilov@sun.com>
39 *   Author: Jinshan Xiong <jinshan.xiong@whamcloud.com>
40 */
41
42#define DEBUG_SUBSYSTEM S_OSC
43
44#include "osc_cl_internal.h"
45
46/** \addtogroup osc
47 *  @{
48 */
49
50/*****************************************************************************
51 *
52 * Type conversions.
53 *
54 */
55
56static struct osc_req *cl2osc_req(const struct cl_req_slice *slice)
57{
58	LINVRNT(slice->crs_dev->cd_lu_dev.ld_type == &osc_device_type);
59	return container_of0(slice, struct osc_req, or_cl);
60}
61
62static struct osc_io *cl2osc_io(const struct lu_env *env,
63				const struct cl_io_slice *slice)
64{
65	struct osc_io *oio = container_of0(slice, struct osc_io, oi_cl);
66
67	LINVRNT(oio == osc_env_io(env));
68	return oio;
69}
70
71static struct osc_page *osc_cl_page_osc(struct cl_page *page)
72{
73	const struct cl_page_slice *slice;
74
75	slice = cl_page_at(page, &osc_device_type);
76	LASSERT(slice != NULL);
77
78	return cl2osc_page(slice);
79}
80
81
82/*****************************************************************************
83 *
84 * io operations.
85 *
86 */
87
/*
 * cio_fini hook registered for several io types in osc_io_ops.
 * Deliberately empty: this function releases nothing.
 */
static void osc_io_fini(const struct lu_env *env,
			const struct cl_io_slice *io)
{
	/* nothing to do */
}
91
92/**
93 * An implementation of cl_io_operations::cio_io_submit() method for osc
94 * layer. Iterates over pages in the in-queue, prepares each for io by calling
95 * cl_page_prep() and then either submits them through osc_io_submit_page()
96 * or, if page is already submitted, changes osc flags through
97 * osc_set_async_flags().
98 */
99static int osc_io_submit(const struct lu_env *env,
100			 const struct cl_io_slice *ios,
101			 enum cl_req_type crt, struct cl_2queue *queue)
102{
103	struct cl_page    *page;
104	struct cl_page    *tmp;
105	struct client_obd *cli  = NULL;
106	struct osc_object *osc  = NULL; /* to keep gcc happy */
107	struct osc_page   *opg;
108	struct cl_io      *io;
109	LIST_HEAD(list);
110
111	struct cl_page_list *qin      = &queue->c2_qin;
112	struct cl_page_list *qout     = &queue->c2_qout;
113	int queued = 0;
114	int result = 0;
115	int cmd;
116	int brw_flags;
117	int max_pages;
118
119	LASSERT(qin->pl_nr > 0);
120
121	CDEBUG(D_CACHE, "%d %d\n", qin->pl_nr, crt);
122
123	osc = cl2osc(ios->cis_obj);
124	cli = osc_cli(osc);
125	max_pages = cli->cl_max_pages_per_rpc;
126
127	cmd = crt == CRT_WRITE ? OBD_BRW_WRITE : OBD_BRW_READ;
128	brw_flags = osc_io_srvlock(cl2osc_io(env, ios)) ? OBD_BRW_SRVLOCK : 0;
129
130	/*
131	 * NOTE: here @page is a top-level page. This is done to avoid
132	 *       creation of sub-page-list.
133	 */
134	cl_page_list_for_each_safe(page, tmp, qin) {
135		struct osc_async_page *oap;
136
137		/* Top level IO. */
138		io = page->cp_owner;
139		LASSERT(io != NULL);
140
141		opg = osc_cl_page_osc(page);
142		oap = &opg->ops_oap;
143		LASSERT(osc == oap->oap_obj);
144
145		if (!list_empty(&oap->oap_pending_item) ||
146		    !list_empty(&oap->oap_rpc_item)) {
147			CDEBUG(D_CACHE, "Busy oap %p page %p for submit.\n",
148			       oap, opg);
149			result = -EBUSY;
150			break;
151		}
152
153		result = cl_page_prep(env, io, page, crt);
154		if (result != 0) {
155			LASSERT(result < 0);
156			if (result != -EALREADY)
157				break;
158			/*
159			 * Handle -EALREADY error: for read case, the page is
160			 * already in UPTODATE state; for write, the page
161			 * is not dirty.
162			 */
163			result = 0;
164			continue;
165		}
166
167		cl_page_list_move(qout, qin, page);
168		oap->oap_async_flags = ASYNC_URGENT|ASYNC_READY;
169		oap->oap_async_flags |= ASYNC_COUNT_STABLE;
170
171		osc_page_submit(env, opg, crt, brw_flags);
172		list_add_tail(&oap->oap_pending_item, &list);
173		if (++queued == max_pages) {
174			queued = 0;
175			result = osc_queue_sync_pages(env, osc, &list, cmd,
176						      brw_flags);
177			if (result < 0)
178				break;
179		}
180	}
181
182	if (queued > 0)
183		result = osc_queue_sync_pages(env, osc, &list, cmd, brw_flags);
184
185	CDEBUG(D_INFO, "%d/%d %d\n", qin->pl_nr, qout->pl_nr, result);
186	return qout->pl_nr > 0 ? 0 : result;
187}
188
189static void osc_page_touch_at(const struct lu_env *env,
190			      struct cl_object *obj, pgoff_t idx, unsigned to)
191{
192	struct lov_oinfo  *loi  = cl2osc(obj)->oo_oinfo;
193	struct cl_attr    *attr = &osc_env_info(env)->oti_attr;
194	int valid;
195	__u64 kms;
196
197	/* offset within stripe */
198	kms = cl_offset(obj, idx) + to;
199
200	cl_object_attr_lock(obj);
201	/*
202	 * XXX old code used
203	 *
204	 *	 ll_inode_size_lock(inode, 0); lov_stripe_lock(lsm);
205	 *
206	 * here
207	 */
208	CDEBUG(D_INODE, "stripe KMS %sincreasing %llu->%llu %llu\n",
209	       kms > loi->loi_kms ? "" : "not ", loi->loi_kms, kms,
210	       loi->loi_lvb.lvb_size);
211
212	valid = 0;
213	if (kms > loi->loi_kms) {
214		attr->cat_kms = kms;
215		valid |= CAT_KMS;
216	}
217	if (kms > loi->loi_lvb.lvb_size) {
218		attr->cat_size = kms;
219		valid |= CAT_SIZE;
220	}
221	cl_object_attr_set(env, obj, attr, valid);
222	cl_object_attr_unlock(obj);
223}
224
225/**
226 * This is called when a page is accessed within file in a way that creates
227 * new page, if one were missing (i.e., if there were a hole at that place in
228 * the file, or accessed page is beyond the current file size). Examples:
229 * ->commit_write() and ->nopage() methods.
230 *
231 * Expand stripe KMS if necessary.
232 */
233static void osc_page_touch(const struct lu_env *env,
234			   struct osc_page *opage, unsigned to)
235{
236	struct cl_page    *page = opage->ops_cl.cpl_page;
237	struct cl_object  *obj  = opage->ops_cl.cpl_obj;
238
239	osc_page_touch_at(env, obj, page->cp_index, to);
240}
241
242/**
243 * Implements cl_io_operations::cio_prepare_write() method for osc layer.
244 *
245 * \retval -EIO transfer initiated against this osc will most likely fail
246 * \retval 0    transfer initiated against this osc will most likely succeed.
247 *
248 * The reason for this check is to immediately return an error to the caller
249 * in the case of a deactivated import. Note, that import can be deactivated
250 * later, while pages, dirtied by this IO, are still in the cache, but this is
251 * irrelevant, because that would still return an error to the application (if
252 * it does fsync), but many applications don't do fsync because of performance
253 * issues, and we wanted to return an -EIO at write time to notify the
254 * application.
255 */
256static int osc_io_prepare_write(const struct lu_env *env,
257				const struct cl_io_slice *ios,
258				const struct cl_page_slice *slice,
259				unsigned from, unsigned to)
260{
261	struct osc_device *dev = lu2osc_dev(slice->cpl_obj->co_lu.lo_dev);
262	struct obd_import *imp = class_exp2cliimp(dev->od_exp);
263	struct osc_io     *oio = cl2osc_io(env, ios);
264	int result = 0;
265
266	/*
267	 * This implements OBD_BRW_CHECK logic from old client.
268	 */
269
270	if (imp == NULL || imp->imp_invalid)
271		result = -EIO;
272	if (result == 0 && oio->oi_lockless)
273		/* this page contains `invalid' data, but who cares?
274		 * nobody can access the invalid data.
275		 * in osc_io_commit_write(), we're going to write exact
276		 * [from, to) bytes of this page to OST. -jay */
277		cl_page_export(env, slice->cpl_page, 1);
278
279	return result;
280}
281
282static int osc_io_commit_write(const struct lu_env *env,
283			       const struct cl_io_slice *ios,
284			       const struct cl_page_slice *slice,
285			       unsigned from, unsigned to)
286{
287	struct osc_io	 *oio = cl2osc_io(env, ios);
288	struct osc_page       *opg = cl2osc_page(slice);
289	struct osc_object     *obj = cl2osc(opg->ops_cl.cpl_obj);
290	struct osc_async_page *oap = &opg->ops_oap;
291
292	LASSERT(to > 0);
293	/*
294	 * XXX instead of calling osc_page_touch() here and in
295	 * osc_io_fault_start() it might be more logical to introduce
296	 * cl_page_touch() method, that generic cl_io_commit_write() and page
297	 * fault code calls.
298	 */
299	osc_page_touch(env, cl2osc_page(slice), to);
300	if (!client_is_remote(osc_export(obj)) &&
301	    capable(CFS_CAP_SYS_RESOURCE))
302		oap->oap_brw_flags |= OBD_BRW_NOQUOTA;
303
304	if (oio->oi_lockless)
305		/* see osc_io_prepare_write() for lockless io handling. */
306		cl_page_clip(env, slice->cpl_page, from, to);
307
308	return 0;
309}
310
311static int osc_io_fault_start(const struct lu_env *env,
312			      const struct cl_io_slice *ios)
313{
314	struct cl_io       *io;
315	struct cl_fault_io *fio;
316
317	io  = ios->cis_io;
318	fio = &io->u.ci_fault;
319	CDEBUG(D_INFO, "%lu %d %d\n",
320	       fio->ft_index, fio->ft_writable, fio->ft_nob);
321	/*
322	 * If mapping is writeable, adjust kms to cover this page,
323	 * but do not extend kms beyond actual file size.
324	 * See bug 10919.
325	 */
326	if (fio->ft_writable)
327		osc_page_touch_at(env, ios->cis_obj,
328				  fio->ft_index, fio->ft_nob);
329	return 0;
330}
331
332static int osc_async_upcall(void *a, int rc)
333{
334	struct osc_async_cbargs *args = a;
335
336	args->opc_rc = rc;
337	complete(&args->opc_sync);
338	return 0;
339}
340
341/**
342 * Checks that there are no pages being written in the extent being truncated.
343 */
344static int trunc_check_cb(const struct lu_env *env, struct cl_io *io,
345			  struct cl_page *page, void *cbdata)
346{
347	const struct cl_page_slice *slice;
348	struct osc_page *ops;
349	struct osc_async_page *oap;
350	__u64 start = *(__u64 *)cbdata;
351
352	slice = cl_page_at(page, &osc_device_type);
353	LASSERT(slice != NULL);
354	ops = cl2osc_page(slice);
355	oap = &ops->ops_oap;
356
357	if (oap->oap_cmd & OBD_BRW_WRITE &&
358	    !list_empty(&oap->oap_pending_item))
359		CL_PAGE_DEBUG(D_ERROR, env, page, "exists %llu/%s.\n",
360				start, current->comm);
361
362	{
363		struct page *vmpage = cl_page_vmpage(env, page);
364
365		if (PageLocked(vmpage))
366			CDEBUG(D_CACHE, "page %p index %lu locked for %d.\n",
367			       ops, page->cp_index,
368			       (oap->oap_cmd & OBD_BRW_RWMASK));
369	}
370
371	return CLP_GANG_OKAY;
372}
373
374static void osc_trunc_check(const struct lu_env *env, struct cl_io *io,
375			    struct osc_io *oio, __u64 size)
376{
377	struct cl_object *clob;
378	int     partial;
379	pgoff_t start;
380
381	clob    = oio->oi_cl.cis_obj;
382	start   = cl_index(clob, size);
383	partial = cl_offset(clob, start) < size;
384
385	/*
386	 * Complain if there are pages in the truncated region.
387	 */
388	cl_page_gang_lookup(env, clob, io, start + partial, CL_PAGE_EOF,
389			    trunc_check_cb, (void *)&size);
390}
391
392static int osc_io_setattr_start(const struct lu_env *env,
393				const struct cl_io_slice *slice)
394{
395	struct cl_io	    *io     = slice->cis_io;
396	struct osc_io	   *oio    = cl2osc_io(env, slice);
397	struct cl_object	*obj    = slice->cis_obj;
398	struct lov_oinfo	*loi    = cl2osc(obj)->oo_oinfo;
399	struct cl_attr	  *attr   = &osc_env_info(env)->oti_attr;
400	struct obdo	     *oa     = &oio->oi_oa;
401	struct osc_async_cbargs *cbargs = &oio->oi_cbarg;
402	__u64		    size   = io->u.ci_setattr.sa_attr.lvb_size;
403	unsigned int	     ia_valid = io->u.ci_setattr.sa_valid;
404	int		      result = 0;
405	struct obd_info	  oinfo = { { { 0 } } };
406
407	/* truncate cache dirty pages first */
408	if (cl_io_is_trunc(io))
409		result = osc_cache_truncate_start(env, oio, cl2osc(obj), size);
410
411	if (result == 0 && oio->oi_lockless == 0) {
412		cl_object_attr_lock(obj);
413		result = cl_object_attr_get(env, obj, attr);
414		if (result == 0) {
415			struct ost_lvb *lvb = &io->u.ci_setattr.sa_attr;
416			unsigned int cl_valid = 0;
417
418			if (ia_valid & ATTR_SIZE) {
419				attr->cat_size = attr->cat_kms = size;
420				cl_valid = (CAT_SIZE | CAT_KMS);
421			}
422			if (ia_valid & ATTR_MTIME_SET) {
423				attr->cat_mtime = lvb->lvb_mtime;
424				cl_valid |= CAT_MTIME;
425			}
426			if (ia_valid & ATTR_ATIME_SET) {
427				attr->cat_atime = lvb->lvb_atime;
428				cl_valid |= CAT_ATIME;
429			}
430			if (ia_valid & ATTR_CTIME_SET) {
431				attr->cat_ctime = lvb->lvb_ctime;
432				cl_valid |= CAT_CTIME;
433			}
434			result = cl_object_attr_set(env, obj, attr, cl_valid);
435		}
436		cl_object_attr_unlock(obj);
437	}
438	memset(oa, 0, sizeof(*oa));
439	if (result == 0) {
440		oa->o_oi = loi->loi_oi;
441		oa->o_mtime = attr->cat_mtime;
442		oa->o_atime = attr->cat_atime;
443		oa->o_ctime = attr->cat_ctime;
444		oa->o_valid = OBD_MD_FLID | OBD_MD_FLGROUP | OBD_MD_FLATIME |
445			OBD_MD_FLCTIME | OBD_MD_FLMTIME;
446		if (ia_valid & ATTR_SIZE) {
447			oa->o_size = size;
448			oa->o_blocks = OBD_OBJECT_EOF;
449			oa->o_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
450
451			if (oio->oi_lockless) {
452				oa->o_flags = OBD_FL_SRVLOCK;
453				oa->o_valid |= OBD_MD_FLFLAGS;
454			}
455		} else {
456			LASSERT(oio->oi_lockless == 0);
457		}
458
459		oinfo.oi_oa = oa;
460		oinfo.oi_capa = io->u.ci_setattr.sa_capa;
461		init_completion(&cbargs->opc_sync);
462
463		if (ia_valid & ATTR_SIZE)
464			result = osc_punch_base(osc_export(cl2osc(obj)),
465						&oinfo, osc_async_upcall,
466						cbargs, PTLRPCD_SET);
467		else
468			result = osc_setattr_async_base(osc_export(cl2osc(obj)),
469							&oinfo, NULL,
470							osc_async_upcall,
471							cbargs, PTLRPCD_SET);
472		cbargs->opc_rpc_sent = result == 0;
473	}
474	return result;
475}
476
477static void osc_io_setattr_end(const struct lu_env *env,
478			       const struct cl_io_slice *slice)
479{
480	struct cl_io     *io  = slice->cis_io;
481	struct osc_io    *oio = cl2osc_io(env, slice);
482	struct cl_object *obj = slice->cis_obj;
483	struct osc_async_cbargs *cbargs = &oio->oi_cbarg;
484	int result = 0;
485
486	if (cbargs->opc_rpc_sent) {
487		wait_for_completion(&cbargs->opc_sync);
488		result = io->ci_result = cbargs->opc_rc;
489	}
490	if (result == 0) {
491		if (oio->oi_lockless) {
492			/* lockless truncate */
493			struct osc_device *osd = lu2osc_dev(obj->co_lu.lo_dev);
494
495			LASSERT(cl_io_is_trunc(io));
496			/* XXX: Need a lock. */
497			osd->od_stats.os_lockless_truncates++;
498		}
499	}
500
501	if (cl_io_is_trunc(io)) {
502		__u64 size = io->u.ci_setattr.sa_attr.lvb_size;
503
504		osc_trunc_check(env, io, oio, size);
505		if (oio->oi_trunc != NULL) {
506			osc_cache_truncate_end(env, oio, cl2osc(obj));
507			oio->oi_trunc = NULL;
508		}
509	}
510}
511
512static int osc_io_read_start(const struct lu_env *env,
513			     const struct cl_io_slice *slice)
514{
515	struct cl_object *obj   = slice->cis_obj;
516	struct cl_attr   *attr  = &osc_env_info(env)->oti_attr;
517	int rc = 0;
518
519	if (!slice->cis_io->ci_noatime) {
520		cl_object_attr_lock(obj);
521		attr->cat_atime = LTIME_S(CURRENT_TIME);
522		rc = cl_object_attr_set(env, obj, attr, CAT_ATIME);
523		cl_object_attr_unlock(obj);
524	}
525	return rc;
526}
527
528static int osc_io_write_start(const struct lu_env *env,
529			      const struct cl_io_slice *slice)
530{
531	struct cl_object *obj   = slice->cis_obj;
532	struct cl_attr   *attr  = &osc_env_info(env)->oti_attr;
533	int rc = 0;
534
535	OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_DELAY_SETTIME, 1);
536	cl_object_attr_lock(obj);
537	attr->cat_mtime = attr->cat_ctime = LTIME_S(CURRENT_TIME);
538	rc = cl_object_attr_set(env, obj, attr, CAT_MTIME | CAT_CTIME);
539	cl_object_attr_unlock(obj);
540
541	return rc;
542}
543
544static int osc_fsync_ost(const struct lu_env *env, struct osc_object *obj,
545			 struct cl_fsync_io *fio)
546{
547	struct osc_io    *oio   = osc_env_io(env);
548	struct obdo      *oa    = &oio->oi_oa;
549	struct obd_info  *oinfo = &oio->oi_info;
550	struct lov_oinfo *loi   = obj->oo_oinfo;
551	struct osc_async_cbargs *cbargs = &oio->oi_cbarg;
552	int rc = 0;
553
554	memset(oa, 0, sizeof(*oa));
555	oa->o_oi = loi->loi_oi;
556	oa->o_valid = OBD_MD_FLID | OBD_MD_FLGROUP;
557
558	/* reload size abd blocks for start and end of sync range */
559	oa->o_size = fio->fi_start;
560	oa->o_blocks = fio->fi_end;
561	oa->o_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
562
563	obdo_set_parent_fid(oa, fio->fi_fid);
564
565	memset(oinfo, 0, sizeof(*oinfo));
566	oinfo->oi_oa = oa;
567	oinfo->oi_capa = fio->fi_capa;
568	init_completion(&cbargs->opc_sync);
569
570	rc = osc_sync_base(osc_export(obj), oinfo, osc_async_upcall, cbargs,
571			   PTLRPCD_SET);
572	return rc;
573}
574
575static int osc_io_fsync_start(const struct lu_env *env,
576			      const struct cl_io_slice *slice)
577{
578	struct cl_io       *io  = slice->cis_io;
579	struct cl_fsync_io *fio = &io->u.ci_fsync;
580	struct cl_object   *obj = slice->cis_obj;
581	struct osc_object  *osc = cl2osc(obj);
582	pgoff_t start  = cl_index(obj, fio->fi_start);
583	pgoff_t end    = cl_index(obj, fio->fi_end);
584	int     result = 0;
585
586	if (fio->fi_end == OBD_OBJECT_EOF)
587		end = CL_PAGE_EOF;
588
589	result = osc_cache_writeback_range(env, osc, start, end, 0,
590					   fio->fi_mode == CL_FSYNC_DISCARD);
591	if (result > 0) {
592		fio->fi_nr_written += result;
593		result = 0;
594	}
595	if (fio->fi_mode == CL_FSYNC_ALL) {
596		int rc;
597
598		/* we have to wait for writeback to finish before we can
599		 * send OST_SYNC RPC. This is bad because it causes extents
600		 * to be written osc by osc. However, we usually start
601		 * writeback before CL_FSYNC_ALL so this won't have any real
602		 * problem. */
603		rc = osc_cache_wait_range(env, osc, start, end);
604		if (result == 0)
605			result = rc;
606		rc = osc_fsync_ost(env, osc, fio);
607		if (result == 0)
608			result = rc;
609	}
610
611	return result;
612}
613
614static void osc_io_fsync_end(const struct lu_env *env,
615			     const struct cl_io_slice *slice)
616{
617	struct cl_fsync_io *fio = &slice->cis_io->u.ci_fsync;
618	struct cl_object   *obj = slice->cis_obj;
619	pgoff_t start = cl_index(obj, fio->fi_start);
620	pgoff_t end   = cl_index(obj, fio->fi_end);
621	int result = 0;
622
623	if (fio->fi_mode == CL_FSYNC_LOCAL) {
624		result = osc_cache_wait_range(env, cl2osc(obj), start, end);
625	} else if (fio->fi_mode == CL_FSYNC_ALL) {
626		struct osc_io	   *oio    = cl2osc_io(env, slice);
627		struct osc_async_cbargs *cbargs = &oio->oi_cbarg;
628
629		wait_for_completion(&cbargs->opc_sync);
630		if (result == 0)
631			result = cbargs->opc_rc;
632	}
633	slice->cis_io->ci_result = result;
634}
635
636static void osc_io_end(const struct lu_env *env,
637		       const struct cl_io_slice *slice)
638{
639	struct osc_io *oio = cl2osc_io(env, slice);
640
641	if (oio->oi_active) {
642		osc_extent_release(env, oio->oi_active);
643		oio->oi_active = NULL;
644	}
645}
646
647static const struct cl_io_operations osc_io_ops = {
648	.op = {
649		[CIT_READ] = {
650			.cio_start  = osc_io_read_start,
651			.cio_fini   = osc_io_fini
652		},
653		[CIT_WRITE] = {
654			.cio_start  = osc_io_write_start,
655			.cio_end    = osc_io_end,
656			.cio_fini   = osc_io_fini
657		},
658		[CIT_SETATTR] = {
659			.cio_start  = osc_io_setattr_start,
660			.cio_end    = osc_io_setattr_end
661		},
662		[CIT_FAULT] = {
663			.cio_start  = osc_io_fault_start,
664			.cio_end    = osc_io_end,
665			.cio_fini   = osc_io_fini
666		},
667		[CIT_FSYNC] = {
668			.cio_start  = osc_io_fsync_start,
669			.cio_end    = osc_io_fsync_end,
670			.cio_fini   = osc_io_fini
671		},
672		[CIT_MISC] = {
673			.cio_fini   = osc_io_fini
674		}
675	},
676	.req_op = {
677		 [CRT_READ] = {
678			 .cio_submit    = osc_io_submit
679		 },
680		 [CRT_WRITE] = {
681			 .cio_submit    = osc_io_submit
682		 }
683	 },
684	.cio_prepare_write = osc_io_prepare_write,
685	.cio_commit_write  = osc_io_commit_write
686};
687
688/*****************************************************************************
689 *
690 * Transfer operations.
691 *
692 */
693
/*
 * cro_prep hook of the osc layer: no preparation is done here.
 * Always returns 0.
 */
static int osc_req_prep(const struct lu_env *env,
			const struct cl_req_slice *slice)
{
	/* nothing to prepare */
	return 0;
}
699
700static void osc_req_completion(const struct lu_env *env,
701			       const struct cl_req_slice *slice, int ioret)
702{
703	struct osc_req *or;
704
705	or = cl2osc_req(slice);
706	OBD_SLAB_FREE_PTR(or, osc_req_kmem);
707}
708
709/**
710 * Implementation of struct cl_req_operations::cro_attr_set() for osc
711 * layer. osc is responsible for struct obdo::o_id and struct obdo::o_seq
712 * fields.
713 */
714static void osc_req_attr_set(const struct lu_env *env,
715			     const struct cl_req_slice *slice,
716			     const struct cl_object *obj,
717			     struct cl_req_attr *attr, u64 flags)
718{
719	struct lov_oinfo *oinfo;
720	struct cl_req    *clerq;
721	struct cl_page   *apage; /* _some_ page in @clerq */
722	struct cl_lock   *lock;  /* _some_ lock protecting @apage */
723	struct osc_lock  *olck;
724	struct osc_page  *opg;
725	struct obdo      *oa;
726	struct ost_lvb   *lvb;
727
728	oinfo	= cl2osc(obj)->oo_oinfo;
729	lvb	= &oinfo->loi_lvb;
730	oa	= attr->cra_oa;
731
732	if ((flags & OBD_MD_FLMTIME) != 0) {
733		oa->o_mtime = lvb->lvb_mtime;
734		oa->o_valid |= OBD_MD_FLMTIME;
735	}
736	if ((flags & OBD_MD_FLATIME) != 0) {
737		oa->o_atime = lvb->lvb_atime;
738		oa->o_valid |= OBD_MD_FLATIME;
739	}
740	if ((flags & OBD_MD_FLCTIME) != 0) {
741		oa->o_ctime = lvb->lvb_ctime;
742		oa->o_valid |= OBD_MD_FLCTIME;
743	}
744	if (flags & OBD_MD_FLGROUP) {
745		ostid_set_seq(&oa->o_oi, ostid_seq(&oinfo->loi_oi));
746		oa->o_valid |= OBD_MD_FLGROUP;
747	}
748	if (flags & OBD_MD_FLID) {
749		ostid_set_id(&oa->o_oi, ostid_id(&oinfo->loi_oi));
750		oa->o_valid |= OBD_MD_FLID;
751	}
752	if (flags & OBD_MD_FLHANDLE) {
753		clerq = slice->crs_req;
754		LASSERT(!list_empty(&clerq->crq_pages));
755		apage = container_of(clerq->crq_pages.next,
756				     struct cl_page, cp_flight);
757		opg = osc_cl_page_osc(apage);
758		apage = opg->ops_cl.cpl_page; /* now apage is a sub-page */
759		lock = cl_lock_at_page(env, apage->cp_obj, apage, NULL, 1, 1);
760		if (lock == NULL) {
761			struct cl_object_header *head;
762			struct cl_lock	  *scan;
763
764			head = cl_object_header(apage->cp_obj);
765			list_for_each_entry(scan, &head->coh_locks,
766						cll_linkage)
767				CL_LOCK_DEBUG(D_ERROR, env, scan,
768					      "no cover page!\n");
769			CL_PAGE_DEBUG(D_ERROR, env, apage,
770				      "dump uncover page!\n");
771			dump_stack();
772			LBUG();
773		}
774
775		olck = osc_lock_at(lock);
776		LASSERT(olck != NULL);
777		LASSERT(ergo(opg->ops_srvlock, olck->ols_lock == NULL));
778		/* check for lockless io. */
779		if (olck->ols_lock != NULL) {
780			oa->o_handle = olck->ols_lock->l_remote_handle;
781			oa->o_valid |= OBD_MD_FLHANDLE;
782		}
783		cl_lock_put(env, lock);
784	}
785}
786
787static const struct cl_req_operations osc_req_ops = {
788	.cro_prep       = osc_req_prep,
789	.cro_attr_set   = osc_req_attr_set,
790	.cro_completion = osc_req_completion
791};
792
793
794int osc_io_init(const struct lu_env *env,
795		struct cl_object *obj, struct cl_io *io)
796{
797	struct osc_io *oio = osc_env_io(env);
798
799	CL_IO_SLICE_CLEAN(oio, oi_cl);
800	cl_io_slice_add(io, &oio->oi_cl, obj, &osc_io_ops);
801	return 0;
802}
803
804int osc_req_init(const struct lu_env *env, struct cl_device *dev,
805		 struct cl_req *req)
806{
807	struct osc_req *or;
808	int result;
809
810	OBD_SLAB_ALLOC_PTR_GFP(or, osc_req_kmem, GFP_NOFS);
811	if (or != NULL) {
812		cl_req_slice_add(req, &or->or_cl, dev, &osc_req_ops);
813		result = 0;
814	} else
815		result = -ENOMEM;
816	return result;
817}
818
819/** @} osc */
820