[go: nahoru, domu]

1/*
2 * GPL HEADER START
3 *
4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5 *
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
9 *
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
15 *
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19 *
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
22 * have any questions.
23 *
24 * GPL HEADER END
25 */
26/*
27 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
29 *
30 * Copyright (c) 2011, 2012, Intel Corporation.
31 */
32/*
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
35 */
36
37#define DEBUG_SUBSYSTEM S_ECHO
38#include "../../include/linux/libcfs/libcfs.h"
39
40#include "../include/obd.h"
41#include "../include/obd_support.h"
42#include "../include/obd_class.h"
43#include "../include/lustre_debug.h"
44#include "../include/lprocfs_status.h"
45#include "../include/cl_object.h"
46#include "../include/lustre_fid.h"
47#include "../include/lustre_acl.h"
48#include "../include/lustre_net.h"
49
50#include "echo_internal.h"
51
52/** \defgroup echo_client Echo Client
53 * @{
54 */
55
56struct echo_device {
57	struct cl_device	ed_cl;
58	struct echo_client_obd *ed_ec;
59
60	struct cl_site	  ed_site_myself;
61	struct cl_site	 *ed_site;
62	struct lu_device       *ed_next;
63	int		     ed_next_islov;
64};
65
66struct echo_object {
67	struct cl_object	eo_cl;
68	struct cl_object_header eo_hdr;
69
70	struct echo_device     *eo_dev;
71	struct list_head	      eo_obj_chain;
72	struct lov_stripe_md   *eo_lsm;
73	atomic_t	    eo_npages;
74	int		     eo_deleted;
75};
76
77struct echo_object_conf {
78	struct cl_object_conf  eoc_cl;
79	struct lov_stripe_md **eoc_md;
80};
81
82struct echo_page {
83	struct cl_page_slice   ep_cl;
84	struct mutex		ep_lock;
85	struct page	    *ep_vmpage;
86};
87
88struct echo_lock {
89	struct cl_lock_slice   el_cl;
90	struct list_head	     el_chain;
91	struct echo_object    *el_object;
92	__u64		  el_cookie;
93	atomic_t	   el_refcount;
94};
95
/* Forward declarations used by echo_device_alloc() and echo_device_free(). */
static int
echo_client_setup(const struct lu_env *env, struct obd_device *obddev,
		  struct lustre_cfg *lcfg);
static int
echo_client_cleanup(struct obd_device *obddev);
100
101
102/** \defgroup echo_helpers Helper functions
103 * @{
104 */
105static inline struct echo_device *cl2echo_dev(const struct cl_device *dev)
106{
107	return container_of0(dev, struct echo_device, ed_cl);
108}
109
110static inline struct cl_device *echo_dev2cl(struct echo_device *d)
111{
112	return &d->ed_cl;
113}
114
115static inline struct echo_device *obd2echo_dev(const struct obd_device *obd)
116{
117	return cl2echo_dev(lu2cl_dev(obd->obd_lu_dev));
118}
119
120static inline struct cl_object *echo_obj2cl(struct echo_object *eco)
121{
122	return &eco->eo_cl;
123}
124
125static inline struct echo_object *cl2echo_obj(const struct cl_object *o)
126{
127	return container_of(o, struct echo_object, eo_cl);
128}
129
130static inline struct echo_page *cl2echo_page(const struct cl_page_slice *s)
131{
132	return container_of(s, struct echo_page, ep_cl);
133}
134
135static inline struct echo_lock *cl2echo_lock(const struct cl_lock_slice *s)
136{
137	return container_of(s, struct echo_lock, el_cl);
138}
139
140static inline struct cl_lock *echo_lock2cl(const struct echo_lock *ecl)
141{
142	return ecl->el_cl.cls_lock;
143}
144
145static struct lu_context_key echo_thread_key;
146static inline struct echo_thread_info *echo_env_info(const struct lu_env *env)
147{
148	struct echo_thread_info *info;
149	info = lu_context_key_get(&env->le_ctx, &echo_thread_key);
150	LASSERT(info != NULL);
151	return info;
152}
153
154static inline
155struct echo_object_conf *cl2echo_conf(const struct cl_object_conf *c)
156{
157	return container_of(c, struct echo_object_conf, eoc_cl);
158}
159
160/** @} echo_helpers */
161
162static struct echo_object *cl_echo_object_find(struct echo_device *d,
163					       struct lov_stripe_md **lsm);
164static int cl_echo_object_put(struct echo_object *eco);
165static int cl_echo_enqueue(struct echo_object *eco, u64 start,
166			   u64 end, int mode, __u64 *cookie);
167static int cl_echo_cancel(struct echo_device *d, __u64 cookie);
168static int cl_echo_object_brw(struct echo_object *eco, int rw, u64 offset,
169			      struct page **pages, int npages, int async);
170
171static struct echo_thread_info *echo_env_info(const struct lu_env *env);
172
173struct echo_thread_info {
174	struct echo_object_conf eti_conf;
175	struct lustre_md	eti_md;
176
177	struct cl_2queue	eti_queue;
178	struct cl_io	    eti_io;
179	struct cl_lock_descr    eti_descr;
180	struct lu_fid	   eti_fid;
181	struct lu_fid		eti_fid2;
182};
183
/*
 * Per-session state. No session data is used right now, so the structure
 * only carries a placeholder member.
 */
