1/* 2 * GPL HEADER START 3 * 4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 5 * 6 * This program is free software; you can redistribute it and/or modify 7 * it under the terms of the GNU General Public License version 2 only, 8 * as published by the Free Software Foundation. 9 * 10 * This program is distributed in the hope that it will be useful, but 11 * WITHOUT ANY WARRANTY; without even the implied warranty of 12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 13 * General Public License version 2 for more details (a copy is included 14 * in the LICENSE file that accompanied this code). 15 * 16 * You should have received a copy of the GNU General Public License 17 * version 2 along with this program; If not, see 18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf 19 * 20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara, 21 * CA 95054 USA or visit www.sun.com if you need additional information or 22 * have any questions. 23 * 24 * GPL HEADER END 25 */ 26/* 27 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved. 28 * Use is subject to license terms. 29 * 30 * Copyright (c) 2010, 2012, Intel Corporation. 31 */ 32/* 33 * This file is part of Lustre, http://www.lustre.org/ 34 * Lustre is a trademark of Sun Microsystems, Inc. 
 *
 * lustre/ldlm/ldlm_lock.c
 *
 * Author: Peter Braam <braam@clusterfs.com>
 * Author: Phil Schwan <phil@clusterfs.com>
 */

#define DEBUG_SUBSYSTEM S_LDLM

#include "../../include/linux/libcfs/libcfs.h"
#include "../include/lustre_intent.h"
#include "../include/obd_class.h"
#include "ldlm_internal.h"

/* lock types */
char *ldlm_lockname[] = {
	[0] = "--",
	[LCK_EX] = "EX",
	[LCK_PW] = "PW",
	[LCK_PR] = "PR",
	[LCK_CW] = "CW",
	[LCK_CR] = "CR",
	[LCK_NL] = "NL",
	[LCK_GROUP] = "GROUP",
	[LCK_COS] = "COS",
};
EXPORT_SYMBOL(ldlm_lockname);

char *ldlm_typename[] = {
	[LDLM_PLAIN] = "PLN",
	[LDLM_EXTENT] = "EXT",
	[LDLM_FLOCK] = "FLK",
	[LDLM_IBITS] = "IBT",
};
EXPORT_SYMBOL(ldlm_typename);

/* Wire-to-local policy converters for pre-2.1 clients, indexed by lock
 * type.  Only the FLOCK entry differs from the 2.1+ table below. */
static ldlm_policy_wire_to_local_t ldlm_policy_wire18_to_local[] = {
	[LDLM_PLAIN - LDLM_MIN_TYPE] = ldlm_plain_policy_wire_to_local,
	[LDLM_EXTENT - LDLM_MIN_TYPE] = ldlm_extent_policy_wire_to_local,
	[LDLM_FLOCK - LDLM_MIN_TYPE] = ldlm_flock_policy_wire18_to_local,
	[LDLM_IBITS - LDLM_MIN_TYPE] = ldlm_ibits_policy_wire_to_local,
};

/* Wire-to-local policy converters for 2.1+ clients (OBD_CONNECT_FULL20
 * set on the export), indexed by lock type. */
static ldlm_policy_wire_to_local_t ldlm_policy_wire21_to_local[] = {
	[LDLM_PLAIN - LDLM_MIN_TYPE] = ldlm_plain_policy_wire_to_local,
	[LDLM_EXTENT - LDLM_MIN_TYPE] = ldlm_extent_policy_wire_to_local,
	[LDLM_FLOCK - LDLM_MIN_TYPE] = ldlm_flock_policy_wire21_to_local,
	[LDLM_IBITS - LDLM_MIN_TYPE] = ldlm_ibits_policy_wire_to_local,
};

/* Local-to-wire policy converters, indexed by lock type. */
static ldlm_policy_local_to_wire_t ldlm_policy_local_to_wire[] = {
	[LDLM_PLAIN - LDLM_MIN_TYPE] = ldlm_plain_policy_local_to_wire,
	[LDLM_EXTENT - LDLM_MIN_TYPE] = ldlm_extent_policy_local_to_wire,
	[LDLM_FLOCK - LDLM_MIN_TYPE] = ldlm_flock_policy_local_to_wire,
	[LDLM_IBITS - LDLM_MIN_TYPE] = ldlm_ibits_policy_local_to_wire,
};

/**
 * Converts lock policy from local format to on the wire lock_desc format
 */
void ldlm_convert_policy_to_wire(ldlm_type_t type,
				 const ldlm_policy_data_t *lpolicy,
				 ldlm_wire_policy_data_t *wpolicy)
{
	ldlm_policy_local_to_wire_t convert;

	convert = ldlm_policy_local_to_wire[type - LDLM_MIN_TYPE];

	convert(lpolicy, wpolicy);
}

/**
 * Converts lock policy from on the wire lock_desc format to local format
 */
void ldlm_convert_policy_to_local(struct obd_export *exp, ldlm_type_t type,
				  const ldlm_wire_policy_data_t *wpolicy,
				  ldlm_policy_data_t *lpolicy)
{
	ldlm_policy_wire_to_local_t convert;
	int new_client;

	/** some badness for 2.0.0 clients, but 2.0.0 isn't supported */
	new_client = (exp_connect_flags(exp) & OBD_CONNECT_FULL20) != 0;
	if (new_client)
		convert = ldlm_policy_wire21_to_local[type - LDLM_MIN_TYPE];
	else
		convert = ldlm_policy_wire18_to_local[type - LDLM_MIN_TYPE];

	convert(wpolicy, lpolicy);
}

/* Returns a human-readable name for an intent (or intent combination),
 * used in debug logging. */
char *ldlm_it2str(int it)
{
	switch (it) {
	case IT_OPEN:
		return "open";
	case IT_CREAT:
		return "creat";
	case (IT_OPEN | IT_CREAT):
		return "open|creat";
	case IT_READDIR:
		return "readdir";
	case IT_GETATTR:
		return "getattr";
	case IT_LOOKUP:
		return "lookup";
	case IT_UNLINK:
		return "unlink";
	case IT_GETXATTR:
		return "getxattr";
	case IT_LAYOUT:
		return "layout";
	default:
		CERROR("Unknown intent %d\n", it);
		return "UNKNOWN";
	}
}
EXPORT_SYMBOL(ldlm_it2str);

extern struct kmem_cache *ldlm_lock_slab;

/* Installs the intent policy callback for namespace \a ns. */
void ldlm_register_intent(struct ldlm_namespace *ns, ldlm_res_policy arg)
{
	ns->ns_policy = arg;
}
EXPORT_SYMBOL(ldlm_register_intent);

/*
 * REFCOUNTED LOCK OBJECTS
 */

/**
 * Get a reference on a lock.
 *
 * Lock refcounts, during creation:
 *   - one special one for allocation, dec'd only once in destroy
 *   - one for being a lock that's in-use
 *   - one for the addref associated with a new lock
 */
struct ldlm_lock *ldlm_lock_get(struct ldlm_lock *lock)
{
	atomic_inc(&lock->l_refc);
	return lock;
}
EXPORT_SYMBOL(ldlm_lock_get);

/**
 * Release lock reference.
 *
 * Also frees the lock if it was last reference.
 */
void ldlm_lock_put(struct ldlm_lock *lock)
{
	LASSERT(lock->l_resource != LP_POISON);
	LASSERT(atomic_read(&lock->l_refc) > 0);
	if (atomic_dec_and_test(&lock->l_refc)) {
		struct ldlm_resource *res;

		LDLM_DEBUG(lock,
			   "final lock_put on destroyed lock, freeing it.");

		res = lock->l_resource;
		/* Only a lock already marked destroyed and off all lists
		 * may drop its final reference. */
		LASSERT(lock->l_flags & LDLM_FL_DESTROYED);
		LASSERT(list_empty(&lock->l_res_link));
		LASSERT(list_empty(&lock->l_pending_chain));

		lprocfs_counter_decr(ldlm_res_to_ns(res)->ns_stats,
				     LDLM_NSS_LOCKS);
		lu_ref_del(&res->lr_reference, "lock", lock);
		ldlm_resource_putref(res);
		lock->l_resource = NULL;
		if (lock->l_export) {
			class_export_lock_put(lock->l_export, lock);
			lock->l_export = NULL;
		}

		if (lock->l_lvb_data != NULL)
			OBD_FREE(lock->l_lvb_data, lock->l_lvb_len);

		ldlm_interval_free(ldlm_interval_detach(lock));
		lu_ref_fini(&lock->l_reference);
		/* Actual memory release is deferred until the handle is
		 * unreachable (RCU grace period). */
		OBD_FREE_RCU(lock, sizeof(*lock), &lock->l_handle);
	}
}
EXPORT_SYMBOL(ldlm_lock_put);

/**
 * Removes LDLM lock \a lock from LRU. Assumes LRU is already locked.
 *
 * \retval 1 if the lock was on the LRU and has been removed, 0 otherwise.
 */
int ldlm_lock_remove_from_lru_nolock(struct ldlm_lock *lock)
{
	int rc = 0;
	if (!list_empty(&lock->l_lru)) {
		struct ldlm_namespace *ns = ldlm_lock_to_ns(lock);

		/* FLOCK locks are never cached in the LRU. */
		LASSERT(lock->l_resource->lr_type != LDLM_FLOCK);
		list_del_init(&lock->l_lru);
		LASSERT(ns->ns_nr_unused > 0);
		ns->ns_nr_unused--;
		rc = 1;
	}
	return rc;
}

/**
 * Removes LDLM lock \a lock from LRU. Obtains the LRU lock first.
 *
 * \retval 1 if the lock was on the LRU and has been removed, 0 otherwise.
 */
int ldlm_lock_remove_from_lru(struct ldlm_lock *lock)
{
	struct ldlm_namespace *ns = ldlm_lock_to_ns(lock);
	int rc;

	/* Server-side (namespace) locks never sit on the LRU. */
	if (lock->l_flags & LDLM_FL_NS_SRV) {
		LASSERT(list_empty(&lock->l_lru));
		return 0;
	}

	spin_lock(&ns->ns_lock);
	rc = ldlm_lock_remove_from_lru_nolock(lock);
	spin_unlock(&ns->ns_lock);
	return rc;
}

/**
 * Adds LDLM lock \a lock to namespace LRU. Assumes LRU is already locked.
 */
void ldlm_lock_add_to_lru_nolock(struct ldlm_lock *lock)
{
	struct ldlm_namespace *ns = ldlm_lock_to_ns(lock);

	lock->l_last_used = cfs_time_current();
	LASSERT(list_empty(&lock->l_lru));
	LASSERT(lock->l_resource->lr_type != LDLM_FLOCK);
	list_add_tail(&lock->l_lru, &ns->ns_unused_list);
	/* A lock re-entering the LRU gets a fresh chance with the
	 * LRU-scan heuristics. */
	if (lock->l_flags & LDLM_FL_SKIPPED)
		lock->l_flags &= ~LDLM_FL_SKIPPED;
	LASSERT(ns->ns_nr_unused >= 0);
	ns->ns_nr_unused++;
}

/**
 * Adds LDLM lock \a lock to namespace LRU. Obtains necessary LRU locks
 * first.
 */
void ldlm_lock_add_to_lru(struct ldlm_lock *lock)
{
	struct ldlm_namespace *ns = ldlm_lock_to_ns(lock);

	spin_lock(&ns->ns_lock);
	ldlm_lock_add_to_lru_nolock(lock);
	spin_unlock(&ns->ns_lock);
}

