[go: nahoru, domu]

1/*
2 * GPL HEADER START
3 *
4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5 *
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
9 *
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
15 *
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19 *
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
22 * have any questions.
23 *
24 * GPL HEADER END
25 */
26/*
27 * Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
29 *
30 * Copyright (c) 2011, 2012, Intel Corporation.
31 */
32/*
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
35 */
36
37#define DEBUG_SUBSYSTEM S_LOV
38
39#include "../../include/linux/libcfs/libcfs.h"
40
41#include "../include/obd_class.h"
42#include "../include/lustre/lustre_idl.h"
43#include "lov_internal.h"
44
45static void lov_init_set(struct lov_request_set *set)
46{
47	set->set_count = 0;
48	atomic_set(&set->set_completes, 0);
49	atomic_set(&set->set_success, 0);
50	atomic_set(&set->set_finish_checked, 0);
51	set->set_cookies = NULL;
52	INIT_LIST_HEAD(&set->set_list);
53	atomic_set(&set->set_refcount, 1);
54	init_waitqueue_head(&set->set_waitq);
55	spin_lock_init(&set->set_lock);
56}
57
58void lov_finish_set(struct lov_request_set *set)
59{
60	struct list_head *pos, *n;
61
62	LASSERT(set);
63	list_for_each_safe(pos, n, &set->set_list) {
64		struct lov_request *req = list_entry(pos,
65							 struct lov_request,
66							 rq_link);
67		list_del_init(&req->rq_link);
68
69		if (req->rq_oi.oi_oa)
70			OBDO_FREE(req->rq_oi.oi_oa);
71		if (req->rq_oi.oi_md)
72			OBD_FREE_LARGE(req->rq_oi.oi_md, req->rq_buflen);
73		if (req->rq_oi.oi_osfs)
74			OBD_FREE(req->rq_oi.oi_osfs,
75				 sizeof(*req->rq_oi.oi_osfs));
76		OBD_FREE(req, sizeof(*req));
77	}
78
79	if (set->set_pga) {
80		int len = set->set_oabufs * sizeof(*set->set_pga);
81		OBD_FREE_LARGE(set->set_pga, len);
82	}
83	if (set->set_lockh)
84		lov_llh_put(set->set_lockh);
85
86	OBD_FREE(set, sizeof(*set));
87}
88
89int lov_set_finished(struct lov_request_set *set, int idempotent)
90{
91	int completes = atomic_read(&set->set_completes);
92
93	CDEBUG(D_INFO, "check set %d/%d\n", completes, set->set_count);
94
95	if (completes == set->set_count) {
96		if (idempotent)
97			return 1;
98		if (atomic_inc_return(&set->set_finish_checked) == 1)
99			return 1;
100	}
101	return 0;
102}
103
104void lov_update_set(struct lov_request_set *set,
105		    struct lov_request *req, int rc)
106{
107	req->rq_complete = 1;
108	req->rq_rc = rc;
109
110	atomic_inc(&set->set_completes);
111	if (rc == 0)
112		atomic_inc(&set->set_success);
113
114	wake_up(&set->set_waitq);
115}
116
117int lov_update_common_set(struct lov_request_set *set,
118			  struct lov_request *req, int rc)
119{
120	struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
121
122	lov_update_set(set, req, rc);
123
124	/* grace error on inactive ost */
125	if (rc && !(lov->lov_tgts[req->rq_idx] &&
126		    lov->lov_tgts[req->rq_idx]->ltd_active))
127		rc = 0;
128
129	/* FIXME in raid1 regime, should return 0 */
130	return rc;
131}
132
133void lov_set_add_req(struct lov_request *req, struct lov_request_set *set)
134{
135	list_add_tail(&req->rq_link, &set->set_list);
136	set->set_count++;
137	req->rq_rqset = set;
138}
139
140static int lov_check_set(struct lov_obd *lov, int idx)
141{
142	int rc;
143	struct lov_tgt_desc *tgt;
144
145	mutex_lock(&lov->lov_lock);
146	tgt = lov->lov_tgts[idx];
147	rc = !tgt || tgt->ltd_active ||
148		(tgt->ltd_exp &&
149		 class_exp2cliimp(tgt->ltd_exp)->imp_connect_tried);
150	mutex_unlock(&lov->lov_lock);
151
152	return rc;
153}
154
155/* Check if the OSC connection exists and is active.
156 * If the OSC has not yet had a chance to connect to the OST the first time,
157 * wait once for it to connect instead of returning an error.
158 */
159int lov_check_and_wait_active(struct lov_obd *lov, int ost_idx)
160{
161	wait_queue_head_t waitq;
162	struct l_wait_info lwi;
163	struct lov_tgt_desc *tgt;
164	int rc = 0;
165
166	mutex_lock(&lov->lov_lock);
167
168	tgt = lov->lov_tgts[ost_idx];
169
170	if (unlikely(tgt == NULL)) {
171		rc = 0;
172		goto out;
173	}
174
175	if (likely(tgt->ltd_active)) {
176		rc = 1;
177		goto out;
178	}
179
180	if (tgt->ltd_exp && class_exp2cliimp(tgt->ltd_exp)->imp_connect_tried) {
181		rc = 0;
182		goto out;
183	}
184
185	mutex_unlock(&lov->lov_lock);
186
187	init_waitqueue_head(&waitq);
188	lwi = LWI_TIMEOUT_INTERVAL(cfs_time_seconds(obd_timeout),
189				   cfs_time_seconds(1), NULL, NULL);
190
191	rc = l_wait_event(waitq, lov_check_set(lov, ost_idx), &lwi);
192	if (tgt != NULL && tgt->ltd_active)
193		return 1;
194
195	return 0;
196
197out:
198	mutex_unlock(&lov->lov_lock);
199	return rc;
200}
201
202static int common_attr_done(struct lov_request_set *set)
203{
204	struct list_head *pos;
205	struct lov_request *req;
206	struct obdo *tmp_oa;
207	int rc = 0, attrset = 0;
208
209	LASSERT(set->set_oi != NULL);
210
211	if (set->set_oi->oi_oa == NULL)
212		return 0;
213
214	if (!atomic_read(&set->set_success))
215		return -EIO;
216
217	OBDO_ALLOC(tmp_oa);
218	if (tmp_oa == NULL) {
219		rc = -ENOMEM;
220		goto out;
221	}
222
223	list_for_each(pos, &set->set_list) {
224		req = list_entry(pos, struct lov_request, rq_link);
225
226		if (!req->rq_complete || req->rq_rc)
227			continue;
228		if (req->rq_oi.oi_oa->o_valid == 0)   /* inactive stripe */
229			continue;
230		lov_merge_attrs(tmp_oa, req->rq_oi.oi_oa,
231				req->rq_oi.oi_oa->o_valid,
232				set->set_oi->oi_md, req->rq_stripe, &attrset);
233	}
234	if (!attrset) {
235		CERROR("No stripes had valid attrs\n");
236		rc = -EIO;
237	}
238	if ((set->set_oi->oi_oa->o_valid & OBD_MD_FLEPOCH) &&
239	    (set->set_oi->oi_md->lsm_stripe_count != attrset)) {
240		/* When we take attributes of some epoch, we require all the
241		 * ost to be active. */
242		CERROR("Not all the stripes had valid attrs\n");
243		rc = -EIO;
244		goto out;
245	}
246
247	tmp_oa->o_oi = set->set_oi->oi_oa->o_oi;
248	memcpy(set->set_oi->oi_oa, tmp_oa, sizeof(*set->set_oi->oi_oa));
249out:
250	if (tmp_oa)
251		OBDO_FREE(tmp_oa);
252	return rc;
253
254}
255
256int lov_fini_getattr_set(struct lov_request_set *set)
257{
258	int rc = 0;
259
260	if (set == NULL)
261		return 0;
262	LASSERT(set->set_exp);
263	if (atomic_read(&set->set_completes))
264		rc = common_attr_done(set);
265
266	lov_put_reqset(set);
267
268	return rc;
269}
270
271/* The callback for osc_getattr_async that finalizes a request info when a
272 * response is received. */
273static int cb_getattr_update(void *cookie, int rc)
274{
275	struct obd_info *oinfo = cookie;
276	struct lov_request *lovreq;
277
278	lovreq = container_of(oinfo, struct lov_request, rq_oi);
279	return lov_update_common_set(lovreq->rq_rqset, lovreq, rc);
280}
281
282int lov_prep_getattr_set(struct obd_export *exp, struct obd_info *oinfo,
283			 struct lov_request_set **reqset)
284{
285	struct lov_request_set *set;
286	struct lov_obd *lov = &exp->exp_obd->u.lov;
287	int rc = 0, i;
288
289	OBD_ALLOC(set, sizeof(*set));
290	if (set == NULL)
291		return -ENOMEM;
292	lov_init_set(set);
293
294	set->set_exp = exp;
295	set->set_oi = oinfo;
296
297	for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {
298		struct lov_oinfo *loi;
299		struct lov_request *req;
300
301		loi = oinfo->oi_md->lsm_oinfo[i];
302		if (!lov_check_and_wait_active(lov, loi->loi_ost_idx)) {
303			CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
304			if (oinfo->oi_oa->o_valid & OBD_MD_FLEPOCH) {
305				/* SOM requires all the OSTs to be active. */
306				rc = -EIO;
307				goto out_set;
308			}
309			continue;
310		}
311
312		OBD_ALLOC(req, sizeof(*req));
313		if (req == NULL) {
314			rc = -ENOMEM;
315			goto out_set;
316		}
317
318		req->rq_stripe = i;
319		req->rq_idx = loi->loi_ost_idx;
320
321		OBDO_ALLOC(req->rq_oi.oi_oa);
322		if (req->rq_oi.oi_oa == NULL) {
323			OBD_FREE(req, sizeof(*req));
324			rc = -ENOMEM;
325			goto out_set;
326		}
327		memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
328		       sizeof(*req->rq_oi.oi_oa));
329		req->rq_oi.oi_oa->o_oi = loi->loi_oi;
330		req->rq_oi.oi_cb_up = cb_getattr_update;
331		req->rq_oi.oi_capa = oinfo->oi_capa;
332
333		lov_set_add_req(req, set);
334	}
335	if (!set->set_count) {
336		rc = -EIO;
337		goto out_set;
338	}
339	*reqset = set;
340	return rc;
341out_set:
342	lov_fini_getattr_set(set);
343	return rc;
344}
345
346int lov_fini_destroy_set(struct lov_request_set *set)
347{
348	if (set == NULL)
349		return 0;
350	LASSERT(set->set_exp);
351	if (atomic_read(&set->set_completes)) {
352		/* FIXME update qos data here */
353	}
354
355	lov_put_reqset(set);
356
357	return 0;
358}
359
360int lov_prep_destroy_set(struct obd_export *exp, struct obd_info *oinfo,
361			 struct obdo *src_oa, struct lov_stripe_md *lsm,
362			 struct obd_trans_info *oti,
363			 struct lov_request_set **reqset)
364{
365	struct lov_request_set *set;
366	struct lov_obd *lov = &exp->exp_obd->u.lov;
367	int rc = 0, i;
368
369	OBD_ALLOC(set, sizeof(*set));
370	if (set == NULL)
371		return -ENOMEM;
372	lov_init_set(set);
373
374	set->set_exp = exp;
375	set->set_oi = oinfo;
376	set->set_oi->oi_md = lsm;
377	set->set_oi->oi_oa = src_oa;
378	set->set_oti = oti;
379	if (oti != NULL && src_oa->o_valid & OBD_MD_FLCOOKIE)
380		set->set_cookies = oti->oti_logcookies;
381
382	for (i = 0; i < lsm->lsm_stripe_count; i++) {
383		struct lov_oinfo *loi;
384		struct lov_request *req;
385
386		loi = lsm->lsm_oinfo[i];
387		if (!lov_check_and_wait_active(lov, loi->loi_ost_idx)) {
388			CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
389			continue;
390		}
391
392		OBD_ALLOC(req, sizeof(*req));
393		if (req == NULL) {
394			rc = -ENOMEM;
395			goto out_set;
396		}
397
398		req->rq_stripe = i;
399		req->rq_idx = loi->loi_ost_idx;
400
401		OBDO_ALLOC(req->rq_oi.oi_oa);
402		if (req->rq_oi.oi_oa == NULL) {
403			OBD_FREE(req, sizeof(*req));
404			rc = -ENOMEM;
405			goto out_set;
406		}
407		memcpy(req->rq_oi.oi_oa, src_oa, sizeof(*req->rq_oi.oi_oa));
408		req->rq_oi.oi_oa->o_oi = loi->loi_oi;
409		lov_set_add_req(req, set);
410	}
411	if (!set->set_count) {
412		rc = -EIO;
413		goto out_set;
414	}
415	*reqset = set;
416	return rc;
417out_set:
418	lov_fini_destroy_set(set);
419	return rc;
420}
421
422int lov_fini_setattr_set(struct lov_request_set *set)
423{
424	int rc = 0;
425
426	if (set == NULL)
427		return 0;
428	LASSERT(set->set_exp);
429	if (atomic_read(&set->set_completes)) {
430		rc = common_attr_done(set);
431		/* FIXME update qos data here */
432	}
433
434	lov_put_reqset(set);
435	return rc;
436}
437
438int lov_update_setattr_set(struct lov_request_set *set,
439			   struct lov_request *req, int rc)
440{
441	struct lov_obd *lov = &req->rq_rqset->set_exp->exp_obd->u.lov;
442	struct lov_stripe_md *lsm = req->rq_rqset->set_oi->oi_md;
443
444	lov_update_set(set, req, rc);
445
446	/* grace error on inactive ost */
447	if (rc && !(lov->lov_tgts[req->rq_idx] &&
448		    lov->lov_tgts[req->rq_idx]->ltd_active))
449		rc = 0;
450
451	if (rc == 0) {
452		if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLCTIME)
453			lsm->lsm_oinfo[req->rq_stripe]->loi_lvb.lvb_ctime =
454				req->rq_oi.oi_oa->o_ctime;
455		if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLMTIME)
456			lsm->lsm_oinfo[req->rq_stripe]->loi_lvb.lvb_mtime =
457				req->rq_oi.oi_oa->o_mtime;
458		if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLATIME)
459			lsm->lsm_oinfo[req->rq_stripe]->loi_lvb.lvb_atime =
460				req->rq_oi.oi_oa->o_atime;
461	}
462
463	return rc;
464}
465
466/* The callback for osc_setattr_async that finalizes a request info when a
467 * response is received. */
468static int cb_setattr_update(void *cookie, int rc)
469{
470	struct obd_info *oinfo = cookie;
471	struct lov_request *lovreq;
472
473	lovreq = container_of(oinfo, struct lov_request, rq_oi);
474	return lov_update_setattr_set(lovreq->rq_rqset, lovreq, rc);
475}
476
477int lov_prep_setattr_set(struct obd_export *exp, struct obd_info *oinfo,
478			 struct obd_trans_info *oti,
479			 struct lov_request_set **reqset)
480{
481	struct lov_request_set *set;
482	struct lov_obd *lov = &exp->exp_obd->u.lov;
483	int rc = 0, i;
484
485	OBD_ALLOC(set, sizeof(*set));
486	if (set == NULL)
487		return -ENOMEM;
488	lov_init_set(set);
489
490	set->set_exp = exp;
491	set->set_oti = oti;
492	set->set_oi = oinfo;
493	if (oti != NULL && oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE)
494		set->set_cookies = oti->oti_logcookies;
495
496	for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {
497		struct lov_oinfo *loi = oinfo->oi_md->lsm_oinfo[i];
498		struct lov_request *req;
499
500		if (!lov_check_and_wait_active(lov, loi->loi_ost_idx)) {
501			CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
502			continue;
503		}
504
505		OBD_ALLOC(req, sizeof(*req));
506		if (req == NULL) {
507			rc = -ENOMEM;
508			goto out_set;
509		}
510		req->rq_stripe = i;
511		req->rq_idx = loi->loi_ost_idx;
512
513		OBDO_ALLOC(req->rq_oi.oi_oa);
514		if (req->rq_oi.oi_oa == NULL) {
515			OBD_FREE(req, sizeof(*req));
516			rc = -ENOMEM;
517			goto out_set;
518		}
519		memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
520		       sizeof(*req->rq_oi.oi_oa));
521		req->rq_oi.oi_oa->o_oi = loi->loi_oi;
522		req->rq_oi.oi_oa->o_stripe_idx = i;
523		req->rq_oi.oi_cb_up = cb_setattr_update;
524		req->rq_oi.oi_capa = oinfo->oi_capa;
525
526		if (oinfo->oi_oa->o_valid & OBD_MD_FLSIZE) {
527			int off = lov_stripe_offset(oinfo->oi_md,
528						    oinfo->oi_oa->o_size, i,
529						    &req->rq_oi.oi_oa->o_size);
530
531			if (off < 0 && req->rq_oi.oi_oa->o_size)
532				req->rq_oi.oi_oa->o_size--;
533
534			CDEBUG(D_INODE, "stripe %d has size %llu/%llu\n",
535			       i, req->rq_oi.oi_oa->o_size,
536			       oinfo->oi_oa->o_size);
537		}
538		lov_set_add_req(req, set);
539	}
540	if (!set->set_count) {
541		rc = -EIO;
542		goto out_set;
543	}
544	*reqset = set;
545	return rc;
546out_set:
547	lov_fini_setattr_set(set);
548	return rc;
549}
550
/* Largest value representable in a __u64. */
#define LOV_U64_MAX ((__u64)~0ULL)

