1/* 2 * Copyright (c) 2006 Oracle. All rights reserved. 3 * 4 * This software is available to you under a choice of one of two 5 * licenses. You may choose to be licensed under the terms of the GNU 6 * General Public License (GPL) Version 2, available from the file 7 * COPYING in the main directory of this source tree, or the 8 * OpenIB.org BSD license below: 9 * 10 * Redistribution and use in source and binary forms, with or 11 * without modification, are permitted provided that the following 12 * conditions are met: 13 * 14 * - Redistributions of source code must retain the above 15 * copyright notice, this list of conditions and the following 16 * disclaimer. 17 * 18 * - Redistributions in binary form must reproduce the above 19 * copyright notice, this list of conditions and the following 20 * disclaimer in the documentation and/or other materials 21 * provided with the distribution. 22 * 23 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, 24 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF 25 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND 26 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS 27 * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN 28 * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN 29 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE 30 * SOFTWARE. 31 * 32 */ 33#include <linux/kernel.h> 34#include <linux/in.h> 35#include <linux/device.h> 36#include <linux/dmapool.h> 37#include <linux/ratelimit.h> 38 39#include "rds.h" 40#include "iw.h" 41 42static void rds_iw_send_rdma_complete(struct rds_message *rm, 43 int wc_status) 44{ 45 int notify_status; 46 47 switch (wc_status) { 48 case IB_WC_WR_FLUSH_ERR: 49 return; 50 51 case IB_WC_SUCCESS: 52 notify_status = RDS_RDMA_SUCCESS; 53 break; 54 55 case IB_WC_REM_ACCESS_ERR: 56 notify_status = RDS_RDMA_REMOTE_ERROR; 57 break; 58 59 default: 60 notify_status = RDS_RDMA_OTHER_ERROR; 61 break; 62 } 63 rds_rdma_send_complete(rm, notify_status); 64} 65 66static void rds_iw_send_unmap_rdma(struct rds_iw_connection *ic, 67 struct rm_rdma_op *op) 68{ 69 if (op->op_mapped) { 70 ib_dma_unmap_sg(ic->i_cm_id->device, 71 op->op_sg, op->op_nents, 72 op->op_write ? DMA_TO_DEVICE : DMA_FROM_DEVICE); 73 op->op_mapped = 0; 74 } 75} 76 77static void rds_iw_send_unmap_rm(struct rds_iw_connection *ic, 78 struct rds_iw_send_work *send, 79 int wc_status) 80{ 81 struct rds_message *rm = send->s_rm; 82 83 rdsdebug("ic %p send %p rm %p\n", ic, send, rm); 84 85 ib_dma_unmap_sg(ic->i_cm_id->device, 86 rm->data.op_sg, rm->data.op_nents, 87 DMA_TO_DEVICE); 88 89 if (rm->rdma.op_active) { 90 rds_iw_send_unmap_rdma(ic, &rm->rdma); 91 92 /* If the user asked for a completion notification on this 93 * message, we can implement three different semantics: 94 * 1. Notify when we received the ACK on the RDS message 95 * that was queued with the RDMA. This provides reliable 96 * notification of RDMA status at the expense of a one-way 97 * packet delay. 98 * 2. Notify when the IB stack gives us the completion event for 99 * the RDMA operation. 100 * 3. Notify when the IB stack gives us the completion event for 101 * the accompanying RDS messages. 102 * Here, we implement approach #3. To implement approach #2, 103 * call rds_rdma_send_complete from the cq_handler. To implement #1, 104 * don't call rds_rdma_send_complete at all, and fall back to the notify 105 * handling in the ACK processing code. 106 * 107 * Note: There's no need to explicitly sync any RDMA buffers using 108 * ib_dma_sync_sg_for_cpu - the completion for the RDMA 109 * operation itself unmapped the RDMA buffers, which takes care 110 * of synching. 111 */ 112 rds_iw_send_rdma_complete(rm, wc_status); 113 114 if (rm->rdma.op_write) 115 rds_stats_add(s_send_rdma_bytes, rm->rdma.op_bytes); 116 else 117 rds_stats_add(s_recv_rdma_bytes, rm->rdma.op_bytes); 118 } 119 120 /* If anyone waited for this message to get flushed out, wake 121 * them up now */ 122 rds_message_unmapped(rm); 123 124 rds_message_put(rm); 125 send->s_rm = NULL; 126} 127 128void rds_iw_send_init_ring(struct rds_iw_connection *ic) 129{ 130 struct rds_iw_send_work *send; 131 u32 i; 132 133 for (i = 0, send = ic->i_sends; i < ic->i_send_ring.w_nr; i++, send++) { 134 struct ib_sge *sge; 135 136 send->s_rm = NULL; 137 send->s_op = NULL; 138 send->s_mapping = NULL; 139 140 send->s_wr.next = NULL; 141 send->s_wr.wr_id = i; 142 send->s_wr.sg_list = send->s_sge; 143 send->s_wr.num_sge = 1; 144 send->s_wr.opcode = IB_WR_SEND; 145 send->s_wr.send_flags = 0; 146 send->s_wr.ex.imm_data = 0; 147 148 sge = rds_iw_data_sge(ic, send->s_sge); 149 sge->lkey = 0; 150 151 sge = rds_iw_header_sge(ic, send->s_sge); 152 sge->addr = ic->i_send_hdrs_dma + (i * sizeof(struct rds_header)); 153 sge->length = sizeof(struct rds_header); 154 sge->lkey = 0; 155 156 send->s_mr = ib_alloc_fast_reg_mr(ic->i_pd, fastreg_message_size); 157 if (IS_ERR(send->s_mr)) { 158 printk(KERN_WARNING "RDS/IW: ib_alloc_fast_reg_mr failed\n"); 159 break; 160 } 161 162 send->s_page_list = ib_alloc_fast_reg_page_list( 163 ic->i_cm_id->device, fastreg_message_size); 164 if (IS_ERR(send->s_page_list)) { 165 printk(KERN_WARNING "RDS/IW: ib_alloc_fast_reg_page_list failed\n"); 166 break; 167 } 168 } 169} 170 171void rds_iw_send_clear_ring(struct rds_iw_connection *ic) 172{ 173 struct rds_iw_send_work *send; 174 u32 i; 175 176 for (i = 0, send = ic->i_sends; i < ic->i_send_ring.w_nr; i++, send++) { 177 BUG_ON(!send->s_mr); 178 ib_dereg_mr(send->s_mr); 179 BUG_ON(!send->s_page_list); 180 ib_free_fast_reg_page_list(send->s_page_list); 181 if (send->s_wr.opcode == 0xdead) 182 continue; 183 if (send->s_rm) 184 rds_iw_send_unmap_rm(ic, send, IB_WC_WR_FLUSH_ERR); 185 if (send->s_op) 186 rds_iw_send_unmap_rdma(ic, send->s_op); 187 } 188} 189 190/* 191 * The _oldest/_free ring operations here race cleanly with the alloc/unalloc 192 * operations performed in the send path. As the sender allocs and potentially 193 * unallocs the next free entry in the ring it doesn't alter which is 194 * the next to be freed, which is what this is concerned with. 195 */ 196void rds_iw_send_cq_comp_handler(struct ib_cq *cq, void *context) 197{ 198 struct rds_connection *conn = context; 199 struct rds_iw_connection *ic = conn->c_transport_data; 200 struct ib_wc wc; 201 struct rds_iw_send_work *send; 202 u32 completed; 203 u32 oldest; 204 u32 i; 205 int ret; 206 207 rdsdebug("cq %p conn %p\n", cq, conn); 208 rds_iw_stats_inc(s_iw_tx_cq_call); 209 ret = ib_req_notify_cq(cq, IB_CQ_NEXT_COMP); 210 if (ret) 211 rdsdebug("ib_req_notify_cq send failed: %d\n", ret); 212 213 while (ib_poll_cq(cq, 1, &wc) > 0) { 214 rdsdebug("wc wr_id 0x%llx status %u byte_len %u imm_data %u\n", 215 (unsigned long long)wc.wr_id, wc.status, wc.byte_len, 216 be32_to_cpu(wc.ex.imm_data)); 217 rds_iw_stats_inc(s_iw_tx_cq_event); 218 219 if (wc.status != IB_WC_SUCCESS) { 220 printk(KERN_ERR "WC Error: status = %d opcode = %d\n", wc.status, wc.opcode); 221 break; 222 } 223 224 if (wc.opcode == IB_WC_LOCAL_INV && wc.wr_id == RDS_IW_LOCAL_INV_WR_ID) { 225 ic->i_fastreg_posted = 0; 226 continue; 227 } 228 229 if (wc.opcode == IB_WC_FAST_REG_MR && wc.wr_id == RDS_IW_FAST_REG_WR_ID) { 230 ic->i_fastreg_posted = 1; 231 continue; 232 } 233 234 if (wc.wr_id == RDS_IW_ACK_WR_ID) { 235 if (time_after(jiffies, ic->i_ack_queued + HZ/2)) 236 rds_iw_stats_inc(s_iw_tx_stalled); 237 rds_iw_ack_send_complete(ic); 238 continue; 239 } 240 241 oldest = rds_iw_ring_oldest(&ic->i_send_ring); 242 243 completed = rds_iw_ring_completed(&ic->i_send_ring, wc.wr_id, oldest); 244 245 for (i = 0; i < completed; i++) { 246 send = &ic->i_sends[oldest]; 247 248 /* In the error case, wc.opcode sometimes contains garbage */ 249 switch (send->s_wr.opcode) { 250 case IB_WR_SEND: 251 if (send->s_rm) 252 rds_iw_send_unmap_rm(ic, send, wc.status); 253 break; 254 case IB_WR_FAST_REG_MR: 255 case IB_WR_RDMA_WRITE: 256 case IB_WR_RDMA_READ: 257 case IB_WR_RDMA_READ_WITH_INV: 258 /* Nothing to be done - the SG list will be unmapped 259 * when the SEND completes. */ 260 break; 261 default: 262 printk_ratelimited(KERN_NOTICE 263 "RDS/IW: %s: unexpected opcode 0x%x in WR!\n", 264 __func__, send->s_wr.opcode); 265 break; 266 } 267 268 send->s_wr.opcode = 0xdead; 269 send->s_wr.num_sge = 1; 270 if (time_after(jiffies, send->s_queued + HZ/2)) 271 rds_iw_stats_inc(s_iw_tx_stalled); 272 273 /* If a RDMA operation produced an error, signal this right 274 * away. If we don't, the subsequent SEND that goes with this 275 * RDMA will be canceled with ERR_WFLUSH, and the application 276 * never learn that the RDMA failed. */ 277 if (unlikely(wc.status == IB_WC_REM_ACCESS_ERR && send->s_op)) { 278 struct rds_message *rm; 279 280 rm = rds_send_get_message(conn, send->s_op); 281 if (rm) 282 rds_iw_send_rdma_complete(rm, wc.status); 283 } 284 285 oldest = (oldest + 1) % ic->i_send_ring.w_nr; 286 } 287 288 rds_iw_ring_free(&ic->i_send_ring, completed); 289 290 if (test_and_clear_bit(RDS_LL_SEND_FULL, &conn->c_flags) || 291 test_bit(0, &conn->c_map_queued)) 292 queue_delayed_work(rds_wq, &conn->c_send_w, 0); 293 294 /* We expect errors as the qp is drained during shutdown */ 295 if (wc.status != IB_WC_SUCCESS && rds_conn_up(conn)) { 296 rds_iw_conn_error(conn, 297 "send completion on %pI4 " 298 "had status %u, disconnecting and reconnecting\n", 299 &conn->c_faddr, wc.status); 300 } 301 } 302} 303 304/* 305 * This is the main function for allocating credits when sending 306 * messages. 307 * 308 * Conceptually, we have two counters: 309 * - send credits: this tells us how many WRs we're allowed 310 * to submit without overruning the receiver's queue. For 311 * each SEND WR we post, we decrement this by one. 312 * 313 * - posted credits: this tells us how many WRs we recently 314 * posted to the receive queue. This value is transferred 315 * to the peer as a "credit update" in a RDS header field. 316 * Every time we transmit credits to the peer, we subtract 317 * the amount of transferred credits from this counter. 318 * 319 * It is essential that we avoid situations where both sides have 320 * exhausted their send credits, and are unable to send new credits 321 * to the peer. We achieve this by requiring that we send at least 322 * one credit update to the peer before exhausting our credits. 323 * When new credits arrive, we subtract one credit that is withheld 324 * until we've posted new buffers and are ready to transmit these 325 * credits (see rds_iw_send_add_credits below). 326 * 327 * The RDS send code is essentially single-threaded; rds_send_xmit 328 * grabs c_send_lock to ensure exclusive access to the send ring. 329 * However, the ACK sending code is independent and can race with 330 * message SENDs. 331 * 332 * In the send path, we need to update the counters for send credits 333 * and the counter of posted buffers atomically - when we use the 334 * last available credit, we cannot allow another thread to race us 335 * and grab the posted credits counter. Hence, we have to use a 336 * spinlock to protect the credit counter, or use atomics. 337 * 338 * Spinlocks shared between the send and the receive path are bad, 339 * because they create unnecessary delays. An early implementation 340 * using a spinlock showed a 5% degradation in throughput at some 341 * loads. 342 * 343 * This implementation avoids spinlocks completely, putting both 344 * counters into a single atomic, and updating that atomic using 345 * atomic_add (in the receive path, when receiving fresh credits), 346 * and using atomic_cmpxchg when updating the two counters. 347 */ 348int rds_iw_send_grab_credits(struct rds_iw_connection *ic, 349 u32 wanted, u32 *adv_credits, int need_posted, int max_posted) 350{ 351 unsigned int avail, posted, got = 0, advertise; 352 long oldval, newval; 353 354 *adv_credits = 0; 355 if (!ic->i_flowctl) 356 return wanted; 357 358try_again: 359 advertise = 0; 360 oldval = newval = atomic_read(&ic->i_credits); 361 posted = IB_GET_POST_CREDITS(oldval); 362 avail = IB_GET_SEND_CREDITS(oldval); 363 364 rdsdebug("rds_iw_send_grab_credits(%u): credits=%u posted=%u\n", 365 wanted, avail, posted); 366 367 /* The last credit must be used to send a credit update. */ 368 if (avail && !posted) 369 avail--; 370 371 if (avail < wanted) { 372 struct rds_connection *conn = ic->i_cm_id->context; 373 374 /* Oops, there aren't that many credits left! */ 375 set_bit(RDS_LL_SEND_FULL, &conn->c_flags); 376 got = avail; 377 } else { 378 /* Sometimes you get what you want, lalala. */ 379 got = wanted; 380 } 381 newval -= IB_SET_SEND_CREDITS(got); 382 383 /* 384 * If need_posted is non-zero, then the caller wants 385 * the posted regardless of whether any send credits are 386 * available. 387 */ 388 if (posted && (got || need_posted)) { 389 advertise = min_t(unsigned int, posted, max_posted); 390 newval -= IB_SET_POST_CREDITS(advertise); 391 } 392 393 /* Finally bill everything */ 394 if (atomic_cmpxchg(&ic->i_credits, oldval, newval) != oldval) 395 goto try_again; 396 397 *adv_credits = advertise; 398 return got; 399} 400 401void rds_iw_send_add_credits(struct rds_connection *conn, unsigned int credits) 402{ 403 struct rds_iw_connection *ic = conn->c_transport_data; 404 405 if (credits == 0) 406 return; 407 408 rdsdebug("rds_iw_send_add_credits(%u): current=%u%s\n", 409 credits, 410 IB_GET_SEND_CREDITS(atomic_read(&ic->i_credits)), 411 test_bit(RDS_LL_SEND_FULL, &conn->c_flags) ? ", ll_send_full" : ""); 412 413 atomic_add(IB_SET_SEND_CREDITS(credits), &ic->i_credits); 414 if (test_and_clear_bit(RDS_LL_SEND_FULL, &conn->c_flags)) 415 queue_delayed_work(rds_wq, &conn->c_send_w, 0); 416 417 WARN_ON(IB_GET_SEND_CREDITS(credits) >= 16384); 418 419 rds_iw_stats_inc(s_iw_rx_credit_updates); 420} 421 422void rds_iw_advertise_credits(struct rds_connection *conn, unsigned int posted) 423{ 424 struct rds_iw_connection *ic = conn->c_transport_data; 425 426 if (posted == 0) 427 return; 428 429 atomic_add(IB_SET_POST_CREDITS(posted), &ic->i_credits); 430 431 /* Decide whether to send an update to the peer now. 432 * If we would send a credit update for every single buffer we 433 * post, we would end up with an ACK storm (ACK arrives, 434 * consumes buffer, we refill the ring, send ACK to remote 435 * advertising the newly posted buffer... ad inf) 436 * 437 * Performance pretty much depends on how often we send 438 * credit updates - too frequent updates mean lots of ACKs. 439 * Too infrequent updates, and the peer will run out of 440 * credits and has to throttle. 441 * For the time being, 16 seems to be a good compromise. 442 */ 443 if (IB_GET_POST_CREDITS(atomic_read(&ic->i_credits)) >= 16) 444 set_bit(IB_ACK_REQUESTED, &ic->i_ack_flags); 445} 446 447static inline void 448rds_iw_xmit_populate_wr(struct rds_iw_connection *ic, 449 struct rds_iw_send_work *send, unsigned int pos, 450 unsigned long buffer, unsigned int length, 451 int send_flags) 452{ 453 struct ib_sge *sge; 454 455 WARN_ON(pos != send - ic->i_sends); 456 457 send->s_wr.send_flags = send_flags; 458 send->s_wr.opcode = IB_WR_SEND; 459 send->s_wr.num_sge = 2; 460 send->s_wr.next = NULL; 461 send->s_queued = jiffies; 462 send->s_op = NULL; 463 464 if (length != 0) { 465 sge = rds_iw_data_sge(ic, send->s_sge); 466 sge->addr = buffer; 467 sge->length = length; 468 sge->lkey = rds_iw_local_dma_lkey(ic); 469 470 sge = rds_iw_header_sge(ic, send->s_sge); 471 } else { 472 /* We're sending a packet with no payload. There is only 473 * one SGE */ 474 send->s_wr.num_sge = 1; 475 sge = &send->s_sge[0]; 476 } 477 478 sge->addr = ic->i_send_hdrs_dma + (pos * sizeof(struct rds_header)); 479 sge->length = sizeof(struct rds_header); 480 sge->lkey = rds_iw_local_dma_lkey(ic); 481} 482 483/* 484 * This can be called multiple times for a given message. The first time 485 * we see a message we map its scatterlist into the IB device so that 486 * we can provide that mapped address to the IB scatter gather entries 487 * in the IB work requests. We translate the scatterlist into a series 488 * of work requests that fragment the message. These work requests complete 489 * in order so we pass ownership of the message to the completion handler 490 * once we send the final fragment. 491 * 492 * The RDS core uses the c_send_lock to only enter this function once 493 * per connection. This makes sure that the tx ring alloc/unalloc pairs 494 * don't get out of sync and confuse the ring. 495 */ 496int rds_iw_xmit(struct rds_connection *conn, struct rds_message *rm, 497 unsigned int hdr_off, unsigned int sg, unsigned int off) 498{ 499 struct rds_iw_connection *ic = conn->c_transport_data; 500 struct ib_device *dev = ic->i_cm_id->device; 501 struct rds_iw_send_work *send = NULL; 502 struct rds_iw_send_work *first; 503 struct rds_iw_send_work *prev; 504 struct ib_send_wr *failed_wr; 505 struct scatterlist *scat; 506 u32 pos; 507 u32 i; 508 u32 work_alloc; 509 u32 credit_alloc; 510 u32 posted; 511 u32 adv_credits = 0; 512 int send_flags = 0; 513 int sent; 514 int ret; 515 int flow_controlled = 0; 516 517 BUG_ON(off % RDS_FRAG_SIZE); 518 BUG_ON(hdr_off != 0 && hdr_off != sizeof(struct rds_header)); 519 520 /* Fastreg support */ 521 if (rds_rdma_cookie_key(rm->m_rdma_cookie) && !ic->i_fastreg_posted) { 522 ret = -EAGAIN; 523 goto out; 524 } 525 526 /* FIXME we may overallocate here */ 527 if (be32_to_cpu(rm->m_inc.i_hdr.h_len) == 0) 528 i = 1; 529 else 530 i = ceil(be32_to_cpu(rm->m_inc.i_hdr.h_len), RDS_FRAG_SIZE); 531 532 work_alloc = rds_iw_ring_alloc(&ic->i_send_ring, i, &pos); 533 if (work_alloc == 0) { 534 set_bit(RDS_LL_SEND_FULL, &conn->c_flags); 535 rds_iw_stats_inc(s_iw_tx_ring_full); 536 ret = -ENOMEM; 537 goto out; 538 } 539 540 credit_alloc = work_alloc; 541 if (ic->i_flowctl) { 542 credit_alloc = rds_iw_send_grab_credits(ic, work_alloc, &posted, 0, RDS_MAX_ADV_CREDIT); 543 adv_credits += posted; 544 if (credit_alloc < work_alloc) { 545 rds_iw_ring_unalloc(&ic->i_send_ring, work_alloc - credit_alloc); 546 work_alloc = credit_alloc; 547 flow_controlled++; 548 } 549 if (work_alloc == 0) { 550 set_bit(RDS_LL_SEND_FULL, &conn->c_flags); 551 rds_iw_stats_inc(s_iw_tx_throttle); 552 ret = -ENOMEM; 553 goto out; 554 } 555 } 556 557 /* map the message the first time we see it */ 558 if (!ic->i_rm) { 559 /* 560 printk(KERN_NOTICE "rds_iw_xmit prep msg dport=%u flags=0x%x len=%d\n", 561 be16_to_cpu(rm->m_inc.i_hdr.h_dport), 562 rm->m_inc.i_hdr.h_flags, 563 be32_to_cpu(rm->m_inc.i_hdr.h_len)); 564 */ 565 if (rm->data.op_nents) { 566 rm->data.op_count = ib_dma_map_sg(dev, 567 rm->data.op_sg, 568 rm->data.op_nents, 569 DMA_TO_DEVICE); 570 rdsdebug("ic %p mapping rm %p: %d\n", ic, rm, rm->data.op_count); 571 if (rm->data.op_count == 0) { 572 rds_iw_stats_inc(s_iw_tx_sg_mapping_failure); 573 rds_iw_ring_unalloc(&ic->i_send_ring, work_alloc); 574 ret = -ENOMEM; /* XXX ? */ 575 goto out; 576 } 577 } else { 578 rm->data.op_count = 0; 579 } 580 581 ic->i_unsignaled_wrs = rds_iw_sysctl_max_unsig_wrs; 582 ic->i_unsignaled_bytes = rds_iw_sysctl_max_unsig_bytes; 583 rds_message_addref(rm); 584 ic->i_rm = rm; 585 586 /* Finalize the header */ 587 if (test_bit(RDS_MSG_ACK_REQUIRED, &rm->m_flags)) 588 rm->m_inc.i_hdr.h_flags |= RDS_FLAG_ACK_REQUIRED; 589 if (test_bit(RDS_MSG_RETRANSMITTED, &rm->m_flags)) 590 rm->m_inc.i_hdr.h_flags |= RDS_FLAG_RETRANSMITTED; 591 592 /* If it has a RDMA op, tell the peer we did it. This is 593 * used by the peer to release use-once RDMA MRs. */ 594 if (rm->rdma.op_active) { 595 struct rds_ext_header_rdma ext_hdr; 596 597 ext_hdr.h_rdma_rkey = cpu_to_be32(rm->rdma.op_rkey); 598 rds_message_add_extension(&rm->m_inc.i_hdr, 599 RDS_EXTHDR_RDMA, &ext_hdr, sizeof(ext_hdr)); 600 } 601 if (rm->m_rdma_cookie) { 602 rds_message_add_rdma_dest_extension(&rm->m_inc.i_hdr, 603 rds_rdma_cookie_key(rm->m_rdma_cookie), 604 rds_rdma_cookie_offset(rm->m_rdma_cookie)); 605 } 606 607 /* Note - rds_iw_piggyb_ack clears the ACK_REQUIRED bit, so 608 * we should not do this unless we have a chance of at least 609 * sticking the header into the send ring. Which is why we 610 * should call rds_iw_ring_alloc first. */ 611 rm->m_inc.i_hdr.h_ack = cpu_to_be64(rds_iw_piggyb_ack(ic)); 612 rds_message_make_checksum(&rm->m_inc.i_hdr); 613 614 /* 615 * Update adv_credits since we reset the ACK_REQUIRED bit. 616 */ 617 rds_iw_send_grab_credits(ic, 0, &posted, 1, RDS_MAX_ADV_CREDIT - adv_credits); 618 adv_credits += posted; 619 BUG_ON(adv_credits > 255); 620 } 621 622 send = &ic->i_sends[pos]; 623 first = send; 624 prev = NULL; 625 scat = &rm->data.op_sg[sg]; 626 sent = 0; 627 i = 0; 628 629 /* Sometimes you want to put a fence between an RDMA 630 * READ and the following SEND. 631 * We could either do this all the time 632 * or when requested by the user. Right now, we let 633 * the application choose. 634 */ 635 if (rm->rdma.op_active && rm->rdma.op_fence) 636 send_flags = IB_SEND_FENCE; 637 638 /* 639 * We could be copying the header into the unused tail of the page. 640 * That would need to be changed in the future when those pages might 641 * be mapped userspace pages or page cache pages. So instead we always 642 * use a second sge and our long-lived ring of mapped headers. We send 643 * the header after the data so that the data payload can be aligned on 644 * the receiver. 645 */ 646 647 /* handle a 0-len message */ 648 if (be32_to_cpu(rm->m_inc.i_hdr.h_len) == 0) { 649 rds_iw_xmit_populate_wr(ic, send, pos, 0, 0, send_flags); 650 goto add_header; 651 } 652 653 /* if there's data reference it with a chain of work reqs */ 654 for (; i < work_alloc && scat != &rm->data.op_sg[rm->data.op_count]; i++) { 655 unsigned int len; 656 657 send = &ic->i_sends[pos]; 658 659 len = min(RDS_FRAG_SIZE, ib_sg_dma_len(dev, scat) - off); 660 rds_iw_xmit_populate_wr(ic, send, pos, 661 ib_sg_dma_address(dev, scat) + off, len, 662 send_flags); 663 664 /* 665 * We want to delay signaling completions just enough to get 666 * the batching benefits but not so much that we create dead time 667 * on the wire. 668 */ 669 if (ic->i_unsignaled_wrs-- == 0) { 670 ic->i_unsignaled_wrs = rds_iw_sysctl_max_unsig_wrs; 671 send->s_wr.send_flags |= IB_SEND_SIGNALED | IB_SEND_SOLICITED; 672 } 673 674 ic->i_unsignaled_bytes -= len; 675 if (ic->i_unsignaled_bytes <= 0) { 676 ic->i_unsignaled_bytes = rds_iw_sysctl_max_unsig_bytes; 677 send->s_wr.send_flags |= IB_SEND_SIGNALED | IB_SEND_SOLICITED; 678 } 679 680 /* 681 * Always signal the last one if we're stopping due to flow control. 682 */ 683 if (flow_controlled && i == (work_alloc-1)) 684 send->s_wr.send_flags |= IB_SEND_SIGNALED | IB_SEND_SOLICITED; 685 686 rdsdebug("send %p wr %p num_sge %u next %p\n", send, 687 &send->s_wr, send->s_wr.num_sge, send->s_wr.next); 688 689 sent += len; 690 off += len; 691 if (off == ib_sg_dma_len(dev, scat)) { 692 scat++; 693 off = 0; 694 } 695 696add_header: 697 /* Tack on the header after the data. The header SGE should already 698 * have been set up to point to the right header buffer. */ 699 memcpy(&ic->i_send_hdrs[pos], &rm->m_inc.i_hdr, sizeof(struct rds_header)); 700 701 if (0) { 702 struct rds_header *hdr = &ic->i_send_hdrs[pos]; 703 704 printk(KERN_NOTICE "send WR dport=%u flags=0x%x len=%d\n", 705 be16_to_cpu(hdr->h_dport), 706 hdr->h_flags, 707 be32_to_cpu(hdr->h_len)); 708 } 709 if (adv_credits) { 710 struct rds_header *hdr = &ic->i_send_hdrs[pos]; 711 712 /* add credit and redo the header checksum */ 713 hdr->h_credit = adv_credits; 714 rds_message_make_checksum(hdr); 715 adv_credits = 0; 716 rds_iw_stats_inc(s_iw_tx_credit_updates); 717 } 718 719 if (prev) 720 prev->s_wr.next = &send->s_wr; 721 prev = send; 722 723 pos = (pos + 1) % ic->i_send_ring.w_nr; 724 } 725 726 /* Account the RDS header in the number of bytes we sent, but just once. 727 * The caller has no concept of fragmentation. */ 728 if (hdr_off == 0) 729 sent += sizeof(struct rds_header); 730 731 /* if we finished the message then send completion owns it */ 732 if (scat == &rm->data.op_sg[rm->data.op_count]) { 733 prev->s_rm = ic->i_rm; 734 prev->s_wr.send_flags |= IB_SEND_SIGNALED | IB_SEND_SOLICITED; 735 ic->i_rm = NULL; 736 } 737 738 if (i < work_alloc) { 739 rds_iw_ring_unalloc(&ic->i_send_ring, work_alloc - i); 740 work_alloc = i; 741 } 742 if (ic->i_flowctl && i < credit_alloc) 743 rds_iw_send_add_credits(conn, credit_alloc - i); 744 745 /* XXX need to worry about failed_wr and partial sends. */ 746 failed_wr = &first->s_wr; 747 ret = ib_post_send(ic->i_cm_id->qp, &first->s_wr, &failed_wr); 748 rdsdebug("ic %p first %p (wr %p) ret %d wr %p\n", ic, 749 first, &first->s_wr, ret, failed_wr); 750 BUG_ON(failed_wr != &first->s_wr); 751 if (ret) { 752 printk(KERN_WARNING "RDS/IW: ib_post_send to %pI4 " 753 "returned %d\n", &conn->c_faddr, ret); 754 rds_iw_ring_unalloc(&ic->i_send_ring, work_alloc); 755 if (prev->s_rm) { 756 ic->i_rm = prev->s_rm; 757 prev->s_rm = NULL; 758 } 759 goto out; 760 } 761 762 ret = sent; 763out: 764 BUG_ON(adv_credits); 765 return ret; 766} 767 768static void rds_iw_build_send_fastreg(struct rds_iw_device *rds_iwdev, struct rds_iw_connection *ic, struct rds_iw_send_work *send, int nent, int len, u64 sg_addr) 769{ 770 BUG_ON(nent > send->s_page_list->max_page_list_len); 771 /* 772 * Perform a WR for the fast_reg_mr. Each individual page 773 * in the sg list is added to the fast reg page list and placed 774 * inside the fast_reg_mr WR. 775 */ 776 send->s_wr.opcode = IB_WR_FAST_REG_MR; 777 send->s_wr.wr.fast_reg.length = len; 778 send->s_wr.wr.fast_reg.rkey = send->s_mr->rkey; 779 send->s_wr.wr.fast_reg.page_list = send->s_page_list; 780 send->s_wr.wr.fast_reg.page_list_len = nent; 781 send->s_wr.wr.fast_reg.page_shift = PAGE_SHIFT; 782 send->s_wr.wr.fast_reg.access_flags = IB_ACCESS_REMOTE_WRITE; 783 send->s_wr.wr.fast_reg.iova_start = sg_addr; 784 785 ib_update_fast_reg_key(send->s_mr, send->s_remap_count++); 786} 787 788int rds_iw_xmit_rdma(struct rds_connection *conn, struct rm_rdma_op *op) 789{ 790 struct rds_iw_connection *ic = conn->c_transport_data; 791 struct rds_iw_send_work *send = NULL; 792 struct rds_iw_send_work *first; 793 struct rds_iw_send_work *prev; 794 struct ib_send_wr *failed_wr; 795 struct rds_iw_device *rds_iwdev; 796 struct scatterlist *scat; 797 unsigned long len; 798 u64 remote_addr = op->op_remote_addr; 799 u32 pos, fr_pos; 800 u32 work_alloc; 801 u32 i; 802 u32 j; 803 int sent; 804 int ret; 805 int num_sge; 806 807 rds_iwdev = ib_get_client_data(ic->i_cm_id->device, &rds_iw_client); 808 809 /* map the message the first time we see it */ 810 if (!op->op_mapped) { 811 op->op_count = ib_dma_map_sg(ic->i_cm_id->device, 812 op->op_sg, op->op_nents, (op->op_write) ? 813 DMA_TO_DEVICE : DMA_FROM_DEVICE); 814 rdsdebug("ic %p mapping op %p: %d\n", ic, op, op->op_count); 815 if (op->op_count == 0) { 816 rds_iw_stats_inc(s_iw_tx_sg_mapping_failure); 817 ret = -ENOMEM; /* XXX ? */ 818 goto out; 819 } 820 821 op->op_mapped = 1; 822 } 823 824 if (!op->op_write) { 825 /* Alloc space on the send queue for the fastreg */ 826 work_alloc = rds_iw_ring_alloc(&ic->i_send_ring, 1, &fr_pos); 827 if (work_alloc != 1) { 828 rds_iw_ring_unalloc(&ic->i_send_ring, work_alloc); 829 rds_iw_stats_inc(s_iw_tx_ring_full); 830 ret = -ENOMEM; 831 goto out; 832 } 833 } 834 835 /* 836 * Instead of knowing how to return a partial rdma read/write we insist that there 837 * be enough work requests to send the entire message. 838 */ 839 i = ceil(op->op_count, rds_iwdev->max_sge); 840 841 work_alloc = rds_iw_ring_alloc(&ic->i_send_ring, i, &pos); 842 if (work_alloc != i) { 843 rds_iw_ring_unalloc(&ic->i_send_ring, work_alloc); 844 rds_iw_stats_inc(s_iw_tx_ring_full); 845 ret = -ENOMEM; 846 goto out; 847 } 848 849 send = &ic->i_sends[pos]; 850 if (!op->op_write) { 851 first = prev = &ic->i_sends[fr_pos]; 852 } else { 853 first = send; 854 prev = NULL; 855 } 856 scat = &op->op_sg[0]; 857 sent = 0; 858 num_sge = op->op_count; 859 860 for (i = 0; i < work_alloc && scat != &op->op_sg[op->op_count]; i++) { 861 send->s_wr.send_flags = 0; 862 send->s_queued = jiffies; 863 864 /* 865 * We want to delay signaling completions just enough to get 866 * the batching benefits but not so much that we create dead time on the wire. 867 */ 868 if (ic->i_unsignaled_wrs-- == 0) { 869 ic->i_unsignaled_wrs = rds_iw_sysctl_max_unsig_wrs; 870 send->s_wr.send_flags = IB_SEND_SIGNALED; 871 } 872 873 /* To avoid the need to have the plumbing to invalidate the fastreg_mr used 874 * for local access after RDS is finished with it, using 875 * IB_WR_RDMA_READ_WITH_INV will invalidate it after the read has completed. 876 */ 877 if (op->op_write) 878 send->s_wr.opcode = IB_WR_RDMA_WRITE; 879 else 880 send->s_wr.opcode = IB_WR_RDMA_READ_WITH_INV; 881 882 send->s_wr.wr.rdma.remote_addr = remote_addr; 883 send->s_wr.wr.rdma.rkey = op->op_rkey; 884 send->s_op = op; 885 886 if (num_sge > rds_iwdev->max_sge) { 887 send->s_wr.num_sge = rds_iwdev->max_sge; 888 num_sge -= rds_iwdev->max_sge; 889 } else 890 send->s_wr.num_sge = num_sge; 891 892 send->s_wr.next = NULL; 893 894 if (prev) 895 prev->s_wr.next = &send->s_wr; 896 897 for (j = 0; j < send->s_wr.num_sge && scat != &op->op_sg[op->op_count]; j++) { 898 len = ib_sg_dma_len(ic->i_cm_id->device, scat); 899 900 if (send->s_wr.opcode == IB_WR_RDMA_READ_WITH_INV) 901 send->s_page_list->page_list[j] = ib_sg_dma_address(ic->i_cm_id->device, scat); 902 else { 903 send->s_sge[j].addr = ib_sg_dma_address(ic->i_cm_id->device, scat); 904 send->s_sge[j].length = len; 905 send->s_sge[j].lkey = rds_iw_local_dma_lkey(ic); 906 } 907 908 sent += len; 909 rdsdebug("ic %p sent %d remote_addr %llu\n", ic, sent, remote_addr); 910 remote_addr += len; 911 912 scat++; 913 } 914 915 if (send->s_wr.opcode == IB_WR_RDMA_READ_WITH_INV) { 916 send->s_wr.num_sge = 1; 917 send->s_sge[0].addr = conn->c_xmit_rm->m_rs->rs_user_addr; 918 send->s_sge[0].length = conn->c_xmit_rm->m_rs->rs_user_bytes; 919 send->s_sge[0].lkey = ic->i_sends[fr_pos].s_mr->lkey; 920 } 921 922 rdsdebug("send %p wr %p num_sge %u next %p\n", send, 923 &send->s_wr, send->s_wr.num_sge, send->s_wr.next); 924 925 prev = send; 926 if (++send == &ic->i_sends[ic->i_send_ring.w_nr]) 927 send = ic->i_sends; 928 } 929 930 /* if we finished the message then send completion owns it */ 931 if (scat == &op->op_sg[op->op_count]) 932 first->s_wr.send_flags = IB_SEND_SIGNALED; 933 934 if (i < work_alloc) { 935 rds_iw_ring_unalloc(&ic->i_send_ring, work_alloc - i); 936 work_alloc = i; 937 } 938 939 /* On iWARP, local memory access by a remote system (ie, RDMA Read) is not 940 * recommended. Putting the lkey on the wire is a security hole, as it can 941 * allow for memory access to all of memory on the remote system. Some 942 * adapters do not allow using the lkey for this at all. To bypass this use a 943 * fastreg_mr (or possibly a dma_mr) 944 */ 945 if (!op->op_write) { 946 rds_iw_build_send_fastreg(rds_iwdev, ic, &ic->i_sends[fr_pos], 947 op->op_count, sent, conn->c_xmit_rm->m_rs->rs_user_addr); 948 work_alloc++; 949 } 950 951 failed_wr = &first->s_wr; 952 ret = ib_post_send(ic->i_cm_id->qp, &first->s_wr, &failed_wr); 953 rdsdebug("ic %p first %p (wr %p) ret %d wr %p\n", ic, 954 first, &first->s_wr, ret, failed_wr); 955 BUG_ON(failed_wr != &first->s_wr); 956 if (ret) { 957 printk(KERN_WARNING "RDS/IW: rdma ib_post_send to %pI4 " 958 "returned %d\n", &conn->c_faddr, ret); 959 rds_iw_ring_unalloc(&ic->i_send_ring, work_alloc); 960 goto out; 961 } 962 963out: 964 return ret; 965} 966 967void rds_iw_xmit_complete(struct rds_connection *conn) 968{ 969 struct rds_iw_connection *ic = conn->c_transport_data; 970 971 /* We may have a pending ACK or window update we were unable 972 * to send previously (due to flow control). Try again. */ 973 rds_iw_attempt_ack(ic); 974} 975