fid_request.c revision f267cdb464bcab7be965fa2a513675d9ad1e90f6
1/* 2 * GPL HEADER START 3 * 4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 5 * 6 * This program is free software; you can redistribute it and/or modify 7 * it under the terms of the GNU General Public License version 2 only, 8 * as published by the Free Software Foundation. 9 * 10 * This program is distributed in the hope that it will be useful, but 11 * WITHOUT ANY WARRANTY; without even the implied warranty of 12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 13 * General Public License version 2 for more details (a copy is included 14 * in the LICENSE file that accompanied this code). 15 * 16 * You should have received a copy of the GNU General Public License 17 * version 2 along with this program; If not, see 18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf 19 * 20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara, 21 * CA 95054 USA or visit www.sun.com if you need additional information or 22 * have any questions. 23 * 24 * GPL HEADER END 25 */ 26/* 27 * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved. 28 * Use is subject to license terms. 29 * 30 * Copyright (c) 2011, 2013, Intel Corporation. 31 */ 32/* 33 * This file is part of Lustre, http://www.lustre.org/ 34 * Lustre is a trademark of Sun Microsystems, Inc. 35 * 36 * lustre/fid/fid_request.c 37 * 38 * Lustre Sequence Manager 39 * 40 * Author: Yury Umanets <umka@clusterfs.com> 41 */ 42 43#define DEBUG_SUBSYSTEM S_FID 44 45#include "../../include/linux/libcfs/libcfs.h" 46#include <linux/module.h> 47 48#include "../include/obd.h" 49#include "../include/obd_class.h" 50#include "../include/obd_support.h" 51#include "../include/lustre_fid.h" 52/* mdc RPC locks */ 53#include "../include/lustre_mdc.h" 54#include "fid_internal.h" 55 56static int seq_client_rpc(struct lu_client_seq *seq, 57 struct lu_seq_range *output, __u32 opc, 58 const char *opcname) 59{ 60 struct obd_export *exp = seq->lcs_exp; 61 struct ptlrpc_request *req; 62 struct lu_seq_range *out, *in; 63 __u32 *op; 64 unsigned int debug_mask; 65 int rc; 66 67 req = ptlrpc_request_alloc_pack(class_exp2cliimp(exp), &RQF_SEQ_QUERY, 68 LUSTRE_MDS_VERSION, SEQ_QUERY); 69 if (req == NULL) 70 return -ENOMEM; 71 72 /* Init operation code */ 73 op = req_capsule_client_get(&req->rq_pill, &RMF_SEQ_OPC); 74 *op = opc; 75 76 /* Zero out input range, this is not recovery yet. */ 77 in = req_capsule_client_get(&req->rq_pill, &RMF_SEQ_RANGE); 78 range_init(in); 79 80 ptlrpc_request_set_replen(req); 81 82 in->lsr_index = seq->lcs_space.lsr_index; 83 if (seq->lcs_type == LUSTRE_SEQ_METADATA) 84 fld_range_set_mdt(in); 85 else 86 fld_range_set_ost(in); 87 88 if (opc == SEQ_ALLOC_SUPER) { 89 req->rq_request_portal = SEQ_CONTROLLER_PORTAL; 90 req->rq_reply_portal = MDC_REPLY_PORTAL; 91 /* During allocating super sequence for data object, 92 * the current thread might hold the export of MDT0(MDT0 93 * precreating objects on this OST), and it will send the 94 * request to MDT0 here, so we can not keep resending the 95 * request here, otherwise if MDT0 is failed(umounted), 96 * it can not release the export of MDT0 */ 97 if (seq->lcs_type == LUSTRE_SEQ_DATA) 98 req->rq_no_delay = req->rq_no_resend = 1; 99 debug_mask = D_CONSOLE; 100 } else { 101 if (seq->lcs_type == LUSTRE_SEQ_METADATA) 102 req->rq_request_portal = SEQ_METADATA_PORTAL; 103 else 104 req->rq_request_portal = SEQ_DATA_PORTAL; 105 debug_mask = D_INFO; 106 } 107 108 ptlrpc_at_set_req_timeout(req); 109 110 if (seq->lcs_type == LUSTRE_SEQ_METADATA) 111 mdc_get_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL); 112 rc = ptlrpc_queue_wait(req); 113 if (seq->lcs_type == LUSTRE_SEQ_METADATA) 114 mdc_put_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL); 115 if (rc) 116 GOTO(out_req, rc); 117 118 out = req_capsule_server_get(&req->rq_pill, &RMF_SEQ_RANGE); 119 *output = *out; 120 121 if (!range_is_sane(output)) { 122 CERROR("%s: Invalid range received from server: " 123 DRANGE"\n", seq->lcs_name, PRANGE(output)); 124 GOTO(out_req, rc = -EINVAL); 125 } 126 127 if (range_is_exhausted(output)) { 128 CERROR("%s: Range received from server is exhausted: " 129 DRANGE"]\n", seq->lcs_name, PRANGE(output)); 130 GOTO(out_req, rc = -EINVAL); 131 } 132 133 CDEBUG_LIMIT(debug_mask, "%s: Allocated %s-sequence "DRANGE"]\n", 134 seq->lcs_name, opcname, PRANGE(output)); 135 136out_req: 137 ptlrpc_req_finished(req); 138 return rc; 139} 140 141/* Request sequence-controller node to allocate new super-sequence. */ 142int seq_client_alloc_super(struct lu_client_seq *seq, 143 const struct lu_env *env) 144{ 145 int rc; 146 147 mutex_lock(&seq->lcs_mutex); 148 149 if (seq->lcs_srv) { 150 rc = 0; 151 } else { 152 /* Check whether the connection to seq controller has been 153 * setup (lcs_exp != NULL) */ 154 if (seq->lcs_exp == NULL) { 155 mutex_unlock(&seq->lcs_mutex); 156 return -EINPROGRESS; 157 } 158 159 rc = seq_client_rpc(seq, &seq->lcs_space, 160 SEQ_ALLOC_SUPER, "super"); 161 } 162 mutex_unlock(&seq->lcs_mutex); 163 return rc; 164} 165 166/* Request sequence-controller node to allocate new meta-sequence. */ 167static int seq_client_alloc_meta(const struct lu_env *env, 168 struct lu_client_seq *seq) 169{ 170 int rc; 171 172 if (seq->lcs_srv) { 173 rc = 0; 174 } else { 175 do { 176 /* If meta server return -EINPROGRESS or EAGAIN, 177 * it means meta server might not be ready to 178 * allocate super sequence from sequence controller 179 * (MDT0)yet */ 180 rc = seq_client_rpc(seq, &seq->lcs_space, 181 SEQ_ALLOC_META, "meta"); 182 } while (rc == -EINPROGRESS || rc == -EAGAIN); 183 } 184 185 return rc; 186} 187 188/* Allocate new sequence for client. */ 189static int seq_client_alloc_seq(const struct lu_env *env, 190 struct lu_client_seq *seq, seqno_t *seqnr) 191{ 192 int rc; 193 194 LASSERT(range_is_sane(&seq->lcs_space)); 195 196 if (range_is_exhausted(&seq->lcs_space)) { 197 rc = seq_client_alloc_meta(env, seq); 198 if (rc) { 199 CERROR("%s: Can't allocate new meta-sequence, rc %d\n", 200 seq->lcs_name, rc); 201 return rc; 202 } else { 203 CDEBUG(D_INFO, "%s: New range - "DRANGE"\n", 204 seq->lcs_name, PRANGE(&seq->lcs_space)); 205 } 206 } else { 207 rc = 0; 208 } 209 210 LASSERT(!range_is_exhausted(&seq->lcs_space)); 211 *seqnr = seq->lcs_space.lsr_start; 212 seq->lcs_space.lsr_start += 1; 213 214 CDEBUG(D_INFO, "%s: Allocated sequence [%#llx]\n", seq->lcs_name, 215 *seqnr); 216 217 return rc; 218} 219 220static int seq_fid_alloc_prep(struct lu_client_seq *seq, 221 wait_queue_t *link) 222{ 223 if (seq->lcs_update) { 224 add_wait_queue(&seq->lcs_waitq, link); 225 set_current_state(TASK_UNINTERRUPTIBLE); 226 mutex_unlock(&seq->lcs_mutex); 227 228 schedule(); 229 230 mutex_lock(&seq->lcs_mutex); 231 remove_wait_queue(&seq->lcs_waitq, link); 232 set_current_state(TASK_RUNNING); 233 return -EAGAIN; 234 } 235 ++seq->lcs_update; 236 mutex_unlock(&seq->lcs_mutex); 237 return 0; 238} 239 240static void seq_fid_alloc_fini(struct lu_client_seq *seq) 241{ 242 LASSERT(seq->lcs_update == 1); 243 mutex_lock(&seq->lcs_mutex); 244 --seq->lcs_update; 245 wake_up(&seq->lcs_waitq); 246} 247 248/** 249 * Allocate the whole seq to the caller. 250 **/ 251int seq_client_get_seq(const struct lu_env *env, 252 struct lu_client_seq *seq, seqno_t *seqnr) 253{ 254 wait_queue_t link; 255 int rc; 256 257 LASSERT(seqnr != NULL); 258 mutex_lock(&seq->lcs_mutex); 259 init_waitqueue_entry(&link, current); 260 261 while (1) { 262 rc = seq_fid_alloc_prep(seq, &link); 263 if (rc == 0) 264 break; 265 } 266 267 rc = seq_client_alloc_seq(env, seq, seqnr); 268 if (rc) { 269 CERROR("%s: Can't allocate new sequence, rc %d\n", 270 seq->lcs_name, rc); 271 seq_fid_alloc_fini(seq); 272 mutex_unlock(&seq->lcs_mutex); 273 return rc; 274 } 275 276 CDEBUG(D_INFO, "%s: allocate sequence [0x%16.16Lx]\n", 277 seq->lcs_name, *seqnr); 278 279 /* Since the caller require the whole seq, 280 * so marked this seq to be used */ 281 if (seq->lcs_type == LUSTRE_SEQ_METADATA) 282 seq->lcs_fid.f_oid = LUSTRE_METADATA_SEQ_MAX_WIDTH; 283 else 284 seq->lcs_fid.f_oid = LUSTRE_DATA_SEQ_MAX_WIDTH; 285 286 seq->lcs_fid.f_seq = *seqnr; 287 seq->lcs_fid.f_ver = 0; 288 /* 289 * Inform caller that sequence switch is performed to allow it 290 * to setup FLD for it. 291 */ 292 seq_fid_alloc_fini(seq); 293 mutex_unlock(&seq->lcs_mutex); 294 295 return rc; 296} 297EXPORT_SYMBOL(seq_client_get_seq); 298 299/* Allocate new fid on passed client @seq and save it to @fid. */ 300int seq_client_alloc_fid(const struct lu_env *env, 301 struct lu_client_seq *seq, struct lu_fid *fid) 302{ 303 wait_queue_t link; 304 int rc; 305 306 LASSERT(seq != NULL); 307 LASSERT(fid != NULL); 308 309 init_waitqueue_entry(&link, current); 310 mutex_lock(&seq->lcs_mutex); 311 312 if (OBD_FAIL_CHECK(OBD_FAIL_SEQ_EXHAUST)) 313 seq->lcs_fid.f_oid = seq->lcs_width; 314 315 while (1) { 316 seqno_t seqnr; 317 318 if (!fid_is_zero(&seq->lcs_fid) && 319 fid_oid(&seq->lcs_fid) < seq->lcs_width) { 320 /* Just bump last allocated fid and return to caller. */ 321 seq->lcs_fid.f_oid += 1; 322 rc = 0; 323 break; 324 } 325 326 rc = seq_fid_alloc_prep(seq, &link); 327 if (rc) 328 continue; 329 330 rc = seq_client_alloc_seq(env, seq, &seqnr); 331 if (rc) { 332 CERROR("%s: Can't allocate new sequence, rc %d\n", 333 seq->lcs_name, rc); 334 seq_fid_alloc_fini(seq); 335 mutex_unlock(&seq->lcs_mutex); 336 return rc; 337 } 338 339 CDEBUG(D_INFO, "%s: Switch to sequence [0x%16.16Lx]\n", 340 seq->lcs_name, seqnr); 341 342 seq->lcs_fid.f_oid = LUSTRE_FID_INIT_OID; 343 seq->lcs_fid.f_seq = seqnr; 344 seq->lcs_fid.f_ver = 0; 345 346 /* 347 * Inform caller that sequence switch is performed to allow it 348 * to setup FLD for it. 349 */ 350 rc = 1; 351 352 seq_fid_alloc_fini(seq); 353 break; 354 } 355 356 *fid = seq->lcs_fid; 357 mutex_unlock(&seq->lcs_mutex); 358 359 CDEBUG(D_INFO, "%s: Allocated FID "DFID"\n", seq->lcs_name, PFID(fid)); 360 return rc; 361} 362EXPORT_SYMBOL(seq_client_alloc_fid); 363 364/* 365 * Finish the current sequence due to disconnect. 366 * See mdc_import_event() 367 */ 368void seq_client_flush(struct lu_client_seq *seq) 369{ 370 wait_queue_t link; 371 372 LASSERT(seq != NULL); 373 init_waitqueue_entry(&link, current); 374 mutex_lock(&seq->lcs_mutex); 375 376 while (seq->lcs_update) { 377 add_wait_queue(&seq->lcs_waitq, &link); 378 set_current_state(TASK_UNINTERRUPTIBLE); 379 mutex_unlock(&seq->lcs_mutex); 380 381 schedule(); 382 383 mutex_lock(&seq->lcs_mutex); 384 remove_wait_queue(&seq->lcs_waitq, &link); 385 set_current_state(TASK_RUNNING); 386 } 387 388 fid_zero(&seq->lcs_fid); 389 /** 390 * this id shld not be used for seq range allocation. 391 * set to -1 for dgb check. 392 */ 393 394 seq->lcs_space.lsr_index = -1; 395 396 range_init(&seq->lcs_space); 397 mutex_unlock(&seq->lcs_mutex); 398} 399EXPORT_SYMBOL(seq_client_flush); 400 401static void seq_client_proc_fini(struct lu_client_seq *seq) 402{ 403#if defined (CONFIG_PROC_FS) 404 if (seq->lcs_proc_dir) { 405 if (!IS_ERR(seq->lcs_proc_dir)) 406 lprocfs_remove(&seq->lcs_proc_dir); 407 seq->lcs_proc_dir = NULL; 408 } 409#endif /* CONFIG_PROC_FS */ 410} 411 412static int seq_client_proc_init(struct lu_client_seq *seq) 413{ 414#if defined (CONFIG_PROC_FS) 415 int rc; 416 417 seq->lcs_proc_dir = lprocfs_register(seq->lcs_name, 418 seq_type_proc_dir, 419 NULL, NULL); 420 421 if (IS_ERR(seq->lcs_proc_dir)) { 422 CERROR("%s: LProcFS failed in seq-init\n", 423 seq->lcs_name); 424 rc = PTR_ERR(seq->lcs_proc_dir); 425 return rc; 426 } 427 428 rc = lprocfs_add_vars(seq->lcs_proc_dir, 429 seq_client_proc_list, seq); 430 if (rc) { 431 CERROR("%s: Can't init sequence manager proc, rc %d\n", 432 seq->lcs_name, rc); 433 GOTO(out_cleanup, rc); 434 } 435 436 return 0; 437 438out_cleanup: 439 seq_client_proc_fini(seq); 440 return rc; 441 442#else /* CONFIG_PROC_FS */ 443 return 0; 444#endif 445} 446 447int seq_client_init(struct lu_client_seq *seq, 448 struct obd_export *exp, 449 enum lu_cli_type type, 450 const char *prefix, 451 struct lu_server_seq *srv) 452{ 453 int rc; 454 455 LASSERT(seq != NULL); 456 LASSERT(prefix != NULL); 457 458 seq->lcs_srv = srv; 459 seq->lcs_type = type; 460 461 mutex_init(&seq->lcs_mutex); 462 if (type == LUSTRE_SEQ_METADATA) 463 seq->lcs_width = LUSTRE_METADATA_SEQ_MAX_WIDTH; 464 else 465 seq->lcs_width = LUSTRE_DATA_SEQ_MAX_WIDTH; 466 467 init_waitqueue_head(&seq->lcs_waitq); 468 /* Make sure that things are clear before work is started. */ 469 seq_client_flush(seq); 470 471 if (exp != NULL) 472 seq->lcs_exp = class_export_get(exp); 473 else if (type == LUSTRE_SEQ_METADATA) 474 LASSERT(seq->lcs_srv != NULL); 475 476 snprintf(seq->lcs_name, sizeof(seq->lcs_name), 477 "cli-%s", prefix); 478 479 rc = seq_client_proc_init(seq); 480 if (rc) 481 seq_client_fini(seq); 482 return rc; 483} 484EXPORT_SYMBOL(seq_client_init); 485 486void seq_client_fini(struct lu_client_seq *seq) 487{ 488 seq_client_proc_fini(seq); 489 490 if (seq->lcs_exp != NULL) { 491 class_export_put(seq->lcs_exp); 492 seq->lcs_exp = NULL; 493 } 494 495 seq->lcs_srv = NULL; 496} 497EXPORT_SYMBOL(seq_client_fini); 498 499int client_fid_init(struct obd_device *obd, 500 struct obd_export *exp, enum lu_cli_type type) 501{ 502 struct client_obd *cli = &obd->u.cli; 503 char *prefix; 504 int rc; 505 506 OBD_ALLOC_PTR(cli->cl_seq); 507 if (cli->cl_seq == NULL) 508 return -ENOMEM; 509 510 OBD_ALLOC(prefix, MAX_OBD_NAME + 5); 511 if (prefix == NULL) 512 GOTO(out_free_seq, rc = -ENOMEM); 513 514 snprintf(prefix, MAX_OBD_NAME + 5, "cli-%s", obd->obd_name); 515 516 /* Init client side sequence-manager */ 517 rc = seq_client_init(cli->cl_seq, exp, type, prefix, NULL); 518 OBD_FREE(prefix, MAX_OBD_NAME + 5); 519 if (rc) 520 GOTO(out_free_seq, rc); 521 522 return rc; 523out_free_seq: 524 OBD_FREE_PTR(cli->cl_seq); 525 cli->cl_seq = NULL; 526 return rc; 527} 528EXPORT_SYMBOL(client_fid_init); 529 530int client_fid_fini(struct obd_device *obd) 531{ 532 struct client_obd *cli = &obd->u.cli; 533 534 if (cli->cl_seq != NULL) { 535 seq_client_fini(cli->cl_seq); 536 OBD_FREE_PTR(cli->cl_seq); 537 cli->cl_seq = NULL; 538 } 539 540 return 0; 541} 542EXPORT_SYMBOL(client_fid_fini); 543 544struct proc_dir_entry *seq_type_proc_dir; 545 546static int __init fid_mod_init(void) 547{ 548 seq_type_proc_dir = lprocfs_register(LUSTRE_SEQ_NAME, 549 proc_lustre_root, 550 NULL, NULL); 551 return PTR_ERR_OR_ZERO(seq_type_proc_dir); 552} 553 554static void __exit fid_mod_exit(void) 555{ 556 if (seq_type_proc_dir != NULL && !IS_ERR(seq_type_proc_dir)) { 557 lprocfs_remove(&seq_type_proc_dir); 558 seq_type_proc_dir = NULL; 559 } 560} 561 562MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>"); 563MODULE_DESCRIPTION("Lustre FID Module"); 564MODULE_LICENSE("GPL"); 565MODULE_VERSION("0.1.0"); 566 567module_init(fid_mod_init); 568module_exit(fid_mod_exit); 569