/* aio-stress.c revision 44e106d9d7f71c1b28aedef05ef90d4f20466f98 */
1/*
2 * Copyright (c) 2004 SuSE, Inc.  All Rights Reserved.
3 *
4 * This program is free software; you can redistribute it and/or modify it
5 * under the terms of version 2 of the GNU General Public License as
6 * published by the Free Software Foundation.
7 *
8 * This program is distributed in the hope that it would be useful, but
9 * WITHOUT ANY WARRANTY; without even the implied warranty of
10 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
11 *
12 * Further, this software is distributed without any warranty that it is
13 * free of the rightful claim of any third person regarding infringement
14 * or the like.  Any license provided herein, whether implied or
15 * otherwise, applies only to this software file.  Patent licenses, if
16 * any, provided herein do not apply to combinations of this program with
17 * other software, or any other product whatsoever.
18 *
19 * You should have received a copy of the GNU General Public License along
20 * with this program; if not, write the Free Software Foundation, Inc., 59
21 * Temple Place - Suite 330, Boston MA 02111-1307, USA.
22 *
23 * Contact information: Silicon Graphics, Inc., 1600 Amphitheatre Pkwy,
24 * Mountain View, CA  94043, or:
25 *
26 *
27 * aio-stress
28 *
29 * will open or create each file on the command line, and start a series
30 * of aio to it.
31 *
32 * aio is done in a rotating loop.  first file1 gets 8 requests, then
33 * file2, then file3 etc.  As each file finishes writing, it is switched
34 * to reads
35 *
36 * io buffers are aligned in case you want to do raw io
37 *
38 * compile with gcc -Wall -laio -lpthread -o aio-stress aio-stress.c
39 *
40 * run aio-stress -h to see the options
41 *
42 * Please mail Chris Mason (mason@suse.com) with bug reports or patches
43 */
44#define _FILE_OFFSET_BITS 64
45#define PROG_VERSION "0.21"
46#define NEW_GETEVENTS
47
48#include <stdio.h>
49#include <errno.h>
50#include <assert.h>
51#include <stdlib.h>
52
53#include <sys/types.h>
54#include <sys/stat.h>
55#include <fcntl.h>
56#include <unistd.h>
57#include <sys/time.h>
58#include <libaio.h>
59#include <sys/ipc.h>
60#include <sys/shm.h>
61#include <sys/mman.h>
62#include <string.h>
63#include <pthread.h>
64
65#define IO_FREE 0
66#define IO_PENDING 1
67#define RUN_FOREVER -1
68
69enum {
70    WRITE,
71    READ,
72    RWRITE,
73    RREAD,
74    LAST_STAGE,
75};
76
77#define USE_MALLOC 0
78#define USE_SHM 1
79#define USE_SHMFS 2
80
81/*
82 * various globals, these are effectively read only by the time the threads
83 * are started
84 */
long stages = 0;                  /* bitmask of stages (WRITE/READ/RWRITE/RREAD) to run */
unsigned long page_size_mask;     /* page size - 1, for aligning buffers and offsets */
int o_direct = 0;                 /* presumably open files O_DIRECT -- set outside this view */
int o_sync = 0;                   /* presumably open files O_SYNC -- set outside this view */
int latency_stats = 0;            /* print io_submit latency stats between stages */
int completion_latency_stats = 0; /* print submit->completion latency stats */
int io_iter = 8;                  /* ios to build per file before moving to the next */
int iterations = RUN_FOREVER;     /* passes over the active list (-1 = run forever) */
int max_io_submit = 0;            /* cap on iocbs per run_active_list pass */
long rec_len = 64 * 1024;         /* record (io buffer) size in bytes */
int depth = 64;                   /* max pending requests per file */
int num_threads = 1;              /* number of worker threads */
int num_contexts = 1;             /* io contexts per file -- used outside this view */
off_t context_offset = 2 * 1024 * 1024; /* byte distance between contexts -- used outside this view */
int fsync_stages = 1;             /* fsync each file between stages when set */
int use_shm = 0;                  /* buffer allocator: USE_MALLOC, USE_SHM or USE_SHMFS */
int shm_id;                       /* sysv shm id (or shmfs fd) backing the buffers */
char *unaligned_buffer = NULL;    /* raw allocation before page alignment */
char *aligned_buffer = NULL;      /* page-aligned cursor into the io buffer pool */
int padded_reclen = 0;            /* rec_len rounded up to a whole number of pages */
int stonewall = 1;                /* stop all threads once the first finishes a stage */
int verify = 0;                   /* compare read buffers against verify_buf */
char *verify_buf = NULL;          /* pattern buffer ('b' bytes) used by verification */
int unlink_files = 0;             /* presumably unlink files at exit -- used outside this view */
109
110struct io_unit;
111struct thread_info;
112
/* pthread mutexes and other globals for keeping the threads in sync */
pthread_cond_t stage_cond = PTHREAD_COND_INITIALIZER;
pthread_mutex_t stage_mutex = PTHREAD_MUTEX_INITIALIZER;
int threads_ending = 0;     /* count of threads done with the current stage */
int threads_starting = 0;   /* count of threads waiting at the stage barrier */
struct timeval global_stage_start_time;  /* stamped when the last thread enters a stage */
struct thread_info *global_thread_info;  /* array with one entry per worker thread */
120
121/*
122 * latencies during io_submit are measured, these are the
123 * granularities for deviations
124 */
#define DEVIATIONS 6
/* histogram bucket upper bounds, in milliseconds (see calc_latency) */
int deviations[DEVIATIONS] = { 100, 250, 500, 1000, 5000, 10000 };

/* running latency statistics; all values are in milliseconds */
struct io_latency {
    double max;        /* largest single sample seen */
    double min;        /* smallest non-zero sample seen */
    double total_io;   /* number of samples recorded */
    double total_lat;  /* sum of all samples, for computing the average */
    double deviations[DEVIATIONS]; /* bucket counts; sample lands in first bucket it is below */
};
134
/* container for a series of operations to a file */
struct io_oper {
    /* already open file descriptor, valid for whatever operation you want */
    int fd;

    /* starting byte of the operation */
    off_t start;

    /* ending byte of the operation */
    off_t end;

    /* size of the read/write buffer */
    int reclen;

    /* max number of pending requests before a wait is triggered */
    int depth;

    /* current number of pending requests */
    int num_pending;

    /* last error, zero if there were none */
    int last_err;

    /* total number of errors hit. */
    int num_err;

    /* read,write, random, etc */
    int rw;

    /* number of I/O that will get sent to aio */
    int total_ios;

    /* number of I/O we've already sent */
    int started_ios;

