/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
 * Copyright (c) 2011, 2012, Intel Corporation.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 *
 * Implementation of cl_lock for OSC layer.
 *
 *   Author: Nikita Danilov <nikita.danilov@sun.com>
 */

#define DEBUG_SUBSYSTEM S_OSC

#include "../../include/linux/libcfs/libcfs.h"
/* fid_build_reg_res_name() */
#include "../include/lustre_fid.h"

#include "osc_cl_internal.h"

/** \addtogroup osc
 * @{
 */

#define _PAGEREF_MAGIC (-10000000)

/*****************************************************************************
 *
 * Type conversions.
 *
 */

static const struct cl_lock_operations osc_lock_ops;
static const struct cl_lock_operations osc_lock_lockless_ops;
static void osc_lock_to_lockless(const struct lu_env *env,
				 struct osc_lock *ols, int force);
static int osc_lock_has_pages(struct osc_lock *olck);

int osc_lock_is_lockless(const struct osc_lock *olck)
{
	return (olck->ols_cl.cls_ops == &osc_lock_lockless_ops);
}

/**
 * Returns a weak pointer to the ldlm lock identified by a handle. Returned
 * pointer cannot be dereferenced, as lock is not protected from concurrent
 * reclaim. This function is a helper for osc_lock_invariant().
 */
static struct ldlm_lock *osc_handle_ptr(struct lustre_handle *handle)
{
	struct ldlm_lock *lock;

	lock = ldlm_handle2lock(handle);
	if (lock != NULL)
		LDLM_LOCK_PUT(lock);
	return lock;
}

/**
 * Invariant that has to be true all of the time.
 */
static int osc_lock_invariant(struct osc_lock *ols)
{
	struct ldlm_lock *lock = osc_handle_ptr(&ols->ols_handle);
	struct ldlm_lock *olock = ols->ols_lock;
	int handle_used = lustre_handle_is_used(&ols->ols_handle);

	if (ergo(osc_lock_is_lockless(ols),
		 ols->ols_locklessable && ols->ols_lock == NULL))
		return 1;

	/*
	 * If all the following "ergo"s are true, return 1, otherwise 0
	 */
	if (!ergo(olock != NULL, handle_used))
		return 0;

	if (!ergo(olock != NULL,
		  olock->l_handle.h_cookie == ols->ols_handle.cookie))
		return 0;

	if (!ergo(handle_used,
		  ergo(lock != NULL && olock != NULL, lock == olock) &&
		  ergo(lock == NULL, olock == NULL)))
		return 0;
	/*
	 * Check that ->ols_handle and ->ols_lock are consistent, but
	 * take into account that they are set at the different time.
	 */
	if (!ergo(ols->ols_state == OLS_CANCELLED,
		  olock == NULL && !handle_used))
		return 0;
	/*
	 * DLM lock is destroyed only after we have seen cancellation
	 * ast.
	 */
	if (!ergo(olock != NULL && ols->ols_state < OLS_CANCELLED,
		  ((olock->l_flags & LDLM_FL_DESTROYED) == 0)))
		return 0;

	if (!ergo(ols->ols_state == OLS_GRANTED,
		  olock != NULL &&
		  olock->l_req_mode == olock->l_granted_mode &&
		  ols->ols_hold))
		return 0;
	return 1;
}

/*****************************************************************************
 *
 * Lock operations.
 *
 */

/**
 * Breaks a link between osc_lock and dlm_lock.
 */
static void osc_lock_detach(const struct lu_env *env, struct osc_lock *olck)
{
	struct ldlm_lock *dlmlock;

	spin_lock(&osc_ast_guard);
	dlmlock = olck->ols_lock;
	if (dlmlock == NULL) {
		spin_unlock(&osc_ast_guard);
		return;
	}

	olck->ols_lock = NULL;
	/* wb(); --- for all who check (ols->ols_lock != NULL) before
	 * call to osc_lock_detach() */
	dlmlock->l_ast_data = NULL;
	olck->ols_handle.cookie = 0ULL;
	spin_unlock(&osc_ast_guard);

	lock_res_and_lock(dlmlock);
	if (dlmlock->l_granted_mode == dlmlock->l_req_mode) {
		struct cl_object *obj = olck->ols_cl.cls_obj;
		struct cl_attr *attr = &osc_env_info(env)->oti_attr;
		__u64 old_kms;

		cl_object_attr_lock(obj);
		/* Must get the value under the lock to avoid possible races. */
		old_kms = cl2osc(obj)->oo_oinfo->loi_kms;
		/* Update the kms. Need to loop all granted locks.
		 * Not a problem for the client */
		attr->cat_kms = ldlm_extent_shift_kms(dlmlock, old_kms);

		cl_object_attr_set(env, obj, attr, CAT_KMS);
		cl_object_attr_unlock(obj);
	}
	unlock_res_and_lock(dlmlock);

	/* release a reference taken in osc_lock_upcall0(). */
	LASSERT(olck->ols_has_ref);
	lu_ref_del(&dlmlock->l_reference, "osc_lock", olck);
	LDLM_LOCK_RELEASE(dlmlock);
	olck->ols_has_ref = 0;
}

static int osc_lock_unhold(struct osc_lock *ols)
{
	int result = 0;

	if (ols->ols_hold) {
		ols->ols_hold = 0;
		result = osc_cancel_base(&ols->ols_handle,
					 ols->ols_einfo.ei_mode);
	}
	return result;
}

static int osc_lock_unuse(const struct lu_env *env,
			  const struct cl_lock_slice *slice)
{
	struct osc_lock *ols = cl2osc_lock(slice);

	LINVRNT(osc_lock_invariant(ols));

	switch (ols->ols_state) {
	case OLS_NEW:
		LASSERT(!ols->ols_hold);
		LASSERT(ols->ols_agl);
		return 0;
	case OLS_UPCALL_RECEIVED:
		osc_lock_unhold(ols);
		/* fall through */
	case OLS_ENQUEUED:
		LASSERT(!ols->ols_hold);
		osc_lock_detach(env, ols);
		ols->ols_state = OLS_NEW;
		return 0;
	case OLS_GRANTED:
		LASSERT(!ols->ols_glimpse);
		LASSERT(ols->ols_hold);
		/*
		 * Move lock into OLS_RELEASED state before calling
		 * osc_cancel_base() so that possible synchronous cancellation
		 * (that always happens e.g., for liblustre) sees that lock is
		 * released.
		 */
		ols->ols_state = OLS_RELEASED;
		return osc_lock_unhold(ols);
	default:
		CERROR("Impossible state: %d\n", ols->ols_state);
		LBUG();
	}
}

static void osc_lock_fini(const struct lu_env *env,
			  struct cl_lock_slice *slice)
{
	struct osc_lock *ols = cl2osc_lock(slice);

	LINVRNT(osc_lock_invariant(ols));
	/*
	 * ->ols_hold can still be true at this point if, for example, a
	 * thread that requested a lock was killed (and released a reference
	 * to the lock) before a reply from the server was received. In this
	 * case the lock is destroyed immediately after the upcall.
	 */
	osc_lock_unhold(ols);
	LASSERT(ols->ols_lock == NULL);
	LASSERT(atomic_read(&ols->ols_pageref) == 0 ||
		atomic_read(&ols->ols_pageref) == _PAGEREF_MAGIC);

	OBD_SLAB_FREE_PTR(ols, osc_lock_kmem);
}

static void osc_lock_build_policy(const struct lu_env *env,
				  const struct cl_lock *lock,
				  ldlm_policy_data_t *policy)
{
	const struct cl_lock_descr *d = &lock->cll_descr;

	osc_index2policy(policy, d->cld_obj, d->cld_start, d->cld_end);
	policy->l_extent.gid = d->cld_gid;
}

static __u64 osc_enq2ldlm_flags(__u32 enqflags)
{
	__u64 result = 0;

	LASSERT((enqflags & ~CEF_MASK) == 0);

	if (enqflags & CEF_NONBLOCK)
		result |= LDLM_FL_BLOCK_NOWAIT;
	if (enqflags & CEF_ASYNC)
		result |= LDLM_FL_HAS_INTENT;
	if (enqflags & CEF_DISCARD_DATA)
		result |= LDLM_FL_AST_DISCARD_DATA;
	return result;
}

/**
 * Global spin-lock protecting consistency of ldlm_lock::l_ast_data
 * pointers. Initialized in osc_init().
 */
spinlock_t osc_ast_guard;

static struct osc_lock *osc_ast_data_get(struct ldlm_lock *dlm_lock)
{
	struct osc_lock *olck;

	lock_res_and_lock(dlm_lock);
	spin_lock(&osc_ast_guard);
	olck = dlm_lock->l_ast_data;
	if (olck != NULL) {
		struct cl_lock *lock = olck->ols_cl.cls_lock;
		/*
		 * If osc_lock holds a reference on ldlm lock, return it even
		 * when cl_lock is in CLS_FREEING state. This way
		 *
		 *	osc_ast_data_get(dlmlock) == NULL
		 *
		 * guarantees that all osc references on dlmlock were
		 * released. osc_dlm_blocking_ast0() relies on that.
		 */
		if (lock->cll_state < CLS_FREEING || olck->ols_has_ref) {
			cl_lock_get_trust(lock);
			lu_ref_add_atomic(&lock->cll_reference,
					  "ast", current);
		} else
			olck = NULL;
	}
	spin_unlock(&osc_ast_guard);
	unlock_res_and_lock(dlm_lock);
	return olck;
}

static void osc_ast_data_put(const struct lu_env *env, struct osc_lock *olck)
{
	struct cl_lock *lock;

	lock = olck->ols_cl.cls_lock;
	lu_ref_del(&lock->cll_reference, "ast", current);
	cl_lock_put(env, lock);
}

/**
 * Updates object attributes from a lock value block (lvb) received together
 * with the DLM lock reply from the server. Copy of osc_update_enqueue()
 * logic.
 *
 * This can be optimized to not update attributes when lock is a result of a
 * local match.
 *
 * Called under lock and resource spin-locks.
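 *
 * \param rc enqueue/completion result: on success the LVB is re-read from
 * the DLM lock and the KMS may be extended up to the end of the granted
 * extent; for a glimpse lock that received -ENAVAIL the attributes from the
 * reply LVB are still applied; any other error leaves the attributes
 * untouched.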
 */
static void osc_lock_lvb_update(const struct lu_env *env, struct osc_lock *olck,
				int rc)
{
	struct ost_lvb *lvb;
	struct cl_object *obj;
	struct lov_oinfo *oinfo;
	struct cl_attr *attr;
	unsigned valid;

	if (!(olck->ols_flags & LDLM_FL_LVB_READY))
		return;

	lvb = &olck->ols_lvb;
	obj = olck->ols_cl.cls_obj;
	oinfo = cl2osc(obj)->oo_oinfo;
	attr = &osc_env_info(env)->oti_attr;
	valid = CAT_BLOCKS | CAT_ATIME | CAT_CTIME | CAT_MTIME | CAT_SIZE;
	cl_lvb2attr(attr, lvb);

	cl_object_attr_lock(obj);
	if (rc == 0) {
		struct ldlm_lock *dlmlock;
		__u64 size;

		dlmlock = olck->ols_lock;
		LASSERT(dlmlock != NULL);

		/* re-grab LVB from a dlm lock under DLM spin-locks. */
		*lvb = *(struct ost_lvb *)dlmlock->l_lvb_data;
		size = lvb->lvb_size;
		/* Extend KMS up to the end of this lock and no further.
		 * A lock on [x,y] means a KMS of up to y + 1 bytes! */
		if (size > dlmlock->l_policy_data.l_extent.end)
			size = dlmlock->l_policy_data.l_extent.end + 1;
		if (size >= oinfo->loi_kms) {
			LDLM_DEBUG(dlmlock, "lock acquired, setting rss=%llu, kms=%llu",
				   lvb->lvb_size, size);
			valid |= CAT_KMS;
			attr->cat_kms = size;
		} else {
			LDLM_DEBUG(dlmlock, "lock acquired, setting rss=%llu; leaving kms=%llu, end=%llu",
				   lvb->lvb_size, oinfo->loi_kms,
				   dlmlock->l_policy_data.l_extent.end);
		}
		ldlm_lock_allow_match_locked(dlmlock);
	} else if (rc == -ENAVAIL && olck->ols_glimpse) {
		CDEBUG(D_INODE, "glimpsed, setting rss=%llu; leaving kms=%llu\n",
		       lvb->lvb_size, oinfo->loi_kms);
	} else
		valid = 0;

	if (valid != 0)
		cl_object_attr_set(env, obj, attr, valid);

	cl_object_attr_unlock(obj);
}

/**
 * Called when a lock is granted, from an upcall (when server returned a
 * granted lock), or from completion AST, when server returned a blocked lock.
 *
 * Called under lock and resource spin-locks, that are released temporarily
 * here.
 */
static void osc_lock_granted(const struct lu_env *env, struct osc_lock *olck,
			     struct ldlm_lock *dlmlock, int rc)
{
	struct ldlm_extent *ext;
	struct cl_lock *lock;
	struct cl_lock_descr *descr;

	LASSERT(dlmlock->l_granted_mode == dlmlock->l_req_mode);

	if (olck->ols_state < OLS_GRANTED) {
		lock = olck->ols_cl.cls_lock;
		ext = &dlmlock->l_policy_data.l_extent;
		descr = &osc_env_info(env)->oti_descr;
		descr->cld_obj = lock->cll_descr.cld_obj;

		/* XXX check that ->l_granted_mode is valid. */
		descr->cld_mode = osc_ldlm2cl_lock(dlmlock->l_granted_mode);
		descr->cld_start = cl_index(descr->cld_obj, ext->start);
		descr->cld_end = cl_index(descr->cld_obj, ext->end);
		descr->cld_gid = ext->gid;
		/*
		 * tell upper layers the extent of the lock that was actually
		 * granted
		 */
		olck->ols_state = OLS_GRANTED;
		osc_lock_lvb_update(env, olck, rc);

		/* release DLM spin-locks to allow cl_lock_{modify,signal}()
		 * to take a semaphore on a parent lock. This is safe, because
		 * spin-locks are needed to protect consistency of
		 * dlmlock->l_*_mode and LVB, and we have finished processing
		 * them.
		 */
		unlock_res_and_lock(dlmlock);
		cl_lock_modify(env, lock, descr);
		cl_lock_signal(env, lock);
		LINVRNT(osc_lock_invariant(olck));
		lock_res_and_lock(dlmlock);
	}
}

static void osc_lock_upcall0(const struct lu_env *env, struct osc_lock *olck)
{
	struct ldlm_lock *dlmlock;

	dlmlock = ldlm_handle2lock_long(&olck->ols_handle, 0);
	LASSERT(dlmlock != NULL);

	lock_res_and_lock(dlmlock);
	spin_lock(&osc_ast_guard);
	LASSERT(dlmlock->l_ast_data == olck);
	LASSERT(olck->ols_lock == NULL);
	olck->ols_lock = dlmlock;
	spin_unlock(&osc_ast_guard);

	/*
	 * Lock might not be granted yet. In this case, completion ast
	 * (osc_ldlm_completion_ast()) comes later and finishes lock
	 * granting.
	 */
	if (dlmlock->l_granted_mode == dlmlock->l_req_mode)
		osc_lock_granted(env, olck, dlmlock, 0);
	unlock_res_and_lock(dlmlock);

	/*
	 * osc_enqueue_interpret() decrefs asynchronous locks; counter
	 * this here.
	 */
	ldlm_lock_addref(&olck->ols_handle, olck->ols_einfo.ei_mode);
	olck->ols_hold = 1;

	/* lock reference taken by ldlm_handle2lock_long() is owned by
	 * osc_lock and released in osc_lock_detach() */
	lu_ref_add(&dlmlock->l_reference, "osc_lock", olck);
	olck->ols_has_ref = 1;
}

/**
 * Lock upcall function that is executed either when a reply to ENQUEUE rpc is
 * received from a server, or after osc_enqueue_base() matched a local DLM
 * lock.
 */
static int osc_lock_upcall(void *cookie, int errcode)
{
	struct osc_lock *olck = cookie;
	struct cl_lock_slice *slice = &olck->ols_cl;
	struct cl_lock *lock = slice->cls_lock;
	struct lu_env *env;
	struct cl_env_nest nest;

	env = cl_env_nested_get(&nest);
	if (!IS_ERR(env)) {
		int rc;

		cl_lock_mutex_get(env, lock);

		LASSERT(lock->cll_state >= CLS_QUEUING);
		if (olck->ols_state == OLS_ENQUEUED) {
			olck->ols_state = OLS_UPCALL_RECEIVED;
			rc = ldlm_error2errno(errcode);
		} else if (olck->ols_state == OLS_CANCELLED) {
			rc = -EIO;
		} else {
			CERROR("Impossible state: %d\n", olck->ols_state);
			LBUG();
		}
		if (rc) {
			struct ldlm_lock *dlmlock;

			dlmlock = ldlm_handle2lock(&olck->ols_handle);
			if (dlmlock != NULL) {
				lock_res_and_lock(dlmlock);
				spin_lock(&osc_ast_guard);
				LASSERT(olck->ols_lock == NULL);
				dlmlock->l_ast_data = NULL;
				olck->ols_handle.cookie = 0ULL;
				spin_unlock(&osc_ast_guard);
				ldlm_lock_fail_match_locked(dlmlock);
				unlock_res_and_lock(dlmlock);
				LDLM_LOCK_PUT(dlmlock);
			}
		} else {
			if (olck->ols_glimpse)
				olck->ols_glimpse = 0;
			osc_lock_upcall0(env, olck);
		}

		/* Error handling, some errors are tolerable. */
		if (olck->ols_locklessable && rc == -EUSERS) {
			/* This is a tolerable error, turn this lock into
			 * lockless lock.
			 */
			osc_object_set_contended(cl2osc(slice->cls_obj));
			LASSERT(slice->cls_ops == &osc_lock_ops);

			/* Change this lock to ldlmlock-less lock. */
			osc_lock_to_lockless(env, olck, 1);
			olck->ols_state = OLS_GRANTED;
			rc = 0;
		} else if (olck->ols_glimpse && rc == -ENAVAIL) {
			osc_lock_lvb_update(env, olck, rc);
			cl_lock_delete(env, lock);
			/* Hide the error. */
			rc = 0;
		}

		if (rc == 0) {
			/* For the AGL case, the RPC sponsor may exit the
			 * cl_lock processing without wait() being called
			 * before the related OSC lock upcall().
			 * So update the lock status according to the enqueue
			 * result inside the AGL upcall(). */
			if (olck->ols_agl) {
				lock->cll_flags |= CLF_FROM_UPCALL;
				cl_wait_try(env, lock);
				lock->cll_flags &= ~CLF_FROM_UPCALL;
				if (!olck->ols_glimpse)
					olck->ols_agl = 0;
			}
			cl_lock_signal(env, lock);
			/* del user for lock upcall cookie */
			cl_unuse_try(env, lock);
		} else {
			/* del user for lock upcall cookie */
			cl_lock_user_del(env, lock);
			cl_lock_error(env, lock, rc);
		}

		/* release cookie reference, acquired by osc_lock_enqueue() */
		cl_lock_hold_release(env, lock, "upcall", lock);
		cl_lock_mutex_put(env, lock);

		lu_ref_del(&lock->cll_reference, "upcall", lock);
		/* This may be the last reference, so it must be released
		 * after cl_lock_mutex_put(). */
		cl_lock_put(env, lock);

		cl_env_nested_put(&nest, env);
	} else {
		/* should never happen, similar to osc_ldlm_blocking_ast(). */
		LBUG();
	}
	return errcode;
}

/**
 * Core of osc_dlm_blocking_ast() logic.
 */
static void osc_lock_blocking(const struct lu_env *env,
			      struct ldlm_lock *dlmlock,
			      struct osc_lock *olck, int blocking)
{
	struct cl_lock *lock = olck->ols_cl.cls_lock;

	LASSERT(olck->ols_lock == dlmlock);
	CLASSERT(OLS_BLOCKED < OLS_CANCELLED);
	LASSERT(!osc_lock_is_lockless(olck));

	/*
	 * Lock might still be addref-ed here, if e.g., blocking ast
	 * is sent for a failed lock.
	 */
	osc_lock_unhold(olck);

	if (blocking && olck->ols_state < OLS_BLOCKED)
		/*
		 * Move osc_lock into OLS_BLOCKED before canceling the lock,
		 * because it recursively re-enters osc_lock_blocking(), with
		 * the state set to OLS_CANCELLED.
		 */
		olck->ols_state = OLS_BLOCKED;
	/*
	 * cancel and destroy lock at least once no matter how blocking ast is
	 * entered (see comment above osc_ldlm_blocking_ast() for use
	 * cases). cl_lock_cancel() and cl_lock_delete() are idempotent.
	 */
	cl_lock_cancel(env, lock);
	cl_lock_delete(env, lock);
}

/**
 * Helper for osc_dlm_blocking_ast() handling discrepancies between cl_lock
 * and ldlm_lock caches.
 */
static int osc_dlm_blocking_ast0(const struct lu_env *env,
				 struct ldlm_lock *dlmlock,
				 void *data, int flag)
{
	struct osc_lock *olck;
	struct cl_lock *lock;
	int result;
	int cancel;

	LASSERT(flag == LDLM_CB_BLOCKING || flag == LDLM_CB_CANCELING);

	cancel = 0;
	olck = osc_ast_data_get(dlmlock);
	if (olck != NULL) {
		lock = olck->ols_cl.cls_lock;
		cl_lock_mutex_get(env, lock);
		LINVRNT(osc_lock_invariant(olck));
		if (olck->ols_ast_wait) {
			/* wake up osc_lock_use() */
			cl_lock_signal(env, lock);
			olck->ols_ast_wait = 0;
		}
		/*
		 * Lock might have been canceled while this thread was
		 * sleeping for lock mutex, but olck is pinned in memory.
		 */
		if (olck == dlmlock->l_ast_data) {
			/*
			 * NOTE: DLM sends blocking AST's for failed locks
			 *       (that are still in pre-OLS_GRANTED state)
			 *       too, and they have to be canceled, otherwise
			 *       the DLM lock is never destroyed and gets
			 *       stuck in memory.
			 *
			 *       Alternatively, ldlm_cli_cancel() can be
			 *       called here directly for osc_locks with
			 *       ols_state < OLS_GRANTED to maintain an
			 *       invariant that ->clo_cancel() is only called
			 *       for locks that were granted.
			 */
			LASSERT(data == olck);
			osc_lock_blocking(env, dlmlock,
					  olck, flag == LDLM_CB_BLOCKING);
		} else
			cancel = 1;
		cl_lock_mutex_put(env, lock);
		osc_ast_data_put(env, olck);
	} else
		/*
		 * DLM lock exists, but there is no cl_lock attached to it.
		 * This is a `normal' race. cl_object and its cl_lock's can be
		 * removed by memory pressure, together with all pages.
		 */
		cancel = (flag == LDLM_CB_BLOCKING);

	if (cancel) {
		struct lustre_handle *lockh;

		lockh = &osc_env_info(env)->oti_handle;
		ldlm_lock2handle(dlmlock, lockh);
		result = ldlm_cli_cancel(lockh, LCF_ASYNC);
	} else
		result = 0;
	return result;
}

/**
 * Blocking ast invoked by ldlm when dlm lock is either blocking progress of
 * some other lock, or is canceled. This function is installed as a
 * ldlm_lock::l_blocking_ast() for client extent locks.
 *
 * Control flow is tricky, because ldlm uses the same call-back
 * (ldlm_lock::l_blocking_ast()) for both blocking and cancellation ast's.
 *
 * \param dlmlock lock for which ast occurred.
 *
 * \param new description of a conflicting lock in case of blocking ast.
 *
 * \param data value of dlmlock->l_ast_data
 *
 * \param flag LDLM_CB_BLOCKING or LDLM_CB_CANCELING. Used to distinguish
 *	       cancellation and blocking ast's.
 *
 * Possible use cases:
 *
 * - ldlm calls dlmlock->l_blocking_ast(..., LDLM_CB_CANCELING) to cancel
 *   lock due to lock lru pressure, or explicit user request to purge
 *   locks.
 *
 * - ldlm calls dlmlock->l_blocking_ast(..., LDLM_CB_BLOCKING) to notify
 *   us that dlmlock conflicts with another lock that some client is
 *   enqueuing. Lock is canceled.
 *
 * - cl_lock_cancel() is called. osc_lock_cancel() calls
 *   ldlm_cli_cancel() that calls
 *
 *       dlmlock->l_blocking_ast(..., LDLM_CB_CANCELING)
 *
 *   recursively entering osc_ldlm_blocking_ast().
 *
 * - client cancels lock voluntarily (e.g., as a part of early cancellation):
 *
 *       cl_lock_cancel()->
 *         osc_lock_cancel()->
 *           ldlm_cli_cancel()->
 *             dlmlock->l_blocking_ast(..., LDLM_CB_CANCELING)
 *
 */
static int osc_ldlm_blocking_ast(struct ldlm_lock *dlmlock,
				 struct ldlm_lock_desc *new, void *data,
				 int flag)
{
	struct lu_env *env;
	struct cl_env_nest nest;
	int result;

	/*
	 * This can be called in the context of outer IO, e.g.,
	 *
	 *     cl_enqueue()->...
	 *       ->osc_enqueue_base()->...
	 *         ->ldlm_prep_elc_req()->...
	 *           ->ldlm_cancel_callback()->...
	 *             ->osc_ldlm_blocking_ast()
	 *
	 * a new environment has to be created so as not to corrupt the
	 * outer context.
	 */
	env = cl_env_nested_get(&nest);
	if (!IS_ERR(env)) {
		result = osc_dlm_blocking_ast0(env, dlmlock, data, flag);
		cl_env_nested_put(&nest, env);
	} else {
		result = PTR_ERR(env);
		/*
		 * XXX This should never happen, as cl_lock is
		 * stuck. Pre-allocated environment a la vvp_inode_fini_env
		 * should be used.
		 */
		LBUG();
	}
	if (result != 0) {
		if (result == -ENODATA)
			result = 0;
		else
			CERROR("BAST failed: %d\n", result);
	}
	return result;
}

static int osc_ldlm_completion_ast(struct ldlm_lock *dlmlock,
				   __u64 flags, void *data)
{
	struct cl_env_nest nest;
	struct lu_env *env;
	struct osc_lock *olck;
	struct cl_lock *lock;
	int result;
	int dlmrc;

	/* first, do dlm part of the work */
	dlmrc = ldlm_completion_ast_async(dlmlock, flags, data);
	/* then, notify cl_lock */
	env = cl_env_nested_get(&nest);
	if (!IS_ERR(env)) {
		olck = osc_ast_data_get(dlmlock);
		if (olck != NULL) {
			lock = olck->ols_cl.cls_lock;
			cl_lock_mutex_get(env, lock);
			/*
			 * ldlm_handle_cp_callback() copied LVB from request
			 * to lock->l_lvb_data, store it in osc_lock.
			 */
			LASSERT(dlmlock->l_lvb_data != NULL);
			lock_res_and_lock(dlmlock);
			olck->ols_lvb = *(struct ost_lvb *)dlmlock->l_lvb_data;
			if (olck->ols_lock == NULL) {
				/*
				 * upcall (osc_lock_upcall()) hasn't yet been
				 * called. Do nothing now, upcall will bind
				 * olck to dlmlock and signal the waiters.
				 *
				 * This maintains an invariant that osc_lock
				 * and ldlm_lock are always bound when
				 * osc_lock is in OLS_GRANTED state.
				 */
			} else if (dlmlock->l_granted_mode ==
				   dlmlock->l_req_mode) {
				osc_lock_granted(env, olck, dlmlock, dlmrc);
			}
			unlock_res_and_lock(dlmlock);

			if (dlmrc != 0) {
				CL_LOCK_DEBUG(D_ERROR, env, lock,
					      "dlmlock returned %d\n", dlmrc);
				cl_lock_error(env, lock, dlmrc);
			}
			cl_lock_mutex_put(env, lock);
			osc_ast_data_put(env, olck);
			result = 0;
		} else
			result = -ELDLM_NO_LOCK_DATA;
		cl_env_nested_put(&nest, env);
	} else
		result = PTR_ERR(env);
	return dlmrc ?: result;
}

static int osc_ldlm_glimpse_ast(struct ldlm_lock *dlmlock, void *data)
{
	struct ptlrpc_request *req = data;
	struct osc_lock *olck;
	struct cl_lock *lock;
	struct cl_object *obj;
	struct cl_env_nest nest;
	struct lu_env *env;
	struct ost_lvb *lvb;
	struct req_capsule *cap;
	int result;

	LASSERT(lustre_msg_get_opc(req->rq_reqmsg) == LDLM_GL_CALLBACK);

	env = cl_env_nested_get(&nest);
	if (!IS_ERR(env)) {
		/* osc_ast_data_get() has to go after the environment is
		 * allocated, because osc_ast_data_get() acquires a
		 * reference to a lock, and that reference can only be
		 * released with an environment.
		 */
		olck = osc_ast_data_get(dlmlock);
		if (olck != NULL) {
			lock = olck->ols_cl.cls_lock;
			/* Do not grab the mutex of cl_lock for glimpse.
			 * See LU-1274 for details.
			 * BTW, it's okay for cl_lock to be cancelled during
			 * this period because server can handle this race.
			 * See ldlm_server_glimpse_ast() for details.
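			 * The reply LVB is filled via cl_object_glimpse()
			 * below, so up-to-date size information is returned
			 * to the server without taking the cl_lock mutex.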
			 * cl_lock_mutex_get(env, lock); */
			cap = &req->rq_pill;
			req_capsule_extend(cap, &RQF_LDLM_GL_CALLBACK);
			req_capsule_set_size(cap, &RMF_DLM_LVB, RCL_SERVER,
					     sizeof(*lvb));
			result = req_capsule_server_pack(cap);
			if (result == 0) {
				lvb = req_capsule_server_get(cap, &RMF_DLM_LVB);
				obj = lock->cll_descr.cld_obj;
				result = cl_object_glimpse(env, obj, lvb);
			}
			if (!exp_connect_lvb_type(req->rq_export))
				req_capsule_shrink(&req->rq_pill,
						   &RMF_DLM_LVB,
						   sizeof(struct ost_lvb_v1),
						   RCL_SERVER);
			osc_ast_data_put(env, olck);
		} else {
			/*
			 * These errors are normal races, so we don't want to
			 * fill the console with messages by calling
			 * ptlrpc_error()
			 */
			lustre_pack_reply(req, 1, NULL, NULL);
			result = -ELDLM_NO_LOCK_DATA;
		}
		cl_env_nested_put(&nest, env);
	} else
		result = PTR_ERR(env);
	req->rq_status = result;
	return result;
}

static unsigned long osc_lock_weigh(const struct lu_env *env,
				    const struct cl_lock_slice *slice)
{
	/*
	 * don't need to grab coh_page_guard since we don't care about the
	 * exact number of pages.
	 */
	return cl_object_header(slice->cls_obj)->coh_pages;
}

static void osc_lock_build_einfo(const struct lu_env *env,
				 const struct cl_lock *clock,
				 struct osc_lock *lock,
				 struct ldlm_enqueue_info *einfo)
{
	enum cl_lock_mode mode;

	mode = clock->cll_descr.cld_mode;
	if (mode == CLM_PHANTOM)
		/*
		 * For now, enqueue all glimpse locks in read mode. In the
		 * future, client might choose to enqueue LCK_PW lock for
		 * glimpse on a file opened for write.
		 */
		mode = CLM_READ;

	einfo->ei_type = LDLM_EXTENT;
	einfo->ei_mode = osc_cl_lock2ldlm(mode);
	einfo->ei_cb_bl = osc_ldlm_blocking_ast;
	einfo->ei_cb_cp = osc_ldlm_completion_ast;
	einfo->ei_cb_gl = osc_ldlm_glimpse_ast;
	einfo->ei_cbdata = lock; /* value to be put into ->l_ast_data */
}

/**
 * Determine if the lock should be converted into a lockless lock.
 *
 * Steps to check:
 * - whether the lock has an explicit requirement for a non-lockless lock;
 * - the io lock request type ci_lockreq;
 * - send the enqueue rpc to the OST to make the final decision;
 * - special treatment for lockless truncate.
 *
 * Additional policy can be implemented here, e.g., never do lockless-io
 * for large extents.
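 *
 * Note that an object marked contended (osc_object_is_contended()) also
 * switches a locklessable lock to lockless mode, see the checks below.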
 */
static void osc_lock_to_lockless(const struct lu_env *env,
				 struct osc_lock *ols, int force)
{
	struct cl_lock_slice *slice = &ols->ols_cl;

	LASSERT(ols->ols_state == OLS_NEW ||
		ols->ols_state == OLS_UPCALL_RECEIVED);

	if (force) {
		ols->ols_locklessable = 1;
		slice->cls_ops = &osc_lock_lockless_ops;
	} else {
		struct osc_io *oio = osc_env_io(env);
		struct cl_io *io = oio->oi_cl.cis_io;
		struct cl_object *obj = slice->cls_obj;
		struct osc_object *oob = cl2osc(obj);
		const struct osc_device *osd = lu2osc_dev(obj->co_lu.lo_dev);
		struct obd_connect_data *ocd;

		LASSERT(io->ci_lockreq == CILR_MANDATORY ||
			io->ci_lockreq == CILR_MAYBE ||
			io->ci_lockreq == CILR_NEVER);

		ocd = &class_exp2cliimp(osc_export(oob))->imp_connect_data;
		ols->ols_locklessable = (io->ci_type != CIT_SETATTR) &&
					(io->ci_lockreq == CILR_MAYBE) &&
					(ocd->ocd_connect_flags &
					 OBD_CONNECT_SRVLOCK);
		if (io->ci_lockreq == CILR_NEVER ||
		    /* lockless IO */
		    (ols->ols_locklessable && osc_object_is_contended(oob)) ||
		    /* lockless truncate */
		    (cl_io_is_trunc(io) &&
		     (ocd->ocd_connect_flags & OBD_CONNECT_TRUNCLOCK) &&
		     osd->od_lockless_truncate)) {
			ols->ols_locklessable = 1;
			slice->cls_ops = &osc_lock_lockless_ops;
		}
	}
	LASSERT(ergo(ols->ols_glimpse, !osc_lock_is_lockless(ols)));
}

static int osc_lock_compatible(const struct osc_lock *qing,
			       const struct osc_lock *qed)
{
	enum cl_lock_mode qing_mode;
	enum cl_lock_mode qed_mode;

	qing_mode = qing->ols_cl.cls_lock->cll_descr.cld_mode;
	if (qed->ols_glimpse &&
	    (qed->ols_state >= OLS_UPCALL_RECEIVED || qing_mode == CLM_READ))
		return 1;

	qed_mode = qed->ols_cl.cls_lock->cll_descr.cld_mode;
	return ((qing_mode == CLM_READ) && (qed_mode == CLM_READ));
}

/**
 * Cancel all conflicting locks and wait for them to be destroyed.
 *
 * This function is used for two purposes:
 *
 * - early cancel all conflicting locks before starting IO, and
 *
 * - guarantee that pages added to the page cache by lockless IO are never
 *   covered by locks other than lockless IO lock, and, hence, are not
 *   visible to other threads.
 */
static int osc_lock_enqueue_wait(const struct lu_env *env,
				 const struct osc_lock *olck)
{
	struct cl_lock *lock = olck->ols_cl.cls_lock;
	struct cl_lock_descr *descr = &lock->cll_descr;
	struct cl_object_header *hdr = cl_object_header(descr->cld_obj);
	struct cl_lock *scan;
	struct cl_lock *conflict = NULL;
	int lockless = osc_lock_is_lockless(olck);
	int rc = 0;

	LASSERT(cl_lock_is_mutexed(lock));

	/* make it enqueue anyway for glimpse lock, because we actually
	 * don't need to cancel any conflicting locks. */
	if (olck->ols_glimpse)
		return 0;

	spin_lock(&hdr->coh_lock_guard);
	list_for_each_entry(scan, &hdr->coh_locks, cll_linkage) {
		struct cl_lock_descr *cld = &scan->cll_descr;
		const struct osc_lock *scan_ols;

		if (scan == lock)
			break;

		if (scan->cll_state < CLS_QUEUING ||
		    scan->cll_state == CLS_FREEING ||
		    cld->cld_start > descr->cld_end ||
		    cld->cld_end < descr->cld_start)
			continue;

		/* overlapped and living locks. */

		/* We're not supposed to give up group lock.
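		 * A group lock is never cancelled on behalf of another
		 * enqueue; such locks are simply skipped here.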
		 */
		if (scan->cll_descr.cld_mode == CLM_GROUP) {
			LASSERT(descr->cld_mode != CLM_GROUP ||
				descr->cld_gid != scan->cll_descr.cld_gid);
			continue;
		}

		scan_ols = osc_lock_at(scan);

		/* We need to cancel the compatible locks if we're enqueuing
		 * a lockless lock, for example:
		 * imagine that client has PR lock on [0, 1000], and thread T0
		 * is doing lockless IO in [500, 1500] region. Concurrent
		 * thread T1 can see lockless data in [500, 1000], which is
		 * wrong, because these data are possibly stale. */
		if (!lockless && osc_lock_compatible(olck, scan_ols))
			continue;

		cl_lock_get_trust(scan);
		conflict = scan;
		break;
	}
	spin_unlock(&hdr->coh_lock_guard);

	if (conflict) {
		if (lock->cll_descr.cld_mode == CLM_GROUP) {
			/* we want a group lock but a previous lock request
			 * conflicts, we do not wait but return 0 so the
			 * request is sent to the server
			 */
			CDEBUG(D_DLMTRACE, "group lock %p is conflicted with %p, no wait, send to server\n",
			       lock, conflict);
			cl_lock_put(env, conflict);
			rc = 0;
		} else {
			CDEBUG(D_DLMTRACE, "lock %p is conflicted with %p, will wait\n",
			       lock, conflict);
			LASSERT(lock->cll_conflict == NULL);
			lu_ref_add(&conflict->cll_reference, "cancel-wait",
				   lock);
			lock->cll_conflict = conflict;
			rc = CLO_WAIT;
		}
	}
	return rc;
}

/**
 * Implementation of cl_lock_operations::clo_enqueue() method for osc
 * layer. This initiates ldlm enqueue:
 *
 * - cancels conflicting locks early (osc_lock_enqueue_wait());
 *
 * - calls osc_enqueue_base() to do actual enqueue.
 *
 * osc_enqueue_base() is supplied with an upcall function that is executed
 * when lock is received either after a local cached ldlm lock is matched, or
 * when a reply from the server is received.
 *
 * This function does not wait for the network communication to complete.
 */
static int osc_lock_enqueue(const struct lu_env *env,
			    const struct cl_lock_slice *slice,
			    struct cl_io *unused, __u32 enqflags)
{
	struct osc_lock *ols = cl2osc_lock(slice);
	struct cl_lock *lock = ols->ols_cl.cls_lock;
	int result;

	LASSERT(cl_lock_is_mutexed(lock));
	LASSERTF(ols->ols_state == OLS_NEW,
		 "Impossible state: %d\n", ols->ols_state);

	LASSERTF(ergo(ols->ols_glimpse, lock->cll_descr.cld_mode <= CLM_READ),
		 "lock = %p, ols = %p\n", lock, ols);

	result = osc_lock_enqueue_wait(env, ols);
	if (result == 0) {
		if (!osc_lock_is_lockless(ols)) {
			struct osc_object *obj = cl2osc(slice->cls_obj);
			struct osc_thread_info *info = osc_env_info(env);
			struct ldlm_res_id *resname = &info->oti_resname;
			ldlm_policy_data_t *policy = &info->oti_policy;
			struct ldlm_enqueue_info *einfo = &ols->ols_einfo;

			/* lock will be passed as upcall cookie; hold a ref
			 * to prevent it from being released. */
			cl_lock_hold_add(env, lock, "upcall", lock);
			/* a user for lock also */
			cl_lock_user_add(env, lock);
			ols->ols_state = OLS_ENQUEUED;

			/*
			 * XXX: this is possible blocking point as
			 * ldlm_lock_match(LDLM_FL_LVB_READY) waits for
			 * LDLM_CP_CALLBACK.
			 */
			ostid_build_res_name(&obj->oo_oinfo->loi_oi, resname);
			osc_lock_build_policy(env, lock, policy);
			result = osc_enqueue_base(osc_export(obj), resname,
						  &ols->ols_flags, policy,
						  &ols->ols_lvb,
						  obj->oo_oinfo->loi_kms_valid,
						  osc_lock_upcall,
						  ols, einfo, &ols->ols_handle,
						  PTLRPCD_SET, 1, ols->ols_agl);
			if (result != 0) {
				cl_lock_user_del(env, lock);
				cl_lock_unhold(env, lock, "upcall", lock);
				if (unlikely(result == -ECANCELED)) {
					ols->ols_state = OLS_NEW;
					result = 0;
				}
			}
		} else {
			ols->ols_state = OLS_GRANTED;
			ols->ols_owner = osc_env_io(env);
		}
	}
	LASSERT(ergo(ols->ols_glimpse, !osc_lock_is_lockless(ols)));
	return result;
}

static int osc_lock_wait(const struct lu_env *env,
			 const struct cl_lock_slice *slice)
{
	struct osc_lock *olck = cl2osc_lock(slice);
	struct cl_lock *lock = olck->ols_cl.cls_lock;

	LINVRNT(osc_lock_invariant(olck));

	if (olck->ols_glimpse && olck->ols_state >= OLS_UPCALL_RECEIVED) {
		if (olck->ols_flags & LDLM_FL_LVB_READY) {
			return 0;
		} else if (olck->ols_agl) {
			if (lock->cll_flags & CLF_FROM_UPCALL)
				/* It is from enqueue RPC reply upcall for
				 * updating state. Do not re-enqueue. */
				return -ENAVAIL;
			else
				olck->ols_state = OLS_NEW;
		} else {
			LASSERT(lock->cll_error);
			return lock->cll_error;
		}
	}

	if (olck->ols_state == OLS_NEW) {
		int rc;

		LASSERT(olck->ols_agl);
		olck->ols_agl = 0;
		olck->ols_flags &= ~LDLM_FL_BLOCK_NOWAIT;
		rc = osc_lock_enqueue(env, slice, NULL, CEF_ASYNC | CEF_MUST);
		if (rc != 0)
			return rc;
		else
			return CLO_REENQUEUED;
	}

	LASSERT(equi(olck->ols_state >= OLS_UPCALL_RECEIVED &&
		     lock->cll_error == 0, olck->ols_lock != NULL));

	return lock->cll_error ?: olck->ols_state >= OLS_GRANTED ? 0 : CLO_WAIT;
}

/**
 * An implementation of cl_lock_operations::clo_use() method that pins cached
 * lock.
 */
static int osc_lock_use(const struct lu_env *env,
			const struct cl_lock_slice *slice)
{
	struct osc_lock *olck = cl2osc_lock(slice);
	int rc;

	LASSERT(!olck->ols_hold);

	/*
	 * Atomically check for LDLM_FL_CBPENDING and addref a lock if this
	 * flag is not set. This protects us from a concurrent blocking ast.
	 */
	rc = ldlm_lock_addref_try(&olck->ols_handle, olck->ols_einfo.ei_mode);
	if (rc == 0) {
		olck->ols_hold = 1;
		olck->ols_state = OLS_GRANTED;
	} else {
		struct cl_lock *lock;

		/*
		 * Lock is being cancelled somewhere within
		 * ldlm_handle_bl_callback(): LDLM_FL_CBPENDING is already
		 * set, but osc_ldlm_blocking_ast() hasn't yet acquired
		 * cl_lock mutex.
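		 *
		 * Wait until the blocking AST has run: set ols_ast_wait so
		 * that osc_dlm_blocking_ast0() signals this cl_lock, and
		 * return CLO_WAIT to the caller.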
		 */
		lock = slice->cls_lock;
		LASSERT(lock->cll_state == CLS_INTRANSIT);
		LASSERT(lock->cll_users > 0);
		/* set a flag for osc_dlm_blocking_ast0() to signal the
		 * lock. */
		olck->ols_ast_wait = 1;
		rc = CLO_WAIT;
	}
	return rc;
}

static int osc_lock_flush(struct osc_lock *ols, int discard)
{
	struct cl_lock *lock = ols->ols_cl.cls_lock;
	struct cl_env_nest nest;
	struct lu_env *env;
	int result = 0;

	env = cl_env_nested_get(&nest);
	if (!IS_ERR(env)) {
		struct osc_object *obj = cl2osc(ols->ols_cl.cls_obj);
		struct cl_lock_descr *descr = &lock->cll_descr;
		int rc = 0;

		if (descr->cld_mode >= CLM_WRITE) {
			result = osc_cache_writeback_range(env, obj,
							   descr->cld_start,
							   descr->cld_end,
							   1, discard);
			LDLM_DEBUG(ols->ols_lock,
				   "lock %p: %d pages were %s.\n", lock, result,
				   discard ? "discarded" : "written");
			if (result > 0)
				result = 0;
		}

		rc = cl_lock_discard_pages(env, lock);
		if (result == 0 && rc < 0)
			result = rc;

		cl_env_nested_put(&nest, env);
	} else
		result = PTR_ERR(env);
	if (result == 0) {
		ols->ols_flush = 1;
		LINVRNT(!osc_lock_has_pages(ols));
	}
	return result;
}

/**
 * Implements cl_lock_operations::clo_cancel() method for osc layer. This is
 * called (as part of cl_lock_cancel()) when lock is canceled either
 * voluntarily (LRU pressure, early cancellation, umount, etc.) or due to a
 * conflict with some other lock somewhere in the cluster. This function does
 * the following:
 *
 * - invalidates all pages protected by this lock (after sending dirty
 *   ones to the server, as necessary);
 *
 * - decref's underlying ldlm lock;
 *
 * - cancels ldlm lock (ldlm_cli_cancel()).
 */
static void osc_lock_cancel(const struct lu_env *env,
			    const struct cl_lock_slice *slice)
{
	struct cl_lock *lock = slice->cls_lock;
	struct osc_lock *olck = cl2osc_lock(slice);
	struct ldlm_lock *dlmlock = olck->ols_lock;
	int result = 0;
	int discard;

	LASSERT(cl_lock_is_mutexed(lock));
	LINVRNT(osc_lock_invariant(olck));

	if (dlmlock != NULL) {
		int do_cancel;

		discard = !!(dlmlock->l_flags & LDLM_FL_DISCARD_DATA);
		if (olck->ols_state >= OLS_GRANTED)
			result = osc_lock_flush(olck, discard);
		osc_lock_unhold(olck);

		lock_res_and_lock(dlmlock);
		/* Now that we're the only user of the dlm read/write
		 * reference, ->l_readers + ->l_writers should normally be
		 * zero. However, there is a corner case.
		 * See bug 18829 for details. */
		do_cancel = (dlmlock->l_readers == 0 &&
			     dlmlock->l_writers == 0);
		dlmlock->l_flags |= LDLM_FL_CBPENDING;
		unlock_res_and_lock(dlmlock);
		if (do_cancel)
			result = ldlm_cli_cancel(&olck->ols_handle, LCF_ASYNC);
		if (result < 0)
			CL_LOCK_DEBUG(D_ERROR, env, lock,
				      "lock %p cancel failure with error(%d)\n",
				      lock, result);
	}
	olck->ols_state = OLS_CANCELLED;
	olck->ols_flags &= ~LDLM_FL_LVB_READY;
	osc_lock_detach(env, olck);
}

static int osc_lock_has_pages(struct osc_lock *olck)
{
	return 0;
}

static void osc_lock_delete(const struct lu_env *env,
			    const struct cl_lock_slice *slice)
{
	struct osc_lock *olck;

	olck = cl2osc_lock(slice);
	if (olck->ols_glimpse) {
		LASSERT(!olck->ols_hold);
		LASSERT(!olck->ols_lock);
		return;
	}

	LINVRNT(osc_lock_invariant(olck));
	LINVRNT(!osc_lock_has_pages(olck));

	osc_lock_unhold(olck);
	osc_lock_detach(env, olck);
}

/**
 * Implements cl_lock_operations::clo_state() method for osc layer.
 *
 * Maintains osc_lock::ols_owner field.
 *
 * This assumes that lock always enters CLS_HELD (from some other state) in
 * the same IO context as one that requested the lock. This should not be a
 * problem, because context is by definition shared by all activity pertaining
 * to the same high-level IO.
 */
static void osc_lock_state(const struct lu_env *env,
			   const struct cl_lock_slice *slice,
			   enum cl_lock_state state)
{
	struct osc_lock *lock = cl2osc_lock(slice);

	/*
	 * XXX multiple io contexts can use the lock at the same time.
	 */
	LINVRNT(osc_lock_invariant(lock));
	if (state == CLS_HELD && slice->cls_lock->cll_state != CLS_HELD) {
		struct osc_io *oio = osc_env_io(env);

		LASSERT(lock->ols_owner == NULL);
		lock->ols_owner = oio;
	} else if (state != CLS_HELD)
		lock->ols_owner = NULL;
}

static int osc_lock_print(const struct lu_env *env, void *cookie,
			  lu_printer_t p, const struct cl_lock_slice *slice)
{
	struct osc_lock *lock = cl2osc_lock(slice);

	/*
	 * XXX print ldlm lock and einfo properly.
	 */
	(*p)(env, cookie, "%p %#16llx %#llx %d %p ",
	     lock->ols_lock, lock->ols_flags, lock->ols_handle.cookie,
	     lock->ols_state, lock->ols_owner);
	osc_lvb_print(env, cookie, p, &lock->ols_lvb);
	return 0;
}

static int osc_lock_fits_into(const struct lu_env *env,
			      const struct cl_lock_slice *slice,
			      const struct cl_lock_descr *need,
			      const struct cl_io *io)
{
	struct osc_lock *ols = cl2osc_lock(slice);

	if (need->cld_enq_flags & CEF_NEVER)
		return 0;

	if (ols->ols_state >= OLS_CANCELLED)
		return 0;

	if (need->cld_mode == CLM_PHANTOM) {
		if (ols->ols_agl)
			return !(ols->ols_state > OLS_RELEASED);

		/*
		 * Note: the QUEUED lock can't be matched here, otherwise
		 * it might cause deadlocks.
		 * In read_process,
		 * P1: enqueued read lock, creates sublock1.
		 * P2: enqueued write lock, creates sublock2 (conflicting
		 *     with sublock1).
		 * P1: read lock is granted.
		 * P1: enqueued glimpse lock (while holding sublock1 in read
		 *     mode), matched with sublock2, waits for sublock2 to be
		 *     granted. But sublock2 cannot be granted, because P1
		 *     will not release sublock1. Bang!
		 */
		if (ols->ols_state < OLS_GRANTED ||
		    ols->ols_state > OLS_RELEASED)
			return 0;
	} else if (need->cld_enq_flags & CEF_MUST) {
		/*
		 * If the lock has never been enqueued, it can't be matched,
		 * because the enqueue process brings in information that is
		 * used to determine things such as lockless mode, CEF_MUST,
		 * etc.
		 */
		if (ols->ols_state < OLS_UPCALL_RECEIVED &&
		    ols->ols_locklessable)
			return 0;
	}
	return 1;
}

static const struct cl_lock_operations osc_lock_ops = {
	.clo_fini = osc_lock_fini,
	.clo_enqueue = osc_lock_enqueue,
	.clo_wait = osc_lock_wait,
	.clo_unuse = osc_lock_unuse,
	.clo_use = osc_lock_use,
	.clo_delete = osc_lock_delete,
	.clo_state = osc_lock_state,
	.clo_cancel = osc_lock_cancel,
	.clo_weigh = osc_lock_weigh,
	.clo_print = osc_lock_print,
	.clo_fits_into = osc_lock_fits_into,
};

static int osc_lock_lockless_unuse(const struct lu_env *env,
				   const struct cl_lock_slice *slice)
{
	struct osc_lock *ols = cl2osc_lock(slice);
	struct cl_lock *lock = slice->cls_lock;

	LASSERT(ols->ols_state == OLS_GRANTED);
	LINVRNT(osc_lock_invariant(ols));

	cl_lock_cancel(env, lock);
	cl_lock_delete(env, lock);
	return 0;
}

static void osc_lock_lockless_cancel(const struct lu_env *env,
				     const struct cl_lock_slice *slice)
{
	struct osc_lock *ols = cl2osc_lock(slice);
	int result;

	result = osc_lock_flush(ols, 0);
	if (result)
		CERROR("Pages for lockless lock %p were not purged (%d)\n",
		       ols, result);
	ols->ols_state = OLS_CANCELLED;
}

static int osc_lock_lockless_wait(const struct lu_env *env,
				  const struct cl_lock_slice *slice)
{
	struct osc_lock *olck = cl2osc_lock(slice);
	struct cl_lock *lock = olck->ols_cl.cls_lock;

	LINVRNT(osc_lock_invariant(olck));
	LASSERT(olck->ols_state >= OLS_UPCALL_RECEIVED);

	return lock->cll_error;
}

static void osc_lock_lockless_state(const struct lu_env *env,
				    const struct cl_lock_slice *slice,
				    enum cl_lock_state state)
{
	struct osc_lock *lock = cl2osc_lock(slice);

	LINVRNT(osc_lock_invariant(lock));
	if (state == CLS_HELD) {
		struct osc_io *oio = osc_env_io(env);

		LASSERT(ergo(lock->ols_owner, lock->ols_owner == oio));
		lock->ols_owner = oio;

		/* set the io to be lockless if this lock is for io's
		 * host object */
		if (cl_object_same(oio->oi_cl.cis_obj, slice->cls_obj))
			oio->oi_lockless = 1;
	}
}

static int osc_lock_lockless_fits_into(const struct lu_env *env,
				       const struct cl_lock_slice *slice,
				       const struct cl_lock_descr *need,
				       const struct cl_io *io)
{
	struct osc_lock *lock = cl2osc_lock(slice);

	if (!(need->cld_enq_flags & CEF_NEVER))
		return 0;

	/* lockless lock should only be used by its owning io.
	 * b22147 */
	return (lock->ols_owner == osc_env_io(env));
}

static const struct cl_lock_operations osc_lock_lockless_ops = {
	.clo_fini = osc_lock_fini,
	.clo_enqueue = osc_lock_enqueue,
	.clo_wait = osc_lock_lockless_wait,
	.clo_unuse = osc_lock_lockless_unuse,
	.clo_state = osc_lock_lockless_state,
	.clo_fits_into = osc_lock_lockless_fits_into,
	.clo_cancel = osc_lock_lockless_cancel,
	.clo_print = osc_lock_print
};

int osc_lock_init(const struct lu_env *env,
		  struct cl_object *obj, struct cl_lock *lock,
		  const struct cl_io *unused)
{
	struct osc_lock *clk;
	int result;

	OBD_SLAB_ALLOC_PTR_GFP(clk, osc_lock_kmem, GFP_NOFS);
	if (clk != NULL) {
		__u32 enqflags = lock->cll_descr.cld_enq_flags;

		osc_lock_build_einfo(env, lock, clk, &clk->ols_einfo);
		atomic_set(&clk->ols_pageref, 0);
		clk->ols_state = OLS_NEW;

		clk->ols_flags = osc_enq2ldlm_flags(enqflags);
		clk->ols_agl = !!(enqflags & CEF_AGL);
		if (clk->ols_agl)
			clk->ols_flags |= LDLM_FL_BLOCK_NOWAIT;
		if (clk->ols_flags & LDLM_FL_HAS_INTENT)
			clk->ols_glimpse = 1;

		cl_lock_slice_add(lock, &clk->ols_cl, obj, &osc_lock_ops);

		if (!(enqflags & CEF_MUST))
			/* try to convert this lock to a lockless lock */
			osc_lock_to_lockless(env, clk, (enqflags & CEF_NEVER));
		if (clk->ols_locklessable && !(enqflags & CEF_DISCARD_DATA))
			clk->ols_flags |= LDLM_FL_DENY_ON_CONTENTION;

		LDLM_DEBUG_NOLOCK("lock %p, osc lock %p, flags %llx\n",
				  lock, clk, clk->ols_flags);

		result = 0;
	} else
		result = -ENOMEM;
	return result;
}

int osc_dlm_lock_pageref(struct ldlm_lock *dlm)
{
	struct osc_lock *olock;
	int rc = 0;

	spin_lock(&osc_ast_guard);
	olock = dlm->l_ast_data;
	/*
	 * there's a very rare race with osc_page_addref_lock(), but that
	 * doesn't matter because in the worst case we don't cancel a lock
	 * which we actually could; that is harmless.
	 */
	if (olock != NULL &&
	    atomic_add_return(_PAGEREF_MAGIC,
			      &olock->ols_pageref) != _PAGEREF_MAGIC) {
		atomic_sub(_PAGEREF_MAGIC, &olock->ols_pageref);
		rc = 1;
	}
	spin_unlock(&osc_ast_guard);
	return rc;
}

/** @} osc */