/* osc_request.c revision 9d8654397d0dcb1885457a2188b59995f2219676 */
/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
 * Copyright (c) 2011, 2012, Intel Corporation.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
35 */ 36 37#define DEBUG_SUBSYSTEM S_OSC 38 39#include <linux/libcfs/libcfs.h> 40 41 42#include <lustre_dlm.h> 43#include <lustre_net.h> 44#include <lustre/lustre_user.h> 45#include <obd_cksum.h> 46#include <obd_ost.h> 47#include <obd_lov.h> 48 49#ifdef __CYGWIN__ 50# include <ctype.h> 51#endif 52 53#include <lustre_ha.h> 54#include <lprocfs_status.h> 55#include <lustre_log.h> 56#include <lustre_debug.h> 57#include <lustre_param.h> 58#include <lustre_fid.h> 59#include "osc_internal.h" 60#include "osc_cl_internal.h" 61 62static void osc_release_ppga(struct brw_page **ppga, obd_count count); 63static int brw_interpret(const struct lu_env *env, 64 struct ptlrpc_request *req, void *data, int rc); 65int osc_cleanup(struct obd_device *obd); 66 67/* Pack OSC object metadata for disk storage (LE byte order). */ 68static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp, 69 struct lov_stripe_md *lsm) 70{ 71 int lmm_size; 72 ENTRY; 73 74 lmm_size = sizeof(**lmmp); 75 if (lmmp == NULL) 76 RETURN(lmm_size); 77 78 if (*lmmp != NULL && lsm == NULL) { 79 OBD_FREE(*lmmp, lmm_size); 80 *lmmp = NULL; 81 RETURN(0); 82 } else if (unlikely(lsm != NULL && ostid_id(&lsm->lsm_oi) == 0)) { 83 RETURN(-EBADF); 84 } 85 86 if (*lmmp == NULL) { 87 OBD_ALLOC(*lmmp, lmm_size); 88 if (*lmmp == NULL) 89 RETURN(-ENOMEM); 90 } 91 92 if (lsm) 93 ostid_cpu_to_le(&lsm->lsm_oi, &(*lmmp)->lmm_oi); 94 95 RETURN(lmm_size); 96} 97 98/* Unpack OSC object metadata from disk storage (LE byte order). */ 99static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp, 100 struct lov_mds_md *lmm, int lmm_bytes) 101{ 102 int lsm_size; 103 struct obd_import *imp = class_exp2cliimp(exp); 104 ENTRY; 105 106 if (lmm != NULL) { 107 if (lmm_bytes < sizeof(*lmm)) { 108 CERROR("%s: lov_mds_md too small: %d, need %d\n", 109 exp->exp_obd->obd_name, lmm_bytes, 110 (int)sizeof(*lmm)); 111 RETURN(-EINVAL); 112 } 113 /* XXX LOV_MAGIC etc check? 
*/ 114 115 if (unlikely(ostid_id(&lmm->lmm_oi) == 0)) { 116 CERROR("%s: zero lmm_object_id: rc = %d\n", 117 exp->exp_obd->obd_name, -EINVAL); 118 RETURN(-EINVAL); 119 } 120 } 121 122 lsm_size = lov_stripe_md_size(1); 123 if (lsmp == NULL) 124 RETURN(lsm_size); 125 126 if (*lsmp != NULL && lmm == NULL) { 127 OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo)); 128 OBD_FREE(*lsmp, lsm_size); 129 *lsmp = NULL; 130 RETURN(0); 131 } 132 133 if (*lsmp == NULL) { 134 OBD_ALLOC(*lsmp, lsm_size); 135 if (unlikely(*lsmp == NULL)) 136 RETURN(-ENOMEM); 137 OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo)); 138 if (unlikely((*lsmp)->lsm_oinfo[0] == NULL)) { 139 OBD_FREE(*lsmp, lsm_size); 140 RETURN(-ENOMEM); 141 } 142 loi_init((*lsmp)->lsm_oinfo[0]); 143 } else if (unlikely(ostid_id(&(*lsmp)->lsm_oi) == 0)) { 144 RETURN(-EBADF); 145 } 146 147 if (lmm != NULL) 148 /* XXX zero *lsmp? */ 149 ostid_le_to_cpu(&lmm->lmm_oi, &(*lsmp)->lsm_oi); 150 151 if (imp != NULL && 152 (imp->imp_connect_data.ocd_connect_flags & OBD_CONNECT_MAXBYTES)) 153 (*lsmp)->lsm_maxbytes = imp->imp_connect_data.ocd_maxbytes; 154 else 155 (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES; 156 157 RETURN(lsm_size); 158} 159 160static inline void osc_pack_capa(struct ptlrpc_request *req, 161 struct ost_body *body, void *capa) 162{ 163 struct obd_capa *oc = (struct obd_capa *)capa; 164 struct lustre_capa *c; 165 166 if (!capa) 167 return; 168 169 c = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1); 170 LASSERT(c); 171 capa_cpy(c, oc); 172 body->oa.o_valid |= OBD_MD_FLOSSCAPA; 173 DEBUG_CAPA(D_SEC, c, "pack"); 174} 175 176static inline void osc_pack_req_body(struct ptlrpc_request *req, 177 struct obd_info *oinfo) 178{ 179 struct ost_body *body; 180 181 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY); 182 LASSERT(body); 183 184 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, 185 oinfo->oi_oa); 186 osc_pack_capa(req, body, oinfo->oi_capa); 187} 188 189static inline void 
osc_set_capa_size(struct ptlrpc_request *req, 190 const struct req_msg_field *field, 191 struct obd_capa *oc) 192{ 193 if (oc == NULL) 194 req_capsule_set_size(&req->rq_pill, field, RCL_CLIENT, 0); 195 else 196 /* it is already calculated as sizeof struct obd_capa */ 197 ; 198} 199 200static int osc_getattr_interpret(const struct lu_env *env, 201 struct ptlrpc_request *req, 202 struct osc_async_args *aa, int rc) 203{ 204 struct ost_body *body; 205 ENTRY; 206 207 if (rc != 0) 208 GOTO(out, rc); 209 210 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY); 211 if (body) { 212 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode); 213 lustre_get_wire_obdo(&req->rq_import->imp_connect_data, 214 aa->aa_oi->oi_oa, &body->oa); 215 216 /* This should really be sent by the OST */ 217 aa->aa_oi->oi_oa->o_blksize = DT_MAX_BRW_SIZE; 218 aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ; 219 } else { 220 CDEBUG(D_INFO, "can't unpack ost_body\n"); 221 rc = -EPROTO; 222 aa->aa_oi->oi_oa->o_valid = 0; 223 } 224out: 225 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc); 226 RETURN(rc); 227} 228 229static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo, 230 struct ptlrpc_request_set *set) 231{ 232 struct ptlrpc_request *req; 233 struct osc_async_args *aa; 234 int rc; 235 ENTRY; 236 237 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR); 238 if (req == NULL) 239 RETURN(-ENOMEM); 240 241 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa); 242 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR); 243 if (rc) { 244 ptlrpc_request_free(req); 245 RETURN(rc); 246 } 247 248 osc_pack_req_body(req, oinfo); 249 250 ptlrpc_request_set_replen(req); 251 req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_getattr_interpret; 252 253 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args)); 254 aa = ptlrpc_req_async_args(req); 255 aa->aa_oi = oinfo; 256 257 ptlrpc_set_add_req(set, req); 258 RETURN(0); 259} 260 261static int osc_getattr(const struct lu_env *env, struct 
obd_export *exp, 262 struct obd_info *oinfo) 263{ 264 struct ptlrpc_request *req; 265 struct ost_body *body; 266 int rc; 267 ENTRY; 268 269 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR); 270 if (req == NULL) 271 RETURN(-ENOMEM); 272 273 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa); 274 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR); 275 if (rc) { 276 ptlrpc_request_free(req); 277 RETURN(rc); 278 } 279 280 osc_pack_req_body(req, oinfo); 281 282 ptlrpc_request_set_replen(req); 283 284 rc = ptlrpc_queue_wait(req); 285 if (rc) 286 GOTO(out, rc); 287 288 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY); 289 if (body == NULL) 290 GOTO(out, rc = -EPROTO); 291 292 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode); 293 lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oinfo->oi_oa, 294 &body->oa); 295 296 oinfo->oi_oa->o_blksize = cli_brw_size(exp->exp_obd); 297 oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ; 298 299 EXIT; 300 out: 301 ptlrpc_req_finished(req); 302 return rc; 303} 304 305static int osc_setattr(const struct lu_env *env, struct obd_export *exp, 306 struct obd_info *oinfo, struct obd_trans_info *oti) 307{ 308 struct ptlrpc_request *req; 309 struct ost_body *body; 310 int rc; 311 ENTRY; 312 313 LASSERT(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP); 314 315 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR); 316 if (req == NULL) 317 RETURN(-ENOMEM); 318 319 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa); 320 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR); 321 if (rc) { 322 ptlrpc_request_free(req); 323 RETURN(rc); 324 } 325 326 osc_pack_req_body(req, oinfo); 327 328 ptlrpc_request_set_replen(req); 329 330 rc = ptlrpc_queue_wait(req); 331 if (rc) 332 GOTO(out, rc); 333 334 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY); 335 if (body == NULL) 336 GOTO(out, rc = -EPROTO); 337 338 lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oinfo->oi_oa, 339 &body->oa); 340 341 
EXIT; 342out: 343 ptlrpc_req_finished(req); 344 RETURN(rc); 345} 346 347static int osc_setattr_interpret(const struct lu_env *env, 348 struct ptlrpc_request *req, 349 struct osc_setattr_args *sa, int rc) 350{ 351 struct ost_body *body; 352 ENTRY; 353 354 if (rc != 0) 355 GOTO(out, rc); 356 357 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY); 358 if (body == NULL) 359 GOTO(out, rc = -EPROTO); 360 361 lustre_get_wire_obdo(&req->rq_import->imp_connect_data, sa->sa_oa, 362 &body->oa); 363out: 364 rc = sa->sa_upcall(sa->sa_cookie, rc); 365 RETURN(rc); 366} 367 368int osc_setattr_async_base(struct obd_export *exp, struct obd_info *oinfo, 369 struct obd_trans_info *oti, 370 obd_enqueue_update_f upcall, void *cookie, 371 struct ptlrpc_request_set *rqset) 372{ 373 struct ptlrpc_request *req; 374 struct osc_setattr_args *sa; 375 int rc; 376 ENTRY; 377 378 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR); 379 if (req == NULL) 380 RETURN(-ENOMEM); 381 382 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa); 383 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR); 384 if (rc) { 385 ptlrpc_request_free(req); 386 RETURN(rc); 387 } 388 389 if (oti && oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE) 390 oinfo->oi_oa->o_lcookie = *oti->oti_logcookies; 391 392 osc_pack_req_body(req, oinfo); 393 394 ptlrpc_request_set_replen(req); 395 396 /* do mds to ost setattr asynchronously */ 397 if (!rqset) { 398 /* Do not wait for response. 
*/ 399 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1); 400 } else { 401 req->rq_interpret_reply = 402 (ptlrpc_interpterer_t)osc_setattr_interpret; 403 404 CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args)); 405 sa = ptlrpc_req_async_args(req); 406 sa->sa_oa = oinfo->oi_oa; 407 sa->sa_upcall = upcall; 408 sa->sa_cookie = cookie; 409 410 if (rqset == PTLRPCD_SET) 411 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1); 412 else 413 ptlrpc_set_add_req(rqset, req); 414 } 415 416 RETURN(0); 417} 418 419static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo, 420 struct obd_trans_info *oti, 421 struct ptlrpc_request_set *rqset) 422{ 423 return osc_setattr_async_base(exp, oinfo, oti, 424 oinfo->oi_cb_up, oinfo, rqset); 425} 426 427int osc_real_create(struct obd_export *exp, struct obdo *oa, 428 struct lov_stripe_md **ea, struct obd_trans_info *oti) 429{ 430 struct ptlrpc_request *req; 431 struct ost_body *body; 432 struct lov_stripe_md *lsm; 433 int rc; 434 ENTRY; 435 436 LASSERT(oa); 437 LASSERT(ea); 438 439 lsm = *ea; 440 if (!lsm) { 441 rc = obd_alloc_memmd(exp, &lsm); 442 if (rc < 0) 443 RETURN(rc); 444 } 445 446 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE); 447 if (req == NULL) 448 GOTO(out, rc = -ENOMEM); 449 450 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE); 451 if (rc) { 452 ptlrpc_request_free(req); 453 GOTO(out, rc); 454 } 455 456 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY); 457 LASSERT(body); 458 459 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa); 460 461 ptlrpc_request_set_replen(req); 462 463 if ((oa->o_valid & OBD_MD_FLFLAGS) && 464 oa->o_flags == OBD_FL_DELORPHAN) { 465 DEBUG_REQ(D_HA, req, 466 "delorphan from OST integration"); 467 /* Don't resend the delorphan req */ 468 req->rq_no_resend = req->rq_no_delay = 1; 469 } 470 471 rc = ptlrpc_queue_wait(req); 472 if (rc) 473 GOTO(out_req, rc); 474 475 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY); 476 if (body 
== NULL) 477 GOTO(out_req, rc = -EPROTO); 478 479 CDEBUG(D_INFO, "oa flags %x\n", oa->o_flags); 480 lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa); 481 482 oa->o_blksize = cli_brw_size(exp->exp_obd); 483 oa->o_valid |= OBD_MD_FLBLKSZ; 484 485 /* XXX LOV STACKING: the lsm that is passed to us from LOV does not 486 * have valid lsm_oinfo data structs, so don't go touching that. 487 * This needs to be fixed in a big way. 488 */ 489 lsm->lsm_oi = oa->o_oi; 490 *ea = lsm; 491 492 if (oti != NULL) { 493 oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg); 494 495 if (oa->o_valid & OBD_MD_FLCOOKIE) { 496 if (!oti->oti_logcookies) 497 oti_alloc_cookies(oti, 1); 498 *oti->oti_logcookies = oa->o_lcookie; 499 } 500 } 501 502 CDEBUG(D_HA, "transno: "LPD64"\n", 503 lustre_msg_get_transno(req->rq_repmsg)); 504out_req: 505 ptlrpc_req_finished(req); 506out: 507 if (rc && !*ea) 508 obd_free_memmd(exp, &lsm); 509 RETURN(rc); 510} 511 512int osc_punch_base(struct obd_export *exp, struct obd_info *oinfo, 513 obd_enqueue_update_f upcall, void *cookie, 514 struct ptlrpc_request_set *rqset) 515{ 516 struct ptlrpc_request *req; 517 struct osc_setattr_args *sa; 518 struct ost_body *body; 519 int rc; 520 ENTRY; 521 522 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH); 523 if (req == NULL) 524 RETURN(-ENOMEM); 525 526 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa); 527 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH); 528 if (rc) { 529 ptlrpc_request_free(req); 530 RETURN(rc); 531 } 532 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */ 533 ptlrpc_at_set_req_timeout(req); 534 535 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY); 536 LASSERT(body); 537 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, 538 oinfo->oi_oa); 539 osc_pack_capa(req, body, oinfo->oi_capa); 540 541 ptlrpc_request_set_replen(req); 542 543 req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret; 544 CLASSERT 
(sizeof(*sa) <= sizeof(req->rq_async_args)); 545 sa = ptlrpc_req_async_args(req); 546 sa->sa_oa = oinfo->oi_oa; 547 sa->sa_upcall = upcall; 548 sa->sa_cookie = cookie; 549 if (rqset == PTLRPCD_SET) 550 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1); 551 else 552 ptlrpc_set_add_req(rqset, req); 553 554 RETURN(0); 555} 556 557static int osc_punch(const struct lu_env *env, struct obd_export *exp, 558 struct obd_info *oinfo, struct obd_trans_info *oti, 559 struct ptlrpc_request_set *rqset) 560{ 561 oinfo->oi_oa->o_size = oinfo->oi_policy.l_extent.start; 562 oinfo->oi_oa->o_blocks = oinfo->oi_policy.l_extent.end; 563 oinfo->oi_oa->o_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS; 564 return osc_punch_base(exp, oinfo, 565 oinfo->oi_cb_up, oinfo, rqset); 566} 567 568static int osc_sync_interpret(const struct lu_env *env, 569 struct ptlrpc_request *req, 570 void *arg, int rc) 571{ 572 struct osc_fsync_args *fa = arg; 573 struct ost_body *body; 574 ENTRY; 575 576 if (rc) 577 GOTO(out, rc); 578 579 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY); 580 if (body == NULL) { 581 CERROR ("can't unpack ost_body\n"); 582 GOTO(out, rc = -EPROTO); 583 } 584 585 *fa->fa_oi->oi_oa = body->oa; 586out: 587 rc = fa->fa_upcall(fa->fa_cookie, rc); 588 RETURN(rc); 589} 590 591int osc_sync_base(struct obd_export *exp, struct obd_info *oinfo, 592 obd_enqueue_update_f upcall, void *cookie, 593 struct ptlrpc_request_set *rqset) 594{ 595 struct ptlrpc_request *req; 596 struct ost_body *body; 597 struct osc_fsync_args *fa; 598 int rc; 599 ENTRY; 600 601 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC); 602 if (req == NULL) 603 RETURN(-ENOMEM); 604 605 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa); 606 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC); 607 if (rc) { 608 ptlrpc_request_free(req); 609 RETURN(rc); 610 } 611 612 /* overload the size and blocks fields in the oa with start/end */ 613 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY); 614 
LASSERT(body); 615 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, 616 oinfo->oi_oa); 617 osc_pack_capa(req, body, oinfo->oi_capa); 618 619 ptlrpc_request_set_replen(req); 620 req->rq_interpret_reply = osc_sync_interpret; 621 622 CLASSERT(sizeof(*fa) <= sizeof(req->rq_async_args)); 623 fa = ptlrpc_req_async_args(req); 624 fa->fa_oi = oinfo; 625 fa->fa_upcall = upcall; 626 fa->fa_cookie = cookie; 627 628 if (rqset == PTLRPCD_SET) 629 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1); 630 else 631 ptlrpc_set_add_req(rqset, req); 632 633 RETURN (0); 634} 635 636static int osc_sync(const struct lu_env *env, struct obd_export *exp, 637 struct obd_info *oinfo, obd_size start, obd_size end, 638 struct ptlrpc_request_set *set) 639{ 640 ENTRY; 641 642 if (!oinfo->oi_oa) { 643 CDEBUG(D_INFO, "oa NULL\n"); 644 RETURN(-EINVAL); 645 } 646 647 oinfo->oi_oa->o_size = start; 648 oinfo->oi_oa->o_blocks = end; 649 oinfo->oi_oa->o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS); 650 651 RETURN(osc_sync_base(exp, oinfo, oinfo->oi_cb_up, oinfo, set)); 652} 653 654/* Find and cancel locally locks matched by @mode in the resource found by 655 * @objid. Found locks are added into @cancel list. Returns the amount of 656 * locks added to @cancels list. */ 657static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa, 658 struct list_head *cancels, 659 ldlm_mode_t mode, int lock_flags) 660{ 661 struct ldlm_namespace *ns = exp->exp_obd->obd_namespace; 662 struct ldlm_res_id res_id; 663 struct ldlm_resource *res; 664 int count; 665 ENTRY; 666 667 /* Return, i.e. cancel nothing, only if ELC is supported (flag in 668 * export) but disabled through procfs (flag in NS). 669 * 670 * This distinguishes from a case when ELC is not supported originally, 671 * when we still want to cancel locks in advance and just cancel them 672 * locally, without sending any RPC. 
*/ 673 if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns)) 674 RETURN(0); 675 676 ostid_build_res_name(&oa->o_oi, &res_id); 677 res = ldlm_resource_get(ns, NULL, &res_id, 0, 0); 678 if (res == NULL) 679 RETURN(0); 680 681 LDLM_RESOURCE_ADDREF(res); 682 count = ldlm_cancel_resource_local(res, cancels, NULL, mode, 683 lock_flags, 0, NULL); 684 LDLM_RESOURCE_DELREF(res); 685 ldlm_resource_putref(res); 686 RETURN(count); 687} 688 689static int osc_destroy_interpret(const struct lu_env *env, 690 struct ptlrpc_request *req, void *data, 691 int rc) 692{ 693 struct client_obd *cli = &req->rq_import->imp_obd->u.cli; 694 695 atomic_dec(&cli->cl_destroy_in_flight); 696 wake_up(&cli->cl_destroy_waitq); 697 return 0; 698} 699 700static int osc_can_send_destroy(struct client_obd *cli) 701{ 702 if (atomic_inc_return(&cli->cl_destroy_in_flight) <= 703 cli->cl_max_rpcs_in_flight) { 704 /* The destroy request can be sent */ 705 return 1; 706 } 707 if (atomic_dec_return(&cli->cl_destroy_in_flight) < 708 cli->cl_max_rpcs_in_flight) { 709 /* 710 * The counter has been modified between the two atomic 711 * operations. 712 */ 713 wake_up(&cli->cl_destroy_waitq); 714 } 715 return 0; 716} 717 718int osc_create(const struct lu_env *env, struct obd_export *exp, 719 struct obdo *oa, struct lov_stripe_md **ea, 720 struct obd_trans_info *oti) 721{ 722 int rc = 0; 723 ENTRY; 724 725 LASSERT(oa); 726 LASSERT(ea); 727 LASSERT(oa->o_valid & OBD_MD_FLGROUP); 728 729 if ((oa->o_valid & OBD_MD_FLFLAGS) && 730 oa->o_flags == OBD_FL_RECREATE_OBJS) { 731 RETURN(osc_real_create(exp, oa, ea, oti)); 732 } 733 734 if (!fid_seq_is_mdt(ostid_seq(&oa->o_oi))) 735 RETURN(osc_real_create(exp, oa, ea, oti)); 736 737 /* we should not get here anymore */ 738 LBUG(); 739 740 RETURN(rc); 741} 742 743/* Destroy requests can be async always on the client, and we don't even really 744 * care about the return code since the client cannot do anything at all about 745 * a destroy failure. 
746 * When the MDS is unlinking a filename, it saves the file objects into a 747 * recovery llog, and these object records are cancelled when the OST reports 748 * they were destroyed and sync'd to disk (i.e. transaction committed). 749 * If the client dies, or the OST is down when the object should be destroyed, 750 * the records are not cancelled, and when the OST reconnects to the MDS next, 751 * it will retrieve the llog unlink logs and then sends the log cancellation 752 * cookies to the MDS after committing destroy transactions. */ 753static int osc_destroy(const struct lu_env *env, struct obd_export *exp, 754 struct obdo *oa, struct lov_stripe_md *ea, 755 struct obd_trans_info *oti, struct obd_export *md_export, 756 void *capa) 757{ 758 struct client_obd *cli = &exp->exp_obd->u.cli; 759 struct ptlrpc_request *req; 760 struct ost_body *body; 761 LIST_HEAD(cancels); 762 int rc, count; 763 ENTRY; 764 765 if (!oa) { 766 CDEBUG(D_INFO, "oa NULL\n"); 767 RETURN(-EINVAL); 768 } 769 770 count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW, 771 LDLM_FL_DISCARD_DATA); 772 773 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY); 774 if (req == NULL) { 775 ldlm_lock_list_put(&cancels, l_bl_ast, count); 776 RETURN(-ENOMEM); 777 } 778 779 osc_set_capa_size(req, &RMF_CAPA1, (struct obd_capa *)capa); 780 rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY, 781 0, &cancels, count); 782 if (rc) { 783 ptlrpc_request_free(req); 784 RETURN(rc); 785 } 786 787 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */ 788 ptlrpc_at_set_req_timeout(req); 789 790 if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE) 791 oa->o_lcookie = *oti->oti_logcookies; 792 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY); 793 LASSERT(body); 794 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa); 795 796 osc_pack_capa(req, body, (struct obd_capa *)capa); 797 ptlrpc_request_set_replen(req); 798 799 /* If osc_destory is for destroying the 
unlink orphan, 800 * sent from MDT to OST, which should not be blocked here, 801 * because the process might be triggered by ptlrpcd, and 802 * it is not good to block ptlrpcd thread (b=16006)*/ 803 if (!(oa->o_flags & OBD_FL_DELORPHAN)) { 804 req->rq_interpret_reply = osc_destroy_interpret; 805 if (!osc_can_send_destroy(cli)) { 806 struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, 807 NULL); 808 809 /* 810 * Wait until the number of on-going destroy RPCs drops 811 * under max_rpc_in_flight 812 */ 813 l_wait_event_exclusive(cli->cl_destroy_waitq, 814 osc_can_send_destroy(cli), &lwi); 815 } 816 } 817 818 /* Do not wait for response */ 819 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1); 820 RETURN(0); 821} 822 823static void osc_announce_cached(struct client_obd *cli, struct obdo *oa, 824 long writing_bytes) 825{ 826 obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT; 827 828 LASSERT(!(oa->o_valid & bits)); 829 830 oa->o_valid |= bits; 831 client_obd_list_lock(&cli->cl_loi_list_lock); 832 oa->o_dirty = cli->cl_dirty; 833 if (unlikely(cli->cl_dirty - cli->cl_dirty_transit > 834 cli->cl_dirty_max)) { 835 CERROR("dirty %lu - %lu > dirty_max %lu\n", 836 cli->cl_dirty, cli->cl_dirty_transit, cli->cl_dirty_max); 837 oa->o_undirty = 0; 838 } else if (unlikely(atomic_read(&obd_dirty_pages) - 839 atomic_read(&obd_dirty_transit_pages) > 840 (long)(obd_max_dirty_pages + 1))) { 841 /* The atomic_read() allowing the atomic_inc() are 842 * not covered by a lock thus they may safely race and trip 843 * this CERROR() unless we add in a small fudge factor (+1). 
*/ 844 CERROR("dirty %d - %d > system dirty_max %d\n", 845 atomic_read(&obd_dirty_pages), 846 atomic_read(&obd_dirty_transit_pages), 847 obd_max_dirty_pages); 848 oa->o_undirty = 0; 849 } else if (unlikely(cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff)) { 850 CERROR("dirty %lu - dirty_max %lu too big???\n", 851 cli->cl_dirty, cli->cl_dirty_max); 852 oa->o_undirty = 0; 853 } else { 854 long max_in_flight = (cli->cl_max_pages_per_rpc << 855 PAGE_CACHE_SHIFT)* 856 (cli->cl_max_rpcs_in_flight + 1); 857 oa->o_undirty = max(cli->cl_dirty_max, max_in_flight); 858 } 859 oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant; 860 oa->o_dropped = cli->cl_lost_grant; 861 cli->cl_lost_grant = 0; 862 client_obd_list_unlock(&cli->cl_loi_list_lock); 863 CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n", 864 oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant); 865 866} 867 868void osc_update_next_shrink(struct client_obd *cli) 869{ 870 cli->cl_next_shrink_grant = 871 cfs_time_shift(cli->cl_grant_shrink_interval); 872 CDEBUG(D_CACHE, "next time %ld to shrink grant \n", 873 cli->cl_next_shrink_grant); 874} 875 876static void __osc_update_grant(struct client_obd *cli, obd_size grant) 877{ 878 client_obd_list_lock(&cli->cl_loi_list_lock); 879 cli->cl_avail_grant += grant; 880 client_obd_list_unlock(&cli->cl_loi_list_lock); 881} 882 883static void osc_update_grant(struct client_obd *cli, struct ost_body *body) 884{ 885 if (body->oa.o_valid & OBD_MD_FLGRANT) { 886 CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant); 887 __osc_update_grant(cli, body->oa.o_grant); 888 } 889} 890 891static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp, 892 obd_count keylen, void *key, obd_count vallen, 893 void *val, struct ptlrpc_request_set *set); 894 895static int osc_shrink_grant_interpret(const struct lu_env *env, 896 struct ptlrpc_request *req, 897 void *aa, int rc) 898{ 899 struct client_obd *cli = &req->rq_import->imp_obd->u.cli; 
900 struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa; 901 struct ost_body *body; 902 903 if (rc != 0) { 904 __osc_update_grant(cli, oa->o_grant); 905 GOTO(out, rc); 906 } 907 908 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY); 909 LASSERT(body); 910 osc_update_grant(cli, body); 911out: 912 OBDO_FREE(oa); 913 return rc; 914} 915 916static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa) 917{ 918 client_obd_list_lock(&cli->cl_loi_list_lock); 919 oa->o_grant = cli->cl_avail_grant / 4; 920 cli->cl_avail_grant -= oa->o_grant; 921 client_obd_list_unlock(&cli->cl_loi_list_lock); 922 if (!(oa->o_valid & OBD_MD_FLFLAGS)) { 923 oa->o_valid |= OBD_MD_FLFLAGS; 924 oa->o_flags = 0; 925 } 926 oa->o_flags |= OBD_FL_SHRINK_GRANT; 927 osc_update_next_shrink(cli); 928} 929 930/* Shrink the current grant, either from some large amount to enough for a 931 * full set of in-flight RPCs, or if we have already shrunk to that limit 932 * then to enough for a single RPC. This avoids keeping more grant than 933 * needed, and avoids shrinking the grant piecemeal. */ 934static int osc_shrink_grant(struct client_obd *cli) 935{ 936 __u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) * 937 (cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT); 938 939 client_obd_list_lock(&cli->cl_loi_list_lock); 940 if (cli->cl_avail_grant <= target_bytes) 941 target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT; 942 client_obd_list_unlock(&cli->cl_loi_list_lock); 943 944 return osc_shrink_grant_to_target(cli, target_bytes); 945} 946 947int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes) 948{ 949 int rc = 0; 950 struct ost_body *body; 951 ENTRY; 952 953 client_obd_list_lock(&cli->cl_loi_list_lock); 954 /* Don't shrink if we are already above or below the desired limit 955 * We don't want to shrink below a single RPC, as that will negatively 956 * impact block allocation and long-term performance. 
*/ 957 if (target_bytes < cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT) 958 target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT; 959 960 if (target_bytes >= cli->cl_avail_grant) { 961 client_obd_list_unlock(&cli->cl_loi_list_lock); 962 RETURN(0); 963 } 964 client_obd_list_unlock(&cli->cl_loi_list_lock); 965 966 OBD_ALLOC_PTR(body); 967 if (!body) 968 RETURN(-ENOMEM); 969 970 osc_announce_cached(cli, &body->oa, 0); 971 972 client_obd_list_lock(&cli->cl_loi_list_lock); 973 body->oa.o_grant = cli->cl_avail_grant - target_bytes; 974 cli->cl_avail_grant = target_bytes; 975 client_obd_list_unlock(&cli->cl_loi_list_lock); 976 if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) { 977 body->oa.o_valid |= OBD_MD_FLFLAGS; 978 body->oa.o_flags = 0; 979 } 980 body->oa.o_flags |= OBD_FL_SHRINK_GRANT; 981 osc_update_next_shrink(cli); 982 983 rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export, 984 sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK, 985 sizeof(*body), body, NULL); 986 if (rc != 0) 987 __osc_update_grant(cli, body->oa.o_grant); 988 OBD_FREE_PTR(body); 989 RETURN(rc); 990} 991 992static int osc_should_shrink_grant(struct client_obd *client) 993{ 994 cfs_time_t time = cfs_time_current(); 995 cfs_time_t next_shrink = client->cl_next_shrink_grant; 996 997 if ((client->cl_import->imp_connect_data.ocd_connect_flags & 998 OBD_CONNECT_GRANT_SHRINK) == 0) 999 return 0; 1000 1001 if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) { 1002 /* Get the current RPC size directly, instead of going via: 1003 * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export) 1004 * Keep comment here so that it can be found by searching. 
*/ 1005 int brw_size = client->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT; 1006 1007 if (client->cl_import->imp_state == LUSTRE_IMP_FULL && 1008 client->cl_avail_grant > brw_size) 1009 return 1; 1010 else 1011 osc_update_next_shrink(client); 1012 } 1013 return 0; 1014} 1015 1016static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data) 1017{ 1018 struct client_obd *client; 1019 1020 list_for_each_entry(client, &item->ti_obd_list, 1021 cl_grant_shrink_list) { 1022 if (osc_should_shrink_grant(client)) 1023 osc_shrink_grant(client); 1024 } 1025 return 0; 1026} 1027 1028static int osc_add_shrink_grant(struct client_obd *client) 1029{ 1030 int rc; 1031 1032 rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval, 1033 TIMEOUT_GRANT, 1034 osc_grant_shrink_grant_cb, NULL, 1035 &client->cl_grant_shrink_list); 1036 if (rc) { 1037 CERROR("add grant client %s error %d\n", 1038 client->cl_import->imp_obd->obd_name, rc); 1039 return rc; 1040 } 1041 CDEBUG(D_CACHE, "add grant client %s \n", 1042 client->cl_import->imp_obd->obd_name); 1043 osc_update_next_shrink(client); 1044 return 0; 1045} 1046 1047static int osc_del_shrink_grant(struct client_obd *client) 1048{ 1049 return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list, 1050 TIMEOUT_GRANT); 1051} 1052 1053static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd) 1054{ 1055 /* 1056 * ocd_grant is the total grant amount we're expect to hold: if we've 1057 * been evicted, it's the new avail_grant amount, cl_dirty will drop 1058 * to 0 as inflight RPCs fail out; otherwise, it's avail_grant + dirty. 1059 * 1060 * race is tolerable here: if we're evicted, but imp_state already 1061 * left EVICTED state, then cl_dirty must be 0 already. 
1062 */ 1063 client_obd_list_lock(&cli->cl_loi_list_lock); 1064 if (cli->cl_import->imp_state == LUSTRE_IMP_EVICTED) 1065 cli->cl_avail_grant = ocd->ocd_grant; 1066 else 1067 cli->cl_avail_grant = ocd->ocd_grant - cli->cl_dirty; 1068 1069 if (cli->cl_avail_grant < 0) { 1070 CWARN("%s: available grant < 0: avail/ocd/dirty %ld/%u/%ld\n", 1071 cli->cl_import->imp_obd->obd_name, cli->cl_avail_grant, 1072 ocd->ocd_grant, cli->cl_dirty); 1073 /* workaround for servers which do not have the patch from 1074 * LU-2679 */ 1075 cli->cl_avail_grant = ocd->ocd_grant; 1076 } 1077 1078 /* determine the appropriate chunk size used by osc_extent. */ 1079 cli->cl_chunkbits = max_t(int, PAGE_CACHE_SHIFT, ocd->ocd_blocksize); 1080 client_obd_list_unlock(&cli->cl_loi_list_lock); 1081 1082 CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld." 1083 "chunk bits: %d.\n", cli->cl_import->imp_obd->obd_name, 1084 cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits); 1085 1086 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK && 1087 list_empty(&cli->cl_grant_shrink_list)) 1088 osc_add_shrink_grant(cli); 1089} 1090 1091/* We assume that the reason this OSC got a short read is because it read 1092 * beyond the end of a stripe file; i.e. lustre is reading a sparse file 1093 * via the LOV, and it _knows_ it's reading inside the file, it's just that 1094 * this stripe never got written at or beyond this stripe offset yet. 
*/ 1095static void handle_short_read(int nob_read, obd_count page_count, 1096 struct brw_page **pga) 1097{ 1098 char *ptr; 1099 int i = 0; 1100 1101 /* skip bytes read OK */ 1102 while (nob_read > 0) { 1103 LASSERT (page_count > 0); 1104 1105 if (pga[i]->count > nob_read) { 1106 /* EOF inside this page */ 1107 ptr = kmap(pga[i]->pg) + 1108 (pga[i]->off & ~CFS_PAGE_MASK); 1109 memset(ptr + nob_read, 0, pga[i]->count - nob_read); 1110 kunmap(pga[i]->pg); 1111 page_count--; 1112 i++; 1113 break; 1114 } 1115 1116 nob_read -= pga[i]->count; 1117 page_count--; 1118 i++; 1119 } 1120 1121 /* zero remaining pages */ 1122 while (page_count-- > 0) { 1123 ptr = kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK); 1124 memset(ptr, 0, pga[i]->count); 1125 kunmap(pga[i]->pg); 1126 i++; 1127 } 1128} 1129 1130static int check_write_rcs(struct ptlrpc_request *req, 1131 int requested_nob, int niocount, 1132 obd_count page_count, struct brw_page **pga) 1133{ 1134 int i; 1135 __u32 *remote_rcs; 1136 1137 remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS, 1138 sizeof(*remote_rcs) * 1139 niocount); 1140 if (remote_rcs == NULL) { 1141 CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n"); 1142 return(-EPROTO); 1143 } 1144 1145 /* return error if any niobuf was in error */ 1146 for (i = 0; i < niocount; i++) { 1147 if ((int)remote_rcs[i] < 0) 1148 return(remote_rcs[i]); 1149 1150 if (remote_rcs[i] != 0) { 1151 CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n", 1152 i, remote_rcs[i], req); 1153 return(-EPROTO); 1154 } 1155 } 1156 1157 if (req->rq_bulk->bd_nob_transferred != requested_nob) { 1158 CERROR("Unexpected # bytes transferred: %d (requested %d)\n", 1159 req->rq_bulk->bd_nob_transferred, requested_nob); 1160 return(-EPROTO); 1161 } 1162 1163 return (0); 1164} 1165 1166static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2) 1167{ 1168 if (p1->flag != p2->flag) { 1169 unsigned mask = ~(OBD_BRW_FROM_GRANT| OBD_BRW_NOCACHE| 1170 
OBD_BRW_SYNC|OBD_BRW_ASYNC|OBD_BRW_NOQUOTA); 1171 1172 /* warn if we try to combine flags that we don't know to be 1173 * safe to combine */ 1174 if (unlikely((p1->flag & mask) != (p2->flag & mask))) { 1175 CWARN("Saw flags 0x%x and 0x%x in the same brw, please " 1176 "report this at http://bugs.whamcloud.com/\n", 1177 p1->flag, p2->flag); 1178 } 1179 return 0; 1180 } 1181 1182 return (p1->off + p1->count == p2->off); 1183} 1184 1185static obd_count osc_checksum_bulk(int nob, obd_count pg_count, 1186 struct brw_page **pga, int opc, 1187 cksum_type_t cksum_type) 1188{ 1189 __u32 cksum; 1190 int i = 0; 1191 struct cfs_crypto_hash_desc *hdesc; 1192 unsigned int bufsize; 1193 int err; 1194 unsigned char cfs_alg = cksum_obd2cfs(cksum_type); 1195 1196 LASSERT(pg_count > 0); 1197 1198 hdesc = cfs_crypto_hash_init(cfs_alg, NULL, 0); 1199 if (IS_ERR(hdesc)) { 1200 CERROR("Unable to initialize checksum hash %s\n", 1201 cfs_crypto_hash_name(cfs_alg)); 1202 return PTR_ERR(hdesc); 1203 } 1204 1205 while (nob > 0 && pg_count > 0) { 1206 int count = pga[i]->count > nob ? 
nob : pga[i]->count; 1207 1208 /* corrupt the data before we compute the checksum, to 1209 * simulate an OST->client data error */ 1210 if (i == 0 && opc == OST_READ && 1211 OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) { 1212 unsigned char *ptr = kmap(pga[i]->pg); 1213 int off = pga[i]->off & ~CFS_PAGE_MASK; 1214 memcpy(ptr + off, "bad1", min(4, nob)); 1215 kunmap(pga[i]->pg); 1216 } 1217 cfs_crypto_hash_update_page(hdesc, pga[i]->pg, 1218 pga[i]->off & ~CFS_PAGE_MASK, 1219 count); 1220 LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d\n", 1221 (int)(pga[i]->off & ~CFS_PAGE_MASK)); 1222 1223 nob -= pga[i]->count; 1224 pg_count--; 1225 i++; 1226 } 1227 1228 bufsize = 4; 1229 err = cfs_crypto_hash_final(hdesc, (unsigned char *)&cksum, &bufsize); 1230 1231 if (err) 1232 cfs_crypto_hash_final(hdesc, NULL, NULL); 1233 1234 /* For sending we only compute the wrong checksum instead 1235 * of corrupting the data so it is still correct on a redo */ 1236 if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND)) 1237 cksum++; 1238 1239 return cksum; 1240} 1241 1242static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa, 1243 struct lov_stripe_md *lsm, obd_count page_count, 1244 struct brw_page **pga, 1245 struct ptlrpc_request **reqp, 1246 struct obd_capa *ocapa, int reserve, 1247 int resend) 1248{ 1249 struct ptlrpc_request *req; 1250 struct ptlrpc_bulk_desc *desc; 1251 struct ost_body *body; 1252 struct obd_ioobj *ioobj; 1253 struct niobuf_remote *niobuf; 1254 int niocount, i, requested_nob, opc, rc; 1255 struct osc_brw_async_args *aa; 1256 struct req_capsule *pill; 1257 struct brw_page *pg_prev; 1258 1259 ENTRY; 1260 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ)) 1261 RETURN(-ENOMEM); /* Recoverable */ 1262 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2)) 1263 RETURN(-EINVAL); /* Fatal */ 1264 1265 if ((cmd & OBD_BRW_WRITE) != 0) { 1266 opc = OST_WRITE; 1267 req = ptlrpc_request_alloc_pool(cli->cl_import, 1268 cli->cl_import->imp_rq_pool, 1269 
&RQF_OST_BRW_WRITE); 1270 } else { 1271 opc = OST_READ; 1272 req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ); 1273 } 1274 if (req == NULL) 1275 RETURN(-ENOMEM); 1276 1277 for (niocount = i = 1; i < page_count; i++) { 1278 if (!can_merge_pages(pga[i - 1], pga[i])) 1279 niocount++; 1280 } 1281 1282 pill = &req->rq_pill; 1283 req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT, 1284 sizeof(*ioobj)); 1285 req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT, 1286 niocount * sizeof(*niobuf)); 1287 osc_set_capa_size(req, &RMF_CAPA1, ocapa); 1288 1289 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc); 1290 if (rc) { 1291 ptlrpc_request_free(req); 1292 RETURN(rc); 1293 } 1294 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */ 1295 ptlrpc_at_set_req_timeout(req); 1296 /* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own 1297 * retry logic */ 1298 req->rq_no_retry_einprogress = 1; 1299 1300 desc = ptlrpc_prep_bulk_imp(req, page_count, 1301 cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS, 1302 opc == OST_WRITE ? BULK_GET_SOURCE : BULK_PUT_SINK, 1303 OST_BULK_PORTAL); 1304 1305 if (desc == NULL) 1306 GOTO(out, rc = -ENOMEM); 1307 /* NB request now owns desc and will free it when it gets freed */ 1308 1309 body = req_capsule_client_get(pill, &RMF_OST_BODY); 1310 ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ); 1311 niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE); 1312 LASSERT(body != NULL && ioobj != NULL && niobuf != NULL); 1313 1314 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa); 1315 1316 obdo_to_ioobj(oa, ioobj); 1317 ioobj->ioo_bufcnt = niocount; 1318 /* The high bits of ioo_max_brw tells server _maximum_ number of bulks 1319 * that might be send for this request. The actual number is decided 1320 * when the RPC is finally sent in ptlrpc_register_bulk(). 
It sends 1321 * "max - 1" for old client compatibility sending "0", and also so the 1322 * the actual maximum is a power-of-two number, not one less. LU-1431 */ 1323 ioobj_max_brw_set(ioobj, desc->bd_md_max_brw); 1324 osc_pack_capa(req, body, ocapa); 1325 LASSERT(page_count > 0); 1326 pg_prev = pga[0]; 1327 for (requested_nob = i = 0; i < page_count; i++, niobuf++) { 1328 struct brw_page *pg = pga[i]; 1329 int poff = pg->off & ~CFS_PAGE_MASK; 1330 1331 LASSERT(pg->count > 0); 1332 /* make sure there is no gap in the middle of page array */ 1333 LASSERTF(page_count == 1 || 1334 (ergo(i == 0, poff + pg->count == PAGE_CACHE_SIZE) && 1335 ergo(i > 0 && i < page_count - 1, 1336 poff == 0 && pg->count == PAGE_CACHE_SIZE) && 1337 ergo(i == page_count - 1, poff == 0)), 1338 "i: %d/%d pg: %p off: "LPU64", count: %u\n", 1339 i, page_count, pg, pg->off, pg->count); 1340 LASSERTF(i == 0 || pg->off > pg_prev->off, 1341 "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64 1342 " prev_pg %p [pri %lu ind %lu] off "LPU64"\n", 1343 i, page_count, 1344 pg->pg, page_private(pg->pg), pg->pg->index, pg->off, 1345 pg_prev->pg, page_private(pg_prev->pg), 1346 pg_prev->pg->index, pg_prev->off); 1347 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) == 1348 (pg->flag & OBD_BRW_SRVLOCK)); 1349 1350 ptlrpc_prep_bulk_page_pin(desc, pg->pg, poff, pg->count); 1351 requested_nob += pg->count; 1352 1353 if (i > 0 && can_merge_pages(pg_prev, pg)) { 1354 niobuf--; 1355 niobuf->len += pg->count; 1356 } else { 1357 niobuf->offset = pg->off; 1358 niobuf->len = pg->count; 1359 niobuf->flags = pg->flag; 1360 } 1361 pg_prev = pg; 1362 } 1363 1364 LASSERTF((void *)(niobuf - niocount) == 1365 req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE), 1366 "want %p - real %p\n", req_capsule_client_get(&req->rq_pill, 1367 &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount)); 1368 1369 osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? 
requested_nob:0); 1370 if (resend) { 1371 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) { 1372 body->oa.o_valid |= OBD_MD_FLFLAGS; 1373 body->oa.o_flags = 0; 1374 } 1375 body->oa.o_flags |= OBD_FL_RECOV_RESEND; 1376 } 1377 1378 if (osc_should_shrink_grant(cli)) 1379 osc_shrink_grant_local(cli, &body->oa); 1380 1381 /* size[REQ_REC_OFF] still sizeof (*body) */ 1382 if (opc == OST_WRITE) { 1383 if (cli->cl_checksum && 1384 !sptlrpc_flavor_has_bulk(&req->rq_flvr)) { 1385 /* store cl_cksum_type in a local variable since 1386 * it can be changed via lprocfs */ 1387 cksum_type_t cksum_type = cli->cl_cksum_type; 1388 1389 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) { 1390 oa->o_flags &= OBD_FL_LOCAL_MASK; 1391 body->oa.o_flags = 0; 1392 } 1393 body->oa.o_flags |= cksum_type_pack(cksum_type); 1394 body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS; 1395 body->oa.o_cksum = osc_checksum_bulk(requested_nob, 1396 page_count, pga, 1397 OST_WRITE, 1398 cksum_type); 1399 CDEBUG(D_PAGE, "checksum at write origin: %x\n", 1400 body->oa.o_cksum); 1401 /* save this in 'oa', too, for later checking */ 1402 oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS; 1403 oa->o_flags |= cksum_type_pack(cksum_type); 1404 } else { 1405 /* clear out the checksum flag, in case this is a 1406 * resend but cl_checksum is no longer set. 
b=11238 */ 1407 oa->o_valid &= ~OBD_MD_FLCKSUM; 1408 } 1409 oa->o_cksum = body->oa.o_cksum; 1410 /* 1 RC per niobuf */ 1411 req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER, 1412 sizeof(__u32) * niocount); 1413 } else { 1414 if (cli->cl_checksum && 1415 !sptlrpc_flavor_has_bulk(&req->rq_flvr)) { 1416 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) 1417 body->oa.o_flags = 0; 1418 body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type); 1419 body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS; 1420 } 1421 } 1422 ptlrpc_request_set_replen(req); 1423 1424 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args)); 1425 aa = ptlrpc_req_async_args(req); 1426 aa->aa_oa = oa; 1427 aa->aa_requested_nob = requested_nob; 1428 aa->aa_nio_count = niocount; 1429 aa->aa_page_count = page_count; 1430 aa->aa_resends = 0; 1431 aa->aa_ppga = pga; 1432 aa->aa_cli = cli; 1433 INIT_LIST_HEAD(&aa->aa_oaps); 1434 if (ocapa && reserve) 1435 aa->aa_ocapa = capa_get(ocapa); 1436 1437 *reqp = req; 1438 RETURN(0); 1439 1440 out: 1441 ptlrpc_req_finished(req); 1442 RETURN(rc); 1443} 1444 1445static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer, 1446 __u32 client_cksum, __u32 server_cksum, int nob, 1447 obd_count page_count, struct brw_page **pga, 1448 cksum_type_t client_cksum_type) 1449{ 1450 __u32 new_cksum; 1451 char *msg; 1452 cksum_type_t cksum_type; 1453 1454 if (server_cksum == client_cksum) { 1455 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum); 1456 return 0; 1457 } 1458 1459 cksum_type = cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ? 
1460 oa->o_flags : 0); 1461 new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE, 1462 cksum_type); 1463 1464 if (cksum_type != client_cksum_type) 1465 msg = "the server did not use the checksum type specified in " 1466 "the original request - likely a protocol problem"; 1467 else if (new_cksum == server_cksum) 1468 msg = "changed on the client after we checksummed it - " 1469 "likely false positive due to mmap IO (bug 11742)"; 1470 else if (new_cksum == client_cksum) 1471 msg = "changed in transit before arrival at OST"; 1472 else 1473 msg = "changed in transit AND doesn't match the original - " 1474 "likely false positive due to mmap IO (bug 11742)"; 1475 1476 LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inode "DFID 1477 " object "DOSTID" extent ["LPU64"-"LPU64"]\n", 1478 msg, libcfs_nid2str(peer->nid), 1479 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0, 1480 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0, 1481 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0, 1482 POSTID(&oa->o_oi), pga[0]->off, 1483 pga[page_count-1]->off + pga[page_count-1]->count - 1); 1484 CERROR("original client csum %x (type %x), server csum %x (type %x), " 1485 "client csum now %x\n", client_cksum, client_cksum_type, 1486 server_cksum, cksum_type, new_cksum); 1487 return 1; 1488} 1489 1490/* Note rc enters this function as number of bytes transferred */ 1491static int osc_brw_fini_request(struct ptlrpc_request *req, int rc) 1492{ 1493 struct osc_brw_async_args *aa = (void *)&req->rq_async_args; 1494 const lnet_process_id_t *peer = 1495 &req->rq_import->imp_connection->c_peer; 1496 struct client_obd *cli = aa->aa_cli; 1497 struct ost_body *body; 1498 __u32 client_cksum = 0; 1499 ENTRY; 1500 1501 if (rc < 0 && rc != -EDQUOT) { 1502 DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc); 1503 RETURN(rc); 1504 } 1505 1506 LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc); 1507 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY); 1508 if (body 
== NULL) { 1509 DEBUG_REQ(D_INFO, req, "Can't unpack body\n"); 1510 RETURN(-EPROTO); 1511 } 1512 1513 /* set/clear over quota flag for a uid/gid */ 1514 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE && 1515 body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA)) { 1516 unsigned int qid[MAXQUOTAS] = { body->oa.o_uid, body->oa.o_gid }; 1517 1518 CDEBUG(D_QUOTA, "setdq for [%u %u] with valid "LPX64", flags %x\n", 1519 body->oa.o_uid, body->oa.o_gid, body->oa.o_valid, 1520 body->oa.o_flags); 1521 osc_quota_setdq(cli, qid, body->oa.o_valid, body->oa.o_flags); 1522 } 1523 1524 osc_update_grant(cli, body); 1525 1526 if (rc < 0) 1527 RETURN(rc); 1528 1529 if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM) 1530 client_cksum = aa->aa_oa->o_cksum; /* save for later */ 1531 1532 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) { 1533 if (rc > 0) { 1534 CERROR("Unexpected +ve rc %d\n", rc); 1535 RETURN(-EPROTO); 1536 } 1537 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob); 1538 1539 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk)) 1540 RETURN(-EAGAIN); 1541 1542 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum && 1543 check_write_checksum(&body->oa, peer, client_cksum, 1544 body->oa.o_cksum, aa->aa_requested_nob, 1545 aa->aa_page_count, aa->aa_ppga, 1546 cksum_type_unpack(aa->aa_oa->o_flags))) 1547 RETURN(-EAGAIN); 1548 1549 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count, 1550 aa->aa_page_count, aa->aa_ppga); 1551 GOTO(out, rc); 1552 } 1553 1554 /* The rest of this function executes only for OST_READs */ 1555 1556 /* if unwrap_bulk failed, return -EAGAIN to retry */ 1557 rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc); 1558 if (rc < 0) 1559 GOTO(out, rc = -EAGAIN); 1560 1561 if (rc > aa->aa_requested_nob) { 1562 CERROR("Unexpected rc %d (%d requested)\n", rc, 1563 aa->aa_requested_nob); 1564 RETURN(-EPROTO); 1565 } 1566 1567 if (rc != req->rq_bulk->bd_nob_transferred) { 1568 CERROR ("Unexpected rc %d (%d transferred)\n", 1569 rc, 
req->rq_bulk->bd_nob_transferred); 1570 return (-EPROTO); 1571 } 1572 1573 if (rc < aa->aa_requested_nob) 1574 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga); 1575 1576 if (body->oa.o_valid & OBD_MD_FLCKSUM) { 1577 static int cksum_counter; 1578 __u32 server_cksum = body->oa.o_cksum; 1579 char *via; 1580 char *router; 1581 cksum_type_t cksum_type; 1582 1583 cksum_type = cksum_type_unpack(body->oa.o_valid &OBD_MD_FLFLAGS? 1584 body->oa.o_flags : 0); 1585 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count, 1586 aa->aa_ppga, OST_READ, 1587 cksum_type); 1588 1589 if (peer->nid == req->rq_bulk->bd_sender) { 1590 via = router = ""; 1591 } else { 1592 via = " via "; 1593 router = libcfs_nid2str(req->rq_bulk->bd_sender); 1594 } 1595 1596 if (server_cksum == ~0 && rc > 0) { 1597 CERROR("Protocol error: server %s set the 'checksum' " 1598 "bit, but didn't send a checksum. Not fatal, " 1599 "but please notify on http://bugs.whamcloud.com/\n", 1600 libcfs_nid2str(peer->nid)); 1601 } else if (server_cksum != client_cksum) { 1602 LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from " 1603 "%s%s%s inode "DFID" object "DOSTID 1604 " extent ["LPU64"-"LPU64"]\n", 1605 req->rq_import->imp_obd->obd_name, 1606 libcfs_nid2str(peer->nid), 1607 via, router, 1608 body->oa.o_valid & OBD_MD_FLFID ? 1609 body->oa.o_parent_seq : (__u64)0, 1610 body->oa.o_valid & OBD_MD_FLFID ? 1611 body->oa.o_parent_oid : 0, 1612 body->oa.o_valid & OBD_MD_FLFID ? 
1613 body->oa.o_parent_ver : 0, 1614 POSTID(&body->oa.o_oi), 1615 aa->aa_ppga[0]->off, 1616 aa->aa_ppga[aa->aa_page_count-1]->off + 1617 aa->aa_ppga[aa->aa_page_count-1]->count - 1618 1); 1619 CERROR("client %x, server %x, cksum_type %x\n", 1620 client_cksum, server_cksum, cksum_type); 1621 cksum_counter = 0; 1622 aa->aa_oa->o_cksum = client_cksum; 1623 rc = -EAGAIN; 1624 } else { 1625 cksum_counter++; 1626 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum); 1627 rc = 0; 1628 } 1629 } else if (unlikely(client_cksum)) { 1630 static int cksum_missed; 1631 1632 cksum_missed++; 1633 if ((cksum_missed & (-cksum_missed)) == cksum_missed) 1634 CERROR("Checksum %u requested from %s but not sent\n", 1635 cksum_missed, libcfs_nid2str(peer->nid)); 1636 } else { 1637 rc = 0; 1638 } 1639out: 1640 if (rc >= 0) 1641 lustre_get_wire_obdo(&req->rq_import->imp_connect_data, 1642 aa->aa_oa, &body->oa); 1643 1644 RETURN(rc); 1645} 1646 1647static int osc_brw_internal(int cmd, struct obd_export *exp, struct obdo *oa, 1648 struct lov_stripe_md *lsm, 1649 obd_count page_count, struct brw_page **pga, 1650 struct obd_capa *ocapa) 1651{ 1652 struct ptlrpc_request *req; 1653 int rc; 1654 wait_queue_head_t waitq; 1655 int generation, resends = 0; 1656 struct l_wait_info lwi; 1657 1658 ENTRY; 1659 1660 init_waitqueue_head(&waitq); 1661 generation = exp->exp_obd->u.cli.cl_import->imp_generation; 1662 1663restart_bulk: 1664 rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm, 1665 page_count, pga, &req, ocapa, 0, resends); 1666 if (rc != 0) 1667 return (rc); 1668 1669 if (resends) { 1670 req->rq_generation_set = 1; 1671 req->rq_import_generation = generation; 1672 req->rq_sent = cfs_time_current_sec() + resends; 1673 } 1674 1675 rc = ptlrpc_queue_wait(req); 1676 1677 if (rc == -ETIMEDOUT && req->rq_resend) { 1678 DEBUG_REQ(D_HA, req, "BULK TIMEOUT"); 1679 ptlrpc_req_finished(req); 1680 goto restart_bulk; 1681 } 1682 1683 rc = osc_brw_fini_request(req, rc); 1684 1685 
ptlrpc_req_finished(req); 1686 /* When server return -EINPROGRESS, client should always retry 1687 * regardless of the number of times the bulk was resent already.*/ 1688 if (osc_recoverable_error(rc)) { 1689 resends++; 1690 if (rc != -EINPROGRESS && 1691 !client_should_resend(resends, &exp->exp_obd->u.cli)) { 1692 CERROR("%s: too many resend retries for object: " 1693 ""DOSTID", rc = %d.\n", exp->exp_obd->obd_name, 1694 POSTID(&oa->o_oi), rc); 1695 goto out; 1696 } 1697 if (generation != 1698 exp->exp_obd->u.cli.cl_import->imp_generation) { 1699 CDEBUG(D_HA, "%s: resend cross eviction for object: " 1700 ""DOSTID", rc = %d.\n", exp->exp_obd->obd_name, 1701 POSTID(&oa->o_oi), rc); 1702 goto out; 1703 } 1704 1705 lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL, 1706 NULL); 1707 l_wait_event(waitq, 0, &lwi); 1708 1709 goto restart_bulk; 1710 } 1711out: 1712 if (rc == -EAGAIN || rc == -EINPROGRESS) 1713 rc = -EIO; 1714 RETURN (rc); 1715} 1716 1717static int osc_brw_redo_request(struct ptlrpc_request *request, 1718 struct osc_brw_async_args *aa, int rc) 1719{ 1720 struct ptlrpc_request *new_req; 1721 struct osc_brw_async_args *new_aa; 1722 struct osc_async_page *oap; 1723 ENTRY; 1724 1725 DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request, 1726 "redo for recoverable error %d", rc); 1727 1728 rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) == 1729 OST_WRITE ? 
OBD_BRW_WRITE :OBD_BRW_READ, 1730 aa->aa_cli, aa->aa_oa, 1731 NULL /* lsm unused by osc currently */, 1732 aa->aa_page_count, aa->aa_ppga, 1733 &new_req, aa->aa_ocapa, 0, 1); 1734 if (rc) 1735 RETURN(rc); 1736 1737 list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) { 1738 if (oap->oap_request != NULL) { 1739 LASSERTF(request == oap->oap_request, 1740 "request %p != oap_request %p\n", 1741 request, oap->oap_request); 1742 if (oap->oap_interrupted) { 1743 ptlrpc_req_finished(new_req); 1744 RETURN(-EINTR); 1745 } 1746 } 1747 } 1748 /* New request takes over pga and oaps from old request. 1749 * Note that copying a list_head doesn't work, need to move it... */ 1750 aa->aa_resends++; 1751 new_req->rq_interpret_reply = request->rq_interpret_reply; 1752 new_req->rq_async_args = request->rq_async_args; 1753 /* cap resend delay to the current request timeout, this is similar to 1754 * what ptlrpc does (see after_reply()) */ 1755 if (aa->aa_resends > new_req->rq_timeout) 1756 new_req->rq_sent = cfs_time_current_sec() + new_req->rq_timeout; 1757 else 1758 new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends; 1759 new_req->rq_generation_set = 1; 1760 new_req->rq_import_generation = request->rq_import_generation; 1761 1762 new_aa = ptlrpc_req_async_args(new_req); 1763 1764 INIT_LIST_HEAD(&new_aa->aa_oaps); 1765 list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps); 1766 INIT_LIST_HEAD(&new_aa->aa_exts); 1767 list_splice_init(&aa->aa_exts, &new_aa->aa_exts); 1768 new_aa->aa_resends = aa->aa_resends; 1769 1770 list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) { 1771 if (oap->oap_request) { 1772 ptlrpc_req_finished(oap->oap_request); 1773 oap->oap_request = ptlrpc_request_addref(new_req); 1774 } 1775 } 1776 1777 new_aa->aa_ocapa = aa->aa_ocapa; 1778 aa->aa_ocapa = NULL; 1779 1780 /* XXX: This code will run into problem if we're going to support 1781 * to add a series of BRW RPCs into a self-defined ptlrpc_request_set 1782 * and wait for all of them to be finished. 
We should inherit request 1783 * set from old request. */ 1784 ptlrpcd_add_req(new_req, PDL_POLICY_SAME, -1); 1785 1786 DEBUG_REQ(D_INFO, new_req, "new request"); 1787 RETURN(0); 1788} 1789 1790/* 1791 * ugh, we want disk allocation on the target to happen in offset order. we'll 1792 * follow sedgewicks advice and stick to the dead simple shellsort -- it'll do 1793 * fine for our small page arrays and doesn't require allocation. its an 1794 * insertion sort that swaps elements that are strides apart, shrinking the 1795 * stride down until its '1' and the array is sorted. 1796 */ 1797static void sort_brw_pages(struct brw_page **array, int num) 1798{ 1799 int stride, i, j; 1800 struct brw_page *tmp; 1801 1802 if (num == 1) 1803 return; 1804 for (stride = 1; stride < num ; stride = (stride * 3) + 1) 1805 ; 1806 1807 do { 1808 stride /= 3; 1809 for (i = stride ; i < num ; i++) { 1810 tmp = array[i]; 1811 j = i; 1812 while (j >= stride && array[j - stride]->off > tmp->off) { 1813 array[j] = array[j - stride]; 1814 j -= stride; 1815 } 1816 array[j] = tmp; 1817 } 1818 } while (stride > 1); 1819} 1820 1821static obd_count max_unfragmented_pages(struct brw_page **pg, obd_count pages) 1822{ 1823 int count = 1; 1824 int offset; 1825 int i = 0; 1826 1827 LASSERT (pages > 0); 1828 offset = pg[i]->off & ~CFS_PAGE_MASK; 1829 1830 for (;;) { 1831 pages--; 1832 if (pages == 0) /* that's all */ 1833 return count; 1834 1835 if (offset + pg[i]->count < PAGE_CACHE_SIZE) 1836 return count; /* doesn't end on page boundary */ 1837 1838 i++; 1839 offset = pg[i]->off & ~CFS_PAGE_MASK; 1840 if (offset != 0) /* doesn't start on page boundary */ 1841 return count; 1842 1843 count++; 1844 } 1845} 1846 1847static struct brw_page **osc_build_ppga(struct brw_page *pga, obd_count count) 1848{ 1849 struct brw_page **ppga; 1850 int i; 1851 1852 OBD_ALLOC(ppga, sizeof(*ppga) * count); 1853 if (ppga == NULL) 1854 return NULL; 1855 1856 for (i = 0; i < count; i++) 1857 ppga[i] = pga + i; 1858 return 
ppga; 1859} 1860 1861static void osc_release_ppga(struct brw_page **ppga, obd_count count) 1862{ 1863 LASSERT(ppga != NULL); 1864 OBD_FREE(ppga, sizeof(*ppga) * count); 1865} 1866 1867static int osc_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo, 1868 obd_count page_count, struct brw_page *pga, 1869 struct obd_trans_info *oti) 1870{ 1871 struct obdo *saved_oa = NULL; 1872 struct brw_page **ppga, **orig; 1873 struct obd_import *imp = class_exp2cliimp(exp); 1874 struct client_obd *cli; 1875 int rc, page_count_orig; 1876 ENTRY; 1877 1878 LASSERT((imp != NULL) && (imp->imp_obd != NULL)); 1879 cli = &imp->imp_obd->u.cli; 1880 1881 if (cmd & OBD_BRW_CHECK) { 1882 /* The caller just wants to know if there's a chance that this 1883 * I/O can succeed */ 1884 1885 if (imp->imp_invalid) 1886 RETURN(-EIO); 1887 RETURN(0); 1888 } 1889 1890 /* test_brw with a failed create can trip this, maybe others. */ 1891 LASSERT(cli->cl_max_pages_per_rpc); 1892 1893 rc = 0; 1894 1895 orig = ppga = osc_build_ppga(pga, page_count); 1896 if (ppga == NULL) 1897 RETURN(-ENOMEM); 1898 page_count_orig = page_count; 1899 1900 sort_brw_pages(ppga, page_count); 1901 while (page_count) { 1902 obd_count pages_per_brw; 1903 1904 if (page_count > cli->cl_max_pages_per_rpc) 1905 pages_per_brw = cli->cl_max_pages_per_rpc; 1906 else 1907 pages_per_brw = page_count; 1908 1909 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw); 1910 1911 if (saved_oa != NULL) { 1912 /* restore previously saved oa */ 1913 *oinfo->oi_oa = *saved_oa; 1914 } else if (page_count > pages_per_brw) { 1915 /* save a copy of oa (brw will clobber it) */ 1916 OBDO_ALLOC(saved_oa); 1917 if (saved_oa == NULL) 1918 GOTO(out, rc = -ENOMEM); 1919 *saved_oa = *oinfo->oi_oa; 1920 } 1921 1922 rc = osc_brw_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md, 1923 pages_per_brw, ppga, oinfo->oi_capa); 1924 1925 if (rc != 0) 1926 break; 1927 1928 page_count -= pages_per_brw; 1929 ppga += pages_per_brw; 1930 } 1931 1932out: 1933 
osc_release_ppga(orig, page_count_orig); 1934 1935 if (saved_oa != NULL) 1936 OBDO_FREE(saved_oa); 1937 1938 RETURN(rc); 1939} 1940 1941static int brw_interpret(const struct lu_env *env, 1942 struct ptlrpc_request *req, void *data, int rc) 1943{ 1944 struct osc_brw_async_args *aa = data; 1945 struct osc_extent *ext; 1946 struct osc_extent *tmp; 1947 struct cl_object *obj = NULL; 1948 struct client_obd *cli = aa->aa_cli; 1949 ENTRY; 1950 1951 rc = osc_brw_fini_request(req, rc); 1952 CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc); 1953 /* When server return -EINPROGRESS, client should always retry 1954 * regardless of the number of times the bulk was resent already. */ 1955 if (osc_recoverable_error(rc)) { 1956 if (req->rq_import_generation != 1957 req->rq_import->imp_generation) { 1958 CDEBUG(D_HA, "%s: resend cross eviction for object: " 1959 ""DOSTID", rc = %d.\n", 1960 req->rq_import->imp_obd->obd_name, 1961 POSTID(&aa->aa_oa->o_oi), rc); 1962 } else if (rc == -EINPROGRESS || 1963 client_should_resend(aa->aa_resends, aa->aa_cli)) { 1964 rc = osc_brw_redo_request(req, aa, rc); 1965 } else { 1966 CERROR("%s: too many resent retries for object: " 1967 ""LPU64":"LPU64", rc = %d.\n", 1968 req->rq_import->imp_obd->obd_name, 1969 POSTID(&aa->aa_oa->o_oi), rc); 1970 } 1971 1972 if (rc == 0) 1973 RETURN(0); 1974 else if (rc == -EAGAIN || rc == -EINPROGRESS) 1975 rc = -EIO; 1976 } 1977 1978 if (aa->aa_ocapa) { 1979 capa_put(aa->aa_ocapa); 1980 aa->aa_ocapa = NULL; 1981 } 1982 1983 list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) { 1984 if (obj == NULL && rc == 0) { 1985 obj = osc2cl(ext->oe_obj); 1986 cl_object_get(obj); 1987 } 1988 1989 list_del_init(&ext->oe_link); 1990 osc_extent_finish(env, ext, 1, rc); 1991 } 1992 LASSERT(list_empty(&aa->aa_exts)); 1993 LASSERT(list_empty(&aa->aa_oaps)); 1994 1995 if (obj != NULL) { 1996 struct obdo *oa = aa->aa_oa; 1997 struct cl_attr *attr = &osc_env_info(env)->oti_attr; 1998 unsigned long valid = 0; 1999 2000 
LASSERT(rc == 0); 2001 if (oa->o_valid & OBD_MD_FLBLOCKS) { 2002 attr->cat_blocks = oa->o_blocks; 2003 valid |= CAT_BLOCKS; 2004 } 2005 if (oa->o_valid & OBD_MD_FLMTIME) { 2006 attr->cat_mtime = oa->o_mtime; 2007 valid |= CAT_MTIME; 2008 } 2009 if (oa->o_valid & OBD_MD_FLATIME) { 2010 attr->cat_atime = oa->o_atime; 2011 valid |= CAT_ATIME; 2012 } 2013 if (oa->o_valid & OBD_MD_FLCTIME) { 2014 attr->cat_ctime = oa->o_ctime; 2015 valid |= CAT_CTIME; 2016 } 2017 if (valid != 0) { 2018 cl_object_attr_lock(obj); 2019 cl_object_attr_set(env, obj, attr, valid); 2020 cl_object_attr_unlock(obj); 2021 } 2022 cl_object_put(env, obj); 2023 } 2024 OBDO_FREE(aa->aa_oa); 2025 2026 cl_req_completion(env, aa->aa_clerq, rc < 0 ? rc : 2027 req->rq_bulk->bd_nob_transferred); 2028 osc_release_ppga(aa->aa_ppga, aa->aa_page_count); 2029 ptlrpc_lprocfs_brw(req, req->rq_bulk->bd_nob_transferred); 2030 2031 client_obd_list_lock(&cli->cl_loi_list_lock); 2032 /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters 2033 * is called so we know whether to go to sync BRWs or wait for more 2034 * RPCs to complete */ 2035 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) 2036 cli->cl_w_in_flight--; 2037 else 2038 cli->cl_r_in_flight--; 2039 osc_wake_cache_waiters(cli); 2040 client_obd_list_unlock(&cli->cl_loi_list_lock); 2041 2042 osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME); 2043 RETURN(rc); 2044} 2045 2046/** 2047 * Build an RPC by the list of extent @ext_list. The caller must ensure 2048 * that the total pages in this list are NOT over max pages per RPC. 2049 * Extents in the list must be in OES_RPC state. 
2050 */ 2051int osc_build_rpc(const struct lu_env *env, struct client_obd *cli, 2052 struct list_head *ext_list, int cmd, pdl_policy_t pol) 2053{ 2054 struct ptlrpc_request *req = NULL; 2055 struct osc_extent *ext; 2056 struct brw_page **pga = NULL; 2057 struct osc_brw_async_args *aa = NULL; 2058 struct obdo *oa = NULL; 2059 struct osc_async_page *oap; 2060 struct osc_async_page *tmp; 2061 struct cl_req *clerq = NULL; 2062 enum cl_req_type crt = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : 2063 CRT_READ; 2064 struct ldlm_lock *lock = NULL; 2065 struct cl_req_attr *crattr = NULL; 2066 obd_off starting_offset = OBD_OBJECT_EOF; 2067 obd_off ending_offset = 0; 2068 int mpflag = 0; 2069 int mem_tight = 0; 2070 int page_count = 0; 2071 int i; 2072 int rc; 2073 LIST_HEAD(rpc_list); 2074 2075 ENTRY; 2076 LASSERT(!list_empty(ext_list)); 2077 2078 /* add pages into rpc_list to build BRW rpc */ 2079 list_for_each_entry(ext, ext_list, oe_link) { 2080 LASSERT(ext->oe_state == OES_RPC); 2081 mem_tight |= ext->oe_memalloc; 2082 list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) { 2083 ++page_count; 2084 list_add_tail(&oap->oap_rpc_item, &rpc_list); 2085 if (starting_offset > oap->oap_obj_off) 2086 starting_offset = oap->oap_obj_off; 2087 else 2088 LASSERT(oap->oap_page_off == 0); 2089 if (ending_offset < oap->oap_obj_off + oap->oap_count) 2090 ending_offset = oap->oap_obj_off + 2091 oap->oap_count; 2092 else 2093 LASSERT(oap->oap_page_off + oap->oap_count == 2094 PAGE_CACHE_SIZE); 2095 } 2096 } 2097 2098 if (mem_tight) 2099 mpflag = cfs_memory_pressure_get_and_set(); 2100 2101 OBD_ALLOC(crattr, sizeof(*crattr)); 2102 if (crattr == NULL) 2103 GOTO(out, rc = -ENOMEM); 2104 2105 OBD_ALLOC(pga, sizeof(*pga) * page_count); 2106 if (pga == NULL) 2107 GOTO(out, rc = -ENOMEM); 2108 2109 OBDO_ALLOC(oa); 2110 if (oa == NULL) 2111 GOTO(out, rc = -ENOMEM); 2112 2113 i = 0; 2114 list_for_each_entry(oap, &rpc_list, oap_rpc_item) { 2115 struct cl_page *page = oap2cl_page(oap); 2116 if (clerq == 
NULL) { 2117 clerq = cl_req_alloc(env, page, crt, 2118 1 /* only 1-object rpcs for now */); 2119 if (IS_ERR(clerq)) 2120 GOTO(out, rc = PTR_ERR(clerq)); 2121 lock = oap->oap_ldlm_lock; 2122 } 2123 if (mem_tight) 2124 oap->oap_brw_flags |= OBD_BRW_MEMALLOC; 2125 pga[i] = &oap->oap_brw_page; 2126 pga[i]->off = oap->oap_obj_off + oap->oap_page_off; 2127 CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n", 2128 pga[i]->pg, page_index(oap->oap_page), oap, 2129 pga[i]->flag); 2130 i++; 2131 cl_req_page_add(env, clerq, page); 2132 } 2133 2134 /* always get the data for the obdo for the rpc */ 2135 LASSERT(clerq != NULL); 2136 crattr->cra_oa = oa; 2137 cl_req_attr_set(env, clerq, crattr, ~0ULL); 2138 if (lock) { 2139 oa->o_handle = lock->l_remote_handle; 2140 oa->o_valid |= OBD_MD_FLHANDLE; 2141 } 2142 2143 rc = cl_req_prep(env, clerq); 2144 if (rc != 0) { 2145 CERROR("cl_req_prep failed: %d\n", rc); 2146 GOTO(out, rc); 2147 } 2148 2149 sort_brw_pages(pga, page_count); 2150 rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count, 2151 pga, &req, crattr->cra_capa, 1, 0); 2152 if (rc != 0) { 2153 CERROR("prep_req failed: %d\n", rc); 2154 GOTO(out, rc); 2155 } 2156 2157 req->rq_interpret_reply = brw_interpret; 2158 2159 if (mem_tight != 0) 2160 req->rq_memalloc = 1; 2161 2162 /* Need to update the timestamps after the request is built in case 2163 * we race with setattr (locally or in queue at OST). If OST gets 2164 * later setattr before earlier BRW (as determined by the request xid), 2165 * the OST will not use BRW timestamps. Sadly, there is no obvious 2166 * way to do this in a single call. 
bug 10150 */ 2167 cl_req_attr_set(env, clerq, crattr, 2168 OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME); 2169 2170 lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid); 2171 2172 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args)); 2173 aa = ptlrpc_req_async_args(req); 2174 INIT_LIST_HEAD(&aa->aa_oaps); 2175 list_splice_init(&rpc_list, &aa->aa_oaps); 2176 INIT_LIST_HEAD(&aa->aa_exts); 2177 list_splice_init(ext_list, &aa->aa_exts); 2178 aa->aa_clerq = clerq; 2179 2180 /* queued sync pages can be torn down while the pages 2181 * were between the pending list and the rpc */ 2182 tmp = NULL; 2183 list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) { 2184 /* only one oap gets a request reference */ 2185 if (tmp == NULL) 2186 tmp = oap; 2187 if (oap->oap_interrupted && !req->rq_intr) { 2188 CDEBUG(D_INODE, "oap %p in req %p interrupted\n", 2189 oap, req); 2190 ptlrpc_mark_interrupted(req); 2191 } 2192 } 2193 if (tmp != NULL) 2194 tmp->oap_request = ptlrpc_request_addref(req); 2195 2196 client_obd_list_lock(&cli->cl_loi_list_lock); 2197 starting_offset >>= PAGE_CACHE_SHIFT; 2198 if (cmd == OBD_BRW_READ) { 2199 cli->cl_r_in_flight++; 2200 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count); 2201 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight); 2202 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist, 2203 starting_offset + 1); 2204 } else { 2205 cli->cl_w_in_flight++; 2206 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count); 2207 lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight); 2208 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist, 2209 starting_offset + 1); 2210 } 2211 client_obd_list_unlock(&cli->cl_loi_list_lock); 2212 2213 DEBUG_REQ(D_INODE, req, "%d pages, aa %p. 
now %dr/%dw in flight", 2214 page_count, aa, cli->cl_r_in_flight, 2215 cli->cl_w_in_flight); 2216 2217 /* XXX: Maybe the caller can check the RPC bulk descriptor to 2218 * see which CPU/NUMA node the majority of pages were allocated 2219 * on, and try to assign the async RPC to the CPU core 2220 * (PDL_POLICY_PREFERRED) to reduce cross-CPU memory traffic. 2221 * 2222 * But on the other hand, we expect that multiple ptlrpcd 2223 * threads and the initial write sponsor can run in parallel, 2224 * especially when data checksum is enabled, which is CPU-bound 2225 * operation and single ptlrpcd thread cannot process in time. 2226 * So more ptlrpcd threads sharing BRW load 2227 * (with PDL_POLICY_ROUND) seems better. 2228 */ 2229 ptlrpcd_add_req(req, pol, -1); 2230 rc = 0; 2231 EXIT; 2232 2233out: 2234 if (mem_tight != 0) 2235 cfs_memory_pressure_restore(mpflag); 2236 2237 if (crattr != NULL) { 2238 capa_put(crattr->cra_capa); 2239 OBD_FREE(crattr, sizeof(*crattr)); 2240 } 2241 2242 if (rc != 0) { 2243 LASSERT(req == NULL); 2244 2245 if (oa) 2246 OBDO_FREE(oa); 2247 if (pga) 2248 OBD_FREE(pga, sizeof(*pga) * page_count); 2249 /* this should happen rarely and is pretty bad, it makes the 2250 * pending list not follow the dirty order */ 2251 while (!list_empty(ext_list)) { 2252 ext = list_entry(ext_list->next, struct osc_extent, 2253 oe_link); 2254 list_del_init(&ext->oe_link); 2255 osc_extent_finish(env, ext, 0, rc); 2256 } 2257 if (clerq && !IS_ERR(clerq)) 2258 cl_req_completion(env, clerq, rc); 2259 } 2260 RETURN(rc); 2261} 2262 2263static int osc_set_lock_data_with_check(struct ldlm_lock *lock, 2264 struct ldlm_enqueue_info *einfo) 2265{ 2266 void *data = einfo->ei_cbdata; 2267 int set = 0; 2268 2269 LASSERT(lock != NULL); 2270 LASSERT(lock->l_blocking_ast == einfo->ei_cb_bl); 2271 LASSERT(lock->l_resource->lr_type == einfo->ei_type); 2272 LASSERT(lock->l_completion_ast == einfo->ei_cb_cp); 2273 LASSERT(lock->l_glimpse_ast == einfo->ei_cb_gl); 2274 2275 
lock_res_and_lock(lock); 2276 spin_lock(&osc_ast_guard); 2277 2278 if (lock->l_ast_data == NULL) 2279 lock->l_ast_data = data; 2280 if (lock->l_ast_data == data) 2281 set = 1; 2282 2283 spin_unlock(&osc_ast_guard); 2284 unlock_res_and_lock(lock); 2285 2286 return set; 2287} 2288 2289static int osc_set_data_with_check(struct lustre_handle *lockh, 2290 struct ldlm_enqueue_info *einfo) 2291{ 2292 struct ldlm_lock *lock = ldlm_handle2lock(lockh); 2293 int set = 0; 2294 2295 if (lock != NULL) { 2296 set = osc_set_lock_data_with_check(lock, einfo); 2297 LDLM_LOCK_PUT(lock); 2298 } else 2299 CERROR("lockh %p, data %p - client evicted?\n", 2300 lockh, einfo->ei_cbdata); 2301 return set; 2302} 2303 2304static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm, 2305 ldlm_iterator_t replace, void *data) 2306{ 2307 struct ldlm_res_id res_id; 2308 struct obd_device *obd = class_exp2obd(exp); 2309 2310 ostid_build_res_name(&lsm->lsm_oi, &res_id); 2311 ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data); 2312 return 0; 2313} 2314 2315/* find any ldlm lock of the inode in osc 2316 * return 0 not find 2317 * 1 find one 2318 * < 0 error */ 2319static int osc_find_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm, 2320 ldlm_iterator_t replace, void *data) 2321{ 2322 struct ldlm_res_id res_id; 2323 struct obd_device *obd = class_exp2obd(exp); 2324 int rc = 0; 2325 2326 ostid_build_res_name(&lsm->lsm_oi, &res_id); 2327 rc = ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data); 2328 if (rc == LDLM_ITER_STOP) 2329 return(1); 2330 if (rc == LDLM_ITER_CONTINUE) 2331 return(0); 2332 return(rc); 2333} 2334 2335static int osc_enqueue_fini(struct ptlrpc_request *req, struct ost_lvb *lvb, 2336 obd_enqueue_update_f upcall, void *cookie, 2337 __u64 *flags, int agl, int rc) 2338{ 2339 int intent = *flags & LDLM_FL_HAS_INTENT; 2340 ENTRY; 2341 2342 if (intent) { 2343 /* The request was created before ldlm_cli_enqueue call. 
*/ 2344 if (rc == ELDLM_LOCK_ABORTED) { 2345 struct ldlm_reply *rep; 2346 rep = req_capsule_server_get(&req->rq_pill, 2347 &RMF_DLM_REP); 2348 2349 LASSERT(rep != NULL); 2350 if (rep->lock_policy_res1) 2351 rc = rep->lock_policy_res1; 2352 } 2353 } 2354 2355 if ((intent != 0 && rc == ELDLM_LOCK_ABORTED && agl == 0) || 2356 (rc == 0)) { 2357 *flags |= LDLM_FL_LVB_READY; 2358 CDEBUG(D_INODE,"got kms "LPU64" blocks "LPU64" mtime "LPU64"\n", 2359 lvb->lvb_size, lvb->lvb_blocks, lvb->lvb_mtime); 2360 } 2361 2362 /* Call the update callback. */ 2363 rc = (*upcall)(cookie, rc); 2364 RETURN(rc); 2365} 2366 2367static int osc_enqueue_interpret(const struct lu_env *env, 2368 struct ptlrpc_request *req, 2369 struct osc_enqueue_args *aa, int rc) 2370{ 2371 struct ldlm_lock *lock; 2372 struct lustre_handle handle; 2373 __u32 mode; 2374 struct ost_lvb *lvb; 2375 __u32 lvb_len; 2376 __u64 *flags = aa->oa_flags; 2377 2378 /* Make a local copy of a lock handle and a mode, because aa->oa_* 2379 * might be freed anytime after lock upcall has been called. */ 2380 lustre_handle_copy(&handle, aa->oa_lockh); 2381 mode = aa->oa_ei->ei_mode; 2382 2383 /* ldlm_cli_enqueue is holding a reference on the lock, so it must 2384 * be valid. */ 2385 lock = ldlm_handle2lock(&handle); 2386 2387 /* Take an additional reference so that a blocking AST that 2388 * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed 2389 * to arrive after an upcall has been executed by 2390 * osc_enqueue_fini(). */ 2391 ldlm_lock_addref(&handle, mode); 2392 2393 /* Let CP AST to grant the lock first. */ 2394 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1); 2395 2396 if (aa->oa_agl && rc == ELDLM_LOCK_ABORTED) { 2397 lvb = NULL; 2398 lvb_len = 0; 2399 } else { 2400 lvb = aa->oa_lvb; 2401 lvb_len = sizeof(*aa->oa_lvb); 2402 } 2403 2404 /* Complete obtaining the lock procedure. 
*/ 2405 rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1, 2406 mode, flags, lvb, lvb_len, &handle, rc); 2407 /* Complete osc stuff. */ 2408 rc = osc_enqueue_fini(req, aa->oa_lvb, aa->oa_upcall, aa->oa_cookie, 2409 flags, aa->oa_agl, rc); 2410 2411 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10); 2412 2413 /* Release the lock for async request. */ 2414 if (lustre_handle_is_used(&handle) && rc == ELDLM_OK) 2415 /* 2416 * Releases a reference taken by ldlm_cli_enqueue(), if it is 2417 * not already released by 2418 * ldlm_cli_enqueue_fini()->failed_lock_cleanup() 2419 */ 2420 ldlm_lock_decref(&handle, mode); 2421 2422 LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n", 2423 aa->oa_lockh, req, aa); 2424 ldlm_lock_decref(&handle, mode); 2425 LDLM_LOCK_PUT(lock); 2426 return rc; 2427} 2428 2429void osc_update_enqueue(struct lustre_handle *lov_lockhp, 2430 struct lov_oinfo *loi, int flags, 2431 struct ost_lvb *lvb, __u32 mode, int rc) 2432{ 2433 struct ldlm_lock *lock = ldlm_handle2lock(lov_lockhp); 2434 2435 if (rc == ELDLM_OK) { 2436 __u64 tmp; 2437 2438 LASSERT(lock != NULL); 2439 loi->loi_lvb = *lvb; 2440 tmp = loi->loi_lvb.lvb_size; 2441 /* Extend KMS up to the end of this lock and no further 2442 * A lock on [x,y] means a KMS of up to y + 1 bytes! 
*/ 2443 if (tmp > lock->l_policy_data.l_extent.end) 2444 tmp = lock->l_policy_data.l_extent.end + 1; 2445 if (tmp >= loi->loi_kms) { 2446 LDLM_DEBUG(lock, "lock acquired, setting rss="LPU64 2447 ", kms="LPU64, loi->loi_lvb.lvb_size, tmp); 2448 loi_kms_set(loi, tmp); 2449 } else { 2450 LDLM_DEBUG(lock, "lock acquired, setting rss=" 2451 LPU64"; leaving kms="LPU64", end="LPU64, 2452 loi->loi_lvb.lvb_size, loi->loi_kms, 2453 lock->l_policy_data.l_extent.end); 2454 } 2455 ldlm_lock_allow_match(lock); 2456 } else if (rc == ELDLM_LOCK_ABORTED && (flags & LDLM_FL_HAS_INTENT)) { 2457 LASSERT(lock != NULL); 2458 loi->loi_lvb = *lvb; 2459 ldlm_lock_allow_match(lock); 2460 CDEBUG(D_INODE, "glimpsed, setting rss="LPU64"; leaving" 2461 " kms="LPU64"\n", loi->loi_lvb.lvb_size, loi->loi_kms); 2462 rc = ELDLM_OK; 2463 } 2464 2465 if (lock != NULL) { 2466 if (rc != ELDLM_OK) 2467 ldlm_lock_fail_match(lock); 2468 2469 LDLM_LOCK_PUT(lock); 2470 } 2471} 2472EXPORT_SYMBOL(osc_update_enqueue); 2473 2474struct ptlrpc_request_set *PTLRPCD_SET = (void *)1; 2475 2476/* When enqueuing asynchronously, locks are not ordered, we can obtain a lock 2477 * from the 2nd OSC before a lock from the 1st one. This does not deadlock with 2478 * other synchronous requests, however keeping some locks and trying to obtain 2479 * others may take a considerable amount of time in a case of ost failure; and 2480 * when other sync requests do not get released lock from a client, the client 2481 * is excluded from the cluster -- such scenarious make the life difficult, so 2482 * release locks just after they are obtained. 
*/ 2483int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id, 2484 __u64 *flags, ldlm_policy_data_t *policy, 2485 struct ost_lvb *lvb, int kms_valid, 2486 obd_enqueue_update_f upcall, void *cookie, 2487 struct ldlm_enqueue_info *einfo, 2488 struct lustre_handle *lockh, 2489 struct ptlrpc_request_set *rqset, int async, int agl) 2490{ 2491 struct obd_device *obd = exp->exp_obd; 2492 struct ptlrpc_request *req = NULL; 2493 int intent = *flags & LDLM_FL_HAS_INTENT; 2494 int match_lvb = (agl != 0 ? 0 : LDLM_FL_LVB_READY); 2495 ldlm_mode_t mode; 2496 int rc; 2497 ENTRY; 2498 2499 /* Filesystem lock extents are extended to page boundaries so that 2500 * dealing with the page cache is a little smoother. */ 2501 policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK; 2502 policy->l_extent.end |= ~CFS_PAGE_MASK; 2503 2504 /* 2505 * kms is not valid when either object is completely fresh (so that no 2506 * locks are cached), or object was evicted. In the latter case cached 2507 * lock cannot be used, because it would prime inode state with 2508 * potentially stale LVB. 2509 */ 2510 if (!kms_valid) 2511 goto no_match; 2512 2513 /* Next, search for already existing extent locks that will cover us */ 2514 /* If we're trying to read, we also search for an existing PW lock. The 2515 * VFS and page cache already protect us locally, so lots of readers/ 2516 * writers can share a single PW lock. 2517 * 2518 * There are problems with conversion deadlocks, so instead of 2519 * converting a read lock to a write lock, we'll just enqueue a new 2520 * one. 2521 * 2522 * At some point we should cancel the read lock instead of making them 2523 * send us a blocking callback, but there are problems with canceling 2524 * locks out from other users right now, too. 
*/ 2525 mode = einfo->ei_mode; 2526 if (einfo->ei_mode == LCK_PR) 2527 mode |= LCK_PW; 2528 mode = ldlm_lock_match(obd->obd_namespace, *flags | match_lvb, res_id, 2529 einfo->ei_type, policy, mode, lockh, 0); 2530 if (mode) { 2531 struct ldlm_lock *matched = ldlm_handle2lock(lockh); 2532 2533 if ((agl != 0) && !(matched->l_flags & LDLM_FL_LVB_READY)) { 2534 /* For AGL, if enqueue RPC is sent but the lock is not 2535 * granted, then skip to process this strpe. 2536 * Return -ECANCELED to tell the caller. */ 2537 ldlm_lock_decref(lockh, mode); 2538 LDLM_LOCK_PUT(matched); 2539 RETURN(-ECANCELED); 2540 } else if (osc_set_lock_data_with_check(matched, einfo)) { 2541 *flags |= LDLM_FL_LVB_READY; 2542 /* addref the lock only if not async requests and PW 2543 * lock is matched whereas we asked for PR. */ 2544 if (!rqset && einfo->ei_mode != mode) 2545 ldlm_lock_addref(lockh, LCK_PR); 2546 if (intent) { 2547 /* I would like to be able to ASSERT here that 2548 * rss <= kms, but I can't, for reasons which 2549 * are explained in lov_enqueue() */ 2550 } 2551 2552 /* We already have a lock, and it's referenced. 2553 * 2554 * At this point, the cl_lock::cll_state is CLS_QUEUING, 2555 * AGL upcall may change it to CLS_HELD directly. */ 2556 (*upcall)(cookie, ELDLM_OK); 2557 2558 if (einfo->ei_mode != mode) 2559 ldlm_lock_decref(lockh, LCK_PW); 2560 else if (rqset) 2561 /* For async requests, decref the lock. 
*/ 2562 ldlm_lock_decref(lockh, einfo->ei_mode); 2563 LDLM_LOCK_PUT(matched); 2564 RETURN(ELDLM_OK); 2565 } else { 2566 ldlm_lock_decref(lockh, mode); 2567 LDLM_LOCK_PUT(matched); 2568 } 2569 } 2570 2571 no_match: 2572 if (intent) { 2573 LIST_HEAD(cancels); 2574 req = ptlrpc_request_alloc(class_exp2cliimp(exp), 2575 &RQF_LDLM_ENQUEUE_LVB); 2576 if (req == NULL) 2577 RETURN(-ENOMEM); 2578 2579 rc = ldlm_prep_enqueue_req(exp, req, &cancels, 0); 2580 if (rc) { 2581 ptlrpc_request_free(req); 2582 RETURN(rc); 2583 } 2584 2585 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER, 2586 sizeof *lvb); 2587 ptlrpc_request_set_replen(req); 2588 } 2589 2590 /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */ 2591 *flags &= ~LDLM_FL_BLOCK_GRANTED; 2592 2593 rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb, 2594 sizeof(*lvb), LVB_T_OST, lockh, async); 2595 if (rqset) { 2596 if (!rc) { 2597 struct osc_enqueue_args *aa; 2598 CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args)); 2599 aa = ptlrpc_req_async_args(req); 2600 aa->oa_ei = einfo; 2601 aa->oa_exp = exp; 2602 aa->oa_flags = flags; 2603 aa->oa_upcall = upcall; 2604 aa->oa_cookie = cookie; 2605 aa->oa_lvb = lvb; 2606 aa->oa_lockh = lockh; 2607 aa->oa_agl = !!agl; 2608 2609 req->rq_interpret_reply = 2610 (ptlrpc_interpterer_t)osc_enqueue_interpret; 2611 if (rqset == PTLRPCD_SET) 2612 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1); 2613 else 2614 ptlrpc_set_add_req(rqset, req); 2615 } else if (intent) { 2616 ptlrpc_req_finished(req); 2617 } 2618 RETURN(rc); 2619 } 2620 2621 rc = osc_enqueue_fini(req, lvb, upcall, cookie, flags, agl, rc); 2622 if (intent) 2623 ptlrpc_req_finished(req); 2624 2625 RETURN(rc); 2626} 2627 2628static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo, 2629 struct ldlm_enqueue_info *einfo, 2630 struct ptlrpc_request_set *rqset) 2631{ 2632 struct ldlm_res_id res_id; 2633 int rc; 2634 ENTRY; 2635 2636 ostid_build_res_name(&oinfo->oi_md->lsm_oi, 
&res_id); 2637 rc = osc_enqueue_base(exp, &res_id, &oinfo->oi_flags, &oinfo->oi_policy, 2638 &oinfo->oi_md->lsm_oinfo[0]->loi_lvb, 2639 oinfo->oi_md->lsm_oinfo[0]->loi_kms_valid, 2640 oinfo->oi_cb_up, oinfo, einfo, oinfo->oi_lockh, 2641 rqset, rqset != NULL, 0); 2642 RETURN(rc); 2643} 2644 2645int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id, 2646 __u32 type, ldlm_policy_data_t *policy, __u32 mode, 2647 int *flags, void *data, struct lustre_handle *lockh, 2648 int unref) 2649{ 2650 struct obd_device *obd = exp->exp_obd; 2651 int lflags = *flags; 2652 ldlm_mode_t rc; 2653 ENTRY; 2654 2655 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH)) 2656 RETURN(-EIO); 2657 2658 /* Filesystem lock extents are extended to page boundaries so that 2659 * dealing with the page cache is a little smoother */ 2660 policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK; 2661 policy->l_extent.end |= ~CFS_PAGE_MASK; 2662 2663 /* Next, search for already existing extent locks that will cover us */ 2664 /* If we're trying to read, we also search for an existing PW lock. The 2665 * VFS and page cache already protect us locally, so lots of readers/ 2666 * writers can share a single PW lock. 
*/ 2667 rc = mode; 2668 if (mode == LCK_PR) 2669 rc |= LCK_PW; 2670 rc = ldlm_lock_match(obd->obd_namespace, lflags, 2671 res_id, type, policy, rc, lockh, unref); 2672 if (rc) { 2673 if (data != NULL) { 2674 if (!osc_set_data_with_check(lockh, data)) { 2675 if (!(lflags & LDLM_FL_TEST_LOCK)) 2676 ldlm_lock_decref(lockh, rc); 2677 RETURN(0); 2678 } 2679 } 2680 if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) { 2681 ldlm_lock_addref(lockh, LCK_PR); 2682 ldlm_lock_decref(lockh, LCK_PW); 2683 } 2684 RETURN(rc); 2685 } 2686 RETURN(rc); 2687} 2688 2689int osc_cancel_base(struct lustre_handle *lockh, __u32 mode) 2690{ 2691 ENTRY; 2692 2693 if (unlikely(mode == LCK_GROUP)) 2694 ldlm_lock_decref_and_cancel(lockh, mode); 2695 else 2696 ldlm_lock_decref(lockh, mode); 2697 2698 RETURN(0); 2699} 2700 2701static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md, 2702 __u32 mode, struct lustre_handle *lockh) 2703{ 2704 ENTRY; 2705 RETURN(osc_cancel_base(lockh, mode)); 2706} 2707 2708static int osc_cancel_unused(struct obd_export *exp, 2709 struct lov_stripe_md *lsm, 2710 ldlm_cancel_flags_t flags, 2711 void *opaque) 2712{ 2713 struct obd_device *obd = class_exp2obd(exp); 2714 struct ldlm_res_id res_id, *resp = NULL; 2715 2716 if (lsm != NULL) { 2717 ostid_build_res_name(&lsm->lsm_oi, &res_id); 2718 resp = &res_id; 2719 } 2720 2721 return ldlm_cli_cancel_unused(obd->obd_namespace, resp, flags, opaque); 2722} 2723 2724static int osc_statfs_interpret(const struct lu_env *env, 2725 struct ptlrpc_request *req, 2726 struct osc_async_args *aa, int rc) 2727{ 2728 struct obd_statfs *msfs; 2729 ENTRY; 2730 2731 if (rc == -EBADR) 2732 /* The request has in fact never been sent 2733 * due to issues at a higher level (LOV). 
2734 * Exit immediately since the caller is 2735 * aware of the problem and takes care 2736 * of the clean up */ 2737 RETURN(rc); 2738 2739 if ((rc == -ENOTCONN || rc == -EAGAIN) && 2740 (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY)) 2741 GOTO(out, rc = 0); 2742 2743 if (rc != 0) 2744 GOTO(out, rc); 2745 2746 msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS); 2747 if (msfs == NULL) { 2748 GOTO(out, rc = -EPROTO); 2749 } 2750 2751 *aa->aa_oi->oi_osfs = *msfs; 2752out: 2753 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc); 2754 RETURN(rc); 2755} 2756 2757static int osc_statfs_async(struct obd_export *exp, 2758 struct obd_info *oinfo, __u64 max_age, 2759 struct ptlrpc_request_set *rqset) 2760{ 2761 struct obd_device *obd = class_exp2obd(exp); 2762 struct ptlrpc_request *req; 2763 struct osc_async_args *aa; 2764 int rc; 2765 ENTRY; 2766 2767 /* We could possibly pass max_age in the request (as an absolute 2768 * timestamp or a "seconds.usec ago") so the target can avoid doing 2769 * extra calls into the filesystem if that isn't necessary (e.g. 2770 * during mount that would help a bit). Having relative timestamps 2771 * is not so great if request processing is slow, while absolute 2772 * timestamps are not ideal because they need time synchronization. 
*/ 2773 req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS); 2774 if (req == NULL) 2775 RETURN(-ENOMEM); 2776 2777 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS); 2778 if (rc) { 2779 ptlrpc_request_free(req); 2780 RETURN(rc); 2781 } 2782 ptlrpc_request_set_replen(req); 2783 req->rq_request_portal = OST_CREATE_PORTAL; 2784 ptlrpc_at_set_req_timeout(req); 2785 2786 if (oinfo->oi_flags & OBD_STATFS_NODELAY) { 2787 /* procfs requests not want stat in wait for avoid deadlock */ 2788 req->rq_no_resend = 1; 2789 req->rq_no_delay = 1; 2790 } 2791 2792 req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret; 2793 CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args)); 2794 aa = ptlrpc_req_async_args(req); 2795 aa->aa_oi = oinfo; 2796 2797 ptlrpc_set_add_req(rqset, req); 2798 RETURN(0); 2799} 2800 2801static int osc_statfs(const struct lu_env *env, struct obd_export *exp, 2802 struct obd_statfs *osfs, __u64 max_age, __u32 flags) 2803{ 2804 struct obd_device *obd = class_exp2obd(exp); 2805 struct obd_statfs *msfs; 2806 struct ptlrpc_request *req; 2807 struct obd_import *imp = NULL; 2808 int rc; 2809 ENTRY; 2810 2811 /*Since the request might also come from lprocfs, so we need 2812 *sync this with client_disconnect_export Bug15684*/ 2813 down_read(&obd->u.cli.cl_sem); 2814 if (obd->u.cli.cl_import) 2815 imp = class_import_get(obd->u.cli.cl_import); 2816 up_read(&obd->u.cli.cl_sem); 2817 if (!imp) 2818 RETURN(-ENODEV); 2819 2820 /* We could possibly pass max_age in the request (as an absolute 2821 * timestamp or a "seconds.usec ago") so the target can avoid doing 2822 * extra calls into the filesystem if that isn't necessary (e.g. 2823 * during mount that would help a bit). Having relative timestamps 2824 * is not so great if request processing is slow, while absolute 2825 * timestamps are not ideal because they need time synchronization. 
*/ 2826 req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS); 2827 2828 class_import_put(imp); 2829 2830 if (req == NULL) 2831 RETURN(-ENOMEM); 2832 2833 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS); 2834 if (rc) { 2835 ptlrpc_request_free(req); 2836 RETURN(rc); 2837 } 2838 ptlrpc_request_set_replen(req); 2839 req->rq_request_portal = OST_CREATE_PORTAL; 2840 ptlrpc_at_set_req_timeout(req); 2841 2842 if (flags & OBD_STATFS_NODELAY) { 2843 /* procfs requests not want stat in wait for avoid deadlock */ 2844 req->rq_no_resend = 1; 2845 req->rq_no_delay = 1; 2846 } 2847 2848 rc = ptlrpc_queue_wait(req); 2849 if (rc) 2850 GOTO(out, rc); 2851 2852 msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS); 2853 if (msfs == NULL) { 2854 GOTO(out, rc = -EPROTO); 2855 } 2856 2857 *osfs = *msfs; 2858 2859 EXIT; 2860 out: 2861 ptlrpc_req_finished(req); 2862 return rc; 2863} 2864 2865/* Retrieve object striping information. 2866 * 2867 * @lmmu is a pointer to an in-core struct with lmm_ost_count indicating 2868 * the maximum number of OST indices which will fit in the user buffer. 2869 * lmm_magic must be LOV_MAGIC (we only use 1 slot here). 
2870 */ 2871static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump) 2872{ 2873 /* we use lov_user_md_v3 because it is larger than lov_user_md_v1 */ 2874 struct lov_user_md_v3 lum, *lumk; 2875 struct lov_user_ost_data_v1 *lmm_objects; 2876 int rc = 0, lum_size; 2877 ENTRY; 2878 2879 if (!lsm) 2880 RETURN(-ENODATA); 2881 2882 /* we only need the header part from user space to get lmm_magic and 2883 * lmm_stripe_count, (the header part is common to v1 and v3) */ 2884 lum_size = sizeof(struct lov_user_md_v1); 2885 if (copy_from_user(&lum, lump, lum_size)) 2886 RETURN(-EFAULT); 2887 2888 if ((lum.lmm_magic != LOV_USER_MAGIC_V1) && 2889 (lum.lmm_magic != LOV_USER_MAGIC_V3)) 2890 RETURN(-EINVAL); 2891 2892 /* lov_user_md_vX and lov_mds_md_vX must have the same size */ 2893 LASSERT(sizeof(struct lov_user_md_v1) == sizeof(struct lov_mds_md_v1)); 2894 LASSERT(sizeof(struct lov_user_md_v3) == sizeof(struct lov_mds_md_v3)); 2895 LASSERT(sizeof(lum.lmm_objects[0]) == sizeof(lumk->lmm_objects[0])); 2896 2897 /* we can use lov_mds_md_size() to compute lum_size 2898 * because lov_user_md_vX and lov_mds_md_vX have the same size */ 2899 if (lum.lmm_stripe_count > 0) { 2900 lum_size = lov_mds_md_size(lum.lmm_stripe_count, lum.lmm_magic); 2901 OBD_ALLOC(lumk, lum_size); 2902 if (!lumk) 2903 RETURN(-ENOMEM); 2904 2905 if (lum.lmm_magic == LOV_USER_MAGIC_V1) 2906 lmm_objects = 2907 &(((struct lov_user_md_v1 *)lumk)->lmm_objects[0]); 2908 else 2909 lmm_objects = &(lumk->lmm_objects[0]); 2910 lmm_objects->l_ost_oi = lsm->lsm_oi; 2911 } else { 2912 lum_size = lov_mds_md_size(0, lum.lmm_magic); 2913 lumk = &lum; 2914 } 2915 2916 lumk->lmm_oi = lsm->lsm_oi; 2917 lumk->lmm_stripe_count = 1; 2918 2919 if (copy_to_user(lump, lumk, lum_size)) 2920 rc = -EFAULT; 2921 2922 if (lumk != &lum) 2923 OBD_FREE(lumk, lum_size); 2924 2925 RETURN(rc); 2926} 2927 2928 2929static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len, 2930 void *karg, void *uarg) 2931{ 2932 
struct obd_device *obd = exp->exp_obd; 2933 struct obd_ioctl_data *data = karg; 2934 int err = 0; 2935 ENTRY; 2936 2937 if (!try_module_get(THIS_MODULE)) { 2938 CERROR("Can't get module. Is it alive?"); 2939 return -EINVAL; 2940 } 2941 switch (cmd) { 2942 case OBD_IOC_LOV_GET_CONFIG: { 2943 char *buf; 2944 struct lov_desc *desc; 2945 struct obd_uuid uuid; 2946 2947 buf = NULL; 2948 len = 0; 2949 if (obd_ioctl_getdata(&buf, &len, (void *)uarg)) 2950 GOTO(out, err = -EINVAL); 2951 2952 data = (struct obd_ioctl_data *)buf; 2953 2954 if (sizeof(*desc) > data->ioc_inllen1) { 2955 obd_ioctl_freedata(buf, len); 2956 GOTO(out, err = -EINVAL); 2957 } 2958 2959 if (data->ioc_inllen2 < sizeof(uuid)) { 2960 obd_ioctl_freedata(buf, len); 2961 GOTO(out, err = -EINVAL); 2962 } 2963 2964 desc = (struct lov_desc *)data->ioc_inlbuf1; 2965 desc->ld_tgt_count = 1; 2966 desc->ld_active_tgt_count = 1; 2967 desc->ld_default_stripe_count = 1; 2968 desc->ld_default_stripe_size = 0; 2969 desc->ld_default_stripe_offset = 0; 2970 desc->ld_pattern = 0; 2971 memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid)); 2972 2973 memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid)); 2974 2975 err = copy_to_user((void *)uarg, buf, len); 2976 if (err) 2977 err = -EFAULT; 2978 obd_ioctl_freedata(buf, len); 2979 GOTO(out, err); 2980 } 2981 case LL_IOC_LOV_SETSTRIPE: 2982 err = obd_alloc_memmd(exp, karg); 2983 if (err > 0) 2984 err = 0; 2985 GOTO(out, err); 2986 case LL_IOC_LOV_GETSTRIPE: 2987 err = osc_getstripe(karg, uarg); 2988 GOTO(out, err); 2989 case OBD_IOC_CLIENT_RECOVER: 2990 err = ptlrpc_recover_import(obd->u.cli.cl_import, 2991 data->ioc_inlbuf1, 0); 2992 if (err > 0) 2993 err = 0; 2994 GOTO(out, err); 2995 case IOC_OSC_SET_ACTIVE: 2996 err = ptlrpc_set_import_active(obd->u.cli.cl_import, 2997 data->ioc_offset); 2998 GOTO(out, err); 2999 case OBD_IOC_POLL_QUOTACHECK: 3000 err = osc_quota_poll_check(exp, (struct if_quotacheck *)karg); 3001 GOTO(out, err); 3002 case OBD_IOC_PING_TARGET: 3003 err 
= ptlrpc_obd_ping(obd); 3004 GOTO(out, err); 3005 default: 3006 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n", 3007 cmd, current_comm()); 3008 GOTO(out, err = -ENOTTY); 3009 } 3010out: 3011 module_put(THIS_MODULE); 3012 return err; 3013} 3014 3015static int osc_get_info(const struct lu_env *env, struct obd_export *exp, 3016 obd_count keylen, void *key, __u32 *vallen, void *val, 3017 struct lov_stripe_md *lsm) 3018{ 3019 ENTRY; 3020 if (!vallen || !val) 3021 RETURN(-EFAULT); 3022 3023 if (KEY_IS(KEY_LOCK_TO_STRIPE)) { 3024 __u32 *stripe = val; 3025 *vallen = sizeof(*stripe); 3026 *stripe = 0; 3027 RETURN(0); 3028 } else if (KEY_IS(KEY_LAST_ID)) { 3029 struct ptlrpc_request *req; 3030 obd_id *reply; 3031 char *tmp; 3032 int rc; 3033 3034 req = ptlrpc_request_alloc(class_exp2cliimp(exp), 3035 &RQF_OST_GET_INFO_LAST_ID); 3036 if (req == NULL) 3037 RETURN(-ENOMEM); 3038 3039 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY, 3040 RCL_CLIENT, keylen); 3041 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO); 3042 if (rc) { 3043 ptlrpc_request_free(req); 3044 RETURN(rc); 3045 } 3046 3047 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY); 3048 memcpy(tmp, key, keylen); 3049 3050 req->rq_no_delay = req->rq_no_resend = 1; 3051 ptlrpc_request_set_replen(req); 3052 rc = ptlrpc_queue_wait(req); 3053 if (rc) 3054 GOTO(out, rc); 3055 3056 reply = req_capsule_server_get(&req->rq_pill, &RMF_OBD_ID); 3057 if (reply == NULL) 3058 GOTO(out, rc = -EPROTO); 3059 3060 *((obd_id *)val) = *reply; 3061 out: 3062 ptlrpc_req_finished(req); 3063 RETURN(rc); 3064 } else if (KEY_IS(KEY_FIEMAP)) { 3065 struct ll_fiemap_info_key *fm_key = 3066 (struct ll_fiemap_info_key *)key; 3067 struct ldlm_res_id res_id; 3068 ldlm_policy_data_t policy; 3069 struct lustre_handle lockh; 3070 ldlm_mode_t mode = 0; 3071 struct ptlrpc_request *req; 3072 struct ll_user_fiemap *reply; 3073 char *tmp; 3074 int rc; 3075 3076 if (!(fm_key->fiemap.fm_flags & FIEMAP_FLAG_SYNC)) 3077 goto 
skip_locking; 3078 3079 policy.l_extent.start = fm_key->fiemap.fm_start & 3080 CFS_PAGE_MASK; 3081 3082 if (OBD_OBJECT_EOF - fm_key->fiemap.fm_length <= 3083 fm_key->fiemap.fm_start + PAGE_CACHE_SIZE - 1) 3084 policy.l_extent.end = OBD_OBJECT_EOF; 3085 else 3086 policy.l_extent.end = (fm_key->fiemap.fm_start + 3087 fm_key->fiemap.fm_length + 3088 PAGE_CACHE_SIZE - 1) & CFS_PAGE_MASK; 3089 3090 ostid_build_res_name(&fm_key->oa.o_oi, &res_id); 3091 mode = ldlm_lock_match(exp->exp_obd->obd_namespace, 3092 LDLM_FL_BLOCK_GRANTED | 3093 LDLM_FL_LVB_READY, 3094 &res_id, LDLM_EXTENT, &policy, 3095 LCK_PR | LCK_PW, &lockh, 0); 3096 if (mode) { /* lock is cached on client */ 3097 if (mode != LCK_PR) { 3098 ldlm_lock_addref(&lockh, LCK_PR); 3099 ldlm_lock_decref(&lockh, LCK_PW); 3100 } 3101 } else { /* no cached lock, needs acquire lock on server side */ 3102 fm_key->oa.o_valid |= OBD_MD_FLFLAGS; 3103 fm_key->oa.o_flags |= OBD_FL_SRVLOCK; 3104 } 3105 3106skip_locking: 3107 req = ptlrpc_request_alloc(class_exp2cliimp(exp), 3108 &RQF_OST_GET_INFO_FIEMAP); 3109 if (req == NULL) 3110 GOTO(drop_lock, rc = -ENOMEM); 3111 3112 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_KEY, 3113 RCL_CLIENT, keylen); 3114 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL, 3115 RCL_CLIENT, *vallen); 3116 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL, 3117 RCL_SERVER, *vallen); 3118 3119 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO); 3120 if (rc) { 3121 ptlrpc_request_free(req); 3122 GOTO(drop_lock, rc); 3123 } 3124 3125 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_KEY); 3126 memcpy(tmp, key, keylen); 3127 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_VAL); 3128 memcpy(tmp, val, *vallen); 3129 3130 ptlrpc_request_set_replen(req); 3131 rc = ptlrpc_queue_wait(req); 3132 if (rc) 3133 GOTO(fini_req, rc); 3134 3135 reply = req_capsule_server_get(&req->rq_pill, &RMF_FIEMAP_VAL); 3136 if (reply == NULL) 3137 GOTO(fini_req, rc = -EPROTO); 3138 3139 
memcpy(val, reply, *vallen); 3140fini_req: 3141 ptlrpc_req_finished(req); 3142drop_lock: 3143 if (mode) 3144 ldlm_lock_decref(&lockh, LCK_PR); 3145 RETURN(rc); 3146 } 3147 3148 RETURN(-EINVAL); 3149} 3150 3151static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp, 3152 obd_count keylen, void *key, obd_count vallen, 3153 void *val, struct ptlrpc_request_set *set) 3154{ 3155 struct ptlrpc_request *req; 3156 struct obd_device *obd = exp->exp_obd; 3157 struct obd_import *imp = class_exp2cliimp(exp); 3158 char *tmp; 3159 int rc; 3160 ENTRY; 3161 3162 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10); 3163 3164 if (KEY_IS(KEY_CHECKSUM)) { 3165 if (vallen != sizeof(int)) 3166 RETURN(-EINVAL); 3167 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0; 3168 RETURN(0); 3169 } 3170 3171 if (KEY_IS(KEY_SPTLRPC_CONF)) { 3172 sptlrpc_conf_client_adapt(obd); 3173 RETURN(0); 3174 } 3175 3176 if (KEY_IS(KEY_FLUSH_CTX)) { 3177 sptlrpc_import_flush_my_ctx(imp); 3178 RETURN(0); 3179 } 3180 3181 if (KEY_IS(KEY_CACHE_SET)) { 3182 struct client_obd *cli = &obd->u.cli; 3183 3184 LASSERT(cli->cl_cache == NULL); /* only once */ 3185 cli->cl_cache = (struct cl_client_cache *)val; 3186 atomic_inc(&cli->cl_cache->ccc_users); 3187 cli->cl_lru_left = &cli->cl_cache->ccc_lru_left; 3188 3189 /* add this osc into entity list */ 3190 LASSERT(list_empty(&cli->cl_lru_osc)); 3191 spin_lock(&cli->cl_cache->ccc_lru_lock); 3192 list_add(&cli->cl_lru_osc, &cli->cl_cache->ccc_lru); 3193 spin_unlock(&cli->cl_cache->ccc_lru_lock); 3194 3195 RETURN(0); 3196 } 3197 3198 if (KEY_IS(KEY_CACHE_LRU_SHRINK)) { 3199 struct client_obd *cli = &obd->u.cli; 3200 int nr = atomic_read(&cli->cl_lru_in_list) >> 1; 3201 int target = *(int *)val; 3202 3203 nr = osc_lru_shrink(cli, min(nr, target)); 3204 *(int *)val -= nr; 3205 RETURN(0); 3206 } 3207 3208 if (!set && !KEY_IS(KEY_GRANT_SHRINK)) 3209 RETURN(-EINVAL); 3210 3211 /* We pass all other commands directly to OST. 
Since nobody calls osc 3212 methods directly and everybody is supposed to go through LOV, we 3213 assume lov checked invalid values for us. 3214 The only recognised values so far are evict_by_nid and mds_conn. 3215 Even if something bad goes through, we'd get a -EINVAL from OST 3216 anyway. */ 3217 3218 req = ptlrpc_request_alloc(imp, KEY_IS(KEY_GRANT_SHRINK) ? 3219 &RQF_OST_SET_GRANT_INFO : 3220 &RQF_OBD_SET_INFO); 3221 if (req == NULL) 3222 RETURN(-ENOMEM); 3223 3224 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY, 3225 RCL_CLIENT, keylen); 3226 if (!KEY_IS(KEY_GRANT_SHRINK)) 3227 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL, 3228 RCL_CLIENT, vallen); 3229 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO); 3230 if (rc) { 3231 ptlrpc_request_free(req); 3232 RETURN(rc); 3233 } 3234 3235 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY); 3236 memcpy(tmp, key, keylen); 3237 tmp = req_capsule_client_get(&req->rq_pill, KEY_IS(KEY_GRANT_SHRINK) ? 3238 &RMF_OST_BODY : 3239 &RMF_SETINFO_VAL); 3240 memcpy(tmp, val, vallen); 3241 3242 if (KEY_IS(KEY_GRANT_SHRINK)) { 3243 struct osc_grant_args *aa; 3244 struct obdo *oa; 3245 3246 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args)); 3247 aa = ptlrpc_req_async_args(req); 3248 OBDO_ALLOC(oa); 3249 if (!oa) { 3250 ptlrpc_req_finished(req); 3251 RETURN(-ENOMEM); 3252 } 3253 *oa = ((struct ost_body *)val)->oa; 3254 aa->aa_oa = oa; 3255 req->rq_interpret_reply = osc_shrink_grant_interpret; 3256 } 3257 3258 ptlrpc_request_set_replen(req); 3259 if (!KEY_IS(KEY_GRANT_SHRINK)) { 3260 LASSERT(set != NULL); 3261 ptlrpc_set_add_req(set, req); 3262 ptlrpc_check_set(NULL, set); 3263 } else 3264 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1); 3265 3266 RETURN(0); 3267} 3268 3269 3270static int osc_llog_init(struct obd_device *obd, struct obd_llog_group *olg, 3271 struct obd_device *disk_obd, int *index) 3272{ 3273 /* this code is not supposed to be used with LOD/OSP 3274 * to be removed soon */ 3275 LBUG(); 
3276 return 0; 3277} 3278 3279static int osc_llog_finish(struct obd_device *obd, int count) 3280{ 3281 struct llog_ctxt *ctxt; 3282 3283 ENTRY; 3284 3285 ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT); 3286 if (ctxt) { 3287 llog_cat_close(NULL, ctxt->loc_handle); 3288 llog_cleanup(NULL, ctxt); 3289 } 3290 3291 ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT); 3292 if (ctxt) 3293 llog_cleanup(NULL, ctxt); 3294 RETURN(0); 3295} 3296 3297static int osc_reconnect(const struct lu_env *env, 3298 struct obd_export *exp, struct obd_device *obd, 3299 struct obd_uuid *cluuid, 3300 struct obd_connect_data *data, 3301 void *localdata) 3302{ 3303 struct client_obd *cli = &obd->u.cli; 3304 3305 if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) { 3306 long lost_grant; 3307 3308 client_obd_list_lock(&cli->cl_loi_list_lock); 3309 data->ocd_grant = (cli->cl_avail_grant + cli->cl_dirty) ?: 3310 2 * cli_brw_size(obd); 3311 lost_grant = cli->cl_lost_grant; 3312 cli->cl_lost_grant = 0; 3313 client_obd_list_unlock(&cli->cl_loi_list_lock); 3314 3315 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d" 3316 " ocd_grant: %d, lost: %ld.\n", data->ocd_connect_flags, 3317 data->ocd_version, data->ocd_grant, lost_grant); 3318 } 3319 3320 RETURN(0); 3321} 3322 3323static int osc_disconnect(struct obd_export *exp) 3324{ 3325 struct obd_device *obd = class_exp2obd(exp); 3326 struct llog_ctxt *ctxt; 3327 int rc; 3328 3329 ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT); 3330 if (ctxt) { 3331 if (obd->u.cli.cl_conn_count == 1) { 3332 /* Flush any remaining cancel messages out to the 3333 * target */ 3334 llog_sync(ctxt, exp, 0); 3335 } 3336 llog_ctxt_put(ctxt); 3337 } else { 3338 CDEBUG(D_HA, "No LLOG_SIZE_REPL_CTXT found in obd %p\n", 3339 obd); 3340 } 3341 3342 rc = client_disconnect_export(exp); 3343 /** 3344 * Initially we put del_shrink_grant before disconnect_export, but it 3345 * causes the following problem if setup (connect) and cleanup 3346 * 
(disconnect) are tangled together. 3347 * connect p1 disconnect p2 3348 * ptlrpc_connect_import 3349 * ............... class_manual_cleanup 3350 * osc_disconnect 3351 * del_shrink_grant 3352 * ptlrpc_connect_interrupt 3353 * init_grant_shrink 3354 * add this client to shrink list 3355 * cleanup_osc 3356 * Bang! pinger trigger the shrink. 3357 * So the osc should be disconnected from the shrink list, after we 3358 * are sure the import has been destroyed. BUG18662 3359 */ 3360 if (obd->u.cli.cl_import == NULL) 3361 osc_del_shrink_grant(&obd->u.cli); 3362 return rc; 3363} 3364 3365static int osc_import_event(struct obd_device *obd, 3366 struct obd_import *imp, 3367 enum obd_import_event event) 3368{ 3369 struct client_obd *cli; 3370 int rc = 0; 3371 3372 ENTRY; 3373 LASSERT(imp->imp_obd == obd); 3374 3375 switch (event) { 3376 case IMP_EVENT_DISCON: { 3377 cli = &obd->u.cli; 3378 client_obd_list_lock(&cli->cl_loi_list_lock); 3379 cli->cl_avail_grant = 0; 3380 cli->cl_lost_grant = 0; 3381 client_obd_list_unlock(&cli->cl_loi_list_lock); 3382 break; 3383 } 3384 case IMP_EVENT_INACTIVE: { 3385 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL); 3386 break; 3387 } 3388 case IMP_EVENT_INVALIDATE: { 3389 struct ldlm_namespace *ns = obd->obd_namespace; 3390 struct lu_env *env; 3391 int refcheck; 3392 3393 env = cl_env_get(&refcheck); 3394 if (!IS_ERR(env)) { 3395 /* Reset grants */ 3396 cli = &obd->u.cli; 3397 /* all pages go to failing rpcs due to the invalid 3398 * import */ 3399 osc_io_unplug(env, cli, NULL, PDL_POLICY_ROUND); 3400 3401 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY); 3402 cl_env_put(env, &refcheck); 3403 } else 3404 rc = PTR_ERR(env); 3405 break; 3406 } 3407 case IMP_EVENT_ACTIVE: { 3408 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL); 3409 break; 3410 } 3411 case IMP_EVENT_OCD: { 3412 struct obd_connect_data *ocd = &imp->imp_connect_data; 3413 3414 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT) 3415 osc_init_grant(&obd->u.cli, 
ocd); 3416 3417 /* See bug 7198 */ 3418 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL) 3419 imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL; 3420 3421 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL); 3422 break; 3423 } 3424 case IMP_EVENT_DEACTIVATE: { 3425 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE, NULL); 3426 break; 3427 } 3428 case IMP_EVENT_ACTIVATE: { 3429 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE, NULL); 3430 break; 3431 } 3432 default: 3433 CERROR("Unknown import event %d\n", event); 3434 LBUG(); 3435 } 3436 RETURN(rc); 3437} 3438 3439/** 3440 * Determine whether the lock can be canceled before replaying the lock 3441 * during recovery, see bug16774 for detailed information. 3442 * 3443 * \retval zero the lock can't be canceled 3444 * \retval other ok to cancel 3445 */ 3446static int osc_cancel_for_recovery(struct ldlm_lock *lock) 3447{ 3448 check_res_locked(lock->l_resource); 3449 3450 /* 3451 * Cancel all unused extent lock in granted mode LCK_PR or LCK_CR. 3452 * 3453 * XXX as a future improvement, we can also cancel unused write lock 3454 * if it doesn't have dirty data and active mmaps. 
3455 */ 3456 if (lock->l_resource->lr_type == LDLM_EXTENT && 3457 (lock->l_granted_mode == LCK_PR || 3458 lock->l_granted_mode == LCK_CR) && 3459 (osc_dlm_lock_pageref(lock) == 0)) 3460 RETURN(1); 3461 3462 RETURN(0); 3463} 3464 3465static int brw_queue_work(const struct lu_env *env, void *data) 3466{ 3467 struct client_obd *cli = data; 3468 3469 CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli); 3470 3471 osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME); 3472 RETURN(0); 3473} 3474 3475int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg) 3476{ 3477 struct lprocfs_static_vars lvars = { 0 }; 3478 struct client_obd *cli = &obd->u.cli; 3479 void *handler; 3480 int rc; 3481 ENTRY; 3482 3483 rc = ptlrpcd_addref(); 3484 if (rc) 3485 RETURN(rc); 3486 3487 rc = client_obd_setup(obd, lcfg); 3488 if (rc) 3489 GOTO(out_ptlrpcd, rc); 3490 3491 handler = ptlrpcd_alloc_work(cli->cl_import, brw_queue_work, cli); 3492 if (IS_ERR(handler)) 3493 GOTO(out_client_setup, rc = PTR_ERR(handler)); 3494 cli->cl_writeback_work = handler; 3495 3496 rc = osc_quota_setup(obd); 3497 if (rc) 3498 GOTO(out_ptlrpcd_work, rc); 3499 3500 cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL; 3501 lprocfs_osc_init_vars(&lvars); 3502 if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) { 3503 lproc_osc_attach_seqstat(obd); 3504 sptlrpc_lprocfs_cliobd_attach(obd); 3505 ptlrpc_lprocfs_register_obd(obd); 3506 } 3507 3508 /* We need to allocate a few requests more, because 3509 * brw_interpret tries to create new requests before freeing 3510 * previous ones, Ideally we want to have 2x max_rpcs_in_flight 3511 * reserved, but I'm afraid that might be too much wasted RAM 3512 * in fact, so 2 is just my guess and still should work. 
*/ 3513 cli->cl_import->imp_rq_pool = 3514 ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2, 3515 OST_MAXREQSIZE, 3516 ptlrpc_add_rqs_to_pool); 3517 3518 INIT_LIST_HEAD(&cli->cl_grant_shrink_list); 3519 ns_register_cancel(obd->obd_namespace, osc_cancel_for_recovery); 3520 RETURN(rc); 3521 3522out_ptlrpcd_work: 3523 ptlrpcd_destroy_work(handler); 3524out_client_setup: 3525 client_obd_cleanup(obd); 3526out_ptlrpcd: 3527 ptlrpcd_decref(); 3528 RETURN(rc); 3529} 3530 3531static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage) 3532{ 3533 int rc = 0; 3534 ENTRY; 3535 3536 switch (stage) { 3537 case OBD_CLEANUP_EARLY: { 3538 struct obd_import *imp; 3539 imp = obd->u.cli.cl_import; 3540 CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name); 3541 /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */ 3542 ptlrpc_deactivate_import(imp); 3543 spin_lock(&imp->imp_lock); 3544 imp->imp_pingable = 0; 3545 spin_unlock(&imp->imp_lock); 3546 break; 3547 } 3548 case OBD_CLEANUP_EXPORTS: { 3549 struct client_obd *cli = &obd->u.cli; 3550 /* LU-464 3551 * for echo client, export may be on zombie list, wait for 3552 * zombie thread to cull it, because cli.cl_import will be 3553 * cleared in client_disconnect_export(): 3554 * class_export_destroy() -> obd_cleanup() -> 3555 * echo_device_free() -> echo_client_cleanup() -> 3556 * obd_disconnect() -> osc_disconnect() -> 3557 * client_disconnect_export() 3558 */ 3559 obd_zombie_barrier(); 3560 if (cli->cl_writeback_work) { 3561 ptlrpcd_destroy_work(cli->cl_writeback_work); 3562 cli->cl_writeback_work = NULL; 3563 } 3564 obd_cleanup_client_import(obd); 3565 ptlrpc_lprocfs_unregister_obd(obd); 3566 lprocfs_obd_cleanup(obd); 3567 rc = obd_llog_finish(obd, 0); 3568 if (rc != 0) 3569 CERROR("failed to cleanup llogging subsystems\n"); 3570 break; 3571 } 3572 } 3573 RETURN(rc); 3574} 3575 3576int osc_cleanup(struct obd_device *obd) 3577{ 3578 struct client_obd *cli = &obd->u.cli; 3579 int rc; 3580 3581 ENTRY; 3582 
3583 /* lru cleanup */ 3584 if (cli->cl_cache != NULL) { 3585 LASSERT(atomic_read(&cli->cl_cache->ccc_users) > 0); 3586 spin_lock(&cli->cl_cache->ccc_lru_lock); 3587 list_del_init(&cli->cl_lru_osc); 3588 spin_unlock(&cli->cl_cache->ccc_lru_lock); 3589 cli->cl_lru_left = NULL; 3590 atomic_dec(&cli->cl_cache->ccc_users); 3591 cli->cl_cache = NULL; 3592 } 3593 3594 /* free memory of osc quota cache */ 3595 osc_quota_cleanup(obd); 3596 3597 rc = client_obd_cleanup(obd); 3598 3599 ptlrpcd_decref(); 3600 RETURN(rc); 3601} 3602 3603int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg) 3604{ 3605 struct lprocfs_static_vars lvars = { 0 }; 3606 int rc = 0; 3607 3608 lprocfs_osc_init_vars(&lvars); 3609 3610 switch (lcfg->lcfg_command) { 3611 default: 3612 rc = class_process_proc_param(PARAM_OSC, lvars.obd_vars, 3613 lcfg, obd); 3614 if (rc > 0) 3615 rc = 0; 3616 break; 3617 } 3618 3619 return(rc); 3620} 3621 3622static int osc_process_config(struct obd_device *obd, obd_count len, void *buf) 3623{ 3624 return osc_process_config_base(obd, buf); 3625} 3626 3627struct obd_ops osc_obd_ops = { 3628 .o_owner = THIS_MODULE, 3629 .o_setup = osc_setup, 3630 .o_precleanup = osc_precleanup, 3631 .o_cleanup = osc_cleanup, 3632 .o_add_conn = client_import_add_conn, 3633 .o_del_conn = client_import_del_conn, 3634 .o_connect = client_connect_import, 3635 .o_reconnect = osc_reconnect, 3636 .o_disconnect = osc_disconnect, 3637 .o_statfs = osc_statfs, 3638 .o_statfs_async = osc_statfs_async, 3639 .o_packmd = osc_packmd, 3640 .o_unpackmd = osc_unpackmd, 3641 .o_create = osc_create, 3642 .o_destroy = osc_destroy, 3643 .o_getattr = osc_getattr, 3644 .o_getattr_async = osc_getattr_async, 3645 .o_setattr = osc_setattr, 3646 .o_setattr_async = osc_setattr_async, 3647 .o_brw = osc_brw, 3648 .o_punch = osc_punch, 3649 .o_sync = osc_sync, 3650 .o_enqueue = osc_enqueue, 3651 .o_change_cbdata = osc_change_cbdata, 3652 .o_find_cbdata = osc_find_cbdata, 3653 .o_cancel = osc_cancel, 
3654 .o_cancel_unused = osc_cancel_unused, 3655 .o_iocontrol = osc_iocontrol, 3656 .o_get_info = osc_get_info, 3657 .o_set_info_async = osc_set_info_async, 3658 .o_import_event = osc_import_event, 3659 .o_llog_init = osc_llog_init, 3660 .o_llog_finish = osc_llog_finish, 3661 .o_process_config = osc_process_config, 3662 .o_quotactl = osc_quotactl, 3663 .o_quotacheck = osc_quotacheck, 3664}; 3665 3666extern struct lu_kmem_descr osc_caches[]; 3667extern spinlock_t osc_ast_guard; 3668extern struct lock_class_key osc_ast_guard_class; 3669 3670int __init osc_init(void) 3671{ 3672 struct lprocfs_static_vars lvars = { 0 }; 3673 int rc; 3674 ENTRY; 3675 3676 /* print an address of _any_ initialized kernel symbol from this 3677 * module, to allow debugging with gdb that doesn't support data 3678 * symbols from modules.*/ 3679 CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches); 3680 3681 rc = lu_kmem_init(osc_caches); 3682 3683 lprocfs_osc_init_vars(&lvars); 3684 3685 rc = class_register_type(&osc_obd_ops, NULL, lvars.module_vars, 3686 LUSTRE_OSC_NAME, &osc_device_type); 3687 if (rc) { 3688 lu_kmem_fini(osc_caches); 3689 RETURN(rc); 3690 } 3691 3692 spin_lock_init(&osc_ast_guard); 3693 lockdep_set_class(&osc_ast_guard, &osc_ast_guard_class); 3694 3695 RETURN(rc); 3696} 3697 3698static void /*__exit*/ osc_exit(void) 3699{ 3700 class_unregister_type(LUSTRE_OSC_NAME); 3701 lu_kmem_fini(osc_caches); 3702} 3703 3704MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>"); 3705MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)"); 3706MODULE_LICENSE("GPL"); 3707 3708cfs_module(osc, LUSTRE_VERSION_STRING, osc_init, osc_exit); 3709