[go: nahoru, domu]

1/*
2 * GPL HEADER START
3 *
4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5 *
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
9 *
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
15 *
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19 *
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
22 * have any questions.
23 *
24 * GPL HEADER END
25 */
26/*
27 * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
29 *
30 * Copyright (c) 2011, 2012, Intel Corporation.
31 */
32/*
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
35 *
36 * lustre/obdclass/genops.c
37 *
38 * These are the only exported functions, they provide some generic
39 * infrastructure for managing object devices
40 */
41
42#define DEBUG_SUBSYSTEM S_CLASS
43#include "../include/obd_class.h"
44#include "../include/lprocfs_status.h"
45
46extern struct list_head obd_types;
47spinlock_t obd_types_lock;
48
49struct kmem_cache *obd_device_cachep;
50struct kmem_cache *obdo_cachep;
51EXPORT_SYMBOL(obdo_cachep);
52struct kmem_cache *import_cachep;
53
54struct list_head      obd_zombie_imports;
55struct list_head      obd_zombie_exports;
56spinlock_t  obd_zombie_impexp_lock;
57static void obd_zombie_impexp_notify(void);
58static void obd_zombie_export_add(struct obd_export *exp);
59static void obd_zombie_import_add(struct obd_import *imp);
60static void print_export_data(struct obd_export *exp,
61			      const char *status, int locks);
62
63int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
64EXPORT_SYMBOL(ptlrpc_put_connection_superhack);
65
66/*
67 * support functions: we could use inter-module communication, but this
68 * is more portable to other OS's
69 */
70static struct obd_device *obd_device_alloc(void)
71{
72	struct obd_device *obd;
73
74	OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, GFP_NOFS);
75	if (obd != NULL) {
76		obd->obd_magic = OBD_DEVICE_MAGIC;
77	}
78	return obd;
79}
80
81static void obd_device_free(struct obd_device *obd)
82{
83	LASSERT(obd != NULL);
84	LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
85		 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
86	if (obd->obd_namespace != NULL) {
87		CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
88		       obd, obd->obd_namespace, obd->obd_force);
89		LBUG();
90	}
91	lu_ref_fini(&obd->obd_reference);
92	OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
93}
94
95struct obd_type *class_search_type(const char *name)
96{
97	struct list_head *tmp;
98	struct obd_type *type;
99
100	spin_lock(&obd_types_lock);
101	list_for_each(tmp, &obd_types) {
102		type = list_entry(tmp, struct obd_type, typ_chain);
103		if (strcmp(type->typ_name, name) == 0) {
104			spin_unlock(&obd_types_lock);
105			return type;
106		}
107	}
108	spin_unlock(&obd_types_lock);
109	return NULL;
110}
111EXPORT_SYMBOL(class_search_type);
112
113struct obd_type *class_get_type(const char *name)
114{
115	struct obd_type *type = class_search_type(name);
116
117	if (!type) {
118		const char *modname = name;
119
120		if (strcmp(modname, "obdfilter") == 0)
121			modname = "ofd";
122
123		if (strcmp(modname, LUSTRE_LWP_NAME) == 0)
124			modname = LUSTRE_OSP_NAME;
125
126		if (!strncmp(modname, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME)))
127			modname = LUSTRE_MDT_NAME;
128
129		if (!request_module("%s", modname)) {
130			CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
131			type = class_search_type(name);
132		} else {
133			LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
134					   modname);
135		}
136	}
137	if (type) {
138		spin_lock(&type->obd_type_lock);
139		type->typ_refcnt++;
140		try_module_get(type->typ_dt_ops->o_owner);
141		spin_unlock(&type->obd_type_lock);
142	}
143	return type;
144}
145EXPORT_SYMBOL(class_get_type);
146
147void class_put_type(struct obd_type *type)
148{
149	LASSERT(type);
150	spin_lock(&type->obd_type_lock);
151	type->typ_refcnt--;
152	module_put(type->typ_dt_ops->o_owner);
153	spin_unlock(&type->obd_type_lock);
154}
155EXPORT_SYMBOL(class_put_type);
156
157#define CLASS_MAX_NAME 1024
158
159int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
160			struct lprocfs_vars *vars, const char *name,
161			struct lu_device_type *ldt)
162{
163	struct obd_type *type;
164	int rc = 0;
165
166	/* sanity check */
167	LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
168
169	if (class_search_type(name)) {
170		CDEBUG(D_IOCTL, "Type %s already registered\n", name);
171		return -EEXIST;
172	}
173
174	rc = -ENOMEM;
175	OBD_ALLOC(type, sizeof(*type));
176	if (type == NULL)
177		return rc;
178
179	OBD_ALLOC_PTR(type->typ_dt_ops);
180	OBD_ALLOC_PTR(type->typ_md_ops);
181	OBD_ALLOC(type->typ_name, strlen(name) + 1);
182
183	if (type->typ_dt_ops == NULL ||
184	    type->typ_md_ops == NULL ||
185	    type->typ_name == NULL)
186		goto failed;
187
188	*(type->typ_dt_ops) = *dt_ops;
189	/* md_ops is optional */
190	if (md_ops)
191		*(type->typ_md_ops) = *md_ops;
192	strcpy(type->typ_name, name);
193	spin_lock_init(&type->obd_type_lock);
194
195	type->typ_procroot = lprocfs_register(type->typ_name, proc_lustre_root,
196					      vars, type);
197	if (IS_ERR(type->typ_procroot)) {
198		rc = PTR_ERR(type->typ_procroot);
199		type->typ_procroot = NULL;
200		goto failed;
201	}
202
203	if (ldt != NULL) {
204		type->typ_lu = ldt;
205		rc = lu_device_type_init(ldt);
206		if (rc != 0)
207			goto failed;
208	}
209
210	spin_lock(&obd_types_lock);
211	list_add(&type->typ_chain, &obd_types);
212	spin_unlock(&obd_types_lock);
213
214	return 0;
215
216 failed:
217	if (type->typ_name != NULL)
218		OBD_FREE(type->typ_name, strlen(name) + 1);
219	if (type->typ_md_ops != NULL)
220		OBD_FREE_PTR(type->typ_md_ops);
221	if (type->typ_dt_ops != NULL)
222		OBD_FREE_PTR(type->typ_dt_ops);
223	OBD_FREE(type, sizeof(*type));
224	return rc;
225}
226EXPORT_SYMBOL(class_register_type);
227
228int class_unregister_type(const char *name)
229{
230	struct obd_type *type = class_search_type(name);
231
232	if (!type) {
233		CERROR("unknown obd type\n");
234		return -EINVAL;
235	}
236
237	if (type->typ_refcnt) {
238		CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
239		/* This is a bad situation, let's make the best of it */
240		/* Remove ops, but leave the name for debugging */
241		OBD_FREE_PTR(type->typ_dt_ops);
242		OBD_FREE_PTR(type->typ_md_ops);
243		return -EBUSY;
244	}
245
246	if (type->typ_procroot) {
247		lprocfs_remove(&type->typ_procroot);
248	}
249
250	if (type->typ_lu)
251		lu_device_type_fini(type->typ_lu);
252
253	spin_lock(&obd_types_lock);
254	list_del(&type->typ_chain);
255	spin_unlock(&obd_types_lock);
256	OBD_FREE(type->typ_name, strlen(name) + 1);
257	if (type->typ_dt_ops != NULL)
258		OBD_FREE_PTR(type->typ_dt_ops);
259	if (type->typ_md_ops != NULL)
260		OBD_FREE_PTR(type->typ_md_ops);
261	OBD_FREE(type, sizeof(*type));
262	return 0;
263} /* class_unregister_type */
264EXPORT_SYMBOL(class_unregister_type);
265
266/**
267 * Create a new obd device.
268 *
269 * Find an empty slot in ::obd_devs[], create a new obd device in it.
270 *
271 * \param[in] type_name obd device type string.
272 * \param[in] name      obd device name.
273 *
274 * \retval NULL if create fails, otherwise return the obd device
275 *	 pointer created.
276 */
277struct obd_device *class_newdev(const char *type_name, const char *name)
278{
279	struct obd_device *result = NULL;
280	struct obd_device *newdev;
281	struct obd_type *type = NULL;
282	int i;
283	int new_obd_minor = 0;
284
285	if (strlen(name) >= MAX_OBD_NAME) {
286		CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
287		return ERR_PTR(-EINVAL);
288	}
289
290	type = class_get_type(type_name);
291	if (type == NULL){
292		CERROR("OBD: unknown type: %s\n", type_name);
293		return ERR_PTR(-ENODEV);
294	}
295
296	newdev = obd_device_alloc();
297	if (newdev == NULL) {
298		result = ERR_PTR(-ENOMEM);
299		goto out_type;
300	}
301
302	LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
303
304	write_lock(&obd_dev_lock);
305	for (i = 0; i < class_devno_max(); i++) {
306		struct obd_device *obd = class_num2obd(i);
307
308		if (obd && (strcmp(name, obd->obd_name) == 0)) {
309			CERROR("Device %s already exists at %d, won't add\n",
310			       name, i);
311			if (result) {
312				LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
313					 "%p obd_magic %08x != %08x\n", result,
314					 result->obd_magic, OBD_DEVICE_MAGIC);
315				LASSERTF(result->obd_minor == new_obd_minor,
316					 "%p obd_minor %d != %d\n", result,
317					 result->obd_minor, new_obd_minor);
318
319				obd_devs[result->obd_minor] = NULL;
320				result->obd_name[0]='\0';
321			 }
322			result = ERR_PTR(-EEXIST);
323			break;
324		}
325		if (!result && !obd) {
326			result = newdev;
327			result->obd_minor = i;
328			new_obd_minor = i;
329			result->obd_type = type;
330			strncpy(result->obd_name, name,
331				sizeof(result->obd_name) - 1);
332			obd_devs[i] = result;
333		}
334	}
335	write_unlock(&obd_dev_lock);
336
337	if (result == NULL && i >= class_devno_max()) {
338		CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
339		       class_devno_max());
340		result = ERR_PTR(-EOVERFLOW);
341		goto out;
342	}
343
344	if (IS_ERR(result))
345		goto out;
346
347	CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
348	       result->obd_name, result);
349
350	return result;
351out:
352	obd_device_free(newdev);
353out_type:
354	class_put_type(type);
355	return result;
356}
357
358void class_release_dev(struct obd_device *obd)
359{
360	struct obd_type *obd_type = obd->obd_type;
361
362	LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
363		 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
364	LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
365		 obd, obd->obd_minor, obd_devs[obd->obd_minor]);
366	LASSERT(obd_type != NULL);
367
368	CDEBUG(D_INFO, "Release obd device %s at %d obd_type name =%s\n",
369	       obd->obd_name, obd->obd_minor, obd->obd_type->typ_name);
370
371	write_lock(&obd_dev_lock);
372	obd_devs[obd->obd_minor] = NULL;
373	write_unlock(&obd_dev_lock);
374	obd_device_free(obd);
375
376	class_put_type(obd_type);
377}
378
379int class_name2dev(const char *name)
380{
381	int i;
382
383	if (!name)
384		return -1;
385
386	read_lock(&obd_dev_lock);
387	for (i = 0; i < class_devno_max(); i++) {
388		struct obd_device *obd = class_num2obd(i);
389
390		if (obd && strcmp(name, obd->obd_name) == 0) {
391			/* Make sure we finished attaching before we give
392			   out any references */
393			LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
394			if (obd->obd_attached) {
395				read_unlock(&obd_dev_lock);
396				return i;
397			}
398			break;
399		}
400	}
401	read_unlock(&obd_dev_lock);
402
403	return -1;
404}
405EXPORT_SYMBOL(class_name2dev);
406
407struct obd_device *class_name2obd(const char *name)
408{
409	int dev = class_name2dev(name);
410
411	if (dev < 0 || dev > class_devno_max())
412		return NULL;
413	return class_num2obd(dev);
414}
415EXPORT_SYMBOL(class_name2obd);
416
417int class_uuid2dev(struct obd_uuid *uuid)
418{
419	int i;
420
421	read_lock(&obd_dev_lock);
422	for (i = 0; i < class_devno_max(); i++) {
423		struct obd_device *obd = class_num2obd(i);
424
425		if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
426			LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
427			read_unlock(&obd_dev_lock);
428			return i;
429		}
430	}
431	read_unlock(&obd_dev_lock);
432
433	return -1;
434}
435EXPORT_SYMBOL(class_uuid2dev);
436
437struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
438{
439	int dev = class_uuid2dev(uuid);
440	if (dev < 0)
441		return NULL;
442	return class_num2obd(dev);
443}
444EXPORT_SYMBOL(class_uuid2obd);
445
446/**
447 * Get obd device from ::obd_devs[]
448 *
449 * \param num [in] array index
450 *
451 * \retval NULL if ::obd_devs[\a num] does not contains an obd device
452 *	 otherwise return the obd device there.
453 */
454struct obd_device *class_num2obd(int num)
455{
456	struct obd_device *obd = NULL;
457
458	if (num < class_devno_max()) {
459		obd = obd_devs[num];
460		if (obd == NULL)
461			return NULL;
462
463		LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
464			 "%p obd_magic %08x != %08x\n",
465			 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
466		LASSERTF(obd->obd_minor == num,
467			 "%p obd_minor %0d != %0d\n",
468			 obd, obd->obd_minor, num);
469	}
470
471	return obd;
472}
473EXPORT_SYMBOL(class_num2obd);
474
475/**
476 * Get obd devices count. Device in any
477 *    state are counted
478 * \retval obd device count
479 */
480int get_devices_count(void)
481{
482	int index, max_index = class_devno_max(), dev_count = 0;
483
484	read_lock(&obd_dev_lock);
485	for (index = 0; index <= max_index; index++) {
486		struct obd_device *obd = class_num2obd(index);
487		if (obd != NULL)
488			dev_count++;
489	}
490	read_unlock(&obd_dev_lock);
491
492	return dev_count;
493}
494EXPORT_SYMBOL(get_devices_count);
495
496void class_obd_list(void)
497{
498	char *status;
499	int i;
500
501	read_lock(&obd_dev_lock);
502	for (i = 0; i < class_devno_max(); i++) {
503		struct obd_device *obd = class_num2obd(i);
504
505		if (obd == NULL)
506			continue;
507		if (obd->obd_stopping)
508			status = "ST";
509		else if (obd->obd_set_up)
510			status = "UP";
511		else if (obd->obd_attached)
512			status = "AT";
513		else
514			status = "--";
515		LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
516			 i, status, obd->obd_type->typ_name,
517			 obd->obd_name, obd->obd_uuid.uuid,
518			 atomic_read(&obd->obd_refcount));
519	}
520	read_unlock(&obd_dev_lock);
521	return;
522}
523
524/* Search for a client OBD connected to tgt_uuid.  If grp_uuid is
525   specified, then only the client with that uuid is returned,
526   otherwise any client connected to the tgt is returned. */
527struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
528					  const char * typ_name,
529					  struct obd_uuid *grp_uuid)
530{
531	int i;
532
533	read_lock(&obd_dev_lock);
534	for (i = 0; i < class_devno_max(); i++) {
535		struct obd_device *obd = class_num2obd(i);
536
537		if (obd == NULL)
538			continue;
539		if ((strncmp(obd->obd_type->typ_name, typ_name,
540			     strlen(typ_name)) == 0)) {
541			if (obd_uuid_equals(tgt_uuid,
542					    &obd->u.cli.cl_target_uuid) &&
543			    ((grp_uuid)? obd_uuid_equals(grp_uuid,
544							 &obd->obd_uuid) : 1)) {
545				read_unlock(&obd_dev_lock);
546				return obd;
547			}
548		}
549	}
550	read_unlock(&obd_dev_lock);
551
552	return NULL;
553}
554EXPORT_SYMBOL(class_find_client_obd);
555
556/* Iterate the obd_device list looking devices have grp_uuid. Start
557   searching at *next, and if a device is found, the next index to look
558   at is saved in *next. If next is NULL, then the first matching device
559   will always be returned. */
560struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
561{
562	int i;
563
564	if (next == NULL)
565		i = 0;
566	else if (*next >= 0 && *next < class_devno_max())
567		i = *next;
568	else
569		return NULL;
570
571	read_lock(&obd_dev_lock);
572	for (; i < class_devno_max(); i++) {
573		struct obd_device *obd = class_num2obd(i);
574
575		if (obd == NULL)
576			continue;
577		if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
578			if (next != NULL)
579				*next = i+1;
580			read_unlock(&obd_dev_lock);
581			return obd;
582		}
583	}
584	read_unlock(&obd_dev_lock);
585
586	return NULL;
587}
588EXPORT_SYMBOL(class_devices_in_group);
589
590/**
591 * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
592 * adjust sptlrpc settings accordingly.
593 */
594int class_notify_sptlrpc_conf(const char *fsname, int namelen)
595{
596	struct obd_device  *obd;
597	const char	 *type;
598	int		 i, rc = 0, rc2;
599
600	LASSERT(namelen > 0);
601
602	read_lock(&obd_dev_lock);
603	for (i = 0; i < class_devno_max(); i++) {
604		obd = class_num2obd(i);
605
606		if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
607			continue;
608
609		/* only notify mdc, osc, mdt, ost */
610		type = obd->obd_type->typ_name;
611		if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
612		    strcmp(type, LUSTRE_OSC_NAME) != 0 &&
613		    strcmp(type, LUSTRE_MDT_NAME) != 0 &&
614		    strcmp(type, LUSTRE_OST_NAME) != 0)
615			continue;
616
617		if (strncmp(obd->obd_name, fsname, namelen))
618			continue;
619
620		class_incref(obd, __func__, obd);
621		read_unlock(&obd_dev_lock);
622		rc2 = obd_set_info_async(NULL, obd->obd_self_export,
623					 sizeof(KEY_SPTLRPC_CONF),
624					 KEY_SPTLRPC_CONF, 0, NULL, NULL);
625		rc = rc ? rc : rc2;
626		class_decref(obd, __func__, obd);
627		read_lock(&obd_dev_lock);
628	}
629	read_unlock(&obd_dev_lock);
630	return rc;
631}
632EXPORT_SYMBOL(class_notify_sptlrpc_conf);
633
634void obd_cleanup_caches(void)
635{
636	if (obd_device_cachep) {
637		kmem_cache_destroy(obd_device_cachep);
638		obd_device_cachep = NULL;
639	}
640	if (obdo_cachep) {
641		kmem_cache_destroy(obdo_cachep);
642		obdo_cachep = NULL;
643	}
644	if (import_cachep) {
645		kmem_cache_destroy(import_cachep);
646		import_cachep = NULL;
647	}
648	if (capa_cachep) {
649		kmem_cache_destroy(capa_cachep);
650		capa_cachep = NULL;
651	}
652}
653
654int obd_init_caches(void)
655{
656	LASSERT(obd_device_cachep == NULL);
657	obd_device_cachep = kmem_cache_create("ll_obd_dev_cache",
658						 sizeof(struct obd_device),
659						 0, 0, NULL);
660	if (!obd_device_cachep)
661		goto out;
662
663	LASSERT(obdo_cachep == NULL);
664	obdo_cachep = kmem_cache_create("ll_obdo_cache", sizeof(struct obdo),
665					   0, 0, NULL);
666	if (!obdo_cachep)
667		goto out;
668
669	LASSERT(import_cachep == NULL);
670	import_cachep = kmem_cache_create("ll_import_cache",
671					     sizeof(struct obd_import),
672					     0, 0, NULL);
673	if (!import_cachep)
674		goto out;
675
676	LASSERT(capa_cachep == NULL);
677	capa_cachep = kmem_cache_create("capa_cache",
678					   sizeof(struct obd_capa), 0, 0, NULL);
679	if (!capa_cachep)
680		goto out;
681
682	return 0;
683 out:
684	obd_cleanup_caches();
685	return -ENOMEM;
686
687}
688
689/* map connection to client */
690struct obd_export *class_conn2export(struct lustre_handle *conn)
691{
692	struct obd_export *export;
693
694	if (!conn) {
695		CDEBUG(D_CACHE, "looking for null handle\n");
696		return NULL;
697	}
698
699	if (conn->cookie == -1) {  /* this means assign a new connection */
700		CDEBUG(D_CACHE, "want a new connection\n");
701		return NULL;
702	}
703
704	CDEBUG(D_INFO, "looking for export cookie %#llx\n", conn->cookie);
705	export = class_handle2object(conn->cookie);
706	return export;
707}
708EXPORT_SYMBOL(class_conn2export);
709
710struct obd_device *class_exp2obd(struct obd_export *exp)
711{
712	if (exp)
713		return exp->exp_obd;
714	return NULL;
715}
716EXPORT_SYMBOL(class_exp2obd);
717
718struct obd_device *class_conn2obd(struct lustre_handle *conn)
719{
720	struct obd_export *export;
721	export = class_conn2export(conn);
722	if (export) {
723		struct obd_device *obd = export->exp_obd;
724		class_export_put(export);
725		return obd;
726	}
727	return NULL;
728}
729EXPORT_SYMBOL(class_conn2obd);
730
731struct obd_import *class_exp2cliimp(struct obd_export *exp)
732{
733	struct obd_device *obd = exp->exp_obd;
734	if (obd == NULL)
735		return NULL;
736	return obd->u.cli.cl_import;
737}
738EXPORT_SYMBOL(class_exp2cliimp);
739
740struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
741{
742	struct obd_device *obd = class_conn2obd(conn);
743	if (obd == NULL)
744		return NULL;
745	return obd->u.cli.cl_import;
746}
747EXPORT_SYMBOL(class_conn2cliimp);
748
749/* Export management functions */
750static void class_export_destroy(struct obd_export *exp)
751{
752	struct obd_device *obd = exp->exp_obd;
753
754	LASSERT_ATOMIC_ZERO(&exp->exp_refcount);
755	LASSERT(obd != NULL);
756
757	CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
758	       exp->exp_client_uuid.uuid, obd->obd_name);
759
760	/* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
761	if (exp->exp_connection)
762		ptlrpc_put_connection_superhack(exp->exp_connection);
763
764	LASSERT(list_empty(&exp->exp_outstanding_replies));
765	LASSERT(list_empty(&exp->exp_uncommitted_replies));
766	LASSERT(list_empty(&exp->exp_req_replay_queue));
767	LASSERT(list_empty(&exp->exp_hp_rpcs));
768	obd_destroy_export(exp);
769	class_decref(obd, "export", exp);
770
771	OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
772}
773
774static void export_handle_addref(void *export)
775{
776	class_export_get(export);
777}
778
779static struct portals_handle_ops export_handle_ops = {
780	.hop_addref = export_handle_addref,
781	.hop_free   = NULL,
782};
783
784struct obd_export *class_export_get(struct obd_export *exp)
785{
786	atomic_inc(&exp->exp_refcount);
787	CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
788	       atomic_read(&exp->exp_refcount));
789	return exp;
790}
791EXPORT_SYMBOL(class_export_get);
792
793void class_export_put(struct obd_export *exp)
794{
795	LASSERT(exp != NULL);
796	LASSERT_ATOMIC_GT_LT(&exp->exp_refcount, 0, LI_POISON);
797	CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
798	       atomic_read(&exp->exp_refcount) - 1);
799
800	if (atomic_dec_and_test(&exp->exp_refcount)) {
801		LASSERT(!list_empty(&exp->exp_obd_chain));
802		CDEBUG(D_IOCTL, "final put %p/%s\n",
803		       exp, exp->exp_client_uuid.uuid);
804
805		/* release nid stat refererence */
806		lprocfs_exp_cleanup(exp);
807
808		obd_zombie_export_add(exp);
809	}
810}
811EXPORT_SYMBOL(class_export_put);
812
813/* Creates a new export, adds it to the hash table, and returns a
814 * pointer to it. The refcount is 2: one for the hash reference, and
815 * one for the pointer returned by this function. */
816struct obd_export *class_new_export(struct obd_device *obd,
817				    struct obd_uuid *cluuid)
818{
819	struct obd_export *export;
820	struct cfs_hash *hash = NULL;
821	int rc = 0;
822
823	OBD_ALLOC_PTR(export);
824	if (!export)
825		return ERR_PTR(-ENOMEM);
826
827	export->exp_conn_cnt = 0;
828	export->exp_lock_hash = NULL;
829	export->exp_flock_hash = NULL;
830	atomic_set(&export->exp_refcount, 2);
831	atomic_set(&export->exp_rpc_count, 0);
832	atomic_set(&export->exp_cb_count, 0);
833	atomic_set(&export->exp_locks_count, 0);
834#if LUSTRE_TRACKS_LOCK_EXP_REFS
835	INIT_LIST_HEAD(&export->exp_locks_list);
836	spin_lock_init(&export->exp_locks_list_guard);
837#endif
838	atomic_set(&export->exp_replay_count, 0);
839	export->exp_obd = obd;
840	INIT_LIST_HEAD(&export->exp_outstanding_replies);
841	spin_lock_init(&export->exp_uncommitted_replies_lock);
842	INIT_LIST_HEAD(&export->exp_uncommitted_replies);
843	INIT_LIST_HEAD(&export->exp_req_replay_queue);
844	INIT_LIST_HEAD(&export->exp_handle.h_link);
845	INIT_LIST_HEAD(&export->exp_hp_rpcs);
846	class_handle_hash(&export->exp_handle, &export_handle_ops);
847	export->exp_last_request_time = get_seconds();
848	spin_lock_init(&export->exp_lock);
849	spin_lock_init(&export->exp_rpc_lock);
850	INIT_HLIST_NODE(&export->exp_uuid_hash);
851	INIT_HLIST_NODE(&export->exp_nid_hash);
852	spin_lock_init(&export->exp_bl_list_lock);
853	INIT_LIST_HEAD(&export->exp_bl_list);
854
855	export->exp_sp_peer = LUSTRE_SP_ANY;
856	export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
857	export->exp_client_uuid = *cluuid;
858	obd_init_export(export);
859
860	spin_lock(&obd->obd_dev_lock);
861	/* shouldn't happen, but might race */
862	if (obd->obd_stopping) {
863		rc = -ENODEV;
864		goto exit_unlock;
865	}
866
867	hash = cfs_hash_getref(obd->obd_uuid_hash);
868	if (hash == NULL) {
869		rc = -ENODEV;
870		goto exit_unlock;
871	}
872	spin_unlock(&obd->obd_dev_lock);
873
874	if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
875		rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
876		if (rc != 0) {
877			LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
878				      obd->obd_name, cluuid->uuid, rc);
879			rc = -EALREADY;
880			goto exit_err;
881		}
882	}
883
884	spin_lock(&obd->obd_dev_lock);
885	if (obd->obd_stopping) {
886		cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
887		rc = -ENODEV;
888		goto exit_unlock;
889	}
890
891	class_incref(obd, "export", export);
892	list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
893	list_add_tail(&export->exp_obd_chain_timed,
894			  &export->exp_obd->obd_exports_timed);
895	export->exp_obd->obd_num_exports++;
896	spin_unlock(&obd->obd_dev_lock);
897	cfs_hash_putref(hash);
898	return export;
899
900exit_unlock:
901	spin_unlock(&obd->obd_dev_lock);
902exit_err:
903	if (hash)
904		cfs_hash_putref(hash);
905	class_handle_unhash(&export->exp_handle);
906	LASSERT(hlist_unhashed(&export->exp_uuid_hash));
907	obd_destroy_export(export);
908	OBD_FREE_PTR(export);
909	return ERR_PTR(rc);
910}
911EXPORT_SYMBOL(class_new_export);
912
913void class_unlink_export(struct obd_export *exp)
914{
915	class_handle_unhash(&exp->exp_handle);
916
917	spin_lock(&exp->exp_obd->obd_dev_lock);
918	/* delete an uuid-export hashitem from hashtables */
919	if (!hlist_unhashed(&exp->exp_uuid_hash))
920		cfs_hash_del(exp->exp_obd->obd_uuid_hash,
921			     &exp->exp_client_uuid,
922			     &exp->exp_uuid_hash);
923
924	list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
925	list_del_init(&exp->exp_obd_chain_timed);
926	exp->exp_obd->obd_num_exports--;
927	spin_unlock(&exp->exp_obd->obd_dev_lock);
928	class_export_put(exp);
929}
930EXPORT_SYMBOL(class_unlink_export);
931
932/* Import management functions */
933void class_import_destroy(struct obd_import *imp)
934{
935	CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
936		imp->imp_obd->obd_name);
937
938	LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
939
940	ptlrpc_put_connection_superhack(imp->imp_connection);
941
942	while (!list_empty(&imp->imp_conn_list)) {
943		struct obd_import_conn *imp_conn;
944
945		imp_conn = list_entry(imp->imp_conn_list.next,
946					  struct obd_import_conn, oic_item);
947		list_del_init(&imp_conn->oic_item);
948		ptlrpc_put_connection_superhack(imp_conn->oic_conn);
949		OBD_FREE(imp_conn, sizeof(*imp_conn));
950	}
951
952	LASSERT(imp->imp_sec == NULL);
953	class_decref(imp->imp_obd, "import", imp);
954	OBD_FREE_RCU(imp, sizeof(*imp), &imp->imp_handle);
955}
956
957static void import_handle_addref(void *import)
958{
959	class_import_get(import);
960}
961
962static struct portals_handle_ops import_handle_ops = {
963	.hop_addref = import_handle_addref,
964	.hop_free   = NULL,
965};
966
967struct obd_import *class_import_get(struct obd_import *import)
968{
969	atomic_inc(&import->imp_refcount);
970	CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
971	       atomic_read(&import->imp_refcount),
972	       import->imp_obd->obd_name);
973	return import;
974}
975EXPORT_SYMBOL(class_import_get);
976
977void class_import_put(struct obd_import *imp)
978{
979	LASSERT(list_empty(&imp->imp_zombie_chain));
980	LASSERT_ATOMIC_GT_LT(&imp->imp_refcount, 0, LI_POISON);
981
982	CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
983	       atomic_read(&imp->imp_refcount) - 1,
984	       imp->imp_obd->obd_name);
985
986	if (atomic_dec_and_test(&imp->imp_refcount)) {
987		CDEBUG(D_INFO, "final put import %p\n", imp);
988		obd_zombie_import_add(imp);
989	}
990
991	/* catch possible import put race */
992	LASSERT_ATOMIC_GE_LT(&imp->imp_refcount, 0, LI_POISON);
993}
994EXPORT_SYMBOL(class_import_put);
995
996static void init_imp_at(struct imp_at *at) {
997	int i;
998	at_init(&at->iat_net_latency, 0, 0);
999	for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1000		/* max service estimates are tracked on the server side, so
1001		   don't use the AT history here, just use the last reported
1002		   val. (But keep hist for proc histogram, worst_ever) */
1003		at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
1004			AT_FLG_NOHIST);
1005	}
1006}
1007
1008struct obd_import *class_new_import(struct obd_device *obd)
1009{
1010	struct obd_import *imp;
1011
1012	OBD_ALLOC(imp, sizeof(*imp));
1013	if (imp == NULL)
1014		return NULL;
1015
1016	INIT_LIST_HEAD(&imp->imp_pinger_chain);
1017	INIT_LIST_HEAD(&imp->imp_zombie_chain);
1018	INIT_LIST_HEAD(&imp->imp_replay_list);
1019	INIT_LIST_HEAD(&imp->imp_sending_list);
1020	INIT_LIST_HEAD(&imp->imp_delayed_list);
1021	INIT_LIST_HEAD(&imp->imp_committed_list);
1022	imp->imp_replay_cursor = &imp->imp_committed_list;
1023	spin_lock_init(&imp->imp_lock);
1024	imp->imp_last_success_conn = 0;
1025	imp->imp_state = LUSTRE_IMP_NEW;
1026	imp->imp_obd = class_incref(obd, "import", imp);
1027	mutex_init(&imp->imp_sec_mutex);
1028	init_waitqueue_head(&imp->imp_recovery_waitq);
1029
1030	atomic_set(&imp->imp_refcount, 2);
1031	atomic_set(&imp->imp_unregistering, 0);
1032	atomic_set(&imp->imp_inflight, 0);
1033	atomic_set(&imp->imp_replay_inflight, 0);
1034	atomic_set(&imp->imp_inval_count, 0);
1035	INIT_LIST_HEAD(&imp->imp_conn_list);
1036	INIT_LIST_HEAD(&imp->imp_handle.h_link);
1037	class_handle_hash(&imp->imp_handle, &import_handle_ops);
1038	init_imp_at(&imp->imp_at);
1039
1040	/* the default magic is V2, will be used in connect RPC, and
1041	 * then adjusted according to the flags in request/reply. */
1042	imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1043
1044	return imp;
1045}
1046EXPORT_SYMBOL(class_new_import);
1047
1048void class_destroy_import(struct obd_import *import)
1049{
1050	LASSERT(import != NULL);
1051	LASSERT(import != LP_POISON);
1052
1053	class_handle_unhash(&import->imp_handle);
1054
1055	spin_lock(&import->imp_lock);
1056	import->imp_generation++;
1057	spin_unlock(&import->imp_lock);
1058	class_import_put(import);
1059}
1060EXPORT_SYMBOL(class_destroy_import);
1061
1062#if LUSTRE_TRACKS_LOCK_EXP_REFS
1063
1064void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1065{
1066	spin_lock(&exp->exp_locks_list_guard);
1067
1068	LASSERT(lock->l_exp_refs_nr >= 0);
1069
1070	if (lock->l_exp_refs_target != NULL &&
1071	    lock->l_exp_refs_target != exp) {
1072		LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1073			      exp, lock, lock->l_exp_refs_target);
1074	}
1075	if ((lock->l_exp_refs_nr ++) == 0) {
1076		list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1077		lock->l_exp_refs_target = exp;
1078	}
1079	CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1080	       lock, exp, lock->l_exp_refs_nr);
1081	spin_unlock(&exp->exp_locks_list_guard);
1082}
1083EXPORT_SYMBOL(__class_export_add_lock_ref);
1084
1085void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1086{
1087	spin_lock(&exp->exp_locks_list_guard);
1088	LASSERT(lock->l_exp_refs_nr > 0);
1089	if (lock->l_exp_refs_target != exp) {
1090		LCONSOLE_WARN("lock %p, "
1091			      "mismatching export pointers: %p, %p\n",
1092			      lock, lock->l_exp_refs_target, exp);
1093	}
1094	if (-- lock->l_exp_refs_nr == 0) {
1095		list_del_init(&lock->l_exp_refs_link);
1096		lock->l_exp_refs_target = NULL;
1097	}
1098	CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1099	       lock, exp, lock->l_exp_refs_nr);
1100	spin_unlock(&exp->exp_locks_list_guard);
1101}
1102EXPORT_SYMBOL(__class_export_del_lock_ref);
1103#endif
1104
1105/* A connection defines an export context in which preallocation can
1106   be managed. This releases the export pointer reference, and returns
1107   the export handle, so the export refcount is 1 when this function
1108   returns. */
1109int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1110		  struct obd_uuid *cluuid)
1111{
1112	struct obd_export *export;
1113	LASSERT(conn != NULL);
1114	LASSERT(obd != NULL);
1115	LASSERT(cluuid != NULL);
1116
1117	export = class_new_export(obd, cluuid);
1118	if (IS_ERR(export))
1119		return PTR_ERR(export);
1120
1121	conn->cookie = export->exp_handle.h_cookie;
1122	class_export_put(export);
1123
1124	CDEBUG(D_IOCTL, "connect: client %s, cookie %#llx\n",
1125	       cluuid->uuid, conn->cookie);
1126	return 0;
1127}
1128EXPORT_SYMBOL(class_connect);
1129
1130/* if export is involved in recovery then clean up related things */
1131void class_export_recovery_cleanup(struct obd_export *exp)
1132{
1133	struct obd_device *obd = exp->exp_obd;
1134
1135	spin_lock(&obd->obd_recovery_task_lock);
1136	if (exp->exp_delayed)
1137		obd->obd_delayed_clients--;
1138	if (obd->obd_recovering) {
1139		if (exp->exp_in_recovery) {
1140			spin_lock(&exp->exp_lock);
1141			exp->exp_in_recovery = 0;
1142			spin_unlock(&exp->exp_lock);
1143			LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
1144			atomic_dec(&obd->obd_connected_clients);
1145		}
1146
1147		/* if called during recovery then should update
1148		 * obd_stale_clients counter,
1149		 * lightweight exports are not counted */
1150		if (exp->exp_failed &&
1151		    (exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT) == 0)
1152			exp->exp_obd->obd_stale_clients++;
1153	}
1154	spin_unlock(&obd->obd_recovery_task_lock);
1155	/** Cleanup req replay fields */
1156	if (exp->exp_req_replay_needed) {
1157		spin_lock(&exp->exp_lock);
1158		exp->exp_req_replay_needed = 0;
1159		spin_unlock(&exp->exp_lock);
1160		LASSERT(atomic_read(&obd->obd_req_replay_clients));
1161		atomic_dec(&obd->obd_req_replay_clients);
1162	}
1163	/** Cleanup lock replay data */
1164	if (exp->exp_lock_replay_needed) {
1165		spin_lock(&exp->exp_lock);
1166		exp->exp_lock_replay_needed = 0;
1167		spin_unlock(&exp->exp_lock);
1168		LASSERT(atomic_read(&obd->obd_lock_replay_clients));
1169		atomic_dec(&obd->obd_lock_replay_clients);
1170	}
1171}
1172
1173/* This function removes 1-3 references from the export:
1174 * 1 - for export pointer passed
1175 * and if disconnect really need
1176 * 2 - removing from hash
1177 * 3 - in client_unlink_export
1178 * The export pointer passed to this function can destroyed */
1179int class_disconnect(struct obd_export *export)
1180{
1181	int already_disconnected;
1182
1183	if (export == NULL) {
1184		CWARN("attempting to free NULL export %p\n", export);
1185		return -EINVAL;
1186	}
1187
1188	spin_lock(&export->exp_lock);
1189	already_disconnected = export->exp_disconnected;
1190	export->exp_disconnected = 1;
1191	spin_unlock(&export->exp_lock);
1192
1193	/* class_cleanup(), abort_recovery(), and class_fail_export()
1194	 * all end up in here, and if any of them race we shouldn't
1195	 * call extra class_export_puts(). */
1196	if (already_disconnected) {
1197		LASSERT(hlist_unhashed(&export->exp_nid_hash));
1198		goto no_disconn;
1199	}
1200
1201	CDEBUG(D_IOCTL, "disconnect: cookie %#llx\n",
1202	       export->exp_handle.h_cookie);
1203
1204	if (!hlist_unhashed(&export->exp_nid_hash))
1205		cfs_hash_del(export->exp_obd->obd_nid_hash,
1206			     &export->exp_connection->c_peer.nid,
1207			     &export->exp_nid_hash);
1208
1209	class_export_recovery_cleanup(export);
1210	class_unlink_export(export);
1211no_disconn:
1212	class_export_put(export);
1213	return 0;
1214}
1215EXPORT_SYMBOL(class_disconnect);
1216
1217/* Return non-zero for a fully connected export */
1218int class_connected_export(struct obd_export *exp)
1219{
1220	if (exp) {
1221		int connected;
1222		spin_lock(&exp->exp_lock);
1223		connected = (exp->exp_conn_cnt > 0);
1224		spin_unlock(&exp->exp_lock);
1225		return connected;
1226	}
1227	return 0;
1228}
1229EXPORT_SYMBOL(class_connected_export);
1230
1231static void class_disconnect_export_list(struct list_head *list,
1232					 enum obd_option flags)
1233{
1234	int rc;
1235	struct obd_export *exp;
1236
1237	/* It's possible that an export may disconnect itself, but
1238	 * nothing else will be added to this list. */
1239	while (!list_empty(list)) {
1240		exp = list_entry(list->next, struct obd_export,
1241				     exp_obd_chain);
1242		/* need for safe call CDEBUG after obd_disconnect */
1243		class_export_get(exp);
1244
1245		spin_lock(&exp->exp_lock);
1246		exp->exp_flags = flags;
1247		spin_unlock(&exp->exp_lock);
1248
1249		if (obd_uuid_equals(&exp->exp_client_uuid,
1250				    &exp->exp_obd->obd_uuid)) {
1251			CDEBUG(D_HA,
1252			       "exp %p export uuid == obd uuid, don't discon\n",
1253			       exp);
1254			/* Need to delete this now so we don't end up pointing
1255			 * to work_list later when this export is cleaned up. */
1256			list_del_init(&exp->exp_obd_chain);
1257			class_export_put(exp);
1258			continue;
1259		}
1260
1261		class_export_get(exp);
1262		CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1263		       "last request at "CFS_TIME_T"\n",
1264		       exp->exp_obd->obd_name, obd_export_nid2str(exp),
1265		       exp, exp->exp_last_request_time);
1266		/* release one export reference anyway */
1267		rc = obd_disconnect(exp);
1268
1269		CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1270		       obd_export_nid2str(exp), exp, rc);
1271		class_export_put(exp);
1272	}
1273}
1274
1275void class_disconnect_exports(struct obd_device *obd)
1276{
1277	struct list_head work_list;
1278
1279	/* Move all of the exports from obd_exports to a work list, en masse. */
1280	INIT_LIST_HEAD(&work_list);
1281	spin_lock(&obd->obd_dev_lock);
1282	list_splice_init(&obd->obd_exports, &work_list);
1283	list_splice_init(&obd->obd_delayed_exports, &work_list);
1284	spin_unlock(&obd->obd_dev_lock);
1285
1286	if (!list_empty(&work_list)) {
1287		CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1288		       "disconnecting them\n", obd->obd_minor, obd);
1289		class_disconnect_export_list(&work_list,
1290					     exp_flags_from_obd(obd));
1291	} else
1292		CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1293		       obd->obd_minor, obd);
1294}
1295EXPORT_SYMBOL(class_disconnect_exports);
1296
1297/* Remove exports that have not completed recovery.
1298 */
1299void class_disconnect_stale_exports(struct obd_device *obd,
1300				    int (*test_export)(struct obd_export *))
1301{
1302	struct list_head work_list;
1303	struct obd_export *exp, *n;
1304	int evicted = 0;
1305
1306	INIT_LIST_HEAD(&work_list);
1307	spin_lock(&obd->obd_dev_lock);
1308	list_for_each_entry_safe(exp, n, &obd->obd_exports,
1309				     exp_obd_chain) {
1310		/* don't count self-export as client */
1311		if (obd_uuid_equals(&exp->exp_client_uuid,
1312				    &exp->exp_obd->obd_uuid))
1313			continue;
1314
1315		/* don't evict clients which have no slot in last_rcvd
1316		 * (e.g. lightweight connection) */
1317		if (exp->exp_target_data.ted_lr_idx == -1)
1318			continue;
1319
1320		spin_lock(&exp->exp_lock);
1321		if (exp->exp_failed || test_export(exp)) {
1322			spin_unlock(&exp->exp_lock);
1323			continue;
1324		}
1325		exp->exp_failed = 1;
1326		spin_unlock(&exp->exp_lock);
1327
1328		list_move(&exp->exp_obd_chain, &work_list);
1329		evicted++;
1330		CDEBUG(D_HA, "%s: disconnect stale client %s@%s\n",
1331		       obd->obd_name, exp->exp_client_uuid.uuid,
1332		       exp->exp_connection == NULL ? "<unknown>" :
1333		       libcfs_nid2str(exp->exp_connection->c_peer.nid));
1334		print_export_data(exp, "EVICTING", 0);
1335	}
1336	spin_unlock(&obd->obd_dev_lock);
1337
1338	if (evicted)
1339		LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1340			      obd->obd_name, evicted);
1341
1342	class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1343						 OBD_OPT_ABORT_RECOV);
1344}
1345EXPORT_SYMBOL(class_disconnect_stale_exports);
1346
1347void class_fail_export(struct obd_export *exp)
1348{
1349	int rc, already_failed;
1350
1351	spin_lock(&exp->exp_lock);
1352	already_failed = exp->exp_failed;
1353	exp->exp_failed = 1;
1354	spin_unlock(&exp->exp_lock);
1355
1356	if (already_failed) {
1357		CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1358		       exp, exp->exp_client_uuid.uuid);
1359		return;
1360	}
1361
1362	CDEBUG(D_HA, "disconnecting export %p/%s\n",
1363	       exp, exp->exp_client_uuid.uuid);
1364
1365	if (obd_dump_on_timeout)
1366		libcfs_debug_dumplog();
1367
1368	/* need for safe call CDEBUG after obd_disconnect */
1369	class_export_get(exp);
1370
1371	/* Most callers into obd_disconnect are removing their own reference
1372	 * (request, for example) in addition to the one from the hash table.
1373	 * We don't have such a reference here, so make one. */
1374	class_export_get(exp);
1375	rc = obd_disconnect(exp);
1376	if (rc)
1377		CERROR("disconnecting export %p failed: %d\n", exp, rc);
1378	else
1379		CDEBUG(D_HA, "disconnected export %p/%s\n",
1380		       exp, exp->exp_client_uuid.uuid);
1381	class_export_put(exp);
1382}
1383EXPORT_SYMBOL(class_fail_export);
1384
1385char *obd_export_nid2str(struct obd_export *exp)
1386{
1387	if (exp->exp_connection != NULL)
1388		return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1389
1390	return "(no nid)";
1391}
1392EXPORT_SYMBOL(obd_export_nid2str);
1393
1394int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1395{
1396	struct cfs_hash *nid_hash;
1397	struct obd_export *doomed_exp = NULL;
1398	int exports_evicted = 0;
1399
1400	lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1401
1402	spin_lock(&obd->obd_dev_lock);
1403	/* umount has run already, so evict thread should leave
1404	 * its task to umount thread now */
1405	if (obd->obd_stopping) {
1406		spin_unlock(&obd->obd_dev_lock);
1407		return exports_evicted;
1408	}
1409	nid_hash = obd->obd_nid_hash;
1410	cfs_hash_getref(nid_hash);
1411	spin_unlock(&obd->obd_dev_lock);
1412
1413	do {
1414		doomed_exp = cfs_hash_lookup(nid_hash, &nid_key);
1415		if (doomed_exp == NULL)
1416			break;
1417
1418		LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1419			 "nid %s found, wanted nid %s, requested nid %s\n",
1420			 obd_export_nid2str(doomed_exp),
1421			 libcfs_nid2str(nid_key), nid);
1422		LASSERTF(doomed_exp != obd->obd_self_export,
1423			 "self-export is hashed by NID?\n");
1424		exports_evicted++;
1425		LCONSOLE_WARN("%s: evicting %s (at %s) by administrative "
1426			      "request\n", obd->obd_name,
1427			      obd_uuid2str(&doomed_exp->exp_client_uuid),
1428			      obd_export_nid2str(doomed_exp));
1429		class_fail_export(doomed_exp);
1430		class_export_put(doomed_exp);
1431	} while (1);
1432
1433	cfs_hash_putref(nid_hash);
1434
1435	if (!exports_evicted)
1436		CDEBUG(D_HA,
1437		       "%s: can't disconnect NID '%s': no exports found\n",
1438		       obd->obd_name, nid);
1439	return exports_evicted;
1440}
1441EXPORT_SYMBOL(obd_export_evict_by_nid);
1442
1443int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1444{
1445	struct cfs_hash *uuid_hash;
1446	struct obd_export *doomed_exp = NULL;
1447	struct obd_uuid doomed_uuid;
1448	int exports_evicted = 0;
1449
1450	spin_lock(&obd->obd_dev_lock);
1451	if (obd->obd_stopping) {
1452		spin_unlock(&obd->obd_dev_lock);
1453		return exports_evicted;
1454	}
1455	uuid_hash = obd->obd_uuid_hash;
1456	cfs_hash_getref(uuid_hash);
1457	spin_unlock(&obd->obd_dev_lock);
1458
1459	obd_str2uuid(&doomed_uuid, uuid);
1460	if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1461		CERROR("%s: can't evict myself\n", obd->obd_name);
1462		cfs_hash_putref(uuid_hash);
1463		return exports_evicted;
1464	}
1465
1466	doomed_exp = cfs_hash_lookup(uuid_hash, &doomed_uuid);
1467
1468	if (doomed_exp == NULL) {
1469		CERROR("%s: can't disconnect %s: no exports found\n",
1470		       obd->obd_name, uuid);
1471	} else {
1472		CWARN("%s: evicting %s at administrative request\n",
1473		       obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1474		class_fail_export(doomed_exp);
1475		class_export_put(doomed_exp);
1476		exports_evicted++;
1477	}
1478	cfs_hash_putref(uuid_hash);
1479
1480	return exports_evicted;
1481}
1482EXPORT_SYMBOL(obd_export_evict_by_uuid);
1483
1484#if LUSTRE_TRACKS_LOCK_EXP_REFS
1485void (*class_export_dump_hook)(struct obd_export*) = NULL;
1486EXPORT_SYMBOL(class_export_dump_hook);
1487#endif
1488
1489static void print_export_data(struct obd_export *exp, const char *status,
1490			      int locks)
1491{
1492	struct ptlrpc_reply_state *rs;
1493	struct ptlrpc_reply_state *first_reply = NULL;
1494	int nreplies = 0;
1495
1496	spin_lock(&exp->exp_lock);
1497	list_for_each_entry(rs, &exp->exp_outstanding_replies,
1498				rs_exp_list) {
1499		if (nreplies == 0)
1500			first_reply = rs;
1501		nreplies++;
1502	}
1503	spin_unlock(&exp->exp_lock);
1504
1505	CDEBUG(D_HA, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: %p %s %llu\n",
1506	       exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1507	       obd_export_nid2str(exp), atomic_read(&exp->exp_refcount),
1508	       atomic_read(&exp->exp_rpc_count),
1509	       atomic_read(&exp->exp_cb_count),
1510	       atomic_read(&exp->exp_locks_count),
1511	       exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1512	       nreplies, first_reply, nreplies > 3 ? "..." : "",
1513	       exp->exp_last_committed);
1514#if LUSTRE_TRACKS_LOCK_EXP_REFS
1515	if (locks && class_export_dump_hook != NULL)
1516		class_export_dump_hook(exp);
1517#endif
1518}
1519
1520void dump_exports(struct obd_device *obd, int locks)
1521{
1522	struct obd_export *exp;
1523
1524	spin_lock(&obd->obd_dev_lock);
1525	list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1526		print_export_data(exp, "ACTIVE", locks);
1527	list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1528		print_export_data(exp, "UNLINKED", locks);
1529	list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1530		print_export_data(exp, "DELAYED", locks);
1531	spin_unlock(&obd->obd_dev_lock);
1532	spin_lock(&obd_zombie_impexp_lock);
1533	list_for_each_entry(exp, &obd_zombie_exports, exp_obd_chain)
1534		print_export_data(exp, "ZOMBIE", locks);
1535	spin_unlock(&obd_zombie_impexp_lock);
1536}
1537EXPORT_SYMBOL(dump_exports);
1538
1539void obd_exports_barrier(struct obd_device *obd)
1540{
1541	int waited = 2;
1542	LASSERT(list_empty(&obd->obd_exports));
1543	spin_lock(&obd->obd_dev_lock);
1544	while (!list_empty(&obd->obd_unlinked_exports)) {
1545		spin_unlock(&obd->obd_dev_lock);
1546		set_current_state(TASK_UNINTERRUPTIBLE);
1547		schedule_timeout(cfs_time_seconds(waited));
1548		if (waited > 5 && IS_PO2(waited)) {
1549			LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1550				      "more than %d seconds. "
1551				      "The obd refcount = %d. Is it stuck?\n",
1552				      obd->obd_name, waited,
1553				      atomic_read(&obd->obd_refcount));
1554			dump_exports(obd, 1);
1555		}
1556		waited *= 2;
1557		spin_lock(&obd->obd_dev_lock);
1558	}
1559	spin_unlock(&obd->obd_dev_lock);
1560}
1561EXPORT_SYMBOL(obd_exports_barrier);
1562
1563/* Total amount of zombies to be destroyed */
1564static int zombies_count = 0;
1565
1566/**
1567 * kill zombie imports and exports
1568 */
1569void obd_zombie_impexp_cull(void)
1570{
1571	struct obd_import *import;
1572	struct obd_export *export;
1573
1574	do {
1575		spin_lock(&obd_zombie_impexp_lock);
1576
1577		import = NULL;
1578		if (!list_empty(&obd_zombie_imports)) {
1579			import = list_entry(obd_zombie_imports.next,
1580						struct obd_import,
1581						imp_zombie_chain);
1582			list_del_init(&import->imp_zombie_chain);
1583		}
1584
1585		export = NULL;
1586		if (!list_empty(&obd_zombie_exports)) {
1587			export = list_entry(obd_zombie_exports.next,
1588						struct obd_export,
1589						exp_obd_chain);
1590			list_del_init(&export->exp_obd_chain);
1591		}
1592
1593		spin_unlock(&obd_zombie_impexp_lock);
1594
1595		if (import != NULL) {
1596			class_import_destroy(import);
1597			spin_lock(&obd_zombie_impexp_lock);
1598			zombies_count--;
1599			spin_unlock(&obd_zombie_impexp_lock);
1600		}
1601
1602		if (export != NULL) {
1603			class_export_destroy(export);
1604			spin_lock(&obd_zombie_impexp_lock);
1605			zombies_count--;
1606			spin_unlock(&obd_zombie_impexp_lock);
1607		}
1608
1609		cond_resched();
1610	} while (import != NULL || export != NULL);
1611}
1612
1613static struct completion	obd_zombie_start;
1614static struct completion	obd_zombie_stop;
1615static unsigned long		obd_zombie_flags;
1616static wait_queue_head_t		obd_zombie_waitq;
1617static pid_t			obd_zombie_pid;
1618
1619enum {
1620	OBD_ZOMBIE_STOP		= 0x0001,
1621};
1622
1623/**
1624 * check for work for kill zombie import/export thread.
1625 */
1626static int obd_zombie_impexp_check(void *arg)
1627{
1628	int rc;
1629
1630	spin_lock(&obd_zombie_impexp_lock);
1631	rc = (zombies_count == 0) &&
1632	     !test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1633	spin_unlock(&obd_zombie_impexp_lock);
1634
1635	return rc;
1636}
1637
1638/**
1639 * Add export to the obd_zombie thread and notify it.
1640 */
1641static void obd_zombie_export_add(struct obd_export *exp) {
1642	spin_lock(&exp->exp_obd->obd_dev_lock);
1643	LASSERT(!list_empty(&exp->exp_obd_chain));
1644	list_del_init(&exp->exp_obd_chain);
1645	spin_unlock(&exp->exp_obd->obd_dev_lock);
1646	spin_lock(&obd_zombie_impexp_lock);
1647	zombies_count++;
1648	list_add(&exp->exp_obd_chain, &obd_zombie_exports);
1649	spin_unlock(&obd_zombie_impexp_lock);
1650
1651	obd_zombie_impexp_notify();
1652}
1653
1654/**
1655 * Add import to the obd_zombie thread and notify it.
1656 */
1657static void obd_zombie_import_add(struct obd_import *imp) {
1658	LASSERT(imp->imp_sec == NULL);
1659	LASSERT(imp->imp_rq_pool == NULL);
1660	spin_lock(&obd_zombie_impexp_lock);
1661	LASSERT(list_empty(&imp->imp_zombie_chain));
1662	zombies_count++;
1663	list_add(&imp->imp_zombie_chain, &obd_zombie_imports);
1664	spin_unlock(&obd_zombie_impexp_lock);
1665
1666	obd_zombie_impexp_notify();
1667}
1668
1669/**
1670 * notify import/export destroy thread about new zombie.
1671 */
1672static void obd_zombie_impexp_notify(void)
1673{
1674	/*
1675	 * Make sure obd_zombie_impexp_thread get this notification.
1676	 * It is possible this signal only get by obd_zombie_barrier, and
1677	 * barrier gulps this notification and sleeps away and hangs ensues
1678	 */
1679	wake_up_all(&obd_zombie_waitq);
1680}
1681
1682/**
1683 * check whether obd_zombie is idle
1684 */
1685static int obd_zombie_is_idle(void)
1686{
1687	int rc;
1688
1689	LASSERT(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
1690	spin_lock(&obd_zombie_impexp_lock);
1691	rc = (zombies_count == 0);
1692	spin_unlock(&obd_zombie_impexp_lock);
1693	return rc;
1694}
1695
1696/**
1697 * wait when obd_zombie import/export queues become empty
1698 */
1699void obd_zombie_barrier(void)
1700{
1701	struct l_wait_info lwi = { 0 };
1702
1703	if (obd_zombie_pid == current_pid())
1704		/* don't wait for myself */
1705		return;
1706	l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
1707}
1708EXPORT_SYMBOL(obd_zombie_barrier);
1709
1710
1711/**
1712 * destroy zombie export/import thread.
1713 */
1714static int obd_zombie_impexp_thread(void *unused)
1715{
1716	unshare_fs_struct();
1717	complete(&obd_zombie_start);
1718
1719	obd_zombie_pid = current_pid();
1720
1721	while (!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1722		struct l_wait_info lwi = { 0 };
1723
1724		l_wait_event(obd_zombie_waitq,
1725			     !obd_zombie_impexp_check(NULL), &lwi);
1726		obd_zombie_impexp_cull();
1727
1728		/*
1729		 * Notify obd_zombie_barrier callers that queues
1730		 * may be empty.
1731		 */
1732		wake_up(&obd_zombie_waitq);
1733	}
1734
1735	complete(&obd_zombie_stop);
1736
1737	return 0;
1738}
1739
1740
1741/**
1742 * start destroy zombie import/export thread
1743 */
1744int obd_zombie_impexp_init(void)
1745{
1746	struct task_struct *task;
1747
1748	INIT_LIST_HEAD(&obd_zombie_imports);
1749	INIT_LIST_HEAD(&obd_zombie_exports);
1750	spin_lock_init(&obd_zombie_impexp_lock);
1751	init_completion(&obd_zombie_start);
1752	init_completion(&obd_zombie_stop);
1753	init_waitqueue_head(&obd_zombie_waitq);
1754	obd_zombie_pid = 0;
1755
1756	task = kthread_run(obd_zombie_impexp_thread, NULL, "obd_zombid");
1757	if (IS_ERR(task))
1758		return PTR_ERR(task);
1759
1760	wait_for_completion(&obd_zombie_start);
1761	return 0;
1762}
1763/**
1764 * stop destroy zombie import/export thread
1765 */
1766void obd_zombie_impexp_stop(void)
1767{
1768	set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1769	obd_zombie_impexp_notify();
1770	wait_for_completion(&obd_zombie_stop);
1771}
1772
1773/***** Kernel-userspace comm helpers *******/
1774
1775/* Get length of entire message, including header */
1776int kuc_len(int payload_len)
1777{
1778	return sizeof(struct kuc_hdr) + payload_len;
1779}
1780EXPORT_SYMBOL(kuc_len);
1781
1782/* Get a pointer to kuc header, given a ptr to the payload
1783 * @param p Pointer to payload area
1784 * @returns Pointer to kuc header
1785 */
1786struct kuc_hdr * kuc_ptr(void *p)
1787{
1788	struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1789	LASSERT(lh->kuc_magic == KUC_MAGIC);
1790	return lh;
1791}
1792EXPORT_SYMBOL(kuc_ptr);
1793
1794/* Test if payload is part of kuc message
1795 * @param p Pointer to payload area
1796 * @returns boolean
1797 */
1798int kuc_ispayload(void *p)
1799{
1800	struct kuc_hdr *kh = ((struct kuc_hdr *)p) - 1;
1801
1802	if (kh->kuc_magic == KUC_MAGIC)
1803		return 1;
1804	else
1805		return 0;
1806}
1807EXPORT_SYMBOL(kuc_ispayload);
1808
1809/* Alloc space for a message, and fill in header
1810 * @return Pointer to payload area
1811 */
1812void *kuc_alloc(int payload_len, int transport, int type)
1813{
1814	struct kuc_hdr *lh;
1815	int len = kuc_len(payload_len);
1816
1817	OBD_ALLOC(lh, len);
1818	if (lh == NULL)
1819		return ERR_PTR(-ENOMEM);
1820
1821	lh->kuc_magic = KUC_MAGIC;
1822	lh->kuc_transport = transport;
1823	lh->kuc_msgtype = type;
1824	lh->kuc_msglen = len;
1825
1826	return (void *)(lh + 1);
1827}
1828EXPORT_SYMBOL(kuc_alloc);
1829
1830/* Takes pointer to payload area */
1831inline void kuc_free(void *p, int payload_len)
1832{
1833	struct kuc_hdr *lh = kuc_ptr(p);
1834	OBD_FREE(lh, kuc_len(payload_len));
1835}
1836EXPORT_SYMBOL(kuc_free);
1837