/**
 * Moves LDLM lock \a lock that is already in namespace LRU to the tail of
 * the LRU.
Performs necessary LRU locking 293 */ 294void ldlm_lock_touch_in_lru(struct ldlm_lock *lock) 295{ 296 struct ldlm_namespace *ns = ldlm_lock_to_ns(lock); 297 298 if (lock->l_flags & LDLM_FL_NS_SRV) { 299 LASSERT(list_empty(&lock->l_lru)); 300 return; 301 } 302 303 spin_lock(&ns->ns_lock); 304 if (!list_empty(&lock->l_lru)) { 305 ldlm_lock_remove_from_lru_nolock(lock); 306 ldlm_lock_add_to_lru_nolock(lock); 307 } 308 spin_unlock(&ns->ns_lock); 309} 310 311/** 312 * Helper to destroy a locked lock. 313 * 314 * Used by ldlm_lock_destroy and ldlm_lock_destroy_nolock 315 * Must be called with l_lock and lr_lock held. 316 * 317 * Does not actually free the lock data, but rather marks the lock as 318 * destroyed by setting l_destroyed field in the lock to 1. Destroys a 319 * handle->lock association too, so that the lock can no longer be found 320 * and removes the lock from LRU list. Actual lock freeing occurs when 321 * last lock reference goes away. 322 * 323 * Original comment (of some historical value): 324 * This used to have a 'strict' flag, which recovery would use to mark an 325 * in-use lock as needing-to-die. Lest I am ever tempted to put it back, I 326 * shall explain why it's gone: with the new hash table scheme, once you call 327 * ldlm_lock_destroy, you can never drop your final references on this lock. 328 * Because it's not in the hash table anymore. -phil 329 */ 330int ldlm_lock_destroy_internal(struct ldlm_lock *lock) 331{ 332 if (lock->l_readers || lock->l_writers) { 333 LDLM_ERROR(lock, "lock still has references"); 334 LBUG(); 335 } 336 337 if (!list_empty(&lock->l_res_link)) { 338 LDLM_ERROR(lock, "lock still on resource"); 339 LBUG(); 340 } 341 342 if (lock->l_flags & LDLM_FL_DESTROYED) { 343 LASSERT(list_empty(&lock->l_lru)); 344 return 0; 345 } 346 lock->l_flags |= LDLM_FL_DESTROYED; 347 348 if (lock->l_export && lock->l_export->exp_lock_hash) { 349 /* NB: it's safe to call cfs_hash_del() even lock isn't 350 * in exp_lock_hash. 
*/ 351 /* In the function below, .hs_keycmp resolves to 352 * ldlm_export_lock_keycmp() */ 353 /* coverity[overrun-buffer-val] */ 354 cfs_hash_del(lock->l_export->exp_lock_hash, 355 &lock->l_remote_handle, &lock->l_exp_hash); 356 } 357 358 ldlm_lock_remove_from_lru(lock); 359 class_handle_unhash(&lock->l_handle); 360 361#if 0 362 /* Wake anyone waiting for this lock */ 363 /* FIXME: I should probably add yet another flag, instead of using 364 * l_export to only call this on clients */ 365 if (lock->l_export) 366 class_export_put(lock->l_export); 367 lock->l_export = NULL; 368 if (lock->l_export && lock->l_completion_ast) 369 lock->l_completion_ast(lock, 0); 370#endif 371 return 1; 372} 373 374/** 375 * Destroys a LDLM lock \a lock. Performs necessary locking first. 376 */ 377void ldlm_lock_destroy(struct ldlm_lock *lock) 378{ 379 int first; 380 381 lock_res_and_lock(lock); 382 first = ldlm_lock_destroy_internal(lock); 383 unlock_res_and_lock(lock); 384 385 /* drop reference from hashtable only for first destroy */ 386 if (first) { 387 lu_ref_del(&lock->l_reference, "hash", lock); 388 LDLM_LOCK_RELEASE(lock); 389 } 390} 391 392/** 393 * Destroys a LDLM lock \a lock that is already locked. 
 */
void ldlm_lock_destroy_nolock(struct ldlm_lock *lock)
{
	int first;

	first = ldlm_lock_destroy_internal(lock);
	/* drop reference from hashtable only for first destroy */
	if (first) {
		lu_ref_del(&lock->l_reference, "hash", lock);
		LDLM_LOCK_RELEASE(lock);
	}
}

/* this is called by portals_handle2object with the handle lock taken */
static void lock_handle_addref(void *lock)
{
	LDLM_LOCK_GET((struct ldlm_lock *)lock);
}

/* Handle-table free callback: returns the lock memory to its slab cache. */
static void lock_handle_free(void *lock, int size)
{
	LASSERT(size == sizeof(struct ldlm_lock));
	OBD_SLAB_FREE(lock, ldlm_lock_slab, size);
}

struct portals_handle_ops lock_handle_ops = {
	.hop_addref = lock_handle_addref,
	.hop_free = lock_handle_free,
};

/**
 *
 * Allocate and initialize new lock structure.
 *
 * usage: pass in a resource on which you have done ldlm_resource_get
 *	  new lock will take over the refcount.
 * returns: lock with refcount 2 - one for current caller and one for remote
 */
static struct ldlm_lock *ldlm_lock_new(struct ldlm_resource *resource)
{
	struct ldlm_lock *lock;

	if (resource == NULL)
		LBUG();

	OBD_SLAB_ALLOC_PTR_GFP(lock, ldlm_lock_slab, GFP_NOFS);
	if (lock == NULL)
		return NULL;

	spin_lock_init(&lock->l_lock);
	/* Takes over the caller's resource reference (see usage note above). */
	lock->l_resource = resource;
	lu_ref_add(&resource->lr_reference, "lock", lock);

	/* refcount 2: one for the caller, one for the remote side */
	atomic_set(&lock->l_refc, 2);
	INIT_LIST_HEAD(&lock->l_res_link);
	INIT_LIST_HEAD(&lock->l_lru);
	INIT_LIST_HEAD(&lock->l_pending_chain);
	INIT_LIST_HEAD(&lock->l_bl_ast);
	INIT_LIST_HEAD(&lock->l_cp_ast);
	INIT_LIST_HEAD(&lock->l_rk_ast);
	init_waitqueue_head(&lock->l_waitq);
	lock->l_blocking_lock = NULL;
	INIT_LIST_HEAD(&lock->l_sl_mode);
	INIT_LIST_HEAD(&lock->l_sl_policy);
	INIT_HLIST_NODE(&lock->l_exp_hash);
	INIT_HLIST_NODE(&lock->l_exp_flock_hash);

	lprocfs_counter_incr(ldlm_res_to_ns(resource)->ns_stats,
			     LDLM_NSS_LOCKS);
	/* Make the lock reachable by cookie via the handle table. */
	INIT_LIST_HEAD(&lock->l_handle.h_link);
	class_handle_hash(&lock->l_handle, &lock_handle_ops);

	lu_ref_init(&lock->l_reference);
	lu_ref_add(&lock->l_reference, "hash", lock);
	lock->l_callback_timeout = 0;

#if LUSTRE_TRACKS_LOCK_EXP_REFS
	INIT_LIST_HEAD(&lock->l_exp_refs_link);
	lock->l_exp_refs_nr = 0;
	lock->l_exp_refs_target = NULL;
#endif
	INIT_LIST_HEAD(&lock->l_exp_list);

	return lock;
}

/**
 * Moves LDLM lock \a lock to another resource.
 * This is used on client when server returns some other lock than requested
 * (typically as a result of intent operation)
 */
int ldlm_lock_change_resource(struct ldlm_namespace *ns, struct ldlm_lock *lock,
			      const struct ldlm_res_id *new_resid)
{
	struct ldlm_resource *oldres = lock->l_resource;
	struct ldlm_resource *newres;
	int type;

	LASSERT(ns_is_client(ns));

	lock_res_and_lock(lock);
	if (memcmp(new_resid, &lock->l_resource->lr_name,
		   sizeof(lock->l_resource->lr_name)) == 0) {
		/* Nothing to do */
		unlock_res_and_lock(lock);
		return 0;
	}

	LASSERT(new_resid->name[0] != 0);

	/* This function assumes that the lock isn't on any lists */
	LASSERT(list_empty(&lock->l_res_link));

	type = oldres->lr_type;
	unlock_res_and_lock(lock);

	newres = ldlm_resource_get(ns, NULL, new_resid, type, 1);
	if (newres == NULL)
		return -ENOMEM;

	lu_ref_add(&newres->lr_reference, "lock", lock);
	/*
	 * To flip the lock from the old to the new resource, lock, oldres and
	 * newres have to be locked. Resource spin-locks are nested within
	 * lock->l_lock, and are taken in the memory address order to avoid
	 * dead-locks.
	 */
	spin_lock(&lock->l_lock);
	/* Re-read l_resource under l_lock; it may have changed since the
	 * unlock above. */
	oldres = lock->l_resource;
	if (oldres < newres) {
		lock_res(oldres);
		lock_res_nested(newres, LRT_NEW);
	} else {
		lock_res(newres);
		lock_res_nested(oldres, LRT_NEW);
	}
	LASSERT(memcmp(new_resid, &oldres->lr_name,
		       sizeof(oldres->lr_name)) != 0);
	lock->l_resource = newres;
	unlock_res(oldres);
	unlock_res_and_lock(lock);

	/* ...and the flowers are still standing! */
	lu_ref_del(&oldres->lr_reference, "lock", lock);
	ldlm_resource_putref(oldres);

	return 0;
}
EXPORT_SYMBOL(ldlm_lock_change_resource);

/** \defgroup ldlm_handles LDLM HANDLES
 * Ways to get hold of locks without any addresses.
 * @{
 */

/**
 * Fills in handle for LDLM lock \a lock into supplied \a lockh
 * Does not take any references.
 */
void ldlm_lock2handle(const struct ldlm_lock *lock, struct lustre_handle *lockh)
{
	lockh->cookie = lock->l_handle.h_cookie;
}
EXPORT_SYMBOL(ldlm_lock2handle);

/**
 * Obtain a lock reference by handle.
 *
 * if \a flags: atomically get the lock and set the flags.
 * Return NULL if flag already set
 */
struct ldlm_lock *__ldlm_handle2lock(const struct lustre_handle *handle,
				     __u64 flags)
{
	struct ldlm_lock *lock;

	LASSERT(handle);

	lock = class_handle2object(handle->cookie);
	if (lock == NULL)
		return NULL;

	/* It's unlikely but possible that someone marked the lock as
	 * destroyed after we did handle2object on it */
	if (flags == 0 && ((lock->l_flags & LDLM_FL_DESTROYED) == 0)) {
		/* Fast path: no flags requested and lock looks alive --
		 * skip taking the resource lock. */
		lu_ref_add(&lock->l_reference, "handle", current);
		return lock;
	}

	lock_res_and_lock(lock);

	LASSERT(lock->l_resource != NULL);

	lu_ref_add_atomic(&lock->l_reference, "handle", current);
	if (unlikely(lock->l_flags & LDLM_FL_DESTROYED)) {
		unlock_res_and_lock(lock);
		CDEBUG(D_INFO, "lock already destroyed: lock %p\n", lock);
		LDLM_LOCK_PUT(lock);
		return NULL;
	}

	/* Per the contract above: fail if any requested flag is already set. */
	if (flags && (lock->l_flags & flags)) {
		unlock_res_and_lock(lock);
		LDLM_LOCK_PUT(lock);
		return NULL;
	}

	if (flags)
		lock->l_flags |= flags;

	unlock_res_and_lock(lock);
	return lock;
}
EXPORT_SYMBOL(__ldlm_handle2lock);
/** @} ldlm_handles */

/**
 * Fill in "on the wire" representation for given LDLM lock into supplied
 * lock descriptor \a desc structure.
 */
void ldlm_lock2desc(struct ldlm_lock *lock, struct ldlm_lock_desc *desc)
{
	ldlm_res2desc(lock->l_resource, &desc->l_resource);
	desc->l_req_mode = lock->l_req_mode;
	desc->l_granted_mode = lock->l_granted_mode;
	ldlm_convert_policy_to_wire(lock->l_resource->lr_type,
				    &lock->l_policy_data,
				    &desc->l_policy_data);
}
EXPORT_SYMBOL(ldlm_lock2desc);

/**
 * Add a lock to list of conflicting locks to send AST to.
 *
 * Only add if we have not sent a blocking AST to the lock yet.
 */
void ldlm_add_bl_work_item(struct ldlm_lock *lock, struct ldlm_lock *new,
			   struct list_head *work_list)
{
	if ((lock->l_flags & LDLM_FL_AST_SENT) == 0) {
		LDLM_DEBUG(lock, "lock incompatible; sending blocking AST.");
		lock->l_flags |= LDLM_FL_AST_SENT;
		/* If the enqueuing client said so, tell the AST recipient to
		 * discard dirty data, rather than writing back. */
		if (new->l_flags & LDLM_FL_AST_DISCARD_DATA)
			lock->l_flags |= LDLM_FL_DISCARD_DATA;
		LASSERT(list_empty(&lock->l_bl_ast));
		list_add(&lock->l_bl_ast, work_list);
		/* Reference held while the lock sits on the work list. */
		LDLM_LOCK_GET(lock);
		LASSERT(lock->l_blocking_lock == NULL);
		lock->l_blocking_lock = LDLM_LOCK_GET(new);
	}
}

/**
 * Add a lock to list of just granted locks to send completion AST to.
 */
void ldlm_add_cp_work_item(struct ldlm_lock *lock, struct list_head *work_list)
{
	if ((lock->l_flags & LDLM_FL_CP_REQD) == 0) {
		lock->l_flags |= LDLM_FL_CP_REQD;
		LDLM_DEBUG(lock, "lock granted; sending completion AST.");
		LASSERT(list_empty(&lock->l_cp_ast));
		list_add(&lock->l_cp_ast, work_list);
		/* Reference held while the lock sits on the work list. */
		LDLM_LOCK_GET(lock);
	}
}

/**
 * Aggregator function to add AST work items into a list. Determines
 * what sort of an AST work needs to be done and calls the proper
 * adding function.
 * Must be called with lr_lock held.
 */
void ldlm_add_ast_work_item(struct ldlm_lock *lock, struct ldlm_lock *new,
			    struct list_head *work_list)
{
	check_res_locked(lock->l_resource);
	/* A conflicting "new" lock means a blocking AST; otherwise the
	 * lock was just granted and needs a completion AST. */
	if (new)
		ldlm_add_bl_work_item(lock, new, work_list);
	else
		ldlm_add_cp_work_item(lock, work_list);
}

/**
 * Add specified reader/writer reference to LDLM lock with handle \a lockh.
 * r/w reference type is determined by \a mode
 * Calls ldlm_lock_addref_internal.
682 */ 683void ldlm_lock_addref(struct lustre_handle *lockh, __u32 mode) 684{ 685 struct ldlm_lock *lock; 686 687 lock = ldlm_handle2lock(lockh); 688 LASSERT(lock != NULL); 689 ldlm_lock_addref_internal(lock, mode); 690 LDLM_LOCK_PUT(lock); 691} 692EXPORT_SYMBOL(ldlm_lock_addref); 693 694/** 695 * Helper function. 696 * Add specified reader/writer reference to LDLM lock \a lock. 697 * r/w reference type is determined by \a mode 698 * Removes lock from LRU if it is there. 699 * Assumes the LDLM lock is already locked. 700 */ 701void ldlm_lock_addref_internal_nolock(struct ldlm_lock *lock, __u32 mode) 702{ 703 ldlm_lock_remove_from_lru(lock); 704 if (mode & (LCK_NL | LCK_CR | LCK_PR)) { 705 lock->l_readers++; 706 lu_ref_add_atomic(&lock->l_reference, "reader", lock); 707 } 708 if (mode & (LCK_EX | LCK_CW | LCK_PW | LCK_GROUP | LCK_COS)) { 709 lock->l_writers++; 710 lu_ref_add_atomic(&lock->l_reference, "writer", lock); 711 } 712 LDLM_LOCK_GET(lock); 713 lu_ref_add_atomic(&lock->l_reference, "user", lock); 714 LDLM_DEBUG(lock, "ldlm_lock_addref(%s)", ldlm_lockname[mode]); 715} 716 717/** 718 * Attempts to add reader/writer reference to a lock with handle \a lockh, and 719 * fails if lock is already LDLM_FL_CBPENDING or destroyed. 720 * 721 * \retval 0 success, lock was addref-ed 722 * 723 * \retval -EAGAIN lock is being canceled. 724 */ 725int ldlm_lock_addref_try(struct lustre_handle *lockh, __u32 mode) 726{ 727 struct ldlm_lock *lock; 728 int result; 729 730 result = -EAGAIN; 731 lock = ldlm_handle2lock(lockh); 732 if (lock != NULL) { 733 lock_res_and_lock(lock); 734 if (lock->l_readers != 0 || lock->l_writers != 0 || 735 !(lock->l_flags & LDLM_FL_CBPENDING)) { 736 ldlm_lock_addref_internal_nolock(lock, mode); 737 result = 0; 738 } 739 unlock_res_and_lock(lock); 740 LDLM_LOCK_PUT(lock); 741 } 742 return result; 743} 744EXPORT_SYMBOL(ldlm_lock_addref_try); 745 746/** 747 * Add specified reader/writer reference to LDLM lock \a lock. 
748 * Locks LDLM lock and calls ldlm_lock_addref_internal_nolock to do the work. 749 * Only called for local locks. 750 */ 751void ldlm_lock_addref_internal(struct ldlm_lock *lock, __u32 mode) 752{ 753 lock_res_and_lock(lock); 754 ldlm_lock_addref_internal_nolock(lock, mode); 755 unlock_res_and_lock(lock); 756} 757 758/** 759 * Removes reader/writer reference for LDLM lock \a lock. 760 * Assumes LDLM lock is already locked. 761 * only called in ldlm_flock_destroy and for local locks. 762 * Does NOT add lock to LRU if no r/w references left to accommodate flock locks 763 * that cannot be placed in LRU. 764 */ 765void ldlm_lock_decref_internal_nolock(struct ldlm_lock *lock, __u32 mode) 766{ 767 LDLM_DEBUG(lock, "ldlm_lock_decref(%s)", ldlm_lockname[mode]); 768 if (mode & (LCK_NL | LCK_CR | LCK_PR)) { 769 LASSERT(lock->l_readers > 0); 770 lu_ref_del(&lock->l_reference, "reader", lock); 771 lock->l_readers--; 772 } 773 if (mode & (LCK_EX | LCK_CW | LCK_PW | LCK_GROUP | LCK_COS)) { 774 LASSERT(lock->l_writers > 0); 775 lu_ref_del(&lock->l_reference, "writer", lock); 776 lock->l_writers--; 777 } 778 779 lu_ref_del(&lock->l_reference, "user", lock); 780 LDLM_LOCK_RELEASE(lock); /* matches the LDLM_LOCK_GET() in addref */ 781} 782 783/** 784 * Removes reader/writer reference for LDLM lock \a lock. 785 * Locks LDLM lock first. 786 * If the lock is determined to be client lock on a client and r/w refcount 787 * drops to zero and the lock is not blocked, the lock is added to LRU lock 788 * on the namespace. 789 * For blocked LDLM locks if r/w count drops to zero, blocking_ast is called. 
790 */ 791void ldlm_lock_decref_internal(struct ldlm_lock *lock, __u32 mode) 792{ 793 struct ldlm_namespace *ns; 794 795 lock_res_and_lock(lock); 796 797 ns = ldlm_lock_to_ns(lock); 798 799 ldlm_lock_decref_internal_nolock(lock, mode); 800 801 if (lock->l_flags & LDLM_FL_LOCAL && 802 !lock->l_readers && !lock->l_writers) { 803 /* If this is a local lock on a server namespace and this was 804 * the last reference, cancel the lock. */ 805 CDEBUG(D_INFO, "forcing cancel of local lock\n"); 806 lock->l_flags |= LDLM_FL_CBPENDING; 807 } 808 809 if (!lock->l_readers && !lock->l_writers && 810 (lock->l_flags & LDLM_FL_CBPENDING)) { 811 /* If we received a blocked AST and this was the last reference, 812 * run the callback. */ 813 if ((lock->l_flags & LDLM_FL_NS_SRV) && lock->l_export) 814 CERROR("FL_CBPENDING set on non-local lock--just a " 815 "warning\n"); 816 817 LDLM_DEBUG(lock, "final decref done on cbpending lock"); 818 819 LDLM_LOCK_GET(lock); /* dropped by bl thread */ 820 ldlm_lock_remove_from_lru(lock); 821 unlock_res_and_lock(lock); 822 823 if (lock->l_flags & LDLM_FL_FAIL_LOC) 824 OBD_RACE(OBD_FAIL_LDLM_CP_BL_RACE); 825 826 if ((lock->l_flags & LDLM_FL_ATOMIC_CB) || 827 ldlm_bl_to_thread_lock(ns, NULL, lock) != 0) 828 ldlm_handle_bl_callback(ns, NULL, lock); 829 } else if (ns_is_client(ns) && 830 !lock->l_readers && !lock->l_writers && 831 !(lock->l_flags & LDLM_FL_NO_LRU) && 832 !(lock->l_flags & LDLM_FL_BL_AST)) { 833 834 LDLM_DEBUG(lock, "add lock into lru list"); 835 836 /* If this is a client-side namespace and this was the last 837 * reference, put it on the LRU. */ 838 ldlm_lock_add_to_lru(lock); 839 unlock_res_and_lock(lock); 840 841 if (lock->l_flags & LDLM_FL_FAIL_LOC) 842 OBD_RACE(OBD_FAIL_LDLM_CP_BL_RACE); 843 844 /* Call ldlm_cancel_lru() only if EARLY_CANCEL and LRU RESIZE 845 * are not supported by the server, otherwise, it is done on 846 * enqueue. 
*/ 847 if (!exp_connect_cancelset(lock->l_conn_export) && 848 !ns_connect_lru_resize(ns)) 849 ldlm_cancel_lru(ns, 0, LCF_ASYNC, 0); 850 } else { 851 LDLM_DEBUG(lock, "do not add lock into lru list"); 852 unlock_res_and_lock(lock); 853 } 854} 855 856/** 857 * Decrease reader/writer refcount for LDLM lock with handle \a lockh 858 */ 859void ldlm_lock_decref(struct lustre_handle *lockh, __u32 mode) 860{ 861 struct ldlm_lock *lock = __ldlm_handle2lock(lockh, 0); 862 LASSERTF(lock != NULL, "Non-existing lock: %#llx\n", lockh->cookie); 863 ldlm_lock_decref_internal(lock, mode); 864 LDLM_LOCK_PUT(lock); 865} 866EXPORT_SYMBOL(ldlm_lock_decref); 867 868/** 869 * Decrease reader/writer refcount for LDLM lock with handle 870 * \a lockh and mark it for subsequent cancellation once r/w refcount 871 * drops to zero instead of putting into LRU. 872 * 873 * Typical usage is for GROUP locks which we cannot allow to be cached. 874 */ 875void ldlm_lock_decref_and_cancel(struct lustre_handle *lockh, __u32 mode) 876{ 877 struct ldlm_lock *lock = __ldlm_handle2lock(lockh, 0); 878 879 LASSERT(lock != NULL); 880 881 LDLM_DEBUG(lock, "ldlm_lock_decref(%s)", ldlm_lockname[mode]); 882 lock_res_and_lock(lock); 883 lock->l_flags |= LDLM_FL_CBPENDING; 884 unlock_res_and_lock(lock); 885 ldlm_lock_decref_internal(lock, mode); 886 LDLM_LOCK_PUT(lock); 887} 888EXPORT_SYMBOL(ldlm_lock_decref_and_cancel); 889 890struct sl_insert_point { 891 struct list_head *res_link; 892 struct list_head *mode_link; 893 struct list_head *policy_link; 894}; 895 896/** 897 * Finds a position to insert the new lock into granted lock list. 898 * 899 * Used for locks eligible for skiplist optimization. 
 *
 * Parameters:
 *      queue [input]:  the granted list where search acts on;
 *      req [input]:    the lock whose position to be located;
 *      prev [output]:  positions within 3 lists to insert @req to
 * Return Value:
 *      filled @prev
 * NOTE: called by
 *  - ldlm_grant_lock_with_skiplist
 */
static void search_granted_lock(struct list_head *queue,
				struct ldlm_lock *req,
				struct sl_insert_point *prev)
{
	struct list_head *tmp;
	struct ldlm_lock *lock, *mode_end, *policy_end;

	list_for_each(tmp, queue) {
		lock = list_entry(tmp, struct ldlm_lock, l_res_link);

		/* Last lock of this lock's mode group (l_sl_mode links all
		 * same-mode locks; its prev is the group tail). */
		mode_end = list_entry(lock->l_sl_mode.prev,
				      struct ldlm_lock, l_sl_mode);

		if (lock->l_req_mode != req->l_req_mode) {
			/* jump to last lock of mode group */
			tmp = &mode_end->l_res_link;
			continue;
		}

		/* suitable mode group is found */
		if (lock->l_resource->lr_type == LDLM_PLAIN) {
			/* insert point is last lock of the mode group */
			prev->res_link = &mode_end->l_res_link;
			prev->mode_link = &mode_end->l_sl_mode;
			/* PLAIN locks have no policy group; req starts its
			 * own (self-linked policy entry). */
			prev->policy_link = &req->l_sl_policy;
			return;
		} else if (lock->l_resource->lr_type == LDLM_IBITS) {
			/* Walk policy groups within this mode group looking
			 * for one with matching inodebits. */
			for (;;) {
				policy_end =
					list_entry(lock->l_sl_policy.prev,
						   struct ldlm_lock,
						   l_sl_policy);

				if (lock->l_policy_data.l_inodebits.bits ==
				    req->l_policy_data.l_inodebits.bits) {
					/* insert point is last lock of
					 * the policy group */
					prev->res_link =
						&policy_end->l_res_link;
					prev->mode_link =
						&policy_end->l_sl_mode;
					prev->policy_link =
						&policy_end->l_sl_policy;
					return;
				}

				if (policy_end == mode_end)
					/* done with mode group */
					break;

				/* go to next policy group within mode group */
				tmp = policy_end->l_res_link.next;
				lock = list_entry(tmp, struct ldlm_lock,
						  l_res_link);
			} /* loop over policy groups within the mode group */

			/* insert point is last lock of the mode group,
			 * new policy group is started */
			prev->res_link = &mode_end->l_res_link;
			prev->mode_link = &mode_end->l_sl_mode;
			prev->policy_link = &req->l_sl_policy;
			return;
		} else {
			LDLM_ERROR(lock,
				   "is not LDLM_PLAIN or LDLM_IBITS lock");
			LBUG();
		}
	}

	/* insert point is last lock on the queue,
	 * new mode group and new policy group are started */
	prev->res_link = queue->prev;
	prev->mode_link = &req->l_sl_mode;
	prev->policy_link = &req->l_sl_policy;
	return;
}

/**
 * Add a lock into resource granted list after a position described by
 * \a prev.
 */
static void ldlm_granted_list_add_lock(struct ldlm_lock *lock,
				       struct sl_insert_point *prev)
{
	struct ldlm_resource *res = lock->l_resource;

	check_res_locked(res);

	ldlm_resource_dump(D_INFO, res);
	LDLM_DEBUG(lock, "About to add lock:");

	if (lock->l_flags & LDLM_FL_DESTROYED) {
		CDEBUG(D_OTHER, "Lock destroyed, not adding to resource\n");
		return;
	}

	LASSERT(list_empty(&lock->l_res_link));
	LASSERT(list_empty(&lock->l_sl_mode));
	LASSERT(list_empty(&lock->l_sl_policy));

	/*
	 * lock->link == prev->link means lock is first starting the group.
	 * Don't re-add to itself to suppress kernel warnings.
	 */
	if (&lock->l_res_link != prev->res_link)
		list_add(&lock->l_res_link, prev->res_link);
	if (&lock->l_sl_mode != prev->mode_link)
		list_add(&lock->l_sl_mode, prev->mode_link);
	if (&lock->l_sl_policy != prev->policy_link)
		list_add(&lock->l_sl_policy, prev->policy_link);
}

/**
 * Add a lock to granted list on a resource maintaining skiplist
 * correctness.
 */
static void ldlm_grant_lock_with_skiplist(struct ldlm_lock *lock)
{
	struct sl_insert_point prev;

	/* only fully granted locks go on the skiplist-ordered granted list */
	LASSERT(lock->l_req_mode == lock->l_granted_mode);

	search_granted_lock(&lock->l_resource->lr_granted, lock, &prev);
	ldlm_granted_list_add_lock(lock, &prev);
}

/**
 * Perform lock granting bookkeeping.
 *
 * Includes putting the lock into granted list and updating lock mode.
 * NOTE: called by
 *  - ldlm_lock_enqueue
 *  - ldlm_reprocess_queue
 *  - ldlm_lock_convert
 *
 * must be called with lr_lock held
 */
void ldlm_grant_lock(struct ldlm_lock *lock, struct list_head *work_list)
{
	struct ldlm_resource *res = lock->l_resource;

	check_res_locked(res);

	lock->l_granted_mode = lock->l_req_mode;
	/* PLAIN/IBITS locks keep skiplist ordering; EXTENT locks also go
	 * into the interval tree; anything else is plainly appended. */
	if (res->lr_type == LDLM_PLAIN || res->lr_type == LDLM_IBITS)
		ldlm_grant_lock_with_skiplist(lock);
	else if (res->lr_type == LDLM_EXTENT)
		ldlm_extent_add_lock(res, lock);
	else
		ldlm_resource_add_lock(res, &res->lr_granted, lock);

	/* track the most restrictive mode granted on this resource */
	if (lock->l_granted_mode < res->lr_most_restr)
		res->lr_most_restr = lock->l_granted_mode;

	if (work_list && lock->l_completion_ast != NULL)
		ldlm_add_ast_work_item(lock, NULL, work_list);

	ldlm_pool_add(&ldlm_res_to_ns(res)->ns_pool, lock);
}

/**
 * Search for a lock with given properties in a queue.
 *
 * Caller must hold the resource lock.  On success \a *mode is updated to
 * the matched lock's request mode.
 *
 * \retval a referenced lock or NULL.  See the flag descriptions below, in the
 * comment above ldlm_lock_match
 */
static struct ldlm_lock *search_queue(struct list_head *queue,
				      ldlm_mode_t *mode,
				      ldlm_policy_data_t *policy,
				      struct ldlm_lock *old_lock,
				      __u64 flags, int unref)
{
	struct ldlm_lock *lock;
	struct list_head *tmp;

	list_for_each(tmp, queue) {
		ldlm_mode_t match;

		lock = list_entry(tmp, struct ldlm_lock, l_res_link);

		/* stop at the lock we are looking for a duplicate of */
		if (lock == old_lock)
			break;

		/* Check if this lock can be matched.
		 * Used by LU-2919(exclusive open) for open lease lock */
		if (ldlm_is_excl(lock))
			continue;

		/* llite sometimes wants to match locks that will be
		 * canceled when their users drop, but we allow it to match
		 * if it passes in CBPENDING and the lock still has users.
		 * this is generally only going to be used by children
		 * whose parents already hold a lock so forward progress
		 * can still happen. */
		if (lock->l_flags & LDLM_FL_CBPENDING &&
		    !(flags & LDLM_FL_CBPENDING))
			continue;
		if (!unref && lock->l_flags & LDLM_FL_CBPENDING &&
		    lock->l_readers == 0 && lock->l_writers == 0)
			continue;

		/* \a *mode is a bitmask of acceptable modes */
		if (!(lock->l_req_mode & *mode))
			continue;
		match = lock->l_req_mode;

		/* an extent lock matches only if it covers the whole
		 * requested extent */
		if (lock->l_resource->lr_type == LDLM_EXTENT &&
		    (lock->l_policy_data.l_extent.start >
		     policy->l_extent.start ||
		     lock->l_policy_data.l_extent.end < policy->l_extent.end))
			continue;

		if (unlikely(match == LCK_GROUP) &&
		    lock->l_resource->lr_type == LDLM_EXTENT &&
		    lock->l_policy_data.l_extent.gid != policy->l_extent.gid)
			continue;

		/* We match if we have existing lock with same or wider set
		 * of bits. */
		if (lock->l_resource->lr_type == LDLM_IBITS &&
		    ((lock->l_policy_data.l_inodebits.bits &
		      policy->l_inodebits.bits) !=
		     policy->l_inodebits.bits))
			continue;

		if (!unref && (lock->l_flags & LDLM_FL_GONE_MASK))
			continue;

		if ((flags & LDLM_FL_LOCAL_ONLY) &&
		    !(lock->l_flags & LDLM_FL_LOCAL))
			continue;

		/* test-only match takes a bare reference, a real match takes
		 * a reader/writer reference in the matched mode */
		if (flags & LDLM_FL_TEST_LOCK) {
			LDLM_LOCK_GET(lock);
			ldlm_lock_touch_in_lru(lock);
		} else {
			ldlm_lock_addref_internal_nolock(lock, match);
		}
		*mode = match;
		return lock;
	}

	return NULL;
}

/**
 * Mark \a lock as failed and wake anyone waiting on its waitq.
 * Caller must hold the lock/resource lock.
 */
void ldlm_lock_fail_match_locked(struct ldlm_lock *lock)
{
	if ((lock->l_flags & LDLM_FL_FAIL_NOTIFIED) == 0) {
		lock->l_flags |= LDLM_FL_FAIL_NOTIFIED;
		wake_up_all(&lock->l_waitq);
	}
}
EXPORT_SYMBOL(ldlm_lock_fail_match_locked);

/** Locking wrapper for ldlm_lock_fail_match_locked(). */
void ldlm_lock_fail_match(struct ldlm_lock *lock)
{
	lock_res_and_lock(lock);
	ldlm_lock_fail_match_locked(lock);
	unlock_res_and_lock(lock);
}
EXPORT_SYMBOL(ldlm_lock_fail_match);

/**
 * Mark lock as "matchable" by OST.
 *
 * Used to prevent certain races in LOV/OSC where the lock is granted, but LVB
 * is not yet valid.
 * Assumes LDLM lock is already locked.
 */
void ldlm_lock_allow_match_locked(struct ldlm_lock *lock)
{
	lock->l_flags |= LDLM_FL_LVB_READY;
	wake_up_all(&lock->l_waitq);
}
EXPORT_SYMBOL(ldlm_lock_allow_match_locked);

/**
 * Mark lock as "matchable" by OST.
 * Locks the lock and then \see ldlm_lock_allow_match_locked
 */
void ldlm_lock_allow_match(struct ldlm_lock *lock)
{
	lock_res_and_lock(lock);
	ldlm_lock_allow_match_locked(lock);
	unlock_res_and_lock(lock);
}
EXPORT_SYMBOL(ldlm_lock_allow_match);

/**
 * Attempt to find a lock with specified properties.
 *
 * Typically returns a reference to matched lock unless LDLM_FL_TEST_LOCK is
 * set in \a flags
 *
 * Can be called in two ways:
 *
 * If 'ns' is NULL, then lockh describes an existing lock that we want to look
 * for a duplicate of.
 *
 * Otherwise, all of the fields must be filled in, to match against.
 *
 * If 'flags' contains LDLM_FL_LOCAL_ONLY, then only match local locks on the
 *     server (ie, connh is NULL)
 * If 'flags' contains LDLM_FL_BLOCK_GRANTED, then only locks on the granted
 *     list will be considered
 * If 'flags' contains LDLM_FL_CBPENDING, then locks that have been marked
 *     to be canceled can still be matched as long as they still have reader
 *     or writer references
 * If 'flags' contains LDLM_FL_TEST_LOCK, then don't actually reference a lock,
 *     just tell us if we would have matched.
 *
 * \retval 1 if it finds an already-existing lock that is compatible; in this
 * case, lockh is filled in with a addref()ed lock
 *
 * We also check security context, and if that fails we simply return 0 (to
 * keep caller code unchanged), the context failure will be discovered by
 * caller sometime later.
 */
ldlm_mode_t ldlm_lock_match(struct ldlm_namespace *ns, __u64 flags,
			    const struct ldlm_res_id *res_id, ldlm_type_t type,
			    ldlm_policy_data_t *policy, ldlm_mode_t mode,
			    struct lustre_handle *lockh, int unref)
{
	struct ldlm_resource *res;
	struct ldlm_lock *lock, *old_lock = NULL;
	int rc = 0;

	/* ns == NULL: look for a duplicate of the lock behind \a lockh */
	if (ns == NULL) {
		old_lock = ldlm_handle2lock(lockh);
		LASSERT(old_lock);

		ns = ldlm_lock_to_ns(old_lock);
		res_id = &old_lock->l_resource->lr_name;
		type = old_lock->l_resource->lr_type;
		mode = old_lock->l_req_mode;
	}

	res = ldlm_resource_get(ns, NULL, res_id, type, 0);
	if (res == NULL) {
		LASSERT(old_lock == NULL);
		return 0;
	}

	LDLM_RESOURCE_ADDREF(res);
	lock_res(res);

	/* search granted, then (unless only granted locks are acceptable)
	 * converting and waiting queues */
	lock = search_queue(&res->lr_granted, &mode, policy, old_lock,
			    flags, unref);
	if (lock != NULL) {
		rc = 1;
		goto out;
	}
	if (flags & LDLM_FL_BLOCK_GRANTED) {
		rc = 0;
		goto out;
	}
	lock = search_queue(&res->lr_converting, &mode, policy, old_lock,
			    flags, unref);
	if (lock != NULL) {
		rc = 1;
		goto out;
	}
	lock = search_queue(&res->lr_waiting, &mode, policy, old_lock,
			    flags, unref);
	if (lock != NULL) {
		rc = 1;
		goto out;
	}

 out:
	unlock_res(res);
	LDLM_RESOURCE_DELREF(res);
	ldlm_resource_putref(res);

	if (lock) {
		ldlm_lock2handle(lock, lockh);
		/* if the caller needs a valid LVB, wait for it to become
		 * ready (or for the lock to fail/be destroyed) */
		if ((flags & LDLM_FL_LVB_READY) &&
		    (!(lock->l_flags & LDLM_FL_LVB_READY))) {
			__u64 wait_flags = LDLM_FL_LVB_READY |
				LDLM_FL_DESTROYED | LDLM_FL_FAIL_NOTIFIED;
			struct l_wait_info lwi;

			if (lock->l_completion_ast) {
				int err = lock->l_completion_ast(lock,
							  LDLM_FL_WAIT_NOREPROC,
								 NULL);
				if (err) {
					/* drop whichever reference
					 * search_queue() took */
					if (flags & LDLM_FL_TEST_LOCK)
						LDLM_LOCK_RELEASE(lock);
					else
						ldlm_lock_decref_internal(lock,
									  mode);
					rc = 0;
					goto out2;
				}
			}

			lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(obd_timeout),
					       NULL, LWI_ON_SIGNAL_NOOP, NULL);

			/* XXX FIXME see comment on CAN_MATCH in lustre_dlm.h */
			l_wait_event(lock->l_waitq,
				     lock->l_flags & wait_flags,
				     &lwi);
			if (!(lock->l_flags & LDLM_FL_LVB_READY)) {
				if (flags & LDLM_FL_TEST_LOCK)
					LDLM_LOCK_RELEASE(lock);
				else
					ldlm_lock_decref_internal(lock, mode);
				rc = 0;
			}
		}
	}
 out2:
	if (rc) {
		LDLM_DEBUG(lock, "matched (%llu %llu)",
			   (type == LDLM_PLAIN || type == LDLM_IBITS) ?
				res_id->name[2] : policy->l_extent.start,
			   (type == LDLM_PLAIN || type == LDLM_IBITS) ?
				res_id->name[3] : policy->l_extent.end);

		/* check user's security context */
		if (lock->l_conn_export &&
		    sptlrpc_import_check_ctx(
				class_exp2cliimp(lock->l_conn_export))) {
			if (!(flags & LDLM_FL_TEST_LOCK))
				ldlm_lock_decref_internal(lock, mode);
			rc = 0;
		}

		if (flags & LDLM_FL_TEST_LOCK)
			LDLM_LOCK_RELEASE(lock);

	} else if (!(flags & LDLM_FL_TEST_LOCK)) {/*less verbose for test-only*/
		LDLM_DEBUG_NOLOCK("not matched ns %p type %u mode %u res %llu/%llu (%llu %llu)",
				  ns, type, mode, res_id->name[0],
				  res_id->name[1],
				  (type == LDLM_PLAIN || type == LDLM_IBITS) ?
					res_id->name[2] : policy->l_extent.start,
				  (type == LDLM_PLAIN || type == LDLM_IBITS) ?
					res_id->name[3] : policy->l_extent.end);
	}
	if (old_lock)
		LDLM_LOCK_PUT(old_lock);

	return rc ? mode : 0;
}
EXPORT_SYMBOL(ldlm_lock_match);

