/* osc_request.c — revision 26c4ea46a55c9056fa20e3c91b1989f3cd9473d7 */
1/* 2 * GPL HEADER START 3 * 4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 5 * 6 * This program is free software; you can redistribute it and/or modify 7 * it under the terms of the GNU General Public License version 2 only, 8 * as published by the Free Software Foundation. 9 * 10 * This program is distributed in the hope that it will be useful, but 11 * WITHOUT ANY WARRANTY; without even the implied warranty of 12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 13 * General Public License version 2 for more details (a copy is included 14 * in the LICENSE file that accompanied this code). 15 * 16 * You should have received a copy of the GNU General Public License 17 * version 2 along with this program; If not, see 18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf 19 * 20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara, 21 * CA 95054 USA or visit www.sun.com if you need additional information or 22 * have any questions. 23 * 24 * GPL HEADER END 25 */ 26/* 27 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved. 28 * Use is subject to license terms. 29 * 30 * Copyright (c) 2011, 2012, Intel Corporation. 31 */ 32/* 33 * This file is part of Lustre, http://www.lustre.org/ 34 * Lustre is a trademark of Sun Microsystems, Inc. 
35 */ 36 37#define DEBUG_SUBSYSTEM S_OSC 38 39#include "../../include/linux/libcfs/libcfs.h" 40 41 42#include "../include/lustre_dlm.h" 43#include "../include/lustre_net.h" 44#include "../include/lustre/lustre_user.h" 45#include "../include/obd_cksum.h" 46 47#include "../include/lustre_ha.h" 48#include "../include/lprocfs_status.h" 49#include "../include/lustre_debug.h" 50#include "../include/lustre_param.h" 51#include "../include/lustre_fid.h" 52#include "../include/obd_class.h" 53#include "osc_internal.h" 54#include "osc_cl_internal.h" 55 56struct osc_brw_async_args { 57 struct obdo *aa_oa; 58 int aa_requested_nob; 59 int aa_nio_count; 60 u32 aa_page_count; 61 int aa_resends; 62 struct brw_page **aa_ppga; 63 struct client_obd *aa_cli; 64 struct list_head aa_oaps; 65 struct list_head aa_exts; 66 struct obd_capa *aa_ocapa; 67 struct cl_req *aa_clerq; 68}; 69 70struct osc_async_args { 71 struct obd_info *aa_oi; 72}; 73 74struct osc_setattr_args { 75 struct obdo *sa_oa; 76 obd_enqueue_update_f sa_upcall; 77 void *sa_cookie; 78}; 79 80struct osc_fsync_args { 81 struct obd_info *fa_oi; 82 obd_enqueue_update_f fa_upcall; 83 void *fa_cookie; 84}; 85 86struct osc_enqueue_args { 87 struct obd_export *oa_exp; 88 __u64 *oa_flags; 89 obd_enqueue_update_f oa_upcall; 90 void *oa_cookie; 91 struct ost_lvb *oa_lvb; 92 struct lustre_handle *oa_lockh; 93 struct ldlm_enqueue_info *oa_ei; 94 unsigned int oa_agl:1; 95}; 96 97static void osc_release_ppga(struct brw_page **ppga, u32 count); 98static int brw_interpret(const struct lu_env *env, 99 struct ptlrpc_request *req, void *data, int rc); 100int osc_cleanup(struct obd_device *obd); 101 102/* Pack OSC object metadata for disk storage (LE byte order). 
*/ 103static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp, 104 struct lov_stripe_md *lsm) 105{ 106 int lmm_size; 107 108 lmm_size = sizeof(**lmmp); 109 if (lmmp == NULL) 110 return lmm_size; 111 112 if (*lmmp != NULL && lsm == NULL) { 113 OBD_FREE(*lmmp, lmm_size); 114 *lmmp = NULL; 115 return 0; 116 } else if (unlikely(lsm != NULL && ostid_id(&lsm->lsm_oi) == 0)) { 117 return -EBADF; 118 } 119 120 if (*lmmp == NULL) { 121 OBD_ALLOC(*lmmp, lmm_size); 122 if (*lmmp == NULL) 123 return -ENOMEM; 124 } 125 126 if (lsm) 127 ostid_cpu_to_le(&lsm->lsm_oi, &(*lmmp)->lmm_oi); 128 129 return lmm_size; 130} 131 132/* Unpack OSC object metadata from disk storage (LE byte order). */ 133static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp, 134 struct lov_mds_md *lmm, int lmm_bytes) 135{ 136 int lsm_size; 137 struct obd_import *imp = class_exp2cliimp(exp); 138 139 if (lmm != NULL) { 140 if (lmm_bytes < sizeof(*lmm)) { 141 CERROR("%s: lov_mds_md too small: %d, need %d\n", 142 exp->exp_obd->obd_name, lmm_bytes, 143 (int)sizeof(*lmm)); 144 return -EINVAL; 145 } 146 /* XXX LOV_MAGIC etc check? 
*/ 147 148 if (unlikely(ostid_id(&lmm->lmm_oi) == 0)) { 149 CERROR("%s: zero lmm_object_id: rc = %d\n", 150 exp->exp_obd->obd_name, -EINVAL); 151 return -EINVAL; 152 } 153 } 154 155 lsm_size = lov_stripe_md_size(1); 156 if (lsmp == NULL) 157 return lsm_size; 158 159 if (*lsmp != NULL && lmm == NULL) { 160 OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo)); 161 OBD_FREE(*lsmp, lsm_size); 162 *lsmp = NULL; 163 return 0; 164 } 165 166 if (*lsmp == NULL) { 167 OBD_ALLOC(*lsmp, lsm_size); 168 if (unlikely(*lsmp == NULL)) 169 return -ENOMEM; 170 OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo)); 171 if (unlikely((*lsmp)->lsm_oinfo[0] == NULL)) { 172 OBD_FREE(*lsmp, lsm_size); 173 return -ENOMEM; 174 } 175 loi_init((*lsmp)->lsm_oinfo[0]); 176 } else if (unlikely(ostid_id(&(*lsmp)->lsm_oi) == 0)) { 177 return -EBADF; 178 } 179 180 if (lmm != NULL) 181 /* XXX zero *lsmp? */ 182 ostid_le_to_cpu(&lmm->lmm_oi, &(*lsmp)->lsm_oi); 183 184 if (imp != NULL && 185 (imp->imp_connect_data.ocd_connect_flags & OBD_CONNECT_MAXBYTES)) 186 (*lsmp)->lsm_maxbytes = imp->imp_connect_data.ocd_maxbytes; 187 else 188 (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES; 189 190 return lsm_size; 191} 192 193static inline void osc_pack_capa(struct ptlrpc_request *req, 194 struct ost_body *body, void *capa) 195{ 196 struct obd_capa *oc = (struct obd_capa *)capa; 197 struct lustre_capa *c; 198 199 if (!capa) 200 return; 201 202 c = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1); 203 LASSERT(c); 204 capa_cpy(c, oc); 205 body->oa.o_valid |= OBD_MD_FLOSSCAPA; 206 DEBUG_CAPA(D_SEC, c, "pack"); 207} 208 209static inline void osc_pack_req_body(struct ptlrpc_request *req, 210 struct obd_info *oinfo) 211{ 212 struct ost_body *body; 213 214 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY); 215 LASSERT(body); 216 217 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, 218 oinfo->oi_oa); 219 osc_pack_capa(req, body, oinfo->oi_capa); 220} 221 222static inline void 
osc_set_capa_size(struct ptlrpc_request *req, 223 const struct req_msg_field *field, 224 struct obd_capa *oc) 225{ 226 if (oc == NULL) 227 req_capsule_set_size(&req->rq_pill, field, RCL_CLIENT, 0); 228 else 229 /* it is already calculated as sizeof struct obd_capa */ 230 ; 231} 232 233static int osc_getattr_interpret(const struct lu_env *env, 234 struct ptlrpc_request *req, 235 struct osc_async_args *aa, int rc) 236{ 237 struct ost_body *body; 238 239 if (rc != 0) 240 goto out; 241 242 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY); 243 if (body) { 244 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode); 245 lustre_get_wire_obdo(&req->rq_import->imp_connect_data, 246 aa->aa_oi->oi_oa, &body->oa); 247 248 /* This should really be sent by the OST */ 249 aa->aa_oi->oi_oa->o_blksize = DT_MAX_BRW_SIZE; 250 aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ; 251 } else { 252 CDEBUG(D_INFO, "can't unpack ost_body\n"); 253 rc = -EPROTO; 254 aa->aa_oi->oi_oa->o_valid = 0; 255 } 256out: 257 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc); 258 return rc; 259} 260 261static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo, 262 struct ptlrpc_request_set *set) 263{ 264 struct ptlrpc_request *req; 265 struct osc_async_args *aa; 266 int rc; 267 268 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR); 269 if (req == NULL) 270 return -ENOMEM; 271 272 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa); 273 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR); 274 if (rc) { 275 ptlrpc_request_free(req); 276 return rc; 277 } 278 279 osc_pack_req_body(req, oinfo); 280 281 ptlrpc_request_set_replen(req); 282 req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_getattr_interpret; 283 284 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args)); 285 aa = ptlrpc_req_async_args(req); 286 aa->aa_oi = oinfo; 287 288 ptlrpc_set_add_req(set, req); 289 return 0; 290} 291 292static int osc_getattr(const struct lu_env *env, struct obd_export *exp, 293 struct 
obd_info *oinfo) 294{ 295 struct ptlrpc_request *req; 296 struct ost_body *body; 297 int rc; 298 299 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR); 300 if (req == NULL) 301 return -ENOMEM; 302 303 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa); 304 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR); 305 if (rc) { 306 ptlrpc_request_free(req); 307 return rc; 308 } 309 310 osc_pack_req_body(req, oinfo); 311 312 ptlrpc_request_set_replen(req); 313 314 rc = ptlrpc_queue_wait(req); 315 if (rc) 316 goto out; 317 318 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY); 319 if (body == NULL) { 320 rc = -EPROTO; 321 goto out; 322 } 323 324 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode); 325 lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oinfo->oi_oa, 326 &body->oa); 327 328 oinfo->oi_oa->o_blksize = cli_brw_size(exp->exp_obd); 329 oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ; 330 331 out: 332 ptlrpc_req_finished(req); 333 return rc; 334} 335 336static int osc_setattr(const struct lu_env *env, struct obd_export *exp, 337 struct obd_info *oinfo, struct obd_trans_info *oti) 338{ 339 struct ptlrpc_request *req; 340 struct ost_body *body; 341 int rc; 342 343 LASSERT(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP); 344 345 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR); 346 if (req == NULL) 347 return -ENOMEM; 348 349 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa); 350 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR); 351 if (rc) { 352 ptlrpc_request_free(req); 353 return rc; 354 } 355 356 osc_pack_req_body(req, oinfo); 357 358 ptlrpc_request_set_replen(req); 359 360 rc = ptlrpc_queue_wait(req); 361 if (rc) 362 goto out; 363 364 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY); 365 if (body == NULL) { 366 rc = -EPROTO; 367 goto out; 368 } 369 370 lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oinfo->oi_oa, 371 &body->oa); 372 373out: 374 ptlrpc_req_finished(req); 375 return rc; 
376} 377 378static int osc_setattr_interpret(const struct lu_env *env, 379 struct ptlrpc_request *req, 380 struct osc_setattr_args *sa, int rc) 381{ 382 struct ost_body *body; 383 384 if (rc != 0) 385 goto out; 386 387 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY); 388 if (body == NULL) { 389 rc = -EPROTO; 390 goto out; 391 } 392 393 lustre_get_wire_obdo(&req->rq_import->imp_connect_data, sa->sa_oa, 394 &body->oa); 395out: 396 rc = sa->sa_upcall(sa->sa_cookie, rc); 397 return rc; 398} 399 400int osc_setattr_async_base(struct obd_export *exp, struct obd_info *oinfo, 401 struct obd_trans_info *oti, 402 obd_enqueue_update_f upcall, void *cookie, 403 struct ptlrpc_request_set *rqset) 404{ 405 struct ptlrpc_request *req; 406 struct osc_setattr_args *sa; 407 int rc; 408 409 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR); 410 if (req == NULL) 411 return -ENOMEM; 412 413 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa); 414 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR); 415 if (rc) { 416 ptlrpc_request_free(req); 417 return rc; 418 } 419 420 if (oti && oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE) 421 oinfo->oi_oa->o_lcookie = *oti->oti_logcookies; 422 423 osc_pack_req_body(req, oinfo); 424 425 ptlrpc_request_set_replen(req); 426 427 /* do mds to ost setattr asynchronously */ 428 if (!rqset) { 429 /* Do not wait for response. 
*/ 430 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1); 431 } else { 432 req->rq_interpret_reply = 433 (ptlrpc_interpterer_t)osc_setattr_interpret; 434 435 CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args)); 436 sa = ptlrpc_req_async_args(req); 437 sa->sa_oa = oinfo->oi_oa; 438 sa->sa_upcall = upcall; 439 sa->sa_cookie = cookie; 440 441 if (rqset == PTLRPCD_SET) 442 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1); 443 else 444 ptlrpc_set_add_req(rqset, req); 445 } 446 447 return 0; 448} 449 450static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo, 451 struct obd_trans_info *oti, 452 struct ptlrpc_request_set *rqset) 453{ 454 return osc_setattr_async_base(exp, oinfo, oti, 455 oinfo->oi_cb_up, oinfo, rqset); 456} 457 458int osc_real_create(struct obd_export *exp, struct obdo *oa, 459 struct lov_stripe_md **ea, struct obd_trans_info *oti) 460{ 461 struct ptlrpc_request *req; 462 struct ost_body *body; 463 struct lov_stripe_md *lsm; 464 int rc; 465 466 LASSERT(oa); 467 LASSERT(ea); 468 469 lsm = *ea; 470 if (!lsm) { 471 rc = obd_alloc_memmd(exp, &lsm); 472 if (rc < 0) 473 return rc; 474 } 475 476 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE); 477 if (req == NULL) { 478 rc = -ENOMEM; 479 goto out; 480 } 481 482 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE); 483 if (rc) { 484 ptlrpc_request_free(req); 485 goto out; 486 } 487 488 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY); 489 LASSERT(body); 490 491 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa); 492 493 ptlrpc_request_set_replen(req); 494 495 if ((oa->o_valid & OBD_MD_FLFLAGS) && 496 oa->o_flags == OBD_FL_DELORPHAN) { 497 DEBUG_REQ(D_HA, req, 498 "delorphan from OST integration"); 499 /* Don't resend the delorphan req */ 500 req->rq_no_resend = req->rq_no_delay = 1; 501 } 502 503 rc = ptlrpc_queue_wait(req); 504 if (rc) 505 goto out_req; 506 507 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY); 508 if (body == NULL) { 
509 rc = -EPROTO; 510 goto out_req; 511 } 512 513 CDEBUG(D_INFO, "oa flags %x\n", oa->o_flags); 514 lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa); 515 516 oa->o_blksize = cli_brw_size(exp->exp_obd); 517 oa->o_valid |= OBD_MD_FLBLKSZ; 518 519 /* XXX LOV STACKING: the lsm that is passed to us from LOV does not 520 * have valid lsm_oinfo data structs, so don't go touching that. 521 * This needs to be fixed in a big way. 522 */ 523 lsm->lsm_oi = oa->o_oi; 524 *ea = lsm; 525 526 if (oti != NULL) { 527 oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg); 528 529 if (oa->o_valid & OBD_MD_FLCOOKIE) { 530 if (!oti->oti_logcookies) 531 oti_alloc_cookies(oti, 1); 532 *oti->oti_logcookies = oa->o_lcookie; 533 } 534 } 535 536 CDEBUG(D_HA, "transno: %lld\n", 537 lustre_msg_get_transno(req->rq_repmsg)); 538out_req: 539 ptlrpc_req_finished(req); 540out: 541 if (rc && !*ea) 542 obd_free_memmd(exp, &lsm); 543 return rc; 544} 545 546int osc_punch_base(struct obd_export *exp, struct obd_info *oinfo, 547 obd_enqueue_update_f upcall, void *cookie, 548 struct ptlrpc_request_set *rqset) 549{ 550 struct ptlrpc_request *req; 551 struct osc_setattr_args *sa; 552 struct ost_body *body; 553 int rc; 554 555 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH); 556 if (req == NULL) 557 return -ENOMEM; 558 559 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa); 560 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH); 561 if (rc) { 562 ptlrpc_request_free(req); 563 return rc; 564 } 565 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */ 566 ptlrpc_at_set_req_timeout(req); 567 568 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY); 569 LASSERT(body); 570 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, 571 oinfo->oi_oa); 572 osc_pack_capa(req, body, oinfo->oi_capa); 573 574 ptlrpc_request_set_replen(req); 575 576 req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret; 577 CLASSERT (sizeof(*sa) <= 
sizeof(req->rq_async_args)); 578 sa = ptlrpc_req_async_args(req); 579 sa->sa_oa = oinfo->oi_oa; 580 sa->sa_upcall = upcall; 581 sa->sa_cookie = cookie; 582 if (rqset == PTLRPCD_SET) 583 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1); 584 else 585 ptlrpc_set_add_req(rqset, req); 586 587 return 0; 588} 589 590static int osc_sync_interpret(const struct lu_env *env, 591 struct ptlrpc_request *req, 592 void *arg, int rc) 593{ 594 struct osc_fsync_args *fa = arg; 595 struct ost_body *body; 596 597 if (rc) 598 goto out; 599 600 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY); 601 if (body == NULL) { 602 CERROR ("can't unpack ost_body\n"); 603 rc = -EPROTO; 604 goto out; 605 } 606 607 *fa->fa_oi->oi_oa = body->oa; 608out: 609 rc = fa->fa_upcall(fa->fa_cookie, rc); 610 return rc; 611} 612 613int osc_sync_base(struct obd_export *exp, struct obd_info *oinfo, 614 obd_enqueue_update_f upcall, void *cookie, 615 struct ptlrpc_request_set *rqset) 616{ 617 struct ptlrpc_request *req; 618 struct ost_body *body; 619 struct osc_fsync_args *fa; 620 int rc; 621 622 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC); 623 if (req == NULL) 624 return -ENOMEM; 625 626 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa); 627 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC); 628 if (rc) { 629 ptlrpc_request_free(req); 630 return rc; 631 } 632 633 /* overload the size and blocks fields in the oa with start/end */ 634 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY); 635 LASSERT(body); 636 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, 637 oinfo->oi_oa); 638 osc_pack_capa(req, body, oinfo->oi_capa); 639 640 ptlrpc_request_set_replen(req); 641 req->rq_interpret_reply = osc_sync_interpret; 642 643 CLASSERT(sizeof(*fa) <= sizeof(req->rq_async_args)); 644 fa = ptlrpc_req_async_args(req); 645 fa->fa_oi = oinfo; 646 fa->fa_upcall = upcall; 647 fa->fa_cookie = cookie; 648 649 if (rqset == PTLRPCD_SET) 650 ptlrpcd_add_req(req, 
PDL_POLICY_ROUND, -1); 651 else 652 ptlrpc_set_add_req(rqset, req); 653 654 return 0; 655} 656 657/* Find and cancel locally locks matched by @mode in the resource found by 658 * @objid. Found locks are added into @cancel list. Returns the amount of 659 * locks added to @cancels list. */ 660static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa, 661 struct list_head *cancels, 662 ldlm_mode_t mode, __u64 lock_flags) 663{ 664 struct ldlm_namespace *ns = exp->exp_obd->obd_namespace; 665 struct ldlm_res_id res_id; 666 struct ldlm_resource *res; 667 int count; 668 669 /* Return, i.e. cancel nothing, only if ELC is supported (flag in 670 * export) but disabled through procfs (flag in NS). 671 * 672 * This distinguishes from a case when ELC is not supported originally, 673 * when we still want to cancel locks in advance and just cancel them 674 * locally, without sending any RPC. */ 675 if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns)) 676 return 0; 677 678 ostid_build_res_name(&oa->o_oi, &res_id); 679 res = ldlm_resource_get(ns, NULL, &res_id, 0, 0); 680 if (res == NULL) 681 return 0; 682 683 LDLM_RESOURCE_ADDREF(res); 684 count = ldlm_cancel_resource_local(res, cancels, NULL, mode, 685 lock_flags, 0, NULL); 686 LDLM_RESOURCE_DELREF(res); 687 ldlm_resource_putref(res); 688 return count; 689} 690 691static int osc_destroy_interpret(const struct lu_env *env, 692 struct ptlrpc_request *req, void *data, 693 int rc) 694{ 695 struct client_obd *cli = &req->rq_import->imp_obd->u.cli; 696 697 atomic_dec(&cli->cl_destroy_in_flight); 698 wake_up(&cli->cl_destroy_waitq); 699 return 0; 700} 701 702static int osc_can_send_destroy(struct client_obd *cli) 703{ 704 if (atomic_inc_return(&cli->cl_destroy_in_flight) <= 705 cli->cl_max_rpcs_in_flight) { 706 /* The destroy request can be sent */ 707 return 1; 708 } 709 if (atomic_dec_return(&cli->cl_destroy_in_flight) < 710 cli->cl_max_rpcs_in_flight) { 711 /* 712 * The counter has been modified between the 
two atomic 713 * operations. 714 */ 715 wake_up(&cli->cl_destroy_waitq); 716 } 717 return 0; 718} 719 720int osc_create(const struct lu_env *env, struct obd_export *exp, 721 struct obdo *oa, struct lov_stripe_md **ea, 722 struct obd_trans_info *oti) 723{ 724 int rc = 0; 725 726 LASSERT(oa); 727 LASSERT(ea); 728 LASSERT(oa->o_valid & OBD_MD_FLGROUP); 729 730 if ((oa->o_valid & OBD_MD_FLFLAGS) && 731 oa->o_flags == OBD_FL_RECREATE_OBJS) { 732 return osc_real_create(exp, oa, ea, oti); 733 } 734 735 if (!fid_seq_is_mdt(ostid_seq(&oa->o_oi))) 736 return osc_real_create(exp, oa, ea, oti); 737 738 /* we should not get here anymore */ 739 LBUG(); 740 741 return rc; 742} 743 744/* Destroy requests can be async always on the client, and we don't even really 745 * care about the return code since the client cannot do anything at all about 746 * a destroy failure. 747 * When the MDS is unlinking a filename, it saves the file objects into a 748 * recovery llog, and these object records are cancelled when the OST reports 749 * they were destroyed and sync'd to disk (i.e. transaction committed). 750 * If the client dies, or the OST is down when the object should be destroyed, 751 * the records are not cancelled, and when the OST reconnects to the MDS next, 752 * it will retrieve the llog unlink logs and then sends the log cancellation 753 * cookies to the MDS after committing destroy transactions. 
*/ 754static int osc_destroy(const struct lu_env *env, struct obd_export *exp, 755 struct obdo *oa, struct lov_stripe_md *ea, 756 struct obd_trans_info *oti, struct obd_export *md_export, 757 void *capa) 758{ 759 struct client_obd *cli = &exp->exp_obd->u.cli; 760 struct ptlrpc_request *req; 761 struct ost_body *body; 762 LIST_HEAD(cancels); 763 int rc, count; 764 765 if (!oa) { 766 CDEBUG(D_INFO, "oa NULL\n"); 767 return -EINVAL; 768 } 769 770 count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW, 771 LDLM_FL_DISCARD_DATA); 772 773 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY); 774 if (req == NULL) { 775 ldlm_lock_list_put(&cancels, l_bl_ast, count); 776 return -ENOMEM; 777 } 778 779 osc_set_capa_size(req, &RMF_CAPA1, (struct obd_capa *)capa); 780 rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY, 781 0, &cancels, count); 782 if (rc) { 783 ptlrpc_request_free(req); 784 return rc; 785 } 786 787 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */ 788 ptlrpc_at_set_req_timeout(req); 789 790 if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE) 791 oa->o_lcookie = *oti->oti_logcookies; 792 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY); 793 LASSERT(body); 794 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa); 795 796 osc_pack_capa(req, body, (struct obd_capa *)capa); 797 ptlrpc_request_set_replen(req); 798 799 /* If osc_destroy is for destroying the unlink orphan, 800 * sent from MDT to OST, which should not be blocked here, 801 * because the process might be triggered by ptlrpcd, and 802 * it is not good to block ptlrpcd thread (b=16006)*/ 803 if (!(oa->o_flags & OBD_FL_DELORPHAN)) { 804 req->rq_interpret_reply = osc_destroy_interpret; 805 if (!osc_can_send_destroy(cli)) { 806 struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, 807 NULL); 808 809 /* 810 * Wait until the number of on-going destroy RPCs drops 811 * under max_rpc_in_flight 812 */ 813 l_wait_event_exclusive(cli->cl_destroy_waitq, 
814 osc_can_send_destroy(cli), &lwi); 815 } 816 } 817 818 /* Do not wait for response */ 819 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1); 820 return 0; 821} 822 823static void osc_announce_cached(struct client_obd *cli, struct obdo *oa, 824 long writing_bytes) 825{ 826 u32 bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT; 827 828 LASSERT(!(oa->o_valid & bits)); 829 830 oa->o_valid |= bits; 831 client_obd_list_lock(&cli->cl_loi_list_lock); 832 oa->o_dirty = cli->cl_dirty; 833 if (unlikely(cli->cl_dirty - cli->cl_dirty_transit > 834 cli->cl_dirty_max)) { 835 CERROR("dirty %lu - %lu > dirty_max %lu\n", 836 cli->cl_dirty, cli->cl_dirty_transit, cli->cl_dirty_max); 837 oa->o_undirty = 0; 838 } else if (unlikely(atomic_read(&obd_dirty_pages) - 839 atomic_read(&obd_dirty_transit_pages) > 840 (long)(obd_max_dirty_pages + 1))) { 841 /* The atomic_read() allowing the atomic_inc() are 842 * not covered by a lock thus they may safely race and trip 843 * this CERROR() unless we add in a small fudge factor (+1). 
*/ 844 CERROR("dirty %d - %d > system dirty_max %d\n", 845 atomic_read(&obd_dirty_pages), 846 atomic_read(&obd_dirty_transit_pages), 847 obd_max_dirty_pages); 848 oa->o_undirty = 0; 849 } else if (unlikely(cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff)) { 850 CERROR("dirty %lu - dirty_max %lu too big???\n", 851 cli->cl_dirty, cli->cl_dirty_max); 852 oa->o_undirty = 0; 853 } else { 854 long max_in_flight = (cli->cl_max_pages_per_rpc << 855 PAGE_CACHE_SHIFT)* 856 (cli->cl_max_rpcs_in_flight + 1); 857 oa->o_undirty = max(cli->cl_dirty_max, max_in_flight); 858 } 859 oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant; 860 oa->o_dropped = cli->cl_lost_grant; 861 cli->cl_lost_grant = 0; 862 client_obd_list_unlock(&cli->cl_loi_list_lock); 863 CDEBUG(D_CACHE, "dirty: %llu undirty: %u dropped %u grant: %llu\n", 864 oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant); 865 866} 867 868void osc_update_next_shrink(struct client_obd *cli) 869{ 870 cli->cl_next_shrink_grant = 871 cfs_time_shift(cli->cl_grant_shrink_interval); 872 CDEBUG(D_CACHE, "next time %ld to shrink grant \n", 873 cli->cl_next_shrink_grant); 874} 875 876static void __osc_update_grant(struct client_obd *cli, u64 grant) 877{ 878 client_obd_list_lock(&cli->cl_loi_list_lock); 879 cli->cl_avail_grant += grant; 880 client_obd_list_unlock(&cli->cl_loi_list_lock); 881} 882 883static void osc_update_grant(struct client_obd *cli, struct ost_body *body) 884{ 885 if (body->oa.o_valid & OBD_MD_FLGRANT) { 886 CDEBUG(D_CACHE, "got %llu extra grant\n", body->oa.o_grant); 887 __osc_update_grant(cli, body->oa.o_grant); 888 } 889} 890 891static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp, 892 u32 keylen, void *key, u32 vallen, 893 void *val, struct ptlrpc_request_set *set); 894 895static int osc_shrink_grant_interpret(const struct lu_env *env, 896 struct ptlrpc_request *req, 897 void *aa, int rc) 898{ 899 struct client_obd *cli = &req->rq_import->imp_obd->u.cli; 900 struct obdo *oa = 
((struct osc_brw_async_args *)aa)->aa_oa; 901 struct ost_body *body; 902 903 if (rc != 0) { 904 __osc_update_grant(cli, oa->o_grant); 905 goto out; 906 } 907 908 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY); 909 LASSERT(body); 910 osc_update_grant(cli, body); 911out: 912 OBDO_FREE(oa); 913 return rc; 914} 915 916static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa) 917{ 918 client_obd_list_lock(&cli->cl_loi_list_lock); 919 oa->o_grant = cli->cl_avail_grant / 4; 920 cli->cl_avail_grant -= oa->o_grant; 921 client_obd_list_unlock(&cli->cl_loi_list_lock); 922 if (!(oa->o_valid & OBD_MD_FLFLAGS)) { 923 oa->o_valid |= OBD_MD_FLFLAGS; 924 oa->o_flags = 0; 925 } 926 oa->o_flags |= OBD_FL_SHRINK_GRANT; 927 osc_update_next_shrink(cli); 928} 929 930/* Shrink the current grant, either from some large amount to enough for a 931 * full set of in-flight RPCs, or if we have already shrunk to that limit 932 * then to enough for a single RPC. This avoids keeping more grant than 933 * needed, and avoids shrinking the grant piecemeal. */ 934static int osc_shrink_grant(struct client_obd *cli) 935{ 936 __u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) * 937 (cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT); 938 939 client_obd_list_lock(&cli->cl_loi_list_lock); 940 if (cli->cl_avail_grant <= target_bytes) 941 target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT; 942 client_obd_list_unlock(&cli->cl_loi_list_lock); 943 944 return osc_shrink_grant_to_target(cli, target_bytes); 945} 946 947int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes) 948{ 949 int rc = 0; 950 struct ost_body *body; 951 952 client_obd_list_lock(&cli->cl_loi_list_lock); 953 /* Don't shrink if we are already above or below the desired limit 954 * We don't want to shrink below a single RPC, as that will negatively 955 * impact block allocation and long-term performance. 
*/ 956 if (target_bytes < cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT) 957 target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT; 958 959 if (target_bytes >= cli->cl_avail_grant) { 960 client_obd_list_unlock(&cli->cl_loi_list_lock); 961 return 0; 962 } 963 client_obd_list_unlock(&cli->cl_loi_list_lock); 964 965 OBD_ALLOC_PTR(body); 966 if (!body) 967 return -ENOMEM; 968 969 osc_announce_cached(cli, &body->oa, 0); 970 971 client_obd_list_lock(&cli->cl_loi_list_lock); 972 body->oa.o_grant = cli->cl_avail_grant - target_bytes; 973 cli->cl_avail_grant = target_bytes; 974 client_obd_list_unlock(&cli->cl_loi_list_lock); 975 if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) { 976 body->oa.o_valid |= OBD_MD_FLFLAGS; 977 body->oa.o_flags = 0; 978 } 979 body->oa.o_flags |= OBD_FL_SHRINK_GRANT; 980 osc_update_next_shrink(cli); 981 982 rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export, 983 sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK, 984 sizeof(*body), body, NULL); 985 if (rc != 0) 986 __osc_update_grant(cli, body->oa.o_grant); 987 OBD_FREE_PTR(body); 988 return rc; 989} 990 991static int osc_should_shrink_grant(struct client_obd *client) 992{ 993 unsigned long time = cfs_time_current(); 994 unsigned long next_shrink = client->cl_next_shrink_grant; 995 996 if ((client->cl_import->imp_connect_data.ocd_connect_flags & 997 OBD_CONNECT_GRANT_SHRINK) == 0) 998 return 0; 999 1000 if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) { 1001 /* Get the current RPC size directly, instead of going via: 1002 * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export) 1003 * Keep comment here so that it can be found by searching. 
*/ 1004 int brw_size = client->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT; 1005 1006 if (client->cl_import->imp_state == LUSTRE_IMP_FULL && 1007 client->cl_avail_grant > brw_size) 1008 return 1; 1009 else 1010 osc_update_next_shrink(client); 1011 } 1012 return 0; 1013} 1014 1015static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data) 1016{ 1017 struct client_obd *client; 1018 1019 list_for_each_entry(client, &item->ti_obd_list, 1020 cl_grant_shrink_list) { 1021 if (osc_should_shrink_grant(client)) 1022 osc_shrink_grant(client); 1023 } 1024 return 0; 1025} 1026 1027static int osc_add_shrink_grant(struct client_obd *client) 1028{ 1029 int rc; 1030 1031 rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval, 1032 TIMEOUT_GRANT, 1033 osc_grant_shrink_grant_cb, NULL, 1034 &client->cl_grant_shrink_list); 1035 if (rc) { 1036 CERROR("add grant client %s error %d\n", 1037 client->cl_import->imp_obd->obd_name, rc); 1038 return rc; 1039 } 1040 CDEBUG(D_CACHE, "add grant client %s \n", 1041 client->cl_import->imp_obd->obd_name); 1042 osc_update_next_shrink(client); 1043 return 0; 1044} 1045 1046static int osc_del_shrink_grant(struct client_obd *client) 1047{ 1048 return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list, 1049 TIMEOUT_GRANT); 1050} 1051 1052static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd) 1053{ 1054 /* 1055 * ocd_grant is the total grant amount we're expect to hold: if we've 1056 * been evicted, it's the new avail_grant amount, cl_dirty will drop 1057 * to 0 as inflight RPCs fail out; otherwise, it's avail_grant + dirty. 1058 * 1059 * race is tolerable here: if we're evicted, but imp_state already 1060 * left EVICTED state, then cl_dirty must be 0 already. 
1061 */ 1062 client_obd_list_lock(&cli->cl_loi_list_lock); 1063 if (cli->cl_import->imp_state == LUSTRE_IMP_EVICTED) 1064 cli->cl_avail_grant = ocd->ocd_grant; 1065 else 1066 cli->cl_avail_grant = ocd->ocd_grant - cli->cl_dirty; 1067 1068 if (cli->cl_avail_grant < 0) { 1069 CWARN("%s: available grant < 0: avail/ocd/dirty %ld/%u/%ld\n", 1070 cli->cl_import->imp_obd->obd_name, cli->cl_avail_grant, 1071 ocd->ocd_grant, cli->cl_dirty); 1072 /* workaround for servers which do not have the patch from 1073 * LU-2679 */ 1074 cli->cl_avail_grant = ocd->ocd_grant; 1075 } 1076 1077 /* determine the appropriate chunk size used by osc_extent. */ 1078 cli->cl_chunkbits = max_t(int, PAGE_CACHE_SHIFT, ocd->ocd_blocksize); 1079 client_obd_list_unlock(&cli->cl_loi_list_lock); 1080 1081 CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld." 1082 "chunk bits: %d.\n", cli->cl_import->imp_obd->obd_name, 1083 cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits); 1084 1085 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK && 1086 list_empty(&cli->cl_grant_shrink_list)) 1087 osc_add_shrink_grant(cli); 1088} 1089 1090/* We assume that the reason this OSC got a short read is because it read 1091 * beyond the end of a stripe file; i.e. lustre is reading a sparse file 1092 * via the LOV, and it _knows_ it's reading inside the file, it's just that 1093 * this stripe never got written at or beyond this stripe offset yet. 
 */
static void handle_short_read(int nob_read, u32 page_count,
			      struct brw_page **pga)
{
	char *ptr;
	int i = 0;

	/* skip bytes read OK */
	while (nob_read > 0) {
		LASSERT (page_count > 0);

		if (pga[i]->count > nob_read) {
			/* EOF inside this page */
			ptr = kmap(pga[i]->pg) +
			      (pga[i]->off & ~CFS_PAGE_MASK);
			memset(ptr + nob_read, 0, pga[i]->count - nob_read);
			kunmap(pga[i]->pg);
			page_count--;
			i++;
			break;
		}

		nob_read -= pga[i]->count;
		page_count--;
		i++;
	}

	/* zero remaining pages */
	while (page_count-- > 0) {
		ptr = kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
		memset(ptr, 0, pga[i]->count);
		kunmap(pga[i]->pg);
		i++;
	}
}

/* Validate the per-niobuf RC vector of an OST_WRITE reply: fail with
 * -EPROTO on a missing/short vector, any non-zero RC, or a byte-count
 * mismatch; propagate a negative niobuf RC directly. */
static int check_write_rcs(struct ptlrpc_request *req,
			   int requested_nob, int niocount,
			   u32 page_count, struct brw_page **pga)
{
	int i;
	__u32 *remote_rcs;

	remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
						  sizeof(*remote_rcs) *
						  niocount);
	if (remote_rcs == NULL) {
		CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
		return -EPROTO;
	}

	/* return error if any niobuf was in error */
	for (i = 0; i < niocount; i++) {
		if ((int)remote_rcs[i] < 0)
			return remote_rcs[i];

		if (remote_rcs[i] != 0) {
			CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
			       i, remote_rcs[i], req);
			return -EPROTO;
		}
	}

	if (req->rq_bulk->bd_nob_transferred != requested_nob) {
		CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
		       req->rq_bulk->bd_nob_transferred, requested_nob);
		return -EPROTO;
	}

	return 0;
}

