aio-stress.c revision 43088e16aa60d69e3ec5a69cdd8bdd45b8891127
1/* 2 * Copyright (c) 2004 SuSE, Inc. All Rights Reserved. 3 * 4 * This program is free software; you can redistribute it and/or modify it 5 * under the terms of version 2 of the GNU General Public License as 6 * published by the Free Software Foundation. 7 * 8 * This program is distributed in the hope that it would be useful, but 9 * WITHOUT ANY WARRANTY; without even the implied warranty of 10 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. 11 * 12 * Further, this software is distributed without any warranty that it is 13 * free of the rightful claim of any third person regarding infringement 14 * or the like. Any license provided herein, whether implied or 15 * otherwise, applies only to this software file. Patent licenses, if 16 * any, provided herein do not apply to combinations of this program with 17 * other software, or any other product whatsoever. 18 * 19 * You should have received a copy of the GNU General Public License along 20 * with this program; if not, write the Free Software Foundation, Inc., 59 21 * Temple Place - Suite 330, Boston MA 02111-1307, USA. 22 * 23 * Contact information: Silicon Graphics, Inc., 1600 Amphitheatre Pkwy, 24 * Mountain View, CA 94043, or: 25 * 26 * 27 * aio-stress 28 * 29 * will open or create each file on the command line, and start a series 30 * of aio to it. 31 * 32 * aio is done in a rotating loop. first file1 gets 8 requests, then 33 * file2, then file3 etc. 
As each file finishes writing, it is switched 34 * to reads 35 * 36 * io buffers are aligned in case you want to do raw io 37 * 38 * compile with gcc -Wall -laio -lpthread -o aio-stress aio-stress.c 39 * 40 * run aio-stress -h to see the options 41 * 42 * Please mail Chris Mason (mason@suse.com) with bug reports or patches 43 */ 44#define _FILE_OFFSET_BITS 64 45#define PROG_VERSION "0.21" 46#define NEW_GETEVENTS 47 48#include <stdio.h> 49#include <errno.h> 50#include <assert.h> 51#include <stdlib.h> 52 53#include <sys/types.h> 54#include <sys/stat.h> 55#include <fcntl.h> 56#include <unistd.h> 57#include <sys/time.h> 58#include <libaio.h> 59#include <sys/ipc.h> 60#include <sys/shm.h> 61#include <sys/mman.h> 62#include <string.h> 63#include <pthread.h> 64 65#define IO_FREE 0 66#define IO_PENDING 1 67#define RUN_FOREVER -1 68 69enum { 70 WRITE, 71 READ, 72 RWRITE, 73 RREAD, 74 LAST_STAGE, 75}; 76 77#define USE_MALLOC 0 78#define USE_SHM 1 79#define USE_SHMFS 2 80 81/* 82 * various globals, these are effectively read only by the time the threads 83 * are started 84 */ 85long stages = 0; 86unsigned long page_size_mask; 87int o_direct = 0; 88int o_sync = 0; 89int latency_stats = 0; 90int completion_latency_stats = 0; 91int io_iter = 8; 92int iterations = RUN_FOREVER; 93int max_io_submit = 0; 94long rec_len = 64 * 1024; 95int depth = 64; 96int num_threads = 1; 97int num_contexts = 1; 98off_t context_offset = 2 * 1024 * 1024; 99int fsync_stages = 1; 100int use_shm = 0; 101int shm_id; 102char *unaligned_buffer = NULL; 103char *aligned_buffer = NULL; 104int padded_reclen = 0; 105int stonewall = 1; 106int verify = 0; 107char *verify_buf = NULL; 108int unlink_files = 0; 109 110struct io_unit; 111struct thread_info; 112 113/* pthread mutexes and other globals for keeping the threads in sync */ 114pthread_cond_t stage_cond = PTHREAD_COND_INITIALIZER; 115pthread_mutex_t stage_mutex = PTHREAD_MUTEX_INITIALIZER; 116int threads_ending = 0; 117int threads_starting = 0; 118struct 
timeval global_stage_start_time; 119struct thread_info *global_thread_info; 120 121/* 122 * latencies during io_submit are measured, these are the 123 * granularities for deviations 124 */ 125#define DEVIATIONS 6 126int deviations[DEVIATIONS] = { 100, 250, 500, 1000, 5000, 10000 }; 127struct io_latency { 128 double max; 129 double min; 130 double total_io; 131 double total_lat; 132 double deviations[DEVIATIONS]; 133}; 134 135/* container for a series of operations to a file */ 136struct io_oper { 137 /* already open file descriptor, valid for whatever operation you want */ 138 int fd; 139 140 /* starting byte of the operation */ 141 off_t start; 142 143 /* ending byte of the operation */ 144 off_t end; 145 146 /* size of the read/write buffer */ 147 int reclen; 148 149 /* max number of pending requests before a wait is triggered */ 150 int depth; 151 152 /* current number of pending requests */ 153 int num_pending; 154 155 /* last error, zero if there were none */ 156 int last_err; 157 158 /* total number of errors hit. */ 159 int num_err; 160 161 /* read,write, random, etc */ 162 int rw; 163 164 /* number of ios that will get sent to aio */ 165 int total_ios; 166 167 /* number of ios we've already sent */ 168 int started_ios; 169 170 /* last offset used in an io operation */ 171 off_t last_offset; 172 173 /* stonewalled = 1 when we got cut off before submitting all our ios */ 174 int stonewalled; 175 176 /* list management */ 177 struct io_oper *next; 178 struct io_oper *prev; 179 180 struct timeval start_time; 181 182 char *file_name; 183}; 184 185/* a single io, and all the tracking needed for it */ 186struct io_unit { 187 /* note, iocb must go first! 
*/ 188 struct iocb iocb; 189 190 /* pointer to parent io operation struct */ 191 struct io_oper *io_oper; 192 193 /* aligned buffer */ 194 char *buf; 195 196 /* size of the aligned buffer (record size) */ 197 int buf_size; 198 199 /* state of this io unit (free, pending, done) */ 200 int busy; 201 202 /* result of last operation */ 203 long res; 204 205 struct io_unit *next; 206 207 struct timeval io_start_time; /* time of io_submit */ 208}; 209 210struct thread_info { 211 io_context_t io_ctx; 212 pthread_t tid; 213 214 /* allocated array of io_unit structs */ 215 struct io_unit *ios; 216 217 /* list of io units available for io */ 218 struct io_unit *free_ious; 219 220 /* number of io units in the ios array */ 221 int num_global_ios; 222 223 /* number of io units in flight */ 224 int num_global_pending; 225 226 /* preallocated array of iocb pointers, only used in run_active */ 227 struct iocb **iocbs; 228 229 /* preallocated array of events */ 230 struct io_event *events; 231 232 /* size of the events array */ 233 int num_global_events; 234 235 /* latency stats for io_submit */ 236 struct io_latency io_submit_latency; 237 238 /* list of operations still in progress, and of those finished */ 239 struct io_oper *active_opers; 240 struct io_oper *finished_opers; 241 242 /* number of files this thread is doing io on */ 243 int num_files; 244 245 /* how much io this thread did in the last stage */ 246 double stage_mb_trans; 247 248 /* latency completion stats i/o time from io_submit until io_getevents */ 249 struct io_latency io_completion_latency; 250}; 251 252/* 253 * return seconds between start_tv and stop_tv in double precision 254 */ 255static double time_since(struct timeval *start_tv, struct timeval *stop_tv) 256{ 257 double sec, usec; 258 double ret; 259 sec = stop_tv->tv_sec - start_tv->tv_sec; 260 usec = stop_tv->tv_usec - start_tv->tv_usec; 261 if (sec > 0 && usec < 0) { 262 sec--; 263 usec += 1000000; 264 } 265 ret = sec + usec / (double)1000000; 266 if 
(ret < 0) 267 ret = 0; 268 return ret; 269} 270 271/* 272 * return seconds between start_tv and now in double precision 273 */ 274static double time_since_now(struct timeval *start_tv) 275{ 276 struct timeval stop_time; 277 gettimeofday(&stop_time, NULL); 278 return time_since(start_tv, &stop_time); 279} 280 281/* 282 * Add latency info to latency struct 283 */ 284static void calc_latency(struct timeval *start_tv, struct timeval *stop_tv, 285 struct io_latency *lat) 286{ 287 double delta; 288 int i; 289 delta = time_since(start_tv, stop_tv); 290 delta = delta * 1000; 291 292 if (delta > lat->max) 293 lat->max = delta; 294 if (!lat->min || delta < lat->min) 295 lat->min = delta; 296 lat->total_io++; 297 lat->total_lat += delta; 298 for (i = 0 ; i < DEVIATIONS ; i++) { 299 if (delta < deviations[i]) { 300 lat->deviations[i]++; 301 break; 302 } 303 } 304} 305 306static void oper_list_add(struct io_oper *oper, struct io_oper **list) 307{ 308 if (!*list) { 309 *list = oper; 310 oper->prev = oper->next = oper; 311 return; 312 } 313 oper->prev = (*list)->prev; 314 oper->next = *list; 315 (*list)->prev->next = oper; 316 (*list)->prev = oper; 317 return; 318} 319 320static void oper_list_del(struct io_oper *oper, struct io_oper **list) 321{ 322 if ((*list)->next == (*list)->prev && *list == (*list)->next) { 323 *list = NULL; 324 return; 325 } 326 oper->prev->next = oper->next; 327 oper->next->prev = oper->prev; 328 if (*list == oper) 329 *list = oper->next; 330} 331 332/* worker func to check error fields in the io unit */ 333static int check_finished_io(struct io_unit *io) { 334 int i; 335 if (io->res != io->buf_size) { 336 337 struct stat s; 338 fstat(io->io_oper->fd, &s); 339 340 /* 341 * If file size is large enough for the read, then this short 342 * read is an error. 
343 */ 344 if ((io->io_oper->rw == READ || io->io_oper->rw == RREAD) && 345 s.st_size > (io->iocb.u.c.offset + io->res)) { 346 347 fprintf(stderr, "io err %lu (%s) op %d, off %Lu size %d\n", 348 io->res, strerror(-io->res), io->iocb.aio_lio_opcode, 349 io->iocb.u.c.offset, io->buf_size); 350 io->io_oper->last_err = io->res; 351 io->io_oper->num_err++; 352 return -1; 353 } 354 } 355 if (verify && io->io_oper->rw == READ) { 356 if (memcmp(io->buf, verify_buf, io->io_oper->reclen)) { 357 fprintf(stderr, "verify error, file %s offset %Lu contents (offset:bad:good):\n", 358 io->io_oper->file_name, io->iocb.u.c.offset); 359 360 for (i = 0 ; i < io->io_oper->reclen ; i++) { 361 if (io->buf[i] != verify_buf[i]) { 362 fprintf(stderr, "%d:%c:%c ", i, io->buf[i], verify_buf[i]); 363 } 364 } 365 fprintf(stderr, "\n"); 366 } 367 368 } 369 return 0; 370} 371 372/* worker func to check the busy bits and get an io unit ready for use */ 373static int grab_iou(struct io_unit *io, struct io_oper *oper) { 374 if (io->busy == IO_PENDING) 375 return -1; 376 377 io->busy = IO_PENDING; 378 io->res = 0; 379 io->io_oper = oper; 380 return 0; 381} 382 383char *stage_name(int rw) { 384 switch(rw) { 385 case WRITE: 386 return "write"; 387 case READ: 388 return "read"; 389 case RWRITE: 390 return "random write"; 391 case RREAD: 392 return "random read"; 393 } 394 return "unknown"; 395} 396 397static inline double oper_mb_trans(struct io_oper *oper) { 398 return ((double)oper->started_ios * (double)oper->reclen) / 399 (double)(1024 * 1024); 400} 401 402static void print_time(struct io_oper *oper) { 403 double runtime; 404 double tput; 405 double mb; 406 407 runtime = time_since_now(&oper->start_time); 408 mb = oper_mb_trans(oper); 409 tput = mb / runtime; 410 fprintf(stderr, "%s on %s (%.2f MB/s) %.2f MB in %.2fs\n", 411 stage_name(oper->rw), oper->file_name, tput, mb, runtime); 412} 413 414static void print_lat(char *str, struct io_latency *lat) { 415 double avg = lat->total_lat / 
lat->total_io; 416 int i; 417 double total_counted = 0; 418 fprintf(stderr, "%s min %.2f avg %.2f max %.2f\n\t", 419 str, lat->min, avg, lat->max); 420 421 for (i = 0 ; i < DEVIATIONS ; i++) { 422 fprintf(stderr, " %.0f < %d", lat->deviations[i], deviations[i]); 423 total_counted += lat->deviations[i]; 424 } 425 if (total_counted && lat->total_io - total_counted) 426 fprintf(stderr, " < %.0f", lat->total_io - total_counted); 427 fprintf(stderr, "\n"); 428 memset(lat, 0, sizeof(*lat)); 429} 430 431static void print_latency(struct thread_info *t) 432{ 433 struct io_latency *lat = &t->io_submit_latency; 434 print_lat("latency", lat); 435} 436 437static void print_completion_latency(struct thread_info *t) 438{ 439 struct io_latency *lat = &t->io_completion_latency; 440 print_lat("completion latency", lat); 441} 442 443/* 444 * updates the fields in the io operation struct that belongs to this 445 * io unit, and make the io unit reusable again 446 */ 447void finish_io(struct thread_info *t, struct io_unit *io, long result, 448 struct timeval *tv_now) { 449 struct io_oper *oper = io->io_oper; 450 451 calc_latency(&io->io_start_time, tv_now, &t->io_completion_latency); 452 io->res = result; 453 io->busy = IO_FREE; 454 io->next = t->free_ious; 455 t->free_ious = io; 456 oper->num_pending--; 457 t->num_global_pending--; 458 check_finished_io(io); 459 if (oper->num_pending == 0 && 460 (oper->started_ios == oper->total_ios || oper->stonewalled)) 461 { 462 print_time(oper); 463 } 464} 465 466int read_some_events(struct thread_info *t) { 467 struct io_unit *event_io; 468 struct io_event *event; 469 int nr; 470 int i; 471 int min_nr = io_iter; 472 struct timeval stop_time; 473 474 if (t->num_global_pending < io_iter) 475 min_nr = t->num_global_pending; 476 477#ifdef NEW_GETEVENTS 478 nr = io_getevents(t->io_ctx, min_nr, t->num_global_events, t->events,NULL); 479#else 480 nr = io_getevents(t->io_ctx, t->num_global_events, t->events, NULL); 481#endif 482 if (nr <= 0) 483 return 
nr; 484 485 gettimeofday(&stop_time, NULL); 486 for (i = 0 ; i < nr ; i++) { 487 event = t->events + i; 488 event_io = (struct io_unit *)((unsigned long)event->obj); 489 finish_io(t, event_io, event->res, &stop_time); 490 } 491 return nr; 492} 493 494/* 495 * finds a free io unit, waiting for pending requests if required. returns 496 * null if none could be found 497 */ 498static struct io_unit *find_iou(struct thread_info *t, struct io_oper *oper) 499{ 500 struct io_unit *event_io; 501 int nr; 502 503retry: 504 if (t->free_ious) { 505 event_io = t->free_ious; 506 t->free_ious = t->free_ious->next; 507 if (grab_iou(event_io, oper)) { 508 fprintf(stderr, "io unit on free list but not free\n"); 509 abort(); 510 } 511 return event_io; 512 } 513 nr = read_some_events(t); 514 if (nr > 0) 515 goto retry; 516 else 517 fprintf(stderr, "no free ious after read_some_events\n"); 518 return NULL; 519} 520 521/* 522 * wait for all pending requests for this io operation to finish 523 */ 524static int io_oper_wait(struct thread_info *t, struct io_oper *oper) { 525 struct io_event event; 526 struct io_unit *event_io; 527 528 if (oper == NULL) { 529 return 0; 530 } 531 532 if (oper->num_pending == 0) 533 goto done; 534 535 /* this func is not speed sensitive, no need to go wild reading 536 * more than one event at a time 537 */ 538#ifdef NEW_GETEVENTS 539 while (io_getevents(t->io_ctx, 1, 1, &event, NULL) > 0) { 540#else 541 while (io_getevents(t->io_ctx, 1, &event, NULL) > 0) { 542#endif 543 struct timeval tv_now; 544 event_io = (struct io_unit *)((unsigned long)event.obj); 545 546 gettimeofday(&tv_now, NULL); 547 finish_io(t, event_io, event.res, &tv_now); 548 549 if (oper->num_pending == 0) 550 break; 551 } 552done: 553 if (oper->num_err) { 554 fprintf(stderr, "%u errors on oper, last %u\n", 555 oper->num_err, oper->last_err); 556 } 557 return 0; 558} 559 560off_t random_byte_offset(struct io_oper *oper) { 561 off_t num; 562 off_t rand_byte = oper->start; 563 off_t range; 564 
off_t offset = 1; 565 566 range = (oper->end - oper->start) / (1024 * 1024); 567 if ((page_size_mask+1) > (1024 * 1024)) 568 offset = (page_size_mask+1) / (1024 * 1024); 569 if (range < offset) 570 range = 0; 571 else 572 range -= offset; 573 574 /* find a random mb offset */ 575 num = 1 + (int)((double)range * rand() / (RAND_MAX + 1.0 )); 576 rand_byte += num * 1024 * 1024; 577 578 /* find a random byte offset */ 579 num = 1 + (int)((double)(1024 * 1024) * rand() / (RAND_MAX + 1.0)); 580 581 /* page align */ 582 num = (num + page_size_mask) & ~page_size_mask; 583 rand_byte += num; 584 585 if (rand_byte + oper->reclen > oper->end) { 586 rand_byte -= oper->reclen; 587 } 588 return rand_byte; 589} 590 591/* 592 * build an aio iocb for an operation, based on oper->rw and the 593 * last offset used. This finds the struct io_unit that will be attached 594 * to the iocb, and things are ready for submission to aio after this 595 * is called. 596 * 597 * returns null on error 598 */ 599static struct io_unit *build_iocb(struct thread_info *t, struct io_oper *oper) 600{ 601 struct io_unit *io; 602 off_t rand_byte; 603 604 io = find_iou(t, oper); 605 if (!io) { 606 fprintf(stderr, "unable to find io unit\n"); 607 return NULL; 608 } 609 610 switch(oper->rw) { 611 case WRITE: 612 io_prep_pwrite(&io->iocb,oper->fd, io->buf, oper->reclen, 613 oper->last_offset); 614 oper->last_offset += oper->reclen; 615 break; 616 case READ: 617 io_prep_pread(&io->iocb,oper->fd, io->buf, oper->reclen, 618 oper->last_offset); 619 oper->last_offset += oper->reclen; 620 break; 621 case RREAD: 622 rand_byte = random_byte_offset(oper); 623 oper->last_offset = rand_byte; 624 io_prep_pread(&io->iocb,oper->fd, io->buf, oper->reclen, 625 rand_byte); 626 break; 627 case RWRITE: 628 rand_byte = random_byte_offset(oper); 629 oper->last_offset = rand_byte; 630 io_prep_pwrite(&io->iocb,oper->fd, io->buf, oper->reclen, 631 rand_byte); 632 633 break; 634 } 635 636 return io; 637} 638 639/* 640 * wait for any 
pending requests, and then free all ram associated with 641 * an operation. returns the last error the operation hit (zero means none) 642 */ 643static int 644finish_oper(struct thread_info *t, struct io_oper *oper) 645{ 646 unsigned long last_err; 647 648 io_oper_wait(t, oper); 649 last_err = oper->last_err; 650 if (oper->num_pending > 0) { 651 fprintf(stderr, "oper num_pending is %d\n", oper->num_pending); 652 } 653 close(oper->fd); 654 free(oper); 655 return last_err; 656} 657 658/* 659 * allocates an io operation and fills in all the fields. returns 660 * null on error 661 */ 662static struct io_oper * 663create_oper(int fd, int rw, off_t start, off_t end, int reclen, int depth, 664 int iter, char *file_name) 665{ 666 struct io_oper *oper; 667 668 oper = malloc (sizeof(*oper)); 669 if (!oper) { 670 fprintf(stderr, "unable to allocate io oper\n"); 671 return NULL; 672 } 673 memset(oper, 0, sizeof(*oper)); 674 675 oper->depth = depth; 676 oper->start = start; 677 oper->end = end; 678 oper->last_offset = oper->start; 679 oper->fd = fd; 680 oper->reclen = reclen; 681 oper->rw = rw; 682 oper->total_ios = (oper->end - oper->start) / oper->reclen; 683 oper->file_name = file_name; 684 685 return oper; 686} 687 688/* 689 * does setup on num_ios worth of iocbs, but does not actually 690 * start any io 691 */ 692int build_oper(struct thread_info *t, struct io_oper *oper, int num_ios, 693 struct iocb **my_iocbs) 694{ 695 int i; 696 struct io_unit *io; 697 698 if (oper->started_ios == 0) 699 gettimeofday(&oper->start_time, NULL); 700 701 if (num_ios == 0) 702 num_ios = oper->total_ios; 703 704 if ((oper->started_ios + num_ios) > oper->total_ios) 705 num_ios = oper->total_ios - oper->started_ios; 706 707 for ( i = 0 ; i < num_ios ; i++) { 708 io = build_iocb(t, oper); 709 if (!io) { 710 return -1; 711 } 712 my_iocbs[i] = &io->iocb; 713 } 714 return num_ios; 715} 716 717/* 718 * runs through the iocbs in the array provided and updates 719 * counters in the associated oper 
struct 720 */ 721static void update_iou_counters(struct iocb **my_iocbs, int nr, 722 struct timeval *tv_now) 723{ 724 struct io_unit *io; 725 int i; 726 for (i = 0 ; i < nr ; i++) { 727 io = (struct io_unit *)(my_iocbs[i]); 728 io->io_oper->num_pending++; 729 io->io_oper->started_ios++; 730 io->io_start_time = *tv_now; /* set time of io_submit */ 731 } 732} 733 734/* starts some io for a given file, returns zero if all went well */ 735int run_built(struct thread_info *t, int num_ios, struct iocb **my_iocbs) 736{ 737 int ret; 738 struct timeval start_time; 739 struct timeval stop_time; 740 741resubmit: 742 gettimeofday(&start_time, NULL); 743 ret = io_submit(t->io_ctx, num_ios, my_iocbs); 744 gettimeofday(&stop_time, NULL); 745 calc_latency(&start_time, &stop_time, &t->io_submit_latency); 746 747 if (ret != num_ios) { 748 /* some ios got through */ 749 if (ret > 0) { 750 update_iou_counters(my_iocbs, ret, &stop_time); 751 my_iocbs += ret; 752 t->num_global_pending += ret; 753 num_ios -= ret; 754 } 755 /* 756 * we've used all the requests allocated in aio_init, wait and 757 * retry 758 */ 759 if (ret > 0 || ret == -EAGAIN) { 760 int old_ret = ret; 761 if ((ret = read_some_events(t) > 0)) { 762 goto resubmit; 763 } else { 764 fprintf(stderr, "ret was %d and now is %d\n", ret, old_ret); 765 abort(); 766 } 767 } 768 769 fprintf(stderr, "ret %d (%s) on io_submit\n", ret, strerror(-ret)); 770 return -1; 771 } 772 update_iou_counters(my_iocbs, ret, &stop_time); 773 t->num_global_pending += ret; 774 return 0; 775} 776 777/* 778 * changes oper->rw to the next in a command sequence, or returns zero 779 * to say this operation is really, completely done for 780 */ 781static int restart_oper(struct io_oper *oper) { 782 int new_rw = 0; 783 if (oper->last_err) 784 return 0; 785 786 /* this switch falls through */ 787 switch(oper->rw) { 788 case WRITE: 789 if (stages & (1 << READ)) 790 new_rw = READ; 791 case READ: 792 if (!new_rw && stages & (1 << RWRITE)) 793 new_rw = RWRITE; 
794 case RWRITE: 795 if (!new_rw && stages & (1 << RREAD)) 796 new_rw = RREAD; 797 } 798 799 if (new_rw) { 800 oper->started_ios = 0; 801 oper->last_offset = oper->start; 802 oper->stonewalled = 0; 803 804 /* 805 * we're restarting an operation with pending requests, so the 806 * timing info won't be printed by finish_io. Printing it here 807 */ 808 if (oper->num_pending) 809 print_time(oper); 810 811 oper->rw = new_rw; 812 return 1; 813 } 814 return 0; 815} 816 817static int oper_runnable(struct io_oper *oper) { 818 struct stat buf; 819 int ret; 820 821 /* first context is always runnable, if started_ios > 0, no need to 822 * redo the calculations 823 */ 824 if (oper->started_ios || oper->start == 0) 825 return 1; 826 /* 827 * only the sequential phases force delays in starting */ 828 if (oper->rw >= RWRITE) 829 return 1; 830 ret = fstat(oper->fd, &buf); 831 if (ret < 0) { 832 perror("fstat"); 833 exit(1); 834 } 835 if (S_ISREG(buf.st_mode) && buf.st_size < oper->start) 836 return 0; 837 return 1; 838} 839 840/* 841 * runs through all the io operations on the active list, and starts 842 * a chunk of io on each. If any io operations are completely finished, 843 * it either switches them to the next stage or puts them on the 844 * finished list. 845 * 846 * this function stops after max_io_submit iocbs are sent down the 847 * pipe, even if it has not yet touched all the operations on the 848 * active list. Any operations that have finished are moved onto 849 * the finished_opers list. 
850 */ 851static int run_active_list(struct thread_info *t, 852 int io_iter, 853 int max_io_submit) 854{ 855 struct io_oper *oper; 856 struct io_oper *built_opers = NULL; 857 struct iocb **my_iocbs = t->iocbs; 858 int ret = 0; 859 int num_built = 0; 860 861 oper = t->active_opers; 862 while (oper) { 863 if (!oper_runnable(oper)) { 864 oper = oper->next; 865 if (oper == t->active_opers) 866 break; 867 continue; 868 } 869 ret = build_oper(t, oper, io_iter, my_iocbs); 870 if (ret >= 0) { 871 my_iocbs += ret; 872 num_built += ret; 873 oper_list_del(oper, &t->active_opers); 874 oper_list_add(oper, &built_opers); 875 oper = t->active_opers; 876 if (num_built + io_iter > max_io_submit) 877 break; 878 } else 879 break; 880 } 881 if (num_built) { 882 ret = run_built(t, num_built, t->iocbs); 883 if (ret < 0) { 884 fprintf(stderr, "error %d on run_built\n", ret); 885 exit(1); 886 } 887 while (built_opers) { 888 oper = built_opers; 889 oper_list_del(oper, &built_opers); 890 oper_list_add(oper, &t->active_opers); 891 if (oper->started_ios == oper->total_ios) { 892 oper_list_del(oper, &t->active_opers); 893 oper_list_add(oper, &t->finished_opers); 894 } 895 } 896 } 897 return 0; 898} 899 900void drop_shm() { 901 int ret; 902 struct shmid_ds ds; 903 if (use_shm != USE_SHM) 904 return; 905 906 ret = shmctl(shm_id, IPC_RMID, &ds); 907 if (ret) { 908 perror("shmctl IPC_RMID"); 909 } 910} 911 912void aio_setup(io_context_t *io_ctx, int n) 913{ 914 int res = io_queue_init(n, io_ctx); 915 if (res != 0) { 916 fprintf(stderr, "io_queue_setup(%d) returned %d (%s)\n", 917 n, res, strerror(-res)); 918 exit(3); 919 } 920} 921 922/* 923 * allocate io operation and event arrays for a given thread 924 */ 925int setup_ious(struct thread_info *t, 926 int num_files, int depth, 927 int reclen, int max_io_submit) { 928 int i; 929 size_t bytes = num_files * depth * sizeof(*t->ios); 930 931 t->ios = malloc(bytes); 932 if (!t->ios) { 933 fprintf(stderr, "unable to allocate io units\n"); 934 return -1; 
935 } 936 memset(t->ios, 0, bytes); 937 938 for (i = 0 ; i < depth * num_files; i++) { 939 t->ios[i].buf = aligned_buffer; 940 aligned_buffer += padded_reclen; 941 t->ios[i].buf_size = reclen; 942 if (verify) 943 memset(t->ios[i].buf, 'b', reclen); 944 else 945 memset(t->ios[i].buf, 0, reclen); 946 t->ios[i].next = t->free_ious; 947 t->free_ious = t->ios + i; 948 } 949 if (verify) { 950 verify_buf = aligned_buffer; 951 memset(verify_buf, 'b', reclen); 952 } 953 954 t->iocbs = malloc(sizeof(struct iocb *) * max_io_submit); 955 if (!t->iocbs) { 956 fprintf(stderr, "unable to allocate iocbs\n"); 957 goto free_buffers; 958 } 959 960 memset(t->iocbs, 0, max_io_submit * sizeof(struct iocb *)); 961 962 t->events = malloc(sizeof(struct io_event) * depth * num_files); 963 if (!t->events) { 964 fprintf(stderr, "unable to allocate ram for events\n"); 965 goto free_buffers; 966 } 967 memset(t->events, 0, num_files * sizeof(struct io_event)*depth); 968 969 t->num_global_ios = num_files * depth; 970 t->num_global_events = t->num_global_ios; 971 return 0; 972 973free_buffers: 974 if (t->ios) 975 free(t->ios); 976 if (t->iocbs) 977 free(t->iocbs); 978 if (t->events) 979 free(t->events); 980 return -1; 981} 982 983/* 984 * The buffers used for file data are allocated as a single big 985 * malloc, and then each thread and operation takes a piece and uses 986 * that for file data. 
This lets us do a large shm or bigpages alloc
 * and without trying to find a special place in each thread to map the
 * buffers to.
 *
 * Sets padded_reclen to reclen rounded up to a whole page, allocates
 * the buffer with the method chosen by use_shm, and points
 * aligned_buffer at its first page boundary. max_io_submit is unused.
 * returns 0 on success, -1 on failure.
 */