/**
 * Check that the lock behind \a lockh is still usable (not gone and not
 * pending cancellation without users) and, if so, take a reference in its
 * granted mode.
 *
 * \param[out] bits  if non-NULL, receives the lock's inodebits policy
 *                   (only meaningful for IBITS locks — TODO confirm callers)
 * \retval the granted mode on success, 0 otherwise
 */
ldlm_mode_t ldlm_revalidate_lock_handle(struct lustre_handle *lockh,
					__u64 *bits)
{
	struct ldlm_lock *lock;
	ldlm_mode_t mode = 0;

	lock = ldlm_handle2lock(lockh);
	if (lock != NULL) {
		lock_res_and_lock(lock);
		if (lock->l_flags & LDLM_FL_GONE_MASK)
			goto out;

		if (lock->l_flags & LDLM_FL_CBPENDING &&
		    lock->l_readers == 0 && lock->l_writers == 0)
			goto out;

		if (bits)
			*bits = lock->l_policy_data.l_inodebits.bits;
		mode = lock->l_granted_mode;
		ldlm_lock_addref_internal_nolock(lock, mode);
	}

out:
	if (lock != NULL) {
		unlock_res_and_lock(lock);
		LDLM_LOCK_PUT(lock);
	}
	return mode;
}
EXPORT_SYMBOL(ldlm_revalidate_lock_handle);

