backend.c revision 621677626f2551bedfdc4a5fc3b3e5f8492b94fa
/*
 * fio - the flexible io tester
 *
 * Copyright (C) 2005 Jens Axboe <axboe@suse.de>
 * Copyright (C) 2006-2012 Jens Axboe <axboe@kernel.dk>
 *
 * The license below covers all files distributed with fio unless otherwise
 * noted in the file itself.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 as
 * published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
 *
 */
#include <unistd.h>
#include <fcntl.h>
#include <string.h>
#include <limits.h>
#include <signal.h>
#include <time.h>
#include <locale.h>
#include <assert.h>
#include <inttypes.h>
#include <sys/stat.h>
#include <sys/wait.h>
#include <sys/ipc.h>
#include <sys/mman.h>

#include "fio.h"
#ifndef FIO_NO_HAVE_SHM_H
#include <sys/shm.h>
#endif
#include "hash.h"
#include "smalloc.h"
#include "verify.h"
#include "trim.h"
#include "diskutil.h"
#include "cgroup.h"
#include "profile.h"
#include "lib/rand.h"
#include "memalign.h"
#include "server.h"
#include "lib/getrusage.h"
#include "idletime.h"

static pthread_t disk_util_thread;
static struct fio_mutex *disk_thread_mutex;
static struct fio_mutex *startup_mutex;
static struct fio_mutex *writeout_mutex;
static struct flist_head *cgroup_list;
static char *cgroup_mnt;
static int exit_value;
static volatile int fio_abort;
static unsigned int nr_process = 0;
static unsigned int nr_thread = 0;

struct io_log *agg_io_log[DDIR_RWDIR_CNT];

int groupid = 0;
unsigned int thread_number = 0;
unsigned int stat_number = 0;
int shm_id = 0;
int temp_stall_ts;
unsigned long done_secs = 0;
volatile int disk_util_exit = 0;

#define PAGE_ALIGN(buf) \
	(char *) (((uintptr_t) (buf) + page_mask) & ~page_mask)

#define JOB_START_TIMEOUT	(5 * 1000)

static void sig_int(int sig)
{
	if (threads) {
		if (is_backend)
			fio_server_got_signal(sig);
		else {
			log_info("\nfio: terminating on signal %d\n", sig);
			fflush(stdout);
			exit_value = 128;
		}

		fio_terminate_threads(TERMINATE_ALL);
	}
}

static void sig_show_status(int sig)
{
	show_running_run_stats();
}

static void set_sig_handlers(void)
{
	struct sigaction act;

	memset(&act, 0, sizeof(act));
	act.sa_handler = sig_int;
	act.sa_flags = SA_RESTART;
	sigaction(SIGINT, &act, NULL);

	memset(&act, 0, sizeof(act));
	act.sa_handler = sig_int;
	act.sa_flags = SA_RESTART;
	sigaction(SIGTERM, &act, NULL);

/* Windows uses SIGBREAK as a quit signal from other applications */
#ifdef WIN32
	memset(&act, 0, sizeof(act));
	act.sa_handler = sig_int;
	act.sa_flags = SA_RESTART;
	sigaction(SIGBREAK, &act, NULL);
#endif

	memset(&act, 0, sizeof(act));
	act.sa_handler = sig_show_status;
	act.sa_flags = SA_RESTART;
	sigaction(SIGUSR1, &act, NULL);

	if (is_backend) {
		memset(&act, 0, sizeof(act));
		act.sa_handler = sig_int;
		act.sa_flags = SA_RESTART;
		sigaction(SIGPIPE, &act, NULL);
	}
}
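/*
 * Note: with sig_show_status() wired to SIGUSR1 above, a running fio
 * can be asked for an intermediate status dump without stopping it,
 * e.g. (illustrative):
 *
 *	$ kill -USR1 $(pidof fio)
 *
 * which ends up in show_running_run_stats().
 */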
/*
 * Check if we are above the minimum rate given.
 */
static int __check_min_rate(struct thread_data *td, struct timeval *now,
			    enum fio_ddir ddir)
{
	unsigned long long bytes = 0;
	unsigned long iops = 0;
	unsigned long spent;
	unsigned long rate;
	unsigned int ratemin = 0;
	unsigned int rate_iops = 0;
	unsigned int rate_iops_min = 0;

	assert(ddir_rw(ddir));

	if (!td->o.ratemin[ddir] && !td->o.rate_iops_min[ddir])
		return 0;

	/*
	 * allow a 2 second settle period in the beginning
	 */
	if (mtime_since(&td->start, now) < 2000)
		return 0;

	iops += td->this_io_blocks[ddir];
	bytes += td->this_io_bytes[ddir];
	ratemin += td->o.ratemin[ddir];
	rate_iops += td->o.rate_iops[ddir];
	rate_iops_min += td->o.rate_iops_min[ddir];

	/*
	 * if rate blocks is set, sample is running
	 */
	if (td->rate_bytes[ddir] || td->rate_blocks[ddir]) {
		spent = mtime_since(&td->lastrate[ddir], now);
		if (spent < td->o.ratecycle)
			return 0;

		if (td->o.rate[ddir]) {
			/*
			 * check bandwidth specified rate
			 */
			if (bytes < td->rate_bytes[ddir]) {
				log_err("%s: min rate %u not met\n",
						td->o.name, ratemin);
				return 1;
			} else {
				rate = ((bytes - td->rate_bytes[ddir]) * 1000) / spent;
				if (rate < ratemin ||
				    bytes < td->rate_bytes[ddir]) {
					log_err("%s: min rate %u not met, got"
						" %luKB/sec\n", td->o.name,
							ratemin, rate);
					return 1;
				}
			}
		} else {
			/*
			 * checks iops specified rate
			 */
			if (iops < rate_iops) {
				log_err("%s: min iops rate %u not met\n",
						td->o.name, rate_iops);
				return 1;
			} else {
				rate = ((iops - td->rate_blocks[ddir]) * 1000) / spent;
				if (rate < rate_iops_min ||
				    iops < td->rate_blocks[ddir]) {
					log_err("%s: min iops rate %u not met,"
						" got %lu\n", td->o.name,
							rate_iops_min, rate);
					/*
					 * fail the job, like the bandwidth
					 * case above does
					 */
					return 1;
				}
			}
		}
	}

	td->rate_bytes[ddir] = bytes;
	td->rate_blocks[ddir] = iops;
	memcpy(&td->lastrate[ddir], now, sizeof(*now));
	return 0;
}

static int check_min_rate(struct thread_data *td, struct timeval *now,
			  uint64_t *bytes_done)
{
	int ret = 0;

	if (bytes_done[DDIR_READ])
		ret |= __check_min_rate(td, now, DDIR_READ);
	if (bytes_done[DDIR_WRITE])
		ret |= __check_min_rate(td, now, DDIR_WRITE);
	if (bytes_done[DDIR_TRIM])
		ret |= __check_min_rate(td, now, DDIR_TRIM);

	return ret;
}
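/*
 * A note on the rate checking above: the first pass through
 * __check_min_rate() only primes ->rate_bytes[]/->rate_blocks[] and
 * ->lastrate[]; later passes compare the delta since that sample, and
 * only once at least o.ratecycle msec (1000 by default) have passed.
 * Together with the 2 second settle period, a job thus has to
 * undershoot its minimum rate for a full cycle before it is failed.
 */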
/*
 * When job exits, we can cancel the in-flight IO if we are using async
 * io. Attempt to do so.
 */
static void cleanup_pending_aio(struct thread_data *td)
{
	int r;

	/*
	 * get immediately available events, if any
	 */
	r = io_u_queued_complete(td, 0, NULL);
	if (r < 0)
		return;

	/*
	 * now cancel remaining active events
	 */
	if (td->io_ops->cancel) {
		struct io_u *io_u;
		int i;

		io_u_qiter(&td->io_u_all, io_u, i) {
			if (io_u->flags & IO_U_F_FLIGHT) {
				r = td->io_ops->cancel(td, io_u);
				if (!r)
					put_io_u(td, io_u);
			}
		}
	}

	if (td->cur_depth)
		r = io_u_queued_complete(td, td->cur_depth, NULL);
}

/*
 * Helper to handle the final sync of a file. Works just like the normal
 * io path, just does everything sync.
 */
static int fio_io_sync(struct thread_data *td, struct fio_file *f)
{
	struct io_u *io_u = __get_io_u(td);
	int ret;

	if (!io_u)
		return 1;

	io_u->ddir = DDIR_SYNC;
	io_u->file = f;

	if (td_io_prep(td, io_u)) {
		put_io_u(td, io_u);
		return 1;
	}

requeue:
	ret = td_io_queue(td, io_u);
	if (ret < 0) {
		td_verror(td, io_u->error, "td_io_queue");
		put_io_u(td, io_u);
		return 1;
	} else if (ret == FIO_Q_QUEUED) {
		if (io_u_queued_complete(td, 1, NULL) < 0)
			return 1;
	} else if (ret == FIO_Q_COMPLETED) {
		if (io_u->error) {
			td_verror(td, io_u->error, "td_io_queue");
			return 1;
		}

		if (io_u_sync_complete(td, io_u, NULL) < 0)
			return 1;
	} else if (ret == FIO_Q_BUSY) {
		if (td_io_commit(td))
			return 1;
		goto requeue;
	}

	return 0;
}

static int fio_file_fsync(struct thread_data *td, struct fio_file *f)
{
	int ret;

	if (fio_file_open(f))
		return fio_io_sync(td, f);

	if (td_io_open_file(td, f))
		return 1;

	ret = fio_io_sync(td, f);
	td_io_close_file(td, f);
	return ret;
}
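/*
 * The helpers below cache the clock reads: at high IOPS, calling
 * fio_gettime() for every io_u is measurable overhead, so
 * update_tv_cache() only hits the clock once every (tv_cache_mask + 1)
 * invocations. Callers that need a trustworthy stamp (e.g. the double
 * check in the runtime_exceeded() paths) force one with
 * __update_tv_cache().
 */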
static inline void __update_tv_cache(struct thread_data *td)
{
	fio_gettime(&td->tv_cache, NULL);
}

static inline void update_tv_cache(struct thread_data *td)
{
	if ((++td->tv_cache_nr & td->tv_cache_mask) == td->tv_cache_mask)
		__update_tv_cache(td);
}

static inline int runtime_exceeded(struct thread_data *td, struct timeval *t)
{
	if (in_ramp_time(td))
		return 0;
	if (!td->o.timeout)
		return 0;
	if (mtime_since(&td->epoch, t) >= td->o.timeout * 1000)
		return 1;

	return 0;
}

static int break_on_this_error(struct thread_data *td, enum fio_ddir ddir,
			       int *retptr)
{
	int ret = *retptr;

	if (ret < 0 || td->error) {
		int err = td->error;
		enum error_type_bit eb;

		if (ret < 0)
			err = -ret;

		eb = td_error_type(ddir, err);
		if (!(td->o.continue_on_error & (1 << eb)))
			return 1;

		if (td_non_fatal_error(td, eb, err)) {
			/*
			 * Continue with the I/Os in case of
			 * a non fatal error.
			 */
			update_error_count(td, err);
			td_clear_error(td);
			*retptr = 0;
			return 0;
		} else if (td->o.fill_device && err == ENOSPC) {
			/*
			 * We expect to hit this error if
			 * fill_device option is set.
			 */
			td_clear_error(td);
			td->terminate = 1;
			return 1;
		} else {
			/*
			 * Stop the I/O in case of a fatal
			 * error.
			 */
			update_error_count(td, err);
			return 1;
		}
	}

	return 0;
}

static void check_update_rusage(struct thread_data *td)
{
	if (td->update_rusage) {
		td->update_rusage = 0;
		update_rusage_stat(td);
		fio_mutex_up(td->rusage_sem);
	}
}
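/*
 * check_update_rusage() is the job-side half of a small handshake:
 * whoever wants fresh rusage numbers (presumably the status/stats
 * code) sets ->update_rusage and sleeps on ->rusage_sem; the job
 * refreshes the stats from its own context here and then wakes the
 * requester.
 */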
/*
 * The main verify engine. Runs over the writes we previously submitted,
 * reads the blocks back in, and checks the crc/md5 of the data.
 */
static void do_verify(struct thread_data *td, uint64_t verify_bytes)
{
	uint64_t bytes_done[DDIR_RWDIR_CNT] = { 0, 0, 0 };
	struct fio_file *f;
	struct io_u *io_u;
	int ret, min_events;
	unsigned int i;

	dprint(FD_VERIFY, "starting loop\n");

	/*
	 * sync io first and invalidate cache, to make sure we really
	 * read from disk.
	 */
	for_each_file(td, f, i) {
		if (!fio_file_open(f))
			continue;
		if (fio_io_sync(td, f))
			break;
		if (file_invalidate_cache(td, f))
			break;
	}

	check_update_rusage(td);

	if (td->error)
		return;

	td_set_runstate(td, TD_VERIFYING);

	io_u = NULL;
	while (!td->terminate) {
		enum fio_ddir ddir;
		int ret2, full;

		update_tv_cache(td);
		check_update_rusage(td);

		if (runtime_exceeded(td, &td->tv_cache)) {
			__update_tv_cache(td);
			if (runtime_exceeded(td, &td->tv_cache)) {
				td->terminate = 1;
				break;
			}
		}

		if (flow_threshold_exceeded(td))
			continue;

		if (!td->o.experimental_verify) {
			io_u = __get_io_u(td);
			if (!io_u)
				break;

			if (get_next_verify(td, io_u)) {
				put_io_u(td, io_u);
				break;
			}

			if (td_io_prep(td, io_u)) {
				put_io_u(td, io_u);
				break;
			}
		} else {
			if (ddir_rw_sum(bytes_done) + td->o.rw_min_bs > verify_bytes)
				break;

			while ((io_u = get_io_u(td)) != NULL) {
				/*
				 * We are only interested in the places where
				 * we wrote or trimmed IOs. Turn those into
				 * reads for verification purposes.
				 */
				if (io_u->ddir == DDIR_READ) {
					/*
					 * Pretend we issued it for rwmix
					 * accounting
					 */
					td->io_issues[DDIR_READ]++;
					put_io_u(td, io_u);
					continue;
				} else if (io_u->ddir == DDIR_TRIM) {
					io_u->ddir = DDIR_READ;
					io_u->flags |= IO_U_F_TRIMMED;
					break;
				} else if (io_u->ddir == DDIR_WRITE) {
					io_u->ddir = DDIR_READ;
					break;
				} else {
					put_io_u(td, io_u);
					continue;
				}
			}

			if (!io_u)
				break;
		}

		if (td->o.verify_async)
			io_u->end_io = verify_io_u_async;
		else
			io_u->end_io = verify_io_u;

		ddir = io_u->ddir;

		ret = td_io_queue(td, io_u);
		switch (ret) {
		case FIO_Q_COMPLETED:
			if (io_u->error) {
				ret = -io_u->error;
				clear_io_u(td, io_u);
			} else if (io_u->resid) {
				int bytes = io_u->xfer_buflen - io_u->resid;

				/*
				 * zero read, fail
				 */
				if (!bytes) {
					td_verror(td, EIO, "full resid");
					put_io_u(td, io_u);
					break;
				}

				io_u->xfer_buflen = io_u->resid;
				io_u->xfer_buf += bytes;
				io_u->offset += bytes;

				if (ddir_rw(io_u->ddir))
					td->ts.short_io_u[io_u->ddir]++;

				f = io_u->file;
				if (io_u->offset == f->real_file_size)
					goto sync_done;

				requeue_io_u(td, &io_u);
			} else {
sync_done:
				ret = io_u_sync_complete(td, io_u, bytes_done);
				if (ret < 0)
					break;
			}
			continue;
		case FIO_Q_QUEUED:
			break;
		case FIO_Q_BUSY:
			requeue_io_u(td, &io_u);
			ret2 = td_io_commit(td);
			if (ret2 < 0)
				ret = ret2;
			break;
		default:
			assert(ret < 0);
			td_verror(td, -ret, "td_io_queue");
			break;
		}

		if (break_on_this_error(td, ddir, &ret))
			break;

		/*
		 * if we can queue more, do so. but check if there are
		 * completed io_u's first. Note that we can get BUSY even
		 * without IO queued, if the system is resource starved.
		 */
		full = queue_full(td) || (ret == FIO_Q_BUSY && td->cur_depth);
		if (full || !td->o.iodepth_batch_complete) {
			min_events = min(td->o.iodepth_batch_complete,
					 td->cur_depth);
			/*
			 * if the queue is full, we MUST reap at least 1 event
			 */
			if (full && !min_events)
				min_events = 1;

			do {
				/*
				 * Reap required number of io units, if any,
				 * and do the verification on them through
				 * the callback handler
				 */
				if (io_u_queued_complete(td, min_events, bytes_done) < 0) {
					ret = -1;
					break;
				}
			} while (full && (td->cur_depth > td->o.iodepth_low));
		}
		if (ret < 0)
			break;
	}

	check_update_rusage(td);

	if (!td->error) {
		min_events = td->cur_depth;

		if (min_events)
			ret = io_u_queued_complete(td, min_events, NULL);
	} else
		cleanup_pending_aio(td);

	td_set_runstate(td, TD_RUNNING);

	dprint(FD_VERIFY, "exiting loop\n");
}
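/*
 * Both do_verify() above and do_io() below drive the same td_io_queue()
 * protocol:
 *
 *	FIO_Q_COMPLETED	completed inline; a short transfer (->resid)
 *			shrinks the io_u and requeues the remainder
 *	FIO_Q_QUEUED	queued async, reaped later through
 *			io_u_queued_complete()
 *	FIO_Q_BUSY	engine temporarily out of resources; requeue the
 *			io_u, force a commit and retry
 */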
static int io_bytes_exceeded(struct thread_data *td)
{
	unsigned long long bytes;

	if (td_rw(td))
		bytes = td->this_io_bytes[DDIR_READ] + td->this_io_bytes[DDIR_WRITE];
	else if (td_write(td))
		bytes = td->this_io_bytes[DDIR_WRITE];
	else if (td_read(td))
		bytes = td->this_io_bytes[DDIR_READ];
	else
		bytes = td->this_io_bytes[DDIR_TRIM];

	return bytes >= td->o.size;
}

/*
 * Main IO worker function. It retrieves io_u's to process and queues
 * and reaps them, checking for rate and errors along the way.
 *
 * Returns number of bytes written and trimmed.
 */
static uint64_t do_io(struct thread_data *td)
{
	uint64_t bytes_done[DDIR_RWDIR_CNT] = { 0, 0, 0 };
	unsigned int i;
	int ret = 0;
	uint64_t bytes_issued = 0;

	if (in_ramp_time(td))
		td_set_runstate(td, TD_RAMP);
	else
		td_set_runstate(td, TD_RUNNING);

	lat_target_init(td);

	while ((td->o.read_iolog_file && !flist_empty(&td->io_log_list)) ||
		(!flist_empty(&td->trim_list)) || !io_bytes_exceeded(td) ||
		td->o.time_based) {
		struct timeval comp_time;
		int min_evts = 0;
		struct io_u *io_u;
		int ret2, full;
		enum fio_ddir ddir;

		check_update_rusage(td);

		if (td->terminate || td->done)
			break;

		update_tv_cache(td);

		if (runtime_exceeded(td, &td->tv_cache)) {
			__update_tv_cache(td);
			if (runtime_exceeded(td, &td->tv_cache)) {
				td->terminate = 1;
				break;
			}
		}

		if (flow_threshold_exceeded(td))
			continue;

		if (bytes_issued >= (uint64_t) td->o.size)
			break;

		io_u = get_io_u(td);
		if (!io_u) {
			if (td->o.latency_target)
				goto reap;
			break;
		}

		ddir = io_u->ddir;

		/*
		 * Add verification end_io handler if:
		 * - Asked to verify (!td_rw(td))
		 * - Or the io_u is from our verify list (mixed write/ver)
		 */
		if (td->o.verify != VERIFY_NONE && io_u->ddir == DDIR_READ &&
		    ((io_u->flags & IO_U_F_VER_LIST) || !td_rw(td))) {
			if (td->o.verify_async)
				io_u->end_io = verify_io_u_async;
			else
				io_u->end_io = verify_io_u;
			td_set_runstate(td, TD_VERIFYING);
		} else if (in_ramp_time(td))
			td_set_runstate(td, TD_RAMP);
		else
			td_set_runstate(td, TD_RUNNING);

		ret = td_io_queue(td, io_u);
		switch (ret) {
		case FIO_Q_COMPLETED:
			if (io_u->error) {
				ret = -io_u->error;
				clear_io_u(td, io_u);
			} else if (io_u->resid) {
				int bytes = io_u->xfer_buflen - io_u->resid;
				struct fio_file *f = io_u->file;

				bytes_issued += bytes;
				/*
				 * zero read, fail
				 */
				if (!bytes) {
					td_verror(td, EIO, "full resid");
					put_io_u(td, io_u);
					break;
				}

				io_u->xfer_buflen = io_u->resid;
				io_u->xfer_buf += bytes;
				io_u->offset += bytes;

				if (ddir_rw(io_u->ddir))
					td->ts.short_io_u[io_u->ddir]++;

				if (io_u->offset == f->real_file_size)
					goto sync_done;

				requeue_io_u(td, &io_u);
			} else {
sync_done:
				if (__should_check_rate(td, DDIR_READ) ||
				    __should_check_rate(td, DDIR_WRITE) ||
				    __should_check_rate(td, DDIR_TRIM))
					fio_gettime(&comp_time, NULL);

				ret = io_u_sync_complete(td, io_u, bytes_done);
				if (ret < 0)
					break;
				bytes_issued += io_u->xfer_buflen;
			}
			break;
		case FIO_Q_QUEUED:
			/*
			 * if the engine doesn't have a commit hook,
			 * the io_u is really queued. if it does have such
			 * a hook, it has to call io_u_queued() itself.
			 */
			if (td->io_ops->commit == NULL)
				io_u_queued(td, io_u);
			bytes_issued += io_u->xfer_buflen;
			break;
		case FIO_Q_BUSY:
			requeue_io_u(td, &io_u);
			ret2 = td_io_commit(td);
			if (ret2 < 0)
				ret = ret2;
			break;
		default:
			assert(ret < 0);
			put_io_u(td, io_u);
			break;
		}

		if (break_on_this_error(td, ddir, &ret))
			break;

		/*
		 * See if we need to complete some commands. Note that we
		 * can get BUSY even without IO queued, if the system is
		 * resource starved.
		 */
reap:
		full = queue_full(td) || (ret == FIO_Q_BUSY && td->cur_depth);
		if (full || !td->o.iodepth_batch_complete) {
			min_evts = min(td->o.iodepth_batch_complete,
				       td->cur_depth);
			/*
			 * if the queue is full, we MUST reap at least 1 event
			 */
			if (full && !min_evts)
				min_evts = 1;

			if (__should_check_rate(td, DDIR_READ) ||
			    __should_check_rate(td, DDIR_WRITE) ||
			    __should_check_rate(td, DDIR_TRIM))
				fio_gettime(&comp_time, NULL);

			do {
				ret = io_u_queued_complete(td, min_evts, bytes_done);
				if (ret < 0)
					break;

			} while (full && (td->cur_depth > td->o.iodepth_low));
		}

		if (ret < 0)
			break;
		if (!ddir_rw_sum(bytes_done) && !(td->io_ops->flags & FIO_NOIO))
			continue;

		if (!in_ramp_time(td) && should_check_rate(td, bytes_done)) {
			if (check_min_rate(td, &comp_time, bytes_done)) {
				if (exitall_on_terminate)
					fio_terminate_threads(td->groupid);
				td_verror(td, EIO, "check_min_rate");
				break;
			}
		}
		if (!in_ramp_time(td) && td->o.latency_target)
			lat_target_check(td);

		if (td->o.thinktime) {
			unsigned long long b;

			b = ddir_rw_sum(td->io_blocks);
			if (!(b % td->o.thinktime_blocks)) {
				int left;

				io_u_quiesce(td);

				if (td->o.thinktime_spin)
					usec_spin(td->o.thinktime_spin);

				left = td->o.thinktime - td->o.thinktime_spin;
				if (left)
					usec_sleep(td, left);
			}
		}
	}

	check_update_rusage(td);

	if (td->trim_entries)
		log_err("fio: %lu trim entries leaked?\n", td->trim_entries);

	if (td->o.fill_device && td->error == ENOSPC) {
		td->error = 0;
		td->terminate = 1;
	}
	if (!td->error) {
		struct fio_file *f;

		i = td->cur_depth;
		if (i) {
			ret = io_u_queued_complete(td, i, bytes_done);
			if (td->o.fill_device && td->error == ENOSPC)
				td->error = 0;
		}

		if (should_fsync(td) && td->o.end_fsync) {
			td_set_runstate(td, TD_FSYNCING);

			for_each_file(td, f, i) {
				if (!fio_file_fsync(td, f))
					continue;

				log_err("fio: end_fsync failed for file %s\n",
								f->file_name);
			}
		}
	} else
		cleanup_pending_aio(td);

	/*
	 * stop job if we failed doing any IO
	 */
	if (!ddir_rw_sum(td->this_io_bytes))
		td->done = 1;

	return bytes_done[DDIR_WRITE] + bytes_done[DDIR_TRIM];
}
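/*
 * Note that do_io() returns only the WRITE + TRIM byte count. That is
 * the value thread_main() later hands to do_verify() as verify_bytes,
 * i.e. how much data a subsequent verify pass can expect to find.
 */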
static void cleanup_io_u(struct thread_data *td)
{
	struct io_u *io_u;

	while ((io_u = io_u_qpop(&td->io_u_freelist)) != NULL) {

		if (td->io_ops->io_u_free)
			td->io_ops->io_u_free(td, io_u);

		fio_memfree(io_u, sizeof(*io_u));
	}

	free_io_mem(td);

	io_u_rexit(&td->io_u_requeues);
	io_u_qexit(&td->io_u_freelist);
	io_u_qexit(&td->io_u_all);
}

static int init_io_u(struct thread_data *td)
{
	struct io_u *io_u;
	unsigned int max_bs, min_write;
	int cl_align, i, max_units;
	int data_xfer = 1, err;
	char *p;

	max_units = td->o.iodepth;
	max_bs = td_max_bs(td);
	min_write = td->o.min_bs[DDIR_WRITE];
	td->orig_buffer_size = (unsigned long long) max_bs
					* (unsigned long long) max_units;

	if ((td->io_ops->flags & FIO_NOIO) || !(td_read(td) || td_write(td)))
		data_xfer = 0;

	err = 0;
	err += io_u_rinit(&td->io_u_requeues, td->o.iodepth);
	err += io_u_qinit(&td->io_u_freelist, td->o.iodepth);
	err += io_u_qinit(&td->io_u_all, td->o.iodepth);

	if (err) {
		log_err("fio: failed setting up IO queues\n");
		return 1;
	}

	/*
	 * if we may later need to do address alignment, then add any
	 * possible adjustment here so that we don't cause a buffer
	 * overflow later. this adjustment may be too much if we get
	 * lucky and the allocator gives us an aligned address.
	 */
	if (td->o.odirect || td->o.mem_align || td->o.oatomic ||
	    (td->io_ops->flags & FIO_RAWIO))
		td->orig_buffer_size += page_mask + td->o.mem_align;

	if (td->o.mem_type == MEM_SHMHUGE || td->o.mem_type == MEM_MMAPHUGE) {
		unsigned long bs;

		bs = td->orig_buffer_size + td->o.hugepage_size - 1;
		td->orig_buffer_size = bs & ~(td->o.hugepage_size - 1);
	}

	if (td->orig_buffer_size != (size_t) td->orig_buffer_size) {
		log_err("fio: IO memory too large. Reduce max_bs or iodepth\n");
		return 1;
	}

	if (data_xfer && allocate_io_mem(td))
		return 1;

	if (td->o.odirect || td->o.mem_align || td->o.oatomic ||
	    (td->io_ops->flags & FIO_RAWIO))
		p = PAGE_ALIGN(td->orig_buffer) + td->o.mem_align;
	else
		p = td->orig_buffer;

	cl_align = os_cache_line_size();

	for (i = 0; i < max_units; i++) {
		void *ptr;

		if (td->terminate)
			return 1;

		ptr = fio_memalign(cl_align, sizeof(*io_u));
		if (!ptr) {
			log_err("fio: unable to allocate aligned memory\n");
			break;
		}

		io_u = ptr;
		memset(io_u, 0, sizeof(*io_u));
		INIT_FLIST_HEAD(&io_u->verify_list);
		dprint(FD_MEM, "io_u alloc %p, index %u\n", io_u, i);

		if (data_xfer) {
			io_u->buf = p;
			dprint(FD_MEM, "io_u %p, mem %p\n", io_u, io_u->buf);

			if (td_write(td))
				io_u_fill_buffer(td, io_u, min_write, max_bs);
			if (td_write(td) && td->o.verify_pattern_bytes) {
				/*
				 * Fill the buffer with the pattern if we are
				 * going to be doing writes.
				 */
				fill_verify_pattern(td, io_u->buf, max_bs, io_u, 0, 0);
			}
		}

		io_u->index = i;
		io_u->flags = IO_U_F_FREE;
		io_u_qpush(&td->io_u_freelist, io_u);

		/*
		 * io_u never leaves this stack, used for iteration of all
		 * io_u buffers.
		 */
		io_u_qpush(&td->io_u_all, io_u);

		if (td->io_ops->io_u_init) {
			int ret = td->io_ops->io_u_init(td, io_u);

			if (ret) {
				log_err("fio: failed to init engine data: %d\n", ret);
				return 1;
			}
		}

		p += max_bs;
	}

	return 0;
}
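/*
 * Rough sizing example for the io_u pool above (illustrative numbers):
 * iodepth=16 with a 128k max block size gives a 2MB orig_buffer, plus
 * page/mem_align slack when O_DIRECT or raw IO needs aligned buffers,
 * rounded up to a hugepage multiple for MEM_SHMHUGE/MEM_MMAPHUGE.
 */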
static int switch_ioscheduler(struct thread_data *td)
{
	char tmp[256], tmp2[128];
	FILE *f;
	int ret;

	if (td->io_ops->flags & FIO_DISKLESSIO)
		return 0;

	sprintf(tmp, "%s/queue/scheduler", td->sysfs_root);

	f = fopen(tmp, "r+");
	if (!f) {
		if (errno == ENOENT) {
			log_err("fio: os or kernel doesn't support IO scheduler"
				" switching\n");
			return 0;
		}
		td_verror(td, errno, "fopen iosched");
		return 1;
	}

	/*
	 * Set io scheduler.
	 */
	ret = fwrite(td->o.ioscheduler, strlen(td->o.ioscheduler), 1, f);
	if (ferror(f) || ret != 1) {
		td_verror(td, errno, "fwrite");
		fclose(f);
		return 1;
	}

	rewind(f);

	/*
	 * Read back and check that the selected scheduler is now the default.
	 */
	ret = fread(tmp, 1, sizeof(tmp), f);
	if (ferror(f) || ret < 0) {
		td_verror(td, errno, "fread");
		fclose(f);
		return 1;
	}

	sprintf(tmp2, "[%s]", td->o.ioscheduler);
	if (!strstr(tmp, tmp2)) {
		log_err("fio: io scheduler %s not found\n", td->o.ioscheduler);
		td_verror(td, EINVAL, "iosched_switch");
		fclose(f);
		return 1;
	}

	fclose(f);
	return 0;
}
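/*
 * The strstr() check above relies on the sysfs convention of marking
 * the active scheduler with brackets; a typical
 * /sys/block/<dev>/queue/scheduler reads something like:
 *
 *	noop deadline [cfq]
 */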
static int keep_running(struct thread_data *td)
{
	if (td->done)
		return 0;
	if (td->o.time_based)
		return 1;
	if (td->o.loops) {
		td->o.loops--;
		return 1;
	}

	if (td->o.size != -1ULL && ddir_rw_sum(td->io_bytes) < td->o.size) {
		uint64_t diff;

		/*
		 * If the difference is less than the minimum IO size, we
		 * are done.
		 */
		diff = td->o.size - ddir_rw_sum(td->io_bytes);
		if (diff < td_max_bs(td))
			return 0;

		return 1;
	}

	return 0;
}

static int exec_string(struct thread_options *o, const char *string, const char *mode)
{
	int ret, newlen = strlen(string) + strlen(o->name) + strlen(mode) + 9 + 1;
	char *str;

	str = malloc(newlen);
	sprintf(str, "%s &> %s.%s.txt", string, o->name, mode);

	log_info("%s : Saving output of %s in %s.%s.txt\n",
				o->name, mode, o->name, mode);
	ret = system(str);
	if (ret == -1)
		log_err("fio: exec of cmd <%s> failed\n", str);

	free(str);
	return ret;
}

/*
 * Dry run to compute correct state of numberio for verification.
 */
static uint64_t do_dry_run(struct thread_data *td)
{
	uint64_t bytes_done[DDIR_RWDIR_CNT] = { 0, 0, 0 };

	td_set_runstate(td, TD_RUNNING);

	while ((td->o.read_iolog_file && !flist_empty(&td->io_log_list)) ||
		(!flist_empty(&td->trim_list)) || !io_bytes_exceeded(td)) {
		struct io_u *io_u;
		int ret;

		if (td->terminate || td->done)
			break;

		io_u = get_io_u(td);
		if (!io_u)
			break;

		io_u->flags |= IO_U_F_FLIGHT;
		io_u->error = 0;
		io_u->resid = 0;
		if (ddir_rw(acct_ddir(io_u)))
			td->io_issues[acct_ddir(io_u)]++;
		if (ddir_rw(io_u->ddir)) {
			io_u_mark_depth(td, 1);
			td->ts.total_io_u[io_u->ddir]++;
		}

		ret = io_u_sync_complete(td, io_u, bytes_done);
		(void) ret;
	}

	return bytes_done[DDIR_WRITE] + bytes_done[DDIR_TRIM];
}
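/*
 * do_dry_run() backs the verify_only option: rather than issuing the
 * writes again, it replays just their accounting (io_issues, depth and
 * io_u stats), so the verify pass that follows sees the same numberio
 * state a real write phase would have produced.
 */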
/*
 * Entry point for the thread based jobs. The process based jobs end up
 * here as well, after a little setup.
 */
static void *thread_main(void *data)
{
	unsigned long long elapsed;
	struct thread_data *td = data;
	struct thread_options *o = &td->o;
	pthread_condattr_t attr;
	int clear_state;
	int ret;

	if (!o->use_thread) {
		setsid();
		td->pid = getpid();
	} else
		td->pid = gettid();

	/*
	 * fio_time_init() may not have been called yet if running as a server
	 */
	fio_time_init();

	fio_local_clock_init(o->use_thread);

	dprint(FD_PROCESS, "jobs pid=%d started\n", (int) td->pid);

	if (is_backend)
		fio_server_send_start(td);

	INIT_FLIST_HEAD(&td->io_log_list);
	INIT_FLIST_HEAD(&td->io_hist_list);
	INIT_FLIST_HEAD(&td->verify_list);
	INIT_FLIST_HEAD(&td->trim_list);
	INIT_FLIST_HEAD(&td->next_rand_list);
	pthread_mutex_init(&td->io_u_lock, NULL);
	td->io_hist_tree = RB_ROOT;

	pthread_condattr_init(&attr);
	pthread_cond_init(&td->verify_cond, &attr);
	pthread_cond_init(&td->free_cond, &attr);

	td_set_runstate(td, TD_INITIALIZED);
	dprint(FD_MUTEX, "up startup_mutex\n");
	fio_mutex_up(startup_mutex);
	dprint(FD_MUTEX, "wait on td->mutex\n");
	fio_mutex_down(td->mutex);
	dprint(FD_MUTEX, "done waiting on td->mutex\n");

	/*
	 * the ->mutex mutex is now no longer used, close it to avoid
	 * eating a file descriptor
	 */
	fio_mutex_remove(td->mutex);
	td->mutex = NULL;

	/*
	 * A new gid requires privilege, so we need to do this before setting
	 * the uid.
	 */
	if (o->gid != -1U && setgid(o->gid)) {
		td_verror(td, errno, "setgid");
		goto err;
	}
	if (o->uid != -1U && setuid(o->uid)) {
		td_verror(td, errno, "setuid");
		goto err;
	}

	/*
	 * If we have a gettimeofday() thread, make sure we exclude that
	 * thread from this job
	 */
	if (o->gtod_cpu)
		fio_cpu_clear(&o->cpumask, o->gtod_cpu);

	/*
	 * Set affinity first, in case it has an impact on the memory
	 * allocations.
	 */
	if (o->cpumask_set) {
		ret = fio_setaffinity(td->pid, o->cpumask);
		if (ret == -1) {
			td_verror(td, errno, "cpu_set_affinity");
			goto err;
		}
	}

#ifdef CONFIG_LIBNUMA
	/* numa node setup */
	if (o->numa_cpumask_set || o->numa_memmask_set) {
		int ret;

		if (numa_available() < 0) {
			td_verror(td, errno, "Does not support NUMA API\n");
			goto err;
		}

		if (o->numa_cpumask_set) {
			ret = numa_run_on_node_mask(o->numa_cpunodesmask);
			if (ret == -1) {
				td_verror(td, errno,
					  "numa_run_on_node_mask failed\n");
				goto err;
			}
		}

		if (o->numa_memmask_set) {

			switch (o->numa_mem_mode) {
			case MPOL_INTERLEAVE:
				numa_set_interleave_mask(o->numa_memnodesmask);
				break;
			case MPOL_BIND:
				numa_set_membind(o->numa_memnodesmask);
				break;
			case MPOL_LOCAL:
				numa_set_localalloc();
				break;
			case MPOL_PREFERRED:
				numa_set_preferred(o->numa_mem_prefer_node);
				break;
			case MPOL_DEFAULT:
			default:
				break;
			}

		}
	}
#endif

	if (fio_pin_memory(td))
		goto err;

	/*
	 * May alter parameters that init_io_u() will use, so we need to
	 * do this first.
	 */
	if (init_iolog(td))
		goto err;

	if (init_io_u(td))
		goto err;

	if (o->verify_async && verify_async_init(td))
		goto err;

	if (o->ioprio) {
		ret = ioprio_set(IOPRIO_WHO_PROCESS, 0, o->ioprio_class, o->ioprio);
		if (ret == -1) {
			td_verror(td, errno, "ioprio_set");
			goto err;
		}
	}

	if (o->cgroup && cgroup_setup(td, cgroup_list, &cgroup_mnt))
		goto err;

	errno = 0;
	if (nice(o->nice) == -1 && errno != 0) {
		td_verror(td, errno, "nice");
		goto err;
	}

	if (o->ioscheduler && switch_ioscheduler(td))
		goto err;

	if (!o->create_serialize && setup_files(td))
		goto err;

	if (td_io_init(td))
		goto err;

	if (init_random_map(td))
		goto err;

	if (o->exec_prerun && exec_string(o, o->exec_prerun, (const char *)"prerun"))
		goto err;

	if (o->pre_read) {
		if (pre_read_files(td) < 0)
			goto err;
	}

	fio_verify_init(td);

	fio_gettime(&td->epoch, NULL);
	fio_getrusage(&td->ru_start);
	clear_state = 0;
	while (keep_running(td)) {
		uint64_t verify_bytes;

		fio_gettime(&td->start, NULL);
		memcpy(&td->bw_sample_time, &td->start, sizeof(td->start));
		memcpy(&td->iops_sample_time, &td->start, sizeof(td->start));
		memcpy(&td->tv_cache, &td->start, sizeof(td->start));

		if (o->ratemin[DDIR_READ] || o->ratemin[DDIR_WRITE] ||
				o->ratemin[DDIR_TRIM]) {
			memcpy(&td->lastrate[DDIR_READ], &td->bw_sample_time,
						sizeof(td->bw_sample_time));
			memcpy(&td->lastrate[DDIR_WRITE], &td->bw_sample_time,
						sizeof(td->bw_sample_time));
			memcpy(&td->lastrate[DDIR_TRIM], &td->bw_sample_time,
						sizeof(td->bw_sample_time));
		}

		if (clear_state)
			clear_io_state(td);

		prune_io_piece_log(td);

		if (td->o.verify_only && (td_write(td) || td_rw(td)))
			verify_bytes = do_dry_run(td);
		else
			verify_bytes = do_io(td);

		clear_state = 1;

		if (td_read(td) && td->io_bytes[DDIR_READ]) {
			elapsed = utime_since_now(&td->start);
			td->ts.runtime[DDIR_READ] += elapsed;
		}
		if (td_write(td) && td->io_bytes[DDIR_WRITE]) {
			elapsed = utime_since_now(&td->start);
			td->ts.runtime[DDIR_WRITE] += elapsed;
		}
		if (td_trim(td) && td->io_bytes[DDIR_TRIM]) {
			elapsed = utime_since_now(&td->start);
			td->ts.runtime[DDIR_TRIM] += elapsed;
		}

		if (td->error || td->terminate)
			break;

		if (!o->do_verify ||
		    o->verify == VERIFY_NONE ||
		    (td->io_ops->flags & FIO_UNIDIR))
			continue;

		clear_io_state(td);

		fio_gettime(&td->start, NULL);

		do_verify(td, verify_bytes);

		td->ts.runtime[DDIR_READ] += utime_since_now(&td->start);

		if (td->error || td->terminate)
			break;
	}

	update_rusage_stat(td);
	td->ts.runtime[DDIR_READ] = (td->ts.runtime[DDIR_READ] + 999) / 1000;
	td->ts.runtime[DDIR_WRITE] = (td->ts.runtime[DDIR_WRITE] + 999) / 1000;
	td->ts.runtime[DDIR_TRIM] = (td->ts.runtime[DDIR_TRIM] + 999) / 1000;
	td->ts.total_run_time = mtime_since_now(&td->epoch);
	td->ts.io_bytes[DDIR_READ] = td->io_bytes[DDIR_READ];
	td->ts.io_bytes[DDIR_WRITE] = td->io_bytes[DDIR_WRITE];
	td->ts.io_bytes[DDIR_TRIM] = td->io_bytes[DDIR_TRIM];

	fio_unpin_memory(td);

	fio_mutex_down(writeout_mutex);
	if (td->bw_log) {
		if (o->bw_log_file) {
			finish_log_named(td, td->bw_log,
						o->bw_log_file, "bw");
		} else
			finish_log(td, td->bw_log, "bw");
	}
	if (td->lat_log) {
		if (o->lat_log_file) {
			finish_log_named(td, td->lat_log,
						o->lat_log_file, "lat");
		} else
			finish_log(td, td->lat_log, "lat");
	}
	if (td->slat_log) {
		if (o->lat_log_file) {
			finish_log_named(td, td->slat_log,
						o->lat_log_file, "slat");
		} else
			finish_log(td, td->slat_log, "slat");
	}
	if (td->clat_log) {
		if (o->lat_log_file) {
			finish_log_named(td, td->clat_log,
						o->lat_log_file, "clat");
		} else
			finish_log(td, td->clat_log, "clat");
	}
	if (td->iops_log) {
		if (o->iops_log_file) {
			finish_log_named(td, td->iops_log,
						o->iops_log_file, "iops");
		} else
			finish_log(td, td->iops_log, "iops");
	}

	fio_mutex_up(writeout_mutex);
	if (o->exec_postrun)
		exec_string(o, o->exec_postrun, (const char *)"postrun");

	if (exitall_on_terminate)
		fio_terminate_threads(td->groupid);

err:
	if (td->error)
		log_info("fio: pid=%d, err=%d/%s\n", (int) td->pid, td->error,
							td->verror);

	if (o->verify_async)
		verify_async_exit(td);

	close_and_free_files(td);
	cleanup_io_u(td);
	close_ioengine(td);
	cgroup_shutdown(td, &cgroup_mnt);

	if (o->cpumask_set) {
		int ret = fio_cpuset_exit(&o->cpumask);

		td_verror(td, ret, "fio_cpuset_exit");
	}

	/*
	 * do this very late, it will log file closing as well
	 */
	if (o->write_iolog_file)
		write_iolog_close(td);

	fio_mutex_remove(td->rusage_sem);
	td->rusage_sem = NULL;

	td_set_runstate(td, TD_EXITED);
	return (void *) (uintptr_t) td->error;
}
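/*
 * Startup handshake recap for thread_main() above: the worker flags
 * TD_INITIALIZED and ups startup_mutex to tell run_threads() that
 * setup succeeded, then sleeps on its private td->mutex until
 * run_threads() releases it at the TD_INITIALIZED -> TD_RUNNING
 * transition.
 */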
/*
 * We cannot pass the td data into a forked process, so attach the td and
 * pass it to the thread worker.
 */
static int fork_main(int shmid, int offset)
{
	struct thread_data *td;
	void *data, *ret;

#ifndef __hpux
	data = shmat(shmid, NULL, 0);
	if (data == (void *) -1) {
		int __err = errno;

		perror("shmat");
		return __err;
	}
#else
	/*
	 * HP-UX inherits shm mappings?
	 */
	data = threads;
#endif

	td = data + offset * sizeof(struct thread_data);
	ret = thread_main(td);
	shmdt(data);
	return (int) (uintptr_t) ret;
}

/*
 * Run over the job map and reap the threads that have exited, if any.
 */
static void reap_threads(unsigned int *nr_running, unsigned int *t_rate,
			 unsigned int *m_rate)
{
	struct thread_data *td;
	unsigned int cputhreads, realthreads, pending;
	int i, status, ret;

	/*
	 * reap exited threads (TD_EXITED -> TD_REAPED)
	 */
	realthreads = pending = cputhreads = 0;
	for_each_td(td, i) {
		int flags = 0;

		/*
		 * ->io_ops is NULL for a thread that has closed its
		 * io engine
		 */
		if (td->io_ops && !strcmp(td->io_ops->name, "cpuio"))
			cputhreads++;
		else
			realthreads++;

		if (!td->pid) {
			pending++;
			continue;
		}
		if (td->runstate == TD_REAPED)
			continue;
		if (td->o.use_thread) {
			if (td->runstate == TD_EXITED) {
				td_set_runstate(td, TD_REAPED);
				goto reaped;
			}
			continue;
		}

		flags = WNOHANG;
		if (td->runstate == TD_EXITED)
			flags = 0;

		/*
		 * check if someone quit or got killed in an unusual way
		 */
		ret = waitpid(td->pid, &status, flags);
		if (ret < 0) {
			if (errno == ECHILD) {
				log_err("fio: pid=%d disappeared %d\n",
						(int) td->pid, td->runstate);
				td->sig = ECHILD;
				td_set_runstate(td, TD_REAPED);
				goto reaped;
			}
			perror("waitpid");
		} else if (ret == td->pid) {
			if (WIFSIGNALED(status)) {
				int sig = WTERMSIG(status);

				if (sig != SIGTERM && sig != SIGUSR2)
					log_err("fio: pid=%d, got signal=%d\n",
							(int) td->pid, sig);
				td->sig = sig;
				td_set_runstate(td, TD_REAPED);
				goto reaped;
			}
			if (WIFEXITED(status)) {
				if (WEXITSTATUS(status) && !td->error)
					td->error = WEXITSTATUS(status);

				td_set_runstate(td, TD_REAPED);
				goto reaped;
			}
		}

		/*
		 * thread is not dead, continue
		 */
		pending++;
		continue;
reaped:
		(*nr_running)--;
		(*m_rate) -= ddir_rw_sum(td->o.ratemin);
		(*t_rate) -= ddir_rw_sum(td->o.rate);
		if (!td->pid)
			pending--;

		if (td->error)
			exit_value++;

		done_secs += mtime_since_now(&td->epoch) / 1000;
		profile_td_exit(td);
	}

	if (*nr_running == cputhreads && !pending && realthreads)
		fio_terminate_threads(TERMINATE_ALL);
}
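/*
 * The cputhreads special case at the end of reap_threads(): cpuio jobs
 * only burn CPU and never finish on their own, so once they are the
 * only ones left running (and real IO jobs existed), everything is
 * terminated.
 */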
static void do_usleep(unsigned int usecs)
{
	check_for_running_stats();
	usleep(usecs);
}

/*
 * Main function for kicking off and reaping jobs, as needed.
 */
static void run_threads(void)
{
	struct thread_data *td;
	unsigned long spent;
	unsigned int i, todo, nr_running, m_rate, t_rate, nr_started;

	if (fio_gtod_offload && fio_start_gtod_thread())
		return;

	fio_idle_prof_init();

	set_sig_handlers();

	nr_thread = nr_process = 0;
	for_each_td(td, i) {
		if (td->o.use_thread)
			nr_thread++;
		else
			nr_process++;
	}

	if (output_format == FIO_OUTPUT_NORMAL) {
		log_info("Starting ");
		if (nr_thread)
			log_info("%d thread%s", nr_thread,
						nr_thread > 1 ? "s" : "");
		if (nr_process) {
			if (nr_thread)
				log_info(" and ");
			log_info("%d process%s", nr_process,
						nr_process > 1 ? "es" : "");
		}
		log_info("\n");
		fflush(stdout);
	}

	todo = thread_number;
	nr_running = 0;
	nr_started = 0;
	m_rate = t_rate = 0;

	for_each_td(td, i) {
		print_status_init(td->thread_number - 1);

		if (!td->o.create_serialize)
			continue;

		/*
		 * do file setup here so it happens sequentially,
		 * we don't want X number of threads getting their
		 * client data interspersed on disk
		 */
		if (setup_files(td)) {
			exit_value++;
			if (td->error)
				log_err("fio: pid=%d, err=%d/%s\n",
					(int) td->pid, td->error, td->verror);
			td_set_runstate(td, TD_REAPED);
			todo--;
		} else {
			struct fio_file *f;
			unsigned int j;

			/*
			 * for sharing to work, each job must always open
			 * its own files. so close them, if we opened them
			 * for creation
			 */
			for_each_file(td, f, j) {
				if (fio_file_open(f))
					td_io_close_file(td, f);
			}
		}
	}

	/* start idle threads before io threads start to run */
	fio_idle_prof_start();

	set_genesis_time();

	while (todo) {
		struct thread_data *map[REAL_MAX_JOBS];
		struct timeval this_start;
		int this_jobs = 0, left;

		/*
		 * create threads (TD_NOT_CREATED -> TD_CREATED)
		 */
		for_each_td(td, i) {
			if (td->runstate != TD_NOT_CREATED)
				continue;

			/*
			 * never got a chance to start, killed by other
			 * thread for some reason
			 */
			if (td->terminate) {
				todo--;
				continue;
			}

			if (td->o.start_delay) {
				spent = mtime_since_genesis();

				if (td->o.start_delay * 1000 > spent)
					continue;
			}

			if (td->o.stonewall && (nr_started || nr_running)) {
				dprint(FD_PROCESS, "%s: stonewall wait\n",
							td->o.name);
				break;
			}

			init_disk_util(td);

			td->rusage_sem = fio_mutex_init(FIO_MUTEX_LOCKED);
			td->update_rusage = 0;

			/*
			 * Set state to created. Thread will transition
			 * to TD_INITIALIZED when it's done setting up.
			 */
			td_set_runstate(td, TD_CREATED);
			map[this_jobs++] = td;
			nr_started++;

			if (td->o.use_thread) {
				int ret;

				dprint(FD_PROCESS, "will pthread_create\n");
				ret = pthread_create(&td->thread, NULL,
							thread_main, td);
				if (ret) {
					log_err("pthread_create: %s\n",
							strerror(ret));
					nr_started--;
					break;
				}
				ret = pthread_detach(td->thread);
				if (ret)
					log_err("pthread_detach: %s",
							strerror(ret));
			} else {
				pid_t pid;
				dprint(FD_PROCESS, "will fork\n");
				pid = fork();
				if (!pid) {
					int ret = fork_main(shm_id, i);

					_exit(ret);
				} else if (i == fio_debug_jobno)
					*fio_debug_jobp = pid;
			}
			dprint(FD_MUTEX, "wait on startup_mutex\n");
			if (fio_mutex_down_timeout(startup_mutex, 10)) {
				log_err("fio: job startup hung? exiting.\n");
				fio_terminate_threads(TERMINATE_ALL);
				fio_abort = 1;
				nr_started--;
				break;
			}
			dprint(FD_MUTEX, "done waiting on startup_mutex\n");
		}

		/*
		 * Wait for the started threads to transition to
		 * TD_INITIALIZED.
		 */
		fio_gettime(&this_start, NULL);
		left = this_jobs;
		while (left && !fio_abort) {
			if (mtime_since_now(&this_start) > JOB_START_TIMEOUT)
				break;

			do_usleep(100000);

			for (i = 0; i < this_jobs; i++) {
				td = map[i];
				if (!td)
					continue;
				if (td->runstate == TD_INITIALIZED) {
					map[i] = NULL;
					left--;
				} else if (td->runstate >= TD_EXITED) {
					map[i] = NULL;
					left--;
					todo--;
					nr_running++; /* work-around... */
				}
			}
		}

		if (left) {
			log_err("fio: %d job%s failed to start\n", left,
					left > 1 ? "s" : "");
			for (i = 0; i < this_jobs; i++) {
				td = map[i];
				if (!td)
					continue;
				kill(td->pid, SIGTERM);
			}
			break;
		}

		/*
		 * start created threads (TD_INITIALIZED -> TD_RUNNING).
		 */
		for_each_td(td, i) {
			if (td->runstate != TD_INITIALIZED)
				continue;

			if (in_ramp_time(td))
				td_set_runstate(td, TD_RAMP);
			else
				td_set_runstate(td, TD_RUNNING);
			nr_running++;
			nr_started--;
			m_rate += ddir_rw_sum(td->o.ratemin);
			t_rate += ddir_rw_sum(td->o.rate);
			todo--;
			fio_mutex_up(td->mutex);
		}

		reap_threads(&nr_running, &t_rate, &m_rate);

		if (todo)
			do_usleep(100000);
	}

	while (nr_running) {
		reap_threads(&nr_running, &t_rate, &m_rate);
		do_usleep(10000);
	}

	fio_idle_prof_stop();

	update_io_ticks();
}
void wait_for_disk_thread_exit(void)
{
	fio_mutex_down(disk_thread_mutex);
}

static void free_disk_util(void)
{
	disk_util_start_exit();
	wait_for_disk_thread_exit();
	disk_util_prune_entries();
}

static void *disk_thread_main(void *data)
{
	int ret = 0;

	fio_mutex_up(startup_mutex);

	while (threads && !ret) {
		usleep(DISK_UTIL_MSEC * 1000);
		if (!threads)
			break;
		ret = update_io_ticks();

		if (!is_backend)
			print_thread_status();
	}

	fio_mutex_up(disk_thread_mutex);
	return NULL;
}

static int create_disk_util_thread(void)
{
	int ret;

	setup_disk_util();

	disk_thread_mutex = fio_mutex_init(FIO_MUTEX_LOCKED);

	ret = pthread_create(&disk_util_thread, NULL, disk_thread_main, NULL);
	if (ret) {
		fio_mutex_remove(disk_thread_mutex);
		log_err("Can't create disk util thread: %s\n", strerror(ret));
		return 1;
	}

	ret = pthread_detach(disk_util_thread);
	if (ret) {
		fio_mutex_remove(disk_thread_mutex);
		log_err("Can't detach disk util thread: %s\n", strerror(ret));
		return 1;
	}

	dprint(FD_MUTEX, "wait on startup_mutex\n");
	fio_mutex_down(startup_mutex);
	dprint(FD_MUTEX, "done waiting on startup_mutex\n");
	return 0;
}
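/*
 * Disk util thread lifecycle: create_disk_util_thread() blocks on
 * startup_mutex until disk_thread_main() is up and running. On
 * shutdown, free_disk_util() -> wait_for_disk_thread_exit() sleeps on
 * disk_thread_mutex, which disk_thread_main() ups on its way out
 * (update_io_ticks() presumably returns non-zero once
 * disk_util_start_exit() has been called).
 */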
int fio_backend(void)
{
	struct thread_data *td;
	int i;

	if (exec_profile) {
		if (load_profile(exec_profile))
			return 1;
		free(exec_profile);
		exec_profile = NULL;
	}
	if (!thread_number)
		return 0;

	if (write_bw_log) {
		setup_log(&agg_io_log[DDIR_READ], 0, IO_LOG_TYPE_BW);
		setup_log(&agg_io_log[DDIR_WRITE], 0, IO_LOG_TYPE_BW);
		setup_log(&agg_io_log[DDIR_TRIM], 0, IO_LOG_TYPE_BW);
	}

	startup_mutex = fio_mutex_init(FIO_MUTEX_LOCKED);
	if (startup_mutex == NULL)
		return 1;
	writeout_mutex = fio_mutex_init(FIO_MUTEX_UNLOCKED);
	if (writeout_mutex == NULL)
		return 1;

	set_genesis_time();
	stat_init();
	create_disk_util_thread();

	cgroup_list = smalloc(sizeof(*cgroup_list));
	INIT_FLIST_HEAD(cgroup_list);

	run_threads();

	if (!fio_abort) {
		show_run_stats();
		if (write_bw_log) {
			__finish_log(agg_io_log[DDIR_READ], "agg-read_bw.log");
			__finish_log(agg_io_log[DDIR_WRITE],
					"agg-write_bw.log");
			__finish_log(agg_io_log[DDIR_TRIM],
					"agg-trim_bw.log");
		}
	}

	for_each_td(td, i)
		fio_options_free(td);

	free_disk_util();
	cgroup_kill(cgroup_list);
	sfree(cgroup_list);
	sfree(cgroup_mnt);

	fio_mutex_remove(startup_mutex);
	fio_mutex_remove(writeout_mutex);
	fio_mutex_remove(disk_thread_mutex);
	stat_exit();
	return exit_value;
}