1/*
2 * Copyright (c) 2004 SuSE, Inc.  All Rights Reserved.
3 *
4 * This program is free software; you can redistribute it and/or modify it
5 * under the terms of version 2 of the GNU General Public License as
6 * published by the Free Software Foundation.
7 *
8 * This program is distributed in the hope that it would be useful, but
9 * WITHOUT ANY WARRANTY; without even the implied warranty of
10 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
11 *
12 * Further, this software is distributed without any warranty that it is
13 * free of the rightful claim of any third person regarding infringement
14 * or the like.  Any license provided herein, whether implied or
15 * otherwise, applies only to this software file.  Patent licenses, if
16 * any, provided herein do not apply to combinations of this program with
17 * other software, or any other product whatsoever.
18 *
19 * You should have received a copy of the GNU General Public License along
20 * with this program; if not, write the Free Software Foundation, Inc., 59
21 * Temple Place - Suite 330, Boston MA 02111-1307, USA.
22 *
23 * Contact information: Silicon Graphics, Inc., 1600 Amphitheatre Pkwy,
24 * Mountain View, CA  94043, or:
25 *
26 *
27 * aio-stress
28 *
29 * will open or create each file on the command line, and start a series
30 * of aio to it.
31 *
 * aio is done in a rotating loop.  First file1 gets 8 requests, then
 * file2, then file3, etc.  As each file finishes writing, it is switched
 * to reads.
35 *
36 * io buffers are aligned in case you want to do raw io
37 *
38 * compile with gcc -Wall -laio -lpthread -o aio-stress aio-stress.c
39 *
40 * run aio-stress -h to see the options
41 *
42 * Please mail Chris Mason (mason@suse.com) with bug reports or patches
43 */
44#define _FILE_OFFSET_BITS 64
45#define PROG_VERSION "0.21"
46#define NEW_GETEVENTS
47
48#include <stdio.h>
49#include <errno.h>
50#include <assert.h>
51#include <stdlib.h>
52
53#include <sys/types.h>
54#include <sys/stat.h>
55#include <fcntl.h>
56#include <unistd.h>
57#include <sys/time.h>
58#include <libaio.h>
59#include <sys/ipc.h>
60#include <sys/shm.h>
61#include <sys/mman.h>
62#include <string.h>
63#include <pthread.h>
64
65#define IO_FREE 0
66#define IO_PENDING 1
67#define RUN_FOREVER -1
68
69#ifndef O_DIRECT
70#define O_DIRECT         040000 /* direct disk access hint */
71#endif
72
/*
 * io stages.  The values are also used as bit positions in the 'stages'
 * mask (restart_oper tests stages & (1 << READ) and so on).  WRITE must
 * stay 0: restart_oper uses 0 to mean "no further stage".
 */
enum {
    WRITE,
    READ,
    RWRITE,
    RREAD,
    LAST_STAGE,
};
80
81#define USE_MALLOC 0
82#define USE_SHM 1
83#define USE_SHMFS 2
84
/*
 * various globals, these are effectively read only by the time the threads
 * are started
 */
/* bitmask of stages to run; bit N corresponds to stage enum value N */
long stages = 0;
/* alignment mask; the code treats (page_size_mask + 1) as the page size */
unsigned long page_size_mask;
/* NOTE(review): not used in this part of the file; presumably O_DIRECT / O_SYNC open flags */
int o_direct = 0;
int o_sync = 0;
/* when set, worker prints io_submit latency stats after a stage */
int latency_stats = 0;
/* when set, worker prints submit-to-completion latency stats after a stage */
int completion_latency_stats = 0;
/* ios built per operation per pass, and the most events read_some_events waits for */
int io_iter = 8;
/* passes over the active list per stage in worker, or RUN_FOREVER */
int iterations = RUN_FOREVER;
/* limit on iocbs handed to a single run_active_list pass */
int max_io_submit = 0;
/* NOTE(review): default record length and queue depth; consumers are outside this view */
long rec_len = 64 * 1024;
int depth = 64;
/* number of worker threads; used for stage synchronization and throughput totals */
int num_threads = 1;
/* NOTE(review): not used in this part of the file; confirm meaning against main() */
int num_contexts = 1;
off_t context_offset = 2 * 1024 * 1024;
/* when set, worker fsyncs each finished operation's fd between stages */
int fsync_stages = 1;
/* buffer allocation method: USE_MALLOC, USE_SHM or USE_SHMFS */
int use_shm = 0;
/* shmget id for USE_SHM, or the backing file descriptor for USE_SHMFS */
int shm_id;
/* raw allocation from setup_shared_mem, and the page aligned pointer into it */
char *unaligned_buffer = NULL;
char *aligned_buffer = NULL;
/* record length rounded up to a multiple of the page size */
int padded_reclen = 0;
/* when set, worker stops submitting once threads_ending is non-zero */
int stonewall = 1;
/* when set, buffers are filled with 'b' and sequential reads are compared to verify_buf */
int verify = 0;
char *verify_buf = NULL;
/* NOTE(review): not used in this part of the file */
int unlink_files = 0;
113
114struct io_unit;
115struct thread_info;
116
/* pthread mutexes and other globals for keeping the threads in sync */
pthread_cond_t stage_cond = PTHREAD_COND_INITIALIZER;
pthread_mutex_t stage_mutex = PTHREAD_MUTEX_INITIALIZER;
/*
 * counts of threads that have reached the end / the start of the current
 * stage; worker updates both while holding stage_mutex
 */
int threads_ending = 0;
int threads_starting = 0;
/* set by the last thread to arrive at the start of a stage */
struct timeval global_stage_start_time;
/* array of per-thread state, indexed 0..num_threads-1 */
struct thread_info *global_thread_info;
124
/*
 * latencies during io_submit are measured, these are the
 * granularities for deviations
 *
 * calc_latency compares each sample (seconds * 1000) against these
 * thresholds and counts it in the first bucket it is smaller than.
 */
#define DEVIATIONS 6
int deviations[DEVIATIONS] = { 100, 250, 500, 1000, 5000, 10000 };
struct io_latency {
    /* largest and smallest sample seen; min == 0 is treated as "not set yet" */
    double max;
    double min;
    /* number of samples, and the sum of all samples */
    double total_io;
    double total_lat;
    /* per-bucket sample counts, parallel to the global deviations[] array */
    double deviations[DEVIATIONS];
};
138
/* container for a series of operations to a file */
struct io_oper {
    /* already open file descriptor, valid for whatever operation you want */
    int fd;

    /* starting byte of the operation */
    off_t start;

    /* ending byte of the operation */
    off_t end;