/* Two brw_pages can share one niobuf when their flags agree (ignoring the
 * flags in the mask below) and p2 starts exactly where p1 ends. */
static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
{
	if (p1->flag != p2->flag) {
		unsigned mask = ~(OBD_BRW_FROM_GRANT| OBD_BRW_NOCACHE|
				  OBD_BRW_SYNC|OBD_BRW_ASYNC|OBD_BRW_NOQUOTA);

		/* warn if we try to combine flags that we don't know to be
		 * safe to combine */
		if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
			CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
			      "report this at http://bugs.whamcloud.com/\n",
			      p1->flag, p2->flag);
		}
		return 0;
	}

	return (p1->off + p1->count == p2->off);
}

/* Compute the bulk-data checksum over up to @nob bytes of the page array
 * with the algorithm selected by @cksum_type.  Returns the 32-bit digest
 * (or PTR_ERR cast to u32 if hash init fails — callers treat any value as
 * a checksum, so an init failure yields a mismatch rather than an error). */
static u32 osc_checksum_bulk(int nob, u32 pg_count,
			     struct brw_page **pga, int opc,
			     cksum_type_t cksum_type)
{
	__u32 cksum;
	int i = 0;
	struct cfs_crypto_hash_desc *hdesc;
	unsigned int bufsize;
	int err;
	unsigned char cfs_alg = cksum_obd2cfs(cksum_type);

	LASSERT(pg_count > 0);

	hdesc = cfs_crypto_hash_init(cfs_alg, NULL, 0);
	if (IS_ERR(hdesc)) {
		CERROR("Unable to initialize checksum hash %s\n",
		       cfs_crypto_hash_name(cfs_alg));
		return PTR_ERR(hdesc);
	}

	while (nob > 0 && pg_count > 0) {
		int count = pga[i]->count > nob ? nob : pga[i]->count;

		/* corrupt the data before we compute the checksum, to
		 * simulate an OST->client data error */
		if (i == 0 && opc == OST_READ &&
		    OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
			unsigned char *ptr = kmap(pga[i]->pg);
			int off = pga[i]->off & ~CFS_PAGE_MASK;
			memcpy(ptr + off, "bad1", min(4, nob));
			kunmap(pga[i]->pg);
		}
		cfs_crypto_hash_update_page(hdesc, pga[i]->pg,
					    pga[i]->off & ~CFS_PAGE_MASK,
					    count);
		CDEBUG(D_PAGE,
		       "page %p map %p index %lu flags %lx count %u priv %0lx: off %d\n",
		       pga[i]->pg, pga[i]->pg->mapping, pga[i]->pg->index,
		       (long)pga[i]->pg->flags, page_count(pga[i]->pg),
		       page_private(pga[i]->pg),
		       (int)(pga[i]->off & ~CFS_PAGE_MASK));

		nob -= pga[i]->count;
		pg_count--;
		i++;
	}

	bufsize = 4;
	err = cfs_crypto_hash_final(hdesc, (unsigned char *)&cksum, &bufsize);

	/* a second _final call with NULL buffer releases the descriptor */
	if (err)
		cfs_crypto_hash_final(hdesc, NULL, NULL);

	/* For sending we only compute the wrong checksum instead
	 * of corrupting the data so it is still correct on a redo */
	if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
		cksum++;

	return cksum;
}

