server.c revision 53bd8dbcbf692d1f622d6c9e62284121e710fdc3
1#include <stdio.h> 2#include <stdlib.h> 3#include <stdarg.h> 4#include <unistd.h> 5#include <limits.h> 6#include <errno.h> 7#include <fcntl.h> 8#include <sys/poll.h> 9#include <sys/types.h> 10#include <sys/wait.h> 11#include <sys/socket.h> 12#include <sys/stat.h> 13#include <sys/un.h> 14#include <netinet/in.h> 15#include <arpa/inet.h> 16#include <netdb.h> 17#include <syslog.h> 18#include <signal.h> 19#include <zlib.h> 20 21#include "fio.h" 22#include "server.h" 23#include "crc/crc16.h" 24#include "lib/ieee754.h" 25 26#include "fio_version.h" 27 28int fio_net_port = FIO_NET_PORT; 29 30int exit_backend = 0; 31 32static int server_fd = -1; 33static char *fio_server_arg; 34static char *bind_sock; 35static struct sockaddr_in saddr_in; 36static struct sockaddr_in6 saddr_in6; 37static int first_cmd_check; 38static int use_ipv6; 39 40static const char *fio_server_ops[FIO_NET_CMD_NR] = { 41 "", 42 "QUIT", 43 "EXIT", 44 "JOB", 45 "JOBLINE", 46 "TEXT", 47 "TS", 48 "GS", 49 "SEND_ETA", 50 "ETA", 51 "PROBE", 52 "START", 53 "STOP", 54 "DISK_UTIL", 55 "SERVER_START", 56 "ADD_JOB", 57 "CMD_RUN" 58}; 59 60const char *fio_server_op(unsigned int op) 61{ 62 static char buf[32]; 63 64 if (op < FIO_NET_CMD_NR) 65 return fio_server_ops[op]; 66 67 sprintf(buf, "UNKNOWN/%d", op); 68 return buf; 69} 70 71static ssize_t iov_total_len(const struct iovec *iov, int count) 72{ 73 ssize_t ret = 0; 74 75 while (count--) { 76 ret += iov->iov_len; 77 iov++; 78 } 79 80 return ret; 81} 82 83static int fio_sendv_data(int sk, struct iovec *iov, int count) 84{ 85 ssize_t total_len = iov_total_len(iov, count); 86 ssize_t ret; 87 88 do { 89 ret = writev(sk, iov, count); 90 if (ret > 0) { 91 total_len -= ret; 92 if (!total_len) 93 break; 94 95 while (ret) { 96 if (ret >= iov->iov_len) { 97 ret -= iov->iov_len; 98 iov++; 99 continue; 100 } 101 iov->iov_base += ret; 102 iov->iov_len -= ret; 103 ret = 0; 104 } 105 } else if (!ret) 106 break; 107 else if (errno == EAGAIN || errno == EINTR) 108 continue; 109 else 110 break; 111 } while (!exit_backend); 112 113 if (!total_len) 114 return 0; 115 116 if (errno) 117 return -errno; 118 119 return 1; 120} 121 122int fio_send_data(int sk, const void *p, unsigned int len) 123{ 124 struct iovec iov = { .iov_base = (void *) p, .iov_len = len }; 125 126 assert(len <= sizeof(struct fio_net_cmd) + FIO_SERVER_MAX_FRAGMENT_PDU); 127 128 return fio_sendv_data(sk, &iov, 1); 129} 130 131int fio_recv_data(int sk, void *p, unsigned int len) 132{ 133 do { 134 int ret = recv(sk, p, len, MSG_WAITALL); 135 136 if (ret > 0) { 137 len -= ret; 138 if (!len) 139 break; 140 p += ret; 141 continue; 142 } else if (!ret) 143 break; 144 else if (errno == EAGAIN || errno == EINTR) 145 continue; 146 else 147 break; 148 } while (!exit_backend); 149 150 if (!len) 151 return 0; 152 153 return -1; 154} 155 156static int verify_convert_cmd(struct fio_net_cmd *cmd) 157{ 158 uint16_t crc; 159 160 cmd->cmd_crc16 = le16_to_cpu(cmd->cmd_crc16); 161 cmd->pdu_crc16 = le16_to_cpu(cmd->pdu_crc16); 162 163 crc = fio_crc16(cmd, FIO_NET_CMD_CRC_SZ); 164 if (crc != cmd->cmd_crc16) { 165 log_err("fio: server bad crc on command (got %x, wanted %x)\n", 166 cmd->cmd_crc16, crc); 167 return 1; 168 } 169 170 cmd->version = le16_to_cpu(cmd->version); 171 cmd->opcode = le16_to_cpu(cmd->opcode); 172 cmd->flags = le32_to_cpu(cmd->flags); 173 cmd->tag = le64_to_cpu(cmd->tag); 174 cmd->pdu_len = le32_to_cpu(cmd->pdu_len); 175 176 switch (cmd->version) { 177 case FIO_SERVER_VER: 178 break; 179 default: 180 log_err("fio: bad server cmd version %d\n", cmd->version); 181 return 1; 182 } 183 184 if (cmd->pdu_len > FIO_SERVER_MAX_FRAGMENT_PDU) { 185 log_err("fio: command payload too large: %u\n", cmd->pdu_len); 186 return 1; 187 } 188 189 return 0; 190} 191 192/* 193 * Read (and defragment, if necessary) incoming commands 194 */ 195struct fio_net_cmd *fio_net_recv_cmd(int sk) 196{ 197 struct fio_net_cmd cmd, *cmdret = NULL; 198 size_t cmd_size = 0, pdu_offset = 0; 199 uint16_t crc; 200 int ret, first = 1; 201 void *pdu = NULL; 202 203 do { 204 ret = fio_recv_data(sk, &cmd, sizeof(cmd)); 205 if (ret) 206 break; 207 208 /* We have a command, verify it and swap if need be */ 209 ret = verify_convert_cmd(&cmd); 210 if (ret) 211 break; 212 213 if (first) { 214 /* if this is text, add room for \0 at the end */ 215 cmd_size = sizeof(cmd) + cmd.pdu_len + 1; 216 assert(!cmdret); 217 } else 218 cmd_size += cmd.pdu_len; 219 220 cmdret = realloc(cmdret, cmd_size); 221 222 if (first) 223 memcpy(cmdret, &cmd, sizeof(cmd)); 224 else if (cmdret->opcode != cmd.opcode) { 225 log_err("fio: fragment opcode mismatch (%d != %d)\n", 226 cmdret->opcode, cmd.opcode); 227 ret = 1; 228 break; 229 } 230 231 if (!cmd.pdu_len) 232 break; 233 234 /* There's payload, get it */ 235 pdu = (void *) cmdret->payload + pdu_offset; 236 ret = fio_recv_data(sk, pdu, cmd.pdu_len); 237 if (ret) 238 break; 239 240 /* Verify payload crc */ 241 crc = fio_crc16(pdu, cmd.pdu_len); 242 if (crc != cmd.pdu_crc16) { 243 log_err("fio: server bad crc on payload "); 244 log_err("(got %x, wanted %x)\n", cmd.pdu_crc16, crc); 245 ret = 1; 246 break; 247 } 248 249 pdu_offset += cmd.pdu_len; 250 if (!first) 251 cmdret->pdu_len += cmd.pdu_len; 252 first = 0; 253 } while (cmd.flags & FIO_NET_CMD_F_MORE); 254 255 if (ret) { 256 free(cmdret); 257 cmdret = NULL; 258 } else if (cmdret) { 259 /* zero-terminate text input */ 260 if (cmdret->pdu_len) { 261 if (cmdret->opcode == FIO_NET_CMD_TEXT) { 262 struct cmd_text_pdu *pdu = (struct cmd_text_pdu *) cmdret->payload; 263 char *buf = (char *) pdu->buf; 264 265 buf[pdu->buf_len ] = '\0'; 266 } else if (cmdret->opcode == FIO_NET_CMD_JOB) { 267 struct cmd_job_pdu *pdu = (struct cmd_job_pdu *) cmdret->payload; 268 char *buf = (char *) pdu->buf; 269 int len = le32_to_cpu(pdu->buf_len); 270 271 buf[len] = '\0'; 272 } 273 } 274 275 /* frag flag is internal */ 276 cmdret->flags &= ~FIO_NET_CMD_F_MORE; 277 } 278 279 return cmdret; 280} 281 282void fio_net_cmd_crc_pdu(struct fio_net_cmd *cmd, const void *pdu) 283{ 284 uint32_t pdu_len; 285 286 cmd->cmd_crc16 = __cpu_to_le16(fio_crc16(cmd, FIO_NET_CMD_CRC_SZ)); 287 288 pdu_len = le32_to_cpu(cmd->pdu_len); 289 cmd->pdu_crc16 = __cpu_to_le16(fio_crc16(pdu, pdu_len)); 290} 291 292void fio_net_cmd_crc(struct fio_net_cmd *cmd) 293{ 294 fio_net_cmd_crc_pdu(cmd, cmd->payload); 295} 296 297int fio_net_send_cmd(int fd, uint16_t opcode, const void *buf, off_t size, 298 uint64_t tag) 299{ 300 struct fio_net_cmd *cmd = NULL; 301 size_t this_len, cur_len = 0; 302 int ret; 303 304 do { 305 this_len = size; 306 if (this_len > FIO_SERVER_MAX_FRAGMENT_PDU) 307 this_len = FIO_SERVER_MAX_FRAGMENT_PDU; 308 309 if (!cmd || cur_len < sizeof(*cmd) + this_len) { 310 if (cmd) 311 free(cmd); 312 313 cur_len = sizeof(*cmd) + this_len; 314 cmd = malloc(cur_len); 315 } 316 317 fio_init_net_cmd(cmd, opcode, buf, this_len, tag); 318 319 if (this_len < size) 320 cmd->flags = __cpu_to_le32(FIO_NET_CMD_F_MORE); 321 322 fio_net_cmd_crc(cmd); 323 324 ret = fio_send_data(fd, cmd, sizeof(*cmd) + this_len); 325 size -= this_len; 326 buf += this_len; 327 } while (!ret && size); 328 329 if (cmd) 330 free(cmd); 331 332 return ret; 333} 334 335static int fio_net_send_simple_stack_cmd(int sk, uint16_t opcode, uint64_t tag) 336{ 337 struct fio_net_cmd cmd; 338 339 fio_init_net_cmd(&cmd, opcode, NULL, 0, tag); 340 fio_net_cmd_crc(&cmd); 341 342 return fio_send_data(sk, &cmd, sizeof(cmd)); 343} 344 345/* 346 * If 'list' is non-NULL, then allocate and store the sent command for 347 * later verification. 348 */ 349int fio_net_send_simple_cmd(int sk, uint16_t opcode, uint64_t tag, 350 struct flist_head *list) 351{ 352 struct fio_net_int_cmd *cmd; 353 int ret; 354 355 if (!list) 356 return fio_net_send_simple_stack_cmd(sk, opcode, tag); 357 358 cmd = malloc(sizeof(*cmd)); 359 360 fio_init_net_cmd(&cmd->cmd, opcode, NULL, 0, (uintptr_t) cmd); 361 fio_net_cmd_crc(&cmd->cmd); 362 363 INIT_FLIST_HEAD(&cmd->list); 364 gettimeofday(&cmd->tv, NULL); 365 cmd->saved_tag = tag; 366 367 ret = fio_send_data(sk, &cmd->cmd, sizeof(cmd->cmd)); 368 if (ret) { 369 free(cmd); 370 return ret; 371 } 372 373 flist_add_tail(&cmd->list, list); 374 return 0; 375} 376 377static int fio_server_send_quit_cmd(void) 378{ 379 dprint(FD_NET, "server: sending quit\n"); 380 return fio_net_send_simple_cmd(server_fd, FIO_NET_CMD_QUIT, 0, NULL); 381} 382 383static int handle_run_cmd(struct fio_net_cmd *cmd) 384{ 385 struct cmd_end_pdu epdu; 386 int ret; 387 388 ret = fio_backend(); 389 390 epdu.error = ret; 391 fio_net_send_cmd(server_fd, FIO_NET_CMD_STOP, &epdu, sizeof(epdu), 0); 392 393 fio_server_send_quit_cmd(); 394 reset_fio_state(); 395 first_cmd_check = 0; 396 return ret; 397} 398 399static int handle_job_cmd(struct fio_net_cmd *cmd) 400{ 401 struct cmd_job_pdu *pdu = (struct cmd_job_pdu *) cmd->payload; 402 void *buf = pdu->buf; 403 struct cmd_start_pdu spdu; 404 405 pdu->buf_len = le32_to_cpu(pdu->buf_len); 406 pdu->client_type = le32_to_cpu(pdu->client_type); 407 408 if (parse_jobs_ini(buf, 1, 0, pdu->client_type)) { 409 fio_server_send_quit_cmd(); 410 return -1; 411 } 412 413 spdu.jobs = cpu_to_le32(thread_number); 414 fio_net_send_cmd(server_fd, FIO_NET_CMD_START, &spdu, sizeof(spdu), 0); 415 return 0; 416} 417 418static int handle_jobline_cmd(struct fio_net_cmd *cmd) 419{ 420 void *pdu = cmd->payload; 421 struct cmd_single_line_pdu *cslp; 422 struct cmd_line_pdu *clp; 423 unsigned long offset; 424 struct cmd_start_pdu spdu; 425 char **argv; 426 int i; 427 428 clp = pdu; 429 clp->lines = le16_to_cpu(clp->lines); 430 clp->client_type = le16_to_cpu(clp->client_type); 431 argv = malloc(clp->lines * sizeof(char *)); 432 offset = sizeof(*clp); 433 434 dprint(FD_NET, "server: %d command line args\n", clp->lines); 435 436 for (i = 0; i < clp->lines; i++) { 437 cslp = pdu + offset; 438 argv[i] = (char *) cslp->text; 439 440 offset += sizeof(*cslp) + le16_to_cpu(cslp->len); 441 dprint(FD_NET, "server: %d: %s\n", i, argv[i]); 442 } 443 444 if (parse_cmd_line(clp->lines, argv, clp->client_type)) { 445 fio_server_send_quit_cmd(); 446 free(argv); 447 return -1; 448 } 449 450 free(argv); 451 452 spdu.jobs = cpu_to_le32(thread_number); 453 fio_net_send_cmd(server_fd, FIO_NET_CMD_START, &spdu, sizeof(spdu), 0); 454 return 0; 455} 456 457static int handle_probe_cmd(struct fio_net_cmd *cmd) 458{ 459 struct cmd_probe_pdu probe; 460 461 dprint(FD_NET, "server: sending probe reply\n"); 462 463 memset(&probe, 0, sizeof(probe)); 464 gethostname((char *) probe.hostname, sizeof(probe.hostname)); 465#ifdef FIO_BIG_ENDIAN 466 probe.bigendian = 1; 467#endif 468 probe.fio_major = FIO_MAJOR; 469 probe.fio_minor = FIO_MINOR; 470 probe.fio_patch = FIO_PATCH; 471 472 probe.os = FIO_OS; 473 probe.arch = FIO_ARCH; 474 475 probe.bpp = sizeof(void *); 476 477 return fio_net_send_cmd(server_fd, FIO_NET_CMD_PROBE, &probe, sizeof(probe), cmd->tag); 478} 479 480static int handle_send_eta_cmd(struct fio_net_cmd *cmd) 481{ 482 struct jobs_eta *je; 483 size_t size; 484 int i; 485 486 if (!thread_number) 487 return 0; 488 489 size = sizeof(*je) + thread_number * sizeof(char) + 1; 490 je = malloc(size); 491 memset(je, 0, size); 492 493 if (!calc_thread_status(je, 1)) { 494 free(je); 495 return 0; 496 } 497 498 dprint(FD_NET, "server sending status\n"); 499 500 je->nr_running = cpu_to_le32(je->nr_running); 501 je->nr_ramp = cpu_to_le32(je->nr_ramp); 502 je->nr_pending = cpu_to_le32(je->nr_pending); 503 je->files_open = cpu_to_le32(je->files_open); 504 505 for (i = 0; i < 2; i++) { 506 je->m_rate[i] = cpu_to_le32(je->m_rate[i]); 507 je->t_rate[i] = cpu_to_le32(je->t_rate[i]); 508 je->m_iops[i] = cpu_to_le32(je->m_iops[i]); 509 je->t_iops[i] = cpu_to_le32(je->t_iops[i]); 510 je->rate[i] = cpu_to_le32(je->rate[i]); 511 je->iops[i] = cpu_to_le32(je->iops[i]); 512 } 513 514 je->elapsed_sec = cpu_to_le64(je->elapsed_sec); 515 je->eta_sec = cpu_to_le64(je->eta_sec); 516 je->nr_threads = cpu_to_le32(je->nr_threads); 517 518 fio_net_send_cmd(server_fd, FIO_NET_CMD_ETA, je, size, cmd->tag); 519 free(je); 520 return 0; 521} 522 523static int handle_command(struct fio_net_cmd *cmd) 524{ 525 int ret; 526 527 dprint(FD_NET, "server: got op [%s], pdu=%u, tag=%lx\n", 528 fio_server_op(cmd->opcode), cmd->pdu_len, cmd->tag); 529 530 switch (cmd->opcode) { 531 case FIO_NET_CMD_QUIT: 532 fio_terminate_threads(TERMINATE_ALL); 533 return -1; 534 case FIO_NET_CMD_EXIT: 535 exit_backend = 1; 536 return -1; 537 case FIO_NET_CMD_JOB: 538 ret = handle_job_cmd(cmd); 539 break; 540 case FIO_NET_CMD_JOBLINE: 541 ret = handle_jobline_cmd(cmd); 542 break; 543 case FIO_NET_CMD_PROBE: 544 ret = handle_probe_cmd(cmd); 545 break; 546 case FIO_NET_CMD_SEND_ETA: 547 ret = handle_send_eta_cmd(cmd); 548 break; 549 case FIO_NET_CMD_RUN: 550 ret = handle_run_cmd(cmd); 551 break; 552 default: 553 log_err("fio: unknown opcode: %s\n",fio_server_op(cmd->opcode)); 554 ret = 1; 555 } 556 557 return ret; 558} 559 560static int handle_connection(int sk, int block) 561{ 562 struct fio_net_cmd *cmd = NULL; 563 int ret = 0; 564 565 /* read forever */ 566 while (!exit_backend) { 567 struct pollfd pfd = { 568 .fd = sk, 569 .events = POLLIN, 570 }; 571 572 ret = 0; 573 do { 574 ret = poll(&pfd, 1, 100); 575 if (ret < 0) { 576 if (errno == EINTR) 577 break; 578 log_err("fio: poll: %s\n", strerror(errno)); 579 break; 580 } else if (!ret) { 581 if (!block) 582 return 0; 583 continue; 584 } 585 586 if (pfd.revents & POLLIN) 587 break; 588 if (pfd.revents & (POLLERR|POLLHUP)) { 589 ret = 1; 590 break; 591 } 592 } while (!exit_backend); 593 594 if (ret < 0) 595 break; 596 597 cmd = fio_net_recv_cmd(sk); 598 if (!cmd) { 599 ret = -1; 600 break; 601 } 602 603 ret = handle_command(cmd); 604 if (ret) 605 break; 606 607 free(cmd); 608 cmd = NULL; 609 } 610 611 if (cmd) 612 free(cmd); 613 614 return ret; 615} 616 617void fio_server_idle_loop(void) 618{ 619 if (!first_cmd_check) { 620 fio_net_send_simple_cmd(server_fd, FIO_NET_CMD_SERVER_START, 0, NULL); 621 first_cmd_check = 1; 622 } 623 if (server_fd != -1) 624 handle_connection(server_fd, 0); 625} 626 627static int accept_loop(int listen_sk) 628{ 629 struct sockaddr_in addr; 630 fio_socklen_t len = sizeof(addr); 631 struct pollfd pfd; 632 int ret, sk, flags, exitval = 0; 633 634 dprint(FD_NET, "server enter accept loop\n"); 635 636 flags = fcntl(listen_sk, F_GETFL); 637 flags |= O_NONBLOCK; 638 fcntl(listen_sk, F_SETFL, flags); 639again: 640 pfd.fd = listen_sk; 641 pfd.events = POLLIN; 642 do { 643 ret = poll(&pfd, 1, 100); 644 if (ret < 0) { 645 if (errno == EINTR) 646 break; 647 log_err("fio: poll: %s\n", strerror(errno)); 648 goto out; 649 } else if (!ret) 650 continue; 651 652 if (pfd.revents & POLLIN) 653 break; 654 } while (!exit_backend); 655 656 if (exit_backend) 657 goto out; 658 659 sk = accept(listen_sk, (struct sockaddr *) &addr, &len); 660 if (sk < 0) { 661 log_err("fio: accept: %s\n", strerror(errno)); 662 return -1; 663 } 664 665 dprint(FD_NET, "server: connect from %s\n", inet_ntoa(addr.sin_addr)); 666 667 server_fd = sk; 668 669 exitval = handle_connection(sk, 1); 670 671 server_fd = -1; 672 close(sk); 673 674 if (!exit_backend) 675 goto again; 676 677out: 678 return exitval; 679} 680 681int fio_server_text_output(int level, const char *buf, size_t len) 682{ 683 struct cmd_text_pdu *pdu; 684 unsigned int tlen; 685 struct timeval tv; 686 687 if (server_fd == -1) 688 return log_local_buf(buf, len); 689 690 tlen = sizeof(*pdu) + len; 691 pdu = malloc(tlen); 692 693 pdu->level = __cpu_to_le32(level); 694 pdu->buf_len = __cpu_to_le32(len); 695 696 gettimeofday(&tv, NULL); 697 pdu->log_sec = __cpu_to_le64(tv.tv_sec); 698 pdu->log_usec = __cpu_to_le64(tv.tv_usec); 699 700 memcpy(pdu->buf, buf, len); 701 702 fio_net_send_cmd(server_fd, FIO_NET_CMD_TEXT, pdu, tlen, 0); 703 free(pdu); 704 return len; 705} 706 707static void convert_io_stat(struct io_stat *dst, struct io_stat *src) 708{ 709 dst->max_val = cpu_to_le64(src->max_val); 710 dst->min_val = cpu_to_le64(src->min_val); 711 dst->samples = cpu_to_le64(src->samples); 712 713 /* 714 * Encode to IEEE 754 for network transfer 715 */ 716 dst->mean.u.i = __cpu_to_le64(fio_double_to_uint64(src->mean.u.f)); 717 dst->S.u.i = __cpu_to_le64(fio_double_to_uint64(src->S.u.f)); 718} 719 720static void convert_gs(struct group_run_stats *dst, struct group_run_stats *src) 721{ 722 int i; 723 724 for (i = 0; i < 2; i++) { 725 dst->max_run[i] = cpu_to_le64(src->max_run[i]); 726 dst->min_run[i] = cpu_to_le64(src->min_run[i]); 727 dst->max_bw[i] = cpu_to_le64(src->max_bw[i]); 728 dst->min_bw[i] = cpu_to_le64(src->min_bw[i]); 729 dst->io_kb[i] = cpu_to_le64(src->io_kb[i]); 730 dst->agg[i] = cpu_to_le64(src->agg[i]); 731 } 732 733 dst->kb_base = cpu_to_le32(src->kb_base); 734 dst->groupid = cpu_to_le32(src->groupid); 735} 736 737/* 738 * Send a CMD_TS, which packs struct thread_stat and group_run_stats 739 * into a single payload. 740 */ 741void fio_server_send_ts(struct thread_stat *ts, struct group_run_stats *rs) 742{ 743 struct cmd_ts_pdu p; 744 int i, j; 745 746 dprint(FD_NET, "server sending end stats\n"); 747 748 memset(&p, 0, sizeof(p)); 749 750 strcpy(p.ts.name, ts->name); 751 strcpy(p.ts.verror, ts->verror); 752 strcpy(p.ts.description, ts->description); 753 754 p.ts.error = cpu_to_le32(ts->error); 755 p.ts.groupid = cpu_to_le32(ts->groupid); 756 p.ts.pid = cpu_to_le32(ts->pid); 757 p.ts.members = cpu_to_le32(ts->members); 758 759 for (i = 0; i < 2; i++) { 760 convert_io_stat(&p.ts.clat_stat[i], &ts->clat_stat[i]); 761 convert_io_stat(&p.ts.slat_stat[i], &ts->slat_stat[i]); 762 convert_io_stat(&p.ts.lat_stat[i], &ts->lat_stat[i]); 763 convert_io_stat(&p.ts.bw_stat[i], &ts->bw_stat[i]); 764 } 765 766 p.ts.usr_time = cpu_to_le64(ts->usr_time); 767 p.ts.sys_time = cpu_to_le64(ts->sys_time); 768 p.ts.ctx = cpu_to_le64(ts->ctx); 769 p.ts.minf = cpu_to_le64(ts->minf); 770 p.ts.majf = cpu_to_le64(ts->majf); 771 p.ts.clat_percentiles = cpu_to_le64(ts->clat_percentiles); 772 773 for (i = 0; i < FIO_IO_U_LIST_MAX_LEN; i++) { 774 fio_fp64_t *src = &ts->percentile_list[i]; 775 fio_fp64_t *dst = &p.ts.percentile_list[i]; 776 777 dst->u.i = __cpu_to_le64(fio_double_to_uint64(src->u.f)); 778 } 779 780 for (i = 0; i < FIO_IO_U_MAP_NR; i++) { 781 p.ts.io_u_map[i] = cpu_to_le32(ts->io_u_map[i]); 782 p.ts.io_u_submit[i] = cpu_to_le32(ts->io_u_submit[i]); 783 p.ts.io_u_complete[i] = cpu_to_le32(ts->io_u_complete[i]); 784 } 785 786 for (i = 0; i < FIO_IO_U_LAT_U_NR; i++) { 787 p.ts.io_u_lat_u[i] = cpu_to_le32(ts->io_u_lat_u[i]); 788 p.ts.io_u_lat_m[i] = cpu_to_le32(ts->io_u_lat_m[i]); 789 } 790 791 for (i = 0; i < 2; i++) 792 for (j = 0; j < FIO_IO_U_PLAT_NR; j++) 793 p.ts.io_u_plat[i][j] = cpu_to_le32(ts->io_u_plat[i][j]); 794 795 for (i = 0; i < 3; i++) { 796 p.ts.total_io_u[i] = cpu_to_le64(ts->total_io_u[i]); 797 p.ts.short_io_u[i] = cpu_to_le64(ts->short_io_u[i]); 798 } 799 800 p.ts.total_submit = cpu_to_le64(ts->total_submit); 801 p.ts.total_complete = cpu_to_le64(ts->total_complete); 802 803 for (i = 0; i < 2; i++) { 804 p.ts.io_bytes[i] = cpu_to_le64(ts->io_bytes[i]); 805 p.ts.runtime[i] = cpu_to_le64(ts->runtime[i]); 806 } 807 808 p.ts.total_run_time = cpu_to_le64(ts->total_run_time); 809 p.ts.continue_on_error = cpu_to_le16(ts->continue_on_error); 810 p.ts.total_err_count = cpu_to_le64(ts->total_err_count); 811 p.ts.first_error = cpu_to_le32(ts->first_error); 812 p.ts.kb_base = cpu_to_le32(ts->kb_base); 813 814 convert_gs(&p.rs, rs); 815 816 fio_net_send_cmd(server_fd, FIO_NET_CMD_TS, &p, sizeof(p), 0); 817} 818 819void fio_server_send_gs(struct group_run_stats *rs) 820{ 821 struct group_run_stats gs; 822 823 dprint(FD_NET, "server sending group run stats\n"); 824 825 convert_gs(&gs, rs); 826 fio_net_send_cmd(server_fd, FIO_NET_CMD_GS, &gs, sizeof(gs), 0); 827} 828 829static void convert_agg(struct disk_util_agg *dst, struct disk_util_agg *src) 830{ 831 int i; 832 833 for (i = 0; i < 2; i++) { 834 dst->ios[i] = cpu_to_le32(src->ios[i]); 835 dst->merges[i] = cpu_to_le32(src->merges[i]); 836 dst->sectors[i] = cpu_to_le64(src->sectors[i]); 837 dst->ticks[i] = cpu_to_le32(src->ticks[i]); 838 } 839 840 dst->io_ticks = cpu_to_le32(src->io_ticks); 841 dst->time_in_queue = cpu_to_le32(src->time_in_queue); 842 dst->slavecount = cpu_to_le32(src->slavecount); 843 dst->max_util.u.i = __cpu_to_le64(fio_double_to_uint64(src->max_util.u.f)); 844} 845 846static void convert_dus(struct disk_util_stat *dst, struct disk_util_stat *src) 847{ 848 int i; 849 850 strcpy((char *) dst->name, (char *) src->name); 851 852 for (i = 0; i < 2; i++) { 853 dst->ios[i] = cpu_to_le32(src->ios[i]); 854 dst->merges[i] = cpu_to_le32(src->merges[i]); 855 dst->sectors[i] = cpu_to_le64(src->sectors[i]); 856 dst->ticks[i] = cpu_to_le32(src->ticks[i]); 857 } 858 859 dst->io_ticks = cpu_to_le32(src->io_ticks); 860 dst->time_in_queue = cpu_to_le32(src->time_in_queue); 861 dst->msec = cpu_to_le64(src->msec); 862} 863 864void fio_server_send_du(void) 865{ 866 struct disk_util *du; 867 struct flist_head *entry; 868 struct cmd_du_pdu pdu; 869 870 dprint(FD_NET, "server: sending disk_util %d\n", !flist_empty(&disk_list)); 871 872 memset(&pdu, 0, sizeof(pdu)); 873 874 flist_for_each(entry, &disk_list) { 875 du = flist_entry(entry, struct disk_util, list); 876 877 convert_dus(&pdu.dus, &du->dus); 878 convert_agg(&pdu.agg, &du->agg); 879 880 fio_net_send_cmd(server_fd, FIO_NET_CMD_DU, &pdu, sizeof(pdu), 0); 881 } 882} 883 884/* 885 * Send a command with a separate PDU, not inlined in the command 886 */ 887static int fio_send_cmd_ext_pdu(int sk, uint16_t opcode, const void *buf, 888 off_t size, uint64_t tag, uint32_t flags) 889{ 890 struct fio_net_cmd cmd; 891 struct iovec iov[2]; 892 893 iov[0].iov_base = &cmd; 894 iov[0].iov_len = sizeof(cmd); 895 iov[1].iov_base = (void *) buf; 896 iov[1].iov_len = size; 897 898 __fio_init_net_cmd(&cmd, opcode, size, tag); 899 cmd.flags = __cpu_to_le32(flags); 900 fio_net_cmd_crc_pdu(&cmd, buf); 901 902 return fio_sendv_data(server_fd, iov, 2); 903} 904 905int fio_send_iolog(struct thread_data *td, struct io_log *log, const char *name) 906{ 907 struct cmd_iolog_pdu pdu; 908 z_stream stream; 909 void *out_pdu; 910 int i, ret = 0; 911 912 pdu.nr_samples = __cpu_to_le32(log->nr_samples); 913 pdu.log_type = cpu_to_le32(log->log_type); 914 strcpy((char *) pdu.name, name); 915 916 for (i = 0; i < log->nr_samples; i++) { 917 struct io_sample *s = &log->log[i]; 918 919 s->time = cpu_to_le64(s->time); 920 s->val = cpu_to_le64(s->val); 921 s->ddir = cpu_to_le32(s->ddir); 922 s->bs = cpu_to_le32(s->bs); 923 } 924 925 /* 926 * Dirty - since the log is potentially huge, compress it into 927 * FIO_SERVER_MAX_FRAGMENT_PDU chunks and let the receiving 928 * side defragment it. 929 */ 930 out_pdu = malloc(FIO_SERVER_MAX_FRAGMENT_PDU); 931 932 stream.zalloc = Z_NULL; 933 stream.zfree = Z_NULL; 934 stream.opaque = Z_NULL; 935 936 if (deflateInit(&stream, Z_DEFAULT_COMPRESSION) != Z_OK) { 937 ret = 1; 938 goto err; 939 } 940 941 /* 942 * Send header first, it's not compressed. 943 */ 944 ret = fio_send_cmd_ext_pdu(server_fd, FIO_NET_CMD_IOLOG, &pdu, 945 sizeof(pdu), 0, FIO_NET_CMD_F_MORE); 946 if (ret) 947 goto err_zlib; 948 949 stream.next_in = (void *) log->log; 950 stream.avail_in = log->nr_samples * sizeof(struct io_sample); 951 952 do { 953 unsigned int this_len, flags = 0; 954 int ret; 955 956 stream.avail_out = FIO_SERVER_MAX_FRAGMENT_PDU; 957 stream.next_out = out_pdu; 958 assert(deflate(&stream, Z_FINISH) == Z_OK); 959 960 this_len = FIO_SERVER_MAX_FRAGMENT_PDU - stream.avail_out; 961 962 if (stream.avail_in) 963 flags = FIO_NET_CMD_F_MORE; 964 965 ret = fio_send_cmd_ext_pdu(server_fd, FIO_NET_CMD_IOLOG, 966 out_pdu, this_len, 0, flags); 967 if (ret) 968 goto err_zlib; 969 } while (stream.avail_in); 970 971err_zlib: 972 deflateEnd(&stream); 973err: 974 free(out_pdu); 975 return ret; 976} 977 978void fio_server_send_add_job(struct thread_options *o, const char *ioengine) 979{ 980 struct cmd_add_job_pdu pdu; 981 982 memset(&pdu, 0, sizeof(pdu)); 983 convert_thread_options_to_net(&pdu.top, o); 984 985 fio_net_send_cmd(server_fd, FIO_NET_CMD_ADD_JOB, &pdu, sizeof(pdu), 0); 986} 987 988static int fio_init_server_ip(void) 989{ 990 struct sockaddr *addr; 991 fio_socklen_t socklen; 992 int sk, opt; 993 994 if (use_ipv6) 995 sk = socket(AF_INET6, SOCK_STREAM, 0); 996 else 997 sk = socket(AF_INET, SOCK_STREAM, 0); 998 999 if (sk < 0) { 1000 log_err("fio: socket: %s\n", strerror(errno)); 1001 return -1; 1002 } 1003 1004 opt = 1; 1005 if (setsockopt(sk, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) < 0) { 1006 log_err("fio: setsockopt: %s\n", strerror(errno)); 1007 close(sk); 1008 return -1; 1009 } 1010#ifdef SO_REUSEPORT 1011 if (setsockopt(sk, SOL_SOCKET, SO_REUSEPORT, &opt, sizeof(opt)) < 0) { 1012 log_err("fio: setsockopt: %s\n", strerror(errno)); 1013 close(sk); 1014 return -1; 1015 } 1016#endif 1017 1018 if (use_ipv6) { 1019 addr = (struct sockaddr *) &saddr_in6; 1020 socklen = sizeof(saddr_in6); 1021 saddr_in6.sin6_family = AF_INET6; 1022 } else { 1023 addr = (struct sockaddr *) &saddr_in; 1024 socklen = sizeof(saddr_in); 1025 saddr_in.sin_family = AF_INET; 1026 } 1027 1028 if (bind(sk, addr, socklen) < 0) { 1029 log_err("fio: bind: %s\n", strerror(errno)); 1030 close(sk); 1031 return -1; 1032 } 1033 1034 return sk; 1035} 1036 1037static int fio_init_server_sock(void) 1038{ 1039 struct sockaddr_un addr; 1040 fio_socklen_t len; 1041 mode_t mode; 1042 int sk; 1043 1044 sk = socket(AF_UNIX, SOCK_STREAM, 0); 1045 if (sk < 0) { 1046 log_err("fio: socket: %s\n", strerror(errno)); 1047 return -1; 1048 } 1049 1050 mode = umask(000); 1051 1052 memset(&addr, 0, sizeof(addr)); 1053 addr.sun_family = AF_UNIX; 1054 strcpy(addr.sun_path, bind_sock); 1055 unlink(bind_sock); 1056 1057 len = sizeof(addr.sun_family) + strlen(bind_sock) + 1; 1058 1059 if (bind(sk, (struct sockaddr *) &addr, len) < 0) { 1060 log_err("fio: bind: %s\n", strerror(errno)); 1061 close(sk); 1062 return -1; 1063 } 1064 1065 umask(mode); 1066 return sk; 1067} 1068 1069static int fio_init_server_connection(void) 1070{ 1071 char bind_str[128]; 1072 int sk; 1073 1074 dprint(FD_NET, "starting server\n"); 1075 1076 if (!bind_sock) 1077 sk = fio_init_server_ip(); 1078 else 1079 sk = fio_init_server_sock(); 1080 1081 if (sk < 0) 1082 return sk; 1083 1084 if (!bind_sock) { 1085 char *p, port[16]; 1086 const void *src; 1087 int af; 1088 1089 if (use_ipv6) { 1090 af = AF_INET6; 1091 src = &saddr_in6.sin6_addr; 1092 } else { 1093 af = AF_INET; 1094 src = &saddr_in.sin_addr; 1095 } 1096 1097 p = (char *) inet_ntop(af, src, bind_str, sizeof(bind_str)); 1098 1099 sprintf(port, ",%u", fio_net_port); 1100 if (p) 1101 strcat(p, port); 1102 else 1103 strcpy(bind_str, port); 1104 } else 1105 strcpy(bind_str, bind_sock); 1106 1107 log_info("fio: server listening on %s\n", bind_str); 1108 1109 if (listen(sk, 0) < 0) { 1110 log_err("fio: listen: %s\n", strerror(errno)); 1111 return -1; 1112 } 1113 1114 return sk; 1115} 1116 1117int fio_server_parse_host(const char *host, int *ipv6, struct in_addr *inp, 1118 struct in6_addr *inp6) 1119 1120{ 1121 int ret = 0; 1122 1123 if (*ipv6) 1124 ret = inet_pton(AF_INET6, host, inp6); 1125 else 1126 ret = inet_pton(AF_INET, host, inp); 1127 1128 if (ret != 1) { 1129 struct hostent *hent; 1130 1131 hent = gethostbyname(host); 1132 if (!hent) { 1133 log_err("fio: failed to resolve <%s>\n", host); 1134 return 0; 1135 } 1136 1137 if (*ipv6) { 1138 if (hent->h_addrtype != AF_INET6) { 1139 log_info("fio: falling back to IPv4\n"); 1140 *ipv6 = 0; 1141 } else 1142 memcpy(inp6, hent->h_addr_list[0], 16); 1143 } 1144 if (!*ipv6) { 1145 if (hent->h_addrtype != AF_INET) { 1146 log_err("fio: lookup type mismatch\n"); 1147 return 0; 1148 } 1149 memcpy(inp, hent->h_addr_list[0], 4); 1150 } 1151 ret = 1; 1152 } 1153 1154 return !(ret == 1); 1155} 1156 1157/* 1158 * Parse a host/ip/port string. Reads from 'str'. 1159 * 1160 * Outputs: 1161 * 1162 * For IPv4: 1163 * *ptr is the host, *port is the port, inp is the destination. 1164 * For IPv6: 1165 * *ptr is the host, *port is the port, inp6 is the dest, and *ipv6 is 1. 1166 * For local domain sockets: 1167 * *ptr is the filename, *is_sock is 1. 1168 */ 1169int fio_server_parse_string(const char *str, char **ptr, int *is_sock, 1170 int *port, struct in_addr *inp, 1171 struct in6_addr *inp6, int *ipv6) 1172{ 1173 const char *host = str; 1174 char *portp; 1175 int lport = 0; 1176 1177 *ptr = NULL; 1178 *is_sock = 0; 1179 *port = fio_net_port; 1180 *ipv6 = 0; 1181 1182 if (!strncmp(str, "sock:", 5)) { 1183 *ptr = strdup(str + 5); 1184 *is_sock = 1; 1185 1186 return 0; 1187 } 1188 1189 /* 1190 * Is it ip:<ip or host>:port 1191 */ 1192 if (!strncmp(host, "ip:", 3)) 1193 host += 3; 1194 else if (!strncmp(host, "ip4:", 4)) 1195 host += 4; 1196 else if (!strncmp(host, "ip6:", 4)) { 1197 host += 4; 1198 *ipv6 = 1; 1199 } else if (host[0] == ':') { 1200 /* String is :port */ 1201 host++; 1202 lport = atoi(host); 1203 if (!lport || lport > 65535) { 1204 log_err("fio: bad server port %u\n", port); 1205 return 1; 1206 } 1207 /* no hostname given, we are done */ 1208 *port = lport; 1209 return 0; 1210 } 1211 1212 /* 1213 * If no port seen yet, check if there's a last ':' at the end 1214 */ 1215 if (!lport) { 1216 portp = strchr(host, ','); 1217 if (portp) { 1218 *portp = '\0'; 1219 portp++; 1220 lport = atoi(portp); 1221 if (!lport || lport > 65535) { 1222 log_err("fio: bad server port %u\n", port); 1223 return 1; 1224 } 1225 } 1226 } 1227 1228 if (lport) 1229 *port = lport; 1230 1231 if (!strlen(host)) 1232 return 0; 1233 1234 *ptr = strdup(host); 1235 1236 if (fio_server_parse_host(*ptr, ipv6, inp, inp6)) { 1237 free(*ptr); 1238 *ptr = NULL; 1239 return 1; 1240 } 1241 1242 if (*port == 0) 1243 *port = fio_net_port; 1244 1245 return 0; 1246} 1247 1248/* 1249 * Server arg should be one of: 1250 * 1251 * sock:/path/to/socket 1252 * ip:1.2.3.4 1253 * 1.2.3.4 1254 * 1255 * Where sock uses unix domain sockets, and ip binds the server to 1256 * a specific interface. If no arguments are given to the server, it 1257 * uses IP and binds to 0.0.0.0. 1258 * 1259 */ 1260static int fio_handle_server_arg(void) 1261{ 1262 int port = fio_net_port; 1263 int is_sock, ret = 0; 1264 1265 saddr_in.sin_addr.s_addr = htonl(INADDR_ANY); 1266 1267 if (!fio_server_arg) 1268 goto out; 1269 1270 ret = fio_server_parse_string(fio_server_arg, &bind_sock, &is_sock, 1271 &port, &saddr_in.sin_addr, 1272 &saddr_in6.sin6_addr, &use_ipv6); 1273 1274 if (!is_sock && bind_sock) { 1275 free(bind_sock); 1276 bind_sock = NULL; 1277 } 1278 1279out: 1280 fio_net_port = port; 1281 saddr_in.sin_port = htons(port); 1282 saddr_in6.sin6_port = htons(port); 1283 return ret; 1284} 1285 1286static int fio_server(void) 1287{ 1288 int sk, ret; 1289 1290 dprint(FD_NET, "starting server\n"); 1291 1292 if (fio_handle_server_arg()) 1293 return -1; 1294 1295 sk = fio_init_server_connection(); 1296 if (sk < 0) 1297 return -1; 1298 1299 ret = accept_loop(sk); 1300 1301 close(sk); 1302 1303 if (fio_server_arg) { 1304 free(fio_server_arg); 1305 fio_server_arg = NULL; 1306 } 1307 if (bind_sock) 1308 free(bind_sock); 1309 1310 return ret; 1311} 1312 1313void fio_server_got_signal(int signal) 1314{ 1315 if (signal == SIGPIPE) 1316 server_fd = -1; 1317 else { 1318 log_info("\nfio: terminating on signal %d\n", signal); 1319 exit_backend = 1; 1320 } 1321} 1322 1323static int check_existing_pidfile(const char *pidfile) 1324{ 1325 struct stat sb; 1326 char buf[16]; 1327 pid_t pid; 1328 FILE *f; 1329 1330 if (stat(pidfile, &sb)) 1331 return 0; 1332 1333 f = fopen(pidfile, "r"); 1334 if (!f) 1335 return 0; 1336 1337 if (fread(buf, sb.st_size, 1, f) <= 0) { 1338 fclose(f); 1339 return 1; 1340 } 1341 fclose(f); 1342 1343 pid = atoi(buf); 1344 if (kill(pid, SIGCONT) < 0) 1345 return errno != ESRCH; 1346 1347 return 1; 1348} 1349 1350static int write_pid(pid_t pid, const char *pidfile) 1351{ 1352 FILE *fpid; 1353 1354 fpid = fopen(pidfile, "w"); 1355 if (!fpid) { 1356 log_err("fio: failed opening pid file %s\n", pidfile); 1357 return 1; 1358 } 1359 1360 fprintf(fpid, "%u\n", (unsigned int) pid); 1361 fclose(fpid); 1362 return 0; 1363} 1364 1365/* 1366 * If pidfile is specified, background us. 1367 */ 1368int fio_start_server(char *pidfile) 1369{ 1370 pid_t pid; 1371 int ret; 1372 1373#if defined(WIN32) 1374 WSADATA wsd; 1375 WSAStartup(MAKEWORD(2,2), &wsd); 1376#endif 1377 1378 if (!pidfile) 1379 return fio_server(); 1380 1381 if (check_existing_pidfile(pidfile)) { 1382 log_err("fio: pidfile %s exists and server appears alive\n", 1383 pidfile); 1384 return -1; 1385 } 1386 1387 pid = fork(); 1388 if (pid < 0) { 1389 log_err("fio: failed server fork: %s", strerror(errno)); 1390 free(pidfile); 1391 return -1; 1392 } else if (pid) { 1393 int ret = write_pid(pid, pidfile); 1394 1395 exit(ret); 1396 } 1397 1398 setsid(); 1399 openlog("fio", LOG_NDELAY|LOG_NOWAIT|LOG_PID, LOG_USER); 1400 log_syslog = 1; 1401 close(STDIN_FILENO); 1402 close(STDOUT_FILENO); 1403 close(STDERR_FILENO); 1404 f_out = NULL; 1405 f_err = NULL; 1406 1407 ret = fio_server(); 1408 1409 closelog(); 1410 unlink(pidfile); 1411 free(pidfile); 1412 return ret; 1413} 1414 1415void fio_server_set_arg(const char *arg) 1416{ 1417 fio_server_arg = strdup(arg); 1418} 1419