aio-stress.c revision 5f7ff66f177d38f24c37fca2b8b8e4a7d8852b90
1/* 2 * Copyright (c) 2004 SuSE, Inc. All Rights Reserved. 3 * 4 * This program is free software; you can redistribute it and/or modify it 5 * under the terms of version 2 of the GNU General Public License as 6 * published by the Free Software Foundation. 7 * 8 * This program is distributed in the hope that it would be useful, but 9 * WITHOUT ANY WARRANTY; without even the implied warranty of 10 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. 11 * 12 * Further, this software is distributed without any warranty that it is 13 * free of the rightful claim of any third person regarding infringement 14 * or the like. Any license provided herein, whether implied or 15 * otherwise, applies only to this software file. Patent licenses, if 16 * any, provided herein do not apply to combinations of this program with 17 * other software, or any other product whatsoever. 18 * 19 * You should have received a copy of the GNU General Public License along 20 * with this program; if not, write the Free Software Foundation, Inc., 59 21 * Temple Place - Suite 330, Boston MA 02111-1307, USA. 22 * 23 * Contact information: Silicon Graphics, Inc., 1600 Amphitheatre Pkwy, 24 * Mountain View, CA 94043, or: 25 * 26 * 27 * aio-stress 28 * 29 * will open or create each file on the command line, and start a series 30 * of aio to it. 31 * 32 * aio is done in a rotating loop. first file1 gets 8 requests, then 33 * file2, then file3 etc. 
As each file finishes writing, it is switched
 * to reads
 *
 * io buffers are aligned in case you want to do raw io
 *
 * compile with gcc -Wall -laio -lpthread -o aio-stress aio-stress.c
 *
 * run aio-stress -h to see the options
 *
 * Please mail Chris Mason (mason@suse.com) with bug reports or patches
 */
#define _FILE_OFFSET_BITS 64
#define PROG_VERSION "0.18"
#define NEW_GETEVENTS

#include <stdio.h>
#include <errno.h>
#include <assert.h>
#include <stdlib.h>

#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#include <sys/time.h>
#include <libaio.h>
#include <sys/ipc.h>
#include <sys/shm.h>
#include <sys/mman.h>
#include <string.h>
#include <pthread.h>

/* busy states for struct io_unit */
#define IO_FREE 0
#define IO_PENDING 1
#define RUN_FOREVER -1

#ifndef O_DIRECT
#define O_DIRECT 040000 /* direct disk access hint */
#endif

/* io stages; the global 'stages' bitmask is indexed as (1 << stage) */
enum {
    WRITE,
    READ,
    RWRITE,
    RREAD,
    LAST_STAGE,
};

/* buffer allocation strategies selected with -m */
#define USE_MALLOC 0
#define USE_SHM 1
#define USE_SHMFS 2

/*
 * various globals, these are effectively read only by the time the threads
 * are started
 */
long stages = 0;
unsigned long page_size_mask;
int o_direct = 0;
int o_sync = 0;
int latency_stats = 0;
int io_iter = 8;            /* ios sent per file before switching files (-i) */
int iterations = RUN_FOREVER;
int max_io_submit = 0;      /* cap on iocbs per io_submit call (-b) */
long rec_len = 64 * 1024;   /* record size per io (-r) */
int depth = 64;             /* pending aio requests per file (-d) */
int num_threads = 1;
int num_contexts = 1;
off_t context_offset = 2 * 1024 * 1024;
int fsync_stages = 1;       /* fsync between stages unless -n given */
int use_shm = 0;
int shm_id;
char *unaligned_buffer = NULL;
char *aligned_buffer = NULL;
int padded_reclen = 0;      /* rec_len rounded up to a page multiple */
int stonewall = 1;
int verify = 0;
char *verify_buf = NULL;

struct io_unit;
struct thread_info;

/* pthread mutexes and other globals for keeping the threads in sync */
pthread_cond_t stage_cond = PTHREAD_COND_INITIALIZER;
pthread_mutex_t stage_mutex = PTHREAD_MUTEX_INITIALIZER;
int threads_ending = 0;
int threads_starting = 0;
struct timeval global_stage_start_time;
struct thread_info *global_thread_info;

/*
 * latencies during io_submit are measured, these are the
 * granularities for deviations (milliseconds)
 */
#define DEVIATIONS 6
int deviations[DEVIATIONS] = { 100, 250, 500, 1000, 5000, 10000 };
struct io_latency {
    double max;
    double min;
    double total_io;     /* number of samples folded in */
    double total_lat;    /* sum of all samples, for the average */
    double deviations[DEVIATIONS];
};

/* container for a series of operations to a file */
struct io_oper {
    /* already open file descriptor, valid for whatever operation you want */
    int fd;

    /* starting byte of the operation */
    off_t start;

    /* ending byte of the operation */
    off_t end;

    /* size of the read/write buffer */
    int reclen;

    /* max number of pending requests before a wait is triggered */
    int depth;

    /* current number of pending requests */
    int num_pending;

    /* last error, zero if there were none */
    int last_err;

    /* total number of errors hit. */
    int num_err;

    /* read,write, random, etc */
    int rw;

    /* number of ios that will get sent to aio */
    int total_ios;

    /* number of ios we've already sent */
    int started_ios;

    /* last offset used in an io operation */
    off_t last_offset;

    /* stonewalled = 1 when we got cut off before submitting all our ios */
    int stonewalled;

    /* list management (circular doubly linked list) */
    struct io_oper *next;
    struct io_oper *prev;

    struct timeval start_time;

    char *file_name;
};

/* a single io, and all the tracking needed for it */
struct io_unit {
    /* note, iocb must go first!  event->obj is cast back to io_unit */
    struct iocb iocb;

    /* pointer to parent io operation struct */
    struct io_oper *io_oper;

    /* aligned buffer */
    char *buf;

    /* size of the aligned buffer (record size) */
    int buf_size;