/** The caller must guarantee that the buffer is large enough. */
int ldlm_fill_lvb(struct ldlm_lock *lock, struct req_capsule *pill,
		  enum req_location loc, void *data, int size)
{
	void *lvb;

	LASSERT(data != NULL);
	LASSERT(size >= 0);

	switch (lock->l_lvb_type) {
	case LVB_T_OST:
		if (size == sizeof(struct ost_lvb)) {
			if (loc == RCL_CLIENT)
				lvb = req_capsule_client_swab_get(pill,
								  &RMF_DLM_LVB,
							lustre_swab_ost_lvb);
			else
				lvb = req_capsule_server_swab_get(pill,
								  &RMF_DLM_LVB,
							lustre_swab_ost_lvb);
			if (unlikely(lvb == NULL)) {
				LDLM_ERROR(lock, "no LVB");
				return -EPROTO;
			}

			memcpy(data, lvb, size);
		} else if (size == sizeof(struct ost_lvb_v1)) {
			/* old (v1) LVB lacks nanosecond fields; copy what is
			 * there and zero the rest of the newer struct */
			struct ost_lvb *olvb = data;

			if (loc == RCL_CLIENT)
				lvb = req_capsule_client_swab_get(pill,
								  &RMF_DLM_LVB,
							lustre_swab_ost_lvb_v1);
			else
				lvb = req_capsule_server_sized_swab_get(pill,
						&RMF_DLM_LVB, size,
						lustre_swab_ost_lvb_v1);
			if (unlikely(lvb == NULL)) {
				LDLM_ERROR(lock, "no LVB");
				return -EPROTO;
			}

			memcpy(data, lvb, size);
			olvb->lvb_mtime_ns = 0;
			olvb->lvb_atime_ns = 0;
			olvb->lvb_ctime_ns = 0;
		} else {
			LDLM_ERROR(lock, "Replied unexpected ost LVB size %d",
				   size);
			return -EINVAL;
		}
		break;
	case LVB_T_LQUOTA:
		if (size == sizeof(struct lquota_lvb)) {
			if (loc == RCL_CLIENT)
				lvb = req_capsule_client_swab_get(pill,
								  &RMF_DLM_LVB,
							lustre_swab_lquota_lvb);
			else
				lvb = req_capsule_server_swab_get(pill,
								  &RMF_DLM_LVB,
							lustre_swab_lquota_lvb);
			if (unlikely(lvb == NULL)) {
				LDLM_ERROR(lock, "no LVB");
				return -EPROTO;
			}

			memcpy(data, lvb, size);
		} else {
			LDLM_ERROR(lock, "Replied unexpected lquota LVB size %d",
				   size);
			return -EINVAL;
		}
		break;
	case LVB_T_LAYOUT:
		if (size == 0)
			break;

		if (loc == RCL_CLIENT)
			lvb = req_capsule_client_get(pill, &RMF_DLM_LVB);
		else
			lvb = req_capsule_server_get(pill, &RMF_DLM_LVB);
		if (unlikely(lvb == NULL)) {
			LDLM_ERROR(lock, "no LVB");
			return -EPROTO;
		}

		memcpy(data, lvb, size);
		break;
	default:
		LDLM_ERROR(lock, "Unknown LVB type: %d\n", lock->l_lvb_type);
		dump_stack();
		return -EINVAL;
	}

