aio-stress.c revision 44e106d9d7f71c1b28aedef05ef90d4f20466f98
1/* 2 * Copyright (c) 2004 SuSE, Inc. All Rights Reserved. 3 * 4 * This program is free software; you can redistribute it and/or modify it 5 * under the terms of version 2 of the GNU General Public License as 6 * published by the Free Software Foundation. 7 * 8 * This program is distributed in the hope that it would be useful, but 9 * WITHOUT ANY WARRANTY; without even the implied warranty of 10 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. 11 * 12 * Further, this software is distributed without any warranty that it is 13 * free of the rightful claim of any third person regarding infringement 14 * or the like. Any license provided herein, whether implied or 15 * otherwise, applies only to this software file. Patent licenses, if 16 * any, provided herein do not apply to combinations of this program with 17 * other software, or any other product whatsoever. 18 * 19 * You should have received a copy of the GNU General Public License along 20 * with this program; if not, write the Free Software Foundation, Inc., 59 21 * Temple Place - Suite 330, Boston MA 02111-1307, USA. 22 * 23 * Contact information: Silicon Graphics, Inc., 1600 Amphitheatre Pkwy, 24 * Mountain View, CA 94043, or: 25 * 26 * 27 * aio-stress 28 * 29 * will open or create each file on the command line, and start a series 30 * of aio to it. 31 * 32 * aio is done in a rotating loop. first file1 gets 8 requests, then 33 * file2, then file3 etc. 
As each file finishes writing, it is switched 34 * to reads 35 * 36 * io buffers are aligned in case you want to do raw io 37 * 38 * compile with gcc -Wall -laio -lpthread -o aio-stress aio-stress.c 39 * 40 * run aio-stress -h to see the options 41 * 42 * Please mail Chris Mason (mason@suse.com) with bug reports or patches 43 */ 44#define _FILE_OFFSET_BITS 64 45#define PROG_VERSION "0.21" 46#define NEW_GETEVENTS 47 48#include <stdio.h> 49#include <errno.h> 50#include <assert.h> 51#include <stdlib.h> 52 53#include <sys/types.h> 54#include <sys/stat.h> 55#include <fcntl.h> 56#include <unistd.h> 57#include <sys/time.h> 58#include <libaio.h> 59#include <sys/ipc.h> 60#include <sys/shm.h> 61#include <sys/mman.h> 62#include <string.h> 63#include <pthread.h> 64 65#define IO_FREE 0 66#define IO_PENDING 1 67#define RUN_FOREVER -1 68 69enum { 70 WRITE, 71 READ, 72 RWRITE, 73 RREAD, 74 LAST_STAGE, 75}; 76 77#define USE_MALLOC 0 78#define USE_SHM 1 79#define USE_SHMFS 2 80 81/* 82 * various globals, these are effectively read only by the time the threads 83 * are started 84 */ 85long stages = 0; 86unsigned long page_size_mask; 87int o_direct = 0; 88int o_sync = 0; 89int latency_stats = 0; 90int completion_latency_stats = 0; 91int io_iter = 8; 92int iterations = RUN_FOREVER; 93int max_io_submit = 0; 94long rec_len = 64 * 1024; 95int depth = 64; 96int num_threads = 1; 97int num_contexts = 1; 98off_t context_offset = 2 * 1024 * 1024; 99int fsync_stages = 1; 100int use_shm = 0; 101int shm_id; 102char *unaligned_buffer = NULL; 103char *aligned_buffer = NULL; 104int padded_reclen = 0; 105int stonewall = 1; 106int verify = 0; 107char *verify_buf = NULL; 108int unlink_files = 0; 109 110struct io_unit; 111struct thread_info; 112 113/* pthread mutexes and other globals for keeping the threads in sync */ 114pthread_cond_t stage_cond = PTHREAD_COND_INITIALIZER; 115pthread_mutex_t stage_mutex = PTHREAD_MUTEX_INITIALIZER; 116int threads_ending = 0; 117int threads_starting = 0; 118struct 
timeval global_stage_start_time; 119struct thread_info *global_thread_info; 120 121/* 122 * latencies during io_submit are measured, these are the 123 * granularities for deviations 124 */ 125#define DEVIATIONS 6 126int deviations[DEVIATIONS] = { 100, 250, 500, 1000, 5000, 10000 }; 127struct io_latency { 128 double max; 129 double min; 130 double total_io; 131 double total_lat; 132 double deviations[DEVIATIONS]; 133}; 134 135/* container for a series of operations to a file */ 136struct io_oper { 137 /* already open file descriptor, valid for whatever operation you want */ 138 int fd; 139 140 /* starting byte of the operation */ 141 off_t start; 142 143 /* ending byte of the operation */ 144 off_t end; 145 146 /* size of the read/write buffer */ 147 int reclen; 148 149 /* max number of pending requests before a wait is triggered */ 150 int depth; 151 152 /* current number of pending requests */ 153 int num_pending; 154 155 /* last error, zero if there were none */ 156 int last_err; 157 158 /* total number of errors hit. */ 159 int num_err; 160 161 /* read,write, random, etc */ 162 int rw; 163 164 /* number of I/O that will get sent to aio */ 165 int total_ios; 166 167 /* number of I/O we've already sent */ 168 int started_ios; 169 170 /* last offset used in an io operation */ 171 off_t last_offset; 172 173 /* stonewalled = 1 when we got cut off before submitting all our I/O */ 174 int stonewalled; 175 176 /* list management */ 177 struct io_oper *next; 178 struct io_oper *prev; 179 180 struct timeval start_time; 181 182 char *file_name; 183}; 184 185/* a single io, and all the tracking needed for it */ 186struct io_unit { 187 /* note, iocb must go first! 
/*
 * Seconds elapsed between start_tv and stop_tv, as a double.
 * A negative interval (stop before start) is clamped to zero.
 */