/* Build a BRW (bulk read/write) ptlrpc request for @page_count pages:
 * allocate the request (from the pre-allocated pool for writes), set up the
 * bulk descriptor and niobufs (merging contiguous pages into one niobuf),
 * pack the obdo/capa and optional bulk checksum, and stash the transfer
 * bookkeeping in the request's async args for the interpret callback. */
static int osc_brw_prep_request(int cmd, struct client_obd *cli,
				struct obdo *oa,
				struct lov_stripe_md *lsm, u32 page_count,
				struct brw_page **pga,
				struct ptlrpc_request **reqp,
				struct obd_capa *ocapa, int reserve,
				int resend)
{
	struct ptlrpc_request *req;
	struct ptlrpc_bulk_desc *desc;
	struct ost_body *body;
	struct obd_ioobj *ioobj;
	struct niobuf_remote *niobuf;
	int niocount, i, requested_nob, opc, rc;
	struct osc_brw_async_args *aa;
	struct req_capsule *pill;
	struct brw_page *pg_prev;

	if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
		return -ENOMEM; /* Recoverable */
	if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
		return -EINVAL; /*
Fatal */

	if ((cmd & OBD_BRW_WRITE) != 0) {
		opc = OST_WRITE;
		req = ptlrpc_request_alloc_pool(cli->cl_import,
						cli->cl_import->imp_rq_pool,
						&RQF_OST_BRW_WRITE);
	} else {
		opc = OST_READ;
		req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
	}
	if (req == NULL)
		return -ENOMEM;

	/* one niobuf per run of mergeable (contiguous, same-flag) pages */
	for (niocount = i = 1; i < page_count; i++) {
		if (!can_merge_pages(pga[i - 1], pga[i]))
			niocount++;
	}

	pill = &req->rq_pill;
	req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
			     sizeof(*ioobj));
	req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
			     niocount * sizeof(*niobuf));
	osc_set_capa_size(req, &RMF_CAPA1, ocapa);

	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
	if (rc) {
		ptlrpc_request_free(req);
		return rc;
	}
	req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
	ptlrpc_at_set_req_timeout(req);
	/* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
	 * retry logic */
	req->rq_no_retry_einprogress = 1;

	desc = ptlrpc_prep_bulk_imp(req, page_count,
		cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
		opc == OST_WRITE ? BULK_GET_SOURCE : BULK_PUT_SINK,
		OST_BULK_PORTAL);

	if (desc == NULL) {
		rc = -ENOMEM;
		goto out;
	}
	/* NB request now owns desc and will free it when it gets freed */

	body = req_capsule_client_get(pill, &RMF_OST_BODY);
	ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
	niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
	LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);

	lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

	obdo_to_ioobj(oa, ioobj);
	ioobj->ioo_bufcnt = niocount;
	/* The high bits of ioo_max_brw tells server _maximum_ number of bulks
	 * that might be send for this request.  The actual number is decided
	 * when the RPC is finally sent in ptlrpc_register_bulk(). It sends
	 * "max - 1" for old client compatibility sending "0", and also so the
	 * the actual maximum is a power-of-two number, not one less. LU-1431 */
	ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
	osc_pack_capa(req, body, ocapa);
	LASSERT(page_count > 0);
	pg_prev = pga[0];
	for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
		struct brw_page *pg = pga[i];
		int poff = pg->off & ~CFS_PAGE_MASK;

		LASSERT(pg->count > 0);
		/* make sure there is no gap in the middle of page array */
		LASSERTF(page_count == 1 ||
			 (ergo(i == 0, poff + pg->count == PAGE_CACHE_SIZE) &&
			  ergo(i > 0 && i < page_count - 1,
			       poff == 0 && pg->count == PAGE_CACHE_SIZE) &&
			  ergo(i == page_count - 1, poff == 0)),
			 "i: %d/%d pg: %p off: %llu, count: %u\n",
			 i, page_count, pg, pg->off, pg->count);
		LASSERTF(i == 0 || pg->off > pg_prev->off,
			 "i %d p_c %u pg %p [pri %lu ind %lu] off %llu"
			 " prev_pg %p [pri %lu ind %lu] off %llu\n",
			 i, page_count,
			 pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
			 pg_prev->pg, page_private(pg_prev->pg),
			 pg_prev->pg->index, pg_prev->off);
		LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
			(pg->flag & OBD_BRW_SRVLOCK));

		ptlrpc_prep_bulk_page_pin(desc, pg->pg, poff, pg->count);
		requested_nob += pg->count;

		if (i > 0 && can_merge_pages(pg_prev, pg)) {
			/* extend the previous niobuf instead of starting a
			 * new one */
			niobuf--;
			niobuf->len += pg->count;
		} else {
			niobuf->offset = pg->off;
			niobuf->len = pg->count;
			niobuf->flags = pg->flag;
		}
		pg_prev = pg;
	}

	LASSERTF((void *)(niobuf - niocount) ==
		 req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
		 "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
		 &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));

	osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
	if (resend) {
		if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
			body->oa.o_valid |= OBD_MD_FLFLAGS;
			body->oa.o_flags = 0;
		}
		body->oa.o_flags |= OBD_FL_RECOV_RESEND;
	}

	if (osc_should_shrink_grant(cli))
		osc_shrink_grant_local(cli, &body->oa);

	/* size[REQ_REC_OFF] still sizeof (*body) */
	if (opc == OST_WRITE) {
		if (cli->cl_checksum &&
		    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
			/* store cl_cksum_type in a local variable since
			 * it can be changed via lprocfs */
			cksum_type_t cksum_type = cli->cl_cksum_type;

			if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
				oa->o_flags &= OBD_FL_LOCAL_MASK;
				body->oa.o_flags = 0;
			}
			body->oa.o_flags |= cksum_type_pack(cksum_type);
			body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
			body->oa.o_cksum = osc_checksum_bulk(requested_nob,
							     page_count, pga,
							     OST_WRITE,
							     cksum_type);
			CDEBUG(D_PAGE, "checksum at write origin: %x\n",
			       body->oa.o_cksum);
			/* save this in 'oa', too, for later checking */
			oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
			oa->o_flags |= cksum_type_pack(cksum_type);
		} else {
			/* clear out the checksum flag, in case this is a
			 * resend but cl_checksum is no longer set. b=11238 */
			oa->o_valid &= ~OBD_MD_FLCKSUM;
		}
		oa->o_cksum = body->oa.o_cksum;
		/* 1 RC per niobuf */
		req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
				     sizeof(__u32) * niocount);
	} else {
		if (cli->cl_checksum &&
		    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
			if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
				body->oa.o_flags = 0;
			body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
			body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
		}
	}
	ptlrpc_request_set_replen(req);

	CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
	aa = ptlrpc_req_async_args(req);
	aa->aa_oa = oa;
	aa->aa_requested_nob = requested_nob;
	aa->aa_nio_count = niocount;
	aa->aa_page_count = page_count;
	aa->aa_resends = 0;
	aa->aa_ppga = pga;
	aa->aa_cli = cli;
	INIT_LIST_HEAD(&aa->aa_oaps);
	if (ocapa && reserve)
		aa->aa_ocapa = capa_get(ocapa);

	*reqp = req;
	return 0;

 out:
	ptlrpc_req_finished(req);
	return rc;
}