    /* state of this io unit (free, pending, done) */
    int busy;

    /* result of last operation */
    long res;

    struct io_unit *next;
};

struct thread_info {
    io_context_t io_ctx;
    pthread_t tid;

    /* allocated array of io_unit structs */
    struct io_unit *ios;

    /* list of io units available for io */
    struct io_unit *free_ious;

    /* number of io units in the ios array */
    int num_global_ios;

    /* number of io units in flight */
    int num_global_pending;

    /* preallocated array of iocb pointers, only used in run_active */
    struct iocb **iocbs;

    /* preallocated array of events */
    struct io_event *events;

    /* size of the events array */
    int num_global_events;

    /* latency stats for io_submit */
    struct io_latency io_submit_latency;

    /* list of operations still in progress, and of those finished */
    struct io_oper *active_opers;
    struct io_oper *finished_opers;

    /* number of files this thread is doing io on */
    int num_files;

    /* how much io this thread did in the last stage */
    double stage_mb_trans;
};

/* seconds elapsed since *tv as a double; clamped to zero if the clock
 * appears to have gone backwards */
static double time_since(struct timeval *tv) {
    double sec, usec;
    double ret;
    struct timeval stop;
    gettimeofday(&stop, NULL);
    sec = stop.tv_sec - tv->tv_sec;
    usec = stop.tv_usec - tv->tv_usec;
    if (sec > 0 && usec < 0) {
        /* borrow a second for the negative microsecond delta */
        sec--;
        usec += 1000000;
    }
    ret = sec + usec / (double)1000000;
    if (ret < 0)
        ret = 0;
    return ret;
}

/* fold the time elapsed since *tv (in ms) into the latency record,
 * updating min/max/avg accumulators and the deviation histogram */
static void calc_latency(struct timeval *tv, struct io_latency *lat)
{
    double delta;
    int i;
    delta = time_since(tv);
    delta = delta * 1000;   /* seconds -> milliseconds */

    if (delta > lat->max)
        lat->max = delta;
    if (!lat->min || delta < lat->min)
        lat->min = delta;
    lat->total_io++;
    lat->total_lat += delta;
    for (i = 0 ; i < DEVIATIONS ; i++) {
        if (delta < deviations[i]) {
            lat->deviations[i]++;
            break;
        }
    }
}

/* append oper to a circular doubly linked list; *list points at the head */
static void oper_list_add(struct io_oper *oper, struct io_oper **list)
{
    if (!*list) {
        *list = oper;
        oper->prev = oper->next = oper;
        return;
    }
    oper->prev = (*list)->prev;
    oper->next = *list;
    (*list)->prev->next = oper;
    (*list)->prev = oper;
    return;
}

/* unlink oper from the circular list, emptying the list when oper is the
 * only element; advances *list if oper was the head */
static void oper_list_del(struct io_oper *oper, struct io_oper **list)
{
    /* single-element list: next == prev == head */
    if ((*list)->next == (*list)->prev && *list == (*list)->next) {
        *list = NULL;
        return;
    }
    oper->prev->next = oper->next;
    oper->next->prev = oper->prev;
    if (*list == oper)
        *list = oper->next;
}

/* worker func to check error fields in the io unit; also compares read
 * data against verify_buf when -v is in effect.  returns -1 on error */
static int check_finished_io(struct io_unit *io) {
    int i;
    if (io->res != io->buf_size) {
        /* short or failed io; negative res is a -errno from the kernel */
        fprintf(stderr, "io err %lu (%s) op %d, size %d\n",
            io->res, strerror(-io->res), io->iocb.aio_lio_opcode,
            io->buf_size);
        io->io_oper->last_err = io->res;
        io->io_oper->num_err++;
        return -1;
    }
    if (verify && io->io_oper->rw == READ) {
        if (memcmp(io->buf, verify_buf, io->io_oper->reclen)) {
            fprintf(stderr, "verify error, file %s offset %Lu contents (offset:bad:good):\n",
                io->io_oper->file_name, io->iocb.u.c.offset);

            for (i = 0 ; i < io->io_oper->reclen ; i++) {
                if (io->buf[i] != verify_buf[i]) {
                    fprintf(stderr, "%d:%c:%c ", i, io->buf[i], verify_buf[i]);
                }
            }
            fprintf(stderr, "\n");
        }

    }
    return 0;
}

/* worker func to check the busy bits and get an io unit ready for use */
static int grab_iou(struct io_unit *io, struct io_oper *oper) {
    if (io->busy == IO_PENDING)
        return -1;

    io->busy = IO_PENDING;
    io->res = 0;
    io->io_oper = oper;
    return 0;
}

