fld_request.c revision 1a339759f706d3bc7337348af728b04a8d30e31d
1/* 2 * GPL HEADER START 3 * 4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 5 * 6 * This program is free software; you can redistribute it and/or modify 7 * it under the terms of the GNU General Public License version 2 only, 8 * as published by the Free Software Foundation. 9 * 10 * This program is distributed in the hope that it will be useful, but 11 * WITHOUT ANY WARRANTY; without even the implied warranty of 12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 13 * General Public License version 2 for more details (a copy is included 14 * in the LICENSE file that accompanied this code). 15 * 16 * You should have received a copy of the GNU General Public License 17 * version 2 along with this program; If not, see 18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf 19 * 20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara, 21 * CA 95054 USA or visit www.sun.com if you need additional information or 22 * have any questions. 23 * 24 * GPL HEADER END 25 */ 26/* 27 * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved. 28 * Use is subject to license terms. 29 * 30 * Copyright (c) 2011, 2013, Intel Corporation. 31 */ 32/* 33 * This file is part of Lustre, http://www.lustre.org/ 34 * Lustre is a trademark of Sun Microsystems, Inc. 35 * 36 * lustre/fld/fld_request.c 37 * 38 * FLD (Fids Location Database) 39 * 40 * Author: Yury Umanets <umka@clusterfs.com> 41 */ 42 43#define DEBUG_SUBSYSTEM S_FLD 44 45#include "../../include/linux/libcfs/libcfs.h" 46#include <linux/module.h> 47#include <asm/div64.h> 48 49#include "../include/obd.h" 50#include "../include/obd_class.h" 51#include "../include/lustre_ver.h" 52#include "../include/obd_support.h" 53#include "../include/lprocfs_status.h" 54 55#include "../include/dt_object.h" 56#include "../include/lustre_req_layout.h" 57#include "../include/lustre_fld.h" 58#include "../include/lustre_mdc.h" 59#include "fld_internal.h" 60 61/* TODO: these 3 functions are copies of flow-control code from mdc_lib.c 62 * It should be common thing. The same about mdc RPC lock */ 63static int fld_req_avail(struct client_obd *cli, struct mdc_cache_waiter *mcw) 64{ 65 int rc; 66 67 client_obd_list_lock(&cli->cl_loi_list_lock); 68 rc = list_empty(&mcw->mcw_entry); 69 client_obd_list_unlock(&cli->cl_loi_list_lock); 70 return rc; 71}; 72 73static void fld_enter_request(struct client_obd *cli) 74{ 75 struct mdc_cache_waiter mcw; 76 struct l_wait_info lwi = { 0 }; 77 78 client_obd_list_lock(&cli->cl_loi_list_lock); 79 if (cli->cl_r_in_flight >= cli->cl_max_rpcs_in_flight) { 80 list_add_tail(&mcw.mcw_entry, &cli->cl_cache_waiters); 81 init_waitqueue_head(&mcw.mcw_waitq); 82 client_obd_list_unlock(&cli->cl_loi_list_lock); 83 l_wait_event(mcw.mcw_waitq, fld_req_avail(cli, &mcw), &lwi); 84 } else { 85 cli->cl_r_in_flight++; 86 client_obd_list_unlock(&cli->cl_loi_list_lock); 87 } 88} 89 90static void fld_exit_request(struct client_obd *cli) 91{ 92 struct list_head *l, *tmp; 93 struct mdc_cache_waiter *mcw; 94 95 client_obd_list_lock(&cli->cl_loi_list_lock); 96 cli->cl_r_in_flight--; 97 list_for_each_safe(l, tmp, &cli->cl_cache_waiters) { 98 99 if (cli->cl_r_in_flight >= cli->cl_max_rpcs_in_flight) { 100 /* No free request slots anymore */ 101 break; 102 } 103 104 mcw = list_entry(l, struct mdc_cache_waiter, mcw_entry); 105 list_del_init(&mcw->mcw_entry); 106 cli->cl_r_in_flight++; 107 wake_up(&mcw->mcw_waitq); 108 } 109 client_obd_list_unlock(&cli->cl_loi_list_lock); 110} 111 112static int fld_rrb_hash(struct lu_client_fld *fld, u64 seq) 113{ 114 LASSERT(fld->lcf_count > 0); 115 return do_div(seq, fld->lcf_count); 116} 117 118static struct lu_fld_target * 119fld_rrb_scan(struct lu_client_fld *fld, u64 seq) 120{ 121 struct lu_fld_target *target; 122 int hash; 123 124 /* Because almost all of special sequence located in MDT0, 125 * it should go to index 0 directly, instead of calculating 126 * hash again, and also if other MDTs is not being connected, 127 * the fld lookup requests(for seq on MDT0) should not be 128 * blocked because of other MDTs */ 129 if (fid_seq_is_norm(seq)) 130 hash = fld_rrb_hash(fld, seq); 131 else 132 hash = 0; 133 134 list_for_each_entry(target, &fld->lcf_targets, ft_chain) { 135 if (target->ft_idx == hash) 136 return target; 137 } 138 139 CERROR("%s: Can't find target by hash %d (seq %#llx). Targets (%d):\n", 140 fld->lcf_name, hash, seq, fld->lcf_count); 141 142 list_for_each_entry(target, &fld->lcf_targets, ft_chain) { 143 const char *srv_name = target->ft_srv != NULL ? 144 target->ft_srv->lsf_name : "<null>"; 145 const char *exp_name = target->ft_exp != NULL ? 146 (char *)target->ft_exp->exp_obd->obd_uuid.uuid : 147 "<null>"; 148 149 CERROR(" exp: 0x%p (%s), srv: 0x%p (%s), idx: %llu\n", 150 target->ft_exp, exp_name, target->ft_srv, 151 srv_name, target->ft_idx); 152 } 153 154 /* 155 * If target is not found, there is logical error anyway, so here is 156 * LBUG() to catch this situation. 157 */ 158 LBUG(); 159 return NULL; 160} 161 162struct lu_fld_hash fld_hash[] = { 163 { 164 .fh_name = "RRB", 165 .fh_hash_func = fld_rrb_hash, 166 .fh_scan_func = fld_rrb_scan 167 }, 168 { 169 NULL, 170 } 171}; 172 173static struct lu_fld_target * 174fld_client_get_target(struct lu_client_fld *fld, u64 seq) 175{ 176 struct lu_fld_target *target; 177 178 LASSERT(fld->lcf_hash != NULL); 179 180 spin_lock(&fld->lcf_lock); 181 target = fld->lcf_hash->fh_scan_func(fld, seq); 182 spin_unlock(&fld->lcf_lock); 183 184 if (target != NULL) { 185 CDEBUG(D_INFO, "%s: Found target (idx %llu) by seq %#llx\n", 186 fld->lcf_name, target->ft_idx, seq); 187 } 188 189 return target; 190} 191 192/* 193 * Add export to FLD. This is usually done by CMM and LMV as they are main users 194 * of FLD module. 195 */ 196int fld_client_add_target(struct lu_client_fld *fld, 197 struct lu_fld_target *tar) 198{ 199 const char *name; 200 struct lu_fld_target *target, *tmp; 201 202 LASSERT(tar != NULL); 203 name = fld_target_name(tar); 204 LASSERT(name != NULL); 205 LASSERT(tar->ft_srv != NULL || tar->ft_exp != NULL); 206 207 if (fld->lcf_flags != LUSTRE_FLD_INIT) { 208 CERROR("%s: Attempt to add target %s (idx %llu) on fly - skip it\n", 209 fld->lcf_name, name, tar->ft_idx); 210 return 0; 211 } else { 212 CDEBUG(D_INFO, "%s: Adding target %s (idx %llu)\n", 213 fld->lcf_name, name, tar->ft_idx); 214 } 215 216 OBD_ALLOC_PTR(target); 217 if (target == NULL) 218 return -ENOMEM; 219 220 spin_lock(&fld->lcf_lock); 221 list_for_each_entry(tmp, &fld->lcf_targets, ft_chain) { 222 if (tmp->ft_idx == tar->ft_idx) { 223 spin_unlock(&fld->lcf_lock); 224 OBD_FREE_PTR(target); 225 CERROR("Target %s exists in FLD and known as %s:#%llu\n", 226 name, fld_target_name(tmp), tmp->ft_idx); 227 return -EEXIST; 228 } 229 } 230 231 target->ft_exp = tar->ft_exp; 232 if (target->ft_exp != NULL) 233 class_export_get(target->ft_exp); 234 target->ft_srv = tar->ft_srv; 235 target->ft_idx = tar->ft_idx; 236 237 list_add_tail(&target->ft_chain, 238 &fld->lcf_targets); 239 240 fld->lcf_count++; 241 spin_unlock(&fld->lcf_lock); 242 243 return 0; 244} 245EXPORT_SYMBOL(fld_client_add_target); 246 247/* Remove export from FLD */ 248int fld_client_del_target(struct lu_client_fld *fld, __u64 idx) 249{ 250 struct lu_fld_target *target, *tmp; 251 252 spin_lock(&fld->lcf_lock); 253 list_for_each_entry_safe(target, tmp, 254 &fld->lcf_targets, ft_chain) { 255 if (target->ft_idx == idx) { 256 fld->lcf_count--; 257 list_del(&target->ft_chain); 258 spin_unlock(&fld->lcf_lock); 259 260 if (target->ft_exp != NULL) 261 class_export_put(target->ft_exp); 262 263 OBD_FREE_PTR(target); 264 return 0; 265 } 266 } 267 spin_unlock(&fld->lcf_lock); 268 return -ENOENT; 269} 270EXPORT_SYMBOL(fld_client_del_target); 271 272struct proc_dir_entry *fld_type_proc_dir = NULL; 273 274#if defined (CONFIG_PROC_FS) 275static int fld_client_proc_init(struct lu_client_fld *fld) 276{ 277 int rc; 278 279 fld->lcf_proc_dir = lprocfs_register(fld->lcf_name, 280 fld_type_proc_dir, 281 NULL, NULL); 282 283 if (IS_ERR(fld->lcf_proc_dir)) { 284 CERROR("%s: LProcFS failed in fld-init\n", 285 fld->lcf_name); 286 rc = PTR_ERR(fld->lcf_proc_dir); 287 return rc; 288 } 289 290 rc = lprocfs_add_vars(fld->lcf_proc_dir, 291 fld_client_proc_list, fld); 292 if (rc) { 293 CERROR("%s: Can't init FLD proc, rc %d\n", 294 fld->lcf_name, rc); 295 goto out_cleanup; 296 } 297 298 return 0; 299 300out_cleanup: 301 fld_client_proc_fini(fld); 302 return rc; 303} 304 305void fld_client_proc_fini(struct lu_client_fld *fld) 306{ 307 if (fld->lcf_proc_dir) { 308 if (!IS_ERR(fld->lcf_proc_dir)) 309 lprocfs_remove(&fld->lcf_proc_dir); 310 fld->lcf_proc_dir = NULL; 311 } 312} 313#else 314static int fld_client_proc_init(struct lu_client_fld *fld) 315{ 316 return 0; 317} 318 319void fld_client_proc_fini(struct lu_client_fld *fld) 320{ 321 return; 322} 323#endif 324EXPORT_SYMBOL(fld_client_proc_fini); 325 326static inline int hash_is_sane(int hash) 327{ 328 return (hash >= 0 && hash < ARRAY_SIZE(fld_hash)); 329} 330 331int fld_client_init(struct lu_client_fld *fld, 332 const char *prefix, int hash) 333{ 334 int cache_size, cache_threshold; 335 int rc; 336 337 LASSERT(fld != NULL); 338 339 snprintf(fld->lcf_name, sizeof(fld->lcf_name), 340 "cli-%s", prefix); 341 342 if (!hash_is_sane(hash)) { 343 CERROR("%s: Wrong hash function %#x\n", 344 fld->lcf_name, hash); 345 return -EINVAL; 346 } 347 348 fld->lcf_count = 0; 349 spin_lock_init(&fld->lcf_lock); 350 fld->lcf_hash = &fld_hash[hash]; 351 fld->lcf_flags = LUSTRE_FLD_INIT; 352 INIT_LIST_HEAD(&fld->lcf_targets); 353 354 cache_size = FLD_CLIENT_CACHE_SIZE / 355 sizeof(struct fld_cache_entry); 356 357 cache_threshold = cache_size * 358 FLD_CLIENT_CACHE_THRESHOLD / 100; 359 360 fld->lcf_cache = fld_cache_init(fld->lcf_name, 361 cache_size, cache_threshold); 362 if (IS_ERR(fld->lcf_cache)) { 363 rc = PTR_ERR(fld->lcf_cache); 364 fld->lcf_cache = NULL; 365 goto out; 366 } 367 368 rc = fld_client_proc_init(fld); 369 if (rc) 370 goto out; 371out: 372 if (rc) 373 fld_client_fini(fld); 374 else 375 CDEBUG(D_INFO, "%s: Using \"%s\" hash\n", 376 fld->lcf_name, fld->lcf_hash->fh_name); 377 return rc; 378} 379EXPORT_SYMBOL(fld_client_init); 380 381void fld_client_fini(struct lu_client_fld *fld) 382{ 383 struct lu_fld_target *target, *tmp; 384 385 spin_lock(&fld->lcf_lock); 386 list_for_each_entry_safe(target, tmp, 387 &fld->lcf_targets, ft_chain) { 388 fld->lcf_count--; 389 list_del(&target->ft_chain); 390 if (target->ft_exp != NULL) 391 class_export_put(target->ft_exp); 392 OBD_FREE_PTR(target); 393 } 394 spin_unlock(&fld->lcf_lock); 395 396 if (fld->lcf_cache != NULL) { 397 if (!IS_ERR(fld->lcf_cache)) 398 fld_cache_fini(fld->lcf_cache); 399 fld->lcf_cache = NULL; 400 } 401} 402EXPORT_SYMBOL(fld_client_fini); 403 404int fld_client_rpc(struct obd_export *exp, 405 struct lu_seq_range *range, __u32 fld_op) 406{ 407 struct ptlrpc_request *req; 408 struct lu_seq_range *prange; 409 __u32 *op; 410 int rc; 411 struct obd_import *imp; 412 413 LASSERT(exp != NULL); 414 415 imp = class_exp2cliimp(exp); 416 req = ptlrpc_request_alloc_pack(imp, &RQF_FLD_QUERY, LUSTRE_MDS_VERSION, 417 FLD_QUERY); 418 if (req == NULL) 419 return -ENOMEM; 420 421 op = req_capsule_client_get(&req->rq_pill, &RMF_FLD_OPC); 422 *op = fld_op; 423 424 prange = req_capsule_client_get(&req->rq_pill, &RMF_FLD_MDFLD); 425 *prange = *range; 426 427 ptlrpc_request_set_replen(req); 428 req->rq_request_portal = FLD_REQUEST_PORTAL; 429 ptlrpc_at_set_req_timeout(req); 430 431 if (fld_op == FLD_LOOKUP && 432 imp->imp_connect_flags_orig & OBD_CONNECT_MDS_MDS) 433 req->rq_allow_replay = 1; 434 435 if (fld_op != FLD_LOOKUP) 436 mdc_get_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL); 437 fld_enter_request(&exp->exp_obd->u.cli); 438 rc = ptlrpc_queue_wait(req); 439 fld_exit_request(&exp->exp_obd->u.cli); 440 if (fld_op != FLD_LOOKUP) 441 mdc_put_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL); 442 if (rc) 443 goto out_req; 444 445 prange = req_capsule_server_get(&req->rq_pill, &RMF_FLD_MDFLD); 446 if (prange == NULL) { 447 rc = -EFAULT; 448 goto out_req; 449 } 450 *range = *prange; 451out_req: 452 ptlrpc_req_finished(req); 453 return rc; 454} 455 456int fld_client_lookup(struct lu_client_fld *fld, u64 seq, u32 *mds, 457 __u32 flags, const struct lu_env *env) 458{ 459 struct lu_seq_range res = { 0 }; 460 struct lu_fld_target *target; 461 int rc; 462 463 fld->lcf_flags |= LUSTRE_FLD_RUN; 464 465 rc = fld_cache_lookup(fld->lcf_cache, seq, &res); 466 if (rc == 0) { 467 *mds = res.lsr_index; 468 return 0; 469 } 470 471 /* Can not find it in the cache */ 472 target = fld_client_get_target(fld, seq); 473 LASSERT(target != NULL); 474 475 CDEBUG(D_INFO, "%s: Lookup fld entry (seq: %#llx) on target %s (idx %llu)\n", 476 fld->lcf_name, seq, fld_target_name(target), target->ft_idx); 477 478 res.lsr_start = seq; 479 fld_range_set_type(&res, flags); 480 rc = fld_client_rpc(target->ft_exp, &res, FLD_LOOKUP); 481 482 if (rc == 0) { 483 *mds = res.lsr_index; 484 485 fld_cache_insert(fld->lcf_cache, &res); 486 } 487 return rc; 488} 489EXPORT_SYMBOL(fld_client_lookup); 490 491void fld_client_flush(struct lu_client_fld *fld) 492{ 493 fld_cache_flush(fld->lcf_cache); 494} 495EXPORT_SYMBOL(fld_client_flush); 496 497static int __init fld_mod_init(void) 498{ 499 fld_type_proc_dir = lprocfs_register(LUSTRE_FLD_NAME, 500 proc_lustre_root, 501 NULL, NULL); 502 return PTR_ERR_OR_ZERO(fld_type_proc_dir); 503} 504 505static void __exit fld_mod_exit(void) 506{ 507 if (fld_type_proc_dir != NULL && !IS_ERR(fld_type_proc_dir)) { 508 lprocfs_remove(&fld_type_proc_dir); 509 fld_type_proc_dir = NULL; 510 } 511} 512 513MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>"); 514MODULE_DESCRIPTION("Lustre FLD"); 515MODULE_LICENSE("GPL"); 516 517module_init(fld_mod_init) 518module_exit(fld_mod_exit) 519