backend.c revision 2e1df07d1ea30e0304cc65370f3ed161a6f22cd4
/*
 * fio - the flexible io tester
 *
 * Copyright (C) 2005 Jens Axboe <axboe@suse.de>
 * Copyright (C) 2006-2012 Jens Axboe <axboe@kernel.dk>
 *
 * The license below covers all files distributed with fio unless otherwise
 * noted in the file itself.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 as
 * published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
 *
 */
#include <unistd.h>
#include <fcntl.h>
#include <string.h>
#include <limits.h>
#include <signal.h>
#include <time.h>
#include <locale.h>
#include <assert.h>
#include <time.h>
#include <sys/stat.h>
#include <sys/wait.h>
#include <sys/ipc.h>
#include <sys/shm.h>
#include <sys/mman.h>

#include "fio.h"
#include "hash.h"
#include "smalloc.h"
#include "verify.h"
#include "trim.h"
#include "diskutil.h"
#include "cgroup.h"
#include "profile.h"
#include "lib/rand.h"
#include "memalign.h"
#include "server.h"

static pthread_t disk_util_thread;
static struct fio_mutex *startup_mutex;
static struct fio_mutex *writeout_mutex;
static struct flist_head *cgroup_list;
static char *cgroup_mnt;
static int exit_value;
static volatile int fio_abort;

struct io_log *agg_io_log[2];

#define PAGE_ALIGN(buf)	\
	(char *) (((unsigned long) (buf) + page_mask) & ~page_mask)

#define JOB_START_TIMEOUT	(5 * 1000)

static void sig_int(int sig)
{
	if (threads) {
		if (is_backend)
			fio_server_got_signal(sig);
		else {
			log_info("\nfio: terminating on signal %d\n", sig);
			fflush(stdout);
			exit_value = 128;
		}

		fio_terminate_threads(TERMINATE_ALL);
	}
}

static void set_sig_handlers(void)
{
	struct sigaction act;

	memset(&act, 0, sizeof(act));
	act.sa_handler = sig_int;
	act.sa_flags = SA_RESTART;
	sigaction(SIGINT, &act, NULL);

	memset(&act, 0, sizeof(act));
	act.sa_handler = sig_int;
	act.sa_flags = SA_RESTART;
	sigaction(SIGTERM, &act, NULL);

	if (is_backend) {
		memset(&act, 0, sizeof(act));
		act.sa_handler = sig_int;
		act.sa_flags = SA_RESTART;
		sigaction(SIGPIPE, &act, NULL);
	}
}

/*
 * Check if we are above the minimum rate given.
 */
static int __check_min_rate(struct thread_data *td, struct timeval *now,
			    enum fio_ddir ddir)
{
	unsigned long long bytes = 0;
	unsigned long iops = 0;
	unsigned long spent;
	unsigned long rate;
	unsigned int ratemin = 0;
	unsigned int rate_iops = 0;
	unsigned int rate_iops_min = 0;

	assert(ddir_rw(ddir));

	if (!td->o.ratemin[ddir] && !td->o.rate_iops_min[ddir])
		return 0;

	/*
	 * allow a 2 second settle period in the beginning
	 */
	if (mtime_since(&td->start, now) < 2000)
		return 0;

	iops += td->this_io_blocks[ddir];
	bytes += td->this_io_bytes[ddir];
	ratemin += td->o.ratemin[ddir];
	rate_iops += td->o.rate_iops[ddir];
	rate_iops_min += td->o.rate_iops_min[ddir];

	/*
	 * if rate blocks is set, sample is running
	 */
	if (td->rate_bytes[ddir] || td->rate_blocks[ddir]) {
		spent = mtime_since(&td->lastrate[ddir], now);
		if (spent < td->o.ratecycle)
			return 0;

		if (td->o.rate[ddir]) {
			/*
			 * check bandwidth specified rate
			 */
			if (bytes < td->rate_bytes[ddir]) {
				log_err("%s: min rate %u not met\n",
						td->o.name, ratemin);
				return 1;
			} else {
				rate = ((bytes - td->rate_bytes[ddir]) * 1000) / spent;
				if (rate < ratemin ||
				    bytes < td->rate_bytes[ddir]) {
					log_err("%s: min rate %u not met, got"
						" %luKB/sec\n", td->o.name,
						ratemin, rate);
					return 1;
				}
			}
		} else {
			/*
			 * check iops specified rate
			 */
			if (iops < rate_iops) {
				log_err("%s: min iops rate %u not met\n",
						td->o.name, rate_iops);
				return 1;
			} else {
				rate = ((iops - td->rate_blocks[ddir]) * 1000) / spent;
				if (rate < rate_iops_min ||
				    iops < td->rate_blocks[ddir]) {
					log_err("%s: min iops rate %u not met,"
						" got %lu\n", td->o.name,
						rate_iops_min, rate);
					return 1;
				}
			}
		}
	}

	td->rate_bytes[ddir] = bytes;
	td->rate_blocks[ddir] = iops;
	memcpy(&td->lastrate[ddir], now, sizeof(*now));
	return 0;
}

static int check_min_rate(struct thread_data *td, struct timeval *now,
			  unsigned long *bytes_done)
{
	int ret = 0;

	if (bytes_done[0])
		ret |= __check_min_rate(td, now, 0);
	if (bytes_done[1])
		ret |= __check_min_rate(td, now, 1);

	return ret;
}

/*
 * When job exits, we can cancel the in-flight IO if we are using async
 * io. Attempt to do so.
 */
static void cleanup_pending_aio(struct thread_data *td)
{
	struct flist_head *entry, *n;
	struct io_u *io_u;
	int r;

	/*
	 * get immediately available events, if any
	 */
	r = io_u_queued_complete(td, 0, NULL);
	if (r < 0)
		return;

	/*
	 * now cancel remaining active events
	 */
	if (td->io_ops->cancel) {
		flist_for_each_safe(entry, n, &td->io_u_busylist) {
			io_u = flist_entry(entry, struct io_u, list);

			/*
			 * if the io_u isn't in flight, then that generally
			 * means someone leaked an io_u. complain but fix
			 * it up, so we don't stall here.
			 */
			if ((io_u->flags & IO_U_F_FLIGHT) == 0) {
				log_err("fio: non-busy IO on busy list\n");
				put_io_u(td, io_u);
			} else {
				r = td->io_ops->cancel(td, io_u);
				if (!r)
					put_io_u(td, io_u);
			}
		}
	}

	if (td->cur_depth)
		r = io_u_queued_complete(td, td->cur_depth, NULL);
}

/*
 * Helper to handle the final sync of a file. Works just like the normal
 * io path, just does everything sync.
 */
static int fio_io_sync(struct thread_data *td, struct fio_file *f)
{
	struct io_u *io_u = __get_io_u(td);
	int ret;

	if (!io_u)
		return 1;

	io_u->ddir = DDIR_SYNC;
	io_u->file = f;

	if (td_io_prep(td, io_u)) {
		put_io_u(td, io_u);
		return 1;
	}

requeue:
	ret = td_io_queue(td, io_u);
	if (ret < 0) {
		td_verror(td, io_u->error, "td_io_queue");
		put_io_u(td, io_u);
		return 1;
	} else if (ret == FIO_Q_QUEUED) {
		if (io_u_queued_complete(td, 1, NULL) < 0)
			return 1;
	} else if (ret == FIO_Q_COMPLETED) {
		if (io_u->error) {
			td_verror(td, io_u->error, "td_io_queue");
			return 1;
		}

		if (io_u_sync_complete(td, io_u, NULL) < 0)
			return 1;
	} else if (ret == FIO_Q_BUSY) {
		if (td_io_commit(td))
			return 1;
		goto requeue;
	}

	return 0;
}

static inline void __update_tv_cache(struct thread_data *td)
{
	fio_gettime(&td->tv_cache, NULL);
}

static inline void update_tv_cache(struct thread_data *td)
{
	if ((++td->tv_cache_nr & td->tv_cache_mask) == td->tv_cache_mask)
		__update_tv_cache(td);
}

static inline int runtime_exceeded(struct thread_data *td, struct timeval *t)
{
	if (in_ramp_time(td))
		return 0;
	if (!td->o.timeout)
		return 0;
	if (mtime_since(&td->epoch, t) >= td->o.timeout * 1000)
		return 1;

	return 0;
}

static int break_on_this_error(struct thread_data *td, enum fio_ddir ddir,
			       int *retptr)
{
	int ret = *retptr;

	if (ret < 0 || td->error) {
		int err;

		if (ret < 0)
			err = -ret;
		else
			err = td->error;

		if (!(td->o.continue_on_error & td_error_type(ddir, err)))
			return 1;

		if (td_non_fatal_error(err)) {
			/*
			 * Continue with the I/Os in case of
			 * a non fatal error.
			 */
			update_error_count(td, err);
			td_clear_error(td);
			*retptr = 0;
			return 0;
		} else if (td->o.fill_device && err == ENOSPC) {
			/*
			 * We expect to hit this error if
			 * fill_device option is set.
			 */
			td_clear_error(td);
			td->terminate = 1;
			return 1;
		} else {
			/*
			 * Stop the I/O in case of a fatal
			 * error.
			 */
			update_error_count(td, err);
			return 1;
		}
	}

	return 0;
}

/*
 * The main verify engine. Runs over the writes we previously submitted,
 * reads the blocks back in, and checks the crc/md5 of the data.
 */
static void do_verify(struct thread_data *td)
{
	struct fio_file *f;
	struct io_u *io_u;
	int ret, min_events;
	unsigned int i;

	dprint(FD_VERIFY, "starting loop\n");

	/*
	 * sync io first and invalidate cache, to make sure we really
	 * read from disk.
	 */
	for_each_file(td, f, i) {
		if (!fio_file_open(f))
			continue;
		if (fio_io_sync(td, f))
			break;
		if (file_invalidate_cache(td, f))
			break;
	}

	if (td->error)
		return;

	td_set_runstate(td, TD_VERIFYING);

	io_u = NULL;
	while (!td->terminate) {
		int ret2, full;

		update_tv_cache(td);

		if (runtime_exceeded(td, &td->tv_cache)) {
			__update_tv_cache(td);
			if (runtime_exceeded(td, &td->tv_cache)) {
				td->terminate = 1;
				break;
			}
		}

		io_u = __get_io_u(td);
		if (!io_u)
			break;

		if (get_next_verify(td, io_u)) {
			put_io_u(td, io_u);
			break;
		}

		if (td_io_prep(td, io_u)) {
			put_io_u(td, io_u);
			break;
		}

		if (td->o.verify_async)
			io_u->end_io = verify_io_u_async;
		else
			io_u->end_io = verify_io_u;

		ret = td_io_queue(td, io_u);
		switch (ret) {
		case FIO_Q_COMPLETED:
			if (io_u->error) {
				ret = -io_u->error;
				clear_io_u(td, io_u);
			} else if (io_u->resid) {
				int bytes = io_u->xfer_buflen - io_u->resid;

				/*
				 * zero read, fail
				 */
				if (!bytes) {
					td_verror(td, EIO, "full resid");
					put_io_u(td, io_u);
					break;
				}

				io_u->xfer_buflen = io_u->resid;
				io_u->xfer_buf += bytes;
				io_u->offset += bytes;

				if (ddir_rw(io_u->ddir))
					td->ts.short_io_u[io_u->ddir]++;

				f = io_u->file;
				if (io_u->offset == f->real_file_size)
					goto sync_done;

				requeue_io_u(td, &io_u);
			} else {
sync_done:
				ret = io_u_sync_complete(td, io_u, NULL);
				if (ret < 0)
					break;
			}
			continue;
		case FIO_Q_QUEUED:
			break;
		case FIO_Q_BUSY:
			requeue_io_u(td, &io_u);
			ret2 = td_io_commit(td);
			if (ret2 < 0)
				ret = ret2;
			break;
		default:
			assert(ret < 0);
			td_verror(td, -ret, "td_io_queue");
			break;
		}

		if (break_on_this_error(td, io_u->ddir, &ret))
			break;

		/*
		 * if we can queue more, do so. but check if there are
		 * completed io_u's first. Note that we can get BUSY even
		 * without IO queued, if the system is resource starved.
		 */
		full = queue_full(td) || (ret == FIO_Q_BUSY && td->cur_depth);
		if (full || !td->o.iodepth_batch_complete) {
			min_events = min(td->o.iodepth_batch_complete,
					 td->cur_depth);
			if (full && !min_events && td->o.iodepth_batch_complete != 0)
				min_events = 1;

			do {
				/*
				 * Reap required number of io units, if any,
				 * and do the verification on them through
				 * the callback handler
				 */
				if (io_u_queued_complete(td, min_events, NULL) < 0) {
					ret = -1;
					break;
				}
			} while (full && (td->cur_depth > td->o.iodepth_low));
		}
		if (ret < 0)
			break;
	}

	if (!td->error) {
		min_events = td->cur_depth;

		if (min_events)
			ret = io_u_queued_complete(td, min_events, NULL);
	} else
		cleanup_pending_aio(td);

	td_set_runstate(td, TD_RUNNING);

	dprint(FD_VERIFY, "exiting loop\n");
}

/*
 * Main IO worker function. It retrieves io_u's to process and queues
 * and reaps them, checking for rate and errors along the way.
 */
static void do_io(struct thread_data *td)
{
	unsigned int i;
	int ret = 0;

	if (in_ramp_time(td))
		td_set_runstate(td, TD_RAMP);
	else
		td_set_runstate(td, TD_RUNNING);

	while ((td->o.read_iolog_file && !flist_empty(&td->io_log_list)) ||
	       (!flist_empty(&td->trim_list)) ||
	       ((td->this_io_bytes[0] + td->this_io_bytes[1]) < td->o.size)) {
		struct timeval comp_time;
		unsigned long bytes_done[2] = { 0, 0 };
		int min_evts = 0;
		struct io_u *io_u;
		int ret2, full;
		enum fio_ddir ddir;

		if (td->terminate)
			break;

		update_tv_cache(td);

		if (runtime_exceeded(td, &td->tv_cache)) {
			__update_tv_cache(td);
			if (runtime_exceeded(td, &td->tv_cache)) {
				td->terminate = 1;
				break;
			}
		}

		io_u = get_io_u(td);
		if (!io_u)
			break;

		ddir = io_u->ddir;

		/*
		 * Add verification end_io handler, if asked to verify
		 * a previously written file.
		 */
		if (td->o.verify != VERIFY_NONE && io_u->ddir == DDIR_READ &&
		    !td_rw(td)) {
			if (td->o.verify_async)
				io_u->end_io = verify_io_u_async;
			else
				io_u->end_io = verify_io_u;
			td_set_runstate(td, TD_VERIFYING);
		} else if (in_ramp_time(td))
			td_set_runstate(td, TD_RAMP);
		else
			td_set_runstate(td, TD_RUNNING);

		ret = td_io_queue(td, io_u);
		switch (ret) {
		case FIO_Q_COMPLETED:
			if (io_u->error) {
				ret = -io_u->error;
				clear_io_u(td, io_u);
			} else if (io_u->resid) {
				int bytes = io_u->xfer_buflen - io_u->resid;
				struct fio_file *f = io_u->file;

				/*
				 * zero read, fail
				 */
				if (!bytes) {
					td_verror(td, EIO, "full resid");
					put_io_u(td, io_u);
					break;
				}

				io_u->xfer_buflen = io_u->resid;
				io_u->xfer_buf += bytes;
				io_u->offset += bytes;

				if (ddir_rw(io_u->ddir))
					td->ts.short_io_u[io_u->ddir]++;

				if (io_u->offset == f->real_file_size)
					goto sync_done;

				requeue_io_u(td, &io_u);
			} else {
sync_done:
				if (__should_check_rate(td, 0) ||
				    __should_check_rate(td, 1))
					fio_gettime(&comp_time, NULL);

				ret = io_u_sync_complete(td, io_u, bytes_done);
				if (ret < 0)
					break;
			}
			break;
		case FIO_Q_QUEUED:
			/*
			 * if the engine doesn't have a commit hook,
			 * the io_u is really queued. if it does have such
			 * a hook, it has to call io_u_queued() itself.
			 */
			if (td->io_ops->commit == NULL)
				io_u_queued(td, io_u);
			break;
		case FIO_Q_BUSY:
			requeue_io_u(td, &io_u);
			ret2 = td_io_commit(td);
			if (ret2 < 0)
				ret = ret2;
			break;
		default:
			assert(ret < 0);
			put_io_u(td, io_u);
			break;
		}

		if (break_on_this_error(td, ddir, &ret))
			break;

		/*
		 * See if we need to complete some commands. Note that we
		 * can get BUSY even without IO queued, if the system is
		 * resource starved.
		 */
		full = queue_full(td) || (ret == FIO_Q_BUSY && td->cur_depth);
		if (full || !td->o.iodepth_batch_complete) {
			min_evts = min(td->o.iodepth_batch_complete,
					td->cur_depth);
			if (full && !min_evts && td->o.iodepth_batch_complete != 0)
				min_evts = 1;

			if (__should_check_rate(td, 0) ||
			    __should_check_rate(td, 1))
				fio_gettime(&comp_time, NULL);

			do {
				ret = io_u_queued_complete(td, min_evts, bytes_done);
				if (ret < 0)
					break;

			} while (full && (td->cur_depth > td->o.iodepth_low));
		}

		if (ret < 0)
			break;
		if (!(bytes_done[0] + bytes_done[1]))
			continue;

		if (!in_ramp_time(td) && should_check_rate(td, bytes_done)) {
			if (check_min_rate(td, &comp_time, bytes_done)) {
				if (exitall_on_terminate)
					fio_terminate_threads(td->groupid);
				td_verror(td, EIO, "check_min_rate");
				break;
			}
		}

		if (td->o.thinktime) {
			unsigned long long b;

			b = td->io_blocks[0] + td->io_blocks[1];
			if (!(b % td->o.thinktime_blocks)) {
				int left;

				if (td->o.thinktime_spin)
					usec_spin(td->o.thinktime_spin);

				left = td->o.thinktime - td->o.thinktime_spin;
				if (left)
					usec_sleep(td, left);
			}
		}
	}

	if (td->trim_entries)
		log_err("fio: %d trim entries leaked?\n", td->trim_entries);

	if (td->o.fill_device && td->error == ENOSPC) {
		td->error = 0;
		td->terminate = 1;
	}
	if (!td->error) {
		struct fio_file *f;

		i = td->cur_depth;
		if (i) {
			ret = io_u_queued_complete(td, i, NULL);
			if (td->o.fill_device && td->error == ENOSPC)
				td->error = 0;
		}

		if (should_fsync(td) && td->o.end_fsync) {
			td_set_runstate(td, TD_FSYNCING);

			for_each_file(td, f, i) {
				if (!fio_file_open(f))
					continue;
				fio_io_sync(td, f);
			}
		}
	} else
		cleanup_pending_aio(td);

	/*
	 * stop job if we failed doing any IO
	 */
	if ((td->this_io_bytes[0] + td->this_io_bytes[1]) == 0)
		td->done = 1;
}

static void cleanup_io_u(struct thread_data *td)
{
	struct flist_head *entry, *n;
	struct io_u *io_u;

	flist_for_each_safe(entry, n, &td->io_u_freelist) {
		io_u = flist_entry(entry, struct io_u, list);

		flist_del(&io_u->list);
		fio_memfree(io_u, sizeof(*io_u));
	}

	free_io_mem(td);
}

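/*
 * Allocate the io_u structures for this job and carve per-io_u buffers out
 * of the job's I/O memory region, aligning and pre-filling them as the job
 * options require.
 */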
static int init_io_u(struct thread_data *td)
{
	struct io_u *io_u;
	unsigned int max_bs;
	int cl_align, i, max_units;
	char *p;

	max_units = td->o.iodepth;
	max_bs = max(td->o.max_bs[DDIR_READ], td->o.max_bs[DDIR_WRITE]);
	td->orig_buffer_size = (unsigned long long) max_bs
					* (unsigned long long) max_units;

	if (td->o.mem_type == MEM_SHMHUGE || td->o.mem_type == MEM_MMAPHUGE) {
		unsigned long bs;

		bs = td->orig_buffer_size + td->o.hugepage_size - 1;
		td->orig_buffer_size = bs & ~(td->o.hugepage_size - 1);
	}

	if (td->orig_buffer_size != (size_t) td->orig_buffer_size) {
		log_err("fio: IO memory too large. Reduce max_bs or iodepth\n");
		return 1;
	}

	if (allocate_io_mem(td))
		return 1;

	if (td->o.odirect || td->o.mem_align ||
	    (td->io_ops->flags & FIO_RAWIO))
		p = PAGE_ALIGN(td->orig_buffer) + td->o.mem_align;
	else
		p = td->orig_buffer;

	cl_align = os_cache_line_size();

	for (i = 0; i < max_units; i++) {
		void *ptr;

		if (td->terminate)
			return 1;

		ptr = fio_memalign(cl_align, sizeof(*io_u));
		if (!ptr) {
			log_err("fio: unable to allocate aligned memory\n");
			break;
		}

		io_u = ptr;
		memset(io_u, 0, sizeof(*io_u));
		INIT_FLIST_HEAD(&io_u->list);
		dprint(FD_MEM, "io_u alloc %p, index %u\n", io_u, i);

		if (!(td->io_ops->flags & FIO_NOIO)) {
			io_u->buf = p;
			dprint(FD_MEM, "io_u %p, mem %p\n", io_u, io_u->buf);

			if (td_write(td))
				io_u_fill_buffer(td, io_u, max_bs);
			if (td_write(td) && td->o.verify_pattern_bytes) {
				/*
				 * Fill the buffer with the pattern if we are
				 * going to be doing writes.
				 */
				fill_pattern(td, io_u->buf, max_bs, io_u, 0, 0);
			}
		}

		io_u->index = i;
		io_u->flags = IO_U_F_FREE;
		flist_add(&io_u->list, &td->io_u_freelist);
		p += max_bs;
	}

	return 0;
}

static int switch_ioscheduler(struct thread_data *td)
{
	char tmp[256], tmp2[128];
	FILE *f;
	int ret;

	if (td->io_ops->flags & FIO_DISKLESSIO)
		return 0;

	sprintf(tmp, "%s/queue/scheduler", td->sysfs_root);

	f = fopen(tmp, "r+");
	if (!f) {
		if (errno == ENOENT) {
			log_err("fio: os or kernel doesn't support IO scheduler"
				" switching\n");
			return 0;
		}
		td_verror(td, errno, "fopen iosched");
		return 1;
	}

	/*
	 * Set io scheduler.
	 */
	ret = fwrite(td->o.ioscheduler, strlen(td->o.ioscheduler), 1, f);
	if (ferror(f) || ret != 1) {
		td_verror(td, errno, "fwrite");
		fclose(f);
		return 1;
	}

	rewind(f);

	/*
	 * Read back and check that the selected scheduler is now the default.
	 */
	ret = fread(tmp, 1, sizeof(tmp), f);
	if (ferror(f) || ret < 0) {
		td_verror(td, errno, "fread");
		fclose(f);
		return 1;
	}

	sprintf(tmp2, "[%s]", td->o.ioscheduler);
	if (!strstr(tmp, tmp2)) {
		log_err("fio: io scheduler %s not found\n", td->o.ioscheduler);
		td_verror(td, EINVAL, "iosched_switch");
		fclose(f);
		return 1;
	}

	fclose(f);
	return 0;
}

static int keep_running(struct thread_data *td)
{
	unsigned long long io_done;

	if (td->done)
		return 0;
	if (td->o.time_based)
		return 1;
	if (td->o.loops) {
		td->o.loops--;
		return 1;
	}

	io_done = td->io_bytes[DDIR_READ] + td->io_bytes[DDIR_WRITE]
			+ td->io_skip_bytes;
	if (io_done < td->o.size)
		return 1;

	return 0;
}

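/*
 * Run a user-supplied command (exec_prerun/exec_postrun) through "sh -c".
 */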
static int exec_string(const char *string)
{
	int ret, newlen = strlen(string) + 1 + 8;
	char *str;

	str = malloc(newlen);
	sprintf(str, "sh -c %s", string);

	ret = system(str);
	if (ret == -1)
		log_err("fio: exec of cmd <%s> failed\n", str);

	free(str);
	return ret;
}

/*
 * Entry point for the thread based jobs. The process based jobs end up
 * here as well, after a little setup.
 */
static void *thread_main(void *data)
{
	unsigned long long elapsed;
	struct thread_data *td = data;
	pthread_condattr_t attr;
	int clear_state;

	if (!td->o.use_thread) {
		setsid();
		td->pid = getpid();
	} else
		td->pid = gettid();

	dprint(FD_PROCESS, "jobs pid=%d started\n", (int) td->pid);

	INIT_FLIST_HEAD(&td->io_u_freelist);
	INIT_FLIST_HEAD(&td->io_u_busylist);
	INIT_FLIST_HEAD(&td->io_u_requeues);
	INIT_FLIST_HEAD(&td->io_log_list);
	INIT_FLIST_HEAD(&td->io_hist_list);
	INIT_FLIST_HEAD(&td->verify_list);
	INIT_FLIST_HEAD(&td->trim_list);
	pthread_mutex_init(&td->io_u_lock, NULL);
	td->io_hist_tree = RB_ROOT;

	pthread_condattr_init(&attr);
	pthread_cond_init(&td->verify_cond, &attr);
	pthread_cond_init(&td->free_cond, &attr);

	td_set_runstate(td, TD_INITIALIZED);
	dprint(FD_MUTEX, "up startup_mutex\n");
	fio_mutex_up(startup_mutex);
	dprint(FD_MUTEX, "wait on td->mutex\n");
	fio_mutex_down(td->mutex);
	dprint(FD_MUTEX, "done waiting on td->mutex\n");

	/*
	 * the ->mutex mutex is now no longer used, close it to avoid
	 * eating a file descriptor
	 */
	fio_mutex_remove(td->mutex);

	/*
	 * A new gid requires privilege, so we need to do this before setting
	 * the uid.
	 */
	if (td->o.gid != -1U && setgid(td->o.gid)) {
		td_verror(td, errno, "setgid");
		goto err;
	}
	if (td->o.uid != -1U && setuid(td->o.uid)) {
		td_verror(td, errno, "setuid");
		goto err;
	}

	/*
	 * If we have a gettimeofday() thread, make sure we exclude that
	 * thread from this job
	 */
	if (td->o.gtod_cpu)
		fio_cpu_clear(&td->o.cpumask, td->o.gtod_cpu);

	/*
	 * Set affinity first, in case it has an impact on the memory
	 * allocations.
	 */
	if (td->o.cpumask_set && fio_setaffinity(td->pid, td->o.cpumask) == -1) {
		td_verror(td, errno, "cpu_set_affinity");
		goto err;
	}

	/*
	 * May alter parameters that init_io_u() will use, so we need to
	 * do this first.
	 */
	if (init_iolog(td))
		goto err;

	if (init_io_u(td))
		goto err;

	if (td->o.verify_async && verify_async_init(td))
		goto err;

	if (td->ioprio_set) {
		if (ioprio_set(IOPRIO_WHO_PROCESS, 0, td->ioprio) == -1) {
			td_verror(td, errno, "ioprio_set");
			goto err;
		}
	}

	if (td->o.cgroup_weight && cgroup_setup(td, cgroup_list, &cgroup_mnt))
		goto err;

	if (nice(td->o.nice) == -1) {
		td_verror(td, errno, "nice");
		goto err;
	}

	if (td->o.ioscheduler && switch_ioscheduler(td))
		goto err;

	if (!td->o.create_serialize && setup_files(td))
		goto err;

	if (td_io_init(td))
		goto err;

	if (init_random_map(td))
		goto err;

	if (td->o.exec_prerun) {
		if (exec_string(td->o.exec_prerun))
			goto err;
	}

	if (td->o.pre_read) {
		if (pre_read_files(td) < 0)
			goto err;
	}

	fio_gettime(&td->epoch, NULL);
	getrusage(RUSAGE_SELF, &td->ru_start);

	clear_state = 0;
	while (keep_running(td)) {
		fio_gettime(&td->start, NULL);
		memcpy(&td->bw_sample_time, &td->start, sizeof(td->start));
		memcpy(&td->iops_sample_time, &td->start, sizeof(td->start));
		memcpy(&td->tv_cache, &td->start, sizeof(td->start));

		if (td->o.ratemin[0] || td->o.ratemin[1]) {
			memcpy(&td->lastrate[0], &td->bw_sample_time,
					sizeof(td->bw_sample_time));
			memcpy(&td->lastrate[1], &td->bw_sample_time,
					sizeof(td->bw_sample_time));
		}

		if (clear_state)
			clear_io_state(td);

		prune_io_piece_log(td);

		do_io(td);

		clear_state = 1;

		if (td_read(td) && td->io_bytes[DDIR_READ]) {
			elapsed = utime_since_now(&td->start);
			td->ts.runtime[DDIR_READ] += elapsed;
		}
		if (td_write(td) && td->io_bytes[DDIR_WRITE]) {
			elapsed = utime_since_now(&td->start);
			td->ts.runtime[DDIR_WRITE] += elapsed;
		}

		if (td->error || td->terminate)
			break;

		if (!td->o.do_verify ||
		    td->o.verify == VERIFY_NONE ||
		    (td->io_ops->flags & FIO_UNIDIR))
			continue;

		clear_io_state(td);

		fio_gettime(&td->start, NULL);

		do_verify(td);

		td->ts.runtime[DDIR_READ] += utime_since_now(&td->start);

		if (td->error || td->terminate)
			break;
	}

	update_rusage_stat(td);
	td->ts.runtime[0] = (td->ts.runtime[0] + 999) / 1000;
	td->ts.runtime[1] = (td->ts.runtime[1] + 999) / 1000;
	td->ts.total_run_time = mtime_since_now(&td->epoch);
	td->ts.io_bytes[0] = td->io_bytes[0];
	td->ts.io_bytes[1] = td->io_bytes[1];

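	/*
	 * Write out any per-job bandwidth/latency/iops logs, serialized
	 * across jobs by writeout_mutex.
	 */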
	fio_mutex_down(writeout_mutex);
	if (td->bw_log) {
		if (td->o.bw_log_file) {
			finish_log_named(td, td->bw_log,
						td->o.bw_log_file, "bw");
		} else
			finish_log(td, td->bw_log, "bw");
	}
	if (td->lat_log) {
		if (td->o.lat_log_file) {
			finish_log_named(td, td->lat_log,
						td->o.lat_log_file, "lat");
		} else
			finish_log(td, td->lat_log, "lat");
	}
	if (td->slat_log) {
		if (td->o.lat_log_file) {
			finish_log_named(td, td->slat_log,
						td->o.lat_log_file, "slat");
		} else
			finish_log(td, td->slat_log, "slat");
	}
	if (td->clat_log) {
		if (td->o.lat_log_file) {
			finish_log_named(td, td->clat_log,
						td->o.lat_log_file, "clat");
		} else
			finish_log(td, td->clat_log, "clat");
	}
	if (td->iops_log) {
		if (td->o.iops_log_file) {
			finish_log_named(td, td->iops_log,
						td->o.iops_log_file, "iops");
		} else
			finish_log(td, td->iops_log, "iops");
	}

	fio_mutex_up(writeout_mutex);
	if (td->o.exec_postrun)
		exec_string(td->o.exec_postrun);

	if (exitall_on_terminate)
		fio_terminate_threads(td->groupid);

err:
	if (td->error)
		log_info("fio: pid=%d, err=%d/%s\n", (int) td->pid, td->error,
							td->verror);

	if (td->o.verify_async)
		verify_async_exit(td);

	close_and_free_files(td);
	close_ioengine(td);
	cleanup_io_u(td);
	cgroup_shutdown(td, &cgroup_mnt);

	if (td->o.cpumask_set) {
		int ret = fio_cpuset_exit(&td->o.cpumask);

		td_verror(td, ret, "fio_cpuset_exit");
	}

	/*
	 * do this very late, it will log file closing as well
	 */
	if (td->o.write_iolog_file)
		write_iolog_close(td);

	td_set_runstate(td, TD_EXITED);
	return (void *) (unsigned long) td->error;
}

/*
 * We cannot pass the td data into a forked process, so attach the td and
 * pass it to the thread worker.
 */
static int fork_main(int shmid, int offset)
{
	struct thread_data *td;
	void *data, *ret;

#ifndef __hpux
	data = shmat(shmid, NULL, 0);
	if (data == (void *) -1) {
		int __err = errno;

		perror("shmat");
		return __err;
	}
#else
	/*
	 * HP-UX inherits shm mappings?
	 */
	data = threads;
#endif

	td = data + offset * sizeof(struct thread_data);
	ret = thread_main(td);
	shmdt(data);
	return (int) (unsigned long) ret;
}

/*
 * Run over the job map and reap the threads that have exited, if any.
 */
static void reap_threads(unsigned int *nr_running, unsigned int *t_rate,
			 unsigned int *m_rate)
{
	struct thread_data *td;
	unsigned int cputhreads, realthreads, pending;
	int i, status, ret;

	/*
	 * reap exited threads (TD_EXITED -> TD_REAPED)
	 */
	realthreads = pending = cputhreads = 0;
	for_each_td(td, i) {
		int flags = 0;

		/*
		 * ->io_ops is NULL for a thread that has closed its
		 * io engine
		 */
		if (td->io_ops && !strcmp(td->io_ops->name, "cpuio"))
			cputhreads++;
		else
			realthreads++;

		if (!td->pid) {
			pending++;
			continue;
		}
		if (td->runstate == TD_REAPED)
			continue;
		if (td->o.use_thread) {
			if (td->runstate == TD_EXITED) {
				td_set_runstate(td, TD_REAPED);
				goto reaped;
			}
			continue;
		}

		flags = WNOHANG;
		if (td->runstate == TD_EXITED)
			flags = 0;

		/*
		 * check if someone quit or got killed in an unusual way
		 */
		ret = waitpid(td->pid, &status, flags);
		if (ret < 0) {
			if (errno == ECHILD) {
				log_err("fio: pid=%d disappeared %d\n",
						(int) td->pid, td->runstate);
				td_set_runstate(td, TD_REAPED);
				goto reaped;
			}
			perror("waitpid");
		} else if (ret == td->pid) {
			if (WIFSIGNALED(status)) {
				int sig = WTERMSIG(status);

				if (sig != SIGTERM)
					log_err("fio: pid=%d, got signal=%d\n",
							(int) td->pid, sig);
				td_set_runstate(td, TD_REAPED);
				goto reaped;
			}
			if (WIFEXITED(status)) {
				if (WEXITSTATUS(status) && !td->error)
					td->error = WEXITSTATUS(status);

				td_set_runstate(td, TD_REAPED);
				goto reaped;
			}
		}

		/*
		 * thread is not dead, continue
		 */
		pending++;
		continue;
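		/*
		 * A job is gone: drop it from the running count and remove
		 * its rate limits from the aggregate totals.
		 */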
reaped:
		(*nr_running)--;
		(*m_rate) -= (td->o.ratemin[0] + td->o.ratemin[1]);
		(*t_rate) -= (td->o.rate[0] + td->o.rate[1]);
		if (!td->pid)
			pending--;

		if (td->error)
			exit_value++;

		done_secs += mtime_since_now(&td->epoch) / 1000;
	}

	if (*nr_running == cputhreads && !pending && realthreads)
		fio_terminate_threads(TERMINATE_ALL);
}

/*
 * Main function for kicking off and reaping jobs, as needed.
 */
static void run_threads(void)
{
	struct thread_data *td;
	unsigned long spent;
	unsigned int i, todo, nr_running, m_rate, t_rate, nr_started;

	if (fio_pin_memory())
		return;

	if (fio_gtod_offload && fio_start_gtod_thread())
		return;

	set_sig_handlers();

	if (!terse_output) {
		log_info("Starting ");
		if (nr_thread)
			log_info("%d thread%s", nr_thread,
						nr_thread > 1 ? "s" : "");
		if (nr_process) {
			if (nr_thread)
				log_info(" and ");
			log_info("%d process%s", nr_process,
						nr_process > 1 ? "es" : "");
		}
		log_info("\n");
		fflush(stdout);
	}

	todo = thread_number;
	nr_running = 0;
	nr_started = 0;
	m_rate = t_rate = 0;

	for_each_td(td, i) {
		print_status_init(td->thread_number - 1);

		if (!td->o.create_serialize)
			continue;

		/*
		 * do file setup here so it happens sequentially,
		 * we don't want X number of threads getting their
		 * client data interspersed on disk
		 */
		if (setup_files(td)) {
			exit_value++;
			if (td->error)
				log_err("fio: pid=%d, err=%d/%s\n",
					(int) td->pid, td->error, td->verror);
			td_set_runstate(td, TD_REAPED);
			todo--;
		} else {
			struct fio_file *f;
			unsigned int j;

			/*
			 * for sharing to work, each job must always open
			 * its own files. so close them, if we opened them
			 * for creation
			 */
			for_each_file(td, f, j) {
				if (fio_file_open(f))
					td_io_close_file(td, f);
			}
		}
	}

	set_genesis_time();

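	/*
	 * Main job start-up loop: create jobs (honoring start_delay and
	 * stonewall), wait for them to initialize, then kick them into
	 * the running state and reap any that exit.
	 */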
	while (todo) {
		struct thread_data *map[REAL_MAX_JOBS];
		struct timeval this_start;
		int this_jobs = 0, left;

		/*
		 * create threads (TD_NOT_CREATED -> TD_CREATED)
		 */
		for_each_td(td, i) {
			if (td->runstate != TD_NOT_CREATED)
				continue;

			/*
			 * never got a chance to start, killed by other
			 * thread for some reason
			 */
			if (td->terminate) {
				todo--;
				continue;
			}

			if (td->o.start_delay) {
				spent = mtime_since_genesis();

				if (td->o.start_delay * 1000 > spent)
					continue;
			}

			if (td->o.stonewall && (nr_started || nr_running)) {
				dprint(FD_PROCESS, "%s: stonewall wait\n",
							td->o.name);
				break;
			}

			init_disk_util(td);

			/*
			 * Set state to created. Thread will transition
			 * to TD_INITIALIZED when it's done setting up.
			 */
			td_set_runstate(td, TD_CREATED);
			map[this_jobs++] = td;
			nr_started++;

			if (td->o.use_thread) {
				int ret;

				dprint(FD_PROCESS, "will pthread_create\n");
				ret = pthread_create(&td->thread, NULL,
							thread_main, td);
				if (ret) {
					log_err("pthread_create: %s\n",
							strerror(ret));
					nr_started--;
					break;
				}
				ret = pthread_detach(td->thread);
				if (ret)
					log_err("pthread_detach: %s",
							strerror(ret));
			} else {
				pid_t pid;
				dprint(FD_PROCESS, "will fork\n");
				pid = fork();
				if (!pid) {
					int ret = fork_main(shm_id, i);

					_exit(ret);
				} else if (i == fio_debug_jobno)
					*fio_debug_jobp = pid;
			}
			dprint(FD_MUTEX, "wait on startup_mutex\n");
			if (fio_mutex_down_timeout(startup_mutex, 10)) {
				log_err("fio: job startup hung? exiting.\n");
				fio_terminate_threads(TERMINATE_ALL);
				fio_abort = 1;
				nr_started--;
				break;
			}
			dprint(FD_MUTEX, "done waiting on startup_mutex\n");
		}

		/*
		 * Wait for the started threads to transition to
		 * TD_INITIALIZED.
		 */
		fio_gettime(&this_start, NULL);
		left = this_jobs;
		while (left && !fio_abort) {
			if (mtime_since_now(&this_start) > JOB_START_TIMEOUT)
				break;

			usleep(100000);

			for (i = 0; i < this_jobs; i++) {
				td = map[i];
				if (!td)
					continue;
				if (td->runstate == TD_INITIALIZED) {
					map[i] = NULL;
					left--;
				} else if (td->runstate >= TD_EXITED) {
					map[i] = NULL;
					left--;
					todo--;
					nr_running++; /* work-around... */
				}
			}
		}

		if (left) {
			log_err("fio: %d jobs failed to start\n", left);
			for (i = 0; i < this_jobs; i++) {
				td = map[i];
				if (!td)
					continue;
				kill(td->pid, SIGTERM);
			}
			break;
		}

		/*
		 * start created threads (TD_INITIALIZED -> TD_RUNNING).
		 */
		for_each_td(td, i) {
			if (td->runstate != TD_INITIALIZED)
				continue;

			if (in_ramp_time(td))
				td_set_runstate(td, TD_RAMP);
			else
				td_set_runstate(td, TD_RUNNING);
			nr_running++;
			nr_started--;
			m_rate += td->o.ratemin[0] + td->o.ratemin[1];
			t_rate += td->o.rate[0] + td->o.rate[1];
			todo--;
			fio_mutex_up(td->mutex);
		}

		reap_threads(&nr_running, &t_rate, &m_rate);

		if (todo) {
			if (is_backend)
				fio_server_idle_loop();
			else
				usleep(100000);
		}
	}

	while (nr_running) {
		reap_threads(&nr_running, &t_rate, &m_rate);

		if (is_backend)
			fio_server_idle_loop();
		else
			usleep(10000);
	}

	update_io_ticks();
	fio_unpin_memory();
}

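/*
 * Background thread that periodically updates disk utilization stats and,
 * when not running as a backend server, refreshes the on-screen status
 * while jobs are still running.
 */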
static void *disk_thread_main(void *data)
{
	fio_mutex_up(startup_mutex);

	while (threads) {
		usleep(DISK_UTIL_MSEC * 1000);
		if (!threads)
			break;
		update_io_ticks();

		if (!is_backend)
			print_thread_status();
	}

	return NULL;
}

static int create_disk_util_thread(void)
{
	int ret;

	ret = pthread_create(&disk_util_thread, NULL, disk_thread_main, NULL);
	if (ret) {
		log_err("Can't create disk util thread: %s\n", strerror(ret));
		return 1;
	}

	ret = pthread_detach(disk_util_thread);
	if (ret) {
		log_err("Can't detach disk util thread: %s\n", strerror(ret));
		return 1;
	}

	dprint(FD_MUTEX, "wait on startup_mutex\n");
	fio_mutex_down(startup_mutex);
	dprint(FD_MUTEX, "done waiting on startup_mutex\n");
	return 0;
}

int fio_backend(void)
{
	struct thread_data *td;
	int i;

	if (exec_profile) {
		if (load_profile(exec_profile))
			return 1;
		free(exec_profile);
		exec_profile = NULL;
	}
	if (!thread_number)
		return 0;

	if (write_bw_log) {
		setup_log(&agg_io_log[DDIR_READ], 0);
		setup_log(&agg_io_log[DDIR_WRITE], 0);
	}

	startup_mutex = fio_mutex_init(0);
	if (startup_mutex == NULL)
		return 1;
	writeout_mutex = fio_mutex_init(1);
	if (writeout_mutex == NULL)
		return 1;

	set_genesis_time();
	create_disk_util_thread();

	cgroup_list = smalloc(sizeof(*cgroup_list));
	INIT_FLIST_HEAD(cgroup_list);

	run_threads();

	if (!fio_abort) {
		show_run_stats();
		if (write_bw_log) {
			__finish_log(agg_io_log[DDIR_READ], "agg-read_bw.log");
			__finish_log(agg_io_log[DDIR_WRITE],
					"agg-write_bw.log");
		}
	}

	for_each_td(td, i)
		fio_options_free(td);

	cgroup_kill(cgroup_list);
	sfree(cgroup_list);
	sfree(cgroup_mnt);

	fio_mutex_remove(startup_mutex);
	fio_mutex_remove(writeout_mutex);
	return exit_value;
}