int setup_shared_mem(int num_threads, int num_files, int depth,
		     int reclen, int max_io_submit)
{
	char *p = NULL;
	size_t total_ram;

	(void)max_io_submit;

	padded_reclen = (reclen + page_size_mask) / (page_size_mask+1);
	padded_reclen = padded_reclen * (page_size_mask+1);
	/* size_t math so large runs don't overflow int */
	total_ram = (size_t)num_files * depth * padded_reclen + num_threads;
	if (verify)
		total_ram += padded_reclen;

	if (use_shm == USE_MALLOC) {
		/* extra page_size_mask bytes leave room to align the start */
		p = malloc(total_ram + page_size_mask);
	} else if (use_shm == USE_SHM) {
		shm_id = shmget(IPC_PRIVATE, total_ram, IPC_CREAT | 0700);
		if (shm_id < 0) {
			perror("shmget");
			goto free_buffers;
		}
		p = shmat(shm_id, (char *)0x50000000, 0);
		if (p == (char *)-1) {
			perror("shmat");
			goto free_buffers;
		}
		/* won't really be dropped until we shmdt */
		drop_shm();
	} else if (use_shm == USE_SHMFS) {
		char mmap_name[] = "/dev/shm/XXXXXX";
		int fd;

		fd = mkstemp(mmap_name);
		if (fd < 0) {
			perror("mkstemp");
			goto free_buffers;
		}
		unlink(mmap_name);
		if (ftruncate(fd, total_ram) < 0) {
			perror("ftruncate");
			close(fd);
			goto free_buffers;
		}
		shm_id = fd;
		p = mmap((char *)0x50000000, total_ram,
			 PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);

		if (p == MAP_FAILED) {
			perror("mmap");
			close(fd);
			goto free_buffers;
		}
	}
	if (!p) {
		fprintf(stderr, "unable to allocate buffers\n");
		goto free_buffers;
	}
	unaligned_buffer = p;
	p = (char*)((intptr_t) (p + page_size_mask) & ~page_size_mask);
	aligned_buffer = p;
	return 0;

free_buffers:
	drop_shm();
	/* only malloc'd memory may be handed to free() */
	if (use_shm == USE_MALLOC)
		free(unaligned_buffer);
	return -1;
}