/* Diagnose a write-checksum mismatch: recompute the checksum locally to
 * classify where the corruption most likely happened, log it, and return
 * 1 (mismatch) so the caller retries; returns 0 when checksums agree. */
static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
				__u32 client_cksum, __u32 server_cksum, int nob,
				u32 page_count, struct brw_page **pga,
				cksum_type_t client_cksum_type)
{
	__u32 new_cksum;
	char *msg;
	cksum_type_t cksum_type;

	if (server_cksum == client_cksum) {
		CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
		return 0;
	}

	cksum_type = cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
				       oa->o_flags : 0);
	new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
				      cksum_type);

	if (cksum_type != client_cksum_type)
		msg = "the server did not use the checksum type specified in "
		      "the original request - likely a protocol problem";
	else if (new_cksum == server_cksum)
		msg = "changed on the client after we checksummed it - "
		      "likely false positive due to mmap IO (bug 11742)";
	else if (new_cksum == client_cksum)
		msg = "changed in transit before arrival at OST";
	else
		msg = "changed in transit AND doesn't match the original - "
		      "likely false positive due to mmap IO (bug 11742)";

	LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inode "DFID
			   " object "DOSTID" extent [%llu-%llu]\n",
			   msg, libcfs_nid2str(peer->nid),
			   oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
			   oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
			   oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
			   POSTID(&oa->o_oi), pga[0]->off,
			   pga[page_count-1]->off + pga[page_count-1]->count - 1);
	CERROR("original client csum %x (type %x), server csum %x (type %x), "
	       "client csum now %x\n", client_cksum, client_cksum_type,
	       server_cksum, cksum_type, new_cksum);
	return 1;
}

/* Note rc enters this function as number of bytes transferred */
static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
{
	struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
	const lnet_process_id_t *peer =
			&req->rq_import->imp_connection->c_peer;
	struct client_obd *cli = aa->aa_cli;
	struct ost_body *body;
	__u32 client_cksum = 0;

	if (rc < 0 && rc != -EDQUOT) {
		DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
		return rc;
	}

	LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
	body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
	if (body == NULL) {
		DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
		return -EPROTO;
	}

	/* set/clear over quota flag for a uid/gid */
	if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
	    body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA)) {
		unsigned int qid[MAXQUOTAS] = { body->oa.o_uid, body->oa.o_gid };

		CDEBUG(D_QUOTA, "setdq for [%u %u] with valid %#llx, flags %x\n",
		       body->oa.o_uid, body->oa.o_gid, body->oa.o_valid,
		       body->oa.o_flags);
		osc_quota_setdq(cli, qid, body->oa.o_valid, body->oa.o_flags);
	}

	/* grant is updated even on -EDQUOT, before bailing out below */
	osc_update_grant(cli, body);

	if (rc < 0)
		return rc;

	if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
		client_cksum = aa->aa_oa->o_cksum; /* save for later */

	if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
		if (rc > 0) {
			CERROR("Unexpected +ve rc %d\n", rc);
			return -EPROTO;
		}
		LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);

		if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
			return -EAGAIN;

		if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
		    check_write_checksum(&body->oa, peer, client_cksum,
					 body->oa.o_cksum, aa->aa_requested_nob,
					 aa->aa_page_count, aa->aa_ppga,
					 cksum_type_unpack(aa->aa_oa->o_flags)))
			return -EAGAIN;

		rc = check_write_rcs(req, aa->aa_requested_nob,
				     aa->aa_nio_count,
				     aa->aa_page_count, aa->aa_ppga);
		goto out;
	}

	/* The rest of this function executes only for OST_READs */

	/* if unwrap_bulk failed, return -EAGAIN to retry */
	rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
	if (rc < 0) {
		rc = -EAGAIN;
		goto out;
	}

	if (rc > aa->aa_requested_nob) {
		CERROR("Unexpected rc %d (%d requested)\n", rc,
		       aa->aa_requested_nob);
		return -EPROTO;
	}

	if (rc != req->rq_bulk->bd_nob_transferred) {
		CERROR ("Unexpected rc %d (%d transferred)\n",
			rc, req->rq_bulk->bd_nob_transferred);
		return -EPROTO;
	}

	if (rc < aa->aa_requested_nob)
		handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);

	if (body->oa.o_valid & OBD_MD_FLCKSUM) {
		static int cksum_counter;
		__u32 server_cksum = body->oa.o_cksum;
		char *via;
		char *router;
		cksum_type_t cksum_type;

		cksum_type = cksum_type_unpack(body->oa.o_valid &OBD_MD_FLFLAGS?
					       body->oa.o_flags : 0);
		client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
						 aa->aa_ppga, OST_READ,
						 cksum_type);

		if (peer->nid == req->rq_bulk->bd_sender) {
			via = router = "";
		} else {
			via = " via ";
			router = libcfs_nid2str(req->rq_bulk->bd_sender);
		}

		if (server_cksum != client_cksum) {
			LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
					   "%s%s%s inode "DFID" object "DOSTID
					   " extent [%llu-%llu]\n",
					   req->rq_import->imp_obd->obd_name,
					   libcfs_nid2str(peer->nid),
					   via, router,
					   body->oa.o_valid & OBD_MD_FLFID ?
						body->oa.o_parent_seq : (__u64)0,
					   body->oa.o_valid & OBD_MD_FLFID ?
						body->oa.o_parent_oid : 0,
					   body->oa.o_valid & OBD_MD_FLFID ?
						body->oa.o_parent_ver : 0,
					   POSTID(&body->oa.o_oi),
					   aa->aa_ppga[0]->off,
					   aa->aa_ppga[aa->aa_page_count-1]->off +
					   aa->aa_ppga[aa->aa_page_count-1]->count -
					   1);
			CERROR("client %x, server %x, cksum_type %x\n",
			       client_cksum, server_cksum, cksum_type);
			cksum_counter = 0;
			aa->aa_oa->o_cksum = client_cksum;
			rc = -EAGAIN;
		} else {
			cksum_counter++;
			CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
			rc = 0;
		}
	} else if (unlikely(client_cksum)) {
		static int cksum_missed;

		cksum_missed++;
		/* log only on power-of-two misses to rate-limit the error */
		if ((cksum_missed & (-cksum_missed)) == cksum_missed)
			CERROR("Checksum %u requested from %s but not sent\n",
			       cksum_missed, libcfs_nid2str(peer->nid));
	} else {
		rc = 0;
	}
out:
	if (rc >= 0)
		lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
				     aa->aa_oa, &body->oa);

	return rc;
}

/* Rebuild and resubmit a BRW request that failed with a recoverable error.
 * The new request takes over the page array, oaps and extents of the old
 * one; resend delay grows with aa_resends, capped at the request timeout. */
static int osc_brw_redo_request(struct ptlrpc_request *request,
				struct osc_brw_async_args *aa, int rc)
{
	struct ptlrpc_request *new_req;
	struct osc_brw_async_args *new_aa;
	struct osc_async_page *oap;

	DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
		  "redo for recoverable error %d", rc);

	rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
					OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
				  aa->aa_cli, aa->aa_oa,
				  NULL /* lsm unused by osc currently */,
				  aa->aa_page_count, aa->aa_ppga,
				  &new_req, aa->aa_ocapa, 0, 1);
	if (rc)
		return rc;

	list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
		if (oap->oap_request != NULL) {
			LASSERTF(request == oap->oap_request,
				 "request %p != oap_request %p\n",
				 request, oap->oap_request);
			if (oap->oap_interrupted) {
				ptlrpc_req_finished(new_req);
				return -EINTR;
			}
		}
	}
	/* New request takes over pga and oaps from old request.
	 * Note that copying a list_head doesn't work, need to move it... */
	aa->aa_resends++;
	new_req->rq_interpret_reply = request->rq_interpret_reply;
	new_req->rq_async_args = request->rq_async_args;
	/* cap resend delay to the current request timeout, this is similar to
	 * what ptlrpc does (see after_reply()) */
	if (aa->aa_resends > new_req->rq_timeout)
		new_req->rq_sent = get_seconds() + new_req->rq_timeout;
	else
		new_req->rq_sent = get_seconds() + aa->aa_resends;
	new_req->rq_generation_set = 1;
	new_req->rq_import_generation = request->rq_import_generation;

	new_aa = ptlrpc_req_async_args(new_req);

	INIT_LIST_HEAD(&new_aa->aa_oaps);
	list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
	INIT_LIST_HEAD(&new_aa->aa_exts);
	list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
	new_aa->aa_resends = aa->aa_resends;

	list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
		if (oap->oap_request) {
			ptlrpc_req_finished(oap->oap_request);
			oap->oap_request = ptlrpc_request_addref(new_req);
		}
	}

	/* capa ownership moves to the new request */
	new_aa->aa_ocapa = aa->aa_ocapa;
	aa->aa_ocapa = NULL;

	/* XXX: This code will run into problem if we're going to support
	 * to add a series of BRW RPCs into a self-defined ptlrpc_request_set
	 * and wait for all of them to be finished. We should inherit request
	 * set from old request. */
	ptlrpcd_add_req(new_req, PDL_POLICY_SAME, -1);

	DEBUG_REQ(D_INFO, new_req, "new request");
	return 0;
}

/*
 * ugh, we want disk allocation on the target to happen in offset order. we'll
 * follow sedgewicks advice and stick to the dead simple shellsort -- it'll do
 * fine for our small page arrays and doesn't require allocation. its an
 * insertion sort that swaps elements that are strides apart, shrinking the
 * stride down until its '1' and the array is sorted.
 */
static void sort_brw_pages(struct brw_page **array, int num)
{
	int stride, i, j;
	struct brw_page *tmp;

	if (num == 1)
		return;
	/* find the largest stride of the 3h+1 sequence below num */
	for (stride = 1; stride < num ; stride = (stride * 3) + 1)
		;

	do {
		stride /= 3;
		for (i = stride ; i < num ; i++) {
			tmp = array[i];
			j = i;
			while (j >= stride && array[j - stride]->off > tmp->off) {
				array[j] = array[j - stride];
				j -= stride;
			}
			array[j] = tmp;
		}
	} while (stride > 1);
}