    /* size of the read/write buffer */
    int reclen;

    /* max number of pending requests before a wait is triggered */
    int depth;

    /* current number of pending requests */
    int num_pending;

    /* last error, zero if there were none */
    int last_err;

    /* total number of errors hit. */
    int num_err;

    /* current stage: WRITE, READ, RWRITE or RREAD */
    int rw;

    /* number of ios that will get sent to aio: (end - start) / reclen */
    int total_ios;

    /* number of ios we've already sent */
    int started_ios;

    /* last offset used in an io operation */
    off_t last_offset;

    /* stonewalled = 1 when we got cut off before submitting all our ios */
    int stonewalled;

    /* list management: circular doubly linked list, see oper_list_add */
    struct io_oper *next;
    struct io_oper *prev;

    /* set by build_oper when the first io of a stage is built */
    struct timeval start_time;

    /* name used in messages; the pointer is stored, the string is not copied */
    char *file_name;
};
188
/* a single io, and all the tracking needed for it */
struct io_unit {
    /*
     * note, iocb must go first!  Pointers to the iocb are cast straight
     * back to struct io_unit pointers (see update_iou_counters).
     */
    struct iocb iocb;

    /* pointer to parent io operation struct */
    struct io_oper *io_oper;

    /* aligned buffer */
    char *buf;

    /* size of the aligned buffer (record size) */
    int buf_size;

    /* state of this io unit: IO_FREE or IO_PENDING */
    int busy;

    /* result of last operation */
    long res;

    /* next unit on the owning thread's free list */
    struct io_unit *next;

    struct timeval io_start_time;		/* time of io_submit */
};
213
/* per-thread state: aio context, io units, and operation lists */
struct thread_info {
    /* aio context, created by aio_setup at the top of worker */
    io_context_t io_ctx;
    pthread_t tid;

    /* allocated array of io_unit structs */
    struct io_unit *ios;

    /* list of io units available for io */
    struct io_unit *free_ious;

    /* number of io units in the ios array */
    int num_global_ios;

    /* number of io units in flight */
    int num_global_pending;

    /* preallocated array of iocb pointers, only used in run_active_list */
    struct iocb **iocbs;

    /* preallocated array of events */
    struct io_event *events;

    /* size of the events array */
    int num_global_events;

    /* latency stats for io_submit */
    struct io_latency io_submit_latency;

    /* list of operations still in progress, and of those finished */
    struct io_oper *active_opers;
    struct io_oper *finished_opers;

    /* number of files this thread is doing io on */
    int num_files;

    /* how much io this thread did in the last stage */
    double stage_mb_trans;

    /* latency completion stats i/o time from io_submit until io_getevents */
    struct io_latency io_completion_latency;
};
255
/*
 * elapsed time from start_tv to stop_tv, in seconds.  A negative
 * interval is clamped to zero.
 */
static double time_since(struct timeval *start_tv, struct timeval *stop_tv)
{
    double whole = stop_tv->tv_sec - start_tv->tv_sec;
    double frac = stop_tv->tv_usec - start_tv->tv_usec;
    double elapsed;

    /* borrow a second when the microsecond difference went negative */
    if (frac < 0 && whole > 0) {
        whole -= 1;
        frac += 1000000;
    }

    elapsed = whole + frac / (double)1000000;
    return (elapsed < 0) ? 0 : elapsed;
}
274
/*
 * elapsed time from start_tv to the current time of day, in seconds
 */