/*
 * runs through all the thread_info structs and calculates a combined
 * throughput
 */
1059void global_thread_throughput(struct thread_info *t, char *this_stage) { 1060 int i; 1061 double runtime = time_since_now(&global_stage_start_time); 1062 double total_mb = 0; 1063 double min_trans = 0; 1064 1065 for (i = 0 ; i < num_threads ; i++) { 1066 total_mb += global_thread_info[i].stage_mb_trans; 1067 if (!min_trans || t->stage_mb_trans < min_trans) 1068 min_trans = t->stage_mb_trans; 1069 } 1070 if (total_mb) { 1071 fprintf(stderr, "%s throughput (%.2f MB/s) ", this_stage, 1072 total_mb / runtime); 1073 fprintf(stderr, "%.2f MB in %.2fs", total_mb, runtime); 1074 if (stonewall) 1075 fprintf(stderr, " min transfer %.2fMB", min_trans); 1076 fprintf(stderr, "\n"); 1077 } 1078} 1079 1080 1081/* this is the meat of the state machine. There is a list of 1082 * active operations structs, and as each one finishes the required 1083 * io it is moved to a list of finished operations. Once they have 1084 * all finished whatever stage they were in, they are given the chance 1085 * to restart and pick a different stage (read/write/random read etc) 1086 * 1087 * various timings are printed in between the stages, along with 1088 * thread synchronization if there are more than one threads. 
1089 */ 1090int worker(struct thread_info *t) 1091{ 1092 struct io_oper *oper; 1093 char *this_stage = NULL; 1094 struct timeval stage_time; 1095 int status = 0; 1096 int iteration = 0; 1097 int cnt; 1098 1099 aio_setup(&t->io_ctx, 512); 1100 1101restart: 1102 if (num_threads > 1) { 1103 pthread_mutex_lock(&stage_mutex); 1104 threads_starting++; 1105 if (threads_starting == num_threads) { 1106 threads_ending = 0; 1107 gettimeofday(&global_stage_start_time, NULL); 1108 pthread_cond_broadcast(&stage_cond); 1109 } 1110 while (threads_starting != num_threads) 1111 pthread_cond_wait(&stage_cond, &stage_mutex); 1112 pthread_mutex_unlock(&stage_mutex); 1113 } 1114 if (t->active_opers) { 1115 this_stage = stage_name(t->active_opers->rw); 1116 gettimeofday(&stage_time, NULL); 1117 t->stage_mb_trans = 0; 1118 } 1119 1120 cnt = 0; 1121 /* first we send everything through aio */ 1122 while (t->active_opers && (cnt < iterations || iterations == RUN_FOREVER)) { 1123 if (stonewall && threads_ending) { 1124 oper = t->active_opers; 1125 oper->stonewalled = 1; 1126 oper_list_del(oper, &t->active_opers); 1127 oper_list_add(oper, &t->finished_opers); 1128 } else { 1129 run_active_list(t, io_iter, max_io_submit); 1130 } 1131 cnt++; 1132 } 1133 if (latency_stats) 1134 print_latency(t); 1135 1136 if (completion_latency_stats) 1137 print_completion_latency(t); 1138 1139 /* then we wait for all the operations to finish */ 1140 oper = t->finished_opers; 1141 do { 1142 if (!oper) 1143 break; 1144 io_oper_wait(t, oper); 1145 oper = oper->next; 1146 } while (oper != t->finished_opers); 1147 1148 /* then we do an fsync to get the timing for any future operations 1149 * right, and check to see if any of these need to get restarted 1150 */ 1151 oper = t->finished_opers; 1152 while (oper) { 1153 if (fsync_stages) 1154 fsync(oper->fd); 1155 t->stage_mb_trans += oper_mb_trans(oper); 1156 if (restart_oper(oper)) { 1157 oper_list_del(oper, &t->finished_opers); 1158 oper_list_add(oper, 
&t->active_opers); 1159 oper = t->finished_opers; 1160 continue; 1161 } 1162 oper = oper->next; 1163 if (oper == t->finished_opers) 1164 break; 1165 } 1166 1167 if (t->stage_mb_trans && t->num_files > 0) { 1168 double seconds = time_since_now(&stage_time); 1169 fprintf(stderr, "thread %d %s totals (%.2f MB/s) %.2f MB in %.2fs\n", 1170 t - global_thread_info, this_stage, t->stage_mb_trans/seconds, 1171 t->stage_mb_trans, seconds); 1172 } 1173 1174 if (num_threads > 1) { 1175 pthread_mutex_lock(&stage_mutex); 1176 threads_ending++; 1177 if (threads_ending == num_threads) { 1178 threads_starting = 0; 1179 pthread_cond_broadcast(&stage_cond); 1180 global_thread_throughput(t, this_stage); 1181 } 1182 while (threads_ending != num_threads) 1183 pthread_cond_wait(&stage_cond, &stage_mutex); 1184 pthread_mutex_unlock(&stage_mutex); 1185 } 1186 1187 /* someone got restarted, go back to the beginning */ 1188 if (t->active_opers && (cnt < iterations || iterations == RUN_FOREVER)) { 1189 iteration++; 1190 goto restart; 1191 } 1192 1193 /* finally, free all the ram */ 1194 while (t->finished_opers) { 1195 oper = t->finished_opers; 1196 oper_list_del(oper, &t->finished_opers); 1197 status = finish_oper(t, oper); 1198 } 1199 1200 if (t->num_global_pending) { 1201 fprintf(stderr, "global num pending is %d\n", t->num_global_pending); 1202 } 1203 io_queue_release(t->io_ctx); 1204 1205 return status; 1206} 1207 1208typedef void * (*start_routine)(void *); 1209int run_workers(struct thread_info *t, int num_threads) 1210{ 1211 int ret; 1212 int thread_ret; 1213 int i; 1214 1215 for (i = 0 ; i < num_threads ; i++) { 1216 ret = pthread_create(&t[i].tid, NULL, (start_routine)worker, t + i); 1217 if (ret) { 1218 perror("pthread_create"); 1219 exit(1); 1220 } 1221 } 1222 for (i = 0 ; i < num_threads ; i++) { 1223 ret = pthread_join(t[i].tid, (void *)&thread_ret); 1224 if (ret) { 1225 perror("pthread_join"); 1226 exit(1); 1227 } 1228 } 1229 return 0; 1230} 1231 1232off_t parse_size(char 
*size_arg, off_t mult) { 1233 char c; 1234 int num; 1235 off_t ret; 1236 c = size_arg[strlen(size_arg) - 1]; 1237 if (c > '9') { 1238 size_arg[strlen(size_arg) - 1] = '\0'; 1239 } 1240 num = atoi(size_arg); 1241 switch(c) { 1242 case 'g': 1243 case 'G': 1244 mult = 1024 * 1024 * 1024; 1245 break; 1246 case 'm': 1247 case 'M': 1248 mult = 1024 * 1024; 1249 break; 1250 case 'k': 1251 case 'K': 1252 mult = 1024; 1253 break; 1254 case 'b': 1255 case 'B': 1256 mult = 1; 1257 break; 1258 } 1259 ret = mult * num; 1260 return ret; 1261} 1262 1263void print_usage(void) { 1264 printf("usage: aio-stress [-s size] [-r size] [-a size] [-d num] [-b num]\n"); 1265 printf(" [-i num] [-t num] [-c num] [-C size] [-nxhOS ]\n"); 1266 printf(" file1 [file2 ...]\n"); 1267 printf("\t-a size in KB at which to align buffers\n"); 1268 printf("\t-b max number of iocbs to give io_submit at once\n"); 1269 printf("\t-c number of io contexts per file\n"); 1270 printf("\t-C offset between contexts, default 2MB\n"); 1271 printf("\t-s size in MB of the test file(s), default 1024MB\n"); 1272 printf("\t-r record size in KB used for each io, default 64KB\n"); 1273 printf("\t-d number of pending aio requests for each file, default 64\n"); 1274 printf("\t-i number of ios per file sent before switching\n\t to the next file, default 8\n"); 1275 printf("\t-I total number of ayncs IOs the program will run, default is run until Cntl-C\n"); 1276 printf("\t-O Use O_DIRECT (not available in 2.4 kernels),\n"); 1277 printf("\t-S Use O_SYNC for writes\n"); 1278 printf("\t-o add an operation to the list: write=0, read=1,\n"); 1279 printf("\t random write=2, random read=3.\n"); 1280 printf("\t repeat -o to specify multiple ops: -o 0 -o 1 etc.\n"); 1281 printf("\t-m shm use ipc shared memory for io buffers instead of malloc\n"); 1282 printf("\t-m shmfs mmap a file in /dev/shm for io buffers\n"); 1283 printf("\t-n no fsyncs between write stage and read stage\n"); 1284 printf("\t-l print io_submit latencies after each 
stage\n"); 1285 printf("\t-L print io completion latencies after each stage\n"); 1286 printf("\t-t number of threads to run\n"); 1287 printf("\t-u unlink files after completion\n"); 1288 printf("\t-v verification of bytes written\n"); 1289 printf("\t-x turn off thread stonewalling\n"); 1290 printf("\t-h this message\n"); 1291 printf("\n\t the size options (-a -s and -r) allow modifiers -s 400{k,m,g}\n"); 1292 printf("\t translate to 400KB, 400MB and 400GB\n"); 1293 printf("version %s\n", PROG_VERSION); 1294} 1295 1296int main(int ac, char **av) 1297{ 1298 int rwfd; 1299 int i; 1300 int j; 1301 int c; 1302 1303 off_t file_size = 1 * 1024 * 1024 * 1024; 1304 int first_stage = WRITE; 1305 struct io_oper *oper; 1306 int status = 0; 1307 int num_files = 0; 1308 int open_fds = 0; 1309 struct thread_info *t; 1310 1311 page_size_mask = getpagesize() - 1; 1312 1313 while (1) { 1314 c = getopt(ac, av, "a:b:c:C:m:s:r:d:i:I:o:t:lLnhOSxvu"); 1315 if (c < 0) 1316 break; 1317 1318 switch(c) { 1319 case 'a': 1320 page_size_mask = parse_size(optarg, 1024); 1321 page_size_mask--; 1322 break; 1323 case 'c': 1324 num_contexts = atoi(optarg); 1325 break; 1326 case 'C': 1327 context_offset = parse_size(optarg, 1024 * 1024); 1328 case 'b': 1329 max_io_submit = atoi(optarg); 1330 break; 1331 case 's': 1332 file_size = parse_size(optarg, 1024 * 1024); 1333 break; 1334 case 'd': 1335 depth = atoi(optarg); 1336 break; 1337 case 'r': 1338 rec_len = parse_size(optarg, 1024); 1339 break; 1340 case 'i': 1341 io_iter = atoi(optarg); 1342 break; 1343 case 'I': 1344 iterations = atoi(optarg); 1345 break; 1346 case 'n': 1347 fsync_stages = 0; 1348 break; 1349 case 'l': 1350 latency_stats = 1; 1351 break; 1352 case 'L': 1353 completion_latency_stats = 1; 1354 break; 1355 case 'm': 1356 if (!strcmp(optarg, "shm")) { 1357 fprintf(stderr, "using ipc shm\n"); 1358 use_shm = USE_SHM; 1359 } else if (!strcmp(optarg, "shmfs")) { 1360 fprintf(stderr, "using /dev/shm for buffers\n"); 1361 use_shm = USE_SHMFS; 
1362 } 1363 break; 1364 case 'o': 1365 i = atoi(optarg); 1366 stages |= 1 << i; 1367 fprintf(stderr, "adding stage %s\n", stage_name(i)); 1368 break; 1369 case 'O': 1370 o_direct = O_DIRECT; 1371 break; 1372 case 'S': 1373 o_sync = O_SYNC; 1374 break; 1375 case 't': 1376 num_threads = atoi(optarg); 1377 break; 1378 case 'x': 1379 stonewall = 0; 1380 break; 1381 case 'u': 1382 unlink_files = 1; 1383 break; 1384 case 'v': 1385 verify = 1; 1386 break; 1387 case 'h': 1388 default: 1389 print_usage(); 1390 exit(1); 1391 } 1392 } 1393 1394 /* 1395 * make sure we don't try to submit more ios than we have allocated 1396 * memory for 1397 */ 1398 if (depth < io_iter) { 1399 io_iter = depth; 1400 fprintf(stderr, "dropping io_iter to %d\n", io_iter); 1401 } 1402 1403 if (optind >= ac) { 1404 print_usage(); 1405 exit(1); 1406 } 1407 1408 num_files = ac - optind; 1409 1410 if (num_threads > (num_files * num_contexts)) { 1411 num_threads = num_files * num_contexts; 1412 fprintf(stderr, "dropping thread count to the number of contexts %d\n", 1413 num_threads); 1414 } 1415 1416 t = malloc(num_threads * sizeof(*t)); 1417 if (!t) { 1418 perror("malloc"); 1419 exit(1); 1420 } 1421 global_thread_info = t; 1422 1423 /* by default, allow a huge number of iocbs to be sent towards 1424 * io_submit 1425 */ 1426 if (!max_io_submit) 1427 max_io_submit = num_files * io_iter * num_contexts; 1428 1429 /* 1430 * make sure we don't try to submit more ios than max_io_submit allows 1431 */ 1432 if (max_io_submit < io_iter) { 1433 io_iter = max_io_submit; 1434 fprintf(stderr, "dropping io_iter to %d\n", io_iter); 1435 } 1436 1437 if (!stages) { 1438 stages = (1 << WRITE) | (1 << READ) | (1 << RREAD) | (1 << RWRITE); 1439 } else { 1440 for (i = 0 ; i < LAST_STAGE; i++) { 1441 if (stages & (1 << i)) { 1442 first_stage = i; 1443 fprintf(stderr, "starting with %s\n", stage_name(i)); 1444 break; 1445 } 1446 } 1447 } 1448 1449 if (file_size < num_contexts * context_offset) { 1450 fprintf(stderr, "file 
size %Lu too small for %d contexts\n", 1451 file_size, num_contexts); 1452 exit(1); 1453 } 1454 1455 fprintf(stderr, "file size %LuMB, record size %luKB, depth %d, ios per iteration %d\n", file_size / (1024 * 1024), rec_len / 1024, depth, io_iter); 1456 fprintf(stderr, "max io_submit %d, buffer alignment set to %luKB\n", 1457 max_io_submit, (page_size_mask + 1)/1024); 1458 fprintf(stderr, "threads %d files %d contexts %d context offset %LuMB verification %s\n", 1459 num_threads, num_files, num_contexts, 1460 context_offset / (1024 * 1024), verify ? "on" : "off"); 1461 /* open all the files and do any required setup for them */ 1462 for (i = optind ; i < ac ; i++) { 1463 int thread_index; 1464 for (j = 0 ; j < num_contexts ; j++) { 1465 thread_index = open_fds % num_threads; 1466 open_fds++; 1467 1468 rwfd = open(av[i], O_CREAT | O_RDWR | o_direct | o_sync, 0600); 1469 if (rwfd == -1) { 1470 fprintf(stderr, "error while creating file %s: %s", av[i], strerror(errno)); 1471 exit(1); 1472 } 1473 1474 1475 oper = create_oper(rwfd, first_stage, j * context_offset, 1476 file_size - j * context_offset, rec_len, 1477 depth, io_iter, av[i]); 1478 if (!oper) { 1479 fprintf(stderr, "error in create_oper\n"); 1480 exit(-1); 1481 } 1482 oper_list_add(oper, &t[thread_index].active_opers); 1483 t[thread_index].num_files++; 1484 } 1485 } 1486 if (setup_shared_mem(num_threads, num_files * num_contexts, 1487 depth, rec_len, max_io_submit)) 1488 { 1489 exit(1); 1490 } 1491 for (i = 0 ; i < num_threads ; i++) { 1492 if (setup_ious(&t[i], t[i].num_files, depth, rec_len, max_io_submit)) 1493 exit(1); 1494 } 1495 if (num_threads > 1) { 1496 printf("Running multi thread version num_threads:%d\n", num_threads); 1497 run_workers(t, num_threads); 1498 } else { 1499 printf("Running single thread version \n"); 1500 status = worker(t); 1501 } 1502 if (unlink_files) { 1503 for (i = optind ; i < ac ; i++) { 1504 printf("Cleaning up file %s \n", av[i]); 1505 unlink(av[i]); 1506 } 1507 } 1508 1509 
if (status) { 1510 exit(1); 1511 } 1512 return status; 1513} 1514