/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
 * Copyright (c) 2011, 2012, Intel Corporation.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 *
 * lustre/obdclass/lu_object.c
 *
 * Lustre Object.
 * These are the only exported functions; they provide some generic
 * infrastructure for managing object devices.
 *
 * Author: Nikita Danilov <nikita.danilov@sun.com>
 */

#define DEBUG_SUBSYSTEM S_CLASS

#include "../../include/linux/libcfs/libcfs.h"

#include <linux/module.h>

/* hash_long() */
#include "../../include/linux/libcfs/libcfs_hash.h"
#include "../include/obd_class.h"
#include "../include/obd_support.h"
#include "../include/lustre_disk.h"
#include "../include/lustre_fid.h"
#include "../include/lu_object.h"
#include "../include/lu_ref.h"
#include <linux/list.h>

static void lu_object_free(const struct lu_env *env, struct lu_object *o);

/**
 * Decrease reference counter on object. If last reference is freed, return
 * object to the cache, unless lu_object_is_dying(o) holds. In the latter
 * case, free object immediately.
 */
void lu_object_put(const struct lu_env *env, struct lu_object *o)
{
	struct lu_site_bkt_data *bkt;
	struct lu_object_header *top;
	struct lu_site *site;
	struct lu_object *orig;
	struct cfs_hash_bd bd;
	const struct lu_fid *fid;

	top = o->lo_header;
	site = o->lo_dev->ld_site;
	orig = o;

	/*
	 * Till we have fids-on-OST fully implemented, anonymous objects
	 * are possible in OSP. Such an object isn't listed in the site,
	 * so we should not remove it from the site.
	 */
	fid = lu_object_fid(o);
	if (fid_is_zero(fid)) {
		LASSERT(top->loh_hash.next == NULL
			&& top->loh_hash.pprev == NULL);
		LASSERT(list_empty(&top->loh_lru));
		if (!atomic_dec_and_test(&top->loh_ref))
			return;
		list_for_each_entry_reverse(o, &top->loh_layers, lo_linkage) {
			if (o->lo_ops->loo_object_release != NULL)
				o->lo_ops->loo_object_release(env, o);
		}
		lu_object_free(env, orig);
		return;
	}

	cfs_hash_bd_get(site->ls_obj_hash, &top->loh_fid, &bd);
	bkt = cfs_hash_bd_extra_get(site->ls_obj_hash, &bd);

	if (!cfs_hash_bd_dec_and_lock(site->ls_obj_hash, &bd, &top->loh_ref)) {
		if (lu_object_is_dying(top)) {

			/*
			 * somebody may be waiting for this, currently only
			 * used for cl_object, see cl_object_put_last().
			 */
			wake_up_all(&bkt->lsb_marche_funebre);
		}
		return;
	}

	LASSERT(bkt->lsb_busy > 0);
	bkt->lsb_busy--;
	/*
	 * When last reference is released, iterate over object
	 * layers, and notify them that object is no longer busy.
	 */
	list_for_each_entry_reverse(o, &top->loh_layers, lo_linkage) {
		if (o->lo_ops->loo_object_release != NULL)
			o->lo_ops->loo_object_release(env, o);
	}

	if (!lu_object_is_dying(top)) {
		LASSERT(list_empty(&top->loh_lru));
		list_add_tail(&top->loh_lru, &bkt->lsb_lru);
		cfs_hash_bd_unlock(site->ls_obj_hash, &bd, 1);
		return;
	}

	/*
	 * If object is dying (will not be cached), remove it
	 * from hash table and LRU.
	 *
	 * This is done with hash table and LRU lists locked. As the only
	 * way to acquire first reference to previously unreferenced
	 * object is through hash-table lookup (lu_object_find()),
	 * or LRU scanning (lu_site_purge()), that are done under hash-table
	 * and LRU lock, no race with concurrent object lookup is possible
	 * and we can safely destroy object below.
	 */
	if (!test_and_set_bit(LU_OBJECT_UNHASHED, &top->loh_flags))
		cfs_hash_bd_del_locked(site->ls_obj_hash, &bd, &top->loh_hash);
	cfs_hash_bd_unlock(site->ls_obj_hash, &bd, 1);
	/*
	 * Object was already removed from hash and LRU above, can
	 * kill it.
	 */
	lu_object_free(env, orig);
}
EXPORT_SYMBOL(lu_object_put);

/**
 * Put object and don't keep it in cache. This is a temporary solution for
 * multi-site objects whose layering is not constant.
 */
void lu_object_put_nocache(const struct lu_env *env, struct lu_object *o)
{
	set_bit(LU_OBJECT_HEARD_BANSHEE, &o->lo_header->loh_flags);
	return lu_object_put(env, o);
}
EXPORT_SYMBOL(lu_object_put_nocache);

/**
 * Kill the object and take it out of LRU cache.
 * Currently used by client code for layout change.
 */
void lu_object_unhash(const struct lu_env *env, struct lu_object *o)
{
	struct lu_object_header *top;

	top = o->lo_header;
	set_bit(LU_OBJECT_HEARD_BANSHEE, &top->loh_flags);
	if (!test_and_set_bit(LU_OBJECT_UNHASHED, &top->loh_flags)) {
		struct cfs_hash *obj_hash = o->lo_dev->ld_site->ls_obj_hash;
		struct cfs_hash_bd bd;

		cfs_hash_bd_get_and_lock(obj_hash, &top->loh_fid, &bd, 1);
		list_del_init(&top->loh_lru);
		cfs_hash_bd_del_locked(obj_hash, &bd, &top->loh_hash);
		cfs_hash_bd_unlock(obj_hash, &bd, 1);
	}
}
EXPORT_SYMBOL(lu_object_unhash);

/**
 * Allocate new object.
 *
 * This follows the object creation protocol, described in the comment
 * within struct lu_device_operations definition.
 */
static struct lu_object *lu_object_alloc(const struct lu_env *env,
					 struct lu_device *dev,
					 const struct lu_fid *f,
					 const struct lu_object_conf *conf)
{
	struct lu_object *scan;
	struct lu_object *top;
	struct list_head *layers;
	unsigned int init_mask = 0;
	unsigned int init_flag;
	int clean;
	int result;

	/*
	 * Create top-level object slice. This will also create
	 * lu_object_header.
	 */
	top = dev->ld_ops->ldo_object_alloc(env, NULL, dev);
	if (top == NULL)
		return ERR_PTR(-ENOMEM);
	if (IS_ERR(top))
		return top;
	/*
	 * This is the only place where object fid is assigned. It's constant
	 * after this point.
	 */
	top->lo_header->loh_fid = *f;
	layers = &top->lo_header->loh_layers;

	do {
		/*
		 * Call ->loo_object_init() repeatedly, until no more new
		 * object slices are created.
		 */
		clean = 1;
		init_flag = 1;
		list_for_each_entry(scan, layers, lo_linkage) {
			if (init_mask & init_flag)
				goto next;
			clean = 0;
			scan->lo_header = top->lo_header;
			result = scan->lo_ops->loo_object_init(env, scan, conf);
			if (result != 0) {
				lu_object_free(env, top);
				return ERR_PTR(result);
			}
			init_mask |= init_flag;
next:
			init_flag <<= 1;
		}
	} while (!clean);

	list_for_each_entry_reverse(scan, layers, lo_linkage) {
		if (scan->lo_ops->loo_object_start != NULL) {
			result = scan->lo_ops->loo_object_start(env, scan);
			if (result != 0) {
				lu_object_free(env, top);
				return ERR_PTR(result);
			}
		}
	}

	lprocfs_counter_incr(dev->ld_site->ls_stats, LU_SS_CREATED);
	return top;
}

/**
 * Free an object.
 */
static void lu_object_free(const struct lu_env *env, struct lu_object *o)
{
	struct lu_site_bkt_data *bkt;
	struct lu_site *site;
	struct lu_object *scan;
	struct list_head *layers;
	struct list_head splice;

	site = o->lo_dev->ld_site;
	layers = &o->lo_header->loh_layers;
	bkt = lu_site_bkt_from_fid(site, &o->lo_header->loh_fid);
	/*
	 * First call ->loo_object_delete() method to release all resources.
	 */
	list_for_each_entry_reverse(scan, layers, lo_linkage) {
		if (scan->lo_ops->loo_object_delete != NULL)
			scan->lo_ops->loo_object_delete(env, scan);
	}

	/*
	 * Then, splice object layers into stand-alone list, and call
	 * ->loo_object_free() on all layers to free memory. Splice is
	 * necessary, because lu_object_header is freed together with the
	 * top-level slice.
	 */
	INIT_LIST_HEAD(&splice);
	list_splice_init(layers, &splice);
	while (!list_empty(&splice)) {
		/*
		 * Free layers in bottom-to-top order, so that object header
		 * lives as long as possible and ->loo_object_free() methods
		 * can look at its contents.
		 */
		o = container_of0(splice.prev, struct lu_object, lo_linkage);
		list_del_init(&o->lo_linkage);
		LASSERT(o->lo_ops->loo_object_free != NULL);
		o->lo_ops->loo_object_free(env, o);
	}

	if (waitqueue_active(&bkt->lsb_marche_funebre))
		wake_up_all(&bkt->lsb_marche_funebre);
}

/**
 * Free \a nr objects from the cold end of the site LRU list.
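 *
 * A hedged usage sketch (callers in this file follow the same pattern;
 * the "nr_to_scan" variable is illustrative):
 *
 *	lu_site_purge(env, site, ~0);		drain every idle object
 *	lu_site_purge(env, site, nr_to_scan);	trim at most nr_to_scan
 *
 * Passing ~0 is what lu_stack_fini() does to drain the cache, while the
 * memory shrinker passes the number of objects it was asked to scan.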
 */
int lu_site_purge(const struct lu_env *env, struct lu_site *s, int nr)
{
	struct lu_object_header *h;
	struct lu_object_header *temp;
	struct lu_site_bkt_data *bkt;
	struct cfs_hash_bd bd;
	struct cfs_hash_bd bd2;
	struct list_head dispose;
	int did_sth;
	int start;
	int count;
	int bnr;
	int i;

	if (OBD_FAIL_CHECK(OBD_FAIL_OBD_NO_LRU))
		return 0;

	INIT_LIST_HEAD(&dispose);
	/*
	 * Under LRU list lock, scan LRU list and move unreferenced objects to
	 * the dispose list, removing them from LRU and hash table.
	 */
	start = s->ls_purge_start;
	bnr = (nr == ~0) ? -1 : nr / CFS_HASH_NBKT(s->ls_obj_hash) + 1;
again:
	did_sth = 0;
	cfs_hash_for_each_bucket(s->ls_obj_hash, &bd, i) {
		if (i < start)
			continue;
		count = bnr;
		cfs_hash_bd_lock(s->ls_obj_hash, &bd, 1);
		bkt = cfs_hash_bd_extra_get(s->ls_obj_hash, &bd);

		list_for_each_entry_safe(h, temp, &bkt->lsb_lru, loh_lru) {
			LASSERT(atomic_read(&h->loh_ref) == 0);

			cfs_hash_bd_get(s->ls_obj_hash, &h->loh_fid, &bd2);
			LASSERT(bd.bd_bucket == bd2.bd_bucket);

			cfs_hash_bd_del_locked(s->ls_obj_hash,
					       &bd2, &h->loh_hash);
			list_move(&h->loh_lru, &dispose);
			if (did_sth == 0)
				did_sth = 1;

			if (nr != ~0 && --nr == 0)
				break;

			if (count > 0 && --count == 0)
				break;
		}
		cfs_hash_bd_unlock(s->ls_obj_hash, &bd, 1);
		cond_resched();
		/*
		 * Free everything on the dispose list. This is safe against
		 * races due to the reasons described in lu_object_put().
		 */
		while (!list_empty(&dispose)) {
			h = container_of0(dispose.next,
					  struct lu_object_header, loh_lru);
			list_del_init(&h->loh_lru);
			lu_object_free(env, lu_object_top(h));
			lprocfs_counter_incr(s->ls_stats, LU_SS_LRU_PURGED);
		}

		if (nr == 0)
			break;
	}

	if (nr != 0 && did_sth && start != 0) {
		start = 0; /* restart from the first bucket */
		goto again;
	}
	/* race on s->ls_purge_start, but nobody cares */
	s->ls_purge_start = i % CFS_HASH_NBKT(s->ls_obj_hash);

	return nr;
}
EXPORT_SYMBOL(lu_site_purge);

/*
 * Object printing.
 *
 * Code below has to jump through certain hoops to output object description
 * into libcfs_debug_msg-based log. The problem is that lu_object_print()
 * composes object description from strings that are parts of _lines_ of
 * output (i.e., strings that are not terminated by newline). This doesn't fit
 * very well into the libcfs_debug_msg() interface, which assumes that each
 * message supplied to it is a self-contained output line.
 *
 * To work around this, strings are collected in a temporary buffer
 * (implemented as a value of lu_cdebug_key key), until a terminating newline
 * character is detected.
 */

enum {
	/**
	 * Maximal line size.
	 *
	 * XXX overflow is not handled correctly.
	 */
	LU_CDEBUG_LINE = 512
};

struct lu_cdebug_data {
	/**
	 * Temporary buffer.
	 */
	char lck_area[LU_CDEBUG_LINE];
};

/* context key constructor/destructor: lu_global_key_init, lu_global_key_fini */
LU_KEY_INIT_FINI(lu_global, struct lu_cdebug_data);

/**
 * Key, holding temporary buffer. This key is registered very early by
 * lu_global_init().
 */
struct lu_context_key lu_global_key = {
	.lct_tags = LCT_MD_THREAD | LCT_DT_THREAD |
		    LCT_MG_THREAD | LCT_CL_THREAD | LCT_LOCAL,
	.lct_init = lu_global_key_init,
	.lct_fini = lu_global_key_fini
};

/**
 * Printer function emitting messages through libcfs_debug_msg().
 */
int lu_cdebug_printer(const struct lu_env *env,
		      void *cookie, const char *format, ...)
{
	struct libcfs_debug_msg_data *msgdata = cookie;
	struct lu_cdebug_data *key;
	int used;
	int complete;
	va_list args;

	va_start(args, format);

	key = lu_context_key_get(&env->le_ctx, &lu_global_key);
	LASSERT(key != NULL);

	used = strlen(key->lck_area);
	complete = format[strlen(format) - 1] == '\n';
	/*
	 * Append new chunk to the buffer.
	 */
	vsnprintf(key->lck_area + used,
		  ARRAY_SIZE(key->lck_area) - used, format, args);
	if (complete) {
		if (cfs_cdebug_show(msgdata->msg_mask, msgdata->msg_subsys))
			libcfs_debug_msg(msgdata, "%s", key->lck_area);
		key->lck_area[0] = 0;
	}
	va_end(args);
	return 0;
}
EXPORT_SYMBOL(lu_cdebug_printer);

/**
 * Print object header.
 */
void lu_object_header_print(const struct lu_env *env, void *cookie,
			    lu_printer_t printer,
			    const struct lu_object_header *hdr)
{
	(*printer)(env, cookie, "header@%p[%#lx, %d, "DFID"%s%s%s]",
		   hdr, hdr->loh_flags, atomic_read(&hdr->loh_ref),
		   PFID(&hdr->loh_fid),
		   hlist_unhashed(&hdr->loh_hash) ? "" : " hash",
		   list_empty((struct list_head *)&hdr->loh_lru) ? "" : " lru",
		   hdr->loh_attr & LOHA_EXISTS ? " exist" : "");
}
EXPORT_SYMBOL(lu_object_header_print);

/**
 * Print human readable representation of the \a o to the \a printer.
 */
void lu_object_print(const struct lu_env *env, void *cookie,
		     lu_printer_t printer, const struct lu_object *o)
{
	static const char ruler[] = "........................................";
	struct lu_object_header *top;
	int depth = 4;

	top = o->lo_header;
	lu_object_header_print(env, cookie, printer, top);
	(*printer)(env, cookie, "{\n");

	list_for_each_entry(o, &top->loh_layers, lo_linkage) {
		/*
		 * print `.' \a depth times followed by type name and address
		 */
		(*printer)(env, cookie, "%*.*s%s@%p", depth, depth, ruler,
			   o->lo_dev->ld_type->ldt_name, o);

		if (o->lo_ops->loo_object_print != NULL)
			(*o->lo_ops->loo_object_print)(env, cookie, printer, o);

		(*printer)(env, cookie, "\n");
	}

	(*printer)(env, cookie, "} header@%p\n", top);
}
EXPORT_SYMBOL(lu_object_print);

/**
 * Check object consistency.
 */
int lu_object_invariant(const struct lu_object *o)
{
	struct lu_object_header *top;

	top = o->lo_header;
	list_for_each_entry(o, &top->loh_layers, lo_linkage) {
		if (o->lo_ops->loo_object_invariant != NULL &&
		    !o->lo_ops->loo_object_invariant(o))
			return 0;
	}
	return 1;
}
EXPORT_SYMBOL(lu_object_invariant);

static struct lu_object *htable_lookup(struct lu_site *s,
				       struct cfs_hash_bd *bd,
				       const struct lu_fid *f,
				       wait_queue_t *waiter,
				       __u64 *version)
{
	struct lu_site_bkt_data *bkt;
	struct lu_object_header *h;
	struct hlist_node *hnode;
	__u64 ver = cfs_hash_bd_version_get(bd);

	if (*version == ver)
		return ERR_PTR(-ENOENT);

	*version = ver;
	bkt = cfs_hash_bd_extra_get(s->ls_obj_hash, bd);
	/* cfs_hash_bd_peek_locked is a somewhat "internal" function
	 * of cfs_hash; it doesn't add a refcount on the object. */
	hnode = cfs_hash_bd_peek_locked(s->ls_obj_hash, bd, (void *)f);
	if (hnode == NULL) {
		lprocfs_counter_incr(s->ls_stats, LU_SS_CACHE_MISS);
		return ERR_PTR(-ENOENT);
	}

	h = container_of0(hnode, struct lu_object_header, loh_hash);
	if (likely(!lu_object_is_dying(h))) {
		cfs_hash_get(s->ls_obj_hash, hnode);
		lprocfs_counter_incr(s->ls_stats, LU_SS_CACHE_HIT);
		list_del_init(&h->loh_lru);
		return lu_object_top(h);
	}

	/*
	 * Lookup found an object being destroyed; this object cannot be
	 * returned (to assure that references to dying objects are eventually
	 * drained), and moreover, the lookup has to wait until the object is
	 * freed.
	 */

	init_waitqueue_entry(waiter, current);
	add_wait_queue(&bkt->lsb_marche_funebre, waiter);
	set_current_state(TASK_UNINTERRUPTIBLE);
	lprocfs_counter_incr(s->ls_stats, LU_SS_CACHE_DEATH_RACE);
	return ERR_PTR(-EAGAIN);
}

/**
 * Search cache for an object with the fid \a f. If such object is found,
 * return it. Otherwise, create new object, insert it into cache and return
 * it. In any case, additional reference is acquired on the returned object.
 */
struct lu_object *lu_object_find(const struct lu_env *env,
				 struct lu_device *dev, const struct lu_fid *f,
				 const struct lu_object_conf *conf)
{
	return lu_object_find_at(env, dev->ld_site->ls_top_dev, f, conf);
}
EXPORT_SYMBOL(lu_object_find);

static struct lu_object *lu_object_new(const struct lu_env *env,
				       struct lu_device *dev,
				       const struct lu_fid *f,
				       const struct lu_object_conf *conf)
{
	struct lu_object *o;
	struct cfs_hash *hs;
	struct cfs_hash_bd bd;
	struct lu_site_bkt_data *bkt;

	o = lu_object_alloc(env, dev, f, conf);
	if (unlikely(IS_ERR(o)))
		return o;

	hs = dev->ld_site->ls_obj_hash;
	cfs_hash_bd_get_and_lock(hs, (void *)f, &bd, 1);
	bkt = cfs_hash_bd_extra_get(hs, &bd);
	cfs_hash_bd_add_locked(hs, &bd, &o->lo_header->loh_hash);
	bkt->lsb_busy++;
	cfs_hash_bd_unlock(hs, &bd, 1);
	return o;
}

/**
 * Core logic of lu_object_find*() functions.
 */
static struct lu_object *lu_object_find_try(const struct lu_env *env,
					    struct lu_device *dev,
					    const struct lu_fid *f,
					    const struct lu_object_conf *conf,
					    wait_queue_t *waiter)
{
	struct lu_object *o;
	struct lu_object *shadow;
	struct lu_site *s;
	struct cfs_hash *hs;
	struct cfs_hash_bd bd;
	__u64 version = 0;

	/*
	 * This uses standard index maintenance protocol:
	 *
	 *     - search index under lock, and return object if found;
	 *     - otherwise, unlock index, allocate new object;
	 *     - lock index and search again;
	 *     - if nothing is found (usual case), insert newly created
	 *       object into index;
	 *     - otherwise (race: other thread inserted object), free
	 *       object just allocated.
	 *     - unlock index;
	 *     - return object.
	 *
	 * In the LOC_F_NEW case, we are sure the object is newly established.
	 * It is unnecessary to perform lookup-alloc-lookup-insert; instead,
	 * just alloc and insert directly.
	 *
	 * If dying object is found during index search, add @waiter to the
	 * site wait-queue and return ERR_PTR(-EAGAIN).
	 */
	if (conf != NULL && conf->loc_flags & LOC_F_NEW)
		return lu_object_new(env, dev, f, conf);

	s = dev->ld_site;
	hs = s->ls_obj_hash;
	cfs_hash_bd_get_and_lock(hs, (void *)f, &bd, 1);
	o = htable_lookup(s, &bd, f, waiter, &version);
	cfs_hash_bd_unlock(hs, &bd, 1);
	if (!IS_ERR(o) || PTR_ERR(o) != -ENOENT)
		return o;

	/*
	 * Allocate new object. This may result in rather complicated
	 * operations, including fld queries, inode loading, etc.
	 */
	o = lu_object_alloc(env, dev, f, conf);
	if (unlikely(IS_ERR(o)))
		return o;

	LASSERT(lu_fid_eq(lu_object_fid(o), f));

	cfs_hash_bd_lock(hs, &bd, 1);

	shadow = htable_lookup(s, &bd, f, waiter, &version);
	if (likely(IS_ERR(shadow) && PTR_ERR(shadow) == -ENOENT)) {
		struct lu_site_bkt_data *bkt;

		bkt = cfs_hash_bd_extra_get(hs, &bd);
		cfs_hash_bd_add_locked(hs, &bd, &o->lo_header->loh_hash);
		bkt->lsb_busy++;
		cfs_hash_bd_unlock(hs, &bd, 1);
		return o;
	}

	lprocfs_counter_incr(s->ls_stats, LU_SS_CACHE_RACE);
	cfs_hash_bd_unlock(hs, &bd, 1);
	lu_object_free(env, o);
	return shadow;
}

/**
 * Much like lu_object_find(), but the top level device of the object is
 * specifically \a dev rather than the top level device of the site. This
 * interface allows objects of different "stacking" to be created within
 * the same site.
 */
struct lu_object *lu_object_find_at(const struct lu_env *env,
				    struct lu_device *dev,
				    const struct lu_fid *f,
				    const struct lu_object_conf *conf)
{
	struct lu_site_bkt_data *bkt;
	struct lu_object *obj;
	wait_queue_t wait;

	while (1) {
		obj = lu_object_find_try(env, dev, f, conf, &wait);
		if (obj != ERR_PTR(-EAGAIN))
			return obj;
		/*
		 * lu_object_find_try() already added waiter into the
		 * wait queue.
		 */
		schedule();
		bkt = lu_site_bkt_from_fid(dev->ld_site, (void *)f);
		remove_wait_queue(&bkt->lsb_marche_funebre, &wait);
	}
}
EXPORT_SYMBOL(lu_object_find_at);

/**
 * Find object with given fid, and return its slice belonging to given device.
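 *
 * A hedged usage sketch (the "dev" and "fid" variables are assumptions of
 * the example, not part of this file; error handling is trimmed):
 *
 *	obj = lu_object_find_slice(env, dev, fid, NULL);
 *	if (IS_ERR(obj))
 *		return PTR_ERR(obj);
 *	... operate on the slice ...
 *	lu_object_put(env, obj);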
 */
struct lu_object *lu_object_find_slice(const struct lu_env *env,
				       struct lu_device *dev,
				       const struct lu_fid *f,
				       const struct lu_object_conf *conf)
{
	struct lu_object *top;
	struct lu_object *obj;

	top = lu_object_find(env, dev, f, conf);
	if (!IS_ERR(top)) {
		obj = lu_object_locate(top->lo_header, dev->ld_type);
		if (obj == NULL)
			lu_object_put(env, top);
	} else
		obj = top;
	return obj;
}
EXPORT_SYMBOL(lu_object_find_slice);

/**
 * Global list of all device types.
 */
static LIST_HEAD(lu_device_types);

int lu_device_type_init(struct lu_device_type *ldt)
{
	int result = 0;

	INIT_LIST_HEAD(&ldt->ldt_linkage);
	if (ldt->ldt_ops->ldto_init)
		result = ldt->ldt_ops->ldto_init(ldt);
	if (result == 0)
		list_add(&ldt->ldt_linkage, &lu_device_types);
	return result;
}
EXPORT_SYMBOL(lu_device_type_init);

void lu_device_type_fini(struct lu_device_type *ldt)
{
	list_del_init(&ldt->ldt_linkage);
	if (ldt->ldt_ops->ldto_fini)
		ldt->ldt_ops->ldto_fini(ldt);
}
EXPORT_SYMBOL(lu_device_type_fini);

void lu_types_stop(void)
{
	struct lu_device_type *ldt;

	list_for_each_entry(ldt, &lu_device_types, ldt_linkage) {
		if (ldt->ldt_device_nr == 0 && ldt->ldt_ops->ldto_stop)
			ldt->ldt_ops->ldto_stop(ldt);
	}
}
EXPORT_SYMBOL(lu_types_stop);

/**
 * Global list of all sites on this node
 */
static LIST_HEAD(lu_sites);
static DEFINE_MUTEX(lu_sites_guard);

/**
 * Global environment used by site shrinker.
 */
static struct lu_env lu_shrink_env;

struct lu_site_print_arg {
	struct lu_env	*lsp_env;
	void		*lsp_cookie;
	lu_printer_t	 lsp_printer;
};

static int
lu_site_obj_print(struct cfs_hash *hs, struct cfs_hash_bd *bd,
		  struct hlist_node *hnode, void *data)
{
	struct lu_site_print_arg *arg = (struct lu_site_print_arg *)data;
	struct lu_object_header *h;

	h = hlist_entry(hnode, struct lu_object_header, loh_hash);
	if (!list_empty(&h->loh_layers)) {
		const struct lu_object *o;

		o = lu_object_top(h);
		lu_object_print(arg->lsp_env, arg->lsp_cookie,
				arg->lsp_printer, o);
	} else {
		lu_object_header_print(arg->lsp_env, arg->lsp_cookie,
				       arg->lsp_printer, h);
	}
	return 0;
}

/**
 * Print all objects in \a s.
 */
void lu_site_print(const struct lu_env *env, struct lu_site *s, void *cookie,
		   lu_printer_t printer)
{
	struct lu_site_print_arg arg = {
		.lsp_env     = (struct lu_env *)env,
		.lsp_cookie  = cookie,
		.lsp_printer = printer,
	};

	cfs_hash_for_each(s->ls_obj_hash, lu_site_obj_print, &arg);
}
EXPORT_SYMBOL(lu_site_print);

enum {
	LU_CACHE_PERCENT_MAX	 = 50,
	LU_CACHE_PERCENT_DEFAULT = 20
};

static unsigned int lu_cache_percent = LU_CACHE_PERCENT_DEFAULT;
module_param(lu_cache_percent, int, 0644);
MODULE_PARM_DESC(lu_cache_percent, "Percentage of memory to be used as lu_object cache");

/**
 * Return desired hash table order.
 */
static int lu_htable_order(void)
{
	unsigned long cache_size;
	int bits;

	/*
	 * Calculate hash table size, assuming that we want reasonable
	 * performance when 20% of total memory is occupied by cache of
	 * lu_objects.
	 *
	 * Size of lu_object is (arbitrarily) taken as 1K (together with inode).
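	 *
	 * A worked example of the formula below (the numbers are only
	 * illustrative): with 4 GiB of RAM and 4 KiB pages, totalram_pages
	 * is 1048576; keeping the default 20% for the cache and assuming
	 * 1 KiB per object gives 1048576 / 100 * 20 * (4096 / 1024) =
	 * 838800 entries (integer arithmetic), so the loop below settles
	 * on bits = 20, since 2^20 = 1048576 >= 838800.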
	 */
	cache_size = totalram_pages;

#if BITS_PER_LONG == 32
	/* limit hashtable size for lowmem systems to low RAM */
	if (cache_size > 1 << (30 - PAGE_CACHE_SHIFT))
		cache_size = 1 << (30 - PAGE_CACHE_SHIFT) * 3 / 4;
#endif

	/* clear off unreasonable cache setting. */
	if (lu_cache_percent == 0 || lu_cache_percent > LU_CACHE_PERCENT_MAX) {
		CWARN("obdclass: invalid lu_cache_percent: %u, it must be in"
		      " the range of (0, %u]. Will use default value: %u.\n",
		      lu_cache_percent, LU_CACHE_PERCENT_MAX,
		      LU_CACHE_PERCENT_DEFAULT);

		lu_cache_percent = LU_CACHE_PERCENT_DEFAULT;
	}
	cache_size = cache_size / 100 * lu_cache_percent *
		(PAGE_CACHE_SIZE / 1024);

	for (bits = 1; (1 << bits) < cache_size; ++bits) {
		;
	}
	return bits;
}

static unsigned lu_obj_hop_hash(struct cfs_hash *hs,
				const void *key, unsigned mask)
{
	struct lu_fid *fid = (struct lu_fid *)key;
	__u32 hash;

	hash = fid_flatten32(fid);
	hash += (hash >> 4) + (hash << 12); /* mixing oid and seq */
	hash = hash_long(hash, hs->hs_bkt_bits);

	/* give me another random factor */
	hash -= hash_long((unsigned long)hs, fid_oid(fid) % 11 + 3);

	hash <<= hs->hs_cur_bits - hs->hs_bkt_bits;
	hash |= (fid_seq(fid) + fid_oid(fid)) & (CFS_HASH_NBKT(hs) - 1);

	return hash & mask;
}

static void *lu_obj_hop_object(struct hlist_node *hnode)
{
	return hlist_entry(hnode, struct lu_object_header, loh_hash);
}

static void *lu_obj_hop_key(struct hlist_node *hnode)
{
	struct lu_object_header *h;

	h = hlist_entry(hnode, struct lu_object_header, loh_hash);
	return &h->loh_fid;
}

static int lu_obj_hop_keycmp(const void *key, struct hlist_node *hnode)
{
	struct lu_object_header *h;

	h = hlist_entry(hnode, struct lu_object_header, loh_hash);
	return lu_fid_eq(&h->loh_fid, (struct lu_fid *)key);
}

static void lu_obj_hop_get(struct cfs_hash *hs, struct hlist_node *hnode)
{
	struct lu_object_header *h;

	h = hlist_entry(hnode, struct lu_object_header, loh_hash);
	if (atomic_add_return(1, &h->loh_ref) == 1) {
		struct lu_site_bkt_data *bkt;
		struct cfs_hash_bd bd;

		cfs_hash_bd_get(hs, &h->loh_fid, &bd);
		bkt = cfs_hash_bd_extra_get(hs, &bd);
		bkt->lsb_busy++;
	}
}

static void lu_obj_hop_put_locked(struct cfs_hash *hs, struct hlist_node *hnode)
{
	LBUG(); /* we should never call it */
}

cfs_hash_ops_t lu_site_hash_ops = {
	.hs_hash	= lu_obj_hop_hash,
	.hs_key		= lu_obj_hop_key,
	.hs_keycmp	= lu_obj_hop_keycmp,
	.hs_object	= lu_obj_hop_object,
	.hs_get		= lu_obj_hop_get,
	.hs_put_locked	= lu_obj_hop_put_locked,
};

void lu_dev_add_linkage(struct lu_site *s, struct lu_device *d)
{
	spin_lock(&s->ls_ld_lock);
	if (list_empty(&d->ld_linkage))
		list_add(&d->ld_linkage, &s->ls_ld_linkage);
	spin_unlock(&s->ls_ld_lock);
}
EXPORT_SYMBOL(lu_dev_add_linkage);

void lu_dev_del_linkage(struct lu_site *s, struct lu_device *d)
{
	spin_lock(&s->ls_ld_lock);
	list_del_init(&d->ld_linkage);
	spin_unlock(&s->ls_ld_lock);
}
EXPORT_SYMBOL(lu_dev_del_linkage);

/**
 * Initialize site \a s, with \a d as the top level device.
 */
#define LU_SITE_BITS_MIN	12
#define LU_SITE_BITS_MAX	24
/**
 * 256 buckets in total; we don't want too many buckets because:
 * - they consume too much memory
 * - they lead to unbalanced LRU lists
 */
#define LU_SITE_BKT_BITS	8

int lu_site_init(struct lu_site *s, struct lu_device *top)
{
	struct lu_site_bkt_data *bkt;
	struct cfs_hash_bd bd;
	char name[16];
	int bits;
	int i;

	memset(s, 0, sizeof(*s));
	bits = lu_htable_order();
	snprintf(name, 16, "lu_site_%s", top->ld_type->ldt_name);
	for (bits = min(max(LU_SITE_BITS_MIN, bits), LU_SITE_BITS_MAX);
	     bits >= LU_SITE_BITS_MIN; bits--) {
		s->ls_obj_hash = cfs_hash_create(name, bits, bits,
						 bits - LU_SITE_BKT_BITS,
						 sizeof(*bkt), 0, 0,
						 &lu_site_hash_ops,
						 CFS_HASH_SPIN_BKTLOCK |
						 CFS_HASH_NO_ITEMREF |
						 CFS_HASH_DEPTH |
						 CFS_HASH_ASSERT_EMPTY);
		if (s->ls_obj_hash != NULL)
			break;
	}

	if (s->ls_obj_hash == NULL) {
		CERROR("failed to create lu_site hash with bits: %d\n", bits);
		return -ENOMEM;
	}

	cfs_hash_for_each_bucket(s->ls_obj_hash, &bd, i) {
		bkt = cfs_hash_bd_extra_get(s->ls_obj_hash, &bd);
		INIT_LIST_HEAD(&bkt->lsb_lru);
		init_waitqueue_head(&bkt->lsb_marche_funebre);
	}

	s->ls_stats = lprocfs_alloc_stats(LU_SS_LAST_STAT, 0);
	if (s->ls_stats == NULL) {
		cfs_hash_putref(s->ls_obj_hash);
		s->ls_obj_hash = NULL;
		return -ENOMEM;
	}

	lprocfs_counter_init(s->ls_stats, LU_SS_CREATED,
			     0, "created", "created");
	lprocfs_counter_init(s->ls_stats, LU_SS_CACHE_HIT,
			     0, "cache_hit", "cache_hit");
	lprocfs_counter_init(s->ls_stats, LU_SS_CACHE_MISS,
			     0, "cache_miss", "cache_miss");
	lprocfs_counter_init(s->ls_stats, LU_SS_CACHE_RACE,
			     0, "cache_race", "cache_race");
	lprocfs_counter_init(s->ls_stats, LU_SS_CACHE_DEATH_RACE,
			     0, "cache_death_race", "cache_death_race");
	lprocfs_counter_init(s->ls_stats, LU_SS_LRU_PURGED,
			     0, "lru_purged", "lru_purged");

	INIT_LIST_HEAD(&s->ls_linkage);
	s->ls_top_dev = top;
	top->ld_site = s;
	lu_device_get(top);
	lu_ref_add(&top->ld_reference, "site-top", s);

	INIT_LIST_HEAD(&s->ls_ld_linkage);
	spin_lock_init(&s->ls_ld_lock);

	lu_dev_add_linkage(s, top);

	return 0;
}
EXPORT_SYMBOL(lu_site_init);

/**
 * Finalize \a s and release its resources.
 */
void lu_site_fini(struct lu_site *s)
{
	mutex_lock(&lu_sites_guard);
	list_del_init(&s->ls_linkage);
	mutex_unlock(&lu_sites_guard);

	if (s->ls_obj_hash != NULL) {
		cfs_hash_putref(s->ls_obj_hash);
		s->ls_obj_hash = NULL;
	}

	if (s->ls_top_dev != NULL) {
		s->ls_top_dev->ld_site = NULL;
		lu_ref_del(&s->ls_top_dev->ld_reference, "site-top", s);
		lu_device_put(s->ls_top_dev);
		s->ls_top_dev = NULL;
	}

	if (s->ls_stats != NULL)
		lprocfs_free_stats(&s->ls_stats);
}
EXPORT_SYMBOL(lu_site_fini);

/**
 * Called when initialization of stack for this site is completed.
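 *
 * A hedged lifecycle sketch (error handling omitted; "top_dev" is an
 * illustrative top-level device):
 *
 *	lu_site_init(s, top_dev);
 *	... assemble the rest of the device stack ...
 *	lu_site_init_finish(s);
 *	...
 *	lu_site_fini(s);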
 */
int lu_site_init_finish(struct lu_site *s)
{
	int result;

	mutex_lock(&lu_sites_guard);
	result = lu_context_refill(&lu_shrink_env.le_ctx);
	if (result == 0)
		list_add(&s->ls_linkage, &lu_sites);
	mutex_unlock(&lu_sites_guard);
	return result;
}
EXPORT_SYMBOL(lu_site_init_finish);

/**
 * Acquire additional reference on device \a d
 */
void lu_device_get(struct lu_device *d)
{
	atomic_inc(&d->ld_ref);
}
EXPORT_SYMBOL(lu_device_get);

/**
 * Release reference on device \a d.
 */
void lu_device_put(struct lu_device *d)
{
	LASSERT(atomic_read(&d->ld_ref) > 0);
	atomic_dec(&d->ld_ref);
}
EXPORT_SYMBOL(lu_device_put);

/**
 * Initialize device \a d of type \a t.
 */
int lu_device_init(struct lu_device *d, struct lu_device_type *t)
{
	if (t->ldt_device_nr++ == 0 && t->ldt_ops->ldto_start != NULL)
		t->ldt_ops->ldto_start(t);
	memset(d, 0, sizeof(*d));
	atomic_set(&d->ld_ref, 0);
	d->ld_type = t;
	lu_ref_init(&d->ld_reference);
	INIT_LIST_HEAD(&d->ld_linkage);
	return 0;
}
EXPORT_SYMBOL(lu_device_init);

/**
 * Finalize device \a d.
 */
void lu_device_fini(struct lu_device *d)
{
	struct lu_device_type *t;

	t = d->ld_type;
	if (d->ld_obd != NULL) {
		d->ld_obd->obd_lu_dev = NULL;
		d->ld_obd = NULL;
	}

	lu_ref_fini(&d->ld_reference);
	LASSERTF(atomic_read(&d->ld_ref) == 0,
		 "Refcount is %u\n", atomic_read(&d->ld_ref));
	LASSERT(t->ldt_device_nr > 0);
	if (--t->ldt_device_nr == 0 && t->ldt_ops->ldto_stop != NULL)
		t->ldt_ops->ldto_stop(t);
}
EXPORT_SYMBOL(lu_device_fini);

/**
 * Initialize object \a o that is part of compound object \a h and was created
 * by device \a d.
 */
int lu_object_init(struct lu_object *o, struct lu_object_header *h,
		   struct lu_device *d)
{
	memset(o, 0, sizeof(*o));
	o->lo_header = h;
	o->lo_dev = d;
	lu_device_get(d);
	lu_ref_add_at(&d->ld_reference, &o->lo_dev_ref, "lu_object", o);
	INIT_LIST_HEAD(&o->lo_linkage);

	return 0;
}
EXPORT_SYMBOL(lu_object_init);

/**
 * Finalize object and release its resources.
 */
void lu_object_fini(struct lu_object *o)
{
	struct lu_device *dev = o->lo_dev;

	LASSERT(list_empty(&o->lo_linkage));

	if (dev != NULL) {
		lu_ref_del_at(&dev->ld_reference, &o->lo_dev_ref,
			      "lu_object", o);
		lu_device_put(dev);
		o->lo_dev = NULL;
	}
}
EXPORT_SYMBOL(lu_object_fini);

/**
 * Add object \a o as first layer of compound object \a h
 *
 * This is typically called by the ->ldo_object_alloc() method of top-level
 * device.
 */
void lu_object_add_top(struct lu_object_header *h, struct lu_object *o)
{
	list_move(&o->lo_linkage, &h->loh_layers);
}
EXPORT_SYMBOL(lu_object_add_top);

/**
 * Add object \a o as a layer of compound object, going after \a before.
 *
 * This is typically called by the ->ldo_object_alloc() method of \a
 * before->lo_dev.
 */
void lu_object_add(struct lu_object *before, struct lu_object *o)
{
	list_move(&o->lo_linkage, &before->lo_linkage);
}
EXPORT_SYMBOL(lu_object_add);

/**
 * Initialize compound object.
 */
int lu_object_header_init(struct lu_object_header *h)
{
	memset(h, 0, sizeof(*h));
	atomic_set(&h->loh_ref, 1);
	INIT_HLIST_NODE(&h->loh_hash);
	INIT_LIST_HEAD(&h->loh_lru);
	INIT_LIST_HEAD(&h->loh_layers);
	lu_ref_init(&h->loh_reference);
	return 0;
}
EXPORT_SYMBOL(lu_object_header_init);

/**
 * Finalize compound object.
 */
void lu_object_header_fini(struct lu_object_header *h)
{
	LASSERT(list_empty(&h->loh_layers));
	LASSERT(list_empty(&h->loh_lru));
	LASSERT(hlist_unhashed(&h->loh_hash));
	lu_ref_fini(&h->loh_reference);
}
EXPORT_SYMBOL(lu_object_header_fini);

/**
 * Given a compound object, find its slice, corresponding to the device type
 * \a dtype.
 */
struct lu_object *lu_object_locate(struct lu_object_header *h,
				   const struct lu_device_type *dtype)
{
	struct lu_object *o;

	list_for_each_entry(o, &h->loh_layers, lo_linkage) {
		if (o->lo_dev->ld_type == dtype)
			return o;
	}
	return NULL;
}
EXPORT_SYMBOL(lu_object_locate);

/**
 * Finalize and free devices in the device stack.
 *
 * Finalize device stack by purging object cache, and calling
 * lu_device_type_operations::ldto_device_fini() and
 * lu_device_type_operations::ldto_device_free() on all devices in the stack.
 */
void lu_stack_fini(const struct lu_env *env, struct lu_device *top)
{
	struct lu_site *site = top->ld_site;
	struct lu_device *scan;
	struct lu_device *next;

	lu_site_purge(env, site, ~0);
	for (scan = top; scan != NULL; scan = next) {
		next = scan->ld_type->ldt_ops->ldto_device_fini(env, scan);
		lu_ref_del(&scan->ld_reference, "lu-stack", &lu_site_init);
		lu_device_put(scan);
	}

	/* purge again. */
	lu_site_purge(env, site, ~0);

	for (scan = top; scan != NULL; scan = next) {
		const struct lu_device_type *ldt = scan->ld_type;
		struct obd_type *type;

		next = ldt->ldt_ops->ldto_device_free(env, scan);
		type = ldt->ldt_obd_type;
		if (type != NULL) {
			type->typ_refcnt--;
			class_put_type(type);
		}
	}
}
EXPORT_SYMBOL(lu_stack_fini);

enum {
	/**
	 * Maximal number of tld slots.
	 */
	LU_CONTEXT_KEY_NR = 40
};

static struct lu_context_key *lu_keys[LU_CONTEXT_KEY_NR] = { NULL, };

static DEFINE_SPINLOCK(lu_keys_guard);

/**
 * Global counter incremented whenever key is registered, unregistered,
 * revived or quiesced. This is used to avoid unnecessary calls to
 * lu_context_refill(). No locking is provided, as initialization and shutdown
 * are supposed to be externally serialized.
 */
static unsigned key_set_version = 0;

/**
 * Register new key.
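 *
 * A hedged sketch of the usual pattern (the "foo" key, its data structure
 * and the tag choice are illustrative, not part of this file):
 *
 *	LU_KEY_INIT_FINI(foo, struct foo_thread_info);
 *
 *	static struct lu_context_key foo_thread_key = {
 *		.lct_tags = LCT_MD_THREAD,
 *		.lct_init = foo_key_init,
 *		.lct_fini = foo_key_fini
 *	};
 *
 *	LU_CONTEXT_KEY_INIT(&foo_thread_key);
 *	rc = lu_context_key_register(&foo_thread_key);
 *
 * and later, inside an entered context:
 *
 *	info = lu_context_key_get(&env->le_ctx, &foo_thread_key);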
 */
int lu_context_key_register(struct lu_context_key *key)
{
	int result;
	int i;

	LASSERT(key->lct_init != NULL);
	LASSERT(key->lct_fini != NULL);
	LASSERT(key->lct_tags != 0);

	result = -ENFILE;
	spin_lock(&lu_keys_guard);
	for (i = 0; i < ARRAY_SIZE(lu_keys); ++i) {
		if (lu_keys[i] == NULL) {
			key->lct_index = i;
			atomic_set(&key->lct_used, 1);
			lu_keys[i] = key;
			lu_ref_init(&key->lct_reference);
			result = 0;
			++key_set_version;
			break;
		}
	}
	spin_unlock(&lu_keys_guard);
	return result;
}
EXPORT_SYMBOL(lu_context_key_register);

static void key_fini(struct lu_context *ctx, int index)
{
	if (ctx->lc_value != NULL && ctx->lc_value[index] != NULL) {
		struct lu_context_key *key;

		key = lu_keys[index];
		LASSERT(key != NULL);
		LASSERT(key->lct_fini != NULL);
		LASSERT(atomic_read(&key->lct_used) > 1);

		key->lct_fini(ctx, key, ctx->lc_value[index]);
		lu_ref_del(&key->lct_reference, "ctx", ctx);
		atomic_dec(&key->lct_used);

		if ((ctx->lc_tags & LCT_NOREF) == 0) {
#ifdef CONFIG_MODULE_UNLOAD
			LINVRNT(module_refcount(key->lct_owner) > 0);
#endif
			module_put(key->lct_owner);
		}
		ctx->lc_value[index] = NULL;
	}
}

/**
 * Deregister key.
 */
void lu_context_key_degister(struct lu_context_key *key)
{
	LASSERT(atomic_read(&key->lct_used) >= 1);
	LINVRNT(0 <= key->lct_index && key->lct_index < ARRAY_SIZE(lu_keys));

	lu_context_key_quiesce(key);

	++key_set_version;
	spin_lock(&lu_keys_guard);
	key_fini(&lu_shrink_env.le_ctx, key->lct_index);
	if (lu_keys[key->lct_index]) {
		lu_keys[key->lct_index] = NULL;
		lu_ref_fini(&key->lct_reference);
	}
	spin_unlock(&lu_keys_guard);

	LASSERTF(atomic_read(&key->lct_used) == 1,
		 "key has instances: %d\n",
		 atomic_read(&key->lct_used));
}
EXPORT_SYMBOL(lu_context_key_degister);

/**
 * Register a number of keys. This has to be called after all keys have been
 * initialized by a call to LU_CONTEXT_KEY_INIT().
 */
int lu_context_key_register_many(struct lu_context_key *k, ...)
{
	struct lu_context_key *key = k;
	va_list args;
	int result;

	va_start(args, k);
	do {
		result = lu_context_key_register(key);
		if (result)
			break;
		key = va_arg(args, struct lu_context_key *);
	} while (key != NULL);
	va_end(args);

	if (result != 0) {
		va_start(args, k);
		while (k != key) {
			lu_context_key_degister(k);
			k = va_arg(args, struct lu_context_key *);
		}
		va_end(args);
	}

	return result;
}
EXPORT_SYMBOL(lu_context_key_register_many);

/**
 * De-register a number of keys. This is a dual to
 * lu_context_key_register_many().
 */
void lu_context_key_degister_many(struct lu_context_key *k, ...)
{
	va_list args;

	va_start(args, k);
	do {
		lu_context_key_degister(k);
		k = va_arg(args, struct lu_context_key *);
	} while (k != NULL);
	va_end(args);
}
EXPORT_SYMBOL(lu_context_key_degister_many);

/**
 * Revive a number of keys.
 */
void lu_context_key_revive_many(struct lu_context_key *k, ...)
1446{ 1447 va_list args; 1448 1449 va_start(args, k); 1450 do { 1451 lu_context_key_revive(k); 1452 k = va_arg(args, struct lu_context_key*); 1453 } while (k != NULL); 1454 va_end(args); 1455} 1456EXPORT_SYMBOL(lu_context_key_revive_many); 1457 1458/** 1459 * Quiescent a number of keys. 1460 */ 1461void lu_context_key_quiesce_many(struct lu_context_key *k, ...) 1462{ 1463 va_list args; 1464 1465 va_start(args, k); 1466 do { 1467 lu_context_key_quiesce(k); 1468 k = va_arg(args, struct lu_context_key*); 1469 } while (k != NULL); 1470 va_end(args); 1471} 1472EXPORT_SYMBOL(lu_context_key_quiesce_many); 1473 1474/** 1475 * Return value associated with key \a key in context \a ctx. 1476 */ 1477void *lu_context_key_get(const struct lu_context *ctx, 1478 const struct lu_context_key *key) 1479{ 1480 LINVRNT(ctx->lc_state == LCS_ENTERED); 1481 LINVRNT(0 <= key->lct_index && key->lct_index < ARRAY_SIZE(lu_keys)); 1482 LASSERT(lu_keys[key->lct_index] == key); 1483 return ctx->lc_value[key->lct_index]; 1484} 1485EXPORT_SYMBOL(lu_context_key_get); 1486 1487/** 1488 * List of remembered contexts. XXX document me. 1489 */ 1490static LIST_HEAD(lu_context_remembered); 1491 1492/** 1493 * Destroy \a key in all remembered contexts. This is used to destroy key 1494 * values in "shared" contexts (like service threads), when a module owning 1495 * the key is about to be unloaded. 1496 */ 1497void lu_context_key_quiesce(struct lu_context_key *key) 1498{ 1499 struct lu_context *ctx; 1500 1501 if (!(key->lct_tags & LCT_QUIESCENT)) { 1502 /* 1503 * XXX layering violation. 1504 */ 1505 key->lct_tags |= LCT_QUIESCENT; 1506 /* 1507 * XXX memory barrier has to go here. 1508 */ 1509 spin_lock(&lu_keys_guard); 1510 list_for_each_entry(ctx, &lu_context_remembered, 1511 lc_remember) 1512 key_fini(ctx, key->lct_index); 1513 spin_unlock(&lu_keys_guard); 1514 ++key_set_version; 1515 } 1516} 1517EXPORT_SYMBOL(lu_context_key_quiesce); 1518 1519void lu_context_key_revive(struct lu_context_key *key) 1520{ 1521 key->lct_tags &= ~LCT_QUIESCENT; 1522 ++key_set_version; 1523} 1524EXPORT_SYMBOL(lu_context_key_revive); 1525 1526static void keys_fini(struct lu_context *ctx) 1527{ 1528 int i; 1529 1530 if (ctx->lc_value == NULL) 1531 return; 1532 1533 for (i = 0; i < ARRAY_SIZE(lu_keys); ++i) 1534 key_fini(ctx, i); 1535 1536 OBD_FREE(ctx->lc_value, ARRAY_SIZE(lu_keys) * sizeof(ctx->lc_value[0])); 1537 ctx->lc_value = NULL; 1538} 1539 1540static int keys_fill(struct lu_context *ctx) 1541{ 1542 int i; 1543 1544 LINVRNT(ctx->lc_value != NULL); 1545 for (i = 0; i < ARRAY_SIZE(lu_keys); ++i) { 1546 struct lu_context_key *key; 1547 1548 key = lu_keys[i]; 1549 if (ctx->lc_value[i] == NULL && key != NULL && 1550 (key->lct_tags & ctx->lc_tags) && 1551 /* 1552 * Don't create values for a LCT_QUIESCENT key, as this 1553 * will pin module owning a key. 1554 */ 1555 !(key->lct_tags & LCT_QUIESCENT)) { 1556 void *value; 1557 1558 LINVRNT(key->lct_init != NULL); 1559 LINVRNT(key->lct_index == i); 1560 1561 value = key->lct_init(ctx, key); 1562 if (unlikely(IS_ERR(value))) 1563 return PTR_ERR(value); 1564 1565 if (!(ctx->lc_tags & LCT_NOREF)) 1566 try_module_get(key->lct_owner); 1567 lu_ref_add_atomic(&key->lct_reference, "ctx", ctx); 1568 atomic_inc(&key->lct_used); 1569 /* 1570 * This is the only place in the code, where an 1571 * element of ctx->lc_value[] array is set to non-NULL 1572 * value. 
			 */
			ctx->lc_value[i] = value;
			if (key->lct_exit != NULL)
				ctx->lc_tags |= LCT_HAS_EXIT;
		}
		ctx->lc_version = key_set_version;
	}
	return 0;
}

static int keys_init(struct lu_context *ctx)
{
	OBD_ALLOC(ctx->lc_value,
		  ARRAY_SIZE(lu_keys) * sizeof(ctx->lc_value[0]));
	if (likely(ctx->lc_value != NULL))
		return keys_fill(ctx);

	return -ENOMEM;
}

/**
 * Initialize context data-structure. Create values for all keys.
 */
int lu_context_init(struct lu_context *ctx, __u32 tags)
{
	int rc;

	memset(ctx, 0, sizeof(*ctx));
	ctx->lc_state = LCS_INITIALIZED;
	ctx->lc_tags = tags;
	if (tags & LCT_REMEMBER) {
		spin_lock(&lu_keys_guard);
		list_add(&ctx->lc_remember, &lu_context_remembered);
		spin_unlock(&lu_keys_guard);
	} else {
		INIT_LIST_HEAD(&ctx->lc_remember);
	}

	rc = keys_init(ctx);
	if (rc != 0)
		lu_context_fini(ctx);

	return rc;
}
EXPORT_SYMBOL(lu_context_init);

/**
 * Finalize context data-structure. Destroy key values.
 */
void lu_context_fini(struct lu_context *ctx)
{
	LINVRNT(ctx->lc_state == LCS_INITIALIZED || ctx->lc_state == LCS_LEFT);
	ctx->lc_state = LCS_FINALIZED;

	if ((ctx->lc_tags & LCT_REMEMBER) == 0) {
		LASSERT(list_empty(&ctx->lc_remember));
		keys_fini(ctx);

	} else { /* could race with key de-registration */
		spin_lock(&lu_keys_guard);
		keys_fini(ctx);
		list_del_init(&ctx->lc_remember);
		spin_unlock(&lu_keys_guard);
	}
}
EXPORT_SYMBOL(lu_context_fini);

/**
 * Called before entering context.
 */
void lu_context_enter(struct lu_context *ctx)
{
	LINVRNT(ctx->lc_state == LCS_INITIALIZED || ctx->lc_state == LCS_LEFT);
	ctx->lc_state = LCS_ENTERED;
}
EXPORT_SYMBOL(lu_context_enter);

/**
 * Called after exiting from \a ctx
 */
void lu_context_exit(struct lu_context *ctx)
{
	int i;

	LINVRNT(ctx->lc_state == LCS_ENTERED);
	ctx->lc_state = LCS_LEFT;
	if (ctx->lc_tags & LCT_HAS_EXIT && ctx->lc_value != NULL) {
		for (i = 0; i < ARRAY_SIZE(lu_keys); ++i) {
			if (ctx->lc_value[i] != NULL) {
				struct lu_context_key *key;

				key = lu_keys[i];
				LASSERT(key != NULL);
				if (key->lct_exit != NULL)
					key->lct_exit(ctx,
						      key, ctx->lc_value[i]);
			}
		}
	}
}
EXPORT_SYMBOL(lu_context_exit);

/**
 * Allocate for context all missing keys that were registered after context
 * creation. key_set_version is only changed in rare cases when modules
 * are loaded and removed.
 */
int lu_context_refill(struct lu_context *ctx)
{
	return likely(ctx->lc_version == key_set_version) ? 0 : keys_fill(ctx);
}
EXPORT_SYMBOL(lu_context_refill);

/**
 * lu_ctx_tags/lu_ses_tags will be updated if new types of obd are added.
 * Currently, this is only used on the client side, specifically for the
 * echo device client; for other stacks (like ptlrpc threads), the contexts
 * are predefined when the lu_device type is registered, during the module
 * probe phase.
 */
__u32 lu_context_tags_default = 0;
__u32 lu_session_tags_default = 0;

void lu_context_tags_update(__u32 tags)
{
	spin_lock(&lu_keys_guard);
	lu_context_tags_default |= tags;
	key_set_version++;
	spin_unlock(&lu_keys_guard);
}
EXPORT_SYMBOL(lu_context_tags_update);

void lu_context_tags_clear(__u32 tags)
{
	spin_lock(&lu_keys_guard);
	lu_context_tags_default &= ~tags;
	key_set_version++;
	spin_unlock(&lu_keys_guard);
}
EXPORT_SYMBOL(lu_context_tags_clear);

void lu_session_tags_update(__u32 tags)
{
	spin_lock(&lu_keys_guard);
	lu_session_tags_default |= tags;
	key_set_version++;
	spin_unlock(&lu_keys_guard);
}
EXPORT_SYMBOL(lu_session_tags_update);

void lu_session_tags_clear(__u32 tags)
{
	spin_lock(&lu_keys_guard);
	lu_session_tags_default &= ~tags;
	key_set_version++;
	spin_unlock(&lu_keys_guard);
}
EXPORT_SYMBOL(lu_session_tags_clear);

int lu_env_init(struct lu_env *env, __u32 tags)
{
	int result;

	env->le_ses = NULL;
	result = lu_context_init(&env->le_ctx, tags);
	if (likely(result == 0))
		lu_context_enter(&env->le_ctx);
	return result;
}
EXPORT_SYMBOL(lu_env_init);

void lu_env_fini(struct lu_env *env)
{
	lu_context_exit(&env->le_ctx);
	lu_context_fini(&env->le_ctx);
	env->le_ses = NULL;
}
EXPORT_SYMBOL(lu_env_fini);

int lu_env_refill(struct lu_env *env)
{
	int result;

	result = lu_context_refill(&env->le_ctx);
	if (result == 0 && env->le_ses != NULL)
		result = lu_context_refill(env->le_ses);
	return result;
}
EXPORT_SYMBOL(lu_env_refill);

/**
 * Currently, this API is only used by the echo client. Because the echo
 * client and the normal Lustre client share the same cl_env cache, the echo
 * client needs to refresh the env context after it gets one from the cache,
 * especially when the normal client and the echo client co-exist in the
 * same client.
 */
int lu_env_refill_by_tags(struct lu_env *env, __u32 ctags,
			  __u32 stags)
{
	int result;

	if ((env->le_ctx.lc_tags & ctags) != ctags) {
		env->le_ctx.lc_version = 0;
		env->le_ctx.lc_tags |= ctags;
	}

	if (env->le_ses && (env->le_ses->lc_tags & stags) != stags) {
		env->le_ses->lc_version = 0;
		env->le_ses->lc_tags |= stags;
	}

	result = lu_env_refill(env);

	return result;
}
EXPORT_SYMBOL(lu_env_refill_by_tags);

typedef struct lu_site_stats {
	unsigned	lss_populated;
	unsigned	lss_max_search;
	unsigned	lss_total;
	unsigned	lss_busy;
} lu_site_stats_t;

static void lu_site_stats_get(struct cfs_hash *hs,
			      lu_site_stats_t *stats, int populated)
{
	struct cfs_hash_bd bd;
	int i;

	cfs_hash_for_each_bucket(hs, &bd, i) {
		struct lu_site_bkt_data *bkt = cfs_hash_bd_extra_get(hs, &bd);
		struct hlist_head *hhead;

		cfs_hash_bd_lock(hs, &bd, 1);
		stats->lss_busy += bkt->lsb_busy;
		stats->lss_total += cfs_hash_bd_count_get(&bd);
		stats->lss_max_search = max((int)stats->lss_max_search,
					    cfs_hash_bd_depmax_get(&bd));
		if (!populated) {
			cfs_hash_bd_unlock(hs, &bd, 1);
			continue;
		}

		cfs_hash_bd_for_each_hlist(hs, &bd, hhead) {
			if (!hlist_empty(hhead))
				stats->lss_populated++;
		}
		cfs_hash_bd_unlock(hs, &bd, 1);
	}
}

/*
 * There exists a potential lock inversion deadlock scenario when using
 * Lustre on top of ZFS. This occurs between one of ZFS's
 * buf_hash_table.ht_lock's, and Lustre's lu_sites_guard lock. Essentially,
 * thread A will take the lu_sites_guard lock and sleep on the ht_lock,
 * while thread B will take the ht_lock and sleep on the lu_sites_guard
 * lock. Obviously neither thread will wake and drop their respective hold
 * on their lock.
 *
 * To prevent this from happening we must ensure the lu_sites_guard lock is
 * not taken while down this code path. ZFS reliably does not set the
 * __GFP_FS bit in its code paths, so this can be used to determine if it
 * is safe to take the lu_sites_guard lock.
 *
 * Ideally we should accurately return the remaining number of cached
 * objects without taking the lu_sites_guard lock, but this is not
 * possible in the current implementation.
 */
static unsigned long lu_cache_shrink_count(struct shrinker *sk,
					   struct shrink_control *sc)
{
	lu_site_stats_t stats;
	struct lu_site *s;
	struct lu_site *tmp;
	unsigned long cached = 0;

	if (!(sc->gfp_mask & __GFP_FS))
		return 0;

	mutex_lock(&lu_sites_guard);
	list_for_each_entry_safe(s, tmp, &lu_sites, ls_linkage) {
		memset(&stats, 0, sizeof(stats));
		lu_site_stats_get(s->ls_obj_hash, &stats, 0);
		cached += stats.lss_total - stats.lss_busy;
	}
	mutex_unlock(&lu_sites_guard);

	cached = (cached / 100) * sysctl_vfs_cache_pressure;
	CDEBUG(D_INODE, "%ld objects cached\n", cached);
	return cached;
}

static unsigned long lu_cache_shrink_scan(struct shrinker *sk,
					  struct shrink_control *sc)
{
	struct lu_site *s;
	struct lu_site *tmp;
	unsigned long remain = sc->nr_to_scan, freed = 0;
	LIST_HEAD(splice);

	if (!(sc->gfp_mask & __GFP_FS))
		/* We must not take the lu_sites_guard lock when
		 * __GFP_FS is *not* set because of the deadlock
		 * possibility detailed above. Additionally,
		 * since we cannot determine the number of
		 * objects in the cache without taking this
		 * lock, we're in a particularly tough spot. As
		 * a result, we'll just lie and say our cache is
		 * empty. This _should_ be ok, as we can't
		 * reclaim objects when __GFP_FS is *not* set
		 * anyways.
		 */
		return SHRINK_STOP;

	mutex_lock(&lu_sites_guard);
	list_for_each_entry_safe(s, tmp, &lu_sites, ls_linkage) {
		freed = lu_site_purge(&lu_shrink_env, s, remain);
		remain -= freed;
		/*
		 * Move just shrunk site to the tail of site list to
		 * assure shrinking fairness.
		 */
		list_move_tail(&s->ls_linkage, &splice);
	}
	list_splice(&splice, lu_sites.prev);
	mutex_unlock(&lu_sites_guard);

	return sc->nr_to_scan - remain;
}

/*
 * Debugging stuff.
 */

/**
 * Environment to be used in debugger, contains all tags.
 */
struct lu_env lu_debugging_env;

/**
 * Debugging printer function using printk().
 */
int lu_printk_printer(const struct lu_env *env,
		      void *unused, const char *format, ...)
{
	va_list args;

	va_start(args, format);
	vprintk(format, args);
	va_end(args);
	return 0;
}

static struct shrinker lu_site_shrinker = {
	.count_objects	= lu_cache_shrink_count,
	.scan_objects	= lu_cache_shrink_scan,
	.seeks		= DEFAULT_SEEKS,
};

/**
 * Initialization of global lu_* data.
 */
int lu_global_init(void)
{
	int result;

	CDEBUG(D_INFO, "Lustre LU module (%p).\n", &lu_keys);

	result = lu_ref_global_init();
	if (result != 0)
		return result;

	LU_CONTEXT_KEY_INIT(&lu_global_key);
	result = lu_context_key_register(&lu_global_key);
	if (result != 0)
		return result;

	/*
	 * At this level, we don't know what tags are needed, so allocate them
	 * conservatively. This should not be too bad, because this
	 * environment is global.
	 */
	mutex_lock(&lu_sites_guard);
	result = lu_env_init(&lu_shrink_env, LCT_SHRINKER);
	mutex_unlock(&lu_sites_guard);
	if (result != 0)
		return result;

	/*
	 * seeks estimation: 3 seeks to read a record from oi, one to read
	 * inode, one for ea. Unfortunately setting this high value results in
	 * lu_object/inode cache consuming all the memory.
	 */
	register_shrinker(&lu_site_shrinker);

	return result;
}

/**
 * Dual to lu_global_init().
 */
void lu_global_fini(void)
{
	unregister_shrinker(&lu_site_shrinker);
	lu_context_key_degister(&lu_global_key);

	/*
	 * Tear shrinker environment down _after_ de-registering
	 * lu_global_key, because the latter has a value in the former.
	 */
	mutex_lock(&lu_sites_guard);
	lu_env_fini(&lu_shrink_env);
	mutex_unlock(&lu_sites_guard);

	lu_ref_global_fini();
}

static __u32 ls_stats_read(struct lprocfs_stats *stats, int idx)
{
#if defined(CONFIG_PROC_FS)
	struct lprocfs_counter ret;

	lprocfs_stats_collect(stats, idx, &ret);
	return (__u32)ret.lc_count;
#else
	return 0;
#endif
}

/**
 * Output site statistical counters into a buffer. Suitable for
 * lprocfs_rd_*()-style functions.
 */
int lu_site_stats_print(const struct lu_site *s, struct seq_file *m)
{
	lu_site_stats_t stats;

	memset(&stats, 0, sizeof(stats));
	lu_site_stats_get(s->ls_obj_hash, &stats, 1);

	return seq_printf(m, "%d/%d %d/%d %d %d %d %d %d %d %d\n",
			  stats.lss_busy,
			  stats.lss_total,
			  stats.lss_populated,
			  CFS_HASH_NHLIST(s->ls_obj_hash),
			  stats.lss_max_search,
			  ls_stats_read(s->ls_stats, LU_SS_CREATED),
			  ls_stats_read(s->ls_stats, LU_SS_CACHE_HIT),
			  ls_stats_read(s->ls_stats, LU_SS_CACHE_MISS),
			  ls_stats_read(s->ls_stats, LU_SS_CACHE_RACE),
			  ls_stats_read(s->ls_stats, LU_SS_CACHE_DEATH_RACE),
			  ls_stats_read(s->ls_stats, LU_SS_LRU_PURGED));
}
EXPORT_SYMBOL(lu_site_stats_print);

/**
 * Helper function to initialize a number of kmem slab caches at once.
 */
int lu_kmem_init(struct lu_kmem_descr *caches)
{
	int result;
	struct lu_kmem_descr *iter = caches;

	for (result = 0; iter->ckd_cache != NULL; ++iter) {
		*iter->ckd_cache = kmem_cache_create(iter->ckd_name,
						     iter->ckd_size,
						     0, 0, NULL);
		if (*iter->ckd_cache == NULL) {
			result = -ENOMEM;
			/* free all previously allocated caches */
			lu_kmem_fini(caches);
			break;
		}
	}
	return result;
}
EXPORT_SYMBOL(lu_kmem_init);

/**
 * Helper function to finalize a number of kmem slab caches at once. Dual to
 * lu_kmem_init().
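 *
 * A hedged usage sketch (the descriptor array and cache pointer below are
 * illustrative, not part of this file):
 *
 *	static struct kmem_cache *foo_object_kmem;
 *
 *	static struct lu_kmem_descr foo_caches[] = {
 *		{
 *			.ckd_cache = &foo_object_kmem,
 *			.ckd_name  = "foo_object_kmem",
 *			.ckd_size  = sizeof(struct foo_object)
 *		},
 *		{
 *			.ckd_cache = NULL
 *		}
 *	};
 *
 *	rc = lu_kmem_init(foo_caches);
 *	...
 *	lu_kmem_fini(foo_caches);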
 */
void lu_kmem_fini(struct lu_kmem_descr *caches)
{
	for (; caches->ckd_cache != NULL; ++caches) {
		if (*caches->ckd_cache != NULL) {
			kmem_cache_destroy(*caches->ckd_cache);
			*caches->ckd_cache = NULL;
		}
	}
}
EXPORT_SYMBOL(lu_kmem_fini);

/**
 * Temporary solution to be able to assign fid in ->do_create()
 * till we have fully-functional OST fids
 */
void lu_object_assign_fid(const struct lu_env *env, struct lu_object *o,
			  const struct lu_fid *fid)
{
	struct lu_site *s = o->lo_dev->ld_site;
	struct lu_fid *old = &o->lo_header->loh_fid;
	struct lu_site_bkt_data *bkt;
	struct lu_object *shadow;
	wait_queue_t waiter;
	struct cfs_hash *hs;
	struct cfs_hash_bd bd;
	__u64 version = 0;

	LASSERT(fid_is_zero(old));

	hs = s->ls_obj_hash;
	cfs_hash_bd_get_and_lock(hs, (void *)fid, &bd, 1);
	shadow = htable_lookup(s, &bd, fid, &waiter, &version);
	/* supposed to be unique */
	LASSERT(IS_ERR(shadow) && PTR_ERR(shadow) == -ENOENT);
	*old = *fid;
	bkt = cfs_hash_bd_extra_get(hs, &bd);
	cfs_hash_bd_add_locked(hs, &bd, &o->lo_header->loh_hash);
	bkt->lsb_busy++;
	cfs_hash_bd_unlock(hs, &bd, 1);
}
EXPORT_SYMBOL(lu_object_assign_fid);

/**
 * allocates object with 0 (non-assigned) fid
 * XXX: temporary solution to be able to assign fid in ->do_create()
 *      till we have fully-functional OST fids
 */
struct lu_object *lu_object_anon(const struct lu_env *env,
				 struct lu_device *dev,
				 const struct lu_object_conf *conf)
{
	struct lu_fid fid;
	struct lu_object *o;

	fid_zero(&fid);
	o = lu_object_alloc(env, dev, &fid, conf);

	return o;
}
EXPORT_SYMBOL(lu_object_anon);

struct lu_buf LU_BUF_NULL = {
	.lb_buf = NULL,
	.lb_len = 0
};
EXPORT_SYMBOL(LU_BUF_NULL);

void lu_buf_free(struct lu_buf *buf)
{
	LASSERT(buf);
	if (buf->lb_buf) {
		LASSERT(buf->lb_len > 0);
		OBD_FREE_LARGE(buf->lb_buf, buf->lb_len);
		buf->lb_buf = NULL;
		buf->lb_len = 0;
	}
}
EXPORT_SYMBOL(lu_buf_free);

void lu_buf_alloc(struct lu_buf *buf, int size)
{
	LASSERT(buf);
	LASSERT(buf->lb_buf == NULL);
	LASSERT(buf->lb_len == 0);
	OBD_ALLOC_LARGE(buf->lb_buf, size);
	if (likely(buf->lb_buf))
		buf->lb_len = size;
}
EXPORT_SYMBOL(lu_buf_alloc);

void lu_buf_realloc(struct lu_buf *buf, int size)
{
	lu_buf_free(buf);
	lu_buf_alloc(buf, size);
}
EXPORT_SYMBOL(lu_buf_realloc);

struct lu_buf *lu_buf_check_and_alloc(struct lu_buf *buf, int len)
{
	if (buf->lb_buf == NULL && buf->lb_len == 0)
		lu_buf_alloc(buf, len);

	if ((len > buf->lb_len) && (buf->lb_buf != NULL))
		lu_buf_realloc(buf, len);

	return buf;
}
EXPORT_SYMBOL(lu_buf_check_and_alloc);

/**
 * Increase the size of the \a buf.
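 *
 * A hedged usage sketch (the local buffer is illustrative):
 *
 *	struct lu_buf buf = { .lb_buf = NULL, .lb_len = 0 };
 *
 *	if (lu_buf_check_and_grow(&buf, 128) == 0 &&
 *	    lu_buf_check_and_grow(&buf, 4096) == 0) {
 *		... fill and use buf.lb_buf / buf.lb_len ...
 *	}
 *	lu_buf_free(&buf);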
 * The call preserves old data in the buffer, and the old buffer remains
 * unchanged on error.
 *
 * \retval 0 or -ENOMEM
 */
int lu_buf_check_and_grow(struct lu_buf *buf, int len)
{
	char *ptr;

	if (len <= buf->lb_len)
		return 0;

	OBD_ALLOC_LARGE(ptr, len);
	if (ptr == NULL)
		return -ENOMEM;

	/* Copy the old data into the new buffer and free the old one */
	if (buf->lb_buf != NULL) {
		memcpy(ptr, buf->lb_buf, buf->lb_len);
		OBD_FREE_LARGE(buf->lb_buf, buf->lb_len);
	}

	buf->lb_buf = ptr;
	buf->lb_len = len;
	return 0;
}
EXPORT_SYMBOL(lu_buf_check_and_grow);