
1/*
2 * GPL HEADER START
3 *
4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5 *
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
9 *
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
15 *
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19 *
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
22 * have any questions.
23 *
24 * GPL HEADER END
25 */
26/*
27 * Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
29 *
30 * Copyright (c) 2011, 2012, Intel Corporation.
31 */
32/*
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
35 *
36 * Implementation of cl_lock for OSC layer.
37 *
38 *   Author: Nikita Danilov <nikita.danilov@sun.com>
39 */
40
41#define DEBUG_SUBSYSTEM S_OSC
42
43#include "../../include/linux/libcfs/libcfs.h"
44/* fid_build_reg_res_name() */
45#include "../include/lustre_fid.h"
46
47#include "osc_cl_internal.h"
48
49/** \addtogroup osc
50 *  @{
51 */
52
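/* Bias value used by osc_dlm_lock_pageref() below (and accepted by
 * osc_lock_fini()) to test atomically whether ols_pageref is non-zero. */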
53#define _PAGEREF_MAGIC  (-10000000)
54
55/*****************************************************************************
56 *
57 * Type conversions.
58 *
59 */
60
61static const struct cl_lock_operations osc_lock_ops;
62static const struct cl_lock_operations osc_lock_lockless_ops;
63static void osc_lock_to_lockless(const struct lu_env *env,
64				 struct osc_lock *ols, int force);
65static int osc_lock_has_pages(struct osc_lock *olck);
66
67int osc_lock_is_lockless(const struct osc_lock *olck)
68{
69	return (olck->ols_cl.cls_ops == &osc_lock_lockless_ops);
70}
71
72/**
73 * Returns a weak pointer to the ldlm lock identified by a handle. Returned
74 * pointer cannot be dereferenced, as the lock is not protected from concurrent
75 * reclaim. This function is a helper for osc_lock_invariant().
76 */
77static struct ldlm_lock *osc_handle_ptr(struct lustre_handle *handle)
78{
79	struct ldlm_lock *lock;
80
81	lock = ldlm_handle2lock(handle);
82	if (lock != NULL)
83		LDLM_LOCK_PUT(lock);
84	return lock;
85}
86
87/**
88 * Invariant that has to be true all of the time.
89 */
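/*
 * The checks below are written with the ergo() helper from the libcfs
 * headers: ergo(a, b) reads as "a implies b", i.e. (!(a) || (b)), so each
 * check only constrains the state when its premise holds.
 */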
90static int osc_lock_invariant(struct osc_lock *ols)
91{
92	struct ldlm_lock *lock	      = osc_handle_ptr(&ols->ols_handle);
93	struct ldlm_lock *olock	      = ols->ols_lock;
94	int		  handle_used = lustre_handle_is_used(&ols->ols_handle);
95
96	if (ergo(osc_lock_is_lockless(ols),
97		 ols->ols_locklessable && ols->ols_lock == NULL))
98		return 1;
99
100	/*
101	 * If all the following "ergo"s are true, return 1, otherwise 0
102	 */
103	if (! ergo(olock != NULL, handle_used))
104		return 0;
105
106	if (! ergo(olock != NULL,
107		   olock->l_handle.h_cookie == ols->ols_handle.cookie))
108		return 0;
109
110	if (! ergo(handle_used,
111		   ergo(lock != NULL && olock != NULL, lock == olock) &&
112		   ergo(lock == NULL, olock == NULL)))
113		return 0;
114	/*
115	 * Check that ->ols_handle and ->ols_lock are consistent, but
116	 * take into account that they are set at different times.
117	 */
118	if (! ergo(ols->ols_state == OLS_CANCELLED,
119		   olock == NULL && !handle_used))
120		return 0;
121	/*
122	 * DLM lock is destroyed only after we have seen cancellation
123	 * ast.
124	 */
125	if (! ergo(olock != NULL && ols->ols_state < OLS_CANCELLED,
126		   ((olock->l_flags & LDLM_FL_DESTROYED) == 0)))
127		return 0;
128
129	if (! ergo(ols->ols_state == OLS_GRANTED,
130		   olock != NULL &&
131		   olock->l_req_mode == olock->l_granted_mode &&
132		   ols->ols_hold))
133		return 0;
134	return 1;
135}
136
137/*****************************************************************************
138 *
139 * Lock operations.
140 *
141 */
142
143/**
144 * Breaks a link between osc_lock and dlm_lock.
145 */
146static void osc_lock_detach(const struct lu_env *env, struct osc_lock *olck)
147{
148	struct ldlm_lock *dlmlock;
149
150	spin_lock(&osc_ast_guard);
151	dlmlock = olck->ols_lock;
152	if (dlmlock == NULL) {
153		spin_unlock(&osc_ast_guard);
154		return;
155	}
156
157	olck->ols_lock = NULL;
158	/* wb(); --- for all who check (ols->ols_lock != NULL) before
159	 * calling osc_lock_detach() */
160	dlmlock->l_ast_data = NULL;
161	olck->ols_handle.cookie = 0ULL;
162	spin_unlock(&osc_ast_guard);
163
164	lock_res_and_lock(dlmlock);
165	if (dlmlock->l_granted_mode == dlmlock->l_req_mode) {
166		struct cl_object *obj = olck->ols_cl.cls_obj;
167		struct cl_attr *attr  = &osc_env_info(env)->oti_attr;
168		__u64 old_kms;
169
170		cl_object_attr_lock(obj);
171		/* Must get the value under the lock to avoid possible races. */
172		old_kms = cl2osc(obj)->oo_oinfo->loi_kms;
173		/* Update the kms. This needs to loop over all granted locks;
174		 * not a problem for the client. */
175		attr->cat_kms = ldlm_extent_shift_kms(dlmlock, old_kms);
176
177		cl_object_attr_set(env, obj, attr, CAT_KMS);
178		cl_object_attr_unlock(obj);
179	}
180	unlock_res_and_lock(dlmlock);
181
182	/* release a reference taken in osc_lock_upcall0(). */
183	LASSERT(olck->ols_has_ref);
184	lu_ref_del(&dlmlock->l_reference, "osc_lock", olck);
185	LDLM_LOCK_RELEASE(dlmlock);
186	olck->ols_has_ref = 0;
187}
188
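/*
 * Drop the ldlm lock reference accounted by ols_hold (taken when the lock
 * was granted, see osc_lock_upcall0() and osc_lock_use()) by handing the
 * handle back to osc_cancel_base().
 */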
189static int osc_lock_unhold(struct osc_lock *ols)
190{
191	int result = 0;
192
193	if (ols->ols_hold) {
194		ols->ols_hold = 0;
195		result = osc_cancel_base(&ols->ols_handle,
196					 ols->ols_einfo.ei_mode);
197	}
198	return result;
199}
200
201static int osc_lock_unuse(const struct lu_env *env,
202			  const struct cl_lock_slice *slice)
203{
204	struct osc_lock *ols = cl2osc_lock(slice);
205
206	LINVRNT(osc_lock_invariant(ols));
207
208	switch (ols->ols_state) {
209	case OLS_NEW:
210		LASSERT(!ols->ols_hold);
211		LASSERT(ols->ols_agl);
212		return 0;
213	case OLS_UPCALL_RECEIVED:
214		osc_lock_unhold(ols);
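		/* fall through */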
215	case OLS_ENQUEUED:
216		LASSERT(!ols->ols_hold);
217		osc_lock_detach(env, ols);
218		ols->ols_state = OLS_NEW;
219		return 0;
220	case OLS_GRANTED:
221		LASSERT(!ols->ols_glimpse);
222		LASSERT(ols->ols_hold);
223		/*
224		 * Move lock into OLS_RELEASED state before calling
225		 * osc_cancel_base() so that possible synchronous cancellation
226		 * (that always happens e.g., for liblustre) sees that the lock
227		 * is released.
228		 */
229		ols->ols_state = OLS_RELEASED;
230		return osc_lock_unhold(ols);
231	default:
232		CERROR("Impossible state: %d\n", ols->ols_state);
233		LBUG();
234	}
235}
236
237static void osc_lock_fini(const struct lu_env *env,
238			  struct cl_lock_slice *slice)
239{
240	struct osc_lock  *ols = cl2osc_lock(slice);
241
242	LINVRNT(osc_lock_invariant(ols));
243	/*
244	 * ->ols_hold can still be true at this point if, for example, a
245	 * thread that requested a lock was killed (and released a reference
246	 * to the lock) before a reply from the server was received. In this
247	 * case the lock is destroyed immediately after the upcall.
248	 */
249	osc_lock_unhold(ols);
250	LASSERT(ols->ols_lock == NULL);
251	LASSERT(atomic_read(&ols->ols_pageref) == 0 ||
252		atomic_read(&ols->ols_pageref) == _PAGEREF_MAGIC);
253
254	OBD_SLAB_FREE_PTR(ols, osc_lock_kmem);
255}
256
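/*
 * Translate the cl_lock descriptor (extent in page indices, plus the group
 * lock id) into the ldlm_policy_data_t used for the enqueue request.
 */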
257static void osc_lock_build_policy(const struct lu_env *env,
258				  const struct cl_lock *lock,
259				  ldlm_policy_data_t *policy)
260{
261	const struct cl_lock_descr *d = &lock->cll_descr;
262
263	osc_index2policy(policy, d->cld_obj, d->cld_start, d->cld_end);
264	policy->l_extent.gid = d->cld_gid;
265}
266
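/*
 * Map cl-layer enqueue flags (CEF_*) onto the corresponding ldlm flags
 * (LDLM_FL_*) sent with the enqueue request; CEF_ flags without an ldlm
 * counterpart contribute nothing to the result.
 */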
267static __u64 osc_enq2ldlm_flags(__u32 enqflags)
268{
269	__u64 result = 0;
270
271	LASSERT((enqflags & ~CEF_MASK) == 0);
272
273	if (enqflags & CEF_NONBLOCK)
274		result |= LDLM_FL_BLOCK_NOWAIT;
275	if (enqflags & CEF_ASYNC)
276		result |= LDLM_FL_HAS_INTENT;
277	if (enqflags & CEF_DISCARD_DATA)
278		result |= LDLM_FL_AST_DISCARD_DATA;
279	return result;
280}
281
282/**
283 * Global spin-lock protecting consistency of ldlm_lock::l_ast_data
284 * pointers. Initialized in osc_init().
285 */
286spinlock_t osc_ast_guard;
287
288static struct osc_lock *osc_ast_data_get(struct ldlm_lock *dlm_lock)
289{
290	struct osc_lock *olck;
291
292	lock_res_and_lock(dlm_lock);
293	spin_lock(&osc_ast_guard);
294	olck = dlm_lock->l_ast_data;
295	if (olck != NULL) {
296		struct cl_lock *lock = olck->ols_cl.cls_lock;
297		/*
298		 * If osc_lock holds a reference on ldlm lock, return it even
299		 * when cl_lock is in CLS_FREEING state. This way
300		 *
301		 *	 osc_ast_data_get(dlmlock) == NULL
302		 *
303		 * guarantees that all osc references on dlmlock were
304		 * released. osc_dlm_blocking_ast0() relies on that.
305		 */
306		if (lock->cll_state < CLS_FREEING || olck->ols_has_ref) {
307			cl_lock_get_trust(lock);
308			lu_ref_add_atomic(&lock->cll_reference,
309					  "ast", current);
310		} else
311			olck = NULL;
312	}
313	spin_unlock(&osc_ast_guard);
314	unlock_res_and_lock(dlm_lock);
315	return olck;
316}
317
318static void osc_ast_data_put(const struct lu_env *env, struct osc_lock *olck)
319{
320	struct cl_lock *lock;
321
322	lock = olck->ols_cl.cls_lock;
323	lu_ref_del(&lock->cll_reference, "ast", current);
324	cl_lock_put(env, lock);
325}
326
327/**
328 * Updates object attributes from a lock value block (lvb) received together
329 * with the DLM lock reply from the server. Copy of osc_update_enqueue()
330 * logic.
331 *
332 * This can be optimized to not update attributes when lock is a result of a
333 * local match.
334 *
335 * Called under lock and resource spin-locks.
336 */
337static void osc_lock_lvb_update(const struct lu_env *env, struct osc_lock *olck,
338				int rc)
339{
340	struct ost_lvb    *lvb;
341	struct cl_object  *obj;
342	struct lov_oinfo  *oinfo;
343	struct cl_attr    *attr;
344	unsigned	   valid;
345
346	if (!(olck->ols_flags & LDLM_FL_LVB_READY))
347		return;
348
349	lvb   = &olck->ols_lvb;
350	obj   = olck->ols_cl.cls_obj;
351	oinfo = cl2osc(obj)->oo_oinfo;
352	attr  = &osc_env_info(env)->oti_attr;
353	valid = CAT_BLOCKS | CAT_ATIME | CAT_CTIME | CAT_MTIME | CAT_SIZE;
354	cl_lvb2attr(attr, lvb);
355
356	cl_object_attr_lock(obj);
357	if (rc == 0) {
358		struct ldlm_lock  *dlmlock;
359		__u64 size;
360
361		dlmlock = olck->ols_lock;
362		LASSERT(dlmlock != NULL);
363
364		/* re-grab LVB from a dlm lock under DLM spin-locks. */
365		*lvb = *(struct ost_lvb *)dlmlock->l_lvb_data;
366		size = lvb->lvb_size;
367		/* Extend KMS up to the end of this lock and no further.
368		 * A lock on [x,y] means a KMS of up to y + 1 bytes! */
369		if (size > dlmlock->l_policy_data.l_extent.end)
370			size = dlmlock->l_policy_data.l_extent.end + 1;
371		if (size >= oinfo->loi_kms) {
372			LDLM_DEBUG(dlmlock, "lock acquired, setting rss=%llu, kms=%llu",
373				   lvb->lvb_size, size);
374			valid |= CAT_KMS;
375			attr->cat_kms = size;
376		} else {
377			LDLM_DEBUG(dlmlock, "lock acquired, setting rss=%llu; leaving kms=%llu, end=%llu",
378				   lvb->lvb_size, oinfo->loi_kms,
379				   dlmlock->l_policy_data.l_extent.end);
380		}
381		ldlm_lock_allow_match_locked(dlmlock);
382	} else if (rc == -ENAVAIL && olck->ols_glimpse) {
383		CDEBUG(D_INODE, "glimpsed, setting rss=%llu; leaving kms=%llu\n",
384		       lvb->lvb_size, oinfo->loi_kms);
385	} else
386		valid = 0;
387
388	if (valid != 0)
389		cl_object_attr_set(env, obj, attr, valid);
390
391	cl_object_attr_unlock(obj);
392}
393
394/**
395 * Called when a lock is granted, from an upcall (when server returned a
396 * granted lock), or from completion AST, when server returned a blocked lock.
397 *
398 * Called under lock and resource spin-locks, that are released temporarily
399 * here.
400 */
401static void osc_lock_granted(const struct lu_env *env, struct osc_lock *olck,
402			     struct ldlm_lock *dlmlock, int rc)
403{
404	struct ldlm_extent   *ext;
405	struct cl_lock       *lock;
406	struct cl_lock_descr *descr;
407
408	LASSERT(dlmlock->l_granted_mode == dlmlock->l_req_mode);
409
410	if (olck->ols_state < OLS_GRANTED) {
411		lock  = olck->ols_cl.cls_lock;
412		ext   = &dlmlock->l_policy_data.l_extent;
413		descr = &osc_env_info(env)->oti_descr;
414		descr->cld_obj = lock->cll_descr.cld_obj;
415
416		/* XXX check that ->l_granted_mode is valid. */
417		descr->cld_mode  = osc_ldlm2cl_lock(dlmlock->l_granted_mode);
418		descr->cld_start = cl_index(descr->cld_obj, ext->start);
419		descr->cld_end   = cl_index(descr->cld_obj, ext->end);
420		descr->cld_gid   = ext->gid;
421		/*
422		 * tell upper layers the extent of the lock that was actually
423		 * granted
424		 */
425		olck->ols_state = OLS_GRANTED;
426		osc_lock_lvb_update(env, olck, rc);
427
428		/* release DLM spin-locks to allow cl_lock_{modify,signal}()
429		 * to take a semaphore on a parent lock. This is safe, because
430		 * spin-locks are needed to protect consistency of
431		 * dlmlock->l_*_mode and LVB, and we have finished processing
432		 * them. */
433		unlock_res_and_lock(dlmlock);
434		cl_lock_modify(env, lock, descr);
435		cl_lock_signal(env, lock);
436		LINVRNT(osc_lock_invariant(olck));
437		lock_res_and_lock(dlmlock);
438	}
439}
440
441static void osc_lock_upcall0(const struct lu_env *env, struct osc_lock *olck)
442
443{
444	struct ldlm_lock *dlmlock;
445
446	dlmlock = ldlm_handle2lock_long(&olck->ols_handle, 0);
447	LASSERT(dlmlock != NULL);
448
449	lock_res_and_lock(dlmlock);
450	spin_lock(&osc_ast_guard);
451	LASSERT(dlmlock->l_ast_data == olck);
452	LASSERT(olck->ols_lock == NULL);
453	olck->ols_lock = dlmlock;
454	spin_unlock(&osc_ast_guard);
455
456	/*
457	 * The lock might not be granted yet. In this case, the completion
458	 * ast (osc_ldlm_completion_ast()) comes later and finishes the
459	 * granting of the lock.
460	 */
461	if (dlmlock->l_granted_mode == dlmlock->l_req_mode)
462		osc_lock_granted(env, olck, dlmlock, 0);
463	unlock_res_and_lock(dlmlock);
464
465	/*
466	 * osc_enqueue_interpret() decrefs asynchronous locks; counteract
467	 * that here.
468	 */
469	ldlm_lock_addref(&olck->ols_handle, olck->ols_einfo.ei_mode);
470	olck->ols_hold = 1;
471
472	/* lock reference taken by ldlm_handle2lock_long() is owned by
473	 * osc_lock and released in osc_lock_detach() */
474	lu_ref_add(&dlmlock->l_reference, "osc_lock", olck);
475	olck->ols_has_ref = 1;
476}
477
478/**
479 * Lock upcall function that is executed either when a reply to ENQUEUE rpc is
480 * received from a server, or after osc_enqueue_base() matched a local DLM
481 * lock.
482 */
483static int osc_lock_upcall(void *cookie, int errcode)
484{
485	struct osc_lock	 *olck  = cookie;
486	struct cl_lock_slice    *slice = &olck->ols_cl;
487	struct cl_lock	  *lock  = slice->cls_lock;
488	struct lu_env	   *env;
489	struct cl_env_nest       nest;
490
491	env = cl_env_nested_get(&nest);
492	if (!IS_ERR(env)) {
493		int rc;
494
495		cl_lock_mutex_get(env, lock);
496
497		LASSERT(lock->cll_state >= CLS_QUEUING);
498		if (olck->ols_state == OLS_ENQUEUED) {
499			olck->ols_state = OLS_UPCALL_RECEIVED;
500			rc = ldlm_error2errno(errcode);
501		} else if (olck->ols_state == OLS_CANCELLED) {
502			rc = -EIO;
503		} else {
504			CERROR("Impossible state: %d\n", olck->ols_state);
505			LBUG();
506		}
507		if (rc) {
508			struct ldlm_lock *dlmlock;
509
510			dlmlock = ldlm_handle2lock(&olck->ols_handle);
511			if (dlmlock != NULL) {
512				lock_res_and_lock(dlmlock);
513				spin_lock(&osc_ast_guard);
514				LASSERT(olck->ols_lock == NULL);
515				dlmlock->l_ast_data = NULL;
516				olck->ols_handle.cookie = 0ULL;
517				spin_unlock(&osc_ast_guard);
518				ldlm_lock_fail_match_locked(dlmlock);
519				unlock_res_and_lock(dlmlock);
520				LDLM_LOCK_PUT(dlmlock);
521			}
522		} else {
523			if (olck->ols_glimpse)
524				olck->ols_glimpse = 0;
525			osc_lock_upcall0(env, olck);
526		}
527
528		/* Error handling, some errors are tolerable. */
529		if (olck->ols_locklessable && rc == -EUSERS) {
530			/* This is a tolerable error; turn this lock into
531			 * a lockless lock.
532			 */
533			osc_object_set_contended(cl2osc(slice->cls_obj));
534			LASSERT(slice->cls_ops == &osc_lock_ops);
535
536			/* Change this lock to a lockless lock. */
537			osc_lock_to_lockless(env, olck, 1);
538			olck->ols_state = OLS_GRANTED;
539			rc = 0;
540		} else if (olck->ols_glimpse && rc == -ENAVAIL) {
541			osc_lock_lvb_update(env, olck, rc);
542			cl_lock_delete(env, lock);
543			/* Hide the error. */
544			rc = 0;
545		}
546
547		if (rc == 0) {
548			/* In the AGL case, the RPC sponsor may exit the cl_lock
549			*  processing without wait() being called before the related
550			*  OSC lock upcall(). So update the lock status according
551			*  to the enqueue result inside the AGL upcall(). */
552			if (olck->ols_agl) {
553				lock->cll_flags |= CLF_FROM_UPCALL;
554				cl_wait_try(env, lock);
555				lock->cll_flags &= ~CLF_FROM_UPCALL;
556				if (!olck->ols_glimpse)
557					olck->ols_agl = 0;
558			}
559			cl_lock_signal(env, lock);
560			/* del user for lock upcall cookie */
561			cl_unuse_try(env, lock);
562		} else {
563			/* del user for lock upcall cookie */
564			cl_lock_user_del(env, lock);
565			cl_lock_error(env, lock, rc);
566		}
567
568		/* release cookie reference, acquired by osc_lock_enqueue() */
569		cl_lock_hold_release(env, lock, "upcall", lock);
570		cl_lock_mutex_put(env, lock);
571
572		lu_ref_del(&lock->cll_reference, "upcall", lock);
573		/* This may be the last reference, so cl_lock_put() must be
574		 * called after cl_lock_mutex_put(). */
575		cl_lock_put(env, lock);
576
577		cl_env_nested_put(&nest, env);
578	} else {
579		/* should never happen, similar to osc_ldlm_blocking_ast(). */
580		LBUG();
581	}
582	return errcode;
583}
584
585/**
586 * Core of osc_dlm_blocking_ast() logic.
587 */
588static void osc_lock_blocking(const struct lu_env *env,
589			      struct ldlm_lock *dlmlock,
590			      struct osc_lock *olck, int blocking)
591{
592	struct cl_lock *lock = olck->ols_cl.cls_lock;
593
594	LASSERT(olck->ols_lock == dlmlock);
595	CLASSERT(OLS_BLOCKED < OLS_CANCELLED);
596	LASSERT(!osc_lock_is_lockless(olck));
597
598	/*
599	 * The lock might still be addref-ed here if, e.g., a blocking ast
600	 * is sent for a failed lock.
601	 */
602	osc_lock_unhold(olck);
603
604	if (blocking && olck->ols_state < OLS_BLOCKED)
605		/*
606		 * Move osc_lock into OLS_BLOCKED before canceling the lock,
607		 * because it recursively re-enters osc_lock_blocking(), with
608		 * the state set to OLS_CANCELLED.
609		 */
610		olck->ols_state = OLS_BLOCKED;
611	/*
612	 * cancel and destroy lock at least once no matter how blocking ast is
613	 * entered (see comment above osc_ldlm_blocking_ast() for use
614	 * cases). cl_lock_cancel() and cl_lock_delete() are idempotent.
615	 */
616	cl_lock_cancel(env, lock);
617	cl_lock_delete(env, lock);
618}
619
620/**
621 * Helper for osc_dlm_blocking_ast() handling discrepancies between cl_lock
622 * and ldlm_lock caches.
623 */
624static int osc_dlm_blocking_ast0(const struct lu_env *env,
625				 struct ldlm_lock *dlmlock,
626				 void *data, int flag)
627{
628	struct osc_lock *olck;
629	struct cl_lock  *lock;
630	int result;
631	int cancel;
632
633	LASSERT(flag == LDLM_CB_BLOCKING || flag == LDLM_CB_CANCELING);
634
635	cancel = 0;
636	olck = osc_ast_data_get(dlmlock);
637	if (olck != NULL) {
638		lock = olck->ols_cl.cls_lock;
639		cl_lock_mutex_get(env, lock);
640		LINVRNT(osc_lock_invariant(olck));
641		if (olck->ols_ast_wait) {
642			/* wake up osc_lock_use() */
643			cl_lock_signal(env, lock);
644			olck->ols_ast_wait = 0;
645		}
646		/*
647		 * Lock might have been canceled while this thread was
648		 * sleeping for lock mutex, but olck is pinned in memory.
649		 */
650		if (olck == dlmlock->l_ast_data) {
651			/*
652			 * NOTE: DLM sends blocking AST's for failed locks
653			 *       (that are still in pre-OLS_GRANTED state)
654			 *       too, and they have to be canceled, otherwise
655			 *       the DLM lock is never destroyed and stays
656			 *       stuck in memory.
657			 *
658			 *       Alternatively, ldlm_cli_cancel() can be
659			 *       called here directly for osc_locks with
660			 *       ols_state < OLS_GRANTED to maintain an
661			 *       invariant that ->clo_cancel() is only called
662			 *       for locks that were granted.
663			 */
664			LASSERT(data == olck);
665			osc_lock_blocking(env, dlmlock,
666					  olck, flag == LDLM_CB_BLOCKING);
667		} else
668			cancel = 1;
669		cl_lock_mutex_put(env, lock);
670		osc_ast_data_put(env, olck);
671	} else
672		/*
673		 * DLM lock exists, but there is no cl_lock attached to it.
674		 * This is a `normal' race. cl_object and its cl_lock's can be
675		 * removed by memory pressure, together with all pages.
676		 */
677		cancel = (flag == LDLM_CB_BLOCKING);
678
679	if (cancel) {
680		struct lustre_handle *lockh;
681
682		lockh = &osc_env_info(env)->oti_handle;
683		ldlm_lock2handle(dlmlock, lockh);
684		result = ldlm_cli_cancel(lockh, LCF_ASYNC);
685	} else
686		result = 0;
687	return result;
688}
689
690/**
691 * Blocking ast invoked by ldlm when dlm lock is either blocking progress of
692 * some other lock, or is canceled. This function is installed as a
693 * ldlm_lock::l_blocking_ast() for client extent locks.
694 *
695 * Control flow is tricky, because ldlm uses the same call-back
696 * (ldlm_lock::l_blocking_ast()) for both blocking and cancellation ast's.
697 *
698 * \param dlmlock lock for which ast occurred.
699 *
700 * \param new description of a conflicting lock in case of blocking ast.
701 *
702 * \param data value of dlmlock->l_ast_data
703 *
704 * \param flag LDLM_CB_BLOCKING or LDLM_CB_CANCELING. Used to distinguish
705 *	     cancellation and blocking ast's.
706 *
707 * Possible use cases:
708 *
709 *     - ldlm calls dlmlock->l_blocking_ast(..., LDLM_CB_CANCELING) to cancel
710 *       lock due to lock lru pressure, or explicit user request to purge
711 *       locks.
712 *
713 *     - ldlm calls dlmlock->l_blocking_ast(..., LDLM_CB_BLOCKING) to notify
714 *       us that dlmlock conflicts with another lock that some client is
715 *       enqueueing. The lock is canceled.
716 *
717 *	   - cl_lock_cancel() is called. osc_lock_cancel() calls
718 *	     ldlm_cli_cancel() that calls
719 *
720 *		  dlmlock->l_blocking_ast(..., LDLM_CB_CANCELING)
721 *
722 *	     recursively entering osc_ldlm_blocking_ast().
723 *
724 *     - client cancels the lock voluntarily (e.g., as part of early cancellation):
725 *
726 *	   cl_lock_cancel()->
727 *	     osc_lock_cancel()->
728 *	       ldlm_cli_cancel()->
729 *		 dlmlock->l_blocking_ast(..., LDLM_CB_CANCELING)
730 *
731 */
732static int osc_ldlm_blocking_ast(struct ldlm_lock *dlmlock,
733				 struct ldlm_lock_desc *new, void *data,
734				 int flag)
735{
736	struct lu_env     *env;
737	struct cl_env_nest nest;
738	int		result;
739
740	/*
741	 * This can be called in the context of outer IO, e.g.,
742	 *
743	 *     cl_enqueue()->...
744	 *       ->osc_enqueue_base()->...
745	 *	 ->ldlm_prep_elc_req()->...
746	 *	   ->ldlm_cancel_callback()->...
747	 *	     ->osc_ldlm_blocking_ast()
748	 *
749	 * new environment has to be created to not corrupt outer context.
750	 */
751	env = cl_env_nested_get(&nest);
752	if (!IS_ERR(env)) {
753		result = osc_dlm_blocking_ast0(env, dlmlock, data, flag);
754		cl_env_nested_put(&nest, env);
755	} else {
756		result = PTR_ERR(env);
757		/*
758		 * XXX This should never happen, as cl_lock is
759		 * stuck. Pre-allocated environment a la vvp_inode_fini_env
760		 * should be used.
761		 */
762		LBUG();
763	}
764	if (result != 0) {
765		if (result == -ENODATA)
766			result = 0;
767		else
768			CERROR("BAST failed: %d\n", result);
769	}
770	return result;
771}
772
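/*
 * Completion AST, installed through osc_lock_build_einfo(). Invoked by ldlm
 * when the lock is granted (or on error): first let ldlm finish its part via
 * ldlm_completion_ast_async(), then update the osc_lock/cl_lock state under
 * a freshly nested environment.
 */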
773static int osc_ldlm_completion_ast(struct ldlm_lock *dlmlock,
774				   __u64 flags, void *data)
775{
776	struct cl_env_nest nest;
777	struct lu_env     *env;
778	struct osc_lock   *olck;
779	struct cl_lock    *lock;
780	int result;
781	int dlmrc;
782
783	/* first, do dlm part of the work */
784	dlmrc = ldlm_completion_ast_async(dlmlock, flags, data);
785	/* then, notify cl_lock */
786	env = cl_env_nested_get(&nest);
787	if (!IS_ERR(env)) {
788		olck = osc_ast_data_get(dlmlock);
789		if (olck != NULL) {
790			lock = olck->ols_cl.cls_lock;
791			cl_lock_mutex_get(env, lock);
792			/*
793			 * ldlm_handle_cp_callback() copied LVB from request
794			 * to lock->l_lvb_data, store it in osc_lock.
795			 */
796			LASSERT(dlmlock->l_lvb_data != NULL);
797			lock_res_and_lock(dlmlock);
798			olck->ols_lvb = *(struct ost_lvb *)dlmlock->l_lvb_data;
799			if (olck->ols_lock == NULL) {
800				/*
801				 * upcall (osc_lock_upcall()) hasn't yet been
802				 * called. Do nothing now, upcall will bind
803				 * olck to dlmlock and signal the waiters.
804				 *
805				 * This maintains an invariant that osc_lock
806				 * and ldlm_lock are always bound when
807				 * osc_lock is in OLS_GRANTED state.
808				 */
809			} else if (dlmlock->l_granted_mode ==
810				   dlmlock->l_req_mode) {
811				osc_lock_granted(env, olck, dlmlock, dlmrc);
812			}
813			unlock_res_and_lock(dlmlock);
814
815			if (dlmrc != 0) {
816				CL_LOCK_DEBUG(D_ERROR, env, lock,
817					      "dlmlock returned %d\n", dlmrc);
818				cl_lock_error(env, lock, dlmrc);
819			}
820			cl_lock_mutex_put(env, lock);
821			osc_ast_data_put(env, olck);
822			result = 0;
823		} else
824			result = -ELDLM_NO_LOCK_DATA;
825		cl_env_nested_put(&nest, env);
826	} else
827		result = PTR_ERR(env);
828	return dlmrc ?: result;
829}
830
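/*
 * Glimpse AST, installed through osc_lock_build_einfo(). The server asks the
 * client holding the lock for an up-to-date LVB; the reply is packed from
 * cl_object_glimpse() without taking the cl_lock mutex (see LU-1274 below).
 */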
831static int osc_ldlm_glimpse_ast(struct ldlm_lock *dlmlock, void *data)
832{
833	struct ptlrpc_request  *req  = data;
834	struct osc_lock	*olck;
835	struct cl_lock	 *lock;
836	struct cl_object       *obj;
837	struct cl_env_nest      nest;
838	struct lu_env	  *env;
839	struct ost_lvb	 *lvb;
840	struct req_capsule     *cap;
841	int		     result;
842
843	LASSERT(lustre_msg_get_opc(req->rq_reqmsg) == LDLM_GL_CALLBACK);
844
845	env = cl_env_nested_get(&nest);
846	if (!IS_ERR(env)) {
847		/* osc_ast_data_get() has to go after the environment is
848		 * allocated, because osc_ast_data_get() acquires a
849		 * reference to a lock, and that reference can only be
850		 * released within an environment.
851		 */
852		olck = osc_ast_data_get(dlmlock);
853		if (olck != NULL) {
854			lock = olck->ols_cl.cls_lock;
855			/* Do not grab the mutex of cl_lock for glimpse.
856			 * See LU-1274 for details.
857			 * BTW, it's okay for cl_lock to be cancelled during
858			 * this period because server can handle this race.
859			 * See ldlm_server_glimpse_ast() for details.
860			 * cl_lock_mutex_get(env, lock); */
861			cap = &req->rq_pill;
862			req_capsule_extend(cap, &RQF_LDLM_GL_CALLBACK);
863			req_capsule_set_size(cap, &RMF_DLM_LVB, RCL_SERVER,
864					     sizeof(*lvb));
865			result = req_capsule_server_pack(cap);
866			if (result == 0) {
867				lvb = req_capsule_server_get(cap, &RMF_DLM_LVB);
868				obj = lock->cll_descr.cld_obj;
869				result = cl_object_glimpse(env, obj, lvb);
870			}
871			if (!exp_connect_lvb_type(req->rq_export))
872				req_capsule_shrink(&req->rq_pill,
873						   &RMF_DLM_LVB,
874						   sizeof(struct ost_lvb_v1),
875						   RCL_SERVER);
876			osc_ast_data_put(env, olck);
877		} else {
878			/*
879			 * These errors are normal races, so we don't want to
880			 * fill the console with messages by calling
881			 * ptlrpc_error()
882			 */
883			lustre_pack_reply(req, 1, NULL, NULL);
884			result = -ELDLM_NO_LOCK_DATA;
885		}
886		cl_env_nested_put(&nest, env);
887	} else
888		result = PTR_ERR(env);
889	req->rq_status = result;
890	return result;
891}
892
893static unsigned long osc_lock_weigh(const struct lu_env *env,
894				    const struct cl_lock_slice *slice)
895{
896	/*
897	 * don't need to grab coh_page_guard since we don't care about the
898	 * exact number of pages.
899	 */
900	return cl_object_header(slice->cls_obj)->coh_pages;
901}
902
903static void osc_lock_build_einfo(const struct lu_env *env,
904				 const struct cl_lock *clock,
905				 struct osc_lock *lock,
906				 struct ldlm_enqueue_info *einfo)
907{
908	enum cl_lock_mode mode;
909
910	mode = clock->cll_descr.cld_mode;
911	if (mode == CLM_PHANTOM)
912		/*
913		 * For now, enqueue all glimpse locks in read mode. In the
914		 * future, client might choose to enqueue LCK_PW lock for
915		 * glimpse on a file opened for write.
916		 */
917		mode = CLM_READ;
918
919	einfo->ei_type   = LDLM_EXTENT;
920	einfo->ei_mode   = osc_cl_lock2ldlm(mode);
921	einfo->ei_cb_bl  = osc_ldlm_blocking_ast;
922	einfo->ei_cb_cp  = osc_ldlm_completion_ast;
923	einfo->ei_cb_gl  = osc_ldlm_glimpse_ast;
924	einfo->ei_cbdata = lock; /* value to be put into ->l_ast_data */
925}
926
927/**
928 * Determine if the lock should be converted into a lockless lock.
929 *
930 * Steps to check:
931 * - whether the lock has an explicit requirement for a non-lockless lock;
932 * - the io lock request type ci_lockreq;
933 * - send the enqueue rpc to the ost to make the final decision;
934 * - special treatment for lockless truncate.
935 *
936 *  Additional policy can be implemented here, e.g., never do lockless-io
937 *  for large extents.
938 */
939static void osc_lock_to_lockless(const struct lu_env *env,
940				 struct osc_lock *ols, int force)
941{
942	struct cl_lock_slice *slice = &ols->ols_cl;
943
944	LASSERT(ols->ols_state == OLS_NEW ||
945		ols->ols_state == OLS_UPCALL_RECEIVED);
946
947	if (force) {
948		ols->ols_locklessable = 1;
949		slice->cls_ops = &osc_lock_lockless_ops;
950	} else {
951		struct osc_io *oio     = osc_env_io(env);
952		struct cl_io  *io      = oio->oi_cl.cis_io;
953		struct cl_object *obj  = slice->cls_obj;
954		struct osc_object *oob = cl2osc(obj);
955		const struct osc_device *osd = lu2osc_dev(obj->co_lu.lo_dev);
956		struct obd_connect_data *ocd;
957
958		LASSERT(io->ci_lockreq == CILR_MANDATORY ||
959			io->ci_lockreq == CILR_MAYBE ||
960			io->ci_lockreq == CILR_NEVER);
961
962		ocd = &class_exp2cliimp(osc_export(oob))->imp_connect_data;
963		ols->ols_locklessable = (io->ci_type != CIT_SETATTR) &&
964				(io->ci_lockreq == CILR_MAYBE) &&
965				(ocd->ocd_connect_flags & OBD_CONNECT_SRVLOCK);
966		if (io->ci_lockreq == CILR_NEVER ||
967			/* lockless IO */
968		    (ols->ols_locklessable && osc_object_is_contended(oob)) ||
969			/* lockless truncate */
970		    (cl_io_is_trunc(io) &&
971		     (ocd->ocd_connect_flags & OBD_CONNECT_TRUNCLOCK) &&
972		      osd->od_lockless_truncate)) {
973			ols->ols_locklessable = 1;
974			slice->cls_ops = &osc_lock_lockless_ops;
975		}
976	}
977	LASSERT(ergo(ols->ols_glimpse, !osc_lock_is_lockless(ols)));
978}
979
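/*
 * Decide whether the lock being enqueued ("qing") can coexist with an
 * already enqueued lock ("qed") without cancelling it: a glimpse lock that
 * is past the upcall (or is checked against a read request) is compatible,
 * and so are two read locks.
 */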
980static int osc_lock_compatible(const struct osc_lock *qing,
981			       const struct osc_lock *qed)
982{
983	enum cl_lock_mode qing_mode;
984	enum cl_lock_mode qed_mode;
985
986	qing_mode = qing->ols_cl.cls_lock->cll_descr.cld_mode;
987	if (qed->ols_glimpse &&
988	    (qed->ols_state >= OLS_UPCALL_RECEIVED || qing_mode == CLM_READ))
989		return 1;
990
991	qed_mode = qed->ols_cl.cls_lock->cll_descr.cld_mode;
992	return ((qing_mode == CLM_READ) && (qed_mode == CLM_READ));
993}
994
995/**
996 * Cancel all conflicting locks and wait for them to be destroyed.
997 *
998 * This function is used for two purposes:
999 *
1000 *     - early cancel all conflicting locks before starting IO, and
1001 *
1002 *     - guarantee that pages added to the page cache by lockless IO are never
1003 *       covered by locks other than lockless IO lock, and, hence, are not
1004 *       visible to other threads.
1005 */
1006static int osc_lock_enqueue_wait(const struct lu_env *env,
1007				 const struct osc_lock *olck)
1008{
1009	struct cl_lock	  *lock    = olck->ols_cl.cls_lock;
1010	struct cl_lock_descr    *descr   = &lock->cll_descr;
1011	struct cl_object_header *hdr     = cl_object_header(descr->cld_obj);
1012	struct cl_lock	  *scan;
1013	struct cl_lock	  *conflict= NULL;
1014	int lockless		     = osc_lock_is_lockless(olck);
1015	int rc			   = 0;
1016
1017	LASSERT(cl_lock_is_mutexed(lock));
1018
1019	/* make it enqueue anyway for glimpse lock, because we actually
1020	 * don't need to cancel any conflicting locks. */
1021	if (olck->ols_glimpse)
1022		return 0;
1023
1024	spin_lock(&hdr->coh_lock_guard);
1025	list_for_each_entry(scan, &hdr->coh_locks, cll_linkage) {
1026		struct cl_lock_descr *cld = &scan->cll_descr;
1027		const struct osc_lock *scan_ols;
1028
1029		if (scan == lock)
1030			break;
1031
1032		if (scan->cll_state < CLS_QUEUING ||
1033		    scan->cll_state == CLS_FREEING ||
1034		    cld->cld_start > descr->cld_end ||
1035		    cld->cld_end < descr->cld_start)
1036			continue;
1037
1038		/* overlapped and living locks. */
1039
1040		/* We're not supposed to give up group lock. */
1041		if (scan->cll_descr.cld_mode == CLM_GROUP) {
1042			LASSERT(descr->cld_mode != CLM_GROUP ||
1043				descr->cld_gid != scan->cll_descr.cld_gid);
1044			continue;
1045		}
1046
1047		scan_ols = osc_lock_at(scan);
1048
1049		/* We need to cancel the compatible locks if we're enqueuing
1050		 * a lockless lock, for example:
1051		 * imagine that the client has a PR lock on [0, 1000], and
1052		 * thread T0 is doing lockless IO in the [500, 1500] region.
1053		 * A concurrent thread T1 can see lockless data in [500, 1000],
1054		 * which is wrong because this data is possibly stale. */
1055		if (!lockless && osc_lock_compatible(olck, scan_ols))
1056			continue;
1057
1058		cl_lock_get_trust(scan);
1059		conflict = scan;
1060		break;
1061	}
1062	spin_unlock(&hdr->coh_lock_guard);
1063
1064	if (conflict) {
1065		if (lock->cll_descr.cld_mode == CLM_GROUP) {
1066			/* we want a group lock but a previous lock request
1067			 * conflicts; we do not wait but return 0 so the
1068			 * request is sent to the server
1069			 */
1070			CDEBUG(D_DLMTRACE, "group lock %p is conflicted "
1071					   "with %p, no wait, send to server\n",
1072			       lock, conflict);
1073			cl_lock_put(env, conflict);
1074			rc = 0;
1075		} else {
1076			CDEBUG(D_DLMTRACE, "lock %p is conflicted with %p, "
1077					   "will wait\n",
1078			       lock, conflict);
1079			LASSERT(lock->cll_conflict == NULL);
1080			lu_ref_add(&conflict->cll_reference, "cancel-wait",
1081				   lock);
1082			lock->cll_conflict = conflict;
1083			rc = CLO_WAIT;
1084		}
1085	}
1086	return rc;
1087}
1088
1089/**
1090 * Implementation of cl_lock_operations::clo_enqueue() method for osc
1091 * layer. This initiates ldlm enqueue:
1092 *
1093 *     - cancels conflicting locks early (osc_lock_enqueue_wait());
1094 *
1095 *     - calls osc_enqueue_base() to do actual enqueue.
1096 *
1097 * osc_enqueue_base() is supplied with an upcall function that is executed
1098 * when lock is received either after a local cached ldlm lock is matched, or
1099 * when a reply from the server is received.
1100 *
1101 * This function does not wait for the network communication to complete.
1102 */
1103static int osc_lock_enqueue(const struct lu_env *env,
1104			    const struct cl_lock_slice *slice,
1105			    struct cl_io *unused, __u32 enqflags)
1106{
1107	struct osc_lock	  *ols     = cl2osc_lock(slice);
1108	struct cl_lock	   *lock    = ols->ols_cl.cls_lock;
1109	int result;
1110
1111	LASSERT(cl_lock_is_mutexed(lock));
1112	LASSERTF(ols->ols_state == OLS_NEW,
1113		 "Impossible state: %d\n", ols->ols_state);
1114
1115	LASSERTF(ergo(ols->ols_glimpse, lock->cll_descr.cld_mode <= CLM_READ),
1116		"lock = %p, ols = %p\n", lock, ols);
1117
1118	result = osc_lock_enqueue_wait(env, ols);
1119	if (result == 0) {
1120		if (!osc_lock_is_lockless(ols)) {
1121			struct osc_object	*obj = cl2osc(slice->cls_obj);
1122			struct osc_thread_info   *info = osc_env_info(env);
1123			struct ldlm_res_id       *resname = &info->oti_resname;
1124			ldlm_policy_data_t       *policy = &info->oti_policy;
1125			struct ldlm_enqueue_info *einfo = &ols->ols_einfo;
1126
1127			/* the lock will be passed as the upcall cookie;
1128			 * hold a ref to prevent it from being released. */
1129			cl_lock_hold_add(env, lock, "upcall", lock);
1130			/* a user for lock also */
1131			cl_lock_user_add(env, lock);
1132			ols->ols_state = OLS_ENQUEUED;
1133
1134			/*
1135			 * XXX: this is a possible blocking point, as
1136			 * ldlm_lock_match(LDLM_FL_LVB_READY) waits for
1137			 * LDLM_CP_CALLBACK.
1138			 */
1139			ostid_build_res_name(&obj->oo_oinfo->loi_oi, resname);
1140			osc_lock_build_policy(env, lock, policy);
1141			result = osc_enqueue_base(osc_export(obj), resname,
1142					  &ols->ols_flags, policy,
1143					  &ols->ols_lvb,
1144					  obj->oo_oinfo->loi_kms_valid,
1145					  osc_lock_upcall,
1146					  ols, einfo, &ols->ols_handle,
1147					  PTLRPCD_SET, 1, ols->ols_agl);
1148			if (result != 0) {
1149				cl_lock_user_del(env, lock);
1150				cl_lock_unhold(env, lock, "upcall", lock);
1151				if (unlikely(result == -ECANCELED)) {
1152					ols->ols_state = OLS_NEW;
1153					result = 0;
1154				}
1155			}
1156		} else {
1157			ols->ols_state = OLS_GRANTED;
1158			ols->ols_owner = osc_env_io(env);
1159		}
1160	}
1161	LASSERT(ergo(ols->ols_glimpse, !osc_lock_is_lockless(ols)));
1162	return result;
1163}
1164
1165static int osc_lock_wait(const struct lu_env *env,
1166			 const struct cl_lock_slice *slice)
1167{
1168	struct osc_lock *olck = cl2osc_lock(slice);
1169	struct cl_lock  *lock = olck->ols_cl.cls_lock;
1170
1171	LINVRNT(osc_lock_invariant(olck));
1172
1173	if (olck->ols_glimpse && olck->ols_state >= OLS_UPCALL_RECEIVED) {
1174		if (olck->ols_flags & LDLM_FL_LVB_READY) {
1175			return 0;
1176		} else if (olck->ols_agl) {
1177			if (lock->cll_flags & CLF_FROM_UPCALL)
1178				/* This is from the enqueue RPC reply upcall,
1179				 * updating the state. Do not re-enqueue. */
1180				return -ENAVAIL;
1181			else
1182				olck->ols_state = OLS_NEW;
1183		} else {
1184			LASSERT(lock->cll_error);
1185			return lock->cll_error;
1186		}
1187	}
1188
1189	if (olck->ols_state == OLS_NEW) {
1190		int rc;
1191
1192		LASSERT(olck->ols_agl);
1193		olck->ols_agl = 0;
1194		olck->ols_flags &= ~LDLM_FL_BLOCK_NOWAIT;
1195		rc = osc_lock_enqueue(env, slice, NULL, CEF_ASYNC | CEF_MUST);
1196		if (rc != 0)
1197			return rc;
1198		else
1199			return CLO_REENQUEUED;
1200	}
1201
1202	LASSERT(equi(olck->ols_state >= OLS_UPCALL_RECEIVED &&
1203		     lock->cll_error == 0, olck->ols_lock != NULL));
1204
1205	return lock->cll_error ?: olck->ols_state >= OLS_GRANTED ? 0 : CLO_WAIT;
1206}
1207
1208/**
1209 * An implementation of cl_lock_operations::clo_use() method that pins cached
1210 * lock.
1211 */
1212static int osc_lock_use(const struct lu_env *env,
1213			const struct cl_lock_slice *slice)
1214{
1215	struct osc_lock *olck = cl2osc_lock(slice);
1216	int rc;
1217
1218	LASSERT(!olck->ols_hold);
1219
1220	/*
1221	 * Atomically check for LDLM_FL_CBPENDING and addref a lock if this
1222	 * flag is not set. This protects us from a concurrent blocking ast.
1223	 */
1224	rc = ldlm_lock_addref_try(&olck->ols_handle, olck->ols_einfo.ei_mode);
1225	if (rc == 0) {
1226		olck->ols_hold = 1;
1227		olck->ols_state = OLS_GRANTED;
1228	} else {
1229		struct cl_lock *lock;
1230
1231		/*
1232		 * Lock is being cancelled somewhere within
1233		 * ldlm_handle_bl_callback(): LDLM_FL_CBPENDING is already
1234		 * set, but osc_ldlm_blocking_ast() hasn't yet acquired
1235		 * cl_lock mutex.
1236		 */
1237		lock = slice->cls_lock;
1238		LASSERT(lock->cll_state == CLS_INTRANSIT);
1239		LASSERT(lock->cll_users > 0);
1240		/* set a flag for osc_dlm_blocking_ast0() to signal the
1241		 * lock.*/
1242		olck->ols_ast_wait = 1;
1243		rc = CLO_WAIT;
1244	}
1245	return rc;
1246}
1247
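/*
 * Flush the pages covered by the lock before it is cancelled: for write
 * locks, write back (or discard) dirty pages via osc_cache_writeback_range(),
 * then drop the covered pages from the cache with cl_lock_discard_pages().
 */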
1248static int osc_lock_flush(struct osc_lock *ols, int discard)
1249{
1250	struct cl_lock       *lock  = ols->ols_cl.cls_lock;
1251	struct cl_env_nest    nest;
1252	struct lu_env	*env;
1253	int result = 0;
1254
1255	env = cl_env_nested_get(&nest);
1256	if (!IS_ERR(env)) {
1257		struct osc_object    *obj   = cl2osc(ols->ols_cl.cls_obj);
1258		struct cl_lock_descr *descr = &lock->cll_descr;
1259		int rc = 0;
1260
1261		if (descr->cld_mode >= CLM_WRITE) {
1262			result = osc_cache_writeback_range(env, obj,
1263					descr->cld_start, descr->cld_end,
1264					1, discard);
1265			LDLM_DEBUG(ols->ols_lock,
1266				"lock %p: %d pages were %s.\n", lock, result,
1267				discard ? "discarded" : "written");
1268			if (result > 0)
1269				result = 0;
1270		}
1271
1272		rc = cl_lock_discard_pages(env, lock);
1273		if (result == 0 && rc < 0)
1274			result = rc;
1275
1276		cl_env_nested_put(&nest, env);
1277	} else
1278		result = PTR_ERR(env);
1279	if (result == 0) {
1280		ols->ols_flush = 1;
1281		LINVRNT(!osc_lock_has_pages(ols));
1282	}
1283	return result;
1284}
1285
1286/**
1287 * Implements cl_lock_operations::clo_cancel() method for osc layer. This is
1288 * called (as part of cl_lock_cancel()) when the lock is canceled either
1289 * voluntarily (LRU pressure, early cancellation, umount, etc.) or due to a
1290 * conflict with some other lock somewhere in the cluster. This function does the
1291 * following:
1292 *
1293 *     - invalidates all pages protected by this lock (after sending dirty
1294 *       ones to the server, as necessary);
1295 *
1296 *     - decref's underlying ldlm lock;
1297 *
1298 *     - cancels ldlm lock (ldlm_cli_cancel()).
1299 */
1300static void osc_lock_cancel(const struct lu_env *env,
1301			    const struct cl_lock_slice *slice)
1302{
1303	struct cl_lock   *lock    = slice->cls_lock;
1304	struct osc_lock  *olck    = cl2osc_lock(slice);
1305	struct ldlm_lock *dlmlock = olck->ols_lock;
1306	int	       result  = 0;
1307	int	       discard;
1308
1309	LASSERT(cl_lock_is_mutexed(lock));
1310	LINVRNT(osc_lock_invariant(olck));
1311
1312	if (dlmlock != NULL) {
1313		int do_cancel;
1314
1315		discard = !!(dlmlock->l_flags & LDLM_FL_DISCARD_DATA);
1316		if (olck->ols_state >= OLS_GRANTED)
1317			result = osc_lock_flush(olck, discard);
1318		osc_lock_unhold(olck);
1319
1320		lock_res_and_lock(dlmlock);
1321		/* Now that we're the only user of the dlm read/write reference,
1322		 * ->l_readers + ->l_writers should usually be zero.
1323		 * However, there is a corner case.
1324		 * See bug 18829 for details. */
1325		do_cancel = (dlmlock->l_readers == 0 &&
1326			     dlmlock->l_writers == 0);
1327		dlmlock->l_flags |= LDLM_FL_CBPENDING;
1328		unlock_res_and_lock(dlmlock);
1329		if (do_cancel)
1330			result = ldlm_cli_cancel(&olck->ols_handle, LCF_ASYNC);
1331		if (result < 0)
1332			CL_LOCK_DEBUG(D_ERROR, env, lock,
1333				      "lock %p cancel failure with error(%d)\n",
1334				      lock, result);
1335	}
1336	olck->ols_state = OLS_CANCELLED;
1337	olck->ols_flags &= ~LDLM_FL_LVB_READY;
1338	osc_lock_detach(env, olck);
1339}
1340
1341static int osc_lock_has_pages(struct osc_lock *olck)
1342{
1343	return 0;
1344}
1345
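/*
 * Implements cl_lock_operations::clo_delete() for the osc layer: drop the
 * ldlm hold and detach from the dlm lock. Glimpse locks bind no dlm lock
 * here, so they return early.
 */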
1346static void osc_lock_delete(const struct lu_env *env,
1347			    const struct cl_lock_slice *slice)
1348{
1349	struct osc_lock *olck;
1350
1351	olck = cl2osc_lock(slice);
1352	if (olck->ols_glimpse) {
1353		LASSERT(!olck->ols_hold);
1354		LASSERT(!olck->ols_lock);
1355		return;
1356	}
1357
1358	LINVRNT(osc_lock_invariant(olck));
1359	LINVRNT(!osc_lock_has_pages(olck));
1360
1361	osc_lock_unhold(olck);
1362	osc_lock_detach(env, olck);
1363}
1364
1365/**
1366 * Implements cl_lock_operations::clo_state() method for osc layer.
1367 *
1368 * Maintains osc_lock::ols_owner field.
1369 *
1370 * This assumes that lock always enters CLS_HELD (from some other state) in
1371 * the same IO context as one that requested the lock. This should not be a
1372 * problem, because context is by definition shared by all activity pertaining
1373 * to the same high-level IO.
1374 */
1375static void osc_lock_state(const struct lu_env *env,
1376			   const struct cl_lock_slice *slice,
1377			   enum cl_lock_state state)
1378{
1379	struct osc_lock *lock = cl2osc_lock(slice);
1380
1381	/*
1382	 * XXX multiple io contexts can use the lock at the same time.
1383	 */
1384	LINVRNT(osc_lock_invariant(lock));
1385	if (state == CLS_HELD && slice->cls_lock->cll_state != CLS_HELD) {
1386		struct osc_io *oio = osc_env_io(env);
1387
1388		LASSERT(lock->ols_owner == NULL);
1389		lock->ols_owner = oio;
1390	} else if (state != CLS_HELD)
1391		lock->ols_owner = NULL;
1392}
1393
1394static int osc_lock_print(const struct lu_env *env, void *cookie,
1395			  lu_printer_t p, const struct cl_lock_slice *slice)
1396{
1397	struct osc_lock *lock = cl2osc_lock(slice);
1398
1399	/*
1400	 * XXX print ldlm lock and einfo properly.
1401	 */
1402	(*p)(env, cookie, "%p %#16llx %#llx %d %p ",
1403	     lock->ols_lock, lock->ols_flags, lock->ols_handle.cookie,
1404	     lock->ols_state, lock->ols_owner);
1405	osc_lvb_print(env, cookie, p, &lock->ols_lvb);
1406	return 0;
1407}
1408
1409static int osc_lock_fits_into(const struct lu_env *env,
1410			      const struct cl_lock_slice *slice,
1411			      const struct cl_lock_descr *need,
1412			      const struct cl_io *io)
1413{
1414	struct osc_lock *ols = cl2osc_lock(slice);
1415
1416	if (need->cld_enq_flags & CEF_NEVER)
1417		return 0;
1418
1419	if (ols->ols_state >= OLS_CANCELLED)
1420		return 0;
1421
1422	if (need->cld_mode == CLM_PHANTOM) {
1423		if (ols->ols_agl)
1424			return !(ols->ols_state > OLS_RELEASED);
1425
1426		/*
1427		 * Note: a QUEUED lock can't be matched here, otherwise
1428		 * it might cause deadlocks.
1429		 * In a read process:
1430		 * P1: enqueues a read lock, creating sublock1.
1431		 * P2: enqueues a write lock, creating sublock2 (which conflicts
1432		 *     with sublock1).
1433		 * P1: the read lock is granted.
1434		 * P1: enqueues a glimpse lock (while holding sublock1 for read),
1435		 *     which matches sublock2 and waits for sublock2 to be
1436		 *     granted. But sublock2 cannot be granted, because P1
1437		 *     will not release sublock1. Bang!
1438		 */
1439		if (ols->ols_state < OLS_GRANTED ||
1440		    ols->ols_state > OLS_RELEASED)
1441			return 0;
1442	} else if (need->cld_enq_flags & CEF_MUST) {
1443		/*
1444		 * If the lock has never been enqueued, it can't be matched,
1445		 * because the enqueue process brings in information
1446		 * which is used to determine things such as lockless mode,
1447		 * CEF_MUST, etc.
1448		 */
1449		if (ols->ols_state < OLS_UPCALL_RECEIVED &&
1450		    ols->ols_locklessable)
1451			return 0;
1452	}
1453	return 1;
1454}
1455
1456static const struct cl_lock_operations osc_lock_ops = {
1457	.clo_fini    = osc_lock_fini,
1458	.clo_enqueue = osc_lock_enqueue,
1459	.clo_wait    = osc_lock_wait,
1460	.clo_unuse   = osc_lock_unuse,
1461	.clo_use     = osc_lock_use,
1462	.clo_delete  = osc_lock_delete,
1463	.clo_state   = osc_lock_state,
1464	.clo_cancel  = osc_lock_cancel,
1465	.clo_weigh   = osc_lock_weigh,
1466	.clo_print   = osc_lock_print,
1467	.clo_fits_into = osc_lock_fits_into,
1468};
1469
1470static int osc_lock_lockless_unuse(const struct lu_env *env,
1471				   const struct cl_lock_slice *slice)
1472{
1473	struct osc_lock *ols = cl2osc_lock(slice);
1474	struct cl_lock *lock = slice->cls_lock;
1475
1476	LASSERT(ols->ols_state == OLS_GRANTED);
1477	LINVRNT(osc_lock_invariant(ols));
1478
1479	cl_lock_cancel(env, lock);
1480	cl_lock_delete(env, lock);
1481	return 0;
1482}
1483
1484static void osc_lock_lockless_cancel(const struct lu_env *env,
1485				     const struct cl_lock_slice *slice)
1486{
1487	struct osc_lock   *ols  = cl2osc_lock(slice);
1488	int result;
1489
1490	result = osc_lock_flush(ols, 0);
1491	if (result)
1492		CERROR("Pages for lockless lock %p were not purged(%d)\n",
1493		       ols, result);
1494	ols->ols_state = OLS_CANCELLED;
1495}
1496
1497static int osc_lock_lockless_wait(const struct lu_env *env,
1498				  const struct cl_lock_slice *slice)
1499{
1500	struct osc_lock *olck = cl2osc_lock(slice);
1501	struct cl_lock  *lock = olck->ols_cl.cls_lock;
1502
1503	LINVRNT(osc_lock_invariant(olck));
1504	LASSERT(olck->ols_state >= OLS_UPCALL_RECEIVED);
1505
1506	return lock->cll_error;
1507}
1508
1509static void osc_lock_lockless_state(const struct lu_env *env,
1510				    const struct cl_lock_slice *slice,
1511				    enum cl_lock_state state)
1512{
1513	struct osc_lock *lock = cl2osc_lock(slice);
1514
1515	LINVRNT(osc_lock_invariant(lock));
1516	if (state == CLS_HELD) {
1517		struct osc_io *oio  = osc_env_io(env);
1518
1519		LASSERT(ergo(lock->ols_owner, lock->ols_owner == oio));
1520		lock->ols_owner = oio;
1521
1522		/* set the io to be lockless if this lock is for io's
1523		 * host object */
1524		if (cl_object_same(oio->oi_cl.cis_obj, slice->cls_obj))
1525			oio->oi_lockless = 1;
1526	}
1527}
1528
1529static int osc_lock_lockless_fits_into(const struct lu_env *env,
1530				       const struct cl_lock_slice *slice,
1531				       const struct cl_lock_descr *need,
1532				       const struct cl_io *io)
1533{
1534	struct osc_lock *lock = cl2osc_lock(slice);
1535
1536	if (!(need->cld_enq_flags & CEF_NEVER))
1537		return 0;
1538
1539	/* A lockless lock should only be used by its owning io (bug 22147). */
1540	return (lock->ols_owner == osc_env_io(env));
1541}
1542
1543static const struct cl_lock_operations osc_lock_lockless_ops = {
1544	.clo_fini      = osc_lock_fini,
1545	.clo_enqueue   = osc_lock_enqueue,
1546	.clo_wait      = osc_lock_lockless_wait,
1547	.clo_unuse     = osc_lock_lockless_unuse,
1548	.clo_state     = osc_lock_lockless_state,
1549	.clo_fits_into = osc_lock_lockless_fits_into,
1550	.clo_cancel    = osc_lock_lockless_cancel,
1551	.clo_print     = osc_lock_print
1552};
1553
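/*
 * Allocate and initialize the osc_lock slice for a new cl_lock: translate
 * the cl enqueue flags to ldlm flags, mark AGL/glimpse requests, attach the
 * slice with osc_lock_ops and, unless CEF_MUST is set, try to downgrade the
 * lock to a lockless one.
 */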
1554int osc_lock_init(const struct lu_env *env,
1555		  struct cl_object *obj, struct cl_lock *lock,
1556		  const struct cl_io *unused)
1557{
1558	struct osc_lock *clk;
1559	int result;
1560
1561	OBD_SLAB_ALLOC_PTR_GFP(clk, osc_lock_kmem, GFP_NOFS);
1562	if (clk != NULL) {
1563		__u32 enqflags = lock->cll_descr.cld_enq_flags;
1564
1565		osc_lock_build_einfo(env, lock, clk, &clk->ols_einfo);
1566		atomic_set(&clk->ols_pageref, 0);
1567		clk->ols_state = OLS_NEW;
1568
1569		clk->ols_flags = osc_enq2ldlm_flags(enqflags);
1570		clk->ols_agl = !!(enqflags & CEF_AGL);
1571		if (clk->ols_agl)
1572			clk->ols_flags |= LDLM_FL_BLOCK_NOWAIT;
1573		if (clk->ols_flags & LDLM_FL_HAS_INTENT)
1574			clk->ols_glimpse = 1;
1575
1576		cl_lock_slice_add(lock, &clk->ols_cl, obj, &osc_lock_ops);
1577
1578		if (!(enqflags & CEF_MUST))
1579			/* try to convert this lock to a lockless lock */
1580			osc_lock_to_lockless(env, clk, (enqflags & CEF_NEVER));
1581		if (clk->ols_locklessable && !(enqflags & CEF_DISCARD_DATA))
1582			clk->ols_flags |= LDLM_FL_DENY_ON_CONTENTION;
1583
1584		LDLM_DEBUG_NOLOCK("lock %p, osc lock %p, flags %llx\n",
1585				lock, clk, clk->ols_flags);
1586
1587		result = 0;
1588	} else
1589		result = -ENOMEM;
1590	return result;
1591}
1592
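/*
 * Returns 1 if the osc_lock attached to the dlm lock still has page
 * references, 0 otherwise. Adding _PAGEREF_MAGIC with atomic_add_return()
 * and comparing against _PAGEREF_MAGIC tests "ols_pageref != 0" atomically;
 * the bias is subtracted again only when page references were found (which
 * is why osc_lock_fini() also accepts ols_pageref == _PAGEREF_MAGIC).
 */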
1593int osc_dlm_lock_pageref(struct ldlm_lock *dlm)
1594{
1595	struct osc_lock *olock;
1596	int	      rc = 0;
1597
1598	spin_lock(&osc_ast_guard);
1599	olock = dlm->l_ast_data;
1600	/*
1601	 * there's a very rare race with osc_page_addref_lock(), but that
1602	 * doesn't matter: in the worst case we fail to cancel a lock
1603	 * which we actually could, and that does no harm.
1604	 */
1605	if (olock != NULL &&
1606	    atomic_add_return(_PAGEREF_MAGIC,
1607				  &olock->ols_pageref) != _PAGEREF_MAGIC) {
1608		atomic_sub(_PAGEREF_MAGIC, &olock->ols_pageref);
1609		rc = 1;
1610	}
1611	spin_unlock(&osc_ast_guard);
1612	return rc;
1613}
1614
1615/** @} osc */
1616