struct echo_session_info {
	unsigned long	dummy;
};
188
189static struct kmem_cache *echo_lock_kmem;
190static struct kmem_cache *echo_object_kmem;
191static struct kmem_cache *echo_thread_kmem;
192static struct kmem_cache *echo_session_kmem;
193
194static struct lu_kmem_descr echo_caches[] = {
195	{
196		.ckd_cache = &echo_lock_kmem,
197		.ckd_name  = "echo_lock_kmem",
198		.ckd_size  = sizeof (struct echo_lock)
199	},
200	{
201		.ckd_cache = &echo_object_kmem,
202		.ckd_name  = "echo_object_kmem",
203		.ckd_size  = sizeof (struct echo_object)
204	},
205	{
206		.ckd_cache = &echo_thread_kmem,
207		.ckd_name  = "echo_thread_kmem",
208		.ckd_size  = sizeof (struct echo_thread_info)
209	},
210	{
211		.ckd_cache = &echo_session_kmem,
212		.ckd_name  = "echo_session_kmem",
213		.ckd_size  = sizeof (struct echo_session_info)
214	},
215	{
216		.ckd_cache = NULL
217	}
218};
219
220/** \defgroup echo_page Page operations
221 *
222 * Echo page operations.
223 *
224 * @{
225 */
226static struct page *echo_page_vmpage(const struct lu_env *env,
227				    const struct cl_page_slice *slice)
228{
229	return cl2echo_page(slice)->ep_vmpage;
230}
231
232static int echo_page_own(const struct lu_env *env,
233			 const struct cl_page_slice *slice,
234			 struct cl_io *io, int nonblock)
235{
236	struct echo_page *ep = cl2echo_page(slice);
237
238	if (!nonblock)
239		mutex_lock(&ep->ep_lock);
240	else if (!mutex_trylock(&ep->ep_lock))
241		return -EAGAIN;
242	return 0;
243}
244
245static void echo_page_disown(const struct lu_env *env,
246			     const struct cl_page_slice *slice,
247			     struct cl_io *io)
248{
249	struct echo_page *ep = cl2echo_page(slice);
250
251	LASSERT(mutex_is_locked(&ep->ep_lock));
252	mutex_unlock(&ep->ep_lock);
253}
254
255static void echo_page_discard(const struct lu_env *env,
256			      const struct cl_page_slice *slice,
257			      struct cl_io *unused)
258{
259	cl_page_delete(env, slice->cpl_page);
260}
261
262static int echo_page_is_vmlocked(const struct lu_env *env,
263				 const struct cl_page_slice *slice)
264{
265	if (mutex_is_locked(&cl2echo_page(slice)->ep_lock))
266		return -EBUSY;
267	return -ENODATA;
268}
269
270static void echo_page_completion(const struct lu_env *env,
271				 const struct cl_page_slice *slice,
272				 int ioret)
273{
274	LASSERT(slice->cpl_page->cp_sync_io != NULL);
275}
276
277static void echo_page_fini(const struct lu_env *env,
278			   struct cl_page_slice *slice)
279{
280	struct echo_page *ep    = cl2echo_page(slice);
281	struct echo_object *eco = cl2echo_obj(slice->cpl_obj);
282	struct page *vmpage      = ep->ep_vmpage;
283
284	atomic_dec(&eco->eo_npages);
285	page_cache_release(vmpage);
286}
287
/** cpo_prep for both reads and writes: no preparation needed, always 0. */
static int
echo_page_prep(const struct lu_env *env, const struct cl_page_slice *slice,
	       struct cl_io *unused)
{
	return 0;
}
294
295static int echo_page_print(const struct lu_env *env,
296			   const struct cl_page_slice *slice,
297			   void *cookie, lu_printer_t printer)
298{
299	struct echo_page *ep = cl2echo_page(slice);
300
301	(*printer)(env, cookie, LUSTRE_ECHO_CLIENT_NAME"-page@%p %d vm@%p\n",
302		   ep, mutex_is_locked(&ep->ep_lock), ep->ep_vmpage);
303	return 0;
304}
305
306static const struct cl_page_operations echo_page_ops = {
307	.cpo_own	   = echo_page_own,
308	.cpo_disown	= echo_page_disown,
309	.cpo_discard       = echo_page_discard,
310	.cpo_vmpage	= echo_page_vmpage,
311	.cpo_fini	  = echo_page_fini,
312	.cpo_print	 = echo_page_print,
313	.cpo_is_vmlocked   = echo_page_is_vmlocked,
314	.io = {
315		[CRT_READ] = {
316			.cpo_prep	= echo_page_prep,
317			.cpo_completion  = echo_page_completion,
318		},
319		[CRT_WRITE] = {
320			.cpo_prep	= echo_page_prep,
321			.cpo_completion  = echo_page_completion,
322		}
323	}
324};
325/** @} echo_page */
326
327/** \defgroup echo_lock Locking
328 *
329 * echo lock operations
330 *
331 * @{
332 */
333static void echo_lock_fini(const struct lu_env *env,
334			   struct cl_lock_slice *slice)
335{
336	struct echo_lock *ecl = cl2echo_lock(slice);
337
338	LASSERT(list_empty(&ecl->el_chain));
339	OBD_SLAB_FREE_PTR(ecl, echo_lock_kmem);
340}
341
342static void echo_lock_delete(const struct lu_env *env,
343			     const struct cl_lock_slice *slice)
344{
345	struct echo_lock *ecl      = cl2echo_lock(slice);
346
347	LASSERT(list_empty(&ecl->el_chain));
348}
349
/** clo_fits_into: an echo lock accepts every request, so always return 1. */
static int
echo_lock_fits_into(const struct lu_env *env,
		    const struct cl_lock_slice *slice,
		    const struct cl_lock_descr *need,
		    const struct cl_io *unused)
{
	return 1;
}
357
358static struct cl_lock_operations echo_lock_ops = {
359	.clo_fini      = echo_lock_fini,
360	.clo_delete    = echo_lock_delete,
361	.clo_fits_into = echo_lock_fits_into
362};
363
364/** @} echo_lock */
365
366/** \defgroup echo_cl_ops cl_object operations
367 *
368 * operations for cl_object
369 *
370 * @{
371 */
372static int echo_page_init(const struct lu_env *env, struct cl_object *obj,
373			struct cl_page *page, struct page *vmpage)
374{
375	struct echo_page *ep = cl_object_page_slice(obj, page);
376	struct echo_object *eco = cl2echo_obj(obj);
377
378	ep->ep_vmpage = vmpage;
379	page_cache_get(vmpage);
380	mutex_init(&ep->ep_lock);
381	cl_page_slice_add(page, &ep->ep_cl, obj, &echo_page_ops);
382	atomic_inc(&eco->eo_npages);
383	return 0;
384}
385
/** coo_io_init: the echo layer adds nothing to an io, always returns 0. */
static int
echo_io_init(const struct lu_env *env, struct cl_object *obj, struct cl_io *io)
{
	return 0;
}
391
392static int echo_lock_init(const struct lu_env *env,
393			  struct cl_object *obj, struct cl_lock *lock,
394			  const struct cl_io *unused)
395{
396	struct echo_lock *el;
397
398	OBD_SLAB_ALLOC_PTR_GFP(el, echo_lock_kmem, GFP_NOFS);
399	if (el != NULL) {
400		cl_lock_slice_add(lock, &el->el_cl, obj, &echo_lock_ops);
401		el->el_object = cl2echo_obj(obj);
402		INIT_LIST_HEAD(&el->el_chain);
403		atomic_set(&el->el_refcount, 0);
404	}
405	return el == NULL ? -ENOMEM : 0;
406}
407
/** coo_conf_set: configuration changes are ignored, always returns 0. */
static int
echo_conf_set(const struct lu_env *env, struct cl_object *obj,
	      const struct cl_object_conf *conf)
{
	return 0;
}
413
414static const struct cl_object_operations echo_cl_obj_ops = {
415	.coo_page_init = echo_page_init,
416	.coo_lock_init = echo_lock_init,
417	.coo_io_init   = echo_io_init,
418	.coo_conf_set  = echo_conf_set
419};
420/** @} echo_cl_ops */
421
422/** \defgroup echo_lu_ops lu_object operations
423 *
424 * operations for echo lu object.
425 *
426 * @{
427 */
428static int echo_object_init(const struct lu_env *env, struct lu_object *obj,
429			    const struct lu_object_conf *conf)
430{
431	struct echo_device *ed	 = cl2echo_dev(lu2cl_dev(obj->lo_dev));
432	struct echo_client_obd *ec     = ed->ed_ec;
433	struct echo_object *eco	= cl2echo_obj(lu2cl(obj));
434	const struct cl_object_conf *cconf;
435	struct echo_object_conf *econf;
436
437	if (ed->ed_next) {
438		struct lu_object  *below;
439		struct lu_device  *under;
440
441		under = ed->ed_next;
442		below = under->ld_ops->ldo_object_alloc(env, obj->lo_header,
443							under);
444		if (below == NULL)
445			return -ENOMEM;
446		lu_object_add(obj, below);
447	}
448
449	cconf = lu2cl_conf(conf);
450	econf = cl2echo_conf(cconf);
451
452	LASSERT(econf->eoc_md);
453	eco->eo_lsm = *econf->eoc_md;
454	/* clear the lsm pointer so that it won't get freed. */
455	*econf->eoc_md = NULL;
456
457	eco->eo_dev = ed;
458	atomic_set(&eco->eo_npages, 0);
459	cl_object_page_init(lu2cl(obj), sizeof(struct echo_page));
460
461	spin_lock(&ec->ec_lock);
462	list_add_tail(&eco->eo_obj_chain, &ec->ec_objects);
463	spin_unlock(&ec->ec_lock);
464
465	return 0;
466}
467
468/* taken from osc_unpackmd() */
469static int echo_alloc_memmd(struct echo_device *ed,
470			    struct lov_stripe_md **lsmp)
471{
472	int lsm_size;
473
474	/* If export is lov/osc then use their obd method */
475	if (ed->ed_next != NULL)
476		return obd_alloc_memmd(ed->ed_ec->ec_exp, lsmp);
477	/* OFD has no unpackmd method, do everything here */
478	lsm_size = lov_stripe_md_size(1);
479
480	LASSERT(*lsmp == NULL);
481	OBD_ALLOC(*lsmp, lsm_size);
482	if (*lsmp == NULL)
483		return -ENOMEM;
484
485	OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
486	if ((*lsmp)->lsm_oinfo[0] == NULL) {
487		OBD_FREE(*lsmp, lsm_size);
488		return -ENOMEM;
489	}
490
491	loi_init((*lsmp)->lsm_oinfo[0]);
492	(*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
493	ostid_set_seq_echo(&(*lsmp)->lsm_oi);
494
495	return lsm_size;
496}
497
498static int echo_free_memmd(struct echo_device *ed, struct lov_stripe_md **lsmp)
499{
500	int lsm_size;
501
502	/* If export is lov/osc then use their obd method */
503	if (ed->ed_next != NULL)
504		return obd_free_memmd(ed->ed_ec->ec_exp, lsmp);
505	/* OFD has no unpackmd method, do everything here */
506	lsm_size = lov_stripe_md_size(1);
507
508	LASSERT(*lsmp != NULL);
509	OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
510	OBD_FREE(*lsmp, lsm_size);
511	*lsmp = NULL;
512	return 0;
513}
514
515static void echo_object_free(const struct lu_env *env, struct lu_object *obj)
516{
517	struct echo_object *eco    = cl2echo_obj(lu2cl(obj));
518	struct echo_client_obd *ec = eco->eo_dev->ed_ec;
519
520	LASSERT(atomic_read(&eco->eo_npages) == 0);
521
522	spin_lock(&ec->ec_lock);
523	list_del_init(&eco->eo_obj_chain);
524	spin_unlock(&ec->ec_lock);
525
526	lu_object_fini(obj);
527	lu_object_header_fini(obj->lo_header);
528
529	if (eco->eo_lsm)
530		echo_free_memmd(eco->eo_dev, &eco->eo_lsm);
531	OBD_SLAB_FREE_PTR(eco, echo_object_kmem);
532}
533
534static int echo_object_print(const struct lu_env *env, void *cookie,
535			    lu_printer_t p, const struct lu_object *o)
536{
537	struct echo_object *obj = cl2echo_obj(lu2cl(o));
538
539	return (*p)(env, cookie, "echoclient-object@%p", obj);
540}
541
542static const struct lu_object_operations echo_lu_obj_ops = {
543	.loo_object_init      = echo_object_init,
544	.loo_object_delete    = NULL,
545	.loo_object_release   = NULL,
546	.loo_object_free      = echo_object_free,
547	.loo_object_print     = echo_object_print,
548	.loo_object_invariant = NULL
549};
550/** @} echo_lu_ops */
551
552/** \defgroup echo_lu_dev_ops  lu_device operations
553 *
554 * Operations for echo lu device.
555 *
556 * @{
557 */
558static struct lu_object *echo_object_alloc(const struct lu_env *env,
559					   const struct lu_object_header *hdr,
560					   struct lu_device *dev)
561{
562	struct echo_object *eco;
563	struct lu_object *obj = NULL;
564
565	/* we're the top dev. */
566	LASSERT(hdr == NULL);
567	OBD_SLAB_ALLOC_PTR_GFP(eco, echo_object_kmem, GFP_NOFS);
568	if (eco != NULL) {
569		struct cl_object_header *hdr = &eco->eo_hdr;
570
571		obj = &echo_obj2cl(eco)->co_lu;
572		cl_object_header_init(hdr);
573		lu_object_init(obj, &hdr->coh_lu, dev);
574		lu_object_add_top(&hdr->coh_lu, obj);
575
576		eco->eo_cl.co_ops = &echo_cl_obj_ops;
577		obj->lo_ops       = &echo_lu_obj_ops;
578	}
579	return obj;
580}
581
582static struct lu_device_operations echo_device_lu_ops = {
583	.ldo_object_alloc   = echo_object_alloc,
584};
585
586/** @} echo_lu_dev_ops */
587
588static struct cl_device_operations echo_device_cl_ops = {
589};
590
591/** \defgroup echo_init Setup and teardown
592 *
593 * Init and fini functions for echo client.
594 *
595 * @{
596 */
597static int echo_site_init(const struct lu_env *env, struct echo_device *ed)
598{
599	struct cl_site *site = &ed->ed_site_myself;
600	int rc;
601
602	/* initialize site */
603	rc = cl_site_init(site, &ed->ed_cl);
604	if (rc) {
605		CERROR("Cannot initialize site for echo client(%d)\n", rc);
606		return rc;
607	}
608
609	rc = lu_site_init_finish(&site->cs_lu);
610	if (rc)
611		return rc;
612
613	ed->ed_site = site;
614	return 0;
615}
616
617static void echo_site_fini(const struct lu_env *env, struct echo_device *ed)
618{
619	if (ed->ed_site) {
620		cl_site_fini(ed->ed_site);
621		ed->ed_site = NULL;
622	}
623}
624
625static void *echo_thread_key_init(const struct lu_context *ctx,
626			  struct lu_context_key *key)
627{
628	struct echo_thread_info *info;
629
630	OBD_SLAB_ALLOC_PTR_GFP(info, echo_thread_kmem, GFP_NOFS);
631	if (info == NULL)
632		info = ERR_PTR(-ENOMEM);
633	return info;
634}
635
636static void echo_thread_key_fini(const struct lu_context *ctx,
637			 struct lu_context_key *key, void *data)
638{
639	struct echo_thread_info *info = data;
640	OBD_SLAB_FREE_PTR(info, echo_thread_kmem);
641}
642
/** lct_exit for the thread key: nothing to do. */
static void
echo_thread_key_exit(const struct lu_context *ctx,
		     struct lu_context_key *key, void *data)
{
}
647
648static struct lu_context_key echo_thread_key = {
649	.lct_tags = LCT_CL_THREAD,
650	.lct_init = echo_thread_key_init,
651	.lct_fini = echo_thread_key_fini,
652	.lct_exit = echo_thread_key_exit
653};
654
655static void *echo_session_key_init(const struct lu_context *ctx,
656				  struct lu_context_key *key)
657{
658	struct echo_session_info *session;
659
660	OBD_SLAB_ALLOC_PTR_GFP(session, echo_session_kmem, GFP_NOFS);
661	if (session == NULL)
662		session = ERR_PTR(-ENOMEM);
663	return session;
664}
665
666static void echo_session_key_fini(const struct lu_context *ctx,
667				 struct lu_context_key *key, void *data)
668{
669	struct echo_session_info *session = data;
670	OBD_SLAB_FREE_PTR(session, echo_session_kmem);
671}
672
/** lct_exit for the session key: nothing to do. */
static void
echo_session_key_exit(const struct lu_context *ctx,
		      struct lu_context_key *key, void *data)
{
}
677
678static struct lu_context_key echo_session_key = {
679	.lct_tags = LCT_SESSION,
680	.lct_init = echo_session_key_init,
681	.lct_fini = echo_session_key_fini,
682	.lct_exit = echo_session_key_exit
683};
684
685LU_TYPE_INIT_FINI(echo, &echo_thread_key, &echo_session_key);
686
687static struct lu_device *echo_device_alloc(const struct lu_env *env,
688					   struct lu_device_type *t,
689					   struct lustre_cfg *cfg)
690{
691	struct lu_device   *next;
692	struct echo_device *ed;
693	struct cl_device   *cd;
694	struct obd_device  *obd = NULL; /* to keep compiler happy */
695	struct obd_device  *tgt;
696	const char *tgt_type_name;
697	int rc;
698	int cleanup = 0;
699
700	OBD_ALLOC_PTR(ed);
701	if (ed == NULL)
702		GOTO(out, rc = -ENOMEM);
703
704	cleanup = 1;
705	cd = &ed->ed_cl;
706	rc = cl_device_init(cd, t);
707	if (rc)
708		GOTO(out, rc);
709
710	cd->cd_lu_dev.ld_ops = &echo_device_lu_ops;
711	cd->cd_ops = &echo_device_cl_ops;
712
713	cleanup = 2;
714	obd = class_name2obd(lustre_cfg_string(cfg, 0));
715	LASSERT(obd != NULL);
716	LASSERT(env != NULL);
717
718	tgt = class_name2obd(lustre_cfg_string(cfg, 1));
719	if (tgt == NULL) {
720		CERROR("Can not find tgt device %s\n",
721			lustre_cfg_string(cfg, 1));
722		GOTO(out, rc = -ENODEV);
723	}
724
725	next = tgt->obd_lu_dev;
726	if (!strcmp(tgt->obd_type->typ_name, LUSTRE_MDT_NAME)) {
727		CERROR("echo MDT client must be run on server\n");
728		GOTO(out, rc = -EOPNOTSUPP);
729	}
730
731	rc = echo_site_init(env, ed);
732	if (rc)
733		GOTO(out, rc);
734
735	cleanup = 3;
736
737	rc = echo_client_setup(env, obd, cfg);
738	if (rc)
739		GOTO(out, rc);
740
741	ed->ed_ec = &obd->u.echo_client;
742	cleanup = 4;
743
744	/* if echo client is to be stacked upon ost device, the next is
745	 * NULL since ost is not a clio device so far */
746	if (next != NULL && !lu_device_is_cl(next))
747		next = NULL;
748
749	tgt_type_name = tgt->obd_type->typ_name;
750	if (next != NULL) {
751		LASSERT(next != NULL);
752		if (next->ld_site != NULL)
753			GOTO(out, rc = -EBUSY);
754
755		next->ld_site = &ed->ed_site->cs_lu;
756		rc = next->ld_type->ldt_ops->ldto_device_init(env, next,
757						next->ld_type->ldt_name,
758							      NULL);
759		if (rc)
760			GOTO(out, rc);
761
762		/* Tricky case, I have to determine the obd type since
763		 * CLIO uses the different parameters to initialize
764		 * objects for lov & osc. */
765		if (strcmp(tgt_type_name, LUSTRE_LOV_NAME) == 0)
766			ed->ed_next_islov = 1;
767		else
768			LASSERT(strcmp(tgt_type_name,
769				       LUSTRE_OSC_NAME) == 0);
770	} else {
771		LASSERT(strcmp(tgt_type_name, LUSTRE_OST_NAME) == 0);
772	}
773
774	ed->ed_next = next;
775	return &cd->cd_lu_dev;
776out:
777	switch (cleanup) {
778	case 4: {
779		int rc2;
780		rc2 = echo_client_cleanup(obd);
781		if (rc2)
782			CERROR("Cleanup obd device %s error(%d)\n",
783			       obd->obd_name, rc2);
784	}
785
786	case 3:
787		echo_site_fini(env, ed);
788	case 2:
789		cl_device_fini(&ed->ed_cl);
790	case 1:
791		OBD_FREE_PTR(ed);
792	case 0:
793	default:
794		break;
795	}
796	return ERR_PTR(rc);
797}
798
/**
 * ldto_device_init: must never be called for the echo device; reaching it
 * triggers LBUG().
 */
static int
echo_device_init(const struct lu_env *env, struct lu_device *d,
		 const char *name, struct lu_device *next)
{
	LBUG();
	return 0;
}
805
806static struct lu_device *echo_device_fini(const struct lu_env *env,
807					  struct lu_device *d)
808{
809	struct echo_device *ed = cl2echo_dev(lu2cl_dev(d));
810	struct lu_device *next = ed->ed_next;
811
812	while (next)
813		next = next->ld_type->ldt_ops->ldto_device_fini(env, next);
814	return NULL;
815}
816
817static void echo_lock_release(const struct lu_env *env,
818			      struct echo_lock *ecl,
819			      int still_used)
820{
821	struct cl_lock *clk = echo_lock2cl(ecl);
822
823	cl_lock_get(clk);
824	cl_unuse(env, clk);
825	cl_lock_release(env, clk, "ec enqueue", ecl->el_object);
826	if (!still_used) {
827		cl_lock_mutex_get(env, clk);
828		cl_lock_cancel(env, clk);
829		cl_lock_delete(env, clk);
830		cl_lock_mutex_put(env, clk);
831	}
832	cl_lock_put(env, clk);
833}
834
835static struct lu_device *echo_device_free(const struct lu_env *env,
836					  struct lu_device *d)
837{
838	struct echo_device     *ed   = cl2echo_dev(lu2cl_dev(d));
839	struct echo_client_obd *ec   = ed->ed_ec;
840	struct echo_object     *eco;
841	struct lu_device       *next = ed->ed_next;
842
843	CDEBUG(D_INFO, "echo device:%p is going to be freed, next = %p\n",
844	       ed, next);
845
846	lu_site_purge(env, &ed->ed_site->cs_lu, -1);
847
848	/* check if there are objects still alive.
849	 * It shouldn't have any object because lu_site_purge would cleanup
850	 * all of cached objects. Anyway, probably the echo device is being
851	 * parallelly accessed.
852	 */
853	spin_lock(&ec->ec_lock);
854	list_for_each_entry(eco, &ec->ec_objects, eo_obj_chain)
855		eco->eo_deleted = 1;
856	spin_unlock(&ec->ec_lock);
857
858	/* purge again */
859	lu_site_purge(env, &ed->ed_site->cs_lu, -1);
860
861	CDEBUG(D_INFO,
862	       "Waiting for the reference of echo object to be dropped\n");
863
864	/* Wait for the last reference to be dropped. */
865	spin_lock(&ec->ec_lock);
866	while (!list_empty(&ec->ec_objects)) {
867		spin_unlock(&ec->ec_lock);
868		CERROR("echo_client still has objects at cleanup time, "
869		       "wait for 1 second\n");
870		set_current_state(TASK_UNINTERRUPTIBLE);
871		schedule_timeout(cfs_time_seconds(1));
872		lu_site_purge(env, &ed->ed_site->cs_lu, -1);
873		spin_lock(&ec->ec_lock);
874	}
875	spin_unlock(&ec->ec_lock);
876
877	LASSERT(list_empty(&ec->ec_locks));
878
879	CDEBUG(D_INFO, "No object exists, exiting...\n");
880
881	echo_client_cleanup(d->ld_obd);
882
883	while (next)
884		next = next->ld_type->ldt_ops->ldto_device_free(env, next);
885
886	LASSERT(ed->ed_site == lu2cl_site(d->ld_site));
887	echo_site_fini(env, ed);
888	cl_device_fini(&ed->ed_cl);
889	OBD_FREE_PTR(ed);
890
891	return NULL;
892}
893
894static const struct lu_device_type_operations echo_device_type_ops = {
895	.ldto_init = echo_type_init,
896	.ldto_fini = echo_type_fini,
897
898	.ldto_start = echo_type_start,
899	.ldto_stop  = echo_type_stop,
900
901	.ldto_device_alloc = echo_device_alloc,
902	.ldto_device_free  = echo_device_free,
903	.ldto_device_init  = echo_device_init,
904	.ldto_device_fini  = echo_device_fini
905};
906
907static struct lu_device_type echo_device_type = {
908	.ldt_tags     = LU_DEVICE_CL,
909	.ldt_name     = LUSTRE_ECHO_CLIENT_NAME,
910	.ldt_ops      = &echo_device_type_ops,
911	.ldt_ctx_tags = LCT_CL_THREAD,
912};
913/** @} echo_init */
914
915/** \defgroup echo_exports Exported operations
916 *
917 * exporting functions to echo client
918 *
919 * @{
920 */
921
922/* Interfaces to echo client obd device */
923static struct echo_object *cl_echo_object_find(struct echo_device *d,
924					       struct lov_stripe_md **lsmp)
925{
926	struct lu_env *env;
927	struct echo_thread_info *info;
928	struct echo_object_conf *conf;
929	struct lov_stripe_md    *lsm;
930	struct echo_object *eco;
931	struct cl_object   *obj;
932	struct lu_fid *fid;
933	int refcheck;
934	int rc;
935
936	LASSERT(lsmp);
937	lsm = *lsmp;
938	LASSERT(lsm);
939	LASSERTF(ostid_id(&lsm->lsm_oi) != 0, DOSTID"\n", POSTID(&lsm->lsm_oi));
940	LASSERTF(ostid_seq(&lsm->lsm_oi) == FID_SEQ_ECHO, DOSTID"\n",
941		 POSTID(&lsm->lsm_oi));
942
943	/* Never return an object if the obd is to be freed. */
944	if (echo_dev2cl(d)->cd_lu_dev.ld_obd->obd_stopping)
945		return ERR_PTR(-ENODEV);
946
947	env = cl_env_get(&refcheck);
948	if (IS_ERR(env))
949		return (void *)env;
950
951	info = echo_env_info(env);
952	conf = &info->eti_conf;
953	if (d->ed_next) {
954		if (!d->ed_next_islov) {
955			struct lov_oinfo *oinfo = lsm->lsm_oinfo[0];
956			LASSERT(oinfo != NULL);
957			oinfo->loi_oi = lsm->lsm_oi;
958			conf->eoc_cl.u.coc_oinfo = oinfo;
959		} else {
960			struct lustre_md *md;
961			md = &info->eti_md;
962			memset(md, 0, sizeof(*md));
963			md->lsm = lsm;
964			conf->eoc_cl.u.coc_md = md;
965		}
966	}
967	conf->eoc_md = lsmp;
968
969	fid  = &info->eti_fid;
970	rc = ostid_to_fid(fid, &lsm->lsm_oi, 0);
971	if (rc != 0)
972		GOTO(out, eco = ERR_PTR(rc));
973
974	/* In the function below, .hs_keycmp resolves to
975	 * lu_obj_hop_keycmp() */
976	/* coverity[overrun-buffer-val] */
977	obj = cl_object_find(env, echo_dev2cl(d), fid, &conf->eoc_cl);
978	if (IS_ERR(obj))
979		GOTO(out, eco = (void *)obj);
980
981	eco = cl2echo_obj(obj);
982	if (eco->eo_deleted) {
983		cl_object_put(env, obj);
984		eco = ERR_PTR(-EAGAIN);
985	}
986
987out:
988	cl_env_put(env, &refcheck);
989	return eco;
990}
991
992static int cl_echo_object_put(struct echo_object *eco)
993{
994	struct lu_env *env;
995	struct cl_object *obj = echo_obj2cl(eco);
996	int refcheck;
997
998	env = cl_env_get(&refcheck);
999	if (IS_ERR(env))
1000		return PTR_ERR(env);
1001
1002	/* an external function to kill an object? */
1003	if (eco->eo_deleted) {
1004		struct lu_object_header *loh = obj->co_lu.lo_header;
1005		LASSERT(&eco->eo_hdr == luh2coh(loh));
1006		set_bit(LU_OBJECT_HEARD_BANSHEE, &loh->loh_flags);
1007	}
1008
1009	cl_object_put(env, obj);
1010	cl_env_put(env, &refcheck);
1011	return 0;
1012}
1013
1014static int cl_echo_enqueue0(struct lu_env *env, struct echo_object *eco,
1015			    u64 start, u64 end, int mode,
1016			    __u64 *cookie , __u32 enqflags)
1017{
1018	struct cl_io *io;
1019	struct cl_lock *lck;
1020	struct cl_object *obj;
1021	struct cl_lock_descr *descr;
1022	struct echo_thread_info *info;
1023	int rc = -ENOMEM;
1024
1025	info = echo_env_info(env);
1026	io = &info->eti_io;
1027	descr = &info->eti_descr;
1028	obj = echo_obj2cl(eco);
1029
1030	descr->cld_obj   = obj;
1031	descr->cld_start = cl_index(obj, start);
1032	descr->cld_end   = cl_index(obj, end);
1033	descr->cld_mode  = mode == LCK_PW ? CLM_WRITE : CLM_READ;
1034	descr->cld_enq_flags = enqflags;
1035	io->ci_obj = obj;
1036
1037	lck = cl_lock_request(env, io, descr, "ec enqueue", eco);
1038	if (lck) {
1039		struct echo_client_obd *ec = eco->eo_dev->ed_ec;
1040		struct echo_lock *el;
1041
1042		rc = cl_wait(env, lck);
1043		if (rc == 0) {
1044			el = cl2echo_lock(cl_lock_at(lck, &echo_device_type));
1045			spin_lock(&ec->ec_lock);
1046			if (list_empty(&el->el_chain)) {
1047				list_add(&el->el_chain, &ec->ec_locks);
1048				el->el_cookie = ++ec->ec_unique;
1049			}
1050			atomic_inc(&el->el_refcount);
1051			*cookie = el->el_cookie;
1052			spin_unlock(&ec->ec_lock);
1053		} else {
1054			cl_lock_release(env, lck, "ec enqueue", current);
1055		}
1056	}
1057	return rc;
1058}
1059
1060static int cl_echo_enqueue(struct echo_object *eco, u64 start, u64 end,
1061			   int mode, __u64 *cookie)
1062{
1063	struct echo_thread_info *info;
1064	struct lu_env *env;
1065	struct cl_io *io;
1066	int refcheck;
1067	int result;
1068
1069	env = cl_env_get(&refcheck);
1070	if (IS_ERR(env))
1071		return PTR_ERR(env);
1072
1073	info = echo_env_info(env);
1074	io = &info->eti_io;
1075
1076	io->ci_ignore_layout = 1;
1077	result = cl_io_init(env, io, CIT_MISC, echo_obj2cl(eco));
1078	if (result < 0)
1079		GOTO(out, result);
1080	LASSERT(result == 0);
1081
1082	result = cl_echo_enqueue0(env, eco, start, end, mode, cookie, 0);
1083	cl_io_fini(env, io);
1084
1085out:
1086	cl_env_put(env, &refcheck);
1087	return result;
1088}
1089
1090static int cl_echo_cancel0(struct lu_env *env, struct echo_device *ed,
1091			   __u64 cookie)
1092{
1093	struct echo_client_obd *ec = ed->ed_ec;
1094	struct echo_lock       *ecl = NULL;
1095	struct list_head	     *el;
1096	int found = 0, still_used = 0;
1097
1098	LASSERT(ec != NULL);
1099	spin_lock(&ec->ec_lock);
1100	list_for_each (el, &ec->ec_locks) {
1101		ecl = list_entry (el, struct echo_lock, el_chain);
1102		CDEBUG(D_INFO, "ecl: %p, cookie: %#llx\n", ecl, ecl->el_cookie);
1103		found = (ecl->el_cookie == cookie);
1104		if (found) {
1105			if (atomic_dec_and_test(&ecl->el_refcount))
1106				list_del_init(&ecl->el_chain);
1107			else
1108				still_used = 1;
1109			break;
1110		}
1111	}
1112	spin_unlock(&ec->ec_lock);
1113
1114	if (!found)
1115		return -ENOENT;
1116
1117	echo_lock_release(env, ecl, still_used);
1118	return 0;
1119}
1120
1121static int cl_echo_cancel(struct echo_device *ed, __u64 cookie)
1122{
1123	struct lu_env *env;
1124	int refcheck;
1125	int rc;
1126
1127	env = cl_env_get(&refcheck);
1128	if (IS_ERR(env))
1129		return PTR_ERR(env);
1130
1131	rc = cl_echo_cancel0(env, ed, cookie);
1132
1133	cl_env_put(env, &refcheck);
1134	return rc;
1135}
1136
1137static int cl_echo_async_brw(const struct lu_env *env, struct cl_io *io,
1138			     enum cl_req_type unused, struct cl_2queue *queue)
1139{
1140	struct cl_page *clp;
1141	struct cl_page *temp;
1142	int result = 0;
1143
1144	cl_page_list_for_each_safe(clp, temp, &queue->c2_qin) {
1145		int rc;
1146		rc = cl_page_cache_add(env, io, clp, CRT_WRITE);
1147		if (rc == 0)
1148			continue;
1149		result = result ?: rc;
1150	}
1151	return result;
1152}
1153
1154static int cl_echo_object_brw(struct echo_object *eco, int rw, u64 offset,
1155			      struct page **pages, int npages, int async)
1156{
1157	struct lu_env	   *env;
1158	struct echo_thread_info *info;
1159	struct cl_object	*obj = echo_obj2cl(eco);
1160	struct echo_device      *ed  = eco->eo_dev;
1161	struct cl_2queue	*queue;
1162	struct cl_io	    *io;
1163	struct cl_page	  *clp;
1164	struct lustre_handle    lh = { 0 };
1165	int page_size = cl_page_size(obj);
1166	int refcheck;
1167	int rc;
1168	int i;
1169
1170	LASSERT((offset & ~CFS_PAGE_MASK) == 0);
1171	LASSERT(ed->ed_next != NULL);
1172	env = cl_env_get(&refcheck);
1173	if (IS_ERR(env))
1174		return PTR_ERR(env);
1175
1176	info    = echo_env_info(env);
1177	io      = &info->eti_io;
1178	queue   = &info->eti_queue;
1179
1180	cl_2queue_init(queue);
1181
1182	io->ci_ignore_layout = 1;
1183	rc = cl_io_init(env, io, CIT_MISC, obj);
1184	if (rc < 0)
1185		GOTO(out, rc);
1186	LASSERT(rc == 0);
1187
1188
1189	rc = cl_echo_enqueue0(env, eco, offset,
1190			      offset + npages * PAGE_CACHE_SIZE - 1,
1191			      rw == READ ? LCK_PR : LCK_PW, &lh.cookie,
1192			      CEF_NEVER);
1193	if (rc < 0)
1194		GOTO(error_lock, rc);
1195
1196	for (i = 0; i < npages; i++) {
1197		LASSERT(pages[i]);
1198		clp = cl_page_find(env, obj, cl_index(obj, offset),
1199				   pages[i], CPT_TRANSIENT);
1200		if (IS_ERR(clp)) {
1201			rc = PTR_ERR(clp);
1202			break;
1203		}
1204		LASSERT(clp->cp_type == CPT_TRANSIENT);
1205
1206		rc = cl_page_own(env, io, clp);
1207		if (rc) {
1208			LASSERT(clp->cp_state == CPS_FREEING);
1209			cl_page_put(env, clp);
1210			break;
1211		}
1212
1213		cl_2queue_add(queue, clp);
1214
1215		/* drop the reference count for cl_page_find, so that the page
1216		 * will be freed in cl_2queue_fini. */
1217		cl_page_put(env, clp);
1218		cl_page_clip(env, clp, 0, page_size);
1219
1220		offset += page_size;
1221	}
1222
1223	if (rc == 0) {
1224		enum cl_req_type typ = rw == READ ? CRT_READ : CRT_WRITE;
1225
1226		async = async && (typ == CRT_WRITE);
1227		if (async)
1228			rc = cl_echo_async_brw(env, io, typ, queue);
1229		else
1230			rc = cl_io_submit_sync(env, io, typ, queue, 0);
1231		CDEBUG(D_INFO, "echo_client %s write returns %d\n",
1232		       async ? "async" : "sync", rc);
1233	}
1234
1235	cl_echo_cancel0(env, ed, lh.cookie);
1236error_lock:
1237	cl_2queue_discard(env, io, queue);
1238	cl_2queue_disown(env, io, queue);
1239	cl_2queue_fini(env, queue);
1240	cl_io_fini(env, io);
1241out:
1242	cl_env_put(env, &refcheck);
1243	return rc;
1244}
1245/** @} echo_exports */
1246
1247
1248static u64 last_object_id;
1249
1250static int
1251echo_copyout_lsm (struct lov_stripe_md *lsm, void *_ulsm, int ulsm_nob)
1252{
1253	struct lov_stripe_md *ulsm = _ulsm;
1254	int nob, i;
1255
1256	nob = offsetof (struct lov_stripe_md, lsm_oinfo[lsm->lsm_stripe_count]);
1257	if (nob > ulsm_nob)
1258		return -EINVAL;
1259
1260	if (copy_to_user (ulsm, lsm, sizeof(*ulsm)))
1261		return -EFAULT;
1262
1263	for (i = 0; i < lsm->lsm_stripe_count; i++) {
1264		if (copy_to_user (ulsm->lsm_oinfo[i], lsm->lsm_oinfo[i],
1265				      sizeof(lsm->lsm_oinfo[0])))
1266			return -EFAULT;
1267	}
1268	return 0;
1269}
1270
1271static int
1272echo_copyin_lsm (struct echo_device *ed, struct lov_stripe_md *lsm,
1273		 void *ulsm, int ulsm_nob)
1274{
1275	struct echo_client_obd *ec = ed->ed_ec;
1276	int		     i;
1277
1278	if (ulsm_nob < sizeof (*lsm))
1279		return -EINVAL;
1280
1281	if (copy_from_user (lsm, ulsm, sizeof (*lsm)))
1282		return -EFAULT;
1283
1284	if (lsm->lsm_stripe_count > ec->ec_nstripes ||
1285	    lsm->lsm_magic != LOV_MAGIC ||
1286	    (lsm->lsm_stripe_size & (~CFS_PAGE_MASK)) != 0 ||
1287	    ((__u64)lsm->lsm_stripe_size * lsm->lsm_stripe_count > ~0UL))
1288		return -EINVAL;
1289
1290
1291	for (i = 0; i < lsm->lsm_stripe_count; i++) {
1292		if (copy_from_user(lsm->lsm_oinfo[i],
1293				       ((struct lov_stripe_md *)ulsm)-> \
1294				       lsm_oinfo[i],
1295				       sizeof(lsm->lsm_oinfo[0])))
1296			return -EFAULT;
1297	}
1298	return 0;
1299}
1300
1301static int echo_create_object(const struct lu_env *env, struct echo_device *ed,
1302			      int on_target, struct obdo *oa, void *ulsm,
1303			      int ulsm_nob, struct obd_trans_info *oti)
1304{
1305	struct echo_object     *eco;
1306	struct echo_client_obd *ec = ed->ed_ec;
1307	struct lov_stripe_md   *lsm = NULL;
1308	int		     rc;
1309	int		     created = 0;
1310
1311	if ((oa->o_valid & OBD_MD_FLID) == 0 && /* no obj id */
1312	    (on_target ||		       /* set_stripe */
1313	     ec->ec_nstripes != 0)) {	   /* LOV */
1314		CERROR ("No valid oid\n");
1315		return -EINVAL;
1316	}
1317
1318	rc = echo_alloc_memmd(ed, &lsm);
1319	if (rc < 0) {
1320		CERROR("Cannot allocate md: rc = %d\n", rc);
1321		GOTO(failed, rc);
1322	}
1323
1324	if (ulsm != NULL) {
1325		int i, idx;
1326
1327		rc = echo_copyin_lsm (ed, lsm, ulsm, ulsm_nob);
1328		if (rc != 0)
1329			GOTO(failed, rc);
1330
1331		if (lsm->lsm_stripe_count == 0)
1332			lsm->lsm_stripe_count = ec->ec_nstripes;
1333
1334		if (lsm->lsm_stripe_size == 0)
1335			lsm->lsm_stripe_size = PAGE_CACHE_SIZE;
1336
1337		idx = cfs_rand();
1338
1339		/* setup stripes: indices + default ids if required */
1340		for (i = 0; i < lsm->lsm_stripe_count; i++) {
1341			if (ostid_id(&lsm->lsm_oinfo[i]->loi_oi) == 0)
1342				lsm->lsm_oinfo[i]->loi_oi = lsm->lsm_oi;
1343
1344			lsm->lsm_oinfo[i]->loi_ost_idx =
1345				(idx + i) % ec->ec_nstripes;
1346		}
1347	}
1348
1349	/* setup object ID here for !on_target and LOV hint */
1350	if (oa->o_valid & OBD_MD_FLID) {
1351		LASSERT(oa->o_valid & OBD_MD_FLGROUP);
1352		lsm->lsm_oi = oa->o_oi;
1353	}
1354
1355	if (ostid_id(&lsm->lsm_oi) == 0)
1356		ostid_set_id(&lsm->lsm_oi, ++last_object_id);
1357
1358	rc = 0;
1359	if (on_target) {
1360		/* Only echo objects are allowed to be created */
1361		LASSERT((oa->o_valid & OBD_MD_FLGROUP) &&
1362			(ostid_seq(&oa->o_oi) == FID_SEQ_ECHO));
1363		rc = obd_create(env, ec->ec_exp, oa, &lsm, oti);
1364		if (rc != 0) {
1365			CERROR("Cannot create objects: rc = %d\n", rc);
1366			GOTO(failed, rc);
1367		}
1368		created = 1;
1369	}
1370
1371	/* See what object ID we were given */
1372	oa->o_oi = lsm->lsm_oi;
1373	oa->o_valid |= OBD_MD_FLID;
1374
1375	eco = cl_echo_object_find(ed, &lsm);
1376	if (IS_ERR(eco))
1377		GOTO(failed, rc = PTR_ERR(eco));
1378	cl_echo_object_put(eco);
1379
1380	CDEBUG(D_INFO, "oa oid "DOSTID"\n", POSTID(&oa->o_oi));
1381
1382 failed:
1383	if (created && rc)
1384		obd_destroy(env, ec->ec_exp, oa, lsm, oti, NULL, NULL);
1385	if (lsm)
1386		echo_free_memmd(ed, &lsm);
1387	if (rc)
1388		CERROR("create object failed with: rc = %d\n", rc);
1389	return rc;
1390}
1391
1392static int echo_get_object(struct echo_object **ecop, struct echo_device *ed,
1393			   struct obdo *oa)
1394{
1395	struct lov_stripe_md   *lsm = NULL;
1396	struct echo_object     *eco;
1397	int		     rc;
1398
1399	if ((oa->o_valid & OBD_MD_FLID) == 0 || ostid_id(&oa->o_oi) == 0) {
1400		/* disallow use of object id 0 */
1401		CERROR ("No valid oid\n");
1402		return -EINVAL;
1403	}
1404
1405	rc = echo_alloc_memmd(ed, &lsm);
1406	if (rc < 0)
1407		return rc;
1408
1409	lsm->lsm_oi = oa->o_oi;
1410	if (!(oa->o_valid & OBD_MD_FLGROUP))
1411		ostid_set_seq_echo(&lsm->lsm_oi);
1412
1413	rc = 0;
1414	eco = cl_echo_object_find(ed, &lsm);
1415	if (!IS_ERR(eco))
1416		*ecop = eco;
1417	else
1418		rc = PTR_ERR(eco);
1419	if (lsm)
1420		echo_free_memmd(ed, &lsm);
1421	return rc;
1422}
1423
/**
 * Drop a reference obtained with echo_get_object().
 *
 * A non-zero result from cl_echo_object_put() is logged and otherwise
 * ignored, since the function has no way to report it.
 */
static void echo_put_object(struct echo_object *eco)
{
	/* terminate the message with a newline so it does not run into the
	 * next log entry */
	if (cl_echo_object_put(eco))
		CERROR("echo client: drop an object failed\n");
}
1429
1430static void
1431echo_get_stripe_off_id(struct lov_stripe_md *lsm, u64 *offp, u64 *idp)
1432{
1433	unsigned long stripe_count;
1434	unsigned long stripe_size;
1435	unsigned long width;
1436	unsigned long woffset;
1437	int	   stripe_index;
1438	u64       offset;
1439
1440	if (lsm->lsm_stripe_count <= 1)
1441		return;
1442
1443	offset       = *offp;
1444	stripe_size  = lsm->lsm_stripe_size;
1445	stripe_count = lsm->lsm_stripe_count;
1446
1447	/* width = # bytes in all stripes */
1448	width = stripe_size * stripe_count;
1449
1450	/* woffset = offset within a width; offset = whole number of widths */
1451	woffset = do_div (offset, width);
1452
1453	stripe_index = woffset / stripe_size;
1454
1455	*idp = ostid_id(&lsm->lsm_oinfo[stripe_index]->loi_oi);
1456	*offp = offset * stripe_size + woffset % stripe_size;
1457}
1458
1459static void
1460echo_client_page_debug_setup(struct lov_stripe_md *lsm,
1461			     struct page *page, int rw, u64 id,
1462			     u64 offset, u64 count)
1463{
1464	char    *addr;
1465	u64	 stripe_off;
1466	u64	 stripe_id;
1467	int      delta;
1468
1469	/* no partial pages on the client */
1470	LASSERT(count == PAGE_CACHE_SIZE);
1471
1472	addr = kmap(page);
1473
1474	for (delta = 0; delta < PAGE_CACHE_SIZE; delta += OBD_ECHO_BLOCK_SIZE) {
1475		if (rw == OBD_BRW_WRITE) {
1476			stripe_off = offset + delta;
1477			stripe_id = id;
1478			echo_get_stripe_off_id(lsm, &stripe_off, &stripe_id);
1479		} else {
1480			stripe_off = 0xdeadbeef00c0ffeeULL;
1481			stripe_id = 0xdeadbeef00c0ffeeULL;
1482		}
1483		block_debug_setup(addr + delta, OBD_ECHO_BLOCK_SIZE,
1484				  stripe_off, stripe_id);
1485	}
1486
1487	kunmap(page);
1488}
1489
1490static int echo_client_page_debug_check(struct lov_stripe_md *lsm,
1491					struct page *page, u64 id,
1492					u64 offset, u64 count)
1493{
1494	u64	stripe_off;
1495	u64	stripe_id;
1496	char   *addr;
1497	int     delta;
1498	int     rc;
1499	int     rc2;
1500
1501	/* no partial pages on the client */
1502	LASSERT(count == PAGE_CACHE_SIZE);
1503
1504	addr = kmap(page);
1505
1506	for (rc = delta = 0; delta < PAGE_CACHE_SIZE; delta += OBD_ECHO_BLOCK_SIZE) {
1507		stripe_off = offset + delta;
1508		stripe_id = id;
1509		echo_get_stripe_off_id (lsm, &stripe_off, &stripe_id);
1510
1511		rc2 = block_debug_check("test_brw",
1512					addr + delta, OBD_ECHO_BLOCK_SIZE,
1513					stripe_off, stripe_id);
1514		if (rc2 != 0) {
1515			CERROR ("Error in echo object %#llx\n", id);
1516			rc = rc2;
1517		}
1518	}
1519
1520	kunmap(page);
1521	return rc;
1522}
1523
1524static int echo_client_kbrw(struct echo_device *ed, int rw, struct obdo *oa,
1525			    struct echo_object *eco, u64 offset,
1526			    u64 count, int async,
1527			    struct obd_trans_info *oti)
1528{
1529	struct lov_stripe_md   *lsm = eco->eo_lsm;
1530	u32	       npages;
1531	struct brw_page	*pga;
1532	struct brw_page	*pgp;
1533	struct page	    **pages;
1534	u64		 off;
1535	int		     i;
1536	int		     rc;
1537	int		     verify;
1538	gfp_t		     gfp_mask;
1539	int		     brw_flags = 0;
1540
1541	verify = (ostid_id(&oa->o_oi) != ECHO_PERSISTENT_OBJID &&
1542		  (oa->o_valid & OBD_MD_FLFLAGS) != 0 &&
1543		  (oa->o_flags & OBD_FL_DEBUG_CHECK) != 0);
1544
1545	gfp_mask = ((ostid_id(&oa->o_oi) & 2) == 0) ? GFP_IOFS : GFP_HIGHUSER;
1546
1547	LASSERT(rw == OBD_BRW_WRITE || rw == OBD_BRW_READ);
1548	LASSERT(lsm != NULL);
1549	LASSERT(ostid_id(&lsm->lsm_oi) == ostid_id(&oa->o_oi));
1550
1551	if (count <= 0 ||
1552	    (count & (~CFS_PAGE_MASK)) != 0)
1553		return -EINVAL;
1554
1555	/* XXX think again with misaligned I/O */
1556	npages = count >> PAGE_CACHE_SHIFT;
1557
1558	if (rw == OBD_BRW_WRITE)
1559		brw_flags = OBD_BRW_ASYNC;
1560
1561	OBD_ALLOC(pga, npages * sizeof(*pga));
1562	if (pga == NULL)
1563		return -ENOMEM;
1564
1565	OBD_ALLOC(pages, npages * sizeof(*pages));
1566	if (pages == NULL) {
1567		OBD_FREE(pga, npages * sizeof(*pga));
1568		return -ENOMEM;
1569	}
1570
1571	for (i = 0, pgp = pga, off = offset;
1572	     i < npages;
1573	     i++, pgp++, off += PAGE_CACHE_SIZE) {
1574
1575		LASSERT (pgp->pg == NULL);      /* for cleanup */
1576
1577		rc = -ENOMEM;
1578		OBD_PAGE_ALLOC(pgp->pg, gfp_mask);
1579		if (pgp->pg == NULL)
1580			goto out;
1581
1582		pages[i] = pgp->pg;
1583		pgp->count = PAGE_CACHE_SIZE;
1584		pgp->off = off;
1585		pgp->flag = brw_flags;
1586
1587		if (verify)
1588			echo_client_page_debug_setup(lsm, pgp->pg, rw,
1589						     ostid_id(&oa->o_oi), off,
1590						     pgp->count);
1591	}
1592
1593	/* brw mode can only be used at client */
1594	LASSERT(ed->ed_next != NULL);
1595	rc = cl_echo_object_brw(eco, rw, offset, pages, npages, async);
1596
1597 out:
1598	if (rc != 0 || rw != OBD_BRW_READ)
1599		verify = 0;
1600
1601	for (i = 0, pgp = pga; i < npages; i++, pgp++) {
1602		if (pgp->pg == NULL)
1603			continue;
1604
1605		if (verify) {
1606			int vrc;
1607			vrc = echo_client_page_debug_check(lsm, pgp->pg,
1608							   ostid_id(&oa->o_oi),
1609							   pgp->off, pgp->count);
1610			if (vrc != 0 && rc == 0)
1611				rc = vrc;
1612		}
1613		OBD_PAGE_FREE(pgp->pg);
1614	}
1615	OBD_FREE(pga, npages * sizeof(*pga));
1616	OBD_FREE(pages, npages * sizeof(*pages));
1617	return rc;
1618}
1619
1620static int echo_client_prep_commit(const struct lu_env *env,
1621				   struct obd_export *exp, int rw,
1622				   struct obdo *oa, struct echo_object *eco,
1623				   u64 offset, u64 count,
1624				   u64 batch, struct obd_trans_info *oti,
1625				   int async)
1626{
1627	struct lov_stripe_md *lsm = eco->eo_lsm;
1628	struct obd_ioobj ioo;
1629	struct niobuf_local *lnb;
1630	struct niobuf_remote *rnb;
1631	u64 off;
1632	u64 npages, tot_pages;
1633	int i, ret = 0, brw_flags = 0;
1634
1635	if (count <= 0 || (count & (~CFS_PAGE_MASK)) != 0 ||
1636	    (lsm != NULL && ostid_id(&lsm->lsm_oi) != ostid_id(&oa->o_oi)))
1637		return -EINVAL;
1638
1639	npages = batch >> PAGE_CACHE_SHIFT;
1640	tot_pages = count >> PAGE_CACHE_SHIFT;
1641
1642	OBD_ALLOC(lnb, npages * sizeof(struct niobuf_local));
1643	OBD_ALLOC(rnb, npages * sizeof(struct niobuf_remote));
1644
1645	if (lnb == NULL || rnb == NULL)
1646		GOTO(out, ret = -ENOMEM);
1647
1648	if (rw == OBD_BRW_WRITE && async)
1649		brw_flags |= OBD_BRW_ASYNC;
1650
1651	obdo_to_ioobj(oa, &ioo);
1652
1653	off = offset;
1654
1655	for (; tot_pages; tot_pages -= npages) {
1656		int lpages;
1657
1658		if (tot_pages < npages)
1659			npages = tot_pages;
1660
1661		for (i = 0; i < npages; i++, off += PAGE_CACHE_SIZE) {
1662			rnb[i].offset = off;
1663			rnb[i].len = PAGE_CACHE_SIZE;
1664			rnb[i].flags = brw_flags;
1665		}
1666
1667		ioo.ioo_bufcnt = npages;
1668		oti->oti_transno = 0;
1669
1670		lpages = npages;
1671		ret = obd_preprw(env, rw, exp, oa, 1, &ioo, rnb, &lpages,
1672				 lnb, oti, NULL);
1673		if (ret != 0)
1674			GOTO(out, ret);
1675		LASSERT(lpages == npages);
1676
1677		for (i = 0; i < lpages; i++) {
1678			struct page *page = lnb[i].page;
1679
1680			/* read past eof? */
1681			if (page == NULL && lnb[i].rc == 0)
1682				continue;
1683
1684			if (async)
1685				lnb[i].flags |= OBD_BRW_ASYNC;
1686
1687			if (ostid_id(&oa->o_oi) == ECHO_PERSISTENT_OBJID ||
1688			    (oa->o_valid & OBD_MD_FLFLAGS) == 0 ||
1689			    (oa->o_flags & OBD_FL_DEBUG_CHECK) == 0)
1690				continue;
1691
1692			if (rw == OBD_BRW_WRITE)
1693				echo_client_page_debug_setup(lsm, page, rw,
1694							    ostid_id(&oa->o_oi),
1695							     rnb[i].offset,
1696							     rnb[i].len);
1697			else
1698				echo_client_page_debug_check(lsm, page,
1699							    ostid_id(&oa->o_oi),
1700							     rnb[i].offset,
1701							     rnb[i].len);
1702		}
1703
1704		ret = obd_commitrw(env, rw, exp, oa, 1, &ioo,
1705				   rnb, npages, lnb, oti, ret);
1706		if (ret != 0)
1707			GOTO(out, ret);
1708
1709		/* Reset oti otherwise it would confuse ldiskfs. */
1710		memset(oti, 0, sizeof(*oti));
1711
1712		/* Reuse env context. */
1713		lu_context_exit((struct lu_context *)&env->le_ctx);
1714		lu_context_enter((struct lu_context *)&env->le_ctx);
1715	}
1716
1717out:
1718	if (lnb)
1719		OBD_FREE(lnb, npages * sizeof(struct niobuf_local));
1720	if (rnb)
1721		OBD_FREE(rnb, npages * sizeof(struct niobuf_remote));
1722	return ret;
1723}
1724
1725static int echo_client_brw_ioctl(const struct lu_env *env, int rw,
1726				 struct obd_export *exp,
1727				 struct obd_ioctl_data *data,
1728				 struct obd_trans_info *dummy_oti)
1729{
1730	struct obd_device *obd = class_exp2obd(exp);
1731	struct echo_device *ed = obd2echo_dev(obd);
1732	struct echo_client_obd *ec = ed->ed_ec;
1733	struct obdo *oa = &data->ioc_obdo1;
1734	struct echo_object *eco;
1735	int rc;
1736	int async = 1;
1737	long test_mode;
1738
1739	LASSERT(oa->o_valid & OBD_MD_FLGROUP);
1740
1741	rc = echo_get_object(&eco, ed, oa);
1742	if (rc)
1743		return rc;
1744
1745	oa->o_valid &= ~OBD_MD_FLHANDLE;
1746
1747	/* OFD/obdfilter works only via prep/commit */
1748	test_mode = (long)data->ioc_pbuf1;
1749	if (test_mode == 1)
1750		async = 0;
1751
1752	if (ed->ed_next == NULL && test_mode != 3) {
1753		test_mode = 3;
1754		data->ioc_plen1 = data->ioc_count;
1755	}
1756
1757	/* Truncate batch size to maximum */
1758	if (data->ioc_plen1 > PTLRPC_MAX_BRW_SIZE)
1759		data->ioc_plen1 = PTLRPC_MAX_BRW_SIZE;
1760
1761	switch (test_mode) {
1762	case 1:
1763		/* fall through */
1764	case 2:
1765		rc = echo_client_kbrw(ed, rw, oa,
1766				      eco, data->ioc_offset,
1767				      data->ioc_count, async, dummy_oti);
1768		break;
1769	case 3:
1770		rc = echo_client_prep_commit(env, ec->ec_exp, rw, oa,
1771					     eco, data->ioc_offset,
1772					     data->ioc_count, data->ioc_plen1,
1773					     dummy_oti, async);
1774		break;
1775	default:
1776		rc = -EINVAL;
1777	}
1778	echo_put_object(eco);
1779	return rc;
1780}
1781
1782static int
1783echo_client_enqueue(struct obd_export *exp, struct obdo *oa,
1784		    int mode, u64 offset, u64 nob)
1785{
1786	struct echo_device     *ed = obd2echo_dev(exp->exp_obd);
1787	struct lustre_handle   *ulh = &oa->o_handle;
1788	struct echo_object     *eco;
1789	u64		 end;
1790	int		     rc;
1791
1792	if (ed->ed_next == NULL)
1793		return -EOPNOTSUPP;
1794
1795	if (!(mode == LCK_PR || mode == LCK_PW))
1796		return -EINVAL;
1797
1798	if ((offset & (~CFS_PAGE_MASK)) != 0 ||
1799	    (nob & (~CFS_PAGE_MASK)) != 0)
1800		return -EINVAL;
1801
1802	rc = echo_get_object (&eco, ed, oa);
1803	if (rc != 0)
1804		return rc;
1805
1806	end = (nob == 0) ? ((u64) -1) : (offset + nob - 1);
1807	rc = cl_echo_enqueue(eco, offset, end, mode, &ulh->cookie);
1808	if (rc == 0) {
1809		oa->o_valid |= OBD_MD_FLHANDLE;
1810		CDEBUG(D_INFO, "Cookie is %#llx\n", ulh->cookie);
1811	}
1812	echo_put_object(eco);
1813	return rc;
1814}
1815
1816static int
1817echo_client_cancel(struct obd_export *exp, struct obdo *oa)
1818{
1819	struct echo_device *ed     = obd2echo_dev(exp->exp_obd);
1820	__u64	       cookie = oa->o_handle.cookie;
1821
1822	if ((oa->o_valid & OBD_MD_FLHANDLE) == 0)
1823		return -EINVAL;
1824
1825	CDEBUG(D_INFO, "Cookie is %#llx\n", cookie);
1826	return cl_echo_cancel(ed, cookie);
1827}
1828
1829static int
1830echo_client_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
1831		      void *karg, void *uarg)
1832{
1833	struct obd_device      *obd = exp->exp_obd;
1834	struct echo_device     *ed = obd2echo_dev(obd);
1835	struct echo_client_obd *ec = ed->ed_ec;
1836	struct echo_object     *eco;
1837	struct obd_ioctl_data  *data = karg;
1838	struct obd_trans_info   dummy_oti;
1839	struct lu_env	  *env;
1840	struct oti_req_ack_lock *ack_lock;
1841	struct obdo	    *oa;
1842	struct lu_fid	   fid;
1843	int		     rw = OBD_BRW_READ;
1844	int		     rc = 0;
1845	int		     i;
1846
1847	memset(&dummy_oti, 0, sizeof(dummy_oti));
1848
1849	oa = &data->ioc_obdo1;
1850	if (!(oa->o_valid & OBD_MD_FLGROUP)) {
1851		oa->o_valid |= OBD_MD_FLGROUP;
1852		ostid_set_seq_echo(&oa->o_oi);
1853	}
1854
1855	/* This FID is unpacked just for validation at this point */
1856	rc = ostid_to_fid(&fid, &oa->o_oi, 0);
1857	if (rc < 0)
1858		return rc;
1859
1860	OBD_ALLOC_PTR(env);
1861	if (env == NULL)
1862		return -ENOMEM;
1863
1864	rc = lu_env_init(env, LCT_DT_THREAD);
1865	if (rc)
1866		GOTO(out, rc = -ENOMEM);
1867
1868	switch (cmd) {
1869	case OBD_IOC_CREATE:		    /* may create echo object */
1870		if (!capable(CFS_CAP_SYS_ADMIN))
1871			GOTO (out, rc = -EPERM);
1872
1873		rc = echo_create_object(env, ed, 1, oa, data->ioc_pbuf1,
1874					data->ioc_plen1, &dummy_oti);
1875		GOTO(out, rc);
1876
1877	case OBD_IOC_DESTROY:
1878		if (!capable(CFS_CAP_SYS_ADMIN))
1879			GOTO (out, rc = -EPERM);
1880
1881		rc = echo_get_object(&eco, ed, oa);
1882		if (rc == 0) {
1883			rc = obd_destroy(env, ec->ec_exp, oa, eco->eo_lsm,
1884					 &dummy_oti, NULL, NULL);
1885			if (rc == 0)
1886				eco->eo_deleted = 1;
1887			echo_put_object(eco);
1888		}
1889		GOTO(out, rc);
1890
1891	case OBD_IOC_GETATTR:
1892		rc = echo_get_object(&eco, ed, oa);
1893		if (rc == 0) {
1894			struct obd_info oinfo = { { { 0 } } };
1895			oinfo.oi_md = eco->eo_lsm;
1896			oinfo.oi_oa = oa;
1897			rc = obd_getattr(env, ec->ec_exp, &oinfo);
1898			echo_put_object(eco);
1899		}
1900		GOTO(out, rc);
1901
1902	case OBD_IOC_SETATTR:
1903		if (!capable(CFS_CAP_SYS_ADMIN))
1904			GOTO (out, rc = -EPERM);
1905
1906		rc = echo_get_object(&eco, ed, oa);
1907		if (rc == 0) {
1908			struct obd_info oinfo = { { { 0 } } };
1909			oinfo.oi_oa = oa;
1910			oinfo.oi_md = eco->eo_lsm;
1911
1912			rc = obd_setattr(env, ec->ec_exp, &oinfo, NULL);
1913			echo_put_object(eco);
1914		}
1915		GOTO(out, rc);
1916
1917	case OBD_IOC_BRW_WRITE:
1918		if (!capable(CFS_CAP_SYS_ADMIN))
1919			GOTO (out, rc = -EPERM);
1920
1921		rw = OBD_BRW_WRITE;
1922		/* fall through */
1923	case OBD_IOC_BRW_READ:
1924		rc = echo_client_brw_ioctl(env, rw, exp, data, &dummy_oti);
1925		GOTO(out, rc);
1926
1927	case ECHO_IOC_GET_STRIPE:
1928		rc = echo_get_object(&eco, ed, oa);
1929		if (rc == 0) {
1930			rc = echo_copyout_lsm(eco->eo_lsm, data->ioc_pbuf1,
1931					      data->ioc_plen1);
1932			echo_put_object(eco);
1933		}
1934		GOTO(out, rc);
1935
1936	case ECHO_IOC_SET_STRIPE:
1937		if (!capable(CFS_CAP_SYS_ADMIN))
1938			GOTO (out, rc = -EPERM);
1939
1940		if (data->ioc_pbuf1 == NULL) {  /* unset */
1941			rc = echo_get_object(&eco, ed, oa);
1942			if (rc == 0) {
1943				eco->eo_deleted = 1;
1944				echo_put_object(eco);
1945			}
1946		} else {
1947			rc = echo_create_object(env, ed, 0, oa,
1948						data->ioc_pbuf1,
1949						data->ioc_plen1, &dummy_oti);
1950		}
1951		GOTO (out, rc);
1952
1953	case ECHO_IOC_ENQUEUE:
1954		if (!capable(CFS_CAP_SYS_ADMIN))
1955			GOTO (out, rc = -EPERM);
1956
1957		rc = echo_client_enqueue(exp, oa,
1958					 data->ioc_conn1, /* lock mode */
1959					 data->ioc_offset,
1960					 data->ioc_count);/*extent*/
1961		GOTO (out, rc);
1962
1963	case ECHO_IOC_CANCEL:
1964		rc = echo_client_cancel(exp, oa);
1965		GOTO (out, rc);
1966
1967	default:
1968		CERROR ("echo_ioctl(): unrecognised ioctl %#x\n", cmd);
1969		GOTO (out, rc = -ENOTTY);
1970	}
1971
1972out:
1973	lu_env_fini(env);
1974	OBD_FREE_PTR(env);
1975
1976	/* XXX this should be in a helper also called by target_send_reply */
1977	for (ack_lock = dummy_oti.oti_ack_locks, i = 0; i < 4;
1978	     i++, ack_lock++) {
1979		if (!ack_lock->mode)
1980			break;
1981		ldlm_lock_decref(&ack_lock->lock, ack_lock->mode);
1982	}
1983
1984	return rc;
1985}
1986
1987static int echo_client_setup(const struct lu_env *env,
1988			     struct obd_device *obddev, struct lustre_cfg *lcfg)
1989{
1990	struct echo_client_obd *ec = &obddev->u.echo_client;
1991	struct obd_device *tgt;
1992	struct obd_uuid echo_uuid = { "ECHO_UUID" };
1993	struct obd_connect_data *ocd = NULL;
1994	int rc;
1995
1996	if (lcfg->lcfg_bufcount < 2 || LUSTRE_CFG_BUFLEN(lcfg, 1) < 1) {
1997		CERROR("requires a TARGET OBD name\n");
1998		return -EINVAL;
1999	}
2000
2001	tgt = class_name2obd(lustre_cfg_string(lcfg, 1));
2002	if (!tgt || !tgt->obd_attached || !tgt->obd_set_up) {
2003		CERROR("device not attached or not set up (%s)\n",
2004		       lustre_cfg_string(lcfg, 1));
2005		return -EINVAL;
2006	}
2007
2008	spin_lock_init(&ec->ec_lock);
2009	INIT_LIST_HEAD (&ec->ec_objects);
2010	INIT_LIST_HEAD (&ec->ec_locks);
2011	ec->ec_unique = 0;
2012	ec->ec_nstripes = 0;
2013
2014	OBD_ALLOC(ocd, sizeof(*ocd));
2015	if (ocd == NULL) {
2016		CERROR("Can't alloc ocd connecting to %s\n",
2017		       lustre_cfg_string(lcfg, 1));
2018		return -ENOMEM;
2019	}
2020
2021	ocd->ocd_connect_flags = OBD_CONNECT_VERSION | OBD_CONNECT_REQPORTAL |
2022				 OBD_CONNECT_BRW_SIZE |
2023				 OBD_CONNECT_GRANT | OBD_CONNECT_FULL20 |
2024				 OBD_CONNECT_64BITHASH | OBD_CONNECT_LVB_TYPE |
2025				 OBD_CONNECT_FID;
2026	ocd->ocd_brw_size = DT_MAX_BRW_SIZE;
2027	ocd->ocd_version = LUSTRE_VERSION_CODE;
2028	ocd->ocd_group = FID_SEQ_ECHO;
2029
2030	rc = obd_connect(env, &ec->ec_exp, tgt, &echo_uuid, ocd, NULL);
2031	if (rc == 0) {
2032		/* Turn off pinger because it connects to tgt obd directly. */
2033		spin_lock(&tgt->obd_dev_lock);
2034		list_del_init(&ec->ec_exp->exp_obd_chain_timed);
2035		spin_unlock(&tgt->obd_dev_lock);
2036	}
2037
2038	OBD_FREE(ocd, sizeof(*ocd));
2039
2040	if (rc != 0) {
2041		CERROR("fail to connect to device %s\n",
2042		       lustre_cfg_string(lcfg, 1));
2043		return rc;
2044	}
2045
2046	return rc;
2047}
2048
2049static int echo_client_cleanup(struct obd_device *obddev)
2050{
2051	struct echo_client_obd *ec = &obddev->u.echo_client;
2052	int rc;
2053
2054	if (!list_empty(&obddev->obd_exports)) {
2055		CERROR("still has clients!\n");
2056		return -EBUSY;
2057	}
2058
2059	LASSERT(atomic_read(&ec->ec_exp->exp_refcount) > 0);
2060	rc = obd_disconnect(ec->ec_exp);
2061	if (rc != 0)
2062		CERROR("fail to disconnect device: %d\n", rc);
2063
2064	return rc;
2065}
2066
2067static int echo_client_connect(const struct lu_env *env,
2068			       struct obd_export **exp,
2069			       struct obd_device *src, struct obd_uuid *cluuid,
2070			       struct obd_connect_data *data, void *localdata)
2071{
2072	int		rc;
2073	struct lustre_handle conn = { 0 };
2074
2075	rc = class_connect(&conn, src, cluuid);
2076	if (rc == 0) {
2077		*exp = class_conn2export(&conn);
2078	}
2079
2080	return rc;
2081}
2082
2083static int echo_client_disconnect(struct obd_export *exp)
2084{
2085	int		     rc;
2086
2087	if (exp == NULL)
2088		GOTO(out, rc = -EINVAL);
2089
2090	rc = class_disconnect(exp);
2091	GOTO(out, rc);
2092 out:
2093	return rc;
2094}
2095
2096static struct obd_ops echo_client_obd_ops = {
2097	.o_owner       = THIS_MODULE,
2098	.o_iocontrol   = echo_client_iocontrol,
2099	.o_connect     = echo_client_connect,
2100	.o_disconnect  = echo_client_disconnect
2101};
2102
2103int echo_client_init(void)
2104{
2105	struct lprocfs_static_vars lvars = { NULL };
2106	int rc;
2107
2108	lprocfs_echo_init_vars(&lvars);
2109
2110	rc = lu_kmem_init(echo_caches);
2111	if (rc == 0) {
2112		rc = class_register_type(&echo_client_obd_ops, NULL,
2113					 lvars.module_vars,
2114					 LUSTRE_ECHO_CLIENT_NAME,
2115					 &echo_device_type);
2116		if (rc)
2117			lu_kmem_fini(echo_caches);
2118	}
2119	return rc;
2120}
2121
2122void echo_client_exit(void)
2123{
2124	class_unregister_type(LUSTRE_ECHO_CLIENT_NAME);
2125	lu_kmem_fini(echo_caches);
2126}
2127
2128static int __init obdecho_init(void)
2129{
2130	struct lprocfs_static_vars lvars;
2131	int rc;
2132
2133	LCONSOLE_INFO("Echo OBD driver; http://www.lustre.org/\n");
2134
2135	LASSERT(PAGE_CACHE_SIZE % OBD_ECHO_BLOCK_SIZE == 0);
2136
2137	lprocfs_echo_init_vars(&lvars);
2138
2139
2140	rc = echo_client_init();
2141
2142	return rc;
2143}
2144
/** Module exit point: shuts the echo client down via echo_client_exit(). */
static void /*__exit*/ obdecho_exit(void)
{
	echo_client_exit();
}
2150
2151MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
2152MODULE_DESCRIPTION("Lustre Testing Echo OBD driver");
2153MODULE_LICENSE("GPL");
2154MODULE_VERSION(LUSTRE_VERSION_STRING);
2155
2156module_init(obdecho_init);
2157module_exit(obdecho_exit);
2158
2159/** @} echo_client */
2160