1/* 2 * GPL HEADER START 3 * 4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 5 * 6 * This program is free software; you can redistribute it and/or modify 7 * it under the terms of the GNU General Public License version 2 only, 8 * as published by the Free Software Foundation. 9 * 10 * This program is distributed in the hope that it will be useful, but 11 * WITHOUT ANY WARRANTY; without even the implied warranty of 12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 13 * General Public License version 2 for more details (a copy is included 14 * in the LICENSE file that accompanied this code). 15 * 16 * You should have received a copy of the GNU General Public License 17 * version 2 along with this program; If not, see 18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf 19 * 20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara, 21 * CA 95054 USA or visit www.sun.com if you need additional information or 22 * have any questions. 23 * 24 * GPL HEADER END 25 */ 26/* 27 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved. 28 * Use is subject to license terms. 29 * 30 * Copyright (c) 2011, 2012, Intel Corporation. 31 */ 32/* 33 * This file is part of Lustre, http://www.lustre.org/ 34 * Lustre is a trademark of Sun Microsystems, Inc. 35 */ 36 37#define DEBUG_SUBSYSTEM S_ECHO 38#include "../../include/linux/libcfs/libcfs.h" 39 40#include "../include/obd.h" 41#include "../include/obd_support.h" 42#include "../include/obd_class.h" 43#include "../include/lustre_debug.h" 44#include "../include/lprocfs_status.h" 45#include "../include/cl_object.h" 46#include "../include/lustre_fid.h" 47#include "../include/lustre_acl.h" 48#include "../include/lustre_net.h" 49 50#include "echo_internal.h" 51 52/** \defgroup echo_client Echo Client 53 * @{ 54 */ 55 56struct echo_device { 57 struct cl_device ed_cl; 58 struct echo_client_obd *ed_ec; 59 60 struct cl_site ed_site_myself; 61 struct cl_site *ed_site; 62 struct lu_device *ed_next; 63 int ed_next_islov; 64}; 65 66struct echo_object { 67 struct cl_object eo_cl; 68 struct cl_object_header eo_hdr; 69 70 struct echo_device *eo_dev; 71 struct list_head eo_obj_chain; 72 struct lov_stripe_md *eo_lsm; 73 atomic_t eo_npages; 74 int eo_deleted; 75}; 76 77struct echo_object_conf { 78 struct cl_object_conf eoc_cl; 79 struct lov_stripe_md **eoc_md; 80}; 81 82struct echo_page { 83 struct cl_page_slice ep_cl; 84 struct mutex ep_lock; 85 struct page *ep_vmpage; 86}; 87 88struct echo_lock { 89 struct cl_lock_slice el_cl; 90 struct list_head el_chain; 91 struct echo_object *el_object; 92 __u64 el_cookie; 93 atomic_t el_refcount; 94}; 95 96static int echo_client_setup(const struct lu_env *env, 97 struct obd_device *obddev, 98 struct lustre_cfg *lcfg); 99static int echo_client_cleanup(struct obd_device *obddev); 100 101 102/** \defgroup echo_helpers Helper functions 103 * @{ 104 */ 105static inline struct echo_device *cl2echo_dev(const struct cl_device *dev) 106{ 107 return container_of0(dev, struct echo_device, ed_cl); 108} 109 110static inline struct cl_device *echo_dev2cl(struct echo_device *d) 111{ 112 return &d->ed_cl; 113} 114 115static inline struct echo_device *obd2echo_dev(const struct obd_device *obd) 116{ 117 return cl2echo_dev(lu2cl_dev(obd->obd_lu_dev)); 118} 119 120static inline struct cl_object *echo_obj2cl(struct echo_object *eco) 121{ 122 return &eco->eo_cl; 123} 124 125static inline struct echo_object *cl2echo_obj(const struct cl_object *o) 126{ 127 return container_of(o, struct echo_object, eo_cl); 128} 129 130static inline struct echo_page *cl2echo_page(const struct cl_page_slice *s) 131{ 132 return container_of(s, struct echo_page, ep_cl); 133} 134 135static inline struct echo_lock *cl2echo_lock(const struct cl_lock_slice *s) 136{ 137 return container_of(s, struct echo_lock, el_cl); 138} 139 140static inline struct cl_lock *echo_lock2cl(const struct echo_lock *ecl) 141{ 142 return ecl->el_cl.cls_lock; 143} 144 145static struct lu_context_key echo_thread_key; 146static inline struct echo_thread_info *echo_env_info(const struct lu_env *env) 147{ 148 struct echo_thread_info *info; 149 info = lu_context_key_get(&env->le_ctx, &echo_thread_key); 150 LASSERT(info != NULL); 151 return info; 152} 153 154static inline 155struct echo_object_conf *cl2echo_conf(const struct cl_object_conf *c) 156{ 157 return container_of(c, struct echo_object_conf, eoc_cl); 158} 159 160/** @} echo_helpers */ 161 162static struct echo_object *cl_echo_object_find(struct echo_device *d, 163 struct lov_stripe_md **lsm); 164static int cl_echo_object_put(struct echo_object *eco); 165static int cl_echo_enqueue(struct echo_object *eco, u64 start, 166 u64 end, int mode, __u64 *cookie); 167static int cl_echo_cancel(struct echo_device *d, __u64 cookie); 168static int cl_echo_object_brw(struct echo_object *eco, int rw, u64 offset, 169 struct page **pages, int npages, int async); 170 171static struct echo_thread_info *echo_env_info(const struct lu_env *env); 172 173struct echo_thread_info { 174 struct echo_object_conf eti_conf; 175 struct lustre_md eti_md; 176 177 struct cl_2queue eti_queue; 178 struct cl_io eti_io; 179 struct cl_lock_descr eti_descr; 180 struct lu_fid eti_fid; 181 struct lu_fid eti_fid2; 182}; 183 184/* No session used right now */ 185struct echo_session_info { 186 unsigned long dummy; 187}; 188 189static struct kmem_cache *echo_lock_kmem; 190static struct kmem_cache *echo_object_kmem; 191static struct kmem_cache *echo_thread_kmem; 192static struct kmem_cache *echo_session_kmem; 193 194static struct lu_kmem_descr echo_caches[] = { 195 { 196 .ckd_cache = &echo_lock_kmem, 197 .ckd_name = "echo_lock_kmem", 198 .ckd_size = sizeof (struct echo_lock) 199 }, 200 { 201 .ckd_cache = &echo_object_kmem, 202 .ckd_name = "echo_object_kmem", 203 .ckd_size = sizeof (struct echo_object) 204 }, 205 { 206 .ckd_cache = &echo_thread_kmem, 207 .ckd_name = "echo_thread_kmem", 208 .ckd_size = sizeof (struct echo_thread_info) 209 }, 210 { 211 .ckd_cache = &echo_session_kmem, 212 .ckd_name = "echo_session_kmem", 213 .ckd_size = sizeof (struct echo_session_info) 214 }, 215 { 216 .ckd_cache = NULL 217 } 218}; 219 220/** \defgroup echo_page Page operations 221 * 222 * Echo page operations. 223 * 224 * @{ 225 */ 226static struct page *echo_page_vmpage(const struct lu_env *env, 227 const struct cl_page_slice *slice) 228{ 229 return cl2echo_page(slice)->ep_vmpage; 230} 231 232static int echo_page_own(const struct lu_env *env, 233 const struct cl_page_slice *slice, 234 struct cl_io *io, int nonblock) 235{ 236 struct echo_page *ep = cl2echo_page(slice); 237 238 if (!nonblock) 239 mutex_lock(&ep->ep_lock); 240 else if (!mutex_trylock(&ep->ep_lock)) 241 return -EAGAIN; 242 return 0; 243} 244 245static void echo_page_disown(const struct lu_env *env, 246 const struct cl_page_slice *slice, 247 struct cl_io *io) 248{ 249 struct echo_page *ep = cl2echo_page(slice); 250 251 LASSERT(mutex_is_locked(&ep->ep_lock)); 252 mutex_unlock(&ep->ep_lock); 253} 254 255static void echo_page_discard(const struct lu_env *env, 256 const struct cl_page_slice *slice, 257 struct cl_io *unused) 258{ 259 cl_page_delete(env, slice->cpl_page); 260} 261 262static int echo_page_is_vmlocked(const struct lu_env *env, 263 const struct cl_page_slice *slice) 264{ 265 if (mutex_is_locked(&cl2echo_page(slice)->ep_lock)) 266 return -EBUSY; 267 return -ENODATA; 268} 269 270static void echo_page_completion(const struct lu_env *env, 271 const struct cl_page_slice *slice, 272 int ioret) 273{ 274 LASSERT(slice->cpl_page->cp_sync_io != NULL); 275} 276 277static void echo_page_fini(const struct lu_env *env, 278 struct cl_page_slice *slice) 279{ 280 struct echo_page *ep = cl2echo_page(slice); 281 struct echo_object *eco = cl2echo_obj(slice->cpl_obj); 282 struct page *vmpage = ep->ep_vmpage; 283 284 atomic_dec(&eco->eo_npages); 285 page_cache_release(vmpage); 286} 287 288static int echo_page_prep(const struct lu_env *env, 289 const struct cl_page_slice *slice, 290 struct cl_io *unused) 291{ 292 return 0; 293} 294 295static int echo_page_print(const struct lu_env *env, 296 const struct cl_page_slice *slice, 297 void *cookie, lu_printer_t printer) 298{ 299 struct echo_page *ep = cl2echo_page(slice); 300 301 (*printer)(env, cookie, LUSTRE_ECHO_CLIENT_NAME"-page@%p %d vm@%p\n", 302 ep, mutex_is_locked(&ep->ep_lock), ep->ep_vmpage); 303 return 0; 304} 305 306static const struct cl_page_operations echo_page_ops = { 307 .cpo_own = echo_page_own, 308 .cpo_disown = echo_page_disown, 309 .cpo_discard = echo_page_discard, 310 .cpo_vmpage = echo_page_vmpage, 311 .cpo_fini = echo_page_fini, 312 .cpo_print = echo_page_print, 313 .cpo_is_vmlocked = echo_page_is_vmlocked, 314 .io = { 315 [CRT_READ] = { 316 .cpo_prep = echo_page_prep, 317 .cpo_completion = echo_page_completion, 318 }, 319 [CRT_WRITE] = { 320 .cpo_prep = echo_page_prep, 321 .cpo_completion = echo_page_completion, 322 } 323 } 324}; 325/** @} echo_page */ 326 327/** \defgroup echo_lock Locking 328 * 329 * echo lock operations 330 * 331 * @{ 332 */ 333static void echo_lock_fini(const struct lu_env *env, 334 struct cl_lock_slice *slice) 335{ 336 struct echo_lock *ecl = cl2echo_lock(slice); 337 338 LASSERT(list_empty(&ecl->el_chain)); 339 OBD_SLAB_FREE_PTR(ecl, echo_lock_kmem); 340} 341 342static void echo_lock_delete(const struct lu_env *env, 343 const struct cl_lock_slice *slice) 344{ 345 struct echo_lock *ecl = cl2echo_lock(slice); 346 347 LASSERT(list_empty(&ecl->el_chain)); 348} 349 350static int echo_lock_fits_into(const struct lu_env *env, 351 const struct cl_lock_slice *slice, 352 const struct cl_lock_descr *need, 353 const struct cl_io *unused) 354{ 355 return 1; 356} 357 358static struct cl_lock_operations echo_lock_ops = { 359 .clo_fini = echo_lock_fini, 360 .clo_delete = echo_lock_delete, 361 .clo_fits_into = echo_lock_fits_into 362}; 363 364/** @} echo_lock */ 365 366/** \defgroup echo_cl_ops cl_object operations 367 * 368 * operations for cl_object 369 * 370 * @{ 371 */ 372static int echo_page_init(const struct lu_env *env, struct cl_object *obj, 373 struct cl_page *page, struct page *vmpage) 374{ 375 struct echo_page *ep = cl_object_page_slice(obj, page); 376 struct echo_object *eco = cl2echo_obj(obj); 377 378 ep->ep_vmpage = vmpage; 379 page_cache_get(vmpage); 380 mutex_init(&ep->ep_lock); 381 cl_page_slice_add(page, &ep->ep_cl, obj, &echo_page_ops); 382 atomic_inc(&eco->eo_npages); 383 return 0; 384} 385 386static int echo_io_init(const struct lu_env *env, struct cl_object *obj, 387 struct cl_io *io) 388{ 389 return 0; 390} 391 392static int echo_lock_init(const struct lu_env *env, 393 struct cl_object *obj, struct cl_lock *lock, 394 const struct cl_io *unused) 395{ 396 struct echo_lock *el; 397 398 OBD_SLAB_ALLOC_PTR_GFP(el, echo_lock_kmem, GFP_NOFS); 399 if (el != NULL) { 400 cl_lock_slice_add(lock, &el->el_cl, obj, &echo_lock_ops); 401 el->el_object = cl2echo_obj(obj); 402 INIT_LIST_HEAD(&el->el_chain); 403 atomic_set(&el->el_refcount, 0); 404 } 405 return el == NULL ? -ENOMEM : 0; 406} 407 408static int echo_conf_set(const struct lu_env *env, struct cl_object *obj, 409 const struct cl_object_conf *conf) 410{ 411 return 0; 412} 413 414static const struct cl_object_operations echo_cl_obj_ops = { 415 .coo_page_init = echo_page_init, 416 .coo_lock_init = echo_lock_init, 417 .coo_io_init = echo_io_init, 418 .coo_conf_set = echo_conf_set 419}; 420/** @} echo_cl_ops */ 421 422/** \defgroup echo_lu_ops lu_object operations 423 * 424 * operations for echo lu object. 425 * 426 * @{ 427 */ 428static int echo_object_init(const struct lu_env *env, struct lu_object *obj, 429 const struct lu_object_conf *conf) 430{ 431 struct echo_device *ed = cl2echo_dev(lu2cl_dev(obj->lo_dev)); 432 struct echo_client_obd *ec = ed->ed_ec; 433 struct echo_object *eco = cl2echo_obj(lu2cl(obj)); 434 const struct cl_object_conf *cconf; 435 struct echo_object_conf *econf; 436 437 if (ed->ed_next) { 438 struct lu_object *below; 439 struct lu_device *under; 440 441 under = ed->ed_next; 442 below = under->ld_ops->ldo_object_alloc(env, obj->lo_header, 443 under); 444 if (below == NULL) 445 return -ENOMEM; 446 lu_object_add(obj, below); 447 } 448 449 cconf = lu2cl_conf(conf); 450 econf = cl2echo_conf(cconf); 451 452 LASSERT(econf->eoc_md); 453 eco->eo_lsm = *econf->eoc_md; 454 /* clear the lsm pointer so that it won't get freed. */ 455 *econf->eoc_md = NULL; 456 457 eco->eo_dev = ed; 458 atomic_set(&eco->eo_npages, 0); 459 cl_object_page_init(lu2cl(obj), sizeof(struct echo_page)); 460 461 spin_lock(&ec->ec_lock); 462 list_add_tail(&eco->eo_obj_chain, &ec->ec_objects); 463 spin_unlock(&ec->ec_lock); 464 465 return 0; 466} 467 468/* taken from osc_unpackmd() */ 469static int echo_alloc_memmd(struct echo_device *ed, 470 struct lov_stripe_md **lsmp) 471{ 472 int lsm_size; 473 474 /* If export is lov/osc then use their obd method */ 475 if (ed->ed_next != NULL) 476 return obd_alloc_memmd(ed->ed_ec->ec_exp, lsmp); 477 /* OFD has no unpackmd method, do everything here */ 478 lsm_size = lov_stripe_md_size(1); 479 480 LASSERT(*lsmp == NULL); 481 OBD_ALLOC(*lsmp, lsm_size); 482 if (*lsmp == NULL) 483 return -ENOMEM; 484 485 OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo)); 486 if ((*lsmp)->lsm_oinfo[0] == NULL) { 487 OBD_FREE(*lsmp, lsm_size); 488 return -ENOMEM; 489 } 490 491 loi_init((*lsmp)->lsm_oinfo[0]); 492 (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES; 493 ostid_set_seq_echo(&(*lsmp)->lsm_oi); 494 495 return lsm_size; 496} 497 498static int echo_free_memmd(struct echo_device *ed, struct lov_stripe_md **lsmp) 499{ 500 int lsm_size; 501 502 /* If export is lov/osc then use their obd method */ 503 if (ed->ed_next != NULL) 504 return obd_free_memmd(ed->ed_ec->ec_exp, lsmp); 505 /* OFD has no unpackmd method, do everything here */ 506 lsm_size = lov_stripe_md_size(1); 507 508 LASSERT(*lsmp != NULL); 509 OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo)); 510 OBD_FREE(*lsmp, lsm_size); 511 *lsmp = NULL; 512 return 0; 513} 514 515static void echo_object_free(const struct lu_env *env, struct lu_object *obj) 516{ 517 struct echo_object *eco = cl2echo_obj(lu2cl(obj)); 518 struct echo_client_obd *ec = eco->eo_dev->ed_ec; 519 520 LASSERT(atomic_read(&eco->eo_npages) == 0); 521 522 spin_lock(&ec->ec_lock); 523 list_del_init(&eco->eo_obj_chain); 524 spin_unlock(&ec->ec_lock); 525 526 lu_object_fini(obj); 527 lu_object_header_fini(obj->lo_header); 528 529 if (eco->eo_lsm) 530 echo_free_memmd(eco->eo_dev, &eco->eo_lsm); 531 OBD_SLAB_FREE_PTR(eco, echo_object_kmem); 532} 533 534static int echo_object_print(const struct lu_env *env, void *cookie, 535 lu_printer_t p, const struct lu_object *o) 536{ 537 struct echo_object *obj = cl2echo_obj(lu2cl(o)); 538 539 return (*p)(env, cookie, "echoclient-object@%p", obj); 540} 541 542static const struct lu_object_operations echo_lu_obj_ops = { 543 .loo_object_init = echo_object_init, 544 .loo_object_delete = NULL, 545 .loo_object_release = NULL, 546 .loo_object_free = echo_object_free, 547 .loo_object_print = echo_object_print, 548 .loo_object_invariant = NULL 549}; 550/** @} echo_lu_ops */ 551 552/** \defgroup echo_lu_dev_ops lu_device operations 553 * 554 * Operations for echo lu device. 555 * 556 * @{ 557 */ 558static struct lu_object *echo_object_alloc(const struct lu_env *env, 559 const struct lu_object_header *hdr, 560 struct lu_device *dev) 561{ 562 struct echo_object *eco; 563 struct lu_object *obj = NULL; 564 565 /* we're the top dev. */ 566 LASSERT(hdr == NULL); 567 OBD_SLAB_ALLOC_PTR_GFP(eco, echo_object_kmem, GFP_NOFS); 568 if (eco != NULL) { 569 struct cl_object_header *hdr = &eco->eo_hdr; 570 571 obj = &echo_obj2cl(eco)->co_lu; 572 cl_object_header_init(hdr); 573 lu_object_init(obj, &hdr->coh_lu, dev); 574 lu_object_add_top(&hdr->coh_lu, obj); 575 576 eco->eo_cl.co_ops = &echo_cl_obj_ops; 577 obj->lo_ops = &echo_lu_obj_ops; 578 } 579 return obj; 580} 581 582static struct lu_device_operations echo_device_lu_ops = { 583 .ldo_object_alloc = echo_object_alloc, 584}; 585 586/** @} echo_lu_dev_ops */ 587 588static struct cl_device_operations echo_device_cl_ops = { 589}; 590 591/** \defgroup echo_init Setup and teardown 592 * 593 * Init and fini functions for echo client. 594 * 595 * @{ 596 */ 597static int echo_site_init(const struct lu_env *env, struct echo_device *ed) 598{ 599 struct cl_site *site = &ed->ed_site_myself; 600 int rc; 601 602 /* initialize site */ 603 rc = cl_site_init(site, &ed->ed_cl); 604 if (rc) { 605 CERROR("Cannot initialize site for echo client(%d)\n", rc); 606 return rc; 607 } 608 609 rc = lu_site_init_finish(&site->cs_lu); 610 if (rc) 611 return rc; 612 613 ed->ed_site = site; 614 return 0; 615} 616 617static void echo_site_fini(const struct lu_env *env, struct echo_device *ed) 618{ 619 if (ed->ed_site) { 620 cl_site_fini(ed->ed_site); 621 ed->ed_site = NULL; 622 } 623} 624 625static void *echo_thread_key_init(const struct lu_context *ctx, 626 struct lu_context_key *key) 627{ 628 struct echo_thread_info *info; 629 630 OBD_SLAB_ALLOC_PTR_GFP(info, echo_thread_kmem, GFP_NOFS); 631 if (info == NULL) 632 info = ERR_PTR(-ENOMEM); 633 return info; 634} 635 636static void echo_thread_key_fini(const struct lu_context *ctx, 637 struct lu_context_key *key, void *data) 638{ 639 struct echo_thread_info *info = data; 640 OBD_SLAB_FREE_PTR(info, echo_thread_kmem); 641} 642 643static void echo_thread_key_exit(const struct lu_context *ctx, 644 struct lu_context_key *key, void *data) 645{ 646} 647 648static struct lu_context_key echo_thread_key = { 649 .lct_tags = LCT_CL_THREAD, 650 .lct_init = echo_thread_key_init, 651 .lct_fini = echo_thread_key_fini, 652 .lct_exit = echo_thread_key_exit 653}; 654 655static void *echo_session_key_init(const struct lu_context *ctx, 656 struct lu_context_key *key) 657{ 658 struct echo_session_info *session; 659 660 OBD_SLAB_ALLOC_PTR_GFP(session, echo_session_kmem, GFP_NOFS); 661 if (session == NULL) 662 session = ERR_PTR(-ENOMEM); 663 return session; 664} 665 666static void echo_session_key_fini(const struct lu_context *ctx, 667 struct lu_context_key *key, void *data) 668{ 669 struct echo_session_info *session = data; 670 OBD_SLAB_FREE_PTR(session, echo_session_kmem); 671} 672 673static void echo_session_key_exit(const struct lu_context *ctx, 674 struct lu_context_key *key, void *data) 675{ 676} 677 678static struct lu_context_key echo_session_key = { 679 .lct_tags = LCT_SESSION, 680 .lct_init = echo_session_key_init, 681 .lct_fini = echo_session_key_fini, 682 .lct_exit = echo_session_key_exit 683}; 684 685LU_TYPE_INIT_FINI(echo, &echo_thread_key, &echo_session_key); 686 687static struct lu_device *echo_device_alloc(const struct lu_env *env, 688 struct lu_device_type *t, 689 struct lustre_cfg *cfg) 690{ 691 struct lu_device *next; 692 struct echo_device *ed; 693 struct cl_device *cd; 694 struct obd_device *obd = NULL; /* to keep compiler happy */ 695 struct obd_device *tgt; 696 const char *tgt_type_name; 697 int rc; 698 int cleanup = 0; 699 700 OBD_ALLOC_PTR(ed); 701 if (ed == NULL) 702 GOTO(out, rc = -ENOMEM); 703 704 cleanup = 1; 705 cd = &ed->ed_cl; 706 rc = cl_device_init(cd, t); 707 if (rc) 708 GOTO(out, rc); 709 710 cd->cd_lu_dev.ld_ops = &echo_device_lu_ops; 711 cd->cd_ops = &echo_device_cl_ops; 712 713 cleanup = 2; 714 obd = class_name2obd(lustre_cfg_string(cfg, 0)); 715 LASSERT(obd != NULL); 716 LASSERT(env != NULL); 717 718 tgt = class_name2obd(lustre_cfg_string(cfg, 1)); 719 if (tgt == NULL) { 720 CERROR("Can not find tgt device %s\n", 721 lustre_cfg_string(cfg, 1)); 722 GOTO(out, rc = -ENODEV); 723 } 724 725 next = tgt->obd_lu_dev; 726 if (!strcmp(tgt->obd_type->typ_name, LUSTRE_MDT_NAME)) { 727 CERROR("echo MDT client must be run on server\n"); 728 GOTO(out, rc = -EOPNOTSUPP); 729 } 730 731 rc = echo_site_init(env, ed); 732 if (rc) 733 GOTO(out, rc); 734 735 cleanup = 3; 736 737 rc = echo_client_setup(env, obd, cfg); 738 if (rc) 739 GOTO(out, rc); 740 741 ed->ed_ec = &obd->u.echo_client; 742 cleanup = 4; 743 744 /* if echo client is to be stacked upon ost device, the next is 745 * NULL since ost is not a clio device so far */ 746 if (next != NULL && !lu_device_is_cl(next)) 747 next = NULL; 748 749 tgt_type_name = tgt->obd_type->typ_name; 750 if (next != NULL) { 751 LASSERT(next != NULL); 752 if (next->ld_site != NULL) 753 GOTO(out, rc = -EBUSY); 754 755 next->ld_site = &ed->ed_site->cs_lu; 756 rc = next->ld_type->ldt_ops->ldto_device_init(env, next, 757 next->ld_type->ldt_name, 758 NULL); 759 if (rc) 760 GOTO(out, rc); 761 762 /* Tricky case, I have to determine the obd type since 763 * CLIO uses the different parameters to initialize 764 * objects for lov & osc. */ 765 if (strcmp(tgt_type_name, LUSTRE_LOV_NAME) == 0) 766 ed->ed_next_islov = 1; 767 else 768 LASSERT(strcmp(tgt_type_name, 769 LUSTRE_OSC_NAME) == 0); 770 } else { 771 LASSERT(strcmp(tgt_type_name, LUSTRE_OST_NAME) == 0); 772 } 773 774 ed->ed_next = next; 775 return &cd->cd_lu_dev; 776out: 777 switch (cleanup) { 778 case 4: { 779 int rc2; 780 rc2 = echo_client_cleanup(obd); 781 if (rc2) 782 CERROR("Cleanup obd device %s error(%d)\n", 783 obd->obd_name, rc2); 784 } 785 786 case 3: 787 echo_site_fini(env, ed); 788 case 2: 789 cl_device_fini(&ed->ed_cl); 790 case 1: 791 OBD_FREE_PTR(ed); 792 case 0: 793 default: 794 break; 795 } 796 return ERR_PTR(rc); 797} 798 799static int echo_device_init(const struct lu_env *env, struct lu_device *d, 800 const char *name, struct lu_device *next) 801{ 802 LBUG(); 803 return 0; 804} 805 806static struct lu_device *echo_device_fini(const struct lu_env *env, 807 struct lu_device *d) 808{ 809 struct echo_device *ed = cl2echo_dev(lu2cl_dev(d)); 810 struct lu_device *next = ed->ed_next; 811 812 while (next) 813 next = next->ld_type->ldt_ops->ldto_device_fini(env, next); 814 return NULL; 815} 816 817static void echo_lock_release(const struct lu_env *env, 818 struct echo_lock *ecl, 819 int still_used) 820{ 821 struct cl_lock *clk = echo_lock2cl(ecl); 822 823 cl_lock_get(clk); 824 cl_unuse(env, clk); 825 cl_lock_release(env, clk, "ec enqueue", ecl->el_object); 826 if (!still_used) { 827 cl_lock_mutex_get(env, clk); 828 cl_lock_cancel(env, clk); 829 cl_lock_delete(env, clk); 830 cl_lock_mutex_put(env, clk); 831 } 832 cl_lock_put(env, clk); 833} 834 835static struct lu_device *echo_device_free(const struct lu_env *env, 836 struct lu_device *d) 837{ 838 struct echo_device *ed = cl2echo_dev(lu2cl_dev(d)); 839 struct echo_client_obd *ec = ed->ed_ec; 840 struct echo_object *eco; 841 struct lu_device *next = ed->ed_next; 842 843 CDEBUG(D_INFO, "echo device:%p is going to be freed, next = %p\n", 844 ed, next); 845 846 lu_site_purge(env, &ed->ed_site->cs_lu, -1); 847 848 /* check if there are objects still alive. 849 * It shouldn't have any object because lu_site_purge would cleanup 850 * all of cached objects. Anyway, probably the echo device is being 851 * parallelly accessed. 852 */ 853 spin_lock(&ec->ec_lock); 854 list_for_each_entry(eco, &ec->ec_objects, eo_obj_chain) 855 eco->eo_deleted = 1; 856 spin_unlock(&ec->ec_lock); 857 858 /* purge again */ 859 lu_site_purge(env, &ed->ed_site->cs_lu, -1); 860 861 CDEBUG(D_INFO, 862 "Waiting for the reference of echo object to be dropped\n"); 863 864 /* Wait for the last reference to be dropped. */ 865 spin_lock(&ec->ec_lock); 866 while (!list_empty(&ec->ec_objects)) { 867 spin_unlock(&ec->ec_lock); 868 CERROR("echo_client still has objects at cleanup time, " 869 "wait for 1 second\n"); 870 set_current_state(TASK_UNINTERRUPTIBLE); 871 schedule_timeout(cfs_time_seconds(1)); 872 lu_site_purge(env, &ed->ed_site->cs_lu, -1); 873 spin_lock(&ec->ec_lock); 874 } 875 spin_unlock(&ec->ec_lock); 876 877 LASSERT(list_empty(&ec->ec_locks)); 878 879 CDEBUG(D_INFO, "No object exists, exiting...\n"); 880 881 echo_client_cleanup(d->ld_obd); 882 883 while (next) 884 next = next->ld_type->ldt_ops->ldto_device_free(env, next); 885 886 LASSERT(ed->ed_site == lu2cl_site(d->ld_site)); 887 echo_site_fini(env, ed); 888 cl_device_fini(&ed->ed_cl); 889 OBD_FREE_PTR(ed); 890 891 return NULL; 892} 893 894static const struct lu_device_type_operations echo_device_type_ops = { 895 .ldto_init = echo_type_init, 896 .ldto_fini = echo_type_fini, 897 898 .ldto_start = echo_type_start, 899 .ldto_stop = echo_type_stop, 900 901 .ldto_device_alloc = echo_device_alloc, 902 .ldto_device_free = echo_device_free, 903 .ldto_device_init = echo_device_init, 904 .ldto_device_fini = echo_device_fini 905}; 906 907static struct lu_device_type echo_device_type = { 908 .ldt_tags = LU_DEVICE_CL, 909 .ldt_name = LUSTRE_ECHO_CLIENT_NAME, 910 .ldt_ops = &echo_device_type_ops, 911 .ldt_ctx_tags = LCT_CL_THREAD, 912}; 913/** @} echo_init */ 914 915/** \defgroup echo_exports Exported operations 916 * 917 * exporting functions to echo client 918 * 919 * @{ 920 */ 921 922/* Interfaces to echo client obd device */ 923static struct echo_object *cl_echo_object_find(struct echo_device *d, 924 struct lov_stripe_md **lsmp) 925{ 926 struct lu_env *env; 927 struct echo_thread_info *info; 928 struct echo_object_conf *conf; 929 struct lov_stripe_md *lsm; 930 struct echo_object *eco; 931 struct cl_object *obj; 932 struct lu_fid *fid; 933 int refcheck; 934 int rc; 935 936 LASSERT(lsmp); 937 lsm = *lsmp; 938 LASSERT(lsm); 939 LASSERTF(ostid_id(&lsm->lsm_oi) != 0, DOSTID"\n", POSTID(&lsm->lsm_oi)); 940 LASSERTF(ostid_seq(&lsm->lsm_oi) == FID_SEQ_ECHO, DOSTID"\n", 941 POSTID(&lsm->lsm_oi)); 942 943 /* Never return an object if the obd is to be freed. */ 944 if (echo_dev2cl(d)->cd_lu_dev.ld_obd->obd_stopping) 945 return ERR_PTR(-ENODEV); 946 947 env = cl_env_get(&refcheck); 948 if (IS_ERR(env)) 949 return (void *)env; 950 951 info = echo_env_info(env); 952 conf = &info->eti_conf; 953 if (d->ed_next) { 954 if (!d->ed_next_islov) { 955 struct lov_oinfo *oinfo = lsm->lsm_oinfo[0]; 956 LASSERT(oinfo != NULL); 957 oinfo->loi_oi = lsm->lsm_oi; 958 conf->eoc_cl.u.coc_oinfo = oinfo; 959 } else { 960 struct lustre_md *md; 961 md = &info->eti_md; 962 memset(md, 0, sizeof(*md)); 963 md->lsm = lsm; 964 conf->eoc_cl.u.coc_md = md; 965 } 966 } 967 conf->eoc_md = lsmp; 968 969 fid = &info->eti_fid; 970 rc = ostid_to_fid(fid, &lsm->lsm_oi, 0); 971 if (rc != 0) 972 GOTO(out, eco = ERR_PTR(rc)); 973 974 /* In the function below, .hs_keycmp resolves to 975 * lu_obj_hop_keycmp() */ 976 /* coverity[overrun-buffer-val] */ 977 obj = cl_object_find(env, echo_dev2cl(d), fid, &conf->eoc_cl); 978 if (IS_ERR(obj)) 979 GOTO(out, eco = (void *)obj); 980 981 eco = cl2echo_obj(obj); 982 if (eco->eo_deleted) { 983 cl_object_put(env, obj); 984 eco = ERR_PTR(-EAGAIN); 985 } 986 987out: 988 cl_env_put(env, &refcheck); 989 return eco; 990} 991 992static int cl_echo_object_put(struct echo_object *eco) 993{ 994 struct lu_env *env; 995 struct cl_object *obj = echo_obj2cl(eco); 996 int refcheck; 997 998 env = cl_env_get(&refcheck); 999 if (IS_ERR(env)) 1000 return PTR_ERR(env); 1001 1002 /* an external function to kill an object? */ 1003 if (eco->eo_deleted) { 1004 struct lu_object_header *loh = obj->co_lu.lo_header; 1005 LASSERT(&eco->eo_hdr == luh2coh(loh)); 1006 set_bit(LU_OBJECT_HEARD_BANSHEE, &loh->loh_flags); 1007 } 1008 1009 cl_object_put(env, obj); 1010 cl_env_put(env, &refcheck); 1011 return 0; 1012} 1013 1014static int cl_echo_enqueue0(struct lu_env *env, struct echo_object *eco, 1015 u64 start, u64 end, int mode, 1016 __u64 *cookie , __u32 enqflags) 1017{ 1018 struct cl_io *io; 1019 struct cl_lock *lck; 1020 struct cl_object *obj; 1021 struct cl_lock_descr *descr; 1022 struct echo_thread_info *info; 1023 int rc = -ENOMEM; 1024 1025 info = echo_env_info(env); 1026 io = &info->eti_io; 1027 descr = &info->eti_descr; 1028 obj = echo_obj2cl(eco); 1029 1030 descr->cld_obj = obj; 1031 descr->cld_start = cl_index(obj, start); 1032 descr->cld_end = cl_index(obj, end); 1033 descr->cld_mode = mode == LCK_PW ? CLM_WRITE : CLM_READ; 1034 descr->cld_enq_flags = enqflags; 1035 io->ci_obj = obj; 1036 1037 lck = cl_lock_request(env, io, descr, "ec enqueue", eco); 1038 if (lck) { 1039 struct echo_client_obd *ec = eco->eo_dev->ed_ec; 1040 struct echo_lock *el; 1041 1042 rc = cl_wait(env, lck); 1043 if (rc == 0) { 1044 el = cl2echo_lock(cl_lock_at(lck, &echo_device_type)); 1045 spin_lock(&ec->ec_lock); 1046 if (list_empty(&el->el_chain)) { 1047 list_add(&el->el_chain, &ec->ec_locks); 1048 el->el_cookie = ++ec->ec_unique; 1049 } 1050 atomic_inc(&el->el_refcount); 1051 *cookie = el->el_cookie; 1052 spin_unlock(&ec->ec_lock); 1053 } else { 1054 cl_lock_release(env, lck, "ec enqueue", current); 1055 } 1056 } 1057 return rc; 1058} 1059 1060static int cl_echo_enqueue(struct echo_object *eco, u64 start, u64 end, 1061 int mode, __u64 *cookie) 1062{ 1063 struct echo_thread_info *info; 1064 struct lu_env *env; 1065 struct cl_io *io; 1066 int refcheck; 1067 int result; 1068 1069 env = cl_env_get(&refcheck); 1070 if (IS_ERR(env)) 1071 return PTR_ERR(env); 1072 1073 info = echo_env_info(env); 1074 io = &info->eti_io; 1075 1076 io->ci_ignore_layout = 1; 1077 result = cl_io_init(env, io, CIT_MISC, echo_obj2cl(eco)); 1078 if (result < 0) 1079 GOTO(out, result); 1080 LASSERT(result == 0); 1081 1082 result = cl_echo_enqueue0(env, eco, start, end, mode, cookie, 0); 1083 cl_io_fini(env, io); 1084 1085out: 1086 cl_env_put(env, &refcheck); 1087 return result; 1088} 1089 1090static int cl_echo_cancel0(struct lu_env *env, struct echo_device *ed, 1091 __u64 cookie) 1092{ 1093 struct echo_client_obd *ec = ed->ed_ec; 1094 struct echo_lock *ecl = NULL; 1095 struct list_head *el; 1096 int found = 0, still_used = 0; 1097 1098 LASSERT(ec != NULL); 1099 spin_lock(&ec->ec_lock); 1100 list_for_each (el, &ec->ec_locks) { 1101 ecl = list_entry (el, struct echo_lock, el_chain); 1102 CDEBUG(D_INFO, "ecl: %p, cookie: %#llx\n", ecl, ecl->el_cookie); 1103 found = (ecl->el_cookie == cookie); 1104 if (found) { 1105 if (atomic_dec_and_test(&ecl->el_refcount)) 1106 list_del_init(&ecl->el_chain); 1107 else 1108 still_used = 1; 1109 break; 1110 } 1111 } 1112 spin_unlock(&ec->ec_lock); 1113 1114 if (!found) 1115 return -ENOENT; 1116 1117 echo_lock_release(env, ecl, still_used); 1118 return 0; 1119} 1120 1121static int cl_echo_cancel(struct echo_device *ed, __u64 cookie) 1122{ 1123 struct lu_env *env; 1124 int refcheck; 1125 int rc; 1126 1127 env = cl_env_get(&refcheck); 1128 if (IS_ERR(env)) 1129 return PTR_ERR(env); 1130 1131 rc = cl_echo_cancel0(env, ed, cookie); 1132 1133 cl_env_put(env, &refcheck); 1134 return rc; 1135} 1136 1137static int cl_echo_async_brw(const struct lu_env *env, struct cl_io *io, 1138 enum cl_req_type unused, struct cl_2queue *queue) 1139{ 1140 struct cl_page *clp; 1141 struct cl_page *temp; 1142 int result = 0; 1143 1144 cl_page_list_for_each_safe(clp, temp, &queue->c2_qin) { 1145 int rc; 1146 rc = cl_page_cache_add(env, io, clp, CRT_WRITE); 1147 if (rc == 0) 1148 continue; 1149 result = result ?: rc; 1150 } 1151 return result; 1152} 1153 1154static int cl_echo_object_brw(struct echo_object *eco, int rw, u64 offset, 1155 struct page **pages, int npages, int async) 1156{ 1157 struct lu_env *env; 1158 struct echo_thread_info *info; 1159 struct cl_object *obj = echo_obj2cl(eco); 1160 struct echo_device *ed = eco->eo_dev; 1161 struct cl_2queue *queue; 1162 struct cl_io *io; 1163 struct cl_page *clp; 1164 struct lustre_handle lh = { 0 }; 1165 int page_size = cl_page_size(obj); 1166 int refcheck; 1167 int rc; 1168 int i; 1169 1170 LASSERT((offset & ~CFS_PAGE_MASK) == 0); 1171 LASSERT(ed->ed_next != NULL); 1172 env = cl_env_get(&refcheck); 1173 if (IS_ERR(env)) 1174 return PTR_ERR(env); 1175 1176 info = echo_env_info(env); 1177 io = &info->eti_io; 1178 queue = &info->eti_queue; 1179 1180 cl_2queue_init(queue); 1181 1182 io->ci_ignore_layout = 1; 1183 rc = cl_io_init(env, io, CIT_MISC, obj); 1184 if (rc < 0) 1185 GOTO(out, rc); 1186 LASSERT(rc == 0); 1187 1188 1189 rc = cl_echo_enqueue0(env, eco, offset, 1190 offset + npages * PAGE_CACHE_SIZE - 1, 1191 rw == READ ? LCK_PR : LCK_PW, &lh.cookie, 1192 CEF_NEVER); 1193 if (rc < 0) 1194 GOTO(error_lock, rc); 1195 1196 for (i = 0; i < npages; i++) { 1197 LASSERT(pages[i]); 1198 clp = cl_page_find(env, obj, cl_index(obj, offset), 1199 pages[i], CPT_TRANSIENT); 1200 if (IS_ERR(clp)) { 1201 rc = PTR_ERR(clp); 1202 break; 1203 } 1204 LASSERT(clp->cp_type == CPT_TRANSIENT); 1205 1206 rc = cl_page_own(env, io, clp); 1207 if (rc) { 1208 LASSERT(clp->cp_state == CPS_FREEING); 1209 cl_page_put(env, clp); 1210 break; 1211 } 1212 1213 cl_2queue_add(queue, clp); 1214 1215 /* drop the reference count for cl_page_find, so that the page 1216 * will be freed in cl_2queue_fini. */ 1217 cl_page_put(env, clp); 1218 cl_page_clip(env, clp, 0, page_size); 1219 1220 offset += page_size; 1221 } 1222 1223 if (rc == 0) { 1224 enum cl_req_type typ = rw == READ ? CRT_READ : CRT_WRITE; 1225 1226 async = async && (typ == CRT_WRITE); 1227 if (async) 1228 rc = cl_echo_async_brw(env, io, typ, queue); 1229 else 1230 rc = cl_io_submit_sync(env, io, typ, queue, 0); 1231 CDEBUG(D_INFO, "echo_client %s write returns %d\n", 1232 async ? "async" : "sync", rc); 1233 } 1234 1235 cl_echo_cancel0(env, ed, lh.cookie); 1236error_lock: 1237 cl_2queue_discard(env, io, queue); 1238 cl_2queue_disown(env, io, queue); 1239 cl_2queue_fini(env, queue); 1240 cl_io_fini(env, io); 1241out: 1242 cl_env_put(env, &refcheck); 1243 return rc; 1244} 1245/** @} echo_exports */ 1246 1247 1248static u64 last_object_id; 1249 1250static int 1251echo_copyout_lsm (struct lov_stripe_md *lsm, void *_ulsm, int ulsm_nob) 1252{ 1253 struct lov_stripe_md *ulsm = _ulsm; 1254 int nob, i; 1255 1256 nob = offsetof (struct lov_stripe_md, lsm_oinfo[lsm->lsm_stripe_count]); 1257 if (nob > ulsm_nob) 1258 return -EINVAL; 1259 1260 if (copy_to_user (ulsm, lsm, sizeof(*ulsm))) 1261 return -EFAULT; 1262 1263 for (i = 0; i < lsm->lsm_stripe_count; i++) { 1264 if (copy_to_user (ulsm->lsm_oinfo[i], lsm->lsm_oinfo[i], 1265 sizeof(lsm->lsm_oinfo[0]))) 1266 return -EFAULT; 1267 } 1268 return 0; 1269} 1270 1271static int 1272echo_copyin_lsm (struct echo_device *ed, struct lov_stripe_md *lsm, 1273 void *ulsm, int ulsm_nob) 1274{ 1275 struct echo_client_obd *ec = ed->ed_ec; 1276 int i; 1277 1278 if (ulsm_nob < sizeof (*lsm)) 1279 return -EINVAL; 1280 1281 if (copy_from_user (lsm, ulsm, sizeof (*lsm))) 1282 return -EFAULT; 1283 1284 if (lsm->lsm_stripe_count > ec->ec_nstripes || 1285 lsm->lsm_magic != LOV_MAGIC || 1286 (lsm->lsm_stripe_size & (~CFS_PAGE_MASK)) != 0 || 1287 ((__u64)lsm->lsm_stripe_size * lsm->lsm_stripe_count > ~0UL)) 1288 return -EINVAL; 1289 1290 1291 for (i = 0; i < lsm->lsm_stripe_count; i++) { 1292 if (copy_from_user(lsm->lsm_oinfo[i], 1293 ((struct lov_stripe_md *)ulsm)-> \ 1294 lsm_oinfo[i], 1295 sizeof(lsm->lsm_oinfo[0]))) 1296 return -EFAULT; 1297 } 1298 return 0; 1299} 1300 1301static int echo_create_object(const struct lu_env *env, struct echo_device *ed, 1302 int on_target, struct obdo *oa, void *ulsm, 1303 int ulsm_nob, struct obd_trans_info *oti) 1304{ 1305 struct echo_object *eco; 1306 struct echo_client_obd *ec = ed->ed_ec; 1307 struct lov_stripe_md *lsm = NULL; 1308 int rc; 1309 int created = 0; 1310 1311 if ((oa->o_valid & OBD_MD_FLID) == 0 && /* no obj id */ 1312 (on_target || /* set_stripe */ 1313 ec->ec_nstripes != 0)) { /* LOV */ 1314 CERROR ("No valid oid\n"); 1315 return -EINVAL; 1316 } 1317 1318 rc = echo_alloc_memmd(ed, &lsm); 1319 if (rc < 0) { 1320 CERROR("Cannot allocate md: rc = %d\n", rc); 1321 GOTO(failed, rc); 1322 } 1323 1324 if (ulsm != NULL) { 1325 int i, idx; 1326 1327 rc = echo_copyin_lsm (ed, lsm, ulsm, ulsm_nob); 1328 if (rc != 0) 1329 GOTO(failed, rc); 1330 1331 if (lsm->lsm_stripe_count == 0) 1332 lsm->lsm_stripe_count = ec->ec_nstripes; 1333 1334 if (lsm->lsm_stripe_size == 0) 1335 lsm->lsm_stripe_size = PAGE_CACHE_SIZE; 1336 1337 idx = cfs_rand(); 1338 1339 /* setup stripes: indices + default ids if required */ 1340 for (i = 0; i < lsm->lsm_stripe_count; i++) { 1341 if (ostid_id(&lsm->lsm_oinfo[i]->loi_oi) == 0) 1342 lsm->lsm_oinfo[i]->loi_oi = lsm->lsm_oi; 1343 1344 lsm->lsm_oinfo[i]->loi_ost_idx = 1345 (idx + i) % ec->ec_nstripes; 1346 } 1347 } 1348 1349 /* setup object ID here for !on_target and LOV hint */ 1350 if (oa->o_valid & OBD_MD_FLID) { 1351 LASSERT(oa->o_valid & OBD_MD_FLGROUP); 1352 lsm->lsm_oi = oa->o_oi; 1353 } 1354 1355 if (ostid_id(&lsm->lsm_oi) == 0) 1356 ostid_set_id(&lsm->lsm_oi, ++last_object_id); 1357 1358 rc = 0; 1359 if (on_target) { 1360 /* Only echo objects are allowed to be created */ 1361 LASSERT((oa->o_valid & OBD_MD_FLGROUP) && 1362 (ostid_seq(&oa->o_oi) == FID_SEQ_ECHO)); 1363 rc = obd_create(env, ec->ec_exp, oa, &lsm, oti); 1364 if (rc != 0) { 1365 CERROR("Cannot create objects: rc = %d\n", rc); 1366 GOTO(failed, rc); 1367 } 1368 created = 1; 1369 } 1370 1371 /* See what object ID we were given */ 1372 oa->o_oi = lsm->lsm_oi; 1373 oa->o_valid |= OBD_MD_FLID; 1374 1375 eco = cl_echo_object_find(ed, &lsm); 1376 if (IS_ERR(eco)) 1377 GOTO(failed, rc = PTR_ERR(eco)); 1378 cl_echo_object_put(eco); 1379 1380 CDEBUG(D_INFO, "oa oid "DOSTID"\n", POSTID(&oa->o_oi)); 1381 1382 failed: 1383 if (created && rc) 1384 obd_destroy(env, ec->ec_exp, oa, lsm, oti, NULL, NULL); 1385 if (lsm) 1386 echo_free_memmd(ed, &lsm); 1387 if (rc) 1388 CERROR("create object failed with: rc = %d\n", rc); 1389 return rc; 1390} 1391 1392static int echo_get_object(struct echo_object **ecop, struct echo_device *ed, 1393 struct obdo *oa) 1394{ 1395 struct lov_stripe_md *lsm = NULL; 1396 struct echo_object *eco; 1397 int rc; 1398 1399 if ((oa->o_valid & OBD_MD_FLID) == 0 || ostid_id(&oa->o_oi) == 0) { 1400 /* disallow use of object id 0 */ 1401 CERROR ("No valid oid\n"); 1402 return -EINVAL; 1403 } 1404 1405 rc = echo_alloc_memmd(ed, &lsm); 1406 if (rc < 0) 1407 return rc; 1408 1409 lsm->lsm_oi = oa->o_oi; 1410 if (!(oa->o_valid & OBD_MD_FLGROUP)) 1411 ostid_set_seq_echo(&lsm->lsm_oi); 1412 1413 rc = 0; 1414 eco = cl_echo_object_find(ed, &lsm); 1415 if (!IS_ERR(eco)) 1416 *ecop = eco; 1417 else 1418 rc = PTR_ERR(eco); 1419 if (lsm) 1420 echo_free_memmd(ed, &lsm); 1421 return rc; 1422} 1423 1424static void echo_put_object(struct echo_object *eco) 1425{ 1426 if (cl_echo_object_put(eco)) 1427 CERROR("echo client: drop an object failed"); 1428} 1429 1430static void 1431echo_get_stripe_off_id(struct lov_stripe_md *lsm, u64 *offp, u64 *idp) 1432{ 1433 unsigned long stripe_count; 1434 unsigned long stripe_size; 1435 unsigned long width; 1436 unsigned long woffset; 1437 int stripe_index; 1438 u64 offset; 1439 1440 if (lsm->lsm_stripe_count <= 1) 1441 return; 1442 1443 offset = *offp; 1444 stripe_size = lsm->lsm_stripe_size; 1445 stripe_count = lsm->lsm_stripe_count; 1446 1447 /* width = # bytes in all stripes */ 1448 width = stripe_size * stripe_count; 1449 1450 /* woffset = offset within a width; offset = whole number of widths */ 1451 woffset = do_div (offset, width); 1452 1453 stripe_index = woffset / stripe_size; 1454 1455 *idp = ostid_id(&lsm->lsm_oinfo[stripe_index]->loi_oi); 1456 *offp = offset * stripe_size + woffset % stripe_size; 1457} 1458 1459static void 1460echo_client_page_debug_setup(struct lov_stripe_md *lsm, 1461 struct page *page, int rw, u64 id, 1462 u64 offset, u64 count) 1463{ 1464 char *addr; 1465 u64 stripe_off; 1466 u64 stripe_id; 1467 int delta; 1468 1469 /* no partial pages on the client */ 1470 LASSERT(count == PAGE_CACHE_SIZE); 1471 1472 addr = kmap(page); 1473 1474 for (delta = 0; delta < PAGE_CACHE_SIZE; delta += OBD_ECHO_BLOCK_SIZE) { 1475 if (rw == OBD_BRW_WRITE) { 1476 stripe_off = offset + delta; 1477 stripe_id = id; 1478 echo_get_stripe_off_id(lsm, &stripe_off, &stripe_id); 1479 } else { 1480 stripe_off = 0xdeadbeef00c0ffeeULL; 1481 stripe_id = 0xdeadbeef00c0ffeeULL; 1482 } 1483 block_debug_setup(addr + delta, OBD_ECHO_BLOCK_SIZE, 1484 stripe_off, stripe_id); 1485 } 1486 1487 kunmap(page); 1488} 1489 1490static int echo_client_page_debug_check(struct lov_stripe_md *lsm, 1491 struct page *page, u64 id, 1492 u64 offset, u64 count) 1493{ 1494 u64 stripe_off; 1495 u64 stripe_id; 1496 char *addr; 1497 int delta; 1498 int rc; 1499 int rc2; 1500 1501 /* no partial pages on the client */ 1502 LASSERT(count == PAGE_CACHE_SIZE); 1503 1504 addr = kmap(page); 1505 1506 for (rc = delta = 0; delta < PAGE_CACHE_SIZE; delta += OBD_ECHO_BLOCK_SIZE) { 1507 stripe_off = offset + delta; 1508 stripe_id = id; 1509 echo_get_stripe_off_id (lsm, &stripe_off, &stripe_id); 1510 1511 rc2 = block_debug_check("test_brw", 1512 addr + delta, OBD_ECHO_BLOCK_SIZE, 1513 stripe_off, stripe_id); 1514 if (rc2 != 0) { 1515 CERROR ("Error in echo object %#llx\n", id); 1516 rc = rc2; 1517 } 1518 } 1519 1520 kunmap(page); 1521 return rc; 1522} 1523 1524static int echo_client_kbrw(struct echo_device *ed, int rw, struct obdo *oa, 1525 struct echo_object *eco, u64 offset, 1526 u64 count, int async, 1527 struct obd_trans_info *oti) 1528{ 1529 struct lov_stripe_md *lsm = eco->eo_lsm; 1530 u32 npages; 1531 struct brw_page *pga; 1532 struct brw_page *pgp; 1533 struct page **pages; 1534 u64 off; 1535 int i; 1536 int rc; 1537 int verify; 1538 gfp_t gfp_mask; 1539 int brw_flags = 0; 1540 1541 verify = (ostid_id(&oa->o_oi) != ECHO_PERSISTENT_OBJID && 1542 (oa->o_valid & OBD_MD_FLFLAGS) != 0 && 1543 (oa->o_flags & OBD_FL_DEBUG_CHECK) != 0); 1544 1545 gfp_mask = ((ostid_id(&oa->o_oi) & 2) == 0) ? GFP_IOFS : GFP_HIGHUSER; 1546 1547 LASSERT(rw == OBD_BRW_WRITE || rw == OBD_BRW_READ); 1548 LASSERT(lsm != NULL); 1549 LASSERT(ostid_id(&lsm->lsm_oi) == ostid_id(&oa->o_oi)); 1550 1551 if (count <= 0 || 1552 (count & (~CFS_PAGE_MASK)) != 0) 1553 return -EINVAL; 1554 1555 /* XXX think again with misaligned I/O */ 1556 npages = count >> PAGE_CACHE_SHIFT; 1557 1558 if (rw == OBD_BRW_WRITE) 1559 brw_flags = OBD_BRW_ASYNC; 1560 1561 OBD_ALLOC(pga, npages * sizeof(*pga)); 1562 if (pga == NULL) 1563 return -ENOMEM; 1564 1565 OBD_ALLOC(pages, npages * sizeof(*pages)); 1566 if (pages == NULL) { 1567 OBD_FREE(pga, npages * sizeof(*pga)); 1568 return -ENOMEM; 1569 } 1570 1571 for (i = 0, pgp = pga, off = offset; 1572 i < npages; 1573 i++, pgp++, off += PAGE_CACHE_SIZE) { 1574 1575 LASSERT (pgp->pg == NULL); /* for cleanup */ 1576 1577 rc = -ENOMEM; 1578 OBD_PAGE_ALLOC(pgp->pg, gfp_mask); 1579 if (pgp->pg == NULL) 1580 goto out; 1581 1582 pages[i] = pgp->pg; 1583 pgp->count = PAGE_CACHE_SIZE; 1584 pgp->off = off; 1585 pgp->flag = brw_flags; 1586 1587 if (verify) 1588 echo_client_page_debug_setup(lsm, pgp->pg, rw, 1589 ostid_id(&oa->o_oi), off, 1590 pgp->count); 1591 } 1592 1593 /* brw mode can only be used at client */ 1594 LASSERT(ed->ed_next != NULL); 1595 rc = cl_echo_object_brw(eco, rw, offset, pages, npages, async); 1596 1597 out: 1598 if (rc != 0 || rw != OBD_BRW_READ) 1599 verify = 0; 1600 1601 for (i = 0, pgp = pga; i < npages; i++, pgp++) { 1602 if (pgp->pg == NULL) 1603 continue; 1604 1605 if (verify) { 1606 int vrc; 1607 vrc = echo_client_page_debug_check(lsm, pgp->pg, 1608 ostid_id(&oa->o_oi), 1609 pgp->off, pgp->count); 1610 if (vrc != 0 && rc == 0) 1611 rc = vrc; 1612 } 1613 OBD_PAGE_FREE(pgp->pg); 1614 } 1615 OBD_FREE(pga, npages * sizeof(*pga)); 1616 OBD_FREE(pages, npages * sizeof(*pages)); 1617 return rc; 1618} 1619 1620static int echo_client_prep_commit(const struct lu_env *env, 1621 struct obd_export *exp, int rw, 1622 struct obdo *oa, struct echo_object *eco, 1623 u64 offset, u64 count, 1624 u64 batch, struct obd_trans_info *oti, 1625 int async) 1626{ 1627 struct lov_stripe_md *lsm = eco->eo_lsm; 1628 struct obd_ioobj ioo; 1629 struct niobuf_local *lnb; 1630 struct niobuf_remote *rnb; 1631 u64 off; 1632 u64 npages, tot_pages; 1633 int i, ret = 0, brw_flags = 0; 1634 1635 if (count <= 0 || (count & (~CFS_PAGE_MASK)) != 0 || 1636 (lsm != NULL && ostid_id(&lsm->lsm_oi) != ostid_id(&oa->o_oi))) 1637 return -EINVAL; 1638 1639 npages = batch >> PAGE_CACHE_SHIFT; 1640 tot_pages = count >> PAGE_CACHE_SHIFT; 1641 1642 OBD_ALLOC(lnb, npages * sizeof(struct niobuf_local)); 1643 OBD_ALLOC(rnb, npages * sizeof(struct niobuf_remote)); 1644 1645 if (lnb == NULL || rnb == NULL) 1646 GOTO(out, ret = -ENOMEM); 1647 1648 if (rw == OBD_BRW_WRITE && async) 1649 brw_flags |= OBD_BRW_ASYNC; 1650 1651 obdo_to_ioobj(oa, &ioo); 1652 1653 off = offset; 1654 1655 for (; tot_pages; tot_pages -= npages) { 1656 int lpages; 1657 1658 if (tot_pages < npages) 1659 npages = tot_pages; 1660 1661 for (i = 0; i < npages; i++, off += PAGE_CACHE_SIZE) { 1662 rnb[i].offset = off; 1663 rnb[i].len = PAGE_CACHE_SIZE; 1664 rnb[i].flags = brw_flags; 1665 } 1666 1667 ioo.ioo_bufcnt = npages; 1668 oti->oti_transno = 0; 1669 1670 lpages = npages; 1671 ret = obd_preprw(env, rw, exp, oa, 1, &ioo, rnb, &lpages, 1672 lnb, oti, NULL); 1673 if (ret != 0) 1674 GOTO(out, ret); 1675 LASSERT(lpages == npages); 1676 1677 for (i = 0; i < lpages; i++) { 1678 struct page *page = lnb[i].page; 1679 1680 /* read past eof? */ 1681 if (page == NULL && lnb[i].rc == 0) 1682 continue; 1683 1684 if (async) 1685 lnb[i].flags |= OBD_BRW_ASYNC; 1686 1687 if (ostid_id(&oa->o_oi) == ECHO_PERSISTENT_OBJID || 1688 (oa->o_valid & OBD_MD_FLFLAGS) == 0 || 1689 (oa->o_flags & OBD_FL_DEBUG_CHECK) == 0) 1690 continue; 1691 1692 if (rw == OBD_BRW_WRITE) 1693 echo_client_page_debug_setup(lsm, page, rw, 1694 ostid_id(&oa->o_oi), 1695 rnb[i].offset, 1696 rnb[i].len); 1697 else 1698 echo_client_page_debug_check(lsm, page, 1699 ostid_id(&oa->o_oi), 1700 rnb[i].offset, 1701 rnb[i].len); 1702 } 1703 1704 ret = obd_commitrw(env, rw, exp, oa, 1, &ioo, 1705 rnb, npages, lnb, oti, ret); 1706 if (ret != 0) 1707 GOTO(out, ret); 1708 1709 /* Reset oti otherwise it would confuse ldiskfs. */ 1710 memset(oti, 0, sizeof(*oti)); 1711 1712 /* Reuse env context. */ 1713 lu_context_exit((struct lu_context *)&env->le_ctx); 1714 lu_context_enter((struct lu_context *)&env->le_ctx); 1715 } 1716 1717out: 1718 if (lnb) 1719 OBD_FREE(lnb, npages * sizeof(struct niobuf_local)); 1720 if (rnb) 1721 OBD_FREE(rnb, npages * sizeof(struct niobuf_remote)); 1722 return ret; 1723} 1724 1725static int echo_client_brw_ioctl(const struct lu_env *env, int rw, 1726 struct obd_export *exp, 1727 struct obd_ioctl_data *data, 1728 struct obd_trans_info *dummy_oti) 1729{ 1730 struct obd_device *obd = class_exp2obd(exp); 1731 struct echo_device *ed = obd2echo_dev(obd); 1732 struct echo_client_obd *ec = ed->ed_ec; 1733 struct obdo *oa = &data->ioc_obdo1; 1734 struct echo_object *eco; 1735 int rc; 1736 int async = 1; 1737 long test_mode; 1738 1739 LASSERT(oa->o_valid & OBD_MD_FLGROUP); 1740 1741 rc = echo_get_object(&eco, ed, oa); 1742 if (rc) 1743 return rc; 1744 1745 oa->o_valid &= ~OBD_MD_FLHANDLE; 1746 1747 /* OFD/obdfilter works only via prep/commit */ 1748 test_mode = (long)data->ioc_pbuf1; 1749 if (test_mode == 1) 1750 async = 0; 1751 1752 if (ed->ed_next == NULL && test_mode != 3) { 1753 test_mode = 3; 1754 data->ioc_plen1 = data->ioc_count; 1755 } 1756 1757 /* Truncate batch size to maximum */ 1758 if (data->ioc_plen1 > PTLRPC_MAX_BRW_SIZE) 1759 data->ioc_plen1 = PTLRPC_MAX_BRW_SIZE; 1760 1761 switch (test_mode) { 1762 case 1: 1763 /* fall through */ 1764 case 2: 1765 rc = echo_client_kbrw(ed, rw, oa, 1766 eco, data->ioc_offset, 1767 data->ioc_count, async, dummy_oti); 1768 break; 1769 case 3: 1770 rc = echo_client_prep_commit(env, ec->ec_exp, rw, oa, 1771 eco, data->ioc_offset, 1772 data->ioc_count, data->ioc_plen1, 1773 dummy_oti, async); 1774 break; 1775 default: 1776 rc = -EINVAL; 1777 } 1778 echo_put_object(eco); 1779 return rc; 1780} 1781 1782static int 1783echo_client_enqueue(struct obd_export *exp, struct obdo *oa, 1784 int mode, u64 offset, u64 nob) 1785{ 1786 struct echo_device *ed = obd2echo_dev(exp->exp_obd); 1787 struct lustre_handle *ulh = &oa->o_handle; 1788 struct echo_object *eco; 1789 u64 end; 1790 int rc; 1791 1792 if (ed->ed_next == NULL) 1793 return -EOPNOTSUPP; 1794 1795 if (!(mode == LCK_PR || mode == LCK_PW)) 1796 return -EINVAL; 1797 1798 if ((offset & (~CFS_PAGE_MASK)) != 0 || 1799 (nob & (~CFS_PAGE_MASK)) != 0) 1800 return -EINVAL; 1801 1802 rc = echo_get_object (&eco, ed, oa); 1803 if (rc != 0) 1804 return rc; 1805 1806 end = (nob == 0) ? ((u64) -1) : (offset + nob - 1); 1807 rc = cl_echo_enqueue(eco, offset, end, mode, &ulh->cookie); 1808 if (rc == 0) { 1809 oa->o_valid |= OBD_MD_FLHANDLE; 1810 CDEBUG(D_INFO, "Cookie is %#llx\n", ulh->cookie); 1811 } 1812 echo_put_object(eco); 1813 return rc; 1814} 1815 1816static int 1817echo_client_cancel(struct obd_export *exp, struct obdo *oa) 1818{ 1819 struct echo_device *ed = obd2echo_dev(exp->exp_obd); 1820 __u64 cookie = oa->o_handle.cookie; 1821 1822 if ((oa->o_valid & OBD_MD_FLHANDLE) == 0) 1823 return -EINVAL; 1824 1825 CDEBUG(D_INFO, "Cookie is %#llx\n", cookie); 1826 return cl_echo_cancel(ed, cookie); 1827} 1828 1829static int 1830echo_client_iocontrol(unsigned int cmd, struct obd_export *exp, int len, 1831 void *karg, void *uarg) 1832{ 1833 struct obd_device *obd = exp->exp_obd; 1834 struct echo_device *ed = obd2echo_dev(obd); 1835 struct echo_client_obd *ec = ed->ed_ec; 1836 struct echo_object *eco; 1837 struct obd_ioctl_data *data = karg; 1838 struct obd_trans_info dummy_oti; 1839 struct lu_env *env; 1840 struct oti_req_ack_lock *ack_lock; 1841 struct obdo *oa; 1842 struct lu_fid fid; 1843 int rw = OBD_BRW_READ; 1844 int rc = 0; 1845 int i; 1846 1847 memset(&dummy_oti, 0, sizeof(dummy_oti)); 1848 1849 oa = &data->ioc_obdo1; 1850 if (!(oa->o_valid & OBD_MD_FLGROUP)) { 1851 oa->o_valid |= OBD_MD_FLGROUP; 1852 ostid_set_seq_echo(&oa->o_oi); 1853 } 1854 1855 /* This FID is unpacked just for validation at this point */ 1856 rc = ostid_to_fid(&fid, &oa->o_oi, 0); 1857 if (rc < 0) 1858 return rc; 1859 1860 OBD_ALLOC_PTR(env); 1861 if (env == NULL) 1862 return -ENOMEM; 1863 1864 rc = lu_env_init(env, LCT_DT_THREAD); 1865 if (rc) 1866 GOTO(out, rc = -ENOMEM); 1867 1868 switch (cmd) { 1869 case OBD_IOC_CREATE: /* may create echo object */ 1870 if (!capable(CFS_CAP_SYS_ADMIN)) 1871 GOTO (out, rc = -EPERM); 1872 1873 rc = echo_create_object(env, ed, 1, oa, data->ioc_pbuf1, 1874 data->ioc_plen1, &dummy_oti); 1875 GOTO(out, rc); 1876 1877 case OBD_IOC_DESTROY: 1878 if (!capable(CFS_CAP_SYS_ADMIN)) 1879 GOTO (out, rc = -EPERM); 1880 1881 rc = echo_get_object(&eco, ed, oa); 1882 if (rc == 0) { 1883 rc = obd_destroy(env, ec->ec_exp, oa, eco->eo_lsm, 1884 &dummy_oti, NULL, NULL); 1885 if (rc == 0) 1886 eco->eo_deleted = 1; 1887 echo_put_object(eco); 1888 } 1889 GOTO(out, rc); 1890 1891 case OBD_IOC_GETATTR: 1892 rc = echo_get_object(&eco, ed, oa); 1893 if (rc == 0) { 1894 struct obd_info oinfo = { { { 0 } } }; 1895 oinfo.oi_md = eco->eo_lsm; 1896 oinfo.oi_oa = oa; 1897 rc = obd_getattr(env, ec->ec_exp, &oinfo); 1898 echo_put_object(eco); 1899 } 1900 GOTO(out, rc); 1901 1902 case OBD_IOC_SETATTR: 1903 if (!capable(CFS_CAP_SYS_ADMIN)) 1904 GOTO (out, rc = -EPERM); 1905 1906 rc = echo_get_object(&eco, ed, oa); 1907 if (rc == 0) { 1908 struct obd_info oinfo = { { { 0 } } }; 1909 oinfo.oi_oa = oa; 1910 oinfo.oi_md = eco->eo_lsm; 1911 1912 rc = obd_setattr(env, ec->ec_exp, &oinfo, NULL); 1913 echo_put_object(eco); 1914 } 1915 GOTO(out, rc); 1916 1917 case OBD_IOC_BRW_WRITE: 1918 if (!capable(CFS_CAP_SYS_ADMIN)) 1919 GOTO (out, rc = -EPERM); 1920 1921 rw = OBD_BRW_WRITE; 1922 /* fall through */ 1923 case OBD_IOC_BRW_READ: 1924 rc = echo_client_brw_ioctl(env, rw, exp, data, &dummy_oti); 1925 GOTO(out, rc); 1926 1927 case ECHO_IOC_GET_STRIPE: 1928 rc = echo_get_object(&eco, ed, oa); 1929 if (rc == 0) { 1930 rc = echo_copyout_lsm(eco->eo_lsm, data->ioc_pbuf1, 1931 data->ioc_plen1); 1932 echo_put_object(eco); 1933 } 1934 GOTO(out, rc); 1935 1936 case ECHO_IOC_SET_STRIPE: 1937 if (!capable(CFS_CAP_SYS_ADMIN)) 1938 GOTO (out, rc = -EPERM); 1939 1940 if (data->ioc_pbuf1 == NULL) { /* unset */ 1941 rc = echo_get_object(&eco, ed, oa); 1942 if (rc == 0) { 1943 eco->eo_deleted = 1; 1944 echo_put_object(eco); 1945 } 1946 } else { 1947 rc = echo_create_object(env, ed, 0, oa, 1948 data->ioc_pbuf1, 1949 data->ioc_plen1, &dummy_oti); 1950 } 1951 GOTO (out, rc); 1952 1953 case ECHO_IOC_ENQUEUE: 1954 if (!capable(CFS_CAP_SYS_ADMIN)) 1955 GOTO (out, rc = -EPERM); 1956 1957 rc = echo_client_enqueue(exp, oa, 1958 data->ioc_conn1, /* lock mode */ 1959 data->ioc_offset, 1960 data->ioc_count);/*extent*/ 1961 GOTO (out, rc); 1962 1963 case ECHO_IOC_CANCEL: 1964 rc = echo_client_cancel(exp, oa); 1965 GOTO (out, rc); 1966 1967 default: 1968 CERROR ("echo_ioctl(): unrecognised ioctl %#x\n", cmd); 1969 GOTO (out, rc = -ENOTTY); 1970 } 1971 1972out: 1973 lu_env_fini(env); 1974 OBD_FREE_PTR(env); 1975 1976 /* XXX this should be in a helper also called by target_send_reply */ 1977 for (ack_lock = dummy_oti.oti_ack_locks, i = 0; i < 4; 1978 i++, ack_lock++) { 1979 if (!ack_lock->mode) 1980 break; 1981 ldlm_lock_decref(&ack_lock->lock, ack_lock->mode); 1982 } 1983 1984 return rc; 1985} 1986 1987static int echo_client_setup(const struct lu_env *env, 1988 struct obd_device *obddev, struct lustre_cfg *lcfg) 1989{ 1990 struct echo_client_obd *ec = &obddev->u.echo_client; 1991 struct obd_device *tgt; 1992 struct obd_uuid echo_uuid = { "ECHO_UUID" }; 1993 struct obd_connect_data *ocd = NULL; 1994 int rc; 1995 1996 if (lcfg->lcfg_bufcount < 2 || LUSTRE_CFG_BUFLEN(lcfg, 1) < 1) { 1997 CERROR("requires a TARGET OBD name\n"); 1998 return -EINVAL; 1999 } 2000 2001 tgt = class_name2obd(lustre_cfg_string(lcfg, 1)); 2002 if (!tgt || !tgt->obd_attached || !tgt->obd_set_up) { 2003 CERROR("device not attached or not set up (%s)\n", 2004 lustre_cfg_string(lcfg, 1)); 2005 return -EINVAL; 2006 } 2007 2008 spin_lock_init(&ec->ec_lock); 2009 INIT_LIST_HEAD (&ec->ec_objects); 2010 INIT_LIST_HEAD (&ec->ec_locks); 2011 ec->ec_unique = 0; 2012 ec->ec_nstripes = 0; 2013 2014 OBD_ALLOC(ocd, sizeof(*ocd)); 2015 if (ocd == NULL) { 2016 CERROR("Can't alloc ocd connecting to %s\n", 2017 lustre_cfg_string(lcfg, 1)); 2018 return -ENOMEM; 2019 } 2020 2021 ocd->ocd_connect_flags = OBD_CONNECT_VERSION | OBD_CONNECT_REQPORTAL | 2022 OBD_CONNECT_BRW_SIZE | 2023 OBD_CONNECT_GRANT | OBD_CONNECT_FULL20 | 2024 OBD_CONNECT_64BITHASH | OBD_CONNECT_LVB_TYPE | 2025 OBD_CONNECT_FID; 2026 ocd->ocd_brw_size = DT_MAX_BRW_SIZE; 2027 ocd->ocd_version = LUSTRE_VERSION_CODE; 2028 ocd->ocd_group = FID_SEQ_ECHO; 2029 2030 rc = obd_connect(env, &ec->ec_exp, tgt, &echo_uuid, ocd, NULL); 2031 if (rc == 0) { 2032 /* Turn off pinger because it connects to tgt obd directly. */ 2033 spin_lock(&tgt->obd_dev_lock); 2034 list_del_init(&ec->ec_exp->exp_obd_chain_timed); 2035 spin_unlock(&tgt->obd_dev_lock); 2036 } 2037 2038 OBD_FREE(ocd, sizeof(*ocd)); 2039 2040 if (rc != 0) { 2041 CERROR("fail to connect to device %s\n", 2042 lustre_cfg_string(lcfg, 1)); 2043 return rc; 2044 } 2045 2046 return rc; 2047} 2048 2049static int echo_client_cleanup(struct obd_device *obddev) 2050{ 2051 struct echo_client_obd *ec = &obddev->u.echo_client; 2052 int rc; 2053 2054 if (!list_empty(&obddev->obd_exports)) { 2055 CERROR("still has clients!\n"); 2056 return -EBUSY; 2057 } 2058 2059 LASSERT(atomic_read(&ec->ec_exp->exp_refcount) > 0); 2060 rc = obd_disconnect(ec->ec_exp); 2061 if (rc != 0) 2062 CERROR("fail to disconnect device: %d\n", rc); 2063 2064 return rc; 2065} 2066 2067static int echo_client_connect(const struct lu_env *env, 2068 struct obd_export **exp, 2069 struct obd_device *src, struct obd_uuid *cluuid, 2070 struct obd_connect_data *data, void *localdata) 2071{ 2072 int rc; 2073 struct lustre_handle conn = { 0 }; 2074 2075 rc = class_connect(&conn, src, cluuid); 2076 if (rc == 0) { 2077 *exp = class_conn2export(&conn); 2078 } 2079 2080 return rc; 2081} 2082 2083static int echo_client_disconnect(struct obd_export *exp) 2084{ 2085 int rc; 2086 2087 if (exp == NULL) 2088 GOTO(out, rc = -EINVAL); 2089 2090 rc = class_disconnect(exp); 2091 GOTO(out, rc); 2092 out: 2093 return rc; 2094} 2095 2096static struct obd_ops echo_client_obd_ops = { 2097 .o_owner = THIS_MODULE, 2098 .o_iocontrol = echo_client_iocontrol, 2099 .o_connect = echo_client_connect, 2100 .o_disconnect = echo_client_disconnect 2101}; 2102 2103int echo_client_init(void) 2104{ 2105 struct lprocfs_static_vars lvars = { NULL }; 2106 int rc; 2107 2108 lprocfs_echo_init_vars(&lvars); 2109 2110 rc = lu_kmem_init(echo_caches); 2111 if (rc == 0) { 2112 rc = class_register_type(&echo_client_obd_ops, NULL, 2113 lvars.module_vars, 2114 LUSTRE_ECHO_CLIENT_NAME, 2115 &echo_device_type); 2116 if (rc) 2117 lu_kmem_fini(echo_caches); 2118 } 2119 return rc; 2120} 2121 2122void echo_client_exit(void) 2123{ 2124 class_unregister_type(LUSTRE_ECHO_CLIENT_NAME); 2125 lu_kmem_fini(echo_caches); 2126} 2127 2128static int __init obdecho_init(void) 2129{ 2130 struct lprocfs_static_vars lvars; 2131 int rc; 2132 2133 LCONSOLE_INFO("Echo OBD driver; http://www.lustre.org/\n"); 2134 2135 LASSERT(PAGE_CACHE_SIZE % OBD_ECHO_BLOCK_SIZE == 0); 2136 2137 lprocfs_echo_init_vars(&lvars); 2138 2139 2140 rc = echo_client_init(); 2141 2142 return rc; 2143} 2144 2145static void /*__exit*/ obdecho_exit(void) 2146{ 2147 echo_client_exit(); 2148 2149} 2150 2151MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>"); 2152MODULE_DESCRIPTION("Lustre Testing Echo OBD driver"); 2153MODULE_LICENSE("GPL"); 2154MODULE_VERSION(LUSTRE_VERSION_STRING); 2155 2156module_init(obdecho_init); 2157module_exit(obdecho_exit); 2158 2159/** @} echo_client */ 2160