/* Free a brw_page pointer array of @count entries allocated for one RPC. */
static void osc_release_ppga(struct brw_page **ppga, u32 count)
{
	LASSERT(ppga != NULL);
	OBD_FREE(ppga, sizeof(*ppga) * count);
}

/* Interpret callback invoked when a BRW RPC completes: finalize the
 * transfer, redo on recoverable errors, finish the extents, update cached
 * object attributes on success, and drop the in-flight RPC accounting. */
static int brw_interpret(const struct lu_env *env,
			 struct ptlrpc_request *req, void *data, int rc)
{
	struct osc_brw_async_args *aa = data;
	struct osc_extent *ext;
	struct osc_extent *tmp;
	struct cl_object *obj = NULL;
	struct client_obd *cli = aa->aa_cli;

	rc = osc_brw_fini_request(req, rc);
	CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
	/* When server return -EINPROGRESS, client should always retry
	 * regardless of the number of times the bulk was resent already. */
	if (osc_recoverable_error(rc)) {
		if (req->rq_import_generation !=
		    req->rq_import->imp_generation) {
			CDEBUG(D_HA, "%s: resend cross eviction for object: "
			       ""DOSTID", rc = %d.\n",
			       req->rq_import->imp_obd->obd_name,
			       POSTID(&aa->aa_oa->o_oi), rc);
		} else if (rc == -EINPROGRESS ||
		    client_should_resend(aa->aa_resends, aa->aa_cli)) {
			rc = osc_brw_redo_request(req, aa, rc);
		} else {
			CERROR("%s: too many resent retries for object: %llu:%llu, rc = %d.\n",
			       req->rq_import->imp_obd->obd_name,
			       POSTID(&aa->aa_oa->o_oi), rc);
		}

		if (rc == 0)
			return 0;
		else if (rc == -EAGAIN || rc == -EINPROGRESS)
			rc = -EIO;
	}

	if (aa->aa_ocapa) {
		capa_put(aa->aa_ocapa);
		aa->aa_ocapa = NULL;
	}

	list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
		/* grab the object once (only on success) so attributes can
		 * be updated after all extents are finished */
		if (obj == NULL && rc == 0) {
			obj = osc2cl(ext->oe_obj);
			cl_object_get(obj);
		}

		list_del_init(&ext->oe_link);
		osc_extent_finish(env, ext, 1, rc);
	}
	LASSERT(list_empty(&aa->aa_exts));
	LASSERT(list_empty(&aa->aa_oaps));

	if (obj != NULL) {
		struct obdo *oa = aa->aa_oa;
		struct cl_attr *attr = &osc_env_info(env)->oti_attr;
		unsigned long valid = 0;

		LASSERT(rc == 0);
		if (oa->o_valid & OBD_MD_FLBLOCKS) {
			attr->cat_blocks = oa->o_blocks;
			valid |= CAT_BLOCKS;
		}
		if (oa->o_valid & OBD_MD_FLMTIME) {
			attr->cat_mtime = oa->o_mtime;
			valid |= CAT_MTIME;
		}
		if (oa->o_valid & OBD_MD_FLATIME) {
			attr->cat_atime = oa->o_atime;
			valid |= CAT_ATIME;
		}
		if (oa->o_valid & OBD_MD_FLCTIME) {
			attr->cat_ctime = oa->o_ctime;
			valid |= CAT_CTIME;
		}
		if (valid != 0) {
			cl_object_attr_lock(obj);
			cl_object_attr_set(env, obj, attr, valid);
			cl_object_attr_unlock(obj);
		}
		cl_object_put(env, obj);
	}
	OBDO_FREE(aa->aa_oa);

	cl_req_completion(env, aa->aa_clerq, rc < 0 ? rc :
			  req->rq_bulk->bd_nob_transferred);
	osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
	ptlrpc_lprocfs_brw(req, req->rq_bulk->bd_nob_transferred);

	client_obd_list_lock(&cli->cl_loi_list_lock);
	/* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
	 * is called so we know whether to go to sync BRWs or wait for more
	 * RPCs to complete */
	if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
		cli->cl_w_in_flight--;
	else
		cli->cl_r_in_flight--;
	osc_wake_cache_waiters(cli);
	client_obd_list_unlock(&cli->cl_loi_list_lock);

	osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
	return rc;
}

/**
 * Build an RPC by the list of extent @ext_list. The caller must ensure
 * that the total pages in this list are NOT over max pages per RPC.
 * Extents in the list must be in OES_RPC state.
 */
int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
		  struct list_head *ext_list, int cmd, pdl_policy_t pol)
{
	struct ptlrpc_request *req = NULL;
	struct osc_extent *ext;
	struct brw_page **pga = NULL;
	struct osc_brw_async_args *aa = NULL;
	struct obdo *oa = NULL;
	struct osc_async_page *oap;
	struct osc_async_page *tmp;
	struct cl_req *clerq = NULL;
	enum cl_req_type crt = (cmd & OBD_BRW_WRITE) ?
						CRT_WRITE : CRT_READ;
	struct ldlm_lock *lock = NULL;
	struct cl_req_attr *crattr = NULL;
	u64 starting_offset = OBD_OBJECT_EOF;
	u64 ending_offset = 0;
	int mpflag = 0;
	int mem_tight = 0;
	int page_count = 0;
	int i;
	int rc;
	LIST_HEAD(rpc_list);

	LASSERT(!list_empty(ext_list));

	/* add pages into rpc_list to build BRW rpc */
	list_for_each_entry(ext, ext_list, oe_link) {
		LASSERT(ext->oe_state == OES_RPC);
		mem_tight |= ext->oe_memalloc;
		list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
			++page_count;
			list_add_tail(&oap->oap_rpc_item, &rpc_list);
			if (starting_offset > oap->oap_obj_off)
				starting_offset = oap->oap_obj_off;
			else
				LASSERT(oap->oap_page_off == 0);
			if (ending_offset < oap->oap_obj_off + oap->oap_count)
				ending_offset = oap->oap_obj_off +
						oap->oap_count;
			else
				LASSERT(oap->oap_page_off + oap->oap_count ==
					PAGE_CACHE_SIZE);
		}
	}

	if (mem_tight)
		mpflag = cfs_memory_pressure_get_and_set();

	OBD_ALLOC(crattr, sizeof(*crattr));
	if (crattr == NULL) {
		rc = -ENOMEM;
		goto out;
	}

	OBD_ALLOC(pga, sizeof(*pga) * page_count);
	if (pga == NULL) {
		rc = -ENOMEM;
		goto out;
	}

	OBDO_ALLOC(oa);
	if (oa == NULL) {
		rc = -ENOMEM;
		goto out;
	}

	i = 0;
	list_for_each_entry(oap, &rpc_list, oap_rpc_item) {
		struct cl_page *page = oap2cl_page(oap);
		if (clerq == NULL) {
			/* allocate the cl_req on the first page */
			clerq = cl_req_alloc(env, page, crt,
					     1 /* only 1-object rpcs for now */);
			if (IS_ERR(clerq)) {
				rc = PTR_ERR(clerq);
				goto out;
			}
			lock = oap->oap_ldlm_lock;
		}
		if (mem_tight)
			oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
		pga[i] = &oap->oap_brw_page;
		pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
		CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
		       pga[i]->pg, page_index(oap->oap_page), oap,
		       pga[i]->flag);
		i++;
		cl_req_page_add(env, clerq, page);
	}

	/* always get the data for the obdo for the rpc */
	LASSERT(clerq != NULL);
	crattr->cra_oa = oa;
	cl_req_attr_set(env, clerq, crattr, ~0ULL);
	if (lock) {
		oa->o_handle = lock->l_remote_handle;
		oa->o_valid |= OBD_MD_FLHANDLE;
	}

	rc = cl_req_prep(env, clerq);
	if (rc != 0) {
		CERROR("cl_req_prep failed: %d\n", rc);
		goto out;
	}

	sort_brw_pages(pga, page_count);
	rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
				  pga, &req, crattr->cra_capa, 1, 0);
	if (rc != 0) {
		CERROR("prep_req failed: %d\n", rc);
		goto out;
	}

	req->rq_interpret_reply = brw_interpret;

	if (mem_tight != 0)
		req->rq_memalloc = 1;

	/* Need to update the timestamps after the request is built in case
	 * we race with setattr (locally or in queue at OST). If OST gets
	 * later setattr before earlier BRW (as determined by the request xid),
	 * the OST will not use BRW timestamps. Sadly, there is no obvious
	 * way to do this in a single call. bug 10150 */
	cl_req_attr_set(env, clerq, crattr,
			OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME);

	lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid);

	CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
	aa = ptlrpc_req_async_args(req);
	INIT_LIST_HEAD(&aa->aa_oaps);
	list_splice_init(&rpc_list, &aa->aa_oaps);
	INIT_LIST_HEAD(&aa->aa_exts);
	list_splice_init(ext_list, &aa->aa_exts);
	aa->aa_clerq = clerq;

	/* queued sync pages can be torn down while the pages
	 * were between the pending list and the rpc */
	tmp = NULL;
	list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
		/* only one oap gets a request reference */
		if (tmp == NULL)
			tmp = oap;
		if (oap->oap_interrupted && !req->rq_intr) {
			CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
			       oap, req);
			ptlrpc_mark_interrupted(req);
		}
	}
	if (tmp != NULL)
		tmp->oap_request = ptlrpc_request_addref(req);

	client_obd_list_lock(&cli->cl_loi_list_lock);
	starting_offset >>= PAGE_CACHE_SHIFT;
	if (cmd == OBD_BRW_READ) {
		cli->cl_r_in_flight++;
		lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
		lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
		lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
				      starting_offset + 1);
	} else {
		cli->cl_w_in_flight++;
		lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
		lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
		lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
				      starting_offset + 1);
	}
	client_obd_list_unlock(&cli->cl_loi_list_lock);

	DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
		  page_count, aa, cli->cl_r_in_flight,
		  cli->cl_w_in_flight);

	/* XXX: Maybe the caller can check the RPC bulk descriptor to
	 * see which CPU/NUMA node the majority of pages were allocated
	 * on, and try to assign the async RPC to the CPU core
	 * (PDL_POLICY_PREFERRED) to reduce cross-CPU memory traffic.
	 *
	 * But on the other hand, we expect that multiple ptlrpcd
	 * threads and the initial write sponsor can run in parallel,
	 * especially when data checksum is enabled, which is CPU-bound
	 * operation and single ptlrpcd thread cannot process in time.
	 * So more ptlrpcd threads sharing BRW load
	 * (with PDL_POLICY_ROUND) seems better.
	 */
	ptlrpcd_add_req(req, pol, -1);
	rc = 0;

out:
	if (mem_tight != 0)
		cfs_memory_pressure_restore(mpflag);

	if (crattr != NULL) {
		capa_put(crattr->cra_capa);
		OBD_FREE(crattr, sizeof(*crattr));
	}

	if (rc != 0) {
		LASSERT(req == NULL);

		if (oa)
			OBDO_FREE(oa);
		if (pga)
			OBD_FREE(pga, sizeof(*pga) * page_count);
		/* this should happen rarely and is pretty bad, it makes the
		 * pending list not follow the dirty order */
		while (!list_empty(ext_list)) {
			ext = list_entry(ext_list->next, struct osc_extent,
					 oe_link);
			list_del_init(&ext->oe_link);
			osc_extent_finish(env, ext, 0, rc);
		}
		if (clerq && !IS_ERR(clerq))
			cl_req_completion(env, clerq, rc);
	}
	return rc;
}

/* Attach @einfo->ei_cbdata to an ldlm lock after asserting the lock's ASTs
 * and resource type match; returns 1 if the data is (now) set on the lock. */
static int osc_set_lock_data_with_check(struct ldlm_lock *lock,
					struct ldlm_enqueue_info *einfo)
{
	void *data = einfo->ei_cbdata;
	int set = 0;

	LASSERT(lock != NULL);
	LASSERT(lock->l_blocking_ast == einfo->ei_cb_bl);
	LASSERT(lock->l_resource->lr_type == einfo->ei_type);
	LASSERT(lock->l_completion_ast == einfo->ei_cb_cp);
	LASSERT(lock->l_glimpse_ast == einfo->ei_cb_gl);

	lock_res_and_lock(lock);
	/* osc_ast_guard (global spinlock, declared elsewhere in this file)
	 * serializes installation of l_ast_data on the lock. */
	spin_lock(&osc_ast_guard);

	if (lock->l_ast_data == NULL)
		lock->l_ast_data = data;
	/* "set" reports whether l_ast_data now equals the caller's cbdata,
	 * whether we just installed it or it was already set to it. */
	if (lock->l_ast_data == data)
		set = 1;

	spin_unlock(&osc_ast_guard);
	unlock_res_and_lock(lock);

	return set;
}

/* Resolve @lockh to its lock and install/verify einfo->ei_cbdata on it.
 * Returns 1 when the lock's ast data matches (or was just installed),
 * 0 otherwise or when the handle no longer resolves (client evicted). */
static int osc_set_data_with_check(struct lustre_handle *lockh,
				   struct ldlm_enqueue_info *einfo)
{
	struct ldlm_lock *lock = ldlm_handle2lock(lockh);
	int set = 0;

	if (lock != NULL) {
		set = osc_set_lock_data_with_check(lock, einfo);
		LDLM_LOCK_PUT(lock);
	} else
		CERROR("lockh %p, data %p - client evicted?\n",
		       lockh, einfo->ei_cbdata);
	return set;
}

/* find any ldlm lock of the inode in osc
 * return 0    not find
 *	  1    find one
 *      < 0    error */
static int osc_find_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
			   ldlm_iterator_t replace, void *data)
{
	struct ldlm_res_id res_id;
	struct obd_device *obd = class_exp2obd(exp);
	int rc = 0;

	ostid_build_res_name(&lsm->lsm_oi, &res_id);
	/* map LDLM iterator results to the 0/1/<0 convention above */
	rc = ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
	if (rc == LDLM_ITER_STOP)
		return 1;
	if (rc == LDLM_ITER_CONTINUE)
		return 0;
	return rc;
}

/* Post-process an enqueue result: for intent enqueues translate the server
 * status carried in the DLM reply, mark the LVB ready on success (or on a
 * non-AGL aborted intent), then invoke the caller's update callback. */