    /* last offset used in an io operation */
    off_t last_offset;

    /* stonewalled = 1 when we got cut off before submitting all our I/O */
    int stonewalled;

    /* list management */
    struct io_oper *next;
    struct io_oper *prev;

    /* stamped when the first io is built; used for throughput reporting */
    struct timeval start_time;

    /* file name for messages; the oper does not own or free it */
    char *file_name;
};
184
/* a single io, and all the tracking needed for it */
struct io_unit {
    /* note, iocb must go first!  completion code casts the iocb pointer
     * returned in io_event.obj straight back to a struct io_unit */
    struct iocb iocb;

    /* pointer to parent io operation struct */
    struct io_oper *io_oper;

    /* aligned buffer */
    char *buf;

    /* size of the aligned buffer (record size) */
    int buf_size;

    /* state of this io unit (free, pending, done) */
    int busy;

    /* result of last operation (bytes transferred, or negative errno) */
    long res;

    /* link in the thread's free_ious list */
    struct io_unit *next;

    struct timeval io_start_time;		/* time of io_submit */
};
209
struct thread_info {
    /* aio context all of this thread's requests are submitted to */
    io_context_t io_ctx;
    /* worker thread handle */
    pthread_t tid;

    /* allocated array of io_unit structs */
    struct io_unit *ios;

    /* list of io units available for io */
    struct io_unit *free_ious;

    /* number of io units in the I/O array */
    int num_global_ios;

    /* number of io units in flight */
    int num_global_pending;

    /* preallocated array of iocb pointers, only used in run_active */
    struct iocb **iocbs;

    /* preallocated array of events */
    struct io_event *events;

    /* size of the events array */
    int num_global_events;

    /* latency stats for io_submit */
    struct io_latency io_submit_latency;

    /* list of operations still in progress, and of those finished */
    struct io_oper *active_opers;
    struct io_oper *finished_opers;

    /* number of files this thread is doing io on */
    int num_files;

    /* how much io this thread did in the last stage */
    double stage_mb_trans;