/*
 * Saturating add: tot += add, clamped to LOV_U64_MAX when the sum would
 * wrap around.  Both arguments are evaluated more than once.
 */
#define LOV_SUM_MAX(tot, add)						\
	do {								\
		if ((tot) + (add) >= (tot))				\
			(tot) += (add);					\
		else							\
			(tot) = LOV_U64_MAX;				\
	} while (0)
559
560int lov_fini_statfs(struct obd_device *obd, struct obd_statfs *osfs,
561		    int success)
562{
563	if (success) {
564		__u32 expected_stripes = lov_get_stripecnt(&obd->u.lov,
565							   LOV_MAGIC, 0);
566		if (osfs->os_files != LOV_U64_MAX)
567			lov_do_div64(osfs->os_files, expected_stripes);
568		if (osfs->os_ffree != LOV_U64_MAX)
569			lov_do_div64(osfs->os_ffree, expected_stripes);
570
571		spin_lock(&obd->obd_osfs_lock);
572		memcpy(&obd->obd_osfs, osfs, sizeof(*osfs));
573		obd->obd_osfs_age = cfs_time_current_64();
574		spin_unlock(&obd->obd_osfs_lock);
575		return 0;
576	}
577
578	return -EIO;
579}
580
581int lov_fini_statfs_set(struct lov_request_set *set)
582{
583	int rc = 0;
584
585	if (set == NULL)
586		return 0;
587
588	if (atomic_read(&set->set_completes)) {
589		rc = lov_fini_statfs(set->set_obd, set->set_oi->oi_osfs,
590				     atomic_read(&set->set_success));
591	}
592	lov_put_reqset(set);
593	return rc;
594}
595
596void lov_update_statfs(struct obd_statfs *osfs, struct obd_statfs *lov_sfs,
597		       int success)
598{
599	int shift = 0, quit = 0;
600	__u64 tmp;
601
602	if (success == 0) {
603		memcpy(osfs, lov_sfs, sizeof(*lov_sfs));
604	} else {
605		if (osfs->os_bsize != lov_sfs->os_bsize) {
606			/* assume all block sizes are always powers of 2 */
607			/* get the bits difference */
608			tmp = osfs->os_bsize | lov_sfs->os_bsize;
609			for (shift = 0; shift <= 64; ++shift) {
610				if (tmp & 1) {
611					if (quit)
612						break;
613					else
614						quit = 1;
615					shift = 0;
616				}
617				tmp >>= 1;
618			}
619		}
620
621		if (osfs->os_bsize < lov_sfs->os_bsize) {
622			osfs->os_bsize = lov_sfs->os_bsize;
623
624			osfs->os_bfree  >>= shift;
625			osfs->os_bavail >>= shift;
626			osfs->os_blocks >>= shift;
627		} else if (shift != 0) {
628			lov_sfs->os_bfree  >>= shift;
629			lov_sfs->os_bavail >>= shift;
630			lov_sfs->os_blocks >>= shift;
631		}
632		osfs->os_bfree += lov_sfs->os_bfree;
633		osfs->os_bavail += lov_sfs->os_bavail;
634		osfs->os_blocks += lov_sfs->os_blocks;
635		/* XXX not sure about this one - depends on policy.
636		 *   - could be minimum if we always stripe on all OBDs
637		 *     (but that would be wrong for any other policy,
638		 *     if one of the OBDs has no more objects left)
639		 *   - could be sum if we stripe whole objects
640		 *   - could be average, just to give a nice number
641		 *
642		 * To give a "reasonable" (if not wholly accurate)
643		 * number, we divide the total number of free objects
644		 * by expected stripe count (watch out for overflow).
645		 */
646		LOV_SUM_MAX(osfs->os_files, lov_sfs->os_files);
647		LOV_SUM_MAX(osfs->os_ffree, lov_sfs->os_ffree);
648	}
649}
650
651/* The callback for osc_statfs_async that finalizes a request info when a
652 * response is received. */
653static int cb_statfs_update(void *cookie, int rc)
654{
655	struct obd_info *oinfo = cookie;
656	struct lov_request *lovreq;
657	struct lov_request_set *set;
658	struct obd_statfs *osfs, *lov_sfs;
659	struct lov_obd *lov;
660	struct lov_tgt_desc *tgt;
661	struct obd_device *lovobd, *tgtobd;
662	int success;
663
664	lovreq = container_of(oinfo, struct lov_request, rq_oi);
665	set = lovreq->rq_rqset;
666	lovobd = set->set_obd;
667	lov = &lovobd->u.lov;
668	osfs = set->set_oi->oi_osfs;
669	lov_sfs = oinfo->oi_osfs;
670	success = atomic_read(&set->set_success);
671	/* XXX: the same is done in lov_update_common_set, however
672	   lovset->set_exp is not initialized. */
673	lov_update_set(set, lovreq, rc);
674	if (rc)
675		goto out;
676
677	obd_getref(lovobd);
678	tgt = lov->lov_tgts[lovreq->rq_idx];
679	if (!tgt || !tgt->ltd_active)
680		goto out_update;
681
682	tgtobd = class_exp2obd(tgt->ltd_exp);
683	spin_lock(&tgtobd->obd_osfs_lock);
684	memcpy(&tgtobd->obd_osfs, lov_sfs, sizeof(*lov_sfs));
685	if ((oinfo->oi_flags & OBD_STATFS_FROM_CACHE) == 0)
686		tgtobd->obd_osfs_age = cfs_time_current_64();
687	spin_unlock(&tgtobd->obd_osfs_lock);
688
689out_update:
690	lov_update_statfs(osfs, lov_sfs, success);
691	obd_putref(lovobd);
692
693out:
694	if (set->set_oi->oi_flags & OBD_STATFS_PTLRPCD &&
695	    lov_set_finished(set, 0)) {
696		lov_statfs_interpret(NULL, set, set->set_count !=
697				     atomic_read(&set->set_success));
698	}
699
700	return 0;
701}
702
703int lov_prep_statfs_set(struct obd_device *obd, struct obd_info *oinfo,
704			struct lov_request_set **reqset)
705{
706	struct lov_request_set *set;
707	struct lov_obd *lov = &obd->u.lov;
708	int rc = 0, i;
709
710	OBD_ALLOC(set, sizeof(*set));
711	if (set == NULL)
712		return -ENOMEM;
713	lov_init_set(set);
714
715	set->set_obd = obd;
716	set->set_oi = oinfo;
717
718	/* We only get block data from the OBD */
719	for (i = 0; i < lov->desc.ld_tgt_count; i++) {
720		struct lov_request *req;
721
722		if (lov->lov_tgts[i] == NULL ||
723		    (!lov_check_and_wait_active(lov, i) &&
724		     (oinfo->oi_flags & OBD_STATFS_NODELAY))) {
725			CDEBUG(D_HA, "lov idx %d inactive\n", i);
726			continue;
727		}
728
729		/* skip targets that have been explicitly disabled by the
730		 * administrator */
731		if (!lov->lov_tgts[i]->ltd_exp) {
732			CDEBUG(D_HA, "lov idx %d administratively disabled\n", i);
733			continue;
734		}
735
736		OBD_ALLOC(req, sizeof(*req));
737		if (req == NULL) {
738			rc = -ENOMEM;
739			goto out_set;
740		}
741
742		OBD_ALLOC(req->rq_oi.oi_osfs, sizeof(*req->rq_oi.oi_osfs));
743		if (req->rq_oi.oi_osfs == NULL) {
744			OBD_FREE(req, sizeof(*req));
745			rc = -ENOMEM;
746			goto out_set;
747		}
748
749		req->rq_idx = i;
750		req->rq_oi.oi_cb_up = cb_statfs_update;
751		req->rq_oi.oi_flags = oinfo->oi_flags;
752
753		lov_set_add_req(req, set);
754	}
755	if (!set->set_count) {
756		rc = -EIO;
757		goto out_set;
758	}
759	*reqset = set;
760	return rc;
761out_set:
762	lov_fini_statfs_set(set);
763	return rc;
764}
765