static double time_since(struct timeval *start_tv, struct timeval *stop_tv)
{
    double seconds = stop_tv->tv_sec - start_tv->tv_sec;
    double micros = stop_tv->tv_usec - start_tv->tv_usec;

    /* borrow one second when the microsecond delta went negative */
    if (seconds > 0 && micros < 0) {
        seconds -= 1;
        micros += 1000000;
    }

    double elapsed = seconds + micros / (double)1000000;
    return (elapsed < 0) ? 0 : elapsed;
}

/*
 * Seconds elapsed between start_tv and the current wall-clock time.
 */
static double time_since_now(struct timeval *start_tv)
{
    struct timeval now;

    gettimeofday(&now, NULL);
    return time_since(start_tv, &now);
}
/*
 * Worker func to check error fields in a completed io unit.
 * Returns -1 on a hard error (short/failed read inside the file's
 * current size); verify mismatches are reported but return 0.
 */
static int check_finished_io(struct io_unit *io) {
    int i;
    if (io->res != io->buf_size) {

        struct stat s;
        fstat(io->io_oper->fd, &s);

        /*
         * If file size is large enough for the read, then this short
         * read is an error.
         */
        if ((io->io_oper->rw == READ || io->io_oper->rw == RREAD) &&
            s.st_size > (io->iocb.u.c.offset + io->res)) {

            /* io->res holds a negative errno on failure */
            fprintf(stderr, "io err %lu (%s) op %d, off %Lu size %d\n",
                io->res, strerror(-io->res), io->iocb.aio_lio_opcode,
                io->iocb.u.c.offset, io->buf_size);
            io->io_oper->last_err = io->res;
            io->io_oper->num_err++;
            return -1;
        }
    }
    /* compare read data against the 'b'-filled verify pattern */
    if (verify && io->io_oper->rw == READ) {
        if (memcmp(io->buf, verify_buf, io->io_oper->reclen)) {
            fprintf(stderr, "verify error, file %s offset %Lu contents (offset:bad:good):\n",
                io->io_oper->file_name, io->iocb.u.c.offset);

            for (i = 0 ; i < io->io_oper->reclen ; i++) {
                if (io->buf[i] != verify_buf[i]) {
                    fprintf(stderr, "%d:%c:%c ", i, io->buf[i], verify_buf[i]);
                }
            }
            fprintf(stderr, "\n");
        }

    }
    return 0;
}

/*
 * Worker func to check the busy bits and get an io unit ready for use.
 * Returns -1 if the unit is still pending (should never happen for a
 * unit taken off the free list).
 */
static int grab_iou(struct io_unit *io, struct io_oper *oper) {
    if (io->busy == IO_PENDING)
        return -1;

    io->busy = IO_PENDING;
    io->res = 0;
    io->io_oper = oper;
    return 0;
}

/* human-readable name for a WRITE/READ/RWRITE/RREAD stage value */
char *stage_name(int rw) {
    switch(rw) {
    case WRITE:
        return "write";
    case READ:
        return "read";
    case RWRITE:
        return "random write";
    case RREAD:
        return "random read";
    }
    return "unknown";
}

/* megabytes transferred so far by this operation (started ios * reclen) */
static inline double oper_mb_trans(struct io_oper *oper) {
    return ((double)oper->started_ios * (double)oper->reclen) /
        (double)(1024 * 1024);
}

/* print throughput for one finished operation, measured from start_time */
static void print_time(struct io_oper *oper) {
    double runtime;
    double tput;
    double mb;

    runtime = time_since_now(&oper->start_time);
    mb = oper_mb_trans(oper);
    tput = mb / runtime;
    fprintf(stderr, "%s on %s (%.2f MB/s) %.2f MB in %.2fs\n",
        stage_name(oper->rw), oper->file_name, tput, mb, runtime);
}
/*
 * Print a latency histogram (min/avg/max plus the DEVIATIONS buckets)
 * and reset the stats for the next stage.  Buckets are milliseconds.
 */
static void print_lat(char *str, struct io_latency *lat) {
    double avg = lat->total_lat / lat->total_io;
    int i;
    double total_counted = 0;
    fprintf(stderr, "%s min %.2f avg %.2f max %.2f\n\t",
        str, lat->min, avg, lat->max);

    for (i = 0 ; i < DEVIATIONS ; i++) {
        fprintf(stderr, " %.0f < %d", lat->deviations[i], deviations[i]);
        total_counted += lat->deviations[i];
    }
    /* anything slower than the largest bucket */
    if (total_counted && lat->total_io - total_counted)
        fprintf(stderr, " < %.0f", lat->total_io - total_counted);
    fprintf(stderr, "\n");
    memset(lat, 0, sizeof(*lat));
}

/* print and reset this thread's io_submit latency stats */
static void print_latency(struct thread_info *t)
{
    struct io_latency *lat = &t->io_submit_latency;
    print_lat("latency", lat);
}

/* print and reset this thread's submit-to-completion latency stats */
static void print_completion_latency(struct thread_info *t)
{
    struct io_latency *lat = &t->io_completion_latency;
    print_lat("completion latency", lat);
}

/*
 * updates the fields in the io operation struct that belongs to this
 * io unit, and make the io unit reusable again
 */
void finish_io(struct thread_info *t, struct io_unit *io, long result,
        struct timeval *tv_now) {
    struct io_oper *oper = io->io_oper;

    calc_latency(&io->io_start_time, tv_now, &t->io_completion_latency);
    io->res = result;
    io->busy = IO_FREE;
    /* push the unit back on the thread's free list */
    io->next = t->free_ious;
    t->free_ious = io;
    oper->num_pending--;
    t->num_global_pending--;
    check_finished_io(io);
    /* last completion for this oper: print its throughput now */
    if (oper->num_pending == 0 &&
       (oper->started_ios == oper->total_ios || oper->stonewalled))
    {
        print_time(oper);
    }
}

/*
 * Reap completed events for this thread's context (at least min_nr,
 * capped at the pending count) and retire each through finish_io.
 * Returns the number reaped, or the io_getevents error (<= 0).
 */