static double time_since_now(struct timeval *start_tv)
{
    struct timeval now;

    gettimeofday(&now, NULL);
    return time_since(start_tv, &now);
}
284
285/*
286 * Add latency info to latency struct
287 */
288static void calc_latency(struct timeval *start_tv, struct timeval *stop_tv,
289			struct io_latency *lat)
290{
291    double delta;
292    int i;
293    delta = time_since(start_tv, stop_tv);
294    delta = delta * 1000;
295
296    if (delta > lat->max)
297    	lat->max = delta;
298    if (!lat->min || delta < lat->min)
299    	lat->min = delta;
300    lat->total_io++;
301    lat->total_lat += delta;
302    for (i = 0 ; i < DEVIATIONS ; i++) {
303        if (delta < deviations[i]) {
304	    lat->deviations[i]++;
305	    break;
306	}
307    }
308}
309
310static void oper_list_add(struct io_oper *oper, struct io_oper **list)
311{
312    if (!*list) {
313        *list = oper;
314	oper->prev = oper->next = oper;
315	return;
316    }
317    oper->prev = (*list)->prev;
318    oper->next = *list;
319    (*list)->prev->next = oper;
320    (*list)->prev = oper;
321    return;
322}
323
324static void oper_list_del(struct io_oper *oper, struct io_oper **list)
325{
326    if ((*list)->next == (*list)->prev && *list == (*list)->next) {
327        *list = NULL;
328	return;
329    }
330    oper->prev->next = oper->next;
331    oper->next->prev = oper->prev;
332    if (*list == oper)
333        *list = oper->next;
334}
335
336/* worker func to check error fields in the io unit */
337static int check_finished_io(struct io_unit *io) {
338    int i;
339    if (io->res != io->buf_size) {
340
341  		 struct stat s;
342  		 fstat(io->io_oper->fd, &s);
343
344  		 /*
345  		  * If file size is large enough for the read, then this short
346  		  * read is an error.
347  		  */
348  		 if ((io->io_oper->rw == READ || io->io_oper->rw == RREAD) &&
349  		     s.st_size > (io->iocb.u.c.offset + io->res)) {
350
351  		 		 fprintf(stderr, "io err %lu (%s) op %d, off %Lu size %d\n",
352  		 		 		 io->res, strerror(-io->res), io->iocb.aio_lio_opcode,
353  		 		 		 io->iocb.u.c.offset, io->buf_size);
354  		 		 io->io_oper->last_err = io->res;
355  		 		 io->io_oper->num_err++;
356  		 		 return -1;
357  		 }
358    }
359    if (verify && io->io_oper->rw == READ) {
360        if (memcmp(io->buf, verify_buf, io->io_oper->reclen)) {
361	    fprintf(stderr, "verify error, file %s offset %Lu contents (offset:bad:good):\n",
362	            io->io_oper->file_name, io->iocb.u.c.offset);
363
364	    for (i = 0 ; i < io->io_oper->reclen ; i++) {
365	        if (io->buf[i] != verify_buf[i]) {
366		    fprintf(stderr, "%d:%c:%c ", i, io->buf[i], verify_buf[i]);
367		}
368	    }
369	    fprintf(stderr, "\n");
370	}
371
372    }
373    return 0;
374}
375
376/* worker func to check the busy bits and get an io unit ready for use */
377static int grab_iou(struct io_unit *io, struct io_oper *oper) {
378    if (io->busy == IO_PENDING)
379        return -1;
380
381    io->busy = IO_PENDING;
382    io->res = 0;
383    io->io_oper = oper;
384    return 0;
385}
386
387char *stage_name(int rw) {
388    switch(rw) {
389    case WRITE:
390        return "write";
391    case READ:
392        return "read";
393    case RWRITE:
394        return "random write";
395    case RREAD:
396        return "random read";
397    }
398    return "unknown";
399}
400
401static inline double oper_mb_trans(struct io_oper *oper) {
402    return ((double)oper->started_ios * (double)oper->reclen) /
403                (double)(1024 * 1024);
404}
405
406static void print_time(struct io_oper *oper) {
407    double runtime;
408    double tput;
409    double mb;
410
411    runtime = time_since_now(&oper->start_time);
412    mb = oper_mb_trans(oper);
413    tput = mb / runtime;
414    fprintf(stderr, "%s on %s (%.2f MB/s) %.2f MB in %.2fs\n",
415	    stage_name(oper->rw), oper->file_name, tput, mb, runtime);
416}
417
418static void print_lat(char *str, struct io_latency *lat) {
419    double avg = lat->total_lat / lat->total_io;
420    int i;
421    double total_counted = 0;
422    fprintf(stderr, "%s min %.2f avg %.2f max %.2f\n\t",
423            str, lat->min, avg, lat->max);
424
425    for (i = 0 ; i < DEVIATIONS ; i++) {
426	fprintf(stderr, " %.0f < %d", lat->deviations[i], deviations[i]);
427	total_counted += lat->deviations[i];
428    }
429    if (total_counted && lat->total_io - total_counted)
430        fprintf(stderr, " < %.0f", lat->total_io - total_counted);
431    fprintf(stderr, "\n");
432    memset(lat, 0, sizeof(*lat));
433}
434
435static void print_latency(struct thread_info *t)
436{
437    struct io_latency *lat = &t->io_submit_latency;
438    print_lat("latency", lat);
439}
440
441static void print_completion_latency(struct thread_info *t)
442{
443    struct io_latency *lat = &t->io_completion_latency;
444    print_lat("completion latency", lat);
445}
446
447/*
448 * updates the fields in the io operation struct that belongs to this
449 * io unit, and make the io unit reusable again
450 */
451void finish_io(struct thread_info *t, struct io_unit *io, long result,
452		struct timeval *tv_now) {
453    struct io_oper *oper = io->io_oper;
454
455    calc_latency(&io->io_start_time, tv_now, &t->io_completion_latency);
456    io->res = result;
457    io->busy = IO_FREE;
458    io->next = t->free_ious;
459    t->free_ious = io;
460    oper->num_pending--;
461    t->num_global_pending--;
462    check_finished_io(io);
463    if (oper->num_pending == 0 &&
464       (oper->started_ios == oper->total_ios || oper->stonewalled))
465    {
466        print_time(oper);
467    }
468}
469
470int read_some_events(struct thread_info *t) {
471    struct io_unit *event_io;
472    struct io_event *event;
473    int nr;
474    int i;
475    int min_nr = io_iter;
476    struct timeval stop_time;
477
478    if (t->num_global_pending < io_iter)
479        min_nr = t->num_global_pending;
480
481#ifdef NEW_GETEVENTS
482    nr = io_getevents(t->io_ctx, min_nr, t->num_global_events, t->events,NULL);
483#else
484    nr = io_getevents(t->io_ctx, t->num_global_events, t->events, NULL);
485#endif
486    if (nr <= 0)
487        return nr;
488
489    gettimeofday(&stop_time, NULL);
490    for (i = 0 ; i < nr ; i++) {
491	event = t->events + i;
492	event_io = (struct io_unit *)((unsigned long)event->obj);
493	finish_io(t, event_io, event->res, &stop_time);
494    }
495    return nr;
496}
497
498/*
499 * finds a free io unit, waiting for pending requests if required.  returns
500 * null if none could be found
501 */
502static struct io_unit *find_iou(struct thread_info *t, struct io_oper *oper)
503{
504    struct io_unit *event_io;
505    int nr;
506
507retry:
508    if (t->free_ious) {
509        event_io = t->free_ious;
510	t->free_ious = t->free_ious->next;
511	if (grab_iou(event_io, oper)) {
512	    fprintf(stderr, "io unit on free list but not free\n");
513	    abort();
514	}
515	return event_io;
516    }
517    nr = read_some_events(t);
518    if (nr > 0)
519    	goto retry;
520    else
521    	fprintf(stderr, "no free ious after read_some_events\n");
522    return NULL;
523}
524
/*
 * wait for all pending requests for this io operation to finish
 *
 * Events are reaped one at a time from the thread's context; each one is
 * finished via finish_io even when it belongs to a different operation.
 * The loop ends when oper has nothing pending or io_getevents returns
 * zero or an error.  Always returns 0; errors recorded on the operation
 * are only reported on stderr.  A NULL oper is a no-op.
 */
