send.c revision 5b2366bd2835919e2e6a836e837eab4a9274bd46
/*
 * Copyright (c) 2006 Oracle. All rights reserved.
 *
 * This software is available to you under a choice of one of two
 * licenses. You may choose to be licensed under the terms of the GNU
 * General Public License (GPL) Version 2, available from the file
 * COPYING in the main directory of this source tree, or the
 * OpenIB.org BSD license below:
 *
 *     Redistribution and use in source and binary forms, with or
 *     without modification, are permitted provided that the following
 *     conditions are met:
 *
 *      - Redistributions of source code must retain the above
 *        copyright notice, this list of conditions and the following
 *        disclaimer.
 *
 *      - Redistributions in binary form must reproduce the above
 *        copyright notice, this list of conditions and the following
 *        disclaimer in the documentation and/or other materials
 *        provided with the distribution.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
 * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
 * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 * SOFTWARE.
 *
 */
#include <linux/kernel.h>
#include <linux/gfp.h>
#include <net/sock.h>
#include <linux/in.h>
#include <linux/list.h>

#include "rds.h"

/* When transmitting messages in rds_send_xmit, we need to emerge from
 * time to time and briefly release the CPU. Otherwise the soft lockup
 * watchdog will kick our shin.
 * Also, it seems fairer to not let one busy connection stall all the
 * others.
 *
 * send_batch_count is the number of times we'll loop in send_xmit. Setting
 * it to 0 will restore the old behavior (where we looped until we had
 * drained the queue).
 */
static int send_batch_count = 64;
module_param(send_batch_count, int, 0444);
MODULE_PARM_DESC(send_batch_count, " batch factor when working the send queue");

/*
 * Reset the send state. Caller must hold c_send_lock when calling here.
 */
void rds_send_reset(struct rds_connection *conn)
{
	struct rds_message *rm, *tmp;
	unsigned long flags;

	if (conn->c_xmit_rm) {
		/* Tell the user the RDMA op is no longer mapped by the
		 * transport. This isn't entirely true (it's flushed out
		 * independently) but as the connection is down, there's
		 * no ongoing RDMA to/from that memory */
		rds_message_unmapped(conn->c_xmit_rm);
		rds_message_put(conn->c_xmit_rm);
		conn->c_xmit_rm = NULL;
	}
	conn->c_xmit_sg = 0;
	conn->c_xmit_hdr_off = 0;
	conn->c_xmit_data_off = 0;
	conn->c_xmit_atomic_sent = 0;
	conn->c_xmit_rdma_sent = 0;
	conn->c_xmit_data_sent = 0;

	conn->c_map_queued = 0;

	conn->c_unacked_packets = rds_sysctl_max_unacked_packets;
	conn->c_unacked_bytes = rds_sysctl_max_unacked_bytes;

	/* Mark messages as retransmissions, and move them to the send q */
	spin_lock_irqsave(&conn->c_lock, flags);
	list_for_each_entry_safe(rm, tmp, &conn->c_retrans, m_conn_item) {
		set_bit(RDS_MSG_ACK_REQUIRED, &rm->m_flags);
		set_bit(RDS_MSG_RETRANSMITTED, &rm->m_flags);
	}
	list_splice_init(&conn->c_retrans, &conn->c_send_queue);
	spin_unlock_irqrestore(&conn->c_lock, flags);
}

/*
 * We're making the conscious trade-off here to only send one message
 * down the connection at a time.
 * Pro:
 *  - tx queueing is a simple fifo list
 *  - reassembly is optional and easily done by transports per conn
 *  - no per flow rx lookup at all, straight to the socket
 *  - less per-frag memory and wire overhead
 * Con:
 *  - queued acks can be delayed behind large messages
 * Depends:
 *  - small message latency is higher behind queued large messages
 *  - large message latency isn't starved by intervening small sends
 */
int rds_send_xmit(struct rds_connection *conn)
{
	struct rds_message *rm;
	unsigned long flags;
	unsigned int tmp;
	unsigned int send_quota = send_batch_count;
	struct scatterlist *sg;
	int ret = 0;
	int was_empty = 0;
	LIST_HEAD(to_be_dropped);

	/*
	 * sendmsg calls here after having queued its message on the send
	 * queue. We only have one task feeding the connection at a time. If
	 * another thread is already feeding the queue then we back off. This
	 * avoids blocking the caller and trading per-connection data between
	 * caches per message.
	 *
	 * The sem holder will issue a retry if they notice that someone queued
	 * a message after they stopped walking the send queue but before they
	 * dropped the sem.
	 */
	if (!mutex_trylock(&conn->c_send_lock)) {
		rds_stats_inc(s_send_sem_contention);
		ret = -ENOMEM;
		goto out;
	}

	if (conn->c_trans->xmit_prepare)
		conn->c_trans->xmit_prepare(conn);

	/*
	 * spin trying to push headers and data down the connection until
	 * the connection doesn't make forward progress.
	 */
	while (--send_quota) {

		rm = conn->c_xmit_rm;

		/*
		 * If between sending messages, we can send a pending congestion
		 * map update.
		 *
		 * Transports either define a special xmit_cong_map function,
		 * or we allocate a cong_map message and treat it just like any
		 * other send.
		 */
		if (!rm && test_and_clear_bit(0, &conn->c_map_queued)) {
			if (conn->c_trans->xmit_cong_map) {
				unsigned long map_offset = 0;
				unsigned long map_bytes = sizeof(struct rds_header) +
					RDS_CONG_MAP_BYTES;

				while (map_bytes) {
					ret = conn->c_trans->xmit_cong_map(conn, conn->c_lcong,
									   map_offset);
					if (ret <= 0) {
						/* too far down the rabbithole! */
						mutex_unlock(&conn->c_send_lock);
						rds_conn_error(conn, "Cong map xmit failed\n");
						goto out;
					}

					map_offset += ret;
					map_bytes -= ret;
				}
			} else {
				/* send cong update like a normal rm */
				rm = rds_cong_update_alloc(conn);
				if (IS_ERR(rm)) {
					ret = PTR_ERR(rm);
					break;
				}
				rm->data.op_active = 1;

				conn->c_xmit_rm = rm;
			}
		}

		/*
		 * If not already working on one, grab the next message.
		 *
		 * c_xmit_rm holds a ref while we're sending this message down
		 * the connection. We can use this ref while holding the
		 * send_sem. rds_send_reset() is serialized with it.
		 */
		if (!rm) {
			unsigned int len;

			spin_lock_irqsave(&conn->c_lock, flags);

			if (!list_empty(&conn->c_send_queue)) {
				rm = list_entry(conn->c_send_queue.next,
						struct rds_message,
						m_conn_item);
				rds_message_addref(rm);

				/*
				 * Move the message from the send queue to the retransmit
				 * list right away.
				 */
				list_move_tail(&rm->m_conn_item, &conn->c_retrans);
			}

			spin_unlock_irqrestore(&conn->c_lock, flags);

			if (!rm) {
				was_empty = 1;
				break;
			}

			/* Unfortunately, the way Infiniband deals with
			 * RDMA to a bad MR key is by moving the entire
			 * queue pair to error state. We could possibly
			 * recover from that, but right now we drop the
			 * connection.
			 * Therefore, we never retransmit messages with RDMA ops.
			 */
			if (rm->rdma.op_active &&
			    test_bit(RDS_MSG_RETRANSMITTED, &rm->m_flags)) {
				spin_lock_irqsave(&conn->c_lock, flags);
				if (test_and_clear_bit(RDS_MSG_ON_CONN, &rm->m_flags))
					list_move(&rm->m_conn_item, &to_be_dropped);
				spin_unlock_irqrestore(&conn->c_lock, flags);
				rds_message_put(rm);
				continue;
			}

			/* Require an ACK every once in a while */
			len = ntohl(rm->m_inc.i_hdr.h_len);
			if (conn->c_unacked_packets == 0 ||
			    conn->c_unacked_bytes < len) {
				__set_bit(RDS_MSG_ACK_REQUIRED, &rm->m_flags);

				conn->c_unacked_packets = rds_sysctl_max_unacked_packets;
				conn->c_unacked_bytes = rds_sysctl_max_unacked_bytes;
				rds_stats_inc(s_send_ack_required);
			} else {
				conn->c_unacked_bytes -= len;
				conn->c_unacked_packets--;
			}

			conn->c_xmit_rm = rm;
		}

		if (rm->atomic.op_active && !conn->c_xmit_atomic_sent) {
			ret = conn->c_trans->xmit_atomic(conn, rm);
			if (ret)
				break;
			conn->c_xmit_atomic_sent = 1;
			/* The transport owns the mapped memory for now.
			 * You can't unmap it while it's on the send queue */
			set_bit(RDS_MSG_MAPPED, &rm->m_flags);

			/*
			 * This is evil, muahaha.
			 * We permit 0-byte sends. (rds-ping depends on this.)
			 * BUT if there is an atomic op and no sent data,
			 * we turn off sending the header, to achieve
			 * "silent" atomics.
			 * But see below; RDMA op might toggle this back on!
			 */
			if (rm->data.op_nents == 0)
				rm->data.op_active = 0;
		}

		/* The transport either sends the whole rdma or none of it */
		if (rm->rdma.op_active && !conn->c_xmit_rdma_sent) {
			ret = conn->c_trans->xmit_rdma(conn, &rm->rdma);
			if (ret)
				break;
			conn->c_xmit_rdma_sent = 1;

			/* rdmas need data sent, even if just the header */
			rm->data.op_active = 1;

			/* The transport owns the mapped memory for now.
			 * You can't unmap it while it's on the send queue */
			set_bit(RDS_MSG_MAPPED, &rm->m_flags);
		}

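		/*
		 * Push the data payload (and its header). The transport may
		 * accept only part of it; the c_xmit_* offsets updated below
		 * record how far we got so the next pass resumes mid-message.
		 */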
		if (rm->data.op_active && !conn->c_xmit_data_sent) {
			ret = conn->c_trans->xmit(conn, rm,
						  conn->c_xmit_hdr_off,
						  conn->c_xmit_sg,
						  conn->c_xmit_data_off);
			if (ret <= 0)
				break;

			if (conn->c_xmit_hdr_off < sizeof(struct rds_header)) {
				tmp = min_t(int, ret,
					    sizeof(struct rds_header) -
					    conn->c_xmit_hdr_off);
				conn->c_xmit_hdr_off += tmp;
				ret -= tmp;
			}

			sg = &rm->data.op_sg[conn->c_xmit_sg];
			while (ret) {
				tmp = min_t(int, ret, sg->length -
						      conn->c_xmit_data_off);
				conn->c_xmit_data_off += tmp;
				ret -= tmp;
				if (conn->c_xmit_data_off == sg->length) {
					conn->c_xmit_data_off = 0;
					sg++;
					conn->c_xmit_sg++;
					BUG_ON(ret != 0 &&
					       conn->c_xmit_sg == rm->data.op_nents);
				}
			}

			if (conn->c_xmit_hdr_off == sizeof(struct rds_header) &&
			    (conn->c_xmit_sg == rm->data.op_nents))
				conn->c_xmit_data_sent = 1;
		}

		/*
		 * An rm will only make multiple passes through this loop
		 * if there is a data op. Thus, if the data is sent (or there was
		 * none), then we're done with the rm.
		 */
		if (!rm->data.op_active || conn->c_xmit_data_sent) {
			conn->c_xmit_rm = NULL;
			conn->c_xmit_sg = 0;
			conn->c_xmit_hdr_off = 0;
			conn->c_xmit_data_off = 0;
			conn->c_xmit_rdma_sent = 0;
			conn->c_xmit_atomic_sent = 0;
			conn->c_xmit_data_sent = 0;

			rds_message_put(rm);
		}
	}

	/* Nuke any messages we decided not to retransmit. */
	if (!list_empty(&to_be_dropped))
		rds_send_remove_from_sock(&to_be_dropped, RDS_RDMA_DROPPED);

	if (conn->c_trans->xmit_complete)
		conn->c_trans->xmit_complete(conn);

	/*
	 * We might be racing with another sender who queued a message but
	 * backed off on noticing that we held the c_send_lock. If we check
	 * for queued messages after dropping the sem then either we'll
	 * see the queued message or the queuer will get the sem. If we
	 * notice the queued message then we trigger an immediate retry.
	 *
	 * We need to be careful only to do this when we stopped processing
	 * the send queue because it was empty. It's the only way we
	 * stop processing the loop when the transport hasn't taken
	 * responsibility for forward progress.
	 */
	mutex_unlock(&conn->c_send_lock);

	if (send_quota == 0 && !was_empty) {
		/* We exhausted the send quota, but there's work left to
		 * do. Return and (re-)schedule the send worker.
		 */
		ret = -EAGAIN;
	}

	if (ret == 0 && was_empty) {
		/* A simple bit test would be way faster than taking the
		 * spin lock */
		spin_lock_irqsave(&conn->c_lock, flags);
		if (!list_empty(&conn->c_send_queue)) {
			rds_stats_inc(s_send_sem_queue_raced);
			ret = -EAGAIN;
		}
		spin_unlock_irqrestore(&conn->c_lock, flags);
	}
out:
	return ret;
}

static void rds_send_sndbuf_remove(struct rds_sock *rs, struct rds_message *rm)
{
	u32 len = be32_to_cpu(rm->m_inc.i_hdr.h_len);

	assert_spin_locked(&rs->rs_lock);

	BUG_ON(rs->rs_snd_bytes < len);
	rs->rs_snd_bytes -= len;

	if (rs->rs_snd_bytes == 0)
		rds_stats_inc(s_send_queue_empty);
}

static inline int rds_send_is_acked(struct rds_message *rm, u64 ack,
				    is_acked_func is_acked)
{
	if (is_acked)
		return is_acked(rm, ack);
	return be64_to_cpu(rm->m_inc.i_hdr.h_sequence) <= ack;
}

/*
 * Returns true if there are no messages on the send and retransmit queues
 * which have a sequence number less than the given sequence number.
 */
int rds_send_acked_before(struct rds_connection *conn, u64 seq)
{
	struct rds_message *rm, *tmp;
	int ret = 1;

	spin_lock(&conn->c_lock);

	list_for_each_entry_safe(rm, tmp, &conn->c_retrans, m_conn_item) {
		if (be64_to_cpu(rm->m_inc.i_hdr.h_sequence) < seq)
			ret = 0;
		break;
	}

	list_for_each_entry_safe(rm, tmp, &conn->c_send_queue, m_conn_item) {
		if (be64_to_cpu(rm->m_inc.i_hdr.h_sequence) < seq)
			ret = 0;
		break;
	}

	spin_unlock(&conn->c_lock);

	return ret;
}

/*
 * This is pretty similar to what happens below in the ACK
 * handling code - except that we call here as soon as we get
 * the IB send completion on the RDMA op and the accompanying
 * message.
 */
void rds_rdma_send_complete(struct rds_message *rm, int status)
{
	struct rds_sock *rs = NULL;
	struct rm_rdma_op *ro;
	struct rds_notifier *notifier;
	unsigned long flags;

	spin_lock_irqsave(&rm->m_rs_lock, flags);

	ro = &rm->rdma;
	if (test_bit(RDS_MSG_ON_SOCK, &rm->m_flags) &&
	    ro->op_active && ro->op_notify && ro->op_notifier) {
		notifier = ro->op_notifier;
		rs = rm->m_rs;
		sock_hold(rds_rs_to_sk(rs));

		notifier->n_status = status;
		spin_lock(&rs->rs_lock);
		list_add_tail(&notifier->n_list, &rs->rs_notify_queue);
		spin_unlock(&rs->rs_lock);

		ro->op_notifier = NULL;
	}

	spin_unlock_irqrestore(&rm->m_rs_lock, flags);

	if (rs) {
		rds_wake_sk_sleep(rs);
		sock_put(rds_rs_to_sk(rs));
	}
}
EXPORT_SYMBOL_GPL(rds_rdma_send_complete);

/*
 * Just like above, except looks at atomic op
 */
void rds_atomic_send_complete(struct rds_message *rm, int status)
{
	struct rds_sock *rs = NULL;
	struct rm_atomic_op *ao;
	struct rds_notifier *notifier;

	spin_lock(&rm->m_rs_lock);

	ao = &rm->atomic;
	if (test_bit(RDS_MSG_ON_SOCK, &rm->m_flags)
	    && ao->op_active && ao->op_notify && ao->op_notifier) {
		notifier = ao->op_notifier;
		rs = rm->m_rs;
		sock_hold(rds_rs_to_sk(rs));

		notifier->n_status = status;
		spin_lock(&rs->rs_lock);
		list_add_tail(&notifier->n_list, &rs->rs_notify_queue);
		spin_unlock(&rs->rs_lock);

		ao->op_notifier = NULL;
	}

	spin_unlock(&rm->m_rs_lock);

	if (rs) {
		rds_wake_sk_sleep(rs);
		sock_put(rds_rs_to_sk(rs));
	}
}
EXPORT_SYMBOL_GPL(rds_atomic_send_complete);

/*
 * This is the same as rds_rdma_send_complete except we
 * don't do any locking - we have all the ingredients (message,
 * socket, socket lock) and can just move the notifier.
 */
static inline void
__rds_rdma_send_complete(struct rds_sock *rs, struct rds_message *rm, int status)
{
	struct rm_rdma_op *ro;

	ro = &rm->rdma;
	if (ro->op_active && ro->op_notify && ro->op_notifier) {
		ro->op_notifier->n_status = status;
		list_add_tail(&ro->op_notifier->n_list, &rs->rs_notify_queue);
		ro->op_notifier = NULL;
	}

	/* No need to wake the app - caller does this */
}

/*
 * This is called from the IB send completion when we detect
 * a RDMA operation that failed with remote access error.
 * So speed is not an issue here.
 */
struct rds_message *rds_send_get_message(struct rds_connection *conn,
					 struct rm_rdma_op *op)
{
	struct rds_message *rm, *tmp, *found = NULL;
	unsigned long flags;

	spin_lock_irqsave(&conn->c_lock, flags);

	list_for_each_entry_safe(rm, tmp, &conn->c_retrans, m_conn_item) {
		if (&rm->rdma == op) {
			atomic_inc(&rm->m_refcount);
			found = rm;
			goto out;
		}
	}

	list_for_each_entry_safe(rm, tmp, &conn->c_send_queue, m_conn_item) {
		if (&rm->rdma == op) {
			atomic_inc(&rm->m_refcount);
			found = rm;
			break;
		}
	}

out:
	spin_unlock_irqrestore(&conn->c_lock, flags);

	return found;
}
EXPORT_SYMBOL_GPL(rds_send_get_message);

/*
 * This removes messages from the socket's list if they're on it. The list
 * argument must be private to the caller, we must be able to modify it
 * without locks. The messages must have a reference held for their
 * position on the list. This function will drop that reference after
 * removing the messages from the 'messages' list regardless of whether it
 * found the messages on the socket list or not.
 */
void rds_send_remove_from_sock(struct list_head *messages, int status)
{
	unsigned long flags;
	struct rds_sock *rs = NULL;
	struct rds_message *rm;

	while (!list_empty(messages)) {
		int was_on_sock = 0;

		rm = list_entry(messages->next, struct rds_message,
				m_conn_item);
		list_del_init(&rm->m_conn_item);

		/*
		 * If we see this flag cleared then we're *sure* that someone
		 * else beat us to removing it from the sock. If we race
		 * with their flag update we'll get the lock and then really
		 * see that the flag has been cleared.
		 *
		 * The message spinlock makes sure nobody clears rm->m_rs
		 * while we're messing with it. It does not prevent the
		 * message from being removed from the socket, though.
		 */
		spin_lock_irqsave(&rm->m_rs_lock, flags);
		if (!test_bit(RDS_MSG_ON_SOCK, &rm->m_flags))
			goto unlock_and_drop;

		if (rs != rm->m_rs) {
			if (rs) {
				rds_wake_sk_sleep(rs);
				sock_put(rds_rs_to_sk(rs));
			}
			rs = rm->m_rs;
			sock_hold(rds_rs_to_sk(rs));
		}
		spin_lock(&rs->rs_lock);

		if (test_and_clear_bit(RDS_MSG_ON_SOCK, &rm->m_flags)) {
			struct rm_rdma_op *ro = &rm->rdma;
			struct rds_notifier *notifier;

			list_del_init(&rm->m_sock_item);
			rds_send_sndbuf_remove(rs, rm);

			if (ro->op_active && ro->op_notifier &&
			    (ro->op_notify || (ro->op_recverr && status))) {
				notifier = ro->op_notifier;
				list_add_tail(&notifier->n_list,
					      &rs->rs_notify_queue);
				if (!notifier->n_status)
					notifier->n_status = status;
				rm->rdma.op_notifier = NULL;
			}
			was_on_sock = 1;
			rm->m_rs = NULL;
		}
		spin_unlock(&rs->rs_lock);

unlock_and_drop:
		spin_unlock_irqrestore(&rm->m_rs_lock, flags);
		rds_message_put(rm);
		if (was_on_sock)
			rds_message_put(rm);
	}

	if (rs) {
		rds_wake_sk_sleep(rs);
		sock_put(rds_rs_to_sk(rs));
	}
}

/*
 * Transports call here when they've determined that the receiver queued
 * messages up to, and including, the given sequence number. Messages are
 * moved to the retrans queue when rds_send_xmit picks them off the send
 * queue. This means that in the TCP case, the message may not have been
 * assigned the m_ack_seq yet - but that's fine as long as tcp_is_acked
 * checks the RDS_MSG_HAS_ACK_SEQ bit.
 *
 * XXX It's not clear to me how this is safely serialized with socket
 * destruction. Maybe it should bail if it sees SOCK_DEAD.
 */
void rds_send_drop_acked(struct rds_connection *conn, u64 ack,
			 is_acked_func is_acked)
{
	struct rds_message *rm, *tmp;
	unsigned long flags;
	LIST_HEAD(list);

	spin_lock_irqsave(&conn->c_lock, flags);

	list_for_each_entry_safe(rm, tmp, &conn->c_retrans, m_conn_item) {
		if (!rds_send_is_acked(rm, ack, is_acked))
			break;

		list_move(&rm->m_conn_item, &list);
		clear_bit(RDS_MSG_ON_CONN, &rm->m_flags);
	}

	/* order flag updates with spin locks */
	if (!list_empty(&list))
		smp_mb__after_clear_bit();

	spin_unlock_irqrestore(&conn->c_lock, flags);

	/* now remove the messages from the sock list as needed */
	rds_send_remove_from_sock(&list, RDS_RDMA_SUCCESS);
}
EXPORT_SYMBOL_GPL(rds_send_drop_acked);

void rds_send_drop_to(struct rds_sock *rs, struct sockaddr_in *dest)
{
	struct rds_message *rm, *tmp;
	struct rds_connection *conn;
	unsigned long flags;
	LIST_HEAD(list);

	/* get all the messages we're dropping under the rs lock */
	spin_lock_irqsave(&rs->rs_lock, flags);

	list_for_each_entry_safe(rm, tmp, &rs->rs_send_queue, m_sock_item) {
		if (dest && (dest->sin_addr.s_addr != rm->m_daddr ||
			     dest->sin_port != rm->m_inc.i_hdr.h_dport))
			continue;

		list_move(&rm->m_sock_item, &list);
		rds_send_sndbuf_remove(rs, rm);
		clear_bit(RDS_MSG_ON_SOCK, &rm->m_flags);
	}

	/* order flag updates with the rs lock */
	smp_mb__after_clear_bit();

	spin_unlock_irqrestore(&rs->rs_lock, flags);

	if (list_empty(&list))
		return;

	/* Remove the messages from the conn */
	list_for_each_entry(rm, &list, m_sock_item) {

		conn = rm->m_inc.i_conn;

		spin_lock_irqsave(&conn->c_lock, flags);
		/*
		 * Maybe someone else beat us to removing rm from the conn.
		 * If we race with their flag update we'll get the lock and
		 * then really see that the flag has been cleared.
		 */
		if (!test_and_clear_bit(RDS_MSG_ON_CONN, &rm->m_flags)) {
			spin_unlock_irqrestore(&conn->c_lock, flags);
			continue;
		}
		list_del_init(&rm->m_conn_item);
		spin_unlock_irqrestore(&conn->c_lock, flags);

		/*
		 * Couldn't grab m_rs_lock in top loop (lock ordering),
		 * but we can now.
		 */
		spin_lock_irqsave(&rm->m_rs_lock, flags);

		spin_lock(&rs->rs_lock);
		__rds_rdma_send_complete(rs, rm, RDS_RDMA_CANCELED);
		spin_unlock(&rs->rs_lock);

		rm->m_rs = NULL;
		spin_unlock_irqrestore(&rm->m_rs_lock, flags);

		rds_message_put(rm);
	}

	rds_wake_sk_sleep(rs);

	while (!list_empty(&list)) {
		rm = list_entry(list.next, struct rds_message, m_sock_item);
		list_del_init(&rm->m_sock_item);

		rds_message_wait(rm);
		rds_message_put(rm);
	}
}

/*
 * we only want this to fire once so we use the caller's 'queued'. It's
 * possible that another thread can race with us and remove the
 * message from the flow with RDS_CANCEL_SENT_TO.
 */
static int rds_send_queue_rm(struct rds_sock *rs, struct rds_connection *conn,
			     struct rds_message *rm, __be16 sport,
			     __be16 dport, int *queued)
{
	unsigned long flags;
	u32 len;

	if (*queued)
		goto out;

	len = be32_to_cpu(rm->m_inc.i_hdr.h_len);

	/* this is the only place which holds both the socket's rs_lock
	 * and the connection's c_lock */
	spin_lock_irqsave(&rs->rs_lock, flags);

	/*
	 * If there is a little space in sndbuf, we don't queue anything,
	 * and userspace gets -EAGAIN. But poll() indicates there's send
	 * room. This can lead to bad behavior (spinning) if snd_bytes isn't
	 * freed up by incoming acks. So we check the *old* value of
	 * rs_snd_bytes here to allow the last msg to exceed the buffer,
	 * and poll() now knows no more data can be sent.
	 */
	if (rs->rs_snd_bytes < rds_sk_sndbuf(rs)) {
		rs->rs_snd_bytes += len;

		/* let recv side know we are close to send space exhaustion.
		 * This is probably not the optimal way to do it, as this
		 * means we set the flag on *all* messages as soon as our
		 * throughput hits a certain threshold.
		 */
		if (rs->rs_snd_bytes >= rds_sk_sndbuf(rs) / 2)
			__set_bit(RDS_MSG_ACK_REQUIRED, &rm->m_flags);

		list_add_tail(&rm->m_sock_item, &rs->rs_send_queue);
		set_bit(RDS_MSG_ON_SOCK, &rm->m_flags);
		rds_message_addref(rm);
		rm->m_rs = rs;

		/* The code ordering is a little weird, but we're
		   trying to minimize the time we hold c_lock */
		rds_message_populate_header(&rm->m_inc.i_hdr, sport, dport, 0);
		rm->m_inc.i_conn = conn;
		rds_message_addref(rm);

		spin_lock(&conn->c_lock);
		rm->m_inc.i_hdr.h_sequence = cpu_to_be64(conn->c_next_tx_seq++);
		list_add_tail(&rm->m_conn_item, &conn->c_send_queue);
		set_bit(RDS_MSG_ON_CONN, &rm->m_flags);
		spin_unlock(&conn->c_lock);

		rdsdebug("queued msg %p len %d, rs %p bytes %d seq %llu\n",
			 rm, len, rs, rs->rs_snd_bytes,
			 (unsigned long long)be64_to_cpu(rm->m_inc.i_hdr.h_sequence));

		*queued = 1;
	}

	spin_unlock_irqrestore(&rs->rs_lock, flags);
out:
	return *queued;
}

/*
 * rds_message is getting to be quite complicated, and we'd like to allocate
 * it all in one go. This figures out how big it needs to be up front.
 */
static int rds_rm_size(struct msghdr *msg, int data_len)
{
	struct cmsghdr *cmsg;
	int size = 0;
	int retval;

	for (cmsg = CMSG_FIRSTHDR(msg); cmsg; cmsg = CMSG_NXTHDR(msg, cmsg)) {
		if (!CMSG_OK(msg, cmsg))
			return -EINVAL;

		if (cmsg->cmsg_level != SOL_RDS)
			continue;

		switch (cmsg->cmsg_type) {
		case RDS_CMSG_RDMA_ARGS:
			retval = rds_rdma_extra_size(CMSG_DATA(cmsg));
			if (retval < 0)
				return retval;
			size += retval;
			break;

		case RDS_CMSG_RDMA_DEST:
		case RDS_CMSG_RDMA_MAP:
			/* these are valid but do not add any size */
			break;

		case RDS_CMSG_ATOMIC_CSWP:
		case RDS_CMSG_ATOMIC_FADD:
			size += sizeof(struct scatterlist);
			break;

		default:
			return -EINVAL;
		}

	}

	size += ceil(data_len, PAGE_SIZE) * sizeof(struct scatterlist);

	return size;
}

static int rds_cmsg_send(struct rds_sock *rs, struct rds_message *rm,
			 struct msghdr *msg, int *allocated_mr)
{
	struct cmsghdr *cmsg;
	int ret = 0;

	for (cmsg = CMSG_FIRSTHDR(msg); cmsg; cmsg = CMSG_NXTHDR(msg, cmsg)) {
		if (!CMSG_OK(msg, cmsg))
			return -EINVAL;

		if (cmsg->cmsg_level != SOL_RDS)
			continue;

		/* As a side effect, RDMA_DEST and RDMA_MAP will set
		 * rm->rdma.m_rdma_cookie and rm->rdma.m_rdma_mr.
		 */
		switch (cmsg->cmsg_type) {
		case RDS_CMSG_RDMA_ARGS:
			ret = rds_cmsg_rdma_args(rs, rm, cmsg);
			break;

		case RDS_CMSG_RDMA_DEST:
			ret = rds_cmsg_rdma_dest(rs, rm, cmsg);
			break;

		case RDS_CMSG_RDMA_MAP:
			ret = rds_cmsg_rdma_map(rs, rm, cmsg);
			if (!ret)
				*allocated_mr = 1;
			break;
		case RDS_CMSG_ATOMIC_CSWP:
		case RDS_CMSG_ATOMIC_FADD:
			ret = rds_cmsg_atomic(rs, rm, cmsg);
			break;

		default:
			return -EINVAL;
		}

		if (ret)
			break;
	}

	return ret;
}

int rds_sendmsg(struct kiocb *iocb, struct socket *sock, struct msghdr *msg,
		size_t payload_len)
{
	struct sock *sk = sock->sk;
	struct rds_sock *rs = rds_sk_to_rs(sk);
	struct sockaddr_in *usin = (struct sockaddr_in *)msg->msg_name;
	__be32 daddr;
	__be16 dport;
	struct rds_message *rm = NULL;
	struct rds_connection *conn;
	int ret = 0;
	int queued = 0, allocated_mr = 0;
	int nonblock = msg->msg_flags & MSG_DONTWAIT;
	long timeo = sock_sndtimeo(sk, nonblock);

	/* Mirror Linux UDP mirror of BSD error message compatibility */
	/* XXX: Perhaps MSG_MORE someday */
	if (msg->msg_flags & ~(MSG_DONTWAIT | MSG_CMSG_COMPAT)) {
		printk(KERN_INFO "msg_flags 0x%08X\n", msg->msg_flags);
		ret = -EOPNOTSUPP;
		goto out;
	}

	if (msg->msg_namelen) {
		/* XXX fail non-unicast destination IPs? */
		if (msg->msg_namelen < sizeof(*usin) || usin->sin_family != AF_INET) {
			ret = -EINVAL;
			goto out;
		}
		daddr = usin->sin_addr.s_addr;
		dport = usin->sin_port;
	} else {
		/* We only care about consistency with ->connect() */
		lock_sock(sk);
		daddr = rs->rs_conn_addr;
		dport = rs->rs_conn_port;
		release_sock(sk);
	}

	/* racing with another thread binding seems ok here */
	if (daddr == 0 || rs->rs_bound_addr == 0) {
		ret = -ENOTCONN; /* XXX not a great errno */
		goto out;
	}

	/* size of rm including all sgs */
	ret = rds_rm_size(msg, payload_len);
	if (ret < 0)
		goto out;

	rm = rds_message_alloc(ret, GFP_KERNEL);
	if (!rm) {
		ret = -ENOMEM;
		goto out;
	}

	rm->data.op_sg = rds_message_alloc_sgs(rm, ceil(payload_len, PAGE_SIZE));
	/* XXX fix this to not allocate memory */
	ret = rds_message_copy_from_user(rm, msg->msg_iov, payload_len);
	if (ret)
		goto out;

	rm->m_daddr = daddr;

	/* rds_conn_create has a spinlock that runs with IRQ off.
	 * Caching the conn in the socket helps a lot. */
	if (rs->rs_conn && rs->rs_conn->c_faddr == daddr)
		conn = rs->rs_conn;
	else {
		conn = rds_conn_create_outgoing(rs->rs_bound_addr, daddr,
						rs->rs_transport,
						sock->sk->sk_allocation);
		if (IS_ERR(conn)) {
			ret = PTR_ERR(conn);
			goto out;
		}
		rs->rs_conn = conn;
	}

	/* Parse any control messages the user may have included. */
	ret = rds_cmsg_send(rs, rm, msg, &allocated_mr);
	if (ret)
		goto out;

	if ((rm->m_rdma_cookie || rm->rdma.op_active) &&
	    !conn->c_trans->xmit_rdma) {
		if (printk_ratelimit())
			printk(KERN_NOTICE "rdma_op %p conn xmit_rdma %p\n",
			       &rm->rdma, conn->c_trans->xmit_rdma);
		ret = -EOPNOTSUPP;
		goto out;
	}

	if (rm->atomic.op_active && !conn->c_trans->xmit_atomic) {
		if (printk_ratelimit())
			printk(KERN_NOTICE "atomic_op %p conn xmit_atomic %p\n",
			       &rm->atomic, conn->c_trans->xmit_atomic);
		ret = -EOPNOTSUPP;
		goto out;
	}

	/* If the connection is down, trigger a connect. We may
	 * have scheduled a delayed reconnect however - in this case
	 * we should not interfere.
	 */
	if (rds_conn_state(conn) == RDS_CONN_DOWN &&
	    !test_and_set_bit(RDS_RECONNECT_PENDING, &conn->c_flags))
		queue_delayed_work(rds_wq, &conn->c_conn_w, 0);

	ret = rds_cong_wait(conn->c_fcong, dport, nonblock, rs);
	if (ret) {
		rs->rs_seen_congestion = 1;
		goto out;
	}

	while (!rds_send_queue_rm(rs, conn, rm, rs->rs_bound_port,
				  dport, &queued)) {
		rds_stats_inc(s_send_queue_full);
		/* XXX make sure this is reasonable */
		if (payload_len > rds_sk_sndbuf(rs)) {
			ret = -EMSGSIZE;
			goto out;
		}
		if (nonblock) {
			ret = -EAGAIN;
			goto out;
		}

		timeo = wait_event_interruptible_timeout(*sk_sleep(sk),
					rds_send_queue_rm(rs, conn, rm,
							  rs->rs_bound_port,
							  dport,
							  &queued),
					timeo);
		rdsdebug("sendmsg woke queued %d timeo %ld\n", queued, timeo);
		if (timeo > 0 || timeo == MAX_SCHEDULE_TIMEOUT)
			continue;

		ret = timeo;
		if (ret == 0)
			ret = -ETIMEDOUT;
		goto out;
	}

	/*
	 * By now we've committed to the send. We reuse rds_send_worker()
	 * to retry sends in the rds thread if the transport asks us to.
	 */
	rds_stats_inc(s_send_queued);

	if (!test_bit(RDS_LL_SEND_FULL, &conn->c_flags))
		rds_send_worker(&conn->c_send_w.work);

	rds_message_put(rm);
	return payload_len;

out:
	/* If the user included a RDMA_MAP cmsg, we allocated a MR on the fly.
	 * If the sendmsg goes through, we keep the MR. If it fails with EAGAIN
	 * or in any other way, we need to destroy the MR again */
	if (allocated_mr)
		rds_rdma_unuse(rs, rds_rdma_cookie_key(rm->m_rdma_cookie), 1);

	if (rm)
		rds_message_put(rm);
	return ret;
}

/*
 * Reply to a ping packet.
 */
int
rds_send_pong(struct rds_connection *conn, __be16 dport)
{
	struct rds_message *rm;
	unsigned long flags;
	int ret = 0;

	rm = rds_message_alloc(0, GFP_ATOMIC);
	if (!rm) {
		ret = -ENOMEM;
		goto out;
	}

	rm->m_daddr = conn->c_faddr;

	/* If the connection is down, trigger a connect. We may
	 * have scheduled a delayed reconnect however - in this case
	 * we should not interfere.
	 */
	if (rds_conn_state(conn) == RDS_CONN_DOWN &&
	    !test_and_set_bit(RDS_RECONNECT_PENDING, &conn->c_flags))
		queue_delayed_work(rds_wq, &conn->c_conn_w, 0);

	ret = rds_cong_wait(conn->c_fcong, dport, 1, NULL);
	if (ret)
		goto out;

	spin_lock_irqsave(&conn->c_lock, flags);
	list_add_tail(&rm->m_conn_item, &conn->c_send_queue);
	set_bit(RDS_MSG_ON_CONN, &rm->m_flags);
	rds_message_addref(rm);
	rm->m_inc.i_conn = conn;

	rds_message_populate_header(&rm->m_inc.i_hdr, 0, dport,
				    conn->c_next_tx_seq);
	conn->c_next_tx_seq++;
	spin_unlock_irqrestore(&conn->c_lock, flags);

	rds_stats_inc(s_send_queued);
	rds_stats_inc(s_send_pong);

	queue_delayed_work(rds_wq, &conn->c_send_w, 0);
	rds_message_put(rm);
	return 0;

out:
	if (rm)
		rds_message_put(rm);
	return ret;
}