backend.c revision 36d80bc7c7f7fbc2612941b7dd7ceaf645798c7f
1/* 2 * fio - the flexible io tester 3 * 4 * Copyright (C) 2005 Jens Axboe <axboe@suse.de> 5 * Copyright (C) 2006-2012 Jens Axboe <axboe@kernel.dk> 6 * 7 * The license below covers all files distributed with fio unless otherwise 8 * noted in the file itself. 9 * 10 * This program is free software; you can redistribute it and/or modify 11 * it under the terms of the GNU General Public License version 2 as 12 * published by the Free Software Foundation. 13 * 14 * This program is distributed in the hope that it will be useful, 15 * but WITHOUT ANY WARRANTY; without even the implied warranty of 16 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 17 * GNU General Public License for more details. 18 * 19 * You should have received a copy of the GNU General Public License 20 * along with this program; if not, write to the Free Software 21 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA 22 * 23 */ 24#include <unistd.h> 25#include <fcntl.h> 26#include <string.h> 27#include <limits.h> 28#include <signal.h> 29#include <time.h> 30#include <locale.h> 31#include <assert.h> 32#include <time.h> 33#include <inttypes.h> 34#include <sys/stat.h> 35#include <sys/wait.h> 36#include <sys/ipc.h> 37#ifndef FIO_NO_HAVE_SHM_H 38#include <sys/shm.h> 39#endif 40#include <sys/mman.h> 41 42#include "fio.h" 43#include "hash.h" 44#include "smalloc.h" 45#include "verify.h" 46#include "trim.h" 47#include "diskutil.h" 48#include "cgroup.h" 49#include "profile.h" 50#include "lib/rand.h" 51#include "memalign.h" 52#include "server.h" 53 54static pthread_t disk_util_thread; 55static struct fio_mutex *disk_thread_mutex; 56static struct fio_mutex *startup_mutex; 57static struct fio_mutex *writeout_mutex; 58static struct flist_head *cgroup_list; 59static char *cgroup_mnt; 60static int exit_value; 61static volatile int fio_abort; 62 63struct io_log *agg_io_log[DDIR_RWDIR_CNT]; 64 65int groupid = 0; 66unsigned int thread_number = 0; 67unsigned int stat_number = 0; 68unsigned int nr_process = 0; 69unsigned int nr_thread = 0; 70int shm_id = 0; 71int temp_stall_ts; 72unsigned long done_secs = 0; 73volatile int disk_util_exit = 0; 74 75#define PAGE_ALIGN(buf) \ 76 (char *) (((uintptr_t) (buf) + page_mask) & ~page_mask) 77 78#define JOB_START_TIMEOUT (5 * 1000) 79 80static void sig_int(int sig) 81{ 82 if (threads) { 83 if (is_backend) 84 fio_server_got_signal(sig); 85 else { 86 log_info("\nfio: terminating on signal %d\n", sig); 87 fflush(stdout); 88 exit_value = 128; 89 } 90 91 fio_terminate_threads(TERMINATE_ALL); 92 } 93} 94 95static void sig_show_status(int sig) 96{ 97 show_running_run_stats(); 98} 99 100static void set_sig_handlers(void) 101{ 102 struct sigaction act; 103 104 memset(&act, 0, sizeof(act)); 105 act.sa_handler = sig_int; 106 act.sa_flags = SA_RESTART; 107 sigaction(SIGINT, &act, NULL); 108 109 memset(&act, 0, sizeof(act)); 110 act.sa_handler = sig_int; 111 act.sa_flags = SA_RESTART; 112 sigaction(SIGTERM, &act, NULL); 113 114/* Windows uses SIGBREAK as a quit signal from other applications */ 115#ifdef WIN32 116 memset(&act, 0, sizeof(act)); 117 act.sa_handler = sig_int; 118 act.sa_flags = SA_RESTART; 119 sigaction(SIGBREAK, &act, NULL); 120#endif 121 122 memset(&act, 0, sizeof(act)); 123 act.sa_handler = sig_show_status; 124 act.sa_flags = SA_RESTART; 125 sigaction(SIGUSR1, &act, NULL); 126 127 if (is_backend) { 128 memset(&act, 0, sizeof(act)); 129 act.sa_handler = sig_int; 130 act.sa_flags = SA_RESTART; 131 sigaction(SIGPIPE, &act, NULL); 132 } 133} 134 135/* 136 * Check if we 
are above the minimum rate given. 137 */ 138static int __check_min_rate(struct thread_data *td, struct timeval *now, 139 enum fio_ddir ddir) 140{ 141 unsigned long long bytes = 0; 142 unsigned long iops = 0; 143 unsigned long spent; 144 unsigned long rate; 145 unsigned int ratemin = 0; 146 unsigned int rate_iops = 0; 147 unsigned int rate_iops_min = 0; 148 149 assert(ddir_rw(ddir)); 150 151 if (!td->o.ratemin[ddir] && !td->o.rate_iops_min[ddir]) 152 return 0; 153 154 /* 155 * allow a 2 second settle period in the beginning 156 */ 157 if (mtime_since(&td->start, now) < 2000) 158 return 0; 159 160 iops += td->this_io_blocks[ddir]; 161 bytes += td->this_io_bytes[ddir]; 162 ratemin += td->o.ratemin[ddir]; 163 rate_iops += td->o.rate_iops[ddir]; 164 rate_iops_min += td->o.rate_iops_min[ddir]; 165 166 /* 167 * if rate blocks is set, sample is running 168 */ 169 if (td->rate_bytes[ddir] || td->rate_blocks[ddir]) { 170 spent = mtime_since(&td->lastrate[ddir], now); 171 if (spent < td->o.ratecycle) 172 return 0; 173 174 if (td->o.rate[ddir]) { 175 /* 176 * check bandwidth specified rate 177 */ 178 if (bytes < td->rate_bytes[ddir]) { 179 log_err("%s: min rate %u not met\n", td->o.name, 180 ratemin); 181 return 1; 182 } else { 183 rate = ((bytes - td->rate_bytes[ddir]) * 1000) / spent; 184 if (rate < ratemin || 185 bytes < td->rate_bytes[ddir]) { 186 log_err("%s: min rate %u not met, got" 187 " %luKB/sec\n", td->o.name, 188 ratemin, rate); 189 return 1; 190 } 191 } 192 } else { 193 /* 194 * checks iops specified rate 195 */ 196 if (iops < rate_iops) { 197 log_err("%s: min iops rate %u not met\n", 198 td->o.name, rate_iops); 199 return 1; 200 } else { 201 rate = ((iops - td->rate_blocks[ddir]) * 1000) / spent; 202 if (rate < rate_iops_min || 203 iops < td->rate_blocks[ddir]) { 204 log_err("%s: min iops rate %u not met," 205 " got %lu\n", td->o.name, 206 rate_iops_min, rate); return 1; 207 } 208 } 209 } 210 } 211 212 td->rate_bytes[ddir] = bytes; 213 td->rate_blocks[ddir] = iops; 214 memcpy(&td->lastrate[ddir], now, sizeof(*now)); 215 return 0; 216} 217 218static int check_min_rate(struct thread_data *td, struct timeval *now, 219 unsigned long *bytes_done) 220{ 221 int ret = 0; 222 223 if (bytes_done[DDIR_READ]) 224 ret |= __check_min_rate(td, now, DDIR_READ); 225 if (bytes_done[DDIR_WRITE]) 226 ret |= __check_min_rate(td, now, DDIR_WRITE); 227 if (bytes_done[DDIR_TRIM]) 228 ret |= __check_min_rate(td, now, DDIR_TRIM); 229 230 return ret; 231} 232 233/* 234 * When job exits, we can cancel the in-flight IO if we are using async 235 * io. Attempt to do so. 236 */ 237static void cleanup_pending_aio(struct thread_data *td) 238{ 239 struct flist_head *entry, *n; 240 struct io_u *io_u; 241 int r; 242 243 /* 244 * get immediately available events, if any 245 */ 246 r = io_u_queued_complete(td, 0, NULL); 247 if (r < 0) 248 return; 249 250 /* 251 * now cancel remaining active events 252 */ 253 if (td->io_ops->cancel) { 254 flist_for_each_safe(entry, n, &td->io_u_busylist) { 255 io_u = flist_entry(entry, struct io_u, list); 256 257 /* 258 * if the io_u isn't in flight, then that generally 259 * means someone leaked an io_u. complain but fix 260 * it up, so we don't stall here.
261 */ 262 if ((io_u->flags & IO_U_F_FLIGHT) == 0) { 263 log_err("fio: non-busy IO on busy list\n"); 264 put_io_u(td, io_u); 265 } else { 266 r = td->io_ops->cancel(td, io_u); 267 if (!r) 268 put_io_u(td, io_u); 269 } 270 } 271 } 272 273 if (td->cur_depth) 274 r = io_u_queued_complete(td, td->cur_depth, NULL); 275} 276 277/* 278 * Helper to handle the final sync of a file. Works just like the normal 279 * io path, just does everything sync. 280 */ 281static int fio_io_sync(struct thread_data *td, struct fio_file *f) 282{ 283 struct io_u *io_u = __get_io_u(td); 284 int ret; 285 286 if (!io_u) 287 return 1; 288 289 io_u->ddir = DDIR_SYNC; 290 io_u->file = f; 291 292 if (td_io_prep(td, io_u)) { 293 put_io_u(td, io_u); 294 return 1; 295 } 296 297requeue: 298 ret = td_io_queue(td, io_u); 299 if (ret < 0) { 300 td_verror(td, io_u->error, "td_io_queue"); 301 put_io_u(td, io_u); 302 return 1; 303 } else if (ret == FIO_Q_QUEUED) { 304 if (io_u_queued_complete(td, 1, NULL) < 0) 305 return 1; 306 } else if (ret == FIO_Q_COMPLETED) { 307 if (io_u->error) { 308 td_verror(td, io_u->error, "td_io_queue"); 309 return 1; 310 } 311 312 if (io_u_sync_complete(td, io_u, NULL) < 0) 313 return 1; 314 } else if (ret == FIO_Q_BUSY) { 315 if (td_io_commit(td)) 316 return 1; 317 goto requeue; 318 } 319 320 return 0; 321} 322 323static inline void __update_tv_cache(struct thread_data *td) 324{ 325 fio_gettime(&td->tv_cache, NULL); 326} 327 328static inline void update_tv_cache(struct thread_data *td) 329{ 330 if ((++td->tv_cache_nr & td->tv_cache_mask) == td->tv_cache_mask) 331 __update_tv_cache(td); 332} 333 334static inline int runtime_exceeded(struct thread_data *td, struct timeval *t) 335{ 336 if (in_ramp_time(td)) 337 return 0; 338 if (!td->o.timeout) 339 return 0; 340 if (mtime_since(&td->epoch, t) >= td->o.timeout * 1000) 341 return 1; 342 343 return 0; 344} 345 346static int break_on_this_error(struct thread_data *td, enum fio_ddir ddir, 347 int *retptr) 348{ 349 int ret = *retptr; 350 351 if (ret < 0 || td->error) { 352 int err = td->error; 353 enum error_type_bit eb; 354 355 if (ret < 0) 356 err = -ret; 357 358 eb = td_error_type(ddir, err); 359 if (!(td->o.continue_on_error & (1 << eb))) 360 return 1; 361 362 if (td_non_fatal_error(td, eb, err)) { 363 /* 364 * Continue with the I/Os in case of 365 * a non fatal error. 366 */ 367 update_error_count(td, err); 368 td_clear_error(td); 369 *retptr = 0; 370 return 0; 371 } else if (td->o.fill_device && err == ENOSPC) { 372 /* 373 * We expect to hit this error if 374 * fill_device option is set. 375 */ 376 td_clear_error(td); 377 td->terminate = 1; 378 return 1; 379 } else { 380 /* 381 * Stop the I/O in case of a fatal 382 * error. 383 */ 384 update_error_count(td, err); 385 return 1; 386 } 387 } 388 389 return 0; 390} 391 392/* 393 * The main verify engine. Runs over the writes we previously submitted, 394 * reads the blocks back in, and checks the crc/md5 of the data. 395 */ 396static void do_verify(struct thread_data *td) 397{ 398 struct fio_file *f; 399 struct io_u *io_u; 400 int ret, min_events; 401 unsigned int i; 402 403 dprint(FD_VERIFY, "starting loop\n"); 404 405 /* 406 * sync io first and invalidate cache, to make sure we really 407 * read from disk. 
408 */ 409 for_each_file(td, f, i) { 410 if (!fio_file_open(f)) 411 continue; 412 if (fio_io_sync(td, f)) 413 break; 414 if (file_invalidate_cache(td, f)) 415 break; 416 } 417 418 if (td->error) 419 return; 420 421 td_set_runstate(td, TD_VERIFYING); 422 423 io_u = NULL; 424 while (!td->terminate) { 425 int ret2, full; 426 427 update_tv_cache(td); 428 429 if (runtime_exceeded(td, &td->tv_cache)) { 430 __update_tv_cache(td); 431 if (runtime_exceeded(td, &td->tv_cache)) { 432 td->terminate = 1; 433 break; 434 } 435 } 436 437 if (flow_threshold_exceeded(td)) 438 continue; 439 440 io_u = __get_io_u(td); 441 if (!io_u) 442 break; 443 444 if (get_next_verify(td, io_u)) { 445 put_io_u(td, io_u); 446 break; 447 } 448 449 if (td_io_prep(td, io_u)) { 450 put_io_u(td, io_u); 451 break; 452 } 453 454 if (td->o.verify_async) 455 io_u->end_io = verify_io_u_async; 456 else 457 io_u->end_io = verify_io_u; 458 459 ret = td_io_queue(td, io_u); 460 switch (ret) { 461 case FIO_Q_COMPLETED: 462 if (io_u->error) { 463 ret = -io_u->error; 464 clear_io_u(td, io_u); 465 } else if (io_u->resid) { 466 int bytes = io_u->xfer_buflen - io_u->resid; 467 468 /* 469 * zero read, fail 470 */ 471 if (!bytes) { 472 td_verror(td, EIO, "full resid"); 473 put_io_u(td, io_u); 474 break; 475 } 476 477 io_u->xfer_buflen = io_u->resid; 478 io_u->xfer_buf += bytes; 479 io_u->offset += bytes; 480 481 if (ddir_rw(io_u->ddir)) 482 td->ts.short_io_u[io_u->ddir]++; 483 484 f = io_u->file; 485 if (io_u->offset == f->real_file_size) 486 goto sync_done; 487 488 requeue_io_u(td, &io_u); 489 } else { 490sync_done: 491 ret = io_u_sync_complete(td, io_u, NULL); 492 if (ret < 0) 493 break; 494 } 495 continue; 496 case FIO_Q_QUEUED: 497 break; 498 case FIO_Q_BUSY: 499 requeue_io_u(td, &io_u); 500 ret2 = td_io_commit(td); 501 if (ret2 < 0) 502 ret = ret2; 503 break; 504 default: 505 assert(ret < 0); 506 td_verror(td, -ret, "td_io_queue"); 507 break; 508 } 509 510 if (break_on_this_error(td, io_u->ddir, &ret)) 511 break; 512 513 /* 514 * if we can queue more, do so. but check if there are 515 * completed io_u's first. Note that we can get BUSY even 516 * without IO queued, if the system is resource starved. 
517 */ 518 full = queue_full(td) || (ret == FIO_Q_BUSY && td->cur_depth); 519 if (full || !td->o.iodepth_batch_complete) { 520 min_events = min(td->o.iodepth_batch_complete, 521 td->cur_depth); 522 /* 523 * if the queue is full, we MUST reap at least 1 event 524 */ 525 if (full && !min_events) 526 min_events = 1; 527 528 do { 529 /* 530 * Reap required number of io units, if any, 531 * and do the verification on them through 532 * the callback handler 533 */ 534 if (io_u_queued_complete(td, min_events, NULL) < 0) { 535 ret = -1; 536 break; 537 } 538 } while (full && (td->cur_depth > td->o.iodepth_low)); 539 } 540 if (ret < 0) 541 break; 542 } 543 544 if (!td->error) { 545 min_events = td->cur_depth; 546 547 if (min_events) 548 ret = io_u_queued_complete(td, min_events, NULL); 549 } else 550 cleanup_pending_aio(td); 551 552 td_set_runstate(td, TD_RUNNING); 553 554 dprint(FD_VERIFY, "exiting loop\n"); 555} 556 557static int io_bytes_exceeded(struct thread_data *td) 558{ 559 unsigned long long bytes; 560 561 if (td_rw(td)) 562 bytes = td->this_io_bytes[DDIR_READ] + td->this_io_bytes[DDIR_WRITE]; 563 else if (td_write(td)) 564 bytes = td->this_io_bytes[DDIR_WRITE]; 565 else if (td_read(td)) 566 bytes = td->this_io_bytes[DDIR_READ]; 567 else 568 bytes = td->this_io_bytes[DDIR_TRIM]; 569 570 return bytes >= td->o.size; 571} 572 573/* 574 * Main IO worker function. It retrieves io_u's to process and queues 575 * and reaps them, checking for rate and errors along the way. 576 */ 577static void do_io(struct thread_data *td) 578{ 579 unsigned int i; 580 int ret = 0; 581 582 if (in_ramp_time(td)) 583 td_set_runstate(td, TD_RAMP); 584 else 585 td_set_runstate(td, TD_RUNNING); 586 587 while ((td->o.read_iolog_file && !flist_empty(&td->io_log_list)) || 588 (!flist_empty(&td->trim_list)) || !io_bytes_exceeded(td) || 589 td->o.time_based) { 590 struct timeval comp_time; 591 unsigned long bytes_done[DDIR_RWDIR_CNT] = { 0, 0, 0 }; 592 int min_evts = 0; 593 struct io_u *io_u; 594 int ret2, full; 595 enum fio_ddir ddir; 596 597 if (td->terminate || td->done) 598 break; 599 600 update_tv_cache(td); 601 602 if (runtime_exceeded(td, &td->tv_cache)) { 603 __update_tv_cache(td); 604 if (runtime_exceeded(td, &td->tv_cache)) { 605 td->terminate = 1; 606 break; 607 } 608 } 609 610 if (flow_threshold_exceeded(td)) 611 continue; 612 613 io_u = get_io_u(td); 614 if (!io_u) 615 break; 616 617 ddir = io_u->ddir; 618 619 /* 620 * Add verification end_io handler if: 621 * - Asked to verify (!td_rw(td)) 622 * - Or the io_u is from our verify list (mixed write/ver) 623 */ 624 if (td->o.verify != VERIFY_NONE && io_u->ddir == DDIR_READ && 625 ((io_u->flags & IO_U_F_VER_LIST) || !td_rw(td))) { 626 if (td->o.verify_async) 627 io_u->end_io = verify_io_u_async; 628 else 629 io_u->end_io = verify_io_u; 630 td_set_runstate(td, TD_VERIFYING); 631 } else if (in_ramp_time(td)) 632 td_set_runstate(td, TD_RAMP); 633 else 634 td_set_runstate(td, TD_RUNNING); 635 636 ret = td_io_queue(td, io_u); 637 switch (ret) { 638 case FIO_Q_COMPLETED: 639 if (io_u->error) { 640 ret = -io_u->error; 641 clear_io_u(td, io_u); 642 } else if (io_u->resid) { 643 int bytes = io_u->xfer_buflen - io_u->resid; 644 struct fio_file *f = io_u->file; 645 646 /* 647 * zero read, fail 648 */ 649 if (!bytes) { 650 td_verror(td, EIO, "full resid"); 651 put_io_u(td, io_u); 652 break; 653 } 654 655 io_u->xfer_buflen = io_u->resid; 656 io_u->xfer_buf += bytes; 657 io_u->offset += bytes; 658 659 if (ddir_rw(io_u->ddir)) 660 td->ts.short_io_u[io_u->ddir]++; 661 662 if 
(io_u->offset == f->real_file_size) 663 goto sync_done; 664 665 requeue_io_u(td, &io_u); 666 } else { 667sync_done: 668 if (__should_check_rate(td, DDIR_READ) || 669 __should_check_rate(td, DDIR_WRITE) || 670 __should_check_rate(td, DDIR_TRIM)) 671 fio_gettime(&comp_time, NULL); 672 673 ret = io_u_sync_complete(td, io_u, bytes_done); 674 if (ret < 0) 675 break; 676 } 677 break; 678 case FIO_Q_QUEUED: 679 /* 680 * if the engine doesn't have a commit hook, 681 * the io_u is really queued. if it does have such 682 * a hook, it has to call io_u_queued() itself. 683 */ 684 if (td->io_ops->commit == NULL) 685 io_u_queued(td, io_u); 686 break; 687 case FIO_Q_BUSY: 688 requeue_io_u(td, &io_u); 689 ret2 = td_io_commit(td); 690 if (ret2 < 0) 691 ret = ret2; 692 break; 693 default: 694 assert(ret < 0); 695 put_io_u(td, io_u); 696 break; 697 } 698 699 if (break_on_this_error(td, ddir, &ret)) 700 break; 701 702 /* 703 * See if we need to complete some commands. Note that we 704 * can get BUSY even without IO queued, if the system is 705 * resource starved. 706 */ 707 full = queue_full(td) || (ret == FIO_Q_BUSY && td->cur_depth); 708 if (full || !td->o.iodepth_batch_complete) { 709 min_evts = min(td->o.iodepth_batch_complete, 710 td->cur_depth); 711 /* 712 * if the queue is full, we MUST reap at least 1 event 713 */ 714 if (full && !min_evts) 715 min_evts = 1; 716 717 if (__should_check_rate(td, DDIR_READ) || 718 __should_check_rate(td, DDIR_WRITE) || 719 __should_check_rate(td, DDIR_TRIM)) 720 fio_gettime(&comp_time, NULL); 721 722 do { 723 ret = io_u_queued_complete(td, min_evts, bytes_done); 724 if (ret < 0) 725 break; 726 727 } while (full && (td->cur_depth > td->o.iodepth_low)); 728 } 729 730 if (ret < 0) 731 break; 732 if (!ddir_rw_sum(bytes_done) && !(td->io_ops->flags & FIO_NOIO)) 733 continue; 734 735 if (!in_ramp_time(td) && should_check_rate(td, bytes_done)) { 736 if (check_min_rate(td, &comp_time, bytes_done)) { 737 if (exitall_on_terminate) 738 fio_terminate_threads(td->groupid); 739 td_verror(td, EIO, "check_min_rate"); 740 break; 741 } 742 } 743 744 if (td->o.thinktime) { 745 unsigned long long b; 746 747 b = ddir_rw_sum(td->io_blocks); 748 if (!(b % td->o.thinktime_blocks)) { 749 int left; 750 751 if (td->o.thinktime_spin) 752 usec_spin(td->o.thinktime_spin); 753 754 left = td->o.thinktime - td->o.thinktime_spin; 755 if (left) 756 usec_sleep(td, left); 757 } 758 } 759 } 760 761 if (td->trim_entries) 762 log_err("fio: %d trim entries leaked?\n", td->trim_entries); 763 764 if (td->o.fill_device && td->error == ENOSPC) { 765 td->error = 0; 766 td->terminate = 1; 767 } 768 if (!td->error) { 769 struct fio_file *f; 770 771 i = td->cur_depth; 772 if (i) { 773 ret = io_u_queued_complete(td, i, NULL); 774 if (td->o.fill_device && td->error == ENOSPC) 775 td->error = 0; 776 } 777 778 if (should_fsync(td) && td->o.end_fsync) { 779 td_set_runstate(td, TD_FSYNCING); 780 781 for_each_file(td, f, i) { 782 if (!fio_file_open(f)) 783 continue; 784 fio_io_sync(td, f); 785 } 786 } 787 } else 788 cleanup_pending_aio(td); 789 790 /* 791 * stop job if we failed doing any IO 792 */ 793 if (!ddir_rw_sum(td->this_io_bytes)) 794 td->done = 1; 795} 796 797static void cleanup_io_u(struct thread_data *td) 798{ 799 struct flist_head *entry, *n; 800 struct io_u *io_u; 801 802 flist_for_each_safe(entry, n, &td->io_u_freelist) { 803 io_u = flist_entry(entry, struct io_u, list); 804 805 flist_del(&io_u->list); 806 fio_memfree(io_u, sizeof(*io_u)); 807 } 808 809 free_io_mem(td); 810} 811 812static int init_io_u(struct 
thread_data *td) 813{ 814 struct io_u *io_u; 815 unsigned int max_bs, min_write; 816 int cl_align, i, max_units; 817 int data_xfer = 1; 818 char *p; 819 820 max_units = td->o.iodepth; 821 max_bs = max(td->o.max_bs[DDIR_READ], td->o.max_bs[DDIR_WRITE]); 822 max_bs = max(td->o.max_bs[DDIR_TRIM], max_bs); 823 min_write = td->o.min_bs[DDIR_WRITE]; 824 td->orig_buffer_size = (unsigned long long) max_bs 825 * (unsigned long long) max_units; 826 827 if ((td->io_ops->flags & FIO_NOIO) || !(td_read(td) || td_write(td))) 828 data_xfer = 0; 829 830 if (td->o.mem_type == MEM_SHMHUGE || td->o.mem_type == MEM_MMAPHUGE) { 831 unsigned long bs; 832 833 bs = td->orig_buffer_size + td->o.hugepage_size - 1; 834 td->orig_buffer_size = bs & ~(td->o.hugepage_size - 1); 835 } 836 837 if (td->orig_buffer_size != (size_t) td->orig_buffer_size) { 838 log_err("fio: IO memory too large. Reduce max_bs or iodepth\n"); 839 return 1; 840 } 841 842 if (data_xfer && allocate_io_mem(td)) 843 return 1; 844 845 if (td->o.odirect || td->o.mem_align || 846 (td->io_ops->flags & FIO_RAWIO)) 847 p = PAGE_ALIGN(td->orig_buffer) + td->o.mem_align; 848 else 849 p = td->orig_buffer; 850 851 cl_align = os_cache_line_size(); 852 853 for (i = 0; i < max_units; i++) { 854 void *ptr; 855 856 if (td->terminate) 857 return 1; 858 859 ptr = fio_memalign(cl_align, sizeof(*io_u)); 860 if (!ptr) { 861 log_err("fio: unable to allocate aligned memory\n"); 862 break; 863 } 864 865 io_u = ptr; 866 memset(io_u, 0, sizeof(*io_u)); 867 INIT_FLIST_HEAD(&io_u->list); 868 dprint(FD_MEM, "io_u alloc %p, index %u\n", io_u, i); 869 870 if (data_xfer) { 871 io_u->buf = p; 872 dprint(FD_MEM, "io_u %p, mem %p\n", io_u, io_u->buf); 873 874 if (td_write(td)) 875 io_u_fill_buffer(td, io_u, min_write, max_bs); 876 if (td_write(td) && td->o.verify_pattern_bytes) { 877 /* 878 * Fill the buffer with the pattern if we are 879 * going to be doing writes. 880 */ 881 fill_pattern(td, io_u->buf, max_bs, io_u, 0, 0); 882 } 883 } 884 885 io_u->index = i; 886 io_u->flags = IO_U_F_FREE; 887 flist_add(&io_u->list, &td->io_u_freelist); 888 p += max_bs; 889 } 890 891 return 0; 892} 893 894static int switch_ioscheduler(struct thread_data *td) 895{ 896 char tmp[256], tmp2[128]; 897 FILE *f; 898 int ret; 899 900 if (td->io_ops->flags & FIO_DISKLESSIO) 901 return 0; 902 903 sprintf(tmp, "%s/queue/scheduler", td->sysfs_root); 904 905 f = fopen(tmp, "r+"); 906 if (!f) { 907 if (errno == ENOENT) { 908 log_err("fio: os or kernel doesn't support IO scheduler" 909 " switching\n"); 910 return 0; 911 } 912 td_verror(td, errno, "fopen iosched"); 913 return 1; 914 } 915 916 /* 917 * Set io scheduler. 918 */ 919 ret = fwrite(td->o.ioscheduler, strlen(td->o.ioscheduler), 1, f); 920 if (ferror(f) || ret != 1) { 921 td_verror(td, errno, "fwrite"); 922 fclose(f); 923 return 1; 924 } 925 926 rewind(f); 927 928 /* 929 * Read back and check that the selected scheduler is now the default. 
930 */ 931 ret = fread(tmp, 1, sizeof(tmp), f); 932 if (ferror(f) || ret < 0) { 933 td_verror(td, errno, "fread"); 934 fclose(f); 935 return 1; 936 } 937 938 sprintf(tmp2, "[%s]", td->o.ioscheduler); 939 if (!strstr(tmp, tmp2)) { 940 log_err("fio: io scheduler %s not found\n", td->o.ioscheduler); 941 td_verror(td, EINVAL, "iosched_switch"); 942 fclose(f); 943 return 1; 944 } 945 946 fclose(f); 947 return 0; 948} 949 950static int keep_running(struct thread_data *td) 951{ 952 if (td->done) 953 return 0; 954 if (td->o.time_based) 955 return 1; 956 if (td->o.loops) { 957 td->o.loops--; 958 return 1; 959 } 960 961 if (ddir_rw_sum(td->io_bytes) < td->o.size) 962 return 1; 963 964 return 0; 965} 966 967static int exec_string(const char *string) 968{ 969 int ret, newlen = strlen(string) + 1 + 8; 970 char *str; 971 972 str = malloc(newlen); 973 sprintf(str, "sh -c %s", string); 974 975 ret = system(str); 976 if (ret == -1) 977 log_err("fio: exec of cmd <%s> failed\n", str); 978 979 free(str); 980 return ret; 981} 982 983/* 984 * Entry point for the thread based jobs. The process based jobs end up 985 * here as well, after a little setup. 986 */ 987static void *thread_main(void *data) 988{ 989 unsigned long long elapsed; 990 struct thread_data *td = data; 991 pthread_condattr_t attr; 992 int clear_state; 993 994 if (!td->o.use_thread) { 995 setsid(); 996 td->pid = getpid(); 997 } else 998 td->pid = gettid(); 999 1000 dprint(FD_PROCESS, "jobs pid=%d started\n", (int) td->pid); 1001 1002 INIT_FLIST_HEAD(&td->io_u_freelist); 1003 INIT_FLIST_HEAD(&td->io_u_busylist); 1004 INIT_FLIST_HEAD(&td->io_u_requeues); 1005 INIT_FLIST_HEAD(&td->io_log_list); 1006 INIT_FLIST_HEAD(&td->io_hist_list); 1007 INIT_FLIST_HEAD(&td->verify_list); 1008 INIT_FLIST_HEAD(&td->trim_list); 1009 pthread_mutex_init(&td->io_u_lock, NULL); 1010 td->io_hist_tree = RB_ROOT; 1011 1012 pthread_condattr_init(&attr); 1013 pthread_cond_init(&td->verify_cond, &attr); 1014 pthread_cond_init(&td->free_cond, &attr); 1015 1016 td_set_runstate(td, TD_INITIALIZED); 1017 dprint(FD_MUTEX, "up startup_mutex\n"); 1018 fio_mutex_up(startup_mutex); 1019 dprint(FD_MUTEX, "wait on td->mutex\n"); 1020 fio_mutex_down(td->mutex); 1021 dprint(FD_MUTEX, "done waiting on td->mutex\n"); 1022 1023 /* 1024 * the ->mutex mutex is now no longer used, close it to avoid 1025 * eating a file descriptor 1026 */ 1027 fio_mutex_remove(td->mutex); 1028 1029 /* 1030 * A new gid requires privilege, so we need to do this before setting 1031 * the uid. 1032 */ 1033 if (td->o.gid != -1U && setgid(td->o.gid)) { 1034 td_verror(td, errno, "setgid"); 1035 goto err; 1036 } 1037 if (td->o.uid != -1U && setuid(td->o.uid)) { 1038 td_verror(td, errno, "setuid"); 1039 goto err; 1040 } 1041 1042 /* 1043 * If we have a gettimeofday() thread, make sure we exclude that 1044 * thread from this job 1045 */ 1046 if (td->o.gtod_cpu) 1047 fio_cpu_clear(&td->o.cpumask, td->o.gtod_cpu); 1048 1049 /* 1050 * Set affinity first, in case it has an impact on the memory 1051 * allocations. 
1052 */ 1053 if (td->o.cpumask_set && fio_setaffinity(td->pid, td->o.cpumask) == -1) { 1054 td_verror(td, errno, "cpu_set_affinity"); 1055 goto err; 1056 } 1057 1058#ifdef FIO_HAVE_LIBNUMA 1059 /* numa node setup */ 1060 if (td->o.numa_cpumask_set || td->o.numa_memmask_set) { 1061 int ret; 1062 1063 if (numa_available() < 0) { 1064 td_verror(td, errno, "Does not support NUMA API\n"); 1065 goto err; 1066 } 1067 1068 if (td->o.numa_cpumask_set) { 1069 ret = numa_run_on_node_mask(td->o.numa_cpunodesmask); 1070 if (ret == -1) { 1071 td_verror(td, errno, \ 1072 "numa_run_on_node_mask failed\n"); 1073 goto err; 1074 } 1075 } 1076 1077 if (td->o.numa_memmask_set) { 1078 1079 switch (td->o.numa_mem_mode) { 1080 case MPOL_INTERLEAVE: 1081 numa_set_interleave_mask(td->o.numa_memnodesmask); 1082 break; 1083 case MPOL_BIND: 1084 numa_set_membind(td->o.numa_memnodesmask); 1085 break; 1086 case MPOL_LOCAL: 1087 numa_set_localalloc(); 1088 break; 1089 case MPOL_PREFERRED: 1090 numa_set_preferred(td->o.numa_mem_prefer_node); 1091 break; 1092 case MPOL_DEFAULT: 1093 default: 1094 break; 1095 } 1096 1097 } 1098 } 1099#endif 1100 1101 /* 1102 * May alter parameters that init_io_u() will use, so we need to 1103 * do this first. 1104 */ 1105 if (init_iolog(td)) 1106 goto err; 1107 1108 if (init_io_u(td)) 1109 goto err; 1110 1111 if (td->o.verify_async && verify_async_init(td)) 1112 goto err; 1113 1114 if (td->ioprio_set) { 1115 if (ioprio_set(IOPRIO_WHO_PROCESS, 0, td->ioprio) == -1) { 1116 td_verror(td, errno, "ioprio_set"); 1117 goto err; 1118 } 1119 } 1120 1121 if (td->o.cgroup && cgroup_setup(td, cgroup_list, &cgroup_mnt)) 1122 goto err; 1123 1124 errno = 0; 1125 if (nice(td->o.nice) == -1 && errno != 0) { 1126 td_verror(td, errno, "nice"); 1127 goto err; 1128 } 1129 1130 if (td->o.ioscheduler && switch_ioscheduler(td)) 1131 goto err; 1132 1133 if (!td->o.create_serialize && setup_files(td)) 1134 goto err; 1135 1136 if (td_io_init(td)) 1137 goto err; 1138 1139 if (init_random_map(td)) 1140 goto err; 1141 1142 if (td->o.exec_prerun) { 1143 if (exec_string(td->o.exec_prerun)) 1144 goto err; 1145 } 1146 1147 if (td->o.pre_read) { 1148 if (pre_read_files(td) < 0) 1149 goto err; 1150 } 1151 1152 fio_gettime(&td->epoch, NULL); 1153 getrusage(RUSAGE_SELF, &td->ru_start); 1154 1155 clear_state = 0; 1156 while (keep_running(td)) { 1157 fio_gettime(&td->start, NULL); 1158 memcpy(&td->bw_sample_time, &td->start, sizeof(td->start)); 1159 memcpy(&td->iops_sample_time, &td->start, sizeof(td->start)); 1160 memcpy(&td->tv_cache, &td->start, sizeof(td->start)); 1161 1162 if (td->o.ratemin[DDIR_READ] || td->o.ratemin[DDIR_WRITE] || 1163 td->o.ratemin[DDIR_TRIM]) { 1164 memcpy(&td->lastrate[DDIR_READ], &td->bw_sample_time, 1165 sizeof(td->bw_sample_time)); 1166 memcpy(&td->lastrate[DDIR_WRITE], &td->bw_sample_time, 1167 sizeof(td->bw_sample_time)); 1168 memcpy(&td->lastrate[DDIR_TRIM], &td->bw_sample_time, 1169 sizeof(td->bw_sample_time)); 1170 } 1171 1172 if (clear_state) 1173 clear_io_state(td); 1174 1175 prune_io_piece_log(td); 1176 1177 do_io(td); 1178 1179 clear_state = 1; 1180 1181 if (td_read(td) && td->io_bytes[DDIR_READ]) { 1182 elapsed = utime_since_now(&td->start); 1183 td->ts.runtime[DDIR_READ] += elapsed; 1184 } 1185 if (td_write(td) && td->io_bytes[DDIR_WRITE]) { 1186 elapsed = utime_since_now(&td->start); 1187 td->ts.runtime[DDIR_WRITE] += elapsed; 1188 } 1189 if (td_trim(td) && td->io_bytes[DDIR_TRIM]) { 1190 elapsed = utime_since_now(&td->start); 1191 td->ts.runtime[DDIR_TRIM] += elapsed; 1192 } 1193 1194 if 
(td->error || td->terminate) 1195 break; 1196 1197 if (!td->o.do_verify || 1198 td->o.verify == VERIFY_NONE || 1199 (td->io_ops->flags & FIO_UNIDIR)) 1200 continue; 1201 1202 clear_io_state(td); 1203 1204 fio_gettime(&td->start, NULL); 1205 1206 do_verify(td); 1207 1208 td->ts.runtime[DDIR_READ] += utime_since_now(&td->start); 1209 1210 if (td->error || td->terminate) 1211 break; 1212 } 1213 1214 update_rusage_stat(td); 1215 td->ts.runtime[DDIR_READ] = (td->ts.runtime[DDIR_READ] + 999) / 1000; 1216 td->ts.runtime[DDIR_WRITE] = (td->ts.runtime[DDIR_WRITE] + 999) / 1000; 1217 td->ts.runtime[DDIR_TRIM] = (td->ts.runtime[DDIR_TRIM] + 999) / 1000; 1218 td->ts.total_run_time = mtime_since_now(&td->epoch); 1219 td->ts.io_bytes[DDIR_READ] = td->io_bytes[DDIR_READ]; 1220 td->ts.io_bytes[DDIR_WRITE] = td->io_bytes[DDIR_WRITE]; 1221 td->ts.io_bytes[DDIR_TRIM] = td->io_bytes[DDIR_TRIM]; 1222 1223 fio_mutex_down(writeout_mutex); 1224 if (td->bw_log) { 1225 if (td->o.bw_log_file) { 1226 finish_log_named(td, td->bw_log, 1227 td->o.bw_log_file, "bw"); 1228 } else 1229 finish_log(td, td->bw_log, "bw"); 1230 } 1231 if (td->lat_log) { 1232 if (td->o.lat_log_file) { 1233 finish_log_named(td, td->lat_log, 1234 td->o.lat_log_file, "lat"); 1235 } else 1236 finish_log(td, td->lat_log, "lat"); 1237 } 1238 if (td->slat_log) { 1239 if (td->o.lat_log_file) { 1240 finish_log_named(td, td->slat_log, 1241 td->o.lat_log_file, "slat"); 1242 } else 1243 finish_log(td, td->slat_log, "slat"); 1244 } 1245 if (td->clat_log) { 1246 if (td->o.lat_log_file) { 1247 finish_log_named(td, td->clat_log, 1248 td->o.lat_log_file, "clat"); 1249 } else 1250 finish_log(td, td->clat_log, "clat"); 1251 } 1252 if (td->iops_log) { 1253 if (td->o.iops_log_file) { 1254 finish_log_named(td, td->iops_log, 1255 td->o.iops_log_file, "iops"); 1256 } else 1257 finish_log(td, td->iops_log, "iops"); 1258 } 1259 1260 fio_mutex_up(writeout_mutex); 1261 if (td->o.exec_postrun) 1262 exec_string(td->o.exec_postrun); 1263 1264 if (exitall_on_terminate) 1265 fio_terminate_threads(td->groupid); 1266 1267err: 1268 if (td->error) 1269 log_info("fio: pid=%d, err=%d/%s\n", (int) td->pid, td->error, 1270 td->verror); 1271 1272 if (td->o.verify_async) 1273 verify_async_exit(td); 1274 1275 close_and_free_files(td); 1276 close_ioengine(td); 1277 cleanup_io_u(td); 1278 cgroup_shutdown(td, &cgroup_mnt); 1279 1280 if (td->o.cpumask_set) { 1281 int ret = fio_cpuset_exit(&td->o.cpumask); 1282 1283 td_verror(td, ret, "fio_cpuset_exit"); 1284 } 1285 1286 /* 1287 * do this very late, it will log file closing as well 1288 */ 1289 if (td->o.write_iolog_file) 1290 write_iolog_close(td); 1291 1292 td_set_runstate(td, TD_EXITED); 1293 return (void *) (uintptr_t) td->error; 1294} 1295 1296 1297/* 1298 * We cannot pass the td data into a forked process, so attach the td and 1299 * pass it to the thread worker. 1300 */ 1301static int fork_main(int shmid, int offset) 1302{ 1303 struct thread_data *td; 1304 void *data, *ret; 1305 1306#ifndef __hpux 1307 data = shmat(shmid, NULL, 0); 1308 if (data == (void *) -1) { 1309 int __err = errno; 1310 1311 perror("shmat"); 1312 return __err; 1313 } 1314#else 1315 /* 1316 * HP-UX inherits shm mappings? 1317 */ 1318 data = threads; 1319#endif 1320 1321 td = data + offset * sizeof(struct thread_data); 1322 ret = thread_main(td); 1323 shmdt(data); 1324 return (int) (uintptr_t) ret; 1325} 1326 1327/* 1328 * Run over the job map and reap the threads that have exited, if any. 
1329 */ 1330static void reap_threads(unsigned int *nr_running, unsigned int *t_rate, 1331 unsigned int *m_rate) 1332{ 1333 struct thread_data *td; 1334 unsigned int cputhreads, realthreads, pending; 1335 int i, status, ret; 1336 1337 /* 1338 * reap exited threads (TD_EXITED -> TD_REAPED) 1339 */ 1340 realthreads = pending = cputhreads = 0; 1341 for_each_td(td, i) { 1342 int flags = 0; 1343 1344 /* 1345 * ->io_ops is NULL for a thread that has closed its 1346 * io engine 1347 */ 1348 if (td->io_ops && !strcmp(td->io_ops->name, "cpuio")) 1349 cputhreads++; 1350 else 1351 realthreads++; 1352 1353 if (!td->pid) { 1354 pending++; 1355 continue; 1356 } 1357 if (td->runstate == TD_REAPED) 1358 continue; 1359 if (td->o.use_thread) { 1360 if (td->runstate == TD_EXITED) { 1361 td_set_runstate(td, TD_REAPED); 1362 goto reaped; 1363 } 1364 continue; 1365 } 1366 1367 flags = WNOHANG; 1368 if (td->runstate == TD_EXITED) 1369 flags = 0; 1370 1371 /* 1372 * check if someone quit or got killed in an unusual way 1373 */ 1374 ret = waitpid(td->pid, &status, flags); 1375 if (ret < 0) { 1376 if (errno == ECHILD) { 1377 log_err("fio: pid=%d disappeared %d\n", 1378 (int) td->pid, td->runstate); 1379 td->sig = ECHILD; 1380 td_set_runstate(td, TD_REAPED); 1381 goto reaped; 1382 } 1383 perror("waitpid"); 1384 } else if (ret == td->pid) { 1385 if (WIFSIGNALED(status)) { 1386 int sig = WTERMSIG(status); 1387 1388 if (sig != SIGTERM && sig != SIGUSR2) 1389 log_err("fio: pid=%d, got signal=%d\n", 1390 (int) td->pid, sig); 1391 td->sig = sig; 1392 td_set_runstate(td, TD_REAPED); 1393 goto reaped; 1394 } 1395 if (WIFEXITED(status)) { 1396 if (WEXITSTATUS(status) && !td->error) 1397 td->error = WEXITSTATUS(status); 1398 1399 td_set_runstate(td, TD_REAPED); 1400 goto reaped; 1401 } 1402 } 1403 1404 /* 1405 * thread is not dead, continue 1406 */ 1407 pending++; 1408 continue; 1409reaped: 1410 (*nr_running)--; 1411 (*m_rate) -= ddir_rw_sum(td->o.ratemin); 1412 (*t_rate) -= ddir_rw_sum(td->o.rate); 1413 if (!td->pid) 1414 pending--; 1415 1416 if (td->error) 1417 exit_value++; 1418 1419 done_secs += mtime_since_now(&td->epoch) / 1000; 1420 } 1421 1422 if (*nr_running == cputhreads && !pending && realthreads) 1423 fio_terminate_threads(TERMINATE_ALL); 1424} 1425 1426/* 1427 * Main function for kicking off and reaping jobs, as needed. 1428 */ 1429static void run_threads(void) 1430{ 1431 struct thread_data *td; 1432 unsigned long spent; 1433 unsigned int i, todo, nr_running, m_rate, t_rate, nr_started; 1434 1435 if (fio_pin_memory()) 1436 return; 1437 1438 if (fio_gtod_offload && fio_start_gtod_thread()) 1439 return; 1440 1441 set_sig_handlers(); 1442 1443 if (output_format == FIO_OUTPUT_NORMAL) { 1444 log_info("Starting "); 1445 if (nr_thread) 1446 log_info("%d thread%s", nr_thread, 1447 nr_thread > 1 ? "s" : ""); 1448 if (nr_process) { 1449 if (nr_thread) 1450 log_info(" and "); 1451 log_info("%d process%s", nr_process, 1452 nr_process > 1 ? 
"es" : ""); 1453 } 1454 log_info("\n"); 1455 fflush(stdout); 1456 } 1457 1458 todo = thread_number; 1459 nr_running = 0; 1460 nr_started = 0; 1461 m_rate = t_rate = 0; 1462 1463 for_each_td(td, i) { 1464 print_status_init(td->thread_number - 1); 1465 1466 if (!td->o.create_serialize) 1467 continue; 1468 1469 /* 1470 * do file setup here so it happens sequentially, 1471 * we don't want X number of threads getting their 1472 * client data interspersed on disk 1473 */ 1474 if (setup_files(td)) { 1475 exit_value++; 1476 if (td->error) 1477 log_err("fio: pid=%d, err=%d/%s\n", 1478 (int) td->pid, td->error, td->verror); 1479 td_set_runstate(td, TD_REAPED); 1480 todo--; 1481 } else { 1482 struct fio_file *f; 1483 unsigned int j; 1484 1485 /* 1486 * for sharing to work, each job must always open 1487 * its own files. so close them, if we opened them 1488 * for creation 1489 */ 1490 for_each_file(td, f, j) { 1491 if (fio_file_open(f)) 1492 td_io_close_file(td, f); 1493 } 1494 } 1495 } 1496 1497 set_genesis_time(); 1498 1499 while (todo) { 1500 struct thread_data *map[REAL_MAX_JOBS]; 1501 struct timeval this_start; 1502 int this_jobs = 0, left; 1503 1504 /* 1505 * create threads (TD_NOT_CREATED -> TD_CREATED) 1506 */ 1507 for_each_td(td, i) { 1508 if (td->runstate != TD_NOT_CREATED) 1509 continue; 1510 1511 /* 1512 * never got a chance to start, killed by other 1513 * thread for some reason 1514 */ 1515 if (td->terminate) { 1516 todo--; 1517 continue; 1518 } 1519 1520 if (td->o.start_delay) { 1521 spent = mtime_since_genesis(); 1522 1523 if (td->o.start_delay * 1000 > spent) 1524 continue; 1525 } 1526 1527 if (td->o.stonewall && (nr_started || nr_running)) { 1528 dprint(FD_PROCESS, "%s: stonewall wait\n", 1529 td->o.name); 1530 break; 1531 } 1532 1533 init_disk_util(td); 1534 1535 /* 1536 * Set state to created. Thread will transition 1537 * to TD_INITIALIZED when it's done setting up. 1538 */ 1539 td_set_runstate(td, TD_CREATED); 1540 map[this_jobs++] = td; 1541 nr_started++; 1542 1543 if (td->o.use_thread) { 1544 int ret; 1545 1546 dprint(FD_PROCESS, "will pthread_create\n"); 1547 ret = pthread_create(&td->thread, NULL, 1548 thread_main, td); 1549 if (ret) { 1550 log_err("pthread_create: %s\n", 1551 strerror(ret)); 1552 nr_started--; 1553 break; 1554 } 1555 ret = pthread_detach(td->thread); 1556 if (ret) 1557 log_err("pthread_detach: %s", 1558 strerror(ret)); 1559 } else { 1560 pid_t pid; 1561 dprint(FD_PROCESS, "will fork\n"); 1562 pid = fork(); 1563 if (!pid) { 1564 int ret = fork_main(shm_id, i); 1565 1566 _exit(ret); 1567 } else if (i == fio_debug_jobno) 1568 *fio_debug_jobp = pid; 1569 } 1570 dprint(FD_MUTEX, "wait on startup_mutex\n"); 1571 if (fio_mutex_down_timeout(startup_mutex, 10)) { 1572 log_err("fio: job startup hung? exiting.\n"); 1573 fio_terminate_threads(TERMINATE_ALL); 1574 fio_abort = 1; 1575 nr_started--; 1576 break; 1577 } 1578 dprint(FD_MUTEX, "done waiting on startup_mutex\n"); 1579 } 1580 1581 /* 1582 * Wait for the started threads to transition to 1583 * TD_INITIALIZED. 1584 */ 1585 fio_gettime(&this_start, NULL); 1586 left = this_jobs; 1587 while (left && !fio_abort) { 1588 if (mtime_since_now(&this_start) > JOB_START_TIMEOUT) 1589 break; 1590 1591 usleep(100000); 1592 1593 for (i = 0; i < this_jobs; i++) { 1594 td = map[i]; 1595 if (!td) 1596 continue; 1597 if (td->runstate == TD_INITIALIZED) { 1598 map[i] = NULL; 1599 left--; 1600 } else if (td->runstate >= TD_EXITED) { 1601 map[i] = NULL; 1602 left--; 1603 todo--; 1604 nr_running++; /* work-around... 
*/ 1605 } 1606 } 1607 } 1608 1609 if (left) { 1610 log_err("fio: %d job%s failed to start\n", left, 1611 left > 1 ? "s" : ""); 1612 for (i = 0; i < this_jobs; i++) { 1613 td = map[i]; 1614 if (!td) 1615 continue; 1616 kill(td->pid, SIGTERM); 1617 } 1618 break; 1619 } 1620 1621 /* 1622 * start created threads (TD_INITIALIZED -> TD_RUNNING). 1623 */ 1624 for_each_td(td, i) { 1625 if (td->runstate != TD_INITIALIZED) 1626 continue; 1627 1628 if (in_ramp_time(td)) 1629 td_set_runstate(td, TD_RAMP); 1630 else 1631 td_set_runstate(td, TD_RUNNING); 1632 nr_running++; 1633 nr_started--; 1634 m_rate += ddir_rw_sum(td->o.ratemin); 1635 t_rate += ddir_rw_sum(td->o.rate); 1636 todo--; 1637 fio_mutex_up(td->mutex); 1638 } 1639 1640 reap_threads(&nr_running, &t_rate, &m_rate); 1641 1642 if (todo) { 1643 if (is_backend) 1644 fio_server_idle_loop(); 1645 else 1646 usleep(100000); 1647 } 1648 } 1649 1650 while (nr_running) { 1651 reap_threads(&nr_running, &t_rate, &m_rate); 1652 1653 if (is_backend) 1654 fio_server_idle_loop(); 1655 else 1656 usleep(10000); 1657 } 1658 1659 update_io_ticks(); 1660 fio_unpin_memory(); 1661} 1662 1663void wait_for_disk_thread_exit(void) 1664{ 1665 fio_mutex_down(disk_thread_mutex); 1666} 1667 1668static void free_disk_util(void) 1669{ 1670 disk_util_start_exit(); 1671 wait_for_disk_thread_exit(); 1672 disk_util_prune_entries(); 1673} 1674 1675static void *disk_thread_main(void *data) 1676{ 1677 int ret = 0; 1678 1679 fio_mutex_up(startup_mutex); 1680 1681 while (threads && !ret) { 1682 usleep(DISK_UTIL_MSEC * 1000); 1683 if (!threads) 1684 break; 1685 ret = update_io_ticks(); 1686 1687 if (!is_backend) 1688 print_thread_status(); 1689 } 1690 1691 fio_mutex_up(disk_thread_mutex); 1692 return NULL; 1693} 1694 1695static int create_disk_util_thread(void) 1696{ 1697 int ret; 1698 1699 setup_disk_util(); 1700 1701 disk_thread_mutex = fio_mutex_init(FIO_MUTEX_LOCKED); 1702 1703 ret = pthread_create(&disk_util_thread, NULL, disk_thread_main, NULL); 1704 if (ret) { 1705 fio_mutex_remove(disk_thread_mutex); 1706 log_err("Can't create disk util thread: %s\n", strerror(ret)); 1707 return 1; 1708 } 1709 1710 ret = pthread_detach(disk_util_thread); 1711 if (ret) { 1712 fio_mutex_remove(disk_thread_mutex); 1713 log_err("Can't detach disk util thread: %s\n", strerror(ret)); 1714 return 1; 1715 } 1716 1717 dprint(FD_MUTEX, "wait on startup_mutex\n"); 1718 fio_mutex_down(startup_mutex); 1719 dprint(FD_MUTEX, "done waiting on startup_mutex\n"); 1720 return 0; 1721} 1722 1723int fio_backend(void) 1724{ 1725 struct thread_data *td; 1726 int i; 1727 1728 if (exec_profile) { 1729 if (load_profile(exec_profile)) 1730 return 1; 1731 free(exec_profile); 1732 exec_profile = NULL; 1733 } 1734 if (!thread_number) 1735 return 0; 1736 1737 if (write_bw_log) { 1738 setup_log(&agg_io_log[DDIR_READ], 0); 1739 setup_log(&agg_io_log[DDIR_WRITE], 0); 1740 setup_log(&agg_io_log[DDIR_TRIM], 0); 1741 } 1742 1743 startup_mutex = fio_mutex_init(FIO_MUTEX_LOCKED); 1744 if (startup_mutex == NULL) 1745 return 1; 1746 writeout_mutex = fio_mutex_init(FIO_MUTEX_UNLOCKED); 1747 if (writeout_mutex == NULL) 1748 return 1; 1749 1750 set_genesis_time(); 1751 create_disk_util_thread(); 1752 1753 cgroup_list = smalloc(sizeof(*cgroup_list)); 1754 INIT_FLIST_HEAD(cgroup_list); 1755 1756 run_threads(); 1757 1758 if (!fio_abort) { 1759 show_run_stats(); 1760 if (write_bw_log) { 1761 __finish_log(agg_io_log[DDIR_READ], "agg-read_bw.log"); 1762 __finish_log(agg_io_log[DDIR_WRITE], 1763 "agg-write_bw.log"); 1764
__finish_log(agg_io_log[DDIR_TRIM], 1765 "agg-trim_bw.log"); 1766 } 1767 } 1768 1769 for_each_td(td, i) 1770 fio_options_free(td); 1771 1772 free_disk_util(); 1773 cgroup_kill(cgroup_list); 1774 sfree(cgroup_list); 1775 sfree(cgroup_mnt); 1776 1777 fio_mutex_remove(startup_mutex); 1778 fio_mutex_remove(writeout_mutex); 1779 fio_mutex_remove(disk_thread_mutex); 1780 return exit_value; 1781} 1782
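/*
 * Flow summary of this backend: fio_backend() sets up the aggregate
 * bandwidth logs, the startup/writeout mutexes and the disk utilization
 * thread, then calls run_threads(). run_threads() starts each job as a
 * thread or forked process running thread_main(), which drives the
 * runstate machine: TD_NOT_CREATED -> TD_CREATED -> TD_INITIALIZED ->
 * TD_RAMP/TD_RUNNING (do_io), optionally TD_VERIFYING (do_verify) and
 * TD_FSYNCING, then TD_EXITED. reap_threads() moves finished jobs to
 * TD_REAPED and accumulates exit_value, which fio_backend() returns.
 */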