	return 0;
}

/**
 * Create and fill in new LDLM lock with specified properties.
1487 * Returns a referenced lock 1488 */ 1489struct ldlm_lock *ldlm_lock_create(struct ldlm_namespace *ns, 1490 const struct ldlm_res_id *res_id, 1491 ldlm_type_t type, 1492 ldlm_mode_t mode, 1493 const struct ldlm_callback_suite *cbs, 1494 void *data, __u32 lvb_len, 1495 enum lvb_type lvb_type) 1496{ 1497 struct ldlm_lock *lock; 1498 struct ldlm_resource *res; 1499 1500 res = ldlm_resource_get(ns, NULL, res_id, type, 1); 1501 if (res == NULL) 1502 return NULL; 1503 1504 lock = ldlm_lock_new(res); 1505 1506 if (lock == NULL) 1507 return NULL; 1508 1509 lock->l_req_mode = mode; 1510 lock->l_ast_data = data; 1511 lock->l_pid = current_pid(); 1512 if (ns_is_server(ns)) 1513 lock->l_flags |= LDLM_FL_NS_SRV; 1514 if (cbs) { 1515 lock->l_blocking_ast = cbs->lcs_blocking; 1516 lock->l_completion_ast = cbs->lcs_completion; 1517 lock->l_glimpse_ast = cbs->lcs_glimpse; 1518 } 1519 1520 lock->l_tree_node = NULL; 1521 /* if this is the extent lock, allocate the interval tree node */ 1522 if (type == LDLM_EXTENT) { 1523 if (ldlm_interval_alloc(lock) == NULL) 1524 goto out; 1525 } 1526 1527 if (lvb_len) { 1528 lock->l_lvb_len = lvb_len; 1529 OBD_ALLOC(lock->l_lvb_data, lvb_len); 1530 if (lock->l_lvb_data == NULL) 1531 goto out; 1532 } 1533 1534 lock->l_lvb_type = lvb_type; 1535 if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_NEW_LOCK)) 1536 goto out; 1537 1538 return lock; 1539 1540out: 1541 ldlm_lock_destroy(lock); 1542 LDLM_LOCK_RELEASE(lock); 1543 return NULL; 1544} 1545 1546/** 1547 * Enqueue (request) a lock. 1548 * 1549 * Does not block. As a result of enqueue the lock would be put 1550 * into granted or waiting list. 1551 * 1552 * If namespace has intent policy sent and the lock has LDLM_FL_HAS_INTENT flag 1553 * set, skip all the enqueueing and delegate lock processing to intent policy 1554 * function. 
 */