char
*stage_name(int rw) { 353 switch(rw) { 354 case WRITE: 355 return "write"; 356 case READ: 357 return "read"; 358 case RWRITE: 359 return "random write"; 360 case RREAD: 361 return "random read"; 362 } 363 return "unknown"; 364} 365 366static inline double oper_mb_trans(struct io_oper *oper) { 367 return ((double)oper->started_ios * (double)oper->reclen) / 368 (double)(1024 * 1024); 369} 370 371static void print_time(struct io_oper *oper) { 372 double runtime; 373 double tput; 374 double mb; 375 376 runtime = time_since(&oper->start_time); 377 mb = oper_mb_trans(oper); 378 tput = mb / runtime; 379 fprintf(stderr, "%s on %s (%.2f MB/s) %.2f MB in %.2fs\n", 380 stage_name(oper->rw), oper->file_name, tput, mb, runtime); 381} 382 383static void print_latency(struct thread_info *t) { 384 struct io_latency *lat = &t->io_submit_latency; 385 double avg = lat->total_lat / lat->total_io; 386 int i; 387 double total_counted = 0; 388 fprintf(stderr, "latency min %.2f avg %.2f max %.2f\n\t", 389 lat->min, avg, lat->max); 390 391 for (i = 0 ; i < DEVIATIONS ; i++) { 392 fprintf(stderr, " %.0f < %d", lat->deviations[i], deviations[i]); 393 total_counted += lat->deviations[i]; 394 } 395 if (total_counted && lat->total_io - total_counted) 396 fprintf(stderr, " < %.0f", lat->total_io - total_counted); 397 fprintf(stderr, "\n"); 398 memset(&t->io_submit_latency, 0, sizeof(t->io_submit_latency)); 399} 400 401/* 402 * updates the fields in the io operation struct that belongs to this 403 * io unit, and make the io unit reusable again 404 */ 405void finish_io(struct thread_info *t, struct io_unit *io, long result) { 406 struct io_oper *oper = io->io_oper; 407 408 io->res = result; 409 io->busy = IO_FREE; 410 io->next = t->free_ious; 411 t->free_ious = io; 412 oper->num_pending--; 413 t->num_global_pending--; 414 check_finished_io(io); 415 if (oper->num_pending == 0 && 416 (oper->started_ios == oper->total_ios || oper->stonewalled)) 417 { 418 print_time(oper); 419 } 420} 421 422int 
read_some_events(struct thread_info *t) {
    struct io_unit *event_io;
    struct io_event *event;
    int nr;
    int i;
    int min_nr = io_iter;

    /* don't ask for more events than are actually in flight */
    if (t->num_global_pending < io_iter)
        min_nr = t->num_global_pending;

#ifdef NEW_GETEVENTS
    nr = io_getevents(t->io_ctx, min_nr, t->num_global_events, t->events,NULL);
#else
    nr = io_getevents(t->io_ctx, t->num_global_events, t->events, NULL);
#endif
    if (nr <= 0)
        return nr;

    for (i = 0 ; i < nr ; i++) {
        event = t->events + i;
        /* event->obj is the iocb pointer; iocb is the first member of
         * io_unit, so the cast recovers the containing io_unit */
        event_io = (struct io_unit *)((unsigned long)event->obj);
        finish_io(t, event_io, event->res);
    }
    return nr;
}

/*
 * finds a free io unit, waiting for pending requests if required.  returns
 * null if none could be found
 */
static struct io_unit *find_iou(struct thread_info *t, struct io_oper *oper)
{
    struct io_unit *event_io;
    int nr;

retry:
    if (t->free_ious) {
        event_io = t->free_ious;
        t->free_ious = t->free_ious->next;
        if (grab_iou(event_io, oper)) {
            fprintf(stderr, "io unit on free list but not free\n");
            abort();
        }
        return event_io;
    }
    /* free list empty; reap completions to refill it and try again */
    nr = read_some_events(t);
    if (nr > 0)
        goto retry;
    else
        fprintf(stderr, "no free ious after read_some_events\n");
    return NULL;
}

/*
 * wait for all pending requests for this io operation to finish
 */
