backend.c revision 20876c53b5d32f2da9049af5e7fb102133946981
1/* 2 * fio - the flexible io tester 3 * 4 * Copyright (C) 2005 Jens Axboe <axboe@suse.de> 5 * Copyright (C) 2006-2012 Jens Axboe <axboe@kernel.dk> 6 * 7 * The license below covers all files distributed with fio unless otherwise 8 * noted in the file itself. 9 * 10 * This program is free software; you can redistribute it and/or modify 11 * it under the terms of the GNU General Public License version 2 as 12 * published by the Free Software Foundation. 13 * 14 * This program is distributed in the hope that it will be useful, 15 * but WITHOUT ANY WARRANTY; without even the implied warranty of 16 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 17 * GNU General Public License for more details. 18 * 19 * You should have received a copy of the GNU General Public License 20 * along with this program; if not, write to the Free Software 21 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA 22 * 23 */ 24#include <unistd.h> 25#include <fcntl.h> 26#include <string.h> 27#include <limits.h> 28#include <signal.h> 29#include <time.h> 30#include <locale.h> 31#include <assert.h> 32#include <time.h> 33#include <inttypes.h> 34#include <sys/stat.h> 35#include <sys/wait.h> 36#include <sys/ipc.h> 37#include <sys/mman.h> 38 39#include "fio.h" 40#ifndef FIO_NO_HAVE_SHM_H 41#include <sys/shm.h> 42#endif 43#include "hash.h" 44#include "smalloc.h" 45#include "verify.h" 46#include "trim.h" 47#include "diskutil.h" 48#include "cgroup.h" 49#include "profile.h" 50#include "lib/rand.h" 51#include "memalign.h" 52#include "server.h" 53#include "lib/getrusage.h" 54#include "idletime.h" 55 56static pthread_t disk_util_thread; 57static struct fio_mutex *disk_thread_mutex; 58static struct fio_mutex *startup_mutex; 59static struct fio_mutex *writeout_mutex; 60static struct flist_head *cgroup_list; 61static char *cgroup_mnt; 62static int exit_value; 63static volatile int fio_abort; 64static unsigned int nr_process = 0; 65static unsigned int nr_thread = 0; 66 67struct io_log *agg_io_log[DDIR_RWDIR_CNT]; 68 69int groupid = 0; 70unsigned int thread_number = 0; 71unsigned int stat_number = 0; 72int shm_id = 0; 73int temp_stall_ts; 74unsigned long done_secs = 0; 75volatile int disk_util_exit = 0; 76 77#define PAGE_ALIGN(buf) \ 78 (char *) (((uintptr_t) (buf) + page_mask) & ~page_mask) 79 80#define JOB_START_TIMEOUT (5 * 1000) 81 82static void sig_int(int sig) 83{ 84 if (threads) { 85 if (is_backend) 86 fio_server_got_signal(sig); 87 else { 88 log_info("\nfio: terminating on signal %d\n", sig); 89 fflush(stdout); 90 exit_value = 128; 91 } 92 93 fio_terminate_threads(TERMINATE_ALL); 94 } 95} 96 97static void sig_show_status(int sig) 98{ 99 show_running_run_stats(); 100} 101 102static void set_sig_handlers(void) 103{ 104 struct sigaction act; 105 106 memset(&act, 0, sizeof(act)); 107 act.sa_handler = sig_int; 108 act.sa_flags = SA_RESTART; 109 sigaction(SIGINT, &act, NULL); 110 111 memset(&act, 0, sizeof(act)); 112 act.sa_handler = sig_int; 113 act.sa_flags = SA_RESTART; 114 sigaction(SIGTERM, &act, NULL); 115 116/* Windows uses SIGBREAK as a quit signal from other applications */ 117#ifdef WIN32 118 memset(&act, 0, sizeof(act)); 119 act.sa_handler = sig_int; 120 act.sa_flags = SA_RESTART; 121 sigaction(SIGBREAK, &act, NULL); 122#endif 123 124 memset(&act, 0, sizeof(act)); 125 act.sa_handler = sig_show_status; 126 act.sa_flags = SA_RESTART; 127 sigaction(SIGUSR1, &act, NULL); 128 129 if (is_backend) { 130 memset(&act, 0, sizeof(act)); 131 act.sa_handler = sig_int; 132 act.sa_flags = SA_RESTART; 133 sigaction(SIGPIPE, &act, NULL); 134 } 135} 136 137/* 138 * Check if we are above the minimum rate given. 139 */ 140static int __check_min_rate(struct thread_data *td, struct timeval *now, 141 enum fio_ddir ddir) 142{ 143 unsigned long long bytes = 0; 144 unsigned long iops = 0; 145 unsigned long spent; 146 unsigned long rate; 147 unsigned int ratemin = 0; 148 unsigned int rate_iops = 0; 149 unsigned int rate_iops_min = 0; 150 151 assert(ddir_rw(ddir)); 152 153 if (!td->o.ratemin[ddir] && !td->o.rate_iops_min[ddir]) 154 return 0; 155 156 /* 157 * allow a 2 second settle period in the beginning 158 */ 159 if (mtime_since(&td->start, now) < 2000) 160 return 0; 161 162 iops += td->this_io_blocks[ddir]; 163 bytes += td->this_io_bytes[ddir]; 164 ratemin += td->o.ratemin[ddir]; 165 rate_iops += td->o.rate_iops[ddir]; 166 rate_iops_min += td->o.rate_iops_min[ddir]; 167 168 /* 169 * if rate blocks is set, sample is running 170 */ 171 if (td->rate_bytes[ddir] || td->rate_blocks[ddir]) { 172 spent = mtime_since(&td->lastrate[ddir], now); 173 if (spent < td->o.ratecycle) 174 return 0; 175 176 if (td->o.rate[ddir]) { 177 /* 178 * check bandwidth specified rate 179 */ 180 if (bytes < td->rate_bytes[ddir]) { 181 log_err("%s: min rate %u not met\n", td->o.name, 182 ratemin); 183 return 1; 184 } else { 185 rate = ((bytes - td->rate_bytes[ddir]) * 1000) / spent; 186 if (rate < ratemin || 187 bytes < td->rate_bytes[ddir]) { 188 log_err("%s: min rate %u not met, got" 189 " %luKB/sec\n", td->o.name, 190 ratemin, rate); 191 return 1; 192 } 193 } 194 } else { 195 /* 196 * checks iops specified rate 197 */ 198 if (iops < rate_iops) { 199 log_err("%s: min iops rate %u not met\n", 200 td->o.name, rate_iops); 201 return 1; 202 } else { 203 rate = ((iops - td->rate_blocks[ddir]) * 1000) / spent; 204 if (rate < rate_iops_min || 205 iops < td->rate_blocks[ddir]) { 206 log_err("%s: min iops rate %u not met," 207 " got %lu\n", td->o.name, 208 rate_iops_min, rate); 209 } 210 } 211 } 212 } 213 214 td->rate_bytes[ddir] = bytes; 215 td->rate_blocks[ddir] = iops; 216 memcpy(&td->lastrate[ddir], now, sizeof(*now)); 217 return 0; 218} 219 220static int check_min_rate(struct thread_data *td, struct timeval *now, 221 uint64_t *bytes_done) 222{ 223 int ret = 0; 224 225 if (bytes_done[DDIR_READ]) 226 ret |= __check_min_rate(td, now, DDIR_READ); 227 if (bytes_done[DDIR_WRITE]) 228 ret |= __check_min_rate(td, now, DDIR_WRITE); 229 if (bytes_done[DDIR_TRIM]) 230 ret |= __check_min_rate(td, now, DDIR_TRIM); 231 232 return ret; 233} 234 235/* 236 * When job exits, we can cancel the in-flight IO if we are using async 237 * io. Attempt to do so. 238 */ 239static void cleanup_pending_aio(struct thread_data *td) 240{ 241 int r; 242 243 /* 244 * get immediately available events, if any 245 */ 246 r = io_u_queued_complete(td, 0, NULL); 247 if (r < 0) 248 return; 249 250 /* 251 * now cancel remaining active events 252 */ 253 if (td->io_ops->cancel) { 254 struct io_u *io_u; 255 int i; 256 257 io_u_qiter(&td->io_u_all, io_u, i) { 258 if (io_u->flags & IO_U_F_FLIGHT) { 259 r = td->io_ops->cancel(td, io_u); 260 if (!r) 261 put_io_u(td, io_u); 262 } 263 } 264 } 265 266 if (td->cur_depth) 267 r = io_u_queued_complete(td, td->cur_depth, NULL); 268} 269 270/* 271 * Helper to handle the final sync of a file. Works just like the normal 272 * io path, just does everything sync. 273 */ 274static int fio_io_sync(struct thread_data *td, struct fio_file *f) 275{ 276 struct io_u *io_u = __get_io_u(td); 277 int ret; 278 279 if (!io_u) 280 return 1; 281 282 io_u->ddir = DDIR_SYNC; 283 io_u->file = f; 284 285 if (td_io_prep(td, io_u)) { 286 put_io_u(td, io_u); 287 return 1; 288 } 289 290requeue: 291 ret = td_io_queue(td, io_u); 292 if (ret < 0) { 293 td_verror(td, io_u->error, "td_io_queue"); 294 put_io_u(td, io_u); 295 return 1; 296 } else if (ret == FIO_Q_QUEUED) { 297 if (io_u_queued_complete(td, 1, NULL) < 0) 298 return 1; 299 } else if (ret == FIO_Q_COMPLETED) { 300 if (io_u->error) { 301 td_verror(td, io_u->error, "td_io_queue"); 302 return 1; 303 } 304 305 if (io_u_sync_complete(td, io_u, NULL) < 0) 306 return 1; 307 } else if (ret == FIO_Q_BUSY) { 308 if (td_io_commit(td)) 309 return 1; 310 goto requeue; 311 } 312 313 return 0; 314} 315 316static int fio_file_fsync(struct thread_data *td, struct fio_file *f) 317{ 318 int ret; 319 320 if (fio_file_open(f)) 321 return fio_io_sync(td, f); 322 323 if (td_io_open_file(td, f)) 324 return 1; 325 326 ret = fio_io_sync(td, f); 327 td_io_close_file(td, f); 328 return ret; 329} 330 331static inline void __update_tv_cache(struct thread_data *td) 332{ 333 fio_gettime(&td->tv_cache, NULL); 334} 335 336static inline void update_tv_cache(struct thread_data *td) 337{ 338 if ((++td->tv_cache_nr & td->tv_cache_mask) == td->tv_cache_mask) 339 __update_tv_cache(td); 340} 341 342static inline int runtime_exceeded(struct thread_data *td, struct timeval *t) 343{ 344 if (in_ramp_time(td)) 345 return 0; 346 if (!td->o.timeout) 347 return 0; 348 if (mtime_since(&td->epoch, t) >= td->o.timeout * 1000) 349 return 1; 350 351 return 0; 352} 353 354static int break_on_this_error(struct thread_data *td, enum fio_ddir ddir, 355 int *retptr) 356{ 357 int ret = *retptr; 358 359 if (ret < 0 || td->error) { 360 int err = td->error; 361 enum error_type_bit eb; 362 363 if (ret < 0) 364 err = -ret; 365 366 eb = td_error_type(ddir, err); 367 if (!(td->o.continue_on_error & (1 << eb))) 368 return 1; 369 370 if (td_non_fatal_error(td, eb, err)) { 371 /* 372 * Continue with the I/Os in case of 373 * a non fatal error. 374 */ 375 update_error_count(td, err); 376 td_clear_error(td); 377 *retptr = 0; 378 return 0; 379 } else if (td->o.fill_device && err == ENOSPC) { 380 /* 381 * We expect to hit this error if 382 * fill_device option is set. 383 */ 384 td_clear_error(td); 385 td->terminate = 1; 386 return 1; 387 } else { 388 /* 389 * Stop the I/O in case of a fatal 390 * error. 391 */ 392 update_error_count(td, err); 393 return 1; 394 } 395 } 396 397 return 0; 398} 399 400static void check_update_rusage(struct thread_data *td) 401{ 402 if (td->update_rusage) { 403 td->update_rusage = 0; 404 update_rusage_stat(td); 405 fio_mutex_up(td->rusage_sem); 406 } 407} 408 409/* 410 * The main verify engine. Runs over the writes we previously submitted, 411 * reads the blocks back in, and checks the crc/md5 of the data. 412 */ 413static void do_verify(struct thread_data *td, uint64_t verify_bytes) 414{ 415 uint64_t bytes_done[DDIR_RWDIR_CNT] = { 0, 0, 0 }; 416 struct fio_file *f; 417 struct io_u *io_u; 418 int ret, min_events; 419 unsigned int i; 420 421 dprint(FD_VERIFY, "starting loop\n"); 422 423 /* 424 * sync io first and invalidate cache, to make sure we really 425 * read from disk. 426 */ 427 for_each_file(td, f, i) { 428 if (!fio_file_open(f)) 429 continue; 430 if (fio_io_sync(td, f)) 431 break; 432 if (file_invalidate_cache(td, f)) 433 break; 434 } 435 436 check_update_rusage(td); 437 438 if (td->error) 439 return; 440 441 td_set_runstate(td, TD_VERIFYING); 442 443 io_u = NULL; 444 while (!td->terminate) { 445 enum fio_ddir ddir; 446 int ret2, full; 447 448 update_tv_cache(td); 449 check_update_rusage(td); 450 451 if (runtime_exceeded(td, &td->tv_cache)) { 452 __update_tv_cache(td); 453 if (runtime_exceeded(td, &td->tv_cache)) { 454 td->terminate = 1; 455 break; 456 } 457 } 458 459 if (flow_threshold_exceeded(td)) 460 continue; 461 462 if (!td->o.experimental_verify) { 463 io_u = __get_io_u(td); 464 if (!io_u) 465 break; 466 467 if (get_next_verify(td, io_u)) { 468 put_io_u(td, io_u); 469 break; 470 } 471 472 if (td_io_prep(td, io_u)) { 473 put_io_u(td, io_u); 474 break; 475 } 476 } else { 477 if (ddir_rw_sum(bytes_done) + td->o.rw_min_bs > verify_bytes) 478 break; 479 480 while ((io_u = get_io_u(td)) != NULL) { 481 /* 482 * We are only interested in the places where 483 * we wrote or trimmed IOs. Turn those into 484 * reads for verification purposes. 485 */ 486 if (io_u->ddir == DDIR_READ) { 487 /* 488 * Pretend we issued it for rwmix 489 * accounting 490 */ 491 td->io_issues[DDIR_READ]++; 492 put_io_u(td, io_u); 493 continue; 494 } else if (io_u->ddir == DDIR_TRIM) { 495 io_u->ddir = DDIR_READ; 496 io_u->flags |= IO_U_F_TRIMMED; 497 break; 498 } else if (io_u->ddir == DDIR_WRITE) { 499 io_u->ddir = DDIR_READ; 500 break; 501 } else { 502 put_io_u(td, io_u); 503 continue; 504 } 505 } 506 507 if (!io_u) 508 break; 509 } 510 511 if (td->o.verify_async) 512 io_u->end_io = verify_io_u_async; 513 else 514 io_u->end_io = verify_io_u; 515 516 ddir = io_u->ddir; 517 518 ret = td_io_queue(td, io_u); 519 switch (ret) { 520 case FIO_Q_COMPLETED: 521 if (io_u->error) { 522 ret = -io_u->error; 523 clear_io_u(td, io_u); 524 } else if (io_u->resid) { 525 int bytes = io_u->xfer_buflen - io_u->resid; 526 527 /* 528 * zero read, fail 529 */ 530 if (!bytes) { 531 td_verror(td, EIO, "full resid"); 532 put_io_u(td, io_u); 533 break; 534 } 535 536 io_u->xfer_buflen = io_u->resid; 537 io_u->xfer_buf += bytes; 538 io_u->offset += bytes; 539 540 if (ddir_rw(io_u->ddir)) 541 td->ts.short_io_u[io_u->ddir]++; 542 543 f = io_u->file; 544 if (io_u->offset == f->real_file_size) 545 goto sync_done; 546 547 requeue_io_u(td, &io_u); 548 } else { 549sync_done: 550 ret = io_u_sync_complete(td, io_u, bytes_done); 551 if (ret < 0) 552 break; 553 } 554 continue; 555 case FIO_Q_QUEUED: 556 break; 557 case FIO_Q_BUSY: 558 requeue_io_u(td, &io_u); 559 ret2 = td_io_commit(td); 560 if (ret2 < 0) 561 ret = ret2; 562 break; 563 default: 564 assert(ret < 0); 565 td_verror(td, -ret, "td_io_queue"); 566 break; 567 } 568 569 if (break_on_this_error(td, ddir, &ret)) 570 break; 571 572 /* 573 * if we can queue more, do so. but check if there are 574 * completed io_u's first. Note that we can get BUSY even 575 * without IO queued, if the system is resource starved. 576 */ 577 full = queue_full(td) || (ret == FIO_Q_BUSY && td->cur_depth); 578 if (full || !td->o.iodepth_batch_complete) { 579 min_events = min(td->o.iodepth_batch_complete, 580 td->cur_depth); 581 /* 582 * if the queue is full, we MUST reap at least 1 event 583 */ 584 if (full && !min_events) 585 min_events = 1; 586 587 do { 588 /* 589 * Reap required number of io units, if any, 590 * and do the verification on them through 591 * the callback handler 592 */ 593 if (io_u_queued_complete(td, min_events, bytes_done) < 0) { 594 ret = -1; 595 break; 596 } 597 } while (full && (td->cur_depth > td->o.iodepth_low)); 598 } 599 if (ret < 0) 600 break; 601 } 602 603 check_update_rusage(td); 604 605 if (!td->error) { 606 min_events = td->cur_depth; 607 608 if (min_events) 609 ret = io_u_queued_complete(td, min_events, NULL); 610 } else 611 cleanup_pending_aio(td); 612 613 td_set_runstate(td, TD_RUNNING); 614 615 dprint(FD_VERIFY, "exiting loop\n"); 616} 617 618static int io_bytes_exceeded(struct thread_data *td) 619{ 620 unsigned long long bytes; 621 622 if (td_rw(td)) 623 bytes = td->this_io_bytes[DDIR_READ] + td->this_io_bytes[DDIR_WRITE]; 624 else if (td_write(td)) 625 bytes = td->this_io_bytes[DDIR_WRITE]; 626 else if (td_read(td)) 627 bytes = td->this_io_bytes[DDIR_READ]; 628 else 629 bytes = td->this_io_bytes[DDIR_TRIM]; 630 631 return bytes >= td->o.size; 632} 633 634/* 635 * Main IO worker function. It retrieves io_u's to process and queues 636 * and reaps them, checking for rate and errors along the way. 637 * 638 * Returns number of bytes written and trimmed. 639 */ 640static uint64_t do_io(struct thread_data *td) 641{ 642 uint64_t bytes_done[DDIR_RWDIR_CNT] = { 0, 0, 0 }; 643 unsigned int i; 644 int ret = 0; 645 uint64_t bytes_issued = 0; 646 647 if (in_ramp_time(td)) 648 td_set_runstate(td, TD_RAMP); 649 else 650 td_set_runstate(td, TD_RUNNING); 651 652 while ((td->o.read_iolog_file && !flist_empty(&td->io_log_list)) || 653 (!flist_empty(&td->trim_list)) || !io_bytes_exceeded(td) || 654 td->o.time_based) { 655 struct timeval comp_time; 656 int min_evts = 0; 657 struct io_u *io_u; 658 int ret2, full; 659 enum fio_ddir ddir; 660 661 check_update_rusage(td); 662 663 if (td->terminate || td->done) 664 break; 665 666 update_tv_cache(td); 667 668 if (runtime_exceeded(td, &td->tv_cache)) { 669 __update_tv_cache(td); 670 if (runtime_exceeded(td, &td->tv_cache)) { 671 td->terminate = 1; 672 break; 673 } 674 } 675 676 if (flow_threshold_exceeded(td)) 677 continue; 678 679 if (bytes_issued >= (uint64_t) td->o.size) 680 break; 681 682 io_u = get_io_u(td); 683 if (!io_u) 684 break; 685 686 ddir = io_u->ddir; 687 688 /* 689 * Add verification end_io handler if: 690 * - Asked to verify (!td_rw(td)) 691 * - Or the io_u is from our verify list (mixed write/ver) 692 */ 693 if (td->o.verify != VERIFY_NONE && io_u->ddir == DDIR_READ && 694 ((io_u->flags & IO_U_F_VER_LIST) || !td_rw(td))) { 695 if (td->o.verify_async) 696 io_u->end_io = verify_io_u_async; 697 else 698 io_u->end_io = verify_io_u; 699 td_set_runstate(td, TD_VERIFYING); 700 } else if (in_ramp_time(td)) 701 td_set_runstate(td, TD_RAMP); 702 else 703 td_set_runstate(td, TD_RUNNING); 704 705 ret = td_io_queue(td, io_u); 706 switch (ret) { 707 case FIO_Q_COMPLETED: 708 if (io_u->error) { 709 ret = -io_u->error; 710 clear_io_u(td, io_u); 711 } else if (io_u->resid) { 712 int bytes = io_u->xfer_buflen - io_u->resid; 713 struct fio_file *f = io_u->file; 714 715 bytes_issued += bytes; 716 /* 717 * zero read, fail 718 */ 719 if (!bytes) { 720 td_verror(td, EIO, "full resid"); 721 put_io_u(td, io_u); 722 break; 723 } 724 725 io_u->xfer_buflen = io_u->resid; 726 io_u->xfer_buf += bytes; 727 io_u->offset += bytes; 728 729 if (ddir_rw(io_u->ddir)) 730 td->ts.short_io_u[io_u->ddir]++; 731 732 if (io_u->offset == f->real_file_size) 733 goto sync_done; 734 735 requeue_io_u(td, &io_u); 736 } else { 737sync_done: 738 if (__should_check_rate(td, DDIR_READ) || 739 __should_check_rate(td, DDIR_WRITE) || 740 __should_check_rate(td, DDIR_TRIM)) 741 fio_gettime(&comp_time, NULL); 742 743 ret = io_u_sync_complete(td, io_u, bytes_done); 744 if (ret < 0) 745 break; 746 bytes_issued += io_u->xfer_buflen; 747 } 748 break; 749 case FIO_Q_QUEUED: 750 /* 751 * if the engine doesn't have a commit hook, 752 * the io_u is really queued. if it does have such 753 * a hook, it has to call io_u_queued() itself. 754 */ 755 if (td->io_ops->commit == NULL) 756 io_u_queued(td, io_u); 757 bytes_issued += io_u->xfer_buflen; 758 break; 759 case FIO_Q_BUSY: 760 requeue_io_u(td, &io_u); 761 ret2 = td_io_commit(td); 762 if (ret2 < 0) 763 ret = ret2; 764 break; 765 default: 766 assert(ret < 0); 767 put_io_u(td, io_u); 768 break; 769 } 770 771 if (break_on_this_error(td, ddir, &ret)) 772 break; 773 774 /* 775 * See if we need to complete some commands. Note that we 776 * can get BUSY even without IO queued, if the system is 777 * resource starved. 778 */ 779 full = queue_full(td) || (ret == FIO_Q_BUSY && td->cur_depth); 780 if (full || !td->o.iodepth_batch_complete) { 781 min_evts = min(td->o.iodepth_batch_complete, 782 td->cur_depth); 783 /* 784 * if the queue is full, we MUST reap at least 1 event 785 */ 786 if (full && !min_evts) 787 min_evts = 1; 788 789 if (__should_check_rate(td, DDIR_READ) || 790 __should_check_rate(td, DDIR_WRITE) || 791 __should_check_rate(td, DDIR_TRIM)) 792 fio_gettime(&comp_time, NULL); 793 794 do { 795 ret = io_u_queued_complete(td, min_evts, bytes_done); 796 if (ret < 0) 797 break; 798 799 } while (full && (td->cur_depth > td->o.iodepth_low)); 800 } 801 802 if (ret < 0) 803 break; 804 if (!ddir_rw_sum(bytes_done) && !(td->io_ops->flags & FIO_NOIO)) 805 continue; 806 807 if (!in_ramp_time(td) && should_check_rate(td, bytes_done)) { 808 if (check_min_rate(td, &comp_time, bytes_done)) { 809 if (exitall_on_terminate) 810 fio_terminate_threads(td->groupid); 811 td_verror(td, EIO, "check_min_rate"); 812 break; 813 } 814 } 815 816 if (td->o.thinktime) { 817 unsigned long long b; 818 819 b = ddir_rw_sum(td->io_blocks); 820 if (!(b % td->o.thinktime_blocks)) { 821 int left; 822 823 io_u_quiesce(td); 824 825 if (td->o.thinktime_spin) 826 usec_spin(td->o.thinktime_spin); 827 828 left = td->o.thinktime - td->o.thinktime_spin; 829 if (left) 830 usec_sleep(td, left); 831 } 832 } 833 } 834 835 check_update_rusage(td); 836 837 if (td->trim_entries) 838 log_err("fio: %lu trim entries leaked?\n", td->trim_entries); 839 840 if (td->o.fill_device && td->error == ENOSPC) { 841 td->error = 0; 842 td->terminate = 1; 843 } 844 if (!td->error) { 845 struct fio_file *f; 846 847 i = td->cur_depth; 848 if (i) { 849 ret = io_u_queued_complete(td, i, bytes_done); 850 if (td->o.fill_device && td->error == ENOSPC) 851 td->error = 0; 852 } 853 854 if (should_fsync(td) && td->o.end_fsync) { 855 td_set_runstate(td, TD_FSYNCING); 856 857 for_each_file(td, f, i) { 858 if (!fio_file_fsync(td, f)) 859 continue; 860 861 log_err("fio: end_fsync failed for file %s\n", 862 f->file_name); 863 } 864 } 865 } else 866 cleanup_pending_aio(td); 867 868 /* 869 * stop job if we failed doing any IO 870 */ 871 if (!ddir_rw_sum(td->this_io_bytes)) 872 td->done = 1; 873 874 return bytes_done[DDIR_WRITE] + bytes_done[DDIR_TRIM]; 875} 876 877static void cleanup_io_u(struct thread_data *td) 878{ 879 struct io_u *io_u; 880 881 while ((io_u = io_u_qpop(&td->io_u_freelist)) != NULL) { 882 883 if (td->io_ops->io_u_free) 884 td->io_ops->io_u_free(td, io_u); 885 886 fio_memfree(io_u, sizeof(*io_u)); 887 } 888 889 free_io_mem(td); 890 891 io_u_rexit(&td->io_u_requeues); 892 io_u_qexit(&td->io_u_freelist); 893 io_u_qexit(&td->io_u_all); 894} 895 896static int init_io_u(struct thread_data *td) 897{ 898 struct io_u *io_u; 899 unsigned int max_bs, min_write; 900 int cl_align, i, max_units; 901 int data_xfer = 1, err; 902 char *p; 903 904 max_units = td->o.iodepth; 905 max_bs = td_max_bs(td); 906 min_write = td->o.min_bs[DDIR_WRITE]; 907 td->orig_buffer_size = (unsigned long long) max_bs 908 * (unsigned long long) max_units; 909 910 if ((td->io_ops->flags & FIO_NOIO) || !(td_read(td) || td_write(td))) 911 data_xfer = 0; 912 913 err = 0; 914 err += io_u_rinit(&td->io_u_requeues, td->o.iodepth); 915 err += io_u_qinit(&td->io_u_freelist, td->o.iodepth); 916 err += io_u_qinit(&td->io_u_all, td->o.iodepth); 917 918 if (err) { 919 log_err("fio: failed setting up IO queues\n"); 920 return 1; 921 } 922 923 /* 924 * if we may later need to do address alignment, then add any 925 * possible adjustment here so that we don't cause a buffer 926 * overflow later. this adjustment may be too much if we get 927 * lucky and the allocator gives us an aligned address. 928 */ 929 if (td->o.odirect || td->o.mem_align || (td->io_ops->flags & FIO_RAWIO)) 930 td->orig_buffer_size += page_mask + td->o.mem_align; 931 932 if (td->o.mem_type == MEM_SHMHUGE || td->o.mem_type == MEM_MMAPHUGE) { 933 unsigned long bs; 934 935 bs = td->orig_buffer_size + td->o.hugepage_size - 1; 936 td->orig_buffer_size = bs & ~(td->o.hugepage_size - 1); 937 } 938 939 if (td->orig_buffer_size != (size_t) td->orig_buffer_size) { 940 log_err("fio: IO memory too large. Reduce max_bs or iodepth\n"); 941 return 1; 942 } 943 944 if (data_xfer && allocate_io_mem(td)) 945 return 1; 946 947 if (td->o.odirect || td->o.mem_align || 948 (td->io_ops->flags & FIO_RAWIO)) 949 p = PAGE_ALIGN(td->orig_buffer) + td->o.mem_align; 950 else 951 p = td->orig_buffer; 952 953 cl_align = os_cache_line_size(); 954 955 for (i = 0; i < max_units; i++) { 956 void *ptr; 957 958 if (td->terminate) 959 return 1; 960 961 ptr = fio_memalign(cl_align, sizeof(*io_u)); 962 if (!ptr) { 963 log_err("fio: unable to allocate aligned memory\n"); 964 break; 965 } 966 967 io_u = ptr; 968 memset(io_u, 0, sizeof(*io_u)); 969 INIT_FLIST_HEAD(&io_u->verify_list); 970 dprint(FD_MEM, "io_u alloc %p, index %u\n", io_u, i); 971 972 if (data_xfer) { 973 io_u->buf = p; 974 dprint(FD_MEM, "io_u %p, mem %p\n", io_u, io_u->buf); 975 976 if (td_write(td)) 977 io_u_fill_buffer(td, io_u, min_write, max_bs); 978 if (td_write(td) && td->o.verify_pattern_bytes) { 979 /* 980 * Fill the buffer with the pattern if we are 981 * going to be doing writes. 982 */ 983 fill_pattern(td, io_u->buf, max_bs, io_u, 0, 0); 984 } 985 } 986 987 io_u->index = i; 988 io_u->flags = IO_U_F_FREE; 989 io_u_qpush(&td->io_u_freelist, io_u); 990 991 /* 992 * io_u never leaves this stack, used for iteration of all 993 * io_u buffers. 994 */ 995 io_u_qpush(&td->io_u_all, io_u); 996 997 if (td->io_ops->io_u_init) { 998 int ret = td->io_ops->io_u_init(td, io_u); 999 1000 if (ret) { 1001 log_err("fio: failed to init engine data: %d\n", ret); 1002 return 1; 1003 } 1004 } 1005 1006 p += max_bs; 1007 } 1008 1009 return 0; 1010} 1011 1012static int switch_ioscheduler(struct thread_data *td) 1013{ 1014 char tmp[256], tmp2[128]; 1015 FILE *f; 1016 int ret; 1017 1018 if (td->io_ops->flags & FIO_DISKLESSIO) 1019 return 0; 1020 1021 sprintf(tmp, "%s/queue/scheduler", td->sysfs_root); 1022 1023 f = fopen(tmp, "r+"); 1024 if (!f) { 1025 if (errno == ENOENT) { 1026 log_err("fio: os or kernel doesn't support IO scheduler" 1027 " switching\n"); 1028 return 0; 1029 } 1030 td_verror(td, errno, "fopen iosched"); 1031 return 1; 1032 } 1033 1034 /* 1035 * Set io scheduler. 1036 */ 1037 ret = fwrite(td->o.ioscheduler, strlen(td->o.ioscheduler), 1, f); 1038 if (ferror(f) || ret != 1) { 1039 td_verror(td, errno, "fwrite"); 1040 fclose(f); 1041 return 1; 1042 } 1043 1044 rewind(f); 1045 1046 /* 1047 * Read back and check that the selected scheduler is now the default. 1048 */ 1049 ret = fread(tmp, 1, sizeof(tmp), f); 1050 if (ferror(f) || ret < 0) { 1051 td_verror(td, errno, "fread"); 1052 fclose(f); 1053 return 1; 1054 } 1055 1056 sprintf(tmp2, "[%s]", td->o.ioscheduler); 1057 if (!strstr(tmp, tmp2)) { 1058 log_err("fio: io scheduler %s not found\n", td->o.ioscheduler); 1059 td_verror(td, EINVAL, "iosched_switch"); 1060 fclose(f); 1061 return 1; 1062 } 1063 1064 fclose(f); 1065 return 0; 1066} 1067 1068static int keep_running(struct thread_data *td) 1069{ 1070 if (td->done) 1071 return 0; 1072 if (td->o.time_based) 1073 return 1; 1074 if (td->o.loops) { 1075 td->o.loops--; 1076 return 1; 1077 } 1078 1079 if (td->o.size != -1ULL && ddir_rw_sum(td->io_bytes) < td->o.size) { 1080 uint64_t diff; 1081 1082 /* 1083 * If the difference is less than the minimum IO size, we 1084 * are done. 1085 */ 1086 diff = td->o.size - ddir_rw_sum(td->io_bytes); 1087 if (diff < td_max_bs(td)) 1088 return 0; 1089 1090 return 1; 1091 } 1092 1093 return 0; 1094} 1095 1096static int exec_string(struct thread_options *o, const char *string, const char *mode) 1097{ 1098 int ret, newlen = strlen(string) + strlen(o->name) + strlen(mode) + 9 + 1; 1099 char *str; 1100 1101 str = malloc(newlen); 1102 sprintf(str, "%s &> %s.%s.txt", string, o->name, mode); 1103 1104 log_info("%s : Saving output of %s in %s.%s.txt\n",o->name, mode, o->name, mode); 1105 ret = system(str); 1106 if (ret == -1) 1107 log_err("fio: exec of cmd <%s> failed\n", str); 1108 1109 free(str); 1110 return ret; 1111} 1112 1113/* 1114 * Entry point for the thread based jobs. The process based jobs end up 1115 * here as well, after a little setup. 1116 */ 1117static void *thread_main(void *data) 1118{ 1119 unsigned long long elapsed; 1120 struct thread_data *td = data; 1121 struct thread_options *o = &td->o; 1122 pthread_condattr_t attr; 1123 int clear_state; 1124 int ret; 1125 1126 if (!o->use_thread) { 1127 setsid(); 1128 td->pid = getpid(); 1129 } else 1130 td->pid = gettid(); 1131 1132 fio_local_clock_init(o->use_thread); 1133 1134 dprint(FD_PROCESS, "jobs pid=%d started\n", (int) td->pid); 1135 1136 if (is_backend) 1137 fio_server_send_start(td); 1138 1139 INIT_FLIST_HEAD(&td->io_log_list); 1140 INIT_FLIST_HEAD(&td->io_hist_list); 1141 INIT_FLIST_HEAD(&td->verify_list); 1142 INIT_FLIST_HEAD(&td->trim_list); 1143 INIT_FLIST_HEAD(&td->next_rand_list); 1144 pthread_mutex_init(&td->io_u_lock, NULL); 1145 td->io_hist_tree = RB_ROOT; 1146 1147 pthread_condattr_init(&attr); 1148 pthread_cond_init(&td->verify_cond, &attr); 1149 pthread_cond_init(&td->free_cond, &attr); 1150 1151 td_set_runstate(td, TD_INITIALIZED); 1152 dprint(FD_MUTEX, "up startup_mutex\n"); 1153 fio_mutex_up(startup_mutex); 1154 dprint(FD_MUTEX, "wait on td->mutex\n"); 1155 fio_mutex_down(td->mutex); 1156 dprint(FD_MUTEX, "done waiting on td->mutex\n"); 1157 1158 /* 1159 * the ->mutex mutex is now no longer used, close it to avoid 1160 * eating a file descriptor 1161 */ 1162 fio_mutex_remove(td->mutex); 1163 td->mutex = NULL; 1164 1165 /* 1166 * A new gid requires privilege, so we need to do this before setting 1167 * the uid. 1168 */ 1169 if (o->gid != -1U && setgid(o->gid)) { 1170 td_verror(td, errno, "setgid"); 1171 goto err; 1172 } 1173 if (o->uid != -1U && setuid(o->uid)) { 1174 td_verror(td, errno, "setuid"); 1175 goto err; 1176 } 1177 1178 /* 1179 * If we have a gettimeofday() thread, make sure we exclude that 1180 * thread from this job 1181 */ 1182 if (o->gtod_cpu) 1183 fio_cpu_clear(&o->cpumask, o->gtod_cpu); 1184 1185 /* 1186 * Set affinity first, in case it has an impact on the memory 1187 * allocations. 1188 */ 1189 if (o->cpumask_set) { 1190 ret = fio_setaffinity(td->pid, o->cpumask); 1191 if (ret == -1) { 1192 td_verror(td, errno, "cpu_set_affinity"); 1193 goto err; 1194 } 1195 } 1196 1197#ifdef CONFIG_LIBNUMA 1198 /* numa node setup */ 1199 if (o->numa_cpumask_set || o->numa_memmask_set) { 1200 int ret; 1201 1202 if (numa_available() < 0) { 1203 td_verror(td, errno, "Does not support NUMA API\n"); 1204 goto err; 1205 } 1206 1207 if (o->numa_cpumask_set) { 1208 ret = numa_run_on_node_mask(o->numa_cpunodesmask); 1209 if (ret == -1) { 1210 td_verror(td, errno, \ 1211 "numa_run_on_node_mask failed\n"); 1212 goto err; 1213 } 1214 } 1215 1216 if (o->numa_memmask_set) { 1217 1218 switch (o->numa_mem_mode) { 1219 case MPOL_INTERLEAVE: 1220 numa_set_interleave_mask(o->numa_memnodesmask); 1221 break; 1222 case MPOL_BIND: 1223 numa_set_membind(o->numa_memnodesmask); 1224 break; 1225 case MPOL_LOCAL: 1226 numa_set_localalloc(); 1227 break; 1228 case MPOL_PREFERRED: 1229 numa_set_preferred(o->numa_mem_prefer_node); 1230 break; 1231 case MPOL_DEFAULT: 1232 default: 1233 break; 1234 } 1235 1236 } 1237 } 1238#endif 1239 1240 if (fio_pin_memory(td)) 1241 goto err; 1242 1243 /* 1244 * May alter parameters that init_io_u() will use, so we need to 1245 * do this first. 1246 */ 1247 if (init_iolog(td)) 1248 goto err; 1249 1250 if (init_io_u(td)) 1251 goto err; 1252 1253 if (o->verify_async && verify_async_init(td)) 1254 goto err; 1255 1256 if (o->ioprio) { 1257 ret = ioprio_set(IOPRIO_WHO_PROCESS, 0, o->ioprio_class, o->ioprio); 1258 if (ret == -1) { 1259 td_verror(td, errno, "ioprio_set"); 1260 goto err; 1261 } 1262 } 1263 1264 if (o->cgroup && cgroup_setup(td, cgroup_list, &cgroup_mnt)) 1265 goto err; 1266 1267 errno = 0; 1268 if (nice(o->nice) == -1 && errno != 0) { 1269 td_verror(td, errno, "nice"); 1270 goto err; 1271 } 1272 1273 if (o->ioscheduler && switch_ioscheduler(td)) 1274 goto err; 1275 1276 if (!o->create_serialize && setup_files(td)) 1277 goto err; 1278 1279 if (td_io_init(td)) 1280 goto err; 1281 1282 if (init_random_map(td)) 1283 goto err; 1284 1285 if (o->exec_prerun && exec_string(o, o->exec_prerun, (const char *)"prerun")) 1286 goto err; 1287 1288 if (o->pre_read) { 1289 if (pre_read_files(td) < 0) 1290 goto err; 1291 } 1292 1293 fio_verify_init(td); 1294 1295 fio_gettime(&td->epoch, NULL); 1296 fio_getrusage(&td->ru_start); 1297 clear_state = 0; 1298 while (keep_running(td)) { 1299 uint64_t verify_bytes; 1300 1301 fio_gettime(&td->start, NULL); 1302 memcpy(&td->bw_sample_time, &td->start, sizeof(td->start)); 1303 memcpy(&td->iops_sample_time, &td->start, sizeof(td->start)); 1304 memcpy(&td->tv_cache, &td->start, sizeof(td->start)); 1305 1306 if (o->ratemin[DDIR_READ] || o->ratemin[DDIR_WRITE] || 1307 o->ratemin[DDIR_TRIM]) { 1308 memcpy(&td->lastrate[DDIR_READ], &td->bw_sample_time, 1309 sizeof(td->bw_sample_time)); 1310 memcpy(&td->lastrate[DDIR_WRITE], &td->bw_sample_time, 1311 sizeof(td->bw_sample_time)); 1312 memcpy(&td->lastrate[DDIR_TRIM], &td->bw_sample_time, 1313 sizeof(td->bw_sample_time)); 1314 } 1315 1316 if (clear_state) 1317 clear_io_state(td); 1318 1319 prune_io_piece_log(td); 1320 1321 verify_bytes = do_io(td); 1322 1323 clear_state = 1; 1324 1325 if (td_read(td) && td->io_bytes[DDIR_READ]) { 1326 elapsed = utime_since_now(&td->start); 1327 td->ts.runtime[DDIR_READ] += elapsed; 1328 } 1329 if (td_write(td) && td->io_bytes[DDIR_WRITE]) { 1330 elapsed = utime_since_now(&td->start); 1331 td->ts.runtime[DDIR_WRITE] += elapsed; 1332 } 1333 if (td_trim(td) && td->io_bytes[DDIR_TRIM]) { 1334 elapsed = utime_since_now(&td->start); 1335 td->ts.runtime[DDIR_TRIM] += elapsed; 1336 } 1337 1338 if (td->error || td->terminate) 1339 break; 1340 1341 if (!o->do_verify || 1342 o->verify == VERIFY_NONE || 1343 (td->io_ops->flags & FIO_UNIDIR)) 1344 continue; 1345 1346 clear_io_state(td); 1347 1348 fio_gettime(&td->start, NULL); 1349 1350 do_verify(td, verify_bytes); 1351 1352 td->ts.runtime[DDIR_READ] += utime_since_now(&td->start); 1353 1354 if (td->error || td->terminate) 1355 break; 1356 } 1357 1358 update_rusage_stat(td); 1359 td->ts.runtime[DDIR_READ] = (td->ts.runtime[DDIR_READ] + 999) / 1000; 1360 td->ts.runtime[DDIR_WRITE] = (td->ts.runtime[DDIR_WRITE] + 999) / 1000; 1361 td->ts.runtime[DDIR_TRIM] = (td->ts.runtime[DDIR_TRIM] + 999) / 1000; 1362 td->ts.total_run_time = mtime_since_now(&td->epoch); 1363 td->ts.io_bytes[DDIR_READ] = td->io_bytes[DDIR_READ]; 1364 td->ts.io_bytes[DDIR_WRITE] = td->io_bytes[DDIR_WRITE]; 1365 td->ts.io_bytes[DDIR_TRIM] = td->io_bytes[DDIR_TRIM]; 1366 1367 fio_unpin_memory(td); 1368 1369 fio_mutex_down(writeout_mutex); 1370 if (td->bw_log) { 1371 if (o->bw_log_file) { 1372 finish_log_named(td, td->bw_log, 1373 o->bw_log_file, "bw"); 1374 } else 1375 finish_log(td, td->bw_log, "bw"); 1376 } 1377 if (td->lat_log) { 1378 if (o->lat_log_file) { 1379 finish_log_named(td, td->lat_log, 1380 o->lat_log_file, "lat"); 1381 } else 1382 finish_log(td, td->lat_log, "lat"); 1383 } 1384 if (td->slat_log) { 1385 if (o->lat_log_file) { 1386 finish_log_named(td, td->slat_log, 1387 o->lat_log_file, "slat"); 1388 } else 1389 finish_log(td, td->slat_log, "slat"); 1390 } 1391 if (td->clat_log) { 1392 if (o->lat_log_file) { 1393 finish_log_named(td, td->clat_log, 1394 o->lat_log_file, "clat"); 1395 } else 1396 finish_log(td, td->clat_log, "clat"); 1397 } 1398 if (td->iops_log) { 1399 if (o->iops_log_file) { 1400 finish_log_named(td, td->iops_log, 1401 o->iops_log_file, "iops"); 1402 } else 1403 finish_log(td, td->iops_log, "iops"); 1404 } 1405 1406 fio_mutex_up(writeout_mutex); 1407 if (o->exec_postrun) 1408 exec_string(o, o->exec_postrun, (const char *)"postrun"); 1409 1410 if (exitall_on_terminate) 1411 fio_terminate_threads(td->groupid); 1412 1413err: 1414 if (td->error) 1415 log_info("fio: pid=%d, err=%d/%s\n", (int) td->pid, td->error, 1416 td->verror); 1417 1418 if (o->verify_async) 1419 verify_async_exit(td); 1420 1421 close_and_free_files(td); 1422 cleanup_io_u(td); 1423 close_ioengine(td); 1424 cgroup_shutdown(td, &cgroup_mnt); 1425 1426 if (o->cpumask_set) { 1427 int ret = fio_cpuset_exit(&o->cpumask); 1428 1429 td_verror(td, ret, "fio_cpuset_exit"); 1430 } 1431 1432 /* 1433 * do this very late, it will log file closing as well 1434 */ 1435 if (o->write_iolog_file) 1436 write_iolog_close(td); 1437 1438 fio_mutex_remove(td->rusage_sem); 1439 td->rusage_sem = NULL; 1440 1441 td_set_runstate(td, TD_EXITED); 1442 return (void *) (uintptr_t) td->error; 1443} 1444 1445 1446/* 1447 * We cannot pass the td data into a forked process, so attach the td and 1448 * pass it to the thread worker. 1449 */ 1450static int fork_main(int shmid, int offset) 1451{ 1452 struct thread_data *td; 1453 void *data, *ret; 1454 1455#ifndef __hpux 1456 data = shmat(shmid, NULL, 0); 1457 if (data == (void *) -1) { 1458 int __err = errno; 1459 1460 perror("shmat"); 1461 return __err; 1462 } 1463#else 1464 /* 1465 * HP-UX inherits shm mappings? 1466 */ 1467 data = threads; 1468#endif 1469 1470 td = data + offset * sizeof(struct thread_data); 1471 ret = thread_main(td); 1472 shmdt(data); 1473 return (int) (uintptr_t) ret; 1474} 1475 1476/* 1477 * Run over the job map and reap the threads that have exited, if any. 1478 */ 1479static void reap_threads(unsigned int *nr_running, unsigned int *t_rate, 1480 unsigned int *m_rate) 1481{ 1482 struct thread_data *td; 1483 unsigned int cputhreads, realthreads, pending; 1484 int i, status, ret; 1485 1486 /* 1487 * reap exited threads (TD_EXITED -> TD_REAPED) 1488 */ 1489 realthreads = pending = cputhreads = 0; 1490 for_each_td(td, i) { 1491 int flags = 0; 1492 1493 /* 1494 * ->io_ops is NULL for a thread that has closed its 1495 * io engine 1496 */ 1497 if (td->io_ops && !strcmp(td->io_ops->name, "cpuio")) 1498 cputhreads++; 1499 else 1500 realthreads++; 1501 1502 if (!td->pid) { 1503 pending++; 1504 continue; 1505 } 1506 if (td->runstate == TD_REAPED) 1507 continue; 1508 if (td->o.use_thread) { 1509 if (td->runstate == TD_EXITED) { 1510 td_set_runstate(td, TD_REAPED); 1511 goto reaped; 1512 } 1513 continue; 1514 } 1515 1516 flags = WNOHANG; 1517 if (td->runstate == TD_EXITED) 1518 flags = 0; 1519 1520 /* 1521 * check if someone quit or got killed in an unusual way 1522 */ 1523 ret = waitpid(td->pid, &status, flags); 1524 if (ret < 0) { 1525 if (errno == ECHILD) { 1526 log_err("fio: pid=%d disappeared %d\n", 1527 (int) td->pid, td->runstate); 1528 td->sig = ECHILD; 1529 td_set_runstate(td, TD_REAPED); 1530 goto reaped; 1531 } 1532 perror("waitpid"); 1533 } else if (ret == td->pid) { 1534 if (WIFSIGNALED(status)) { 1535 int sig = WTERMSIG(status); 1536 1537 if (sig != SIGTERM && sig != SIGUSR2) 1538 log_err("fio: pid=%d, got signal=%d\n", 1539 (int) td->pid, sig); 1540 td->sig = sig; 1541 td_set_runstate(td, TD_REAPED); 1542 goto reaped; 1543 } 1544 if (WIFEXITED(status)) { 1545 if (WEXITSTATUS(status) && !td->error) 1546 td->error = WEXITSTATUS(status); 1547 1548 td_set_runstate(td, TD_REAPED); 1549 goto reaped; 1550 } 1551 } 1552 1553 /* 1554 * thread is not dead, continue 1555 */ 1556 pending++; 1557 continue; 1558reaped: 1559 (*nr_running)--; 1560 (*m_rate) -= ddir_rw_sum(td->o.ratemin); 1561 (*t_rate) -= ddir_rw_sum(td->o.rate); 1562 if (!td->pid) 1563 pending--; 1564 1565 if (td->error) 1566 exit_value++; 1567 1568 done_secs += mtime_since_now(&td->epoch) / 1000; 1569 profile_td_exit(td); 1570 } 1571 1572 if (*nr_running == cputhreads && !pending && realthreads) 1573 fio_terminate_threads(TERMINATE_ALL); 1574} 1575 1576static void do_usleep(unsigned int usecs) 1577{ 1578 check_for_running_stats(); 1579 usleep(usecs); 1580} 1581 1582/* 1583 * Main function for kicking off and reaping jobs, as needed. 1584 */ 1585static void run_threads(void) 1586{ 1587 struct thread_data *td; 1588 unsigned long spent; 1589 unsigned int i, todo, nr_running, m_rate, t_rate, nr_started; 1590 1591 if (fio_gtod_offload && fio_start_gtod_thread()) 1592 return; 1593 1594 fio_idle_prof_init(); 1595 1596 set_sig_handlers(); 1597 1598 nr_thread = nr_process = 0; 1599 for_each_td(td, i) { 1600 if (td->o.use_thread) 1601 nr_thread++; 1602 else 1603 nr_process++; 1604 } 1605 1606 if (output_format == FIO_OUTPUT_NORMAL) { 1607 log_info("Starting "); 1608 if (nr_thread) 1609 log_info("%d thread%s", nr_thread, 1610 nr_thread > 1 ? "s" : ""); 1611 if (nr_process) { 1612 if (nr_thread) 1613 log_info(" and "); 1614 log_info("%d process%s", nr_process, 1615 nr_process > 1 ? "es" : ""); 1616 } 1617 log_info("\n"); 1618 fflush(stdout); 1619 } 1620 1621 todo = thread_number; 1622 nr_running = 0; 1623 nr_started = 0; 1624 m_rate = t_rate = 0; 1625 1626 for_each_td(td, i) { 1627 print_status_init(td->thread_number - 1); 1628 1629 if (!td->o.create_serialize) 1630 continue; 1631 1632 /* 1633 * do file setup here so it happens sequentially, 1634 * we don't want X number of threads getting their 1635 * client data interspersed on disk 1636 */ 1637 if (setup_files(td)) { 1638 exit_value++; 1639 if (td->error) 1640 log_err("fio: pid=%d, err=%d/%s\n", 1641 (int) td->pid, td->error, td->verror); 1642 td_set_runstate(td, TD_REAPED); 1643 todo--; 1644 } else { 1645 struct fio_file *f; 1646 unsigned int j; 1647 1648 /* 1649 * for sharing to work, each job must always open 1650 * its own files. so close them, if we opened them 1651 * for creation 1652 */ 1653 for_each_file(td, f, j) { 1654 if (fio_file_open(f)) 1655 td_io_close_file(td, f); 1656 } 1657 } 1658 } 1659 1660 /* start idle threads before io threads start to run */ 1661 fio_idle_prof_start(); 1662 1663 set_genesis_time(); 1664 1665 while (todo) { 1666 struct thread_data *map[REAL_MAX_JOBS]; 1667 struct timeval this_start; 1668 int this_jobs = 0, left; 1669 1670 /* 1671 * create threads (TD_NOT_CREATED -> TD_CREATED) 1672 */ 1673 for_each_td(td, i) { 1674 if (td->runstate != TD_NOT_CREATED) 1675 continue; 1676 1677 /* 1678 * never got a chance to start, killed by other 1679 * thread for some reason 1680 */ 1681 if (td->terminate) { 1682 todo--; 1683 continue; 1684 } 1685 1686 if (td->o.start_delay) { 1687 spent = mtime_since_genesis(); 1688 1689 if (td->o.start_delay * 1000 > spent) 1690 continue; 1691 } 1692 1693 if (td->o.stonewall && (nr_started || nr_running)) { 1694 dprint(FD_PROCESS, "%s: stonewall wait\n", 1695 td->o.name); 1696 break; 1697 } 1698 1699 init_disk_util(td); 1700 1701 td->rusage_sem = fio_mutex_init(FIO_MUTEX_LOCKED); 1702 td->update_rusage = 0; 1703 1704 /* 1705 * Set state to created. Thread will transition 1706 * to TD_INITIALIZED when it's done setting up. 1707 */ 1708 td_set_runstate(td, TD_CREATED); 1709 map[this_jobs++] = td; 1710 nr_started++; 1711 1712 if (td->o.use_thread) { 1713 int ret; 1714 1715 dprint(FD_PROCESS, "will pthread_create\n"); 1716 ret = pthread_create(&td->thread, NULL, 1717 thread_main, td); 1718 if (ret) { 1719 log_err("pthread_create: %s\n", 1720 strerror(ret)); 1721 nr_started--; 1722 break; 1723 } 1724 ret = pthread_detach(td->thread); 1725 if (ret) 1726 log_err("pthread_detach: %s", 1727 strerror(ret)); 1728 } else { 1729 pid_t pid; 1730 dprint(FD_PROCESS, "will fork\n"); 1731 pid = fork(); 1732 if (!pid) { 1733 int ret = fork_main(shm_id, i); 1734 1735 _exit(ret); 1736 } else if (i == fio_debug_jobno) 1737 *fio_debug_jobp = pid; 1738 } 1739 dprint(FD_MUTEX, "wait on startup_mutex\n"); 1740 if (fio_mutex_down_timeout(startup_mutex, 10)) { 1741 log_err("fio: job startup hung? exiting.\n"); 1742 fio_terminate_threads(TERMINATE_ALL); 1743 fio_abort = 1; 1744 nr_started--; 1745 break; 1746 } 1747 dprint(FD_MUTEX, "done waiting on startup_mutex\n"); 1748 } 1749 1750 /* 1751 * Wait for the started threads to transition to 1752 * TD_INITIALIZED. 1753 */ 1754 fio_gettime(&this_start, NULL); 1755 left = this_jobs; 1756 while (left && !fio_abort) { 1757 if (mtime_since_now(&this_start) > JOB_START_TIMEOUT) 1758 break; 1759 1760 do_usleep(100000); 1761 1762 for (i = 0; i < this_jobs; i++) { 1763 td = map[i]; 1764 if (!td) 1765 continue; 1766 if (td->runstate == TD_INITIALIZED) { 1767 map[i] = NULL; 1768 left--; 1769 } else if (td->runstate >= TD_EXITED) { 1770 map[i] = NULL; 1771 left--; 1772 todo--; 1773 nr_running++; /* work-around... */ 1774 } 1775 } 1776 } 1777 1778 if (left) { 1779 log_err("fio: %d job%s failed to start\n", left, 1780 left > 1 ? "s" : ""); 1781 for (i = 0; i < this_jobs; i++) { 1782 td = map[i]; 1783 if (!td) 1784 continue; 1785 kill(td->pid, SIGTERM); 1786 } 1787 break; 1788 } 1789 1790 /* 1791 * start created threads (TD_INITIALIZED -> TD_RUNNING). 1792 */ 1793 for_each_td(td, i) { 1794 if (td->runstate != TD_INITIALIZED) 1795 continue; 1796 1797 if (in_ramp_time(td)) 1798 td_set_runstate(td, TD_RAMP); 1799 else 1800 td_set_runstate(td, TD_RUNNING); 1801 nr_running++; 1802 nr_started--; 1803 m_rate += ddir_rw_sum(td->o.ratemin); 1804 t_rate += ddir_rw_sum(td->o.rate); 1805 todo--; 1806 fio_mutex_up(td->mutex); 1807 } 1808 1809 reap_threads(&nr_running, &t_rate, &m_rate); 1810 1811 if (todo) 1812 do_usleep(100000); 1813 } 1814 1815 while (nr_running) { 1816 reap_threads(&nr_running, &t_rate, &m_rate); 1817 do_usleep(10000); 1818 } 1819 1820 fio_idle_prof_stop(); 1821 1822 update_io_ticks(); 1823} 1824 1825void wait_for_disk_thread_exit(void) 1826{ 1827 fio_mutex_down(disk_thread_mutex); 1828} 1829 1830static void free_disk_util(void) 1831{ 1832 disk_util_start_exit(); 1833 wait_for_disk_thread_exit(); 1834 disk_util_prune_entries(); 1835} 1836 1837static void *disk_thread_main(void *data) 1838{ 1839 int ret = 0; 1840 1841 fio_mutex_up(startup_mutex); 1842 1843 while (threads && !ret) { 1844 usleep(DISK_UTIL_MSEC * 1000); 1845 if (!threads) 1846 break; 1847 ret = update_io_ticks(); 1848 1849 if (!is_backend) 1850 print_thread_status(); 1851 } 1852 1853 fio_mutex_up(disk_thread_mutex); 1854 return NULL; 1855} 1856 1857static int create_disk_util_thread(void) 1858{ 1859 int ret; 1860 1861 setup_disk_util(); 1862 1863 disk_thread_mutex = fio_mutex_init(FIO_MUTEX_LOCKED); 1864 1865 ret = pthread_create(&disk_util_thread, NULL, disk_thread_main, NULL); 1866 if (ret) { 1867 fio_mutex_remove(disk_thread_mutex); 1868 log_err("Can't create disk util thread: %s\n", strerror(ret)); 1869 return 1; 1870 } 1871 1872 ret = pthread_detach(disk_util_thread); 1873 if (ret) { 1874 fio_mutex_remove(disk_thread_mutex); 1875 log_err("Can't detatch disk util thread: %s\n", strerror(ret)); 1876 return 1; 1877 } 1878 1879 dprint(FD_MUTEX, "wait on startup_mutex\n"); 1880 fio_mutex_down(startup_mutex); 1881 dprint(FD_MUTEX, "done waiting on startup_mutex\n"); 1882 return 0; 1883} 1884 1885int fio_backend(void) 1886{ 1887 struct thread_data *td; 1888 int i; 1889 1890 if (exec_profile) { 1891 if (load_profile(exec_profile)) 1892 return 1; 1893 free(exec_profile); 1894 exec_profile = NULL; 1895 } 1896 if (!thread_number) 1897 return 0; 1898 1899 if (write_bw_log) { 1900 setup_log(&agg_io_log[DDIR_READ], 0, IO_LOG_TYPE_BW); 1901 setup_log(&agg_io_log[DDIR_WRITE], 0, IO_LOG_TYPE_BW); 1902 setup_log(&agg_io_log[DDIR_TRIM], 0, IO_LOG_TYPE_BW); 1903 } 1904 1905 startup_mutex = fio_mutex_init(FIO_MUTEX_LOCKED); 1906 if (startup_mutex == NULL) 1907 return 1; 1908 writeout_mutex = fio_mutex_init(FIO_MUTEX_UNLOCKED); 1909 if (writeout_mutex == NULL) 1910 return 1; 1911 1912 set_genesis_time(); 1913 stat_init(); 1914 create_disk_util_thread(); 1915 1916 cgroup_list = smalloc(sizeof(*cgroup_list)); 1917 INIT_FLIST_HEAD(cgroup_list); 1918 1919 run_threads(); 1920 1921 if (!fio_abort) { 1922 show_run_stats(); 1923 if (write_bw_log) { 1924 __finish_log(agg_io_log[DDIR_READ], "agg-read_bw.log"); 1925 __finish_log(agg_io_log[DDIR_WRITE], 1926 "agg-write_bw.log"); 1927 __finish_log(agg_io_log[DDIR_TRIM], 1928 "agg-write_bw.log"); 1929 } 1930 } 1931 1932 for_each_td(td, i) 1933 fio_options_free(td); 1934 1935 free_disk_util(); 1936 cgroup_kill(cgroup_list); 1937 sfree(cgroup_list); 1938 sfree(cgroup_mnt); 1939 1940 fio_mutex_remove(startup_mutex); 1941 fio_mutex_remove(writeout_mutex); 1942 fio_mutex_remove(disk_thread_mutex); 1943 stat_exit(); 1944 return exit_value; 1945} 1946