static int io_oper_wait(struct thread_info *t, struct io_oper *oper) {
    struct io_event event;
    struct io_unit *event_io;

    if (oper == NULL) {
        return 0;
    }

    if (oper->num_pending == 0)
        goto done;

    /* this func is not speed sensitive, no need to go wild reading
     * more than one event at a time
     */
#ifdef NEW_GETEVENTS
    while(io_getevents(t->io_ctx, 1, 1, &event, NULL) > 0) {
#else
    while(io_getevents(t->io_ctx, 1, &event, NULL) > 0) {
#endif
	struct timeval tv_now;
        /* event.obj is the iocb, which is the first member of the io unit */
        event_io = (struct io_unit *)((unsigned long)event.obj);

	gettimeofday(&tv_now, NULL);
	finish_io(t, event_io, event.res, &tv_now);

	if (oper->num_pending == 0)
	    break;
    }
done:
    if (oper->num_err) {
        fprintf(stderr, "%u errors on oper, last %u\n",
	        oper->num_err, oper->last_err);
    }
    return 0;
}
563
564off_t random_byte_offset(struct io_oper *oper) {
565    off_t num;
566    off_t rand_byte = oper->start;
567    off_t range;
568    off_t offset = 1;
569
570    range = (oper->end - oper->start) / (1024 * 1024);
571    if ((page_size_mask+1) > (1024 * 1024))
572        offset = (page_size_mask+1) / (1024 * 1024);
573    if (range < offset)
574        range = 0;
575    else
576        range -= offset;
577
578    /* find a random mb offset */
579    num = 1 + (int)((double)range * rand() / (RAND_MAX + 1.0 ));
580    rand_byte += num * 1024 * 1024;
581
582    /* find a random byte offset */
583    num = 1 + (int)((double)(1024 * 1024) * rand() / (RAND_MAX + 1.0));
584
585    /* page align */
586    num = (num + page_size_mask) & ~page_size_mask;
587    rand_byte += num;
588
589    if (rand_byte + oper->reclen > oper->end) {
590	rand_byte -= oper->reclen;
591    }
592    return rand_byte;
593}
594
595/*
596 * build an aio iocb for an operation, based on oper->rw and the
597 * last offset used.  This finds the struct io_unit that will be attached
598 * to the iocb, and things are ready for submission to aio after this
599 * is called.
600 *
601 * returns null on error
602 */
603static struct io_unit *build_iocb(struct thread_info *t, struct io_oper *oper)
604{
605    struct io_unit *io;
606    off_t rand_byte;
607
608    io = find_iou(t, oper);
609    if (!io) {
610        fprintf(stderr, "unable to find io unit\n");
611	return NULL;
612    }
613
614    switch(oper->rw) {
615    case WRITE:
616        io_prep_pwrite(&io->iocb,oper->fd, io->buf, oper->reclen,
617	               oper->last_offset);
618	oper->last_offset += oper->reclen;
619	break;
620    case READ:
621        io_prep_pread(&io->iocb,oper->fd, io->buf, oper->reclen,
622	              oper->last_offset);
623	oper->last_offset += oper->reclen;
624	break;
625    case RREAD:
626	rand_byte = random_byte_offset(oper);
627	oper->last_offset = rand_byte;
628        io_prep_pread(&io->iocb,oper->fd, io->buf, oper->reclen,
629	              rand_byte);
630        break;
631    case RWRITE:
632	rand_byte = random_byte_offset(oper);
633	oper->last_offset = rand_byte;
634        io_prep_pwrite(&io->iocb,oper->fd, io->buf, oper->reclen,
635	              rand_byte);
636
637        break;
638    }
639
640    return io;
641}
642
643/*
644 * wait for any pending requests, and then free all ram associated with
645 * an operation.  returns the last error the operation hit (zero means none)
646 */
647static int
648finish_oper(struct thread_info *t, struct io_oper *oper)
649{
650    unsigned long last_err;
651
652    io_oper_wait(t, oper);
653    last_err = oper->last_err;
654    if (oper->num_pending > 0) {
655        fprintf(stderr, "oper num_pending is %d\n", oper->num_pending);
656    }
657    close(oper->fd);
658    free(oper);
659    return last_err;
660}
661
662/*
663 * allocates an io operation and fills in all the fields.  returns
664 * null on error
665 */
666static struct io_oper *
667create_oper(int fd, int rw, off_t start, off_t end, int reclen, int depth,
668            int iter, char *file_name)
669{
670    struct io_oper *oper;
671
672    oper = malloc (sizeof(*oper));
673    if (!oper) {
674	fprintf(stderr, "unable to allocate io oper\n");
675	return NULL;
676    }
677    memset(oper, 0, sizeof(*oper));
678
679    oper->depth = depth;
680    oper->start = start;
681    oper->end = end;
682    oper->last_offset = oper->start;
683    oper->fd = fd;
684    oper->reclen = reclen;
685    oper->rw = rw;
686    oper->total_ios = (oper->end - oper->start) / oper->reclen;
687    oper->file_name = file_name;
688
689    return oper;
690}
691
692/*
693 * does setup on num_ios worth of iocbs, but does not actually
694 * start any io
695 */
696int build_oper(struct thread_info *t, struct io_oper *oper, int num_ios,
697               struct iocb **my_iocbs)
698{
699    int i;
700    struct io_unit *io;
701
702    if (oper->started_ios == 0)
703	gettimeofday(&oper->start_time, NULL);
704
705    if (num_ios == 0)
706        num_ios = oper->total_ios;
707
708    if ((oper->started_ios + num_ios) > oper->total_ios)
709        num_ios = oper->total_ios - oper->started_ios;
710
711    for( i = 0 ; i < num_ios ; i++) {
712	io = build_iocb(t, oper);
713	if (!io) {
714	    return -1;
715	}
716	my_iocbs[i] = &io->iocb;
717    }
718    return num_ios;
719}
720
721/*
722 * runs through the iocbs in the array provided and updates
723 * counters in the associated oper struct
724 */
725static void update_iou_counters(struct iocb **my_iocbs, int nr,
726	struct timeval *tv_now)
727{
728    struct io_unit *io;
729    int i;
730    for (i = 0 ; i < nr ; i++) {
731	io = (struct io_unit *)(my_iocbs[i]);
732	io->io_oper->num_pending++;
733	io->io_oper->started_ios++;
734	io->io_start_time = *tv_now;	/* set time of io_submit */
735    }
736}
737
738/* starts some io for a given file, returns zero if all went well */
739int run_built(struct thread_info *t, int num_ios, struct iocb **my_iocbs)
740{
741    int ret;
742    struct timeval start_time;
743    struct timeval stop_time;
744
745resubmit:
746    gettimeofday(&start_time, NULL);
747    ret = io_submit(t->io_ctx, num_ios, my_iocbs);
748    gettimeofday(&stop_time, NULL);
749    calc_latency(&start_time, &stop_time, &t->io_submit_latency);
750
751    if (ret != num_ios) {
752	/* some ios got through */
753	if (ret > 0) {
754	    update_iou_counters(my_iocbs, ret, &stop_time);
755	    my_iocbs += ret;
756	    t->num_global_pending += ret;
757	    num_ios -= ret;
758	}
759	/*
760	 * we've used all the requests allocated in aio_init, wait and
761	 * retry
762	 */
763	if (ret > 0 || ret == -EAGAIN) {
764	    int old_ret = ret;
765	    if ((ret = read_some_events(t) > 0)) {
766		goto resubmit;
767	    } else {
768	    	fprintf(stderr, "ret was %d and now is %d\n", ret, old_ret);
769		abort();
770	    }
771	}
772
773	fprintf(stderr, "ret %d (%s) on io_submit\n", ret, strerror(-ret));
774	return -1;
775    }
776    update_iou_counters(my_iocbs, ret, &stop_time);
777    t->num_global_pending += ret;
778    return 0;
779}
780
781/*
782 * changes oper->rw to the next in a command sequence, or returns zero
783 * to say this operation is really, completely done for
784 */
785static int restart_oper(struct io_oper *oper) {
786    int new_rw  = 0;
787    if (oper->last_err)
788        return 0;
789
790    /* this switch falls through */
791    switch(oper->rw) {
792    case WRITE:
793	if (stages & (1 << READ))
794	    new_rw = READ;
795    case READ:
796	if (!new_rw && stages & (1 << RWRITE))
797	    new_rw = RWRITE;
798    case RWRITE:
799	if (!new_rw && stages & (1 << RREAD))
800	    new_rw = RREAD;
801    }
802
803    if (new_rw) {
804	oper->started_ios = 0;
805	oper->last_offset = oper->start;
806	oper->stonewalled = 0;
807
808	/*
809	 * we're restarting an operation with pending requests, so the
810	 * timing info won't be printed by finish_io.  Printing it here
811	 */
812	if (oper->num_pending)
813	    print_time(oper);
814
815	oper->rw = new_rw;
816	return 1;
817    }
818    return 0;
819}
820
821static int oper_runnable(struct io_oper *oper) {
822    struct stat buf;
823    int ret;
824
825    /* first context is always runnable, if started_ios > 0, no need to
826     * redo the calculations
827     */
828    if (oper->started_ios || oper->start == 0)
829        return 1;
830    /*
831     * only the sequential phases force delays in starting */
832    if (oper->rw >= RWRITE)
833        return 1;
834    ret = fstat(oper->fd, &buf);
835    if (ret < 0) {
836        perror("fstat");
837	exit(1);
838    }
839    if (S_ISREG(buf.st_mode) && buf.st_size < oper->start)
840        return 0;
841    return 1;
842}
843
/*
 * runs through all the io operations on the active list, and starts
 * a chunk of io on each.  If any io operations are completely finished,
 * it either switches them to the next stage or puts them on the
 * finished list.
 *
 * this function stops after max_io_submit iocbs are sent down the
 * pipe, even if it has not yet touched all the operations on the
 * active list.  Any operations that have finished are moved onto
 * the finished_opers list.
 *
 * Always returns 0; a failure in run_built terminates the program.
 */
static int run_active_list(struct thread_info *t,
			 int io_iter,
			 int max_io_submit)
{
    struct io_oper *oper;
    /* operations that had iocbs built in this pass */
    struct io_oper *built_opers = NULL;
    struct iocb **my_iocbs = t->iocbs;
    int ret = 0;
    int num_built = 0;

    oper = t->active_opers;
    while(oper) {
	if (!oper_runnable(oper)) {
	    /* skip it; stop once the walk wraps around to the list head */
	    oper = oper->next;
	    if (oper == t->active_opers)
	        break;
	    continue;
	}
	ret = build_oper(t, oper, io_iter, my_iocbs);
	if (ret >= 0) {
	    my_iocbs += ret;
	    num_built += ret;
	    /* park the operation on built_opers and restart from the new head */
	    oper_list_del(oper, &t->active_opers);
	    oper_list_add(oper, &built_opers);
	    oper = t->active_opers;
	    /* stop when another io_iter sized batch would exceed the limit */
	    if (num_built + io_iter > max_io_submit)
	        break;
	} else
	    break;
    }
    if (num_built) {
	ret = run_built(t, num_built, t->iocbs);
	if (ret < 0) {
	    fprintf(stderr, "error %d on run_built\n", ret);
	    exit(1);
	}
	/*
	 * move everything back to the active list, then on to the
	 * finished list if all of its ios have been started
	 */
	while(built_opers) {
	    oper = built_opers;
	    oper_list_del(oper, &built_opers);
	    oper_list_add(oper, &t->active_opers);
	    if (oper->started_ios == oper->total_ios) {
		oper_list_del(oper, &t->active_opers);
		oper_list_add(oper, &t->finished_opers);
	    }
	}
    }
    return 0;
}
903
904void drop_shm() {
905    int ret;
906    struct shmid_ds ds;
907    if (use_shm != USE_SHM)
908        return;
909
910    ret = shmctl(shm_id, IPC_RMID, &ds);
911    if (ret) {
912        perror("shmctl IPC_RMID");
913    }
914}
915
916void aio_setup(io_context_t *io_ctx, int n)
917{
918    int res = io_queue_init(n, io_ctx);
919    if (res != 0) {
920	fprintf(stderr, "io_queue_setup(%d) returned %d (%s)\n",
921		n, res, strerror(-res));
922	exit(3);
923    }
924}
925
926/*
927 * allocate io operation and event arrays for a given thread
928 */
929int setup_ious(struct thread_info *t,
930              int num_files, int depth,
931	      int reclen, int max_io_submit) {
932    int i;
933    size_t bytes = num_files * depth * sizeof(*t->ios);
934
935    t->ios = malloc(bytes);
936    if (!t->ios) {
937	fprintf(stderr, "unable to allocate io units\n");
938	return -1;
939    }
940    memset(t->ios, 0, bytes);
941
942    for (i = 0 ; i < depth * num_files; i++) {
943	t->ios[i].buf = aligned_buffer;
944	aligned_buffer += padded_reclen;
945	t->ios[i].buf_size = reclen;
946	if (verify)
947	    memset(t->ios[i].buf, 'b', reclen);
948	else
949	    memset(t->ios[i].buf, 0, reclen);
950	t->ios[i].next = t->free_ious;
951	t->free_ious = t->ios + i;
952    }
953    if (verify) {
954        verify_buf = aligned_buffer;
955        memset(verify_buf, 'b', reclen);
956    }
957
958    t->iocbs = malloc(sizeof(struct iocb *) * max_io_submit);
959    if (!t->iocbs) {
960        fprintf(stderr, "unable to allocate iocbs\n");
961	goto free_buffers;
962    }
963
964    memset(t->iocbs, 0, max_io_submit * sizeof(struct iocb *));
965
966    t->events = malloc(sizeof(struct io_event) * depth * num_files);
967    if (!t->events) {
968        fprintf(stderr, "unable to allocate ram for events\n");
969	goto free_buffers;
970    }
971    memset(t->events, 0, num_files * sizeof(struct io_event)*depth);
972
973    t->num_global_ios = num_files * depth;
974    t->num_global_events = t->num_global_ios;
975    return 0;
976
977free_buffers:
978    if (t->ios)
979        free(t->ios);
980    if (t->iocbs)
981        free(t->iocbs);
982    if (t->events)
983        free(t->events);
984    return -1;
985}
986
987/*
988 * The buffers used for file data are allocated as a single big
989 * malloc, and then each thread and operation takes a piece and uses
990 * that for file data.  This lets us do a large shm or bigpages alloc
991 * and without trying to find a special place in each thread to map the
992 * buffers to
993 */
994int setup_shared_mem(int num_threads, int num_files, int depth,
995                     int reclen, int max_io_submit)
996{
997    char *p = NULL;
998    size_t total_ram;
999
1000    padded_reclen = (reclen + page_size_mask) / (page_size_mask+1);
1001    padded_reclen = padded_reclen * (page_size_mask+1);
1002    total_ram = num_files * depth * padded_reclen + num_threads;
1003    if (verify)
1004    	total_ram += padded_reclen;
1005
1006    if (use_shm == USE_MALLOC) {
1007	p = malloc(total_ram + page_size_mask);
1008    } else if (use_shm == USE_SHM) {
1009        shm_id = shmget(IPC_PRIVATE, total_ram, IPC_CREAT | 0700);
1010	if (shm_id < 0) {
1011	    perror("shmget");
1012	    drop_shm();
1013	    goto free_buffers;
1014	}
1015	p = shmat(shm_id, (char *)0x50000000, 0);
1016        if ((long)p == -1) {
1017	    perror("shmat");
1018	    goto free_buffers;
1019	}
1020	/* won't really be dropped until we shmdt */
1021	drop_shm();
1022    } else if (use_shm == USE_SHMFS) {
1023        char mmap_name[16]; /* /dev/shm/ + null + XXXXXX */
1024	int fd;
1025
1026	strcpy(mmap_name, "/dev/shm/XXXXXX");
1027	fd = mkstemp(mmap_name);
1028        if (fd < 0) {
1029	    perror("mkstemp");
1030	    goto free_buffers;
1031	}
1032	unlink(mmap_name);
1033	ftruncate(fd, total_ram);
1034	shm_id = fd;
1035	p = mmap((char *)0x50000000, total_ram,
1036	         PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
1037
1038        if (p == MAP_FAILED) {
1039	    perror("mmap");
1040	    goto free_buffers;
1041	}
1042    }
1043    if (!p) {
1044        fprintf(stderr, "unable to allocate buffers\n");
1045	goto free_buffers;
1046    }
1047    unaligned_buffer = p;
1048    p = (char*)((intptr_t) (p + page_size_mask) & ~page_size_mask);
1049    aligned_buffer = p;
1050    return 0;
1051
1052free_buffers:
1053    drop_shm();
1054    if (unaligned_buffer)
1055        free(unaligned_buffer);
1056    return -1;
1057}
1058
1059/*
1060 * runs through all the thread_info structs and calculates a combined
1061 * throughput
1062 */
1063void global_thread_throughput(struct thread_info *t, char *this_stage) {
1064    int i;
1065    double runtime = time_since_now(&global_stage_start_time);
1066    double total_mb = 0;
1067    double min_trans = 0;
1068
1069    for (i = 0 ; i < num_threads ; i++) {
1070        total_mb += global_thread_info[i].stage_mb_trans;
1071	if (!min_trans || t->stage_mb_trans < min_trans)
1072	    min_trans = t->stage_mb_trans;
1073    }
1074    if (total_mb) {
1075	fprintf(stderr, "%s throughput (%.2f MB/s) ", this_stage,
1076	        total_mb / runtime);
1077	fprintf(stderr, "%.2f MB in %.2fs", total_mb, runtime);
1078        if (stonewall)
1079	    fprintf(stderr, " min transfer %.2fMB", min_trans);
1080        fprintf(stderr, "\n");
1081    }
1082}
1083
1084
1085/* this is the meat of the state machine.  There is a list of
1086 * active operations structs, and as each one finishes the required
1087 * io it is moved to a list of finished operations.  Once they have
1088 * all finished whatever stage they were in, they are given the chance
1089 * to restart and pick a different stage (read/write/random read etc)
1090 *
 * various timings are printed in between the stages, along with
 * thread synchronization if there is more than one thread.
1093 */
1094int worker(struct thread_info *t)
1095{
1096    struct io_oper *oper;
1097    char *this_stage = NULL;
1098    struct timeval stage_time;
1099    int status = 0;
1100    int iteration = 0;
1101    int cnt;
1102
1103    aio_setup(&t->io_ctx, 512);
1104
1105restart:
1106    if (num_threads > 1) {
1107        pthread_mutex_lock(&stage_mutex);
1108	threads_starting++;
1109	if (threads_starting == num_threads) {
1110	    threads_ending = 0;
1111	    gettimeofday(&global_stage_start_time, NULL);
1112	    pthread_cond_broadcast(&stage_cond);
1113	}
1114	while (threads_starting != num_threads)
1115	    pthread_cond_wait(&stage_cond, &stage_mutex);
1116        pthread_mutex_unlock(&stage_mutex);
1117    }
1118    if (t->active_opers) {
1119        this_stage = stage_name(t->active_opers->rw);
1120	gettimeofday(&stage_time, NULL);
1121	t->stage_mb_trans = 0;
1122    }
1123
1124    cnt = 0;
1125    /* first we send everything through aio */
1126    while(t->active_opers && (cnt < iterations || iterations == RUN_FOREVER)) {
1127	if (stonewall && threads_ending) {
1128	    oper = t->active_opers;
1129	    oper->stonewalled = 1;
1130	    oper_list_del(oper, &t->active_opers);
1131	    oper_list_add(oper, &t->finished_opers);
1132	} else {
1133	    run_active_list(t, io_iter,  max_io_submit);
1134        }
1135	cnt++;
1136    }
1137    if (latency_stats)
1138        print_latency(t);
1139
1140    if (completion_latency_stats)
1141	print_completion_latency(t);
1142
1143    /* then we wait for all the operations to finish */
1144    oper = t->finished_opers;
1145    do {
1146	if (!oper)
1147		break;
1148	io_oper_wait(t, oper);
1149	oper = oper->next;
1150    } while(oper != t->finished_opers);
1151
1152    /* then we do an fsync to get the timing for any future operations
1153     * right, and check to see if any of these need to get restarted
1154     */
1155    oper = t->finished_opers;
1156    while(oper) {
1157	if (fsync_stages)
1158            fsync(oper->fd);
1159	t->stage_mb_trans += oper_mb_trans(oper);
1160	if (restart_oper(oper)) {
1161	    oper_list_del(oper, &t->finished_opers);
1162	    oper_list_add(oper, &t->active_opers);
1163	    oper = t->finished_opers;
1164	    continue;
1165	}
1166	oper = oper->next;
1167	if (oper == t->finished_opers)
1168	    break;
1169    }
1170
1171    if (t->stage_mb_trans && t->num_files > 0) {
1172        double seconds = time_since_now(&stage_time);
1173	fprintf(stderr, "thread %d %s totals (%.2f MB/s) %.2f MB in %.2fs\n",
1174	        t - global_thread_info, this_stage, t->stage_mb_trans/seconds,
1175		t->stage_mb_trans, seconds);
1176    }
1177
1178    if (num_threads > 1) {
1179	pthread_mutex_lock(&stage_mutex);
1180	threads_ending++;
1181	if (threads_ending == num_threads) {
1182	    threads_starting = 0;
1183	    pthread_cond_broadcast(&stage_cond);
1184	    global_thread_throughput(t, this_stage);
1185	}
1186	while(threads_ending != num_threads)
1187	    pthread_cond_wait(&stage_cond, &stage_mutex);
1188	pthread_mutex_unlock(&stage_mutex);
1189    }
1190
1191    /* someone got restarted, go back to the beginning */
1192    if (t->active_opers && (cnt < iterations || iterations == RUN_FOREVER)) {
1193	iteration++;
1194        goto restart;
1195    }
1196
1197    /* finally, free all the ram */
1198    while(t->finished_opers) {
1199	oper = t->finished_opers;
1200	oper_list_del(oper, &t->finished_opers);
1201	status = finish_oper(t, oper);
1202    }
1203
1204    if (t->num_global_pending) {
1205        fprintf(stderr, "global num pending is %d\n", t->num_global_pending);
1206    }
1207    io_queue_release(t->io_ctx);
1208
1209    return status;
1210}
1211
1212typedef void * (*start_routine)(void *);
1213int run_workers(struct thread_info *t, int num_threads)
1214{
1215    int ret;
1216    int thread_ret;
1217    int i;
1218
1219    for(i = 0 ; i < num_threads ; i++) {
1220        ret = pthread_create(&t[i].tid, NULL, (start_routine)worker, t + i);
1221	if (ret) {
1222	    perror("pthread_create");
1223	    exit(1);
1224	}
1225    }
1226    for(i = 0 ; i < num_threads ; i++) {
1227        ret = pthread_join(t[i].tid, (void *)&thread_ret);
1228        if (ret) {
1229	    perror("pthread_join");
1230	    exit(1);
1231	}
1232    }
1233    return 0;
1234}
1235
/*
 * Convert a size argument such as "400", "400k", "64M" or "2g" to bytes.
 *
 * size_arg - decimal number, optionally followed by one unit character:
 *            b/B = bytes, k/K = KB, m/M = MB, g/G = GB
 * mult     - multiplier used when the last character is not one of the
 *            unit characters above
 *
 * The string is not modified.  Returns number * multiplier; an empty or
 * NULL string yields 0.
 */
off_t parse_size(char *size_arg, off_t mult) {
    char suffix;
    long long num;

    /* nothing to parse, and no last character to inspect */
    if (size_arg == NULL || size_arg[0] == '\0')
        return 0;

    suffix = size_arg[strlen(size_arg) - 1];

    /* strtoll stops at the first non-digit, so the unit character does
     * not have to be cut off the caller's string first
     */
    num = strtoll(size_arg, NULL, 10);

    switch (suffix) {
    case 'g':
    case 'G':
        mult = 1024 * 1024 * 1024;
        break;
    case 'm':
    case 'M':
        mult = 1024 * 1024;
        break;
    case 'k':
    case 'K':
        mult = 1024;
        break;
    case 'b':
    case 'B':
        mult = 1;
        break;
    default:
        /* no recognised unit: keep the caller's default multiplier */
        break;
    }
    return (off_t)num * mult;
}
1266
1267void print_usage(void) {
1268    printf("usage: aio-stress [-s size] [-r size] [-a size] [-d num] [-b num]\n");
1269    printf("                  [-i num] [-t num] [-c num] [-C size] [-nxhOS ]\n");
1270    printf("                  file1 [file2 ...]\n");
1271    printf("\t-a size in KB at which to align buffers\n");
1272    printf("\t-b max number of iocbs to give io_submit at once\n");
1273    printf("\t-c number of io contexts per file\n");
1274    printf("\t-C offset between contexts, default 2MB\n");
1275    printf("\t-s size in MB of the test file(s), default 1024MB\n");
1276    printf("\t-r record size in KB used for each io, default 64KB\n");
1277    printf("\t-d number of pending aio requests for each file, default 64\n");
1278    printf("\t-i number of ios per file sent before switching\n\t   to the next file, default 8\n");
1279    printf("\t-I total number of ayncs IOs the program will run, default is run until Cntl-C\n");
1280    printf("\t-O Use O_DIRECT (not available in 2.4 kernels),\n");
1281    printf("\t-S Use O_SYNC for writes\n");
1282    printf("\t-o add an operation to the list: write=0, read=1,\n");
1283    printf("\t   random write=2, random read=3.\n");
1284    printf("\t   repeat -o to specify multiple ops: -o 0 -o 1 etc.\n");
1285    printf("\t-m shm use ipc shared memory for io buffers instead of malloc\n");
1286    printf("\t-m shmfs mmap a file in /dev/shm for io buffers\n");
1287    printf("\t-n no fsyncs between write stage and read stage\n");
1288    printf("\t-l print io_submit latencies after each stage\n");
1289    printf("\t-L print io completion latencies after each stage\n");
1290    printf("\t-t number of threads to run\n");
1291    printf("\t-u unlink files after completion\n");
1292    printf("\t-v verification of bytes written\n");
1293    printf("\t-x turn off thread stonewalling\n");
1294    printf("\t-h this message\n");
1295    printf("\n\t   the size options (-a -s and -r) allow modifiers -s 400{k,m,g}\n");
1296    printf("\t   translate to 400KB, 400MB and 400GB\n");
1297    printf("version %s\n", PROG_VERSION);
1298}
1299
1300int main(int ac, char **av)
1301{
1302    int rwfd;
1303    int i;
1304    int j;
1305    int c;
1306
1307    off_t file_size = 1 * 1024 * 1024 * 1024;
1308    int first_stage = WRITE;
1309    struct io_oper *oper;
1310    int status = 0;
1311    int num_files = 0;
1312    int open_fds = 0;
1313    struct thread_info *t;
1314
1315    page_size_mask = getpagesize() - 1;
1316
1317    while(1) {
1318	c = getopt(ac, av, "a:b:c:C:m:s:r:d:i:I:o:t:lLnhOSxvu");
1319	if  (c < 0)
1320	    break;
1321
1322        switch(c) {
1323	case 'a':
1324	    page_size_mask = parse_size(optarg, 1024);
1325	    page_size_mask--;
1326	    break;
1327	case 'c':
1328	    num_contexts = atoi(optarg);
1329	    break;
1330	case 'C':
1331	    context_offset = parse_size(optarg, 1024 * 1024);
1332	case 'b':
1333	    max_io_submit = atoi(optarg);
1334	    break;
1335	case 's':
1336	    file_size = parse_size(optarg, 1024 * 1024);
1337	    break;
1338	case 'd':
1339	    depth = atoi(optarg);
1340	    break;
1341	case 'r':
1342	    rec_len = parse_size(optarg, 1024);
1343	    break;
1344	case 'i':
1345	    io_iter = atoi(optarg);
1346	    break;
1347        case 'I':
1348          iterations = atoi(optarg);
1349        break;
1350	case 'n':
1351	    fsync_stages = 0;
1352	    break;
1353	case 'l':
1354	    latency_stats = 1;
1355	    break;
1356	case 'L':
1357	    completion_latency_stats = 1;
1358	    break;
1359	case 'm':
1360	    if (!strcmp(optarg, "shm")) {
1361		fprintf(stderr, "using ipc shm\n");
1362	        use_shm = USE_SHM;
1363	    } else if (!strcmp(optarg, "shmfs")) {
1364	        fprintf(stderr, "using /dev/shm for buffers\n");
1365		use_shm = USE_SHMFS;
1366	    }
1367	    break;
1368	case 'o':
1369	    i = atoi(optarg);
1370	    stages |= 1 << i;
1371	    fprintf(stderr, "adding stage %s\n", stage_name(i));
1372	    break;
1373	case 'O':
1374	    o_direct = O_DIRECT;
1375	    break;
1376	case 'S':
1377	    o_sync = O_SYNC;
1378	    break;
1379	case 't':
1380	    num_threads = atoi(optarg);
1381	    break;
1382	case 'x':
1383	    stonewall = 0;
1384	    break;
1385	case 'u':
1386	    unlink_files = 1;
1387	    break;
1388	case 'v':
1389	    verify = 1;
1390	    break;
1391	case 'h':
1392	default:
1393	    print_usage();
1394	    exit(1);
1395	}
1396    }
1397
1398    /*
1399     * make sure we don't try to submit more ios than we have allocated
1400     * memory for
1401     */
1402    if (depth < io_iter) {
1403	io_iter = depth;
1404        fprintf(stderr, "dropping io_iter to %d\n", io_iter);
1405    }
1406
1407    if (optind >= ac) {
1408	print_usage();
1409	exit(1);
1410    }
1411
1412    num_files = ac - optind;
1413
1414    if (num_threads > (num_files * num_contexts)) {
1415        num_threads = num_files * num_contexts;
1416	fprintf(stderr, "dropping thread count to the number of contexts %d\n",
1417	        num_threads);
1418    }
1419
1420    t = malloc(num_threads * sizeof(*t));
1421    if (!t) {
1422        perror("malloc");
1423	exit(1);
1424    }
1425    global_thread_info = t;
1426
1427    /* by default, allow a huge number of iocbs to be sent towards
1428     * io_submit
1429     */
1430    if (!max_io_submit)
1431        max_io_submit = num_files * io_iter * num_contexts;
1432
1433    /*
1434     * make sure we don't try to submit more ios than max_io_submit allows
1435     */
1436    if (max_io_submit < io_iter) {
1437        io_iter = max_io_submit;
1438	fprintf(stderr, "dropping io_iter to %d\n", io_iter);
1439    }
1440
1441    if (!stages) {
1442        stages = (1 << WRITE) | (1 << READ) | (1 << RREAD) | (1 << RWRITE);
1443    } else {
1444        for (i = 0 ; i < LAST_STAGE; i++) {
1445	    if (stages & (1 << i)) {
1446	        first_stage = i;
1447		fprintf(stderr, "starting with %s\n", stage_name(i));
1448		break;
1449	    }
1450	}
1451    }
1452
1453    if (file_size < num_contexts * context_offset) {
1454        fprintf(stderr, "file size %Lu too small for %d contexts\n",
1455	        file_size, num_contexts);
1456	exit(1);
1457    }
1458
1459    fprintf(stderr, "file size %LuMB, record size %luKB, depth %d, ios per iteration %d\n", file_size / (1024 * 1024), rec_len / 1024, depth, io_iter);
1460    fprintf(stderr, "max io_submit %d, buffer alignment set to %luKB\n",
1461            max_io_submit, (page_size_mask + 1)/1024);
1462    fprintf(stderr, "threads %d files %d contexts %d context offset %LuMB verification %s\n",
1463            num_threads, num_files, num_contexts,
1464	    context_offset / (1024 * 1024), verify ? "on" : "off");
1465    /* open all the files and do any required setup for them */
1466    for (i = optind ; i < ac ; i++) {
1467	int thread_index;
1468	for (j = 0 ; j < num_contexts ; j++) {
1469	    thread_index = open_fds % num_threads;
1470	    open_fds++;
1471
1472	    rwfd = open(av[i], O_CREAT | O_RDWR | o_direct | o_sync, 0600);
1473	    assert(rwfd != -1);
1474
1475	    oper = create_oper(rwfd, first_stage, j * context_offset,
1476	                       file_size - j * context_offset, rec_len,
1477			       depth, io_iter, av[i]);
1478	    if (!oper) {
1479		fprintf(stderr, "error in create_oper\n");
1480		exit(-1);
1481	    }
1482	    oper_list_add(oper, &t[thread_index].active_opers);
1483	    t[thread_index].num_files++;
1484	}
1485    }
1486    if (setup_shared_mem(num_threads, num_files * num_contexts,
1487                         depth, rec_len, max_io_submit))
1488    {
1489        exit(1);
1490    }
1491    for (i = 0 ; i < num_threads ; i++) {
1492	if (setup_ious(&t[i], t[i].num_files, depth, rec_len, max_io_submit))
1493		exit(1);
1494    }
1495    if (num_threads > 1){
1496        printf("Running multi thread version num_threads:%d\n", num_threads);
1497        run_workers(t, num_threads);
1498    } else {
1499        printf("Running single thread version \n");
1500	status = worker(t);
1501    }
1502    if (unlink_files) {
1503	for (i = optind ; i < ac ; i++) {
1504	    printf("Cleaning up file %s \n", av[i]);
1505	    unlink(av[i]);
1506	}
1507    }
1508
1509    if (status) {
1510	exit(1);
1511    }
1512    return status;
1513}
1514
1515