ldlm_error_t ldlm_lock_enqueue(struct ldlm_namespace *ns,
			       struct ldlm_lock **lockp,
			       void *cookie, __u64 *flags)
{
	struct ldlm_lock *lock = *lockp;
	struct ldlm_resource *res = lock->l_resource;
	int local = ns_is_client(ldlm_res_to_ns(res));
	ldlm_error_t rc = ELDLM_OK;
	struct ldlm_interval *node = NULL;

	lock->l_last_activity = get_seconds();
	/* policies are not executed on the client or during replay */
	if ((*flags & (LDLM_FL_HAS_INTENT|LDLM_FL_REPLAY)) == LDLM_FL_HAS_INTENT
	    && !local && ns->ns_policy) {
		rc = ns->ns_policy(ns, lockp, cookie, lock->l_req_mode, *flags,
				   NULL);
		if (rc == ELDLM_LOCK_REPLACED) {
			/* The lock that was returned has already been granted,
			 * and placed into lockp. If it's not the same as the
			 * one we passed in, then destroy the old one and our
			 * work here is done. */
			if (lock != *lockp) {
				ldlm_lock_destroy(lock);
				LDLM_LOCK_RELEASE(lock);
			}
			*flags |= LDLM_FL_LOCK_CHANGED;
			return 0;
		} else if (rc != ELDLM_OK ||
			   (rc == ELDLM_OK && (*flags & LDLM_FL_INTENT_ONLY))) {
			ldlm_lock_destroy(lock);
			return rc;
		}
	}

	/* For a replaying lock, it might be already in granted list. So
	 * unlinking the lock will cause the interval node to be freed, we
	 * have to allocate the interval node early otherwise we can't regrant
	 * this lock in the future. - jay */
	if (!local && (*flags & LDLM_FL_REPLAY) && res->lr_type == LDLM_EXTENT)
		OBD_SLAB_ALLOC_PTR_GFP(node, ldlm_interval_slab, GFP_NOFS);

	lock_res_and_lock(lock);
	if (local && lock->l_req_mode == lock->l_granted_mode) {
		/* The server returned a blocked lock, but it was granted
		 * before we got a chance to actually enqueue it. We don't
		 * need to do anything else. */
		*flags &= ~(LDLM_FL_BLOCK_GRANTED |
			    LDLM_FL_BLOCK_CONV | LDLM_FL_BLOCK_WAIT);
		goto out;
	}

	ldlm_resource_unlink_lock(lock);
	if (res->lr_type == LDLM_EXTENT && lock->l_tree_node == NULL) {
		/* an extent lock needs an interval-tree node; fail the
		 * enqueue if the early allocation above did not happen or
		 * did not succeed */
		if (node == NULL) {
			ldlm_lock_destroy_nolock(lock);
			rc = -ENOMEM;
			goto out;
		}

		INIT_LIST_HEAD(&node->li_group);
		ldlm_interval_attach(node, lock);
		node = NULL;
	}

	/* Some flags from the enqueue want to make it into the AST, via the
	 * lock's l_flags. */
	lock->l_flags |= *flags & LDLM_FL_AST_DISCARD_DATA;

	/* This distinction between local lock trees is very important; a client
	 * namespace only has information about locks taken by that client, and
	 * thus doesn't have enough information to decide for itself if it can
	 * be granted (below). In this case, we do exactly what the server
	 * tells us to do, as dictated by the 'flags'.
	 *
	 * We do exactly the same thing during recovery, when the server is
	 * more or less trusting the clients not to lie.
	 *
	 * FIXME (bug 268): Detect obvious lies by checking compatibility in
	 * granted/converting queues. */
	if (local) {
		if (*flags & LDLM_FL_BLOCK_CONV)
			ldlm_resource_add_lock(res, &res->lr_converting, lock);
		else if (*flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED))
			ldlm_resource_add_lock(res, &res->lr_waiting, lock);
		else
			ldlm_grant_lock(lock, NULL);
		goto out;
	} else {
		/* this build supports client namespaces only */
		CERROR("This is client-side-only module, cannot handle "
		       "LDLM_NAMESPACE_SERVER resource type lock.\n");
		LBUG();
	}

out:
	unlock_res_and_lock(lock);
	/* free the pre-allocated interval node if it was not attached */
	if (node)
		OBD_SLAB_FREE(node, ldlm_interval_slab, sizeof(*node));
	return rc;
}

/**
 * Process a call to blocking AST callback for a lock in ast_work list
 */
static int
ldlm_work_bl_ast_lock(struct ptlrpc_request_set *rqset, void *opaq)
{
	struct ldlm_cb_set_arg *arg = opaq;
	struct ldlm_lock_desc d;
	int rc;
	struct ldlm_lock *lock;

	if (list_empty(arg->list))
		return -ENOENT;

	lock = list_entry(arg->list->next, struct ldlm_lock, l_bl_ast);

	/* nobody should touch l_bl_ast */
	lock_res_and_lock(lock);
	list_del_init(&lock->l_bl_ast);

	LASSERT(lock->l_flags & LDLM_FL_AST_SENT);
	LASSERT(lock->l_bl_ast_run == 0);
	LASSERT(lock->l_blocking_lock);
	lock->l_bl_ast_run++;
	unlock_res_and_lock(lock);

	ldlm_lock2desc(lock->l_blocking_lock, &d);

	rc = lock->l_blocking_ast(lock, &d, (void *)arg, LDLM_CB_BLOCKING);
	LDLM_LOCK_RELEASE(lock->l_blocking_lock);
	lock->l_blocking_lock = NULL;
	LDLM_LOCK_RELEASE(lock);

	return rc;
}

/**
 * Process a call to completion AST callback for a lock in ast_work list
 */
static int
ldlm_work_cp_ast_lock(struct ptlrpc_request_set *rqset, void *opaq)
{
	struct ldlm_cb_set_arg *arg = opaq;
	int rc = 0;
	struct ldlm_lock *lock;
	ldlm_completion_callback completion_callback;

	if (list_empty(arg->list))
		return -ENOENT;

	lock = list_entry(arg->list->next, struct ldlm_lock, l_cp_ast);

	/* It's possible to receive a completion AST before we've set
	 * the l_completion_ast pointer: either because the AST arrived
	 * before the reply, or simply because there's a small race
	 * window between receiving the reply and finishing the local
	 * enqueue. (bug 842)
	 *
	 * This can't happen with the blocking_ast, however, because we
	 * will never call the local blocking_ast until we drop our
	 * reader/writer reference, which we won't do until we get the
	 * reply and finish enqueueing. */

	/* nobody should touch l_cp_ast */
	lock_res_and_lock(lock);
	list_del_init(&lock->l_cp_ast);
	LASSERT(lock->l_flags & LDLM_FL_CP_REQD);
	/* save l_completion_ast since it can be changed by
	 * mds_intent_policy(), see bug 14225 */
	completion_callback = lock->l_completion_ast;
	lock->l_flags &= ~LDLM_FL_CP_REQD;
	unlock_res_and_lock(lock);

	/* invoke the saved callback outside the resource lock */
	if (completion_callback != NULL)
		rc = completion_callback(lock, 0, (void *)arg);
	LDLM_LOCK_RELEASE(lock);

	return rc;
}

/**
 * Process a call to revocation AST callback for a lock in ast_work list
 */
static int
ldlm_work_revoke_ast_lock(struct ptlrpc_request_set *rqset, void *opaq)
{
	struct ldlm_cb_set_arg *arg = opaq;
	struct ldlm_lock_desc desc;
	int rc;
	struct ldlm_lock *lock;

	if (list_empty(arg->list))
		return -ENOENT;

	lock = list_entry(arg->list->next, struct ldlm_lock, l_rk_ast);
	list_del_init(&lock->l_rk_ast);

	/* the desc just pretend to exclusive */
	ldlm_lock2desc(lock, &desc);
	desc.l_req_mode = LCK_EX;
	desc.l_granted_mode = 0;

	rc = lock->l_blocking_ast(lock, &desc, (void *)arg, LDLM_CB_BLOCKING);
	LDLM_LOCK_RELEASE(lock);

	return rc;
}