int read_some_events(struct thread_info *t) {
    struct io_unit *event_io;
    struct io_event *event;
    int nr;
    int i;
    int min_nr = io_iter;
    struct timeval stop_time;

    if (t->num_global_pending < io_iter)
        min_nr = t->num_global_pending;

#ifdef NEW_GETEVENTS
    nr = io_getevents(t->io_ctx, min_nr, t->num_global_events, t->events,NULL);
#else
    nr = io_getevents(t->io_ctx, t->num_global_events, t->events, NULL);
#endif
    if (nr <= 0)
        return nr;

    gettimeofday(&stop_time, NULL);
    for (i = 0 ; i < nr ; i++) {
        event = t->events + i;
        /* event->obj carries the iocb pointer, which is the io_unit
         * because iocb is the first member of struct io_unit */
        event_io = (struct io_unit *)((unsigned long)event->obj);
        finish_io(t, event_io, event->res, &stop_time);
    }
    return nr;
}

/*
 * finds a free io unit, waiting for pending requests if required. returns
 * null if none could be found
 */
static struct io_unit *find_iou(struct thread_info *t, struct io_oper *oper)
{
    struct io_unit *event_io;
    int nr;

retry:
    if (t->free_ious) {
        event_io = t->free_ious;
        t->free_ious = t->free_ious->next;
        if (grab_iou(event_io, oper)) {
            fprintf(stderr, "io unit on free list but not free\n");
            abort();
        }
        return event_io;
    }
    /* free list empty: reap completions to refill it, then retry */
    nr = read_some_events(t);
    if (nr > 0)
        goto retry;
    else
        fprintf(stderr, "no free ious after read_some_events\n");
    return NULL;
}

/*
 * wait for all pending requests for this io operation to finish
 */