static int osc_enqueue_fini(struct ptlrpc_request *req, struct ost_lvb *lvb,
			    obd_enqueue_update_f upcall, void *cookie,
			    __u64 *flags, int agl, int rc)
{
	int intent = *flags & LDLM_FL_HAS_INTENT;

	if (intent) {
		/* The request was created before ldlm_cli_enqueue call. */
		if (rc == ELDLM_LOCK_ABORTED) {
			struct ldlm_reply *rep;
			rep = req_capsule_server_get(&req->rq_pill,
						     &RMF_DLM_REP);

			LASSERT(rep != NULL);
			rep->lock_policy_res1 =
				ptlrpc_status_ntoh(rep->lock_policy_res1);
			/* a non-zero policy result is the real server status */
			if (rep->lock_policy_res1)
				rc = rep->lock_policy_res1;
		}
	}

	if ((intent != 0 && rc == ELDLM_LOCK_ABORTED && agl == 0) ||
	    (rc == 0)) {
		*flags |= LDLM_FL_LVB_READY;
		CDEBUG(D_INODE, "got kms %llu blocks %llu mtime %llu\n",
		       lvb->lvb_size, lvb->lvb_blocks, lvb->lvb_mtime);
	}

	/* Call the update callback. */
	rc = (*upcall)(cookie, rc);
	return rc;
}

/* Interpret callback for an asynchronous lock enqueue: finishes the DLM
 * enqueue, runs osc_enqueue_fini(), and drops the references taken here
 * and by ldlm_cli_enqueue(). */
static int osc_enqueue_interpret(const struct lu_env *env,
				 struct ptlrpc_request *req,
				 struct osc_enqueue_args *aa, int rc)
{
	struct ldlm_lock *lock;
	struct lustre_handle handle;
	__u32 mode;
	struct ost_lvb *lvb;
	__u32 lvb_len;
	__u64 *flags = aa->oa_flags;

	/* Make a local copy of a lock handle and a mode, because aa->oa_*
	 * might be freed anytime after lock upcall has been called. */
	lustre_handle_copy(&handle, aa->oa_lockh);
	mode = aa->oa_ei->ei_mode;

	/* ldlm_cli_enqueue is holding a reference on the lock, so it must
	 * be valid. */
	lock = ldlm_handle2lock(&handle);

	/* Take an additional reference so that a blocking AST that
	 * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
	 * to arrive after an upcall has been executed by
	 * osc_enqueue_fini(). */
	ldlm_lock_addref(&handle, mode);

	/* Let CP AST to grant the lock first. */
	OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);

	/* AGL enqueues that were aborted carry no LVB in the reply */
	if (aa->oa_agl && rc == ELDLM_LOCK_ABORTED) {
		lvb = NULL;
		lvb_len = 0;
	} else {
		lvb = aa->oa_lvb;
		lvb_len = sizeof(*aa->oa_lvb);
	}

	/* Complete obtaining the lock procedure.
	 */
	rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
				   mode, flags, lvb, lvb_len, &handle, rc);
	/* Complete osc stuff. */
	rc = osc_enqueue_fini(req, aa->oa_lvb, aa->oa_upcall, aa->oa_cookie,
			      flags, aa->oa_agl, rc);

	OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);

	/* Release the lock for async request. */
	if (lustre_handle_is_used(&handle) && rc == ELDLM_OK)
		/*
		 * Releases a reference taken by ldlm_cli_enqueue(), if it is
		 * not already released by
		 * ldlm_cli_enqueue_fini()->failed_lock_cleanup()
		 */
		ldlm_lock_decref(&handle, mode);

	LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
		 aa->oa_lockh, req, aa);
	/* drop the extra reference taken at the top of this function */
	ldlm_lock_decref(&handle, mode);
	LDLM_LOCK_PUT(lock);
	return rc;
}

/* Sentinel request set: compared by address in osc_enqueue_base() to mean
 * "hand the request to a ptlrpcd daemon" instead of a real set. */
struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;

/* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
 * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
 * other synchronous requests, however keeping some locks and trying to obtain
 * others may take a considerable amount of time in a case of ost failure; and
 * when other sync requests do not get released lock from a client, the client
 * is excluded from the cluster -- such scenarios make the life difficult, so
 * release locks just after they are obtained. */
int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
		     __u64 *flags, ldlm_policy_data_t *policy,
		     struct ost_lvb *lvb, int kms_valid,
		     obd_enqueue_update_f upcall, void *cookie,
		     struct ldlm_enqueue_info *einfo,
		     struct lustre_handle *lockh,
		     struct ptlrpc_request_set *rqset, int async, int agl)
{
	struct obd_device *obd = exp->exp_obd;
	struct ptlrpc_request *req = NULL;
	int intent = *flags & LDLM_FL_HAS_INTENT;
	/* AGL (async glimpse) may match locks whose LVB is not yet ready */
	__u64 match_lvb = (agl != 0 ? 0 : LDLM_FL_LVB_READY);
	ldlm_mode_t mode;
	int rc;

	/* Filesystem lock extents are extended to page boundaries so that
	 * dealing with the page cache is a little smoother. */
	policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
	policy->l_extent.end |= ~CFS_PAGE_MASK;

	/*
	 * kms is not valid when either object is completely fresh (so that no
	 * locks are cached), or object was evicted. In the latter case cached
	 * lock cannot be used, because it would prime inode state with
	 * potentially stale LVB.
	 */
	if (!kms_valid)
		goto no_match;

	/* Next, search for already existing extent locks that will cover us */
	/* If we're trying to read, we also search for an existing PW lock. The
	 * VFS and page cache already protect us locally, so lots of readers/
	 * writers can share a single PW lock.
	 *
	 * There are problems with conversion deadlocks, so instead of
	 * converting a read lock to a write lock, we'll just enqueue a new
	 * one.
	 *
	 * At some point we should cancel the read lock instead of making them
	 * send us a blocking callback, but there are problems with canceling
	 * locks out from other users right now, too. */
	mode = einfo->ei_mode;
	if (einfo->ei_mode == LCK_PR)
		mode |= LCK_PW;
	mode = ldlm_lock_match(obd->obd_namespace, *flags | match_lvb, res_id,
			       einfo->ei_type, policy, mode, lockh, 0);
	if (mode) {
		struct ldlm_lock *matched = ldlm_handle2lock(lockh);

		if ((agl != 0) && !(matched->l_flags & LDLM_FL_LVB_READY)) {
			/* For AGL, if enqueue RPC is sent but the lock is not
			 * granted, then skip to process this stripe.
			 * Return -ECANCELED to tell the caller.
			 */
			ldlm_lock_decref(lockh, mode);
			LDLM_LOCK_PUT(matched);
			return -ECANCELED;
		} else if (osc_set_lock_data_with_check(matched, einfo)) {
			*flags |= LDLM_FL_LVB_READY;
			/* addref the lock only if not async requests and PW
			 * lock is matched whereas we asked for PR. */
			if (!rqset && einfo->ei_mode != mode)
				ldlm_lock_addref(lockh, LCK_PR);
			if (intent) {
				/* I would like to be able to ASSERT here that
				 * rss <= kms, but I can't, for reasons which
				 * are explained in lov_enqueue() */
			}

			/* We already have a lock, and it's referenced.
			 *
			 * At this point, the cl_lock::cll_state is CLS_QUEUING,
			 * AGL upcall may change it to CLS_HELD directly. */
			(*upcall)(cookie, ELDLM_OK);

			if (einfo->ei_mode != mode)
				ldlm_lock_decref(lockh, LCK_PW);
			else if (rqset)
				/* For async requests, decref the lock. */
				ldlm_lock_decref(lockh, einfo->ei_mode);
			LDLM_LOCK_PUT(matched);
			return ELDLM_OK;
		} else {
			/* cbdata mismatch: drop the matched lock, enqueue */
			ldlm_lock_decref(lockh, mode);
			LDLM_LOCK_PUT(matched);
		}
	}

 no_match:
	if (intent) {
		LIST_HEAD(cancels);
		req = ptlrpc_request_alloc(class_exp2cliimp(exp),
					   &RQF_LDLM_ENQUEUE_LVB);
		if (req == NULL)
			return -ENOMEM;

		rc = ldlm_prep_enqueue_req(exp, req, &cancels, 0);
		if (rc) {
			ptlrpc_request_free(req);
			return rc;
		}

		req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
				     sizeof(*lvb));
		ptlrpc_request_set_replen(req);
	}

	/* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
	*flags &= ~LDLM_FL_BLOCK_GRANTED;

	rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
			      sizeof(*lvb), LVB_T_OST, lockh, async);
	if (rqset) {
		if (!rc) {
			struct osc_enqueue_args *aa;
			CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
			aa = ptlrpc_req_async_args(req);
			aa->oa_ei = einfo;
			aa->oa_exp = exp;
			aa->oa_flags = flags;
			aa->oa_upcall = upcall;
			aa->oa_cookie = cookie;
			aa->oa_lvb = lvb;
			aa->oa_lockh = lockh;
			aa->oa_agl = !!agl;

			req->rq_interpret_reply =
				(ptlrpc_interpterer_t)osc_enqueue_interpret;
			/* PTLRPCD_SET is a sentinel meaning "use ptlrpcd" */
			if (rqset == PTLRPCD_SET)
				ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
			else
				ptlrpc_set_add_req(rqset, req);
		} else if (intent) {
			ptlrpc_req_finished(req);
		}
		return rc;
	}

	/* synchronous path: process the reply inline */
	rc = osc_enqueue_fini(req, lvb, upcall, cookie, flags, agl, rc);
	if (intent)
		ptlrpc_req_finished(req);

	return rc;
}

/* Match a cached DLM extent lock against [start, end] in @policy.
 * Returns the matched mode (a non-zero ldlm_mode_t) on success, 0 when no
 * suitable lock is cached; may return 0 after dropping a matched lock whose
 * ast data check failed. */
int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
		   __u32 type, ldlm_policy_data_t *policy, __u32 mode,
		   __u64 *flags, void *data, struct lustre_handle *lockh,
		   int unref)
{
	struct obd_device *obd = exp->exp_obd;
	__u64 lflags = *flags;
	ldlm_mode_t rc;

	if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
		return -EIO;

	/* Filesystem lock extents are extended to page boundaries so that
	 * dealing with the page cache is a little smoother */
	policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
	policy->l_extent.end |= ~CFS_PAGE_MASK;

	/* Next, search for already existing extent locks that will cover us */
	/* If we're trying to read, we also search for an existing PW lock. The
	 * VFS and page cache already protect us locally, so lots of readers/
	 * writers can share a single PW lock.
	 */
	rc = mode;
	if (mode == LCK_PR)
		rc |= LCK_PW;
	rc = ldlm_lock_match(obd->obd_namespace, lflags,
			     res_id, type, policy, rc, lockh, unref);
	if (rc) {
		if (data != NULL) {
			if (!osc_set_data_with_check(lockh, data)) {
				if (!(lflags & LDLM_FL_TEST_LOCK))
					ldlm_lock_decref(lockh, rc);
				return 0;
			}
		}
		/* matched PW while PR was asked for: convert the reference */
		if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
			ldlm_lock_addref(lockh, LCK_PR);
			ldlm_lock_decref(lockh, LCK_PW);
		}
		return rc;
	}
	return rc;
}

/* Drop a lock reference; group locks are canceled eagerly on last decref. */
int osc_cancel_base(struct lustre_handle *lockh, __u32 mode)
{
	if (unlikely(mode == LCK_GROUP))
		ldlm_lock_decref_and_cancel(lockh, mode);
	else
		ldlm_lock_decref(lockh, mode);

	return 0;
}

/* Interpret callback for an async statfs: copy the server's obd_statfs into
 * the caller's buffer and run the oi_cb_up completion callback. */
static int osc_statfs_interpret(const struct lu_env *env,
				struct ptlrpc_request *req,
				struct osc_async_args *aa, int rc)
{
	struct obd_statfs *msfs;

	if (rc == -EBADR)
		/* The request has in fact never been sent
		 * due to issues at a higher level (LOV).
		 * Exit immediately since the caller is
		 * aware of the problem and takes care
		 * of the clean up */
		return rc;

	if ((rc == -ENOTCONN || rc == -EAGAIN) &&
	    (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY)) {
		/* NODELAY callers treat a disconnected target as success */
		rc = 0;
		goto out;
	}

	if (rc != 0)
		goto out;

	msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
	if (msfs == NULL) {
		rc = -EPROTO;
		goto out;
	}

	*aa->aa_oi->oi_osfs = *msfs;
out:
	rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
	return rc;
}

/* Issue an OST_STATFS request asynchronously via @rqset; the reply is
 * handled by osc_statfs_interpret(). */
static int osc_statfs_async(struct obd_export *exp,
			    struct obd_info *oinfo, __u64 max_age,
			    struct ptlrpc_request_set *rqset)
{
	struct obd_device *obd = class_exp2obd(exp);
	struct ptlrpc_request *req;
	struct osc_async_args *aa;
	int rc;

	/* We could possibly pass max_age in the request (as an absolute
	 * timestamp or a "seconds.usec ago") so the target can avoid doing
	 * extra calls into the filesystem if that isn't necessary (e.g.
	 * during mount that would help a bit). Having relative timestamps
	 * is not so great if request processing is slow, while absolute
	 * timestamps are not ideal because they need time synchronization.
	 */
	req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
	if (req == NULL)
		return -ENOMEM;

	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
	if (rc) {
		ptlrpc_request_free(req);
		return rc;
	}
	ptlrpc_request_set_replen(req);
	req->rq_request_portal = OST_CREATE_PORTAL;
	ptlrpc_at_set_req_timeout(req);

	if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
		/* procfs requests must not wait for recovery (no resend, no
		 * delay), to avoid deadlocking on a stuck import */
		req->rq_no_resend = 1;
		req->rq_no_delay = 1;
	}

	req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
	CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
	aa = ptlrpc_req_async_args(req);
	aa->aa_oi = oinfo;

	ptlrpc_set_add_req(rqset, req);
	return 0;
}

/* Synchronous OST_STATFS: send the request, wait, and copy the server's
 * obd_statfs into @osfs. Returns 0 on success or a negative errno. */
static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
		      struct obd_statfs *osfs, __u64 max_age, __u32 flags)
{
	struct obd_device *obd = class_exp2obd(exp);
	struct obd_statfs *msfs;
	struct ptlrpc_request *req;
	struct obd_import *imp = NULL;
	int rc;

