send.c revision 1cc2228c599f173d77000a250bf0541294e1a7be
/*
 * Copyright (c) 2006 Oracle.  All rights reserved.
 *
 * This software is available to you under a choice of one of two
 * licenses.  You may choose to be licensed under the terms of the GNU
 * General Public License (GPL) Version 2, available from the file
 * COPYING in the main directory of this source tree, or the
 * OpenIB.org BSD license below:
 *
 *     Redistribution and use in source and binary forms, with or
 *     without modification, are permitted provided that the following
 *     conditions are met:
 *
 *      - Redistributions of source code must retain the above
 *        copyright notice, this list of conditions and the following
 *        disclaimer.
 *
 *      - Redistributions in binary form must reproduce the above
 *        copyright notice, this list of conditions and the following
 *        disclaimer in the documentation and/or other materials
 *        provided with the distribution.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
 * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
 * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 * SOFTWARE.
 *
 */
#include <linux/kernel.h>
#include <linux/gfp.h>
#include <net/sock.h>
#include <linux/in.h>
#include <linux/list.h>

#include "rds.h"

/* When transmitting messages in rds_send_xmit, we need to emerge from
 * time to time and briefly release the CPU. Otherwise the soft lockup
 * watchdog will kick our shin.
 * Also, it seems fairer to not let one busy connection stall all the
 * others.
 *
 * send_batch_count is the number of times we'll loop in send_xmit. Setting
 * it to 0 will restore the old behavior (where we looped until we had
 * drained the queue).
 */
static int send_batch_count = 64;
module_param(send_batch_count, int, 0444);
MODULE_PARM_DESC(send_batch_count, " batch factor when working the send queue");
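/*
 * Usage sketch (hedged, illustrative): with perms 0444 the parameter is
 * read-only at runtime, so it is set at module load time and inspected
 * through sysfs, e.g.
 *
 *	# modprobe rds send_batch_count=128
 *	# cat /sys/module/rds/parameters/send_batch_count
 *	128
 */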
/*
 * Reset the send state.  The caller must not hold c_send_lock; it is
 * taken here to serialize against rds_send_xmit().
 */
void rds_send_reset(struct rds_connection *conn)
{
	struct rds_message *rm, *tmp;
	unsigned long flags;

	spin_lock_irqsave(&conn->c_send_lock, flags);
	if (conn->c_xmit_rm) {
		rm = conn->c_xmit_rm;
		conn->c_xmit_rm = NULL;
		/* Tell the user the RDMA op is no longer mapped by the
		 * transport. This isn't entirely true (it's flushed out
		 * independently) but as the connection is down, there's
		 * no ongoing RDMA to/from that memory */
		rds_message_unmapped(rm);
		spin_unlock_irqrestore(&conn->c_send_lock, flags);

		rds_message_put(rm);
	} else {
		spin_unlock_irqrestore(&conn->c_send_lock, flags);
	}

	conn->c_xmit_sg = 0;
	conn->c_xmit_hdr_off = 0;
	conn->c_xmit_data_off = 0;
	conn->c_xmit_atomic_sent = 0;
	conn->c_xmit_rdma_sent = 0;
	conn->c_xmit_data_sent = 0;

	conn->c_map_queued = 0;

	conn->c_unacked_packets = rds_sysctl_max_unacked_packets;
	conn->c_unacked_bytes = rds_sysctl_max_unacked_bytes;

	/* Mark messages as retransmissions, and move them to the send q */
	spin_lock_irqsave(&conn->c_lock, flags);
	list_for_each_entry_safe(rm, tmp, &conn->c_retrans, m_conn_item) {
		set_bit(RDS_MSG_ACK_REQUIRED, &rm->m_flags);
		set_bit(RDS_MSG_RETRANSMITTED, &rm->m_flags);
	}
	list_splice_init(&conn->c_retrans, &conn->c_send_queue);
	spin_unlock_irqrestore(&conn->c_lock, flags);
}

/*
 * We're making the conscious trade-off here to only send one message
 * down the connection at a time.
 *   Pro:
 *      - tx queueing is a simple fifo list
 *      - reassembly is optional and easily done by transports per conn
 *      - no per flow rx lookup at all, straight to the socket
 *      - less per-frag memory and wire overhead
 *   Con:
 *      - queued acks can be delayed behind large messages
 *   Depends:
 *      - small message latency is higher behind queued large messages
 *      - large message latency isn't starved by intervening small sends
 */
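/*
 * Overview of rds_send_xmit() below (an illustrative sketch): one caller
 * at a time wins c_send_lock and becomes the pump for the connection.
 * Each pass of the loop picks up the in-flight message (c_xmit_rm), a
 * queued congestion map update, or the next rm off c_send_queue, then
 * pushes the rdma op, the atomic op, and finally header plus data
 * fragments.  It stops when the queue drains or the transport stops
 * making forward progress, and restarts if another sender queued work
 * while it was unwinding.
 */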
int rds_send_xmit(struct rds_connection *conn)
{
	struct rds_message *rm;
	unsigned long flags;
	unsigned int tmp;
	struct scatterlist *sg;
	int ret = 0;
	int gen = 0;
	LIST_HEAD(to_be_dropped);

restart:
	if (!rds_conn_up(conn))
		goto out;

	/*
	 * sendmsg calls here after having queued its message on the send
	 * queue.  We only have one task feeding the connection at a time.  If
	 * another thread is already feeding the queue then we back off.  This
	 * avoids blocking the caller and bouncing per-connection data between
	 * CPU caches on every message.
	 */
	if (!spin_trylock_irqsave(&conn->c_send_lock, flags)) {
		rds_stats_inc(s_send_lock_contention);
		ret = -ENOMEM;
		goto out;
	}
	atomic_inc(&conn->c_senders);

	if (conn->c_trans->xmit_prepare)
		conn->c_trans->xmit_prepare(conn);

	gen = atomic_inc_return(&conn->c_send_generation);

	/*
	 * spin trying to push headers and data down the connection until
	 * the connection doesn't make forward progress.
	 */
	while (1) {

		rm = conn->c_xmit_rm;

		/*
		 * If between sending messages, we can send a pending congestion
		 * map update.
		 */
		if (!rm && test_and_clear_bit(0, &conn->c_map_queued)) {
			rm = rds_cong_update_alloc(conn);
			if (IS_ERR(rm)) {
				ret = PTR_ERR(rm);
				break;
			}
			rm->data.op_active = 1;

			conn->c_xmit_rm = rm;
		}

		/*
		 * If not already working on one, grab the next message.
		 *
		 * c_xmit_rm holds a ref while we're sending this message down
		 * the connection.  We can use this ref while holding the
		 * c_send_lock; rds_send_reset() is serialized with it.
		 */
		if (!rm) {
			unsigned int len;

			spin_lock(&conn->c_lock);

			if (!list_empty(&conn->c_send_queue)) {
				rm = list_entry(conn->c_send_queue.next,
						struct rds_message,
						m_conn_item);
				rds_message_addref(rm);

				/*
				 * Move the message from the send queue to the retransmit
				 * list right away.
				 */
				list_move_tail(&rm->m_conn_item, &conn->c_retrans);
			}

			spin_unlock(&conn->c_lock);

			if (!rm)
				break;

			/* Unfortunately, the way Infiniband deals with
			 * RDMA to a bad MR key is by moving the entire
			 * queue pair to error state. We could possibly
			 * recover from that, but right now we drop the
			 * connection.
			 * Therefore, we never retransmit messages with RDMA ops.
			 */
			if (rm->rdma.op_active &&
			    test_bit(RDS_MSG_RETRANSMITTED, &rm->m_flags)) {
				spin_lock(&conn->c_lock);
				if (test_and_clear_bit(RDS_MSG_ON_CONN, &rm->m_flags))
					list_move(&rm->m_conn_item, &to_be_dropped);
				spin_unlock(&conn->c_lock);
				continue;
			}

			/* Require an ACK every once in a while */
			len = ntohl(rm->m_inc.i_hdr.h_len);
			if (conn->c_unacked_packets == 0 ||
			    conn->c_unacked_bytes < len) {
				__set_bit(RDS_MSG_ACK_REQUIRED, &rm->m_flags);

				conn->c_unacked_packets = rds_sysctl_max_unacked_packets;
				conn->c_unacked_bytes = rds_sysctl_max_unacked_bytes;
				rds_stats_inc(s_send_ack_required);
			} else {
				conn->c_unacked_bytes -= len;
				conn->c_unacked_packets--;
			}

			conn->c_xmit_rm = rm;
		}

		/* The transport either sends the whole rdma or none of it */
		if (rm->rdma.op_active && !conn->c_xmit_rdma_sent) {
			rm->m_final_op = &rm->rdma;
			ret = conn->c_trans->xmit_rdma(conn, &rm->rdma);
			if (ret)
				break;
			conn->c_xmit_rdma_sent = 1;

			/* The transport owns the mapped memory for now.
			 * You can't unmap it while it's on the send queue */
			set_bit(RDS_MSG_MAPPED, &rm->m_flags);
		}

		if (rm->atomic.op_active && !conn->c_xmit_atomic_sent) {
			rm->m_final_op = &rm->atomic;
			ret = conn->c_trans->xmit_atomic(conn, &rm->atomic);
			if (ret)
				break;
			conn->c_xmit_atomic_sent = 1;

			/* The transport owns the mapped memory for now.
			 * You can't unmap it while it's on the send queue */
			set_bit(RDS_MSG_MAPPED, &rm->m_flags);
		}

		/*
		 * A number of cases require an RDS header to be sent
		 * even if there is no data.
		 * We permit 0-byte sends; rds-ping depends on this.
		 * However, if there are exclusively attached silent ops,
		 * we skip the hdr/data send, to enable silent operation.
		 */
		if (rm->data.op_nents == 0) {
			int ops_present;
			int all_ops_are_silent = 1;

			ops_present = (rm->atomic.op_active || rm->rdma.op_active);
			if (rm->atomic.op_active && !rm->atomic.op_silent)
				all_ops_are_silent = 0;
			if (rm->rdma.op_active && !rm->rdma.op_silent)
				all_ops_are_silent = 0;

			if (ops_present && all_ops_are_silent
			    && !rm->m_rdma_cookie)
				rm->data.op_active = 0;
		}
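		/*
		 * Illustrative case: a message carrying only a silent
		 * RDMA op - op_silent set, no immediate data, no rdma
		 * cookie to deliver - gets data.op_active cleared here,
		 * so neither header nor payload is put on the wire.
		 */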
		if (rm->data.op_active && !conn->c_xmit_data_sent) {
			rm->m_final_op = &rm->data;
			ret = conn->c_trans->xmit(conn, rm,
						  conn->c_xmit_hdr_off,
						  conn->c_xmit_sg,
						  conn->c_xmit_data_off);
			if (ret <= 0)
				break;

			if (conn->c_xmit_hdr_off < sizeof(struct rds_header)) {
				tmp = min_t(int, ret,
					    sizeof(struct rds_header) -
					    conn->c_xmit_hdr_off);
				conn->c_xmit_hdr_off += tmp;
				ret -= tmp;
			}

			sg = &rm->data.op_sg[conn->c_xmit_sg];
			while (ret) {
				tmp = min_t(int, ret, sg->length -
						      conn->c_xmit_data_off);
				conn->c_xmit_data_off += tmp;
				ret -= tmp;
				if (conn->c_xmit_data_off == sg->length) {
					conn->c_xmit_data_off = 0;
					sg++;
					conn->c_xmit_sg++;
					BUG_ON(ret != 0 &&
					       conn->c_xmit_sg == rm->data.op_nents);
				}
			}

			if (conn->c_xmit_hdr_off == sizeof(struct rds_header) &&
			    (conn->c_xmit_sg == rm->data.op_nents))
				conn->c_xmit_data_sent = 1;
		}
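		/*
		 * Note on the accounting above: a partial transport send
		 * first finishes the header via c_xmit_hdr_off, then
		 * walks the remainder off op_sg, advancing
		 * c_xmit_data_off within an entry and c_xmit_sg across
		 * entries, so the next call resumes exactly where this
		 * one stopped.
		 */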
		/*
		 * An rm will only take multiple times through this loop
		 * if there is a data op.  Thus, if the data is sent (or there
		 * was none), then we're done with the rm.
		 */
		if (!rm->data.op_active || conn->c_xmit_data_sent) {
			conn->c_xmit_rm = NULL;
			conn->c_xmit_sg = 0;
			conn->c_xmit_hdr_off = 0;
			conn->c_xmit_data_off = 0;
			conn->c_xmit_rdma_sent = 0;
			conn->c_xmit_atomic_sent = 0;
			conn->c_xmit_data_sent = 0;

			rds_message_put(rm);
		}
	}

	if (conn->c_trans->xmit_complete)
		conn->c_trans->xmit_complete(conn);

	/*
	 * We might be racing with another sender who queued a message but
	 * backed off on noticing that we held the c_send_lock.  If we check
	 * for queued messages after dropping the lock then either we'll
	 * see the queued message or the queuer will get the lock.  If we
	 * notice the queued message then we trigger an immediate retry.
	 *
	 * We need to be careful only to do this when we stopped processing
	 * the send queue because it was empty.  It's the only way we
	 * stop processing the loop when the transport hasn't taken
	 * responsibility for forward progress.
	 */
	spin_unlock_irqrestore(&conn->c_send_lock, flags);

	/* Nuke any messages we decided not to retransmit. */
	if (!list_empty(&to_be_dropped)) {
		/* irqs on here, so we can put(), unlike above */
		list_for_each_entry(rm, &to_be_dropped, m_conn_item)
			rds_message_put(rm);
		rds_send_remove_from_sock(&to_be_dropped, RDS_RDMA_DROPPED);
	}

	atomic_dec(&conn->c_senders);

	/*
	 * Other senders will see we have c_send_lock and exit.  We
	 * need to recheck the send queue and race again for c_send_lock
	 * to make sure messages don't just sit on the send queue, if
	 * somebody hasn't already beat us into the loop.
	 *
	 * If the transport cannot continue (i.e. ret != 0), then it must
	 * call us when more room is available, such as from the tx
	 * completion handler.
	 */
	if (ret == 0) {
		smp_mb();
		if (!list_empty(&conn->c_send_queue)) {
			rds_stats_inc(s_send_lock_queue_raced);
			if (gen == atomic_read(&conn->c_send_generation))
				goto restart;
		}
	}
out:
	return ret;
}
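/*
 * Within this file, rds_send_xmit() is kicked from rds_sendmsg() and
 * rds_send_pong(); when the transport reports it is full, the retry is
 * expected to come via rds_send_worker() in the rds thread, as the
 * comment near the end of rds_sendmsg() describes.
 */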
static void rds_send_sndbuf_remove(struct rds_sock *rs, struct rds_message *rm)
{
	u32 len = be32_to_cpu(rm->m_inc.i_hdr.h_len);

	assert_spin_locked(&rs->rs_lock);

	BUG_ON(rs->rs_snd_bytes < len);
	rs->rs_snd_bytes -= len;

	if (rs->rs_snd_bytes == 0)
		rds_stats_inc(s_send_queue_empty);
}

static inline int rds_send_is_acked(struct rds_message *rm, u64 ack,
				    is_acked_func is_acked)
{
	if (is_acked)
		return is_acked(rm, ack);
	return be64_to_cpu(rm->m_inc.i_hdr.h_sequence) <= ack;
}
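/*
 * A transport that acks at its own layer can pass an is_acked hook
 * rather than relying on the RDS header sequence number; the comment
 * above rds_send_drop_acked() below mentions the TCP case, where
 * tcp_is_acked checks the RDS_MSG_HAS_ACK_SEQ bit before trusting
 * m_ack_seq.
 */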
/*
 * Returns true if there are no messages on the send and retransmit queues
 * with a sequence number less than the given one, i.e. everything before
 * it has been acked.  Both lists are ordered by sequence number, so only
 * the head of each needs checking; hence the unconditional break below.
 */
int rds_send_acked_before(struct rds_connection *conn, u64 seq)
{
	struct rds_message *rm, *tmp;
	int ret = 1;

	spin_lock(&conn->c_lock);

	list_for_each_entry_safe(rm, tmp, &conn->c_retrans, m_conn_item) {
		if (be64_to_cpu(rm->m_inc.i_hdr.h_sequence) < seq)
			ret = 0;
		break;
	}

	list_for_each_entry_safe(rm, tmp, &conn->c_send_queue, m_conn_item) {
		if (be64_to_cpu(rm->m_inc.i_hdr.h_sequence) < seq)
			ret = 0;
		break;
	}

	spin_unlock(&conn->c_lock);

	return ret;
}

/*
 * This is pretty similar to what happens below in the ACK
 * handling code - except that we call here as soon as we get
 * the IB send completion on the RDMA op and the accompanying
 * message.
 */
void rds_rdma_send_complete(struct rds_message *rm, int status)
{
	struct rds_sock *rs = NULL;
	struct rm_rdma_op *ro;
	struct rds_notifier *notifier;
	unsigned long flags;

	spin_lock_irqsave(&rm->m_rs_lock, flags);

	ro = &rm->rdma;
	if (test_bit(RDS_MSG_ON_SOCK, &rm->m_flags) &&
	    ro->op_active && ro->op_notify && ro->op_notifier) {
		notifier = ro->op_notifier;
		rs = rm->m_rs;
		sock_hold(rds_rs_to_sk(rs));

		notifier->n_status = status;
		spin_lock(&rs->rs_lock);
		list_add_tail(&notifier->n_list, &rs->rs_notify_queue);
		spin_unlock(&rs->rs_lock);

		ro->op_notifier = NULL;
	}

	spin_unlock_irqrestore(&rm->m_rs_lock, flags);

	if (rs) {
		rds_wake_sk_sleep(rs);
		sock_put(rds_rs_to_sk(rs));
	}
}
EXPORT_SYMBOL_GPL(rds_rdma_send_complete);

/*
 * Just like above, except it looks at the atomic op instead.
 */
void rds_atomic_send_complete(struct rds_message *rm, int status)
{
	struct rds_sock *rs = NULL;
	struct rm_atomic_op *ao;
	struct rds_notifier *notifier;
	unsigned long flags;

	spin_lock_irqsave(&rm->m_rs_lock, flags);

	ao = &rm->atomic;
	if (test_bit(RDS_MSG_ON_SOCK, &rm->m_flags)
	    && ao->op_active && ao->op_notify && ao->op_notifier) {
		notifier = ao->op_notifier;
		rs = rm->m_rs;
		sock_hold(rds_rs_to_sk(rs));

		notifier->n_status = status;
		spin_lock(&rs->rs_lock);
		list_add_tail(&notifier->n_list, &rs->rs_notify_queue);
		spin_unlock(&rs->rs_lock);

		ao->op_notifier = NULL;
	}

	spin_unlock_irqrestore(&rm->m_rs_lock, flags);

	if (rs) {
		rds_wake_sk_sleep(rs);
		sock_put(rds_rs_to_sk(rs));
	}
}
EXPORT_SYMBOL_GPL(rds_atomic_send_complete);

/*
 * This is the same as rds_rdma_send_complete except we
 * don't do any locking - we have all the ingredients (message,
 * socket, socket lock) and can just move the notifier.
 */
static inline void
__rds_send_complete(struct rds_sock *rs, struct rds_message *rm, int status)
{
	struct rm_rdma_op *ro;
	struct rm_atomic_op *ao;

	ro = &rm->rdma;
	if (ro->op_active && ro->op_notify && ro->op_notifier) {
		ro->op_notifier->n_status = status;
		list_add_tail(&ro->op_notifier->n_list, &rs->rs_notify_queue);
		ro->op_notifier = NULL;
	}

	ao = &rm->atomic;
	if (ao->op_active && ao->op_notify && ao->op_notifier) {
		ao->op_notifier->n_status = status;
		list_add_tail(&ao->op_notifier->n_list, &rs->rs_notify_queue);
		ao->op_notifier = NULL;
	}

	/* No need to wake the app - caller does this */
}
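/*
 * The one caller in this file, rds_send_drop_to(), holds both
 * rm->m_rs_lock and rs->rs_lock across this call, which is why no
 * locking is needed here.
 */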
/*
 * This is called from the IB send completion when we detect
 * a RDMA operation that failed with remote access error.
 * So speed is not an issue here.
 */
struct rds_message *rds_send_get_message(struct rds_connection *conn,
					 struct rm_rdma_op *op)
{
	struct rds_message *rm, *tmp, *found = NULL;
	unsigned long flags;

	spin_lock_irqsave(&conn->c_lock, flags);

	list_for_each_entry_safe(rm, tmp, &conn->c_retrans, m_conn_item) {
		if (&rm->rdma == op) {
			atomic_inc(&rm->m_refcount);
			found = rm;
			goto out;
		}
	}

	list_for_each_entry_safe(rm, tmp, &conn->c_send_queue, m_conn_item) {
		if (&rm->rdma == op) {
			atomic_inc(&rm->m_refcount);
			found = rm;
			break;
		}
	}

out:
	spin_unlock_irqrestore(&conn->c_lock, flags);

	return found;
}
EXPORT_SYMBOL_GPL(rds_send_get_message);

/*
 * This removes messages from the socket's list if they're on it.  The list
 * argument must be private to the caller, we must be able to modify it
 * without locks.  The messages must have a reference held for their
 * position on the list.  This function will drop that reference after
 * removing the messages from the 'messages' list regardless of if it found
 * the messages on the socket list or not.
 */
void rds_send_remove_from_sock(struct list_head *messages, int status)
{
	unsigned long flags;
	struct rds_sock *rs = NULL;
	struct rds_message *rm;

	while (!list_empty(messages)) {
		int was_on_sock = 0;

		rm = list_entry(messages->next, struct rds_message,
				m_conn_item);
		list_del_init(&rm->m_conn_item);

		/*
		 * If we see this flag cleared then we're *sure* that someone
		 * else beat us to removing it from the sock.  If we race
		 * with their flag update we'll get the lock and then really
		 * see that the flag has been cleared.
		 *
		 * The message spinlock makes sure nobody clears rm->m_rs
		 * while we're messing with it.  It does not prevent the
		 * message from being removed from the socket, though.
		 */
		spin_lock_irqsave(&rm->m_rs_lock, flags);
		if (!test_bit(RDS_MSG_ON_SOCK, &rm->m_flags))
			goto unlock_and_drop;

		if (rs != rm->m_rs) {
			if (rs) {
				rds_wake_sk_sleep(rs);
				sock_put(rds_rs_to_sk(rs));
			}
			rs = rm->m_rs;
			sock_hold(rds_rs_to_sk(rs));
		}
		spin_lock(&rs->rs_lock);

		if (test_and_clear_bit(RDS_MSG_ON_SOCK, &rm->m_flags)) {
			struct rm_rdma_op *ro = &rm->rdma;
			struct rds_notifier *notifier;

			list_del_init(&rm->m_sock_item);
			rds_send_sndbuf_remove(rs, rm);

			if (ro->op_active && ro->op_notifier &&
			    (ro->op_notify || (ro->op_recverr && status))) {
				notifier = ro->op_notifier;
				list_add_tail(&notifier->n_list,
						&rs->rs_notify_queue);
				if (!notifier->n_status)
					notifier->n_status = status;
				rm->rdma.op_notifier = NULL;
			}
			was_on_sock = 1;
			rm->m_rs = NULL;
		}
		spin_unlock(&rs->rs_lock);

unlock_and_drop:
		spin_unlock_irqrestore(&rm->m_rs_lock, flags);
		rds_message_put(rm);
		if (was_on_sock)
			rds_message_put(rm);
	}

	if (rs) {
		rds_wake_sk_sleep(rs);
		sock_put(rds_rs_to_sk(rs));
	}
}
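/*
 * Note on the two puts above: the unconditional rds_message_put() drops
 * the list-position reference the caller transferred in (see the
 * function comment), while the was_on_sock put drops the reference the
 * socket took in rds_send_queue_rm().
 */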
/*
 * Transports call here when they've determined that the receiver queued
 * messages up to, and including, the given sequence number.  Messages are
 * moved to the retrans queue when rds_send_xmit picks them off the send
 * queue. This means that in the TCP case, the message may not have been
 * assigned the m_ack_seq yet - but that's fine as long as tcp_is_acked
 * checks the RDS_MSG_HAS_ACK_SEQ bit.
 *
 * XXX It's not clear to me how this is safely serialized with socket
 * destruction.  Maybe it should bail if it sees SOCK_DEAD.
 */
void rds_send_drop_acked(struct rds_connection *conn, u64 ack,
			 is_acked_func is_acked)
{
	struct rds_message *rm, *tmp;
	unsigned long flags;
	LIST_HEAD(list);

	spin_lock_irqsave(&conn->c_lock, flags);

	list_for_each_entry_safe(rm, tmp, &conn->c_retrans, m_conn_item) {
		if (!rds_send_is_acked(rm, ack, is_acked))
			break;

		list_move(&rm->m_conn_item, &list);
		clear_bit(RDS_MSG_ON_CONN, &rm->m_flags);
	}

	/* order flag updates with spin locks */
	if (!list_empty(&list))
		smp_mb__after_clear_bit();

	spin_unlock_irqrestore(&conn->c_lock, flags);

	/* now remove the messages from the sock list as needed */
	rds_send_remove_from_sock(&list, RDS_RDMA_SUCCESS);
}
EXPORT_SYMBOL_GPL(rds_send_drop_acked);

void rds_send_drop_to(struct rds_sock *rs, struct sockaddr_in *dest)
{
	struct rds_message *rm, *tmp;
	struct rds_connection *conn;
	unsigned long flags;
	LIST_HEAD(list);

	/* get all the messages we're dropping under the rs lock */
	spin_lock_irqsave(&rs->rs_lock, flags);

	list_for_each_entry_safe(rm, tmp, &rs->rs_send_queue, m_sock_item) {
		if (dest && (dest->sin_addr.s_addr != rm->m_daddr ||
			     dest->sin_port != rm->m_inc.i_hdr.h_dport))
			continue;

		list_move(&rm->m_sock_item, &list);
		rds_send_sndbuf_remove(rs, rm);
		clear_bit(RDS_MSG_ON_SOCK, &rm->m_flags);
	}

	/* order flag updates with the rs lock */
	smp_mb__after_clear_bit();

	spin_unlock_irqrestore(&rs->rs_lock, flags);

	if (list_empty(&list))
		return;

	/* Remove the messages from the conn */
	list_for_each_entry(rm, &list, m_sock_item) {

		conn = rm->m_inc.i_conn;

		spin_lock_irqsave(&conn->c_lock, flags);
		/*
		 * Maybe someone else beat us to removing rm from the conn.
		 * If we race with their flag update we'll get the lock and
		 * then really see that the flag has been cleared.
		 */
		if (!test_and_clear_bit(RDS_MSG_ON_CONN, &rm->m_flags)) {
			spin_unlock_irqrestore(&conn->c_lock, flags);
			continue;
		}
		list_del_init(&rm->m_conn_item);
		spin_unlock_irqrestore(&conn->c_lock, flags);

		/*
		 * Couldn't grab m_rs_lock in top loop (lock ordering),
		 * but we can now.
		 */
		spin_lock_irqsave(&rm->m_rs_lock, flags);

		spin_lock(&rs->rs_lock);
		__rds_send_complete(rs, rm, RDS_RDMA_CANCELED);
		spin_unlock(&rs->rs_lock);

		rm->m_rs = NULL;
		spin_unlock_irqrestore(&rm->m_rs_lock, flags);

		rds_message_put(rm);
	}

	rds_wake_sk_sleep(rs);

	while (!list_empty(&list)) {
		rm = list_entry(list.next, struct rds_message, m_sock_item);
		list_del_init(&rm->m_sock_item);

		rds_message_wait(rm);
		rds_message_put(rm);
	}
}

/*
 * we only want this to fire once so we use the caller's 'queued'.  It's
 * possible that another thread can race with us and remove the
 * message from the flow with RDS_CANCEL_SENT_TO.
 */
static int rds_send_queue_rm(struct rds_sock *rs, struct rds_connection *conn,
			     struct rds_message *rm, __be16 sport,
			     __be16 dport, int *queued)
{
	unsigned long flags;
	u32 len;

	if (*queued)
		goto out;

	len = be32_to_cpu(rm->m_inc.i_hdr.h_len);

	/* this is the only place which holds both the socket's rs_lock
	 * and the connection's c_lock */
	spin_lock_irqsave(&rs->rs_lock, flags);

	/*
	 * If there is a little space in sndbuf, we don't queue anything,
	 * and userspace gets -EAGAIN. But poll() indicates there's send
	 * room. This can lead to bad behavior (spinning) if snd_bytes isn't
	 * freed up by incoming acks. So we check the *old* value of
	 * rs_snd_bytes here to allow the last msg to exceed the buffer,
	 * and poll() now knows no more data can be sent.
	 */
	if (rs->rs_snd_bytes < rds_sk_sndbuf(rs)) {
		rs->rs_snd_bytes += len;

		/* let recv side know we are close to send space exhaustion.
		 * This is probably not the optimal way to do it, as this
		 * means we set the flag on *all* messages as soon as our
		 * throughput hits a certain threshold.
		 */
		if (rs->rs_snd_bytes >= rds_sk_sndbuf(rs) / 2)
			__set_bit(RDS_MSG_ACK_REQUIRED, &rm->m_flags);

		list_add_tail(&rm->m_sock_item, &rs->rs_send_queue);
		set_bit(RDS_MSG_ON_SOCK, &rm->m_flags);
		rds_message_addref(rm);
		rm->m_rs = rs;

		/* The code ordering is a little weird, but we're
		   trying to minimize the time we hold c_lock */
		rds_message_populate_header(&rm->m_inc.i_hdr, sport, dport, 0);
		rm->m_inc.i_conn = conn;
		rds_message_addref(rm);

		spin_lock(&conn->c_lock);
		rm->m_inc.i_hdr.h_sequence = cpu_to_be64(conn->c_next_tx_seq++);
		list_add_tail(&rm->m_conn_item, &conn->c_send_queue);
		set_bit(RDS_MSG_ON_CONN, &rm->m_flags);
		spin_unlock(&conn->c_lock);

		rdsdebug("queued msg %p len %d, rs %p bytes %d seq %llu\n",
			 rm, len, rs, rs->rs_snd_bytes,
			 (unsigned long long)be64_to_cpu(rm->m_inc.i_hdr.h_sequence));

		*queued = 1;
	}

	spin_unlock_irqrestore(&rs->rs_lock, flags);
out:
	return *queued;
}
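/*
 * Worked example of the *old*-value check above (illustrative numbers):
 * with a 64KB sndbuf and rs_snd_bytes at 60KB, a 32KB message still
 * queues because the old value, 60KB, is under the limit; rs_snd_bytes
 * becomes 92KB, the next sender must wait, and poll() stops indicating
 * send room at the same threshold.
 */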
/*
 * rds_message is getting to be quite complicated, and we'd like to allocate
 * it all in one go. This figures out how big it needs to be up front.
 */
static int rds_rm_size(struct msghdr *msg, int data_len)
{
	struct cmsghdr *cmsg;
	int size = 0;
	int cmsg_groups = 0;
	int retval;

	for (cmsg = CMSG_FIRSTHDR(msg); cmsg; cmsg = CMSG_NXTHDR(msg, cmsg)) {
		if (!CMSG_OK(msg, cmsg))
			return -EINVAL;

		if (cmsg->cmsg_level != SOL_RDS)
			continue;

		switch (cmsg->cmsg_type) {
		case RDS_CMSG_RDMA_ARGS:
			cmsg_groups |= 1;
			retval = rds_rdma_extra_size(CMSG_DATA(cmsg));
			if (retval < 0)
				return retval;
			size += retval;

			break;

		case RDS_CMSG_RDMA_DEST:
		case RDS_CMSG_RDMA_MAP:
			cmsg_groups |= 2;
			/* these are valid but do not add any size */
			break;

		case RDS_CMSG_ATOMIC_CSWP:
		case RDS_CMSG_ATOMIC_FADD:
			cmsg_groups |= 1;
			size += sizeof(struct scatterlist);
			break;

		default:
			return -EINVAL;
		}

	}

	size += ceil(data_len, PAGE_SIZE) * sizeof(struct scatterlist);

	/* Ensure (DEST, MAP) are never used with (ARGS, ATOMIC) */
	if (cmsg_groups == 3)
		return -EINVAL;

	return size;
}

static int rds_cmsg_send(struct rds_sock *rs, struct rds_message *rm,
			 struct msghdr *msg, int *allocated_mr)
{
	struct cmsghdr *cmsg;
	int ret = 0;

	for (cmsg = CMSG_FIRSTHDR(msg); cmsg; cmsg = CMSG_NXTHDR(msg, cmsg)) {
		if (!CMSG_OK(msg, cmsg))
			return -EINVAL;

		if (cmsg->cmsg_level != SOL_RDS)
			continue;

		/* As a side effect, RDMA_DEST and RDMA_MAP will set
		 * rm->rdma.m_rdma_cookie and rm->rdma.m_rdma_mr.
		 */
		switch (cmsg->cmsg_type) {
		case RDS_CMSG_RDMA_ARGS:
			ret = rds_cmsg_rdma_args(rs, rm, cmsg);
			break;

		case RDS_CMSG_RDMA_DEST:
			ret = rds_cmsg_rdma_dest(rs, rm, cmsg);
			break;

		case RDS_CMSG_RDMA_MAP:
			ret = rds_cmsg_rdma_map(rs, rm, cmsg);
			if (!ret)
				*allocated_mr = 1;
			break;

		case RDS_CMSG_ATOMIC_CSWP:
		case RDS_CMSG_ATOMIC_FADD:
			ret = rds_cmsg_atomic(rs, rm, cmsg);
			break;

		default:
			return -EINVAL;
		}

		if (ret)
			break;
	}

	return ret;
}
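/*
 * Userspace sketch (hedged; not part of this file) of how a control
 * message reaches rds_cmsg_send(): the sender attaches, say, RDMA args
 * with the usual cmsg(3) macros,
 *
 *	struct rds_rdma_args args = { ... };
 *	struct cmsghdr *cmsg = CMSG_FIRSTHDR(&msg);
 *
 *	cmsg->cmsg_level = SOL_RDS;
 *	cmsg->cmsg_type = RDS_CMSG_RDMA_ARGS;
 *	cmsg->cmsg_len = CMSG_LEN(sizeof(args));
 *	memcpy(CMSG_DATA(cmsg), &args, sizeof(args));
 *
 * and rds_rm_size() will have added rds_rdma_extra_size() to the
 * allocation before the dispatch above runs.
 */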
1085 */ 1086 rds_stats_inc(s_send_queued); 1087 1088 if (!test_bit(RDS_LL_SEND_FULL, &conn->c_flags)) 1089 rds_send_xmit(conn); 1090 1091 rds_message_put(rm); 1092 return payload_len; 1093 1094out: 1095 /* If the user included a RDMA_MAP cmsg, we allocated a MR on the fly. 1096 * If the sendmsg goes through, we keep the MR. If it fails with EAGAIN 1097 * or in any other way, we need to destroy the MR again */ 1098 if (allocated_mr) 1099 rds_rdma_unuse(rs, rds_rdma_cookie_key(rm->m_rdma_cookie), 1); 1100 1101 if (rm) 1102 rds_message_put(rm); 1103 return ret; 1104} 1105 1106/* 1107 * Reply to a ping packet. 1108 */ 1109int 1110rds_send_pong(struct rds_connection *conn, __be16 dport) 1111{ 1112 struct rds_message *rm; 1113 unsigned long flags; 1114 int ret = 0; 1115 1116 rm = rds_message_alloc(0, GFP_ATOMIC); 1117 if (!rm) { 1118 ret = -ENOMEM; 1119 goto out; 1120 } 1121 1122 rm->m_daddr = conn->c_faddr; 1123 rm->data.op_active = 1; 1124 1125 /* If the connection is down, trigger a connect. We may 1126 * have scheduled a delayed reconnect however - in this case 1127 * we should not interfere. 1128 */ 1129 if (rds_conn_state(conn) == RDS_CONN_DOWN && 1130 !test_and_set_bit(RDS_RECONNECT_PENDING, &conn->c_flags)) 1131 queue_delayed_work(rds_wq, &conn->c_conn_w, 0); 1132 1133 ret = rds_cong_wait(conn->c_fcong, dport, 1, NULL); 1134 if (ret) 1135 goto out; 1136 1137 spin_lock_irqsave(&conn->c_lock, flags); 1138 list_add_tail(&rm->m_conn_item, &conn->c_send_queue); 1139 set_bit(RDS_MSG_ON_CONN, &rm->m_flags); 1140 rds_message_addref(rm); 1141 rm->m_inc.i_conn = conn; 1142 1143 rds_message_populate_header(&rm->m_inc.i_hdr, 0, dport, 1144 conn->c_next_tx_seq); 1145 conn->c_next_tx_seq++; 1146 spin_unlock_irqrestore(&conn->c_lock, flags); 1147 1148 rds_stats_inc(s_send_queued); 1149 rds_stats_inc(s_send_pong); 1150 1151 if (!test_bit(RDS_LL_SEND_FULL, &conn->c_flags)) 1152 rds_send_xmit(conn); 1153 1154 rds_message_put(rm); 1155 return 0; 1156 1157out: 1158 if (rm) 1159 rds_message_put(rm); 1160 return ret; 1161} 1162