1/* 2 * GPL HEADER START 3 * 4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 5 * 6 * This program is free software; you can redistribute it and/or modify 7 * it under the terms of the GNU General Public License version 2 only, 8 * as published by the Free Software Foundation. 9 * 10 * This program is distributed in the hope that it will be useful, but 11 * WITHOUT ANY WARRANTY; without even the implied warranty of 12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 13 * General Public License version 2 for more details (a copy is included 14 * in the LICENSE file that accompanied this code). 15 * 16 * You should have received a copy of the GNU General Public License 17 * version 2 along with this program; If not, see 18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf 19 * 20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara, 21 * CA 95054 USA or visit www.sun.com if you need additional information or 22 * have any questions. 23 * 24 * GPL HEADER END 25 */ 26/* 27 * Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved. 28 * Use is subject to license terms. 29 * 30 * Copyright (c) 2011, 2012, Intel Corporation. 31 */ 32/* 33 * This file is part of Lustre, http://www.lustre.org/ 34 * Lustre is a trademark of Sun Microsystems, Inc. 35 */ 36 37#define DEBUG_SUBSYSTEM S_LOV 38 39#include "../../include/linux/libcfs/libcfs.h" 40 41#include "../include/obd_class.h" 42#include "../include/lustre/lustre_idl.h" 43#include "lov_internal.h" 44 45static void lov_init_set(struct lov_request_set *set) 46{ 47 set->set_count = 0; 48 atomic_set(&set->set_completes, 0); 49 atomic_set(&set->set_success, 0); 50 atomic_set(&set->set_finish_checked, 0); 51 set->set_cookies = NULL; 52 INIT_LIST_HEAD(&set->set_list); 53 atomic_set(&set->set_refcount, 1); 54 init_waitqueue_head(&set->set_waitq); 55 spin_lock_init(&set->set_lock); 56} 57 58void lov_finish_set(struct lov_request_set *set) 59{ 60 struct list_head *pos, *n; 61 62 LASSERT(set); 63 list_for_each_safe(pos, n, &set->set_list) { 64 struct lov_request *req = list_entry(pos, 65 struct lov_request, 66 rq_link); 67 list_del_init(&req->rq_link); 68 69 if (req->rq_oi.oi_oa) 70 OBDO_FREE(req->rq_oi.oi_oa); 71 if (req->rq_oi.oi_md) 72 OBD_FREE_LARGE(req->rq_oi.oi_md, req->rq_buflen); 73 if (req->rq_oi.oi_osfs) 74 OBD_FREE(req->rq_oi.oi_osfs, 75 sizeof(*req->rq_oi.oi_osfs)); 76 OBD_FREE(req, sizeof(*req)); 77 } 78 79 if (set->set_pga) { 80 int len = set->set_oabufs * sizeof(*set->set_pga); 81 OBD_FREE_LARGE(set->set_pga, len); 82 } 83 if (set->set_lockh) 84 lov_llh_put(set->set_lockh); 85 86 OBD_FREE(set, sizeof(*set)); 87} 88 89int lov_set_finished(struct lov_request_set *set, int idempotent) 90{ 91 int completes = atomic_read(&set->set_completes); 92 93 CDEBUG(D_INFO, "check set %d/%d\n", completes, set->set_count); 94 95 if (completes == set->set_count) { 96 if (idempotent) 97 return 1; 98 if (atomic_inc_return(&set->set_finish_checked) == 1) 99 return 1; 100 } 101 return 0; 102} 103 104void lov_update_set(struct lov_request_set *set, 105 struct lov_request *req, int rc) 106{ 107 req->rq_complete = 1; 108 req->rq_rc = rc; 109 110 atomic_inc(&set->set_completes); 111 if (rc == 0) 112 atomic_inc(&set->set_success); 113 114 wake_up(&set->set_waitq); 115} 116 117int lov_update_common_set(struct lov_request_set *set, 118 struct lov_request *req, int rc) 119{ 120 struct lov_obd *lov = &set->set_exp->exp_obd->u.lov; 121 122 lov_update_set(set, req, rc); 123 124 /* grace error on inactive ost */ 125 if (rc && !(lov->lov_tgts[req->rq_idx] && 126 lov->lov_tgts[req->rq_idx]->ltd_active)) 127 rc = 0; 128 129 /* FIXME in raid1 regime, should return 0 */ 130 return rc; 131} 132 133void lov_set_add_req(struct lov_request *req, struct lov_request_set *set) 134{ 135 list_add_tail(&req->rq_link, &set->set_list); 136 set->set_count++; 137 req->rq_rqset = set; 138} 139 140static int lov_check_set(struct lov_obd *lov, int idx) 141{ 142 int rc; 143 struct lov_tgt_desc *tgt; 144 145 mutex_lock(&lov->lov_lock); 146 tgt = lov->lov_tgts[idx]; 147 rc = !tgt || tgt->ltd_active || 148 (tgt->ltd_exp && 149 class_exp2cliimp(tgt->ltd_exp)->imp_connect_tried); 150 mutex_unlock(&lov->lov_lock); 151 152 return rc; 153} 154 155/* Check if the OSC connection exists and is active. 156 * If the OSC has not yet had a chance to connect to the OST the first time, 157 * wait once for it to connect instead of returning an error. 158 */ 159int lov_check_and_wait_active(struct lov_obd *lov, int ost_idx) 160{ 161 wait_queue_head_t waitq; 162 struct l_wait_info lwi; 163 struct lov_tgt_desc *tgt; 164 int rc = 0; 165 166 mutex_lock(&lov->lov_lock); 167 168 tgt = lov->lov_tgts[ost_idx]; 169 170 if (unlikely(tgt == NULL)) { 171 rc = 0; 172 goto out; 173 } 174 175 if (likely(tgt->ltd_active)) { 176 rc = 1; 177 goto out; 178 } 179 180 if (tgt->ltd_exp && class_exp2cliimp(tgt->ltd_exp)->imp_connect_tried) { 181 rc = 0; 182 goto out; 183 } 184 185 mutex_unlock(&lov->lov_lock); 186 187 init_waitqueue_head(&waitq); 188 lwi = LWI_TIMEOUT_INTERVAL(cfs_time_seconds(obd_timeout), 189 cfs_time_seconds(1), NULL, NULL); 190 191 rc = l_wait_event(waitq, lov_check_set(lov, ost_idx), &lwi); 192 if (tgt != NULL && tgt->ltd_active) 193 return 1; 194 195 return 0; 196 197out: 198 mutex_unlock(&lov->lov_lock); 199 return rc; 200} 201 202static int common_attr_done(struct lov_request_set *set) 203{ 204 struct list_head *pos; 205 struct lov_request *req; 206 struct obdo *tmp_oa; 207 int rc = 0, attrset = 0; 208 209 LASSERT(set->set_oi != NULL); 210 211 if (set->set_oi->oi_oa == NULL) 212 return 0; 213 214 if (!atomic_read(&set->set_success)) 215 return -EIO; 216 217 OBDO_ALLOC(tmp_oa); 218 if (tmp_oa == NULL) { 219 rc = -ENOMEM; 220 goto out; 221 } 222 223 list_for_each(pos, &set->set_list) { 224 req = list_entry(pos, struct lov_request, rq_link); 225 226 if (!req->rq_complete || req->rq_rc) 227 continue; 228 if (req->rq_oi.oi_oa->o_valid == 0) /* inactive stripe */ 229 continue; 230 lov_merge_attrs(tmp_oa, req->rq_oi.oi_oa, 231 req->rq_oi.oi_oa->o_valid, 232 set->set_oi->oi_md, req->rq_stripe, &attrset); 233 } 234 if (!attrset) { 235 CERROR("No stripes had valid attrs\n"); 236 rc = -EIO; 237 } 238 if ((set->set_oi->oi_oa->o_valid & OBD_MD_FLEPOCH) && 239 (set->set_oi->oi_md->lsm_stripe_count != attrset)) { 240 /* When we take attributes of some epoch, we require all the 241 * ost to be active. */ 242 CERROR("Not all the stripes had valid attrs\n"); 243 rc = -EIO; 244 goto out; 245 } 246 247 tmp_oa->o_oi = set->set_oi->oi_oa->o_oi; 248 memcpy(set->set_oi->oi_oa, tmp_oa, sizeof(*set->set_oi->oi_oa)); 249out: 250 if (tmp_oa) 251 OBDO_FREE(tmp_oa); 252 return rc; 253 254} 255 256int lov_fini_getattr_set(struct lov_request_set *set) 257{ 258 int rc = 0; 259 260 if (set == NULL) 261 return 0; 262 LASSERT(set->set_exp); 263 if (atomic_read(&set->set_completes)) 264 rc = common_attr_done(set); 265 266 lov_put_reqset(set); 267 268 return rc; 269} 270 271/* The callback for osc_getattr_async that finalizes a request info when a 272 * response is received. */ 273static int cb_getattr_update(void *cookie, int rc) 274{ 275 struct obd_info *oinfo = cookie; 276 struct lov_request *lovreq; 277 278 lovreq = container_of(oinfo, struct lov_request, rq_oi); 279 return lov_update_common_set(lovreq->rq_rqset, lovreq, rc); 280} 281 282int lov_prep_getattr_set(struct obd_export *exp, struct obd_info *oinfo, 283 struct lov_request_set **reqset) 284{ 285 struct lov_request_set *set; 286 struct lov_obd *lov = &exp->exp_obd->u.lov; 287 int rc = 0, i; 288 289 OBD_ALLOC(set, sizeof(*set)); 290 if (set == NULL) 291 return -ENOMEM; 292 lov_init_set(set); 293 294 set->set_exp = exp; 295 set->set_oi = oinfo; 296 297 for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) { 298 struct lov_oinfo *loi; 299 struct lov_request *req; 300 301 loi = oinfo->oi_md->lsm_oinfo[i]; 302 if (!lov_check_and_wait_active(lov, loi->loi_ost_idx)) { 303 CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx); 304 if (oinfo->oi_oa->o_valid & OBD_MD_FLEPOCH) { 305 /* SOM requires all the OSTs to be active. */ 306 rc = -EIO; 307 goto out_set; 308 } 309 continue; 310 } 311 312 OBD_ALLOC(req, sizeof(*req)); 313 if (req == NULL) { 314 rc = -ENOMEM; 315 goto out_set; 316 } 317 318 req->rq_stripe = i; 319 req->rq_idx = loi->loi_ost_idx; 320 321 OBDO_ALLOC(req->rq_oi.oi_oa); 322 if (req->rq_oi.oi_oa == NULL) { 323 OBD_FREE(req, sizeof(*req)); 324 rc = -ENOMEM; 325 goto out_set; 326 } 327 memcpy(req->rq_oi.oi_oa, oinfo->oi_oa, 328 sizeof(*req->rq_oi.oi_oa)); 329 req->rq_oi.oi_oa->o_oi = loi->loi_oi; 330 req->rq_oi.oi_cb_up = cb_getattr_update; 331 req->rq_oi.oi_capa = oinfo->oi_capa; 332 333 lov_set_add_req(req, set); 334 } 335 if (!set->set_count) { 336 rc = -EIO; 337 goto out_set; 338 } 339 *reqset = set; 340 return rc; 341out_set: 342 lov_fini_getattr_set(set); 343 return rc; 344} 345 346int lov_fini_destroy_set(struct lov_request_set *set) 347{ 348 if (set == NULL) 349 return 0; 350 LASSERT(set->set_exp); 351 if (atomic_read(&set->set_completes)) { 352 /* FIXME update qos data here */ 353 } 354 355 lov_put_reqset(set); 356 357 return 0; 358} 359 360int lov_prep_destroy_set(struct obd_export *exp, struct obd_info *oinfo, 361 struct obdo *src_oa, struct lov_stripe_md *lsm, 362 struct obd_trans_info *oti, 363 struct lov_request_set **reqset) 364{ 365 struct lov_request_set *set; 366 struct lov_obd *lov = &exp->exp_obd->u.lov; 367 int rc = 0, i; 368 369 OBD_ALLOC(set, sizeof(*set)); 370 if (set == NULL) 371 return -ENOMEM; 372 lov_init_set(set); 373 374 set->set_exp = exp; 375 set->set_oi = oinfo; 376 set->set_oi->oi_md = lsm; 377 set->set_oi->oi_oa = src_oa; 378 set->set_oti = oti; 379 if (oti != NULL && src_oa->o_valid & OBD_MD_FLCOOKIE) 380 set->set_cookies = oti->oti_logcookies; 381 382 for (i = 0; i < lsm->lsm_stripe_count; i++) { 383 struct lov_oinfo *loi; 384 struct lov_request *req; 385 386 loi = lsm->lsm_oinfo[i]; 387 if (!lov_check_and_wait_active(lov, loi->loi_ost_idx)) { 388 CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx); 389 continue; 390 } 391 392 OBD_ALLOC(req, sizeof(*req)); 393 if (req == NULL) { 394 rc = -ENOMEM; 395 goto out_set; 396 } 397 398 req->rq_stripe = i; 399 req->rq_idx = loi->loi_ost_idx; 400 401 OBDO_ALLOC(req->rq_oi.oi_oa); 402 if (req->rq_oi.oi_oa == NULL) { 403 OBD_FREE(req, sizeof(*req)); 404 rc = -ENOMEM; 405 goto out_set; 406 } 407 memcpy(req->rq_oi.oi_oa, src_oa, sizeof(*req->rq_oi.oi_oa)); 408 req->rq_oi.oi_oa->o_oi = loi->loi_oi; 409 lov_set_add_req(req, set); 410 } 411 if (!set->set_count) { 412 rc = -EIO; 413 goto out_set; 414 } 415 *reqset = set; 416 return rc; 417out_set: 418 lov_fini_destroy_set(set); 419 return rc; 420} 421 422int lov_fini_setattr_set(struct lov_request_set *set) 423{ 424 int rc = 0; 425 426 if (set == NULL) 427 return 0; 428 LASSERT(set->set_exp); 429 if (atomic_read(&set->set_completes)) { 430 rc = common_attr_done(set); 431 /* FIXME update qos data here */ 432 } 433 434 lov_put_reqset(set); 435 return rc; 436} 437 438int lov_update_setattr_set(struct lov_request_set *set, 439 struct lov_request *req, int rc) 440{ 441 struct lov_obd *lov = &req->rq_rqset->set_exp->exp_obd->u.lov; 442 struct lov_stripe_md *lsm = req->rq_rqset->set_oi->oi_md; 443 444 lov_update_set(set, req, rc); 445 446 /* grace error on inactive ost */ 447 if (rc && !(lov->lov_tgts[req->rq_idx] && 448 lov->lov_tgts[req->rq_idx]->ltd_active)) 449 rc = 0; 450 451 if (rc == 0) { 452 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLCTIME) 453 lsm->lsm_oinfo[req->rq_stripe]->loi_lvb.lvb_ctime = 454 req->rq_oi.oi_oa->o_ctime; 455 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLMTIME) 456 lsm->lsm_oinfo[req->rq_stripe]->loi_lvb.lvb_mtime = 457 req->rq_oi.oi_oa->o_mtime; 458 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLATIME) 459 lsm->lsm_oinfo[req->rq_stripe]->loi_lvb.lvb_atime = 460 req->rq_oi.oi_oa->o_atime; 461 } 462 463 return rc; 464} 465 466/* The callback for osc_setattr_async that finalizes a request info when a 467 * response is received. */ 468static int cb_setattr_update(void *cookie, int rc) 469{ 470 struct obd_info *oinfo = cookie; 471 struct lov_request *lovreq; 472 473 lovreq = container_of(oinfo, struct lov_request, rq_oi); 474 return lov_update_setattr_set(lovreq->rq_rqset, lovreq, rc); 475} 476 477int lov_prep_setattr_set(struct obd_export *exp, struct obd_info *oinfo, 478 struct obd_trans_info *oti, 479 struct lov_request_set **reqset) 480{ 481 struct lov_request_set *set; 482 struct lov_obd *lov = &exp->exp_obd->u.lov; 483 int rc = 0, i; 484 485 OBD_ALLOC(set, sizeof(*set)); 486 if (set == NULL) 487 return -ENOMEM; 488 lov_init_set(set); 489 490 set->set_exp = exp; 491 set->set_oti = oti; 492 set->set_oi = oinfo; 493 if (oti != NULL && oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE) 494 set->set_cookies = oti->oti_logcookies; 495 496 for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) { 497 struct lov_oinfo *loi = oinfo->oi_md->lsm_oinfo[i]; 498 struct lov_request *req; 499 500 if (!lov_check_and_wait_active(lov, loi->loi_ost_idx)) { 501 CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx); 502 continue; 503 } 504 505 OBD_ALLOC(req, sizeof(*req)); 506 if (req == NULL) { 507 rc = -ENOMEM; 508 goto out_set; 509 } 510 req->rq_stripe = i; 511 req->rq_idx = loi->loi_ost_idx; 512 513 OBDO_ALLOC(req->rq_oi.oi_oa); 514 if (req->rq_oi.oi_oa == NULL) { 515 OBD_FREE(req, sizeof(*req)); 516 rc = -ENOMEM; 517 goto out_set; 518 } 519 memcpy(req->rq_oi.oi_oa, oinfo->oi_oa, 520 sizeof(*req->rq_oi.oi_oa)); 521 req->rq_oi.oi_oa->o_oi = loi->loi_oi; 522 req->rq_oi.oi_oa->o_stripe_idx = i; 523 req->rq_oi.oi_cb_up = cb_setattr_update; 524 req->rq_oi.oi_capa = oinfo->oi_capa; 525 526 if (oinfo->oi_oa->o_valid & OBD_MD_FLSIZE) { 527 int off = lov_stripe_offset(oinfo->oi_md, 528 oinfo->oi_oa->o_size, i, 529 &req->rq_oi.oi_oa->o_size); 530 531 if (off < 0 && req->rq_oi.oi_oa->o_size) 532 req->rq_oi.oi_oa->o_size--; 533 534 CDEBUG(D_INODE, "stripe %d has size %llu/%llu\n", 535 i, req->rq_oi.oi_oa->o_size, 536 oinfo->oi_oa->o_size); 537 } 538 lov_set_add_req(req, set); 539 } 540 if (!set->set_count) { 541 rc = -EIO; 542 goto out_set; 543 } 544 *reqset = set; 545 return rc; 546out_set: 547 lov_fini_setattr_set(set); 548 return rc; 549} 550 551#define LOV_U64_MAX ((__u64)~0ULL) 552#define LOV_SUM_MAX(tot, add) \ 553 do { \ 554 if ((tot) + (add) < (tot)) \ 555 (tot) = LOV_U64_MAX; \ 556 else \ 557 (tot) += (add); \ 558 } while (0) 559 560int lov_fini_statfs(struct obd_device *obd, struct obd_statfs *osfs, 561 int success) 562{ 563 if (success) { 564 __u32 expected_stripes = lov_get_stripecnt(&obd->u.lov, 565 LOV_MAGIC, 0); 566 if (osfs->os_files != LOV_U64_MAX) 567 lov_do_div64(osfs->os_files, expected_stripes); 568 if (osfs->os_ffree != LOV_U64_MAX) 569 lov_do_div64(osfs->os_ffree, expected_stripes); 570 571 spin_lock(&obd->obd_osfs_lock); 572 memcpy(&obd->obd_osfs, osfs, sizeof(*osfs)); 573 obd->obd_osfs_age = cfs_time_current_64(); 574 spin_unlock(&obd->obd_osfs_lock); 575 return 0; 576 } 577 578 return -EIO; 579} 580 581int lov_fini_statfs_set(struct lov_request_set *set) 582{ 583 int rc = 0; 584 585 if (set == NULL) 586 return 0; 587 588 if (atomic_read(&set->set_completes)) { 589 rc = lov_fini_statfs(set->set_obd, set->set_oi->oi_osfs, 590 atomic_read(&set->set_success)); 591 } 592 lov_put_reqset(set); 593 return rc; 594} 595 596void lov_update_statfs(struct obd_statfs *osfs, struct obd_statfs *lov_sfs, 597 int success) 598{ 599 int shift = 0, quit = 0; 600 __u64 tmp; 601 602 if (success == 0) { 603 memcpy(osfs, lov_sfs, sizeof(*lov_sfs)); 604 } else { 605 if (osfs->os_bsize != lov_sfs->os_bsize) { 606 /* assume all block sizes are always powers of 2 */ 607 /* get the bits difference */ 608 tmp = osfs->os_bsize | lov_sfs->os_bsize; 609 for (shift = 0; shift <= 64; ++shift) { 610 if (tmp & 1) { 611 if (quit) 612 break; 613 else 614 quit = 1; 615 shift = 0; 616 } 617 tmp >>= 1; 618 } 619 } 620 621 if (osfs->os_bsize < lov_sfs->os_bsize) { 622 osfs->os_bsize = lov_sfs->os_bsize; 623 624 osfs->os_bfree >>= shift; 625 osfs->os_bavail >>= shift; 626 osfs->os_blocks >>= shift; 627 } else if (shift != 0) { 628 lov_sfs->os_bfree >>= shift; 629 lov_sfs->os_bavail >>= shift; 630 lov_sfs->os_blocks >>= shift; 631 } 632 osfs->os_bfree += lov_sfs->os_bfree; 633 osfs->os_bavail += lov_sfs->os_bavail; 634 osfs->os_blocks += lov_sfs->os_blocks; 635 /* XXX not sure about this one - depends on policy. 636 * - could be minimum if we always stripe on all OBDs 637 * (but that would be wrong for any other policy, 638 * if one of the OBDs has no more objects left) 639 * - could be sum if we stripe whole objects 640 * - could be average, just to give a nice number 641 * 642 * To give a "reasonable" (if not wholly accurate) 643 * number, we divide the total number of free objects 644 * by expected stripe count (watch out for overflow). 645 */ 646 LOV_SUM_MAX(osfs->os_files, lov_sfs->os_files); 647 LOV_SUM_MAX(osfs->os_ffree, lov_sfs->os_ffree); 648 } 649} 650 651/* The callback for osc_statfs_async that finalizes a request info when a 652 * response is received. */ 653static int cb_statfs_update(void *cookie, int rc) 654{ 655 struct obd_info *oinfo = cookie; 656 struct lov_request *lovreq; 657 struct lov_request_set *set; 658 struct obd_statfs *osfs, *lov_sfs; 659 struct lov_obd *lov; 660 struct lov_tgt_desc *tgt; 661 struct obd_device *lovobd, *tgtobd; 662 int success; 663 664 lovreq = container_of(oinfo, struct lov_request, rq_oi); 665 set = lovreq->rq_rqset; 666 lovobd = set->set_obd; 667 lov = &lovobd->u.lov; 668 osfs = set->set_oi->oi_osfs; 669 lov_sfs = oinfo->oi_osfs; 670 success = atomic_read(&set->set_success); 671 /* XXX: the same is done in lov_update_common_set, however 672 lovset->set_exp is not initialized. */ 673 lov_update_set(set, lovreq, rc); 674 if (rc) 675 goto out; 676 677 obd_getref(lovobd); 678 tgt = lov->lov_tgts[lovreq->rq_idx]; 679 if (!tgt || !tgt->ltd_active) 680 goto out_update; 681 682 tgtobd = class_exp2obd(tgt->ltd_exp); 683 spin_lock(&tgtobd->obd_osfs_lock); 684 memcpy(&tgtobd->obd_osfs, lov_sfs, sizeof(*lov_sfs)); 685 if ((oinfo->oi_flags & OBD_STATFS_FROM_CACHE) == 0) 686 tgtobd->obd_osfs_age = cfs_time_current_64(); 687 spin_unlock(&tgtobd->obd_osfs_lock); 688 689out_update: 690 lov_update_statfs(osfs, lov_sfs, success); 691 obd_putref(lovobd); 692 693out: 694 if (set->set_oi->oi_flags & OBD_STATFS_PTLRPCD && 695 lov_set_finished(set, 0)) { 696 lov_statfs_interpret(NULL, set, set->set_count != 697 atomic_read(&set->set_success)); 698 } 699 700 return 0; 701} 702 703int lov_prep_statfs_set(struct obd_device *obd, struct obd_info *oinfo, 704 struct lov_request_set **reqset) 705{ 706 struct lov_request_set *set; 707 struct lov_obd *lov = &obd->u.lov; 708 int rc = 0, i; 709 710 OBD_ALLOC(set, sizeof(*set)); 711 if (set == NULL) 712 return -ENOMEM; 713 lov_init_set(set); 714 715 set->set_obd = obd; 716 set->set_oi = oinfo; 717 718 /* We only get block data from the OBD */ 719 for (i = 0; i < lov->desc.ld_tgt_count; i++) { 720 struct lov_request *req; 721 722 if (lov->lov_tgts[i] == NULL || 723 (!lov_check_and_wait_active(lov, i) && 724 (oinfo->oi_flags & OBD_STATFS_NODELAY))) { 725 CDEBUG(D_HA, "lov idx %d inactive\n", i); 726 continue; 727 } 728 729 /* skip targets that have been explicitly disabled by the 730 * administrator */ 731 if (!lov->lov_tgts[i]->ltd_exp) { 732 CDEBUG(D_HA, "lov idx %d administratively disabled\n", i); 733 continue; 734 } 735 736 OBD_ALLOC(req, sizeof(*req)); 737 if (req == NULL) { 738 rc = -ENOMEM; 739 goto out_set; 740 } 741 742 OBD_ALLOC(req->rq_oi.oi_osfs, sizeof(*req->rq_oi.oi_osfs)); 743 if (req->rq_oi.oi_osfs == NULL) { 744 OBD_FREE(req, sizeof(*req)); 745 rc = -ENOMEM; 746 goto out_set; 747 } 748 749 req->rq_idx = i; 750 req->rq_oi.oi_cb_up = cb_statfs_update; 751 req->rq_oi.oi_flags = oinfo->oi_flags; 752 753 lov_set_add_req(req, set); 754 } 755 if (!set->set_count) { 756 rc = -EIO; 757 goto out_set; 758 } 759 *reqset = set; 760 return rc; 761out_set: 762 lov_fini_statfs_set(set); 763 return rc; 764} 765