/**
 * Process a call to glimpse AST callback for a lock in ast_work list
1767 */ 1768int ldlm_work_gl_ast_lock(struct ptlrpc_request_set *rqset, void *opaq) 1769{ 1770 struct ldlm_cb_set_arg *arg = opaq; 1771 struct ldlm_glimpse_work *gl_work; 1772 struct ldlm_lock *lock; 1773 int rc = 0; 1774 1775 if (list_empty(arg->list)) 1776 return -ENOENT; 1777 1778 gl_work = list_entry(arg->list->next, struct ldlm_glimpse_work, 1779 gl_list); 1780 list_del_init(&gl_work->gl_list); 1781 1782 lock = gl_work->gl_lock; 1783 1784 /* transfer the glimpse descriptor to ldlm_cb_set_arg */ 1785 arg->gl_desc = gl_work->gl_desc; 1786 1787 /* invoke the actual glimpse callback */ 1788 if (lock->l_glimpse_ast(lock, (void *)arg) == 0) 1789 rc = 1; 1790 1791 LDLM_LOCK_RELEASE(lock); 1792 1793 if ((gl_work->gl_flags & LDLM_GL_WORK_NOFREE) == 0) 1794 OBD_FREE_PTR(gl_work); 1795 1796 return rc; 1797} 1798 1799/** 1800 * Process list of locks in need of ASTs being sent. 1801 * 1802 * Used on server to send multiple ASTs together instead of sending one by 1803 * one. 1804 */ 1805int ldlm_run_ast_work(struct ldlm_namespace *ns, struct list_head *rpc_list, 1806 ldlm_desc_ast_t ast_type) 1807{ 1808 struct ldlm_cb_set_arg *arg; 1809 set_producer_func work_ast_lock; 1810 int rc; 1811 1812 if (list_empty(rpc_list)) 1813 return 0; 1814 1815 OBD_ALLOC_PTR(arg); 1816 if (arg == NULL) 1817 return -ENOMEM; 1818 1819 atomic_set(&arg->restart, 0); 1820 arg->list = rpc_list; 1821 1822 switch (ast_type) { 1823 case LDLM_WORK_BL_AST: 1824 arg->type = LDLM_BL_CALLBACK; 1825 work_ast_lock = ldlm_work_bl_ast_lock; 1826 break; 1827 case LDLM_WORK_CP_AST: 1828 arg->type = LDLM_CP_CALLBACK; 1829 work_ast_lock = ldlm_work_cp_ast_lock; 1830 break; 1831 case LDLM_WORK_REVOKE_AST: 1832 arg->type = LDLM_BL_CALLBACK; 1833 work_ast_lock = ldlm_work_revoke_ast_lock; 1834 break; 1835 case LDLM_WORK_GL_AST: 1836 arg->type = LDLM_GL_CALLBACK; 1837 work_ast_lock = ldlm_work_gl_ast_lock; 1838 break; 1839 default: 1840 LBUG(); 1841 } 1842 1843 /* We create a ptlrpc request set with flow control 
extension. 1844 * This request set will use the work_ast_lock function to produce new 1845 * requests and will send a new request each time one completes in order 1846 * to keep the number of requests in flight to ns_max_parallel_ast */ 1847 arg->set = ptlrpc_prep_fcset(ns->ns_max_parallel_ast ? : UINT_MAX, 1848 work_ast_lock, arg); 1849 if (arg->set == NULL) { 1850 rc = -ENOMEM; 1851 goto out; 1852 } 1853 1854 ptlrpc_set_wait(arg->set); 1855 ptlrpc_set_destroy(arg->set); 1856 1857 rc = atomic_read(&arg->restart) ? -ERESTART : 0; 1858 goto out; 1859out: 1860 OBD_FREE_PTR(arg); 1861 return rc; 1862} 1863 1864static int reprocess_one_queue(struct ldlm_resource *res, void *closure) 1865{ 1866 ldlm_reprocess_all(res); 1867 return LDLM_ITER_CONTINUE; 1868} 1869 1870static int ldlm_reprocess_res(struct cfs_hash *hs, struct cfs_hash_bd *bd, 1871 struct hlist_node *hnode, void *arg) 1872{ 1873 struct ldlm_resource *res = cfs_hash_object(hs, hnode); 1874 int rc; 1875 1876 rc = reprocess_one_queue(res, arg); 1877 1878 return rc == LDLM_ITER_STOP; 1879} 1880 1881/** 1882 * Iterate through all resources on a namespace attempting to grant waiting 1883 * locks. 1884 */ 1885void ldlm_reprocess_all_ns(struct ldlm_namespace *ns) 1886{ 1887 if (ns != NULL) { 1888 cfs_hash_for_each_nolock(ns->ns_rs_hash, 1889 ldlm_reprocess_res, NULL); 1890 } 1891} 1892EXPORT_SYMBOL(ldlm_reprocess_all_ns); 1893 1894/** 1895 * Try to grant all waiting locks on a resource. 1896 * 1897 * Calls ldlm_reprocess_queue on converting and waiting queues. 1898 * 1899 * Typically called after some resource locks are cancelled to see 1900 * if anything could be granted as a result of the cancellation. 
1901 */ 1902void ldlm_reprocess_all(struct ldlm_resource *res) 1903{ 1904 LIST_HEAD(rpc_list); 1905 1906 if (!ns_is_client(ldlm_res_to_ns(res))) { 1907 CERROR("This is client-side-only module, cannot handle " 1908 "LDLM_NAMESPACE_SERVER resource type lock.\n"); 1909 LBUG(); 1910 } 1911} 1912 1913/** 1914 * Helper function to call blocking AST for LDLM lock \a lock in a 1915 * "cancelling" mode. 1916 */ 1917void ldlm_cancel_callback(struct ldlm_lock *lock) 1918{ 1919 check_res_locked(lock->l_resource); 1920 if (!(lock->l_flags & LDLM_FL_CANCEL)) { 1921 lock->l_flags |= LDLM_FL_CANCEL; 1922 if (lock->l_blocking_ast) { 1923 unlock_res_and_lock(lock); 1924 lock->l_blocking_ast(lock, NULL, lock->l_ast_data, 1925 LDLM_CB_CANCELING); 1926 lock_res_and_lock(lock); 1927 } else { 1928 LDLM_DEBUG(lock, "no blocking ast"); 1929 } 1930 } 1931 lock->l_flags |= LDLM_FL_BL_DONE; 1932} 1933 1934/** 1935 * Remove skiplist-enabled LDLM lock \a req from granted list 1936 */ 1937void ldlm_unlink_lock_skiplist(struct ldlm_lock *req) 1938{ 1939 if (req->l_resource->lr_type != LDLM_PLAIN && 1940 req->l_resource->lr_type != LDLM_IBITS) 1941 return; 1942 1943 list_del_init(&req->l_sl_policy); 1944 list_del_init(&req->l_sl_mode); 1945} 1946 1947/** 1948 * Attempts to cancel LDLM lock \a lock that has no reader/writer references. 1949 */ 1950void ldlm_lock_cancel(struct ldlm_lock *lock) 1951{ 1952 struct ldlm_resource *res; 1953 struct ldlm_namespace *ns; 1954 1955 lock_res_and_lock(lock); 1956 1957 res = lock->l_resource; 1958 ns = ldlm_res_to_ns(res); 1959 1960 /* Please do not, no matter how tempting, remove this LBUG without 1961 * talking to me first. -phik */ 1962 if (lock->l_readers || lock->l_writers) { 1963 LDLM_ERROR(lock, "lock still has references"); 1964 LBUG(); 1965 } 1966 1967 if (lock->l_flags & LDLM_FL_WAITED) 1968 ldlm_del_waiting_lock(lock); 1969 1970 /* Releases cancel callback. 
*/ 1971 ldlm_cancel_callback(lock); 1972 1973 /* Yes, second time, just in case it was added again while we were 1974 * running with no res lock in ldlm_cancel_callback */ 1975 if (lock->l_flags & LDLM_FL_WAITED) 1976 ldlm_del_waiting_lock(lock); 1977 1978 ldlm_resource_unlink_lock(lock); 1979 ldlm_lock_destroy_nolock(lock); 1980 1981 if (lock->l_granted_mode == lock->l_req_mode) 1982 ldlm_pool_del(&ns->ns_pool, lock); 1983 1984 /* Make sure we will not be called again for same lock what is possible 1985 * if not to zero out lock->l_granted_mode */ 1986 lock->l_granted_mode = LCK_MINMODE; 1987 unlock_res_and_lock(lock); 1988} 1989EXPORT_SYMBOL(ldlm_lock_cancel); 1990 1991/** 1992 * Set opaque data into the lock that only makes sense to upper layer. 1993 */ 1994int ldlm_lock_set_data(struct lustre_handle *lockh, void *data) 1995{ 1996 struct ldlm_lock *lock = ldlm_handle2lock(lockh); 1997 int rc = -EINVAL; 1998 1999 if (lock) { 2000 if (lock->l_ast_data == NULL) 2001 lock->l_ast_data = data; 2002 if (lock->l_ast_data == data) 2003 rc = 0; 2004 LDLM_LOCK_PUT(lock); 2005 } 2006 return rc; 2007} 2008EXPORT_SYMBOL(ldlm_lock_set_data); 2009 2010struct export_cl_data { 2011 struct obd_export *ecl_exp; 2012 int ecl_loop; 2013}; 2014 2015/** 2016 * Iterator function for ldlm_cancel_locks_for_export. 2017 * Cancels passed locks. 
 */
int ldlm_cancel_locks_for_export_cb(struct cfs_hash *hs, struct cfs_hash_bd *bd,
				    struct hlist_node *hnode, void *data)

{
	struct export_cl_data *ecl = (struct export_cl_data *)data;
	struct obd_export *exp = ecl->ecl_exp;
	struct ldlm_lock *lock = cfs_hash_object(hs, hnode);
	struct ldlm_resource *res;

	/* hold the resource and the lock across cancel + reprocess */
	res = ldlm_resource_getref(lock->l_resource);
	LDLM_LOCK_GET(lock);

	LDLM_DEBUG(lock, "export %p", exp);
	ldlm_res_lvbo_update(res, NULL, 1);
	ldlm_lock_cancel(lock);
	ldlm_reprocess_all(res);
	ldlm_resource_putref(res);
	LDLM_LOCK_RELEASE(lock);

	ecl->ecl_loop++;
	/* (x & -x) == x iff x is a power of two: log progress only on the
	 * 1st, 2nd, 4th, 8th, ... iteration to bound the log volume */
	if ((ecl->ecl_loop & -ecl->ecl_loop) == ecl->ecl_loop) {
		CDEBUG(D_INFO,
		       "Cancel lock %p for export %p (loop %d), still have "
		       "%d locks left on hash table.\n",
		       lock, exp, ecl->ecl_loop,
		       atomic_read(&hs->hs_count));
	}

	return 0;
}

/**
 * Cancel all locks for given export.
 *
 * Typically called on client disconnection/eviction
 */
void ldlm_cancel_locks_for_export(struct obd_export *exp)
{
	struct export_cl_data ecl = {
		.ecl_exp	= exp,
		.ecl_loop	= 0,
	};

	cfs_hash_for_each_empty(exp->exp_lock_hash,
				ldlm_cancel_locks_for_export_cb, &ecl);
}

/**
 * Downgrade an exclusive lock.
 *
 * A fast variant of ldlm_lock_convert for conversion of exclusive
 * locks. The conversion is always successful.
 * Used by Commit on Sharing (COS) code.
 *
 * \param lock A lock to convert
 * \param new_mode new lock mode
 */
void ldlm_lock_downgrade(struct ldlm_lock *lock, int new_mode)
{
	LASSERT(lock->l_granted_mode & (LCK_PW | LCK_EX));
	LASSERT(new_mode == LCK_COS);

	lock_res_and_lock(lock);
	ldlm_resource_unlink_lock(lock);
	/*
	 * Remove the lock from pool as it will be added again in
	 * ldlm_grant_lock() called below.
	 */
	ldlm_pool_del(&ldlm_lock_to_ns(lock)->ns_pool, lock);

	lock->l_req_mode = new_mode;
	ldlm_grant_lock(lock, NULL);
	unlock_res_and_lock(lock);
	ldlm_reprocess_all(lock->l_resource);
}
EXPORT_SYMBOL(ldlm_lock_downgrade);

