/*
 * Copyright (c) 2004 SuSE, Inc.  All Rights Reserved.
 *
 * This program is free software; you can redistribute it and/or modify it
 * under the terms of version 2 of the GNU General Public License as
 * published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it would be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
 *
 * Further, this software is distributed without any warranty that it is
 * free of the rightful claim of any third person regarding infringement
 * or the like.  Any license provided herein, whether implied or
 * otherwise, applies only to this software file.  Patent licenses, if
 * any, provided herein do not apply to combinations of this program with
 * other software, or any other product whatsoever.
 *
 * You should have received a copy of the GNU General Public License along
 * with this program; if not, write the Free Software Foundation, Inc., 59
 * Temple Place - Suite 330, Boston MA 02111-1307, USA.
 *
 * Contact information: Silicon Graphics, Inc., 1600 Amphitheatre Pkwy,
 * Mountain View, CA 94043, or:
 *
 *
 * aio-stress
 *
 * will open or create each file on the command line, and start a series
 * of async I/O requests to it.
 *
 * aio is done in a rotating loop: first file1 gets 8 requests, then
 * file2, then file3, etc.  As each file finishes writing, it is switched
 * to reads.
 *
 * io buffers are aligned in case you want to do raw io.
 *
 * compile with gcc -Wall -laio -lpthread -o aio-stress aio-stress.c
 *
 * run aio-stress -h to see the options
 *
 * Please mail Chris Mason (mason@suse.com) with bug reports or patches.
 */
#define _FILE_OFFSET_BITS 64
#define PROG_VERSION "0.21"
#define NEW_GETEVENTS

#include <stdio.h>
#include <errno.h>
#include <assert.h>
#include <stdlib.h>
#include <stdint.h>

#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#include <sys/time.h>
#include <libaio.h>
#include <sys/ipc.h>
#include <sys/shm.h>
#include <sys/mman.h>
#include <string.h>
#include <pthread.h>

#define IO_FREE 0
#define IO_PENDING 1
#define RUN_FOREVER -1

#ifndef O_DIRECT
#define O_DIRECT 040000 /* direct disk access hint */
#endif

enum {
    WRITE,
    READ,
    RWRITE,
    RREAD,
    LAST_STAGE,
};

#define USE_MALLOC 0
#define USE_SHM 1
#define USE_SHMFS 2

/*
 * various globals, these are effectively read only by the time the threads
 * are started
 */
long stages = 0;
unsigned long page_size_mask;
int o_direct = 0;
int o_sync = 0;
int latency_stats = 0;
int completion_latency_stats = 0;
int io_iter = 8;
int iterations = RUN_FOREVER;
int max_io_submit = 0;
long rec_len = 64 * 1024;
int depth = 64;
int num_threads = 1;
int num_contexts = 1;
off_t context_offset = 2 * 1024 * 1024;
int fsync_stages = 1;
int use_shm = 0;
int shm_id;
char *unaligned_buffer = NULL;
char *aligned_buffer = NULL;
int padded_reclen = 0;
int stonewall = 1;
int verify = 0;
char *verify_buf = NULL;
int unlink_files = 0;

struct io_unit;
struct thread_info;

/* pthread mutexes and other globals for keeping the threads in sync */
pthread_cond_t stage_cond = PTHREAD_COND_INITIALIZER;
pthread_mutex_t stage_mutex = PTHREAD_MUTEX_INITIALIZER;
int threads_ending = 0;
int threads_starting = 0;
struct timeval global_stage_start_time;
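/*
 * For illustration: the -o option sets bits in the stages bitmask using
 * the enum above, so "-o 0 -o 3" gives
 *
 *     stages == (1 << WRITE) | (1 << RREAD)
 *
 * and restart_oper() later walks each operation through its set bits in
 * WRITE -> READ -> RWRITE -> RREAD order.
 */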
struct thread_info *global_thread_info;

/*
 * latencies during io_submit are measured, these are the
 * granularities for deviations
 */
#define DEVIATIONS 6
int deviations[DEVIATIONS] = { 100, 250, 500, 1000, 5000, 10000 };

struct io_latency {
    double max;
    double min;
    double total_io;
    double total_lat;
    double deviations[DEVIATIONS];
};

/* container for a series of operations to a file */
struct io_oper {
    /* already open file descriptor, valid for whatever operation you want */
    int fd;

    /* starting byte of the operation */
    off_t start;

    /* ending byte of the operation */
    off_t end;

    /* size of the read/write buffer */
    int reclen;

    /* max number of pending requests before a wait is triggered */
    int depth;

    /* current number of pending requests */
    int num_pending;

    /* last error, zero if there were none */
    int last_err;

    /* total number of errors hit. */
    int num_err;

    /* read, write, random, etc */
    int rw;

    /* number of ios that will get sent to aio */
    int total_ios;

    /* number of ios we've already sent */
    int started_ios;

    /* last offset used in an io operation */
    off_t last_offset;

    /* stonewalled = 1 when we got cut off before submitting all our ios */
    int stonewalled;

    /* list management */
    struct io_oper *next;
    struct io_oper *prev;

    struct timeval start_time;

    char *file_name;
};

/* a single io, and all the tracking needed for it */
struct io_unit {
    /* note, iocb must go first! */
    struct iocb iocb;

    /* pointer to parent io operation struct */
    struct io_oper *io_oper;

    /* aligned buffer */
    char *buf;

    /* size of the aligned buffer (record size) */
    int buf_size;

    /* state of this io unit (free, pending, done) */
    int busy;

    /* result of last operation */
    long res;

    struct io_unit *next;

    struct timeval io_start_time;    /* time of io_submit */
};

struct thread_info {
    io_context_t io_ctx;
    pthread_t tid;

    /* allocated array of io_unit structs */
    struct io_unit *ios;

    /* list of io units available for io */
    struct io_unit *free_ious;

    /* number of io units in the ios array */
    int num_global_ios;

    /* number of io units in flight */
    int num_global_pending;

    /* preallocated array of iocb pointers, only used in run_active_list */
    struct iocb **iocbs;

    /* preallocated array of events */
    struct io_event *events;

    /* size of the events array */
    int num_global_events;

    /* latency stats for io_submit */
    struct io_latency io_submit_latency;

    /* list of operations still in progress, and of those finished */
    struct io_oper *active_opers;
    struct io_oper *finished_opers;

    /* number of files this thread is doing io on */
    int num_files;

    /* how much io this thread did in the last stage */
    double stage_mb_trans;

    /* completion latency stats: i/o time from io_submit until io_getevents */
    struct io_latency io_completion_latency;
};

/*
 * return seconds between start_tv and stop_tv in double precision
 */
static double time_since(struct timeval *start_tv, struct timeval *stop_tv)
{
    double sec, usec;
    double ret;
    sec = stop_tv->tv_sec - start_tv->tv_sec;
    usec = stop_tv->tv_usec - start_tv->tv_usec;
    if (sec > 0 && usec < 0) {
        sec--;
        usec += 1000000;
    }
    ret = sec + usec / (double)1000000;
    if (ret < 0)
        ret = 0;
    return ret;
}

/*
 * return seconds between start_tv and now in double precision
 */
static double time_since_now(struct timeval *start_tv)
{
    struct timeval stop_time;
    gettimeofday(&stop_time, NULL);
    return time_since(start_tv, &stop_time);
}

/*
 * Add latency info to latency struct
 */
static void calc_latency(struct timeval *start_tv, struct timeval *stop_tv,
                         struct io_latency *lat)
{
    double delta;
    int i;
    delta = time_since(start_tv, stop_tv);
    delta = delta * 1000;

    if (delta > lat->max)
        lat->max = delta;
    if (!lat->min || delta < lat->min)
        lat->min = delta;
    lat->total_io++;
    lat->total_lat += delta;
    for (i = 0 ; i < DEVIATIONS ; i++) {
        if (delta < deviations[i]) {
            lat->deviations[i]++;
            break;
        }
    }
}
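/*
 * For illustration: calc_latency() stores delta in milliseconds, and the
 * deviations[] table above is in milliseconds too.  An io_submit call that
 * took 0.0007s becomes delta = 0.7 and is counted in the first (< 100)
 * bucket; one that took 1.2s (delta = 1200) falls in the < 5000 bucket;
 * anything taking 10s or more only shows up in the trailing overflow count
 * that print_lat() prints.
 */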
static void oper_list_add(struct io_oper *oper, struct io_oper **list)
{
    if (!*list) {
        *list = oper;
        oper->prev = oper->next = oper;
        return;
    }
    oper->prev = (*list)->prev;
    oper->next = *list;
    (*list)->prev->next = oper;
    (*list)->prev = oper;
    return;
}

static void oper_list_del(struct io_oper *oper, struct io_oper **list)
{
    if ((*list)->next == (*list)->prev && *list == (*list)->next) {
        *list = NULL;
        return;
    }
    oper->prev->next = oper->next;
    oper->next->prev = oper->prev;
    if (*list == oper)
        *list = oper->next;
}

/* worker func to check error fields in the io unit */
static int check_finished_io(struct io_unit *io) {
    int i;
    if (io->res != io->buf_size) {

        struct stat s;
        fstat(io->io_oper->fd, &s);

        /*
         * If file size is large enough for the read, then this short
         * read is an error.
         */
        if ((io->io_oper->rw == READ || io->io_oper->rw == RREAD) &&
            s.st_size > (io->iocb.u.c.offset + io->res)) {

            fprintf(stderr, "io err %ld (%s) op %d, off %Lu size %d\n",
                    io->res, strerror(-io->res), io->iocb.aio_lio_opcode,
                    io->iocb.u.c.offset, io->buf_size);
            io->io_oper->last_err = io->res;
            io->io_oper->num_err++;
            return -1;
        }
    }
    if (verify && io->io_oper->rw == READ) {
        if (memcmp(io->buf, verify_buf, io->io_oper->reclen)) {
            fprintf(stderr, "verify error, file %s offset %Lu contents (offset:bad:good):\n",
                    io->io_oper->file_name, io->iocb.u.c.offset);

            for (i = 0 ; i < io->io_oper->reclen ; i++) {
                if (io->buf[i] != verify_buf[i]) {
                    fprintf(stderr, "%d:%c:%c ", i, io->buf[i], verify_buf[i]);
                }
            }
            fprintf(stderr, "\n");
        }

    }
    return 0;
}
/* worker func to check the busy bits and get an io unit ready for use */
static int grab_iou(struct io_unit *io, struct io_oper *oper) {
    if (io->busy == IO_PENDING)
        return -1;

    io->busy = IO_PENDING;
    io->res = 0;
    io->io_oper = oper;
    return 0;
}

char *stage_name(int rw) {
    switch(rw) {
    case WRITE:
        return "write";
    case READ:
        return "read";
    case RWRITE:
        return "random write";
    case RREAD:
        return "random read";
    }
    return "unknown";
}

static inline double oper_mb_trans(struct io_oper *oper) {
    return ((double)oper->started_ios * (double)oper->reclen) /
           (double)(1024 * 1024);
}

static void print_time(struct io_oper *oper) {
    double runtime;
    double tput;
    double mb;

    runtime = time_since_now(&oper->start_time);
    mb = oper_mb_trans(oper);
    tput = mb / runtime;
    fprintf(stderr, "%s on %s (%.2f MB/s) %.2f MB in %.2fs\n",
            stage_name(oper->rw), oper->file_name, tput, mb, runtime);
}

static void print_lat(char *str, struct io_latency *lat) {
    double avg = lat->total_lat / lat->total_io;
    int i;
    double total_counted = 0;
    fprintf(stderr, "%s min %.2f avg %.2f max %.2f\n\t",
            str, lat->min, avg, lat->max);

    for (i = 0 ; i < DEVIATIONS ; i++) {
        fprintf(stderr, " %.0f < %d", lat->deviations[i], deviations[i]);
        total_counted += lat->deviations[i];
    }
    if (total_counted && lat->total_io - total_counted)
        fprintf(stderr, " < %.0f", lat->total_io - total_counted);
    fprintf(stderr, "\n");
    memset(lat, 0, sizeof(*lat));
}

static void print_latency(struct thread_info *t)
{
    struct io_latency *lat = &t->io_submit_latency;
    print_lat("latency", lat);
}

static void print_completion_latency(struct thread_info *t)
{
    struct io_latency *lat = &t->io_completion_latency;
    print_lat("completion latency", lat);
}

/*
 * updates the fields in the io operation struct that belongs to this
 * io unit, and makes the io unit reusable again
 */
void finish_io(struct thread_info *t, struct io_unit *io, long result,
               struct timeval *tv_now) {
    struct io_oper *oper = io->io_oper;

    calc_latency(&io->io_start_time, tv_now, &t->io_completion_latency);
    io->res = result;
    io->busy = IO_FREE;
    io->next = t->free_ious;
    t->free_ious = io;
    oper->num_pending--;
    t->num_global_pending--;
    check_finished_io(io);
    if (oper->num_pending == 0 &&
        (oper->started_ios == oper->total_ios || oper->stonewalled))
    {
        print_time(oper);
    }
}
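/*
 * For illustration: read_some_events() below leans on the io_getevents()
 * min_nr/nr semantics: the call blocks until at least min_nr completions
 * arrive (the NULL timeout never expires) and returns at most
 * t->num_global_events of them.  Capping min_nr at num_global_pending
 * matters; with only 3 requests in flight it waits for 3, not io_iter.
 */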
int read_some_events(struct thread_info *t) {
    struct io_unit *event_io;
    struct io_event *event;
    int nr;
    int i;
    int min_nr = io_iter;
    struct timeval stop_time;

    if (t->num_global_pending < io_iter)
        min_nr = t->num_global_pending;

#ifdef NEW_GETEVENTS
    nr = io_getevents(t->io_ctx, min_nr, t->num_global_events, t->events, NULL);
#else
    nr = io_getevents(t->io_ctx, t->num_global_events, t->events, NULL);
#endif
    if (nr <= 0)
        return nr;

    gettimeofday(&stop_time, NULL);
    for (i = 0 ; i < nr ; i++) {
        event = t->events + i;
        event_io = (struct io_unit *)((unsigned long)event->obj);
        finish_io(t, event_io, event->res, &stop_time);
    }
    return nr;
}

/*
 * finds a free io unit, waiting for pending requests if required.  returns
 * null if none could be found
 */
static struct io_unit *find_iou(struct thread_info *t, struct io_oper *oper)
{
    struct io_unit *event_io;
    int nr;

retry:
    if (t->free_ious) {
        event_io = t->free_ious;
        t->free_ious = t->free_ious->next;
        if (grab_iou(event_io, oper)) {
            fprintf(stderr, "io unit on free list but not free\n");
            abort();
        }
        return event_io;
    }
    nr = read_some_events(t);
    if (nr > 0)
        goto retry;
    else
        fprintf(stderr, "no free ious after read_some_events\n");
    return NULL;
}

/*
 * wait for all pending requests for this io operation to finish
 */
static int io_oper_wait(struct thread_info *t, struct io_oper *oper) {
    struct io_event event;
    struct io_unit *event_io;

    if (oper == NULL) {
        return 0;
    }

    if (oper->num_pending == 0)
        goto done;

    /* this func is not speed sensitive, no need to go wild reading
     * more than one event at a time
     */
#ifdef NEW_GETEVENTS
    while (io_getevents(t->io_ctx, 1, 1, &event, NULL) > 0) {
#else
    while (io_getevents(t->io_ctx, 1, &event, NULL) > 0) {
#endif
        struct timeval tv_now;
        event_io = (struct io_unit *)((unsigned long)event.obj);

        gettimeofday(&tv_now, NULL);
        finish_io(t, event_io, event.res, &tv_now);

        if (oper->num_pending == 0)
            break;
    }
done:
    if (oper->num_err) {
        fprintf(stderr, "%u errors on oper, last %u\n",
                oper->num_err, oper->last_err);
    }
    return 0;
}

off_t random_byte_offset(struct io_oper *oper) {
    off_t num;
    off_t rand_byte = oper->start;
    off_t range;
    off_t offset = 1;

    range = (oper->end - oper->start) / (1024 * 1024);
    if ((page_size_mask+1) > (1024 * 1024))
        offset = (page_size_mask+1) / (1024 * 1024);
    if (range < offset)
        range = 0;
    else
        range -= offset;

    /* find a random mb offset */
    num = 1 + (int)((double)range * rand() / (RAND_MAX + 1.0));
    rand_byte += num * 1024 * 1024;

    /* find a random byte offset */
    num = 1 + (int)((double)(1024 * 1024) * rand() / (RAND_MAX + 1.0));

    /* page align */
    num = (num + page_size_mask) & ~page_size_mask;
    rand_byte += num;

    if (rand_byte + oper->reclen > oper->end) {
        rand_byte -= oper->reclen;
    }
    return rand_byte;
}
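/*
 * Worked example, assuming 4KB pages (page_size_mask == 0xfff): the
 * "page align" step in random_byte_offset() above rounds the random byte
 * offset up to the next page boundary, e.g.
 *
 *     num = (5000 + 0xfff) & ~0xfff;    -> 8192
 *
 * so random ios stay page aligned, which O_DIRECT requires.
 */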
/*
 * build an aio iocb for an operation, based on oper->rw and the
 * last offset used.  This finds the struct io_unit that will be attached
 * to the iocb, and things are ready for submission to aio after this
 * is called.
 *
 * returns null on error
 */
static struct io_unit *build_iocb(struct thread_info *t, struct io_oper *oper)
{
    struct io_unit *io;
    off_t rand_byte;

    io = find_iou(t, oper);
    if (!io) {
        fprintf(stderr, "unable to find io unit\n");
        return NULL;
    }

    switch(oper->rw) {
    case WRITE:
        io_prep_pwrite(&io->iocb, oper->fd, io->buf, oper->reclen,
                       oper->last_offset);
        oper->last_offset += oper->reclen;
        break;
    case READ:
        io_prep_pread(&io->iocb, oper->fd, io->buf, oper->reclen,
                      oper->last_offset);
        oper->last_offset += oper->reclen;
        break;
    case RREAD:
        rand_byte = random_byte_offset(oper);
        oper->last_offset = rand_byte;
        io_prep_pread(&io->iocb, oper->fd, io->buf, oper->reclen,
                      rand_byte);
        break;
    case RWRITE:
        rand_byte = random_byte_offset(oper);
        oper->last_offset = rand_byte;
        io_prep_pwrite(&io->iocb, oper->fd, io->buf, oper->reclen,
                       rand_byte);
        break;
    }

    return io;
}

/*
 * wait for any pending requests, and then free all ram associated with
 * an operation.  returns the last error the operation hit (zero means none)
 */
static int
finish_oper(struct thread_info *t, struct io_oper *oper)
{
    unsigned long last_err;

    io_oper_wait(t, oper);
    last_err = oper->last_err;
    if (oper->num_pending > 0) {
        fprintf(stderr, "oper num_pending is %d\n", oper->num_pending);
    }
    close(oper->fd);
    free(oper);
    return last_err;
}

/*
 * allocates an io operation and fills in all the fields.  returns
 * null on error
 */
static struct io_oper *
create_oper(int fd, int rw, off_t start, off_t end, int reclen, int depth,
            int iter, char *file_name)
{
    struct io_oper *oper;

    oper = malloc(sizeof(*oper));
    if (!oper) {
        fprintf(stderr, "unable to allocate io oper\n");
        return NULL;
    }
    memset(oper, 0, sizeof(*oper));

    oper->depth = depth;
    oper->start = start;
    oper->end = end;
    oper->last_offset = oper->start;
    oper->fd = fd;
    oper->reclen = reclen;
    oper->rw = rw;
    oper->total_ios = (oper->end - oper->start) / oper->reclen;
    oper->file_name = file_name;

    return oper;
}

/*
 * does setup on num_ios worth of iocbs, but does not actually
 * start any io
 */
int build_oper(struct thread_info *t, struct io_oper *oper, int num_ios,
               struct iocb **my_iocbs)
{
    int i;
    struct io_unit *io;

    if (oper->started_ios == 0)
        gettimeofday(&oper->start_time, NULL);

    if (num_ios == 0)
        num_ios = oper->total_ios;

    if ((oper->started_ios + num_ios) > oper->total_ios)
        num_ios = oper->total_ios - oper->started_ios;

    for (i = 0 ; i < num_ios ; i++) {
        io = build_iocb(t, oper);
        if (!io) {
            return -1;
        }
        my_iocbs[i] = &io->iocb;
    }
    return num_ios;
}

/*
 * runs through the iocbs in the array provided and updates
 * counters in the associated oper struct
 */
static void update_iou_counters(struct iocb **my_iocbs, int nr,
                                struct timeval *tv_now)
{
    struct io_unit *io;
    int i;
    for (i = 0 ; i < nr ; i++) {
        io = (struct io_unit *)(my_iocbs[i]);
        io->io_oper->num_pending++;
        io->io_oper->started_ios++;
        io->io_start_time = *tv_now;    /* set time of io_submit */
    }
}
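/*
 * For illustration: the iocb pointer casts in this file rely on struct
 * iocb being the first member of struct io_unit, so both share an address:
 *
 *     struct io_unit u;
 *     assert((void *)&u == (void *)&u.iocb);
 *
 * That is why update_iou_counters() above can cast my_iocbs[i] straight
 * back to a struct io_unit *, and why read_some_events() can do the same
 * with event->obj.
 */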
/* starts some io for a given file, returns zero if all went well */
int run_built(struct thread_info *t, int num_ios, struct iocb **my_iocbs)
{
    int ret;
    struct timeval start_time;
    struct timeval stop_time;

resubmit:
    gettimeofday(&start_time, NULL);
    ret = io_submit(t->io_ctx, num_ios, my_iocbs);
    gettimeofday(&stop_time, NULL);
    calc_latency(&start_time, &stop_time, &t->io_submit_latency);

    if (ret != num_ios) {
        /* some ios got through */
        if (ret > 0) {
            update_iou_counters(my_iocbs, ret, &stop_time);
            my_iocbs += ret;
            t->num_global_pending += ret;
            num_ios -= ret;
        }
        /*
         * we've used all the requests allocated in aio_setup, wait and
         * retry
         */
        if (ret > 0 || ret == -EAGAIN) {
            int old_ret = ret;
            if ((ret = read_some_events(t)) > 0) {
                goto resubmit;
            } else {
                fprintf(stderr, "ret was %d and now is %d\n", old_ret, ret);
                abort();
            }
        }

        fprintf(stderr, "ret %d (%s) on io_submit\n", ret, strerror(-ret));
        return -1;
    }
    update_iou_counters(my_iocbs, ret, &stop_time);
    t->num_global_pending += ret;
    return 0;
}

/*
 * changes oper->rw to the next in a command sequence, or returns zero
 * to say this operation is really, completely done for
 */
static int restart_oper(struct io_oper *oper) {
    int new_rw = 0;
    if (oper->last_err)
        return 0;

    /* this switch falls through */
    switch(oper->rw) {
    case WRITE:
        if (stages & (1 << READ))
            new_rw = READ;
    case READ:
        if (!new_rw && stages & (1 << RWRITE))
            new_rw = RWRITE;
    case RWRITE:
        if (!new_rw && stages & (1 << RREAD))
            new_rw = RREAD;
    }

    if (new_rw) {
        oper->started_ios = 0;
        oper->last_offset = oper->start;
        oper->stonewalled = 0;

        /*
         * we're restarting an operation with pending requests, so the
         * timing info won't be printed by finish_io.  Print it here.
         */
        if (oper->num_pending)
            print_time(oper);

        oper->rw = new_rw;
        return 1;
    }
    return 0;
}

static int oper_runnable(struct io_oper *oper) {
    struct stat buf;
    int ret;

    /* first context is always runnable, if started_ios > 0, no need to
     * redo the calculations
     */
    if (oper->started_ios || oper->start == 0)
        return 1;
    /*
     * only the sequential phases force delays in starting
     */
    if (oper->rw >= RWRITE)
        return 1;
    ret = fstat(oper->fd, &buf);
    if (ret < 0) {
        perror("fstat");
        exit(1);
    }
    if (S_ISREG(buf.st_mode) && buf.st_size < oper->start)
        return 0;
    return 1;
}

/*
 * runs through all the io operations on the active list, and starts
 * a chunk of io on each.  If any io operations are completely finished,
 * it either switches them to the next stage or puts them on the
 * finished list.
 *
 * this function stops after max_io_submit iocbs are sent down the
 * pipe, even if it has not yet touched all the operations on the
 * active list.  Any operations that have finished are moved onto
 * the finished_opers list.
 */
static int run_active_list(struct thread_info *t,
                           int io_iter,
                           int max_io_submit)
{
    struct io_oper *oper;
    struct io_oper *built_opers = NULL;
    struct iocb **my_iocbs = t->iocbs;
    int ret = 0;
    int num_built = 0;

    oper = t->active_opers;
    while (oper) {
        if (!oper_runnable(oper)) {
            oper = oper->next;
            if (oper == t->active_opers)
                break;
            continue;
        }
        ret = build_oper(t, oper, io_iter, my_iocbs);
        if (ret >= 0) {
            my_iocbs += ret;
            num_built += ret;
            oper_list_del(oper, &t->active_opers);
            oper_list_add(oper, &built_opers);
            oper = t->active_opers;
            if (num_built + io_iter > max_io_submit)
                break;
        } else
            break;
    }
    if (num_built) {
        ret = run_built(t, num_built, t->iocbs);
        if (ret < 0) {
            fprintf(stderr, "error %d on run_built\n", ret);
            exit(1);
        }
        while (built_opers) {
            oper = built_opers;
            oper_list_del(oper, &built_opers);
            oper_list_add(oper, &t->active_opers);
            if (oper->started_ios == oper->total_ios) {
                oper_list_del(oper, &t->active_opers);
                oper_list_add(oper, &t->finished_opers);
            }
        }
    }
    return 0;
}

void drop_shm() {
    int ret;
    struct shmid_ds ds;
    if (use_shm != USE_SHM)
        return;

    ret = shmctl(shm_id, IPC_RMID, &ds);
    if (ret) {
        perror("shmctl IPC_RMID");
    }
}

void aio_setup(io_context_t *io_ctx, int n)
{
    int res = io_queue_init(n, io_ctx);
    if (res != 0) {
        fprintf(stderr, "io_queue_init(%d) returned %d (%s)\n",
                n, res, strerror(-res));
        exit(3);
    }
}

/*
 * allocate io operation and event arrays for a given thread
 */
int setup_ious(struct thread_info *t,
               int num_files, int depth,
               int reclen, int max_io_submit) {
    int i;
    size_t bytes = num_files * depth * sizeof(*t->ios);

    t->ios = malloc(bytes);
    if (!t->ios) {
        fprintf(stderr, "unable to allocate io units\n");
        return -1;
    }
    memset(t->ios, 0, bytes);

    for (i = 0 ; i < depth * num_files; i++) {
        t->ios[i].buf = aligned_buffer;
        aligned_buffer += padded_reclen;
        t->ios[i].buf_size = reclen;
        if (verify)
            memset(t->ios[i].buf, 'b', reclen);
        else
            memset(t->ios[i].buf, 0, reclen);
        t->ios[i].next = t->free_ious;
        t->free_ious = t->ios + i;
    }
    if (verify) {
        verify_buf = aligned_buffer;
        memset(verify_buf, 'b', reclen);
    }

    t->iocbs = malloc(sizeof(struct iocb *) * max_io_submit);
    if (!t->iocbs) {
        fprintf(stderr, "unable to allocate iocbs\n");
        goto free_buffers;
    }

    memset(t->iocbs, 0, max_io_submit * sizeof(struct iocb *));

    t->events = malloc(sizeof(struct io_event) * depth * num_files);
    if (!t->events) {
        fprintf(stderr, "unable to allocate ram for events\n");
        goto free_buffers;
    }
    memset(t->events, 0, num_files * sizeof(struct io_event) * depth);

    t->num_global_ios = num_files * depth;
    t->num_global_events = t->num_global_ios;
    return 0;

free_buffers:
    if (t->ios)
        free(t->ios);
    if (t->iocbs)
        free(t->iocbs);
    if (t->events)
        free(t->events);
    return -1;
}
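/*
 * For illustration: setup_ious() above carves the global aligned_buffer
 * into per-io slices.  With one file, depth 64 and padded_reclen 64KB:
 *
 *     t->ios[0].buf  = aligned_buffer + 0  * 64KB
 *     t->ios[1].buf  = aligned_buffer + 1  * 64KB
 *     ...
 *     t->ios[63].buf = aligned_buffer + 63 * 64KB
 *
 * aligned_buffer itself advances as slices are handed out, so the next
 * thread's io units continue where this one stopped.
 */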
/*
 * The buffers used for file data are allocated as a single big
 * malloc, and then each thread and operation takes a piece and uses
 * that for file data.  This lets us do a large shm or bigpages alloc
 * without trying to find a special place in each thread to map the
 * buffers to
 */
int setup_shared_mem(int num_threads, int num_files, int depth,
                     int reclen, int max_io_submit)
{
    char *p = NULL;
    size_t total_ram;

    padded_reclen = (reclen + page_size_mask) / (page_size_mask+1);
    padded_reclen = padded_reclen * (page_size_mask+1);
    total_ram = num_files * depth * padded_reclen + num_threads;
    if (verify)
        total_ram += padded_reclen;

    if (use_shm == USE_MALLOC) {
        p = malloc(total_ram + page_size_mask);
    } else if (use_shm == USE_SHM) {
        shm_id = shmget(IPC_PRIVATE, total_ram, IPC_CREAT | 0700);
        if (shm_id < 0) {
            perror("shmget");
            drop_shm();
            goto free_buffers;
        }
        p = shmat(shm_id, (char *)0x50000000, 0);
        if ((long)p == -1) {
            perror("shmat");
            goto free_buffers;
        }
        /* won't really be dropped until we shmdt */
        drop_shm();
    } else if (use_shm == USE_SHMFS) {
        char mmap_name[16];    /* /dev/shm/ + null + XXXXXX */
        int fd;

        strcpy(mmap_name, "/dev/shm/XXXXXX");
        fd = mkstemp(mmap_name);
        if (fd < 0) {
            perror("mkstemp");
            goto free_buffers;
        }
        unlink(mmap_name);
        ftruncate(fd, total_ram);
        shm_id = fd;
        p = mmap((char *)0x50000000, total_ram,
                 PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);

        if (p == MAP_FAILED) {
            perror("mmap");
            goto free_buffers;
        }
    }
    if (!p) {
        fprintf(stderr, "unable to allocate buffers\n");
        goto free_buffers;
    }
    unaligned_buffer = p;
    p = (char *)((intptr_t)(p + page_size_mask) & ~page_size_mask);
    aligned_buffer = p;
    return 0;

free_buffers:
    drop_shm();
    if (unaligned_buffer)
        free(unaligned_buffer);
    return -1;
}

/*
 * runs through all the thread_info structs and calculates a combined
 * throughput
 */
void global_thread_throughput(struct thread_info *t, char *this_stage) {
    int i;
    double runtime = time_since_now(&global_stage_start_time);
    double total_mb = 0;
    double min_trans = 0;

    for (i = 0 ; i < num_threads ; i++) {
        total_mb += global_thread_info[i].stage_mb_trans;
        if (!min_trans || global_thread_info[i].stage_mb_trans < min_trans)
            min_trans = global_thread_info[i].stage_mb_trans;
    }
    if (total_mb) {
        fprintf(stderr, "%s throughput (%.2f MB/s) ", this_stage,
                total_mb / runtime);
        fprintf(stderr, "%.2f MB in %.2fs", total_mb, runtime);
        if (stonewall)
            fprintf(stderr, " min transfer %.2fMB", min_trans);
        fprintf(stderr, "\n");
    }
}
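/*
 * Worked example for the buffer sizing above, assuming 4KB pages and the
 * default 64KB records:
 *
 *     padded_reclen = ((65536 + 4095) / 4096) * 4096 = 65536
 *
 * and with -r 1000 instead:
 *
 *     padded_reclen = ((1000 + 4095) / 4096) * 4096 = 4096
 *
 * so every io buffer carved out of the shared pool spans a whole number
 * of pages, which keeps the buffers usable with O_DIRECT.
 */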
/* this is the meat of the state machine.  There is a list of
 * active operations structs, and as each one finishes the required
 * io it is moved to a list of finished operations.  Once they have
 * all finished whatever stage they were in, they are given the chance
 * to restart and pick a different stage (read/write/random read etc)
 *
 * various timings are printed in between the stages, along with
 * thread synchronization if there is more than one thread.
 */
int worker(struct thread_info *t)
{
    struct io_oper *oper;
    char *this_stage = NULL;
    struct timeval stage_time;
    int status = 0;
    int iteration = 0;
    int cnt;

    aio_setup(&t->io_ctx, 512);

restart:
    if (num_threads > 1) {
        pthread_mutex_lock(&stage_mutex);
        threads_starting++;
        if (threads_starting == num_threads) {
            threads_ending = 0;
            gettimeofday(&global_stage_start_time, NULL);
            pthread_cond_broadcast(&stage_cond);
        }
        while (threads_starting != num_threads)
            pthread_cond_wait(&stage_cond, &stage_mutex);
        pthread_mutex_unlock(&stage_mutex);
    }
    if (t->active_opers) {
        this_stage = stage_name(t->active_opers->rw);
        gettimeofday(&stage_time, NULL);
        t->stage_mb_trans = 0;
    }

    cnt = 0;
    /* first we send everything through aio */
    while (t->active_opers && (cnt < iterations || iterations == RUN_FOREVER)) {
        if (stonewall && threads_ending) {
            oper = t->active_opers;
            oper->stonewalled = 1;
            oper_list_del(oper, &t->active_opers);
            oper_list_add(oper, &t->finished_opers);
        } else {
            run_active_list(t, io_iter, max_io_submit);
        }
        cnt++;
    }
    if (latency_stats)
        print_latency(t);

    if (completion_latency_stats)
        print_completion_latency(t);

    /* then we wait for all the operations to finish */
    oper = t->finished_opers;
    do {
        if (!oper)
            break;
        io_oper_wait(t, oper);
        oper = oper->next;
    } while (oper != t->finished_opers);

    /* then we do an fsync to get the timing for any future operations
     * right, and check to see if any of these need to get restarted
     */
    oper = t->finished_opers;
    while (oper) {
        if (fsync_stages)
            fsync(oper->fd);
        t->stage_mb_trans += oper_mb_trans(oper);
        if (restart_oper(oper)) {
            oper_list_del(oper, &t->finished_opers);
            oper_list_add(oper, &t->active_opers);
            oper = t->finished_opers;
            continue;
        }
        oper = oper->next;
        if (oper == t->finished_opers)
            break;
    }

    if (t->stage_mb_trans && t->num_files > 0) {
        double seconds = time_since_now(&stage_time);
        fprintf(stderr, "thread %d %s totals (%.2f MB/s) %.2f MB in %.2fs\n",
                (int)(t - global_thread_info), this_stage,
                t->stage_mb_trans / seconds, t->stage_mb_trans, seconds);
    }

    if (num_threads > 1) {
        pthread_mutex_lock(&stage_mutex);
        threads_ending++;
        if (threads_ending == num_threads) {
            threads_starting = 0;
            pthread_cond_broadcast(&stage_cond);
            global_thread_throughput(t, this_stage);
        }
        while (threads_ending != num_threads)
            pthread_cond_wait(&stage_cond, &stage_mutex);
        pthread_mutex_unlock(&stage_mutex);
    }

    /* someone got restarted, go back to the beginning */
    if (t->active_opers && (cnt < iterations || iterations == RUN_FOREVER)) {
        iteration++;
        goto restart;
    }

    /* finally, free all the ram */
    while (t->finished_opers) {
        oper = t->finished_opers;
        oper_list_del(oper, &t->finished_opers);
        status = finish_oper(t, oper);
    }

    if (t->num_global_pending) {
        fprintf(stderr, "global num pending is %d\n", t->num_global_pending);
    }
    io_queue_release(t->io_ctx);

    return status;
}
typedef void *(*start_routine)(void *);
int run_workers(struct thread_info *t, int num_threads)
{
    int ret;
    void *thread_ret;
    int i;

    for (i = 0 ; i < num_threads ; i++) {
        ret = pthread_create(&t[i].tid, NULL, (start_routine)worker, t + i);
        if (ret) {
            perror("pthread_create");
            exit(1);
        }
    }
    for (i = 0 ; i < num_threads ; i++) {
        ret = pthread_join(t[i].tid, &thread_ret);
        if (ret) {
            perror("pthread_join");
            exit(1);
        }
    }
    return 0;
}

off_t parse_size(char *size_arg, off_t mult) {
    char c;
    int num;
    off_t ret;
    c = size_arg[strlen(size_arg) - 1];
    if (c > '9') {
        size_arg[strlen(size_arg) - 1] = '\0';
    }
    num = atoi(size_arg);
    switch(c) {
    case 'g':
    case 'G':
        mult = 1024 * 1024 * 1024;
        break;
    case 'm':
    case 'M':
        mult = 1024 * 1024;
        break;
    case 'k':
    case 'K':
        mult = 1024;
        break;
    case 'b':
    case 'B':
        mult = 1;
        break;
    }
    ret = mult * num;
    return ret;
}

void print_usage(void) {
    printf("usage: aio-stress [-s size] [-r size] [-a size] [-d num] [-b num]\n");
    printf("                  [-i num] [-t num] [-c num] [-C size] [-nxhOS]\n");
    printf("                  file1 [file2 ...]\n");
    printf("\t-a size in KB at which to align buffers\n");
    printf("\t-b max number of iocbs to give io_submit at once\n");
    printf("\t-c number of io contexts per file\n");
    printf("\t-C offset between contexts, default 2MB\n");
    printf("\t-s size in MB of the test file(s), default 1024MB\n");
    printf("\t-r record size in KB used for each io, default 64KB\n");
    printf("\t-d number of pending aio requests for each file, default 64\n");
    printf("\t-i number of ios per file sent before switching\n\t   to the next file, default 8\n");
    printf("\t-I total number of async IOs the program will run, default is run until Ctrl-C\n");
    printf("\t-O Use O_DIRECT (not available in 2.4 kernels)\n");
    printf("\t-S Use O_SYNC for writes\n");
    printf("\t-o add an operation to the list: write=0, read=1,\n");
    printf("\t   random write=2, random read=3.\n");
    printf("\t   repeat -o to specify multiple ops: -o 0 -o 1 etc.\n");
    printf("\t-m shm use ipc shared memory for io buffers instead of malloc\n");
    printf("\t-m shmfs mmap a file in /dev/shm for io buffers\n");
    printf("\t-n no fsyncs between write stage and read stage\n");
    printf("\t-l print io_submit latencies after each stage\n");
    printf("\t-L print io completion latencies after each stage\n");
    printf("\t-t number of threads to run\n");
    printf("\t-u unlink files after completion\n");
    printf("\t-v verification of bytes written\n");
    printf("\t-x turn off thread stonewalling\n");
    printf("\t-h this message\n");
    printf("\n\t   the size options (-a -s and -r) allow modifiers:\n");
    printf("\t   -s 400{k,m,g} translates to 400KB, 400MB or 400GB\n");
    printf("version %s\n", PROG_VERSION);
}
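/*
 * Example invocation: a two thread O_DIRECT run over two 128MB files,
 * doing only the sequential write and read stages:
 *
 *     aio-stress -O -t 2 -s 128m -r 64k -o 0 -o 1 file1 file2
 */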
int main(int ac, char **av)
{
    int rwfd;
    int i;
    int j;
    int c;

    off_t file_size = 1 * 1024 * 1024 * 1024;
    int first_stage = WRITE;
    struct io_oper *oper;
    int status = 0;
    int num_files = 0;
    int open_fds = 0;
    struct thread_info *t;

    page_size_mask = getpagesize() - 1;

    while (1) {
        c = getopt(ac, av, "a:b:c:C:m:s:r:d:i:I:o:t:lLnhOSxvu");
        if (c < 0)
            break;

        switch(c) {
        case 'a':
            page_size_mask = parse_size(optarg, 1024);
            page_size_mask--;
            break;
        case 'c':
            num_contexts = atoi(optarg);
            break;
        case 'C':
            context_offset = parse_size(optarg, 1024 * 1024);
            break;
        case 'b':
            max_io_submit = atoi(optarg);
            break;
        case 's':
            file_size = parse_size(optarg, 1024 * 1024);
            break;
        case 'd':
            depth = atoi(optarg);
            break;
        case 'r':
            rec_len = parse_size(optarg, 1024);
            break;
        case 'i':
            io_iter = atoi(optarg);
            break;
        case 'I':
            iterations = atoi(optarg);
            break;
        case 'n':
            fsync_stages = 0;
            break;
        case 'l':
            latency_stats = 1;
            break;
        case 'L':
            completion_latency_stats = 1;
            break;
        case 'm':
            if (!strcmp(optarg, "shm")) {
                fprintf(stderr, "using ipc shm\n");
                use_shm = USE_SHM;
            } else if (!strcmp(optarg, "shmfs")) {
                fprintf(stderr, "using /dev/shm for buffers\n");
                use_shm = USE_SHMFS;
            }
            break;
        case 'o':
            i = atoi(optarg);
            stages |= 1 << i;
            fprintf(stderr, "adding stage %s\n", stage_name(i));
            break;
        case 'O':
            o_direct = O_DIRECT;
            break;
        case 'S':
            o_sync = O_SYNC;
            break;
        case 't':
            num_threads = atoi(optarg);
            break;
        case 'x':
            stonewall = 0;
            break;
        case 'u':
            unlink_files = 1;
            break;
        case 'v':
            verify = 1;
            break;
        case 'h':
        default:
            print_usage();
            exit(1);
        }
    }

    /*
     * make sure we don't try to submit more ios than we have allocated
     * memory for
     */
    if (depth < io_iter) {
        io_iter = depth;
        fprintf(stderr, "dropping io_iter to %d\n", io_iter);
    }

    if (optind >= ac) {
        print_usage();
        exit(1);
    }

    num_files = ac - optind;

    if (num_threads > (num_files * num_contexts)) {
        num_threads = num_files * num_contexts;
        fprintf(stderr, "dropping thread count to the number of contexts %d\n",
                num_threads);
    }

    t = malloc(num_threads * sizeof(*t));
    if (!t) {
        perror("malloc");
        exit(1);
    }
    /* the oper lists and counters in each thread_info must start zeroed */
    memset(t, 0, num_threads * sizeof(*t));
    global_thread_info = t;

    /* by default, allow a huge number of iocbs to be sent towards
     * io_submit
     */
    if (!max_io_submit)
        max_io_submit = num_files * io_iter * num_contexts;

    /*
     * make sure we don't try to submit more ios than max_io_submit allows
     */
    if (max_io_submit < io_iter) {
        io_iter = max_io_submit;
        fprintf(stderr, "dropping io_iter to %d\n", io_iter);
    }

    if (!stages) {
        stages = (1 << WRITE) | (1 << READ) | (1 << RREAD) | (1 << RWRITE);
    } else {
        for (i = 0 ; i < LAST_STAGE; i++) {
            if (stages & (1 << i)) {
                first_stage = i;
                fprintf(stderr, "starting with %s\n", stage_name(i));
                break;
            }
        }
    }

    if (file_size < num_contexts * context_offset) {
        fprintf(stderr, "file size %Lu too small for %d contexts\n",
                file_size, num_contexts);
        exit(1);
    }

    fprintf(stderr, "file size %LuMB, record size %luKB, depth %d, ios per iteration %d\n",
            file_size / (1024 * 1024), rec_len / 1024, depth, io_iter);
    fprintf(stderr, "max io_submit %d, buffer alignment set to %luKB\n",
            max_io_submit, (page_size_mask + 1) / 1024);
    fprintf(stderr, "threads %d files %d contexts %d context offset %LuMB verification %s\n",
            num_threads, num_files, num_contexts,
            context_offset / (1024 * 1024), verify ? "on" : "off");
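    /*
     * For illustration: with -c 2 and the default 2MB context offset, the
     * loop below creates two overlapping opers per file:
     *
     *     context 0 covers bytes [0,   file_size)
     *     context 1 covers bytes [2MB, file_size - 2MB)
     *
     * and hands them out to the worker threads round robin via open_fds.
     */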
"on" : "off"); 1465 /* open all the files and do any required setup for them */ 1466 for (i = optind ; i < ac ; i++) { 1467 int thread_index; 1468 for (j = 0 ; j < num_contexts ; j++) { 1469 thread_index = open_fds % num_threads; 1470 open_fds++; 1471 1472 rwfd = open(av[i], O_CREAT | O_RDWR | o_direct | o_sync, 0600); 1473 assert(rwfd != -1); 1474 1475 oper = create_oper(rwfd, first_stage, j * context_offset, 1476 file_size - j * context_offset, rec_len, 1477 depth, io_iter, av[i]); 1478 if (!oper) { 1479 fprintf(stderr, "error in create_oper\n"); 1480 exit(-1); 1481 } 1482 oper_list_add(oper, &t[thread_index].active_opers); 1483 t[thread_index].num_files++; 1484 } 1485 } 1486 if (setup_shared_mem(num_threads, num_files * num_contexts, 1487 depth, rec_len, max_io_submit)) 1488 { 1489 exit(1); 1490 } 1491 for (i = 0 ; i < num_threads ; i++) { 1492 if (setup_ious(&t[i], t[i].num_files, depth, rec_len, max_io_submit)) 1493 exit(1); 1494 } 1495 if (num_threads > 1){ 1496 printf("Running multi thread version num_threads:%d\n", num_threads); 1497 run_workers(t, num_threads); 1498 } else { 1499 printf("Running single thread version \n"); 1500 status = worker(t); 1501 } 1502 if (unlink_files) { 1503 for (i = optind ; i < ac ; i++) { 1504 printf("Cleaning up file %s \n", av[i]); 1505 unlink(av[i]); 1506 } 1507 } 1508 1509 if (status) { 1510 exit(1); 1511 } 1512 return status; 1513} 1514 1515