static int io_oper_wait(struct thread_info *t, struct io_oper *oper) {
    struct io_event event;
    struct io_unit *event_io;

    if (oper == NULL) {
        return 0;
    }

    if (oper->num_pending == 0)
        goto done;

    /* this func is not speed sensitive, no need to go wild reading
     * more than one event at a time
     */
#ifdef NEW_GETEVENTS
    while(io_getevents(t->io_ctx, 1, 1, &event, NULL) > 0) {
#else
    while(io_getevents(t->io_ctx, 1, &event, NULL) > 0) {
#endif
        event_io = (struct io_unit *)((unsigned long)event.obj);

        finish_io(t, event_io, event.res);

        /* finish_io may have completed an io belonging to this oper */
        if (oper->num_pending == 0)
            break;
    }
done:
    if (oper->num_err) {
        fprintf(stderr, "%u errors on oper, last %u\n",
            oper->num_err, oper->last_err);
    }
    return 0;
}

/* pick a page-aligned random offset in [oper->start, oper->end) suitable
 * for a reclen-sized io */
off_t random_byte_offset(struct io_oper *oper) {
    off_t num;
    off_t rand_byte = oper->start;
    off_t range;
    off_t offset = 1;

    /* work in MB granularity first */
    range = (oper->end - oper->start) / (1024 * 1024);
    if ((page_size_mask+1) > (1024 * 1024))
        offset = (page_size_mask+1) / (1024 * 1024);
    if (range < offset)
        range = 0;
    else
        range -= offset;

    /* find a random mb offset */
    num = 1 + (int)((double)range * rand() / (RAND_MAX + 1.0 ));
    rand_byte += num * 1024 * 1024;

    /* find a random byte offset */
    num = 1 + (int)((double)(1024 * 1024) * rand() / (RAND_MAX + 1.0));

    /* page align */
    num = (num + page_size_mask) & ~page_size_mask;
    rand_byte += num;

    /* keep the whole record inside the operation's range */
    if (rand_byte + oper->reclen > oper->end) {
        rand_byte -= oper->reclen;
    }
    return rand_byte;
}

/*
 * build an aio iocb for an operation, based on oper->rw and the
 * last offset used.  This finds the struct io_unit that will be attached
 * to the iocb, and things are ready for submission to aio after this
 * is called.
548 * 549 * returns null on error 550 */ 551static struct io_unit *build_iocb(struct thread_info *t, struct io_oper *oper) 552{ 553 struct io_unit *io; 554 off_t rand_byte; 555 556 io = find_iou(t, oper); 557 if (!io) { 558 fprintf(stderr, "unable to find io unit\n"); 559 return NULL; 560 } 561 562 switch(oper->rw) { 563 case WRITE: 564 io_prep_pwrite(&io->iocb,oper->fd, io->buf, oper->reclen, 565 oper->last_offset); 566 oper->last_offset += oper->reclen; 567 break; 568 case READ: 569 io_prep_pread(&io->iocb,oper->fd, io->buf, oper->reclen, 570 oper->last_offset); 571 oper->last_offset += oper->reclen; 572 break; 573 case RREAD: 574 rand_byte = random_byte_offset(oper); 575 oper->last_offset = rand_byte; 576 io_prep_pread(&io->iocb,oper->fd, io->buf, oper->reclen, 577 rand_byte); 578 break; 579 case RWRITE: 580 rand_byte = random_byte_offset(oper); 581 oper->last_offset = rand_byte; 582 io_prep_pwrite(&io->iocb,oper->fd, io->buf, oper->reclen, 583 rand_byte); 584 585 break; 586 } 587 588 return io; 589} 590 591/* 592 * wait for any pending requests, and then free all ram associated with 593 * an operation. returns the last error the operation hit (zero means none) 594 */ 595static int 596finish_oper(struct thread_info *t, struct io_oper *oper) 597{ 598 unsigned long last_err; 599 600 io_oper_wait(t, oper); 601 last_err = oper->last_err; 602 if (oper->num_pending > 0) { 603 fprintf(stderr, "oper num_pending is %d\n", oper->num_pending); 604 } 605 close(oper->fd); 606 free(oper); 607 return last_err; 608} 609 610/* 611 * allocates an io operation and fills in all the fields. 
returns 612 * null on error 613 */ 614static struct io_oper * 615create_oper(int fd, int rw, off_t start, off_t end, int reclen, int depth, 616 int iter, char *file_name) 617{ 618 struct io_oper *oper; 619 620 oper = malloc (sizeof(*oper)); 621 if (!oper) { 622 fprintf(stderr, "unable to allocate io oper\n"); 623 return NULL; 624 } 625 memset(oper, 0, sizeof(*oper)); 626 627 oper->depth = depth; 628 oper->start = start; 629 oper->end = end; 630 oper->last_offset = oper->start; 631 oper->fd = fd; 632 oper->reclen = reclen; 633 oper->rw = rw; 634 oper->total_ios = (oper->end - oper->start) / oper->reclen; 635 oper->file_name = file_name; 636 637 return oper; 638} 639 640/* 641 * does setup on num_ios worth of iocbs, but does not actually 642 * start any io 643 */ 644int build_oper(struct thread_info *t, struct io_oper *oper, int num_ios, 645 struct iocb **my_iocbs) 646{ 647 int i; 648 struct io_unit *io; 649 650 if (oper->started_ios == 0) 651 gettimeofday(&oper->start_time, NULL); 652 653 if (num_ios == 0) 654 num_ios = oper->total_ios; 655 656 if ((oper->started_ios + num_ios) > oper->total_ios) 657 num_ios = oper->total_ios - oper->started_ios; 658 659 for( i = 0 ; i < num_ios ; i++) { 660 io = build_iocb(t, oper); 661 if (!io) { 662 return -1; 663 } 664 my_iocbs[i] = &io->iocb; 665 } 666 return num_ios; 667} 668 669/* 670 * runs through the iocbs in the array provided and updates 671 * counters in the associated oper struct 672 */ 673static void update_iou_counters(struct iocb **my_iocbs, int nr) 674{ 675 struct io_unit *io; 676 int i; 677 for (i = 0 ; i < nr ; i++) { 678 io = (struct io_unit *)(my_iocbs[i]); 679 io->io_oper->num_pending++; 680 io->io_oper->started_ios++; 681 } 682} 683 684/* starts some io for a given file, returns zero if all went well */ 685int run_built(struct thread_info *t, int num_ios, struct iocb **my_iocbs) 686{ 687 int ret; 688 struct timeval start_time; 689 690resubmit: 691 gettimeofday(&start_time, NULL); 692 ret = 
io_submit(t->io_ctx, num_ios, my_iocbs); 693 calc_latency(&start_time, &t->io_submit_latency); 694 if (ret != num_ios) { 695 /* some ios got through */ 696 if (ret > 0) { 697 update_iou_counters(my_iocbs, ret); 698 my_iocbs += ret; 699 t->num_global_pending += ret; 700 num_ios -= ret; 701 } 702 /* 703 * we've used all the requests allocated in aio_init, wait and 704 * retry 705 */ 706 if (ret > 0 || ret == -EAGAIN) { 707 if ((ret = read_some_events(t) > 0)) { 708 goto resubmit; 709 } 710 } 711 712 fprintf(stderr, "ret %d (%s) on io_submit\n", ret, strerror(-ret)); 713 return -1; 714 } 715 update_iou_counters(my_iocbs, ret); 716 t->num_global_pending += ret; 717 return 0; 718} 719 720/* 721 * changes oper->rw to the next in a command sequence, or returns zero 722 * to say this operation is really, completely done for 723 */ 724static int restart_oper(struct io_oper *oper) { 725 int new_rw = 0; 726 if (oper->last_err) 727 return 0; 728 729 /* this switch falls through */ 730 switch(oper->rw) { 731 case WRITE: 732 if (stages & (1 << READ)) 733 new_rw = READ; 734 case READ: 735 if (!new_rw && stages & (1 << RWRITE)) 736 new_rw = RWRITE; 737 case RWRITE: 738 if (!new_rw && stages & (1 << RREAD)) 739 new_rw = RREAD; 740 } 741 742 if (new_rw) { 743 oper->started_ios = 0; 744 oper->last_offset = oper->start; 745 oper->stonewalled = 0; 746 747 /* 748 * we're restarting an operation with pending requests, so the 749 * timing info won't be printed by finish_io. 
Printing it here 750 */ 751 if (oper->num_pending) 752 print_time(oper); 753 754 oper->rw = new_rw; 755 return 1; 756 } 757 return 0; 758} 759 760static int oper_runnable(struct io_oper *oper) { 761 struct stat buf; 762 int ret; 763 764 /* first context is always runnable, if started_ios > 0, no need to 765 * redo the calculations 766 */ 767 if (oper->started_ios || oper->start == 0) 768 return 1; 769 /* 770 * only the sequential phases force delays in starting */ 771 if (oper->rw >= RWRITE) 772 return 1; 773 ret = fstat(oper->fd, &buf); 774 if (ret < 0) { 775 perror("fstat"); 776 exit(1); 777 } 778 if (S_ISREG(buf.st_mode) && buf.st_size < oper->start) 779 return 0; 780 return 1; 781} 782 783/* 784 * runs through all the io operations on the active list, and starts 785 * a chunk of io on each. If any io operations are completely finished, 786 * it either switches them to the next stage or puts them on the 787 * finished list. 788 * 789 * this function stops after max_io_submit iocbs are sent down the 790 * pipe, even if it has not yet touched all the operations on the 791 * active list. Any operations that have finished are moved onto 792 * the finished_opers list. 
 */
static int run_active_list(struct thread_info *t,
             int io_iter,
             int max_io_submit)
{
    struct io_oper *oper;
    struct io_oper *built_opers = NULL;
    struct iocb **my_iocbs = t->iocbs;
    int ret = 0;
    int num_built = 0;

    oper = t->active_opers;
    while(oper) {
        if (!oper_runnable(oper)) {
            oper = oper->next;
            /* wrapped all the way around without finding runnable work */
            if (oper == t->active_opers)
                break;
            continue;
        }
        ret = build_oper(t, oper, io_iter, my_iocbs);
        if (ret >= 0) {
            my_iocbs += ret;
            num_built += ret;
            /* park the oper on built_opers so we don't build it twice */
            oper_list_del(oper, &t->active_opers);
            oper_list_add(oper, &built_opers);
            oper = t->active_opers;
            if (num_built + io_iter > max_io_submit)
                break;
        } else
            break;
    }
    if (num_built) {
        ret = run_built(t, num_built, t->iocbs);
        if (ret < 0) {
            fprintf(stderr, "error %d on run_built\n", ret);
            exit(1);
        }
        /* move built opers back to active, or to finished if complete */
        while(built_opers) {
            oper = built_opers;
            oper_list_del(oper, &built_opers);
            oper_list_add(oper, &t->active_opers);
            if (oper->started_ios == oper->total_ios) {
                oper_list_del(oper, &t->active_opers);
                oper_list_add(oper, &t->finished_opers);
            }
        }
    }
    return 0;
}

/* remove the sysv shm segment, if that allocation strategy is in use */
void drop_shm() {
    int ret;
    struct shmid_ds ds;
    if (use_shm != USE_SHM)
        return;

    ret = shmctl(shm_id, IPC_RMID, &ds);
    if (ret) {
        perror("shmctl IPC_RMID");
    }
}

/* initialize an aio context able to hold n requests; exits on failure */
void aio_setup(io_context_t *io_ctx, int n)
{
    int res = io_queue_init(n, io_ctx);
    if (res != 0) {
        fprintf(stderr, "io_queue_setup(%d) returned %d (%s)\n",
            n, res, strerror(-res));
        exit(3);
    }
}

/*
 * allocate io operation and event arrays for a given thread
 */
int setup_ious(struct thread_info *t,
               int num_files, int depth,
               int reclen, int max_io_submit) {
    int i;
    size_t bytes = num_files * depth * sizeof(*t->ios);

    t->ios = malloc(bytes);
    if (!t->ios) {
        fprintf(stderr, "unable to allocate io units\n");
        return -1;
    }
    memset(t->ios, 0, bytes);

    /* carve per-io buffers out of the global aligned buffer and thread
     * every unit onto the free list */
    for (i = 0 ; i < depth * num_files; i++) {
        t->ios[i].buf = aligned_buffer;
        aligned_buffer += padded_reclen;
        t->ios[i].buf_size = reclen;
        if (verify)
            memset(t->ios[i].buf, 'b', reclen);
        else
            memset(t->ios[i].buf, 0, reclen);
        t->ios[i].next = t->free_ious;
        t->free_ious = t->ios + i;
    }
    if (verify) {
        /* reference pattern that reads are compared against */
        verify_buf = aligned_buffer;
        memset(verify_buf, 'b', reclen);
    }

    t->iocbs = malloc(sizeof(struct iocb *) * max_io_submit);
    if (!t->iocbs) {
        fprintf(stderr, "unable to allocate iocbs\n");
        goto free_buffers;
    }

    memset(t->iocbs, 0, max_io_submit * sizeof(struct iocb *));

    t->events = malloc(sizeof(struct io_event) * depth * num_files);
    if (!t->events) {
        fprintf(stderr, "unable to allocate ram for events\n");
        goto free_buffers;
    }
    memset(t->events, 0, num_files * sizeof(struct io_event)*depth);

    t->num_global_ios = num_files * depth;
    t->num_global_events = t->num_global_ios;
    return 0;

free_buffers:
    if (t->ios)
        free(t->ios);
    if (t->iocbs)
        free(t->iocbs);
    if (t->events)
        free(t->events);
    return -1;
}

/*
 * The buffers used for file data are allocated as a single big
 * malloc, and then each thread and operation takes a piece and uses
 * that for file data.
This lets us do a large shm or bigpages alloc 930 * and without trying to find a special place in each thread to map the 931 * buffers to 932 */ 933int setup_shared_mem(int num_threads, int num_files, int depth, 934 int reclen, int max_io_submit) 935{ 936 char *p = NULL; 937 size_t total_ram; 938 939 padded_reclen = (reclen + page_size_mask) / (page_size_mask+1); 940 padded_reclen = padded_reclen * (page_size_mask+1); 941 total_ram = num_files * depth * padded_reclen + num_threads; 942 if (verify) 943 total_ram += padded_reclen; 944 945 if (use_shm == USE_MALLOC) { 946 p = malloc(total_ram + page_size_mask); 947 } else if (use_shm == USE_SHM) { 948 shm_id = shmget(IPC_PRIVATE, total_ram, IPC_CREAT | 0700); 949 if (shm_id < 0) { 950 perror("shmget"); 951 drop_shm(); 952 goto free_buffers; 953 } 954 p = shmat(shm_id, (char *)0x50000000, 0); 955 if ((long)p == -1) { 956 perror("shmat"); 957 goto free_buffers; 958 } 959 /* won't really be dropped until we shmdt */ 960 drop_shm(); 961 } else if (use_shm == USE_SHMFS) { 962 char mmap_name[16]; /* /dev/shm/ + null + XXXXXX */ 963 int fd; 964 965 strcpy(mmap_name, "/dev/shm/XXXXXX"); 966 fd = mkstemp(mmap_name); 967 if (fd < 0) { 968 perror("mkstemp"); 969 goto free_buffers; 970 } 971 unlink(mmap_name); 972 ftruncate(fd, total_ram); 973 shm_id = fd; 974 p = mmap((char *)0x50000000, total_ram, 975 PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); 976 977 if (p == MAP_FAILED) { 978 perror("mmap"); 979 goto free_buffers; 980 } 981 } 982 if (!p) { 983 fprintf(stderr, "unable to allocate buffers\n"); 984 goto free_buffers; 985 } 986 unaligned_buffer = p; 987 (unsigned long)p = ((unsigned long) (p + page_size_mask) & ~page_size_mask); 988 aligned_buffer = p; 989 return 0; 990 991free_buffers: 992 drop_shm(); 993 if (unaligned_buffer) 994 free(unaligned_buffer); 995 return -1; 996} 997 998/* 999 * runs through all the thread_info structs and calculates a combined 1000 * throughput 1001 */ 1002void global_thread_throughput(struct 
thread_info *t, char *this_stage) { 1003 int i; 1004 double runtime = time_since(&global_stage_start_time); 1005 double total_mb = 0; 1006 double min_trans = 0; 1007 1008 for (i = 0 ; i < num_threads ; i++) { 1009 total_mb += global_thread_info[i].stage_mb_trans; 1010 if (!min_trans || t->stage_mb_trans < min_trans) 1011 min_trans = t->stage_mb_trans; 1012 } 1013 if (total_mb) { 1014 fprintf(stderr, "%s throughput (%.2f MB/s) ", this_stage, 1015 total_mb / runtime); 1016 fprintf(stderr, "%.2f MB in %.2fs", total_mb, runtime); 1017 if (stonewall) 1018 fprintf(stderr, " min transfer %.2fMB", min_trans); 1019 fprintf(stderr, "\n"); 1020 } 1021} 1022 1023 1024/* this is the meat of the state machine. There is a list of 1025 * active operations structs, and as each one finishes the required 1026 * io it is moved to a list of finished operations. Once they have 1027 * all finished whatever stage they were in, they are given the chance 1028 * to restart and pick a different stage (read/write/random read etc) 1029 * 1030 * various timings are printed in between the stages, along with 1031 * thread synchronization if there are more than one threads. 
 */
int worker(struct thread_info *t)
{
    struct io_oper *oper;
    char *this_stage = NULL;
    struct timeval stage_time;
    int status = 0;
    int iteration = 0;
    int cnt;

    aio_setup(&t->io_ctx, 512);

restart:
    printf("Starting %s iter:%d \n", __FUNCTION__,iteration);
    if (num_threads > 1) {
        /* barrier: wait until every thread has reached the stage start */
        printf("num_threads %d \n", num_threads);
        pthread_mutex_lock(&stage_mutex);
        threads_starting++;
        if (threads_starting == num_threads) {
            threads_ending = 0;
            gettimeofday(&global_stage_start_time, NULL);
            pthread_cond_broadcast(&stage_cond);
        }
        while (threads_starting != num_threads)
            pthread_cond_wait(&stage_cond, &stage_mutex);
        pthread_mutex_unlock(&stage_mutex);
    }
    if (t->active_opers) {
//      printf("active_opers %p line:%d\n", t->active_opers, __LINE__);
        this_stage = stage_name(t->active_opers->rw);
        gettimeofday(&stage_time, NULL);
        t->stage_mb_trans = 0;
    }
    cnt = 0;
    /* first we send everything through aio */
//  printf("cnt:%d max_iterations:%d oper:%p\n",cnt, iterations,oper);

    while (t->active_opers && (cnt < iterations || iterations == RUN_FOREVER)) {
        printf("active_opers %p line:%d cnt:%d ", t->active_opers,__LINE__,cnt);
        if (stonewall && threads_ending) {
            /* another thread finished: stonewall, stop submitting */
            oper = t->active_opers;
            oper->stonewalled = 1;
            oper_list_del(oper, &t->active_opers);
            oper_list_add(oper, &t->finished_opers);
            printf(" if branch\n");
        } else {
            run_active_list(t, io_iter, max_io_submit);
            printf(" else branch\n");
        }
        cnt++;
    }

    if (latency_stats)
        print_latency(t);

    /* then we wait for all the operations to finish */
    oper = t->finished_opers;
//  printf("line:%d oper:%p\n", __LINE__, oper);
    do {
        io_oper_wait(t, oper);
        if (oper != NULL) {
            oper = oper->next;
        }
    } while (oper != t->finished_opers);
//  printf("finished_opers %p line:%d\n", t->finished_opers,__LINE__);

    /* then we do an fsync to get the timing for any future operations
     * right, and check to see if any of these need to get restarted
     */
    oper = t->finished_opers;
//  printf("oper %p line:%d\n", oper,__LINE__);
    while (oper) {
        if (fsync_stages)
            fsync(oper->fd);
        t->stage_mb_trans += oper_mb_trans(oper);
        if (restart_oper(oper)) {
            /* oper switched to its next stage; move it back to active */
            oper_list_del(oper, &t->finished_opers);
            oper_list_add(oper, &t->active_opers);
            oper = t->finished_opers;
            continue;
        }
        oper = oper->next;
        if (oper == t->finished_opers)
            break;
    }

    if (t->stage_mb_trans && t->num_files > 0) {
//      printf("num_files %d line:%d\n", t->num_files,__LINE__);
        double seconds = time_since(&stage_time);
        fprintf(stderr, "thread %d %s totals (%.2f MB/s) %.2f MB in %.2fs\n",
            t - global_thread_info, this_stage, t->stage_mb_trans/seconds,
            t->stage_mb_trans, seconds);
    }

    if (num_threads > 1) {
        /* barrier: wait until every thread has finished this stage */
//      printf("num_threads %d line:%d\n", num_threads,__LINE__);
        pthread_mutex_lock(&stage_mutex);
        threads_ending++;
        if (threads_ending == num_threads) {
            threads_starting = 0;
            pthread_cond_broadcast(&stage_cond);
            global_thread_throughput(t, this_stage);
        }
//      printf("threads_ending %d line:%d\n", threads_ending,__LINE__);
        while (threads_ending != num_threads)
            pthread_cond_wait(&stage_cond, &stage_mutex);
        pthread_mutex_unlock(&stage_mutex);
    }

    /* someone got restarted, go back to the beginning */
    if (t->active_opers && (cnt < iterations || iterations == RUN_FOREVER)) {
        iteration++;
        goto restart;
    }

    /* finally, free all the ram */
//  printf("finished_opers %p line:%d\n", t->finished_opers,__LINE__);
    while (t->finished_opers) {
        oper = t->finished_opers;
        oper_list_del(oper, &t->finished_opers);
        status = finish_oper(t, oper);
    }

    if (t->num_global_pending) {
        fprintf(stderr, "global num pending is %d\n", t->num_global_pending);
    }
    io_queue_release(t->io_ctx);

    return status;
}

typedef void * (*start_routine)(void *);
/* spawn one worker per thread_info and join them all; exits on failure */
int run_workers(struct thread_info *t, int num_threads)
{
    int ret;
    int thread_ret;
    int i;

//  printf("%s num_threads %d line:%d\n", __FUNCTION__,num_threads,__LINE__);
    for(i = 0 ; i < num_threads ; i++) {
        ret = pthread_create(&t[i].tid, NULL, (start_routine)worker, t + i);
        if (ret) {
            perror("pthread_create");
            exit(1);
        }
    }
    for(i = 0 ; i < num_threads ; i++) {
        ret = pthread_join(t[i].tid, (void *)&thread_ret);
        if (ret) {
            perror("pthread_join");
            exit(1);
        }
    }
    return 0;
}

/* parse a size argument with an optional k/m/g suffix; mult is the
 * default multiplier used when no suffix is present */
off_t parse_size(char *size_arg, off_t mult) {
    char c;
    int num;
    off_t ret;
    c = size_arg[strlen(size_arg) - 1];
    if (c > '9') {
        /* strip the suffix character before atoi */
        size_arg[strlen(size_arg) - 1] = '\0';
    }
    num = atoi(size_arg);
    switch(c) {
    case 'g':
    case 'G':
        mult = 1024 * 1024 * 1024;
        break;
    case 'm':
    case 'M':
        mult = 1024 * 1024;
        break;
    case 'k':
    case 'K':
        mult = 1024;
        break;
    }
    ret = mult * num;
    return ret;
}

void print_usage(void) {
    printf("usage: aio-stress [-s size] [-r size] [-a size] [-d num] [-b num]\n");
    printf("                  [-i num] [-t num] [-c num] [-C size] [-nxhOS ]\n");
    printf("                  file1 [file2 ...]\n");
    printf("\t-a size in KB at which to align buffers\n");
    printf("\t-b max number of iocbs to give io_submit at once\n");
    printf("\t-c number of io contexts per file\n");
    printf("\t-C offset between contexts, default 2MB\n");
    printf("\t-s size in MB of the test file(s), default 1024MB\n");
    printf("\t-r record size in KB used for each io, default 64KB\n");
    printf("\t-d number of pending aio requests for each file, default 64\n");
    printf("\t-i number of ios per file sent before switching\n\t   to the next file, default 8\n");
    printf("\t-I total number of ayncs IOs the program will run, default is run until Cntl-C\n");
    printf("\t-O Use O_DIRECT (not available in 2.4 kernels),\n");
    printf("\t-S Use O_SYNC for writes\n");
    printf("\t-o add an operation to the list: write=0, read=1,\n");
    printf("\t   random write=2, random read=3.\n");
    printf("\t   repeat -o to specify multiple ops: -o 0 -o 1 etc.\n");
    printf("\t-m shm use ipc shared memory for io buffers instead of malloc\n");
    printf("\t-m shmfs mmap a file in /dev/shm for io buffers\n");
    printf("\t-n no fsyncs between write stage and read stage\n");
    printf("\t-l print io_submit latencies after each stage\n");
    printf("\t-t number of threads to run\n");
    printf("\t-v verification of bytes written\n");
    printf("\t-x turn off thread stonewalling\n");
    printf("\t-h this message\n");
    printf("\n\t   the size options (-a -s and -r) allow modifiers -s 400{k,m,g}\n");
    printf("\t   translate to 400KB, 400MB and 400GB\n");
    printf("version %s\n", PROG_VERSION);
}

int main(int ac, char **av)
{
    int rwfd;
    int i;
    int j;
    int c;

    off_t file_size = 1 * 1024 * 1024 * 1024;
    int first_stage = WRITE;
    struct io_oper *oper;
    int status = 0;
    int num_files = 0;
    int open_fds = 0;
    struct thread_info *t;

    page_size_mask = getpagesize() - 1;

    while(1) {
        c = getopt(ac, av, "a:b:c:C:m:s:r:d:i:I:o:t:lnhOSxv");
        if (c < 0)
            break;

        switch(c) {
        case 'a':
            page_size_mask = parse_size(optarg, 1024);
            page_size_mask--;
            break;
        case 'c':
            num_contexts = atoi(optarg);
            break;
        case 'C':
            context_offset = parse_size(optarg, 1024 * 1024);
            /* NOTE(review): no break here — falls into 'b' and clobbers
             * max_io_submit with atoi of the -C argument; confirm intent */
        case 'b':
            max_io_submit = atoi(optarg);
            break;
        case 's':
            file_size = parse_size(optarg, 1024 * 1024);
            break;
        case 'd':
            depth = atoi(optarg);
            break;
        case 'r':
            rec_len = parse_size(optarg, 1024);
            break;
        case 'i':
            io_iter = atoi(optarg);
            break;
        case 'I':
            iterations = atoi(optarg);
            break;
        case 'n':
            fsync_stages = 0;
            break;
        case 'l':
            latency_stats = 1;
            break;
        case 'm':
            if (!strcmp(optarg, "shm")) {
                fprintf(stderr, "using ipc shm\n");
                use_shm = USE_SHM;
            } else if (!strcmp(optarg, "shmfs")) {
                fprintf(stderr, "using /dev/shm for buffers\n");
                use_shm = USE_SHMFS;
            }
            break;
        case 'o':
            i = atoi(optarg);
            stages |= 1 << i;
            fprintf(stderr, "adding stage %s\n", stage_name(i));
            break;
        case 'O':
            o_direct = O_DIRECT;
            break;
        case 'S':
            o_sync = O_SYNC;
            break;
        case 't':
            num_threads = atoi(optarg);
            break;
        case 'x':
            stonewall = 0;
            break;
        case 'v':
            verify = 1;
            break;
        case 'h':
        default:
            print_usage();
            exit(1);
        }
    }

    /*
     * make sure we don't try to submit more ios than we have allocated
     * memory for
     */
    if (depth < io_iter) {
        io_iter = depth;
        fprintf(stderr, "dropping io_iter to %d\n", io_iter);
    }

    if (optind >= ac) {
        print_usage();
        exit(1);
    }

    num_files = ac - optind;

    if (num_threads > (num_files * num_contexts)) {
        num_threads = num_files * num_contexts;
        fprintf(stderr, "dropping thread count to the number of contexts %d\n",
            num_threads);
    }

    t = malloc(num_threads * sizeof(*t));
    if (!t) {
        perror("malloc");
        exit(1);
    }
    global_thread_info = t;

    /* by default, allow a huge number of iocbs to be sent towards
     * io_submit
     */
    if (!max_io_submit)
        max_io_submit = num_files * io_iter * num_contexts;

    /*
     * make sure we don't try to submit more ios than
max_io_submit allows 1375 */ 1376 if (max_io_submit < io_iter) { 1377 io_iter = max_io_submit; 1378 fprintf(stderr, "dropping io_iter to %d\n", io_iter); 1379 } 1380 1381 if (!stages) { 1382 stages = (1 << WRITE) | (1 << READ) | (1 << RREAD) | (1 << RWRITE); 1383 } else { 1384 for (i = 0 ; i < LAST_STAGE; i++) { 1385 if (stages & (1 << i)) { 1386 first_stage = i; 1387 fprintf(stderr, "starting with %s\n", stage_name(i)); 1388 break; 1389 } 1390 } 1391 } 1392 1393 if (file_size < num_contexts * context_offset) { 1394 fprintf(stderr, "file size %Lu too small for %d contexts\n", 1395 file_size, num_contexts); 1396 exit(1); 1397 } 1398 1399 fprintf(stderr, "file size %LuMB, record size %luKB, depth %d, ios per iteration %d\n", file_size / (1024 * 1024), rec_len / 1024, depth, io_iter); 1400 fprintf(stderr, "max io_submit %d, buffer alignment set to %luKB\n", 1401 max_io_submit, (page_size_mask + 1)/1024); 1402 fprintf(stderr, "threads %d files %d contexts %d context offset %LuMB verification %s\n", 1403 num_threads, num_files, num_contexts, 1404 context_offset / (1024 * 1024), verify ? 
"on" : "off"); 1405 /* open all the files and do any required setup for them */ 1406 for (i = optind ; i < ac ; i++) { 1407 int thread_index; 1408 for (j = 0 ; j < num_contexts ; j++) { 1409 thread_index = open_fds % num_threads; 1410 open_fds++; 1411 fprintf(stderr, "adding file %s thread %d\n", av[i], thread_index); 1412 1413 rwfd = open(av[i], O_CREAT | O_RDWR | o_direct | o_sync, 0600); 1414 assert(rwfd != -1); 1415 1416 oper = create_oper(rwfd, first_stage, j * context_offset, 1417 file_size - j * context_offset, rec_len, 1418 depth, io_iter, av[i]); 1419 if (!oper) { 1420 fprintf(stderr, "error in create_oper\n"); 1421 exit(-1); 1422 } 1423 oper_list_add(oper, &t[thread_index].active_opers); 1424 t[thread_index].num_files++; 1425 } 1426 } 1427 if (setup_shared_mem(num_threads, num_files * num_contexts, 1428 depth, rec_len, max_io_submit)) 1429 { 1430 exit(1); 1431 } 1432 for (i = 0 ; i < num_threads ; i++) { 1433 if (setup_ious(&t[i], t[i].num_files, depth, rec_len, max_io_submit)) 1434 exit(1); 1435 } 1436 if (num_threads > 1){ 1437 printf("Running multi thread version num_threads:%d\n", num_threads); 1438 run_workers(t, num_threads); 1439 } 1440 else { 1441 printf("Running single thread version \n"); 1442 status = worker(t); 1443 } 1444 1445 if (status) { 1446 printf("non zero return %d \n", status); 1447 exit(1); 1448 } 1449 return status; 1450} 1451 1452