/**
 * Attempt to convert already granted lock to a different mode.
 *
 * While lock conversion is not currently used, future client-side
 * optimizations could take advantage of it to avoid discarding cached
 * pages on a file.
 */
struct ldlm_resource *ldlm_lock_convert(struct ldlm_lock *lock, int new_mode,
					__u32 *flags)
{
	LIST_HEAD(rpc_list);
	struct ldlm_resource *res;
	struct ldlm_namespace *ns;
	int granted = 0;
	struct ldlm_interval *node;

	/* Just return if mode is unchanged. */
	if (new_mode == lock->l_granted_mode) {
		*flags |= LDLM_FL_BLOCK_GRANTED;
		return lock->l_resource;
	}

	/* I can't check the type of lock here because the bitlock of lock
	 * is not held here, so do the allocation blindly. -jay */
	OBD_SLAB_ALLOC_PTR_GFP(node, ldlm_interval_slab, GFP_NOFS);
	if (node == NULL)
		/* Actually, this causes EDEADLOCK to be returned */
		return NULL;

	/* only PR -> PW conversion is supported here */
	LASSERTF((new_mode == LCK_PW && lock->l_granted_mode == LCK_PR),
		 "new_mode %u, granted %u\n", new_mode, lock->l_granted_mode);

	lock_res_and_lock(lock);

	res = lock->l_resource;
	ns = ldlm_res_to_ns(res);

	lock->l_req_mode = new_mode;
	if (res->lr_type == LDLM_PLAIN || res->lr_type == LDLM_IBITS) {
		ldlm_resource_unlink_lock(lock);
	} else {
		ldlm_resource_unlink_lock(lock);
		if (res->lr_type == LDLM_EXTENT) {
			/* FIXME: ugly code, I have to attach the lock to a
			 * interval node again since perhaps it will be granted
			 * soon */
			INIT_LIST_HEAD(&node->li_group);
			ldlm_interval_attach(node, lock);
			node = NULL;
		}
	}

	/*
	 * Remove old lock from the pool before adding the lock with new
	 * mode below in ->policy()
	 */
	ldlm_pool_del(&ns->ns_pool, lock);

	/* If this is a local resource, put it on the appropriate list. */
	if (ns_is_client(ldlm_res_to_ns(res))) {
		if (*flags & (LDLM_FL_BLOCK_CONV | LDLM_FL_BLOCK_GRANTED)) {
			ldlm_resource_add_lock(res, &res->lr_converting, lock);
		} else {
			/* This should never happen, because of the way the
			 * server handles conversions. */
			LDLM_ERROR(lock, "Erroneous flags %x on local lock\n",
				   *flags);
			LBUG();

			/* NOTE(review): LBUG() does not return, so the code
			 * below is unreachable on this client-only build;
			 * kept from the original server-side grant path. */
			ldlm_grant_lock(lock, &rpc_list);
			granted = 1;
			/* FIXME: completion handling not with lr_lock held !
*/ 2168 if (lock->l_completion_ast) 2169 lock->l_completion_ast(lock, 0, NULL); 2170 } 2171 } else { 2172 CERROR("This is client-side-only module, cannot handle " 2173 "LDLM_NAMESPACE_SERVER resource type lock.\n"); 2174 LBUG(); 2175 } 2176 unlock_res_and_lock(lock); 2177 2178 if (granted) 2179 ldlm_run_ast_work(ns, &rpc_list, LDLM_WORK_CP_AST); 2180 if (node) 2181 OBD_SLAB_FREE(node, ldlm_interval_slab, sizeof(*node)); 2182 return res; 2183} 2184EXPORT_SYMBOL(ldlm_lock_convert); 2185 2186/** 2187 * Print lock with lock handle \a lockh description into debug log. 2188 * 2189 * Used when printing all locks on a resource for debug purposes. 2190 */ 2191void ldlm_lock_dump_handle(int level, struct lustre_handle *lockh) 2192{ 2193 struct ldlm_lock *lock; 2194 2195 if (!((libcfs_debug | D_ERROR) & level)) 2196 return; 2197 2198 lock = ldlm_handle2lock(lockh); 2199 if (lock == NULL) 2200 return; 2201 2202 LDLM_DEBUG_LIMIT(level, lock, "###"); 2203 2204 LDLM_LOCK_PUT(lock); 2205} 2206EXPORT_SYMBOL(ldlm_lock_dump_handle); 2207 2208/** 2209 * Print lock information with custom message into debug log. 2210 * Helper function. 2211 */ 2212void _ldlm_lock_debug(struct ldlm_lock *lock, 2213 struct libcfs_debug_msg_data *msgdata, 2214 const char *fmt, ...) 2215{ 2216 va_list args; 2217 struct obd_export *exp = lock->l_export; 2218 struct ldlm_resource *resource = lock->l_resource; 2219 char *nid = "local"; 2220 2221 va_start(args, fmt); 2222 2223 if (exp && exp->exp_connection) { 2224 nid = libcfs_nid2str(exp->exp_connection->c_peer.nid); 2225 } else if (exp && exp->exp_obd != NULL) { 2226 struct obd_import *imp = exp->exp_obd->u.cli.cl_import; 2227 nid = libcfs_nid2str(imp->imp_connection->c_peer.nid); 2228 } 2229 2230 if (resource == NULL) { 2231 libcfs_debug_vmsg2(msgdata, fmt, args, 2232 " ns: \?\? lock: %p/%#llx lrc: %d/%d,%d mode: %s/%s " 2233 "res: \?\? rrc=\?\? type: \?\?\? 
flags: %#llx nid: %s " 2234 "remote: %#llx expref: %d pid: %u timeout: %lu " 2235 "lvb_type: %d\n", 2236 lock, 2237 lock->l_handle.h_cookie, atomic_read(&lock->l_refc), 2238 lock->l_readers, lock->l_writers, 2239 ldlm_lockname[lock->l_granted_mode], 2240 ldlm_lockname[lock->l_req_mode], 2241 lock->l_flags, nid, lock->l_remote_handle.cookie, 2242 exp ? atomic_read(&exp->exp_refcount) : -99, 2243 lock->l_pid, lock->l_callback_timeout, lock->l_lvb_type); 2244 va_end(args); 2245 return; 2246 } 2247 2248 switch (resource->lr_type) { 2249 case LDLM_EXTENT: 2250 libcfs_debug_vmsg2(msgdata, fmt, args, 2251 " ns: %s lock: %p/%#llx lrc: %d/%d,%d mode: %s/%s " 2252 "res: "DLDLMRES" rrc: %d type: %s [%llu->%llu] " 2253 "(req %llu->%llu) flags: %#llx nid: %s remote: " 2254 "%#llx expref: %d pid: %u timeout: %lu lvb_type: %d\n", 2255 ldlm_lock_to_ns_name(lock), lock, 2256 lock->l_handle.h_cookie, atomic_read(&lock->l_refc), 2257 lock->l_readers, lock->l_writers, 2258 ldlm_lockname[lock->l_granted_mode], 2259 ldlm_lockname[lock->l_req_mode], 2260 PLDLMRES(resource), 2261 atomic_read(&resource->lr_refcount), 2262 ldlm_typename[resource->lr_type], 2263 lock->l_policy_data.l_extent.start, 2264 lock->l_policy_data.l_extent.end, 2265 lock->l_req_extent.start, lock->l_req_extent.end, 2266 lock->l_flags, nid, lock->l_remote_handle.cookie, 2267 exp ? 
atomic_read(&exp->exp_refcount) : -99, 2268 lock->l_pid, lock->l_callback_timeout, 2269 lock->l_lvb_type); 2270 break; 2271 2272 case LDLM_FLOCK: 2273 libcfs_debug_vmsg2(msgdata, fmt, args, 2274 " ns: %s lock: %p/%#llx lrc: %d/%d,%d mode: %s/%s " 2275 "res: "DLDLMRES" rrc: %d type: %s pid: %d " 2276 "[%llu->%llu] flags: %#llx nid: %s " 2277 "remote: %#llx expref: %d pid: %u timeout: %lu\n", 2278 ldlm_lock_to_ns_name(lock), lock, 2279 lock->l_handle.h_cookie, atomic_read(&lock->l_refc), 2280 lock->l_readers, lock->l_writers, 2281 ldlm_lockname[lock->l_granted_mode], 2282 ldlm_lockname[lock->l_req_mode], 2283 PLDLMRES(resource), 2284 atomic_read(&resource->lr_refcount), 2285 ldlm_typename[resource->lr_type], 2286 lock->l_policy_data.l_flock.pid, 2287 lock->l_policy_data.l_flock.start, 2288 lock->l_policy_data.l_flock.end, 2289 lock->l_flags, nid, lock->l_remote_handle.cookie, 2290 exp ? atomic_read(&exp->exp_refcount) : -99, 2291 lock->l_pid, lock->l_callback_timeout); 2292 break; 2293 2294 case LDLM_IBITS: 2295 libcfs_debug_vmsg2(msgdata, fmt, args, 2296 " ns: %s lock: %p/%#llx lrc: %d/%d,%d mode: %s/%s " 2297 "res: "DLDLMRES" bits %#llx rrc: %d type: %s " 2298 "flags: %#llx nid: %s remote: %#llx expref: %d " 2299 "pid: %u timeout: %lu lvb_type: %d\n", 2300 ldlm_lock_to_ns_name(lock), 2301 lock, lock->l_handle.h_cookie, 2302 atomic_read(&lock->l_refc), 2303 lock->l_readers, lock->l_writers, 2304 ldlm_lockname[lock->l_granted_mode], 2305 ldlm_lockname[lock->l_req_mode], 2306 PLDLMRES(resource), 2307 lock->l_policy_data.l_inodebits.bits, 2308 atomic_read(&resource->lr_refcount), 2309 ldlm_typename[resource->lr_type], 2310 lock->l_flags, nid, lock->l_remote_handle.cookie, 2311 exp ? 
atomic_read(&exp->exp_refcount) : -99, 2312 lock->l_pid, lock->l_callback_timeout, 2313 lock->l_lvb_type); 2314 break; 2315 2316 default: 2317 libcfs_debug_vmsg2(msgdata, fmt, args, 2318 " ns: %s lock: %p/%#llx lrc: %d/%d,%d mode: %s/%s " 2319 "res: "DLDLMRES" rrc: %d type: %s flags: %#llx " 2320 "nid: %s remote: %#llx expref: %d pid: %u " 2321 "timeout: %lu lvb_type: %d\n", 2322 ldlm_lock_to_ns_name(lock), 2323 lock, lock->l_handle.h_cookie, 2324 atomic_read(&lock->l_refc), 2325 lock->l_readers, lock->l_writers, 2326 ldlm_lockname[lock->l_granted_mode], 2327 ldlm_lockname[lock->l_req_mode], 2328 PLDLMRES(resource), 2329 atomic_read(&resource->lr_refcount), 2330 ldlm_typename[resource->lr_type], 2331 lock->l_flags, nid, lock->l_remote_handle.cookie, 2332 exp ? atomic_read(&exp->exp_refcount) : -99, 2333 lock->l_pid, lock->l_callback_timeout, 2334 lock->l_lvb_type); 2335 break; 2336 } 2337 va_end(args); 2338} 2339EXPORT_SYMBOL(_ldlm_lock_debug); 2340