1/* 2 * GPL HEADER START 3 * 4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 5 * 6 * This program is free software; you can redistribute it and/or modify 7 * it under the terms of the GNU General Public License version 2 only, 8 * as published by the Free Software Foundation. 9 * 10 * This program is distributed in the hope that it will be useful, but 11 * WITHOUT ANY WARRANTY; without even the implied warranty of 12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 13 * General Public License version 2 for more details (a copy is included 14 * in the LICENSE file that accompanied this code). 15 * 16 * You should have received a copy of the GNU General Public License 17 * version 2 along with this program; If not, see 18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf 19 * 20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara, 21 * CA 95054 USA or visit www.sun.com if you need additional information or 22 * have any questions. 23 * 24 * GPL HEADER END 25 */ 26/* 27 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved. 28 * Use is subject to license terms. 29 * 30 * Copyright (c) 2010, 2012, Intel Corporation. 31 */ 32/* 33 * This file is part of Lustre, http://www.lustre.org/ 34 * Lustre is a trademark of Sun Microsystems, Inc. 35 */ 36/** 37 * This file contains Asynchronous System Trap (AST) handlers and related 38 * LDLM request-processing routines. 39 * 40 * An AST is a callback issued on a lock when its state is changed. There are 41 * several different types of ASTs (callbacks) registered for each lock: 42 * 43 * - completion AST: when a lock is enqueued by some process, but cannot be 44 * granted immediately due to other conflicting locks on the same resource, 45 * the completion AST is sent to notify the caller when the lock is 46 * eventually granted 47 * 48 * - blocking AST: when a lock is granted to some process, if another process 49 * enqueues a conflicting (blocking) lock on a resource, a blocking AST is 50 * sent to notify the holder(s) of the lock(s) of the conflicting lock 51 * request. The lock holder(s) must release their lock(s) on that resource in 52 * a timely manner or be evicted by the server. 53 * 54 * - glimpse AST: this is used when a process wants information about a lock 55 * (i.e. the lock value block (LVB)) but does not necessarily require holding 56 * the lock. If the resource is locked, the lock holder(s) are sent glimpse 57 * ASTs and the LVB is returned to the caller, and lock holder(s) may CANCEL 58 * their lock(s) if they are idle. If the resource is not locked, the server 59 * may grant the lock. 
60 */ 61 62#define DEBUG_SUBSYSTEM S_LDLM 63 64#include "../include/lustre_dlm.h" 65#include "../include/obd_class.h" 66#include "../include/obd.h" 67 68#include "ldlm_internal.h" 69 70int ldlm_enqueue_min = OBD_TIMEOUT_DEFAULT; 71module_param(ldlm_enqueue_min, int, 0644); 72MODULE_PARM_DESC(ldlm_enqueue_min, "lock enqueue timeout minimum"); 73 74/* in client side, whether the cached locks will be canceled before replay */ 75unsigned int ldlm_cancel_unused_locks_before_replay = 1; 76 77static void interrupted_completion_wait(void *data) 78{ 79} 80 81struct lock_wait_data { 82 struct ldlm_lock *lwd_lock; 83 __u32 lwd_conn_cnt; 84}; 85 86struct ldlm_async_args { 87 struct lustre_handle lock_handle; 88}; 89 90int ldlm_expired_completion_wait(void *data) 91{ 92 struct lock_wait_data *lwd = data; 93 struct ldlm_lock *lock = lwd->lwd_lock; 94 struct obd_import *imp; 95 struct obd_device *obd; 96 97 if (lock->l_conn_export == NULL) { 98 static unsigned long next_dump = 0, last_dump = 0; 99 100 LCONSOLE_WARN("lock timed out (enqueued at "CFS_TIME_T", " 101 CFS_DURATION_T"s ago)\n", 102 lock->l_last_activity, 103 cfs_time_sub(get_seconds(), 104 lock->l_last_activity)); 105 LDLM_DEBUG(lock, "lock timed out (enqueued at "CFS_TIME_T", " 106 CFS_DURATION_T"s ago); not entering recovery in " 107 "server code, just going back to sleep", 108 lock->l_last_activity, 109 cfs_time_sub(get_seconds(), 110 lock->l_last_activity)); 111 if (cfs_time_after(cfs_time_current(), next_dump)) { 112 last_dump = next_dump; 113 next_dump = cfs_time_shift(300); 114 ldlm_namespace_dump(D_DLMTRACE, 115 ldlm_lock_to_ns(lock)); 116 if (last_dump == 0) 117 libcfs_debug_dumplog(); 118 } 119 return 0; 120 } 121 122 obd = lock->l_conn_export->exp_obd; 123 imp = obd->u.cli.cl_import; 124 ptlrpc_fail_import(imp, lwd->lwd_conn_cnt); 125 LDLM_ERROR(lock, "lock timed out (enqueued at "CFS_TIME_T", " 126 CFS_DURATION_T"s ago), entering recovery for %s@%s", 127 lock->l_last_activity, 128 cfs_time_sub(get_seconds(), lock->l_last_activity), 129 obd2cli_tgt(obd), imp->imp_connection->c_remote_uuid.uuid); 130 131 return 0; 132} 133EXPORT_SYMBOL(ldlm_expired_completion_wait); 134 135/* We use the same basis for both server side and client side functions 136 from a single node. */ 137int ldlm_get_enq_timeout(struct ldlm_lock *lock) 138{ 139 int timeout = at_get(ldlm_lock_to_ns_at(lock)); 140 if (AT_OFF) 141 return obd_timeout / 2; 142 /* Since these are non-updating timeouts, we should be conservative. 143 It would be nice to have some kind of "early reply" mechanism for 144 lock callbacks too... */ 145 timeout = min_t(int, at_max, timeout + (timeout >> 1)); /* 150% */ 146 return max(timeout, ldlm_enqueue_min); 147} 148EXPORT_SYMBOL(ldlm_get_enq_timeout); 149 150/** 151 * Helper function for ldlm_completion_ast(), updating timings when lock is 152 * actually granted. 153 */ 154static int ldlm_completion_tail(struct ldlm_lock *lock) 155{ 156 long delay; 157 int result; 158 159 if (lock->l_flags & (LDLM_FL_DESTROYED | LDLM_FL_FAILED)) { 160 LDLM_DEBUG(lock, "client-side enqueue: destroyed"); 161 result = -EIO; 162 } else { 163 delay = cfs_time_sub(get_seconds(), 164 lock->l_last_activity); 165 LDLM_DEBUG(lock, "client-side enqueue: granted after " 166 CFS_DURATION_T"s", delay); 167 168 /* Update our time estimate */ 169 at_measured(ldlm_lock_to_ns_at(lock), 170 delay); 171 result = 0; 172 } 173 return result; 174} 175 176/** 177 * Implementation of ->l_completion_ast() for a client, that doesn't wait 178 * until lock is granted. 
Suitable for locks enqueued through ptlrpcd, of 179 * other threads that cannot block for long. 180 */ 181int ldlm_completion_ast_async(struct ldlm_lock *lock, __u64 flags, void *data) 182{ 183 if (flags == LDLM_FL_WAIT_NOREPROC) { 184 LDLM_DEBUG(lock, "client-side enqueue waiting on pending lock"); 185 return 0; 186 } 187 188 if (!(flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED | 189 LDLM_FL_BLOCK_CONV))) { 190 wake_up(&lock->l_waitq); 191 return ldlm_completion_tail(lock); 192 } 193 194 LDLM_DEBUG(lock, "client-side enqueue returned a blocked lock, " 195 "going forward"); 196 ldlm_reprocess_all(lock->l_resource); 197 return 0; 198} 199EXPORT_SYMBOL(ldlm_completion_ast_async); 200 201/** 202 * Generic LDLM "completion" AST. This is called in several cases: 203 * 204 * - when a reply to an ENQUEUE RPC is received from the server 205 * (ldlm_cli_enqueue_fini()). Lock might be granted or not granted at 206 * this point (determined by flags); 207 * 208 * - when LDLM_CP_CALLBACK RPC comes to client to notify it that lock has 209 * been granted; 210 * 211 * - when ldlm_lock_match(LDLM_FL_LVB_READY) is about to wait until lock 212 * gets correct lvb; 213 * 214 * - to force all locks when resource is destroyed (cleanup_resource()); 215 * 216 * - during lock conversion (not used currently). 217 * 218 * If lock is not granted in the first case, this function waits until second 219 * or penultimate cases happen in some other thread. 220 * 221 */ 222int ldlm_completion_ast(struct ldlm_lock *lock, __u64 flags, void *data) 223{ 224 /* XXX ALLOCATE - 160 bytes */ 225 struct lock_wait_data lwd; 226 struct obd_device *obd; 227 struct obd_import *imp = NULL; 228 struct l_wait_info lwi; 229 __u32 timeout; 230 int rc = 0; 231 232 if (flags == LDLM_FL_WAIT_NOREPROC) { 233 LDLM_DEBUG(lock, "client-side enqueue waiting on pending lock"); 234 goto noreproc; 235 } 236 237 if (!(flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED | 238 LDLM_FL_BLOCK_CONV))) { 239 wake_up(&lock->l_waitq); 240 return 0; 241 } 242 243 LDLM_DEBUG(lock, "client-side enqueue returned a blocked lock, " 244 "sleeping"); 245 246noreproc: 247 248 obd = class_exp2obd(lock->l_conn_export); 249 250 /* if this is a local lock, then there is no import */ 251 if (obd != NULL) { 252 imp = obd->u.cli.cl_import; 253 } 254 255 /* Wait a long time for enqueue - server may have to callback a 256 lock from another client. Server will evict the other client if it 257 doesn't respond reasonably, and then give us the lock. */ 258 timeout = ldlm_get_enq_timeout(lock) * 2; 259 260 lwd.lwd_lock = lock; 261 262 if (lock->l_flags & LDLM_FL_NO_TIMEOUT) { 263 LDLM_DEBUG(lock, "waiting indefinitely because of NO_TIMEOUT"); 264 lwi = LWI_INTR(interrupted_completion_wait, &lwd); 265 } else { 266 lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(timeout), 267 ldlm_expired_completion_wait, 268 interrupted_completion_wait, &lwd); 269 } 270 271 if (imp != NULL) { 272 spin_lock(&imp->imp_lock); 273 lwd.lwd_conn_cnt = imp->imp_conn_cnt; 274 spin_unlock(&imp->imp_lock); 275 } 276 277 if (ns_is_client(ldlm_lock_to_ns(lock)) && 278 OBD_FAIL_CHECK_RESET(OBD_FAIL_LDLM_INTR_CP_AST, 279 OBD_FAIL_LDLM_CP_BL_RACE | OBD_FAIL_ONCE)) { 280 lock->l_flags |= LDLM_FL_FAIL_LOC; 281 rc = -EINTR; 282 } else { 283 /* Go to sleep until the lock is granted or cancelled. 
*/ 284 rc = l_wait_event(lock->l_waitq, 285 is_granted_or_cancelled(lock), &lwi); 286 } 287 288 if (rc) { 289 LDLM_DEBUG(lock, "client-side enqueue waking up: failed (%d)", 290 rc); 291 return rc; 292 } 293 294 return ldlm_completion_tail(lock); 295} 296EXPORT_SYMBOL(ldlm_completion_ast); 297 298/** 299 * A helper to build a blocking AST function 300 * 301 * Perform a common operation for blocking ASTs: 302 * deferred lock cancellation. 303 * 304 * \param lock the lock blocking or canceling AST was called on 305 * \retval 0 306 * \see mdt_blocking_ast 307 * \see ldlm_blocking_ast 308 */ 309int ldlm_blocking_ast_nocheck(struct ldlm_lock *lock) 310{ 311 int do_ast; 312 313 lock->l_flags |= LDLM_FL_CBPENDING; 314 do_ast = (!lock->l_readers && !lock->l_writers); 315 unlock_res_and_lock(lock); 316 317 if (do_ast) { 318 struct lustre_handle lockh; 319 int rc; 320 321 LDLM_DEBUG(lock, "already unused, calling ldlm_cli_cancel"); 322 ldlm_lock2handle(lock, &lockh); 323 rc = ldlm_cli_cancel(&lockh, LCF_ASYNC); 324 if (rc < 0) 325 CERROR("ldlm_cli_cancel: %d\n", rc); 326 } else { 327 LDLM_DEBUG(lock, "Lock still has references, will be " 328 "cancelled later"); 329 } 330 return 0; 331} 332EXPORT_SYMBOL(ldlm_blocking_ast_nocheck); 333 334/** 335 * Server blocking AST 336 * 337 * ->l_blocking_ast() callback for LDLM locks acquired by server-side 338 * OBDs. 339 * 340 * \param lock the lock which blocks a request or cancelling lock 341 * \param desc unused 342 * \param data unused 343 * \param flag indicates whether this cancelling or blocking callback 344 * \retval 0 345 * \see ldlm_blocking_ast_nocheck 346 */ 347int ldlm_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc, 348 void *data, int flag) 349{ 350 if (flag == LDLM_CB_CANCELING) { 351 /* Don't need to do anything here. */ 352 return 0; 353 } 354 355 lock_res_and_lock(lock); 356 /* Get this: if ldlm_blocking_ast is racing with intent_policy, such 357 * that ldlm_blocking_ast is called just before intent_policy method 358 * takes the lr_lock, then by the time we get the lock, we might not 359 * be the correct blocking function anymore. So check, and return 360 * early, if so. */ 361 if (lock->l_blocking_ast != ldlm_blocking_ast) { 362 unlock_res_and_lock(lock); 363 return 0; 364 } 365 return ldlm_blocking_ast_nocheck(lock); 366} 367EXPORT_SYMBOL(ldlm_blocking_ast); 368 369/** 370 * ->l_glimpse_ast() for DLM extent locks acquired on the server-side. See 371 * comment in filter_intent_policy() on why you may need this. 372 */ 373int ldlm_glimpse_ast(struct ldlm_lock *lock, void *reqp) 374{ 375 /* 376 * Returning -ELDLM_NO_LOCK_DATA actually works, but the reason for 377 * that is rather subtle: with OST-side locking, it may so happen that 378 * _all_ extent locks are held by the OST. If client wants to obtain 379 * current file size it calls ll{,u}_glimpse_size(), and (as locks are 380 * on the server), dummy glimpse callback fires and does 381 * nothing. Client still receives correct file size due to the 382 * following fragment in filter_intent_policy(): 383 * 384 * rc = l->l_glimpse_ast(l, NULL); // this will update the LVB 385 * if (rc != 0 && res->lr_namespace->ns_lvbo && 386 * res->lr_namespace->ns_lvbo->lvbo_update) { 387 * res->lr_namespace->ns_lvbo->lvbo_update(res, NULL, 0, 1); 388 * } 389 * 390 * that is, after glimpse_ast() fails, filter_lvbo_update() runs, and 391 * returns correct file size to the client. 
392 */ 393 return -ELDLM_NO_LOCK_DATA; 394} 395EXPORT_SYMBOL(ldlm_glimpse_ast); 396 397/** 398 * Enqueue a local lock (typically on a server). 399 */ 400int ldlm_cli_enqueue_local(struct ldlm_namespace *ns, 401 const struct ldlm_res_id *res_id, 402 ldlm_type_t type, ldlm_policy_data_t *policy, 403 ldlm_mode_t mode, __u64 *flags, 404 ldlm_blocking_callback blocking, 405 ldlm_completion_callback completion, 406 ldlm_glimpse_callback glimpse, 407 void *data, __u32 lvb_len, enum lvb_type lvb_type, 408 const __u64 *client_cookie, 409 struct lustre_handle *lockh) 410{ 411 struct ldlm_lock *lock; 412 int err; 413 const struct ldlm_callback_suite cbs = { .lcs_completion = completion, 414 .lcs_blocking = blocking, 415 .lcs_glimpse = glimpse, 416 }; 417 418 LASSERT(!(*flags & LDLM_FL_REPLAY)); 419 if (unlikely(ns_is_client(ns))) { 420 CERROR("Trying to enqueue local lock in a shadow namespace\n"); 421 LBUG(); 422 } 423 424 lock = ldlm_lock_create(ns, res_id, type, mode, &cbs, data, lvb_len, 425 lvb_type); 426 if (unlikely(!lock)) { 427 err = -ENOMEM; 428 goto out_nolock; 429 } 430 431 ldlm_lock2handle(lock, lockh); 432 433 /* NB: we don't have any lock now (lock_res_and_lock) 434 * because it's a new lock */ 435 ldlm_lock_addref_internal_nolock(lock, mode); 436 lock->l_flags |= LDLM_FL_LOCAL; 437 if (*flags & LDLM_FL_ATOMIC_CB) 438 lock->l_flags |= LDLM_FL_ATOMIC_CB; 439 440 if (policy != NULL) 441 lock->l_policy_data = *policy; 442 if (client_cookie != NULL) 443 lock->l_client_cookie = *client_cookie; 444 if (type == LDLM_EXTENT) 445 lock->l_req_extent = policy->l_extent; 446 447 err = ldlm_lock_enqueue(ns, &lock, policy, flags); 448 if (unlikely(err != ELDLM_OK)) 449 goto out; 450 451 if (policy != NULL) 452 *policy = lock->l_policy_data; 453 454 if (lock->l_completion_ast) 455 lock->l_completion_ast(lock, *flags, NULL); 456 457 LDLM_DEBUG(lock, "client-side local enqueue handler, new lock created"); 458 out: 459 LDLM_LOCK_RELEASE(lock); 460 out_nolock: 461 return err; 462} 463EXPORT_SYMBOL(ldlm_cli_enqueue_local); 464 465static void failed_lock_cleanup(struct ldlm_namespace *ns, 466 struct ldlm_lock *lock, int mode) 467{ 468 int need_cancel = 0; 469 470 /* Set a flag to prevent us from sending a CANCEL (bug 407) */ 471 lock_res_and_lock(lock); 472 /* Check that lock is not granted or failed, we might race. */ 473 if ((lock->l_req_mode != lock->l_granted_mode) && 474 !(lock->l_flags & LDLM_FL_FAILED)) { 475 /* Make sure that this lock will not be found by raced 476 * bl_ast and -EINVAL reply is sent to server anyways. 477 * bug 17645 */ 478 lock->l_flags |= LDLM_FL_LOCAL_ONLY | LDLM_FL_FAILED | 479 LDLM_FL_ATOMIC_CB | LDLM_FL_CBPENDING; 480 need_cancel = 1; 481 } 482 unlock_res_and_lock(lock); 483 484 if (need_cancel) 485 LDLM_DEBUG(lock, 486 "setting FL_LOCAL_ONLY | LDLM_FL_FAILED | " 487 "LDLM_FL_ATOMIC_CB | LDLM_FL_CBPENDING"); 488 else 489 LDLM_DEBUG(lock, "lock was granted or failed in race"); 490 491 ldlm_lock_decref_internal(lock, mode); 492 493 /* XXX - HACK because we shouldn't call ldlm_lock_destroy() 494 * from llite/file.c/ll_file_flock(). */ 495 /* This code makes for the fact that we do not have blocking handler on 496 * a client for flock locks. As such this is the place where we must 497 * completely kill failed locks. (interrupted and those that 498 * were waiting to be granted when server evicted us. 
*/ 499 if (lock->l_resource->lr_type == LDLM_FLOCK) { 500 lock_res_and_lock(lock); 501 ldlm_resource_unlink_lock(lock); 502 ldlm_lock_destroy_nolock(lock); 503 unlock_res_and_lock(lock); 504 } 505} 506 507/** 508 * Finishing portion of client lock enqueue code. 509 * 510 * Called after receiving reply from server. 511 */ 512int ldlm_cli_enqueue_fini(struct obd_export *exp, struct ptlrpc_request *req, 513 ldlm_type_t type, __u8 with_policy, ldlm_mode_t mode, 514 __u64 *flags, void *lvb, __u32 lvb_len, 515 struct lustre_handle *lockh, int rc) 516{ 517 struct ldlm_namespace *ns = exp->exp_obd->obd_namespace; 518 int is_replay = *flags & LDLM_FL_REPLAY; 519 struct ldlm_lock *lock; 520 struct ldlm_reply *reply; 521 int cleanup_phase = 1; 522 int size = 0; 523 524 lock = ldlm_handle2lock(lockh); 525 /* ldlm_cli_enqueue is holding a reference on this lock. */ 526 if (!lock) { 527 LASSERT(type == LDLM_FLOCK); 528 return -ENOLCK; 529 } 530 531 LASSERTF(ergo(lvb_len != 0, lvb_len == lock->l_lvb_len), 532 "lvb_len = %d, l_lvb_len = %d\n", lvb_len, lock->l_lvb_len); 533 534 if (rc != ELDLM_OK) { 535 LASSERT(!is_replay); 536 LDLM_DEBUG(lock, "client-side enqueue END (%s)", 537 rc == ELDLM_LOCK_ABORTED ? "ABORTED" : "FAILED"); 538 539 if (rc != ELDLM_LOCK_ABORTED) 540 goto cleanup; 541 } 542 543 /* Before we return, swab the reply */ 544 reply = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP); 545 if (reply == NULL) { 546 rc = -EPROTO; 547 goto cleanup; 548 } 549 550 if (lvb_len != 0) { 551 LASSERT(lvb != NULL); 552 553 size = req_capsule_get_size(&req->rq_pill, &RMF_DLM_LVB, 554 RCL_SERVER); 555 if (size < 0) { 556 LDLM_ERROR(lock, "Fail to get lvb_len, rc = %d", size); 557 rc = size; 558 goto cleanup; 559 } else if (unlikely(size > lvb_len)) { 560 LDLM_ERROR(lock, "Replied LVB is larger than " 561 "expectation, expected = %d, replied = %d", 562 lvb_len, size); 563 rc = -EINVAL; 564 goto cleanup; 565 } 566 } 567 568 if (rc == ELDLM_LOCK_ABORTED) { 569 if (lvb_len != 0) 570 rc = ldlm_fill_lvb(lock, &req->rq_pill, RCL_SERVER, 571 lvb, size); 572 if (rc == 0) 573 rc = ELDLM_LOCK_ABORTED; 574 goto cleanup; 575 } 576 577 /* lock enqueued on the server */ 578 cleanup_phase = 0; 579 580 lock_res_and_lock(lock); 581 /* Key change rehash lock in per-export hash with new key */ 582 if (exp->exp_lock_hash) { 583 /* In the function below, .hs_keycmp resolves to 584 * ldlm_export_lock_keycmp() */ 585 /* coverity[overrun-buffer-val] */ 586 cfs_hash_rehash_key(exp->exp_lock_hash, 587 &lock->l_remote_handle, 588 &reply->lock_handle, 589 &lock->l_exp_hash); 590 } else { 591 lock->l_remote_handle = reply->lock_handle; 592 } 593 594 *flags = ldlm_flags_from_wire(reply->lock_flags); 595 lock->l_flags |= ldlm_flags_from_wire(reply->lock_flags & 596 LDLM_INHERIT_FLAGS); 597 /* move NO_TIMEOUT flag to the lock to force ldlm_lock_match() 598 * to wait with no timeout as well */ 599 lock->l_flags |= ldlm_flags_from_wire(reply->lock_flags & 600 LDLM_FL_NO_TIMEOUT); 601 unlock_res_and_lock(lock); 602 603 CDEBUG(D_INFO, "local: %p, remote cookie: %#llx, flags: 0x%llx\n", 604 lock, reply->lock_handle.cookie, *flags); 605 606 /* If enqueue returned a blocked lock but the completion handler has 607 * already run, then it fixed up the resource and we don't need to do it 608 * again. 
*/ 609 if ((*flags) & LDLM_FL_LOCK_CHANGED) { 610 int newmode = reply->lock_desc.l_req_mode; 611 LASSERT(!is_replay); 612 if (newmode && newmode != lock->l_req_mode) { 613 LDLM_DEBUG(lock, "server returned different mode %s", 614 ldlm_lockname[newmode]); 615 lock->l_req_mode = newmode; 616 } 617 618 if (!ldlm_res_eq(&reply->lock_desc.l_resource.lr_name, 619 &lock->l_resource->lr_name)) { 620 CDEBUG(D_INFO, "remote intent success, locking "DLDLMRES 621 " instead of "DLDLMRES"\n", 622 PLDLMRES(&reply->lock_desc.l_resource), 623 PLDLMRES(lock->l_resource)); 624 625 rc = ldlm_lock_change_resource(ns, lock, 626 &reply->lock_desc.l_resource.lr_name); 627 if (rc || lock->l_resource == NULL) { 628 rc = -ENOMEM; 629 goto cleanup; 630 } 631 LDLM_DEBUG(lock, "client-side enqueue, new resource"); 632 } 633 if (with_policy) 634 if (!(type == LDLM_IBITS && 635 !(exp_connect_flags(exp) & OBD_CONNECT_IBITS))) 636 /* We assume lock type cannot change on server*/ 637 ldlm_convert_policy_to_local(exp, 638 lock->l_resource->lr_type, 639 &reply->lock_desc.l_policy_data, 640 &lock->l_policy_data); 641 if (type != LDLM_PLAIN) 642 LDLM_DEBUG(lock, 643 "client-side enqueue, new policy data"); 644 } 645 646 if ((*flags) & LDLM_FL_AST_SENT || 647 /* Cancel extent locks as soon as possible on a liblustre client, 648 * because it cannot handle asynchronous ASTs robustly (see 649 * bug 7311). */ 650 (LIBLUSTRE_CLIENT && type == LDLM_EXTENT)) { 651 lock_res_and_lock(lock); 652 lock->l_flags |= LDLM_FL_CBPENDING | LDLM_FL_BL_AST; 653 unlock_res_and_lock(lock); 654 LDLM_DEBUG(lock, "enqueue reply includes blocking AST"); 655 } 656 657 /* If the lock has already been granted by a completion AST, don't 658 * clobber the LVB with an older one. */ 659 if (lvb_len != 0) { 660 /* We must lock or a racing completion might update lvb without 661 * letting us know and we'll clobber the correct value. 662 * Cannot unlock after the check either, a that still leaves 663 * a tiny window for completion to get in */ 664 lock_res_and_lock(lock); 665 if (lock->l_req_mode != lock->l_granted_mode) 666 rc = ldlm_fill_lvb(lock, &req->rq_pill, RCL_SERVER, 667 lock->l_lvb_data, size); 668 unlock_res_and_lock(lock); 669 if (rc < 0) { 670 cleanup_phase = 1; 671 goto cleanup; 672 } 673 } 674 675 if (!is_replay) { 676 rc = ldlm_lock_enqueue(ns, &lock, NULL, flags); 677 if (lock->l_completion_ast != NULL) { 678 int err = lock->l_completion_ast(lock, *flags, NULL); 679 if (!rc) 680 rc = err; 681 if (rc) 682 cleanup_phase = 1; 683 } 684 } 685 686 if (lvb_len && lvb != NULL) { 687 /* Copy the LVB here, and not earlier, because the completion 688 * AST (if any) can override what we got in the reply */ 689 memcpy(lvb, lock->l_lvb_data, lvb_len); 690 } 691 692 LDLM_DEBUG(lock, "client-side enqueue END"); 693cleanup: 694 if (cleanup_phase == 1 && rc) 695 failed_lock_cleanup(ns, lock, mode); 696 /* Put lock 2 times, the second reference is held by ldlm_cli_enqueue */ 697 LDLM_LOCK_PUT(lock); 698 LDLM_LOCK_RELEASE(lock); 699 return rc; 700} 701EXPORT_SYMBOL(ldlm_cli_enqueue_fini); 702 703/** 704 * Estimate number of lock handles that would fit into request of given 705 * size. PAGE_SIZE-512 is to allow TCP/IP and LNET headers to fit into 706 * a single page on the send/receive side. XXX: 512 should be changed to 707 * more adequate value. 
708 */ 709static inline int ldlm_req_handles_avail(int req_size, int off) 710{ 711 int avail; 712 713 avail = min_t(int, LDLM_MAXREQSIZE, PAGE_CACHE_SIZE - 512) - req_size; 714 if (likely(avail >= 0)) 715 avail /= (int)sizeof(struct lustre_handle); 716 else 717 avail = 0; 718 avail += LDLM_LOCKREQ_HANDLES - off; 719 720 return avail; 721} 722 723static inline int ldlm_capsule_handles_avail(struct req_capsule *pill, 724 enum req_location loc, 725 int off) 726{ 727 int size = req_capsule_msg_size(pill, loc); 728 return ldlm_req_handles_avail(size, off); 729} 730 731static inline int ldlm_format_handles_avail(struct obd_import *imp, 732 const struct req_format *fmt, 733 enum req_location loc, int off) 734{ 735 int size = req_capsule_fmt_size(imp->imp_msg_magic, fmt, loc); 736 return ldlm_req_handles_avail(size, off); 737} 738 739/** 740 * Cancel LRU locks and pack them into the enqueue request. Pack there the given 741 * \a count locks in \a cancels. 742 * 743 * This is to be called by functions preparing their own requests that 744 * might contain lists of locks to cancel in addition to actual operation 745 * that needs to be performed. 746 */ 747int ldlm_prep_elc_req(struct obd_export *exp, struct ptlrpc_request *req, 748 int version, int opc, int canceloff, 749 struct list_head *cancels, int count) 750{ 751 struct ldlm_namespace *ns = exp->exp_obd->obd_namespace; 752 struct req_capsule *pill = &req->rq_pill; 753 struct ldlm_request *dlm = NULL; 754 int flags, avail, to_free, pack = 0; 755 LIST_HEAD(head); 756 int rc; 757 758 if (cancels == NULL) 759 cancels = &head; 760 if (ns_connect_cancelset(ns)) { 761 /* Estimate the amount of available space in the request. */ 762 req_capsule_filled_sizes(pill, RCL_CLIENT); 763 avail = ldlm_capsule_handles_avail(pill, RCL_CLIENT, canceloff); 764 765 flags = ns_connect_lru_resize(ns) ? 766 LDLM_CANCEL_LRUR : LDLM_CANCEL_AGED; 767 to_free = !ns_connect_lru_resize(ns) && 768 opc == LDLM_ENQUEUE ? 1 : 0; 769 770 /* Cancel LRU locks here _only_ if the server supports 771 * EARLY_CANCEL. Otherwise we have to send extra CANCEL 772 * RPC, which will make us slower. */ 773 if (avail > count) 774 count += ldlm_cancel_lru_local(ns, cancels, to_free, 775 avail - count, 0, flags); 776 if (avail > count) 777 pack = count; 778 else 779 pack = avail; 780 req_capsule_set_size(pill, &RMF_DLM_REQ, RCL_CLIENT, 781 ldlm_request_bufsize(pack, opc)); 782 } 783 784 rc = ptlrpc_request_pack(req, version, opc); 785 if (rc) { 786 ldlm_lock_list_put(cancels, l_bl_ast, count); 787 return rc; 788 } 789 790 if (ns_connect_cancelset(ns)) { 791 if (canceloff) { 792 dlm = req_capsule_client_get(pill, &RMF_DLM_REQ); 793 LASSERT(dlm); 794 /* Skip first lock handler in ldlm_request_pack(), 795 * this method will increment @lock_count according 796 * to the lock handle amount actually written to 797 * the buffer. */ 798 dlm->lock_count = canceloff; 799 } 800 /* Pack into the request @pack lock handles. */ 801 ldlm_cli_cancel_list(cancels, pack, req, 0); 802 /* Prepare and send separate cancel RPC for others. 
*/ 803 ldlm_cli_cancel_list(cancels, count - pack, NULL, 0); 804 } else { 805 ldlm_lock_list_put(cancels, l_bl_ast, count); 806 } 807 return 0; 808} 809EXPORT_SYMBOL(ldlm_prep_elc_req); 810 811int ldlm_prep_enqueue_req(struct obd_export *exp, struct ptlrpc_request *req, 812 struct list_head *cancels, int count) 813{ 814 return ldlm_prep_elc_req(exp, req, LUSTRE_DLM_VERSION, LDLM_ENQUEUE, 815 LDLM_ENQUEUE_CANCEL_OFF, cancels, count); 816} 817EXPORT_SYMBOL(ldlm_prep_enqueue_req); 818 819struct ptlrpc_request *ldlm_enqueue_pack(struct obd_export *exp, int lvb_len) 820{ 821 struct ptlrpc_request *req; 822 int rc; 823 824 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LDLM_ENQUEUE); 825 if (req == NULL) 826 return ERR_PTR(-ENOMEM); 827 828 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0); 829 if (rc) { 830 ptlrpc_request_free(req); 831 return ERR_PTR(rc); 832 } 833 834 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER, lvb_len); 835 ptlrpc_request_set_replen(req); 836 return req; 837} 838EXPORT_SYMBOL(ldlm_enqueue_pack); 839 840/** 841 * Client-side lock enqueue. 842 * 843 * If a request has some specific initialisation it is passed in \a reqp, 844 * otherwise it is created in ldlm_cli_enqueue. 845 * 846 * Supports sync and async requests, pass \a async flag accordingly. If a 847 * request was created in ldlm_cli_enqueue and it is the async request, 848 * pass it to the caller in \a reqp. 849 */ 850int ldlm_cli_enqueue(struct obd_export *exp, struct ptlrpc_request **reqp, 851 struct ldlm_enqueue_info *einfo, 852 const struct ldlm_res_id *res_id, 853 ldlm_policy_data_t const *policy, __u64 *flags, 854 void *lvb, __u32 lvb_len, enum lvb_type lvb_type, 855 struct lustre_handle *lockh, int async) 856{ 857 struct ldlm_namespace *ns; 858 struct ldlm_lock *lock; 859 struct ldlm_request *body; 860 int is_replay = *flags & LDLM_FL_REPLAY; 861 int req_passed_in = 1; 862 int rc, err; 863 struct ptlrpc_request *req; 864 865 LASSERT(exp != NULL); 866 867 ns = exp->exp_obd->obd_namespace; 868 869 /* If we're replaying this lock, just check some invariants. 870 * If we're creating a new lock, get everything all setup nice. 
*/ 871 if (is_replay) { 872 lock = ldlm_handle2lock_long(lockh, 0); 873 LASSERT(lock != NULL); 874 LDLM_DEBUG(lock, "client-side enqueue START"); 875 LASSERT(exp == lock->l_conn_export); 876 } else { 877 const struct ldlm_callback_suite cbs = { 878 .lcs_completion = einfo->ei_cb_cp, 879 .lcs_blocking = einfo->ei_cb_bl, 880 .lcs_glimpse = einfo->ei_cb_gl 881 }; 882 lock = ldlm_lock_create(ns, res_id, einfo->ei_type, 883 einfo->ei_mode, &cbs, einfo->ei_cbdata, 884 lvb_len, lvb_type); 885 if (lock == NULL) 886 return -ENOMEM; 887 /* for the local lock, add the reference */ 888 ldlm_lock_addref_internal(lock, einfo->ei_mode); 889 ldlm_lock2handle(lock, lockh); 890 if (policy != NULL) 891 lock->l_policy_data = *policy; 892 893 if (einfo->ei_type == LDLM_EXTENT) 894 lock->l_req_extent = policy->l_extent; 895 LDLM_DEBUG(lock, "client-side enqueue START, flags %llx\n", 896 *flags); 897 } 898 899 lock->l_conn_export = exp; 900 lock->l_export = NULL; 901 lock->l_blocking_ast = einfo->ei_cb_bl; 902 lock->l_flags |= (*flags & (LDLM_FL_NO_LRU | LDLM_FL_EXCL)); 903 904 /* lock not sent to server yet */ 905 906 if (reqp == NULL || *reqp == NULL) { 907 req = ptlrpc_request_alloc_pack(class_exp2cliimp(exp), 908 &RQF_LDLM_ENQUEUE, 909 LUSTRE_DLM_VERSION, 910 LDLM_ENQUEUE); 911 if (req == NULL) { 912 failed_lock_cleanup(ns, lock, einfo->ei_mode); 913 LDLM_LOCK_RELEASE(lock); 914 return -ENOMEM; 915 } 916 req_passed_in = 0; 917 if (reqp) 918 *reqp = req; 919 } else { 920 int len; 921 922 req = *reqp; 923 len = req_capsule_get_size(&req->rq_pill, &RMF_DLM_REQ, 924 RCL_CLIENT); 925 LASSERTF(len >= sizeof(*body), "buflen[%d] = %d, not %d\n", 926 DLM_LOCKREQ_OFF, len, (int)sizeof(*body)); 927 } 928 929 /* Dump lock data into the request buffer */ 930 body = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ); 931 ldlm_lock2desc(lock, &body->lock_desc); 932 body->lock_flags = ldlm_flags_to_wire(*flags); 933 body->lock_handle[0] = *lockh; 934 935 /* Continue as normal. */ 936 if (!req_passed_in) { 937 if (lvb_len > 0) 938 req_capsule_extend(&req->rq_pill, 939 &RQF_LDLM_ENQUEUE_LVB); 940 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER, 941 lvb_len); 942 ptlrpc_request_set_replen(req); 943 } 944 945 /* 946 * Liblustre client doesn't get extent locks, except for O_APPEND case 947 * where [0, OBD_OBJECT_EOF] lock is taken, or truncate, where 948 * [i_size, OBD_OBJECT_EOF] lock is taken. 949 */ 950 LASSERT(ergo(LIBLUSTRE_CLIENT, einfo->ei_type != LDLM_EXTENT || 951 policy->l_extent.end == OBD_OBJECT_EOF)); 952 953 if (async) { 954 LASSERT(reqp != NULL); 955 return 0; 956 } 957 958 LDLM_DEBUG(lock, "sending request"); 959 960 rc = ptlrpc_queue_wait(req); 961 962 err = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, policy ? 
1 : 0, 963 einfo->ei_mode, flags, lvb, lvb_len, 964 lockh, rc); 965 966 /* If ldlm_cli_enqueue_fini did not find the lock, we need to free 967 * one reference that we took */ 968 if (err == -ENOLCK) 969 LDLM_LOCK_RELEASE(lock); 970 else 971 rc = err; 972 973 if (!req_passed_in && req != NULL) { 974 ptlrpc_req_finished(req); 975 if (reqp) 976 *reqp = NULL; 977 } 978 979 return rc; 980} 981EXPORT_SYMBOL(ldlm_cli_enqueue); 982 983static int ldlm_cli_convert_local(struct ldlm_lock *lock, int new_mode, 984 __u32 *flags) 985{ 986 struct ldlm_resource *res; 987 int rc; 988 989 if (ns_is_client(ldlm_lock_to_ns(lock))) { 990 CERROR("Trying to cancel local lock\n"); 991 LBUG(); 992 } 993 LDLM_DEBUG(lock, "client-side local convert"); 994 995 res = ldlm_lock_convert(lock, new_mode, flags); 996 if (res) { 997 ldlm_reprocess_all(res); 998 rc = 0; 999 } else { 1000 rc = LUSTRE_EDEADLK; 1001 } 1002 LDLM_DEBUG(lock, "client-side local convert handler END"); 1003 LDLM_LOCK_PUT(lock); 1004 return rc; 1005} 1006 1007/* FIXME: one of ldlm_cli_convert or the server side should reject attempted 1008 * conversion of locks which are on the waiting or converting queue */ 1009/* Caller of this code is supposed to take care of lock readers/writers 1010 accounting */ 1011int ldlm_cli_convert(struct lustre_handle *lockh, int new_mode, __u32 *flags) 1012{ 1013 struct ldlm_request *body; 1014 struct ldlm_reply *reply; 1015 struct ldlm_lock *lock; 1016 struct ldlm_resource *res; 1017 struct ptlrpc_request *req; 1018 int rc; 1019 1020 lock = ldlm_handle2lock(lockh); 1021 if (!lock) { 1022 LBUG(); 1023 return -EINVAL; 1024 } 1025 *flags = 0; 1026 1027 if (lock->l_conn_export == NULL) 1028 return ldlm_cli_convert_local(lock, new_mode, flags); 1029 1030 LDLM_DEBUG(lock, "client-side convert"); 1031 1032 req = ptlrpc_request_alloc_pack(class_exp2cliimp(lock->l_conn_export), 1033 &RQF_LDLM_CONVERT, LUSTRE_DLM_VERSION, 1034 LDLM_CONVERT); 1035 if (req == NULL) { 1036 LDLM_LOCK_PUT(lock); 1037 return -ENOMEM; 1038 } 1039 1040 body = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ); 1041 body->lock_handle[0] = lock->l_remote_handle; 1042 1043 body->lock_desc.l_req_mode = new_mode; 1044 body->lock_flags = ldlm_flags_to_wire(*flags); 1045 1046 1047 ptlrpc_request_set_replen(req); 1048 rc = ptlrpc_queue_wait(req); 1049 if (rc != ELDLM_OK) 1050 goto out; 1051 1052 reply = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP); 1053 if (reply == NULL) { 1054 rc = -EPROTO; 1055 goto out; 1056 } 1057 1058 if (req->rq_status) { 1059 rc = req->rq_status; 1060 goto out; 1061 } 1062 1063 res = ldlm_lock_convert(lock, new_mode, &reply->lock_flags); 1064 if (res != NULL) { 1065 ldlm_reprocess_all(res); 1066 /* Go to sleep until the lock is granted. */ 1067 /* FIXME: or cancelled. */ 1068 if (lock->l_completion_ast) { 1069 rc = lock->l_completion_ast(lock, LDLM_FL_WAIT_NOREPROC, 1070 NULL); 1071 if (rc) 1072 goto out; 1073 } 1074 } else { 1075 rc = LUSTRE_EDEADLK; 1076 } 1077 out: 1078 LDLM_LOCK_PUT(lock); 1079 ptlrpc_req_finished(req); 1080 return rc; 1081} 1082EXPORT_SYMBOL(ldlm_cli_convert); 1083 1084/** 1085 * Cancel locks locally. 1086 * Returns: 1087 * \retval LDLM_FL_LOCAL_ONLY if there is no need for a CANCEL RPC to the server 1088 * \retval LDLM_FL_CANCELING otherwise; 1089 * \retval LDLM_FL_BL_AST if there is a need for a separate CANCEL RPC. 
1090 */ 1091static __u64 ldlm_cli_cancel_local(struct ldlm_lock *lock) 1092{ 1093 __u64 rc = LDLM_FL_LOCAL_ONLY; 1094 1095 if (lock->l_conn_export) { 1096 bool local_only; 1097 1098 LDLM_DEBUG(lock, "client-side cancel"); 1099 /* Set this flag to prevent others from getting new references*/ 1100 lock_res_and_lock(lock); 1101 lock->l_flags |= LDLM_FL_CBPENDING; 1102 local_only = !!(lock->l_flags & 1103 (LDLM_FL_LOCAL_ONLY|LDLM_FL_CANCEL_ON_BLOCK)); 1104 ldlm_cancel_callback(lock); 1105 rc = (lock->l_flags & LDLM_FL_BL_AST) ? 1106 LDLM_FL_BL_AST : LDLM_FL_CANCELING; 1107 unlock_res_and_lock(lock); 1108 1109 if (local_only) { 1110 CDEBUG(D_DLMTRACE, "not sending request (at caller's " 1111 "instruction)\n"); 1112 rc = LDLM_FL_LOCAL_ONLY; 1113 } 1114 ldlm_lock_cancel(lock); 1115 } else { 1116 if (ns_is_client(ldlm_lock_to_ns(lock))) { 1117 LDLM_ERROR(lock, "Trying to cancel local lock"); 1118 LBUG(); 1119 } 1120 LDLM_DEBUG(lock, "server-side local cancel"); 1121 ldlm_lock_cancel(lock); 1122 ldlm_reprocess_all(lock->l_resource); 1123 } 1124 1125 return rc; 1126} 1127 1128/** 1129 * Pack \a count locks in \a head into ldlm_request buffer of request \a req. 1130 */ 1131static void ldlm_cancel_pack(struct ptlrpc_request *req, 1132 struct list_head *head, int count) 1133{ 1134 struct ldlm_request *dlm; 1135 struct ldlm_lock *lock; 1136 int max, packed = 0; 1137 1138 dlm = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ); 1139 LASSERT(dlm != NULL); 1140 1141 /* Check the room in the request buffer. */ 1142 max = req_capsule_get_size(&req->rq_pill, &RMF_DLM_REQ, RCL_CLIENT) - 1143 sizeof(struct ldlm_request); 1144 max /= sizeof(struct lustre_handle); 1145 max += LDLM_LOCKREQ_HANDLES; 1146 LASSERT(max >= dlm->lock_count + count); 1147 1148 /* XXX: it would be better to pack lock handles grouped by resource. 1149 * so that the server cancel would call filter_lvbo_update() less 1150 * frequently. */ 1151 list_for_each_entry(lock, head, l_bl_ast) { 1152 if (!count--) 1153 break; 1154 LASSERT(lock->l_conn_export); 1155 /* Pack the lock handle to the given request buffer. */ 1156 LDLM_DEBUG(lock, "packing"); 1157 dlm->lock_handle[dlm->lock_count++] = lock->l_remote_handle; 1158 packed++; 1159 } 1160 CDEBUG(D_DLMTRACE, "%d locks packed\n", packed); 1161} 1162 1163/** 1164 * Prepare and send a batched cancel RPC. It will include \a count lock 1165 * handles of locks given in \a cancels list. 
*/ 1166int ldlm_cli_cancel_req(struct obd_export *exp, struct list_head *cancels, 1167 int count, ldlm_cancel_flags_t flags) 1168{ 1169 struct ptlrpc_request *req = NULL; 1170 struct obd_import *imp; 1171 int free, sent = 0; 1172 int rc = 0; 1173 1174 LASSERT(exp != NULL); 1175 LASSERT(count > 0); 1176 1177 CFS_FAIL_TIMEOUT(OBD_FAIL_LDLM_PAUSE_CANCEL, cfs_fail_val); 1178 1179 if (CFS_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL_RACE)) 1180 return count; 1181 1182 free = ldlm_format_handles_avail(class_exp2cliimp(exp), 1183 &RQF_LDLM_CANCEL, RCL_CLIENT, 0); 1184 if (count > free) 1185 count = free; 1186 1187 while (1) { 1188 imp = class_exp2cliimp(exp); 1189 if (imp == NULL || imp->imp_invalid) { 1190 CDEBUG(D_DLMTRACE, 1191 "skipping cancel on invalid import %p\n", imp); 1192 return count; 1193 } 1194 1195 req = ptlrpc_request_alloc(imp, &RQF_LDLM_CANCEL); 1196 if (req == NULL) { 1197 rc = -ENOMEM; 1198 goto out; 1199 } 1200 1201 req_capsule_filled_sizes(&req->rq_pill, RCL_CLIENT); 1202 req_capsule_set_size(&req->rq_pill, &RMF_DLM_REQ, RCL_CLIENT, 1203 ldlm_request_bufsize(count, LDLM_CANCEL)); 1204 1205 rc = ptlrpc_request_pack(req, LUSTRE_DLM_VERSION, LDLM_CANCEL); 1206 if (rc) { 1207 ptlrpc_request_free(req); 1208 goto out; 1209 } 1210 1211 req->rq_request_portal = LDLM_CANCEL_REQUEST_PORTAL; 1212 req->rq_reply_portal = LDLM_CANCEL_REPLY_PORTAL; 1213 ptlrpc_at_set_req_timeout(req); 1214 1215 ldlm_cancel_pack(req, cancels, count); 1216 1217 ptlrpc_request_set_replen(req); 1218 if (flags & LCF_ASYNC) { 1219 ptlrpcd_add_req(req, PDL_POLICY_LOCAL, -1); 1220 sent = count; 1221 goto out; 1222 } else { 1223 rc = ptlrpc_queue_wait(req); 1224 } 1225 if (rc == LUSTRE_ESTALE) { 1226 CDEBUG(D_DLMTRACE, "client/server (nid %s) " 1227 "out of sync -- not fatal\n", 1228 libcfs_nid2str(req->rq_import-> 1229 imp_connection->c_peer.nid)); 1230 rc = 0; 1231 } else if (rc == -ETIMEDOUT && /* check there was no reconnect*/ 1232 req->rq_import_generation == imp->imp_generation) { 1233 ptlrpc_req_finished(req); 1234 continue; 1235 } else if (rc != ELDLM_OK) { 1236 /* -ESHUTDOWN is common on umount */ 1237 CDEBUG_LIMIT(rc == -ESHUTDOWN ? D_DLMTRACE : D_ERROR, 1238 "Got rc %d from cancel RPC: " 1239 "canceling anyway\n", rc); 1240 break; 1241 } 1242 sent = count; 1243 break; 1244 } 1245 1246 ptlrpc_req_finished(req); 1247out: 1248 return sent ? sent : rc; 1249} 1250EXPORT_SYMBOL(ldlm_cli_cancel_req); 1251 1252static inline struct ldlm_pool *ldlm_imp2pl(struct obd_import *imp) 1253{ 1254 LASSERT(imp != NULL); 1255 return &imp->imp_obd->obd_namespace->ns_pool; 1256} 1257 1258/** 1259 * Update client's OBD pool related fields with new SLV and Limit from \a req. 1260 */ 1261int ldlm_cli_update_pool(struct ptlrpc_request *req) 1262{ 1263 struct obd_device *obd; 1264 __u64 new_slv; 1265 __u32 new_limit; 1266 1267 if (unlikely(!req->rq_import || !req->rq_import->imp_obd || 1268 !imp_connect_lru_resize(req->rq_import))) { 1269 /* 1270 * Do nothing for corner cases. 1271 */ 1272 return 0; 1273 } 1274 1275 /* In some cases RPC may contain SLV and limit zeroed out. This 1276 * is the case when server does not support LRU resize feature. 1277 * This is also possible in some recovery cases when server-side 1278 * reqs have no reference to the OBD export and thus access to 1279 * server-side namespace is not possible. 
*/ 1280 if (lustre_msg_get_slv(req->rq_repmsg) == 0 || 1281 lustre_msg_get_limit(req->rq_repmsg) == 0) { 1282 DEBUG_REQ(D_HA, req, "Zero SLV or Limit found (SLV: %llu, Limit: %u)", 1283 lustre_msg_get_slv(req->rq_repmsg), 1284 lustre_msg_get_limit(req->rq_repmsg)); 1285 return 0; 1286 } 1287 1288 new_limit = lustre_msg_get_limit(req->rq_repmsg); 1289 new_slv = lustre_msg_get_slv(req->rq_repmsg); 1290 obd = req->rq_import->imp_obd; 1291 1292 /* Set new SLV and limit in OBD fields to make them accessible 1293 * to the pool thread. We do not access obd_namespace and pool 1294 * directly here as there is no reliable way to make sure that 1295 * they are still alive at cleanup time. Evil races are possible 1296 * which may cause Oops at that time. */ 1297 write_lock(&obd->obd_pool_lock); 1298 obd->obd_pool_slv = new_slv; 1299 obd->obd_pool_limit = new_limit; 1300 write_unlock(&obd->obd_pool_lock); 1301 1302 return 0; 1303} 1304EXPORT_SYMBOL(ldlm_cli_update_pool); 1305 1306/** 1307 * Client side lock cancel. 1308 * 1309 * Lock must not have any readers or writers by this time. 1310 */ 1311int ldlm_cli_cancel(struct lustre_handle *lockh, 1312 ldlm_cancel_flags_t cancel_flags) 1313{ 1314 struct obd_export *exp; 1315 int avail, flags, count = 1; 1316 __u64 rc = 0; 1317 struct ldlm_namespace *ns; 1318 struct ldlm_lock *lock; 1319 LIST_HEAD(cancels); 1320 1321 /* concurrent cancels on the same handle can happen */ 1322 lock = ldlm_handle2lock_long(lockh, LDLM_FL_CANCELING); 1323 if (lock == NULL) { 1324 LDLM_DEBUG_NOLOCK("lock is already being destroyed\n"); 1325 return 0; 1326 } 1327 1328 rc = ldlm_cli_cancel_local(lock); 1329 if (rc == LDLM_FL_LOCAL_ONLY || cancel_flags & LCF_LOCAL) { 1330 LDLM_LOCK_RELEASE(lock); 1331 return 0; 1332 } 1333 /* Even if the lock is marked as LDLM_FL_BL_AST, this is a LDLM_CANCEL 1334 * RPC which goes to canceld portal, so we can cancel other LRU locks 1335 * here and send them all as one LDLM_CANCEL RPC. */ 1336 LASSERT(list_empty(&lock->l_bl_ast)); 1337 list_add(&lock->l_bl_ast, &cancels); 1338 1339 exp = lock->l_conn_export; 1340 if (exp_connect_cancelset(exp)) { 1341 avail = ldlm_format_handles_avail(class_exp2cliimp(exp), 1342 &RQF_LDLM_CANCEL, 1343 RCL_CLIENT, 0); 1344 LASSERT(avail > 0); 1345 1346 ns = ldlm_lock_to_ns(lock); 1347 flags = ns_connect_lru_resize(ns) ? 1348 LDLM_CANCEL_LRUR : LDLM_CANCEL_AGED; 1349 count += ldlm_cancel_lru_local(ns, &cancels, 0, avail - 1, 1350 LCF_BL_AST, flags); 1351 } 1352 ldlm_cli_cancel_list(&cancels, count, NULL, cancel_flags); 1353 return 0; 1354} 1355EXPORT_SYMBOL(ldlm_cli_cancel); 1356 1357/** 1358 * Locally cancel up to \a count locks in list \a cancels. 1359 * Return the number of cancelled locks. 1360 */ 1361int ldlm_cli_cancel_list_local(struct list_head *cancels, int count, 1362 ldlm_cancel_flags_t flags) 1363{ 1364 LIST_HEAD(head); 1365 struct ldlm_lock *lock, *next; 1366 int left = 0, bl_ast = 0; 1367 __u64 rc; 1368 1369 left = count; 1370 list_for_each_entry_safe(lock, next, cancels, l_bl_ast) { 1371 if (left-- == 0) 1372 break; 1373 1374 if (flags & LCF_LOCAL) { 1375 rc = LDLM_FL_LOCAL_ONLY; 1376 ldlm_lock_cancel(lock); 1377 } else { 1378 rc = ldlm_cli_cancel_local(lock); 1379 } 1380 /* Until we have compound requests and can send LDLM_CANCEL 1381 * requests batched with generic RPCs, we need to send cancels 1382 * with the LDLM_FL_BL_AST flag in a separate RPC from 1383 * the one being generated now. 
*/ 1384 if (!(flags & LCF_BL_AST) && (rc == LDLM_FL_BL_AST)) { 1385 LDLM_DEBUG(lock, "Cancel lock separately"); 1386 list_del_init(&lock->l_bl_ast); 1387 list_add(&lock->l_bl_ast, &head); 1388 bl_ast++; 1389 continue; 1390 } 1391 if (rc == LDLM_FL_LOCAL_ONLY) { 1392 /* CANCEL RPC should not be sent to server. */ 1393 list_del_init(&lock->l_bl_ast); 1394 LDLM_LOCK_RELEASE(lock); 1395 count--; 1396 } 1397 } 1398 if (bl_ast > 0) { 1399 count -= bl_ast; 1400 ldlm_cli_cancel_list(&head, bl_ast, NULL, 0); 1401 } 1402 1403 return count; 1404} 1405EXPORT_SYMBOL(ldlm_cli_cancel_list_local); 1406 1407/** 1408 * Cancel as many locks as possible w/o sending any RPCs (e.g. to write back 1409 * dirty data, to close a file, ...) or waiting for any RPCs in-flight (e.g. 1410 * readahead requests, ...) 1411 */ 1412static ldlm_policy_res_t ldlm_cancel_no_wait_policy(struct ldlm_namespace *ns, 1413 struct ldlm_lock *lock, 1414 int unused, int added, 1415 int count) 1416{ 1417 ldlm_policy_res_t result = LDLM_POLICY_CANCEL_LOCK; 1418 ldlm_cancel_for_recovery cb = ns->ns_cancel_for_recovery; 1419 lock_res_and_lock(lock); 1420 1421 /* don't check added & count since we want to process all locks 1422 * from unused list */ 1423 switch (lock->l_resource->lr_type) { 1424 case LDLM_EXTENT: 1425 case LDLM_IBITS: 1426 if (cb && cb(lock)) 1427 break; 1428 default: 1429 result = LDLM_POLICY_SKIP_LOCK; 1430 lock->l_flags |= LDLM_FL_SKIPPED; 1431 break; 1432 } 1433 1434 unlock_res_and_lock(lock); 1435 return result; 1436} 1437 1438/** 1439 * Callback function for LRU-resize policy. Decides whether to keep 1440 * \a lock in LRU for current \a LRU size \a unused, added in current 1441 * scan \a added and number of locks to be preferably canceled \a count. 1442 * 1443 * \retval LDLM_POLICY_KEEP_LOCK keep lock in LRU in stop scanning 1444 * 1445 * \retval LDLM_POLICY_CANCEL_LOCK cancel lock from LRU 1446 */ 1447static ldlm_policy_res_t ldlm_cancel_lrur_policy(struct ldlm_namespace *ns, 1448 struct ldlm_lock *lock, 1449 int unused, int added, 1450 int count) 1451{ 1452 unsigned long cur = cfs_time_current(); 1453 struct ldlm_pool *pl = &ns->ns_pool; 1454 __u64 slv, lvf, lv; 1455 unsigned long la; 1456 1457 /* Stop LRU processing when we reach past @count or have checked all 1458 * locks in LRU. */ 1459 if (count && added >= count) 1460 return LDLM_POLICY_KEEP_LOCK; 1461 1462 slv = ldlm_pool_get_slv(pl); 1463 lvf = ldlm_pool_get_lvf(pl); 1464 la = cfs_duration_sec(cfs_time_sub(cur, 1465 lock->l_last_used)); 1466 lv = lvf * la * unused; 1467 1468 /* Inform pool about current CLV to see it via proc. */ 1469 ldlm_pool_set_clv(pl, lv); 1470 1471 /* Stop when SLV is not yet come from server or lv is smaller than 1472 * it is. */ 1473 return (slv == 0 || lv < slv) ? 1474 LDLM_POLICY_KEEP_LOCK : LDLM_POLICY_CANCEL_LOCK; 1475} 1476 1477/** 1478 * Callback function for proc used policy. Makes decision whether to keep 1479 * \a lock in LRU for current \a LRU size \a unused, added in current scan \a 1480 * added and number of locks to be preferably canceled \a count. 1481 * 1482 * \retval LDLM_POLICY_KEEP_LOCK keep lock in LRU in stop scanning 1483 * 1484 * \retval LDLM_POLICY_CANCEL_LOCK cancel lock from LRU 1485 */ 1486static ldlm_policy_res_t ldlm_cancel_passed_policy(struct ldlm_namespace *ns, 1487 struct ldlm_lock *lock, 1488 int unused, int added, 1489 int count) 1490{ 1491 /* Stop LRU processing when we reach past @count or have checked all 1492 * locks in LRU. */ 1493 return (added >= count) ? 
1494 LDLM_POLICY_KEEP_LOCK : LDLM_POLICY_CANCEL_LOCK; 1495} 1496 1497/** 1498 * Callback function for aged policy. Makes decision whether to keep \a lock in 1499 * LRU for current LRU size \a unused, added in current scan \a added and 1500 * number of locks to be preferably canceled \a count. 1501 * 1502 * \retval LDLM_POLICY_KEEP_LOCK keep lock in LRU in stop scanning 1503 * 1504 * \retval LDLM_POLICY_CANCEL_LOCK cancel lock from LRU 1505 */ 1506static ldlm_policy_res_t ldlm_cancel_aged_policy(struct ldlm_namespace *ns, 1507 struct ldlm_lock *lock, 1508 int unused, int added, 1509 int count) 1510{ 1511 /* Stop LRU processing if young lock is found and we reach past count */ 1512 return ((added >= count) && 1513 time_before(cfs_time_current(), 1514 cfs_time_add(lock->l_last_used, ns->ns_max_age))) ? 1515 LDLM_POLICY_KEEP_LOCK : LDLM_POLICY_CANCEL_LOCK; 1516} 1517 1518/** 1519 * Callback function for default policy. Makes decision whether to keep \a lock 1520 * in LRU for current LRU size \a unused, added in current scan \a added and 1521 * number of locks to be preferably canceled \a count. 1522 * 1523 * \retval LDLM_POLICY_KEEP_LOCK keep lock in LRU in stop scanning 1524 * 1525 * \retval LDLM_POLICY_CANCEL_LOCK cancel lock from LRU 1526 */ 1527static ldlm_policy_res_t ldlm_cancel_default_policy(struct ldlm_namespace *ns, 1528 struct ldlm_lock *lock, 1529 int unused, int added, 1530 int count) 1531{ 1532 /* Stop LRU processing when we reach past count or have checked all 1533 * locks in LRU. */ 1534 return (added >= count) ? 1535 LDLM_POLICY_KEEP_LOCK : LDLM_POLICY_CANCEL_LOCK; 1536} 1537 1538typedef ldlm_policy_res_t (*ldlm_cancel_lru_policy_t)(struct ldlm_namespace *, 1539 struct ldlm_lock *, int, 1540 int, int); 1541 1542static ldlm_cancel_lru_policy_t 1543ldlm_cancel_lru_policy(struct ldlm_namespace *ns, int flags) 1544{ 1545 if (flags & LDLM_CANCEL_NO_WAIT) 1546 return ldlm_cancel_no_wait_policy; 1547 1548 if (ns_connect_lru_resize(ns)) { 1549 if (flags & LDLM_CANCEL_SHRINK) 1550 /* We kill passed number of old locks. */ 1551 return ldlm_cancel_passed_policy; 1552 else if (flags & LDLM_CANCEL_LRUR) 1553 return ldlm_cancel_lrur_policy; 1554 else if (flags & LDLM_CANCEL_PASSED) 1555 return ldlm_cancel_passed_policy; 1556 } else { 1557 if (flags & LDLM_CANCEL_AGED) 1558 return ldlm_cancel_aged_policy; 1559 } 1560 1561 return ldlm_cancel_default_policy; 1562} 1563 1564/** 1565 * - Free space in LRU for \a count new locks, 1566 * redundant unused locks are canceled locally; 1567 * - also cancel locally unused aged locks; 1568 * - do not cancel more than \a max locks; 1569 * - GET the found locks and add them into the \a cancels list. 1570 * 1571 * A client lock can be added to the l_bl_ast list only when it is 1572 * marked LDLM_FL_CANCELING. Otherwise, somebody is already doing 1573 * CANCEL. There are the following use cases: 1574 * ldlm_cancel_resource_local(), ldlm_cancel_lru_local() and 1575 * ldlm_cli_cancel(), which check and set this flag properly. As any 1576 * attempt to cancel a lock rely on this flag, l_bl_ast list is accessed 1577 * later without any special locking. 
1578 * 1579 * Calling policies for enabled LRU resize: 1580 * ---------------------------------------- 1581 * flags & LDLM_CANCEL_LRUR - use LRU resize policy (SLV from server) to 1582 * cancel not more than \a count locks; 1583 * 1584 * flags & LDLM_CANCEL_PASSED - cancel \a count number of old locks (located at 1585 * the beginning of LRU list); 1586 * 1587 * flags & LDLM_CANCEL_SHRINK - cancel not more than \a count locks according to 1588 * memory pressure policy function; 1589 * 1590 * flags & LDLM_CANCEL_AGED - cancel \a count locks according to "aged policy". 1591 * 1592 * flags & LDLM_CANCEL_NO_WAIT - cancel as many unused locks as possible 1593 * (typically before replaying locks) w/o 1594 * sending any RPCs or waiting for any 1595 * outstanding RPC to complete. 1596 */ 1597static int ldlm_prepare_lru_list(struct ldlm_namespace *ns, struct list_head *cancels, 1598 int count, int max, int flags) 1599{ 1600 ldlm_cancel_lru_policy_t pf; 1601 struct ldlm_lock *lock, *next; 1602 int added = 0, unused, remained; 1603 1604 spin_lock(&ns->ns_lock); 1605 unused = ns->ns_nr_unused; 1606 remained = unused; 1607 1608 if (!ns_connect_lru_resize(ns)) 1609 count += unused - ns->ns_max_unused; 1610 1611 pf = ldlm_cancel_lru_policy(ns, flags); 1612 LASSERT(pf != NULL); 1613 1614 while (!list_empty(&ns->ns_unused_list)) { 1615 ldlm_policy_res_t result; 1616 1617 /* all unused locks */ 1618 if (remained-- <= 0) 1619 break; 1620 1621 /* For any flags, stop scanning if @max is reached. */ 1622 if (max && added >= max) 1623 break; 1624 1625 list_for_each_entry_safe(lock, next, &ns->ns_unused_list, 1626 l_lru) { 1627 /* No locks which got blocking requests. */ 1628 LASSERT(!(lock->l_flags & LDLM_FL_BL_AST)); 1629 1630 if (flags & LDLM_CANCEL_NO_WAIT && 1631 lock->l_flags & LDLM_FL_SKIPPED) 1632 /* already processed */ 1633 continue; 1634 1635 /* Somebody is already doing CANCEL. No need for this 1636 * lock in LRU, do not traverse it again. */ 1637 if (!(lock->l_flags & LDLM_FL_CANCELING)) 1638 break; 1639 1640 ldlm_lock_remove_from_lru_nolock(lock); 1641 } 1642 if (&lock->l_lru == &ns->ns_unused_list) 1643 break; 1644 1645 LDLM_LOCK_GET(lock); 1646 spin_unlock(&ns->ns_lock); 1647 lu_ref_add(&lock->l_reference, __func__, current); 1648 1649 /* Pass the lock through the policy filter and see if it 1650 * should stay in LRU. 1651 * 1652 * Even for shrinker policy we stop scanning if 1653 * we find a lock that should stay in the cache. 1654 * We should take into account lock age anyway 1655 * as a new lock is a valuable resource even if 1656 * it has a low weight. 1657 * 1658 * That is, for shrinker policy we drop only 1659 * old locks, but additionally choose them by 1660 * their weight. Big extent locks will stay in 1661 * the cache. */ 1662 result = pf(ns, lock, unused, added, count); 1663 if (result == LDLM_POLICY_KEEP_LOCK) { 1664 lu_ref_del(&lock->l_reference, 1665 __func__, current); 1666 LDLM_LOCK_RELEASE(lock); 1667 spin_lock(&ns->ns_lock); 1668 break; 1669 } 1670 if (result == LDLM_POLICY_SKIP_LOCK) { 1671 lu_ref_del(&lock->l_reference, 1672 __func__, current); 1673 LDLM_LOCK_RELEASE(lock); 1674 spin_lock(&ns->ns_lock); 1675 continue; 1676 } 1677 1678 lock_res_and_lock(lock); 1679 /* Check flags again under the lock. 
*/ 1680 if ((lock->l_flags & LDLM_FL_CANCELING) || 1681 (ldlm_lock_remove_from_lru(lock) == 0)) { 1682 /* Another thread is removing lock from LRU, or 1683 * somebody is already doing CANCEL, or there 1684 * is a blocking request which will send cancel 1685 * by itself, or the lock is no longer unused. */ 1686 unlock_res_and_lock(lock); 1687 lu_ref_del(&lock->l_reference, 1688 __func__, current); 1689 LDLM_LOCK_RELEASE(lock); 1690 spin_lock(&ns->ns_lock); 1691 continue; 1692 } 1693 LASSERT(!lock->l_readers && !lock->l_writers); 1694 1695 /* If we have chosen to cancel this lock voluntarily, we 1696 * better send cancel notification to server, so that it 1697 * frees appropriate state. This might lead to a race 1698 * where while we are doing cancel here, server is also 1699 * silently cancelling this lock. */ 1700 lock->l_flags &= ~LDLM_FL_CANCEL_ON_BLOCK; 1701 1702 /* Setting the CBPENDING flag is a little misleading, 1703 * but prevents an important race; namely, once 1704 * CBPENDING is set, the lock can accumulate no more 1705 * readers/writers. Since readers and writers are 1706 * already zero here, ldlm_lock_decref() won't see 1707 * this flag and call l_blocking_ast */ 1708 lock->l_flags |= LDLM_FL_CBPENDING | LDLM_FL_CANCELING; 1709 1710 /* We can't re-add to l_lru as it confuses the 1711 * refcounting in ldlm_lock_remove_from_lru() if an AST 1712 * arrives after we drop lr_lock below. We use l_bl_ast 1713 * and can't use l_pending_chain as it is used both on 1714 * server and client nevertheless bug 5666 says it is 1715 * used only on server */ 1716 LASSERT(list_empty(&lock->l_bl_ast)); 1717 list_add(&lock->l_bl_ast, cancels); 1718 unlock_res_and_lock(lock); 1719 lu_ref_del(&lock->l_reference, __func__, current); 1720 spin_lock(&ns->ns_lock); 1721 added++; 1722 unused--; 1723 } 1724 spin_unlock(&ns->ns_lock); 1725 return added; 1726} 1727 1728int ldlm_cancel_lru_local(struct ldlm_namespace *ns, struct list_head *cancels, 1729 int count, int max, ldlm_cancel_flags_t cancel_flags, 1730 int flags) 1731{ 1732 int added; 1733 added = ldlm_prepare_lru_list(ns, cancels, count, max, flags); 1734 if (added <= 0) 1735 return added; 1736 return ldlm_cli_cancel_list_local(cancels, added, cancel_flags); 1737} 1738 1739/** 1740 * Cancel at least \a nr locks from given namespace LRU. 1741 * 1742 * When called with LCF_ASYNC the blocking callback will be handled 1743 * in a thread and this function will return after the thread has been 1744 * asked to call the callback. When called with LCF_ASYNC the blocking 1745 * callback will be performed in this function. 1746 */ 1747int ldlm_cancel_lru(struct ldlm_namespace *ns, int nr, 1748 ldlm_cancel_flags_t cancel_flags, 1749 int flags) 1750{ 1751 LIST_HEAD(cancels); 1752 int count, rc; 1753 1754 /* Just prepare the list of locks, do not actually cancel them yet. 1755 * Locks are cancelled later in a separate thread. */ 1756 count = ldlm_prepare_lru_list(ns, &cancels, nr, 0, flags); 1757 rc = ldlm_bl_to_thread_list(ns, NULL, &cancels, count, cancel_flags); 1758 if (rc == 0) 1759 return count; 1760 1761 return 0; 1762} 1763 1764/** 1765 * Find and cancel locally unused locks found on resource, matched to the 1766 * given policy, mode. GET the found locks and add them into the \a cancels 1767 * list. 
1768 */ 1769int ldlm_cancel_resource_local(struct ldlm_resource *res, 1770 struct list_head *cancels, 1771 ldlm_policy_data_t *policy, 1772 ldlm_mode_t mode, __u64 lock_flags, 1773 ldlm_cancel_flags_t cancel_flags, void *opaque) 1774{ 1775 struct ldlm_lock *lock; 1776 int count = 0; 1777 1778 lock_res(res); 1779 list_for_each_entry(lock, &res->lr_granted, l_res_link) { 1780 if (opaque != NULL && lock->l_ast_data != opaque) { 1781 LDLM_ERROR(lock, "data %p doesn't match opaque %p", 1782 lock->l_ast_data, opaque); 1783 //LBUG(); 1784 continue; 1785 } 1786 1787 if (lock->l_readers || lock->l_writers) 1788 continue; 1789 1790 /* If somebody is already doing CANCEL, or blocking AST came, 1791 * skip this lock. */ 1792 if (lock->l_flags & LDLM_FL_BL_AST || 1793 lock->l_flags & LDLM_FL_CANCELING) 1794 continue; 1795 1796 if (lockmode_compat(lock->l_granted_mode, mode)) 1797 continue; 1798 1799 /* If policy is given and this is IBITS lock, add to list only 1800 * those locks that match by policy. */ 1801 if (policy && (lock->l_resource->lr_type == LDLM_IBITS) && 1802 !(lock->l_policy_data.l_inodebits.bits & 1803 policy->l_inodebits.bits)) 1804 continue; 1805 1806 /* See CBPENDING comment in ldlm_cancel_lru */ 1807 lock->l_flags |= LDLM_FL_CBPENDING | LDLM_FL_CANCELING | 1808 lock_flags; 1809 1810 LASSERT(list_empty(&lock->l_bl_ast)); 1811 list_add(&lock->l_bl_ast, cancels); 1812 LDLM_LOCK_GET(lock); 1813 count++; 1814 } 1815 unlock_res(res); 1816 1817 return ldlm_cli_cancel_list_local(cancels, count, cancel_flags); 1818} 1819EXPORT_SYMBOL(ldlm_cancel_resource_local); 1820 1821/** 1822 * Cancel client-side locks from a list and send/prepare cancel RPCs to the 1823 * server. 1824 * If \a req is NULL, send CANCEL request to server with handles of locks 1825 * in the \a cancels. If EARLY_CANCEL is not supported, send CANCEL requests 1826 * separately per lock. 1827 * If \a req is not NULL, put handles of locks in \a cancels into the request 1828 * buffer at the offset \a off. 1829 * Destroy \a cancels at the end. 1830 */ 1831int ldlm_cli_cancel_list(struct list_head *cancels, int count, 1832 struct ptlrpc_request *req, ldlm_cancel_flags_t flags) 1833{ 1834 struct ldlm_lock *lock; 1835 int res = 0; 1836 1837 if (list_empty(cancels) || count == 0) 1838 return 0; 1839 1840 /* XXX: requests (both batched and not) could be sent in parallel. 1841 * Usually it is enough to have just 1 RPC, but it is possible that 1842 * there are too many locks to be cancelled in LRU or on a resource. 1843 * It would also speed up the case when the server does not support 1844 * the feature. */ 1845 while (count > 0) { 1846 LASSERT(!list_empty(cancels)); 1847 lock = list_entry(cancels->next, struct ldlm_lock, 1848 l_bl_ast); 1849 LASSERT(lock->l_conn_export); 1850 1851 if (exp_connect_cancelset(lock->l_conn_export)) { 1852 res = count; 1853 if (req) 1854 ldlm_cancel_pack(req, cancels, count); 1855 else 1856 res = ldlm_cli_cancel_req(lock->l_conn_export, 1857 cancels, count, 1858 flags); 1859 } else { 1860 res = ldlm_cli_cancel_req(lock->l_conn_export, 1861 cancels, 1, flags); 1862 } 1863 1864 if (res < 0) { 1865 CDEBUG_LIMIT(res == -ESHUTDOWN ? D_DLMTRACE : D_ERROR, 1866 "ldlm_cli_cancel_list: %d\n", res); 1867 res = count; 1868 } 1869 1870 count -= res; 1871 ldlm_lock_list_put(cancels, l_bl_ast, res); 1872 } 1873 LASSERT(count == 0); 1874 return 0; 1875} 1876EXPORT_SYMBOL(ldlm_cli_cancel_list); 1877 1878/** 1879 * Cancel all locks on a resource that have 0 readers/writers. 

/**
 * Cancel all locks on a resource that have 0 readers/writers.
 *
 * If flags & LCF_LOCAL, throw the locks away without trying
 * to notify the server.
 */
int ldlm_cli_cancel_unused_resource(struct ldlm_namespace *ns,
				    const struct ldlm_res_id *res_id,
				    ldlm_policy_data_t *policy,
				    ldlm_mode_t mode,
				    ldlm_cancel_flags_t flags,
				    void *opaque)
{
	struct ldlm_resource *res;
	LIST_HEAD(cancels);
	int count;
	int rc;

	res = ldlm_resource_get(ns, NULL, res_id, 0, 0);
	if (res == NULL) {
		/* This is not a problem. */
		CDEBUG(D_INFO, "No resource %llu\n", res_id->name[0]);
		return 0;
	}

	LDLM_RESOURCE_ADDREF(res);
	count = ldlm_cancel_resource_local(res, &cancels, policy, mode,
					   0, flags | LCF_BL_AST, opaque);
	rc = ldlm_cli_cancel_list(&cancels, count, NULL, flags);
	if (rc != ELDLM_OK)
		CERROR("canceling unused lock "DLDLMRES": rc = %d\n",
		       PLDLMRES(res), rc);

	LDLM_RESOURCE_DELREF(res);
	ldlm_resource_putref(res);
	return 0;
}
EXPORT_SYMBOL(ldlm_cli_cancel_unused_resource);

struct ldlm_cli_cancel_arg {
	int lc_flags;
	void *lc_opaque;
};

static int ldlm_cli_hash_cancel_unused(struct cfs_hash *hs,
				       struct cfs_hash_bd *bd,
				       struct hlist_node *hnode, void *arg)
{
	struct ldlm_resource *res = cfs_hash_object(hs, hnode);
	struct ldlm_cli_cancel_arg *lc = arg;

	ldlm_cli_cancel_unused_resource(ldlm_res_to_ns(res), &res->lr_name,
					NULL, LCK_MINMODE,
					lc->lc_flags, lc->lc_opaque);
	/* must return 0 for hash iteration */
	return 0;
}

/**
 * Cancel all locks on a namespace (or a specific resource, if given)
 * that have 0 readers/writers.
 *
 * If flags & LCF_LOCAL, throw the locks away without trying
 * to notify the server.
 */
int ldlm_cli_cancel_unused(struct ldlm_namespace *ns,
			   const struct ldlm_res_id *res_id,
			   ldlm_cancel_flags_t flags, void *opaque)
{
	struct ldlm_cli_cancel_arg arg = {
		.lc_flags = flags,
		.lc_opaque = opaque,
	};

	if (ns == NULL)
		return ELDLM_OK;

	if (res_id != NULL) {
		return ldlm_cli_cancel_unused_resource(ns, res_id, NULL,
						       LCK_MINMODE, flags,
						       opaque);
	} else {
		cfs_hash_for_each_nolock(ns->ns_rs_hash,
					 ldlm_cli_hash_cancel_unused, &arg);
		return ELDLM_OK;
	}
}
EXPORT_SYMBOL(ldlm_cli_cancel_unused);
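
/*
 * Illustrative sketch only -- never called from this file: drop every
 * unused lock in a namespace without notifying the server, e.g. as a
 * client-side cleanup.  Passing a NULL resource id makes
 * ldlm_cli_cancel_unused() walk the whole namespace hash.
 */
static int __maybe_unused example_flush_namespace(struct ldlm_namespace *ns)
{
	/* LCF_LOCAL: throw the locks away, no CANCEL RPCs are sent. */
	return ldlm_cli_cancel_unused(ns, NULL, LCF_LOCAL, NULL);
}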

/* Lock iterators. */

int ldlm_resource_foreach(struct ldlm_resource *res, ldlm_iterator_t iter,
			  void *closure)
{
	struct list_head *tmp, *next;
	struct ldlm_lock *lock;
	int rc = LDLM_ITER_CONTINUE;

	if (!res)
		return LDLM_ITER_CONTINUE;

	lock_res(res);
	list_for_each_safe(tmp, next, &res->lr_granted) {
		lock = list_entry(tmp, struct ldlm_lock, l_res_link);

		if (iter(lock, closure) == LDLM_ITER_STOP) {
			rc = LDLM_ITER_STOP;
			goto out;
		}
	}

	list_for_each_safe(tmp, next, &res->lr_converting) {
		lock = list_entry(tmp, struct ldlm_lock, l_res_link);

		if (iter(lock, closure) == LDLM_ITER_STOP) {
			rc = LDLM_ITER_STOP;
			goto out;
		}
	}

	list_for_each_safe(tmp, next, &res->lr_waiting) {
		lock = list_entry(tmp, struct ldlm_lock, l_res_link);

		if (iter(lock, closure) == LDLM_ITER_STOP) {
			rc = LDLM_ITER_STOP;
			goto out;
		}
	}
out:
	unlock_res(res);
	return rc;
}
EXPORT_SYMBOL(ldlm_resource_foreach);

struct iter_helper_data {
	ldlm_iterator_t iter;
	void *closure;
};

static int ldlm_iter_helper(struct ldlm_lock *lock, void *closure)
{
	struct iter_helper_data *helper = closure;

	return helper->iter(lock, helper->closure);
}

static int ldlm_res_iter_helper(struct cfs_hash *hs, struct cfs_hash_bd *bd,
				struct hlist_node *hnode, void *arg)
{
	struct ldlm_resource *res = cfs_hash_object(hs, hnode);

	return ldlm_resource_foreach(res, ldlm_iter_helper, arg) ==
	       LDLM_ITER_STOP;
}

void ldlm_namespace_foreach(struct ldlm_namespace *ns,
			    ldlm_iterator_t iter, void *closure)
{
	struct iter_helper_data helper = {
		.iter = iter,
		.closure = closure,
	};

	cfs_hash_for_each_nolock(ns->ns_rs_hash,
				 ldlm_res_iter_helper, &helper);
}
EXPORT_SYMBOL(ldlm_namespace_foreach);

/* Non-blocking function to manipulate a lock whose cb_data is being put away.
 * Returns:  0 if no resource was found
 *	   > 0 (LDLM_ITER_STOP or LDLM_ITER_CONTINUE) otherwise
 *	   < 0 on error
 */
int ldlm_resource_iterate(struct ldlm_namespace *ns,
			  const struct ldlm_res_id *res_id,
			  ldlm_iterator_t iter, void *data)
{
	struct ldlm_resource *res;
	int rc;

	if (ns == NULL) {
		CERROR("must pass in namespace\n");
		LBUG();
	}

	res = ldlm_resource_get(ns, NULL, res_id, 0, 0);
	if (res == NULL)
		return 0;

	LDLM_RESOURCE_ADDREF(res);
	rc = ldlm_resource_foreach(res, iter, data);
	LDLM_RESOURCE_DELREF(res);
	ldlm_resource_putref(res);
	return rc;
}
EXPORT_SYMBOL(ldlm_resource_iterate);
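
/*
 * Illustrative sketch only -- never called from this file: a minimal
 * ldlm_iterator_t callback plus a wrapper that counts the locks hanging
 * off one resource via ldlm_resource_iterate().  Both names are invented
 * for the example.
 */
static int example_count_cb(struct ldlm_lock *lock, void *closure)
{
	int *count = closure;

	(*count)++;
	return LDLM_ITER_CONTINUE; /* LDLM_ITER_STOP would abort the walk */
}

static int __maybe_unused example_count_locks(struct ldlm_namespace *ns,
					      const struct ldlm_res_id *res_id)
{
	int count = 0;

	/* Walks the granted, converting and waiting lists under lr_lock;
	 * leaves the counter untouched if the resource does not exist. */
	ldlm_resource_iterate(ns, res_id, example_count_cb, &count);
	return count;
}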

/* Lock replay */

static int ldlm_chain_lock_for_replay(struct ldlm_lock *lock, void *closure)
{
	struct list_head *list = closure;

	/* we use l_pending_chain here, because it's unused on clients. */
	LASSERTF(list_empty(&lock->l_pending_chain),
		 "lock %p next %p prev %p\n",
		 lock, &lock->l_pending_chain.next,
		 &lock->l_pending_chain.prev);
	/* bug 9573: don't replay locks left after eviction, or
	 * bug 17614: locks being actively cancelled. Get a reference
	 * on a lock so that it does not disappear under us (e.g. due to
	 * cancel). */
	if (!(lock->l_flags & (LDLM_FL_FAILED | LDLM_FL_CANCELING))) {
		list_add(&lock->l_pending_chain, list);
		LDLM_LOCK_GET(lock);
	}

	return LDLM_ITER_CONTINUE;
}

static int replay_lock_interpret(const struct lu_env *env,
				 struct ptlrpc_request *req,
				 struct ldlm_async_args *aa, int rc)
{
	struct ldlm_lock *lock;
	struct ldlm_reply *reply;
	struct obd_export *exp;

	atomic_dec(&req->rq_import->imp_replay_inflight);
	if (rc != ELDLM_OK)
		goto out;

	reply = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
	if (reply == NULL) {
		rc = -EPROTO;
		goto out;
	}

	lock = ldlm_handle2lock(&aa->lock_handle);
	if (!lock) {
		CERROR("received replay ack for unknown local cookie %#llx remote cookie %#llx from server %s id %s\n",
		       aa->lock_handle.cookie, reply->lock_handle.cookie,
		       req->rq_export->exp_client_uuid.uuid,
		       libcfs_id2str(req->rq_peer));
		rc = -ESTALE;
		goto out;
	}

	/* Key change rehash lock in per-export hash with new key */
	exp = req->rq_export;
	if (exp && exp->exp_lock_hash) {
		/* In the function below, .hs_keycmp resolves to
		 * ldlm_export_lock_keycmp() */
		/* coverity[overrun-buffer-val] */
		cfs_hash_rehash_key(exp->exp_lock_hash,
				    &lock->l_remote_handle,
				    &reply->lock_handle,
				    &lock->l_exp_hash);
	} else {
		lock->l_remote_handle = reply->lock_handle;
	}

	LDLM_DEBUG(lock, "replayed lock:");
	ptlrpc_import_recovery_state_machine(req->rq_import);
	LDLM_LOCK_PUT(lock);
out:
	if (rc != ELDLM_OK)
		ptlrpc_connect_import(req->rq_import);

	return rc;
}

static int replay_one_lock(struct obd_import *imp, struct ldlm_lock *lock)
{
	struct ptlrpc_request *req;
	struct ldlm_async_args *aa;
	struct ldlm_request *body;
	int flags;

	/* Bug 11974: Do not replay a lock which is actively being canceled */
	if (lock->l_flags & LDLM_FL_CANCELING) {
		LDLM_DEBUG(lock, "Not replaying canceled lock:");
		return 0;
	}

	/* If this is a reply-less callback lock, we cannot replay it, since
	 * the server might long since have dropped it, with the notification
	 * of that event lost by the network (and the server may already have
	 * granted a conflicting lock). */
	if (lock->l_flags & LDLM_FL_CANCEL_ON_BLOCK) {
		LDLM_DEBUG(lock, "Not replaying reply-less lock:");
		ldlm_lock_cancel(lock);
		return 0;
	}

	/*
	 * If granted mode matches the requested mode, this lock is granted.
	 *
	 * If they differ, but we have a granted mode, then we were granted
	 * one mode and now want another: ergo, converting.
	 *
	 * If we haven't been granted anything and are on a resource list,
	 * then we're blocked/waiting.
	 *
	 * If we haven't been granted anything and we're NOT on a resource
	 * list, then we haven't got a reply yet and don't have a known
	 * disposition.  This happens whenever a lock enqueue is the request
	 * that triggers recovery.
	 */
	if (lock->l_granted_mode == lock->l_req_mode)
		flags = LDLM_FL_REPLAY | LDLM_FL_BLOCK_GRANTED;
	else if (lock->l_granted_mode)
		flags = LDLM_FL_REPLAY | LDLM_FL_BLOCK_CONV;
	else if (!list_empty(&lock->l_res_link))
		flags = LDLM_FL_REPLAY | LDLM_FL_BLOCK_WAIT;
	else
		flags = LDLM_FL_REPLAY;

	req = ptlrpc_request_alloc_pack(imp, &RQF_LDLM_ENQUEUE,
					LUSTRE_DLM_VERSION, LDLM_ENQUEUE);
	if (req == NULL)
		return -ENOMEM;

	/* We're part of recovery, so don't wait for it. */
	req->rq_send_state = LUSTRE_IMP_REPLAY_LOCKS;

	body = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);
	ldlm_lock2desc(lock, &body->lock_desc);
	body->lock_flags = ldlm_flags_to_wire(flags);

	ldlm_lock2handle(lock, &body->lock_handle[0]);
	if (lock->l_lvb_len > 0)
		req_capsule_extend(&req->rq_pill, &RQF_LDLM_ENQUEUE_LVB);
	req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
			     lock->l_lvb_len);
	ptlrpc_request_set_replen(req);
	/* Notify the server we've replayed all requests.  Also, mark the
	 * request to be put on a dedicated queue and processed after all
	 * request replays have completed (bug 6063). */
	lustre_msg_set_flags(req->rq_reqmsg, MSG_REQ_REPLAY_DONE);

	LDLM_DEBUG(lock, "replaying lock:");

	atomic_inc(&req->rq_import->imp_replay_inflight);
	CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
	aa = ptlrpc_req_async_args(req);
	aa->lock_handle = body->lock_handle[0];
	req->rq_interpret_reply = (ptlrpc_interpterer_t)replay_lock_interpret;
	ptlrpcd_add_req(req, PDL_POLICY_LOCAL, -1);

	return 0;
}
2240 */ 2241static void ldlm_cancel_unused_locks_for_replay(struct ldlm_namespace *ns) 2242{ 2243 int canceled; 2244 LIST_HEAD(cancels); 2245 2246 CDEBUG(D_DLMTRACE, "Dropping as many unused locks as possible before" 2247 "replay for namespace %s (%d)\n", 2248 ldlm_ns_name(ns), ns->ns_nr_unused); 2249 2250 /* We don't need to care whether or not LRU resize is enabled 2251 * because the LDLM_CANCEL_NO_WAIT policy doesn't use the 2252 * count parameter */ 2253 canceled = ldlm_cancel_lru_local(ns, &cancels, ns->ns_nr_unused, 0, 2254 LCF_LOCAL, LDLM_CANCEL_NO_WAIT); 2255 2256 CDEBUG(D_DLMTRACE, "Canceled %d unused locks from namespace %s\n", 2257 canceled, ldlm_ns_name(ns)); 2258} 2259 2260int ldlm_replay_locks(struct obd_import *imp) 2261{ 2262 struct ldlm_namespace *ns = imp->imp_obd->obd_namespace; 2263 LIST_HEAD(list); 2264 struct ldlm_lock *lock, *next; 2265 int rc = 0; 2266 2267 LASSERT(atomic_read(&imp->imp_replay_inflight) == 0); 2268 2269 /* don't replay locks if import failed recovery */ 2270 if (imp->imp_vbr_failed) 2271 return 0; 2272 2273 /* ensure this doesn't fall to 0 before all have been queued */ 2274 atomic_inc(&imp->imp_replay_inflight); 2275 2276 if (ldlm_cancel_unused_locks_before_replay) 2277 ldlm_cancel_unused_locks_for_replay(ns); 2278 2279 ldlm_namespace_foreach(ns, ldlm_chain_lock_for_replay, &list); 2280 2281 list_for_each_entry_safe(lock, next, &list, l_pending_chain) { 2282 list_del_init(&lock->l_pending_chain); 2283 if (rc) { 2284 LDLM_LOCK_RELEASE(lock); 2285 continue; /* or try to do the rest? */ 2286 } 2287 rc = replay_one_lock(imp, lock); 2288 LDLM_LOCK_RELEASE(lock); 2289 } 2290 2291 atomic_dec(&imp->imp_replay_inflight); 2292 2293 return rc; 2294} 2295EXPORT_SYMBOL(ldlm_replay_locks); 2296