/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
 * Copyright (c) 2011, 2012, Intel Corporation.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 *
 * Implementation of cl_io for OSC layer.
 *
 * Author: Nikita Danilov <nikita.danilov@sun.com>
 * Author: Jinshan Xiong <jinshan.xiong@whamcloud.com>
 */

#define DEBUG_SUBSYSTEM S_OSC

#include "osc_cl_internal.h"

/** \addtogroup osc
 * @{
 */

/*****************************************************************************
 *
 * Type conversions.
 *
 */

static struct osc_req *cl2osc_req(const struct cl_req_slice *slice)
{
	LINVRNT(slice->crs_dev->cd_lu_dev.ld_type == &osc_device_type);
	return container_of0(slice, struct osc_req, or_cl);
}

static struct osc_io *cl2osc_io(const struct lu_env *env,
				const struct cl_io_slice *slice)
{
	struct osc_io *oio = container_of0(slice, struct osc_io, oi_cl);

	LINVRNT(oio == osc_env_io(env));
	return oio;
}

static struct osc_page *osc_cl_page_osc(struct cl_page *page)
{
	const struct cl_page_slice *slice;

	slice = cl_page_at(page, &osc_device_type);
	LASSERT(slice != NULL);

	return cl2osc_page(slice);
}

/*****************************************************************************
 *
 * io operations.
 *
 */

static void osc_io_fini(const struct lu_env *env, const struct cl_io_slice *io)
{
}

/**
 * An implementation of cl_io_operations::cio_io_submit() method for osc
 * layer. Iterates over pages in the in-queue, prepares each for io by
 * calling cl_page_prep(), submits it through osc_page_submit() and hands
 * the backing async pages to osc_queue_sync_pages() in batches of at most
 * cl_max_pages_per_rpc pages.
 */
static int osc_io_submit(const struct lu_env *env,
			 const struct cl_io_slice *ios,
			 enum cl_req_type crt, struct cl_2queue *queue)
{
	struct cl_page *page;
	struct cl_page *tmp;
	struct client_obd *cli = NULL;
	struct osc_object *osc = NULL; /* to keep gcc happy */
	struct osc_page *opg;
	struct cl_io *io;
	LIST_HEAD(list);

	struct cl_page_list *qin = &queue->c2_qin;
	struct cl_page_list *qout = &queue->c2_qout;
	int queued = 0;
	int result = 0;
	int cmd;
	int brw_flags;
	int max_pages;

	LASSERT(qin->pl_nr > 0);

	CDEBUG(D_CACHE, "%d %d\n", qin->pl_nr, crt);

	osc = cl2osc(ios->cis_obj);
	cli = osc_cli(osc);
	max_pages = cli->cl_max_pages_per_rpc;

	cmd = crt == CRT_WRITE ? OBD_BRW_WRITE : OBD_BRW_READ;
	brw_flags = osc_io_srvlock(cl2osc_io(env, ios)) ? OBD_BRW_SRVLOCK : 0;

	/*
	 * NOTE: here @page is a top-level page. This is done to avoid
	 * creation of sub-page-list.
	 */
	cl_page_list_for_each_safe(page, tmp, qin) {
		struct osc_async_page *oap;

		/* Top level IO. */
		io = page->cp_owner;
		LASSERT(io != NULL);

		opg = osc_cl_page_osc(page);
		oap = &opg->ops_oap;
		LASSERT(osc == oap->oap_obj);

		if (!list_empty(&oap->oap_pending_item) ||
		    !list_empty(&oap->oap_rpc_item)) {
			CDEBUG(D_CACHE, "Busy oap %p page %p for submit.\n",
			       oap, opg);
			result = -EBUSY;
			break;
		}

		result = cl_page_prep(env, io, page, crt);
		if (result != 0) {
			LASSERT(result < 0);
			if (result != -EALREADY)
				break;
			/*
			 * Handle -EALREADY error: for read case, the page is
			 * already in UPTODATE state; for write, the page
			 * is not dirty.
			 */
			result = 0;
			continue;
		}

		cl_page_list_move(qout, qin, page);
		oap->oap_async_flags = ASYNC_URGENT|ASYNC_READY;
		oap->oap_async_flags |= ASYNC_COUNT_STABLE;

		osc_page_submit(env, opg, crt, brw_flags);
		list_add_tail(&oap->oap_pending_item, &list);
		if (++queued == max_pages) {
			queued = 0;
			result = osc_queue_sync_pages(env, osc, &list, cmd,
						      brw_flags);
			if (result < 0)
				break;
		}
	}

	if (queued > 0)
		result = osc_queue_sync_pages(env, osc, &list, cmd, brw_flags);

	CDEBUG(D_INFO, "%d/%d %d\n", qin->pl_nr, qout->pl_nr, result);
	return qout->pl_nr > 0 ? 0 : result;
}
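
/*
 * Illustrative walk-through of the batching above (a sketch only; the
 * 600/256 figures are assumed, the real batch size is
 * cli->cl_max_pages_per_rpc):
 *
 *	qin holds 600 prepared pages, max_pages == 256
 *	-> pages   1..256: osc_queue_sync_pages() fires on the 256th page
 *	-> pages 257..512: osc_queue_sync_pages() fires again
 *	-> pages 513..600: flushed by the trailing "queued > 0" call
 *
 * Pages failing cl_page_prep() with -EALREADY (already uptodate for
 * reads, not dirty for writes) are skipped rather than failing the whole
 * submit, so qout can legitimately end up shorter than qin.
 */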

static void osc_page_touch_at(const struct lu_env *env,
			      struct cl_object *obj, pgoff_t idx, unsigned to)
{
	struct lov_oinfo *loi = cl2osc(obj)->oo_oinfo;
	struct cl_attr *attr = &osc_env_info(env)->oti_attr;
	int valid;
	__u64 kms;

	/* offset within stripe */
	kms = cl_offset(obj, idx) + to;

	cl_object_attr_lock(obj);
	/*
	 * XXX old code used
	 *
	 *	ll_inode_size_lock(inode, 0); lov_stripe_lock(lsm);
	 *
	 * here
	 */
	CDEBUG(D_INODE, "stripe KMS %sincreasing %llu->%llu %llu\n",
	       kms > loi->loi_kms ? "" : "not ", loi->loi_kms, kms,
	       loi->loi_lvb.lvb_size);

	valid = 0;
	if (kms > loi->loi_kms) {
		attr->cat_kms = kms;
		valid |= CAT_KMS;
	}
	if (kms > loi->loi_lvb.lvb_size) {
		attr->cat_size = kms;
		valid |= CAT_SIZE;
	}
	cl_object_attr_set(env, obj, attr, valid);
	cl_object_attr_unlock(obj);
}
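
/*
 * Worked example of the KMS arithmetic above (a sketch assuming
 * 4096-byte pages behind cl_offset()):
 *
 *	idx == 3, to == 100  =>  kms = 3 * 4096 + 100 = 12388
 *
 * If loi_kms was 8192 it grows to 12388 (CAT_KMS), and if lvb_size was
 * below 12388 the apparent object size grows as well (CAT_SIZE). The
 * update is monotonic: neither attribute is ever shrunk here.
 */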

/**
 * This is called when a page is accessed within a file in a way that
 * creates a new page, if one was missing (i.e., if there was a hole at
 * that place in the file, or if the accessed page is beyond the current
 * file size). Examples: ->commit_write() and ->nopage() methods.
 *
 * Expand stripe KMS if necessary.
 */
static void osc_page_touch(const struct lu_env *env,
			   struct osc_page *opage, unsigned to)
{
	struct cl_page *page = opage->ops_cl.cpl_page;
	struct cl_object *obj = opage->ops_cl.cpl_obj;

	osc_page_touch_at(env, obj, page->cp_index, to);
}

/**
 * Implements cl_io_operations::cio_prepare_write() method for osc layer.
 *
 * \retval -EIO transfer initiated against this osc will most likely fail
 * \retval 0    transfer initiated against this osc will most likely succeed.
 *
 * The reason for this check is to immediately return an error to the caller
 * in the case of a deactivated import. Note that the import can be
 * deactivated later, while pages dirtied by this IO are still in the cache;
 * this is irrelevant, because fsync would still return an error to the
 * application, and many applications don't call fsync for performance
 * reasons, so we want to return -EIO at write time to notify the
 * application.
 */
static int osc_io_prepare_write(const struct lu_env *env,
				const struct cl_io_slice *ios,
				const struct cl_page_slice *slice,
				unsigned from, unsigned to)
{
	struct osc_device *dev = lu2osc_dev(slice->cpl_obj->co_lu.lo_dev);
	struct obd_import *imp = class_exp2cliimp(dev->od_exp);
	struct osc_io *oio = cl2osc_io(env, ios);
	int result = 0;

	/*
	 * This implements OBD_BRW_CHECK logic from old client.
	 */

	if (imp == NULL || imp->imp_invalid)
		result = -EIO;
	if (result == 0 && oio->oi_lockless)
		/* this page contains `invalid' data, but who cares?
		 * nobody can access the invalid data.
		 * in osc_io_commit_write(), we're going to write exact
		 * [from, to) bytes of this page to OST. -jay */
		cl_page_export(env, slice->cpl_page, 1);

	return result;
}

static int osc_io_commit_write(const struct lu_env *env,
			       const struct cl_io_slice *ios,
			       const struct cl_page_slice *slice,
			       unsigned from, unsigned to)
{
	struct osc_io *oio = cl2osc_io(env, ios);
	struct osc_page *opg = cl2osc_page(slice);
	struct osc_object *obj = cl2osc(opg->ops_cl.cpl_obj);
	struct osc_async_page *oap = &opg->ops_oap;

	LASSERT(to > 0);
	/*
	 * XXX instead of calling osc_page_touch() here and in
	 * osc_io_fault_start() it might be more logical to introduce
	 * cl_page_touch() method, that generic cl_io_commit_write() and page
	 * fault code calls.
	 */
	osc_page_touch(env, cl2osc_page(slice), to);
	if (!client_is_remote(osc_export(obj)) &&
	    capable(CFS_CAP_SYS_RESOURCE))
		oap->oap_brw_flags |= OBD_BRW_NOQUOTA;

	if (oio->oi_lockless)
		/* see osc_io_prepare_write() for lockless io handling. */
		cl_page_clip(env, slice->cpl_page, from, to);

	return 0;
}

static int osc_io_fault_start(const struct lu_env *env,
			      const struct cl_io_slice *ios)
{
	struct cl_io *io;
	struct cl_fault_io *fio;

	io = ios->cis_io;
	fio = &io->u.ci_fault;
	CDEBUG(D_INFO, "%lu %d %d\n",
	       fio->ft_index, fio->ft_writable, fio->ft_nob);
	/*
	 * If mapping is writeable, adjust kms to cover this page,
	 * but do not extend kms beyond actual file size.
	 * See bug 10919.
	 */
	if (fio->ft_writable)
		osc_page_touch_at(env, ios->cis_obj,
				  fio->ft_index, fio->ft_nob);
	return 0;
}

static int osc_async_upcall(void *a, int rc)
{
	struct osc_async_cbargs *args = a;

	args->opc_rc = rc;
	complete(&args->opc_sync);
	return 0;
}

/**
 * Checks that there are no pages being written in the extent being truncated.
 */
static int trunc_check_cb(const struct lu_env *env, struct cl_io *io,
			  struct cl_page *page, void *cbdata)
{
	const struct cl_page_slice *slice;
	struct osc_page *ops;
	struct osc_async_page *oap;
	__u64 start = *(__u64 *)cbdata;

	slice = cl_page_at(page, &osc_device_type);
	LASSERT(slice != NULL);
	ops = cl2osc_page(slice);
	oap = &ops->ops_oap;

	if (oap->oap_cmd & OBD_BRW_WRITE &&
	    !list_empty(&oap->oap_pending_item))
		CL_PAGE_DEBUG(D_ERROR, env, page, "exists %llu/%s.\n",
			      start, current->comm);

	{
		struct page *vmpage = cl_page_vmpage(env, page);

		if (PageLocked(vmpage))
			CDEBUG(D_CACHE, "page %p index %lu locked for %d.\n",
			       ops, page->cp_index,
			       (oap->oap_cmd & OBD_BRW_RWMASK));
	}

	return CLP_GANG_OKAY;
}

static void osc_trunc_check(const struct lu_env *env, struct cl_io *io,
			    struct osc_io *oio, __u64 size)
{
	struct cl_object *clob;
	int partial;
	pgoff_t start;

	clob = oio->oi_cl.cis_obj;
	start = cl_index(clob, size);
	partial = cl_offset(clob, start) < size;

	/*
	 * Complain if there are pages in the truncated region.
	 */
	cl_page_gang_lookup(env, clob, io, start + partial, CL_PAGE_EOF,
			    trunc_check_cb, (void *)&size);
}
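
/*
 * Example of the start/partial arithmetic above, assuming 4096-byte
 * pages: truncating to size == 10000 gives start == 2 and
 * cl_offset(clob, 2) == 8192 < 10000, so partial == 1. Page 2 keeps live
 * bytes [8192, 10000) and may legitimately remain, hence the lookup for
 * leftover pages begins at start + partial == 3.
 */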

static int osc_io_setattr_start(const struct lu_env *env,
				const struct cl_io_slice *slice)
{
	struct cl_io *io = slice->cis_io;
	struct osc_io *oio = cl2osc_io(env, slice);
	struct cl_object *obj = slice->cis_obj;
	struct lov_oinfo *loi = cl2osc(obj)->oo_oinfo;
	struct cl_attr *attr = &osc_env_info(env)->oti_attr;
	struct obdo *oa = &oio->oi_oa;
	struct osc_async_cbargs *cbargs = &oio->oi_cbarg;
	__u64 size = io->u.ci_setattr.sa_attr.lvb_size;
	unsigned int ia_valid = io->u.ci_setattr.sa_valid;
	int result = 0;
	struct obd_info oinfo = { { { 0 } } };

	/* truncate cache dirty pages first */
	if (cl_io_is_trunc(io))
		result = osc_cache_truncate_start(env, oio, cl2osc(obj), size);

	if (result == 0 && oio->oi_lockless == 0) {
		cl_object_attr_lock(obj);
		result = cl_object_attr_get(env, obj, attr);
		if (result == 0) {
			struct ost_lvb *lvb = &io->u.ci_setattr.sa_attr;
			unsigned int cl_valid = 0;

			if (ia_valid & ATTR_SIZE) {
				attr->cat_size = attr->cat_kms = size;
				cl_valid = (CAT_SIZE | CAT_KMS);
			}
			if (ia_valid & ATTR_MTIME_SET) {
				attr->cat_mtime = lvb->lvb_mtime;
				cl_valid |= CAT_MTIME;
			}
			if (ia_valid & ATTR_ATIME_SET) {
				attr->cat_atime = lvb->lvb_atime;
				cl_valid |= CAT_ATIME;
			}
			if (ia_valid & ATTR_CTIME_SET) {
				attr->cat_ctime = lvb->lvb_ctime;
				cl_valid |= CAT_CTIME;
			}
			result = cl_object_attr_set(env, obj, attr, cl_valid);
		}
		cl_object_attr_unlock(obj);
	}
	memset(oa, 0, sizeof(*oa));
	if (result == 0) {
		oa->o_oi = loi->loi_oi;
		oa->o_mtime = attr->cat_mtime;
		oa->o_atime = attr->cat_atime;
		oa->o_ctime = attr->cat_ctime;
		oa->o_valid = OBD_MD_FLID | OBD_MD_FLGROUP | OBD_MD_FLATIME |
			OBD_MD_FLCTIME | OBD_MD_FLMTIME;
		if (ia_valid & ATTR_SIZE) {
			oa->o_size = size;
			oa->o_blocks = OBD_OBJECT_EOF;
			oa->o_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;

			if (oio->oi_lockless) {
				oa->o_flags = OBD_FL_SRVLOCK;
				oa->o_valid |= OBD_MD_FLFLAGS;
			}
		} else {
			LASSERT(oio->oi_lockless == 0);
		}

		oinfo.oi_oa = oa;
		oinfo.oi_capa = io->u.ci_setattr.sa_capa;
		init_completion(&cbargs->opc_sync);

		if (ia_valid & ATTR_SIZE)
			result = osc_punch_base(osc_export(cl2osc(obj)),
						&oinfo, osc_async_upcall,
						cbargs, PTLRPCD_SET);
		else
			result = osc_setattr_async_base(osc_export(cl2osc(obj)),
							&oinfo, NULL,
							osc_async_upcall,
							cbargs, PTLRPCD_SET);
		cbargs->opc_rpc_sent = result == 0;
	}
	return result;
}

static void osc_io_setattr_end(const struct lu_env *env,
			       const struct cl_io_slice *slice)
{
	struct cl_io *io = slice->cis_io;
	struct osc_io *oio = cl2osc_io(env, slice);
	struct cl_object *obj = slice->cis_obj;
	struct osc_async_cbargs *cbargs = &oio->oi_cbarg;
	int result = 0;

	if (cbargs->opc_rpc_sent) {
		wait_for_completion(&cbargs->opc_sync);
		result = io->ci_result = cbargs->opc_rc;
	}
	if (result == 0) {
		if (oio->oi_lockless) {
			/* lockless truncate */
			struct osc_device *osd = lu2osc_dev(obj->co_lu.lo_dev);

			LASSERT(cl_io_is_trunc(io));
			/* XXX: Need a lock. */
			osd->od_stats.os_lockless_truncates++;
		}
	}

	if (cl_io_is_trunc(io)) {
		__u64 size = io->u.ci_setattr.sa_attr.lvb_size;

		osc_trunc_check(env, io, oio, size);
		if (oio->oi_trunc != NULL) {
			osc_cache_truncate_end(env, oio, cl2osc(obj));
			oio->oi_trunc = NULL;
		}
	}
}
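
/*
 * The setattr pair above follows the asynchronous RPC handshake used
 * throughout this file (a sketch of the flow, not additional code):
 *
 *	->cio_start: init_completion(&cbargs->opc_sync);
 *		     rc = osc_punch_base(..., osc_async_upcall, cbargs, ...);
 *		     cbargs->opc_rpc_sent = rc == 0;
 *
 *	(reply)    : osc_async_upcall() stores the result in opc_rc and
 *		     signals opc_sync, typically from ptlrpcd context since
 *		     the RPC is sent on PTLRPCD_SET
 *
 *	->cio_end  : wait_for_completion(&cbargs->opc_sync);
 *		     result = cbargs->opc_rc;
 */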

static int osc_io_read_start(const struct lu_env *env,
			     const struct cl_io_slice *slice)
{
	struct cl_object *obj = slice->cis_obj;
	struct cl_attr *attr = &osc_env_info(env)->oti_attr;
	int rc = 0;

	if (!slice->cis_io->ci_noatime) {
		cl_object_attr_lock(obj);
		attr->cat_atime = LTIME_S(CURRENT_TIME);
		rc = cl_object_attr_set(env, obj, attr, CAT_ATIME);
		cl_object_attr_unlock(obj);
	}
	return rc;
}

static int osc_io_write_start(const struct lu_env *env,
			      const struct cl_io_slice *slice)
{
	struct cl_object *obj = slice->cis_obj;
	struct cl_attr *attr = &osc_env_info(env)->oti_attr;
	int rc = 0;

	OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_DELAY_SETTIME, 1);
	cl_object_attr_lock(obj);
	attr->cat_mtime = attr->cat_ctime = LTIME_S(CURRENT_TIME);
	rc = cl_object_attr_set(env, obj, attr, CAT_MTIME | CAT_CTIME);
	cl_object_attr_unlock(obj);

	return rc;
}

static int osc_fsync_ost(const struct lu_env *env, struct osc_object *obj,
			 struct cl_fsync_io *fio)
{
	struct osc_io *oio = osc_env_io(env);
	struct obdo *oa = &oio->oi_oa;
	struct obd_info *oinfo = &oio->oi_info;
	struct lov_oinfo *loi = obj->oo_oinfo;
	struct osc_async_cbargs *cbargs = &oio->oi_cbarg;
	int rc = 0;

	memset(oa, 0, sizeof(*oa));
	oa->o_oi = loi->loi_oi;
	oa->o_valid = OBD_MD_FLID | OBD_MD_FLGROUP;

	/* reload size and blocks for start and end of sync range */
	oa->o_size = fio->fi_start;
	oa->o_blocks = fio->fi_end;
	oa->o_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;

	obdo_set_parent_fid(oa, fio->fi_fid);

	memset(oinfo, 0, sizeof(*oinfo));
	oinfo->oi_oa = oa;
	oinfo->oi_capa = fio->fi_capa;
	init_completion(&cbargs->opc_sync);

	rc = osc_sync_base(osc_export(obj), oinfo, osc_async_upcall, cbargs,
			   PTLRPCD_SET);
	return rc;
}

static int osc_io_fsync_start(const struct lu_env *env,
			      const struct cl_io_slice *slice)
{
	struct cl_io *io = slice->cis_io;
	struct cl_fsync_io *fio = &io->u.ci_fsync;
	struct cl_object *obj = slice->cis_obj;
	struct osc_object *osc = cl2osc(obj);
	pgoff_t start = cl_index(obj, fio->fi_start);
	pgoff_t end = cl_index(obj, fio->fi_end);
	int result = 0;

	if (fio->fi_end == OBD_OBJECT_EOF)
		end = CL_PAGE_EOF;

	result = osc_cache_writeback_range(env, osc, start, end, 0,
					   fio->fi_mode == CL_FSYNC_DISCARD);
	if (result > 0) {
		fio->fi_nr_written += result;
		result = 0;
	}
	if (fio->fi_mode == CL_FSYNC_ALL) {
		int rc;

		/* We have to wait for writeback to finish before we can
		 * send the OST_SYNC RPC. This is bad because it causes
		 * extents to be written osc by osc. However, writeback
		 * usually starts before CL_FSYNC_ALL, so in practice this
		 * is not a real problem. */
		rc = osc_cache_wait_range(env, osc, start, end);
		if (result == 0)
			result = rc;
		rc = osc_fsync_ost(env, osc, fio);
		if (result == 0)
			result = rc;
	}

	return result;
}

static void osc_io_fsync_end(const struct lu_env *env,
			     const struct cl_io_slice *slice)
{
	struct cl_fsync_io *fio = &slice->cis_io->u.ci_fsync;
	struct cl_object *obj = slice->cis_obj;
	pgoff_t start = cl_index(obj, fio->fi_start);
	pgoff_t end = cl_index(obj, fio->fi_end);
	int result = 0;

	if (fio->fi_mode == CL_FSYNC_LOCAL) {
		result = osc_cache_wait_range(env, cl2osc(obj), start, end);
	} else if (fio->fi_mode == CL_FSYNC_ALL) {
		struct osc_io *oio = cl2osc_io(env, slice);
		struct osc_async_cbargs *cbargs = &oio->oi_cbarg;

		wait_for_completion(&cbargs->opc_sync);
		if (result == 0)
			result = cbargs->opc_rc;
	}
	slice->cis_io->ci_result = result;
}
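
/*
 * Descriptive summary of how the fsync modes used above divide the work
 * between ->cio_start and ->cio_end (CL_FSYNC_NONE is assumed to be the
 * remaining member of the same enum and takes none of the extra steps):
 *
 *	CL_FSYNC_LOCAL   - start writeback, wait for it in ->cio_end
 *	CL_FSYNC_DISCARD - ask osc_cache_writeback_range() to discard
 *			   cached pages instead of writing them back
 *	CL_FSYNC_ALL     - wait for writeback, then send OST_SYNC through
 *			   osc_fsync_ost() and reap its result in ->cio_end
 */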

static void osc_io_end(const struct lu_env *env,
		       const struct cl_io_slice *slice)
{
	struct osc_io *oio = cl2osc_io(env, slice);

	if (oio->oi_active) {
		osc_extent_release(env, oio->oi_active);
		oio->oi_active = NULL;
	}
}

static const struct cl_io_operations osc_io_ops = {
	.op = {
		[CIT_READ] = {
			.cio_start  = osc_io_read_start,
			.cio_fini   = osc_io_fini
		},
		[CIT_WRITE] = {
			.cio_start  = osc_io_write_start,
			.cio_end    = osc_io_end,
			.cio_fini   = osc_io_fini
		},
		[CIT_SETATTR] = {
			.cio_start  = osc_io_setattr_start,
			.cio_end    = osc_io_setattr_end
		},
		[CIT_FAULT] = {
			.cio_start  = osc_io_fault_start,
			.cio_end    = osc_io_end,
			.cio_fini   = osc_io_fini
		},
		[CIT_FSYNC] = {
			.cio_start  = osc_io_fsync_start,
			.cio_end    = osc_io_fsync_end,
			.cio_fini   = osc_io_fini
		},
		[CIT_MISC] = {
			.cio_fini   = osc_io_fini
		}
	},
	.req_op = {
		[CRT_READ] = {
			.cio_submit = osc_io_submit
		},
		[CRT_WRITE] = {
			.cio_submit = osc_io_submit
		}
	},
	.cio_prepare_write = osc_io_prepare_write,
	.cio_commit_write  = osc_io_commit_write
};

/*****************************************************************************
 *
 * Transfer operations.
 *
 */

static int osc_req_prep(const struct lu_env *env,
			const struct cl_req_slice *slice)
{
	return 0;
}

static void osc_req_completion(const struct lu_env *env,
			       const struct cl_req_slice *slice, int ioret)
{
	struct osc_req *or;

	or = cl2osc_req(slice);
	OBD_SLAB_FREE_PTR(or, osc_req_kmem);
}
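
/*
 * Note on lifetime: the osc_req freed above comes from osc_req_kmem,
 * allocated in osc_req_init() at the bottom of this file, so the slice
 * is released exactly once, when its cl_req completes.
 */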

/**
 * Implementation of struct cl_req_operations::cro_attr_set() for osc
 * layer. osc is responsible for the object id/sequence fields of struct
 * obdo::o_oi.
 */
static void osc_req_attr_set(const struct lu_env *env,
			     const struct cl_req_slice *slice,
			     const struct cl_object *obj,
			     struct cl_req_attr *attr, u64 flags)
{
	struct lov_oinfo *oinfo;
	struct cl_req *clerq;
	struct cl_page *apage; /* _some_ page in @clerq */
	struct cl_lock *lock;  /* _some_ lock protecting @apage */
	struct osc_lock *olck;
	struct osc_page *opg;
	struct obdo *oa;
	struct ost_lvb *lvb;

	oinfo = cl2osc(obj)->oo_oinfo;
	lvb = &oinfo->loi_lvb;
	oa = attr->cra_oa;

	if ((flags & OBD_MD_FLMTIME) != 0) {
		oa->o_mtime = lvb->lvb_mtime;
		oa->o_valid |= OBD_MD_FLMTIME;
	}
	if ((flags & OBD_MD_FLATIME) != 0) {
		oa->o_atime = lvb->lvb_atime;
		oa->o_valid |= OBD_MD_FLATIME;
	}
	if ((flags & OBD_MD_FLCTIME) != 0) {
		oa->o_ctime = lvb->lvb_ctime;
		oa->o_valid |= OBD_MD_FLCTIME;
	}
	if (flags & OBD_MD_FLGROUP) {
		ostid_set_seq(&oa->o_oi, ostid_seq(&oinfo->loi_oi));
		oa->o_valid |= OBD_MD_FLGROUP;
	}
	if (flags & OBD_MD_FLID) {
		ostid_set_id(&oa->o_oi, ostid_id(&oinfo->loi_oi));
		oa->o_valid |= OBD_MD_FLID;
	}
	if (flags & OBD_MD_FLHANDLE) {
		clerq = slice->crs_req;
		LASSERT(!list_empty(&clerq->crq_pages));
		apage = container_of(clerq->crq_pages.next,
				     struct cl_page, cp_flight);
		opg = osc_cl_page_osc(apage);
		apage = opg->ops_cl.cpl_page; /* now apage is a sub-page */
		lock = cl_lock_at_page(env, apage->cp_obj, apage, NULL, 1, 1);
		if (lock == NULL) {
			struct cl_object_header *head;
			struct cl_lock *scan;

			head = cl_object_header(apage->cp_obj);
			list_for_each_entry(scan, &head->coh_locks,
					    cll_linkage)
				CL_LOCK_DEBUG(D_ERROR, env, scan,
					      "no cover page!\n");
			CL_PAGE_DEBUG(D_ERROR, env, apage,
				      "dump uncovered page!\n");
			dump_stack();
			LBUG();
		}

		olck = osc_lock_at(lock);
		LASSERT(olck != NULL);
		LASSERT(ergo(opg->ops_srvlock, olck->ols_lock == NULL));
		/* check for lockless io. */
		if (olck->ols_lock != NULL) {
			oa->o_handle = olck->ols_lock->l_remote_handle;
			oa->o_valid |= OBD_MD_FLHANDLE;
		}
		cl_lock_put(env, lock);
	}
}

static const struct cl_req_operations osc_req_ops = {
	.cro_prep       = osc_req_prep,
	.cro_attr_set   = osc_req_attr_set,
	.cro_completion = osc_req_completion
};

int osc_io_init(const struct lu_env *env,
		struct cl_object *obj, struct cl_io *io)
{
	struct osc_io *oio = osc_env_io(env);

	CL_IO_SLICE_CLEAN(oio, oi_cl);
	cl_io_slice_add(io, &oio->oi_cl, obj, &osc_io_ops);
	return 0;
}

int osc_req_init(const struct lu_env *env, struct cl_device *dev,
		 struct cl_req *req)
{
	struct osc_req *or;
	int result;

	OBD_SLAB_ALLOC_PTR_GFP(or, osc_req_kmem, GFP_NOFS);
	if (or != NULL) {
		cl_req_slice_add(req, &or->or_cl, dev, &osc_req_ops);
		result = 0;
	} else
		result = -ENOMEM;
	return result;
}

/** @} osc */