1/* 2 * Copyright (c) 2006 Oracle. All rights reserved. 3 * 4 * This software is available to you under a choice of one of two 5 * licenses. You may choose to be licensed under the terms of the GNU 6 * General Public License (GPL) Version 2, available from the file 7 * COPYING in the main directory of this source tree, or the 8 * OpenIB.org BSD license below: 9 * 10 * Redistribution and use in source and binary forms, with or 11 * without modification, are permitted provided that the following 12 * conditions are met: 13 * 14 * - Redistributions of source code must retain the above 15 * copyright notice, this list of conditions and the following 16 * disclaimer. 17 * 18 * - Redistributions in binary form must reproduce the above 19 * copyright notice, this list of conditions and the following 20 * disclaimer in the documentation and/or other materials 21 * provided with the distribution. 22 * 23 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, 24 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF 25 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND 26 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS 27 * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN 28 * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN 29 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE 30 * SOFTWARE. 31 * 32 */ 33#include <linux/kernel.h> 34#include <linux/slab.h> 35#include <linux/rculist.h> 36#include <linux/llist.h> 37 38#include "rds.h" 39#include "ib.h" 40 41static DEFINE_PER_CPU(unsigned long, clean_list_grace); 42#define CLEAN_LIST_BUSY_BIT 0 43 44/* 45 * This is stored as mr->r_trans_private. 46 */ 47struct rds_ib_mr { 48 struct rds_ib_device *device; 49 struct rds_ib_mr_pool *pool; 50 struct ib_fmr *fmr; 51 52 struct llist_node llnode; 53 54 /* unmap_list is for freeing */ 55 struct list_head unmap_list; 56 unsigned int remap_count; 57 58 struct scatterlist *sg; 59 unsigned int sg_len; 60 u64 *dma; 61 int sg_dma_len; 62}; 63 64/* 65 * Our own little FMR pool 66 */ 67struct rds_ib_mr_pool { 68 struct mutex flush_lock; /* serialize fmr invalidate */ 69 struct delayed_work flush_worker; /* flush worker */ 70 71 atomic_t item_count; /* total # of MRs */ 72 atomic_t dirty_count; /* # dirty of MRs */ 73 74 struct llist_head drop_list; /* MRs that have reached their max_maps limit */ 75 struct llist_head free_list; /* unused MRs */ 76 struct llist_head clean_list; /* global unused & unamapped MRs */ 77 wait_queue_head_t flush_wait; 78 79 atomic_t free_pinned; /* memory pinned by free MRs */ 80 unsigned long max_items; 81 unsigned long max_items_soft; 82 unsigned long max_free_pinned; 83 struct ib_fmr_attr fmr_attr; 84}; 85 86static int rds_ib_flush_mr_pool(struct rds_ib_mr_pool *pool, int free_all, struct rds_ib_mr **); 87static void rds_ib_teardown_mr(struct rds_ib_mr *ibmr); 88static void rds_ib_mr_pool_flush_worker(struct work_struct *work); 89 90static struct rds_ib_device *rds_ib_get_device(__be32 ipaddr) 91{ 92 struct rds_ib_device *rds_ibdev; 93 struct rds_ib_ipaddr *i_ipaddr; 94 95 rcu_read_lock(); 96 list_for_each_entry_rcu(rds_ibdev, &rds_ib_devices, list) { 97 list_for_each_entry_rcu(i_ipaddr, &rds_ibdev->ipaddr_list, list) { 98 if (i_ipaddr->ipaddr == ipaddr) { 99 atomic_inc(&rds_ibdev->refcount); 100 rcu_read_unlock(); 101 return rds_ibdev; 102 } 103 } 104 } 105 rcu_read_unlock(); 106 107 return NULL; 108} 109 110static int rds_ib_add_ipaddr(struct rds_ib_device *rds_ibdev, __be32 ipaddr) 111{ 112 struct rds_ib_ipaddr *i_ipaddr; 113 114 i_ipaddr = kmalloc(sizeof *i_ipaddr, GFP_KERNEL); 115 if (!i_ipaddr) 116 return -ENOMEM; 117 118 i_ipaddr->ipaddr = ipaddr; 119 120 spin_lock_irq(&rds_ibdev->spinlock); 121 list_add_tail_rcu(&i_ipaddr->list, &rds_ibdev->ipaddr_list); 122 spin_unlock_irq(&rds_ibdev->spinlock); 123 124 return 0; 125} 126 127static void rds_ib_remove_ipaddr(struct rds_ib_device *rds_ibdev, __be32 ipaddr) 128{ 129 struct rds_ib_ipaddr *i_ipaddr; 130 struct rds_ib_ipaddr *to_free = NULL; 131 132 133 spin_lock_irq(&rds_ibdev->spinlock); 134 list_for_each_entry_rcu(i_ipaddr, &rds_ibdev->ipaddr_list, list) { 135 if (i_ipaddr->ipaddr == ipaddr) { 136 list_del_rcu(&i_ipaddr->list); 137 to_free = i_ipaddr; 138 break; 139 } 140 } 141 spin_unlock_irq(&rds_ibdev->spinlock); 142 143 if (to_free) { 144 synchronize_rcu(); 145 kfree(to_free); 146 } 147} 148 149int rds_ib_update_ipaddr(struct rds_ib_device *rds_ibdev, __be32 ipaddr) 150{ 151 struct rds_ib_device *rds_ibdev_old; 152 153 rds_ibdev_old = rds_ib_get_device(ipaddr); 154 if (rds_ibdev_old) { 155 rds_ib_remove_ipaddr(rds_ibdev_old, ipaddr); 156 rds_ib_dev_put(rds_ibdev_old); 157 } 158 159 return rds_ib_add_ipaddr(rds_ibdev, ipaddr); 160} 161 162void rds_ib_add_conn(struct rds_ib_device *rds_ibdev, struct rds_connection *conn) 163{ 164 struct rds_ib_connection *ic = conn->c_transport_data; 165 166 /* conn was previously on the nodev_conns_list */ 167 spin_lock_irq(&ib_nodev_conns_lock); 168 BUG_ON(list_empty(&ib_nodev_conns)); 169 BUG_ON(list_empty(&ic->ib_node)); 170 list_del(&ic->ib_node); 171 172 spin_lock(&rds_ibdev->spinlock); 173 list_add_tail(&ic->ib_node, &rds_ibdev->conn_list); 174 spin_unlock(&rds_ibdev->spinlock); 175 spin_unlock_irq(&ib_nodev_conns_lock); 176 177 ic->rds_ibdev = rds_ibdev; 178 atomic_inc(&rds_ibdev->refcount); 179} 180 181void rds_ib_remove_conn(struct rds_ib_device *rds_ibdev, struct rds_connection *conn) 182{ 183 struct rds_ib_connection *ic = conn->c_transport_data; 184 185 /* place conn on nodev_conns_list */ 186 spin_lock(&ib_nodev_conns_lock); 187 188 spin_lock_irq(&rds_ibdev->spinlock); 189 BUG_ON(list_empty(&ic->ib_node)); 190 list_del(&ic->ib_node); 191 spin_unlock_irq(&rds_ibdev->spinlock); 192 193 list_add_tail(&ic->ib_node, &ib_nodev_conns); 194 195 spin_unlock(&ib_nodev_conns_lock); 196 197 ic->rds_ibdev = NULL; 198 rds_ib_dev_put(rds_ibdev); 199} 200 201void rds_ib_destroy_nodev_conns(void) 202{ 203 struct rds_ib_connection *ic, *_ic; 204 LIST_HEAD(tmp_list); 205 206 /* avoid calling conn_destroy with irqs off */ 207 spin_lock_irq(&ib_nodev_conns_lock); 208 list_splice(&ib_nodev_conns, &tmp_list); 209 spin_unlock_irq(&ib_nodev_conns_lock); 210 211 list_for_each_entry_safe(ic, _ic, &tmp_list, ib_node) 212 rds_conn_destroy(ic->conn); 213} 214 215struct rds_ib_mr_pool *rds_ib_create_mr_pool(struct rds_ib_device *rds_ibdev) 216{ 217 struct rds_ib_mr_pool *pool; 218 219 pool = kzalloc(sizeof(*pool), GFP_KERNEL); 220 if (!pool) 221 return ERR_PTR(-ENOMEM); 222 223 init_llist_head(&pool->free_list); 224 init_llist_head(&pool->drop_list); 225 init_llist_head(&pool->clean_list); 226 mutex_init(&pool->flush_lock); 227 init_waitqueue_head(&pool->flush_wait); 228 INIT_DELAYED_WORK(&pool->flush_worker, rds_ib_mr_pool_flush_worker); 229 230 pool->fmr_attr.max_pages = fmr_message_size; 231 pool->fmr_attr.max_maps = rds_ibdev->fmr_max_remaps; 232 pool->fmr_attr.page_shift = PAGE_SHIFT; 233 pool->max_free_pinned = rds_ibdev->max_fmrs * fmr_message_size / 4; 234 235 /* We never allow more than max_items MRs to be allocated. 236 * When we exceed more than max_items_soft, we start freeing 237 * items more aggressively. 238 * Make sure that max_items > max_items_soft > max_items / 2 239 */ 240 pool->max_items_soft = rds_ibdev->max_fmrs * 3 / 4; 241 pool->max_items = rds_ibdev->max_fmrs; 242 243 return pool; 244} 245 246void rds_ib_get_mr_info(struct rds_ib_device *rds_ibdev, struct rds_info_rdma_connection *iinfo) 247{ 248 struct rds_ib_mr_pool *pool = rds_ibdev->mr_pool; 249 250 iinfo->rdma_mr_max = pool->max_items; 251 iinfo->rdma_mr_size = pool->fmr_attr.max_pages; 252} 253 254void rds_ib_destroy_mr_pool(struct rds_ib_mr_pool *pool) 255{ 256 cancel_delayed_work_sync(&pool->flush_worker); 257 rds_ib_flush_mr_pool(pool, 1, NULL); 258 WARN_ON(atomic_read(&pool->item_count)); 259 WARN_ON(atomic_read(&pool->free_pinned)); 260 kfree(pool); 261} 262 263static inline struct rds_ib_mr *rds_ib_reuse_fmr(struct rds_ib_mr_pool *pool) 264{ 265 struct rds_ib_mr *ibmr = NULL; 266 struct llist_node *ret; 267 unsigned long *flag; 268 269 preempt_disable(); 270 flag = &__get_cpu_var(clean_list_grace); 271 set_bit(CLEAN_LIST_BUSY_BIT, flag); 272 ret = llist_del_first(&pool->clean_list); 273 if (ret) 274 ibmr = llist_entry(ret, struct rds_ib_mr, llnode); 275 276 clear_bit(CLEAN_LIST_BUSY_BIT, flag); 277 preempt_enable(); 278 return ibmr; 279} 280 281static inline void wait_clean_list_grace(void) 282{ 283 int cpu; 284 unsigned long *flag; 285 286 for_each_online_cpu(cpu) { 287 flag = &per_cpu(clean_list_grace, cpu); 288 while (test_bit(CLEAN_LIST_BUSY_BIT, flag)) 289 cpu_relax(); 290 } 291} 292 293static struct rds_ib_mr *rds_ib_alloc_fmr(struct rds_ib_device *rds_ibdev) 294{ 295 struct rds_ib_mr_pool *pool = rds_ibdev->mr_pool; 296 struct rds_ib_mr *ibmr = NULL; 297 int err = 0, iter = 0; 298 299 if (atomic_read(&pool->dirty_count) >= pool->max_items / 10) 300 schedule_delayed_work(&pool->flush_worker, 10); 301 302 while (1) { 303 ibmr = rds_ib_reuse_fmr(pool); 304 if (ibmr) 305 return ibmr; 306 307 /* No clean MRs - now we have the choice of either 308 * allocating a fresh MR up to the limit imposed by the 309 * driver, or flush any dirty unused MRs. 310 * We try to avoid stalling in the send path if possible, 311 * so we allocate as long as we're allowed to. 312 * 313 * We're fussy with enforcing the FMR limit, though. If the driver 314 * tells us we can't use more than N fmrs, we shouldn't start 315 * arguing with it */ 316 if (atomic_inc_return(&pool->item_count) <= pool->max_items) 317 break; 318 319 atomic_dec(&pool->item_count); 320 321 if (++iter > 2) { 322 rds_ib_stats_inc(s_ib_rdma_mr_pool_depleted); 323 return ERR_PTR(-EAGAIN); 324 } 325 326 /* We do have some empty MRs. Flush them out. */ 327 rds_ib_stats_inc(s_ib_rdma_mr_pool_wait); 328 rds_ib_flush_mr_pool(pool, 0, &ibmr); 329 if (ibmr) 330 return ibmr; 331 } 332 333 ibmr = kzalloc_node(sizeof(*ibmr), GFP_KERNEL, rdsibdev_to_node(rds_ibdev)); 334 if (!ibmr) { 335 err = -ENOMEM; 336 goto out_no_cigar; 337 } 338 339 memset(ibmr, 0, sizeof(*ibmr)); 340 341 ibmr->fmr = ib_alloc_fmr(rds_ibdev->pd, 342 (IB_ACCESS_LOCAL_WRITE | 343 IB_ACCESS_REMOTE_READ | 344 IB_ACCESS_REMOTE_WRITE| 345 IB_ACCESS_REMOTE_ATOMIC), 346 &pool->fmr_attr); 347 if (IS_ERR(ibmr->fmr)) { 348 err = PTR_ERR(ibmr->fmr); 349 ibmr->fmr = NULL; 350 printk(KERN_WARNING "RDS/IB: ib_alloc_fmr failed (err=%d)\n", err); 351 goto out_no_cigar; 352 } 353 354 rds_ib_stats_inc(s_ib_rdma_mr_alloc); 355 return ibmr; 356 357out_no_cigar: 358 if (ibmr) { 359 if (ibmr->fmr) 360 ib_dealloc_fmr(ibmr->fmr); 361 kfree(ibmr); 362 } 363 atomic_dec(&pool->item_count); 364 return ERR_PTR(err); 365} 366 367static int rds_ib_map_fmr(struct rds_ib_device *rds_ibdev, struct rds_ib_mr *ibmr, 368 struct scatterlist *sg, unsigned int nents) 369{ 370 struct ib_device *dev = rds_ibdev->dev; 371 struct scatterlist *scat = sg; 372 u64 io_addr = 0; 373 u64 *dma_pages; 374 u32 len; 375 int page_cnt, sg_dma_len; 376 int i, j; 377 int ret; 378 379 sg_dma_len = ib_dma_map_sg(dev, sg, nents, 380 DMA_BIDIRECTIONAL); 381 if (unlikely(!sg_dma_len)) { 382 printk(KERN_WARNING "RDS/IB: dma_map_sg failed!\n"); 383 return -EBUSY; 384 } 385 386 len = 0; 387 page_cnt = 0; 388 389 for (i = 0; i < sg_dma_len; ++i) { 390 unsigned int dma_len = ib_sg_dma_len(dev, &scat[i]); 391 u64 dma_addr = ib_sg_dma_address(dev, &scat[i]); 392 393 if (dma_addr & ~PAGE_MASK) { 394 if (i > 0) 395 return -EINVAL; 396 else 397 ++page_cnt; 398 } 399 if ((dma_addr + dma_len) & ~PAGE_MASK) { 400 if (i < sg_dma_len - 1) 401 return -EINVAL; 402 else 403 ++page_cnt; 404 } 405 406 len += dma_len; 407 } 408 409 page_cnt += len >> PAGE_SHIFT; 410 if (page_cnt > fmr_message_size) 411 return -EINVAL; 412 413 dma_pages = kmalloc_node(sizeof(u64) * page_cnt, GFP_ATOMIC, 414 rdsibdev_to_node(rds_ibdev)); 415 if (!dma_pages) 416 return -ENOMEM; 417 418 page_cnt = 0; 419 for (i = 0; i < sg_dma_len; ++i) { 420 unsigned int dma_len = ib_sg_dma_len(dev, &scat[i]); 421 u64 dma_addr = ib_sg_dma_address(dev, &scat[i]); 422 423 for (j = 0; j < dma_len; j += PAGE_SIZE) 424 dma_pages[page_cnt++] = 425 (dma_addr & PAGE_MASK) + j; 426 } 427 428 ret = ib_map_phys_fmr(ibmr->fmr, 429 dma_pages, page_cnt, io_addr); 430 if (ret) 431 goto out; 432 433 /* Success - we successfully remapped the MR, so we can 434 * safely tear down the old mapping. */ 435 rds_ib_teardown_mr(ibmr); 436 437 ibmr->sg = scat; 438 ibmr->sg_len = nents; 439 ibmr->sg_dma_len = sg_dma_len; 440 ibmr->remap_count++; 441 442 rds_ib_stats_inc(s_ib_rdma_mr_used); 443 ret = 0; 444 445out: 446 kfree(dma_pages); 447 448 return ret; 449} 450 451void rds_ib_sync_mr(void *trans_private, int direction) 452{ 453 struct rds_ib_mr *ibmr = trans_private; 454 struct rds_ib_device *rds_ibdev = ibmr->device; 455 456 switch (direction) { 457 case DMA_FROM_DEVICE: 458 ib_dma_sync_sg_for_cpu(rds_ibdev->dev, ibmr->sg, 459 ibmr->sg_dma_len, DMA_BIDIRECTIONAL); 460 break; 461 case DMA_TO_DEVICE: 462 ib_dma_sync_sg_for_device(rds_ibdev->dev, ibmr->sg, 463 ibmr->sg_dma_len, DMA_BIDIRECTIONAL); 464 break; 465 } 466} 467 468static void __rds_ib_teardown_mr(struct rds_ib_mr *ibmr) 469{ 470 struct rds_ib_device *rds_ibdev = ibmr->device; 471 472 if (ibmr->sg_dma_len) { 473 ib_dma_unmap_sg(rds_ibdev->dev, 474 ibmr->sg, ibmr->sg_len, 475 DMA_BIDIRECTIONAL); 476 ibmr->sg_dma_len = 0; 477 } 478 479 /* Release the s/g list */ 480 if (ibmr->sg_len) { 481 unsigned int i; 482 483 for (i = 0; i < ibmr->sg_len; ++i) { 484 struct page *page = sg_page(&ibmr->sg[i]); 485 486 /* FIXME we need a way to tell a r/w MR 487 * from a r/o MR */ 488 BUG_ON(irqs_disabled()); 489 set_page_dirty(page); 490 put_page(page); 491 } 492 kfree(ibmr->sg); 493 494 ibmr->sg = NULL; 495 ibmr->sg_len = 0; 496 } 497} 498 499static void rds_ib_teardown_mr(struct rds_ib_mr *ibmr) 500{ 501 unsigned int pinned = ibmr->sg_len; 502 503 __rds_ib_teardown_mr(ibmr); 504 if (pinned) { 505 struct rds_ib_device *rds_ibdev = ibmr->device; 506 struct rds_ib_mr_pool *pool = rds_ibdev->mr_pool; 507 508 atomic_sub(pinned, &pool->free_pinned); 509 } 510} 511 512static inline unsigned int rds_ib_flush_goal(struct rds_ib_mr_pool *pool, int free_all) 513{ 514 unsigned int item_count; 515 516 item_count = atomic_read(&pool->item_count); 517 if (free_all) 518 return item_count; 519 520 return 0; 521} 522 523/* 524 * given an llist of mrs, put them all into the list_head for more processing 525 */ 526static void llist_append_to_list(struct llist_head *llist, struct list_head *list) 527{ 528 struct rds_ib_mr *ibmr; 529 struct llist_node *node; 530 struct llist_node *next; 531 532 node = llist_del_all(llist); 533 while (node) { 534 next = node->next; 535 ibmr = llist_entry(node, struct rds_ib_mr, llnode); 536 list_add_tail(&ibmr->unmap_list, list); 537 node = next; 538 } 539} 540 541/* 542 * this takes a list head of mrs and turns it into linked llist nodes 543 * of clusters. Each cluster has linked llist nodes of 544 * MR_CLUSTER_SIZE mrs that are ready for reuse. 545 */ 546static void list_to_llist_nodes(struct rds_ib_mr_pool *pool, 547 struct list_head *list, 548 struct llist_node **nodes_head, 549 struct llist_node **nodes_tail) 550{ 551 struct rds_ib_mr *ibmr; 552 struct llist_node *cur = NULL; 553 struct llist_node **next = nodes_head; 554 555 list_for_each_entry(ibmr, list, unmap_list) { 556 cur = &ibmr->llnode; 557 *next = cur; 558 next = &cur->next; 559 } 560 *next = NULL; 561 *nodes_tail = cur; 562} 563 564/* 565 * Flush our pool of MRs. 566 * At a minimum, all currently unused MRs are unmapped. 567 * If the number of MRs allocated exceeds the limit, we also try 568 * to free as many MRs as needed to get back to this limit. 569 */ 570static int rds_ib_flush_mr_pool(struct rds_ib_mr_pool *pool, 571 int free_all, struct rds_ib_mr **ibmr_ret) 572{ 573 struct rds_ib_mr *ibmr, *next; 574 struct llist_node *clean_nodes; 575 struct llist_node *clean_tail; 576 LIST_HEAD(unmap_list); 577 LIST_HEAD(fmr_list); 578 unsigned long unpinned = 0; 579 unsigned int nfreed = 0, ncleaned = 0, free_goal; 580 int ret = 0; 581 582 rds_ib_stats_inc(s_ib_rdma_mr_pool_flush); 583 584 if (ibmr_ret) { 585 DEFINE_WAIT(wait); 586 while(!mutex_trylock(&pool->flush_lock)) { 587 ibmr = rds_ib_reuse_fmr(pool); 588 if (ibmr) { 589 *ibmr_ret = ibmr; 590 finish_wait(&pool->flush_wait, &wait); 591 goto out_nolock; 592 } 593 594 prepare_to_wait(&pool->flush_wait, &wait, 595 TASK_UNINTERRUPTIBLE); 596 if (llist_empty(&pool->clean_list)) 597 schedule(); 598 599 ibmr = rds_ib_reuse_fmr(pool); 600 if (ibmr) { 601 *ibmr_ret = ibmr; 602 finish_wait(&pool->flush_wait, &wait); 603 goto out_nolock; 604 } 605 } 606 finish_wait(&pool->flush_wait, &wait); 607 } else 608 mutex_lock(&pool->flush_lock); 609 610 if (ibmr_ret) { 611 ibmr = rds_ib_reuse_fmr(pool); 612 if (ibmr) { 613 *ibmr_ret = ibmr; 614 goto out; 615 } 616 } 617 618 /* Get the list of all MRs to be dropped. Ordering matters - 619 * we want to put drop_list ahead of free_list. 620 */ 621 llist_append_to_list(&pool->drop_list, &unmap_list); 622 llist_append_to_list(&pool->free_list, &unmap_list); 623 if (free_all) 624 llist_append_to_list(&pool->clean_list, &unmap_list); 625 626 free_goal = rds_ib_flush_goal(pool, free_all); 627 628 if (list_empty(&unmap_list)) 629 goto out; 630 631 /* String all ib_mr's onto one list and hand them to ib_unmap_fmr */ 632 list_for_each_entry(ibmr, &unmap_list, unmap_list) 633 list_add(&ibmr->fmr->list, &fmr_list); 634 635 ret = ib_unmap_fmr(&fmr_list); 636 if (ret) 637 printk(KERN_WARNING "RDS/IB: ib_unmap_fmr failed (err=%d)\n", ret); 638 639 /* Now we can destroy the DMA mapping and unpin any pages */ 640 list_for_each_entry_safe(ibmr, next, &unmap_list, unmap_list) { 641 unpinned += ibmr->sg_len; 642 __rds_ib_teardown_mr(ibmr); 643 if (nfreed < free_goal || ibmr->remap_count >= pool->fmr_attr.max_maps) { 644 rds_ib_stats_inc(s_ib_rdma_mr_free); 645 list_del(&ibmr->unmap_list); 646 ib_dealloc_fmr(ibmr->fmr); 647 kfree(ibmr); 648 nfreed++; 649 } 650 ncleaned++; 651 } 652 653 if (!list_empty(&unmap_list)) { 654 /* we have to make sure that none of the things we're about 655 * to put on the clean list would race with other cpus trying 656 * to pull items off. The llist would explode if we managed to 657 * remove something from the clean list and then add it back again 658 * while another CPU was spinning on that same item in llist_del_first. 659 * 660 * This is pretty unlikely, but just in case wait for an llist grace period 661 * here before adding anything back into the clean list. 662 */ 663 wait_clean_list_grace(); 664 665 list_to_llist_nodes(pool, &unmap_list, &clean_nodes, &clean_tail); 666 if (ibmr_ret) 667 *ibmr_ret = llist_entry(clean_nodes, struct rds_ib_mr, llnode); 668 669 /* more than one entry in llist nodes */ 670 if (clean_nodes->next) 671 llist_add_batch(clean_nodes->next, clean_tail, &pool->clean_list); 672 673 } 674 675 atomic_sub(unpinned, &pool->free_pinned); 676 atomic_sub(ncleaned, &pool->dirty_count); 677 atomic_sub(nfreed, &pool->item_count); 678 679out: 680 mutex_unlock(&pool->flush_lock); 681 if (waitqueue_active(&pool->flush_wait)) 682 wake_up(&pool->flush_wait); 683out_nolock: 684 return ret; 685} 686 687static void rds_ib_mr_pool_flush_worker(struct work_struct *work) 688{ 689 struct rds_ib_mr_pool *pool = container_of(work, struct rds_ib_mr_pool, flush_worker.work); 690 691 rds_ib_flush_mr_pool(pool, 0, NULL); 692} 693 694void rds_ib_free_mr(void *trans_private, int invalidate) 695{ 696 struct rds_ib_mr *ibmr = trans_private; 697 struct rds_ib_device *rds_ibdev = ibmr->device; 698 struct rds_ib_mr_pool *pool = rds_ibdev->mr_pool; 699 700 rdsdebug("RDS/IB: free_mr nents %u\n", ibmr->sg_len); 701 702 /* Return it to the pool's free list */ 703 if (ibmr->remap_count >= pool->fmr_attr.max_maps) 704 llist_add(&ibmr->llnode, &pool->drop_list); 705 else 706 llist_add(&ibmr->llnode, &pool->free_list); 707 708 atomic_add(ibmr->sg_len, &pool->free_pinned); 709 atomic_inc(&pool->dirty_count); 710 711 /* If we've pinned too many pages, request a flush */ 712 if (atomic_read(&pool->free_pinned) >= pool->max_free_pinned || 713 atomic_read(&pool->dirty_count) >= pool->max_items / 10) 714 schedule_delayed_work(&pool->flush_worker, 10); 715 716 if (invalidate) { 717 if (likely(!in_interrupt())) { 718 rds_ib_flush_mr_pool(pool, 0, NULL); 719 } else { 720 /* We get here if the user created a MR marked 721 * as use_once and invalidate at the same time. */ 722 schedule_delayed_work(&pool->flush_worker, 10); 723 } 724 } 725 726 rds_ib_dev_put(rds_ibdev); 727} 728 729void rds_ib_flush_mrs(void) 730{ 731 struct rds_ib_device *rds_ibdev; 732 733 down_read(&rds_ib_devices_lock); 734 list_for_each_entry(rds_ibdev, &rds_ib_devices, list) { 735 struct rds_ib_mr_pool *pool = rds_ibdev->mr_pool; 736 737 if (pool) 738 rds_ib_flush_mr_pool(pool, 0, NULL); 739 } 740 up_read(&rds_ib_devices_lock); 741} 742 743void *rds_ib_get_mr(struct scatterlist *sg, unsigned long nents, 744 struct rds_sock *rs, u32 *key_ret) 745{ 746 struct rds_ib_device *rds_ibdev; 747 struct rds_ib_mr *ibmr = NULL; 748 int ret; 749 750 rds_ibdev = rds_ib_get_device(rs->rs_bound_addr); 751 if (!rds_ibdev) { 752 ret = -ENODEV; 753 goto out; 754 } 755 756 if (!rds_ibdev->mr_pool) { 757 ret = -ENODEV; 758 goto out; 759 } 760 761 ibmr = rds_ib_alloc_fmr(rds_ibdev); 762 if (IS_ERR(ibmr)) 763 return ibmr; 764 765 ret = rds_ib_map_fmr(rds_ibdev, ibmr, sg, nents); 766 if (ret == 0) 767 *key_ret = ibmr->fmr->rkey; 768 else 769 printk(KERN_WARNING "RDS/IB: map_fmr failed (errno=%d)\n", ret); 770 771 ibmr->device = rds_ibdev; 772 rds_ibdev = NULL; 773 774 out: 775 if (ret) { 776 if (ibmr) 777 rds_ib_free_mr(ibmr, 0); 778 ibmr = ERR_PTR(ret); 779 } 780 if (rds_ibdev) 781 rds_ib_dev_put(rds_ibdev); 782 return ibmr; 783} 784 785