1/* 2 * GPL HEADER START 3 * 4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 5 * 6 * This program is free software; you can redistribute it and/or modify 7 * it under the terms of the GNU General Public License version 2 only, 8 * as published by the Free Software Foundation. 9 * 10 * This program is distributed in the hope that it will be useful, but 11 * WITHOUT ANY WARRANTY; without even the implied warranty of 12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 13 * General Public License version 2 for more details (a copy is included 14 * in the LICENSE file that accompanied this code). 15 * 16 * You should have received a copy of the GNU General Public License 17 * version 2 along with this program; If not, see 18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf 19 * 20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara, 21 * CA 95054 USA or visit www.sun.com if you need additional information or 22 * have any questions. 23 * 24 * GPL HEADER END 25 */ 26/* 27 * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved. 28 * Use is subject to license terms. 29 * 30 * Copyright (c) 2011, 2012, Intel Corporation. 31 */ 32/* 33 * This file is part of Lustre, http://www.lustre.org/ 34 * Lustre is a trademark of Sun Microsystems, Inc. 35 * 36 * lustre/mgc/mgc_request.c 37 * 38 * Author: Nathan Rutman <nathan@clusterfs.com> 39 */ 40 41#define DEBUG_SUBSYSTEM S_MGC 42#define D_MGC D_CONFIG /*|D_WARNING*/ 43 44#include <linux/module.h> 45#include "../include/obd_class.h" 46#include "../include/lustre_dlm.h" 47#include "../include/lprocfs_status.h" 48#include "../include/lustre_log.h" 49#include "../include/lustre_disk.h" 50 51#include "mgc_internal.h" 52 53static int mgc_name2resid(char *name, int len, struct ldlm_res_id *res_id, 54 int type) 55{ 56 __u64 resname = 0; 57 58 if (len > sizeof(resname)) { 59 CERROR("name too long: %s\n", name); 60 return -EINVAL; 61 } 62 if (len <= 0) { 63 CERROR("missing name: %s\n", name); 64 return -EINVAL; 65 } 66 memcpy(&resname, name, len); 67 68 /* Always use the same endianness for the resid */ 69 memset(res_id, 0, sizeof(*res_id)); 70 res_id->name[0] = cpu_to_le64(resname); 71 /* XXX: unfortunately, sptlprc and config llog share one lock */ 72 switch (type) { 73 case CONFIG_T_CONFIG: 74 case CONFIG_T_SPTLRPC: 75 resname = 0; 76 break; 77 case CONFIG_T_RECOVER: 78 case CONFIG_T_PARAMS: 79 resname = type; 80 break; 81 default: 82 LBUG(); 83 } 84 res_id->name[1] = cpu_to_le64(resname); 85 CDEBUG(D_MGC, "log %s to resid %#llx/%#llx (%.8s)\n", name, 86 res_id->name[0], res_id->name[1], (char *)&res_id->name[0]); 87 return 0; 88} 89 90int mgc_fsname2resid(char *fsname, struct ldlm_res_id *res_id, int type) 91{ 92 /* fsname is at most 8 chars long, maybe contain "-". 93 * e.g. "lustre", "SUN-000" */ 94 return mgc_name2resid(fsname, strlen(fsname), res_id, type); 95} 96EXPORT_SYMBOL(mgc_fsname2resid); 97 98int mgc_logname2resid(char *logname, struct ldlm_res_id *res_id, int type) 99{ 100 char *name_end; 101 int len; 102 103 /* logname consists of "fsname-nodetype". 104 * e.g. "lustre-MDT0001", "SUN-000-client" 105 * there is an exception: llog "params" */ 106 name_end = strrchr(logname, '-'); 107 if (!name_end) 108 len = strlen(logname); 109 else 110 len = name_end - logname; 111 return mgc_name2resid(logname, len, res_id, type); 112} 113 114/********************** config llog list **********************/ 115static LIST_HEAD(config_llog_list); 116static DEFINE_SPINLOCK(config_list_lock); 117 118/* Take a reference to a config log */ 119static int config_log_get(struct config_llog_data *cld) 120{ 121 atomic_inc(&cld->cld_refcount); 122 CDEBUG(D_INFO, "log %s refs %d\n", cld->cld_logname, 123 atomic_read(&cld->cld_refcount)); 124 return 0; 125} 126 127/* Drop a reference to a config log. When no longer referenced, 128 we can free the config log data */ 129static void config_log_put(struct config_llog_data *cld) 130{ 131 CDEBUG(D_INFO, "log %s refs %d\n", cld->cld_logname, 132 atomic_read(&cld->cld_refcount)); 133 LASSERT(atomic_read(&cld->cld_refcount) > 0); 134 135 /* spinlock to make sure no item with 0 refcount in the list */ 136 if (atomic_dec_and_lock(&cld->cld_refcount, &config_list_lock)) { 137 list_del(&cld->cld_list_chain); 138 spin_unlock(&config_list_lock); 139 140 CDEBUG(D_MGC, "dropping config log %s\n", cld->cld_logname); 141 142 if (cld->cld_recover) 143 config_log_put(cld->cld_recover); 144 if (cld->cld_sptlrpc) 145 config_log_put(cld->cld_sptlrpc); 146 if (cld->cld_params) 147 config_log_put(cld->cld_params); 148 if (cld_is_sptlrpc(cld)) 149 sptlrpc_conf_log_stop(cld->cld_logname); 150 151 class_export_put(cld->cld_mgcexp); 152 OBD_FREE(cld, sizeof(*cld) + strlen(cld->cld_logname) + 1); 153 } 154} 155 156/* Find a config log by name */ 157static 158struct config_llog_data *config_log_find(char *logname, 159 struct config_llog_instance *cfg) 160{ 161 struct config_llog_data *cld; 162 struct config_llog_data *found = NULL; 163 void * instance; 164 165 LASSERT(logname != NULL); 166 167 instance = cfg ? cfg->cfg_instance : NULL; 168 spin_lock(&config_list_lock); 169 list_for_each_entry(cld, &config_llog_list, cld_list_chain) { 170 /* check if instance equals */ 171 if (instance != cld->cld_cfg.cfg_instance) 172 continue; 173 174 /* instance may be NULL, should check name */ 175 if (strcmp(logname, cld->cld_logname) == 0) { 176 found = cld; 177 break; 178 } 179 } 180 if (found) { 181 atomic_inc(&found->cld_refcount); 182 LASSERT(found->cld_stopping == 0 || cld_is_sptlrpc(found) == 0); 183 } 184 spin_unlock(&config_list_lock); 185 return found; 186} 187 188static 189struct config_llog_data *do_config_log_add(struct obd_device *obd, 190 char *logname, 191 int type, 192 struct config_llog_instance *cfg, 193 struct super_block *sb) 194{ 195 struct config_llog_data *cld; 196 int rc; 197 198 CDEBUG(D_MGC, "do adding config log %s:%p\n", logname, 199 cfg ? cfg->cfg_instance : NULL); 200 201 OBD_ALLOC(cld, sizeof(*cld) + strlen(logname) + 1); 202 if (!cld) 203 return ERR_PTR(-ENOMEM); 204 205 strcpy(cld->cld_logname, logname); 206 if (cfg) 207 cld->cld_cfg = *cfg; 208 else 209 cld->cld_cfg.cfg_callback = class_config_llog_handler; 210 mutex_init(&cld->cld_lock); 211 cld->cld_cfg.cfg_last_idx = 0; 212 cld->cld_cfg.cfg_flags = 0; 213 cld->cld_cfg.cfg_sb = sb; 214 cld->cld_type = type; 215 atomic_set(&cld->cld_refcount, 1); 216 217 /* Keep the mgc around until we are done */ 218 cld->cld_mgcexp = class_export_get(obd->obd_self_export); 219 220 if (cld_is_sptlrpc(cld)) { 221 sptlrpc_conf_log_start(logname); 222 cld->cld_cfg.cfg_obdname = obd->obd_name; 223 } 224 225 rc = mgc_logname2resid(logname, &cld->cld_resid, type); 226 227 spin_lock(&config_list_lock); 228 list_add(&cld->cld_list_chain, &config_llog_list); 229 spin_unlock(&config_list_lock); 230 231 if (rc) { 232 config_log_put(cld); 233 return ERR_PTR(rc); 234 } 235 236 if (cld_is_sptlrpc(cld)) { 237 rc = mgc_process_log(obd, cld); 238 if (rc && rc != -ENOENT) 239 CERROR("failed processing sptlrpc log: %d\n", rc); 240 } 241 242 return cld; 243} 244 245static struct config_llog_data *config_recover_log_add(struct obd_device *obd, 246 char *fsname, 247 struct config_llog_instance *cfg, 248 struct super_block *sb) 249{ 250 struct config_llog_instance lcfg = *cfg; 251 struct lustre_sb_info *lsi = s2lsi(sb); 252 struct config_llog_data *cld; 253 char logname[32]; 254 255 if (IS_OST(lsi)) 256 return NULL; 257 258 /* for osp-on-ost, see lustre_start_osp() */ 259 if (IS_MDT(lsi) && lcfg.cfg_instance) 260 return NULL; 261 262 /* we have to use different llog for clients and mdts for cmd 263 * where only clients are notified if one of cmd server restarts */ 264 LASSERT(strlen(fsname) < sizeof(logname) / 2); 265 strcpy(logname, fsname); 266 if (IS_SERVER(lsi)) { /* mdt */ 267 LASSERT(lcfg.cfg_instance == NULL); 268 lcfg.cfg_instance = sb; 269 strcat(logname, "-mdtir"); 270 } else { 271 LASSERT(lcfg.cfg_instance != NULL); 272 strcat(logname, "-cliir"); 273 } 274 275 cld = do_config_log_add(obd, logname, CONFIG_T_RECOVER, &lcfg, sb); 276 return cld; 277} 278 279static struct config_llog_data *config_params_log_add(struct obd_device *obd, 280 struct config_llog_instance *cfg, struct super_block *sb) 281{ 282 struct config_llog_instance lcfg = *cfg; 283 struct config_llog_data *cld; 284 285 lcfg.cfg_instance = sb; 286 287 cld = do_config_log_add(obd, PARAMS_FILENAME, CONFIG_T_PARAMS, 288 &lcfg, sb); 289 290 return cld; 291} 292 293/** Add this log to the list of active logs watched by an MGC. 294 * Active means we're watching for updates. 295 * We have one active log per "mount" - client instance or servername. 296 * Each instance may be at a different point in the log. 297 */ 298static int config_log_add(struct obd_device *obd, char *logname, 299 struct config_llog_instance *cfg, 300 struct super_block *sb) 301{ 302 struct lustre_sb_info *lsi = s2lsi(sb); 303 struct config_llog_data *cld; 304 struct config_llog_data *sptlrpc_cld; 305 struct config_llog_data *params_cld; 306 char seclogname[32]; 307 char *ptr; 308 int rc; 309 310 CDEBUG(D_MGC, "adding config log %s:%p\n", logname, cfg->cfg_instance); 311 312 /* 313 * for each regular log, the depended sptlrpc log name is 314 * <fsname>-sptlrpc. multiple regular logs may share one sptlrpc log. 315 */ 316 ptr = strrchr(logname, '-'); 317 if (ptr == NULL || ptr - logname > 8) { 318 CERROR("logname %s is too long\n", logname); 319 return -EINVAL; 320 } 321 322 memcpy(seclogname, logname, ptr - logname); 323 strcpy(seclogname + (ptr - logname), "-sptlrpc"); 324 325 sptlrpc_cld = config_log_find(seclogname, NULL); 326 if (sptlrpc_cld == NULL) { 327 sptlrpc_cld = do_config_log_add(obd, seclogname, 328 CONFIG_T_SPTLRPC, NULL, NULL); 329 if (IS_ERR(sptlrpc_cld)) { 330 CERROR("can't create sptlrpc log: %s\n", seclogname); 331 rc = PTR_ERR(sptlrpc_cld); 332 goto out_err; 333 } 334 } 335 params_cld = config_params_log_add(obd, cfg, sb); 336 if (IS_ERR(params_cld)) { 337 rc = PTR_ERR(params_cld); 338 CERROR("%s: can't create params log: rc = %d\n", 339 obd->obd_name, rc); 340 goto out_err1; 341 } 342 343 cld = do_config_log_add(obd, logname, CONFIG_T_CONFIG, cfg, sb); 344 if (IS_ERR(cld)) { 345 CERROR("can't create log: %s\n", logname); 346 rc = PTR_ERR(cld); 347 goto out_err2; 348 } 349 350 cld->cld_sptlrpc = sptlrpc_cld; 351 cld->cld_params = params_cld; 352 353 LASSERT(lsi->lsi_lmd); 354 if (!(lsi->lsi_lmd->lmd_flags & LMD_FLG_NOIR)) { 355 struct config_llog_data *recover_cld; 356 *strrchr(seclogname, '-') = 0; 357 recover_cld = config_recover_log_add(obd, seclogname, cfg, sb); 358 if (IS_ERR(recover_cld)) { 359 rc = PTR_ERR(recover_cld); 360 goto out_err3; 361 } 362 cld->cld_recover = recover_cld; 363 } 364 365 return 0; 366 367out_err3: 368 config_log_put(cld); 369 370out_err2: 371 config_log_put(params_cld); 372 373out_err1: 374 config_log_put(sptlrpc_cld); 375 376out_err: 377 return rc; 378} 379 380DEFINE_MUTEX(llog_process_lock); 381 382/** Stop watching for updates on this log. 383 */ 384static int config_log_end(char *logname, struct config_llog_instance *cfg) 385{ 386 struct config_llog_data *cld; 387 struct config_llog_data *cld_sptlrpc = NULL; 388 struct config_llog_data *cld_params = NULL; 389 struct config_llog_data *cld_recover = NULL; 390 int rc = 0; 391 392 cld = config_log_find(logname, cfg); 393 if (cld == NULL) 394 return -ENOENT; 395 396 mutex_lock(&cld->cld_lock); 397 /* 398 * if cld_stopping is set, it means we didn't start the log thus 399 * not owning the start ref. this can happen after previous umount: 400 * the cld still hanging there waiting for lock cancel, and we 401 * remount again but failed in the middle and call log_end without 402 * calling start_log. 403 */ 404 if (unlikely(cld->cld_stopping)) { 405 mutex_unlock(&cld->cld_lock); 406 /* drop the ref from the find */ 407 config_log_put(cld); 408 return rc; 409 } 410 411 cld->cld_stopping = 1; 412 413 cld_recover = cld->cld_recover; 414 cld->cld_recover = NULL; 415 mutex_unlock(&cld->cld_lock); 416 417 if (cld_recover) { 418 mutex_lock(&cld_recover->cld_lock); 419 cld_recover->cld_stopping = 1; 420 mutex_unlock(&cld_recover->cld_lock); 421 config_log_put(cld_recover); 422 } 423 424 spin_lock(&config_list_lock); 425 cld_sptlrpc = cld->cld_sptlrpc; 426 cld->cld_sptlrpc = NULL; 427 cld_params = cld->cld_params; 428 cld->cld_params = NULL; 429 spin_unlock(&config_list_lock); 430 431 if (cld_sptlrpc) 432 config_log_put(cld_sptlrpc); 433 434 if (cld_params) { 435 mutex_lock(&cld_params->cld_lock); 436 cld_params->cld_stopping = 1; 437 mutex_unlock(&cld_params->cld_lock); 438 config_log_put(cld_params); 439 } 440 441 /* drop the ref from the find */ 442 config_log_put(cld); 443 /* drop the start ref */ 444 config_log_put(cld); 445 446 CDEBUG(D_MGC, "end config log %s (%d)\n", logname ? logname : "client", 447 rc); 448 return rc; 449} 450 451#if defined (CONFIG_PROC_FS) 452int lprocfs_mgc_rd_ir_state(struct seq_file *m, void *data) 453{ 454 struct obd_device *obd = data; 455 struct obd_import *imp = obd->u.cli.cl_import; 456 struct obd_connect_data *ocd = &imp->imp_connect_data; 457 struct config_llog_data *cld; 458 459 seq_printf(m, "imperative_recovery: %s\n", 460 OCD_HAS_FLAG(ocd, IMP_RECOV) ? "ENABLED" : "DISABLED"); 461 seq_printf(m, "client_state:\n"); 462 463 spin_lock(&config_list_lock); 464 list_for_each_entry(cld, &config_llog_list, cld_list_chain) { 465 if (cld->cld_recover == NULL) 466 continue; 467 seq_printf(m, " - { client: %s, nidtbl_version: %u }\n", 468 cld->cld_logname, 469 cld->cld_recover->cld_cfg.cfg_last_idx); 470 } 471 spin_unlock(&config_list_lock); 472 473 return 0; 474} 475#endif 476 477/* reenqueue any lost locks */ 478#define RQ_RUNNING 0x1 479#define RQ_NOW 0x2 480#define RQ_LATER 0x4 481#define RQ_STOP 0x8 482static int rq_state = 0; 483static wait_queue_head_t rq_waitq; 484static DECLARE_COMPLETION(rq_exit); 485 486static void do_requeue(struct config_llog_data *cld) 487{ 488 LASSERT(atomic_read(&cld->cld_refcount) > 0); 489 490 /* Do not run mgc_process_log on a disconnected export or an 491 export which is being disconnected. Take the client 492 semaphore to make the check non-racy. */ 493 down_read(&cld->cld_mgcexp->exp_obd->u.cli.cl_sem); 494 if (cld->cld_mgcexp->exp_obd->u.cli.cl_conn_count != 0) { 495 CDEBUG(D_MGC, "updating log %s\n", cld->cld_logname); 496 mgc_process_log(cld->cld_mgcexp->exp_obd, cld); 497 } else { 498 CDEBUG(D_MGC, "disconnecting, won't update log %s\n", 499 cld->cld_logname); 500 } 501 up_read(&cld->cld_mgcexp->exp_obd->u.cli.cl_sem); 502} 503 504/* this timeout represents how many seconds MGC should wait before 505 * requeue config and recover lock to the MGS. We need to randomize this 506 * in order to not flood the MGS. 507 */ 508#define MGC_TIMEOUT_MIN_SECONDS 5 509#define MGC_TIMEOUT_RAND_CENTISEC 0x1ff /* ~500 */ 510 511static int mgc_requeue_thread(void *data) 512{ 513 int rc = 0; 514 515 CDEBUG(D_MGC, "Starting requeue thread\n"); 516 517 /* Keep trying failed locks periodically */ 518 spin_lock(&config_list_lock); 519 rq_state |= RQ_RUNNING; 520 while (1) { 521 struct l_wait_info lwi; 522 struct config_llog_data *cld, *cld_prev; 523 int rand = cfs_rand() & MGC_TIMEOUT_RAND_CENTISEC; 524 int stopped = !!(rq_state & RQ_STOP); 525 int to; 526 527 /* Any new or requeued lostlocks will change the state */ 528 rq_state &= ~(RQ_NOW | RQ_LATER); 529 spin_unlock(&config_list_lock); 530 531 /* Always wait a few seconds to allow the server who 532 caused the lock revocation to finish its setup, plus some 533 random so everyone doesn't try to reconnect at once. */ 534 to = MGC_TIMEOUT_MIN_SECONDS * HZ; 535 to += rand * HZ / 100; /* rand is centi-seconds */ 536 lwi = LWI_TIMEOUT(to, NULL, NULL); 537 l_wait_event(rq_waitq, rq_state & RQ_STOP, &lwi); 538 539 /* 540 * iterate & processing through the list. for each cld, process 541 * its depending sptlrpc cld firstly (if any) and then itself. 542 * 543 * it's guaranteed any item in the list must have 544 * reference > 0; and if cld_lostlock is set, at 545 * least one reference is taken by the previous enqueue. 546 */ 547 cld_prev = NULL; 548 549 spin_lock(&config_list_lock); 550 list_for_each_entry(cld, &config_llog_list, 551 cld_list_chain) { 552 if (!cld->cld_lostlock) 553 continue; 554 555 spin_unlock(&config_list_lock); 556 557 LASSERT(atomic_read(&cld->cld_refcount) > 0); 558 559 /* Whether we enqueued again or not in mgc_process_log, 560 * we're done with the ref from the old enqueue */ 561 if (cld_prev) 562 config_log_put(cld_prev); 563 cld_prev = cld; 564 565 cld->cld_lostlock = 0; 566 if (likely(!stopped)) 567 do_requeue(cld); 568 569 spin_lock(&config_list_lock); 570 } 571 spin_unlock(&config_list_lock); 572 if (cld_prev) 573 config_log_put(cld_prev); 574 575 /* break after scanning the list so that we can drop 576 * refcount to losing lock clds */ 577 if (unlikely(stopped)) { 578 spin_lock(&config_list_lock); 579 break; 580 } 581 582 /* Wait a bit to see if anyone else needs a requeue */ 583 lwi = (struct l_wait_info) { 0 }; 584 l_wait_event(rq_waitq, rq_state & (RQ_NOW | RQ_STOP), 585 &lwi); 586 spin_lock(&config_list_lock); 587 } 588 /* spinlock and while guarantee RQ_NOW and RQ_LATER are not set */ 589 rq_state &= ~RQ_RUNNING; 590 spin_unlock(&config_list_lock); 591 592 complete(&rq_exit); 593 594 CDEBUG(D_MGC, "Ending requeue thread\n"); 595 return rc; 596} 597 598/* Add a cld to the list to requeue. Start the requeue thread if needed. 599 We are responsible for dropping the config log reference from here on out. */ 600static void mgc_requeue_add(struct config_llog_data *cld) 601{ 602 CDEBUG(D_INFO, "log %s: requeue (r=%d sp=%d st=%x)\n", 603 cld->cld_logname, atomic_read(&cld->cld_refcount), 604 cld->cld_stopping, rq_state); 605 LASSERT(atomic_read(&cld->cld_refcount) > 0); 606 607 mutex_lock(&cld->cld_lock); 608 if (cld->cld_stopping || cld->cld_lostlock) { 609 mutex_unlock(&cld->cld_lock); 610 return; 611 } 612 /* this refcount will be released in mgc_requeue_thread. */ 613 config_log_get(cld); 614 cld->cld_lostlock = 1; 615 mutex_unlock(&cld->cld_lock); 616 617 /* Hold lock for rq_state */ 618 spin_lock(&config_list_lock); 619 if (rq_state & RQ_STOP) { 620 spin_unlock(&config_list_lock); 621 cld->cld_lostlock = 0; 622 config_log_put(cld); 623 } else { 624 rq_state |= RQ_NOW; 625 spin_unlock(&config_list_lock); 626 wake_up(&rq_waitq); 627 } 628} 629 630static int mgc_llog_init(const struct lu_env *env, struct obd_device *obd) 631{ 632 struct llog_ctxt *ctxt; 633 int rc; 634 635 /* setup only remote ctxt, the local disk context is switched per each 636 * filesystem during mgc_fs_setup() */ 637 rc = llog_setup(env, obd, &obd->obd_olg, LLOG_CONFIG_REPL_CTXT, obd, 638 &llog_client_ops); 639 if (rc) 640 return rc; 641 642 ctxt = llog_get_context(obd, LLOG_CONFIG_REPL_CTXT); 643 LASSERT(ctxt); 644 645 llog_initiator_connect(ctxt); 646 llog_ctxt_put(ctxt); 647 648 return 0; 649} 650 651static int mgc_llog_fini(const struct lu_env *env, struct obd_device *obd) 652{ 653 struct llog_ctxt *ctxt; 654 655 ctxt = llog_get_context(obd, LLOG_CONFIG_REPL_CTXT); 656 if (ctxt) 657 llog_cleanup(env, ctxt); 658 659 return 0; 660} 661 662static atomic_t mgc_count = ATOMIC_INIT(0); 663static int mgc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage) 664{ 665 int rc = 0; 666 667 switch (stage) { 668 case OBD_CLEANUP_EARLY: 669 break; 670 case OBD_CLEANUP_EXPORTS: 671 if (atomic_dec_and_test(&mgc_count)) { 672 int running; 673 /* stop requeue thread */ 674 spin_lock(&config_list_lock); 675 running = rq_state & RQ_RUNNING; 676 if (running) 677 rq_state |= RQ_STOP; 678 spin_unlock(&config_list_lock); 679 if (running) { 680 wake_up(&rq_waitq); 681 wait_for_completion(&rq_exit); 682 } 683 } 684 obd_cleanup_client_import(obd); 685 rc = mgc_llog_fini(NULL, obd); 686 if (rc != 0) 687 CERROR("failed to cleanup llogging subsystems\n"); 688 break; 689 } 690 return rc; 691} 692 693static int mgc_cleanup(struct obd_device *obd) 694{ 695 int rc; 696 697 /* COMPAT_146 - old config logs may have added profiles we don't 698 know about */ 699 if (obd->obd_type->typ_refcnt <= 1) 700 /* Only for the last mgc */ 701 class_del_profiles(); 702 703 lprocfs_obd_cleanup(obd); 704 ptlrpcd_decref(); 705 706 rc = client_obd_cleanup(obd); 707 return rc; 708} 709 710static int mgc_setup(struct obd_device *obd, struct lustre_cfg *lcfg) 711{ 712 struct lprocfs_static_vars lvars; 713 int rc; 714 715 ptlrpcd_addref(); 716 717 rc = client_obd_setup(obd, lcfg); 718 if (rc) 719 goto err_decref; 720 721 rc = mgc_llog_init(NULL, obd); 722 if (rc) { 723 CERROR("failed to setup llogging subsystems\n"); 724 goto err_cleanup; 725 } 726 727 lprocfs_mgc_init_vars(&lvars); 728 lprocfs_obd_setup(obd, lvars.obd_vars); 729 sptlrpc_lprocfs_cliobd_attach(obd); 730 731 if (atomic_inc_return(&mgc_count) == 1) { 732 rq_state = 0; 733 init_waitqueue_head(&rq_waitq); 734 735 /* start requeue thread */ 736 rc = PTR_ERR(kthread_run(mgc_requeue_thread, NULL, 737 "ll_cfg_requeue")); 738 if (IS_ERR_VALUE(rc)) { 739 CERROR("%s: Cannot start requeue thread (%d)," 740 "no more log updates!\n", 741 obd->obd_name, rc); 742 goto err_cleanup; 743 } 744 /* rc is the task_struct pointer of mgc_requeue_thread. */ 745 rc = 0; 746 } 747 748 return rc; 749 750err_cleanup: 751 client_obd_cleanup(obd); 752err_decref: 753 ptlrpcd_decref(); 754 return rc; 755} 756 757/* based on ll_mdc_blocking_ast */ 758static int mgc_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc, 759 void *data, int flag) 760{ 761 struct lustre_handle lockh; 762 struct config_llog_data *cld = (struct config_llog_data *)data; 763 int rc = 0; 764 765 switch (flag) { 766 case LDLM_CB_BLOCKING: 767 /* mgs wants the lock, give it up... */ 768 LDLM_DEBUG(lock, "MGC blocking CB"); 769 ldlm_lock2handle(lock, &lockh); 770 rc = ldlm_cli_cancel(&lockh, LCF_ASYNC); 771 break; 772 case LDLM_CB_CANCELING: 773 /* We've given up the lock, prepare ourselves to update. */ 774 LDLM_DEBUG(lock, "MGC cancel CB"); 775 776 CDEBUG(D_MGC, "Lock res "DLDLMRES" (%.8s)\n", 777 PLDLMRES(lock->l_resource), 778 (char *)&lock->l_resource->lr_name.name[0]); 779 780 if (!cld) { 781 CDEBUG(D_INFO, "missing data, won't requeue\n"); 782 break; 783 } 784 785 /* held at mgc_process_log(). */ 786 LASSERT(atomic_read(&cld->cld_refcount) > 0); 787 /* Are we done with this log? */ 788 if (cld->cld_stopping) { 789 CDEBUG(D_MGC, "log %s: stopping, won't requeue\n", 790 cld->cld_logname); 791 config_log_put(cld); 792 break; 793 } 794 /* Make sure not to re-enqueue when the mgc is stopping 795 (we get called from client_disconnect_export) */ 796 if (!lock->l_conn_export || 797 !lock->l_conn_export->exp_obd->u.cli.cl_conn_count) { 798 CDEBUG(D_MGC, "log %.8s: disconnecting, won't requeue\n", 799 cld->cld_logname); 800 config_log_put(cld); 801 break; 802 } 803 804 /* Re-enqueue now */ 805 mgc_requeue_add(cld); 806 config_log_put(cld); 807 break; 808 default: 809 LBUG(); 810 } 811 812 return rc; 813} 814 815/* Not sure where this should go... */ 816/* This is the timeout value for MGS_CONNECT request plus a ping interval, such 817 * that we can have a chance to try the secondary MGS if any. */ 818#define MGC_ENQUEUE_LIMIT (INITIAL_CONNECT_TIMEOUT + (AT_OFF ? 0 : at_min) \ 819 + PING_INTERVAL) 820#define MGC_TARGET_REG_LIMIT 10 821#define MGC_SEND_PARAM_LIMIT 10 822 823/* Send parameter to MGS*/ 824static int mgc_set_mgs_param(struct obd_export *exp, 825 struct mgs_send_param *msp) 826{ 827 struct ptlrpc_request *req; 828 struct mgs_send_param *req_msp, *rep_msp; 829 int rc; 830 831 req = ptlrpc_request_alloc_pack(class_exp2cliimp(exp), 832 &RQF_MGS_SET_INFO, LUSTRE_MGS_VERSION, 833 MGS_SET_INFO); 834 if (!req) 835 return -ENOMEM; 836 837 req_msp = req_capsule_client_get(&req->rq_pill, &RMF_MGS_SEND_PARAM); 838 if (!req_msp) { 839 ptlrpc_req_finished(req); 840 return -ENOMEM; 841 } 842 843 memcpy(req_msp, msp, sizeof(*req_msp)); 844 ptlrpc_request_set_replen(req); 845 846 /* Limit how long we will wait for the enqueue to complete */ 847 req->rq_delay_limit = MGC_SEND_PARAM_LIMIT; 848 rc = ptlrpc_queue_wait(req); 849 if (!rc) { 850 rep_msp = req_capsule_server_get(&req->rq_pill, &RMF_MGS_SEND_PARAM); 851 memcpy(msp, rep_msp, sizeof(*rep_msp)); 852 } 853 854 ptlrpc_req_finished(req); 855 856 return rc; 857} 858 859/* Take a config lock so we can get cancel notifications */ 860static int mgc_enqueue(struct obd_export *exp, struct lov_stripe_md *lsm, 861 __u32 type, ldlm_policy_data_t *policy, __u32 mode, 862 __u64 *flags, void *bl_cb, void *cp_cb, void *gl_cb, 863 void *data, __u32 lvb_len, void *lvb_swabber, 864 struct lustre_handle *lockh) 865{ 866 struct config_llog_data *cld = (struct config_llog_data *)data; 867 struct ldlm_enqueue_info einfo = { 868 .ei_type = type, 869 .ei_mode = mode, 870 .ei_cb_bl = mgc_blocking_ast, 871 .ei_cb_cp = ldlm_completion_ast, 872 }; 873 struct ptlrpc_request *req; 874 int short_limit = cld_is_sptlrpc(cld); 875 int rc; 876 877 CDEBUG(D_MGC, "Enqueue for %s (res %#llx)\n", cld->cld_logname, 878 cld->cld_resid.name[0]); 879 880 /* We need a callback for every lockholder, so don't try to 881 ldlm_lock_match (see rev 1.1.2.11.2.47) */ 882 req = ptlrpc_request_alloc_pack(class_exp2cliimp(exp), 883 &RQF_LDLM_ENQUEUE, LUSTRE_DLM_VERSION, 884 LDLM_ENQUEUE); 885 if (req == NULL) 886 return -ENOMEM; 887 888 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER, 0); 889 ptlrpc_request_set_replen(req); 890 891 /* check if this is server or client */ 892 if (cld->cld_cfg.cfg_sb) { 893 struct lustre_sb_info *lsi = s2lsi(cld->cld_cfg.cfg_sb); 894 if (lsi && IS_SERVER(lsi)) 895 short_limit = 1; 896 } 897 /* Limit how long we will wait for the enqueue to complete */ 898 req->rq_delay_limit = short_limit ? 5 : MGC_ENQUEUE_LIMIT; 899 rc = ldlm_cli_enqueue(exp, &req, &einfo, &cld->cld_resid, NULL, flags, 900 NULL, 0, LVB_T_NONE, lockh, 0); 901 /* A failed enqueue should still call the mgc_blocking_ast, 902 where it will be requeued if needed ("grant failed"). */ 903 ptlrpc_req_finished(req); 904 return rc; 905} 906 907static void mgc_notify_active(struct obd_device *unused) 908{ 909 /* wakeup mgc_requeue_thread to requeue mgc lock */ 910 spin_lock(&config_list_lock); 911 rq_state |= RQ_NOW; 912 spin_unlock(&config_list_lock); 913 wake_up(&rq_waitq); 914 915 /* TODO: Help the MGS rebuild nidtbl. -jay */ 916} 917 918/* Send target_reg message to MGS */ 919static int mgc_target_register(struct obd_export *exp, 920 struct mgs_target_info *mti) 921{ 922 struct ptlrpc_request *req; 923 struct mgs_target_info *req_mti, *rep_mti; 924 int rc; 925 926 req = ptlrpc_request_alloc_pack(class_exp2cliimp(exp), 927 &RQF_MGS_TARGET_REG, LUSTRE_MGS_VERSION, 928 MGS_TARGET_REG); 929 if (req == NULL) 930 return -ENOMEM; 931 932 req_mti = req_capsule_client_get(&req->rq_pill, &RMF_MGS_TARGET_INFO); 933 if (!req_mti) { 934 ptlrpc_req_finished(req); 935 return -ENOMEM; 936 } 937 938 memcpy(req_mti, mti, sizeof(*req_mti)); 939 ptlrpc_request_set_replen(req); 940 CDEBUG(D_MGC, "register %s\n", mti->mti_svname); 941 /* Limit how long we will wait for the enqueue to complete */ 942 req->rq_delay_limit = MGC_TARGET_REG_LIMIT; 943 944 rc = ptlrpc_queue_wait(req); 945 if (!rc) { 946 rep_mti = req_capsule_server_get(&req->rq_pill, 947 &RMF_MGS_TARGET_INFO); 948 memcpy(mti, rep_mti, sizeof(*rep_mti)); 949 CDEBUG(D_MGC, "register %s got index = %d\n", 950 mti->mti_svname, mti->mti_stripe_index); 951 } 952 ptlrpc_req_finished(req); 953 954 return rc; 955} 956 957int mgc_set_info_async(const struct lu_env *env, struct obd_export *exp, 958 u32 keylen, void *key, u32 vallen, 959 void *val, struct ptlrpc_request_set *set) 960{ 961 int rc = -EINVAL; 962 963 /* Turn off initial_recov after we try all backup servers once */ 964 if (KEY_IS(KEY_INIT_RECOV_BACKUP)) { 965 struct obd_import *imp = class_exp2cliimp(exp); 966 int value; 967 if (vallen != sizeof(int)) 968 return -EINVAL; 969 value = *(int *)val; 970 CDEBUG(D_MGC, "InitRecov %s %d/d%d:i%d:r%d:or%d:%s\n", 971 imp->imp_obd->obd_name, value, 972 imp->imp_deactive, imp->imp_invalid, 973 imp->imp_replayable, imp->imp_obd->obd_replayable, 974 ptlrpc_import_state_name(imp->imp_state)); 975 /* Resurrect if we previously died */ 976 if ((imp->imp_state != LUSTRE_IMP_FULL && 977 imp->imp_state != LUSTRE_IMP_NEW) || value > 1) 978 ptlrpc_reconnect_import(imp); 979 return 0; 980 } 981 if (KEY_IS(KEY_SET_INFO)) { 982 struct mgs_send_param *msp; 983 984 msp = (struct mgs_send_param *)val; 985 rc = mgc_set_mgs_param(exp, msp); 986 return rc; 987 } 988 if (KEY_IS(KEY_MGSSEC)) { 989 struct client_obd *cli = &exp->exp_obd->u.cli; 990 struct sptlrpc_flavor flvr; 991 992 /* 993 * empty string means using current flavor, if which haven't 994 * been set yet, set it as null. 995 * 996 * if flavor has been set previously, check the asking flavor 997 * must match the existing one. 998 */ 999 if (vallen == 0) { 1000 if (cli->cl_flvr_mgc.sf_rpc != SPTLRPC_FLVR_INVALID) 1001 return 0; 1002 val = "null"; 1003 vallen = 4; 1004 } 1005 1006 rc = sptlrpc_parse_flavor(val, &flvr); 1007 if (rc) { 1008 CERROR("invalid sptlrpc flavor %s to MGS\n", 1009 (char *) val); 1010 return rc; 1011 } 1012 1013 /* 1014 * caller already hold a mutex 1015 */ 1016 if (cli->cl_flvr_mgc.sf_rpc == SPTLRPC_FLVR_INVALID) { 1017 cli->cl_flvr_mgc = flvr; 1018 } else if (memcmp(&cli->cl_flvr_mgc, &flvr, 1019 sizeof(flvr)) != 0) { 1020 char str[20]; 1021 1022 sptlrpc_flavor2name(&cli->cl_flvr_mgc, 1023 str, sizeof(str)); 1024 LCONSOLE_ERROR("asking sptlrpc flavor %s to MGS but " 1025 "currently %s is in use\n", 1026 (char *) val, str); 1027 rc = -EPERM; 1028 } 1029 return rc; 1030 } 1031 1032 return rc; 1033} 1034 1035static int mgc_get_info(const struct lu_env *env, struct obd_export *exp, 1036 __u32 keylen, void *key, __u32 *vallen, void *val, 1037 struct lov_stripe_md *unused) 1038{ 1039 int rc = -EINVAL; 1040 1041 if (KEY_IS(KEY_CONN_DATA)) { 1042 struct obd_import *imp = class_exp2cliimp(exp); 1043 struct obd_connect_data *data = val; 1044 1045 if (*vallen == sizeof(*data)) { 1046 *data = imp->imp_connect_data; 1047 rc = 0; 1048 } 1049 } 1050 1051 return rc; 1052} 1053 1054static int mgc_import_event(struct obd_device *obd, 1055 struct obd_import *imp, 1056 enum obd_import_event event) 1057{ 1058 int rc = 0; 1059 1060 LASSERT(imp->imp_obd == obd); 1061 CDEBUG(D_MGC, "import event %#x\n", event); 1062 1063 switch (event) { 1064 case IMP_EVENT_DISCON: 1065 /* MGC imports should not wait for recovery */ 1066 if (OCD_HAS_FLAG(&imp->imp_connect_data, IMP_RECOV)) 1067 ptlrpc_pinger_ir_down(); 1068 break; 1069 case IMP_EVENT_INACTIVE: 1070 break; 1071 case IMP_EVENT_INVALIDATE: { 1072 struct ldlm_namespace *ns = obd->obd_namespace; 1073 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY); 1074 break; 1075 } 1076 case IMP_EVENT_ACTIVE: 1077 CDEBUG(D_INFO, "%s: Reactivating import\n", obd->obd_name); 1078 /* Clearing obd_no_recov allows us to continue pinging */ 1079 obd->obd_no_recov = 0; 1080 mgc_notify_active(obd); 1081 if (OCD_HAS_FLAG(&imp->imp_connect_data, IMP_RECOV)) 1082 ptlrpc_pinger_ir_up(); 1083 break; 1084 case IMP_EVENT_OCD: 1085 break; 1086 case IMP_EVENT_DEACTIVATE: 1087 case IMP_EVENT_ACTIVATE: 1088 break; 1089 default: 1090 CERROR("Unknown import event %#x\n", event); 1091 LBUG(); 1092 } 1093 return rc; 1094} 1095 1096enum { 1097 CONFIG_READ_NRPAGES_INIT = 1 << (20 - PAGE_CACHE_SHIFT), 1098 CONFIG_READ_NRPAGES = 4 1099}; 1100 1101static int mgc_apply_recover_logs(struct obd_device *mgc, 1102 struct config_llog_data *cld, 1103 __u64 max_version, 1104 void *data, int datalen, bool mne_swab) 1105{ 1106 struct config_llog_instance *cfg = &cld->cld_cfg; 1107 struct lustre_sb_info *lsi = s2lsi(cfg->cfg_sb); 1108 struct mgs_nidtbl_entry *entry; 1109 struct lustre_cfg *lcfg; 1110 struct lustre_cfg_bufs bufs; 1111 u64 prev_version = 0; 1112 char *inst; 1113 char *buf; 1114 int bufsz; 1115 int pos; 1116 int rc = 0; 1117 int off = 0; 1118 1119 LASSERT(cfg->cfg_instance != NULL); 1120 LASSERT(cfg->cfg_sb == cfg->cfg_instance); 1121 1122 OBD_ALLOC(inst, PAGE_CACHE_SIZE); 1123 if (inst == NULL) 1124 return -ENOMEM; 1125 1126 if (!IS_SERVER(lsi)) { 1127 pos = snprintf(inst, PAGE_CACHE_SIZE, "%p", cfg->cfg_instance); 1128 if (pos >= PAGE_CACHE_SIZE) { 1129 OBD_FREE(inst, PAGE_CACHE_SIZE); 1130 return -E2BIG; 1131 } 1132 } else { 1133 LASSERT(IS_MDT(lsi)); 1134 rc = server_name2svname(lsi->lsi_svname, inst, NULL, 1135 PAGE_CACHE_SIZE); 1136 if (rc) { 1137 OBD_FREE(inst, PAGE_CACHE_SIZE); 1138 return -EINVAL; 1139 } 1140 pos = strlen(inst); 1141 } 1142 1143 ++pos; 1144 buf = inst + pos; 1145 bufsz = PAGE_CACHE_SIZE - pos; 1146 1147 while (datalen > 0) { 1148 int entry_len = sizeof(*entry); 1149 int is_ost; 1150 struct obd_device *obd; 1151 char *obdname; 1152 char *cname; 1153 char *params; 1154 char *uuid; 1155 1156 rc = -EINVAL; 1157 if (datalen < sizeof(*entry)) 1158 break; 1159 1160 entry = (typeof(entry))(data + off); 1161 1162 /* sanity check */ 1163 if (entry->mne_nid_type != 0) /* only support type 0 for ipv4 */ 1164 break; 1165 if (entry->mne_nid_count == 0) /* at least one nid entry */ 1166 break; 1167 if (entry->mne_nid_size != sizeof(lnet_nid_t)) 1168 break; 1169 1170 entry_len += entry->mne_nid_count * entry->mne_nid_size; 1171 if (datalen < entry_len) /* must have entry_len at least */ 1172 break; 1173 1174 /* Keep this swab for normal mixed endian handling. LU-1644 */ 1175 if (mne_swab) 1176 lustre_swab_mgs_nidtbl_entry(entry); 1177 if (entry->mne_length > PAGE_CACHE_SIZE) { 1178 CERROR("MNE too large (%u)\n", entry->mne_length); 1179 break; 1180 } 1181 1182 if (entry->mne_length < entry_len) 1183 break; 1184 1185 off += entry->mne_length; 1186 datalen -= entry->mne_length; 1187 if (datalen < 0) 1188 break; 1189 1190 if (entry->mne_version > max_version) { 1191 CERROR("entry index(%lld) is over max_index(%lld)\n", 1192 entry->mne_version, max_version); 1193 break; 1194 } 1195 1196 if (prev_version >= entry->mne_version) { 1197 CERROR("index unsorted, prev %lld, now %lld\n", 1198 prev_version, entry->mne_version); 1199 break; 1200 } 1201 prev_version = entry->mne_version; 1202 1203 /* 1204 * Write a string with format "nid::instance" to 1205 * lustre/<osc|mdc>/<target>-<osc|mdc>-<instance>/import. 1206 */ 1207 1208 is_ost = entry->mne_type == LDD_F_SV_TYPE_OST; 1209 memset(buf, 0, bufsz); 1210 obdname = buf; 1211 pos = 0; 1212 1213 /* lustre-OST0001-osc-<instance #> */ 1214 strcpy(obdname, cld->cld_logname); 1215 cname = strrchr(obdname, '-'); 1216 if (cname == NULL) { 1217 CERROR("mgc %s: invalid logname %s\n", 1218 mgc->obd_name, obdname); 1219 break; 1220 } 1221 1222 pos = cname - obdname; 1223 obdname[pos] = 0; 1224 pos += sprintf(obdname + pos, "-%s%04x", 1225 is_ost ? "OST" : "MDT", entry->mne_index); 1226 1227 cname = is_ost ? "osc" : "mdc", 1228 pos += sprintf(obdname + pos, "-%s-%s", cname, inst); 1229 lustre_cfg_bufs_reset(&bufs, obdname); 1230 1231 /* find the obd by obdname */ 1232 obd = class_name2obd(obdname); 1233 if (obd == NULL) { 1234 CDEBUG(D_INFO, "mgc %s: cannot find obdname %s\n", 1235 mgc->obd_name, obdname); 1236 rc = 0; 1237 /* this is a safe race, when the ost is starting up...*/ 1238 continue; 1239 } 1240 1241 /* osc.import = "connection=<Conn UUID>::<target instance>" */ 1242 ++pos; 1243 params = buf + pos; 1244 pos += sprintf(params, "%s.import=%s", cname, "connection="); 1245 uuid = buf + pos; 1246 1247 down_read(&obd->u.cli.cl_sem); 1248 if (obd->u.cli.cl_import == NULL) { 1249 /* client does not connect to the OST yet */ 1250 up_read(&obd->u.cli.cl_sem); 1251 rc = 0; 1252 continue; 1253 } 1254 1255 /* TODO: iterate all nids to find one */ 1256 /* find uuid by nid */ 1257 rc = client_import_find_conn(obd->u.cli.cl_import, 1258 entry->u.nids[0], 1259 (struct obd_uuid *)uuid); 1260 up_read(&obd->u.cli.cl_sem); 1261 if (rc < 0) { 1262 CERROR("mgc: cannot find uuid by nid %s\n", 1263 libcfs_nid2str(entry->u.nids[0])); 1264 break; 1265 } 1266 1267 CDEBUG(D_INFO, "Find uuid %s by nid %s\n", 1268 uuid, libcfs_nid2str(entry->u.nids[0])); 1269 1270 pos += strlen(uuid); 1271 pos += sprintf(buf + pos, "::%u", entry->mne_instance); 1272 LASSERT(pos < bufsz); 1273 1274 lustre_cfg_bufs_set_string(&bufs, 1, params); 1275 1276 rc = -ENOMEM; 1277 lcfg = lustre_cfg_new(LCFG_PARAM, &bufs); 1278 if (lcfg == NULL) { 1279 CERROR("mgc: cannot allocate memory\n"); 1280 break; 1281 } 1282 1283 CDEBUG(D_INFO, "ir apply logs %lld/%lld for %s -> %s\n", 1284 prev_version, max_version, obdname, params); 1285 1286 rc = class_process_config(lcfg); 1287 lustre_cfg_free(lcfg); 1288 if (rc) 1289 CDEBUG(D_INFO, "process config for %s error %d\n", 1290 obdname, rc); 1291 1292 /* continue, even one with error */ 1293 } 1294 1295 OBD_FREE(inst, PAGE_CACHE_SIZE); 1296 return rc; 1297} 1298 1299/** 1300 * This function is called if this client was notified for target restarting 1301 * by the MGS. A CONFIG_READ RPC is going to send to fetch recovery logs. 1302 */ 1303static int mgc_process_recover_log(struct obd_device *obd, 1304 struct config_llog_data *cld) 1305{ 1306 struct ptlrpc_request *req = NULL; 1307 struct config_llog_instance *cfg = &cld->cld_cfg; 1308 struct mgs_config_body *body; 1309 struct mgs_config_res *res; 1310 struct ptlrpc_bulk_desc *desc; 1311 struct page **pages; 1312 int nrpages; 1313 bool eof = true; 1314 bool mne_swab = false; 1315 int i; 1316 int ealen; 1317 int rc; 1318 1319 /* allocate buffer for bulk transfer. 1320 * if this is the first time for this mgs to read logs, 1321 * CONFIG_READ_NRPAGES_INIT will be used since it will read all logs 1322 * once; otherwise, it only reads increment of logs, this should be 1323 * small and CONFIG_READ_NRPAGES will be used. 1324 */ 1325 nrpages = CONFIG_READ_NRPAGES; 1326 if (cfg->cfg_last_idx == 0) /* the first time */ 1327 nrpages = CONFIG_READ_NRPAGES_INIT; 1328 1329 OBD_ALLOC(pages, sizeof(*pages) * nrpages); 1330 if (pages == NULL) { 1331 rc = -ENOMEM; 1332 goto out; 1333 } 1334 1335 for (i = 0; i < nrpages; i++) { 1336 pages[i] = alloc_page(GFP_IOFS); 1337 if (pages[i] == NULL) { 1338 rc = -ENOMEM; 1339 goto out; 1340 } 1341 } 1342 1343again: 1344 LASSERT(cld_is_recover(cld)); 1345 LASSERT(mutex_is_locked(&cld->cld_lock)); 1346 req = ptlrpc_request_alloc(class_exp2cliimp(cld->cld_mgcexp), 1347 &RQF_MGS_CONFIG_READ); 1348 if (req == NULL) { 1349 rc = -ENOMEM; 1350 goto out; 1351 } 1352 1353 rc = ptlrpc_request_pack(req, LUSTRE_MGS_VERSION, MGS_CONFIG_READ); 1354 if (rc) 1355 goto out; 1356 1357 /* pack request */ 1358 body = req_capsule_client_get(&req->rq_pill, &RMF_MGS_CONFIG_BODY); 1359 LASSERT(body != NULL); 1360 LASSERT(sizeof(body->mcb_name) > strlen(cld->cld_logname)); 1361 if (strlcpy(body->mcb_name, cld->cld_logname, sizeof(body->mcb_name)) 1362 >= sizeof(body->mcb_name)) { 1363 rc = -E2BIG; 1364 goto out; 1365 } 1366 body->mcb_offset = cfg->cfg_last_idx + 1; 1367 body->mcb_type = cld->cld_type; 1368 body->mcb_bits = PAGE_CACHE_SHIFT; 1369 body->mcb_units = nrpages; 1370 1371 /* allocate bulk transfer descriptor */ 1372 desc = ptlrpc_prep_bulk_imp(req, nrpages, 1, BULK_PUT_SINK, 1373 MGS_BULK_PORTAL); 1374 if (desc == NULL) { 1375 rc = -ENOMEM; 1376 goto out; 1377 } 1378 1379 for (i = 0; i < nrpages; i++) 1380 ptlrpc_prep_bulk_page_pin(desc, pages[i], 0, PAGE_CACHE_SIZE); 1381 1382 ptlrpc_request_set_replen(req); 1383 rc = ptlrpc_queue_wait(req); 1384 if (rc) 1385 goto out; 1386 1387 res = req_capsule_server_get(&req->rq_pill, &RMF_MGS_CONFIG_RES); 1388 if (res->mcr_size < res->mcr_offset) { 1389 rc = -EINVAL; 1390 goto out; 1391 } 1392 1393 /* always update the index even though it might have errors with 1394 * handling the recover logs */ 1395 cfg->cfg_last_idx = res->mcr_offset; 1396 eof = res->mcr_offset == res->mcr_size; 1397 1398 CDEBUG(D_INFO, "Latest version %lld, more %d.\n", 1399 res->mcr_offset, eof == false); 1400 1401 ealen = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, 0); 1402 if (ealen < 0) { 1403 rc = ealen; 1404 goto out; 1405 } 1406 1407 if (ealen > nrpages << PAGE_CACHE_SHIFT) { 1408 rc = -EINVAL; 1409 goto out; 1410 } 1411 1412 if (ealen == 0) { /* no logs transferred */ 1413 if (!eof) 1414 rc = -EINVAL; 1415 goto out; 1416 } 1417 1418 mne_swab = !!ptlrpc_rep_need_swab(req); 1419#if LUSTRE_VERSION_CODE < OBD_OCD_VERSION(3, 2, 50, 0) 1420 /* This import flag means the server did an extra swab of IR MNE 1421 * records (fixed in LU-1252), reverse it here if needed. LU-1644 */ 1422 if (unlikely(req->rq_import->imp_need_mne_swab)) 1423 mne_swab = !mne_swab; 1424#else 1425#warning "LU-1644: Remove old OBD_CONNECT_MNE_SWAB fixup and imp_need_mne_swab" 1426#endif 1427 1428 for (i = 0; i < nrpages && ealen > 0; i++) { 1429 int rc2; 1430 void *ptr; 1431 1432 ptr = kmap(pages[i]); 1433 rc2 = mgc_apply_recover_logs(obd, cld, res->mcr_offset, ptr, 1434 min_t(int, ealen, PAGE_CACHE_SIZE), 1435 mne_swab); 1436 kunmap(pages[i]); 1437 if (rc2 < 0) { 1438 CWARN("Process recover log %s error %d\n", 1439 cld->cld_logname, rc2); 1440 break; 1441 } 1442 1443 ealen -= PAGE_CACHE_SIZE; 1444 } 1445 1446out: 1447 if (req) 1448 ptlrpc_req_finished(req); 1449 1450 if (rc == 0 && !eof) 1451 goto again; 1452 1453 if (pages) { 1454 for (i = 0; i < nrpages; i++) { 1455 if (pages[i] == NULL) 1456 break; 1457 __free_page(pages[i]); 1458 } 1459 OBD_FREE(pages, sizeof(*pages) * nrpages); 1460 } 1461 return rc; 1462} 1463 1464/* local_only means it cannot get remote llogs */ 1465static int mgc_process_cfg_log(struct obd_device *mgc, 1466 struct config_llog_data *cld, int local_only) 1467{ 1468 struct llog_ctxt *ctxt; 1469 struct lustre_sb_info *lsi = NULL; 1470 int rc = 0; 1471 bool sptlrpc_started = false; 1472 struct lu_env *env; 1473 1474 LASSERT(cld); 1475 LASSERT(mutex_is_locked(&cld->cld_lock)); 1476 1477 /* 1478 * local copy of sptlrpc log is controlled elsewhere, don't try to 1479 * read it up here. 1480 */ 1481 if (cld_is_sptlrpc(cld) && local_only) 1482 return 0; 1483 1484 if (cld->cld_cfg.cfg_sb) 1485 lsi = s2lsi(cld->cld_cfg.cfg_sb); 1486 1487 OBD_ALLOC_PTR(env); 1488 if (env == NULL) 1489 return -ENOMEM; 1490 1491 rc = lu_env_init(env, LCT_MG_THREAD); 1492 if (rc) 1493 goto out_free; 1494 1495 ctxt = llog_get_context(mgc, LLOG_CONFIG_REPL_CTXT); 1496 LASSERT(ctxt); 1497 1498 if (local_only) /* no local log at client side */ { 1499 rc = -EIO; 1500 goto out_pop; 1501 } 1502 1503 if (cld_is_sptlrpc(cld)) { 1504 sptlrpc_conf_log_update_begin(cld->cld_logname); 1505 sptlrpc_started = true; 1506 } 1507 1508 /* logname and instance info should be the same, so use our 1509 * copy of the instance for the update. The cfg_last_idx will 1510 * be updated here. */ 1511 rc = class_config_parse_llog(env, ctxt, cld->cld_logname, 1512 &cld->cld_cfg); 1513 1514out_pop: 1515 __llog_ctxt_put(env, ctxt); 1516 1517 /* 1518 * update settings on existing OBDs. doing it inside 1519 * of llog_process_lock so no device is attaching/detaching 1520 * in parallel. 1521 * the logname must be <fsname>-sptlrpc 1522 */ 1523 if (sptlrpc_started) { 1524 LASSERT(cld_is_sptlrpc(cld)); 1525 sptlrpc_conf_log_update_end(cld->cld_logname); 1526 class_notify_sptlrpc_conf(cld->cld_logname, 1527 strlen(cld->cld_logname) - 1528 strlen("-sptlrpc")); 1529 } 1530 1531 lu_env_fini(env); 1532out_free: 1533 OBD_FREE_PTR(env); 1534 return rc; 1535} 1536 1537/** Get a config log from the MGS and process it. 1538 * This func is called for both clients and servers. 1539 * Copy the log locally before parsing it if appropriate (non-MGS server) 1540 */ 1541int mgc_process_log(struct obd_device *mgc, struct config_llog_data *cld) 1542{ 1543 struct lustre_handle lockh = { 0 }; 1544 __u64 flags = LDLM_FL_NO_LRU; 1545 int rc = 0, rcl; 1546 1547 LASSERT(cld); 1548 1549 /* I don't want multiple processes running process_log at once -- 1550 sounds like badness. It actually might be fine, as long as 1551 we're not trying to update from the same log 1552 simultaneously (in which case we should use a per-log sem.) */ 1553 mutex_lock(&cld->cld_lock); 1554 if (cld->cld_stopping) { 1555 mutex_unlock(&cld->cld_lock); 1556 return 0; 1557 } 1558 1559 OBD_FAIL_TIMEOUT(OBD_FAIL_MGC_PAUSE_PROCESS_LOG, 20); 1560 1561 CDEBUG(D_MGC, "Process log %s:%p from %d\n", cld->cld_logname, 1562 cld->cld_cfg.cfg_instance, cld->cld_cfg.cfg_last_idx + 1); 1563 1564 /* Get the cfg lock on the llog */ 1565 rcl = mgc_enqueue(mgc->u.cli.cl_mgc_mgsexp, NULL, LDLM_PLAIN, NULL, 1566 LCK_CR, &flags, NULL, NULL, NULL, 1567 cld, 0, NULL, &lockh); 1568 if (rcl == 0) { 1569 /* Get the cld, it will be released in mgc_blocking_ast. */ 1570 config_log_get(cld); 1571 rc = ldlm_lock_set_data(&lockh, (void *)cld); 1572 LASSERT(rc == 0); 1573 } else { 1574 CDEBUG(D_MGC, "Can't get cfg lock: %d\n", rcl); 1575 1576 /* mark cld_lostlock so that it will requeue 1577 * after MGC becomes available. */ 1578 cld->cld_lostlock = 1; 1579 /* Get extra reference, it will be put in requeue thread */ 1580 config_log_get(cld); 1581 } 1582 1583 1584 if (cld_is_recover(cld)) { 1585 rc = 0; /* this is not a fatal error for recover log */ 1586 if (rcl == 0) 1587 rc = mgc_process_recover_log(mgc, cld); 1588 } else { 1589 rc = mgc_process_cfg_log(mgc, cld, rcl != 0); 1590 } 1591 1592 CDEBUG(D_MGC, "%s: configuration from log '%s' %sed (%d).\n", 1593 mgc->obd_name, cld->cld_logname, rc ? "fail" : "succeed", rc); 1594 1595 mutex_unlock(&cld->cld_lock); 1596 1597 /* Now drop the lock so MGS can revoke it */ 1598 if (!rcl) 1599 ldlm_lock_decref(&lockh, LCK_CR); 1600 1601 return rc; 1602} 1603 1604 1605/** Called from lustre_process_log. 1606 * LCFG_LOG_START gets the config log from the MGS, processes it to start 1607 * any services, and adds it to the list logs to watch (follow). 1608 */ 1609static int mgc_process_config(struct obd_device *obd, u32 len, void *buf) 1610{ 1611 struct lustre_cfg *lcfg = buf; 1612 struct config_llog_instance *cfg = NULL; 1613 char *logname; 1614 int rc = 0; 1615 1616 switch (lcfg->lcfg_command) { 1617 case LCFG_LOV_ADD_OBD: { 1618 /* Overloading this cfg command: register a new target */ 1619 struct mgs_target_info *mti; 1620 1621 if (LUSTRE_CFG_BUFLEN(lcfg, 1) != 1622 sizeof(struct mgs_target_info)) { 1623 rc = -EINVAL; 1624 goto out; 1625 } 1626 1627 mti = (struct mgs_target_info *)lustre_cfg_buf(lcfg, 1); 1628 CDEBUG(D_MGC, "add_target %s %#x\n", 1629 mti->mti_svname, mti->mti_flags); 1630 rc = mgc_target_register(obd->u.cli.cl_mgc_mgsexp, mti); 1631 break; 1632 } 1633 case LCFG_LOV_DEL_OBD: 1634 /* Unregister has no meaning at the moment. */ 1635 CERROR("lov_del_obd unimplemented\n"); 1636 rc = -ENOSYS; 1637 break; 1638 case LCFG_SPTLRPC_CONF: { 1639 rc = sptlrpc_process_config(lcfg); 1640 break; 1641 } 1642 case LCFG_LOG_START: { 1643 struct config_llog_data *cld; 1644 struct super_block *sb; 1645 1646 logname = lustre_cfg_string(lcfg, 1); 1647 cfg = (struct config_llog_instance *)lustre_cfg_buf(lcfg, 2); 1648 sb = *(struct super_block **)lustre_cfg_buf(lcfg, 3); 1649 1650 CDEBUG(D_MGC, "parse_log %s from %d\n", logname, 1651 cfg->cfg_last_idx); 1652 1653 /* We're only called through here on the initial mount */ 1654 rc = config_log_add(obd, logname, cfg, sb); 1655 if (rc) 1656 break; 1657 cld = config_log_find(logname, cfg); 1658 if (cld == NULL) { 1659 rc = -ENOENT; 1660 break; 1661 } 1662 1663 /* COMPAT_146 */ 1664 /* FIXME only set this for old logs! Right now this forces 1665 us to always skip the "inside markers" check */ 1666 cld->cld_cfg.cfg_flags |= CFG_F_COMPAT146; 1667 1668 rc = mgc_process_log(obd, cld); 1669 if (rc == 0 && cld->cld_recover != NULL) { 1670 if (OCD_HAS_FLAG(&obd->u.cli.cl_import-> 1671 imp_connect_data, IMP_RECOV)) { 1672 rc = mgc_process_log(obd, cld->cld_recover); 1673 } else { 1674 struct config_llog_data *cir = cld->cld_recover; 1675 cld->cld_recover = NULL; 1676 config_log_put(cir); 1677 } 1678 if (rc) 1679 CERROR("Cannot process recover llog %d\n", rc); 1680 } 1681 1682 if (rc == 0 && cld->cld_params != NULL) { 1683 rc = mgc_process_log(obd, cld->cld_params); 1684 if (rc == -ENOENT) { 1685 CDEBUG(D_MGC, 1686 "There is no params config file yet\n"); 1687 rc = 0; 1688 } 1689 /* params log is optional */ 1690 if (rc) 1691 CERROR( 1692 "%s: can't process params llog: rc = %d\n", 1693 obd->obd_name, rc); 1694 } 1695 config_log_put(cld); 1696 1697 break; 1698 } 1699 case LCFG_LOG_END: { 1700 logname = lustre_cfg_string(lcfg, 1); 1701 1702 if (lcfg->lcfg_bufcount >= 2) 1703 cfg = (struct config_llog_instance *)lustre_cfg_buf( 1704 lcfg, 2); 1705 rc = config_log_end(logname, cfg); 1706 break; 1707 } 1708 default: { 1709 CERROR("Unknown command: %d\n", lcfg->lcfg_command); 1710 rc = -EINVAL; 1711 goto out; 1712 1713 } 1714 } 1715out: 1716 return rc; 1717} 1718 1719struct obd_ops mgc_obd_ops = { 1720 .o_owner = THIS_MODULE, 1721 .o_setup = mgc_setup, 1722 .o_precleanup = mgc_precleanup, 1723 .o_cleanup = mgc_cleanup, 1724 .o_add_conn = client_import_add_conn, 1725 .o_del_conn = client_import_del_conn, 1726 .o_connect = client_connect_import, 1727 .o_disconnect = client_disconnect_export, 1728 /* .o_enqueue = mgc_enqueue, */ 1729 /* .o_iocontrol = mgc_iocontrol, */ 1730 .o_set_info_async = mgc_set_info_async, 1731 .o_get_info = mgc_get_info, 1732 .o_import_event = mgc_import_event, 1733 .o_process_config = mgc_process_config, 1734}; 1735 1736int __init mgc_init(void) 1737{ 1738 return class_register_type(&mgc_obd_ops, NULL, NULL, 1739 LUSTRE_MGC_NAME, NULL); 1740} 1741 1742static void /*__exit*/ mgc_exit(void) 1743{ 1744 class_unregister_type(LUSTRE_MGC_NAME); 1745} 1746 1747MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>"); 1748MODULE_DESCRIPTION("Lustre Management Client"); 1749MODULE_LICENSE("GPL"); 1750 1751module_init(mgc_init); 1752module_exit(mgc_exit); 1753