    /* latency completion stats i/o time from io_submit until io_getevents */
    struct io_latency io_completion_latency;
};
251
252/*
253 * return seconds between start_tv and stop_tv in double precision
254 */
static double time_since(struct timeval *start_tv, struct timeval *stop_tv)
{
    double seconds = stop_tv->tv_sec - start_tv->tv_sec;
    double microseconds = stop_tv->tv_usec - start_tv->tv_usec;

    /* borrow a second when the usec field wrapped backwards */
    if (seconds > 0 && microseconds < 0) {
        seconds -= 1;
        microseconds += 1000000;
    }

    double elapsed = seconds + microseconds / 1000000.0;
    /* a skewed clock can make the interval negative; report zero instead */
    return elapsed < 0 ? 0 : elapsed;
}
270
271/*
272 * return seconds between start_tv and now in double precision
273 */
274static double time_since_now(struct timeval *start_tv)
275{
276    struct timeval stop_time;
277    gettimeofday(&stop_time, NULL);
278    return time_since(start_tv, &stop_time);
279}
280
281/*
282 * Add latency info to latency struct
283 */
284static void calc_latency(struct timeval *start_tv, struct timeval *stop_tv,
285			struct io_latency *lat)
286{
287    double delta;
288    int i;
289    delta = time_since(start_tv, stop_tv);
290    delta = delta * 1000;
291
292    if (delta > lat->max)
293    	lat->max = delta;
294    if (!lat->min || delta < lat->min)
295    	lat->min = delta;
296    lat->total_io++;
297    lat->total_lat += delta;
298    for (i = 0 ; i < DEVIATIONS ; i++) {
299        if (delta < deviations[i]) {
300	    lat->deviations[i]++;
301	    break;
302	}
303    }
304}
305
306static void oper_list_add(struct io_oper *oper, struct io_oper **list)
307{
308    if (!*list) {
309        *list = oper;
310	oper->prev = oper->next = oper;
311	return;
312    }
313    oper->prev = (*list)->prev;
314    oper->next = *list;
315    (*list)->prev->next = oper;
316    (*list)->prev = oper;
317    return;
318}
319
/* unlink oper from the circular doubly-linked list headed by *list */
static void oper_list_del(struct io_oper *oper, struct io_oper **list)
{
    /* a one-element ring points back at itself; deleting it empties the list */
    if ((*list)->next == (*list)->prev && *list == (*list)->next) {
        *list = NULL;
	return;
    }
    oper->prev->next = oper->next;
    oper->next->prev = oper->prev;
    /* never leave the head pointing at the node we just unlinked */
    if (*list == oper)
        *list = oper->next;
}
331
332/* worker func to check error fields in the io unit */
333static int check_finished_io(struct io_unit *io) {
334    int i;
335    if (io->res != io->buf_size) {
336
337  		 struct stat s;
338  		 fstat(io->io_oper->fd, &s);
339
340  		 /*
341  		  * If file size is large enough for the read, then this short
342  		  * read is an error.
343  		  */
344  		 if ((io->io_oper->rw == READ || io->io_oper->rw == RREAD) &&
345  		     s.st_size > (io->iocb.u.c.offset + io->res)) {
346
347  		 		 fprintf(stderr, "io err %lu (%s) op %d, off %Lu size %d\n",
348  		 		 		 io->res, strerror(-io->res), io->iocb.aio_lio_opcode,
349  		 		 		 io->iocb.u.c.offset, io->buf_size);
350  		 		 io->io_oper->last_err = io->res;
351  		 		 io->io_oper->num_err++;
352  		 		 return -1;
353  		 }
354    }
355    if (verify && io->io_oper->rw == READ) {
356        if (memcmp(io->buf, verify_buf, io->io_oper->reclen)) {
357	    fprintf(stderr, "verify error, file %s offset %Lu contents (offset:bad:good):\n",
358	            io->io_oper->file_name, io->iocb.u.c.offset);
359
360	    for (i = 0 ; i < io->io_oper->reclen ; i++) {
361	        if (io->buf[i] != verify_buf[i]) {
362		    fprintf(stderr, "%d:%c:%c ", i, io->buf[i], verify_buf[i]);
363		}
364	    }
365	    fprintf(stderr, "\n");
366	}
367
368    }
369  return 0;
370}
371
372/* worker func to check the busy bits and get an io unit ready for use */
373static int grab_iou(struct io_unit *io, struct io_oper *oper) {
374    if (io->busy == IO_PENDING)
375        return -1;
376
377    io->busy = IO_PENDING;
378    io->res = 0;
379    io->io_oper = oper;
380  return 0;
381}
382
383char *stage_name(int rw) {
384    switch(rw) {
385    case WRITE:
386        return "write";
387    case READ:
388        return "read";
389    case RWRITE:
390        return "random write";
391    case RREAD:
392        return "random read";
393    }
394    return "unknown";
395}
396
397static inline double oper_mb_trans(struct io_oper *oper) {
398    return ((double)oper->started_ios * (double)oper->reclen) /
399                (double)(1024 * 1024);
400}
401
402static void print_time(struct io_oper *oper) {
403    double runtime;
404    double tput;
405    double mb;
406
407    runtime = time_since_now(&oper->start_time);
408    mb = oper_mb_trans(oper);
409    tput = mb / runtime;
410    fprintf(stderr, "%s on %s (%.2f MB/s) %.2f MB in %.2fs\n",
411	    stage_name(oper->rw), oper->file_name, tput, mb, runtime);
412}
413
414static void print_lat(char *str, struct io_latency *lat) {
415    double avg = lat->total_lat / lat->total_io;
416    int i;
417    double total_counted = 0;
418    fprintf(stderr, "%s min %.2f avg %.2f max %.2f\n\t",
419            str, lat->min, avg, lat->max);
420
421    for (i = 0 ; i < DEVIATIONS ; i++) {
422	fprintf(stderr, " %.0f < %d", lat->deviations[i], deviations[i]);
423	total_counted += lat->deviations[i];
424    }
425    if (total_counted && lat->total_io - total_counted)
426        fprintf(stderr, " < %.0f", lat->total_io - total_counted);
427    fprintf(stderr, "\n");
428    memset(lat, 0, sizeof(*lat));
429}
430
431static void print_latency(struct thread_info *t)
432{
433    struct io_latency *lat = &t->io_submit_latency;
434    print_lat("latency", lat);
435}
436
437static void print_completion_latency(struct thread_info *t)
438{
439    struct io_latency *lat = &t->io_completion_latency;
440    print_lat("completion latency", lat);
441}
442
443/*
444 * updates the fields in the io operation struct that belongs to this
445 * io unit, and make the io unit reusable again
446 */
447void finish_io(struct thread_info *t, struct io_unit *io, long result,
448		struct timeval *tv_now) {
449    struct io_oper *oper = io->io_oper;
450
451    calc_latency(&io->io_start_time, tv_now, &t->io_completion_latency);
452    io->res = result;
453    io->busy = IO_FREE;
454    io->next = t->free_ious;
455    t->free_ious = io;
456    oper->num_pending--;
457    t->num_global_pending--;
458    check_finished_io(io);
459    if (oper->num_pending == 0 &&
460       (oper->started_ios == oper->total_ios || oper->stonewalled))
461    {
462        print_time(oper);
463    }
464}
465
466int read_some_events(struct thread_info *t) {
467    struct io_unit *event_io;
468    struct io_event *event;
469    int nr;
470    int i;
471    int min_nr = io_iter;
472    struct timeval stop_time;
473
474    if (t->num_global_pending < io_iter)
475        min_nr = t->num_global_pending;
476
477#ifdef NEW_GETEVENTS
478    nr = io_getevents(t->io_ctx, min_nr, t->num_global_events, t->events,NULL);
479#else
480    nr = io_getevents(t->io_ctx, t->num_global_events, t->events, NULL);
481#endif
482    if (nr <= 0)
483        return nr;
484
485    gettimeofday(&stop_time, NULL);
486    for (i = 0 ; i < nr ; i++) {
487	event = t->events + i;
488	event_io = (struct io_unit *)((unsigned long)event->obj);
489	finish_io(t, event_io, event->res, &stop_time);
490    }
491    return nr;
492}
493
494/*
495 * finds a free io unit, waiting for pending requests if required.  returns
496 * null if none could be found
497 */
498static struct io_unit *find_iou(struct thread_info *t, struct io_oper *oper)
499{
500    struct io_unit *event_io;
501    int nr;
502
503retry:
504    if (t->free_ious) {
505        event_io = t->free_ious;
506	t->free_ious = t->free_ious->next;
507	if (grab_iou(event_io, oper)) {
508	    fprintf(stderr, "io unit on free list but not free\n");
509	    abort();
510	}
511	return event_io;
512    }
513    nr = read_some_events(t);
514    if (nr > 0)
515    	goto retry;
516    else
517    	fprintf(stderr, "no free ious after read_some_events\n");
518    return NULL;
519}
520
521/*
522 * wait for all pending requests for this io operation to finish
523 */
524static int io_oper_wait(struct thread_info *t, struct io_oper *oper) {
525    struct io_event event;
526    struct io_unit *event_io;
527
528    if (oper == NULL) {
529      return 0;
530    }
531
532    if (oper->num_pending == 0)
533        goto done;
534
535    /* this func is not speed sensitive, no need to go wild reading
536     * more than one event at a time
537     */
538#ifdef NEW_GETEVENTS
539    while (io_getevents(t->io_ctx, 1, 1, &event, NULL) > 0) {
540#else
541    while (io_getevents(t->io_ctx, 1, &event, NULL) > 0) {
542#endif
543	struct timeval tv_now;
544        event_io = (struct io_unit *)((unsigned long)event.obj);
545
546	gettimeofday(&tv_now, NULL);
547	finish_io(t, event_io, event.res, &tv_now);
548
549	if (oper->num_pending == 0)
550	    break;
551    }
552done:
553    if (oper->num_err) {
554        fprintf(stderr, "%u errors on oper, last %u\n",
555	        oper->num_err, oper->last_err);
556    }
557  return 0;
558}
559
/*
 * pick a page-aligned pseudo-random offset inside [oper->start, oper->end)
 * that still leaves room for a full reclen record.
 */
off_t random_byte_offset(struct io_oper *oper) {
    off_t num;
    off_t rand_byte = oper->start;
    off_t range;
    off_t offset = 1;

    /* work in whole megabytes for the coarse placement below */
    range = (oper->end - oper->start) / (1024 * 1024);
    /* with pages bigger than 1MB, shrink the range by a page's worth of MB */
    if ((page_size_mask+1) > (1024 * 1024))
        offset = (page_size_mask+1) / (1024 * 1024);
    if (range < offset)
        range = 0;
    else
        range -= offset;

    /* find a random mb offset */
    num = 1 + (int)((double)range * rand() / (RAND_MAX + 1.0 ));
    rand_byte += num * 1024 * 1024;

    /* find a random byte offset */
    num = 1 + (int)((double)(1024 * 1024) * rand() / (RAND_MAX + 1.0));

    /* page align */
    num = (num + page_size_mask) & ~page_size_mask;
    rand_byte += num;

    /* back off if a full record would run past the end of the region */
    if (rand_byte + oper->reclen > oper->end) {
	rand_byte -= oper->reclen;
    }
    return rand_byte;
}
590
591/*
592 * build an aio iocb for an operation, based on oper->rw and the
593 * last offset used.  This finds the struct io_unit that will be attached
594 * to the iocb, and things are ready for submission to aio after this
595 * is called.
596 *
597 * returns null on error
598 */
599static struct io_unit *build_iocb(struct thread_info *t, struct io_oper *oper)
600{
601    struct io_unit *io;
602    off_t rand_byte;
603
604    io = find_iou(t, oper);
605    if (!io) {
606        fprintf(stderr, "unable to find io unit\n");
607	return NULL;
608    }
609
610    switch(oper->rw) {
611    case WRITE:
612        io_prep_pwrite(&io->iocb,oper->fd, io->buf, oper->reclen,
613	               oper->last_offset);
614	oper->last_offset += oper->reclen;
615	break;
616    case READ:
617        io_prep_pread(&io->iocb,oper->fd, io->buf, oper->reclen,
618	              oper->last_offset);
619	oper->last_offset += oper->reclen;
620	break;
621    case RREAD:
622	rand_byte = random_byte_offset(oper);
623	oper->last_offset = rand_byte;
624        io_prep_pread(&io->iocb,oper->fd, io->buf, oper->reclen,
625	              rand_byte);
626        break;
627    case RWRITE:
628	rand_byte = random_byte_offset(oper);
629	oper->last_offset = rand_byte;
630        io_prep_pwrite(&io->iocb,oper->fd, io->buf, oper->reclen,
631	              rand_byte);
632
633        break;
634    }
635
636    return io;
637}
638
639/*
640 * wait for any pending requests, and then free all ram associated with
641 * an operation.  returns the last error the operation hit (zero means none)
642 */
643static int
644finish_oper(struct thread_info *t, struct io_oper *oper)
645{
646    unsigned long last_err;
647
648    io_oper_wait(t, oper);
649    last_err = oper->last_err;
650    if (oper->num_pending > 0) {
651        fprintf(stderr, "oper num_pending is %d\n", oper->num_pending);
652    }
653    close(oper->fd);
654    free(oper);
655    return last_err;
656}
657
658/*
659 * allocates an io operation and fills in all the fields.  returns
660 * null on error
661 */
662static struct io_oper *
663create_oper(int fd, int rw, off_t start, off_t end, int reclen, int depth,
664            int iter, char *file_name)
665{
666    struct io_oper *oper;
667
668    oper = malloc (sizeof(*oper));
669    if (!oper) {
670	fprintf(stderr, "unable to allocate io oper\n");
671	return NULL;
672    }
673    memset(oper, 0, sizeof(*oper));
674
675    oper->depth = depth;
676    oper->start = start;
677    oper->end = end;
678    oper->last_offset = oper->start;
679    oper->fd = fd;
680    oper->reclen = reclen;
681    oper->rw = rw;
682    oper->total_ios = (oper->end - oper->start) / oper->reclen;
683    oper->file_name = file_name;
684
685    return oper;
686}
687
688/*
689 * does setup on num_ios worth of iocbs, but does not actually
690 * start any io
691 */
692int build_oper(struct thread_info *t, struct io_oper *oper, int num_ios,
693               struct iocb **my_iocbs)
694{
695    int i;
696    struct io_unit *io;
697
698    if (oper->started_ios == 0)
699	gettimeofday(&oper->start_time, NULL);
700
701    if (num_ios == 0)
702        num_ios = oper->total_ios;
703
704    if ((oper->started_ios + num_ios) > oper->total_ios)
705        num_ios = oper->total_ios - oper->started_ios;
706
707    for (i = 0 ; i < num_ios ; i++) {
708	io = build_iocb(t, oper);
709	if (!io) {
710	    return -1;
711	}
712	my_iocbs[i] = &io->iocb;
713    }
714    return num_ios;
715}
716
/*
 * runs through the iocbs in the array provided and updates
 * counters in the associated oper struct
 */
static void update_iou_counters(struct iocb **my_iocbs, int nr,
	struct timeval *tv_now)
{
    struct io_unit *io;
    int i;
    for (i = 0 ; i < nr ; i++) {
	/* the iocb is the first member of struct io_unit, so the iocb
	 * pointer can be cast straight back to its enclosing io unit */
	io = (struct io_unit *)(my_iocbs[i]);
	io->io_oper->num_pending++;
	io->io_oper->started_ios++;
	io->io_start_time = *tv_now;	/* set time of io_submit */
    }
}
733
734/* starts some io for a given file, returns zero if all went well */
735int run_built(struct thread_info *t, int num_ios, struct iocb **my_iocbs)
736{
737    int ret;
738    struct timeval start_time;
739    struct timeval stop_time;
740
741resubmit:
742    gettimeofday(&start_time, NULL);
743    ret = io_submit(t->io_ctx, num_ios, my_iocbs);
744    gettimeofday(&stop_time, NULL);
745    calc_latency(&start_time, &stop_time, &t->io_submit_latency);
746
747    if (ret != num_ios) {
748	/* some I/O got through */
749	if (ret > 0) {
750	    update_iou_counters(my_iocbs, ret, &stop_time);
751	    my_iocbs += ret;
752	    t->num_global_pending += ret;
753	    num_ios -= ret;
754	}
755	/*
756	 * we've used all the requests allocated in aio_init, wait and
757	 * retry
758	 */
759	if (ret > 0 || ret == -EAGAIN) {
760	    int old_ret = ret;
761	    if ((ret = read_some_events(t) > 0)) {
762		goto resubmit;
763	    } else {
764	    	fprintf(stderr, "ret was %d and now is %d\n", ret, old_ret);
765		abort();
766	    }
767	}
768
769	fprintf(stderr, "ret %d (%s) on io_submit\n", ret, strerror(-ret));
770	return -1;
771    }
772    update_iou_counters(my_iocbs, ret, &stop_time);
773    t->num_global_pending += ret;
774  return 0;
775}
776
/*
 * changes oper->rw to the next in a command sequence, or returns zero
 * to say this operation is really, completely done for
 */
static int restart_oper(struct io_oper *oper) {
    int new_rw  = 0;
    /* an operation that hit an error is never restarted */
    if (oper->last_err)
      return 0;

    /* this switch falls through */
    switch(oper->rw) {
    case WRITE:
	if (stages & (1 << READ))
	    new_rw = READ;
	/* fallthrough */
    case READ:
	if (!new_rw && stages & (1 << RWRITE))
	    new_rw = RWRITE;
	/* fallthrough */
    case RWRITE:
	if (!new_rw && stages & (1 << RREAD))
	    new_rw = RREAD;
    }

    if (new_rw) {
	/* reset progress so the next stage starts from scratch */
	oper->started_ios = 0;
	oper->last_offset = oper->start;
	oper->stonewalled = 0;

	/*
	 * we're restarting an operation with pending requests, so the
	 * timing info won't be printed by finish_io.  Printing it here
	 */
	if (oper->num_pending)
	    print_time(oper);

	oper->rw = new_rw;
	return 1;
    }
  return 0;
}
816
817static int oper_runnable(struct io_oper *oper) {
818    struct stat buf;
819    int ret;
820
821    /* first context is always runnable, if started_ios > 0, no need to
822     * redo the calculations
823     */
824    if (oper->started_ios || oper->start == 0)
825        return 1;
826    /*
827     * only the sequential phases force delays in starting */
828    if (oper->rw >= RWRITE)
829        return 1;
830    ret = fstat(oper->fd, &buf);
831    if (ret < 0) {
832        perror("fstat");
833	exit(1);
834    }
835    if (S_ISREG(buf.st_mode) && buf.st_size < oper->start)
836      return 0;
837    return 1;
838}
839
/*
 * runs through all the io operations on the active list, and starts
 * a chunk of io on each.  If any io operations are completely finished,
 * it either switches them to the next stage or puts them on the
 * finished list.
 *
 * this function stops after max_io_submit iocbs are sent down the
 * pipe, even if it has not yet touched all the operations on the
 * active list.  Any operations that have finished are moved onto
 * the finished_opers list.
 */
static int run_active_list(struct thread_info *t,
			 int io_iter,
			 int max_io_submit)
{
    struct io_oper *oper;
    struct io_oper *built_opers = NULL;
    struct iocb **my_iocbs = t->iocbs;
    int ret = 0;
    int num_built = 0;

    oper = t->active_opers;
    while (oper) {
	/* skip operations still waiting on file size (see oper_runnable) */
	if (!oper_runnable(oper)) {
	    oper = oper->next;
	    /* wrapped all the way around: everything left is blocked */
	    if (oper == t->active_opers)
	        break;
	    continue;
	}
	ret = build_oper(t, oper, io_iter, my_iocbs);
	if (ret >= 0) {
	    my_iocbs += ret;
	    num_built += ret;
	    /* park the oper on built_opers so it isn't built twice this pass */
	    oper_list_del(oper, &t->active_opers);
	    oper_list_add(oper, &built_opers);
	    oper = t->active_opers;
	    /* stop once another full batch could exceed max_io_submit */
	    if (num_built + io_iter > max_io_submit)
	        break;
	} else
	    break;
    }
    if (num_built) {
	ret = run_built(t, num_built, t->iocbs);
	if (ret < 0) {
	    fprintf(stderr, "error %d on run_built\n", ret);
	    exit(1);
	}
	/* put the built opers back, moving fully-submitted ones to finished */
	while (built_opers) {
	    oper = built_opers;
	    oper_list_del(oper, &built_opers);
	    oper_list_add(oper, &t->active_opers);
	    if (oper->started_ios == oper->total_ios) {
		oper_list_del(oper, &t->active_opers);
		oper_list_add(oper, &t->finished_opers);
	    }
	}
    }
  return 0;
}
899
900void drop_shm() {
901    int ret;
902    struct shmid_ds ds;
903    if (use_shm != USE_SHM)
904        return;
905
906    ret = shmctl(shm_id, IPC_RMID, &ds);
907    if (ret) {
908        perror("shmctl IPC_RMID");
909    }
910}
911
912void aio_setup(io_context_t *io_ctx, int n)
913{
914    int res = io_queue_init(n, io_ctx);
915    if (res != 0) {
916	fprintf(stderr, "io_queue_setup(%d) returned %d (%s)\n",
917		n, res, strerror(-res));
918	exit(3);
919    }
920}
921
922/*
923 * allocate io operation and event arrays for a given thread
924 */
925int setup_ious(struct thread_info *t,
926              int num_files, int depth,
927	      int reclen, int max_io_submit) {
928    int i;
929    size_t bytes = num_files * depth * sizeof(*t->ios);
930
931    t->ios = malloc(bytes);
932    if (!t->ios) {
933	fprintf(stderr, "unable to allocate io units\n");
934	return -1;
935    }
936    memset(t->ios, 0, bytes);
937
938    for (i = 0 ; i < depth * num_files; i++) {
939	t->ios[i].buf = aligned_buffer;
940	aligned_buffer += padded_reclen;
941	t->ios[i].buf_size = reclen;
942	if (verify)
943	    memset(t->ios[i].buf, 'b', reclen);
944	else
945	    memset(t->ios[i].buf, 0, reclen);
946	t->ios[i].next = t->free_ious;
947	t->free_ious = t->ios + i;
948    }
949    if (verify) {
950        verify_buf = aligned_buffer;
951        memset(verify_buf, 'b', reclen);
952    }
953
954    t->iocbs = malloc(sizeof(struct iocb *) * max_io_submit);
955    if (!t->iocbs) {
956        fprintf(stderr, "unable to allocate iocbs\n");
957	goto free_buffers;
958    }
959
960    memset(t->iocbs, 0, max_io_submit * sizeof(struct iocb *));
961
962    t->events = malloc(sizeof(struct io_event) * depth * num_files);
963    if (!t->events) {
964        fprintf(stderr, "unable to allocate ram for events\n");
965	goto free_buffers;
966    }
967    memset(t->events, 0, num_files * sizeof(struct io_event)*depth);
968
969    t->num_global_ios = num_files * depth;
970    t->num_global_events = t->num_global_ios;
971  return 0;
972
973free_buffers:
974    if (t->ios)
975        free(t->ios);
976    if (t->iocbs)
977        free(t->iocbs);
978    if (t->events)
979        free(t->events);
980    return -1;
981}
982
/*
 * The buffers used for file data are allocated as a single big
 * malloc, and then each thread and operation takes a piece and uses
 * that for file data.  This lets us do a large shm or bigpages alloc
 * and without trying to find a special place in each thread to map the
 * buffers to
 */
int setup_shared_mem(int num_threads, int num_files, int depth,
                     int reclen, int max_io_submit)
{
    char *p = NULL;
    size_t total_ram;

    /* round the record size up to a whole number of pages */
    padded_reclen = (reclen + page_size_mask) / (page_size_mask+1);
    padded_reclen = padded_reclen * (page_size_mask+1);
    /* NOTE(review): "+ num_threads" adds only num_threads BYTES of slack;
     * looks like it may have been meant to scale the buffer count --
     * verify against the original intent before changing */
    total_ram = num_files * depth * padded_reclen + num_threads;
    if (verify)
    	total_ram += padded_reclen;

    if (use_shm == USE_MALLOC) {
	/* extra page_size_mask bytes so we can align the start below */
	p = malloc(total_ram + page_size_mask);
    } else if (use_shm == USE_SHM) {
        shm_id = shmget(IPC_PRIVATE, total_ram, IPC_CREAT | 0700);
	if (shm_id < 0) {
	    perror("shmget");
	    drop_shm();
	    goto free_buffers;
	}
	p = shmat(shm_id, (char *)0x50000000, 0);
        if ((long)p == -1) {
	    perror("shmat");
	    goto free_buffers;
	}
	/* won't really be dropped until we shmdt */
	drop_shm();
    } else if (use_shm == USE_SHMFS) {
        char mmap_name[16]; /* /dev/shm/ + null + XXXXXX */
	int fd;

	strcpy(mmap_name, "/dev/shm/XXXXXX");
	fd = mkstemp(mmap_name);
        if (fd < 0) {
	    perror("mkstemp");
	    goto free_buffers;
	}
	/* the file stays alive through the fd after the unlink */
	unlink(mmap_name);
	ftruncate(fd, total_ram);
	shm_id = fd;
	p = mmap((char *)0x50000000, total_ram,
	         PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);

        if (p == MAP_FAILED) {
	    perror("mmap");
	    goto free_buffers;
	}
    }
    if (!p) {
        fprintf(stderr, "unable to allocate buffers\n");
	goto free_buffers;
    }
    unaligned_buffer = p;
    /* round the pool start up to the next page boundary */
    p = (char*)((intptr_t) (p + page_size_mask) & ~page_size_mask);
    aligned_buffer = p;
  return 0;

free_buffers:
    drop_shm();
    if (unaligned_buffer)
        free(unaligned_buffer);
    return -1;
}
1054
1055/*
1056 * runs through all the thread_info structs and calculates a combined
1057 * throughput
1058 */
1059void global_thread_throughput(struct thread_info *t, char *this_stage) {
1060    int i;
1061    double runtime = time_since_now(&global_stage_start_time);
1062    double total_mb = 0;
1063    double min_trans = 0;
1064
1065    for (i = 0 ; i < num_threads ; i++) {
1066        total_mb += global_thread_info[i].stage_mb_trans;
1067	if (!min_trans || t->stage_mb_trans < min_trans)
1068	    min_trans = t->stage_mb_trans;
1069    }
1070    if (total_mb) {
1071	fprintf(stderr, "%s throughput (%.2f MB/s) ", this_stage,
1072	        total_mb / runtime);
1073	fprintf(stderr, "%.2f MB in %.2fs", total_mb, runtime);
1074        if (stonewall)
1075	    fprintf(stderr, " min transfer %.2fMB", min_trans);
1076        fprintf(stderr, "\n");
1077    }
1078}
1079
1080/* this is the meat of the state machine.  There is a list of
1081 * active operations structs, and as each one finishes the required
1082 * io it is moved to a list of finished operations.  Once they have
1083 * all finished whatever stage they were in, they are given the chance
1084 * to restart and pick a different stage (read/write/random read etc)
1085 *
1086 * various timings are printed in between the stages, along with
1087 * thread synchronization if there are more than one threads.
1088 */
int worker(struct thread_info *t)
{
    struct io_oper *oper;
    char *this_stage = NULL;
    struct timeval stage_time;
    int status = 0;
    int iteration = 0;
    int cnt;

    /* create this thread's aio context; 512 is the event capacity
     * passed to aio_setup -- presumably max in-flight events, confirm
     * against aio_setup */
    aio_setup(&t->io_ctx, 512);

restart:
    /* stage-start barrier: every thread waits here until all threads
     * have arrived; the last one in resets threads_ending, stamps the
     * global stage start time and wakes the others */
    if (num_threads > 1) {
        pthread_mutex_lock(&stage_mutex);
	threads_starting++;
	if (threads_starting == num_threads) {
	    threads_ending = 0;
	    gettimeofday(&global_stage_start_time, NULL);
	    pthread_cond_broadcast(&stage_cond);
	}
	while (threads_starting != num_threads)
	    pthread_cond_wait(&stage_cond, &stage_mutex);
        pthread_mutex_unlock(&stage_mutex);
    }
    /* record the stage name and start time for this thread's report */
    if (t->active_opers) {
        this_stage = stage_name(t->active_opers->rw);
	gettimeofday(&stage_time, NULL);
	t->stage_mb_trans = 0;
    }

    cnt = 0;
    /* first we send everything through aio */
    while (t->active_opers && (cnt < iterations || iterations == RUN_FOREVER)) {
	/* stonewalling: once any thread has finished the stage
	 * (threads_ending != 0), stop submitting and park the
	 * remaining opers on the finished list instead */
	if (stonewall && threads_ending) {
	    oper = t->active_opers;
	    oper->stonewalled = 1;
	    oper_list_del(oper, &t->active_opers);
	    oper_list_add(oper, &t->finished_opers);
	} else {
	    run_active_list(t, io_iter,  max_io_submit);
        }
	cnt++;
    }
    if (latency_stats)
        print_latency(t);

    if (completion_latency_stats)
	print_completion_latency(t);

    /* then we wait for all the operations to finish */
    /* finished_opers is a circular list; walk it once */
    oper = t->finished_opers;
    do {
	if (!oper)
		break;
	io_oper_wait(t, oper);
	oper = oper->next;
    } while (oper != t->finished_opers);

    /* then we do an fsync to get the timing for any future operations
     * right, and check to see if any of these need to get restarted
     */
    oper = t->finished_opers;
    while (oper) {
	if (fsync_stages)
            fsync(oper->fd);
	t->stage_mb_trans += oper_mb_trans(oper);
	/* a restarted oper moves back to the active list; restart the
	 * scan from the (possibly new) finished-list head */
	if (restart_oper(oper)) {
	    oper_list_del(oper, &t->finished_opers);
	    oper_list_add(oper, &t->active_opers);
	    oper = t->finished_opers;
	    continue;
	}
	oper = oper->next;
	if (oper == t->finished_opers)
	    break;
    }

    /* per-thread throughput report for the stage just completed */
    if (t->stage_mb_trans && t->num_files > 0) {
        double seconds = time_since_now(&stage_time);
	fprintf(stderr, "thread %td %s totals (%.2f MB/s) %.2f MB in %.2fs\n",
	        t - global_thread_info, this_stage, t->stage_mb_trans/seconds,
		t->stage_mb_trans, seconds);
    }

    /* stage-end barrier: the last thread out resets threads_starting
     * for the next stage and prints the combined throughput */
    if (num_threads > 1) {
	pthread_mutex_lock(&stage_mutex);
	threads_ending++;
	if (threads_ending == num_threads) {
	    threads_starting = 0;
	    pthread_cond_broadcast(&stage_cond);
	    global_thread_throughput(t, this_stage);
	}
	while (threads_ending != num_threads)
	    pthread_cond_wait(&stage_cond, &stage_mutex);
	pthread_mutex_unlock(&stage_mutex);
    }

    /* someone got restarted, go back to the beginning */
    /* NOTE(review): iteration is incremented here but never read */
    if (t->active_opers && (cnt < iterations || iterations == RUN_FOREVER)) {
	iteration++;
        goto restart;
    }

    /* finally, free all the ram */
    while (t->finished_opers) {
	oper = t->finished_opers;
	oper_list_del(oper, &t->finished_opers);
	status = finish_oper(t, oper);
    }

    if (t->num_global_pending) {
        fprintf(stderr, "global num pending is %d\n", t->num_global_pending);
    }
    io_queue_release(t->io_ctx);

    return status;
}
1206
1207typedef void * (*start_routine)(void *);
1208int run_workers(struct thread_info *t, int num_threads)
1209{
1210    int ret;
1211    int thread_ret;
1212    int i;
1213
1214    for (i = 0 ; i < num_threads ; i++) {
1215        ret = pthread_create(&t[i].tid, NULL, (start_routine)worker, t + i);
1216	if (ret) {
1217	    perror("pthread_create");
1218	    exit(1);
1219	}
1220    }
1221    for (i = 0 ; i < num_threads ; i++) {
1222        ret = pthread_join(t[i].tid, (void *)&thread_ret);
1223        if (ret) {
1224	    perror("pthread_join");
1225	    exit(1);
1226	}
1227    }
1228  return 0;
1229}
1230
/*
 * parse a size argument with an optional single-character multiplier
 * suffix: b/B (bytes), k/K, m/M, g/G.  A suffix overrides the
 * caller-supplied default multiplier "mult".
 *
 * Note: size_arg is modified in place (the suffix is stripped).
 * Returns 0 for an empty argument.
 */
off_t parse_size(char *size_arg, off_t mult) {
    char c;
    long long num;
    size_t len = strlen(size_arg);

    /* bug fix: an empty string used to index size_arg[-1] */
    if (len == 0)
        return 0;

    c = size_arg[len - 1];
    if (c > '9') {
        size_arg[len - 1] = '\0';
    }
    /* bug fix: atoi() overflows int for values > 2G; use strtoll so
     * large byte counts survive on 64 bit off_t */
    num = strtoll(size_arg, NULL, 10);
    switch(c) {
    case 'g':
    case 'G':
        mult = 1024 * 1024 * 1024;
	break;
    case 'm':
    case 'M':
        mult = 1024 * 1024;
	break;
    case 'k':
    case 'K':
        mult = 1024;
	break;
    case 'b':
    case 'B':
        mult = 1;
	break;
    }
    return (off_t)(mult * num);
}
1261
1262void print_usage(void) {
1263    printf("usage: aio-stress [-s size] [-r size] [-a size] [-d num] [-b num]\n");
1264    printf("                  [-i num] [-t num] [-c num] [-C size] [-nxhOS ]\n");
1265    printf("                  file1 [file2 ...]\n");
1266    printf("\t-a size in KB at which to align buffers\n");
1267    printf("\t-b max number of iocbs to give io_submit at once\n");
1268    printf("\t-c number of io contexts per file\n");
1269    printf("\t-C offset between contexts, default 2MB\n");
1270    printf("\t-s size in MB of the test file(s), default 1024MB\n");
1271    printf("\t-r record size in KB used for each io, default 64KB\n");
1272    printf("\t-d number of pending aio requests for each file, default 64\n");
1273    printf("\t-i number of I/O per file sent before switching\n"
1274	    "\t   to the next file, default 8\n");
1275    printf("\t-I total number of ayncs I/O the program will run, "
1276	    "default is run until Cntl-C\n");
1277    printf("\t-O Use O_DIRECT (not available in 2.4 kernels),\n");
1278    printf("\t-S Use O_SYNC for writes\n");
1279    printf("\t-o add an operation to the list: write=0, read=1,\n");
1280    printf("\t   random write=2, random read=3.\n");
1281    printf("\t   repeat -o to specify multiple ops: -o 0 -o 1 etc.\n");
1282    printf("\t-m shm use ipc shared memory for io buffers instead of malloc\n");
1283    printf("\t-m shmfs mmap a file in /dev/shm for io buffers\n");
1284    printf("\t-n no fsyncs between write stage and read stage\n");
1285    printf("\t-l print io_submit latencies after each stage\n");
1286    printf("\t-L print io completion latencies after each stage\n");
1287    printf("\t-t number of threads to run\n");
1288    printf("\t-u unlink files after completion\n");
1289    printf("\t-v verification of bytes written\n");
1290    printf("\t-x turn off thread stonewalling\n");
1291    printf("\t-h this message\n");
1292    printf("\n\t   the size options (-a -s and -r) allow modifiers -s 400{k,m,g}\n");
1293    printf("\t   translate to 400KB, 400MB and 400GB\n");
1294    printf("version %s\n", PROG_VERSION);
1295}
1296
1297int main(int ac, char **av)
1298{
1299    int rwfd;
1300    int i;
1301    int j;
1302    int c;
1303
1304    off_t file_size = 1 * 1024 * 1024 * 1024;
1305    int first_stage = WRITE;
1306    struct io_oper *oper;
1307    int status = 0;
1308    int num_files = 0;
1309    int open_fds = 0;
1310    struct thread_info *t;
1311
1312    page_size_mask = getpagesize() - 1;
1313
1314    while (1) {
1315	c = getopt(ac, av, "a:b:c:C:m:s:r:d:i:I:o:t:lLnhOSxvu");
1316	if  (c < 0)
1317	    break;
1318
1319        switch(c) {
1320	case 'a':
1321	    page_size_mask = parse_size(optarg, 1024);
1322	    page_size_mask--;
1323	    break;
1324	case 'c':
1325	    num_contexts = atoi(optarg);
1326	    break;
1327	case 'C':
1328	    context_offset = parse_size(optarg, 1024 * 1024);
1329	case 'b':
1330	    max_io_submit = atoi(optarg);
1331	    break;
1332	case 's':
1333	    file_size = parse_size(optarg, 1024 * 1024);
1334	    break;
1335	case 'd':
1336	    depth = atoi(optarg);
1337	    break;
1338	case 'r':
1339	    rec_len = parse_size(optarg, 1024);
1340	    break;
1341	case 'i':
1342	    io_iter = atoi(optarg);
1343	    break;
1344        case 'I':
1345          iterations = atoi(optarg);
1346        break;
1347	case 'n':
1348	    fsync_stages = 0;
1349	    break;
1350	case 'l':
1351	    latency_stats = 1;
1352	    break;
1353	case 'L':
1354	    completion_latency_stats = 1;
1355	    break;
1356	case 'm':
1357	    if (!strcmp(optarg, "shm")) {
1358		fprintf(stderr, "using ipc shm\n");
1359	        use_shm = USE_SHM;
1360	    } else if (!strcmp(optarg, "shmfs")) {
1361	        fprintf(stderr, "using /dev/shm for buffers\n");
1362		use_shm = USE_SHMFS;
1363	    }
1364	    break;
1365	case 'o':
1366	    i = atoi(optarg);
1367	    stages |= 1 << i;
1368	    fprintf(stderr, "adding stage %s\n", stage_name(i));
1369	    break;
1370	case 'O':
1371	    o_direct = O_DIRECT;
1372	    break;
1373	case 'S':
1374	    o_sync = O_SYNC;
1375	    break;
1376	case 't':
1377	    num_threads = atoi(optarg);
1378	    break;
1379	case 'x':
1380	    stonewall = 0;
1381	    break;
1382	case 'u':
1383	    unlink_files = 1;
1384	    break;
1385	case 'v':
1386	    verify = 1;
1387	    break;
1388	case 'h':
1389	default:
1390	    print_usage();
1391	    exit(1);
1392	}
1393    }
1394
1395    /*
1396     * make sure we don't try to submit more I/O than we have allocated
1397     * memory for
1398     */
1399    if (depth < io_iter) {
1400	io_iter = depth;
1401        fprintf(stderr, "dropping io_iter to %d\n", io_iter);
1402    }
1403
1404    if (optind >= ac) {
1405	print_usage();
1406	exit(1);
1407    }
1408
1409    num_files = ac - optind;
1410
1411    if (num_threads > (num_files * num_contexts)) {
1412        num_threads = num_files * num_contexts;
1413	fprintf(stderr, "dropping thread count to the number of contexts %d\n",
1414	        num_threads);
1415    }
1416
1417    t = malloc(num_threads * sizeof(*t));
1418    if (!t) {
1419        perror("malloc");
1420	exit(1);
1421    }
1422    global_thread_info = t;
1423
1424    /* by default, allow a huge number of iocbs to be sent towards
1425     * io_submit
1426     */
1427    if (!max_io_submit)
1428        max_io_submit = num_files * io_iter * num_contexts;
1429
1430    /*
1431     * make sure we don't try to submit more I/O than max_io_submit allows
1432     */
1433    if (max_io_submit < io_iter) {
1434        io_iter = max_io_submit;
1435	fprintf(stderr, "dropping io_iter to %d\n", io_iter);
1436    }
1437
1438    if (!stages) {
1439        stages = (1 << WRITE) | (1 << READ) | (1 << RREAD) | (1 << RWRITE);
1440    } else {
1441        for (i = 0 ; i < LAST_STAGE; i++) {
1442	    if (stages & (1 << i)) {
1443	        first_stage = i;
1444		fprintf(stderr, "starting with %s\n", stage_name(i));
1445		break;
1446	    }
1447	}
1448    }
1449
1450    if (file_size < num_contexts * context_offset) {
1451        fprintf(stderr, "file size %ld too small for %d contexts\n",
1452	        (long)file_size, num_contexts);
1453	exit(1);
1454    }
1455
1456    fprintf(stderr, "file size %ldMB, record size %ldKB, depth %d, "
1457	    "I/O per iteration %d\n",
1458	    (long)(file_size / (1024 * 1024)),
1459	    rec_len / 1024, depth, io_iter);
1460    fprintf(stderr, "max io_submit %d, buffer alignment set to %luKB\n",
1461            max_io_submit, (page_size_mask + 1)/1024);
1462    fprintf(stderr, "threads %d files %d contexts %d context offset %ldMB "
1463	    "verification %s\n", num_threads, num_files, num_contexts,
1464	    (long)(context_offset / (1024 * 1024)),
1465	    verify ? "on" : "off");
1466    /* open all the files and do any required setup for them */
1467    for (i = optind ; i < ac ; i++) {
1468	int thread_index;
1469	for (j = 0 ; j < num_contexts ; j++) {
1470	    thread_index = open_fds % num_threads;
1471	    open_fds++;
1472
1473	    rwfd = open(av[i], O_CREAT | O_RDWR | o_direct | o_sync, 0600);
1474	    if (rwfd == -1) {
1475		fprintf(stderr, "error while creating file %s: %s", av[i], strerror(errno));
1476		exit(1);
1477	    }
1478
1479	    oper = create_oper(rwfd, first_stage, j * context_offset,
1480	                       file_size - j * context_offset, rec_len,
1481			       depth, io_iter, av[i]);
1482	    if (!oper) {
1483		fprintf(stderr, "error in create_oper\n");
1484		exit(-1);
1485	    }
1486	    oper_list_add(oper, &t[thread_index].active_opers);
1487	    t[thread_index].num_files++;
1488	}
1489    }
1490    if (setup_shared_mem(num_threads, num_files * num_contexts,
1491                         depth, rec_len, max_io_submit))
1492    {
1493        exit(1);
1494    }
1495    for (i = 0 ; i < num_threads ; i++) {
1496	if (setup_ious(&t[i], t[i].num_files, depth, rec_len, max_io_submit))
1497		exit(1);
1498    }
1499    if (num_threads > 1) {
1500        printf("Running multi thread version num_threads:%d\n", num_threads);
1501        run_workers(t, num_threads);
1502    } else {
1503        printf("Running single thread version \n");
1504	status = worker(t);
1505    }
1506    if (unlink_files) {
1507	for (i = optind ; i < ac ; i++) {
1508	    printf("Cleaning up file %s \n", av[i]);
1509	    unlink(av[i]);
1510	}
1511    }
1512
1513    if (status) {
1514	exit(1);
1515    }
1516    return status;
1517}
1518