fld_request.c revision e62e5d9251bf8efd08318e78bcaca86fde228383
1/* 2 * GPL HEADER START 3 * 4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 5 * 6 * This program is free software; you can redistribute it and/or modify 7 * it under the terms of the GNU General Public License version 2 only, 8 * as published by the Free Software Foundation. 9 * 10 * This program is distributed in the hope that it will be useful, but 11 * WITHOUT ANY WARRANTY; without even the implied warranty of 12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 13 * General Public License version 2 for more details (a copy is included 14 * in the LICENSE file that accompanied this code). 15 * 16 * You should have received a copy of the GNU General Public License 17 * version 2 along with this program; If not, see 18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf 19 * 20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara, 21 * CA 95054 USA or visit www.sun.com if you need additional information or 22 * have any questions. 23 * 24 * GPL HEADER END 25 */ 26/* 27 * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved. 28 * Use is subject to license terms. 29 * 30 * Copyright (c) 2011, 2013, Intel Corporation. 31 */ 32/* 33 * This file is part of Lustre, http://www.lustre.org/ 34 * Lustre is a trademark of Sun Microsystems, Inc. 35 * 36 * lustre/fld/fld_request.c 37 * 38 * FLD (Fids Location Database) 39 * 40 * Author: Yury Umanets <umka@clusterfs.com> 41 */ 42 43#define DEBUG_SUBSYSTEM S_FLD 44 45# include <linux/libcfs/libcfs.h> 46# include <linux/module.h> 47# include <linux/jbd.h> 48# include <asm/div64.h> 49 50#include <obd.h> 51#include <obd_class.h> 52#include <lustre_ver.h> 53#include <obd_support.h> 54#include <lprocfs_status.h> 55 56#include <dt_object.h> 57#include <md_object.h> 58#include <lustre_req_layout.h> 59#include <lustre_fld.h> 60#include <lustre_mdc.h> 61#include "fld_internal.h" 62 63struct lu_context_key fld_thread_key; 64 65/* TODO: these 3 functions are copies of flow-control code from mdc_lib.c 66 * It should be common thing. The same about mdc RPC lock */ 67static int fld_req_avail(struct client_obd *cli, struct mdc_cache_waiter *mcw) 68{ 69 int rc; 70 ENTRY; 71 client_obd_list_lock(&cli->cl_loi_list_lock); 72 rc = list_empty(&mcw->mcw_entry); 73 client_obd_list_unlock(&cli->cl_loi_list_lock); 74 RETURN(rc); 75}; 76 77static void fld_enter_request(struct client_obd *cli) 78{ 79 struct mdc_cache_waiter mcw; 80 struct l_wait_info lwi = { 0 }; 81 82 client_obd_list_lock(&cli->cl_loi_list_lock); 83 if (cli->cl_r_in_flight >= cli->cl_max_rpcs_in_flight) { 84 list_add_tail(&mcw.mcw_entry, &cli->cl_cache_waiters); 85 init_waitqueue_head(&mcw.mcw_waitq); 86 client_obd_list_unlock(&cli->cl_loi_list_lock); 87 l_wait_event(mcw.mcw_waitq, fld_req_avail(cli, &mcw), &lwi); 88 } else { 89 cli->cl_r_in_flight++; 90 client_obd_list_unlock(&cli->cl_loi_list_lock); 91 } 92} 93 94static void fld_exit_request(struct client_obd *cli) 95{ 96 struct list_head *l, *tmp; 97 struct mdc_cache_waiter *mcw; 98 99 client_obd_list_lock(&cli->cl_loi_list_lock); 100 cli->cl_r_in_flight--; 101 list_for_each_safe(l, tmp, &cli->cl_cache_waiters) { 102 103 if (cli->cl_r_in_flight >= cli->cl_max_rpcs_in_flight) { 104 /* No free request slots anymore */ 105 break; 106 } 107 108 mcw = list_entry(l, struct mdc_cache_waiter, mcw_entry); 109 list_del_init(&mcw->mcw_entry); 110 cli->cl_r_in_flight++; 111 wake_up(&mcw->mcw_waitq); 112 } 113 client_obd_list_unlock(&cli->cl_loi_list_lock); 114} 115 116static int fld_rrb_hash(struct lu_client_fld *fld, 117 seqno_t seq) 118{ 119 LASSERT(fld->lcf_count > 0); 120 return do_div(seq, fld->lcf_count); 121} 122 123static struct lu_fld_target * 124fld_rrb_scan(struct lu_client_fld *fld, seqno_t seq) 125{ 126 struct lu_fld_target *target; 127 int hash; 128 ENTRY; 129 130 /* Because almost all of special sequence located in MDT0, 131 * it should go to index 0 directly, instead of calculating 132 * hash again, and also if other MDTs is not being connected, 133 * the fld lookup requests(for seq on MDT0) should not be 134 * blocked because of other MDTs */ 135 if (fid_seq_is_norm(seq)) 136 hash = fld_rrb_hash(fld, seq); 137 else 138 hash = 0; 139 140 list_for_each_entry(target, &fld->lcf_targets, ft_chain) { 141 if (target->ft_idx == hash) 142 RETURN(target); 143 } 144 145 CERROR("%s: Can't find target by hash %d (seq "LPX64"). " 146 "Targets (%d):\n", fld->lcf_name, hash, seq, 147 fld->lcf_count); 148 149 list_for_each_entry(target, &fld->lcf_targets, ft_chain) { 150 const char *srv_name = target->ft_srv != NULL ? 151 target->ft_srv->lsf_name : "<null>"; 152 const char *exp_name = target->ft_exp != NULL ? 153 (char *)target->ft_exp->exp_obd->obd_uuid.uuid : 154 "<null>"; 155 156 CERROR(" exp: 0x%p (%s), srv: 0x%p (%s), idx: "LPU64"\n", 157 target->ft_exp, exp_name, target->ft_srv, 158 srv_name, target->ft_idx); 159 } 160 161 /* 162 * If target is not found, there is logical error anyway, so here is 163 * LBUG() to catch this situation. 164 */ 165 LBUG(); 166 RETURN(NULL); 167} 168 169struct lu_fld_hash fld_hash[] = { 170 { 171 .fh_name = "RRB", 172 .fh_hash_func = fld_rrb_hash, 173 .fh_scan_func = fld_rrb_scan 174 }, 175 { 176 0, 177 } 178}; 179 180static struct lu_fld_target * 181fld_client_get_target(struct lu_client_fld *fld, seqno_t seq) 182{ 183 struct lu_fld_target *target; 184 ENTRY; 185 186 LASSERT(fld->lcf_hash != NULL); 187 188 spin_lock(&fld->lcf_lock); 189 target = fld->lcf_hash->fh_scan_func(fld, seq); 190 spin_unlock(&fld->lcf_lock); 191 192 if (target != NULL) { 193 CDEBUG(D_INFO, "%s: Found target (idx "LPU64 194 ") by seq "LPX64"\n", fld->lcf_name, 195 target->ft_idx, seq); 196 } 197 198 RETURN(target); 199} 200 201/* 202 * Add export to FLD. This is usually done by CMM and LMV as they are main users 203 * of FLD module. 204 */ 205int fld_client_add_target(struct lu_client_fld *fld, 206 struct lu_fld_target *tar) 207{ 208 const char *name; 209 struct lu_fld_target *target, *tmp; 210 ENTRY; 211 212 LASSERT(tar != NULL); 213 name = fld_target_name(tar); 214 LASSERT(name != NULL); 215 LASSERT(tar->ft_srv != NULL || tar->ft_exp != NULL); 216 217 if (fld->lcf_flags != LUSTRE_FLD_INIT) { 218 CERROR("%s: Attempt to add target %s (idx "LPU64") " 219 "on fly - skip it\n", fld->lcf_name, name, 220 tar->ft_idx); 221 RETURN(0); 222 } else { 223 CDEBUG(D_INFO, "%s: Adding target %s (idx " 224 LPU64")\n", fld->lcf_name, name, tar->ft_idx); 225 } 226 227 OBD_ALLOC_PTR(target); 228 if (target == NULL) 229 RETURN(-ENOMEM); 230 231 spin_lock(&fld->lcf_lock); 232 list_for_each_entry(tmp, &fld->lcf_targets, ft_chain) { 233 if (tmp->ft_idx == tar->ft_idx) { 234 spin_unlock(&fld->lcf_lock); 235 OBD_FREE_PTR(target); 236 CERROR("Target %s exists in FLD and known as %s:#"LPU64"\n", 237 name, fld_target_name(tmp), tmp->ft_idx); 238 RETURN(-EEXIST); 239 } 240 } 241 242 target->ft_exp = tar->ft_exp; 243 if (target->ft_exp != NULL) 244 class_export_get(target->ft_exp); 245 target->ft_srv = tar->ft_srv; 246 target->ft_idx = tar->ft_idx; 247 248 list_add_tail(&target->ft_chain, 249 &fld->lcf_targets); 250 251 fld->lcf_count++; 252 spin_unlock(&fld->lcf_lock); 253 254 RETURN(0); 255} 256EXPORT_SYMBOL(fld_client_add_target); 257 258/* Remove export from FLD */ 259int fld_client_del_target(struct lu_client_fld *fld, __u64 idx) 260{ 261 struct lu_fld_target *target, *tmp; 262 ENTRY; 263 264 spin_lock(&fld->lcf_lock); 265 list_for_each_entry_safe(target, tmp, 266 &fld->lcf_targets, ft_chain) { 267 if (target->ft_idx == idx) { 268 fld->lcf_count--; 269 list_del(&target->ft_chain); 270 spin_unlock(&fld->lcf_lock); 271 272 if (target->ft_exp != NULL) 273 class_export_put(target->ft_exp); 274 275 OBD_FREE_PTR(target); 276 RETURN(0); 277 } 278 } 279 spin_unlock(&fld->lcf_lock); 280 RETURN(-ENOENT); 281} 282EXPORT_SYMBOL(fld_client_del_target); 283 284#ifdef LPROCFS 285proc_dir_entry_t *fld_type_proc_dir = NULL; 286 287static int fld_client_proc_init(struct lu_client_fld *fld) 288{ 289 int rc; 290 ENTRY; 291 292 fld->lcf_proc_dir = lprocfs_register(fld->lcf_name, 293 fld_type_proc_dir, 294 NULL, NULL); 295 296 if (IS_ERR(fld->lcf_proc_dir)) { 297 CERROR("%s: LProcFS failed in fld-init\n", 298 fld->lcf_name); 299 rc = PTR_ERR(fld->lcf_proc_dir); 300 RETURN(rc); 301 } 302 303 rc = lprocfs_add_vars(fld->lcf_proc_dir, 304 fld_client_proc_list, fld); 305 if (rc) { 306 CERROR("%s: Can't init FLD proc, rc %d\n", 307 fld->lcf_name, rc); 308 GOTO(out_cleanup, rc); 309 } 310 311 RETURN(0); 312 313out_cleanup: 314 fld_client_proc_fini(fld); 315 return rc; 316} 317 318void fld_client_proc_fini(struct lu_client_fld *fld) 319{ 320 ENTRY; 321 if (fld->lcf_proc_dir) { 322 if (!IS_ERR(fld->lcf_proc_dir)) 323 lprocfs_remove(&fld->lcf_proc_dir); 324 fld->lcf_proc_dir = NULL; 325 } 326 EXIT; 327} 328#else 329static int fld_client_proc_init(struct lu_client_fld *fld) 330{ 331 return 0; 332} 333 334void fld_client_proc_fini(struct lu_client_fld *fld) 335{ 336 return; 337} 338#endif 339 340EXPORT_SYMBOL(fld_client_proc_fini); 341 342static inline int hash_is_sane(int hash) 343{ 344 return (hash >= 0 && hash < ARRAY_SIZE(fld_hash)); 345} 346 347int fld_client_init(struct lu_client_fld *fld, 348 const char *prefix, int hash) 349{ 350 int cache_size, cache_threshold; 351 int rc; 352 ENTRY; 353 354 LASSERT(fld != NULL); 355 356 snprintf(fld->lcf_name, sizeof(fld->lcf_name), 357 "cli-%s", prefix); 358 359 if (!hash_is_sane(hash)) { 360 CERROR("%s: Wrong hash function %#x\n", 361 fld->lcf_name, hash); 362 RETURN(-EINVAL); 363 } 364 365 fld->lcf_count = 0; 366 spin_lock_init(&fld->lcf_lock); 367 fld->lcf_hash = &fld_hash[hash]; 368 fld->lcf_flags = LUSTRE_FLD_INIT; 369 INIT_LIST_HEAD(&fld->lcf_targets); 370 371 cache_size = FLD_CLIENT_CACHE_SIZE / 372 sizeof(struct fld_cache_entry); 373 374 cache_threshold = cache_size * 375 FLD_CLIENT_CACHE_THRESHOLD / 100; 376 377 fld->lcf_cache = fld_cache_init(fld->lcf_name, 378 cache_size, cache_threshold); 379 if (IS_ERR(fld->lcf_cache)) { 380 rc = PTR_ERR(fld->lcf_cache); 381 fld->lcf_cache = NULL; 382 GOTO(out, rc); 383 } 384 385 rc = fld_client_proc_init(fld); 386 if (rc) 387 GOTO(out, rc); 388 EXIT; 389out: 390 if (rc) 391 fld_client_fini(fld); 392 else 393 CDEBUG(D_INFO, "%s: Using \"%s\" hash\n", 394 fld->lcf_name, fld->lcf_hash->fh_name); 395 return rc; 396} 397EXPORT_SYMBOL(fld_client_init); 398 399void fld_client_fini(struct lu_client_fld *fld) 400{ 401 struct lu_fld_target *target, *tmp; 402 ENTRY; 403 404 spin_lock(&fld->lcf_lock); 405 list_for_each_entry_safe(target, tmp, 406 &fld->lcf_targets, ft_chain) { 407 fld->lcf_count--; 408 list_del(&target->ft_chain); 409 if (target->ft_exp != NULL) 410 class_export_put(target->ft_exp); 411 OBD_FREE_PTR(target); 412 } 413 spin_unlock(&fld->lcf_lock); 414 415 if (fld->lcf_cache != NULL) { 416 if (!IS_ERR(fld->lcf_cache)) 417 fld_cache_fini(fld->lcf_cache); 418 fld->lcf_cache = NULL; 419 } 420 421 EXIT; 422} 423EXPORT_SYMBOL(fld_client_fini); 424 425int fld_client_rpc(struct obd_export *exp, 426 struct lu_seq_range *range, __u32 fld_op) 427{ 428 struct ptlrpc_request *req; 429 struct lu_seq_range *prange; 430 __u32 *op; 431 int rc; 432 struct obd_import *imp; 433 ENTRY; 434 435 LASSERT(exp != NULL); 436 437 imp = class_exp2cliimp(exp); 438 req = ptlrpc_request_alloc_pack(imp, &RQF_FLD_QUERY, LUSTRE_MDS_VERSION, 439 FLD_QUERY); 440 if (req == NULL) 441 RETURN(-ENOMEM); 442 443 op = req_capsule_client_get(&req->rq_pill, &RMF_FLD_OPC); 444 *op = fld_op; 445 446 prange = req_capsule_client_get(&req->rq_pill, &RMF_FLD_MDFLD); 447 *prange = *range; 448 449 ptlrpc_request_set_replen(req); 450 req->rq_request_portal = FLD_REQUEST_PORTAL; 451 ptlrpc_at_set_req_timeout(req); 452 453 if (fld_op == FLD_LOOKUP && 454 imp->imp_connect_flags_orig & OBD_CONNECT_MDS_MDS) 455 req->rq_allow_replay = 1; 456 457 if (fld_op != FLD_LOOKUP) 458 mdc_get_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL); 459 fld_enter_request(&exp->exp_obd->u.cli); 460 rc = ptlrpc_queue_wait(req); 461 fld_exit_request(&exp->exp_obd->u.cli); 462 if (fld_op != FLD_LOOKUP) 463 mdc_put_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL); 464 if (rc) 465 GOTO(out_req, rc); 466 467 prange = req_capsule_server_get(&req->rq_pill, &RMF_FLD_MDFLD); 468 if (prange == NULL) 469 GOTO(out_req, rc = -EFAULT); 470 *range = *prange; 471 EXIT; 472out_req: 473 ptlrpc_req_finished(req); 474 return rc; 475} 476 477int fld_client_lookup(struct lu_client_fld *fld, seqno_t seq, mdsno_t *mds, 478 __u32 flags, const struct lu_env *env) 479{ 480 struct lu_seq_range res = { 0 }; 481 struct lu_fld_target *target; 482 int rc; 483 ENTRY; 484 485 fld->lcf_flags |= LUSTRE_FLD_RUN; 486 487 rc = fld_cache_lookup(fld->lcf_cache, seq, &res); 488 if (rc == 0) { 489 *mds = res.lsr_index; 490 RETURN(0); 491 } 492 493 /* Can not find it in the cache */ 494 target = fld_client_get_target(fld, seq); 495 LASSERT(target != NULL); 496 497 CDEBUG(D_INFO, "%s: Lookup fld entry (seq: "LPX64") on " 498 "target %s (idx "LPU64")\n", fld->lcf_name, seq, 499 fld_target_name(target), target->ft_idx); 500 501 res.lsr_start = seq; 502 fld_range_set_type(&res, flags); 503 rc = fld_client_rpc(target->ft_exp, &res, FLD_LOOKUP); 504 505 if (rc == 0) { 506 *mds = res.lsr_index; 507 508 fld_cache_insert(fld->lcf_cache, &res); 509 } 510 RETURN(rc); 511} 512EXPORT_SYMBOL(fld_client_lookup); 513 514void fld_client_flush(struct lu_client_fld *fld) 515{ 516 fld_cache_flush(fld->lcf_cache); 517} 518EXPORT_SYMBOL(fld_client_flush); 519 520static int __init fld_mod_init(void) 521{ 522 fld_type_proc_dir = lprocfs_register(LUSTRE_FLD_NAME, 523 proc_lustre_root, 524 NULL, NULL); 525 if (IS_ERR(fld_type_proc_dir)) 526 return PTR_ERR(fld_type_proc_dir); 527 528 LU_CONTEXT_KEY_INIT(&fld_thread_key); 529 lu_context_key_register(&fld_thread_key); 530 return 0; 531} 532 533static void __exit fld_mod_exit(void) 534{ 535 lu_context_key_degister(&fld_thread_key); 536 if (fld_type_proc_dir != NULL && !IS_ERR(fld_type_proc_dir)) { 537 lprocfs_remove(&fld_type_proc_dir); 538 fld_type_proc_dir = NULL; 539 } 540} 541 542MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>"); 543MODULE_DESCRIPTION("Lustre FLD"); 544MODULE_LICENSE("GPL"); 545 546module_init(fld_mod_init) 547module_exit(fld_mod_exit) 548