1/* 2 * GPL HEADER START 3 * 4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 5 * 6 * This program is free software; you can redistribute it and/or modify 7 * it under the terms of the GNU General Public License version 2 only, 8 * as published by the Free Software Foundation. 9 * 10 * This program is distributed in the hope that it will be useful, but 11 * WITHOUT ANY WARRANTY; without even the implied warranty of 12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 13 * General Public License version 2 for more details (a copy is included 14 * in the LICENSE file that accompanied this code). 15 * 16 * You should have received a copy of the GNU General Public License 17 * version 2 along with this program; If not, see 18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf 19 * 20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara, 21 * CA 95054 USA or visit www.sun.com if you need additional information or 22 * have any questions. 23 * 24 * GPL HEADER END 25 */ 26/* 27 * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved. 28 * Use is subject to license terms. 29 * 30 * Copyright (c) 2011, 2013, Intel Corporation. 31 */ 32/* 33 * This file is part of Lustre, http://www.lustre.org/ 34 * Lustre is a trademark of Sun Microsystems, Inc. 35 * 36 * lustre/fid/fid_request.c 37 * 38 * Lustre Sequence Manager 39 * 40 * Author: Yury Umanets <umka@clusterfs.com> 41 */ 42 43#define DEBUG_SUBSYSTEM S_FID 44 45#include "../../include/linux/libcfs/libcfs.h" 46#include <linux/module.h> 47 48#include "../include/obd.h" 49#include "../include/obd_class.h" 50#include "../include/obd_support.h" 51#include "../include/lustre_fid.h" 52/* mdc RPC locks */ 53#include "../include/lustre_mdc.h" 54#include "fid_internal.h" 55 56static int seq_client_rpc(struct lu_client_seq *seq, 57 struct lu_seq_range *output, __u32 opc, 58 const char *opcname) 59{ 60 struct obd_export *exp = seq->lcs_exp; 61 struct ptlrpc_request *req; 62 struct lu_seq_range *out, *in; 63 __u32 *op; 64 unsigned int debug_mask; 65 int rc; 66 67 req = ptlrpc_request_alloc_pack(class_exp2cliimp(exp), &RQF_SEQ_QUERY, 68 LUSTRE_MDS_VERSION, SEQ_QUERY); 69 if (req == NULL) 70 return -ENOMEM; 71 72 /* Init operation code */ 73 op = req_capsule_client_get(&req->rq_pill, &RMF_SEQ_OPC); 74 *op = opc; 75 76 /* Zero out input range, this is not recovery yet. */ 77 in = req_capsule_client_get(&req->rq_pill, &RMF_SEQ_RANGE); 78 range_init(in); 79 80 ptlrpc_request_set_replen(req); 81 82 in->lsr_index = seq->lcs_space.lsr_index; 83 if (seq->lcs_type == LUSTRE_SEQ_METADATA) 84 fld_range_set_mdt(in); 85 else 86 fld_range_set_ost(in); 87 88 if (opc == SEQ_ALLOC_SUPER) { 89 req->rq_request_portal = SEQ_CONTROLLER_PORTAL; 90 req->rq_reply_portal = MDC_REPLY_PORTAL; 91 /* During allocating super sequence for data object, 92 * the current thread might hold the export of MDT0(MDT0 93 * precreating objects on this OST), and it will send the 94 * request to MDT0 here, so we can not keep resending the 95 * request here, otherwise if MDT0 is failed(umounted), 96 * it can not release the export of MDT0 */ 97 if (seq->lcs_type == LUSTRE_SEQ_DATA) 98 req->rq_no_delay = req->rq_no_resend = 1; 99 debug_mask = D_CONSOLE; 100 } else { 101 if (seq->lcs_type == LUSTRE_SEQ_METADATA) 102 req->rq_request_portal = SEQ_METADATA_PORTAL; 103 else 104 req->rq_request_portal = SEQ_DATA_PORTAL; 105 debug_mask = D_INFO; 106 } 107 108 ptlrpc_at_set_req_timeout(req); 109 110 if (seq->lcs_type == LUSTRE_SEQ_METADATA) 111 mdc_get_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL); 112 rc = ptlrpc_queue_wait(req); 113 if (seq->lcs_type == LUSTRE_SEQ_METADATA) 114 mdc_put_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL); 115 if (rc) 116 goto out_req; 117 118 out = req_capsule_server_get(&req->rq_pill, &RMF_SEQ_RANGE); 119 *output = *out; 120 121 if (!range_is_sane(output)) { 122 CERROR("%s: Invalid range received from server: " 123 DRANGE"\n", seq->lcs_name, PRANGE(output)); 124 rc = -EINVAL; 125 goto out_req; 126 } 127 128 if (range_is_exhausted(output)) { 129 CERROR("%s: Range received from server is exhausted: " 130 DRANGE"]\n", seq->lcs_name, PRANGE(output)); 131 rc = -EINVAL; 132 goto out_req; 133 } 134 135 CDEBUG_LIMIT(debug_mask, "%s: Allocated %s-sequence "DRANGE"]\n", 136 seq->lcs_name, opcname, PRANGE(output)); 137 138out_req: 139 ptlrpc_req_finished(req); 140 return rc; 141} 142 143/* Request sequence-controller node to allocate new super-sequence. */ 144int seq_client_alloc_super(struct lu_client_seq *seq, 145 const struct lu_env *env) 146{ 147 int rc; 148 149 mutex_lock(&seq->lcs_mutex); 150 151 if (seq->lcs_srv) { 152 rc = 0; 153 } else { 154 /* Check whether the connection to seq controller has been 155 * setup (lcs_exp != NULL) */ 156 if (seq->lcs_exp == NULL) { 157 mutex_unlock(&seq->lcs_mutex); 158 return -EINPROGRESS; 159 } 160 161 rc = seq_client_rpc(seq, &seq->lcs_space, 162 SEQ_ALLOC_SUPER, "super"); 163 } 164 mutex_unlock(&seq->lcs_mutex); 165 return rc; 166} 167 168/* Request sequence-controller node to allocate new meta-sequence. */ 169static int seq_client_alloc_meta(const struct lu_env *env, 170 struct lu_client_seq *seq) 171{ 172 int rc; 173 174 if (seq->lcs_srv) { 175 rc = 0; 176 } else { 177 do { 178 /* If meta server return -EINPROGRESS or EAGAIN, 179 * it means meta server might not be ready to 180 * allocate super sequence from sequence controller 181 * (MDT0)yet */ 182 rc = seq_client_rpc(seq, &seq->lcs_space, 183 SEQ_ALLOC_META, "meta"); 184 } while (rc == -EINPROGRESS || rc == -EAGAIN); 185 } 186 187 return rc; 188} 189 190/* Allocate new sequence for client. */ 191static int seq_client_alloc_seq(const struct lu_env *env, 192 struct lu_client_seq *seq, u64 *seqnr) 193{ 194 int rc; 195 196 LASSERT(range_is_sane(&seq->lcs_space)); 197 198 if (range_is_exhausted(&seq->lcs_space)) { 199 rc = seq_client_alloc_meta(env, seq); 200 if (rc) { 201 CERROR("%s: Can't allocate new meta-sequence, rc %d\n", 202 seq->lcs_name, rc); 203 return rc; 204 } else { 205 CDEBUG(D_INFO, "%s: New range - "DRANGE"\n", 206 seq->lcs_name, PRANGE(&seq->lcs_space)); 207 } 208 } else { 209 rc = 0; 210 } 211 212 LASSERT(!range_is_exhausted(&seq->lcs_space)); 213 *seqnr = seq->lcs_space.lsr_start; 214 seq->lcs_space.lsr_start += 1; 215 216 CDEBUG(D_INFO, "%s: Allocated sequence [%#llx]\n", seq->lcs_name, 217 *seqnr); 218 219 return rc; 220} 221 222static int seq_fid_alloc_prep(struct lu_client_seq *seq, 223 wait_queue_t *link) 224{ 225 if (seq->lcs_update) { 226 add_wait_queue(&seq->lcs_waitq, link); 227 set_current_state(TASK_UNINTERRUPTIBLE); 228 mutex_unlock(&seq->lcs_mutex); 229 230 schedule(); 231 232 mutex_lock(&seq->lcs_mutex); 233 remove_wait_queue(&seq->lcs_waitq, link); 234 set_current_state(TASK_RUNNING); 235 return -EAGAIN; 236 } 237 ++seq->lcs_update; 238 mutex_unlock(&seq->lcs_mutex); 239 return 0; 240} 241 242static void seq_fid_alloc_fini(struct lu_client_seq *seq) 243{ 244 LASSERT(seq->lcs_update == 1); 245 mutex_lock(&seq->lcs_mutex); 246 --seq->lcs_update; 247 wake_up(&seq->lcs_waitq); 248} 249 250/** 251 * Allocate the whole seq to the caller. 252 **/ 253int seq_client_get_seq(const struct lu_env *env, 254 struct lu_client_seq *seq, u64 *seqnr) 255{ 256 wait_queue_t link; 257 int rc; 258 259 LASSERT(seqnr != NULL); 260 mutex_lock(&seq->lcs_mutex); 261 init_waitqueue_entry(&link, current); 262 263 while (1) { 264 rc = seq_fid_alloc_prep(seq, &link); 265 if (rc == 0) 266 break; 267 } 268 269 rc = seq_client_alloc_seq(env, seq, seqnr); 270 if (rc) { 271 CERROR("%s: Can't allocate new sequence, rc %d\n", 272 seq->lcs_name, rc); 273 seq_fid_alloc_fini(seq); 274 mutex_unlock(&seq->lcs_mutex); 275 return rc; 276 } 277 278 CDEBUG(D_INFO, "%s: allocate sequence [0x%16.16Lx]\n", 279 seq->lcs_name, *seqnr); 280 281 /* Since the caller require the whole seq, 282 * so marked this seq to be used */ 283 if (seq->lcs_type == LUSTRE_SEQ_METADATA) 284 seq->lcs_fid.f_oid = LUSTRE_METADATA_SEQ_MAX_WIDTH; 285 else 286 seq->lcs_fid.f_oid = LUSTRE_DATA_SEQ_MAX_WIDTH; 287 288 seq->lcs_fid.f_seq = *seqnr; 289 seq->lcs_fid.f_ver = 0; 290 /* 291 * Inform caller that sequence switch is performed to allow it 292 * to setup FLD for it. 293 */ 294 seq_fid_alloc_fini(seq); 295 mutex_unlock(&seq->lcs_mutex); 296 297 return rc; 298} 299EXPORT_SYMBOL(seq_client_get_seq); 300 301/* Allocate new fid on passed client @seq and save it to @fid. */ 302int seq_client_alloc_fid(const struct lu_env *env, 303 struct lu_client_seq *seq, struct lu_fid *fid) 304{ 305 wait_queue_t link; 306 int rc; 307 308 LASSERT(seq != NULL); 309 LASSERT(fid != NULL); 310 311 init_waitqueue_entry(&link, current); 312 mutex_lock(&seq->lcs_mutex); 313 314 if (OBD_FAIL_CHECK(OBD_FAIL_SEQ_EXHAUST)) 315 seq->lcs_fid.f_oid = seq->lcs_width; 316 317 while (1) { 318 u64 seqnr; 319 320 if (!fid_is_zero(&seq->lcs_fid) && 321 fid_oid(&seq->lcs_fid) < seq->lcs_width) { 322 /* Just bump last allocated fid and return to caller. */ 323 seq->lcs_fid.f_oid += 1; 324 rc = 0; 325 break; 326 } 327 328 rc = seq_fid_alloc_prep(seq, &link); 329 if (rc) 330 continue; 331 332 rc = seq_client_alloc_seq(env, seq, &seqnr); 333 if (rc) { 334 CERROR("%s: Can't allocate new sequence, rc %d\n", 335 seq->lcs_name, rc); 336 seq_fid_alloc_fini(seq); 337 mutex_unlock(&seq->lcs_mutex); 338 return rc; 339 } 340 341 CDEBUG(D_INFO, "%s: Switch to sequence [0x%16.16Lx]\n", 342 seq->lcs_name, seqnr); 343 344 seq->lcs_fid.f_oid = LUSTRE_FID_INIT_OID; 345 seq->lcs_fid.f_seq = seqnr; 346 seq->lcs_fid.f_ver = 0; 347 348 /* 349 * Inform caller that sequence switch is performed to allow it 350 * to setup FLD for it. 351 */ 352 rc = 1; 353 354 seq_fid_alloc_fini(seq); 355 break; 356 } 357 358 *fid = seq->lcs_fid; 359 mutex_unlock(&seq->lcs_mutex); 360 361 CDEBUG(D_INFO, "%s: Allocated FID "DFID"\n", seq->lcs_name, PFID(fid)); 362 return rc; 363} 364EXPORT_SYMBOL(seq_client_alloc_fid); 365 366/* 367 * Finish the current sequence due to disconnect. 368 * See mdc_import_event() 369 */ 370void seq_client_flush(struct lu_client_seq *seq) 371{ 372 wait_queue_t link; 373 374 LASSERT(seq != NULL); 375 init_waitqueue_entry(&link, current); 376 mutex_lock(&seq->lcs_mutex); 377 378 while (seq->lcs_update) { 379 add_wait_queue(&seq->lcs_waitq, &link); 380 set_current_state(TASK_UNINTERRUPTIBLE); 381 mutex_unlock(&seq->lcs_mutex); 382 383 schedule(); 384 385 mutex_lock(&seq->lcs_mutex); 386 remove_wait_queue(&seq->lcs_waitq, &link); 387 set_current_state(TASK_RUNNING); 388 } 389 390 fid_zero(&seq->lcs_fid); 391 /** 392 * this id shld not be used for seq range allocation. 393 * set to -1 for dgb check. 394 */ 395 396 seq->lcs_space.lsr_index = -1; 397 398 range_init(&seq->lcs_space); 399 mutex_unlock(&seq->lcs_mutex); 400} 401EXPORT_SYMBOL(seq_client_flush); 402 403static void seq_client_proc_fini(struct lu_client_seq *seq) 404{ 405#if defined (CONFIG_PROC_FS) 406 if (seq->lcs_proc_dir) { 407 if (!IS_ERR(seq->lcs_proc_dir)) 408 lprocfs_remove(&seq->lcs_proc_dir); 409 seq->lcs_proc_dir = NULL; 410 } 411#endif /* CONFIG_PROC_FS */ 412} 413 414static int seq_client_proc_init(struct lu_client_seq *seq) 415{ 416#if defined (CONFIG_PROC_FS) 417 int rc; 418 419 seq->lcs_proc_dir = lprocfs_register(seq->lcs_name, 420 seq_type_proc_dir, 421 NULL, NULL); 422 423 if (IS_ERR(seq->lcs_proc_dir)) { 424 CERROR("%s: LProcFS failed in seq-init\n", 425 seq->lcs_name); 426 rc = PTR_ERR(seq->lcs_proc_dir); 427 return rc; 428 } 429 430 rc = lprocfs_add_vars(seq->lcs_proc_dir, 431 seq_client_proc_list, seq); 432 if (rc) { 433 CERROR("%s: Can't init sequence manager proc, rc %d\n", 434 seq->lcs_name, rc); 435 goto out_cleanup; 436 } 437 438 return 0; 439 440out_cleanup: 441 seq_client_proc_fini(seq); 442 return rc; 443 444#else /* CONFIG_PROC_FS */ 445 return 0; 446#endif 447} 448 449int seq_client_init(struct lu_client_seq *seq, 450 struct obd_export *exp, 451 enum lu_cli_type type, 452 const char *prefix, 453 struct lu_server_seq *srv) 454{ 455 int rc; 456 457 LASSERT(seq != NULL); 458 LASSERT(prefix != NULL); 459 460 seq->lcs_srv = srv; 461 seq->lcs_type = type; 462 463 mutex_init(&seq->lcs_mutex); 464 if (type == LUSTRE_SEQ_METADATA) 465 seq->lcs_width = LUSTRE_METADATA_SEQ_MAX_WIDTH; 466 else 467 seq->lcs_width = LUSTRE_DATA_SEQ_MAX_WIDTH; 468 469 init_waitqueue_head(&seq->lcs_waitq); 470 /* Make sure that things are clear before work is started. */ 471 seq_client_flush(seq); 472 473 if (exp != NULL) 474 seq->lcs_exp = class_export_get(exp); 475 else if (type == LUSTRE_SEQ_METADATA) 476 LASSERT(seq->lcs_srv != NULL); 477 478 snprintf(seq->lcs_name, sizeof(seq->lcs_name), 479 "cli-%s", prefix); 480 481 rc = seq_client_proc_init(seq); 482 if (rc) 483 seq_client_fini(seq); 484 return rc; 485} 486EXPORT_SYMBOL(seq_client_init); 487 488void seq_client_fini(struct lu_client_seq *seq) 489{ 490 seq_client_proc_fini(seq); 491 492 if (seq->lcs_exp != NULL) { 493 class_export_put(seq->lcs_exp); 494 seq->lcs_exp = NULL; 495 } 496 497 seq->lcs_srv = NULL; 498} 499EXPORT_SYMBOL(seq_client_fini); 500 501int client_fid_init(struct obd_device *obd, 502 struct obd_export *exp, enum lu_cli_type type) 503{ 504 struct client_obd *cli = &obd->u.cli; 505 char *prefix; 506 int rc; 507 508 OBD_ALLOC_PTR(cli->cl_seq); 509 if (cli->cl_seq == NULL) 510 return -ENOMEM; 511 512 OBD_ALLOC(prefix, MAX_OBD_NAME + 5); 513 if (prefix == NULL) { 514 rc = -ENOMEM; 515 goto out_free_seq; 516 } 517 518 snprintf(prefix, MAX_OBD_NAME + 5, "cli-%s", obd->obd_name); 519 520 /* Init client side sequence-manager */ 521 rc = seq_client_init(cli->cl_seq, exp, type, prefix, NULL); 522 OBD_FREE(prefix, MAX_OBD_NAME + 5); 523 if (rc) 524 goto out_free_seq; 525 526 return rc; 527out_free_seq: 528 OBD_FREE_PTR(cli->cl_seq); 529 cli->cl_seq = NULL; 530 return rc; 531} 532EXPORT_SYMBOL(client_fid_init); 533 534int client_fid_fini(struct obd_device *obd) 535{ 536 struct client_obd *cli = &obd->u.cli; 537 538 if (cli->cl_seq != NULL) { 539 seq_client_fini(cli->cl_seq); 540 OBD_FREE_PTR(cli->cl_seq); 541 cli->cl_seq = NULL; 542 } 543 544 return 0; 545} 546EXPORT_SYMBOL(client_fid_fini); 547 548struct proc_dir_entry *seq_type_proc_dir; 549 550static int __init fid_mod_init(void) 551{ 552 seq_type_proc_dir = lprocfs_register(LUSTRE_SEQ_NAME, 553 proc_lustre_root, 554 NULL, NULL); 555 return PTR_ERR_OR_ZERO(seq_type_proc_dir); 556} 557 558static void __exit fid_mod_exit(void) 559{ 560 if (seq_type_proc_dir != NULL && !IS_ERR(seq_type_proc_dir)) { 561 lprocfs_remove(&seq_type_proc_dir); 562 seq_type_proc_dir = NULL; 563 } 564} 565 566MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>"); 567MODULE_DESCRIPTION("Lustre FID Module"); 568MODULE_LICENSE("GPL"); 569MODULE_VERSION("0.1.0"); 570 571module_init(fid_mod_init); 572module_exit(fid_mod_exit); 573