	/* Since the request might also come from lprocfs, we need to sync
	 * this with client_disconnect_export() (bug 15684): take a reference
	 * on the import under cl_sem so it cannot vanish under us. */
	down_read(&obd->u.cli.cl_sem);
	if (obd->u.cli.cl_import)
		imp = class_import_get(obd->u.cli.cl_import);
	up_read(&obd->u.cli.cl_sem);
	if (!imp)
		return -ENODEV;

	/* We could possibly pass max_age in the request (as an absolute
	 * timestamp or a "seconds.usec ago") so the target can avoid doing
	 * extra calls into the filesystem if that isn't necessary (e.g.
	 * during mount that would help a bit). Having relative timestamps
	 * is not so great if request processing is slow, while absolute
	 * timestamps are not ideal because they need time synchronization.
	 */
	req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);

	class_import_put(imp);

	if (req == NULL)
		return -ENOMEM;

	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
	if (rc) {
		ptlrpc_request_free(req);
		return rc;
	}
	ptlrpc_request_set_replen(req);
	req->rq_request_portal = OST_CREATE_PORTAL;
	ptlrpc_at_set_req_timeout(req);

	if (flags & OBD_STATFS_NODELAY) {
		/* procfs requests must not wait for recovery (no resend, no
		 * delay), to avoid deadlocking on a stuck import */
		req->rq_no_resend = 1;
		req->rq_no_delay = 1;
	}

	rc = ptlrpc_queue_wait(req);
	if (rc)
		goto out;

	msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
	if (msfs == NULL) {
		rc = -EPROTO;
		goto out;
	}

	*osfs = *msfs;

 out:
	ptlrpc_req_finished(req);
	return rc;
}

/* Retrieve object striping information.
 *
 * @lmmu is a pointer to an in-core struct with lmm_ost_count indicating
 * the maximum number of OST indices which will fit in the user buffer.
 * lmm_magic must be LOV_MAGIC (we only use 1 slot here).
 */
static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
{
	/* we use lov_user_md_v3 because it is larger than lov_user_md_v1 */
	struct lov_user_md_v3 lum, *lumk;
	struct lov_user_ost_data_v1 *lmm_objects;
	int rc = 0, lum_size;

	if (!lsm)
		return -ENODATA;

	/* we only need the header part from user space to get lmm_magic and
	 * lmm_stripe_count, (the header part is common to v1 and v3) */
	lum_size = sizeof(struct lov_user_md_v1);
	if (copy_from_user(&lum, lump, lum_size))
		return -EFAULT;

	if ((lum.lmm_magic != LOV_USER_MAGIC_V1) &&
	    (lum.lmm_magic != LOV_USER_MAGIC_V3))
		return -EINVAL;

	/* lov_user_md_vX and lov_mds_md_vX must have the same size */
	LASSERT(sizeof(struct lov_user_md_v1) == sizeof(struct lov_mds_md_v1));
	LASSERT(sizeof(struct lov_user_md_v3) == sizeof(struct lov_mds_md_v3));
	LASSERT(sizeof(lum.lmm_objects[0]) == sizeof(lumk->lmm_objects[0]));

	/* we can use lov_mds_md_size() to compute lum_size
	 * because lov_user_md_vX and lov_mds_md_vX have the same size */
	if (lum.lmm_stripe_count > 0) {
		lum_size = lov_mds_md_size(lum.lmm_stripe_count, lum.lmm_magic);
		OBD_ALLOC(lumk, lum_size);
		if (!lumk)
			return -ENOMEM;

		if (lum.lmm_magic == LOV_USER_MAGIC_V1)
			lmm_objects =
			    &(((struct lov_user_md_v1 *)lumk)->lmm_objects[0]);
		else
			lmm_objects = &(lumk->lmm_objects[0]);
		lmm_objects->l_ost_oi = lsm->lsm_oi;
	} else {
		/* no room for objects: return the header only, via &lum */
		lum_size = lov_mds_md_size(0, lum.lmm_magic);
		lumk = &lum;
	}

	lumk->lmm_oi = lsm->lsm_oi;
	/* an OSC object always has exactly one stripe */
	lumk->lmm_stripe_count = 1;

	if (copy_to_user(lump, lumk, lum_size))
		rc = -EFAULT;

	if (lumk != &lum)
		OBD_FREE(lumk, lum_size);

	return rc;
}


/* ioctl dispatcher for the OSC device; holds a module reference for the
 * duration of the call. Returns 0 or a negative errno. */
static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
			 void *karg, void *uarg)
{
	struct obd_device *obd = exp->exp_obd;
	struct obd_ioctl_data *data = karg;
	int err = 0;

	if (!try_module_get(THIS_MODULE)) {
		CERROR("Can't get module. Is it alive?");
		return -EINVAL;
	}
	switch (cmd) {
	case OBD_IOC_LOV_GET_CONFIG: {
		char *buf;
		struct lov_desc *desc;
		struct obd_uuid uuid;

		buf = NULL;
		len = 0;
		if (obd_ioctl_getdata(&buf, &len, (void *)uarg)) {
			err = -EINVAL;
			goto out;
		}

		data = (struct obd_ioctl_data *)buf;

		if (sizeof(*desc) > data->ioc_inllen1) {
			obd_ioctl_freedata(buf, len);
			err = -EINVAL;
			goto out;
		}

		if (data->ioc_inllen2 < sizeof(uuid)) {
			obd_ioctl_freedata(buf, len);
			err = -EINVAL;
			goto out;
		}

		/* an OSC presents itself as a single-target "LOV" */
		desc = (struct lov_desc *)data->ioc_inlbuf1;
		desc->ld_tgt_count = 1;
		desc->ld_active_tgt_count = 1;
		desc->ld_default_stripe_count = 1;
		desc->ld_default_stripe_size = 0;
		desc->ld_default_stripe_offset = 0;
		desc->ld_pattern = 0;
		memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));

		memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));

		err = copy_to_user((void *)uarg, buf, len);
		if (err)
			err = -EFAULT;
		obd_ioctl_freedata(buf, len);
		goto out;
	}
	case LL_IOC_LOV_SETSTRIPE:
		err = obd_alloc_memmd(exp, karg);
		if (err > 0)
			err = 0;
		goto out;
	case LL_IOC_LOV_GETSTRIPE:
		err = osc_getstripe(karg, uarg);
		goto out;
	case OBD_IOC_CLIENT_RECOVER:
		err = ptlrpc_recover_import(obd->u.cli.cl_import,
					    data->ioc_inlbuf1, 0);
		if (err > 0)
			err = 0;
		goto out;
	case IOC_OSC_SET_ACTIVE:
		err = ptlrpc_set_import_active(obd->u.cli.cl_import,
					       data->ioc_offset);
		goto out;
	case OBD_IOC_POLL_QUOTACHECK:
		err = osc_quota_poll_check(exp, (struct if_quotacheck *)karg);
		goto out;
	case OBD_IOC_PING_TARGET:
		err = ptlrpc_obd_ping(obd);
		goto out;
	default:
		CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
		       cmd, current_comm());
		err = -ENOTTY;
		goto out;
	}
out:
	module_put(THIS_MODULE);
	return err;
}

/* obd_get_info handler: serves KEY_LOCK_TO_STRIPE locally, and fetches
 * KEY_LAST_ID / KEY_FIEMAP from the OST via OST_GET_INFO RPCs. */
static int osc_get_info(const struct lu_env *env, struct obd_export *exp,
			u32 keylen, void *key, __u32 *vallen, void *val,
			struct lov_stripe_md *lsm)
{
	if (!vallen || !val)
		return -EFAULT;

	if (KEY_IS(KEY_LOCK_TO_STRIPE)) {
		/* single-stripe object: the answer is always stripe 0 */
		__u32 *stripe = val;
		*vallen = sizeof(*stripe);
		*stripe = 0;
		return 0;
	} else if (KEY_IS(KEY_LAST_ID)) {
		struct ptlrpc_request *req;
		u64 *reply;
		char *tmp;
		int rc;

		req = ptlrpc_request_alloc(class_exp2cliimp(exp),
					   &RQF_OST_GET_INFO_LAST_ID);
		if (req == NULL)
			return -ENOMEM;

		req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
				     RCL_CLIENT, keylen);
		rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
		if (rc) {
			ptlrpc_request_free(req);
			return rc;
		}

		tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
		memcpy(tmp, key, keylen);

		req->rq_no_delay = req->rq_no_resend = 1;
		ptlrpc_request_set_replen(req);
		rc = ptlrpc_queue_wait(req);
		if (rc)
			goto out;

		reply = req_capsule_server_get(&req->rq_pill, &RMF_OBD_ID);
		if (reply == NULL) {
			rc = -EPROTO;
			goto out;
		}

		*((u64 *)val) = *reply;
	out:
		ptlrpc_req_finished(req);
		return rc;
	} else if (KEY_IS(KEY_FIEMAP)) {
		struct ll_fiemap_info_key *fm_key =
				(struct ll_fiemap_info_key *)key;
		struct ldlm_res_id res_id;
		ldlm_policy_data_t policy;
		struct lustre_handle lockh;
		ldlm_mode_t mode = 0;
		struct ptlrpc_request *req;
		struct ll_user_fiemap *reply;
		char *tmp;
		int rc;

		if (!(fm_key->fiemap.fm_flags & FIEMAP_FLAG_SYNC))
			goto skip_locking;

		/* take a PR lock over the mapped range so the OST view is
		 * coherent with client dirty data */
		policy.l_extent.start = fm_key->fiemap.fm_start &
						CFS_PAGE_MASK;

		if (OBD_OBJECT_EOF - fm_key->fiemap.fm_length <=
		    fm_key->fiemap.fm_start + PAGE_CACHE_SIZE - 1)
			policy.l_extent.end = OBD_OBJECT_EOF;
		else
			policy.l_extent.end = (fm_key->fiemap.fm_start +
				fm_key->fiemap.fm_length +
				PAGE_CACHE_SIZE - 1) & CFS_PAGE_MASK;

		ostid_build_res_name(&fm_key->oa.o_oi, &res_id);
		mode = ldlm_lock_match(exp->exp_obd->obd_namespace,
				       LDLM_FL_BLOCK_GRANTED |
				       LDLM_FL_LVB_READY,
				       &res_id, LDLM_EXTENT, &policy,
				       LCK_PR | LCK_PW, &lockh, 0);
		if (mode) { /* lock is cached on client */
			if (mode != LCK_PR) {
				ldlm_lock_addref(&lockh, LCK_PR);
				ldlm_lock_decref(&lockh, LCK_PW);
			}
		} else { /* no cached lock, needs acquire lock on server side */
			fm_key->oa.o_valid |= OBD_MD_FLFLAGS;
			fm_key->oa.o_flags |= OBD_FL_SRVLOCK;
		}

skip_locking:
		req = ptlrpc_request_alloc(class_exp2cliimp(exp),
					   &RQF_OST_GET_INFO_FIEMAP);
		if (req == NULL) {
			rc = -ENOMEM;
			goto drop_lock;
		}

		req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_KEY,
				     RCL_CLIENT, keylen);
		req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
				     RCL_CLIENT, *vallen);
		req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
				     RCL_SERVER, *vallen);

		rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
		if (rc) {
			ptlrpc_request_free(req);
			goto drop_lock;
		}

		tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_KEY);
		memcpy(tmp, key, keylen);
		tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_VAL);
		memcpy(tmp, val, *vallen);

		ptlrpc_request_set_replen(req);
		rc = ptlrpc_queue_wait(req);
		if (rc)
			goto fini_req;

		reply = req_capsule_server_get(&req->rq_pill, &RMF_FIEMAP_VAL);
		if (reply == NULL) {
			rc = -EPROTO;
			goto fini_req;
		}

		memcpy(val, reply, *vallen);
fini_req:
		ptlrpc_req_finished(req);
drop_lock:
		/* release the PR reference taken (or converted) above */
		if (mode)
			ldlm_lock_decref(&lockh, LCK_PR);
		return rc;
	}

	return -EINVAL;
}

/* obd_set_info_async handler: a few keys are handled locally (checksum,
 * sptlrpc, LRU cache wiring/shrinking); everything else is forwarded to the
 * OST via an OST_SET_INFO (or grant-shrink) RPC. */
static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
			      u32 keylen, void *key, u32 vallen,
			      void *val, struct ptlrpc_request_set *set)
{
	struct ptlrpc_request *req;
	struct obd_device *obd = exp->exp_obd;
	struct obd_import *imp = class_exp2cliimp(exp);
	char *tmp;
	int rc;

	OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);

	if (KEY_IS(KEY_CHECKSUM)) {
		if (vallen != sizeof(int))
			return -EINVAL;
		exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
		return 0;
	}

	if (KEY_IS(KEY_SPTLRPC_CONF)) {
		sptlrpc_conf_client_adapt(obd);
		return 0;
	}

	if (KEY_IS(KEY_FLUSH_CTX)) {
		sptlrpc_import_flush_my_ctx(imp);
		return 0;
	}

	if (KEY_IS(KEY_CACHE_SET)) {
		struct client_obd *cli = &obd->u.cli;

		LASSERT(cli->cl_cache == NULL); /* only once */
		cli->cl_cache = (struct cl_client_cache *)val;
		atomic_inc(&cli->cl_cache->ccc_users);
		cli->cl_lru_left = &cli->cl_cache->ccc_lru_left;

		/* add this osc into entity list */
		LASSERT(list_empty(&cli->cl_lru_osc));
		spin_lock(&cli->cl_cache->ccc_lru_lock);
		list_add(&cli->cl_lru_osc, &cli->cl_cache->ccc_lru);
		spin_unlock(&cli->cl_cache->ccc_lru_lock);

		return 0;
	}

	if (KEY_IS(KEY_CACHE_LRU_SHRINK)) {
		struct client_obd *cli = &obd->u.cli;
		/* shrink at most half of this OSC's LRU pages, capped by the
		 * caller's remaining target; report back how many were freed */
		int nr = atomic_read(&cli->cl_lru_in_list) >> 1;
		int target = *(int *)val;

		nr = osc_lru_shrink(cli, min(nr, target));
		*(int *)val -= nr;
		return 0;
	}

	if (!set && !KEY_IS(KEY_GRANT_SHRINK))
		return -EINVAL;

	/* We pass all other commands directly to OST. Since nobody calls osc
	   methods directly and everybody is supposed to go through LOV, we
	   assume lov checked invalid values for us.
	   The only recognised values so far are evict_by_nid and mds_conn.
	   Even if something bad goes through, we'd get a -EINVAL from OST
	   anyway. */

	req = ptlrpc_request_alloc(imp, KEY_IS(KEY_GRANT_SHRINK) ?
						&RQF_OST_SET_GRANT_INFO :
						&RQF_OBD_SET_INFO);
	if (req == NULL)
		return -ENOMEM;

	req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
			     RCL_CLIENT, keylen);
	if (!KEY_IS(KEY_GRANT_SHRINK))
		req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
				     RCL_CLIENT, vallen);
	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
	if (rc) {
		ptlrpc_request_free(req);
		return rc;
	}

	tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
	memcpy(tmp, key, keylen);
	/* grant-shrink requests carry an ost_body instead of a value blob */
	tmp = req_capsule_client_get(&req->rq_pill, KEY_IS(KEY_GRANT_SHRINK) ?
							&RMF_OST_BODY :
							&RMF_SETINFO_VAL);
	memcpy(tmp, val, vallen);

	if (KEY_IS(KEY_GRANT_SHRINK)) {
		struct osc_brw_async_args *aa;
		struct obdo *oa;

		CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
		aa = ptlrpc_req_async_args(req);
		OBDO_ALLOC(oa);
		if (!oa) {
			ptlrpc_req_finished(req);
			return -ENOMEM;
		}
		*oa = ((struct ost_body *)val)->oa;
		aa->aa_oa = oa;
		req->rq_interpret_reply = osc_shrink_grant_interpret;
	}

	ptlrpc_request_set_replen(req);
	if (!KEY_IS(KEY_GRANT_SHRINK)) {
		LASSERT(set != NULL);
		ptlrpc_set_add_req(set, req);
		ptlrpc_check_set(NULL, set);
	} else
		ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);

	return 0;
}

