1/* 2 * GPL HEADER START 3 * 4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 5 * 6 * This program is free software; you can redistribute it and/or modify 7 * it under the terms of the GNU General Public License version 2 only, 8 * as published by the Free Software Foundation. 9 * 10 * This program is distributed in the hope that it will be useful, but 11 * WITHOUT ANY WARRANTY; without even the implied warranty of 12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 13 * General Public License version 2 for more details (a copy is included 14 * in the LICENSE file that accompanied this code). 15 * 16 * You should have received a copy of the GNU General Public License 17 * version 2 along with this program; If not, see 18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf 19 * 20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara, 21 * CA 95054 USA or visit www.sun.com if you need additional information or 22 * have any questions. 23 * 24 * GPL HEADER END 25 */ 26/* 27 * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved. 28 * Use is subject to license terms. 29 * 30 * Copyright (c) 2011, 2012, Intel Corporation. 31 */ 32/* 33 * This file is part of Lustre, http://www.lustre.org/ 34 * Lustre is a trademark of Sun Microsystems, Inc. 35 * 36 * lustre/obdclass/dt_object.c 37 * 38 * Dt Object. 39 * Generic functions from dt_object.h 40 * 41 * Author: Nikita Danilov <nikita@clusterfs.com> 42 */ 43 44#define DEBUG_SUBSYSTEM S_CLASS 45 46#include "../include/obd.h" 47#include "../include/dt_object.h" 48#include <linux/list.h> 49/* fid_be_to_cpu() */ 50#include "../include/lustre_fid.h" 51 52#include "../include/lustre_quota.h" 53 54/* context key constructor/destructor: dt_global_key_init, dt_global_key_fini */ 55LU_KEY_INIT(dt_global, struct dt_thread_info); 56LU_KEY_FINI(dt_global, struct dt_thread_info); 57 58struct lu_context_key dt_key = { 59 .lct_tags = LCT_MD_THREAD | LCT_DT_THREAD | LCT_MG_THREAD | LCT_LOCAL, 60 .lct_init = dt_global_key_init, 61 .lct_fini = dt_global_key_fini 62}; 63EXPORT_SYMBOL(dt_key); 64 65/* no lock is necessary to protect the list, because call-backs 66 * are added during system startup. Please refer to "struct dt_device". 67 */ 68void dt_txn_callback_add(struct dt_device *dev, struct dt_txn_callback *cb) 69{ 70 list_add(&cb->dtc_linkage, &dev->dd_txn_callbacks); 71} 72EXPORT_SYMBOL(dt_txn_callback_add); 73 74void dt_txn_callback_del(struct dt_device *dev, struct dt_txn_callback *cb) 75{ 76 list_del_init(&cb->dtc_linkage); 77} 78EXPORT_SYMBOL(dt_txn_callback_del); 79 80int dt_txn_hook_start(const struct lu_env *env, 81 struct dt_device *dev, struct thandle *th) 82{ 83 int rc = 0; 84 struct dt_txn_callback *cb; 85 86 if (th->th_local) 87 return 0; 88 89 list_for_each_entry(cb, &dev->dd_txn_callbacks, dtc_linkage) { 90 if (cb->dtc_txn_start == NULL || 91 !(cb->dtc_tag & env->le_ctx.lc_tags)) 92 continue; 93 rc = cb->dtc_txn_start(env, th, cb->dtc_cookie); 94 if (rc < 0) 95 break; 96 } 97 return rc; 98} 99EXPORT_SYMBOL(dt_txn_hook_start); 100 101int dt_txn_hook_stop(const struct lu_env *env, struct thandle *txn) 102{ 103 struct dt_device *dev = txn->th_dev; 104 struct dt_txn_callback *cb; 105 int rc = 0; 106 107 if (txn->th_local) 108 return 0; 109 110 list_for_each_entry(cb, &dev->dd_txn_callbacks, dtc_linkage) { 111 if (cb->dtc_txn_stop == NULL || 112 !(cb->dtc_tag & env->le_ctx.lc_tags)) 113 continue; 114 rc = cb->dtc_txn_stop(env, txn, cb->dtc_cookie); 115 if (rc < 0) 116 break; 117 } 118 return rc; 119} 120EXPORT_SYMBOL(dt_txn_hook_stop); 121 122void dt_txn_hook_commit(struct thandle *txn) 123{ 124 struct dt_txn_callback *cb; 125 126 if (txn->th_local) 127 return; 128 129 list_for_each_entry(cb, &txn->th_dev->dd_txn_callbacks, 130 dtc_linkage) { 131 if (cb->dtc_txn_commit) 132 cb->dtc_txn_commit(txn, cb->dtc_cookie); 133 } 134} 135EXPORT_SYMBOL(dt_txn_hook_commit); 136 137int dt_device_init(struct dt_device *dev, struct lu_device_type *t) 138{ 139 140 INIT_LIST_HEAD(&dev->dd_txn_callbacks); 141 return lu_device_init(&dev->dd_lu_dev, t); 142} 143EXPORT_SYMBOL(dt_device_init); 144 145void dt_device_fini(struct dt_device *dev) 146{ 147 lu_device_fini(&dev->dd_lu_dev); 148} 149EXPORT_SYMBOL(dt_device_fini); 150 151int dt_object_init(struct dt_object *obj, 152 struct lu_object_header *h, struct lu_device *d) 153 154{ 155 return lu_object_init(&obj->do_lu, h, d); 156} 157EXPORT_SYMBOL(dt_object_init); 158 159void dt_object_fini(struct dt_object *obj) 160{ 161 lu_object_fini(&obj->do_lu); 162} 163EXPORT_SYMBOL(dt_object_fini); 164 165int dt_try_as_dir(const struct lu_env *env, struct dt_object *obj) 166{ 167 if (obj->do_index_ops == NULL) 168 obj->do_ops->do_index_try(env, obj, &dt_directory_features); 169 return obj->do_index_ops != NULL; 170} 171EXPORT_SYMBOL(dt_try_as_dir); 172 173enum dt_format_type dt_mode_to_dft(__u32 mode) 174{ 175 enum dt_format_type result; 176 177 switch (mode & S_IFMT) { 178 case S_IFDIR: 179 result = DFT_DIR; 180 break; 181 case S_IFREG: 182 result = DFT_REGULAR; 183 break; 184 case S_IFLNK: 185 result = DFT_SYM; 186 break; 187 case S_IFCHR: 188 case S_IFBLK: 189 case S_IFIFO: 190 case S_IFSOCK: 191 result = DFT_NODE; 192 break; 193 default: 194 LBUG(); 195 break; 196 } 197 return result; 198} 199EXPORT_SYMBOL(dt_mode_to_dft); 200 201/** 202 * lookup fid for object named \a name in directory \a dir. 203 */ 204 205int dt_lookup_dir(const struct lu_env *env, struct dt_object *dir, 206 const char *name, struct lu_fid *fid) 207{ 208 if (dt_try_as_dir(env, dir)) 209 return dt_lookup(env, dir, (struct dt_rec *)fid, 210 (const struct dt_key *)name, BYPASS_CAPA); 211 return -ENOTDIR; 212} 213EXPORT_SYMBOL(dt_lookup_dir); 214 215/* this differs from dt_locate by top_dev as parameter 216 * but not one from lu_site */ 217struct dt_object *dt_locate_at(const struct lu_env *env, 218 struct dt_device *dev, const struct lu_fid *fid, 219 struct lu_device *top_dev) 220{ 221 struct lu_object *lo, *n; 222 223 lo = lu_object_find_at(env, top_dev, fid, NULL); 224 if (IS_ERR(lo)) 225 return (void *)lo; 226 227 LASSERT(lo != NULL); 228 229 list_for_each_entry(n, &lo->lo_header->loh_layers, lo_linkage) { 230 if (n->lo_dev == &dev->dd_lu_dev) 231 return container_of0(n, struct dt_object, do_lu); 232 } 233 return ERR_PTR(-ENOENT); 234} 235EXPORT_SYMBOL(dt_locate_at); 236 237/** 238 * find a object named \a entry in given \a dfh->dfh_o directory. 239 */ 240static int dt_find_entry(const struct lu_env *env, const char *entry, void *data) 241{ 242 struct dt_find_hint *dfh = data; 243 struct dt_device *dt = dfh->dfh_dt; 244 struct lu_fid *fid = dfh->dfh_fid; 245 struct dt_object *obj = dfh->dfh_o; 246 int result; 247 248 result = dt_lookup_dir(env, obj, entry, fid); 249 lu_object_put(env, &obj->do_lu); 250 if (result == 0) { 251 obj = dt_locate(env, dt, fid); 252 if (IS_ERR(obj)) 253 result = PTR_ERR(obj); 254 } 255 dfh->dfh_o = obj; 256 return result; 257} 258 259/** 260 * Abstract function which parses path name. This function feeds 261 * path component to \a entry_func. 262 */ 263int dt_path_parser(const struct lu_env *env, 264 char *path, dt_entry_func_t entry_func, 265 void *data) 266{ 267 char *e; 268 int rc = 0; 269 270 while (1) { 271 e = strsep(&path, "/"); 272 if (e == NULL) 273 break; 274 275 if (e[0] == 0) { 276 if (!path || path[0] == '\0') 277 break; 278 continue; 279 } 280 rc = entry_func(env, e, data); 281 if (rc) 282 break; 283 } 284 285 return rc; 286} 287 288struct dt_object * 289dt_store_resolve(const struct lu_env *env, struct dt_device *dt, 290 const char *path, struct lu_fid *fid) 291{ 292 struct dt_thread_info *info = dt_info(env); 293 struct dt_find_hint *dfh = &info->dti_dfh; 294 struct dt_object *obj; 295 char *local = info->dti_buf; 296 int result; 297 298 299 dfh->dfh_dt = dt; 300 dfh->dfh_fid = fid; 301 302 strncpy(local, path, DT_MAX_PATH); 303 local[DT_MAX_PATH - 1] = '\0'; 304 305 result = dt->dd_ops->dt_root_get(env, dt, fid); 306 if (result == 0) { 307 obj = dt_locate(env, dt, fid); 308 if (!IS_ERR(obj)) { 309 dfh->dfh_o = obj; 310 result = dt_path_parser(env, local, dt_find_entry, dfh); 311 if (result != 0) 312 obj = ERR_PTR(result); 313 else 314 obj = dfh->dfh_o; 315 } 316 } else { 317 obj = ERR_PTR(result); 318 } 319 return obj; 320} 321EXPORT_SYMBOL(dt_store_resolve); 322 323static struct dt_object *dt_reg_open(const struct lu_env *env, 324 struct dt_device *dt, 325 struct dt_object *p, 326 const char *name, 327 struct lu_fid *fid) 328{ 329 struct dt_object *o; 330 int result; 331 332 result = dt_lookup_dir(env, p, name, fid); 333 if (result == 0){ 334 o = dt_locate(env, dt, fid); 335 } 336 else 337 o = ERR_PTR(result); 338 339 return o; 340} 341 342/** 343 * Open dt object named \a filename from \a dirname directory. 344 * \param dt dt device 345 * \param fid on success, object fid is stored in *fid 346 */ 347struct dt_object *dt_store_open(const struct lu_env *env, 348 struct dt_device *dt, 349 const char *dirname, 350 const char *filename, 351 struct lu_fid *fid) 352{ 353 struct dt_object *file; 354 struct dt_object *dir; 355 356 dir = dt_store_resolve(env, dt, dirname, fid); 357 if (!IS_ERR(dir)) { 358 file = dt_reg_open(env, dt, dir, 359 filename, fid); 360 lu_object_put(env, &dir->do_lu); 361 } else { 362 file = dir; 363 } 364 return file; 365} 366EXPORT_SYMBOL(dt_store_open); 367 368struct dt_object *dt_find_or_create(const struct lu_env *env, 369 struct dt_device *dt, 370 const struct lu_fid *fid, 371 struct dt_object_format *dof, 372 struct lu_attr *at) 373{ 374 struct dt_object *dto; 375 struct thandle *th; 376 int rc; 377 378 dto = dt_locate(env, dt, fid); 379 if (IS_ERR(dto)) 380 return dto; 381 382 LASSERT(dto != NULL); 383 if (dt_object_exists(dto)) 384 return dto; 385 386 th = dt_trans_create(env, dt); 387 if (IS_ERR(th)) { 388 rc = PTR_ERR(th); 389 goto out; 390 } 391 392 rc = dt_declare_create(env, dto, at, NULL, dof, th); 393 if (rc) 394 goto trans_stop; 395 396 rc = dt_trans_start_local(env, dt, th); 397 if (rc) 398 goto trans_stop; 399 400 dt_write_lock(env, dto, 0); 401 if (dt_object_exists(dto)) { 402 rc = 0; 403 goto unlock; 404 } 405 406 CDEBUG(D_OTHER, "create new object "DFID"\n", PFID(fid)); 407 408 rc = dt_create(env, dto, at, NULL, dof, th); 409 if (rc) 410 goto unlock; 411 LASSERT(dt_object_exists(dto)); 412unlock: 413 dt_write_unlock(env, dto); 414trans_stop: 415 dt_trans_stop(env, dt, th); 416out: 417 if (rc) { 418 lu_object_put(env, &dto->do_lu); 419 return ERR_PTR(rc); 420 } 421 return dto; 422} 423EXPORT_SYMBOL(dt_find_or_create); 424 425/* dt class init function. */ 426int dt_global_init(void) 427{ 428 int result; 429 430 LU_CONTEXT_KEY_INIT(&dt_key); 431 result = lu_context_key_register(&dt_key); 432 return result; 433} 434 435void dt_global_fini(void) 436{ 437 lu_context_key_degister(&dt_key); 438} 439 440/** 441 * Generic read helper. May return an error for partial reads. 442 * 443 * \param env lustre environment 444 * \param dt object to be read 445 * \param buf lu_buf to be filled, with buffer pointer and length 446 * \param pos position to start reading, updated as data is read 447 * 448 * \retval real size of data read 449 * \retval -ve errno on failure 450 */ 451int dt_read(const struct lu_env *env, struct dt_object *dt, 452 struct lu_buf *buf, loff_t *pos) 453{ 454 LASSERTF(dt != NULL, "dt is NULL when we want to read record\n"); 455 return dt->do_body_ops->dbo_read(env, dt, buf, pos, BYPASS_CAPA); 456} 457EXPORT_SYMBOL(dt_read); 458 459/** 460 * Read structures of fixed size from storage. Unlike dt_read(), using 461 * dt_record_read() will return an error for partial reads. 462 * 463 * \param env lustre environment 464 * \param dt object to be read 465 * \param buf lu_buf to be filled, with buffer pointer and length 466 * \param pos position to start reading, updated as data is read 467 * 468 * \retval 0 on successfully reading full buffer 469 * \retval -EFAULT on short read 470 * \retval -ve errno on failure 471 */ 472int dt_record_read(const struct lu_env *env, struct dt_object *dt, 473 struct lu_buf *buf, loff_t *pos) 474{ 475 int rc; 476 477 LASSERTF(dt != NULL, "dt is NULL when we want to read record\n"); 478 479 rc = dt->do_body_ops->dbo_read(env, dt, buf, pos, BYPASS_CAPA); 480 481 if (rc == buf->lb_len) 482 rc = 0; 483 else if (rc >= 0) 484 rc = -EFAULT; 485 return rc; 486} 487EXPORT_SYMBOL(dt_record_read); 488 489int dt_record_write(const struct lu_env *env, struct dt_object *dt, 490 const struct lu_buf *buf, loff_t *pos, struct thandle *th) 491{ 492 int rc; 493 494 LASSERTF(dt != NULL, "dt is NULL when we want to write record\n"); 495 LASSERT(th != NULL); 496 LASSERT(dt->do_body_ops); 497 LASSERT(dt->do_body_ops->dbo_write); 498 rc = dt->do_body_ops->dbo_write(env, dt, buf, pos, th, BYPASS_CAPA, 1); 499 if (rc == buf->lb_len) 500 rc = 0; 501 else if (rc >= 0) 502 rc = -EFAULT; 503 return rc; 504} 505EXPORT_SYMBOL(dt_record_write); 506 507int dt_declare_version_set(const struct lu_env *env, struct dt_object *o, 508 struct thandle *th) 509{ 510 struct lu_buf vbuf; 511 char *xname = XATTR_NAME_VERSION; 512 513 LASSERT(o); 514 vbuf.lb_buf = NULL; 515 vbuf.lb_len = sizeof(dt_obj_version_t); 516 return dt_declare_xattr_set(env, o, &vbuf, xname, 0, th); 517 518} 519EXPORT_SYMBOL(dt_declare_version_set); 520 521void dt_version_set(const struct lu_env *env, struct dt_object *o, 522 dt_obj_version_t version, struct thandle *th) 523{ 524 struct lu_buf vbuf; 525 char *xname = XATTR_NAME_VERSION; 526 int rc; 527 528 LASSERT(o); 529 vbuf.lb_buf = &version; 530 vbuf.lb_len = sizeof(version); 531 532 rc = dt_xattr_set(env, o, &vbuf, xname, 0, th, BYPASS_CAPA); 533 if (rc < 0) 534 CDEBUG(D_INODE, "Can't set version, rc %d\n", rc); 535 return; 536} 537EXPORT_SYMBOL(dt_version_set); 538 539dt_obj_version_t dt_version_get(const struct lu_env *env, struct dt_object *o) 540{ 541 struct lu_buf vbuf; 542 char *xname = XATTR_NAME_VERSION; 543 dt_obj_version_t version; 544 int rc; 545 546 LASSERT(o); 547 vbuf.lb_buf = &version; 548 vbuf.lb_len = sizeof(version); 549 rc = dt_xattr_get(env, o, &vbuf, xname, BYPASS_CAPA); 550 if (rc != sizeof(version)) { 551 CDEBUG(D_INODE, "Can't get version, rc %d\n", rc); 552 version = 0; 553 } 554 return version; 555} 556EXPORT_SYMBOL(dt_version_get); 557 558/* list of all supported index types */ 559 560/* directories */ 561const struct dt_index_features dt_directory_features; 562EXPORT_SYMBOL(dt_directory_features); 563 564/* scrub iterator */ 565const struct dt_index_features dt_otable_features; 566EXPORT_SYMBOL(dt_otable_features); 567 568/* lfsck */ 569const struct dt_index_features dt_lfsck_features = { 570 .dif_flags = DT_IND_UPDATE, 571 .dif_keysize_min = sizeof(struct lu_fid), 572 .dif_keysize_max = sizeof(struct lu_fid), 573 .dif_recsize_min = sizeof(__u8), 574 .dif_recsize_max = sizeof(__u8), 575 .dif_ptrsize = 4 576}; 577EXPORT_SYMBOL(dt_lfsck_features); 578 579/* accounting indexes */ 580const struct dt_index_features dt_acct_features = { 581 .dif_flags = DT_IND_UPDATE, 582 .dif_keysize_min = sizeof(__u64), /* 64-bit uid/gid */ 583 .dif_keysize_max = sizeof(__u64), /* 64-bit uid/gid */ 584 .dif_recsize_min = sizeof(struct lquota_acct_rec), /* 16 bytes */ 585 .dif_recsize_max = sizeof(struct lquota_acct_rec), /* 16 bytes */ 586 .dif_ptrsize = 4 587}; 588EXPORT_SYMBOL(dt_acct_features); 589 590/* global quota files */ 591const struct dt_index_features dt_quota_glb_features = { 592 .dif_flags = DT_IND_UPDATE, 593 /* a different key would have to be used for per-directory quota */ 594 .dif_keysize_min = sizeof(__u64), /* 64-bit uid/gid */ 595 .dif_keysize_max = sizeof(__u64), /* 64-bit uid/gid */ 596 .dif_recsize_min = sizeof(struct lquota_glb_rec), /* 32 bytes */ 597 .dif_recsize_max = sizeof(struct lquota_glb_rec), /* 32 bytes */ 598 .dif_ptrsize = 4 599}; 600EXPORT_SYMBOL(dt_quota_glb_features); 601 602/* slave quota files */ 603const struct dt_index_features dt_quota_slv_features = { 604 .dif_flags = DT_IND_UPDATE, 605 /* a different key would have to be used for per-directory quota */ 606 .dif_keysize_min = sizeof(__u64), /* 64-bit uid/gid */ 607 .dif_keysize_max = sizeof(__u64), /* 64-bit uid/gid */ 608 .dif_recsize_min = sizeof(struct lquota_slv_rec), /* 8 bytes */ 609 .dif_recsize_max = sizeof(struct lquota_slv_rec), /* 8 bytes */ 610 .dif_ptrsize = 4 611}; 612EXPORT_SYMBOL(dt_quota_slv_features); 613 614/* helper function returning what dt_index_features structure should be used 615 * based on the FID sequence. This is used by OBD_IDX_READ RPC */ 616static inline const struct dt_index_features *dt_index_feat_select(__u64 seq, 617 __u32 mode) 618{ 619 if (seq == FID_SEQ_QUOTA_GLB) { 620 /* global quota index */ 621 if (!S_ISREG(mode)) 622 /* global quota index should be a regular file */ 623 return ERR_PTR(-ENOENT); 624 return &dt_quota_glb_features; 625 } else if (seq == FID_SEQ_QUOTA) { 626 /* quota slave index */ 627 if (!S_ISREG(mode)) 628 /* slave index should be a regular file */ 629 return ERR_PTR(-ENOENT); 630 return &dt_quota_slv_features; 631 } else if (seq >= FID_SEQ_NORMAL) { 632 /* object is part of the namespace, verify that it is a 633 * directory */ 634 if (!S_ISDIR(mode)) 635 /* sorry, we can only deal with directory */ 636 return ERR_PTR(-ENOTDIR); 637 return &dt_directory_features; 638 } 639 640 return ERR_PTR(-EOPNOTSUPP); 641} 642 643/* 644 * Fill a lu_idxpage with key/record pairs read for transfer via OBD_IDX_READ 645 * RPC 646 * 647 * \param env - is the environment passed by the caller 648 * \param lp - is a pointer to the lu_page to fill 649 * \param nob - is the maximum number of bytes that should be copied 650 * \param iops - is the index operation vector associated with the index object 651 * \param it - is a pointer to the current iterator 652 * \param attr - is the index attribute to pass to iops->rec() 653 * \param arg - is a pointer to the idx_info structure 654 */ 655static int dt_index_page_build(const struct lu_env *env, union lu_page *lp, 656 int nob, const struct dt_it_ops *iops, 657 struct dt_it *it, __u32 attr, void *arg) 658{ 659 struct idx_info *ii = (struct idx_info *)arg; 660 struct lu_idxpage *lip = &lp->lp_idx; 661 char *entry; 662 int rc, size; 663 664 /* no support for variable key & record size for now */ 665 LASSERT((ii->ii_flags & II_FL_VARKEY) == 0); 666 LASSERT((ii->ii_flags & II_FL_VARREC) == 0); 667 668 /* initialize the header of the new container */ 669 memset(lip, 0, LIP_HDR_SIZE); 670 lip->lip_magic = LIP_MAGIC; 671 nob -= LIP_HDR_SIZE; 672 673 /* compute size needed to store a key/record pair */ 674 size = ii->ii_recsize + ii->ii_keysize; 675 if ((ii->ii_flags & II_FL_NOHASH) == 0) 676 /* add hash if the client wants it */ 677 size += sizeof(__u64); 678 679 entry = lip->lip_entries; 680 do { 681 char *tmp_entry = entry; 682 struct dt_key *key; 683 __u64 hash; 684 685 /* fetch 64-bit hash value */ 686 hash = iops->store(env, it); 687 ii->ii_hash_end = hash; 688 689 if (OBD_FAIL_CHECK(OBD_FAIL_OBD_IDX_READ_BREAK)) { 690 if (lip->lip_nr != 0) { 691 rc = 0; 692 goto out; 693 } 694 } 695 696 if (nob < size) { 697 if (lip->lip_nr == 0) 698 rc = -EINVAL; 699 else 700 rc = 0; 701 goto out; 702 } 703 704 if ((ii->ii_flags & II_FL_NOHASH) == 0) { 705 /* client wants to the 64-bit hash value associated with 706 * each record */ 707 memcpy(tmp_entry, &hash, sizeof(hash)); 708 tmp_entry += sizeof(hash); 709 } 710 711 /* then the key value */ 712 LASSERT(iops->key_size(env, it) == ii->ii_keysize); 713 key = iops->key(env, it); 714 memcpy(tmp_entry, key, ii->ii_keysize); 715 tmp_entry += ii->ii_keysize; 716 717 /* and finally the record */ 718 rc = iops->rec(env, it, (struct dt_rec *)tmp_entry, attr); 719 if (rc != -ESTALE) { 720 if (rc != 0) 721 goto out; 722 723 /* hash/key/record successfully copied! */ 724 lip->lip_nr++; 725 if (unlikely(lip->lip_nr == 1 && ii->ii_count == 0)) 726 ii->ii_hash_start = hash; 727 entry = tmp_entry + ii->ii_recsize; 728 nob -= size; 729 } 730 731 /* move on to the next record */ 732 do { 733 rc = iops->next(env, it); 734 } while (rc == -ESTALE); 735 736 } while (rc == 0); 737 738 goto out; 739out: 740 if (rc >= 0 && lip->lip_nr > 0) 741 /* one more container */ 742 ii->ii_count++; 743 if (rc > 0) 744 /* no more entries */ 745 ii->ii_hash_end = II_END_OFF; 746 return rc; 747} 748 749/* 750 * Walk index and fill lu_page containers with key/record pairs 751 * 752 * \param env - is the environment passed by the caller 753 * \param obj - is the index object to parse 754 * \param rdpg - is the lu_rdpg descriptor associated with the transfer 755 * \param filler - is the callback function responsible for filling a lu_page 756 * with key/record pairs in the format wanted by the caller 757 * \param arg - is an opaq argument passed to the filler function 758 * 759 * \retval sum (in bytes) of all filled lu_pages 760 * \retval -ve errno on failure 761 */ 762int dt_index_walk(const struct lu_env *env, struct dt_object *obj, 763 const struct lu_rdpg *rdpg, dt_index_page_build_t filler, 764 void *arg) 765{ 766 struct dt_it *it; 767 const struct dt_it_ops *iops; 768 unsigned int pageidx, nob, nlupgs = 0; 769 int rc; 770 771 LASSERT(rdpg->rp_pages != NULL); 772 LASSERT(obj->do_index_ops != NULL); 773 774 nob = rdpg->rp_count; 775 if (nob <= 0) 776 return -EFAULT; 777 778 /* Iterate through index and fill containers from @rdpg */ 779 iops = &obj->do_index_ops->dio_it; 780 LASSERT(iops != NULL); 781 it = iops->init(env, obj, rdpg->rp_attrs, BYPASS_CAPA); 782 if (IS_ERR(it)) 783 return PTR_ERR(it); 784 785 rc = iops->load(env, it, rdpg->rp_hash); 786 if (rc == 0) { 787 /* 788 * Iterator didn't find record with exactly the key requested. 789 * 790 * It is currently either 791 * 792 * - positioned above record with key less than 793 * requested---skip it. 794 * - or not positioned at all (is in IAM_IT_SKEWED 795 * state)---position it on the next item. 796 */ 797 rc = iops->next(env, it); 798 } else if (rc > 0) { 799 rc = 0; 800 } 801 802 /* Fill containers one after the other. There might be multiple 803 * containers per physical page. 804 * 805 * At this point and across for-loop: 806 * rc == 0 -> ok, proceed. 807 * rc > 0 -> end of index. 808 * rc < 0 -> error. */ 809 for (pageidx = 0; rc == 0 && nob > 0; pageidx++) { 810 union lu_page *lp; 811 int i; 812 813 LASSERT(pageidx < rdpg->rp_npages); 814 lp = kmap(rdpg->rp_pages[pageidx]); 815 816 /* fill lu pages */ 817 for (i = 0; i < LU_PAGE_COUNT; i++, lp++, nob -= LU_PAGE_SIZE) { 818 rc = filler(env, lp, min_t(int, nob, LU_PAGE_SIZE), 819 iops, it, rdpg->rp_attrs, arg); 820 if (rc < 0) 821 break; 822 /* one more lu_page */ 823 nlupgs++; 824 if (rc > 0) 825 /* end of index */ 826 break; 827 } 828 kunmap(rdpg->rp_pages[i]); 829 } 830 831 iops->put(env, it); 832 iops->fini(env, it); 833 834 if (rc >= 0) 835 rc = min_t(unsigned int, nlupgs * LU_PAGE_SIZE, rdpg->rp_count); 836 837 return rc; 838} 839EXPORT_SYMBOL(dt_index_walk); 840 841/** 842 * Walk key/record pairs of an index and copy them into 4KB containers to be 843 * transferred over the network. This is the common handler for OBD_IDX_READ 844 * RPC processing. 845 * 846 * \param env - is the environment passed by the caller 847 * \param dev - is the dt_device storing the index 848 * \param ii - is the idx_info structure packed by the client in the 849 * OBD_IDX_READ request 850 * \param rdpg - is the lu_rdpg descriptor 851 * 852 * \retval on success, return sum (in bytes) of all filled containers 853 * \retval appropriate error otherwise. 854 */ 855int dt_index_read(const struct lu_env *env, struct dt_device *dev, 856 struct idx_info *ii, const struct lu_rdpg *rdpg) 857{ 858 const struct dt_index_features *feat; 859 struct dt_object *obj; 860 int rc; 861 862 /* rp_count shouldn't be null and should be a multiple of the container 863 * size */ 864 if (rdpg->rp_count <= 0 && (rdpg->rp_count & (LU_PAGE_SIZE - 1)) != 0) 865 return -EFAULT; 866 867 if (fid_seq(&ii->ii_fid) >= FID_SEQ_NORMAL) 868 /* we don't support directory transfer via OBD_IDX_READ for the 869 * time being */ 870 return -EOPNOTSUPP; 871 872 if (!fid_is_quota(&ii->ii_fid)) 873 /* block access to all local files except quota files */ 874 return -EPERM; 875 876 /* lookup index object subject to the transfer */ 877 obj = dt_locate(env, dev, &ii->ii_fid); 878 if (IS_ERR(obj)) 879 return PTR_ERR(obj); 880 if (dt_object_exists(obj) == 0) { 881 rc = -ENOENT; 882 goto out; 883 } 884 885 /* fetch index features associated with index object */ 886 feat = dt_index_feat_select(fid_seq(&ii->ii_fid), 887 lu_object_attr(&obj->do_lu)); 888 if (IS_ERR(feat)) { 889 rc = PTR_ERR(feat); 890 goto out; 891 } 892 893 /* load index feature if not done already */ 894 if (obj->do_index_ops == NULL) { 895 rc = obj->do_ops->do_index_try(env, obj, feat); 896 if (rc) 897 goto out; 898 } 899 900 /* fill ii_flags with supported index features */ 901 ii->ii_flags &= II_FL_NOHASH; 902 903 ii->ii_keysize = feat->dif_keysize_max; 904 if ((feat->dif_flags & DT_IND_VARKEY) != 0) { 905 /* key size is variable */ 906 ii->ii_flags |= II_FL_VARKEY; 907 /* we don't support variable key size for the time being */ 908 rc = -EOPNOTSUPP; 909 goto out; 910 } 911 912 ii->ii_recsize = feat->dif_recsize_max; 913 if ((feat->dif_flags & DT_IND_VARREC) != 0) { 914 /* record size is variable */ 915 ii->ii_flags |= II_FL_VARREC; 916 /* we don't support variable record size for the time being */ 917 rc = -EOPNOTSUPP; 918 goto out; 919 } 920 921 if ((feat->dif_flags & DT_IND_NONUNQ) != 0) 922 /* key isn't necessarily unique */ 923 ii->ii_flags |= II_FL_NONUNQ; 924 925 dt_read_lock(env, obj, 0); 926 /* fetch object version before walking the index */ 927 ii->ii_version = dt_version_get(env, obj); 928 929 /* walk the index and fill lu_idxpages with key/record pairs */ 930 rc = dt_index_walk(env, obj, rdpg, dt_index_page_build ,ii); 931 dt_read_unlock(env, obj); 932 933 if (rc == 0) { 934 /* index is empty */ 935 LASSERT(ii->ii_count == 0); 936 ii->ii_hash_end = II_END_OFF; 937 } 938 939 goto out; 940out: 941 lu_object_put(env, &obj->do_lu); 942 return rc; 943} 944EXPORT_SYMBOL(dt_index_read); 945 946#if defined (CONFIG_PROC_FS) 947 948int lprocfs_dt_rd_blksize(char *page, char **start, off_t off, 949 int count, int *eof, void *data) 950{ 951 struct dt_device *dt = data; 952 struct obd_statfs osfs; 953 954 int rc = dt_statfs(NULL, dt, &osfs); 955 if (rc == 0) { 956 *eof = 1; 957 rc = snprintf(page, count, "%u\n", 958 (unsigned) osfs.os_bsize); 959 } 960 961 return rc; 962} 963EXPORT_SYMBOL(lprocfs_dt_rd_blksize); 964 965int lprocfs_dt_rd_kbytestotal(char *page, char **start, off_t off, 966 int count, int *eof, void *data) 967{ 968 struct dt_device *dt = data; 969 struct obd_statfs osfs; 970 971 int rc = dt_statfs(NULL, dt, &osfs); 972 if (rc == 0) { 973 __u32 blk_size = osfs.os_bsize >> 10; 974 __u64 result = osfs.os_blocks; 975 976 while (blk_size >>= 1) 977 result <<= 1; 978 979 *eof = 1; 980 rc = snprintf(page, count, "%llu\n", result); 981 } 982 983 return rc; 984} 985EXPORT_SYMBOL(lprocfs_dt_rd_kbytestotal); 986 987int lprocfs_dt_rd_kbytesfree(char *page, char **start, off_t off, 988 int count, int *eof, void *data) 989{ 990 struct dt_device *dt = data; 991 struct obd_statfs osfs; 992 993 int rc = dt_statfs(NULL, dt, &osfs); 994 if (rc == 0) { 995 __u32 blk_size = osfs.os_bsize >> 10; 996 __u64 result = osfs.os_bfree; 997 998 while (blk_size >>= 1) 999 result <<= 1; 1000 1001 *eof = 1; 1002 rc = snprintf(page, count, "%llu\n", result); 1003 } 1004 1005 return rc; 1006} 1007EXPORT_SYMBOL(lprocfs_dt_rd_kbytesfree); 1008 1009int lprocfs_dt_rd_kbytesavail(char *page, char **start, off_t off, 1010 int count, int *eof, void *data) 1011{ 1012 struct dt_device *dt = data; 1013 struct obd_statfs osfs; 1014 1015 int rc = dt_statfs(NULL, dt, &osfs); 1016 if (rc == 0) { 1017 __u32 blk_size = osfs.os_bsize >> 10; 1018 __u64 result = osfs.os_bavail; 1019 1020 while (blk_size >>= 1) 1021 result <<= 1; 1022 1023 *eof = 1; 1024 rc = snprintf(page, count, "%llu\n", result); 1025 } 1026 1027 return rc; 1028} 1029EXPORT_SYMBOL(lprocfs_dt_rd_kbytesavail); 1030 1031int lprocfs_dt_rd_filestotal(char *page, char **start, off_t off, 1032 int count, int *eof, void *data) 1033{ 1034 struct dt_device *dt = data; 1035 struct obd_statfs osfs; 1036 1037 int rc = dt_statfs(NULL, dt, &osfs); 1038 if (rc == 0) { 1039 *eof = 1; 1040 rc = snprintf(page, count, "%llu\n", osfs.os_files); 1041 } 1042 1043 return rc; 1044} 1045EXPORT_SYMBOL(lprocfs_dt_rd_filestotal); 1046 1047int lprocfs_dt_rd_filesfree(char *page, char **start, off_t off, 1048 int count, int *eof, void *data) 1049{ 1050 struct dt_device *dt = data; 1051 struct obd_statfs osfs; 1052 1053 int rc = dt_statfs(NULL, dt, &osfs); 1054 if (rc == 0) { 1055 *eof = 1; 1056 rc = snprintf(page, count, "%llu\n", osfs.os_ffree); 1057 } 1058 1059 return rc; 1060} 1061EXPORT_SYMBOL(lprocfs_dt_rd_filesfree); 1062 1063#endif /* CONFIG_PROC_FS */ 1064