/* net.c revision 5ba13ea6968cf2773f10d34376afe28ef81aeee5 */
1/* 2 * net engine 3 * 4 * IO engine that reads/writes to/from sockets. 5 * 6 */ 7#include <stdio.h> 8#include <stdlib.h> 9#include <unistd.h> 10#include <errno.h> 11#include <assert.h> 12#include <netinet/in.h> 13#include <arpa/inet.h> 14#include <netdb.h> 15#include <sys/poll.h> 16#include <sys/types.h> 17#include <sys/socket.h> 18 19#include "../fio.h" 20 21struct netio_data { 22 int listenfd; 23 int send_to_net; 24 int use_splice; 25 int net_protocol; 26 int pipes[2]; 27 char host[64]; 28 struct sockaddr_in addr; 29}; 30 31struct udp_close_msg { 32 uint32_t magic; 33 uint32_t cmd; 34}; 35 36enum { 37 FIO_LINK_CLOSE = 0x89, 38 FIO_LINK_CLOSE_MAGIC = 0x6c696e6b, 39}; 40 41/* 42 * Return -1 for error and 'nr events' for a positive number 43 * of events 44 */ 45static int poll_wait(struct thread_data *td, int fd, short events) 46{ 47 struct pollfd pfd; 48 int ret; 49 50 while (!td->terminate) { 51 pfd.fd = fd; 52 pfd.events = events; 53 ret = poll(&pfd, 1, -1); 54 if (ret < 0) { 55 if (errno == EINTR) 56 break; 57 58 td_verror(td, errno, "poll"); 59 return -1; 60 } else if (!ret) 61 continue; 62 63 break; 64 } 65 66 if (pfd.revents & events) 67 return 1; 68 69 return -1; 70} 71 72static int fio_netio_prep(struct thread_data *td, struct io_u *io_u) 73{ 74 struct netio_data *nd = td->io_ops->data; 75 76 /* 77 * Make sure we don't see spurious reads to a receiver, and vice versa 78 */ 79 if ((nd->send_to_net && io_u->ddir == DDIR_READ) || 80 (!nd->send_to_net && io_u->ddir == DDIR_WRITE)) { 81 td_verror(td, EINVAL, "bad direction"); 82 return 1; 83 } 84 85 return 0; 86} 87 88#ifdef FIO_HAVE_SPLICE 89static int splice_io_u(int fdin, int fdout, unsigned int len) 90{ 91 int bytes = 0; 92 93 while (len) { 94 int ret = splice(fdin, NULL, fdout, NULL, len, 0); 95 96 if (ret < 0) { 97 if (!bytes) 98 bytes = ret; 99 100 break; 101 } else if (!ret) 102 break; 103 104 bytes += ret; 105 len -= ret; 106 } 107 108 return bytes; 109} 110 111/* 112 * Receive bytes from a socket and 
fill them into the internal pipe 113 */ 114static int splice_in(struct thread_data *td, struct io_u *io_u) 115{ 116 struct netio_data *nd = td->io_ops->data; 117 118 return splice_io_u(io_u->file->fd, nd->pipes[1], io_u->xfer_buflen); 119} 120 121/* 122 * Transmit 'len' bytes from the internal pipe 123 */ 124static int splice_out(struct thread_data *td, struct io_u *io_u, 125 unsigned int len) 126{ 127 struct netio_data *nd = td->io_ops->data; 128 129 return splice_io_u(nd->pipes[0], io_u->file->fd, len); 130} 131 132static int vmsplice_io_u(struct io_u *io_u, int fd, unsigned int len) 133{ 134 struct iovec iov = { 135 .iov_base = io_u->xfer_buf, 136 .iov_len = len, 137 }; 138 int bytes = 0; 139 140 while (iov.iov_len) { 141 int ret = vmsplice(fd, &iov, 1, SPLICE_F_MOVE); 142 143 if (ret < 0) { 144 if (!bytes) 145 bytes = ret; 146 break; 147 } else if (!ret) 148 break; 149 150 iov.iov_len -= ret; 151 iov.iov_base += ret; 152 bytes += ret; 153 } 154 155 return bytes; 156 157} 158 159/* 160 * vmsplice() pipe to io_u buffer 161 */ 162static int vmsplice_io_u_out(struct thread_data *td, struct io_u *io_u, 163 unsigned int len) 164{ 165 struct netio_data *nd = td->io_ops->data; 166 167 return vmsplice_io_u(io_u, nd->pipes[0], len); 168} 169 170/* 171 * vmsplice() io_u to pipe 172 */ 173static int vmsplice_io_u_in(struct thread_data *td, struct io_u *io_u) 174{ 175 struct netio_data *nd = td->io_ops->data; 176 177 return vmsplice_io_u(io_u, nd->pipes[1], io_u->xfer_buflen); 178} 179 180/* 181 * splice receive - transfer socket data into a pipe using splice, then map 182 * that pipe data into the io_u using vmsplice. 
183 */ 184static int fio_netio_splice_in(struct thread_data *td, struct io_u *io_u) 185{ 186 int ret; 187 188 ret = splice_in(td, io_u); 189 if (ret > 0) 190 return vmsplice_io_u_out(td, io_u, ret); 191 192 return ret; 193} 194 195/* 196 * splice transmit - map data from the io_u into a pipe by using vmsplice, 197 * then transfer that pipe to a socket using splice. 198 */ 199static int fio_netio_splice_out(struct thread_data *td, struct io_u *io_u) 200{ 201 int ret; 202 203 ret = vmsplice_io_u_in(td, io_u); 204 if (ret > 0) 205 return splice_out(td, io_u, ret); 206 207 return ret; 208} 209#else 210static int fio_netio_splice_in(struct thread_data *td, struct io_u *io_u) 211{ 212 errno = EOPNOTSUPP; 213 return -1; 214} 215 216static int fio_netio_splice_out(struct thread_data *td, struct io_u *io_u) 217{ 218 errno = EOPNOTSUPP; 219 return -1; 220} 221#endif 222 223static int fio_netio_send(struct thread_data *td, struct io_u *io_u) 224{ 225 struct netio_data *nd = td->io_ops->data; 226 int ret, flags = OS_MSG_DONTWAIT; 227 228 do { 229 if (nd->net_protocol == IPPROTO_UDP) { 230 struct sockaddr *to = (struct sockaddr *) &nd->addr; 231 232 ret = sendto(io_u->file->fd, io_u->xfer_buf, 233 io_u->xfer_buflen, flags, to, 234 sizeof(*to)); 235 } else { 236 /* 237 * if we are going to write more, set MSG_MORE 238 */ 239#ifdef MSG_MORE 240 if (td->this_io_bytes[DDIR_WRITE] + io_u->xfer_buflen < 241 td->o.size) 242 flags |= MSG_MORE; 243#endif 244 ret = send(io_u->file->fd, io_u->xfer_buf, 245 io_u->xfer_buflen, flags); 246 } 247 if (ret > 0) 248 break; 249 250 ret = poll_wait(td, io_u->file->fd, POLLOUT); 251 if (ret <= 0) 252 break; 253 254 flags &= ~OS_MSG_DONTWAIT; 255 } while (1); 256 257 return ret; 258} 259 260static int is_udp_close(struct io_u *io_u, int len) 261{ 262 struct udp_close_msg *msg; 263 264 if (len != sizeof(struct udp_close_msg)) 265 return 0; 266 267 msg = io_u->xfer_buf; 268 if (ntohl(msg->magic) != FIO_LINK_CLOSE_MAGIC) 269 return 0; 270 if 
(ntohl(msg->cmd) != FIO_LINK_CLOSE) 271 return 0; 272 273 return 1; 274} 275 276static int fio_netio_recv(struct thread_data *td, struct io_u *io_u) 277{ 278 struct netio_data *nd = td->io_ops->data; 279 int ret, flags = OS_MSG_DONTWAIT; 280 281 do { 282 if (nd->net_protocol == IPPROTO_UDP) { 283 fio_socklen_t len = sizeof(nd->addr); 284 struct sockaddr *from = (struct sockaddr *) &nd->addr; 285 286 ret = recvfrom(io_u->file->fd, io_u->xfer_buf, 287 io_u->xfer_buflen, flags, from, &len); 288 if (is_udp_close(io_u, ret)) { 289 td->done = 1; 290 return 0; 291 } 292 } else { 293 ret = recv(io_u->file->fd, io_u->xfer_buf, 294 io_u->xfer_buflen, flags); 295 } 296 if (ret > 0) 297 break; 298 299 ret = poll_wait(td, io_u->file->fd, POLLIN); 300 if (ret <= 0) 301 break; 302 flags &= ~OS_MSG_DONTWAIT; 303 flags |= MSG_WAITALL; 304 } while (1); 305 306 return ret; 307} 308 309static int fio_netio_queue(struct thread_data *td, struct io_u *io_u) 310{ 311 struct netio_data *nd = td->io_ops->data; 312 int ret; 313 314 fio_ro_check(td, io_u); 315 316 if (io_u->ddir == DDIR_WRITE) { 317 if (!nd->use_splice || nd->net_protocol == IPPROTO_UDP) 318 ret = fio_netio_send(td, io_u); 319 else 320 ret = fio_netio_splice_out(td, io_u); 321 } else if (io_u->ddir == DDIR_READ) { 322 if (!nd->use_splice || nd->net_protocol == IPPROTO_UDP) 323 ret = fio_netio_recv(td, io_u); 324 else 325 ret = fio_netio_splice_in(td, io_u); 326 } else 327 ret = 0; /* must be a SYNC */ 328 329 if (ret != (int) io_u->xfer_buflen) { 330 if (ret >= 0) { 331 io_u->resid = io_u->xfer_buflen - ret; 332 io_u->error = 0; 333 return FIO_Q_COMPLETED; 334 } else { 335 int err = errno; 336 337 if (io_u->ddir == DDIR_WRITE && err == EMSGSIZE) 338 return FIO_Q_BUSY; 339 340 io_u->error = err; 341 } 342 } 343 344 if (io_u->error) 345 td_verror(td, io_u->error, "xfer"); 346 347 return FIO_Q_COMPLETED; 348} 349 350static int fio_netio_connect(struct thread_data *td, struct fio_file *f) 351{ 352 struct netio_data *nd = 
td->io_ops->data; 353 int type; 354 355 if (nd->net_protocol == IPPROTO_TCP) 356 type = SOCK_STREAM; 357 else 358 type = SOCK_DGRAM; 359 360 f->fd = socket(AF_INET, type, nd->net_protocol); 361 if (f->fd < 0) { 362 td_verror(td, errno, "socket"); 363 return 1; 364 } 365 366 if (nd->net_protocol == IPPROTO_UDP) 367 return 0; 368 369 if (connect(f->fd, (struct sockaddr *) &nd->addr, sizeof(nd->addr)) < 0) { 370 td_verror(td, errno, "connect"); 371 return 1; 372 } 373 374 return 0; 375} 376 377static int fio_netio_accept(struct thread_data *td, struct fio_file *f) 378{ 379 struct netio_data *nd = td->io_ops->data; 380 fio_socklen_t socklen = sizeof(nd->addr); 381 382 if (nd->net_protocol == IPPROTO_UDP) { 383 f->fd = nd->listenfd; 384 return 0; 385 } 386 387 log_info("fio: waiting for connection\n"); 388 389 if (poll_wait(td, nd->listenfd, POLLIN) < 0) 390 return 1; 391 392 f->fd = accept(nd->listenfd, (struct sockaddr *) &nd->addr, &socklen); 393 if (f->fd < 0) { 394 td_verror(td, errno, "accept"); 395 return 1; 396 } 397 398 return 0; 399} 400 401static int fio_netio_open_file(struct thread_data *td, struct fio_file *f) 402{ 403 if (td_read(td)) 404 return fio_netio_accept(td, f); 405 else 406 return fio_netio_connect(td, f); 407} 408 409static void fio_netio_udp_close(struct thread_data *td, struct fio_file *f) 410{ 411 struct netio_data *nd = td->io_ops->data; 412 struct udp_close_msg msg; 413 struct sockaddr *to = (struct sockaddr *) &nd->addr; 414 int ret; 415 416 msg.magic = htonl(FIO_LINK_CLOSE_MAGIC); 417 msg.cmd = htonl(FIO_LINK_CLOSE); 418 419 ret = sendto(f->fd, &msg, sizeof(msg), MSG_WAITALL, to, 420 sizeof(nd->addr)); 421 if (ret < 0) 422 td_verror(td, errno, "sendto udp link close"); 423} 424 425static int fio_netio_close_file(struct thread_data *td, struct fio_file *f) 426{ 427 struct netio_data *nd = td->io_ops->data; 428 429 /* 430 * If this is an UDP connection, notify the receiver that we are 431 * closing down the link 432 */ 433 if 
(nd->net_protocol == IPPROTO_UDP) 434 fio_netio_udp_close(td, f); 435 436 return generic_close_file(td, f); 437} 438 439static int fio_netio_setup_connect(struct thread_data *td, const char *host, 440 unsigned short port) 441{ 442 struct netio_data *nd = td->io_ops->data; 443 444 nd->addr.sin_family = AF_INET; 445 nd->addr.sin_port = htons(port); 446 447 if (inet_aton(host, &nd->addr.sin_addr) != 1) { 448 struct hostent *hent; 449 450 hent = gethostbyname(host); 451 if (!hent) { 452 td_verror(td, errno, "gethostbyname"); 453 return 1; 454 } 455 456 memcpy(&nd->addr.sin_addr, hent->h_addr, 4); 457 } 458 459 return 0; 460} 461 462static int fio_netio_setup_listen(struct thread_data *td, short port) 463{ 464 struct netio_data *nd = td->io_ops->data; 465 int fd, opt, type; 466 467 if (nd->net_protocol == IPPROTO_TCP) 468 type = SOCK_STREAM; 469 else 470 type = SOCK_DGRAM; 471 472 fd = socket(AF_INET, type, nd->net_protocol); 473 if (fd < 0) { 474 td_verror(td, errno, "socket"); 475 return 1; 476 } 477 478 opt = 1; 479 if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) < 0) { 480 td_verror(td, errno, "setsockopt"); 481 return 1; 482 } 483#ifdef SO_REUSEPORT 484 if (setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &opt, sizeof(opt)) < 0) { 485 td_verror(td, errno, "setsockopt"); 486 return 1; 487 } 488#endif 489 490 nd->addr.sin_family = AF_INET; 491 nd->addr.sin_addr.s_addr = htonl(INADDR_ANY); 492 nd->addr.sin_port = htons(port); 493 494 if (bind(fd, (struct sockaddr *) &nd->addr, sizeof(nd->addr)) < 0) { 495 td_verror(td, errno, "bind"); 496 return 1; 497 } 498 if (nd->net_protocol == IPPROTO_TCP && listen(fd, 1) < 0) { 499 td_verror(td, errno, "listen"); 500 return 1; 501 } 502 503 nd->listenfd = fd; 504 return 0; 505} 506 507static int fio_netio_init(struct thread_data *td) 508{ 509 struct netio_data *nd = td->io_ops->data; 510 unsigned int port; 511 char host[64], buf[128]; 512 char *sep, *portp, *modep; 513 int ret; 514 515 if (td_rw(td)) { 516 log_err("fio: 
network connections must be read OR write\n"); 517 return 1; 518 } 519 if (td_random(td)) { 520 log_err("fio: network IO can't be random\n"); 521 return 1; 522 } 523 524 strcpy(buf, td->o.filename); 525 526 sep = strchr(buf, '/'); 527 if (!sep) 528 goto bad_host; 529 530 *sep = '\0'; 531 sep++; 532 strcpy(host, buf); 533 if (!strlen(host)) 534 goto bad_host; 535 536 modep = NULL; 537 portp = sep; 538 sep = strchr(portp, '/'); 539 if (sep) { 540 *sep = '\0'; 541 modep = sep + 1; 542 } 543 544 port = strtol(portp, NULL, 10); 545 if (!port || port > 65535) 546 goto bad_host; 547 548 if (modep) { 549 if (!strncmp("tcp", modep, strlen(modep)) || 550 !strncmp("TCP", modep, strlen(modep))) 551 nd->net_protocol = IPPROTO_TCP; 552 else if (!strncmp("udp", modep, strlen(modep)) || 553 !strncmp("UDP", modep, strlen(modep))) 554 nd->net_protocol = IPPROTO_UDP; 555 else 556 goto bad_host; 557 } else 558 nd->net_protocol = IPPROTO_TCP; 559 560 if (td_read(td)) { 561 nd->send_to_net = 0; 562 ret = fio_netio_setup_listen(td, port); 563 } else { 564 nd->send_to_net = 1; 565 ret = fio_netio_setup_connect(td, host, port); 566 } 567 568 return ret; 569bad_host: 570 log_err("fio: bad network host/port/protocol: %s\n", td->o.filename); 571 return 1; 572} 573 574static void fio_netio_cleanup(struct thread_data *td) 575{ 576 struct netio_data *nd = td->io_ops->data; 577 578 if (nd) { 579 if (nd->listenfd != -1) 580 close(nd->listenfd); 581 if (nd->pipes[0] != -1) 582 close(nd->pipes[0]); 583 if (nd->pipes[1] != -1) 584 close(nd->pipes[1]); 585 586 free(nd); 587 } 588} 589 590static int fio_netio_setup(struct thread_data *td) 591{ 592 struct netio_data *nd; 593 594 if (!td->io_ops->data) { 595 nd = malloc(sizeof(*nd));; 596 597 memset(nd, 0, sizeof(*nd)); 598 nd->listenfd = -1; 599 nd->pipes[0] = nd->pipes[1] = -1; 600 td->io_ops->data = nd; 601 } 602 603 return 0; 604} 605 606#ifdef FIO_HAVE_SPLICE 607static int fio_netio_setup_splice(struct thread_data *td) 608{ 609 struct netio_data 
*nd; 610 611 fio_netio_setup(td); 612 613 nd = td->io_ops->data; 614 if (nd) { 615 if (pipe(nd->pipes) < 0) 616 return 1; 617 618 nd->use_splice = 1; 619 return 0; 620 } 621 622 return 1; 623} 624 625static struct ioengine_ops ioengine_splice = { 626 .name = "netsplice", 627 .version = FIO_IOOPS_VERSION, 628 .prep = fio_netio_prep, 629 .queue = fio_netio_queue, 630 .setup = fio_netio_setup_splice, 631 .init = fio_netio_init, 632 .cleanup = fio_netio_cleanup, 633 .open_file = fio_netio_open_file, 634 .close_file = generic_close_file, 635 .flags = FIO_SYNCIO | FIO_DISKLESSIO | FIO_UNIDIR | 636 FIO_SIGTERM | FIO_PIPEIO, 637}; 638#endif 639 640static struct ioengine_ops ioengine_rw = { 641 .name = "net", 642 .version = FIO_IOOPS_VERSION, 643 .prep = fio_netio_prep, 644 .queue = fio_netio_queue, 645 .setup = fio_netio_setup, 646 .init = fio_netio_init, 647 .cleanup = fio_netio_cleanup, 648 .open_file = fio_netio_open_file, 649 .close_file = fio_netio_close_file, 650 .flags = FIO_SYNCIO | FIO_DISKLESSIO | FIO_UNIDIR | 651 FIO_SIGTERM | FIO_PIPEIO, 652}; 653 654static void fio_init fio_netio_register(void) 655{ 656 register_ioengine(&ioengine_rw); 657#ifdef FIO_HAVE_SPLICE 658 register_ioengine(&ioengine_splice); 659#endif 660} 661 662static void fio_exit fio_netio_unregister(void) 663{ 664 unregister_ioengine(&ioengine_rw); 665#ifdef FIO_HAVE_SPLICE 666 unregister_ioengine(&ioengine_splice); 667#endif 668} 669