1/*
2 * GPL HEADER START
3 *
4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5 *
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
9 *
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
15 *
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19 *
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
22 * have any questions.
23 *
24 * GPL HEADER END
25 */
26/*
27 * Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
29 *
30 * Copyright (c) 2011, 2012, Intel Corporation.
31 */
32/*
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
35 *
36 * Client Extent Lock.
37 *
38 *   Author: Nikita Danilov <nikita.danilov@sun.com>
39 */
40
41#define DEBUG_SUBSYSTEM S_CLASS
42
43#include "../include/obd_class.h"
44#include "../include/obd_support.h"
45#include "../include/lustre_fid.h"
46#include <linux/list.h>
47#include "../include/cl_object.h"
48#include "cl_internal.h"
49
50/** Lock class of cl_lock::cll_guard */
51static struct lock_class_key cl_lock_guard_class;
52static struct kmem_cache *cl_lock_kmem;
53
54static struct lu_kmem_descr cl_lock_caches[] = {
55	{
56		.ckd_cache = &cl_lock_kmem,
57		.ckd_name  = "cl_lock_kmem",
58		.ckd_size  = sizeof(struct cl_lock)
59	},
60	{
61		.ckd_cache = NULL
62	}
63};
64
65#define CS_LOCK_INC(o, item)
66#define CS_LOCK_DEC(o, item)
67#define CS_LOCKSTATE_INC(o, state)
68#define CS_LOCKSTATE_DEC(o, state)
69
70/**
71 * Basic lock invariant that is maintained at all times. Caller either has a
72 * reference to \a lock, or somehow assures that \a lock cannot be freed.
73 *
74 * \see cl_lock_invariant()
75 */
76static int cl_lock_invariant_trusted(const struct lu_env *env,
77				     const struct cl_lock *lock)
78{
79	return  ergo(lock->cll_state == CLS_FREEING, lock->cll_holds == 0) &&
80		atomic_read(&lock->cll_ref) >= lock->cll_holds &&
81		lock->cll_holds >= lock->cll_users &&
82		lock->cll_holds >= 0 &&
83		lock->cll_users >= 0 &&
84		lock->cll_depth >= 0;
85}
86
87/**
88 * Stronger lock invariant, checking that caller has a reference on a lock.
89 *
90 * \see cl_lock_invariant_trusted()
91 */
92static int cl_lock_invariant(const struct lu_env *env,
93			     const struct cl_lock *lock)
94{
95	int result;
96
97	result = atomic_read(&lock->cll_ref) > 0 &&
98		cl_lock_invariant_trusted(env, lock);
99	if (!result && env != NULL)
100		CL_LOCK_DEBUG(D_ERROR, env, lock, "invariant broken");
101	return result;
102}
103
104/**
105 * Returns lock "nesting": 0 for a top-lock and 1 for a sub-lock.
106 */
107static enum clt_nesting_level cl_lock_nesting(const struct cl_lock *lock)
108{
109	return cl_object_header(lock->cll_descr.cld_obj)->coh_nesting;
110}
111
112/**
113 * Returns a set of counters for this lock, depending on a lock nesting.
114 */
115static struct cl_thread_counters *cl_lock_counters(const struct lu_env *env,
116						   const struct cl_lock *lock)
117{
118	struct cl_thread_info *info;
119	enum clt_nesting_level nesting;
120
121	info = cl_env_info(env);
122	nesting = cl_lock_nesting(lock);
123	LASSERT(nesting < ARRAY_SIZE(info->clt_counters));
124	return &info->clt_counters[nesting];
125}
126
127static void cl_lock_trace0(int level, const struct lu_env *env,
128			   const char *prefix, const struct cl_lock *lock,
129			   const char *func, const int line)
130{
131	struct cl_object_header *h = cl_object_header(lock->cll_descr.cld_obj);
132	CDEBUG(level, "%s: %p@(%d %p %d %d %d %d %d %lx)"
133		      "(%p/%d/%d) at %s():%d\n",
134	       prefix, lock, atomic_read(&lock->cll_ref),
135	       lock->cll_guarder, lock->cll_depth,
136	       lock->cll_state, lock->cll_error, lock->cll_holds,
137	       lock->cll_users, lock->cll_flags,
138	       env, h->coh_nesting, cl_lock_nr_mutexed(env),
139	       func, line);
140}
141#define cl_lock_trace(level, env, prefix, lock)			 \
142	cl_lock_trace0(level, env, prefix, lock, __func__, __LINE__)
143
144#define RETIP ((unsigned long)__builtin_return_address(0))
145
146#ifdef CONFIG_LOCKDEP
147static struct lock_class_key cl_lock_key;
148
149static void cl_lock_lockdep_init(struct cl_lock *lock)
150{
151	lockdep_set_class_and_name(lock, &cl_lock_key, "EXT");
152}
153
154static void cl_lock_lockdep_acquire(const struct lu_env *env,
155				    struct cl_lock *lock, __u32 enqflags)
156{
157	cl_lock_counters(env, lock)->ctc_nr_locks_acquired++;
158	lock_map_acquire(&lock->dep_map);
159}
160
161static void cl_lock_lockdep_release(const struct lu_env *env,
162				    struct cl_lock *lock)
163{
164	cl_lock_counters(env, lock)->ctc_nr_locks_acquired--;
165	lock_release(&lock->dep_map, 0, RETIP);
166}
167
168#else /* !CONFIG_LOCKDEP */
169
170static void cl_lock_lockdep_init(struct cl_lock *lock)
171{}
172static void cl_lock_lockdep_acquire(const struct lu_env *env,
173				    struct cl_lock *lock, __u32 enqflags)
174{}
175static void cl_lock_lockdep_release(const struct lu_env *env,
176				    struct cl_lock *lock)
177{}
178
179#endif /* !CONFIG_LOCKDEP */
180
181/**
182 * Adds lock slice to the compound lock.
183 *
184 * This is called by cl_object_operations::coo_lock_init() methods to add a
185 * per-layer state to the lock. New state is added at the end of
186 * cl_lock::cll_layers list, that is, it is at the bottom of the stack.
187 *
188 * \see cl_req_slice_add(), cl_page_slice_add(), cl_io_slice_add()
189 */
190void cl_lock_slice_add(struct cl_lock *lock, struct cl_lock_slice *slice,
191		       struct cl_object *obj,
192		       const struct cl_lock_operations *ops)
193{
194	slice->cls_lock = lock;
195	list_add_tail(&slice->cls_linkage, &lock->cll_layers);
196	slice->cls_obj = obj;
197	slice->cls_ops = ops;
198}
199EXPORT_SYMBOL(cl_lock_slice_add);
200
201/**
202 * Returns true iff a lock with the mode \a has provides at least the same
203 * guarantees as a lock with the mode \a need.
204 */
205int cl_lock_mode_match(enum cl_lock_mode has, enum cl_lock_mode need)
206{
207	LINVRNT(need == CLM_READ || need == CLM_WRITE ||
208		need == CLM_PHANTOM || need == CLM_GROUP);
209	LINVRNT(has == CLM_READ || has == CLM_WRITE ||
210		has == CLM_PHANTOM || has == CLM_GROUP);
211	CLASSERT(CLM_PHANTOM < CLM_READ);
212	CLASSERT(CLM_READ < CLM_WRITE);
213	CLASSERT(CLM_WRITE < CLM_GROUP);
214
215	if (has != CLM_GROUP)
216		return need <= has;
217	else
218		return need == has;
219}
220EXPORT_SYMBOL(cl_lock_mode_match);
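
/*
 * Illustrative usage sketch (not part of the driver): given the
 * CLM_PHANTOM < CLM_READ < CLM_WRITE < CLM_GROUP ordering asserted above,
 * a write lock satisfies a read request, while a group lock only matches
 * another group request:
 *
 *	cl_lock_mode_match(CLM_WRITE, CLM_READ);  // -> 1
 *	cl_lock_mode_match(CLM_READ,  CLM_WRITE); // -> 0
 *	cl_lock_mode_match(CLM_GROUP, CLM_READ);  // -> 0
 */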
221
222/**
223 * Returns true iff extent portions of lock descriptions match.
224 */
225int cl_lock_ext_match(const struct cl_lock_descr *has,
226		      const struct cl_lock_descr *need)
227{
228	return
229		has->cld_start <= need->cld_start &&
230		has->cld_end >= need->cld_end &&
231		cl_lock_mode_match(has->cld_mode, need->cld_mode) &&
232		(has->cld_mode != CLM_GROUP || has->cld_gid == need->cld_gid);
233}
234EXPORT_SYMBOL(cl_lock_ext_match);
235
236/**
237 * Returns true iff a lock with the description \a has provides at least the
238 * same guarantees as a lock with the description \a need.
239 */
240int cl_lock_descr_match(const struct cl_lock_descr *has,
241			const struct cl_lock_descr *need)
242{
243	return
244		cl_object_same(has->cld_obj, need->cld_obj) &&
245		cl_lock_ext_match(has, need);
246}
247EXPORT_SYMBOL(cl_lock_descr_match);
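
/*
 * Illustrative usage sketch (not part of the driver): a cached [0, 1023]
 * CLM_WRITE description satisfies a [16, 31] CLM_READ request on the same
 * object, because the extent is contained and the mode is at least as
 * strong ("obj" is a hypothetical struct cl_object pointer):
 *
 *	struct cl_lock_descr has  = { .cld_obj = obj, .cld_mode = CLM_WRITE,
 *				      .cld_start = 0,  .cld_end = 1023 };
 *	struct cl_lock_descr need = { .cld_obj = obj, .cld_mode = CLM_READ,
 *				      .cld_start = 16, .cld_end = 31 };
 *
 *	cl_lock_descr_match(&has, &need); // -> 1
 */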
248
249static void cl_lock_free(const struct lu_env *env, struct cl_lock *lock)
250{
251	struct cl_object *obj = lock->cll_descr.cld_obj;
252
253	LINVRNT(!cl_lock_is_mutexed(lock));
254
255	cl_lock_trace(D_DLMTRACE, env, "free lock", lock);
256	might_sleep();
257	while (!list_empty(&lock->cll_layers)) {
258		struct cl_lock_slice *slice;
259
260		slice = list_entry(lock->cll_layers.next,
261				       struct cl_lock_slice, cls_linkage);
262		list_del_init(lock->cll_layers.next);
263		slice->cls_ops->clo_fini(env, slice);
264	}
265	CS_LOCK_DEC(obj, total);
266	CS_LOCKSTATE_DEC(obj, lock->cll_state);
267	lu_object_ref_del_at(&obj->co_lu, &lock->cll_obj_ref, "cl_lock", lock);
268	cl_object_put(env, obj);
269	lu_ref_fini(&lock->cll_reference);
270	lu_ref_fini(&lock->cll_holders);
271	mutex_destroy(&lock->cll_guard);
272	OBD_SLAB_FREE_PTR(lock, cl_lock_kmem);
273}
274
275/**
276 * Releases a reference on a lock.
277 *
278 * When last reference is released, lock is returned to the cache, unless it
279 * is in cl_lock_state::CLS_FREEING state, in which case it is destroyed
280 * immediately.
281 *
282 * \see cl_object_put(), cl_page_put()
283 */
284void cl_lock_put(const struct lu_env *env, struct cl_lock *lock)
285{
286	struct cl_object	*obj;
287
288	LINVRNT(cl_lock_invariant(env, lock));
289	obj = lock->cll_descr.cld_obj;
290	LINVRNT(obj != NULL);
291
292	CDEBUG(D_TRACE, "releasing reference: %d %p %lu\n",
293	       atomic_read(&lock->cll_ref), lock, RETIP);
294
295	if (atomic_dec_and_test(&lock->cll_ref)) {
296		if (lock->cll_state == CLS_FREEING) {
297			LASSERT(list_empty(&lock->cll_linkage));
298			cl_lock_free(env, lock);
299		}
300		CS_LOCK_DEC(obj, busy);
301	}
302}
303EXPORT_SYMBOL(cl_lock_put);
304
305/**
306 * Acquires an additional reference to a lock.
307 *
308 * This can be called only by caller already possessing a reference to \a
309 * lock.
310 *
311 * \see cl_object_get(), cl_page_get()
312 */
313void cl_lock_get(struct cl_lock *lock)
314{
315	LINVRNT(cl_lock_invariant(NULL, lock));
316	CDEBUG(D_TRACE, "acquiring reference: %d %p %lu\n",
317	       atomic_read(&lock->cll_ref), lock, RETIP);
318	atomic_inc(&lock->cll_ref);
319}
320EXPORT_SYMBOL(cl_lock_get);
321
322/**
323 * Acquires a reference to a lock.
324 *
325 * This is much like cl_lock_get(), except that this function can be used to
326 * acquire initial reference to the cached lock. Caller has to deal with all
327 * possible races. Use with care!
328 *
329 * \see cl_page_get_trust()
330 */
331void cl_lock_get_trust(struct cl_lock *lock)
332{
333	CDEBUG(D_TRACE, "acquiring trusted reference: %d %p %lu\n",
334	       atomic_read(&lock->cll_ref), lock, RETIP);
335	if (atomic_inc_return(&lock->cll_ref) == 1)
336		CS_LOCK_INC(lock->cll_descr.cld_obj, busy);
337}
338EXPORT_SYMBOL(cl_lock_get_trust);
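
/*
 * Illustrative usage sketch (not part of the driver): cl_lock_get_trust() is
 * meant for code that finds a lock under cl_object_header::coh_lock_guard,
 * as cl_lock_lookup() below does, and therefore knows the lock cannot be
 * freed concurrently:
 *
 *	spin_lock(&head->coh_lock_guard);
 *	// ... locate "lock" on head->coh_locks ...
 *	cl_lock_get_trust(lock);
 *	spin_unlock(&head->coh_lock_guard);
 *	// ... use the lock ...
 *	cl_lock_put(env, lock); // drop the reference when done
 */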
339
340/**
341 * Helper function that destroys a lock which wasn't completely initialized.
342 *
343 * Other threads can acquire references to the top-lock through its
344 * sub-locks. Hence, it cannot be cl_lock_free()-ed immediately.
345 */
346static void cl_lock_finish(const struct lu_env *env, struct cl_lock *lock)
347{
348	cl_lock_mutex_get(env, lock);
349	cl_lock_cancel(env, lock);
350	cl_lock_delete(env, lock);
351	cl_lock_mutex_put(env, lock);
352	cl_lock_put(env, lock);
353}
354
355static struct cl_lock *cl_lock_alloc(const struct lu_env *env,
356				     struct cl_object *obj,
357				     const struct cl_io *io,
358				     const struct cl_lock_descr *descr)
359{
360	struct cl_lock	  *lock;
361	struct lu_object_header *head;
362
363	OBD_SLAB_ALLOC_PTR_GFP(lock, cl_lock_kmem, GFP_NOFS);
364	if (lock != NULL) {
365		atomic_set(&lock->cll_ref, 1);
366		lock->cll_descr = *descr;
367		lock->cll_state = CLS_NEW;
368		cl_object_get(obj);
369		lu_object_ref_add_at(&obj->co_lu, &lock->cll_obj_ref, "cl_lock",
370				     lock);
371		INIT_LIST_HEAD(&lock->cll_layers);
372		INIT_LIST_HEAD(&lock->cll_linkage);
373		INIT_LIST_HEAD(&lock->cll_inclosure);
374		lu_ref_init(&lock->cll_reference);
375		lu_ref_init(&lock->cll_holders);
376		mutex_init(&lock->cll_guard);
377		lockdep_set_class(&lock->cll_guard, &cl_lock_guard_class);
378		init_waitqueue_head(&lock->cll_wq);
379		head = obj->co_lu.lo_header;
380		CS_LOCKSTATE_INC(obj, CLS_NEW);
381		CS_LOCK_INC(obj, total);
382		CS_LOCK_INC(obj, create);
383		cl_lock_lockdep_init(lock);
384		list_for_each_entry(obj, &head->loh_layers,
385					co_lu.lo_linkage) {
386			int err;
387
388			err = obj->co_ops->coo_lock_init(env, obj, lock, io);
389			if (err != 0) {
390				cl_lock_finish(env, lock);
391				lock = ERR_PTR(err);
392				break;
393			}
394		}
395	} else
396		lock = ERR_PTR(-ENOMEM);
397	return lock;
398}
399
400/**
401 * Transfer the lock into INTRANSIT state and return the original state.
402 *
403 * \pre  state: CLS_CACHED, CLS_HELD or CLS_ENQUEUED
404 * \post state: CLS_INTRANSIT
405 * \see CLS_INTRANSIT
406 */
407enum cl_lock_state cl_lock_intransit(const struct lu_env *env,
408				     struct cl_lock *lock)
409{
410	enum cl_lock_state state = lock->cll_state;
411
412	LASSERT(cl_lock_is_mutexed(lock));
413	LASSERT(state != CLS_INTRANSIT);
414	LASSERTF(state >= CLS_ENQUEUED && state <= CLS_CACHED,
415		 "Malformed lock state %d.\n", state);
416
417	cl_lock_state_set(env, lock, CLS_INTRANSIT);
418	lock->cll_intransit_owner = current;
419	cl_lock_hold_add(env, lock, "intransit", current);
420	return state;
421}
422EXPORT_SYMBOL(cl_lock_intransit);
423
424/**
425 *  Exit the intransit state and restore the lock state to the original state
426 */
427void cl_lock_extransit(const struct lu_env *env, struct cl_lock *lock,
428		       enum cl_lock_state state)
429{
430	LASSERT(cl_lock_is_mutexed(lock));
431	LASSERT(lock->cll_state == CLS_INTRANSIT);
432	LASSERT(state != CLS_INTRANSIT);
433	LASSERT(lock->cll_intransit_owner == current);
434
435	lock->cll_intransit_owner = NULL;
436	cl_lock_state_set(env, lock, state);
437	cl_lock_unhold(env, lock, "intransit", current);
438}
439EXPORT_SYMBOL(cl_lock_extransit);
440
441/**
442 * Returns true iff the lock is in the INTRANSIT state and owned by another thread.
443 */
444int cl_lock_is_intransit(struct cl_lock *lock)
445{
446	LASSERT(cl_lock_is_mutexed(lock));
447	return lock->cll_state == CLS_INTRANSIT &&
448	       lock->cll_intransit_owner != current;
449}
450EXPORT_SYMBOL(cl_lock_is_intransit);
451/**
452 * Returns true iff lock is "suitable" for given io. E.g., locks acquired by
453 * truncate and O_APPEND cannot be reused for read/non-append-write, as they
454 * cover multiple stripes and can trigger cascading timeouts.
455 */
456static int cl_lock_fits_into(const struct lu_env *env,
457			     const struct cl_lock *lock,
458			     const struct cl_lock_descr *need,
459			     const struct cl_io *io)
460{
461	const struct cl_lock_slice *slice;
462
463	LINVRNT(cl_lock_invariant_trusted(env, lock));
464	list_for_each_entry(slice, &lock->cll_layers, cls_linkage) {
465		if (slice->cls_ops->clo_fits_into != NULL &&
466		    !slice->cls_ops->clo_fits_into(env, slice, need, io))
467			return 0;
468	}
469	return 1;
470}
471
472static struct cl_lock *cl_lock_lookup(const struct lu_env *env,
473				      struct cl_object *obj,
474				      const struct cl_io *io,
475				      const struct cl_lock_descr *need)
476{
477	struct cl_lock	  *lock;
478	struct cl_object_header *head;
479
480	head = cl_object_header(obj);
481	assert_spin_locked(&head->coh_lock_guard);
482	CS_LOCK_INC(obj, lookup);
483	list_for_each_entry(lock, &head->coh_locks, cll_linkage) {
484		int matched;
485
486		matched = cl_lock_ext_match(&lock->cll_descr, need) &&
487			  lock->cll_state < CLS_FREEING &&
488			  lock->cll_error == 0 &&
489			  !(lock->cll_flags & CLF_CANCELLED) &&
490			  cl_lock_fits_into(env, lock, need, io);
491		CDEBUG(D_DLMTRACE, "has: "DDESCR"(%d) need: "DDESCR": %d\n",
492		       PDESCR(&lock->cll_descr), lock->cll_state, PDESCR(need),
493		       matched);
494		if (matched) {
495			cl_lock_get_trust(lock);
496			CS_LOCK_INC(obj, hit);
497			return lock;
498		}
499	}
500	return NULL;
501}
502
503/**
504 * Returns a lock matching description \a need.
505 *
506 * This is the main entry point into the cl_lock caching interface. First, a
507 * cache (implemented as a per-object linked list) is consulted. If lock is
508 * found there, it is returned immediately. Otherwise new lock is allocated
509 * and returned. In any case, additional reference to lock is acquired.
510 *
511 * \see cl_object_find(), cl_page_find()
512 */
513static struct cl_lock *cl_lock_find(const struct lu_env *env,
514				    const struct cl_io *io,
515				    const struct cl_lock_descr *need)
516{
517	struct cl_object_header *head;
518	struct cl_object	*obj;
519	struct cl_lock	  *lock;
520
521	obj  = need->cld_obj;
522	head = cl_object_header(obj);
523
524	spin_lock(&head->coh_lock_guard);
525	lock = cl_lock_lookup(env, obj, io, need);
526	spin_unlock(&head->coh_lock_guard);
527
528	if (lock == NULL) {
529		lock = cl_lock_alloc(env, obj, io, need);
530		if (!IS_ERR(lock)) {
531			struct cl_lock *ghost;
532
533			spin_lock(&head->coh_lock_guard);
534			ghost = cl_lock_lookup(env, obj, io, need);
535			if (ghost == NULL) {
536				cl_lock_get_trust(lock);
537				list_add_tail(&lock->cll_linkage,
538						  &head->coh_locks);
539				spin_unlock(&head->coh_lock_guard);
540				CS_LOCK_INC(obj, busy);
541			} else {
542				spin_unlock(&head->coh_lock_guard);
543				/*
544				 * Other threads can acquire references to the
545				 * top-lock through its sub-locks. Hence, it
546				 * cannot be cl_lock_free()-ed immediately.
547				 */
548				cl_lock_finish(env, lock);
549				lock = ghost;
550			}
551		}
552	}
553	return lock;
554}
555
556/**
557 * Returns existing lock matching given description. This is similar to
558 * cl_lock_find() except that no new lock is created, and returned lock is
559 * guaranteed to be in enum cl_lock_state::CLS_HELD state.
560 */
561struct cl_lock *cl_lock_peek(const struct lu_env *env, const struct cl_io *io,
562			     const struct cl_lock_descr *need,
563			     const char *scope, const void *source)
564{
565	struct cl_object_header *head;
566	struct cl_object	*obj;
567	struct cl_lock	  *lock;
568
569	obj  = need->cld_obj;
570	head = cl_object_header(obj);
571
572	do {
573		spin_lock(&head->coh_lock_guard);
574		lock = cl_lock_lookup(env, obj, io, need);
575		spin_unlock(&head->coh_lock_guard);
576		if (lock == NULL)
577			return NULL;
578
579		cl_lock_mutex_get(env, lock);
580		if (lock->cll_state == CLS_INTRANSIT)
581			/* The return value is deliberately ignored. */
582			cl_lock_state_wait(env, lock);
583		if (lock->cll_state == CLS_FREEING) {
584			cl_lock_mutex_put(env, lock);
585			cl_lock_put(env, lock);
586			lock = NULL;
587		}
588	} while (lock == NULL);
589
590	cl_lock_hold_add(env, lock, scope, source);
591	cl_lock_user_add(env, lock);
592	if (lock->cll_state == CLS_CACHED)
593		cl_use_try(env, lock, 1);
594	if (lock->cll_state == CLS_HELD) {
595		cl_lock_mutex_put(env, lock);
596		cl_lock_lockdep_acquire(env, lock, 0);
597		cl_lock_put(env, lock);
598	} else {
599		cl_unuse_try(env, lock);
600		cl_lock_unhold(env, lock, scope, source);
601		cl_lock_mutex_put(env, lock);
602		cl_lock_put(env, lock);
603		lock = NULL;
604	}
605
606	return lock;
607}
608EXPORT_SYMBOL(cl_lock_peek);
609
610/**
611 * Returns a slice within a lock, corresponding to the given layer in the
612 * device stack.
613 *
614 * \see cl_page_at()
615 */
616const struct cl_lock_slice *cl_lock_at(const struct cl_lock *lock,
617				       const struct lu_device_type *dtype)
618{
619	const struct cl_lock_slice *slice;
620
621	LINVRNT(cl_lock_invariant_trusted(NULL, lock));
622
623	list_for_each_entry(slice, &lock->cll_layers, cls_linkage) {
624		if (slice->cls_obj->co_lu.lo_dev->ld_type == dtype)
625			return slice;
626	}
627	return NULL;
628}
629EXPORT_SYMBOL(cl_lock_at);
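
/*
 * Illustrative usage sketch (not part of the driver): a layer retrieves its
 * own slice of a compound lock by passing its device type (the
 * "foo_device_type" below is hypothetical):
 *
 *	const struct cl_lock_slice *slice;
 *
 *	slice = cl_lock_at(lock, &foo_device_type);
 *	if (slice == NULL)
 *		return; // this layer contributed no slice to the lock
 */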
630
631static void cl_lock_mutex_tail(const struct lu_env *env, struct cl_lock *lock)
632{
633	struct cl_thread_counters *counters;
634
635	counters = cl_lock_counters(env, lock);
636	lock->cll_depth++;
637	counters->ctc_nr_locks_locked++;
638	lu_ref_add(&counters->ctc_locks_locked, "cll_guard", lock);
639	cl_lock_trace(D_TRACE, env, "got mutex", lock);
640}
641
642/**
643 * Locks cl_lock object.
644 *
645 * This is used to manipulate cl_lock fields, and to serialize state
646 * transitions in the lock state machine.
647 *
648 * \post cl_lock_is_mutexed(lock)
649 *
650 * \see cl_lock_mutex_put()
651 */
652void cl_lock_mutex_get(const struct lu_env *env, struct cl_lock *lock)
653{
654	LINVRNT(cl_lock_invariant(env, lock));
655
656	if (lock->cll_guarder == current) {
657		LINVRNT(cl_lock_is_mutexed(lock));
658		LINVRNT(lock->cll_depth > 0);
659	} else {
660		struct cl_object_header *hdr;
661		struct cl_thread_info   *info;
662		int i;
663
664		LINVRNT(lock->cll_guarder != current);
665		hdr = cl_object_header(lock->cll_descr.cld_obj);
666		/*
667		 * Check that mutexes are taken in the bottom-to-top order.
668		 */
669		info = cl_env_info(env);
670		for (i = 0; i < hdr->coh_nesting; ++i)
671			LASSERT(info->clt_counters[i].ctc_nr_locks_locked == 0);
672		mutex_lock_nested(&lock->cll_guard, hdr->coh_nesting);
673		lock->cll_guarder = current;
674		LINVRNT(lock->cll_depth == 0);
675	}
676	cl_lock_mutex_tail(env, lock);
677}
678EXPORT_SYMBOL(cl_lock_mutex_get);
679
680/**
681 * Try-locks cl_lock object.
682 *
683 * \retval 0 \a lock was successfully locked
684 *
685 * \retval -EBUSY \a lock cannot be locked right now
686 *
687 * \post ergo(result == 0, cl_lock_is_mutexed(lock))
688 *
689 * \see cl_lock_mutex_get()
690 */
691int cl_lock_mutex_try(const struct lu_env *env, struct cl_lock *lock)
692{
693	int result;
694
695	LINVRNT(cl_lock_invariant_trusted(env, lock));
696
697	result = 0;
698	if (lock->cll_guarder == current) {
699		LINVRNT(lock->cll_depth > 0);
700		cl_lock_mutex_tail(env, lock);
701	} else if (mutex_trylock(&lock->cll_guard)) {
702		LINVRNT(lock->cll_depth == 0);
703		lock->cll_guarder = current;
704		cl_lock_mutex_tail(env, lock);
705	} else
706		result = -EBUSY;
707	return result;
708}
709EXPORT_SYMBOL(cl_lock_mutex_try);
710
711/**
712 * Unlocks cl_lock object.
713 *
714 * \pre cl_lock_is_mutexed(lock)
715 *
716 * \see cl_lock_mutex_get()
717 */
718void cl_lock_mutex_put(const struct lu_env *env, struct cl_lock *lock)
719{
720	struct cl_thread_counters *counters;
721
722	LINVRNT(cl_lock_invariant(env, lock));
723	LINVRNT(cl_lock_is_mutexed(lock));
724	LINVRNT(lock->cll_guarder == current);
725	LINVRNT(lock->cll_depth > 0);
726
727	counters = cl_lock_counters(env, lock);
728	LINVRNT(counters->ctc_nr_locks_locked > 0);
729
730	cl_lock_trace(D_TRACE, env, "put mutex", lock);
731	lu_ref_del(&counters->ctc_locks_locked, "cll_guard", lock);
732	counters->ctc_nr_locks_locked--;
733	if (--lock->cll_depth == 0) {
734		lock->cll_guarder = NULL;
735		mutex_unlock(&lock->cll_guard);
736	}
737}
738EXPORT_SYMBOL(cl_lock_mutex_put);
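
/*
 * Illustrative usage sketch (not part of the driver): the mutex is recursive
 * for its owner, so state manipulations simply bracket the work with
 * cl_lock_mutex_get()/cl_lock_mutex_put() and may nest safely:
 *
 *	cl_lock_mutex_get(env, lock);
 *	LASSERT(cl_lock_is_mutexed(lock));
 *	// ... inspect or change lock state ...
 *	cl_lock_mutex_put(env, lock);
 */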
739
740/**
741 * Returns true iff lock's mutex is owned by the current thread.
742 */
743int cl_lock_is_mutexed(struct cl_lock *lock)
744{
745	return lock->cll_guarder == current;
746}
747EXPORT_SYMBOL(cl_lock_is_mutexed);
748
749/**
750 * Returns the number of cl_lock mutexes held by the current thread (environment).
751 */
752int cl_lock_nr_mutexed(const struct lu_env *env)
753{
754	struct cl_thread_info *info;
755	int i;
756	int locked;
757
758	/*
759	 * NOTE: if summation across all nesting levels (currently 2) proves
760	 *       too expensive, a summary counter can be added to
761	 *       struct cl_thread_info.
762	 */
763	info = cl_env_info(env);
764	for (i = 0, locked = 0; i < ARRAY_SIZE(info->clt_counters); ++i)
765		locked += info->clt_counters[i].ctc_nr_locks_locked;
766	return locked;
767}
768EXPORT_SYMBOL(cl_lock_nr_mutexed);
769
770static void cl_lock_cancel0(const struct lu_env *env, struct cl_lock *lock)
771{
772	LINVRNT(cl_lock_is_mutexed(lock));
773	LINVRNT(cl_lock_invariant(env, lock));
774	if (!(lock->cll_flags & CLF_CANCELLED)) {
775		const struct cl_lock_slice *slice;
776
777		lock->cll_flags |= CLF_CANCELLED;
778		list_for_each_entry_reverse(slice, &lock->cll_layers,
779						cls_linkage) {
780			if (slice->cls_ops->clo_cancel != NULL)
781				slice->cls_ops->clo_cancel(env, slice);
782		}
783	}
784}
785
786static void cl_lock_delete0(const struct lu_env *env, struct cl_lock *lock)
787{
788	struct cl_object_header    *head;
789	const struct cl_lock_slice *slice;
790
791	LINVRNT(cl_lock_is_mutexed(lock));
792	LINVRNT(cl_lock_invariant(env, lock));
793
794	if (lock->cll_state < CLS_FREEING) {
795		bool in_cache;
796
797		LASSERT(lock->cll_state != CLS_INTRANSIT);
798		cl_lock_state_set(env, lock, CLS_FREEING);
799
800		head = cl_object_header(lock->cll_descr.cld_obj);
801
802		spin_lock(&head->coh_lock_guard);
803		in_cache = !list_empty(&lock->cll_linkage);
804		if (in_cache)
805			list_del_init(&lock->cll_linkage);
806		spin_unlock(&head->coh_lock_guard);
807
808		if (in_cache) /* coh_locks cache holds a refcount. */
809			cl_lock_put(env, lock);
810
811		/*
812		 * From now on, no new references to this lock can be acquired
813		 * by cl_lock_lookup().
814		 */
815		list_for_each_entry_reverse(slice, &lock->cll_layers,
816						cls_linkage) {
817			if (slice->cls_ops->clo_delete != NULL)
818				slice->cls_ops->clo_delete(env, slice);
819		}
820		/*
821		 * From now on, no new references to this lock can be acquired
822		 * by layer-specific means (like a pointer from struct
823		 * ldlm_lock in osc, or a pointer from top-lock to sub-lock in
824		 * lov).
825		 *
826		 * Lock will be finally freed in cl_lock_put() when last of
827		 * existing references goes away.
828		 */
829	}
830}
831
832/**
833 * Mod(ifie)s cl_lock::cll_holds counter for a given lock. Also, for a
834 * top-lock (nesting == 0) accounts for this modification in the per-thread
835 * debugging counters. Sub-lock holds can be released by a thread different
836 * from one that acquired it.
837 */
838static void cl_lock_hold_mod(const struct lu_env *env, struct cl_lock *lock,
839			     int delta)
840{
841	struct cl_thread_counters *counters;
842	enum clt_nesting_level     nesting;
843
844	lock->cll_holds += delta;
845	nesting = cl_lock_nesting(lock);
846	if (nesting == CNL_TOP) {
847		counters = &cl_env_info(env)->clt_counters[CNL_TOP];
848		counters->ctc_nr_held += delta;
849		LASSERT(counters->ctc_nr_held >= 0);
850	}
851}
852
853/**
854 * Mod(ifie)s cl_lock::cll_users counter for a given lock. See
855 * cl_lock_hold_mod() for the explanation of the debugging code.
856 */
857static void cl_lock_used_mod(const struct lu_env *env, struct cl_lock *lock,
858			     int delta)
859{
860	struct cl_thread_counters *counters;
861	enum clt_nesting_level     nesting;
862
863	lock->cll_users += delta;
864	nesting = cl_lock_nesting(lock);
865	if (nesting == CNL_TOP) {
866		counters = &cl_env_info(env)->clt_counters[CNL_TOP];
867		counters->ctc_nr_used += delta;
868		LASSERT(counters->ctc_nr_used >= 0);
869	}
870}
871
872void cl_lock_hold_release(const struct lu_env *env, struct cl_lock *lock,
873			  const char *scope, const void *source)
874{
875	LINVRNT(cl_lock_is_mutexed(lock));
876	LINVRNT(cl_lock_invariant(env, lock));
877	LASSERT(lock->cll_holds > 0);
878
879	cl_lock_trace(D_DLMTRACE, env, "hold release lock", lock);
880	lu_ref_del(&lock->cll_holders, scope, source);
881	cl_lock_hold_mod(env, lock, -1);
882	if (lock->cll_holds == 0) {
883		CL_LOCK_ASSERT(lock->cll_state != CLS_HELD, env, lock);
884		if (lock->cll_descr.cld_mode == CLM_PHANTOM ||
885		    lock->cll_descr.cld_mode == CLM_GROUP ||
886		    lock->cll_state != CLS_CACHED)
887			/*
888			 * If the lock is still a phantom or group lock when
889			 * the user is done with it, destroy the lock.
890			 */
891			lock->cll_flags |= CLF_CANCELPEND|CLF_DOOMED;
892		if (lock->cll_flags & CLF_CANCELPEND) {
893			lock->cll_flags &= ~CLF_CANCELPEND;
894			cl_lock_cancel0(env, lock);
895		}
896		if (lock->cll_flags & CLF_DOOMED) {
897			/* no longer doomed: it's dead... Jim. */
898			lock->cll_flags &= ~CLF_DOOMED;
899			cl_lock_delete0(env, lock);
900		}
901	}
902}
903EXPORT_SYMBOL(cl_lock_hold_release);
904
905/**
906 * Waits until lock state is changed.
907 *
908 * This function is called with cl_lock mutex locked, atomically releases
909 * mutex and goes to sleep, waiting for a lock state change (signaled by
910 * cl_lock_signal()), and re-acquires the mutex before return.
911 *
912 * This function is used to wait until lock state machine makes some progress
913 * and to emulate synchronous operations on top of asynchronous lock
914 * interface.
915 *
916 * \retval -EINTR wait was interrupted
917 *
918 * \retval 0 wait wasn't interrupted
919 *
920 * \pre cl_lock_is_mutexed(lock)
921 *
922 * \see cl_lock_signal()
923 */
924int cl_lock_state_wait(const struct lu_env *env, struct cl_lock *lock)
925{
926	wait_queue_t waiter;
927	sigset_t blocked;
928	int result;
929
930	LINVRNT(cl_lock_is_mutexed(lock));
931	LINVRNT(cl_lock_invariant(env, lock));
932	LASSERT(lock->cll_depth == 1);
933	LASSERT(lock->cll_state != CLS_FREEING); /* too late to wait */
934
935	cl_lock_trace(D_DLMTRACE, env, "state wait lock", lock);
936	result = lock->cll_error;
937	if (result == 0) {
938		/* To avoid being interrupted by the 'non-fatal' signals
939		 * (SIGCHLD, for instance), we block them temporarily.
940		 * LU-305 */
941		blocked = cfs_block_sigsinv(LUSTRE_FATAL_SIGS);
942
943		init_waitqueue_entry(&waiter, current);
944		add_wait_queue(&lock->cll_wq, &waiter);
945		set_current_state(TASK_INTERRUPTIBLE);
946		cl_lock_mutex_put(env, lock);
947
948		LASSERT(cl_lock_nr_mutexed(env) == 0);
949
950		/* Returning ERESTARTSYS instead of EINTR so syscalls
951		 * can be restarted if signals are pending here */
952		result = -ERESTARTSYS;
953		if (likely(!OBD_FAIL_CHECK(OBD_FAIL_LOCK_STATE_WAIT_INTR))) {
954			schedule();
955			if (!cfs_signal_pending())
956				result = 0;
957		}
958
959		cl_lock_mutex_get(env, lock);
960		set_current_state(TASK_RUNNING);
961		remove_wait_queue(&lock->cll_wq, &waiter);
962
963		/* Restore old blocked signals */
964		cfs_restore_sigs(blocked);
965	}
966	return result;
967}
968EXPORT_SYMBOL(cl_lock_state_wait);
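
/*
 * Illustrative usage sketch (not part of the driver): callers typically
 * combine cl_lock_state_wait() with a CLO_WAIT-returning "try" function in a
 * retry loop, as cl_enqueue_locked() and cl_wait() below do:
 *
 *	do {
 *		result = cl_wait_try(env, lock);
 *		if (result == CLO_WAIT) {
 *			result = cl_lock_state_wait(env, lock);
 *			if (result == 0)
 *				continue; // state changed, try again
 *		}
 *		break;
 *	} while (1);
 */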
969
970static void cl_lock_state_signal(const struct lu_env *env, struct cl_lock *lock,
971				 enum cl_lock_state state)
972{
973	const struct cl_lock_slice *slice;
974
975	LINVRNT(cl_lock_is_mutexed(lock));
976	LINVRNT(cl_lock_invariant(env, lock));
977
978	list_for_each_entry(slice, &lock->cll_layers, cls_linkage)
979		if (slice->cls_ops->clo_state != NULL)
980			slice->cls_ops->clo_state(env, slice, state);
981	wake_up_all(&lock->cll_wq);
982}
983
984/**
985 * Notifies waiters that lock state changed.
986 *
987 * Wakes up all waiters sleeping in cl_lock_state_wait(), also notifies all
988 * layers about state change by calling cl_lock_operations::clo_state()
989 * top-to-bottom.
990 */
991void cl_lock_signal(const struct lu_env *env, struct cl_lock *lock)
992{
993	cl_lock_trace(D_DLMTRACE, env, "state signal lock", lock);
994	cl_lock_state_signal(env, lock, lock->cll_state);
995}
996EXPORT_SYMBOL(cl_lock_signal);
997
998/**
999 * Changes lock state.
1000 *
1001 * This function is invoked to notify layers that lock state changed, possibly
1002 * as a result of an asynchronous event such as call-back reception.
1003 *
1004 * \post lock->cll_state == state
1005 *
1006 * \see cl_lock_operations::clo_state()
1007 */
1008void cl_lock_state_set(const struct lu_env *env, struct cl_lock *lock,
1009		       enum cl_lock_state state)
1010{
1011	LASSERT(lock->cll_state <= state ||
1012		(lock->cll_state == CLS_CACHED &&
1013		 (state == CLS_HELD || /* lock found in cache */
1014		  state == CLS_NEW  ||   /* sub-lock canceled */
1015		  state == CLS_INTRANSIT)) ||
1016		/* lock is in transit state */
1017		lock->cll_state == CLS_INTRANSIT);
1018
1019	if (lock->cll_state != state) {
1020		CS_LOCKSTATE_DEC(lock->cll_descr.cld_obj, lock->cll_state);
1021		CS_LOCKSTATE_INC(lock->cll_descr.cld_obj, state);
1022
1023		cl_lock_state_signal(env, lock, state);
1024		lock->cll_state = state;
1025	}
1026}
1027EXPORT_SYMBOL(cl_lock_state_set);
1028
1029static int cl_unuse_try_internal(const struct lu_env *env, struct cl_lock *lock)
1030{
1031	const struct cl_lock_slice *slice;
1032	int result;
1033
1034	do {
1035		result = 0;
1036
1037		LINVRNT(cl_lock_is_mutexed(lock));
1038		LINVRNT(cl_lock_invariant(env, lock));
1039		LASSERT(lock->cll_state == CLS_INTRANSIT);
1040
1041		result = -ENOSYS;
1042		list_for_each_entry_reverse(slice, &lock->cll_layers,
1043						cls_linkage) {
1044			if (slice->cls_ops->clo_unuse != NULL) {
1045				result = slice->cls_ops->clo_unuse(env, slice);
1046				if (result != 0)
1047					break;
1048			}
1049		}
1050		LASSERT(result != -ENOSYS);
1051	} while (result == CLO_REPEAT);
1052
1053	return result;
1054}
1055
1056/**
1057 * Yanks lock from the cache (cl_lock_state::CLS_CACHED state) by calling
1058 * cl_lock_operations::clo_use() top-to-bottom to notify layers.
1059 * If @atomic is 1, the lock must be unused on failure so that the whole
1060 * "use" operation stays atomic.
1061 */
1062int cl_use_try(const struct lu_env *env, struct cl_lock *lock, int atomic)
1063{
1064	const struct cl_lock_slice *slice;
1065	int result;
1066	enum cl_lock_state state;
1067
1068	cl_lock_trace(D_DLMTRACE, env, "use lock", lock);
1069
1070	LASSERT(lock->cll_state == CLS_CACHED);
1071	if (lock->cll_error)
1072		return lock->cll_error;
1073
1074	result = -ENOSYS;
1075	state = cl_lock_intransit(env, lock);
1076	list_for_each_entry(slice, &lock->cll_layers, cls_linkage) {
1077		if (slice->cls_ops->clo_use != NULL) {
1078			result = slice->cls_ops->clo_use(env, slice);
1079			if (result != 0)
1080				break;
1081		}
1082	}
1083	LASSERT(result != -ENOSYS);
1084
1085	LASSERTF(lock->cll_state == CLS_INTRANSIT, "Wrong state %d.\n",
1086		 lock->cll_state);
1087
1088	if (result == 0) {
1089		state = CLS_HELD;
1090	} else {
1091		if (result == -ESTALE) {
1092			/*
1093			 * -ESTALE means the sublock is being cancelled
1094			 * at this time; set the lock state to NEW
1095			 * and ask the caller to repeat.
1096			 */
1097			state = CLS_NEW;
1098			result = CLO_REPEAT;
1099		}
1100
1101		/* @atomic means back-off-on-failure. */
1102		if (atomic) {
1103			int rc;
1104			rc = cl_unuse_try_internal(env, lock);
1105			/* Vet the results. */
1106			if (rc < 0 && result > 0)
1107				result = rc;
1108		}
1109
1110	}
1111	cl_lock_extransit(env, lock, state);
1112	return result;
1113}
1114EXPORT_SYMBOL(cl_use_try);
1115
1116/**
1117 * Helper for cl_enqueue_try() that calls ->clo_enqueue() across all layers
1118 * top-to-bottom.
1119 */
1120static int cl_enqueue_kick(const struct lu_env *env,
1121			   struct cl_lock *lock,
1122			   struct cl_io *io, __u32 flags)
1123{
1124	int result;
1125	const struct cl_lock_slice *slice;
1126
1127	result = -ENOSYS;
1128	list_for_each_entry(slice, &lock->cll_layers, cls_linkage) {
1129		if (slice->cls_ops->clo_enqueue != NULL) {
1130			result = slice->cls_ops->clo_enqueue(env,
1131							     slice, io, flags);
1132			if (result != 0)
1133				break;
1134		}
1135	}
1136	LASSERT(result != -ENOSYS);
1137	return result;
1138}
1139
1140/**
1141 * Tries to enqueue a lock.
1142 *
1143 * This function is called repeatedly by cl_enqueue() until either lock is
1144 * enqueued, or error occurs. This function does not block waiting for
1145 * networking communication to complete.
1146 *
1147 * \post ergo(result == 0, lock->cll_state == CLS_ENQUEUED ||
1148 *			 lock->cll_state == CLS_HELD)
1149 *
1150 * \see cl_enqueue() cl_lock_operations::clo_enqueue()
1151 * \see cl_lock_state::CLS_ENQUEUED
1152 */
1153int cl_enqueue_try(const struct lu_env *env, struct cl_lock *lock,
1154		   struct cl_io *io, __u32 flags)
1155{
1156	int result;
1157
1158	cl_lock_trace(D_DLMTRACE, env, "enqueue lock", lock);
1159	do {
1160		LINVRNT(cl_lock_is_mutexed(lock));
1161
1162		result = lock->cll_error;
1163		if (result != 0)
1164			break;
1165
1166		switch (lock->cll_state) {
1167		case CLS_NEW:
1168			cl_lock_state_set(env, lock, CLS_QUEUING);
1169			/* fall-through */
1170		case CLS_QUEUING:
1171			/* kick layers. */
1172			result = cl_enqueue_kick(env, lock, io, flags);
1173			/* In the AGL case, cl_lock::cll_state may
1174			 * already have become CLS_HELD. */
1175			if (result == 0 && lock->cll_state == CLS_QUEUING)
1176				cl_lock_state_set(env, lock, CLS_ENQUEUED);
1177			break;
1178		case CLS_INTRANSIT:
1179			LASSERT(cl_lock_is_intransit(lock));
1180			result = CLO_WAIT;
1181			break;
1182		case CLS_CACHED:
1183			/* yank lock from the cache. */
1184			result = cl_use_try(env, lock, 0);
1185			break;
1186		case CLS_ENQUEUED:
1187		case CLS_HELD:
1188			result = 0;
1189			break;
1190		default:
1191		case CLS_FREEING:
1192			/*
1193			 * impossible, only held locks with increased
1194			 * ->cll_holds can be enqueued, and they cannot be
1195			 * freed.
1196			 */
1197			LBUG();
1198		}
1199	} while (result == CLO_REPEAT);
1200	return result;
1201}
1202EXPORT_SYMBOL(cl_enqueue_try);
1203
1204/**
1205 * Cancel the conflicting lock found during previous enqueue.
1206 *
1207 * \retval 0 conflicting lock has been canceled.
1208 * \retval -ve error code.
1209 */
1210int cl_lock_enqueue_wait(const struct lu_env *env,
1211			 struct cl_lock *lock,
1212			 int keep_mutex)
1213{
1214	struct cl_lock  *conflict;
1215	int	      rc = 0;
1216
1217	LASSERT(cl_lock_is_mutexed(lock));
1218	LASSERT(lock->cll_state == CLS_QUEUING);
1219	LASSERT(lock->cll_conflict != NULL);
1220
1221	conflict = lock->cll_conflict;
1222	lock->cll_conflict = NULL;
1223
1224	cl_lock_mutex_put(env, lock);
1225	LASSERT(cl_lock_nr_mutexed(env) == 0);
1226
1227	cl_lock_mutex_get(env, conflict);
1228	cl_lock_trace(D_DLMTRACE, env, "enqueue wait", conflict);
1229	cl_lock_cancel(env, conflict);
1230	cl_lock_delete(env, conflict);
1231
1232	while (conflict->cll_state != CLS_FREEING) {
1233		rc = cl_lock_state_wait(env, conflict);
1234		if (rc != 0)
1235			break;
1236	}
1237	cl_lock_mutex_put(env, conflict);
1238	lu_ref_del(&conflict->cll_reference, "cancel-wait", lock);
1239	cl_lock_put(env, conflict);
1240
1241	if (keep_mutex)
1242		cl_lock_mutex_get(env, lock);
1243
1244	LASSERT(rc <= 0);
1245	return rc;
1246}
1247EXPORT_SYMBOL(cl_lock_enqueue_wait);
1248
1249static int cl_enqueue_locked(const struct lu_env *env, struct cl_lock *lock,
1250			     struct cl_io *io, __u32 enqflags)
1251{
1252	int result;
1253
1254	LINVRNT(cl_lock_is_mutexed(lock));
1255	LINVRNT(cl_lock_invariant(env, lock));
1256	LASSERT(lock->cll_holds > 0);
1257
1258	cl_lock_user_add(env, lock);
1259	do {
1260		result = cl_enqueue_try(env, lock, io, enqflags);
1261		if (result == CLO_WAIT) {
1262			if (lock->cll_conflict != NULL)
1263				result = cl_lock_enqueue_wait(env, lock, 1);
1264			else
1265				result = cl_lock_state_wait(env, lock);
1266			if (result == 0)
1267				continue;
1268		}
1269		break;
1270	} while (1);
1271	if (result != 0)
1272		cl_unuse_try(env, lock);
1273	LASSERT(ergo(result == 0 && !(enqflags & CEF_AGL),
1274		     lock->cll_state == CLS_ENQUEUED ||
1275		     lock->cll_state == CLS_HELD));
1276	return result;
1277}
1278
1279/**
1280 * Enqueues a lock.
1281 *
1282 * \pre current thread or io owns a hold on lock.
1283 *
1284 * \post ergo(result == 0, lock->users increased)
1285 * \post ergo(result == 0, lock->cll_state == CLS_ENQUEUED ||
1286 *			 lock->cll_state == CLS_HELD)
1287 */
1288int cl_enqueue(const struct lu_env *env, struct cl_lock *lock,
1289	       struct cl_io *io, __u32 enqflags)
1290{
1291	int result;
1292
1293	cl_lock_lockdep_acquire(env, lock, enqflags);
1294	cl_lock_mutex_get(env, lock);
1295	result = cl_enqueue_locked(env, lock, io, enqflags);
1296	cl_lock_mutex_put(env, lock);
1297	if (result != 0)
1298		cl_lock_lockdep_release(env, lock);
1299	LASSERT(ergo(result == 0, lock->cll_state == CLS_ENQUEUED ||
1300		     lock->cll_state == CLS_HELD));
1301	return result;
1302}
1303EXPORT_SYMBOL(cl_enqueue);
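
/*
 * Illustrative usage sketch (not part of the driver): a caller that already
 * owns a hold on the lock (see cl_lock_hold_add()) enqueues it, waits for it
 * to be granted, and eventually returns it to the cache; the hold itself is
 * dropped separately:
 *
 *	result = cl_enqueue(env, lock, io, enqflags);
 *	if (result == 0) {
 *		result = cl_wait(env, lock); // CLS_HELD on success
 *		if (result == 0) {
 *			// ... perform I/O under the lock ...
 *			cl_unuse(env, lock); // return the lock to the cache
 *		}
 *	}
 */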
1304
1305/**
1306 * Tries to unlock a lock.
1307 *
1308 * This function is called to release the underlying resource:
1309 * 1. for a top lock, the resource is the sublocks it holds;
1310 * 2. for a sublock, the resource is the reference to its dlmlock.
1311 *
1312 * cl_unuse_try is a one-shot operation, so it must NOT return CLO_WAIT.
1313 *
1314 * \see cl_unuse() cl_lock_operations::clo_unuse()
1315 * \see cl_lock_state::CLS_CACHED
1316 */
1317int cl_unuse_try(const struct lu_env *env, struct cl_lock *lock)
1318{
1319	int			 result;
1320	enum cl_lock_state	  state = CLS_NEW;
1321
1322	cl_lock_trace(D_DLMTRACE, env, "unuse lock", lock);
1323
1324	if (lock->cll_users > 1) {
1325		cl_lock_user_del(env, lock);
1326		return 0;
1327	}
1328
1329	/* Only a lock in the CLS_HELD or CLS_ENQUEUED state can hold
1330	 * underlying resources. */
1331	if (!(lock->cll_state == CLS_HELD || lock->cll_state == CLS_ENQUEUED)) {
1332		cl_lock_user_del(env, lock);
1333		return 0;
1334	}
1335
1336	/*
1337	 * New lock users (->cll_users) are not protecting unlocking
1338	 * from proceeding. From this point, lock eventually reaches
1339	 * CLS_CACHED, is reinitialized to CLS_NEW or fails into
1340	 * CLS_FREEING.
1341	 */
1342	state = cl_lock_intransit(env, lock);
1343
1344	result = cl_unuse_try_internal(env, lock);
1345	LASSERT(lock->cll_state == CLS_INTRANSIT);
1346	LASSERT(result != CLO_WAIT);
1347	cl_lock_user_del(env, lock);
1348	if (result == 0 || result == -ESTALE) {
1349		/*
1350		 * Return lock back to the cache. This is the only
1351		 * place where lock is moved into CLS_CACHED state.
1352		 *
1353		 * If one of ->clo_unuse() methods returned -ESTALE, lock
1354		 * cannot be placed into cache and has to be
1355		 * re-initialized. This happens e.g., when a sub-lock was
1356		 * canceled while unlocking was in progress.
1357		 */
1358		if (state == CLS_HELD && result == 0)
1359			state = CLS_CACHED;
1360		else
1361			state = CLS_NEW;
1362		cl_lock_extransit(env, lock, state);
1363
1364		/*
1365		 * Hide the -ESTALE error.
1366		 * Suppose the lock is a glimpse lock over multiple stripes,
1367		 * one of its sublocks returned -ENAVAIL, and the other
1368		 * sublocks matched write locks. In this case we cannot set
1369		 * this lock to an error state, because otherwise some of its
1370		 * sublocks might not be cancelled, and some dirty pages
1371		 * would never be written to the OSTs. -jay
1372		 */
1373		result = 0;
1374	} else {
1375		CERROR("result = %d, this is unlikely!\n", result);
1376		state = CLS_NEW;
1377		cl_lock_extransit(env, lock, state);
1378	}
1379	return result ?: lock->cll_error;
1380}
1381EXPORT_SYMBOL(cl_unuse_try);
1382
1383static void cl_unuse_locked(const struct lu_env *env, struct cl_lock *lock)
1384{
1385	int result;
1386
1387	result = cl_unuse_try(env, lock);
1388	if (result)
1389		CL_LOCK_DEBUG(D_ERROR, env, lock, "unuse return %d\n", result);
1390}
1391
1392/**
1393 * Unlocks a lock.
1394 */
1395void cl_unuse(const struct lu_env *env, struct cl_lock *lock)
1396{
1397	cl_lock_mutex_get(env, lock);
1398	cl_unuse_locked(env, lock);
1399	cl_lock_mutex_put(env, lock);
1400	cl_lock_lockdep_release(env, lock);
1401}
1402EXPORT_SYMBOL(cl_unuse);
1403
1404/**
1405 * Tries to wait for a lock.
1406 *
1407 * This function is called repeatedly by cl_wait() until either lock is
1408 * granted, or error occurs. This function does not block waiting for network
1409 * communication to complete.
1410 *
1411 * \see cl_wait() cl_lock_operations::clo_wait()
1412 * \see cl_lock_state::CLS_HELD
1413 */
1414int cl_wait_try(const struct lu_env *env, struct cl_lock *lock)
1415{
1416	const struct cl_lock_slice *slice;
1417	int			 result;
1418
1419	cl_lock_trace(D_DLMTRACE, env, "wait lock try", lock);
1420	do {
1421		LINVRNT(cl_lock_is_mutexed(lock));
1422		LINVRNT(cl_lock_invariant(env, lock));
1423		LASSERTF(lock->cll_state == CLS_QUEUING ||
1424			 lock->cll_state == CLS_ENQUEUED ||
1425			 lock->cll_state == CLS_HELD ||
1426			 lock->cll_state == CLS_INTRANSIT,
1427			 "lock state: %d\n", lock->cll_state);
1428		LASSERT(lock->cll_users > 0);
1429		LASSERT(lock->cll_holds > 0);
1430
1431		result = lock->cll_error;
1432		if (result != 0)
1433			break;
1434
1435		if (cl_lock_is_intransit(lock)) {
1436			result = CLO_WAIT;
1437			break;
1438		}
1439
1440		if (lock->cll_state == CLS_HELD)
1441			/* nothing to do */
1442			break;
1443
1444		result = -ENOSYS;
1445		list_for_each_entry(slice, &lock->cll_layers, cls_linkage) {
1446			if (slice->cls_ops->clo_wait != NULL) {
1447				result = slice->cls_ops->clo_wait(env, slice);
1448				if (result != 0)
1449					break;
1450			}
1451		}
1452		LASSERT(result != -ENOSYS);
1453		if (result == 0) {
1454			LASSERT(lock->cll_state != CLS_INTRANSIT);
1455			cl_lock_state_set(env, lock, CLS_HELD);
1456		}
1457	} while (result == CLO_REPEAT);
1458	return result;
1459}
1460EXPORT_SYMBOL(cl_wait_try);
1461
1462/**
1463 * Waits until enqueued lock is granted.
1464 *
1465 * \pre current thread or io owns a hold on the lock
1466 * \pre ergo(result == 0, lock->cll_state == CLS_ENQUEUED ||
1467 *			lock->cll_state == CLS_HELD)
1468 *
1469 * \post ergo(result == 0, lock->cll_state == CLS_HELD)
1470 */
1471int cl_wait(const struct lu_env *env, struct cl_lock *lock)
1472{
1473	int result;
1474
1475	cl_lock_mutex_get(env, lock);
1476
1477	LINVRNT(cl_lock_invariant(env, lock));
1478	LASSERTF(lock->cll_state == CLS_ENQUEUED || lock->cll_state == CLS_HELD,
1479		 "Wrong state %d \n", lock->cll_state);
1480	LASSERT(lock->cll_holds > 0);
1481
1482	do {
1483		result = cl_wait_try(env, lock);
1484		if (result == CLO_WAIT) {
1485			result = cl_lock_state_wait(env, lock);
1486			if (result == 0)
1487				continue;
1488		}
1489		break;
1490	} while (1);
1491	if (result < 0) {
1492		cl_unuse_try(env, lock);
1493		cl_lock_lockdep_release(env, lock);
1494	}
1495	cl_lock_trace(D_DLMTRACE, env, "wait lock", lock);
1496	cl_lock_mutex_put(env, lock);
1497	LASSERT(ergo(result == 0, lock->cll_state == CLS_HELD));
1498	return result;
1499}
1500EXPORT_SYMBOL(cl_wait);
1501
1502/**
1503 * Executes cl_lock_operations::clo_weigh(), and sums results to estimate lock
1504 * value.
1505 */
1506unsigned long cl_lock_weigh(const struct lu_env *env, struct cl_lock *lock)
1507{
1508	const struct cl_lock_slice *slice;
1509	unsigned long pound;
1510	unsigned long ounce;
1511
1512	LINVRNT(cl_lock_is_mutexed(lock));
1513	LINVRNT(cl_lock_invariant(env, lock));
1514
1515	pound = 0;
1516	list_for_each_entry_reverse(slice, &lock->cll_layers, cls_linkage) {
1517		if (slice->cls_ops->clo_weigh != NULL) {
1518			ounce = slice->cls_ops->clo_weigh(env, slice);
1519			pound += ounce;
1520			if (pound < ounce) /* over-weight^Wflow */
1521				pound = ~0UL;
1522		}
1523	}
1524	return pound;
1525}
1526EXPORT_SYMBOL(cl_lock_weigh);
1527
1528/**
1529 * Notifies layers that lock description changed.
1530 *
1531 * The server can grant the client a lock different from the one that was
1532 * requested (e.g., larger in extent). This method is called when the actually
1533 * granted lock description becomes known, so that layers can accommodate the
1534 * changed lock description.
1535 *
1536 * \see cl_lock_operations::clo_modify()
1537 */
1538int cl_lock_modify(const struct lu_env *env, struct cl_lock *lock,
1539		   const struct cl_lock_descr *desc)
1540{
1541	const struct cl_lock_slice *slice;
1542	struct cl_object	   *obj = lock->cll_descr.cld_obj;
1543	struct cl_object_header    *hdr = cl_object_header(obj);
1544	int result;
1545
1546	cl_lock_trace(D_DLMTRACE, env, "modify lock", lock);
1547	/* don't allow object to change */
1548	LASSERT(obj == desc->cld_obj);
1549	LINVRNT(cl_lock_is_mutexed(lock));
1550	LINVRNT(cl_lock_invariant(env, lock));
1551
1552	list_for_each_entry_reverse(slice, &lock->cll_layers, cls_linkage) {
1553		if (slice->cls_ops->clo_modify != NULL) {
1554			result = slice->cls_ops->clo_modify(env, slice, desc);
1555			if (result != 0)
1556				return result;
1557		}
1558	}
1559	CL_LOCK_DEBUG(D_DLMTRACE, env, lock, " -> "DDESCR"@"DFID"\n",
1560		      PDESCR(desc), PFID(lu_object_fid(&desc->cld_obj->co_lu)));
1561	/*
1562	 * Just replace description in place. Nothing more is needed for
1563	 * now. If locks were indexed according to their extent and/or mode,
1564	 * that index would have to be updated here.
1565	 */
1566	spin_lock(&hdr->coh_lock_guard);
1567	lock->cll_descr = *desc;
1568	spin_unlock(&hdr->coh_lock_guard);
1569	return 0;
1570}
1571EXPORT_SYMBOL(cl_lock_modify);
1572
1573/**
1574 * Initializes lock closure with a given origin.
1575 *
1576 * \see cl_lock_closure
1577 */
1578void cl_lock_closure_init(const struct lu_env *env,
1579			  struct cl_lock_closure *closure,
1580			  struct cl_lock *origin, int wait)
1581{
1582	LINVRNT(cl_lock_is_mutexed(origin));
1583	LINVRNT(cl_lock_invariant(env, origin));
1584
1585	INIT_LIST_HEAD(&closure->clc_list);
1586	closure->clc_origin = origin;
1587	closure->clc_wait   = wait;
1588	closure->clc_nr     = 0;
1589}
1590EXPORT_SYMBOL(cl_lock_closure_init);
1591
1592/**
1593 * Builds a closure of \a lock.
1594 *
1595 * Building of a closure consists of adding initial lock (\a lock) into it,
1596 * and calling cl_lock_operations::clo_closure() methods of \a lock. These
1597 * methods might call cl_lock_closure_build() recursively again, adding more
1598 * locks to the closure, etc.
1599 *
1600 * \see cl_lock_closure
1601 */
1602int cl_lock_closure_build(const struct lu_env *env, struct cl_lock *lock,
1603			  struct cl_lock_closure *closure)
1604{
1605	const struct cl_lock_slice *slice;
1606	int result;
1607
1608	LINVRNT(cl_lock_is_mutexed(closure->clc_origin));
1609	LINVRNT(cl_lock_invariant(env, closure->clc_origin));
1610
1611	result = cl_lock_enclosure(env, lock, closure);
1612	if (result == 0) {
1613		list_for_each_entry(slice, &lock->cll_layers, cls_linkage) {
1614			if (slice->cls_ops->clo_closure != NULL) {
1615				result = slice->cls_ops->clo_closure(env, slice,
1616								     closure);
1617				if (result != 0)
1618					break;
1619			}
1620		}
1621	}
1622	if (result != 0)
1623		cl_lock_disclosure(env, closure);
1624	return result;
1625}
1626EXPORT_SYMBOL(cl_lock_closure_build);
1627
1628/**
1629 * Adds new lock to a closure.
1630 *
1631 * Try-locks \a lock and if succeeded, adds it to the closure (never more than
1632 * once). If try-lock failed, returns CLO_REPEAT, after optionally waiting
1633 * until next try-lock is likely to succeed.
1634 */
1635int cl_lock_enclosure(const struct lu_env *env, struct cl_lock *lock,
1636		      struct cl_lock_closure *closure)
1637{
1638	int result = 0;
1639
1640	cl_lock_trace(D_DLMTRACE, env, "enclosure lock", lock);
1641	if (!cl_lock_mutex_try(env, lock)) {
1642		/*
1643		 * If lock->cll_inclosure is not empty, lock is already in
1644		 * this closure.
1645		 */
1646		if (list_empty(&lock->cll_inclosure)) {
1647			cl_lock_get_trust(lock);
1648			lu_ref_add(&lock->cll_reference, "closure", closure);
1649			list_add(&lock->cll_inclosure, &closure->clc_list);
1650			closure->clc_nr++;
1651		} else
1652			cl_lock_mutex_put(env, lock);
1653		result = 0;
1654	} else {
1655		cl_lock_disclosure(env, closure);
1656		if (closure->clc_wait) {
1657			cl_lock_get_trust(lock);
1658			lu_ref_add(&lock->cll_reference, "closure-w", closure);
1659			cl_lock_mutex_put(env, closure->clc_origin);
1660
1661			LASSERT(cl_lock_nr_mutexed(env) == 0);
1662			cl_lock_mutex_get(env, lock);
1663			cl_lock_mutex_put(env, lock);
1664
1665			cl_lock_mutex_get(env, closure->clc_origin);
1666			lu_ref_del(&lock->cll_reference, "closure-w", closure);
1667			cl_lock_put(env, lock);
1668		}
1669		result = CLO_REPEAT;
1670	}
1671	return result;
1672}
1673EXPORT_SYMBOL(cl_lock_enclosure);
1674
1675/** Releases the mutexes of enclosed locks. */
1676void cl_lock_disclosure(const struct lu_env *env,
1677			struct cl_lock_closure *closure)
1678{
1679	struct cl_lock *scan;
1680	struct cl_lock *temp;
1681
1682	cl_lock_trace(D_DLMTRACE, env, "disclosure lock", closure->clc_origin);
1683	list_for_each_entry_safe(scan, temp, &closure->clc_list,
1684				     cll_inclosure) {
1685		list_del_init(&scan->cll_inclosure);
1686		cl_lock_mutex_put(env, scan);
1687		lu_ref_del(&scan->cll_reference, "closure", closure);
1688		cl_lock_put(env, scan);
1689		closure->clc_nr--;
1690	}
1691	LASSERT(closure->clc_nr == 0);
1692}
1693EXPORT_SYMBOL(cl_lock_disclosure);
1694
1695/** Finalizes a closure. */
1696void cl_lock_closure_fini(struct cl_lock_closure *closure)
1697{
1698	LASSERT(closure->clc_nr == 0);
1699	LASSERT(list_empty(&closure->clc_list));
1700}
1701EXPORT_SYMBOL(cl_lock_closure_fini);
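
/*
 * Illustrative usage sketch (not part of the driver): a closure collects the
 * mutexes of related locks, e.g. of a top-lock and its sub-locks, so that
 * they can be manipulated together and then released in one step:
 *
 *	struct cl_lock_closure closure;
 *
 *	cl_lock_closure_init(env, &closure, origin, 1); // wait for busy locks
 *	result = cl_lock_closure_build(env, lock, &closure);
 *	if (result == 0) {
 *		// every lock on closure.clc_list is now mutexed
 *		cl_lock_disclosure(env, &closure);
 *	}
 *	cl_lock_closure_fini(&closure);
 */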
1702
1703/**
1704 * Destroys this lock. Notifies layers (bottom-to-top) that lock is being
1705 * destroyed, then destroy the lock. If there are holds on the lock, postpone
1706 * destruction until all holds are released. This is called when a decision is
1707 * made to destroy the lock in the future. E.g., when a blocking AST is
1708 * received on it, or fatal communication error happens.
1709 *
1710 * Caller must have a reference on this lock to prevent a situation where a
1711 * deleted lock lingers in memory indefinitely because nobody calls
1712 * cl_lock_put() to finish it.
1713 *
1714 * \pre atomic_read(&lock->cll_ref) > 0
1715 * \pre ergo(cl_lock_nesting(lock) == CNL_TOP,
1716 *	   cl_lock_nr_mutexed(env) == 1)
1717 *      [i.e., if a top-lock is deleted, mutexes of no other locks can be
1718 *      held, as deletion of sub-locks might require releasing a top-lock
1719 *      mutex]
1720 *
1721 * \see cl_lock_operations::clo_delete()
1722 * \see cl_lock::cll_holds
1723 */
1724void cl_lock_delete(const struct lu_env *env, struct cl_lock *lock)
1725{
1726	LINVRNT(cl_lock_is_mutexed(lock));
1727	LINVRNT(cl_lock_invariant(env, lock));
1728	LASSERT(ergo(cl_lock_nesting(lock) == CNL_TOP,
1729		     cl_lock_nr_mutexed(env) == 1));
1730
1731	cl_lock_trace(D_DLMTRACE, env, "delete lock", lock);
1732	if (lock->cll_holds == 0)
1733		cl_lock_delete0(env, lock);
1734	else
1735		lock->cll_flags |= CLF_DOOMED;
1736}
1737EXPORT_SYMBOL(cl_lock_delete);
1738
1739/**
1740 * Mark lock as irrecoverably failed, and mark it for destruction. This
1741 * happens when, e.g., server fails to grant a lock to us, or networking
1742 * time-out happens.
1743 *
1744 * \pre atomic_read(&lock->cll_ref) > 0
1745 *
1746 * \see clo_lock_delete()
1747 * \see cl_lock::cll_holds
1748 */
1749void cl_lock_error(const struct lu_env *env, struct cl_lock *lock, int error)
1750{
1751	LINVRNT(cl_lock_is_mutexed(lock));
1752	LINVRNT(cl_lock_invariant(env, lock));
1753
1754	if (lock->cll_error == 0 && error != 0) {
1755		cl_lock_trace(D_DLMTRACE, env, "set lock error", lock);
1756		lock->cll_error = error;
1757		cl_lock_signal(env, lock);
1758		cl_lock_cancel(env, lock);
1759		cl_lock_delete(env, lock);
1760	}
1761}
1762EXPORT_SYMBOL(cl_lock_error);
1763
1764/**
1765 * Cancels this lock. Notifies layers
1766 * (bottom-to-top) that lock is being cancelled, then destroy the lock. If
1767 * there are holds on the lock, postpone cancellation until
1768 * all holds are released.
1769 *
1770 * Cancellation notification is delivered to layers at most once.
1771 *
1772 * \see cl_lock_operations::clo_cancel()
1773 * \see cl_lock::cll_holds
1774 */
1775void cl_lock_cancel(const struct lu_env *env, struct cl_lock *lock)
1776{
1777	LINVRNT(cl_lock_is_mutexed(lock));
1778	LINVRNT(cl_lock_invariant(env, lock));
1779
1780	cl_lock_trace(D_DLMTRACE, env, "cancel lock", lock);
1781	if (lock->cll_holds == 0)
1782		cl_lock_cancel0(env, lock);
1783	else
1784		lock->cll_flags |= CLF_CANCELPEND;
1785}
1786EXPORT_SYMBOL(cl_lock_cancel);
1787
1788/**
1789 * Finds an existing lock covering given index and optionally different from a
1790 * given \a except lock.
1791 */
1792struct cl_lock *cl_lock_at_pgoff(const struct lu_env *env,
1793				 struct cl_object *obj, pgoff_t index,
1794				 struct cl_lock *except,
1795				 int pending, int canceld)
1796{
1797	struct cl_object_header *head;
1798	struct cl_lock	  *scan;
1799	struct cl_lock	  *lock;
1800	struct cl_lock_descr    *need;
1801
1802	head = cl_object_header(obj);
1803	need = &cl_env_info(env)->clt_descr;
1804	lock = NULL;
1805
1806	need->cld_mode = CLM_READ; /* CLM_READ matches both READ & WRITE, but
1807				    * not PHANTOM */
1808	need->cld_start = need->cld_end = index;
1809	need->cld_enq_flags = 0;
1810
1811	spin_lock(&head->coh_lock_guard);
1812	/* It is fine to match any group lock since there can be only one
1813	 * with a unique gid, and it conflicts with all other lock modes too. */
	list_for_each_entry(scan, &head->coh_locks, cll_linkage) {
		if (scan != except &&
		    (scan->cll_descr.cld_mode == CLM_GROUP ||
		    cl_lock_ext_match(&scan->cll_descr, need)) &&
		    scan->cll_state >= CLS_HELD &&
		    scan->cll_state < CLS_FREEING &&
		    /*
		     * This check is racy as the lock can be canceled right
		     * after it is done, but this is fine, because the page
		     * already exists.
		     */
		    (canceld || !(scan->cll_flags & CLF_CANCELLED)) &&
		    (pending || !(scan->cll_flags & CLF_CANCELPEND))) {
			/* Don't increase cs_hit here since this
			 * is just a helper function. */
			cl_lock_get_trust(scan);
			lock = scan;
			break;
		}
	}
	spin_unlock(&head->coh_lock_guard);
	return lock;
}
EXPORT_SYMBOL(cl_lock_at_pgoff);
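
/*
 * Illustrative sketch (hypothetical caller, not from the original source):
 * look up a held lock covering page index "index" of object "obj" and drop
 * the trusted reference that cl_lock_at_pgoff() took on the caller's behalf.
 *
 *	struct cl_lock *lock;
 *
 *	lock = cl_lock_at_pgoff(env, obj, index, NULL, 0, 0);
 *	if (lock != NULL) {
 *		... inspect lock->cll_descr ...
 *		cl_lock_put(env, lock);
 *	}
 *
 * Non-zero "pending"/"canceld" arguments also match locks that are pending
 * cancellation or already cancelled, as check_and_discard_cb() below does
 * with pending == 1.
 */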

/**
 * Calculates the page offset at the layer of @lock.
 * At the time of this writing, @page is the top page and @lock is a sub-lock.
 */
static pgoff_t pgoff_at_lock(struct cl_page *page, struct cl_lock *lock)
{
	struct lu_device_type *dtype;
	const struct cl_page_slice *slice;

	dtype = lock->cll_descr.cld_obj->co_lu.lo_dev->ld_type;
	slice = cl_page_at(page, dtype);
	LASSERT(slice != NULL);
	return slice->cpl_page->cp_index;
}

/**
 * Checks whether page @page is covered by an additional lock; if not,
 * discards the page.
 */
static int check_and_discard_cb(const struct lu_env *env, struct cl_io *io,
				struct cl_page *page, void *cbdata)
{
	struct cl_thread_info *info = cl_env_info(env);
	struct cl_lock *lock = cbdata;
	pgoff_t index = pgoff_at_lock(page, lock);

	if (index >= info->clt_fn_index) {
		struct cl_lock *tmp;

		/* refresh non-overlapped index */
		tmp = cl_lock_at_pgoff(env, lock->cll_descr.cld_obj, index,
					lock, 1, 0);
		if (tmp != NULL) {
			/* Cache the first non-overlapped index so as to skip
			 * all pages within [index, clt_fn_index). This
			 * is safe because if the tmp lock is canceled, it
			 * will discard these pages. */
			info->clt_fn_index = tmp->cll_descr.cld_end + 1;
			if (tmp->cll_descr.cld_end == CL_PAGE_EOF)
				info->clt_fn_index = CL_PAGE_EOF;
			cl_lock_put(env, tmp);
		} else if (cl_page_own(env, io, page) == 0) {
			/* discard the page */
			cl_page_unmap(env, io, page);
			cl_page_discard(env, io, page);
			cl_page_disown(env, io, page);
		} else {
			LASSERT(page->cp_state == CPS_FREEING);
		}
	}

	info->clt_next_index = index + 1;
	return CLP_GANG_OKAY;
}

static int discard_cb(const struct lu_env *env, struct cl_io *io,
		      struct cl_page *page, void *cbdata)
{
	struct cl_thread_info *info = cl_env_info(env);
	struct cl_lock *lock   = cbdata;

	LASSERT(lock->cll_descr.cld_mode >= CLM_WRITE);
	KLASSERT(ergo(page->cp_type == CPT_CACHEABLE,
		      !PageWriteback(cl_page_vmpage(env, page))));
	KLASSERT(ergo(page->cp_type == CPT_CACHEABLE,
		      !PageDirty(cl_page_vmpage(env, page))));

	info->clt_next_index = pgoff_at_lock(page, lock) + 1;
	if (cl_page_own(env, io, page) == 0) {
		/* discard the page */
		cl_page_unmap(env, io, page);
		cl_page_discard(env, io, page);
		cl_page_disown(env, io, page);
	} else {
		LASSERT(page->cp_state == CPS_FREEING);
	}

	return CLP_GANG_OKAY;
}

/**
 * Discards pages protected by the given lock. This function traverses the
 * radix tree to find all covered pages and discards them. If a page is also
 * covered by another lock, it remains in the cache.
 *
 * If an error happens at any step, the process continues anyway (the
 * reasoning behind this being that lock cancellation cannot be delayed
 * indefinitely).
 */
int cl_lock_discard_pages(const struct lu_env *env, struct cl_lock *lock)
{
	struct cl_thread_info *info  = cl_env_info(env);
	struct cl_io	  *io    = &info->clt_io;
	struct cl_lock_descr  *descr = &lock->cll_descr;
	cl_page_gang_cb_t      cb;
	int res;
	int result;

	LINVRNT(cl_lock_invariant(env, lock));

	io->ci_obj = cl_object_top(descr->cld_obj);
	io->ci_ignore_layout = 1;
	result = cl_io_init(env, io, CIT_MISC, io->ci_obj);
	if (result != 0)
		goto out;

	cb = descr->cld_mode == CLM_READ ? check_and_discard_cb : discard_cb;
	info->clt_fn_index = info->clt_next_index = descr->cld_start;
	do {
		res = cl_page_gang_lookup(env, descr->cld_obj, io,
					  info->clt_next_index, descr->cld_end,
					  cb, (void *)lock);
		if (info->clt_next_index > descr->cld_end)
			break;

		if (res == CLP_GANG_RESCHED)
			cond_resched();
	} while (res != CLP_GANG_OKAY);
out:
	cl_io_fini(env, io);
	return result;
}
EXPORT_SYMBOL(cl_lock_discard_pages);
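
/*
 * Illustrative sketch (hypothetical layer code, not from the original
 * source): a cancellation path that wants covered pages flushed out of the
 * cache before the lock disappears would call:
 *
 *	rc = cl_lock_discard_pages(env, lock);
 *
 * For CLM_READ locks only pages not covered by another lock are dropped
 * (check_and_discard_cb()); for CLM_WRITE and stronger modes every covered
 * page is discarded (discard_cb()).
 */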

/**
 * Eliminates all locks for a given object.
 *
 * Caller has to guarantee that no lock is in active use.
 *
 * \param cancel when this is set, cl_locks_prune() cancels locks before
 *	       destroying them.
 */
void cl_locks_prune(const struct lu_env *env, struct cl_object *obj, int cancel)
{
	struct cl_object_header *head;
	struct cl_lock	  *lock;

	head = cl_object_header(obj);
	/*
	 * If locks are destroyed without cancellation, all pages must be
	 * already destroyed (as otherwise they will be left unprotected).
	 */
	LASSERT(ergo(!cancel,
		     head->coh_tree.rnode == NULL && head->coh_pages == 0));

	spin_lock(&head->coh_lock_guard);
	while (!list_empty(&head->coh_locks)) {
		lock = container_of(head->coh_locks.next,
				    struct cl_lock, cll_linkage);
		cl_lock_get_trust(lock);
		spin_unlock(&head->coh_lock_guard);
		lu_ref_add(&lock->cll_reference, "prune", current);

again:
		cl_lock_mutex_get(env, lock);
		if (lock->cll_state < CLS_FREEING) {
			LASSERT(lock->cll_users <= 1);
			if (unlikely(lock->cll_users == 1)) {
				struct l_wait_info lwi = { 0 };

				cl_lock_mutex_put(env, lock);
				l_wait_event(lock->cll_wq,
					     lock->cll_users == 0,
					     &lwi);
				goto again;
			}

			if (cancel)
				cl_lock_cancel(env, lock);
			cl_lock_delete(env, lock);
		}
		cl_lock_mutex_put(env, lock);
		lu_ref_del(&lock->cll_reference, "prune", current);
		cl_lock_put(env, lock);
		spin_lock(&head->coh_lock_guard);
	}
	spin_unlock(&head->coh_lock_guard);
}
EXPORT_SYMBOL(cl_locks_prune);
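
/*
 * Illustrative sketch (hypothetical object-destruction path, not from the
 * original source): before an object goes away, every remaining lock is
 * cancelled and destroyed. With cancel == 0 the caller must already have
 * destroyed all pages, as asserted at the top of cl_locks_prune().
 *
 *	cl_locks_prune(env, obj, 1);	// cancel, then destroy each lock
 */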

static struct cl_lock *cl_lock_hold_mutex(const struct lu_env *env,
					  const struct cl_io *io,
					  const struct cl_lock_descr *need,
					  const char *scope, const void *source)
{
	struct cl_lock *lock;

	while (1) {
		lock = cl_lock_find(env, io, need);
		if (IS_ERR(lock))
			break;
		cl_lock_mutex_get(env, lock);
		if (lock->cll_state < CLS_FREEING &&
		    !(lock->cll_flags & CLF_CANCELLED)) {
			cl_lock_hold_mod(env, lock, +1);
			lu_ref_add(&lock->cll_holders, scope, source);
			lu_ref_add(&lock->cll_reference, scope, source);
			break;
		}
		cl_lock_mutex_put(env, lock);
		cl_lock_put(env, lock);
	}
	return lock;
}

/**
 * Returns a lock matching the \a need description, with a reference and a
 * hold on it.
 *
 * This is much like cl_lock_find(), except that cl_lock_hold() additionally
 * guarantees that the lock is not in the CLS_FREEING state on return.
 */
struct cl_lock *cl_lock_hold(const struct lu_env *env, const struct cl_io *io,
			     const struct cl_lock_descr *need,
			     const char *scope, const void *source)
{
	struct cl_lock *lock;

	lock = cl_lock_hold_mutex(env, io, need, scope, source);
	if (!IS_ERR(lock))
		cl_lock_mutex_put(env, lock);
	return lock;
}
EXPORT_SYMBOL(cl_lock_hold);
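
/*
 * Illustrative sketch (hypothetical caller, names are placeholders): a hold
 * obtained here is paired with cl_lock_release() using the same
 * "scope"/"source" tags, which lu_ref uses to track references.
 *
 *	struct cl_lock *lock;
 *
 *	lock = cl_lock_hold(env, io, need, "my-scope", current);
 *	if (IS_ERR(lock))
 *		return PTR_ERR(lock);
 *	... the lock cannot reach CLS_FREEING while the hold is kept ...
 *	cl_lock_release(env, lock, "my-scope", current);
 */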

/**
 * Main high-level entry point of the cl_lock interface that finds an
 * existing lock or enqueues a new lock matching the given description.
 */
struct cl_lock *cl_lock_request(const struct lu_env *env, struct cl_io *io,
				const struct cl_lock_descr *need,
				const char *scope, const void *source)
{
	struct cl_lock       *lock;
	int		   rc;
	__u32		 enqflags = need->cld_enq_flags;

	do {
		lock = cl_lock_hold_mutex(env, io, need, scope, source);
		if (IS_ERR(lock))
			break;

		rc = cl_enqueue_locked(env, lock, io, enqflags);
		if (rc == 0) {
			if (cl_lock_fits_into(env, lock, need, io)) {
				if (!(enqflags & CEF_AGL)) {
					cl_lock_mutex_put(env, lock);
					cl_lock_lockdep_acquire(env, lock,
								enqflags);
					break;
				}
				rc = 1;
			}
			cl_unuse_locked(env, lock);
		}
		cl_lock_trace(D_DLMTRACE, env,
			      rc <= 0 ? "enqueue failed" : "agl succeed", lock);
		cl_lock_hold_release(env, lock, scope, source);
		cl_lock_mutex_put(env, lock);
		lu_ref_del(&lock->cll_reference, scope, source);
		cl_lock_put(env, lock);
		if (rc > 0) {
			LASSERT(enqflags & CEF_AGL);
			lock = NULL;
		} else if (rc != 0) {
			lock = ERR_PTR(rc);
		}
	} while (rc == 0);
	return lock;
}
EXPORT_SYMBOL(cl_lock_request);
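
/*
 * Illustrative sketch (hypothetical I/O path, not from the original source):
 * cl_lock_request() has three possible outcomes: an ERR_PTR() on failure,
 * NULL for a successful CEF_AGL request, and otherwise a lock with a hold
 * and a reference that the caller eventually drops again, e.g. via
 * cl_lock_release() (a real caller may also have to drop its use count
 * first; see cl_unuse_locked() above).
 *
 *	struct cl_lock *lock;
 *
 *	lock = cl_lock_request(env, io, need, "io", io);
 *	if (IS_ERR(lock))
 *		return PTR_ERR(lock);
 *	if (lock != NULL) {
 *		... do the I/O covered by the lock ...
 *		cl_lock_release(env, lock, "io", io);
 *	}
 */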

/**
 * Adds a hold to a known lock.
 */
void cl_lock_hold_add(const struct lu_env *env, struct cl_lock *lock,
		      const char *scope, const void *source)
{
	LINVRNT(cl_lock_is_mutexed(lock));
	LINVRNT(cl_lock_invariant(env, lock));
	LASSERT(lock->cll_state != CLS_FREEING);

	cl_lock_hold_mod(env, lock, +1);
	cl_lock_get(lock);
	lu_ref_add(&lock->cll_holders, scope, source);
	lu_ref_add(&lock->cll_reference, scope, source);
}
EXPORT_SYMBOL(cl_lock_hold_add);

/**
 * Releases a hold and a reference on a lock on which the caller has already
 * acquired the mutex.
 */
void cl_lock_unhold(const struct lu_env *env, struct cl_lock *lock,
		    const char *scope, const void *source)
{
	LINVRNT(cl_lock_invariant(env, lock));
	cl_lock_hold_release(env, lock, scope, source);
	lu_ref_del(&lock->cll_reference, scope, source);
	cl_lock_put(env, lock);
}
EXPORT_SYMBOL(cl_lock_unhold);

/**
 * Releases a hold and a reference on a lock, obtained by cl_lock_hold().
 */
void cl_lock_release(const struct lu_env *env, struct cl_lock *lock,
		     const char *scope, const void *source)
{
	LINVRNT(cl_lock_invariant(env, lock));
	cl_lock_trace(D_DLMTRACE, env, "release lock", lock);
	cl_lock_mutex_get(env, lock);
	cl_lock_hold_release(env, lock, scope, source);
	cl_lock_mutex_put(env, lock);
	lu_ref_del(&lock->cll_reference, scope, source);
	cl_lock_put(env, lock);
}
EXPORT_SYMBOL(cl_lock_release);

/**
 * Adds a user to a lock. The caller must hold the lock mutex.
 *
 * \see cl_lock::cll_users
 */
void cl_lock_user_add(const struct lu_env *env, struct cl_lock *lock)
{
	LINVRNT(cl_lock_is_mutexed(lock));
	LINVRNT(cl_lock_invariant(env, lock));

	cl_lock_used_mod(env, lock, +1);
}
EXPORT_SYMBOL(cl_lock_user_add);

/**
 * Removes a user from a lock and wakes up waiters when the last user is
 * gone. The caller must hold the lock mutex.
 *
 * \see cl_lock::cll_users
 */
void cl_lock_user_del(const struct lu_env *env, struct cl_lock *lock)
{
	LINVRNT(cl_lock_is_mutexed(lock));
	LINVRNT(cl_lock_invariant(env, lock));
	LASSERT(lock->cll_users > 0);

	cl_lock_used_mod(env, lock, -1);
	if (lock->cll_users == 0)
		wake_up_all(&lock->cll_wq);
}
EXPORT_SYMBOL(cl_lock_user_del);
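
/*
 * Illustrative sketch (hypothetical caller): user counts bracket the period
 * during which a lock is actively used by an I/O, under the lock mutex:
 *
 *	cl_lock_mutex_get(env, lock);
 *	cl_lock_user_add(env, lock);
 *	cl_lock_mutex_put(env, lock);
 *	... I/O in flight ...
 *	cl_lock_mutex_get(env, lock);
 *	cl_lock_user_del(env, lock);	// may wake cl_locks_prune() waiters
 *	cl_lock_mutex_put(env, lock);
 */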

/**
 * Returns a one-character human-readable name of a lock mode, or "U" for an
 * unknown mode.
 */
const char *cl_lock_mode_name(const enum cl_lock_mode mode)
{
	static const char *names[] = {
		[CLM_PHANTOM] = "P",
		[CLM_READ]    = "R",
		[CLM_WRITE]   = "W",
		[CLM_GROUP]   = "G"
	};

	if (0 <= mode && mode < ARRAY_SIZE(names))
		return names[mode];
	else
		return "U";
}
EXPORT_SYMBOL(cl_lock_mode_name);
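
/*
 * Illustrative sketch (hypothetical debugging aid): the one-letter mode name
 * is convenient in trace messages, e.g.
 *
 *	CDEBUG(D_DLMTRACE, "mode: %s\n",
 *	       cl_lock_mode_name(lock->cll_descr.cld_mode));
 */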

/**
 * Prints a human-readable representation of a lock description.
 */
void cl_lock_descr_print(const struct lu_env *env, void *cookie,
		       lu_printer_t printer,
		       const struct cl_lock_descr *descr)
{
	const struct lu_fid  *fid;

	fid = lu_object_fid(&descr->cld_obj->co_lu);
	(*printer)(env, cookie, DDESCR"@"DFID, PDESCR(descr), PFID(fid));
}
EXPORT_SYMBOL(cl_lock_descr_print);

/**
 * Prints a human-readable representation of \a lock through \a printer.
 */
void cl_lock_print(const struct lu_env *env, void *cookie,
		   lu_printer_t printer, const struct cl_lock *lock)
{
	const struct cl_lock_slice *slice;

	(*printer)(env, cookie, "lock@%p[%d %d %d %d %d %08lx] ",
		   lock, atomic_read(&lock->cll_ref),
		   lock->cll_state, lock->cll_error, lock->cll_holds,
		   lock->cll_users, lock->cll_flags);
	cl_lock_descr_print(env, cookie, printer, &lock->cll_descr);
	(*printer)(env, cookie, " {\n");

	list_for_each_entry(slice, &lock->cll_layers, cls_linkage) {
		(*printer)(env, cookie, "    %s@%p: ",
			   slice->cls_obj->co_lu.lo_dev->ld_type->ldt_name,
			   slice);
		if (slice->cls_ops->clo_print != NULL)
			slice->cls_ops->clo_print(env, cookie, printer, slice);
		(*printer)(env, cookie, "\n");
	}
	(*printer)(env, cookie, "} lock@%p\n", lock);
}
EXPORT_SYMBOL(cl_lock_print);

int cl_lock_init(void)
{
	return lu_kmem_init(cl_lock_caches);
}

void cl_lock_fini(void)
{
	lu_kmem_fini(cl_lock_caches);
}
