net.c revision 414c2a3e741bb7dd7147ce6843f529c7773cea38
1/* 2 * net engine 3 * 4 * IO engine that reads/writes to/from sockets. 5 * 6 */ 7#include <stdio.h> 8#include <stdlib.h> 9#include <unistd.h> 10#include <errno.h> 11#include <assert.h> 12#include <netinet/in.h> 13#include <arpa/inet.h> 14#include <netdb.h> 15#include <sys/poll.h> 16#include <sys/types.h> 17#include <sys/socket.h> 18 19#include "../fio.h" 20 21struct netio_data { 22 int listenfd; 23 int send_to_net; 24 int use_splice; 25 int net_protocol; 26 int pipes[2]; 27 char host[64]; 28 struct sockaddr_in addr; 29}; 30 31static int fio_netio_prep(struct thread_data *td, struct io_u *io_u) 32{ 33 struct netio_data *nd = td->io_ops->data; 34 35 /* 36 * Make sure we don't see spurious reads to a receiver, and vice versa 37 */ 38 if ((nd->send_to_net && io_u->ddir == DDIR_READ) || 39 (!nd->send_to_net && io_u->ddir == DDIR_WRITE)) { 40 td_verror(td, EINVAL, "bad direction"); 41 return 1; 42 } 43 44 return 0; 45} 46 47#ifdef FIO_HAVE_SPLICE 48static int splice_io_u(int fdin, int fdout, unsigned int len) 49{ 50 int bytes = 0; 51 52 while (len) { 53 int ret = splice(fdin, NULL, fdout, NULL, len, 0); 54 55 if (ret < 0) { 56 if (!bytes) 57 bytes = ret; 58 59 break; 60 } else if (!ret) 61 break; 62 63 bytes += ret; 64 len -= ret; 65 } 66 67 return bytes; 68} 69 70/* 71 * Receive bytes from a socket and fill them into the internal pipe 72 */ 73static int splice_in(struct thread_data *td, struct io_u *io_u) 74{ 75 struct netio_data *nd = td->io_ops->data; 76 77 return splice_io_u(io_u->file->fd, nd->pipes[1], io_u->xfer_buflen); 78} 79 80/* 81 * Transmit 'len' bytes from the internal pipe 82 */ 83static int splice_out(struct thread_data *td, struct io_u *io_u, 84 unsigned int len) 85{ 86 struct netio_data *nd = td->io_ops->data; 87 88 return splice_io_u(nd->pipes[0], io_u->file->fd, len); 89} 90 91static int vmsplice_io_u(struct io_u *io_u, int fd, unsigned int len) 92{ 93 struct iovec iov = { 94 .iov_base = io_u->xfer_buf, 95 .iov_len = len, 96 }; 97 int bytes = 0; 98 99 while (iov.iov_len) { 100 int ret = vmsplice(fd, &iov, 1, SPLICE_F_MOVE); 101 102 if (ret < 0) { 103 if (!bytes) 104 bytes = ret; 105 break; 106 } else if (!ret) 107 break; 108 109 iov.iov_len -= ret; 110 iov.iov_base += ret; 111 bytes += ret; 112 } 113 114 return bytes; 115 116} 117 118/* 119 * vmsplice() pipe to io_u buffer 120 */ 121static int vmsplice_io_u_out(struct thread_data *td, struct io_u *io_u, 122 unsigned int len) 123{ 124 struct netio_data *nd = td->io_ops->data; 125 126 return vmsplice_io_u(io_u, nd->pipes[0], len); 127} 128 129/* 130 * vmsplice() io_u to pipe 131 */ 132static int vmsplice_io_u_in(struct thread_data *td, struct io_u *io_u) 133{ 134 struct netio_data *nd = td->io_ops->data; 135 136 return vmsplice_io_u(io_u, nd->pipes[1], io_u->xfer_buflen); 137} 138 139/* 140 * splice receive - transfer socket data into a pipe using splice, then map 141 * that pipe data into the io_u using vmsplice. 142 */ 143static int fio_netio_splice_in(struct thread_data *td, struct io_u *io_u) 144{ 145 int ret; 146 147 ret = splice_in(td, io_u); 148 if (ret > 0) 149 return vmsplice_io_u_out(td, io_u, ret); 150 151 return ret; 152} 153 154/* 155 * splice transmit - map data from the io_u into a pipe by using vmsplice, 156 * then transfer that pipe to a socket using splice. 157 */ 158static int fio_netio_splice_out(struct thread_data *td, struct io_u *io_u) 159{ 160 int ret; 161 162 ret = vmsplice_io_u_in(td, io_u); 163 if (ret > 0) 164 return splice_out(td, io_u, ret); 165 166 return ret; 167} 168#else 169static int fio_netio_splice_in(struct thread_data *td, struct io_u *io_u) 170{ 171 errno = EOPNOTSUPP; 172 return -1; 173} 174 175static int fio_netio_splice_out(struct thread_data *td, struct io_u *io_u) 176{ 177 errno = EOPNOTSUPP; 178 return -1; 179} 180#endif 181 182static int fio_netio_send(struct thread_data *td, struct io_u *io_u) 183{ 184 struct netio_data *nd = td->io_ops->data; 185 int flags = 0; 186 187 /* 188 * if we are going to write more, set MSG_MORE 189 */ 190#ifdef MSG_MORE 191 if (td->this_io_bytes[DDIR_WRITE] + io_u->xfer_buflen < td->o.size) 192 flags = MSG_MORE; 193#endif 194 195 if (nd->net_protocol == IPPROTO_UDP) { 196 return sendto(io_u->file->fd, io_u->xfer_buf, io_u->xfer_buflen, 197 0, &nd->addr, sizeof(nd->addr)); 198 } else { 199 return send(io_u->file->fd, io_u->xfer_buf, io_u->xfer_buflen, 200 flags); 201 } 202} 203 204static int fio_netio_recv(struct thread_data *td, struct io_u *io_u) 205{ 206 struct netio_data *nd = td->io_ops->data; 207 int flags = MSG_WAITALL; 208 209 if (nd->net_protocol == IPPROTO_UDP) { 210 socklen_t len = sizeof(nd->addr); 211 212 return recvfrom(io_u->file->fd, io_u->xfer_buf, 213 io_u->xfer_buflen, 0, &nd->addr, &len); 214 } else { 215 return recv(io_u->file->fd, io_u->xfer_buf, io_u->xfer_buflen, 216 flags); 217 } 218} 219 220static int fio_netio_queue(struct thread_data *td, struct io_u *io_u) 221{ 222 struct netio_data *nd = td->io_ops->data; 223 int ret; 224 225 fio_ro_check(td, io_u); 226 227 if (io_u->ddir == DDIR_WRITE) { 228 if (!nd->use_splice || nd->net_protocol == IPPROTO_UDP) 229 ret = fio_netio_send(td, io_u); 230 else 231 ret = fio_netio_splice_out(td, io_u); 232 } else if (io_u->ddir == DDIR_READ) { 233 if (!nd->use_splice || nd->net_protocol == IPPROTO_UDP) 234 ret = fio_netio_recv(td, io_u); 235 else 236 ret = fio_netio_splice_in(td, io_u); 237 } else 238 ret = 0; /* must be a SYNC */ 239 240 if (ret != (int) io_u->xfer_buflen) { 241 if (ret >= 0) { 242 io_u->resid = io_u->xfer_buflen - ret; 243 io_u->error = 0; 244 return FIO_Q_COMPLETED; 245 } else { 246 int err = errno; 247 248 if (io_u->ddir == DDIR_WRITE && err == EMSGSIZE) 249 return FIO_Q_BUSY; 250 251 io_u->error = err; 252 } 253 } 254 255 if (io_u->error) 256 td_verror(td, io_u->error, "xfer"); 257 258 return FIO_Q_COMPLETED; 259} 260 261static int fio_netio_connect(struct thread_data *td, struct fio_file *f) 262{ 263 struct netio_data *nd = td->io_ops->data; 264 int type; 265 266 if (nd->net_protocol == IPPROTO_TCP) 267 type = SOCK_STREAM; 268 else 269 type = SOCK_DGRAM; 270 271 f->fd = socket(AF_INET, type, nd->net_protocol); 272 if (f->fd < 0) { 273 td_verror(td, errno, "socket"); 274 return 1; 275 } 276 277 if (nd->net_protocol == IPPROTO_UDP) 278 return 0; 279 280 if (connect(f->fd, (struct sockaddr *) &nd->addr, sizeof(nd->addr)) < 0) { 281 td_verror(td, errno, "connect"); 282 return 1; 283 } 284 285 return 0; 286} 287 288static int fio_netio_accept(struct thread_data *td, struct fio_file *f) 289{ 290 struct netio_data *nd = td->io_ops->data; 291 socklen_t socklen = sizeof(nd->addr); 292 struct pollfd pfd; 293 int ret; 294 295 if (nd->net_protocol == IPPROTO_UDP) { 296 f->fd = nd->listenfd; 297 return 0; 298 } 299 300 log_info("fio: waiting for connection\n"); 301 302 /* 303 * Accept loop. poll for incoming events, accept them. Repeat until we 304 * have all connections. 305 */ 306 while (!td->terminate) { 307 pfd.fd = nd->listenfd; 308 pfd.events = POLLIN; 309 310 ret = poll(&pfd, 1, -1); 311 if (ret < 0) { 312 if (errno == EINTR) 313 continue; 314 315 td_verror(td, errno, "poll"); 316 break; 317 } else if (!ret) 318 continue; 319 320 /* 321 * should be impossible 322 */ 323 if (!(pfd.revents & POLLIN)) 324 continue; 325 326 f->fd = accept(nd->listenfd, (struct sockaddr *) &nd->addr, &socklen); 327 if (f->fd < 0) { 328 td_verror(td, errno, "accept"); 329 return 1; 330 } 331 break; 332 } 333 334 return 0; 335} 336 337static int fio_netio_open_file(struct thread_data *td, struct fio_file *f) 338{ 339 if (td_read(td)) 340 return fio_netio_accept(td, f); 341 else 342 return fio_netio_connect(td, f); 343} 344 345static int fio_netio_setup_connect(struct thread_data *td, const char *host, 346 unsigned short port) 347{ 348 struct netio_data *nd = td->io_ops->data; 349 350 nd->addr.sin_family = AF_INET; 351 nd->addr.sin_port = htons(port); 352 353 if (inet_aton(host, &nd->addr.sin_addr) != 1) { 354 struct hostent *hent; 355 356 hent = gethostbyname(host); 357 if (!hent) { 358 td_verror(td, errno, "gethostbyname"); 359 return 1; 360 } 361 362 memcpy(&nd->addr.sin_addr, hent->h_addr, 4); 363 } 364 365 return 0; 366} 367 368static int fio_netio_setup_listen(struct thread_data *td, short port) 369{ 370 struct netio_data *nd = td->io_ops->data; 371 int fd, opt, type; 372 373 if (nd->net_protocol == IPPROTO_TCP) 374 type = SOCK_STREAM; 375 else 376 type = SOCK_DGRAM; 377 378 fd = socket(AF_INET, type, nd->net_protocol); 379 if (fd < 0) { 380 td_verror(td, errno, "socket"); 381 return 1; 382 } 383 384 opt = 1; 385 if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) < 0) { 386 td_verror(td, errno, "setsockopt"); 387 return 1; 388 } 389#ifdef SO_REUSEPORT 390 if (setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &opt, sizeof(opt)) < 0) { 391 td_verror(td, errno, "setsockopt"); 392 return 1; 393 } 394#endif 395 396 nd->addr.sin_family = AF_INET; 397 nd->addr.sin_addr.s_addr = htonl(INADDR_ANY); 398 nd->addr.sin_port = htons(port); 399 400 if (bind(fd, (struct sockaddr *) &nd->addr, sizeof(nd->addr)) < 0) { 401 td_verror(td, errno, "bind"); 402 return 1; 403 } 404 if (nd->net_protocol == IPPROTO_TCP && listen(fd, 1) < 0) { 405 td_verror(td, errno, "listen"); 406 return 1; 407 } 408 409 nd->listenfd = fd; 410 return 0; 411} 412 413static int fio_netio_init(struct thread_data *td) 414{ 415 struct netio_data *nd = td->io_ops->data; 416 unsigned int port; 417 char host[64], buf[128]; 418 char *sep, *portp, *modep; 419 int ret; 420 421 if (td_rw(td)) { 422 log_err("fio: network connections must be read OR write\n"); 423 return 1; 424 } 425 if (td_random(td)) { 426 log_err("fio: network IO can't be random\n"); 427 return 1; 428 } 429 430 strcpy(buf, td->o.filename); 431 432 sep = strchr(buf, '/'); 433 if (!sep) 434 goto bad_host; 435 436 *sep = '\0'; 437 sep++; 438 strcpy(host, buf); 439 if (!strlen(host)) 440 goto bad_host; 441 442 modep = NULL; 443 portp = sep; 444 sep = strchr(portp, '/'); 445 if (sep) { 446 *sep = '\0'; 447 modep = sep + 1; 448 } 449 450 port = strtol(portp, NULL, 10); 451 if (!port || port > 65535) 452 goto bad_host; 453 454 if (modep) { 455 if (!strncmp("tcp", modep, strlen(modep))) 456 nd->net_protocol = IPPROTO_TCP; 457 else if (!strncmp("udp", modep, strlen(modep))) 458 nd->net_protocol = IPPROTO_UDP; 459 else 460 goto bad_host; 461 } else 462 nd->net_protocol = IPPROTO_TCP; 463 464 if (td_read(td)) { 465 nd->send_to_net = 0; 466 ret = fio_netio_setup_listen(td, port); 467 } else { 468 nd->send_to_net = 1; 469 ret = fio_netio_setup_connect(td, host, port); 470 } 471 472 return ret; 473bad_host: 474 log_err("fio: bad network host/port/protocol: %s\n", td->o.filename); 475 return 1; 476} 477 478static void fio_netio_cleanup(struct thread_data *td) 479{ 480 struct netio_data *nd = td->io_ops->data; 481 482 if (nd) { 483 if (nd->listenfd != -1) 484 close(nd->listenfd); 485 if (nd->pipes[0] != -1) 486 close(nd->pipes[0]); 487 if (nd->pipes[1] != -1) 488 close(nd->pipes[1]); 489 490 free(nd); 491 } 492} 493 494static int fio_netio_setup(struct thread_data *td) 495{ 496 struct netio_data *nd; 497 498 if (!td->io_ops->data) { 499 nd = malloc(sizeof(*nd));; 500 501 memset(nd, 0, sizeof(*nd)); 502 nd->listenfd = -1; 503 nd->pipes[0] = nd->pipes[1] = -1; 504 td->io_ops->data = nd; 505 } 506 507 return 0; 508} 509 510#ifdef FIO_HAVE_SPLICE 511static int fio_netio_setup_splice(struct thread_data *td) 512{ 513 struct netio_data *nd; 514 515 fio_netio_setup(td); 516 517 nd = td->io_ops->data; 518 if (nd) { 519 if (pipe(nd->pipes) < 0) 520 return 1; 521 522 nd->use_splice = 1; 523 return 0; 524 } 525 526 return 1; 527} 528 529static struct ioengine_ops ioengine_splice = { 530 .name = "netsplice", 531 .version = FIO_IOOPS_VERSION, 532 .prep = fio_netio_prep, 533 .queue = fio_netio_queue, 534 .setup = fio_netio_setup_splice, 535 .init = fio_netio_init, 536 .cleanup = fio_netio_cleanup, 537 .open_file = fio_netio_open_file, 538 .close_file = generic_close_file, 539 .flags = FIO_SYNCIO | FIO_DISKLESSIO | FIO_UNIDIR | 540 FIO_SIGQUIT, 541}; 542#endif 543 544static struct ioengine_ops ioengine_rw = { 545 .name = "net", 546 .version = FIO_IOOPS_VERSION, 547 .prep = fio_netio_prep, 548 .queue = fio_netio_queue, 549 .setup = fio_netio_setup, 550 .init = fio_netio_init, 551 .cleanup = fio_netio_cleanup, 552 .open_file = fio_netio_open_file, 553 .close_file = generic_close_file, 554 .flags = FIO_SYNCIO | FIO_DISKLESSIO | FIO_UNIDIR | 555 FIO_SIGQUIT, 556}; 557 558static void fio_init fio_netio_register(void) 559{ 560 register_ioengine(&ioengine_rw); 561#ifdef FIO_HAVE_SPLICE 562 register_ioengine(&ioengine_splice); 563#endif 564} 565 566static void fio_exit fio_netio_unregister(void) 567{ 568 unregister_ioengine(&ioengine_rw); 569#ifdef FIO_HAVE_SPLICE 570 unregister_ioengine(&ioengine_splice); 571#endif 572} 573