/* On (re)connect, tell the server how much grant to restore: current
 * available grant plus dirty pages, or a 2-RPC minimum when both are zero. */
static int osc_reconnect(const struct lu_env *env,
			 struct obd_export *exp, struct obd_device *obd,
			 struct obd_uuid *cluuid,
			 struct obd_connect_data *data,
			 void *localdata)
{
	struct client_obd *cli = &obd->u.cli;

	if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
		long lost_grant;

		client_obd_list_lock(&cli->cl_loi_list_lock);
		data->ocd_grant = (cli->cl_avail_grant + cli->cl_dirty) ?:
				2 * cli_brw_size(obd);
		lost_grant = cli->cl_lost_grant;
		cli->cl_lost_grant = 0;
		client_obd_list_unlock(&cli->cl_loi_list_lock);

		CDEBUG(D_RPCTRACE, "ocd_connect_flags: %#llx ocd_version: %d"
		       " ocd_grant: %d, lost: %ld.\n", data->ocd_connect_flags,
		       data->ocd_version, data->ocd_grant, lost_grant);
	}

	return 0;
}

static int osc_disconnect(struct obd_export *exp)
{
	struct obd_device *obd = class_exp2obd(exp);
	int rc;

	rc = client_disconnect_export(exp);
	/**
	 * Initially we put del_shrink_grant before disconnect_export, but it
	 * causes the following problem if setup (connect) and cleanup
	 * (disconnect) are tangled together.
	 *      connect p1                     disconnect p2
	 *   ptlrpc_connect_import
	 *     ...............               class_manual_cleanup
	 *                                     osc_disconnect
	 *                                     del_shrink_grant
	 *   ptlrpc_connect_interrupt
	 *     init_grant_shrink
	 *   add this client to shrink list
	 *                                      cleanup_osc
	 * Bang! pinger trigger the shrink.
	 * So the osc should be disconnected from the shrink list, after we
	 * are sure the import has been destroyed.
 * BUG18662
	 */
	if (obd->u.cli.cl_import == NULL)
		osc_del_shrink_grant(&obd->u.cli);
	return rc;
}

/* React to import state changes: reset grants on disconnect, flush pages
 * and DLM locks on invalidation, re-init grants on OCD exchange, and
 * forward activity notifications to the observer. */
static int osc_import_event(struct obd_device *obd,
			    struct obd_import *imp,
			    enum obd_import_event event)
{
	struct client_obd *cli;
	int rc = 0;

	LASSERT(imp->imp_obd == obd);

	switch (event) {
	case IMP_EVENT_DISCON: {
		cli = &obd->u.cli;
		client_obd_list_lock(&cli->cl_loi_list_lock);
		cli->cl_avail_grant = 0;
		cli->cl_lost_grant = 0;
		client_obd_list_unlock(&cli->cl_loi_list_lock);
		break;
	}
	case IMP_EVENT_INACTIVE: {
		rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
		break;
	}
	case IMP_EVENT_INVALIDATE: {
		struct ldlm_namespace *ns = obd->obd_namespace;
		struct lu_env *env;
		int refcheck;

		env = cl_env_get(&refcheck);
		if (!IS_ERR(env)) {
			/* Reset grants */
			cli = &obd->u.cli;
			/* all pages go to failing rpcs due to the invalid
			 * import */
			osc_io_unplug(env, cli, NULL, PDL_POLICY_ROUND);

			ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
			cl_env_put(env, &refcheck);
		} else
			rc = PTR_ERR(env);
		break;
	}
	case IMP_EVENT_ACTIVE: {
		rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
		break;
	}
	case IMP_EVENT_OCD: {
		struct obd_connect_data *ocd = &imp->imp_connect_data;

		if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
			osc_init_grant(&obd->u.cli, ocd);

		/* See bug 7198 */
		if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
			imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;

		rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
		break;
	}
	case IMP_EVENT_DEACTIVATE: {
		rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE, NULL);
		break;
	}
	case IMP_EVENT_ACTIVATE: {
		rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE, NULL);
		break;
	}
	default:
		CERROR("Unknown import event %d\n", event);
		LBUG();
	}
	return rc;
}

/**
 * Determine whether the lock can be canceled before replaying the lock
 * during recovery, see bug16774 for detailed information.
 *
 * \retval zero the lock can't be canceled
 * \retval other ok to cancel
 */
static int osc_cancel_for_recovery(struct ldlm_lock *lock)
{
	check_res_locked(lock->l_resource);

	/*
	 * Cancel all unused extent lock in granted mode LCK_PR or LCK_CR.
	 *
	 * XXX as a future improvement, we can also cancel unused write lock
	 * if it doesn't have dirty data and active mmaps.
	 */
	if (lock->l_resource->lr_type == LDLM_EXTENT &&
	    (lock->l_granted_mode == LCK_PR ||
	     lock->l_granted_mode == LCK_CR) &&
	    (osc_dlm_lock_pageref(lock) == 0))
		return 1;

	return 0;
}

/* ptlrpcd work callback: flush pending cached pages for this client */
static int brw_queue_work(const struct lu_env *env, void *data)
{
	struct client_obd *cli = data;

	CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);

	osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
	return 0;
}

/* Set up an OSC device: client obd, writeback work item, quota, procfs,
 * request pool and grant-shrink registration. */
int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
{
	struct lprocfs_static_vars lvars = { NULL };
	struct client_obd *cli = &obd->u.cli;
	void *handler;
	int rc;

	rc = ptlrpcd_addref();
	if (rc)
		return rc;

	rc = client_obd_setup(obd, lcfg);
	if (rc)
		goto out_ptlrpcd;

	handler = ptlrpcd_alloc_work(cli->cl_import, brw_queue_work, cli);
	if (IS_ERR(handler)) {
		rc = PTR_ERR(handler);
		goto out_client_setup;
	}
	cli->cl_writeback_work = handler;

	rc = osc_quota_setup(obd);
	if (rc)
		goto out_ptlrpcd_work;

	cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
	lprocfs_osc_init_vars(&lvars);
	if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
lproc_osc_attach_seqstat(obd); 3191 sptlrpc_lprocfs_cliobd_attach(obd); 3192 ptlrpc_lprocfs_register_obd(obd); 3193 } 3194 3195 /* We need to allocate a few requests more, because 3196 * brw_interpret tries to create new requests before freeing 3197 * previous ones, Ideally we want to have 2x max_rpcs_in_flight 3198 * reserved, but I'm afraid that might be too much wasted RAM 3199 * in fact, so 2 is just my guess and still should work. */ 3200 cli->cl_import->imp_rq_pool = 3201 ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2, 3202 OST_MAXREQSIZE, 3203 ptlrpc_add_rqs_to_pool); 3204 3205 INIT_LIST_HEAD(&cli->cl_grant_shrink_list); 3206 ns_register_cancel(obd->obd_namespace, osc_cancel_for_recovery); 3207 return rc; 3208 3209out_ptlrpcd_work: 3210 ptlrpcd_destroy_work(handler); 3211out_client_setup: 3212 client_obd_cleanup(obd); 3213out_ptlrpcd: 3214 ptlrpcd_decref(); 3215 return rc; 3216} 3217 3218static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage) 3219{ 3220 int rc = 0; 3221 3222 switch (stage) { 3223 case OBD_CLEANUP_EARLY: { 3224 struct obd_import *imp; 3225 imp = obd->u.cli.cl_import; 3226 CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name); 3227 /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */ 3228 ptlrpc_deactivate_import(imp); 3229 spin_lock(&imp->imp_lock); 3230 imp->imp_pingable = 0; 3231 spin_unlock(&imp->imp_lock); 3232 break; 3233 } 3234 case OBD_CLEANUP_EXPORTS: { 3235 struct client_obd *cli = &obd->u.cli; 3236 /* LU-464 3237 * for echo client, export may be on zombie list, wait for 3238 * zombie thread to cull it, because cli.cl_import will be 3239 * cleared in client_disconnect_export(): 3240 * class_export_destroy() -> obd_cleanup() -> 3241 * echo_device_free() -> echo_client_cleanup() -> 3242 * obd_disconnect() -> osc_disconnect() -> 3243 * client_disconnect_export() 3244 */ 3245 obd_zombie_barrier(); 3246 if (cli->cl_writeback_work) { 3247 ptlrpcd_destroy_work(cli->cl_writeback_work); 3248 
cli->cl_writeback_work = NULL; 3249 } 3250 obd_cleanup_client_import(obd); 3251 ptlrpc_lprocfs_unregister_obd(obd); 3252 lprocfs_obd_cleanup(obd); 3253 break; 3254 } 3255 } 3256 return rc; 3257} 3258 3259int osc_cleanup(struct obd_device *obd) 3260{ 3261 struct client_obd *cli = &obd->u.cli; 3262 int rc; 3263 3264 /* lru cleanup */ 3265 if (cli->cl_cache != NULL) { 3266 LASSERT(atomic_read(&cli->cl_cache->ccc_users) > 0); 3267 spin_lock(&cli->cl_cache->ccc_lru_lock); 3268 list_del_init(&cli->cl_lru_osc); 3269 spin_unlock(&cli->cl_cache->ccc_lru_lock); 3270 cli->cl_lru_left = NULL; 3271 atomic_dec(&cli->cl_cache->ccc_users); 3272 cli->cl_cache = NULL; 3273 } 3274 3275 /* free memory of osc quota cache */ 3276 osc_quota_cleanup(obd); 3277 3278 rc = client_obd_cleanup(obd); 3279 3280 ptlrpcd_decref(); 3281 return rc; 3282} 3283 3284int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg) 3285{ 3286 struct lprocfs_static_vars lvars = { NULL }; 3287 int rc = 0; 3288 3289 lprocfs_osc_init_vars(&lvars); 3290 3291 switch (lcfg->lcfg_command) { 3292 default: 3293 rc = class_process_proc_param(PARAM_OSC, lvars.obd_vars, 3294 lcfg, obd); 3295 if (rc > 0) 3296 rc = 0; 3297 break; 3298 } 3299 3300 return rc; 3301} 3302 3303static int osc_process_config(struct obd_device *obd, u32 len, void *buf) 3304{ 3305 return osc_process_config_base(obd, buf); 3306} 3307 3308struct obd_ops osc_obd_ops = { 3309 .o_owner = THIS_MODULE, 3310 .o_setup = osc_setup, 3311 .o_precleanup = osc_precleanup, 3312 .o_cleanup = osc_cleanup, 3313 .o_add_conn = client_import_add_conn, 3314 .o_del_conn = client_import_del_conn, 3315 .o_connect = client_connect_import, 3316 .o_reconnect = osc_reconnect, 3317 .o_disconnect = osc_disconnect, 3318 .o_statfs = osc_statfs, 3319 .o_statfs_async = osc_statfs_async, 3320 .o_packmd = osc_packmd, 3321 .o_unpackmd = osc_unpackmd, 3322 .o_create = osc_create, 3323 .o_destroy = osc_destroy, 3324 .o_getattr = osc_getattr, 3325 .o_getattr_async = 
osc_getattr_async, 3326 .o_setattr = osc_setattr, 3327 .o_setattr_async = osc_setattr_async, 3328 .o_find_cbdata = osc_find_cbdata, 3329 .o_iocontrol = osc_iocontrol, 3330 .o_get_info = osc_get_info, 3331 .o_set_info_async = osc_set_info_async, 3332 .o_import_event = osc_import_event, 3333 .o_process_config = osc_process_config, 3334 .o_quotactl = osc_quotactl, 3335 .o_quotacheck = osc_quotacheck, 3336}; 3337 3338extern struct lu_kmem_descr osc_caches[]; 3339extern spinlock_t osc_ast_guard; 3340extern struct lock_class_key osc_ast_guard_class; 3341 3342int __init osc_init(void) 3343{ 3344 struct lprocfs_static_vars lvars = { NULL }; 3345 int rc; 3346 3347 /* print an address of _any_ initialized kernel symbol from this 3348 * module, to allow debugging with gdb that doesn't support data 3349 * symbols from modules.*/ 3350 CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches); 3351 3352 rc = lu_kmem_init(osc_caches); 3353 if (rc) 3354 return rc; 3355 3356 lprocfs_osc_init_vars(&lvars); 3357 3358 rc = class_register_type(&osc_obd_ops, NULL, lvars.module_vars, 3359 LUSTRE_OSC_NAME, &osc_device_type); 3360 if (rc) { 3361 lu_kmem_fini(osc_caches); 3362 return rc; 3363 } 3364 3365 spin_lock_init(&osc_ast_guard); 3366 lockdep_set_class(&osc_ast_guard, &osc_ast_guard_class); 3367 3368 return rc; 3369} 3370 3371static void /*__exit*/ osc_exit(void) 3372{ 3373 class_unregister_type(LUSTRE_OSC_NAME); 3374 lu_kmem_fini(osc_caches); 3375} 3376 3377MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>"); 3378MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)"); 3379MODULE_LICENSE("GPL"); 3380MODULE_VERSION(LUSTRE_VERSION_STRING); 3381 3382module_init(osc_init); 3383module_exit(osc_exit); 3384