net.c revision 67bf982340d95ca98098ea050b54b4c7adb116c0
1/* 2 * net engine 3 * 4 * IO engine that reads/writes to/from sockets. 5 * 6 */ 7#include <stdio.h> 8#include <stdlib.h> 9#include <unistd.h> 10#include <signal.h> 11#include <errno.h> 12#include <assert.h> 13#include <netinet/in.h> 14#include <arpa/inet.h> 15#include <netdb.h> 16#include <sys/poll.h> 17#include <sys/types.h> 18#include <sys/stat.h> 19#include <sys/socket.h> 20#include <sys/un.h> 21 22#include "../fio.h" 23 24struct netio_data { 25 int listenfd; 26 int use_splice; 27 int pipes[2]; 28 struct sockaddr_in addr; 29 struct sockaddr_un addr_un; 30}; 31 32struct netio_options { 33 struct thread_data *td; 34 unsigned int port; 35 unsigned int proto; 36 unsigned int listen; 37 unsigned int pingpong; 38}; 39 40struct udp_close_msg { 41 uint32_t magic; 42 uint32_t cmd; 43}; 44 45enum { 46 FIO_LINK_CLOSE = 0x89, 47 FIO_LINK_OPEN_CLOSE_MAGIC = 0x6c696e6b, 48 FIO_LINK_OPEN = 0x98, 49 50 FIO_TYPE_TCP = 1, 51 FIO_TYPE_UDP = 2, 52 FIO_TYPE_UNIX = 3, 53}; 54 55static int str_hostname_cb(void *data, const char *input); 56static struct fio_option options[] = { 57 { 58 .name = "hostname", 59 .type = FIO_OPT_STR_STORE, 60 .cb = str_hostname_cb, 61 .help = "Hostname for net IO engine", 62 }, 63 { 64 .name = "port", 65 .type = FIO_OPT_INT, 66 .off1 = offsetof(struct netio_options, port), 67 .minval = 1, 68 .maxval = 65535, 69 .help = "Port to use for TCP or UDP net connections", 70 }, 71 { 72 .name = "protocol", 73 .alias = "proto", 74 .type = FIO_OPT_STR, 75 .off1 = offsetof(struct netio_options, proto), 76 .help = "Network protocol to use", 77 .def = "tcp", 78 .posval = { 79 { .ival = "tcp", 80 .oval = FIO_TYPE_TCP, 81 .help = "Transmission Control Protocol", 82 }, 83 { .ival = "udp", 84 .oval = FIO_TYPE_UDP, 85 .help = "User Datagram Protocol", 86 }, 87 { .ival = "unix", 88 .oval = FIO_TYPE_UNIX, 89 .help = "UNIX domain socket", 90 }, 91 }, 92 }, 93 { 94 .name = "listen", 95 .type = FIO_OPT_STR_SET, 96 .off1 = offsetof(struct netio_options, listen), 97 .help = "Listen for incoming TCP connections", 98 }, 99 { 100 .name = "pingpong", 101 .type = FIO_OPT_STR_SET, 102 .off1 = offsetof(struct netio_options, pingpong), 103 .help = "Ping-pong IO requests", 104 }, 105 { 106 .name = NULL, 107 }, 108}; 109 110/* 111 * Return -1 for error and 'nr events' for a positive number 112 * of events 113 */ 114static int poll_wait(struct thread_data *td, int fd, short events) 115{ 116 struct pollfd pfd; 117 int ret; 118 119 while (!td->terminate) { 120 pfd.fd = fd; 121 pfd.events = events; 122 ret = poll(&pfd, 1, -1); 123 if (ret < 0) { 124 if (errno == EINTR) 125 break; 126 127 td_verror(td, errno, "poll"); 128 return -1; 129 } else if (!ret) 130 continue; 131 132 break; 133 } 134 135 if (pfd.revents & events) 136 return 1; 137 138 return -1; 139} 140 141static int fio_netio_prep(struct thread_data *td, struct io_u *io_u) 142{ 143 struct netio_options *o = td->eo; 144 145 /* 146 * Make sure we don't see spurious reads to a receiver, and vice versa 147 */ 148 if (o->proto == FIO_TYPE_TCP) 149 return 0; 150 151 if ((o->listen && io_u->ddir == DDIR_WRITE) || 152 (!o->listen && io_u->ddir == DDIR_READ)) { 153 td_verror(td, EINVAL, "bad direction"); 154 return 1; 155 } 156 157 return 0; 158} 159 160#ifdef CONFIG_LINUX_SPLICE 161static int splice_io_u(int fdin, int fdout, unsigned int len) 162{ 163 int bytes = 0; 164 165 while (len) { 166 int ret = splice(fdin, NULL, fdout, NULL, len, 0); 167 168 if (ret < 0) { 169 if (!bytes) 170 bytes = ret; 171 172 break; 173 } else if (!ret) 174 break; 175 176 bytes += ret; 177 len -= ret; 178 } 179 180 return bytes; 181} 182 183/* 184 * Receive bytes from a socket and fill them into the internal pipe 185 */ 186static int splice_in(struct thread_data *td, struct io_u *io_u) 187{ 188 struct netio_data *nd = td->io_ops->data; 189 190 return splice_io_u(io_u->file->fd, nd->pipes[1], io_u->xfer_buflen); 191} 192 193/* 194 * Transmit 'len' bytes from the internal pipe 195 */ 196static int splice_out(struct thread_data *td, struct io_u *io_u, 197 unsigned int len) 198{ 199 struct netio_data *nd = td->io_ops->data; 200 201 return splice_io_u(nd->pipes[0], io_u->file->fd, len); 202} 203 204static int vmsplice_io_u(struct io_u *io_u, int fd, unsigned int len) 205{ 206 struct iovec iov = { 207 .iov_base = io_u->xfer_buf, 208 .iov_len = len, 209 }; 210 int bytes = 0; 211 212 while (iov.iov_len) { 213 int ret = vmsplice(fd, &iov, 1, SPLICE_F_MOVE); 214 215 if (ret < 0) { 216 if (!bytes) 217 bytes = ret; 218 break; 219 } else if (!ret) 220 break; 221 222 iov.iov_len -= ret; 223 iov.iov_base += ret; 224 bytes += ret; 225 } 226 227 return bytes; 228 229} 230 231/* 232 * vmsplice() pipe to io_u buffer 233 */ 234static int vmsplice_io_u_out(struct thread_data *td, struct io_u *io_u, 235 unsigned int len) 236{ 237 struct netio_data *nd = td->io_ops->data; 238 239 return vmsplice_io_u(io_u, nd->pipes[0], len); 240} 241 242/* 243 * vmsplice() io_u to pipe 244 */ 245static int vmsplice_io_u_in(struct thread_data *td, struct io_u *io_u) 246{ 247 struct netio_data *nd = td->io_ops->data; 248 249 return vmsplice_io_u(io_u, nd->pipes[1], io_u->xfer_buflen); 250} 251 252/* 253 * splice receive - transfer socket data into a pipe using splice, then map 254 * that pipe data into the io_u using vmsplice. 255 */ 256static int fio_netio_splice_in(struct thread_data *td, struct io_u *io_u) 257{ 258 int ret; 259 260 ret = splice_in(td, io_u); 261 if (ret > 0) 262 return vmsplice_io_u_out(td, io_u, ret); 263 264 return ret; 265} 266 267/* 268 * splice transmit - map data from the io_u into a pipe by using vmsplice, 269 * then transfer that pipe to a socket using splice. 270 */ 271static int fio_netio_splice_out(struct thread_data *td, struct io_u *io_u) 272{ 273 int ret; 274 275 ret = vmsplice_io_u_in(td, io_u); 276 if (ret > 0) 277 return splice_out(td, io_u, ret); 278 279 return ret; 280} 281#else 282static int fio_netio_splice_in(struct thread_data *td, struct io_u *io_u) 283{ 284 errno = EOPNOTSUPP; 285 return -1; 286} 287 288static int fio_netio_splice_out(struct thread_data *td, struct io_u *io_u) 289{ 290 errno = EOPNOTSUPP; 291 return -1; 292} 293#endif 294 295static int fio_netio_send(struct thread_data *td, struct io_u *io_u) 296{ 297 struct netio_data *nd = td->io_ops->data; 298 struct netio_options *o = td->eo; 299 int ret, flags = 0; 300 301 do { 302 if (o->proto == FIO_TYPE_UDP) { 303 struct sockaddr *to = (struct sockaddr *) &nd->addr; 304 305 ret = sendto(io_u->file->fd, io_u->xfer_buf, 306 io_u->xfer_buflen, flags, to, 307 sizeof(*to)); 308 } else { 309 /* 310 * if we are going to write more, set MSG_MORE 311 */ 312#ifdef MSG_MORE 313 if ((td->this_io_bytes[DDIR_WRITE] + io_u->xfer_buflen < 314 td->o.size) && !o->pingpong) 315 flags |= MSG_MORE; 316#endif 317 ret = send(io_u->file->fd, io_u->xfer_buf, 318 io_u->xfer_buflen, flags); 319 } 320 if (ret > 0) 321 break; 322 323 ret = poll_wait(td, io_u->file->fd, POLLOUT); 324 if (ret <= 0) 325 break; 326 } while (1); 327 328 return ret; 329} 330 331static int is_udp_close(struct io_u *io_u, int len) 332{ 333 struct udp_close_msg *msg; 334 335 if (len != sizeof(struct udp_close_msg)) 336 return 0; 337 338 msg = io_u->xfer_buf; 339 if (ntohl(msg->magic) != FIO_LINK_OPEN_CLOSE_MAGIC) 340 return 0; 341 if (ntohl(msg->cmd) != FIO_LINK_CLOSE) 342 return 0; 343 344 return 1; 345} 346 347static int fio_netio_recv(struct thread_data *td, struct io_u *io_u) 348{ 349 struct netio_data *nd = td->io_ops->data; 350 struct netio_options *o = td->eo; 351 int ret, flags = 0; 352 353 do { 354 if (o->proto == FIO_TYPE_UDP) { 355 socklen_t len = sizeof(nd->addr); 356 struct sockaddr *from = (struct sockaddr *) &nd->addr; 357 358 ret = recvfrom(io_u->file->fd, io_u->xfer_buf, 359 io_u->xfer_buflen, flags, from, &len); 360 if (is_udp_close(io_u, ret)) { 361 td->done = 1; 362 return 0; 363 } 364 } else { 365 ret = recv(io_u->file->fd, io_u->xfer_buf, 366 io_u->xfer_buflen, flags); 367 } 368 if (ret > 0) 369 break; 370 else if (!ret && (flags & MSG_WAITALL)) 371 break; 372 373 ret = poll_wait(td, io_u->file->fd, POLLIN); 374 if (ret <= 0) 375 break; 376 flags |= MSG_WAITALL; 377 } while (1); 378 379 return ret; 380} 381 382static int __fio_netio_queue(struct thread_data *td, struct io_u *io_u, 383 enum fio_ddir ddir) 384{ 385 struct netio_data *nd = td->io_ops->data; 386 struct netio_options *o = td->eo; 387 int ret; 388 389 if (ddir == DDIR_WRITE) { 390 if (!nd->use_splice || o->proto == FIO_TYPE_UDP || 391 o->proto == FIO_TYPE_UNIX) 392 ret = fio_netio_send(td, io_u); 393 else 394 ret = fio_netio_splice_out(td, io_u); 395 } else if (ddir == DDIR_READ) { 396 if (!nd->use_splice || o->proto == FIO_TYPE_UDP || 397 o->proto == FIO_TYPE_UNIX) 398 ret = fio_netio_recv(td, io_u); 399 else 400 ret = fio_netio_splice_in(td, io_u); 401 } else 402 ret = 0; /* must be a SYNC */ 403 404 if (ret != (int) io_u->xfer_buflen) { 405 if (ret >= 0) { 406 io_u->resid = io_u->xfer_buflen - ret; 407 io_u->error = 0; 408 return FIO_Q_COMPLETED; 409 } else { 410 int err = errno; 411 412 if (ddir == DDIR_WRITE && err == EMSGSIZE) 413 return FIO_Q_BUSY; 414 415 io_u->error = err; 416 } 417 } 418 419 if (io_u->error) 420 td_verror(td, io_u->error, "xfer"); 421 422 return FIO_Q_COMPLETED; 423} 424 425static int fio_netio_queue(struct thread_data *td, struct io_u *io_u) 426{ 427 struct netio_options *o = td->eo; 428 int ret; 429 430 fio_ro_check(td, io_u); 431 432 ret = __fio_netio_queue(td, io_u, io_u->ddir); 433 if (!o->pingpong || ret != FIO_Q_COMPLETED) 434 return ret; 435 436 /* 437 * For ping-pong mode, receive or send reply as needed 438 */ 439 if (td_read(td) && io_u->ddir == DDIR_READ) 440 ret = __fio_netio_queue(td, io_u, DDIR_WRITE); 441 else if (td_write(td) && io_u->ddir == DDIR_WRITE) 442 ret = __fio_netio_queue(td, io_u, DDIR_READ); 443 444 return ret; 445} 446 447static int fio_netio_connect(struct thread_data *td, struct fio_file *f) 448{ 449 struct netio_data *nd = td->io_ops->data; 450 struct netio_options *o = td->eo; 451 int type, domain; 452 453 if (o->proto == FIO_TYPE_TCP) { 454 domain = AF_INET; 455 type = SOCK_STREAM; 456 } else if (o->proto == FIO_TYPE_UDP) { 457 domain = AF_INET; 458 type = SOCK_DGRAM; 459 } else if (o->proto == FIO_TYPE_UNIX) { 460 domain = AF_UNIX; 461 type = SOCK_STREAM; 462 } else { 463 log_err("fio: bad network type %d\n", o->proto); 464 f->fd = -1; 465 return 1; 466 } 467 468 f->fd = socket(domain, type, 0); 469 if (f->fd < 0) { 470 td_verror(td, errno, "socket"); 471 return 1; 472 } 473 474 if (o->proto == FIO_TYPE_UDP) 475 return 0; 476 else if (o->proto == FIO_TYPE_TCP) { 477 socklen_t len = sizeof(nd->addr); 478 479 if (connect(f->fd, (struct sockaddr *) &nd->addr, len) < 0) { 480 td_verror(td, errno, "connect"); 481 close(f->fd); 482 return 1; 483 } 484 } else { 485 struct sockaddr_un *addr = &nd->addr_un; 486 socklen_t len; 487 488 len = sizeof(addr->sun_family) + strlen(addr->sun_path) + 1; 489 490 if (connect(f->fd, (struct sockaddr *) addr, len) < 0) { 491 td_verror(td, errno, "connect"); 492 close(f->fd); 493 return 1; 494 } 495 } 496 497 return 0; 498} 499 500static int fio_netio_accept(struct thread_data *td, struct fio_file *f) 501{ 502 struct netio_data *nd = td->io_ops->data; 503 struct netio_options *o = td->eo; 504 socklen_t socklen = sizeof(nd->addr); 505 int state; 506 507 if (o->proto == FIO_TYPE_UDP) { 508 f->fd = nd->listenfd; 509 return 0; 510 } 511 512 state = td->runstate; 513 td_set_runstate(td, TD_SETTING_UP); 514 515 log_info("fio: waiting for connection\n"); 516 517 if (poll_wait(td, nd->listenfd, POLLIN) < 0) 518 goto err; 519 520 f->fd = accept(nd->listenfd, (struct sockaddr *) &nd->addr, &socklen); 521 if (f->fd < 0) { 522 td_verror(td, errno, "accept"); 523 goto err; 524 } 525 526 reset_all_stats(td); 527 td_set_runstate(td, state); 528 return 0; 529err: 530 td_set_runstate(td, state); 531 return 1; 532} 533 534static void fio_netio_udp_close(struct thread_data *td, struct fio_file *f) 535{ 536 struct netio_data *nd = td->io_ops->data; 537 struct udp_close_msg msg; 538 struct sockaddr *to = (struct sockaddr *) &nd->addr; 539 int ret; 540 541 msg.magic = htonl(FIO_LINK_OPEN_CLOSE_MAGIC); 542 msg.cmd = htonl(FIO_LINK_CLOSE); 543 544 ret = sendto(f->fd, &msg, sizeof(msg), MSG_WAITALL, to, 545 sizeof(nd->addr)); 546 if (ret < 0) 547 td_verror(td, errno, "sendto udp link close"); 548} 549 550static int fio_netio_close_file(struct thread_data *td, struct fio_file *f) 551{ 552 struct netio_options *o = td->eo; 553 554 /* 555 * If this is an UDP connection, notify the receiver that we are 556 * closing down the link 557 */ 558 if (o->proto == FIO_TYPE_UDP) 559 fio_netio_udp_close(td, f); 560 561 return generic_close_file(td, f); 562} 563 564static int fio_netio_udp_recv_open(struct thread_data *td, struct fio_file *f) 565{ 566 struct netio_data *nd = td->io_ops->data; 567 struct udp_close_msg msg; 568 struct sockaddr *to = (struct sockaddr *) &nd->addr; 569 socklen_t len = sizeof(nd->addr); 570 int ret; 571 572 ret = recvfrom(f->fd, &msg, sizeof(msg), MSG_WAITALL, to, &len); 573 if (ret < 0) { 574 td_verror(td, errno, "sendto udp link open"); 575 return ret; 576 } 577 578 if (ntohl(msg.magic) != FIO_LINK_OPEN_CLOSE_MAGIC || 579 ntohl(msg.cmd) != FIO_LINK_OPEN) { 580 log_err("fio: bad udp open magic %x/%x\n", ntohl(msg.magic), 581 ntohl(msg.cmd)); 582 return -1; 583 } 584 585 return 0; 586} 587 588static int fio_netio_udp_send_open(struct thread_data *td, struct fio_file *f) 589{ 590 struct netio_data *nd = td->io_ops->data; 591 struct udp_close_msg msg; 592 struct sockaddr *to = (struct sockaddr *) &nd->addr; 593 int ret; 594 595 msg.magic = htonl(FIO_LINK_OPEN_CLOSE_MAGIC); 596 msg.cmd = htonl(FIO_LINK_OPEN); 597 598 ret = sendto(f->fd, &msg, sizeof(msg), MSG_WAITALL, to, 599 sizeof(nd->addr)); 600 if (ret < 0) { 601 td_verror(td, errno, "sendto udp link open"); 602 return ret; 603 } 604 605 return 0; 606} 607 608static int fio_netio_open_file(struct thread_data *td, struct fio_file *f) 609{ 610 int ret; 611 struct netio_options *o = td->eo; 612 613 if (o->listen) 614 ret = fio_netio_accept(td, f); 615 else 616 ret = fio_netio_connect(td, f); 617 618 if (ret) { 619 f->fd = -1; 620 return ret; 621 } 622 623 if (o->proto == FIO_TYPE_UDP) { 624 if (td_write(td)) 625 ret = fio_netio_udp_send_open(td, f); 626 else { 627 int state; 628 629 state = td->runstate; 630 td_set_runstate(td, TD_SETTING_UP); 631 ret = fio_netio_udp_recv_open(td, f); 632 td_set_runstate(td, state); 633 } 634 } 635 636 if (ret) 637 fio_netio_close_file(td, f); 638 639 return ret; 640} 641 642static int fio_netio_setup_connect_inet(struct thread_data *td, 643 const char *host, unsigned short port) 644{ 645 struct netio_data *nd = td->io_ops->data; 646 647 if (!host) { 648 log_err("fio: connect with no host to connect to.\n"); 649 if (td_read(td)) 650 log_err("fio: did you forget to set 'listen'?\n"); 651 652 td_verror(td, EINVAL, "no hostname= set"); 653 return 1; 654 } 655 656 nd->addr.sin_family = AF_INET; 657 nd->addr.sin_port = htons(port); 658 659 if (inet_aton(host, &nd->addr.sin_addr) != 1) { 660 struct hostent *hent; 661 662 hent = gethostbyname(host); 663 if (!hent) { 664 td_verror(td, errno, "gethostbyname"); 665 return 1; 666 } 667 668 memcpy(&nd->addr.sin_addr, hent->h_addr, 4); 669 } 670 671 return 0; 672} 673 674static int fio_netio_setup_connect_unix(struct thread_data *td, 675 const char *path) 676{ 677 struct netio_data *nd = td->io_ops->data; 678 struct sockaddr_un *soun = &nd->addr_un; 679 680 soun->sun_family = AF_UNIX; 681 strcpy(soun->sun_path, path); 682 return 0; 683} 684 685static int fio_netio_setup_connect(struct thread_data *td) 686{ 687 struct netio_options *o = td->eo; 688 689 if (o->proto == FIO_TYPE_UDP || o->proto == FIO_TYPE_TCP) 690 return fio_netio_setup_connect_inet(td, td->o.filename,o->port); 691 else 692 return fio_netio_setup_connect_unix(td, td->o.filename); 693} 694 695static int fio_netio_setup_listen_unix(struct thread_data *td, const char *path) 696{ 697 struct netio_data *nd = td->io_ops->data; 698 struct sockaddr_un *addr = &nd->addr_un; 699 mode_t mode; 700 int len, fd; 701 702 fd = socket(AF_UNIX, SOCK_STREAM, 0); 703 if (fd < 0) { 704 log_err("fio: socket: %s\n", strerror(errno)); 705 return -1; 706 } 707 708 mode = umask(000); 709 710 memset(addr, 0, sizeof(*addr)); 711 addr->sun_family = AF_UNIX; 712 strcpy(addr->sun_path, path); 713 unlink(path); 714 715 len = sizeof(addr->sun_family) + strlen(path) + 1; 716 717 if (bind(fd, (struct sockaddr *) addr, len) < 0) { 718 log_err("fio: bind: %s\n", strerror(errno)); 719 close(fd); 720 return -1; 721 } 722 723 umask(mode); 724 nd->listenfd = fd; 725 return 0; 726} 727 728static int fio_netio_setup_listen_inet(struct thread_data *td, short port) 729{ 730 struct netio_data *nd = td->io_ops->data; 731 struct netio_options *o = td->eo; 732 int fd, opt, type; 733 734 if (o->proto == FIO_TYPE_TCP) 735 type = SOCK_STREAM; 736 else 737 type = SOCK_DGRAM; 738 739 fd = socket(AF_INET, type, 0); 740 if (fd < 0) { 741 td_verror(td, errno, "socket"); 742 return 1; 743 } 744 745 opt = 1; 746 if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) < 0) { 747 td_verror(td, errno, "setsockopt"); 748 return 1; 749 } 750#ifdef SO_REUSEPORT 751 if (setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &opt, sizeof(opt)) < 0) { 752 td_verror(td, errno, "setsockopt"); 753 return 1; 754 } 755#endif 756 757 nd->addr.sin_family = AF_INET; 758 nd->addr.sin_addr.s_addr = htonl(INADDR_ANY); 759 nd->addr.sin_port = htons(port); 760 761 if (bind(fd, (struct sockaddr *) &nd->addr, sizeof(nd->addr)) < 0) { 762 td_verror(td, errno, "bind"); 763 return 1; 764 } 765 766 nd->listenfd = fd; 767 return 0; 768} 769 770static int fio_netio_setup_listen(struct thread_data *td) 771{ 772 struct netio_data *nd = td->io_ops->data; 773 struct netio_options *o = td->eo; 774 int ret; 775 776 if (o->proto == FIO_TYPE_UDP || o->proto == FIO_TYPE_TCP) 777 ret = fio_netio_setup_listen_inet(td, o->port); 778 else 779 ret = fio_netio_setup_listen_unix(td, td->o.filename); 780 781 if (ret) 782 return ret; 783 if (o->proto == FIO_TYPE_UDP) 784 return 0; 785 786 if (listen(nd->listenfd, 10) < 0) { 787 td_verror(td, errno, "listen"); 788 nd->listenfd = -1; 789 return 1; 790 } 791 792 return 0; 793} 794 795static int fio_netio_init(struct thread_data *td) 796{ 797 struct netio_options *o = td->eo; 798 int ret; 799 800#ifdef WIN32 801 WSADATA wsd; 802 WSAStartup(MAKEWORD(2,2), &wsd); 803#endif 804 805 if (td_random(td)) { 806 log_err("fio: network IO can't be random\n"); 807 return 1; 808 } 809 810 if (o->proto == FIO_TYPE_UNIX && o->port) { 811 log_err("fio: network IO port not valid with unix socket\n"); 812 return 1; 813 } else if (o->proto != FIO_TYPE_UNIX && !o->port) { 814 log_err("fio: network IO requires port for tcp or udp\n"); 815 return 1; 816 } 817 818 if (o->proto != FIO_TYPE_TCP) { 819 if (o->listen) { 820 log_err("fio: listen only valid for TCP proto IO\n"); 821 return 1; 822 } 823 if (td_rw(td)) { 824 log_err("fio: datagram network connections must be" 825 " read OR write\n"); 826 return 1; 827 } 828 if (o->proto == FIO_TYPE_UNIX && !td->o.filename) { 829 log_err("fio: UNIX sockets need host/filename\n"); 830 return 1; 831 } 832 o->listen = td_read(td); 833 } 834 835 if (o->proto != FIO_TYPE_UNIX && o->listen && td->o.filename) { 836 log_err("fio: hostname not valid for inbound network IO\n"); 837 return 1; 838 } 839 840 if (o->listen) 841 ret = fio_netio_setup_listen(td); 842 else 843 ret = fio_netio_setup_connect(td); 844 845 return ret; 846} 847 848static void fio_netio_cleanup(struct thread_data *td) 849{ 850 struct netio_data *nd = td->io_ops->data; 851 852 if (nd) { 853 if (nd->listenfd != -1) 854 close(nd->listenfd); 855 if (nd->pipes[0] != -1) 856 close(nd->pipes[0]); 857 if (nd->pipes[1] != -1) 858 close(nd->pipes[1]); 859 860 free(nd); 861 } 862} 863 864static int fio_netio_setup(struct thread_data *td) 865{ 866 struct netio_data *nd; 867 868 if (!td->files_index) { 869 add_file(td, td->o.filename ?: "net"); 870 td->o.nr_files = td->o.nr_files ?: 1; 871 } 872 873 if (!td->io_ops->data) { 874 nd = malloc(sizeof(*nd));; 875 876 memset(nd, 0, sizeof(*nd)); 877 nd->listenfd = -1; 878 nd->pipes[0] = nd->pipes[1] = -1; 879 td->io_ops->data = nd; 880 } 881 882 return 0; 883} 884 885static void fio_netio_terminate(struct thread_data *td) 886{ 887 kill(td->pid, SIGUSR2); 888} 889 890#ifdef CONFIG_LINUX_SPLICE 891static int fio_netio_setup_splice(struct thread_data *td) 892{ 893 struct netio_data *nd; 894 895 fio_netio_setup(td); 896 897 nd = td->io_ops->data; 898 if (nd) { 899 if (pipe(nd->pipes) < 0) 900 return 1; 901 902 nd->use_splice = 1; 903 return 0; 904 } 905 906 return 1; 907} 908 909static struct ioengine_ops ioengine_splice = { 910 .name = "netsplice", 911 .version = FIO_IOOPS_VERSION, 912 .prep = fio_netio_prep, 913 .queue = fio_netio_queue, 914 .setup = fio_netio_setup_splice, 915 .init = fio_netio_init, 916 .cleanup = fio_netio_cleanup, 917 .open_file = fio_netio_open_file, 918 .close_file = fio_netio_close_file, 919 .terminate = fio_netio_terminate, 920 .options = options, 921 .option_struct_size = sizeof(struct netio_options), 922 .flags = FIO_SYNCIO | FIO_DISKLESSIO | FIO_UNIDIR | 923 FIO_PIPEIO, 924}; 925#endif 926 927static struct ioengine_ops ioengine_rw = { 928 .name = "net", 929 .version = FIO_IOOPS_VERSION, 930 .prep = fio_netio_prep, 931 .queue = fio_netio_queue, 932 .setup = fio_netio_setup, 933 .init = fio_netio_init, 934 .cleanup = fio_netio_cleanup, 935 .open_file = fio_netio_open_file, 936 .close_file = fio_netio_close_file, 937 .terminate = fio_netio_terminate, 938 .options = options, 939 .option_struct_size = sizeof(struct netio_options), 940 .flags = FIO_SYNCIO | FIO_DISKLESSIO | FIO_UNIDIR | 941 FIO_PIPEIO, 942}; 943 944static int str_hostname_cb(void *data, const char *input) 945{ 946 struct netio_options *o = data; 947 948 if (o->td->o.filename) 949 free(o->td->o.filename); 950 o->td->o.filename = strdup(input); 951 return 0; 952} 953 954static void fio_init fio_netio_register(void) 955{ 956 register_ioengine(&ioengine_rw); 957#ifdef CONFIG_LINUX_SPLICE 958 register_ioengine(&ioengine_splice); 959#endif 960} 961 962static void fio_exit fio_netio_unregister(void) 963{ 964 unregister_ioengine(&ioengine_rw); 965#ifdef CONFIG_LINUX_SPLICE 966 unregister_ioengine(&ioengine_splice); 967#endif 968} 969