static int io_oper_wait(struct thread_info *t, struct io_oper *oper) {
    struct io_event event;
    struct io_unit *event_io;

    if (oper == NULL) {
        return 0;
    }

    if (oper->num_pending == 0)
        goto done;

    /* this func is not speed sensitive, no need to go wild reading
     * more than one event at a time
     */
#ifdef NEW_GETEVENTS
    while (io_getevents(t->io_ctx, 1, 1, &event, NULL) > 0) {
#else
    while (io_getevents(t->io_ctx, 1, &event, NULL) > 0) {
#endif
        struct timeval tv_now;
        event_io = (struct io_unit *)((unsigned long)event.obj);

        gettimeofday(&tv_now, NULL);
        finish_io(t, event_io, event.res, &tv_now);

        if (oper->num_pending == 0)
            break;
    }
done:
    if (oper->num_err) {
        fprintf(stderr, "%u errors on oper, last %u\n",
            oper->num_err, oper->last_err);
    }
    return 0;
}
/*
 * Pick a random, page-aligned byte offset inside [oper->start, oper->end),
 * leaving room for one reclen-sized transfer at the result.
 */
off_t random_byte_offset(struct io_oper *oper) {
    off_t num;
    off_t rand_byte = oper->start;
    off_t range;
    off_t offset = 1;

    /* work in whole megabytes first */
    range = (oper->end - oper->start) / (1024 * 1024);
    /* keep the final MB + byte offset from running past the end when
     * the alignment unit is larger than 1MB */
    if ((page_size_mask+1) > (1024 * 1024))
        offset = (page_size_mask+1) / (1024 * 1024);
    if (range < offset)
        range = 0;
    else
        range -= offset;

    /* find a random mb offset */
    num = 1 + (int)((double)range * rand() / (RAND_MAX + 1.0 ));
    rand_byte += num * 1024 * 1024;

    /* find a random byte offset */
    num = 1 + (int)((double)(1024 * 1024) * rand() / (RAND_MAX + 1.0));

    /* page align */
    num = (num + page_size_mask) & ~page_size_mask;
    rand_byte += num;

    if (rand_byte + oper->reclen > oper->end) {
        rand_byte -= oper->reclen;
    }
    return rand_byte;
}

/*
 * build an aio iocb for an operation, based on oper->rw and the
 * last offset used. This finds the struct io_unit that will be attached
 * to the iocb, and things are ready for submission to aio after this
 * is called.
 *
 * returns null on error
 */
static struct io_unit *build_iocb(struct thread_info *t, struct io_oper *oper)
{
    struct io_unit *io;
    off_t rand_byte;

    io = find_iou(t, oper);
    if (!io) {
        fprintf(stderr, "unable to find io unit\n");
        return NULL;
    }

    switch(oper->rw) {
    case WRITE:
        /* sequential: advance last_offset by one record each time */
        io_prep_pwrite(&io->iocb,oper->fd, io->buf, oper->reclen,
                oper->last_offset);
        oper->last_offset += oper->reclen;
        break;
    case READ:
        io_prep_pread(&io->iocb,oper->fd, io->buf, oper->reclen,
                oper->last_offset);
        oper->last_offset += oper->reclen;
        break;
    case RREAD:
        /* random: each io gets a fresh page-aligned offset */
        rand_byte = random_byte_offset(oper);
        oper->last_offset = rand_byte;
        io_prep_pread(&io->iocb,oper->fd, io->buf, oper->reclen,
                rand_byte);
        break;
    case RWRITE:
        rand_byte = random_byte_offset(oper);
        oper->last_offset = rand_byte;
        io_prep_pwrite(&io->iocb,oper->fd, io->buf, oper->reclen,
                rand_byte);

        break;
    }

    return io;
}

/*
 * wait for any pending requests, and then free all ram associated with
 * an operation. returns the last error the operation hit (zero means none)
 */
static int
finish_oper(struct thread_info *t, struct io_oper *oper)
{
    unsigned long last_err;

    io_oper_wait(t, oper);
    last_err = oper->last_err;
    if (oper->num_pending > 0) {
        fprintf(stderr, "oper num_pending is %d\n", oper->num_pending);
    }
    close(oper->fd);
    free(oper);
    return last_err;
}

/*
 * allocates an io operation and fills in all the fields. returns
 * null on error
 */
static struct io_oper *
create_oper(int fd, int rw, off_t start, off_t end, int reclen, int depth,
        int iter, char *file_name)
{
    struct io_oper *oper;

    oper = malloc (sizeof(*oper));
    if (!oper) {
        fprintf(stderr, "unable to allocate io oper\n");
        return NULL;
    }
    memset(oper, 0, sizeof(*oper));

    oper->depth = depth;
    oper->start = start;
    oper->end = end;
    oper->last_offset = oper->start;
    oper->fd = fd;
    oper->reclen = reclen;
    oper->rw = rw;
    /* how many records fit between start and end */
    oper->total_ios = (oper->end - oper->start) / oper->reclen;
    oper->file_name = file_name;

    return oper;
}

/*
 * does setup on num_ios worth of iocbs, but does not actually
 * start any io
 */
int build_oper(struct thread_info *t, struct io_oper *oper, int num_ios,
        struct iocb **my_iocbs)
{
    int i;
    struct io_unit *io;

    /* first batch for this oper: stamp its start time */
    if (oper->started_ios == 0)
        gettimeofday(&oper->start_time, NULL);

    if (num_ios == 0)
        num_ios = oper->total_ios;

    /* never build past the oper's total */
    if ((oper->started_ios + num_ios) > oper->total_ios)
        num_ios = oper->total_ios - oper->started_ios;

    for (i = 0 ; i < num_ios ; i++) {
        io = build_iocb(t, oper);
        if (!io) {
            return -1;
        }
        my_iocbs[i] = &io->iocb;
    }
    return num_ios;
}
struct 720 */ 721static void update_iou_counters(struct iocb **my_iocbs, int nr, 722 struct timeval *tv_now) 723{ 724 struct io_unit *io; 725 int i; 726 for (i = 0 ; i < nr ; i++) { 727 io = (struct io_unit *)(my_iocbs[i]); 728 io->io_oper->num_pending++; 729 io->io_oper->started_ios++; 730 io->io_start_time = *tv_now; /* set time of io_submit */ 731 } 732} 733 734/* starts some io for a given file, returns zero if all went well */ 735int run_built(struct thread_info *t, int num_ios, struct iocb **my_iocbs) 736{ 737 int ret; 738 struct timeval start_time; 739 struct timeval stop_time; 740 741resubmit: 742 gettimeofday(&start_time, NULL); 743 ret = io_submit(t->io_ctx, num_ios, my_iocbs); 744 gettimeofday(&stop_time, NULL); 745 calc_latency(&start_time, &stop_time, &t->io_submit_latency); 746 747 if (ret != num_ios) { 748 /* some I/O got through */ 749 if (ret > 0) { 750 update_iou_counters(my_iocbs, ret, &stop_time); 751 my_iocbs += ret; 752 t->num_global_pending += ret; 753 num_ios -= ret; 754 } 755 /* 756 * we've used all the requests allocated in aio_init, wait and 757 * retry 758 */ 759 if (ret > 0 || ret == -EAGAIN) { 760 int old_ret = ret; 761 if ((ret = read_some_events(t) > 0)) { 762 goto resubmit; 763 } else { 764 fprintf(stderr, "ret was %d and now is %d\n", ret, old_ret); 765 abort(); 766 } 767 } 768 769 fprintf(stderr, "ret %d (%s) on io_submit\n", ret, strerror(-ret)); 770 return -1; 771 } 772 update_iou_counters(my_iocbs, ret, &stop_time); 773 t->num_global_pending += ret; 774 return 0; 775} 776 777/* 778 * changes oper->rw to the next in a command sequence, or returns zero 779 * to say this operation is really, completely done for 780 */ 781static int restart_oper(struct io_oper *oper) { 782 int new_rw = 0; 783 if (oper->last_err) 784 return 0; 785 786 /* this switch falls through */ 787 switch(oper->rw) { 788 case WRITE: 789 if (stages & (1 << READ)) 790 new_rw = READ; 791 case READ: 792 if (!new_rw && stages & (1 << RWRITE)) 793 new_rw = RWRITE; 
794 case RWRITE: 795 if (!new_rw && stages & (1 << RREAD)) 796 new_rw = RREAD; 797 } 798 799 if (new_rw) { 800 oper->started_ios = 0; 801 oper->last_offset = oper->start; 802 oper->stonewalled = 0; 803 804 /* 805 * we're restarting an operation with pending requests, so the 806 * timing info won't be printed by finish_io. Printing it here 807 */ 808 if (oper->num_pending) 809 print_time(oper); 810 811 oper->rw = new_rw; 812 return 1; 813 } 814 return 0; 815} 816 817static int oper_runnable(struct io_oper *oper) { 818 struct stat buf; 819 int ret; 820 821 /* first context is always runnable, if started_ios > 0, no need to 822 * redo the calculations 823 */ 824 if (oper->started_ios || oper->start == 0) 825 return 1; 826 /* 827 * only the sequential phases force delays in starting */ 828 if (oper->rw >= RWRITE) 829 return 1; 830 ret = fstat(oper->fd, &buf); 831 if (ret < 0) { 832 perror("fstat"); 833 exit(1); 834 } 835 if (S_ISREG(buf.st_mode) && buf.st_size < oper->start) 836 return 0; 837 return 1; 838} 839 840/* 841 * runs through all the io operations on the active list, and starts 842 * a chunk of io on each. If any io operations are completely finished, 843 * it either switches them to the next stage or puts them on the 844 * finished list. 845 * 846 * this function stops after max_io_submit iocbs are sent down the 847 * pipe, even if it has not yet touched all the operations on the 848 * active list. Any operations that have finished are moved onto 849 * the finished_opers list. 
/*
 * runs through all the io operations on the active list, and starts
 * a chunk of io on each. If any io operations are completely finished,
 * it either switches them to the next stage or puts them on the
 * finished list.
 *
 * this function stops after max_io_submit iocbs are sent down the
 * pipe, even if it has not yet touched all the operations on the
 * active list. Any operations that have finished are moved onto
 * the finished_opers list.
 */
static int run_active_list(struct thread_info *t,
             int io_iter,
             int max_io_submit)
{
    struct io_oper *oper;
    struct io_oper *built_opers = NULL;
    struct iocb **my_iocbs = t->iocbs;
    int ret = 0;
    int num_built = 0;

    oper = t->active_opers;
    while (oper) {
        if (!oper_runnable(oper)) {
            /* skip this oper; stop when we've wrapped the circular list */
            oper = oper->next;
            if (oper == t->active_opers)
                break;
            continue;
        }
        ret = build_oper(t, oper, io_iter, my_iocbs);
        if (ret >= 0) {
            my_iocbs += ret;
            num_built += ret;
            /* park the oper on built_opers so we don't revisit it;
             * restarting from the list head keeps the scan simple */
            oper_list_del(oper, &t->active_opers);
            oper_list_add(oper, &built_opers);
            oper = t->active_opers;
            if (num_built + io_iter > max_io_submit)
                break;
        } else
            break;
    }
    if (num_built) {
        ret = run_built(t, num_built, t->iocbs);
        if (ret < 0) {
            fprintf(stderr, "error %d on run_built\n", ret);
            exit(1);
        }
        /* move built opers back; fully-started ones go to finished */
        while (built_opers) {
            oper = built_opers;
            oper_list_del(oper, &built_opers);
            oper_list_add(oper, &t->active_opers);
            if (oper->started_ios == oper->total_ios) {
                oper_list_del(oper, &t->active_opers);
                oper_list_add(oper, &t->finished_opers);
            }
        }
    }
    return 0;
}

/* remove the SysV shm segment (no-op unless -m shm was used) */
void drop_shm() {
    int ret;
    struct shmid_ds ds;
    if (use_shm != USE_SHM)
        return;

    ret = shmctl(shm_id, IPC_RMID, &ds);
    if (ret) {
        perror("shmctl IPC_RMID");
    }
}

/* create an aio context able to hold n in-flight requests; exits on failure */
void aio_setup(io_context_t *io_ctx, int n)
{
    int res = io_queue_init(n, io_ctx);
    if (res != 0) {
        fprintf(stderr, "io_queue_setup(%d) returned %d (%s)\n",
            n, res, strerror(-res));
        exit(3);
    }
}

/*
 * allocate io operation and event arrays for a given thread
 */
int setup_ious(struct thread_info *t,
              int num_files, int depth,
              int reclen, int max_io_submit) {
    int i;
    size_t bytes = num_files * depth * sizeof(*t->ios);

    t->ios = malloc(bytes);
    if (!t->ios) {
        fprintf(stderr, "unable to allocate io units\n");
        return -1;
    }
    memset(t->ios, 0, bytes);

    /* carve per-io buffers out of the global aligned region and
     * thread every unit onto the free list */
    for (i = 0 ; i < depth * num_files; i++) {
        t->ios[i].buf = aligned_buffer;
        aligned_buffer += padded_reclen;
        t->ios[i].buf_size = reclen;
        if (verify)
            memset(t->ios[i].buf, 'b', reclen);
        else
            memset(t->ios[i].buf, 0, reclen);
        t->ios[i].next = t->free_ious;
        t->free_ious = t->ios + i;
    }
    if (verify) {
        /* reference pattern reads are compared against */
        verify_buf = aligned_buffer;
        memset(verify_buf, 'b', reclen);
    }

    t->iocbs = malloc(sizeof(struct iocb *) * max_io_submit);
    if (!t->iocbs) {
        fprintf(stderr, "unable to allocate iocbs\n");
        goto free_buffers;
    }

    memset(t->iocbs, 0, max_io_submit * sizeof(struct iocb *));

    t->events = malloc(sizeof(struct io_event) * depth * num_files);
    if (!t->events) {
        fprintf(stderr, "unable to allocate ram for events\n");
        goto free_buffers;
    }
    memset(t->events, 0, num_files * sizeof(struct io_event)*depth);

    t->num_global_ios = num_files * depth;
    t->num_global_events = t->num_global_ios;
    return 0;

free_buffers:
    if (t->ios)
        free(t->ios);
    if (t->iocbs)
        free(t->iocbs);
    if (t->events)
        free(t->events);
    return -1;
}
/*
 * The buffers used for file data are allocated as a single big
 * malloc, and then each thread and operation takes a piece and uses
 * that for file data.  This lets us do a large shm or bigpages alloc
 * and without trying to find a special place in each thread to map the
 * buffers to
 */
int setup_shared_mem(int num_threads, int num_files, int depth,
                     int reclen, int max_io_submit)
{
    char *p = NULL;
    size_t total_ram;

    /* round each record up to a whole number of pages */
    padded_reclen = (reclen + page_size_mask) / (page_size_mask+1);
    padded_reclen = padded_reclen * (page_size_mask+1);
    total_ram = num_files * depth * padded_reclen + num_threads;
    if (verify)
        total_ram += padded_reclen;

    if (use_shm == USE_MALLOC) {
        /* extra page_size_mask bytes give room to align the result */
        p = malloc(total_ram + page_size_mask);
    } else if (use_shm == USE_SHM) {
        shm_id = shmget(IPC_PRIVATE, total_ram, IPC_CREAT | 0700);
        if (shm_id < 0) {
            perror("shmget");
            drop_shm();
            goto free_buffers;
        }
        p = shmat(shm_id, (char *)0x50000000, 0);
        if ((long)p == -1) {
            perror("shmat");
            goto free_buffers;
        }
        /* won't really be dropped until we shmdt */
        drop_shm();
    } else if (use_shm == USE_SHMFS) {
        char mmap_name[16]; /* /dev/shm/ + null + XXXXXX */
        int fd;

        strcpy(mmap_name, "/dev/shm/XXXXXX");
        fd = mkstemp(mmap_name);
        if (fd < 0) {
            perror("mkstemp");
            goto free_buffers;
        }
        unlink(mmap_name);
        /* NOTE(review): ftruncate return value is unchecked; a failure
         * here would surface later as SIGBUS on the mapping */
        ftruncate(fd, total_ram);
        shm_id = fd;
        p = mmap((char *)0x50000000, total_ram,
                 PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);

        if (p == MAP_FAILED) {
            perror("mmap");
            goto free_buffers;
        }
    }
    if (!p) {
        fprintf(stderr, "unable to allocate buffers\n");
        goto free_buffers;
    }
    unaligned_buffer = p;
    /* page-align the working pointer; keep the original for free() */
    p = (char*)((intptr_t) (p + page_size_mask) & ~page_size_mask);
    aligned_buffer = p;
    return 0;

free_buffers:
    drop_shm();
    if (unaligned_buffer)
        free(unaligned_buffer);
    return -1;
}
1059void global_thread_throughput(struct thread_info *t, char *this_stage) { 1060 int i; 1061 double runtime = time_since_now(&global_stage_start_time); 1062 double total_mb = 0; 1063 double min_trans = 0; 1064 1065 for (i = 0 ; i < num_threads ; i++) { 1066 total_mb += global_thread_info[i].stage_mb_trans; 1067 if (!min_trans || t->stage_mb_trans < min_trans) 1068 min_trans = t->stage_mb_trans; 1069 } 1070 if (total_mb) { 1071 fprintf(stderr, "%s throughput (%.2f MB/s) ", this_stage, 1072 total_mb / runtime); 1073 fprintf(stderr, "%.2f MB in %.2fs", total_mb, runtime); 1074 if (stonewall) 1075 fprintf(stderr, " min transfer %.2fMB", min_trans); 1076 fprintf(stderr, "\n"); 1077 } 1078} 1079 1080/* this is the meat of the state machine. There is a list of 1081 * active operations structs, and as each one finishes the required 1082 * io it is moved to a list of finished operations. Once they have 1083 * all finished whatever stage they were in, they are given the chance 1084 * to restart and pick a different stage (read/write/random read etc) 1085 * 1086 * various timings are printed in between the stages, along with 1087 * thread synchronization if there are more than one threads. 
/* this is the meat of the state machine. There is a list of
 * active operations structs, and as each one finishes the required
 * io it is moved to a list of finished operations. Once they have
 * all finished whatever stage they were in, they are given the chance
 * to restart and pick a different stage (read/write/random read etc)
 *
 * various timings are printed in between the stages, along with
 * thread synchronization if there are more than one threads.
 */
int worker(struct thread_info *t)
{
    struct io_oper *oper;
    char *this_stage = NULL;
    struct timeval stage_time;
    int status = 0;
    int iteration = 0;
    int cnt;

    aio_setup(&t->io_ctx, 512);

restart:
    /* barrier: wait until every thread is ready to start this stage */
    if (num_threads > 1) {
        pthread_mutex_lock(&stage_mutex);
        threads_starting++;
        if (threads_starting == num_threads) {
            threads_ending = 0;
            gettimeofday(&global_stage_start_time, NULL);
            pthread_cond_broadcast(&stage_cond);
        }
        while (threads_starting != num_threads)
            pthread_cond_wait(&stage_cond, &stage_mutex);
        pthread_mutex_unlock(&stage_mutex);
    }
    if (t->active_opers) {
        this_stage = stage_name(t->active_opers->rw);
        gettimeofday(&stage_time, NULL);
        t->stage_mb_trans = 0;
    }

    cnt = 0;
    /* first we send everything through aio */
    while (t->active_opers && (cnt < iterations || iterations == RUN_FOREVER)) {
        /* when stonewalling, stop submitting as soon as any other
         * thread has finished its stage */
        if (stonewall && threads_ending) {
            oper = t->active_opers;
            oper->stonewalled = 1;
            oper_list_del(oper, &t->active_opers);
            oper_list_add(oper, &t->finished_opers);
        } else {
            run_active_list(t, io_iter, max_io_submit);
        }
        cnt++;
    }
    if (latency_stats)
        print_latency(t);

    if (completion_latency_stats)
        print_completion_latency(t);

    /* then we wait for all the operations to finish */
    oper = t->finished_opers;
    do {
        if (!oper)
            break;
        io_oper_wait(t, oper);
        oper = oper->next;
    } while (oper != t->finished_opers);

    /* then we do an fsync to get the timing for any future operations
     * right, and check to see if any of these need to get restarted
     */
    oper = t->finished_opers;
    while (oper) {
        if (fsync_stages)
            fsync(oper->fd);
        t->stage_mb_trans += oper_mb_trans(oper);
        /* restart_oper moves the oper to its next stage; rescan from
         * the head since the list was modified */
        if (restart_oper(oper)) {
            oper_list_del(oper, &t->finished_opers);
            oper_list_add(oper, &t->active_opers);
            oper = t->finished_opers;
            continue;
        }
        oper = oper->next;
        if (oper == t->finished_opers)
            break;
    }

    if (t->stage_mb_trans && t->num_files > 0) {
        double seconds = time_since_now(&stage_time);
        fprintf(stderr, "thread %td %s totals (%.2f MB/s) %.2f MB in %.2fs\n",
                t - global_thread_info, this_stage, t->stage_mb_trans/seconds,
                t->stage_mb_trans, seconds);
    }

    /* barrier: wait for all threads to finish the stage; the last one
     * through prints the combined throughput */
    if (num_threads > 1) {
        pthread_mutex_lock(&stage_mutex);
        threads_ending++;
        if (threads_ending == num_threads) {
            threads_starting = 0;
            pthread_cond_broadcast(&stage_cond);
            global_thread_throughput(t, this_stage);
        }
        while (threads_ending != num_threads)
            pthread_cond_wait(&stage_cond, &stage_mutex);
        pthread_mutex_unlock(&stage_mutex);
    }

    /* someone got restarted, go back to the beginning */
    if (t->active_opers && (cnt < iterations || iterations == RUN_FOREVER)) {
        iteration++;
        goto restart;
    }

    /* finally, free all the ram */
    while (t->finished_opers) {
        oper = t->finished_opers;
        oper_list_del(oper, &t->finished_opers);
        status = finish_oper(t, oper);
    }

    if (t->num_global_pending) {
        fprintf(stderr, "global num pending is %d\n", t->num_global_pending);
    }
    io_queue_release(t->io_ctx);

    return status;
}

typedef void * (*start_routine)(void *);
/* spawn one worker thread per thread_info and join them all */
int run_workers(struct thread_info *t, int num_threads)
{
    int ret;
    int thread_ret;
    int i;

    for (i = 0 ; i < num_threads ; i++) {
        ret = pthread_create(&t[i].tid, NULL, (start_routine)worker, t + i);
        if (ret) {
            perror("pthread_create");
            exit(1);
        }
    }
    for (i = 0 ; i < num_threads ; i++) {
        ret = pthread_join(t[i].tid, (void *)&thread_ret);
        if (ret) {
            perror("pthread_join");
            exit(1);
        }
    }
    return 0;
}
*size_arg, off_t mult) { 1232 char c; 1233 int num; 1234 off_t ret; 1235 c = size_arg[strlen(size_arg) - 1]; 1236 if (c > '9') { 1237 size_arg[strlen(size_arg) - 1] = '\0'; 1238 } 1239 num = atoi(size_arg); 1240 switch(c) { 1241 case 'g': 1242 case 'G': 1243 mult = 1024 * 1024 * 1024; 1244 break; 1245 case 'm': 1246 case 'M': 1247 mult = 1024 * 1024; 1248 break; 1249 case 'k': 1250 case 'K': 1251 mult = 1024; 1252 break; 1253 case 'b': 1254 case 'B': 1255 mult = 1; 1256 break; 1257 } 1258 ret = mult * num; 1259 return ret; 1260} 1261 1262void print_usage(void) { 1263 printf("usage: aio-stress [-s size] [-r size] [-a size] [-d num] [-b num]\n"); 1264 printf(" [-i num] [-t num] [-c num] [-C size] [-nxhOS ]\n"); 1265 printf(" file1 [file2 ...]\n"); 1266 printf("\t-a size in KB at which to align buffers\n"); 1267 printf("\t-b max number of iocbs to give io_submit at once\n"); 1268 printf("\t-c number of io contexts per file\n"); 1269 printf("\t-C offset between contexts, default 2MB\n"); 1270 printf("\t-s size in MB of the test file(s), default 1024MB\n"); 1271 printf("\t-r record size in KB used for each io, default 64KB\n"); 1272 printf("\t-d number of pending aio requests for each file, default 64\n"); 1273 printf("\t-i number of I/O per file sent before switching\n" 1274 "\t to the next file, default 8\n"); 1275 printf("\t-I total number of ayncs I/O the program will run, " 1276 "default is run until Cntl-C\n"); 1277 printf("\t-O Use O_DIRECT (not available in 2.4 kernels),\n"); 1278 printf("\t-S Use O_SYNC for writes\n"); 1279 printf("\t-o add an operation to the list: write=0, read=1,\n"); 1280 printf("\t random write=2, random read=3.\n"); 1281 printf("\t repeat -o to specify multiple ops: -o 0 -o 1 etc.\n"); 1282 printf("\t-m shm use ipc shared memory for io buffers instead of malloc\n"); 1283 printf("\t-m shmfs mmap a file in /dev/shm for io buffers\n"); 1284 printf("\t-n no fsyncs between write stage and read stage\n"); 1285 printf("\t-l print io_submit 
latencies after each stage\n"); 1286 printf("\t-L print io completion latencies after each stage\n"); 1287 printf("\t-t number of threads to run\n"); 1288 printf("\t-u unlink files after completion\n"); 1289 printf("\t-v verification of bytes written\n"); 1290 printf("\t-x turn off thread stonewalling\n"); 1291 printf("\t-h this message\n"); 1292 printf("\n\t the size options (-a -s and -r) allow modifiers -s 400{k,m,g}\n"); 1293 printf("\t translate to 400KB, 400MB and 400GB\n"); 1294 printf("version %s\n", PROG_VERSION); 1295} 1296 1297int main(int ac, char **av) 1298{ 1299 int rwfd; 1300 int i; 1301 int j; 1302 int c; 1303 1304 off_t file_size = 1 * 1024 * 1024 * 1024; 1305 int first_stage = WRITE; 1306 struct io_oper *oper; 1307 int status = 0; 1308 int num_files = 0; 1309 int open_fds = 0; 1310 struct thread_info *t; 1311 1312 page_size_mask = getpagesize() - 1; 1313 1314 while (1) { 1315 c = getopt(ac, av, "a:b:c:C:m:s:r:d:i:I:o:t:lLnhOSxvu"); 1316 if (c < 0) 1317 break; 1318 1319 switch(c) { 1320 case 'a': 1321 page_size_mask = parse_size(optarg, 1024); 1322 page_size_mask--; 1323 break; 1324 case 'c': 1325 num_contexts = atoi(optarg); 1326 break; 1327 case 'C': 1328 context_offset = parse_size(optarg, 1024 * 1024); 1329 case 'b': 1330 max_io_submit = atoi(optarg); 1331 break; 1332 case 's': 1333 file_size = parse_size(optarg, 1024 * 1024); 1334 break; 1335 case 'd': 1336 depth = atoi(optarg); 1337 break; 1338 case 'r': 1339 rec_len = parse_size(optarg, 1024); 1340 break; 1341 case 'i': 1342 io_iter = atoi(optarg); 1343 break; 1344 case 'I': 1345 iterations = atoi(optarg); 1346 break; 1347 case 'n': 1348 fsync_stages = 0; 1349 break; 1350 case 'l': 1351 latency_stats = 1; 1352 break; 1353 case 'L': 1354 completion_latency_stats = 1; 1355 break; 1356 case 'm': 1357 if (!strcmp(optarg, "shm")) { 1358 fprintf(stderr, "using ipc shm\n"); 1359 use_shm = USE_SHM; 1360 } else if (!strcmp(optarg, "shmfs")) { 1361 fprintf(stderr, "using /dev/shm for buffers\n"); 1362 
use_shm = USE_SHMFS; 1363 } 1364 break; 1365 case 'o': 1366 i = atoi(optarg); 1367 stages |= 1 << i; 1368 fprintf(stderr, "adding stage %s\n", stage_name(i)); 1369 break; 1370 case 'O': 1371 o_direct = O_DIRECT; 1372 break; 1373 case 'S': 1374 o_sync = O_SYNC; 1375 break; 1376 case 't': 1377 num_threads = atoi(optarg); 1378 break; 1379 case 'x': 1380 stonewall = 0; 1381 break; 1382 case 'u': 1383 unlink_files = 1; 1384 break; 1385 case 'v': 1386 verify = 1; 1387 break; 1388 case 'h': 1389 default: 1390 print_usage(); 1391 exit(1); 1392 } 1393 } 1394 1395 /* 1396 * make sure we don't try to submit more I/O than we have allocated 1397 * memory for 1398 */ 1399 if (depth < io_iter) { 1400 io_iter = depth; 1401 fprintf(stderr, "dropping io_iter to %d\n", io_iter); 1402 } 1403 1404 if (optind >= ac) { 1405 print_usage(); 1406 exit(1); 1407 } 1408 1409 num_files = ac - optind; 1410 1411 if (num_threads > (num_files * num_contexts)) { 1412 num_threads = num_files * num_contexts; 1413 fprintf(stderr, "dropping thread count to the number of contexts %d\n", 1414 num_threads); 1415 } 1416 1417 t = malloc(num_threads * sizeof(*t)); 1418 if (!t) { 1419 perror("malloc"); 1420 exit(1); 1421 } 1422 global_thread_info = t; 1423 1424 /* by default, allow a huge number of iocbs to be sent towards 1425 * io_submit 1426 */ 1427 if (!max_io_submit) 1428 max_io_submit = num_files * io_iter * num_contexts; 1429 1430 /* 1431 * make sure we don't try to submit more I/O than max_io_submit allows 1432 */ 1433 if (max_io_submit < io_iter) { 1434 io_iter = max_io_submit; 1435 fprintf(stderr, "dropping io_iter to %d\n", io_iter); 1436 } 1437 1438 if (!stages) { 1439 stages = (1 << WRITE) | (1 << READ) | (1 << RREAD) | (1 << RWRITE); 1440 } else { 1441 for (i = 0 ; i < LAST_STAGE; i++) { 1442 if (stages & (1 << i)) { 1443 first_stage = i; 1444 fprintf(stderr, "starting with %s\n", stage_name(i)); 1445 break; 1446 } 1447 } 1448 } 1449 1450 if (file_size < num_contexts * context_offset) { 1451 
fprintf(stderr, "file size %ld too small for %d contexts\n", 1452 (long)file_size, num_contexts); 1453 exit(1); 1454 } 1455 1456 fprintf(stderr, "file size %ldMB, record size %ldKB, depth %d, " 1457 "I/O per iteration %d\n", 1458 (long)(file_size / (1024 * 1024)), 1459 rec_len / 1024, depth, io_iter); 1460 fprintf(stderr, "max io_submit %d, buffer alignment set to %luKB\n", 1461 max_io_submit, (page_size_mask + 1)/1024); 1462 fprintf(stderr, "threads %d files %d contexts %d context offset %ldMB " 1463 "verification %s\n", num_threads, num_files, num_contexts, 1464 (long)(context_offset / (1024 * 1024)), 1465 verify ? "on" : "off"); 1466 /* open all the files and do any required setup for them */ 1467 for (i = optind ; i < ac ; i++) { 1468 int thread_index; 1469 for (j = 0 ; j < num_contexts ; j++) { 1470 thread_index = open_fds % num_threads; 1471 open_fds++; 1472 1473 rwfd = open(av[i], O_CREAT | O_RDWR | o_direct | o_sync, 0600); 1474 if (rwfd == -1) { 1475 fprintf(stderr, "error while creating file %s: %s", av[i], strerror(errno)); 1476 exit(1); 1477 } 1478 1479 oper = create_oper(rwfd, first_stage, j * context_offset, 1480 file_size - j * context_offset, rec_len, 1481 depth, io_iter, av[i]); 1482 if (!oper) { 1483 fprintf(stderr, "error in create_oper\n"); 1484 exit(-1); 1485 } 1486 oper_list_add(oper, &t[thread_index].active_opers); 1487 t[thread_index].num_files++; 1488 } 1489 } 1490 if (setup_shared_mem(num_threads, num_files * num_contexts, 1491 depth, rec_len, max_io_submit)) 1492 { 1493 exit(1); 1494 } 1495 for (i = 0 ; i < num_threads ; i++) { 1496 if (setup_ious(&t[i], t[i].num_files, depth, rec_len, max_io_submit)) 1497 exit(1); 1498 } 1499 if (num_threads > 1) { 1500 printf("Running multi thread version num_threads:%d\n", num_threads); 1501 run_workers(t, num_threads); 1502 } else { 1503 printf("Running single thread version \n"); 1504 status = worker(t); 1505 } 1506 if (unlink_files) { 1507 for (i = optind ; i < ac ; i++) { 1508 printf("Cleaning up 
file %s \n", av[i]); 1509 unlink(av[i]); 1510 } 1511 } 1512 1513 if (status) { 1514 exit(1); 1515 } 1516 return status; 1517} 1518