/* client.c revision 37f0c1ae23ad1716403d3d113c3dfdf41c47e329 */
1#include <stdio.h> 2#include <stdlib.h> 3#include <unistd.h> 4#include <limits.h> 5#include <errno.h> 6#include <fcntl.h> 7#include <sys/poll.h> 8#include <sys/types.h> 9#include <sys/stat.h> 10#include <sys/wait.h> 11#include <sys/socket.h> 12#include <sys/un.h> 13#include <netinet/in.h> 14#include <arpa/inet.h> 15#include <netdb.h> 16#include <signal.h> 17 18#include "fio.h" 19#include "server.h" 20#include "flist.h" 21#include "hash.h" 22 23struct client_eta { 24 struct jobs_eta eta; 25 unsigned int pending; 26}; 27 28struct fio_client { 29 struct flist_head list; 30 struct flist_head hash_list; 31 struct flist_head arg_list; 32 struct sockaddr_in addr; 33 struct sockaddr_un addr_un; 34 char *hostname; 35 int port; 36 int fd; 37 38 char *name; 39 40 int state; 41 42 int skip_newline; 43 int is_sock; 44 45 struct flist_head eta_list; 46 struct client_eta *eta_in_flight; 47 48 struct flist_head cmd_list; 49 50 uint16_t argc; 51 char **argv; 52}; 53 54static struct timeval eta_tv; 55 56enum { 57 Client_created = 0, 58 Client_connected = 1, 59 Client_started = 2, 60 Client_stopped = 3, 61 Client_exited = 4, 62}; 63 64static FLIST_HEAD(client_list); 65static FLIST_HEAD(eta_list); 66 67static FLIST_HEAD(arg_list); 68 69static struct thread_stat client_ts; 70static struct group_run_stats client_gs; 71static int sum_stat_clients; 72static int sum_stat_nr; 73 74#define FIO_CLIENT_HASH_BITS 7 75#define FIO_CLIENT_HASH_SZ (1 << FIO_CLIENT_HASH_BITS) 76#define FIO_CLIENT_HASH_MASK (FIO_CLIENT_HASH_SZ - 1) 77static struct flist_head client_hash[FIO_CLIENT_HASH_SZ]; 78 79static int handle_client(struct fio_client *client); 80static void dec_jobs_eta(struct client_eta *eta); 81 82static void fio_client_add_hash(struct fio_client *client) 83{ 84 int bucket = hash_long(client->fd, FIO_CLIENT_HASH_BITS); 85 86 bucket &= FIO_CLIENT_HASH_MASK; 87 flist_add(&client->hash_list, &client_hash[bucket]); 88} 89 90static void fio_client_remove_hash(struct fio_client *client) 91{ 92 if 
(!flist_empty(&client->hash_list)) 93 flist_del_init(&client->hash_list); 94} 95 96static void fio_init fio_client_hash_init(void) 97{ 98 int i; 99 100 for (i = 0; i < FIO_CLIENT_HASH_SZ; i++) 101 INIT_FLIST_HEAD(&client_hash[i]); 102} 103 104static struct fio_client *find_client_by_fd(int fd) 105{ 106 int bucket = hash_long(fd, FIO_CLIENT_HASH_BITS) & FIO_CLIENT_HASH_MASK; 107 struct fio_client *client; 108 struct flist_head *entry; 109 110 flist_for_each(entry, &client_hash[bucket]) { 111 client = flist_entry(entry, struct fio_client, hash_list); 112 113 if (client->fd == fd) 114 return client; 115 } 116 117 return NULL; 118} 119 120static void remove_client(struct fio_client *client) 121{ 122 dprint(FD_NET, "client: removed <%s>\n", client->hostname); 123 flist_del(&client->list); 124 125 fio_client_remove_hash(client); 126 127 if (!flist_empty(&client->eta_list)) { 128 flist_del_init(&client->eta_list); 129 dec_jobs_eta(client->eta_in_flight); 130 } 131 132 free(client->hostname); 133 if (client->argv) 134 free(client->argv); 135 if (client->name) 136 free(client->name); 137 138 free(client); 139 nr_clients--; 140} 141 142static void __fio_client_add_cmd_option(struct fio_client *client, 143 const char *opt) 144{ 145 int index; 146 147 index = client->argc++; 148 client->argv = realloc(client->argv, sizeof(char *) * client->argc); 149 client->argv[index] = strdup(opt); 150 dprint(FD_NET, "client: add cmd %d: %s\n", index, opt); 151} 152 153void fio_client_add_cmd_option(void *cookie, const char *opt) 154{ 155 struct fio_client *client = cookie; 156 struct flist_head *entry; 157 158 if (!client || !opt) 159 return; 160 161 __fio_client_add_cmd_option(client, opt); 162 163 /* 164 * Duplicate arguments to shared client group 165 */ 166 flist_for_each(entry, &arg_list) { 167 client = flist_entry(entry, struct fio_client, arg_list); 168 169 __fio_client_add_cmd_option(client, opt); 170 } 171} 172 173int fio_client_add(const char *hostname, void **cookie) 174{ 175 
struct fio_client *existing = *cookie; 176 struct fio_client *client; 177 178 if (existing) { 179 /* 180 * We always add our "exec" name as the option, hence 1 181 * means empty. 182 */ 183 if (existing->argc == 1) 184 flist_add_tail(&existing->arg_list, &arg_list); 185 else { 186 while (!flist_empty(&arg_list)) 187 flist_del_init(arg_list.next); 188 } 189 } 190 191 client = malloc(sizeof(*client)); 192 memset(client, 0, sizeof(*client)); 193 194 INIT_FLIST_HEAD(&client->list); 195 INIT_FLIST_HEAD(&client->hash_list); 196 INIT_FLIST_HEAD(&client->arg_list); 197 INIT_FLIST_HEAD(&client->eta_list); 198 INIT_FLIST_HEAD(&client->cmd_list); 199 200 if (fio_server_parse_string(hostname, &client->hostname, 201 &client->is_sock, &client->port, 202 &client->addr.sin_addr)) 203 return -1; 204 205 client->fd = -1; 206 207 __fio_client_add_cmd_option(client, "fio"); 208 209 flist_add(&client->list, &client_list); 210 nr_clients++; 211 dprint(FD_NET, "client: added <%s>\n", client->hostname); 212 *cookie = client; 213 return 0; 214} 215 216static int fio_client_connect_ip(struct fio_client *client) 217{ 218 int fd; 219 220 client->addr.sin_family = AF_INET; 221 client->addr.sin_port = htons(client->port); 222 223 fd = socket(AF_INET, SOCK_STREAM, 0); 224 if (fd < 0) { 225 log_err("fio: socket: %s\n", strerror(errno)); 226 return -1; 227 } 228 229 if (connect(fd, (struct sockaddr *) &client->addr, sizeof(client->addr)) < 0) { 230 log_err("fio: connect: %s\n", strerror(errno)); 231 log_err("fio: failed to connect to %s:%u\n", client->hostname, 232 client->port); 233 close(fd); 234 return -1; 235 } 236 237 return fd; 238} 239 240static int fio_client_connect_sock(struct fio_client *client) 241{ 242 struct sockaddr_un *addr = &client->addr_un; 243 fio_socklen_t len; 244 int fd; 245 246 memset(addr, 0, sizeof(*addr)); 247 addr->sun_family = AF_UNIX; 248 strcpy(addr->sun_path, client->hostname); 249 250 fd = socket(AF_UNIX, SOCK_STREAM, 0); 251 if (fd < 0) { 252 log_err("fio: socket: 
%s\n", strerror(errno)); 253 return -1; 254 } 255 256 len = sizeof(addr->sun_family) + strlen(addr->sun_path) + 1; 257 if (connect(fd, (struct sockaddr *) addr, len) < 0) { 258 log_err("fio: connect; %s\n", strerror(errno)); 259 close(fd); 260 return -1; 261 } 262 263 return fd; 264} 265 266static int fio_client_connect(struct fio_client *client) 267{ 268 int fd; 269 270 dprint(FD_NET, "client: connect to host %s\n", client->hostname); 271 272 if (client->is_sock) 273 fd = fio_client_connect_sock(client); 274 else 275 fd = fio_client_connect_ip(client); 276 277 dprint(FD_NET, "client: %s connected %d\n", client->hostname, fd); 278 279 if (fd < 0) 280 return 1; 281 282 client->fd = fd; 283 fio_client_add_hash(client); 284 client->state = Client_connected; 285 return 0; 286} 287 288void fio_clients_terminate(void) 289{ 290 struct flist_head *entry; 291 struct fio_client *client; 292 293 dprint(FD_NET, "client: terminate clients\n"); 294 295 flist_for_each(entry, &client_list) { 296 client = flist_entry(entry, struct fio_client, list); 297 298 fio_net_send_simple_cmd(client->fd, FIO_NET_CMD_QUIT, 0, NULL); 299 } 300} 301 302static void sig_int(int sig) 303{ 304 dprint(FD_NET, "client: got signal %d\n", sig); 305 fio_clients_terminate(); 306} 307 308static void client_signal_handler(void) 309{ 310 struct sigaction act; 311 312 memset(&act, 0, sizeof(act)); 313 act.sa_handler = sig_int; 314 act.sa_flags = SA_RESTART; 315 sigaction(SIGINT, &act, NULL); 316 317 memset(&act, 0, sizeof(act)); 318 act.sa_handler = sig_int; 319 act.sa_flags = SA_RESTART; 320 sigaction(SIGTERM, &act, NULL); 321} 322 323static void probe_client(struct fio_client *client) 324{ 325 dprint(FD_NET, "client: send probe\n"); 326 327 fio_net_send_simple_cmd(client->fd, FIO_NET_CMD_PROBE, 0, &client->cmd_list); 328} 329 330static int send_client_cmd_line(struct fio_client *client) 331{ 332 struct cmd_single_line_pdu *cslp; 333 struct cmd_line_pdu *clp; 334 unsigned long offset; 335 unsigned int *lens; 
336 void *pdu; 337 size_t mem; 338 int i, ret; 339 340 dprint(FD_NET, "client: send cmdline %d\n", client->argc); 341 342 lens = malloc(client->argc * sizeof(unsigned int)); 343 344 /* 345 * Find out how much mem we need 346 */ 347 for (i = 0, mem = 0; i < client->argc; i++) { 348 lens[i] = strlen(client->argv[i]) + 1; 349 mem += lens[i]; 350 } 351 352 /* 353 * We need one cmd_line_pdu, and argc number of cmd_single_line_pdu 354 */ 355 mem += sizeof(*clp) + (client->argc * sizeof(*cslp)); 356 357 pdu = malloc(mem); 358 clp = pdu; 359 offset = sizeof(*clp); 360 361 for (i = 0; i < client->argc; i++) { 362 uint16_t arg_len = lens[i]; 363 364 cslp = pdu + offset; 365 strcpy((char *) cslp->text, client->argv[i]); 366 cslp->len = cpu_to_le16(arg_len); 367 offset += sizeof(*cslp) + arg_len; 368 } 369 370 free(lens); 371 clp->lines = cpu_to_le16(client->argc); 372 ret = fio_net_send_cmd(client->fd, FIO_NET_CMD_JOBLINE, pdu, mem, 0); 373 free(pdu); 374 return ret; 375} 376 377int fio_clients_connect(void) 378{ 379 struct fio_client *client; 380 struct flist_head *entry, *tmp; 381 int ret; 382 383 dprint(FD_NET, "client: connect all\n"); 384 385 client_signal_handler(); 386 387 flist_for_each_safe(entry, tmp, &client_list) { 388 client = flist_entry(entry, struct fio_client, list); 389 390 ret = fio_client_connect(client); 391 if (ret) { 392 remove_client(client); 393 continue; 394 } 395 396 probe_client(client); 397 398 if (client->argc > 1) 399 send_client_cmd_line(client); 400 } 401 402 return !nr_clients; 403} 404 405/* 406 * Send file contents to server backend. We could use sendfile(), but to remain 407 * more portable lets just read/write the darn thing. 
408 */ 409static int fio_client_send_ini(struct fio_client *client, const char *filename) 410{ 411 struct stat sb; 412 char *p, *buf; 413 off_t len; 414 int fd, ret; 415 416 dprint(FD_NET, "send ini %s to %s\n", filename, client->hostname); 417 418 fd = open(filename, O_RDONLY); 419 if (fd < 0) { 420 log_err("fio: job file <%s> open: %s\n", filename, strerror(errno)); 421 return 1; 422 } 423 424 if (fstat(fd, &sb) < 0) { 425 log_err("fio: job file stat: %s\n", strerror(errno)); 426 close(fd); 427 return 1; 428 } 429 430 buf = malloc(sb.st_size); 431 432 len = sb.st_size; 433 p = buf; 434 do { 435 ret = read(fd, p, len); 436 if (ret > 0) { 437 len -= ret; 438 if (!len) 439 break; 440 p += ret; 441 continue; 442 } else if (!ret) 443 break; 444 else if (errno == EAGAIN || errno == EINTR) 445 continue; 446 } while (1); 447 448 if (len) { 449 log_err("fio: failed reading job file %s\n", filename); 450 close(fd); 451 free(buf); 452 return 1; 453 } 454 455 ret = fio_net_send_cmd(client->fd, FIO_NET_CMD_JOB, buf, sb.st_size, 0); 456 free(buf); 457 close(fd); 458 return ret; 459} 460 461int fio_clients_send_ini(const char *filename) 462{ 463 struct fio_client *client; 464 struct flist_head *entry, *tmp; 465 466 flist_for_each_safe(entry, tmp, &client_list) { 467 client = flist_entry(entry, struct fio_client, list); 468 469 if (fio_client_send_ini(client, filename)) 470 remove_client(client); 471 } 472 473 return !nr_clients; 474} 475 476static void convert_io_stat(struct io_stat *dst, struct io_stat *src) 477{ 478 dst->max_val = le64_to_cpu(src->max_val); 479 dst->min_val = le64_to_cpu(src->min_val); 480 dst->samples = le64_to_cpu(src->samples); 481 482 /* 483 * Floats arrive as IEEE 754 encoded uint64_t, convert back to double 484 */ 485 dst->mean.u.f = fio_uint64_to_double(le64_to_cpu(dst->mean.u.i)); 486 dst->S.u.f = fio_uint64_to_double(le64_to_cpu(dst->S.u.i)); 487} 488 489static void convert_ts(struct thread_stat *dst, struct thread_stat *src) 490{ 491 int i, j; 492 
493 dst->error = le32_to_cpu(src->error); 494 dst->groupid = le32_to_cpu(src->groupid); 495 dst->pid = le32_to_cpu(src->pid); 496 dst->members = le32_to_cpu(src->members); 497 498 for (i = 0; i < 2; i++) { 499 convert_io_stat(&dst->clat_stat[i], &src->clat_stat[i]); 500 convert_io_stat(&dst->slat_stat[i], &src->slat_stat[i]); 501 convert_io_stat(&dst->lat_stat[i], &src->lat_stat[i]); 502 convert_io_stat(&dst->bw_stat[i], &src->bw_stat[i]); 503 } 504 505 dst->usr_time = le64_to_cpu(src->usr_time); 506 dst->sys_time = le64_to_cpu(src->sys_time); 507 dst->ctx = le64_to_cpu(src->ctx); 508 dst->minf = le64_to_cpu(src->minf); 509 dst->majf = le64_to_cpu(src->majf); 510 dst->clat_percentiles = le64_to_cpu(src->clat_percentiles); 511 512 for (i = 0; i < FIO_IO_U_LIST_MAX_LEN; i++) { 513 fio_fp64_t *fps = &src->percentile_list[i]; 514 fio_fp64_t *fpd = &dst->percentile_list[i]; 515 516 fpd->u.f = fio_uint64_to_double(le64_to_cpu(fps->u.i)); 517 } 518 519 for (i = 0; i < FIO_IO_U_MAP_NR; i++) { 520 dst->io_u_map[i] = le32_to_cpu(src->io_u_map[i]); 521 dst->io_u_submit[i] = le32_to_cpu(src->io_u_submit[i]); 522 dst->io_u_complete[i] = le32_to_cpu(src->io_u_complete[i]); 523 } 524 525 for (i = 0; i < FIO_IO_U_LAT_U_NR; i++) { 526 dst->io_u_lat_u[i] = le32_to_cpu(src->io_u_lat_u[i]); 527 dst->io_u_lat_m[i] = le32_to_cpu(src->io_u_lat_m[i]); 528 } 529 530 for (i = 0; i < 2; i++) 531 for (j = 0; j < FIO_IO_U_PLAT_NR; j++) 532 dst->io_u_plat[i][j] = le32_to_cpu(src->io_u_plat[i][j]); 533 534 for (i = 0; i < 3; i++) { 535 dst->total_io_u[i] = le64_to_cpu(src->total_io_u[i]); 536 dst->short_io_u[i] = le64_to_cpu(src->short_io_u[i]); 537 } 538 539 dst->total_submit = le64_to_cpu(src->total_submit); 540 dst->total_complete = le64_to_cpu(src->total_complete); 541 542 for (i = 0; i < 2; i++) { 543 dst->io_bytes[i] = le64_to_cpu(src->io_bytes[i]); 544 dst->runtime[i] = le64_to_cpu(src->runtime[i]); 545 } 546 547 dst->total_run_time = le64_to_cpu(src->total_run_time); 548 
dst->continue_on_error = le16_to_cpu(src->continue_on_error); 549 dst->total_err_count = le64_to_cpu(src->total_err_count); 550 dst->first_error = le32_to_cpu(src->first_error); 551 dst->kb_base = le32_to_cpu(src->kb_base); 552} 553 554static void convert_gs(struct group_run_stats *dst, struct group_run_stats *src) 555{ 556 int i; 557 558 for (i = 0; i < 2; i++) { 559 dst->max_run[i] = le64_to_cpu(src->max_run[i]); 560 dst->min_run[i] = le64_to_cpu(src->min_run[i]); 561 dst->max_bw[i] = le64_to_cpu(src->max_bw[i]); 562 dst->min_bw[i] = le64_to_cpu(src->min_bw[i]); 563 dst->io_kb[i] = le64_to_cpu(src->io_kb[i]); 564 dst->agg[i] = le64_to_cpu(src->agg[i]); 565 } 566 567 dst->kb_base = le32_to_cpu(src->kb_base); 568 dst->groupid = le32_to_cpu(src->groupid); 569} 570 571static void handle_ts(struct fio_net_cmd *cmd) 572{ 573 struct cmd_ts_pdu *p = (struct cmd_ts_pdu *) cmd->payload; 574 575 convert_ts(&p->ts, &p->ts); 576 convert_gs(&p->rs, &p->rs); 577 578 show_thread_status(&p->ts, &p->rs); 579 580 if (sum_stat_clients == 1) 581 return; 582 583 sum_thread_stats(&client_ts, &p->ts, sum_stat_nr); 584 sum_group_stats(&client_gs, &p->rs); 585 586 client_ts.members++; 587 client_ts.groupid = p->ts.groupid; 588 589 if (++sum_stat_nr == sum_stat_clients) { 590 strcpy(client_ts.name, "All clients"); 591 show_thread_status(&client_ts, &client_gs); 592 } 593} 594 595static void handle_gs(struct fio_net_cmd *cmd) 596{ 597 struct group_run_stats *gs = (struct group_run_stats *) cmd->payload; 598 599 convert_gs(gs, gs); 600 show_group_stats(gs); 601} 602 603static void convert_jobs_eta(struct jobs_eta *je) 604{ 605 int i; 606 607 je->nr_running = le32_to_cpu(je->nr_running); 608 je->nr_ramp = le32_to_cpu(je->nr_ramp); 609 je->nr_pending = le32_to_cpu(je->nr_pending); 610 je->files_open = le32_to_cpu(je->files_open); 611 je->m_rate = le32_to_cpu(je->m_rate); 612 je->t_rate = le32_to_cpu(je->t_rate); 613 je->m_iops = le32_to_cpu(je->m_iops); 614 je->t_iops = 
le32_to_cpu(je->t_iops); 615 616 for (i = 0; i < 2; i++) { 617 je->rate[i] = le32_to_cpu(je->rate[i]); 618 je->iops[i] = le32_to_cpu(je->iops[i]); 619 } 620 621 je->elapsed_sec = le64_to_cpu(je->elapsed_sec); 622 je->eta_sec = le64_to_cpu(je->eta_sec); 623} 624 625static void sum_jobs_eta(struct jobs_eta *dst, struct jobs_eta *je) 626{ 627 int i; 628 629 dst->nr_running += je->nr_running; 630 dst->nr_ramp += je->nr_ramp; 631 dst->nr_pending += je->nr_pending; 632 dst->files_open += je->files_open; 633 dst->m_rate += je->m_rate; 634 dst->t_rate += je->t_rate; 635 dst->m_iops += je->m_iops; 636 dst->t_iops += je->t_iops; 637 638 for (i = 0; i < 2; i++) { 639 dst->rate[i] += je->rate[i]; 640 dst->iops[i] += je->iops[i]; 641 } 642 643 dst->elapsed_sec += je->elapsed_sec; 644 645 if (je->eta_sec > dst->eta_sec) 646 dst->eta_sec = je->eta_sec; 647} 648 649static void dec_jobs_eta(struct client_eta *eta) 650{ 651 if (!--eta->pending) { 652 display_thread_status(&eta->eta); 653 free(eta); 654 } 655} 656 657static void remove_reply_cmd(struct fio_client *client, struct fio_net_cmd *cmd) 658{ 659 struct fio_net_int_cmd *icmd = NULL; 660 struct flist_head *entry; 661 662 flist_for_each(entry, &client->cmd_list) { 663 icmd = flist_entry(entry, struct fio_net_int_cmd, list); 664 665 if (cmd->tag == (uint64_t) icmd) 666 break; 667 668 icmd = NULL; 669 } 670 671 if (!icmd) { 672 log_err("fio: client: unable to find matching tag\n"); 673 return; 674 } 675 676 flist_del(&icmd->list); 677 cmd->tag = icmd->saved_tag; 678 free(icmd); 679} 680 681static void handle_eta(struct fio_client *client, struct fio_net_cmd *cmd) 682{ 683 struct jobs_eta *je = (struct jobs_eta *) cmd->payload; 684 struct client_eta *eta = (struct client_eta *) cmd->tag; 685 686 dprint(FD_NET, "client: got eta tag %p, %d\n", eta, eta->pending); 687 688 assert(client->eta_in_flight == eta); 689 690 client->eta_in_flight = NULL; 691 flist_del_init(&client->eta_list); 692 693 convert_jobs_eta(je); 694 
sum_jobs_eta(&eta->eta, je); 695 dec_jobs_eta(eta); 696} 697 698static void handle_probe(struct fio_client *client, struct fio_net_cmd *cmd) 699{ 700 struct cmd_probe_pdu *probe = (struct cmd_probe_pdu *) cmd->payload; 701 const char *os, *arch; 702 703 os = fio_get_os_string(probe->os); 704 if (!os) 705 os = "unknown"; 706 707 arch = fio_get_arch_string(probe->arch); 708 if (!arch) 709 os = "unknown"; 710 711 log_info("hostname=%s, be=%u, os=%s, arch=%s, fio=%u.%u.%u\n", 712 probe->hostname, probe->bigendian, os, arch, probe->fio_major, 713 probe->fio_minor, probe->fio_patch); 714 715 if (!client->name) 716 client->name = strdup((char *) probe->hostname); 717} 718 719static int handle_client(struct fio_client *client) 720{ 721 struct fio_net_cmd *cmd; 722 723 dprint(FD_NET, "client: handle %s\n", client->hostname); 724 725 cmd = fio_net_recv_cmd(client->fd); 726 if (!cmd) 727 return 0; 728 729 dprint(FD_NET, "client: got cmd op %s from %s\n", 730 fio_server_op(cmd->opcode), client->hostname); 731 732 switch (cmd->opcode) { 733 case FIO_NET_CMD_QUIT: 734 remove_client(client); 735 free(cmd); 736 break; 737 case FIO_NET_CMD_TEXT: { 738 const char *buf = (const char *) cmd->payload; 739 const char *name; 740 int fio_unused ret; 741 742 name = client->name ? 
client->name : client->hostname; 743 744 if (!client->skip_newline) 745 fprintf(f_out, "<%s> ", name); 746 ret = fwrite(buf, cmd->pdu_len, 1, f_out); 747 fflush(f_out); 748 client->skip_newline = strchr(buf, '\n') == NULL; 749 free(cmd); 750 break; 751 } 752 case FIO_NET_CMD_TS: 753 handle_ts(cmd); 754 free(cmd); 755 break; 756 case FIO_NET_CMD_GS: 757 handle_gs(cmd); 758 free(cmd); 759 break; 760 case FIO_NET_CMD_ETA: 761 remove_reply_cmd(client, cmd); 762 handle_eta(client, cmd); 763 free(cmd); 764 break; 765 case FIO_NET_CMD_PROBE: 766 remove_reply_cmd(client, cmd); 767 handle_probe(client, cmd); 768 free(cmd); 769 break; 770 case FIO_NET_CMD_START: 771 client->state = Client_started; 772 free(cmd); 773 break; 774 case FIO_NET_CMD_STOP: 775 client->state = Client_stopped; 776 free(cmd); 777 break; 778 default: 779 log_err("fio: unknown client op: %s\n", fio_server_op(cmd->opcode)); 780 free(cmd); 781 break; 782 } 783 784 return 1; 785} 786 787static void request_client_etas(void) 788{ 789 struct fio_client *client; 790 struct flist_head *entry; 791 struct client_eta *eta; 792 int skipped = 0; 793 794 dprint(FD_NET, "client: request eta (%d)\n", nr_clients); 795 796 eta = malloc(sizeof(*eta)); 797 memset(&eta->eta, 0, sizeof(eta->eta)); 798 eta->pending = nr_clients; 799 800 flist_for_each(entry, &client_list) { 801 client = flist_entry(entry, struct fio_client, list); 802 803 if (!flist_empty(&client->eta_list)) { 804 skipped++; 805 continue; 806 } 807 808 assert(!client->eta_in_flight); 809 flist_add_tail(&client->eta_list, &eta_list); 810 client->eta_in_flight = eta; 811 fio_net_send_simple_cmd(client->fd, FIO_NET_CMD_SEND_ETA, 812 (uint64_t) eta, &client->cmd_list); 813 } 814 815 while (skipped--) 816 dec_jobs_eta(eta); 817 818 dprint(FD_NET, "client: requested eta tag %p\n", eta); 819} 820 821static int client_check_cmd_timeout(struct fio_client *client, 822 struct timeval *now) 823{ 824 struct fio_net_int_cmd *cmd; 825 struct flist_head *entry, *tmp; 826 
int ret = 0; 827 828 flist_for_each_safe(entry, tmp, &client->cmd_list) { 829 cmd = flist_entry(entry, struct fio_net_int_cmd, list); 830 831 if (mtime_since(&cmd->tv, now) < FIO_NET_CLIENT_TIMEOUT) 832 continue; 833 834 log_err("fio: client %s, timeout on cmd %s\n", client->hostname, 835 fio_server_op(cmd->cmd.opcode)); 836 flist_del(&cmd->list); 837 free(cmd); 838 ret = 1; 839 } 840 841 return flist_empty(&client->cmd_list) && ret; 842} 843 844static int fio_client_timed_out(void) 845{ 846 struct fio_client *client; 847 struct flist_head *entry, *tmp; 848 struct timeval tv; 849 int ret = 0; 850 851 gettimeofday(&tv, NULL); 852 853 flist_for_each_safe(entry, tmp, &client_list) { 854 client = flist_entry(entry, struct fio_client, list); 855 856 if (flist_empty(&client->cmd_list)) 857 continue; 858 859 if (!client_check_cmd_timeout(client, &tv)) 860 continue; 861 862 log_err("fio: client %s timed out\n", client->hostname); 863 remove_client(client); 864 ret = 1; 865 } 866 867 return ret; 868} 869 870int fio_handle_clients(void) 871{ 872 struct fio_client *client; 873 struct flist_head *entry; 874 struct pollfd *pfds; 875 int i, ret = 0; 876 877 gettimeofday(&eta_tv, NULL); 878 879 pfds = malloc(nr_clients * sizeof(struct pollfd)); 880 881 sum_stat_clients = nr_clients; 882 init_thread_stat(&client_ts); 883 init_group_run_stat(&client_gs); 884 885 while (!exit_backend && nr_clients) { 886 i = 0; 887 flist_for_each(entry, &client_list) { 888 client = flist_entry(entry, struct fio_client, list); 889 890 pfds[i].fd = client->fd; 891 pfds[i].events = POLLIN; 892 i++; 893 } 894 895 assert(i == nr_clients); 896 897 do { 898 struct timeval tv; 899 900 gettimeofday(&tv, NULL); 901 if (mtime_since(&eta_tv, &tv) >= 900) { 902 request_client_etas(); 903 memcpy(&eta_tv, &tv, sizeof(tv)); 904 905 if (fio_client_timed_out()) 906 break; 907 } 908 909 ret = poll(pfds, nr_clients, 100); 910 if (ret < 0) { 911 if (errno == EINTR) 912 continue; 913 log_err("fio: poll clients: %s\n", 
strerror(errno)); 914 break; 915 } else if (!ret) 916 continue; 917 } while (ret <= 0); 918 919 for (i = 0; i < nr_clients; i++) { 920 if (!(pfds[i].revents & POLLIN)) 921 continue; 922 923 client = find_client_by_fd(pfds[i].fd); 924 if (!client) { 925 log_err("fio: unknown client fd %d\n", pfds[i].fd); 926 continue; 927 } 928 if (!handle_client(client)) { 929 log_info("client: host=%s disconnected\n", 930 client->hostname); 931 remove_client(client); 932 } 933 } 934 } 935 936 free(pfds); 937 return 0; 938} 939