fid_request.c revision d7e09d0397e84eefbabfd9cb353221f3c6448d83
1/* 2 * GPL HEADER START 3 * 4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 5 * 6 * This program is free software; you can redistribute it and/or modify 7 * it under the terms of the GNU General Public License version 2 only, 8 * as published by the Free Software Foundation. 9 * 10 * This program is distributed in the hope that it will be useful, but 11 * WITHOUT ANY WARRANTY; without even the implied warranty of 12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 13 * General Public License version 2 for more details (a copy is included 14 * in the LICENSE file that accompanied this code). 15 * 16 * You should have received a copy of the GNU General Public License 17 * version 2 along with this program; If not, see 18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf 19 * 20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara, 21 * CA 95054 USA or visit www.sun.com if you need additional information or 22 * have any questions. 23 * 24 * GPL HEADER END 25 */ 26/* 27 * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved. 28 * Use is subject to license terms. 29 * 30 * Copyright (c) 2011, 2012, Intel Corporation. 31 */ 32/* 33 * This file is part of Lustre, http://www.lustre.org/ 34 * Lustre is a trademark of Sun Microsystems, Inc. 35 * 36 * lustre/fid/fid_request.c 37 * 38 * Lustre Sequence Manager 39 * 40 * Author: Yury Umanets <umka@clusterfs.com> 41 */ 42 43#define DEBUG_SUBSYSTEM S_FID 44 45# include <linux/libcfs/libcfs.h> 46# include <linux/module.h> 47 48#include <obd.h> 49#include <obd_class.h> 50#include <dt_object.h> 51#include <md_object.h> 52#include <obd_support.h> 53#include <lustre_req_layout.h> 54#include <lustre_fid.h> 55/* mdc RPC locks */ 56#include <lustre_mdc.h> 57#include "fid_internal.h" 58 59static int seq_client_rpc(struct lu_client_seq *seq, 60 struct lu_seq_range *output, __u32 opc, 61 const char *opcname) 62{ 63 struct obd_export *exp = seq->lcs_exp; 64 struct ptlrpc_request *req; 65 struct lu_seq_range *out, *in; 66 __u32 *op; 67 unsigned int debug_mask; 68 int rc; 69 ENTRY; 70 71 req = ptlrpc_request_alloc_pack(class_exp2cliimp(exp), &RQF_SEQ_QUERY, 72 LUSTRE_MDS_VERSION, SEQ_QUERY); 73 if (req == NULL) 74 RETURN(-ENOMEM); 75 76 /* Init operation code */ 77 op = req_capsule_client_get(&req->rq_pill, &RMF_SEQ_OPC); 78 *op = opc; 79 80 /* Zero out input range, this is not recovery yet. */ 81 in = req_capsule_client_get(&req->rq_pill, &RMF_SEQ_RANGE); 82 range_init(in); 83 84 ptlrpc_request_set_replen(req); 85 86 in->lsr_index = seq->lcs_space.lsr_index; 87 if (seq->lcs_type == LUSTRE_SEQ_METADATA) 88 fld_range_set_mdt(in); 89 else 90 fld_range_set_ost(in); 91 92 if (opc == SEQ_ALLOC_SUPER) { 93 req->rq_request_portal = SEQ_CONTROLLER_PORTAL; 94 req->rq_reply_portal = MDC_REPLY_PORTAL; 95 /* During allocating super sequence for data object, 96 * the current thread might hold the export of MDT0(MDT0 97 * precreating objects on this OST), and it will send the 98 * request to MDT0 here, so we can not keep resending the 99 * request here, otherwise if MDT0 is failed(umounted), 100 * it can not release the export of MDT0 */ 101 if (seq->lcs_type == LUSTRE_SEQ_DATA) 102 req->rq_no_delay = req->rq_no_resend = 1; 103 debug_mask = D_CONSOLE; 104 } else { 105 if (seq->lcs_type == LUSTRE_SEQ_METADATA) 106 req->rq_request_portal = SEQ_METADATA_PORTAL; 107 else 108 req->rq_request_portal = SEQ_DATA_PORTAL; 109 debug_mask = D_INFO; 110 } 111 112 ptlrpc_at_set_req_timeout(req); 113 114 if (seq->lcs_type == LUSTRE_SEQ_METADATA) 115 mdc_get_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL); 116 rc = ptlrpc_queue_wait(req); 117 if (seq->lcs_type == LUSTRE_SEQ_METADATA) 118 mdc_put_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL); 119 if (rc) 120 GOTO(out_req, rc); 121 122 out = req_capsule_server_get(&req->rq_pill, &RMF_SEQ_RANGE); 123 *output = *out; 124 125 if (!range_is_sane(output)) { 126 CERROR("%s: Invalid range received from server: " 127 DRANGE"\n", seq->lcs_name, PRANGE(output)); 128 GOTO(out_req, rc = -EINVAL); 129 } 130 131 if (range_is_exhausted(output)) { 132 CERROR("%s: Range received from server is exhausted: " 133 DRANGE"]\n", seq->lcs_name, PRANGE(output)); 134 GOTO(out_req, rc = -EINVAL); 135 } 136 137 CDEBUG_LIMIT(debug_mask, "%s: Allocated %s-sequence "DRANGE"]\n", 138 seq->lcs_name, opcname, PRANGE(output)); 139 140 EXIT; 141out_req: 142 ptlrpc_req_finished(req); 143 return rc; 144} 145 146/* Request sequence-controller node to allocate new super-sequence. */ 147int seq_client_alloc_super(struct lu_client_seq *seq, 148 const struct lu_env *env) 149{ 150 int rc; 151 ENTRY; 152 153 mutex_lock(&seq->lcs_mutex); 154 155 if (seq->lcs_srv) { 156 LASSERT(env != NULL); 157 rc = seq_server_alloc_super(seq->lcs_srv, &seq->lcs_space, 158 env); 159 } else { 160 /* Check whether the connection to seq controller has been 161 * setup (lcs_exp != NULL) */ 162 if (seq->lcs_exp == NULL) { 163 mutex_unlock(&seq->lcs_mutex); 164 RETURN(-EINPROGRESS); 165 } 166 167 rc = seq_client_rpc(seq, &seq->lcs_space, 168 SEQ_ALLOC_SUPER, "super"); 169 } 170 mutex_unlock(&seq->lcs_mutex); 171 RETURN(rc); 172} 173 174/* Request sequence-controller node to allocate new meta-sequence. */ 175static int seq_client_alloc_meta(const struct lu_env *env, 176 struct lu_client_seq *seq) 177{ 178 int rc; 179 ENTRY; 180 181 if (seq->lcs_srv) { 182 LASSERT(env != NULL); 183 rc = seq_server_alloc_meta(seq->lcs_srv, &seq->lcs_space, env); 184 } else { 185 do { 186 /* If meta server return -EINPROGRESS or EAGAIN, 187 * it means meta server might not be ready to 188 * allocate super sequence from sequence controller 189 * (MDT0)yet */ 190 rc = seq_client_rpc(seq, &seq->lcs_space, 191 SEQ_ALLOC_META, "meta"); 192 } while (rc == -EINPROGRESS || rc == -EAGAIN); 193 } 194 RETURN(rc); 195} 196 197/* Allocate new sequence for client. */ 198static int seq_client_alloc_seq(const struct lu_env *env, 199 struct lu_client_seq *seq, seqno_t *seqnr) 200{ 201 int rc; 202 ENTRY; 203 204 LASSERT(range_is_sane(&seq->lcs_space)); 205 206 if (range_is_exhausted(&seq->lcs_space)) { 207 rc = seq_client_alloc_meta(env, seq); 208 if (rc) { 209 CERROR("%s: Can't allocate new meta-sequence," 210 "rc %d\n", seq->lcs_name, rc); 211 RETURN(rc); 212 } else { 213 CDEBUG(D_INFO, "%s: New range - "DRANGE"\n", 214 seq->lcs_name, PRANGE(&seq->lcs_space)); 215 } 216 } else { 217 rc = 0; 218 } 219 220 LASSERT(!range_is_exhausted(&seq->lcs_space)); 221 *seqnr = seq->lcs_space.lsr_start; 222 seq->lcs_space.lsr_start += 1; 223 224 CDEBUG(D_INFO, "%s: Allocated sequence ["LPX64"]\n", seq->lcs_name, 225 *seqnr); 226 227 RETURN(rc); 228} 229 230static int seq_fid_alloc_prep(struct lu_client_seq *seq, 231 wait_queue_t *link) 232{ 233 if (seq->lcs_update) { 234 add_wait_queue(&seq->lcs_waitq, link); 235 set_current_state(TASK_UNINTERRUPTIBLE); 236 mutex_unlock(&seq->lcs_mutex); 237 238 waitq_wait(link, TASK_UNINTERRUPTIBLE); 239 240 mutex_lock(&seq->lcs_mutex); 241 remove_wait_queue(&seq->lcs_waitq, link); 242 set_current_state(TASK_RUNNING); 243 return -EAGAIN; 244 } 245 ++seq->lcs_update; 246 mutex_unlock(&seq->lcs_mutex); 247 return 0; 248} 249 250static void seq_fid_alloc_fini(struct lu_client_seq *seq) 251{ 252 LASSERT(seq->lcs_update == 1); 253 mutex_lock(&seq->lcs_mutex); 254 --seq->lcs_update; 255 wake_up(&seq->lcs_waitq); 256} 257 258/** 259 * Allocate the whole seq to the caller. 260 **/ 261int seq_client_get_seq(const struct lu_env *env, 262 struct lu_client_seq *seq, seqno_t *seqnr) 263{ 264 wait_queue_t link; 265 int rc; 266 267 LASSERT(seqnr != NULL); 268 mutex_lock(&seq->lcs_mutex); 269 init_waitqueue_entry_current(&link); 270 271 while (1) { 272 rc = seq_fid_alloc_prep(seq, &link); 273 if (rc == 0) 274 break; 275 } 276 277 rc = seq_client_alloc_seq(env, seq, seqnr); 278 if (rc) { 279 CERROR("%s: Can't allocate new sequence, " 280 "rc %d\n", seq->lcs_name, rc); 281 seq_fid_alloc_fini(seq); 282 mutex_unlock(&seq->lcs_mutex); 283 return rc; 284 } 285 286 CDEBUG(D_INFO, "%s: allocate sequence " 287 "[0x%16.16"LPF64"x]\n", seq->lcs_name, *seqnr); 288 289 /* Since the caller require the whole seq, 290 * so marked this seq to be used */ 291 if (seq->lcs_type == LUSTRE_SEQ_METADATA) 292 seq->lcs_fid.f_oid = LUSTRE_METADATA_SEQ_MAX_WIDTH; 293 else 294 seq->lcs_fid.f_oid = LUSTRE_DATA_SEQ_MAX_WIDTH; 295 296 seq->lcs_fid.f_seq = *seqnr; 297 seq->lcs_fid.f_ver = 0; 298 /* 299 * Inform caller that sequence switch is performed to allow it 300 * to setup FLD for it. 301 */ 302 seq_fid_alloc_fini(seq); 303 mutex_unlock(&seq->lcs_mutex); 304 305 return rc; 306} 307EXPORT_SYMBOL(seq_client_get_seq); 308 309/* Allocate new fid on passed client @seq and save it to @fid. */ 310int seq_client_alloc_fid(const struct lu_env *env, 311 struct lu_client_seq *seq, struct lu_fid *fid) 312{ 313 wait_queue_t link; 314 int rc; 315 ENTRY; 316 317 LASSERT(seq != NULL); 318 LASSERT(fid != NULL); 319 320 init_waitqueue_entry_current(&link); 321 mutex_lock(&seq->lcs_mutex); 322 323 if (OBD_FAIL_CHECK(OBD_FAIL_SEQ_EXHAUST)) 324 seq->lcs_fid.f_oid = seq->lcs_width; 325 326 while (1) { 327 seqno_t seqnr; 328 329 if (!fid_is_zero(&seq->lcs_fid) && 330 fid_oid(&seq->lcs_fid) < seq->lcs_width) { 331 /* Just bump last allocated fid and return to caller. */ 332 seq->lcs_fid.f_oid += 1; 333 rc = 0; 334 break; 335 } 336 337 rc = seq_fid_alloc_prep(seq, &link); 338 if (rc) 339 continue; 340 341 rc = seq_client_alloc_seq(env, seq, &seqnr); 342 if (rc) { 343 CERROR("%s: Can't allocate new sequence, " 344 "rc %d\n", seq->lcs_name, rc); 345 seq_fid_alloc_fini(seq); 346 mutex_unlock(&seq->lcs_mutex); 347 RETURN(rc); 348 } 349 350 CDEBUG(D_INFO, "%s: Switch to sequence " 351 "[0x%16.16"LPF64"x]\n", seq->lcs_name, seqnr); 352 353 seq->lcs_fid.f_oid = LUSTRE_FID_INIT_OID; 354 seq->lcs_fid.f_seq = seqnr; 355 seq->lcs_fid.f_ver = 0; 356 357 /* 358 * Inform caller that sequence switch is performed to allow it 359 * to setup FLD for it. 360 */ 361 rc = 1; 362 363 seq_fid_alloc_fini(seq); 364 break; 365 } 366 367 *fid = seq->lcs_fid; 368 mutex_unlock(&seq->lcs_mutex); 369 370 CDEBUG(D_INFO, "%s: Allocated FID "DFID"\n", seq->lcs_name, PFID(fid)); 371 RETURN(rc); 372} 373EXPORT_SYMBOL(seq_client_alloc_fid); 374 375/* 376 * Finish the current sequence due to disconnect. 377 * See mdc_import_event() 378 */ 379void seq_client_flush(struct lu_client_seq *seq) 380{ 381 wait_queue_t link; 382 383 LASSERT(seq != NULL); 384 init_waitqueue_entry_current(&link); 385 mutex_lock(&seq->lcs_mutex); 386 387 while (seq->lcs_update) { 388 add_wait_queue(&seq->lcs_waitq, &link); 389 set_current_state(TASK_UNINTERRUPTIBLE); 390 mutex_unlock(&seq->lcs_mutex); 391 392 waitq_wait(&link, TASK_UNINTERRUPTIBLE); 393 394 mutex_lock(&seq->lcs_mutex); 395 remove_wait_queue(&seq->lcs_waitq, &link); 396 set_current_state(TASK_RUNNING); 397 } 398 399 fid_zero(&seq->lcs_fid); 400 /** 401 * this id shld not be used for seq range allocation. 402 * set to -1 for dgb check. 403 */ 404 405 seq->lcs_space.lsr_index = -1; 406 407 range_init(&seq->lcs_space); 408 mutex_unlock(&seq->lcs_mutex); 409} 410EXPORT_SYMBOL(seq_client_flush); 411 412static void seq_client_proc_fini(struct lu_client_seq *seq); 413 414#ifdef LPROCFS 415static int seq_client_proc_init(struct lu_client_seq *seq) 416{ 417 int rc; 418 ENTRY; 419 420 seq->lcs_proc_dir = lprocfs_register(seq->lcs_name, 421 seq_type_proc_dir, 422 NULL, NULL); 423 424 if (IS_ERR(seq->lcs_proc_dir)) { 425 CERROR("%s: LProcFS failed in seq-init\n", 426 seq->lcs_name); 427 rc = PTR_ERR(seq->lcs_proc_dir); 428 RETURN(rc); 429 } 430 431 rc = lprocfs_add_vars(seq->lcs_proc_dir, 432 seq_client_proc_list, seq); 433 if (rc) { 434 CERROR("%s: Can't init sequence manager " 435 "proc, rc %d\n", seq->lcs_name, rc); 436 GOTO(out_cleanup, rc); 437 } 438 439 RETURN(0); 440 441out_cleanup: 442 seq_client_proc_fini(seq); 443 return rc; 444} 445 446static void seq_client_proc_fini(struct lu_client_seq *seq) 447{ 448 ENTRY; 449 if (seq->lcs_proc_dir) { 450 if (!IS_ERR(seq->lcs_proc_dir)) 451 lprocfs_remove(&seq->lcs_proc_dir); 452 seq->lcs_proc_dir = NULL; 453 } 454 EXIT; 455} 456#else 457static int seq_client_proc_init(struct lu_client_seq *seq) 458{ 459 return 0; 460} 461 462static void seq_client_proc_fini(struct lu_client_seq *seq) 463{ 464 return; 465} 466#endif 467 468int seq_client_init(struct lu_client_seq *seq, 469 struct obd_export *exp, 470 enum lu_cli_type type, 471 const char *prefix, 472 struct lu_server_seq *srv) 473{ 474 int rc; 475 ENTRY; 476 477 LASSERT(seq != NULL); 478 LASSERT(prefix != NULL); 479 480 seq->lcs_srv = srv; 481 seq->lcs_type = type; 482 483 mutex_init(&seq->lcs_mutex); 484 if (type == LUSTRE_SEQ_METADATA) 485 seq->lcs_width = LUSTRE_METADATA_SEQ_MAX_WIDTH; 486 else 487 seq->lcs_width = LUSTRE_DATA_SEQ_MAX_WIDTH; 488 489 init_waitqueue_head(&seq->lcs_waitq); 490 /* Make sure that things are clear before work is started. */ 491 seq_client_flush(seq); 492 493 if (exp != NULL) 494 seq->lcs_exp = class_export_get(exp); 495 else if (type == LUSTRE_SEQ_METADATA) 496 LASSERT(seq->lcs_srv != NULL); 497 498 snprintf(seq->lcs_name, sizeof(seq->lcs_name), 499 "cli-%s", prefix); 500 501 rc = seq_client_proc_init(seq); 502 if (rc) 503 seq_client_fini(seq); 504 RETURN(rc); 505} 506EXPORT_SYMBOL(seq_client_init); 507 508void seq_client_fini(struct lu_client_seq *seq) 509{ 510 ENTRY; 511 512 seq_client_proc_fini(seq); 513 514 if (seq->lcs_exp != NULL) { 515 class_export_put(seq->lcs_exp); 516 seq->lcs_exp = NULL; 517 } 518 519 seq->lcs_srv = NULL; 520 EXIT; 521} 522EXPORT_SYMBOL(seq_client_fini); 523