aio-stress.c revision 43088e16aa60d69e3ec5a69cdd8bdd45b8891127
1/*
2 * Copyright (c) 2004 SuSE, Inc.  All Rights Reserved.
3 *
4 * This program is free software; you can redistribute it and/or modify it
5 * under the terms of version 2 of the GNU General Public License as
6 * published by the Free Software Foundation.
7 *
8 * This program is distributed in the hope that it would be useful, but
9 * WITHOUT ANY WARRANTY; without even the implied warranty of
10 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
11 *
12 * Further, this software is distributed without any warranty that it is
13 * free of the rightful claim of any third person regarding infringement
14 * or the like.  Any license provided herein, whether implied or
15 * otherwise, applies only to this software file.  Patent licenses, if
16 * any, provided herein do not apply to combinations of this program with
17 * other software, or any other product whatsoever.
18 *
19 * You should have received a copy of the GNU General Public License along
20 * with this program; if not, write the Free Software Foundation, Inc., 59
21 * Temple Place - Suite 330, Boston MA 02111-1307, USA.
22 *
23 * Contact information: Silicon Graphics, Inc., 1600 Amphitheatre Pkwy,
24 * Mountain View, CA  94043, or:
25 *
26 *
27 * aio-stress
28 *
29 * will open or create each file on the command line, and start a series
30 * of aio to it.
31 *
32 * aio is done in a rotating loop.  first file1 gets 8 requests, then
33 * file2, then file3 etc.  As each file finishes writing, it is switched
34 * to reads
35 *
36 * io buffers are aligned in case you want to do raw io
37 *
38 * compile with gcc -Wall -laio -lpthread -o aio-stress aio-stress.c
39 *
40 * run aio-stress -h to see the options
41 *
42 * Please mail Chris Mason (mason@suse.com) with bug reports or patches
43 */
44#define _FILE_OFFSET_BITS 64
45#define PROG_VERSION "0.21"
46#define NEW_GETEVENTS
47
48#include <stdio.h>
49#include <errno.h>
50#include <assert.h>
51#include <stdlib.h>
52
53#include <sys/types.h>
54#include <sys/stat.h>
55#include <fcntl.h>
56#include <unistd.h>
57#include <sys/time.h>
58#include <libaio.h>
59#include <sys/ipc.h>
60#include <sys/shm.h>
61#include <sys/mman.h>
62#include <string.h>
63#include <pthread.h>
64
65#define IO_FREE 0
66#define IO_PENDING 1
67#define RUN_FOREVER -1
68
69enum {
70    WRITE,
71    READ,
72    RWRITE,
73    RREAD,
74    LAST_STAGE,
75};
76
77#define USE_MALLOC 0
78#define USE_SHM 1
79#define USE_SHMFS 2
80
81/*
82 * various globals, these are effectively read only by the time the threads
83 * are started
84 */
85long stages = 0;
86unsigned long page_size_mask;
87int o_direct = 0;
88int o_sync = 0;
89int latency_stats = 0;
90int completion_latency_stats = 0;
91int io_iter = 8;
92int iterations = RUN_FOREVER;
93int max_io_submit = 0;
94long rec_len = 64 * 1024;
95int depth = 64;
96int num_threads = 1;
97int num_contexts = 1;
98off_t context_offset = 2 * 1024 * 1024;
99int fsync_stages = 1;
100int use_shm = 0;
101int shm_id;
102char *unaligned_buffer = NULL;
103char *aligned_buffer = NULL;
104int padded_reclen = 0;
105int stonewall = 1;
106int verify = 0;
107char *verify_buf = NULL;
108int unlink_files = 0;
109
110struct io_unit;
111struct thread_info;
112
113/* pthread mutexes and other globals for keeping the threads in sync */
114pthread_cond_t stage_cond = PTHREAD_COND_INITIALIZER;
115pthread_mutex_t stage_mutex = PTHREAD_MUTEX_INITIALIZER;
116int threads_ending = 0;
117int threads_starting = 0;
118struct timeval global_stage_start_time;
119struct thread_info *global_thread_info;
120
121/*
122 * latencies during io_submit are measured, these are the
123 * granularities for deviations
124 */
125#define DEVIATIONS 6
126int deviations[DEVIATIONS] = { 100, 250, 500, 1000, 5000, 10000 };
127struct io_latency {
128    double max;
129    double min;
130    double total_io;
131    double total_lat;
132    double deviations[DEVIATIONS];
133};
134
135/* container for a series of operations to a file */
136struct io_oper {
137    /* already open file descriptor, valid for whatever operation you want */
138    int fd;
139
140    /* starting byte of the operation */
141    off_t start;
142
143    /* ending byte of the operation */
144    off_t end;
145
146    /* size of the read/write buffer */
147    int reclen;
148
149    /* max number of pending requests before a wait is triggered */
150    int depth;
151
152    /* current number of pending requests */
153    int num_pending;
154
155    /* last error, zero if there were none */
156    int last_err;
157
158    /* total number of errors hit. */
159    int num_err;
160
161    /* read,write, random, etc */
162    int rw;
163
164    /* number of ios that will get sent to aio */
165    int total_ios;
166
167    /* number of ios we've already sent */
168    int started_ios;
169
170    /* last offset used in an io operation */
171    off_t last_offset;
172
173    /* stonewalled = 1 when we got cut off before submitting all our ios */
174    int stonewalled;
175
176    /* list management */
177    struct io_oper *next;
178    struct io_oper *prev;
179
180    struct timeval start_time;
181
182    char *file_name;
183};
184
185/* a single io, and all the tracking needed for it */
186struct io_unit {
187    /* note, iocb must go first! */
188    struct iocb iocb;
189
190    /* pointer to parent io operation struct */
191    struct io_oper *io_oper;
192
193    /* aligned buffer */
194    char *buf;
195
196    /* size of the aligned buffer (record size) */
197    int buf_size;
198
199    /* state of this io unit (free, pending, done) */
200    int busy;
201
202    /* result of last operation */
203    long res;
204
205    struct io_unit *next;
206
207    struct timeval io_start_time;		/* time of io_submit */
208};
209
210struct thread_info {
211    io_context_t io_ctx;
212    pthread_t tid;
213
214    /* allocated array of io_unit structs */
215    struct io_unit *ios;
216
217    /* list of io units available for io */
218    struct io_unit *free_ious;
219
220    /* number of io units in the ios array */
221    int num_global_ios;
222
223    /* number of io units in flight */
224    int num_global_pending;
225
226    /* preallocated array of iocb pointers, only used in run_active */
227    struct iocb **iocbs;
228
229    /* preallocated array of events */
230    struct io_event *events;
231
232    /* size of the events array */
233    int num_global_events;
234
235    /* latency stats for io_submit */
236    struct io_latency io_submit_latency;
237
238    /* list of operations still in progress, and of those finished */
239    struct io_oper *active_opers;
240    struct io_oper *finished_opers;
241
242    /* number of files this thread is doing io on */
243    int num_files;
244
245    /* how much io this thread did in the last stage */
246    double stage_mb_trans;
247
248    /* latency completion stats i/o time from io_submit until io_getevents */
249    struct io_latency io_completion_latency;
250};
251
252/*
253 * return seconds between start_tv and stop_tv in double precision
254 */
255static double time_since(struct timeval *start_tv, struct timeval *stop_tv)
256{
257    double sec, usec;
258    double ret;
259    sec = stop_tv->tv_sec - start_tv->tv_sec;
260    usec = stop_tv->tv_usec - start_tv->tv_usec;
261    if (sec > 0 && usec < 0) {
262        sec--;
263	usec += 1000000;
264    }
265    ret = sec + usec / (double)1000000;
266    if (ret < 0)
267        ret = 0;
268    return ret;
269}
270
271/*
272 * return seconds between start_tv and now in double precision
273 */
274static double time_since_now(struct timeval *start_tv)
275{
276    struct timeval stop_time;
277    gettimeofday(&stop_time, NULL);
278    return time_since(start_tv, &stop_time);
279}
280
281/*
282 * Add latency info to latency struct
283 */
284static void calc_latency(struct timeval *start_tv, struct timeval *stop_tv,
285			struct io_latency *lat)
286{
287    double delta;
288    int i;
289    delta = time_since(start_tv, stop_tv);
290    delta = delta * 1000;
291
292    if (delta > lat->max)
293    	lat->max = delta;
294    if (!lat->min || delta < lat->min)
295    	lat->min = delta;
296    lat->total_io++;
297    lat->total_lat += delta;
298    for (i = 0 ; i < DEVIATIONS ; i++) {
299        if (delta < deviations[i]) {
300	    lat->deviations[i]++;
301	    break;
302	}
303    }
304}
305
306static void oper_list_add(struct io_oper *oper, struct io_oper **list)
307{
308    if (!*list) {
309        *list = oper;
310	oper->prev = oper->next = oper;
311	return;
312    }
313    oper->prev = (*list)->prev;
314    oper->next = *list;
315    (*list)->prev->next = oper;
316    (*list)->prev = oper;
317    return;
318}
319
320static void oper_list_del(struct io_oper *oper, struct io_oper **list)
321{
322    if ((*list)->next == (*list)->prev && *list == (*list)->next) {
323        *list = NULL;
324	return;
325    }
326    oper->prev->next = oper->next;
327    oper->next->prev = oper->prev;
328    if (*list == oper)
329        *list = oper->next;
330}
331
332/* worker func to check error fields in the io unit */
333static int check_finished_io(struct io_unit *io) {
334    int i;
335    if (io->res != io->buf_size) {
336
337  		 struct stat s;
338  		 fstat(io->io_oper->fd, &s);
339
340  		 /*
341  		  * If file size is large enough for the read, then this short
342  		  * read is an error.
343  		  */
344  		 if ((io->io_oper->rw == READ || io->io_oper->rw == RREAD) &&
345  		     s.st_size > (io->iocb.u.c.offset + io->res)) {
346
347  		 		 fprintf(stderr, "io err %lu (%s) op %d, off %Lu size %d\n",
348  		 		 		 io->res, strerror(-io->res), io->iocb.aio_lio_opcode,
349  		 		 		 io->iocb.u.c.offset, io->buf_size);
350  		 		 io->io_oper->last_err = io->res;
351  		 		 io->io_oper->num_err++;
352  		 		 return -1;
353  		 }
354    }
355    if (verify && io->io_oper->rw == READ) {
356        if (memcmp(io->buf, verify_buf, io->io_oper->reclen)) {
357	    fprintf(stderr, "verify error, file %s offset %Lu contents (offset:bad:good):\n",
358	            io->io_oper->file_name, io->iocb.u.c.offset);
359
360	    for (i = 0 ; i < io->io_oper->reclen ; i++) {
361	        if (io->buf[i] != verify_buf[i]) {
362		    fprintf(stderr, "%d:%c:%c ", i, io->buf[i], verify_buf[i]);
363		}
364	    }
365	    fprintf(stderr, "\n");
366	}
367
368    }
369    return 0;
370}
371
372/* worker func to check the busy bits and get an io unit ready for use */
373static int grab_iou(struct io_unit *io, struct io_oper *oper) {
374    if (io->busy == IO_PENDING)
375        return -1;
376
377    io->busy = IO_PENDING;
378    io->res = 0;
379    io->io_oper = oper;
380    return 0;
381}
382
383char *stage_name(int rw) {
384    switch(rw) {
385    case WRITE:
386        return "write";
387    case READ:
388        return "read";
389    case RWRITE:
390        return "random write";
391    case RREAD:
392        return "random read";
393    }
394    return "unknown";
395}
396
397static inline double oper_mb_trans(struct io_oper *oper) {
398    return ((double)oper->started_ios * (double)oper->reclen) /
399                (double)(1024 * 1024);
400}
401
402static void print_time(struct io_oper *oper) {
403    double runtime;
404    double tput;
405    double mb;
406
407    runtime = time_since_now(&oper->start_time);
408    mb = oper_mb_trans(oper);
409    tput = mb / runtime;
410    fprintf(stderr, "%s on %s (%.2f MB/s) %.2f MB in %.2fs\n",
411	    stage_name(oper->rw), oper->file_name, tput, mb, runtime);
412}
413
414static void print_lat(char *str, struct io_latency *lat) {
415    double avg = lat->total_lat / lat->total_io;
416    int i;
417    double total_counted = 0;
418    fprintf(stderr, "%s min %.2f avg %.2f max %.2f\n\t",
419            str, lat->min, avg, lat->max);
420
421    for (i = 0 ; i < DEVIATIONS ; i++) {
422	fprintf(stderr, " %.0f < %d", lat->deviations[i], deviations[i]);
423	total_counted += lat->deviations[i];
424    }
425    if (total_counted && lat->total_io - total_counted)
426        fprintf(stderr, " < %.0f", lat->total_io - total_counted);
427    fprintf(stderr, "\n");
428    memset(lat, 0, sizeof(*lat));
429}
430
431static void print_latency(struct thread_info *t)
432{
433    struct io_latency *lat = &t->io_submit_latency;
434    print_lat("latency", lat);
435}
436
437static void print_completion_latency(struct thread_info *t)
438{
439    struct io_latency *lat = &t->io_completion_latency;
440    print_lat("completion latency", lat);
441}
442
443/*
444 * updates the fields in the io operation struct that belongs to this
445 * io unit, and make the io unit reusable again
446 */
447void finish_io(struct thread_info *t, struct io_unit *io, long result,
448		struct timeval *tv_now) {
449    struct io_oper *oper = io->io_oper;
450
451    calc_latency(&io->io_start_time, tv_now, &t->io_completion_latency);
452    io->res = result;
453    io->busy = IO_FREE;
454    io->next = t->free_ious;
455    t->free_ious = io;
456    oper->num_pending--;
457    t->num_global_pending--;
458    check_finished_io(io);
459    if (oper->num_pending == 0 &&
460       (oper->started_ios == oper->total_ios || oper->stonewalled))
461    {
462        print_time(oper);
463    }
464}
465
466int read_some_events(struct thread_info *t) {
467    struct io_unit *event_io;
468    struct io_event *event;
469    int nr;
470    int i;
471    int min_nr = io_iter;
472    struct timeval stop_time;
473
474    if (t->num_global_pending < io_iter)
475        min_nr = t->num_global_pending;
476
477#ifdef NEW_GETEVENTS
478    nr = io_getevents(t->io_ctx, min_nr, t->num_global_events, t->events,NULL);
479#else
480    nr = io_getevents(t->io_ctx, t->num_global_events, t->events, NULL);
481#endif
482    if (nr <= 0)
483        return nr;
484
485    gettimeofday(&stop_time, NULL);
486    for (i = 0 ; i < nr ; i++) {
487	event = t->events + i;
488	event_io = (struct io_unit *)((unsigned long)event->obj);
489	finish_io(t, event_io, event->res, &stop_time);
490    }
491    return nr;
492}
493
494/*
495 * finds a free io unit, waiting for pending requests if required.  returns
496 * null if none could be found
497 */
498static struct io_unit *find_iou(struct thread_info *t, struct io_oper *oper)
499{
500    struct io_unit *event_io;
501    int nr;
502
503retry:
504    if (t->free_ious) {
505        event_io = t->free_ious;
506	t->free_ious = t->free_ious->next;
507	if (grab_iou(event_io, oper)) {
508	    fprintf(stderr, "io unit on free list but not free\n");
509	    abort();
510	}
511	return event_io;
512    }
513    nr = read_some_events(t);
514    if (nr > 0)
515    	goto retry;
516    else
517    	fprintf(stderr, "no free ious after read_some_events\n");
518    return NULL;
519}
520
521/*
522 * wait for all pending requests for this io operation to finish
523 */
524static int io_oper_wait(struct thread_info *t, struct io_oper *oper) {
525    struct io_event event;
526    struct io_unit *event_io;
527
528    if (oper == NULL) {
529        return 0;
530    }
531
532    if (oper->num_pending == 0)
533        goto done;
534
535    /* this func is not speed sensitive, no need to go wild reading
536     * more than one event at a time
537     */
538#ifdef NEW_GETEVENTS
539    while (io_getevents(t->io_ctx, 1, 1, &event, NULL) > 0) {
540#else
541    while (io_getevents(t->io_ctx, 1, &event, NULL) > 0) {
542#endif
543	struct timeval tv_now;
544        event_io = (struct io_unit *)((unsigned long)event.obj);
545
546	gettimeofday(&tv_now, NULL);
547	finish_io(t, event_io, event.res, &tv_now);
548
549	if (oper->num_pending == 0)
550	    break;
551    }
552done:
553    if (oper->num_err) {
554        fprintf(stderr, "%u errors on oper, last %u\n",
555	        oper->num_err, oper->last_err);
556    }
557    return 0;
558}
559
560off_t random_byte_offset(struct io_oper *oper) {
561    off_t num;
562    off_t rand_byte = oper->start;
563    off_t range;
564    off_t offset = 1;
565
566    range = (oper->end - oper->start) / (1024 * 1024);
567    if ((page_size_mask+1) > (1024 * 1024))
568        offset = (page_size_mask+1) / (1024 * 1024);
569    if (range < offset)
570        range = 0;
571    else
572        range -= offset;
573
574    /* find a random mb offset */
575    num = 1 + (int)((double)range * rand() / (RAND_MAX + 1.0 ));
576    rand_byte += num * 1024 * 1024;
577
578    /* find a random byte offset */
579    num = 1 + (int)((double)(1024 * 1024) * rand() / (RAND_MAX + 1.0));
580
581    /* page align */
582    num = (num + page_size_mask) & ~page_size_mask;
583    rand_byte += num;
584
585    if (rand_byte + oper->reclen > oper->end) {
586	rand_byte -= oper->reclen;
587    }
588    return rand_byte;
589}
590
591/*
592 * build an aio iocb for an operation, based on oper->rw and the
593 * last offset used.  This finds the struct io_unit that will be attached
594 * to the iocb, and things are ready for submission to aio after this
595 * is called.
596 *
597 * returns null on error
598 */
599static struct io_unit *build_iocb(struct thread_info *t, struct io_oper *oper)
600{
601    struct io_unit *io;
602    off_t rand_byte;
603
604    io = find_iou(t, oper);
605    if (!io) {
606        fprintf(stderr, "unable to find io unit\n");
607	return NULL;
608    }
609
610    switch(oper->rw) {
611    case WRITE:
612        io_prep_pwrite(&io->iocb,oper->fd, io->buf, oper->reclen,
613	               oper->last_offset);
614	oper->last_offset += oper->reclen;
615	break;
616    case READ:
617        io_prep_pread(&io->iocb,oper->fd, io->buf, oper->reclen,
618	              oper->last_offset);
619	oper->last_offset += oper->reclen;
620	break;
621    case RREAD:
622	rand_byte = random_byte_offset(oper);
623	oper->last_offset = rand_byte;
624        io_prep_pread(&io->iocb,oper->fd, io->buf, oper->reclen,
625	              rand_byte);
626        break;
627    case RWRITE:
628	rand_byte = random_byte_offset(oper);
629	oper->last_offset = rand_byte;
630        io_prep_pwrite(&io->iocb,oper->fd, io->buf, oper->reclen,
631	              rand_byte);
632
633        break;
634    }
635
636    return io;
637}
638
639/*
640 * wait for any pending requests, and then free all ram associated with
641 * an operation.  returns the last error the operation hit (zero means none)
642 */
643static int
644finish_oper(struct thread_info *t, struct io_oper *oper)
645{
646    unsigned long last_err;
647
648    io_oper_wait(t, oper);
649    last_err = oper->last_err;
650    if (oper->num_pending > 0) {
651        fprintf(stderr, "oper num_pending is %d\n", oper->num_pending);
652    }
653    close(oper->fd);
654    free(oper);
655    return last_err;
656}
657
658/*
659 * allocates an io operation and fills in all the fields.  returns
660 * null on error
661 */
662static struct io_oper *
663create_oper(int fd, int rw, off_t start, off_t end, int reclen, int depth,
664            int iter, char *file_name)
665{
666    struct io_oper *oper;
667
668    oper = malloc (sizeof(*oper));
669    if (!oper) {
670	fprintf(stderr, "unable to allocate io oper\n");
671	return NULL;
672    }
673    memset(oper, 0, sizeof(*oper));
674
675    oper->depth = depth;
676    oper->start = start;
677    oper->end = end;
678    oper->last_offset = oper->start;
679    oper->fd = fd;
680    oper->reclen = reclen;
681    oper->rw = rw;
682    oper->total_ios = (oper->end - oper->start) / oper->reclen;
683    oper->file_name = file_name;
684
685    return oper;
686}
687
688/*
689 * does setup on num_ios worth of iocbs, but does not actually
690 * start any io
691 */
692int build_oper(struct thread_info *t, struct io_oper *oper, int num_ios,
693               struct iocb **my_iocbs)
694{
695    int i;
696    struct io_unit *io;
697
698    if (oper->started_ios == 0)
699	gettimeofday(&oper->start_time, NULL);
700
701    if (num_ios == 0)
702        num_ios = oper->total_ios;
703
704    if ((oper->started_ios + num_ios) > oper->total_ios)
705        num_ios = oper->total_ios - oper->started_ios;
706
707    for ( i = 0 ; i < num_ios ; i++) {
708	io = build_iocb(t, oper);
709	if (!io) {
710	    return -1;
711	}
712	my_iocbs[i] = &io->iocb;
713    }
714    return num_ios;
715}
716
717/*
718 * runs through the iocbs in the array provided and updates
719 * counters in the associated oper struct
720 */
721static void update_iou_counters(struct iocb **my_iocbs, int nr,
722	struct timeval *tv_now)
723{
724    struct io_unit *io;
725    int i;
726    for (i = 0 ; i < nr ; i++) {
727	io = (struct io_unit *)(my_iocbs[i]);
728	io->io_oper->num_pending++;
729	io->io_oper->started_ios++;
730	io->io_start_time = *tv_now;	/* set time of io_submit */
731    }
732}
733
734/* starts some io for a given file, returns zero if all went well */
735int run_built(struct thread_info *t, int num_ios, struct iocb **my_iocbs)
736{
737    int ret;
738    struct timeval start_time;
739    struct timeval stop_time;
740
741resubmit:
742    gettimeofday(&start_time, NULL);
743    ret = io_submit(t->io_ctx, num_ios, my_iocbs);
744    gettimeofday(&stop_time, NULL);
745    calc_latency(&start_time, &stop_time, &t->io_submit_latency);
746
747    if (ret != num_ios) {
748	/* some ios got through */
749	if (ret > 0) {
750	    update_iou_counters(my_iocbs, ret, &stop_time);
751	    my_iocbs += ret;
752	    t->num_global_pending += ret;
753	    num_ios -= ret;
754	}
755	/*
756	 * we've used all the requests allocated in aio_init, wait and
757	 * retry
758	 */
759	if (ret > 0 || ret == -EAGAIN) {
760	    int old_ret = ret;
761	    if ((ret = read_some_events(t) > 0)) {
762		goto resubmit;
763	    } else {
764	    	fprintf(stderr, "ret was %d and now is %d\n", ret, old_ret);
765		abort();
766	    }
767	}
768
769	fprintf(stderr, "ret %d (%s) on io_submit\n", ret, strerror(-ret));
770	return -1;
771    }
772    update_iou_counters(my_iocbs, ret, &stop_time);
773    t->num_global_pending += ret;
774    return 0;
775}
776
777/*
778 * changes oper->rw to the next in a command sequence, or returns zero
779 * to say this operation is really, completely done for
780 */
781static int restart_oper(struct io_oper *oper) {
782    int new_rw  = 0;
783    if (oper->last_err)
784        return 0;
785
786    /* this switch falls through */
787    switch(oper->rw) {
788    case WRITE:
789	if (stages & (1 << READ))
790	    new_rw = READ;
791    case READ:
792	if (!new_rw && stages & (1 << RWRITE))
793	    new_rw = RWRITE;
794    case RWRITE:
795	if (!new_rw && stages & (1 << RREAD))
796	    new_rw = RREAD;
797    }
798
799    if (new_rw) {
800	oper->started_ios = 0;
801	oper->last_offset = oper->start;
802	oper->stonewalled = 0;
803
804	/*
805	 * we're restarting an operation with pending requests, so the
806	 * timing info won't be printed by finish_io.  Printing it here
807	 */
808	if (oper->num_pending)
809	    print_time(oper);
810
811	oper->rw = new_rw;
812	return 1;
813    }
814    return 0;
815}
816
817static int oper_runnable(struct io_oper *oper) {
818    struct stat buf;
819    int ret;
820
821    /* first context is always runnable, if started_ios > 0, no need to
822     * redo the calculations
823     */
824    if (oper->started_ios || oper->start == 0)
825        return 1;
826    /*
827     * only the sequential phases force delays in starting */
828    if (oper->rw >= RWRITE)
829        return 1;
830    ret = fstat(oper->fd, &buf);
831    if (ret < 0) {
832        perror("fstat");
833	exit(1);
834    }
835    if (S_ISREG(buf.st_mode) && buf.st_size < oper->start)
836        return 0;
837    return 1;
838}
839
840/*
841 * runs through all the io operations on the active list, and starts
842 * a chunk of io on each.  If any io operations are completely finished,
843 * it either switches them to the next stage or puts them on the
844 * finished list.
845 *
846 * this function stops after max_io_submit iocbs are sent down the
847 * pipe, even if it has not yet touched all the operations on the
848 * active list.  Any operations that have finished are moved onto
849 * the finished_opers list.
850 */
851static int run_active_list(struct thread_info *t,
852			 int io_iter,
853			 int max_io_submit)
854{
855    struct io_oper *oper;
856    struct io_oper *built_opers = NULL;
857    struct iocb **my_iocbs = t->iocbs;
858    int ret = 0;
859    int num_built = 0;
860
861    oper = t->active_opers;
862    while (oper) {
863	if (!oper_runnable(oper)) {
864	    oper = oper->next;
865	    if (oper == t->active_opers)
866	        break;
867	    continue;
868	}
869	ret = build_oper(t, oper, io_iter, my_iocbs);
870	if (ret >= 0) {
871	    my_iocbs += ret;
872	    num_built += ret;
873	    oper_list_del(oper, &t->active_opers);
874	    oper_list_add(oper, &built_opers);
875	    oper = t->active_opers;
876	    if (num_built + io_iter > max_io_submit)
877	        break;
878	} else
879	    break;
880    }
881    if (num_built) {
882	ret = run_built(t, num_built, t->iocbs);
883	if (ret < 0) {
884	    fprintf(stderr, "error %d on run_built\n", ret);
885	    exit(1);
886	}
887	while (built_opers) {
888	    oper = built_opers;
889	    oper_list_del(oper, &built_opers);
890	    oper_list_add(oper, &t->active_opers);
891	    if (oper->started_ios == oper->total_ios) {
892		oper_list_del(oper, &t->active_opers);
893		oper_list_add(oper, &t->finished_opers);
894	    }
895	}
896    }
897    return 0;
898}
899
900void drop_shm() {
901    int ret;
902    struct shmid_ds ds;
903    if (use_shm != USE_SHM)
904        return;
905
906    ret = shmctl(shm_id, IPC_RMID, &ds);
907    if (ret) {
908        perror("shmctl IPC_RMID");
909    }
910}
911
912void aio_setup(io_context_t *io_ctx, int n)
913{
914    int res = io_queue_init(n, io_ctx);
915    if (res != 0) {
916	fprintf(stderr, "io_queue_setup(%d) returned %d (%s)\n",
917		n, res, strerror(-res));
918	exit(3);
919    }
920}
921
922/*
923 * allocate io operation and event arrays for a given thread
924 */
925int setup_ious(struct thread_info *t,
926              int num_files, int depth,
927	      int reclen, int max_io_submit) {
928    int i;
929    size_t bytes = num_files * depth * sizeof(*t->ios);
930
931    t->ios = malloc(bytes);
932    if (!t->ios) {
933	fprintf(stderr, "unable to allocate io units\n");
934	return -1;
935    }
936    memset(t->ios, 0, bytes);
937
938    for (i = 0 ; i < depth * num_files; i++) {
939	t->ios[i].buf = aligned_buffer;
940	aligned_buffer += padded_reclen;
941	t->ios[i].buf_size = reclen;
942	if (verify)
943	    memset(t->ios[i].buf, 'b', reclen);
944	else
945	    memset(t->ios[i].buf, 0, reclen);
946	t->ios[i].next = t->free_ious;
947	t->free_ious = t->ios + i;
948    }
949    if (verify) {
950        verify_buf = aligned_buffer;
951        memset(verify_buf, 'b', reclen);
952    }
953
954    t->iocbs = malloc(sizeof(struct iocb *) * max_io_submit);
955    if (!t->iocbs) {
956        fprintf(stderr, "unable to allocate iocbs\n");
957	goto free_buffers;
958    }
959
960    memset(t->iocbs, 0, max_io_submit * sizeof(struct iocb *));
961
962    t->events = malloc(sizeof(struct io_event) * depth * num_files);
963    if (!t->events) {
964        fprintf(stderr, "unable to allocate ram for events\n");
965	goto free_buffers;
966    }
967    memset(t->events, 0, num_files * sizeof(struct io_event)*depth);
968
969    t->num_global_ios = num_files * depth;
970    t->num_global_events = t->num_global_ios;
971    return 0;
972
973free_buffers:
974    if (t->ios)
975        free(t->ios);
976    if (t->iocbs)
977        free(t->iocbs);
978    if (t->events)
979        free(t->events);
980    return -1;
981}
982
983/*
984 * The buffers used for file data are allocated as a single big
985 * malloc, and then each thread and operation takes a piece and uses
986 * that for file data.  This lets us do a large shm or bigpages alloc
987 * and without trying to find a special place in each thread to map the
988 * buffers to
989 */
990int setup_shared_mem(int num_threads, int num_files, int depth,
991                     int reclen, int max_io_submit)
992{
993    char *p = NULL;
994    size_t total_ram;
995
996    padded_reclen = (reclen + page_size_mask) / (page_size_mask+1);
997    padded_reclen = padded_reclen * (page_size_mask+1);
998    total_ram = num_files * depth * padded_reclen + num_threads;
999    if (verify)
1000    	total_ram += padded_reclen;
1001
1002    if (use_shm == USE_MALLOC) {
1003	p = malloc(total_ram + page_size_mask);
1004    } else if (use_shm == USE_SHM) {
1005        shm_id = shmget(IPC_PRIVATE, total_ram, IPC_CREAT | 0700);
1006	if (shm_id < 0) {
1007	    perror("shmget");
1008	    drop_shm();
1009	    goto free_buffers;
1010	}
1011	p = shmat(shm_id, (char *)0x50000000, 0);
1012        if ((long)p == -1) {
1013	    perror("shmat");
1014	    goto free_buffers;
1015	}
1016	/* won't really be dropped until we shmdt */
1017	drop_shm();
1018    } else if (use_shm == USE_SHMFS) {
1019        char mmap_name[16]; /* /dev/shm/ + null + XXXXXX */
1020	int fd;
1021
1022	strcpy(mmap_name, "/dev/shm/XXXXXX");
1023	fd = mkstemp(mmap_name);
1024        if (fd < 0) {
1025	    perror("mkstemp");
1026	    goto free_buffers;
1027	}
1028	unlink(mmap_name);
1029	ftruncate(fd, total_ram);
1030	shm_id = fd;
1031	p = mmap((char *)0x50000000, total_ram,
1032	         PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
1033
1034        if (p == MAP_FAILED) {
1035	    perror("mmap");
1036	    goto free_buffers;
1037	}
1038    }
1039    if (!p) {
1040        fprintf(stderr, "unable to allocate buffers\n");
1041	goto free_buffers;
1042    }
1043    unaligned_buffer = p;
1044    p = (char*)((intptr_t) (p + page_size_mask) & ~page_size_mask);
1045    aligned_buffer = p;
1046    return 0;
1047
1048free_buffers:
1049    drop_shm();
1050    if (unaligned_buffer)
1051        free(unaligned_buffer);
1052    return -1;
1053}
1054
1055/*
1056 * runs through all the thread_info structs and calculates a combined
1057 * throughput
1058 */
1059void global_thread_throughput(struct thread_info *t, char *this_stage) {
1060    int i;
1061    double runtime = time_since_now(&global_stage_start_time);
1062    double total_mb = 0;
1063    double min_trans = 0;
1064
1065    for (i = 0 ; i < num_threads ; i++) {
1066        total_mb += global_thread_info[i].stage_mb_trans;
1067	if (!min_trans || t->stage_mb_trans < min_trans)
1068	    min_trans = t->stage_mb_trans;
1069    }
1070    if (total_mb) {
1071	fprintf(stderr, "%s throughput (%.2f MB/s) ", this_stage,
1072	        total_mb / runtime);
1073	fprintf(stderr, "%.2f MB in %.2fs", total_mb, runtime);
1074        if (stonewall)
1075	    fprintf(stderr, " min transfer %.2fMB", min_trans);
1076        fprintf(stderr, "\n");
1077    }
1078}
1079
1080
1081/* this is the meat of the state machine.  There is a list of
1082 * active operations structs, and as each one finishes the required
1083 * io it is moved to a list of finished operations.  Once they have
1084 * all finished whatever stage they were in, they are given the chance
1085 * to restart and pick a different stage (read/write/random read etc)
1086 *
1087 * various timings are printed in between the stages, along with
1088 * thread synchronization if there are more than one threads.
1089 */
1090int worker(struct thread_info *t)
1091{
1092    struct io_oper *oper;
1093    char *this_stage = NULL;
1094    struct timeval stage_time;
1095    int status = 0;
1096    int iteration = 0;
1097    int cnt;
1098
1099    aio_setup(&t->io_ctx, 512);
1100
1101restart:
1102    if (num_threads > 1) {
1103        pthread_mutex_lock(&stage_mutex);
1104	threads_starting++;
1105	if (threads_starting == num_threads) {
1106	    threads_ending = 0;
1107	    gettimeofday(&global_stage_start_time, NULL);
1108	    pthread_cond_broadcast(&stage_cond);
1109	}
1110	while (threads_starting != num_threads)
1111	    pthread_cond_wait(&stage_cond, &stage_mutex);
1112        pthread_mutex_unlock(&stage_mutex);
1113    }
1114    if (t->active_opers) {
1115        this_stage = stage_name(t->active_opers->rw);
1116	gettimeofday(&stage_time, NULL);
1117	t->stage_mb_trans = 0;
1118    }
1119
1120    cnt = 0;
1121    /* first we send everything through aio */
1122    while (t->active_opers && (cnt < iterations || iterations == RUN_FOREVER)) {
1123	if (stonewall && threads_ending) {
1124	    oper = t->active_opers;
1125	    oper->stonewalled = 1;
1126	    oper_list_del(oper, &t->active_opers);
1127	    oper_list_add(oper, &t->finished_opers);
1128	} else {
1129	    run_active_list(t, io_iter,  max_io_submit);
1130        }
1131	cnt++;
1132    }
1133    if (latency_stats)
1134        print_latency(t);
1135
1136    if (completion_latency_stats)
1137	print_completion_latency(t);
1138
1139    /* then we wait for all the operations to finish */
1140    oper = t->finished_opers;
1141    do {
1142	if (!oper)
1143		break;
1144	io_oper_wait(t, oper);
1145	oper = oper->next;
1146    } while (oper != t->finished_opers);
1147
1148    /* then we do an fsync to get the timing for any future operations
1149     * right, and check to see if any of these need to get restarted
1150     */
1151    oper = t->finished_opers;
1152    while (oper) {
1153	if (fsync_stages)
1154            fsync(oper->fd);
1155	t->stage_mb_trans += oper_mb_trans(oper);
1156	if (restart_oper(oper)) {
1157	    oper_list_del(oper, &t->finished_opers);
1158	    oper_list_add(oper, &t->active_opers);
1159	    oper = t->finished_opers;
1160	    continue;
1161	}
1162	oper = oper->next;
1163	if (oper == t->finished_opers)
1164	    break;
1165    }
1166
1167    if (t->stage_mb_trans && t->num_files > 0) {
1168        double seconds = time_since_now(&stage_time);
1169	fprintf(stderr, "thread %d %s totals (%.2f MB/s) %.2f MB in %.2fs\n",
1170	        t - global_thread_info, this_stage, t->stage_mb_trans/seconds,
1171		t->stage_mb_trans, seconds);
1172    }
1173
1174    if (num_threads > 1) {
1175	pthread_mutex_lock(&stage_mutex);
1176	threads_ending++;
1177	if (threads_ending == num_threads) {
1178	    threads_starting = 0;
1179	    pthread_cond_broadcast(&stage_cond);
1180	    global_thread_throughput(t, this_stage);
1181	}
1182	while (threads_ending != num_threads)
1183	    pthread_cond_wait(&stage_cond, &stage_mutex);
1184	pthread_mutex_unlock(&stage_mutex);
1185    }
1186
1187    /* someone got restarted, go back to the beginning */
1188    if (t->active_opers && (cnt < iterations || iterations == RUN_FOREVER)) {
1189	iteration++;
1190        goto restart;
1191    }
1192
1193    /* finally, free all the ram */
1194    while (t->finished_opers) {
1195	oper = t->finished_opers;
1196	oper_list_del(oper, &t->finished_opers);
1197	status = finish_oper(t, oper);
1198    }
1199
1200    if (t->num_global_pending) {
1201        fprintf(stderr, "global num pending is %d\n", t->num_global_pending);
1202    }
1203    io_queue_release(t->io_ctx);
1204
1205    return status;
1206}
1207
1208typedef void * (*start_routine)(void *);
1209int run_workers(struct thread_info *t, int num_threads)
1210{
1211    int ret;
1212    int thread_ret;
1213    int i;
1214
1215    for (i = 0 ; i < num_threads ; i++) {
1216        ret = pthread_create(&t[i].tid, NULL, (start_routine)worker, t + i);
1217	if (ret) {
1218	    perror("pthread_create");
1219	    exit(1);
1220	}
1221    }
1222    for (i = 0 ; i < num_threads ; i++) {
1223        ret = pthread_join(t[i].tid, (void *)&thread_ret);
1224        if (ret) {
1225	    perror("pthread_join");
1226	    exit(1);
1227	}
1228    }
1229    return 0;
1230}
1231
1232off_t parse_size(char *size_arg, off_t mult) {
1233    char c;
1234    int num;
1235    off_t ret;
1236    c = size_arg[strlen(size_arg) - 1];
1237    if (c > '9') {
1238        size_arg[strlen(size_arg) - 1] = '\0';
1239    }
1240    num = atoi(size_arg);
1241    switch(c) {
1242    case 'g':
1243    case 'G':
1244        mult = 1024 * 1024 * 1024;
1245	break;
1246    case 'm':
1247    case 'M':
1248        mult = 1024 * 1024;
1249	break;
1250    case 'k':
1251    case 'K':
1252        mult = 1024;
1253	break;
1254    case 'b':
1255    case 'B':
1256        mult = 1;
1257	break;
1258    }
1259    ret = mult * num;
1260    return ret;
1261}
1262
1263void print_usage(void) {
1264    printf("usage: aio-stress [-s size] [-r size] [-a size] [-d num] [-b num]\n");
1265    printf("                  [-i num] [-t num] [-c num] [-C size] [-nxhOS ]\n");
1266    printf("                  file1 [file2 ...]\n");
1267    printf("\t-a size in KB at which to align buffers\n");
1268    printf("\t-b max number of iocbs to give io_submit at once\n");
1269    printf("\t-c number of io contexts per file\n");
1270    printf("\t-C offset between contexts, default 2MB\n");
1271    printf("\t-s size in MB of the test file(s), default 1024MB\n");
1272    printf("\t-r record size in KB used for each io, default 64KB\n");
1273    printf("\t-d number of pending aio requests for each file, default 64\n");
1274    printf("\t-i number of ios per file sent before switching\n\t   to the next file, default 8\n");
1275    printf("\t-I total number of ayncs IOs the program will run, default is run until Cntl-C\n");
1276    printf("\t-O Use O_DIRECT (not available in 2.4 kernels),\n");
1277    printf("\t-S Use O_SYNC for writes\n");
1278    printf("\t-o add an operation to the list: write=0, read=1,\n");
1279    printf("\t   random write=2, random read=3.\n");
1280    printf("\t   repeat -o to specify multiple ops: -o 0 -o 1 etc.\n");
1281    printf("\t-m shm use ipc shared memory for io buffers instead of malloc\n");
1282    printf("\t-m shmfs mmap a file in /dev/shm for io buffers\n");
1283    printf("\t-n no fsyncs between write stage and read stage\n");
1284    printf("\t-l print io_submit latencies after each stage\n");
1285    printf("\t-L print io completion latencies after each stage\n");
1286    printf("\t-t number of threads to run\n");
1287    printf("\t-u unlink files after completion\n");
1288    printf("\t-v verification of bytes written\n");
1289    printf("\t-x turn off thread stonewalling\n");
1290    printf("\t-h this message\n");
1291    printf("\n\t   the size options (-a -s and -r) allow modifiers -s 400{k,m,g}\n");
1292    printf("\t   translate to 400KB, 400MB and 400GB\n");
1293    printf("version %s\n", PROG_VERSION);
1294}
1295
1296int main(int ac, char **av)
1297{
1298    int rwfd;
1299    int i;
1300    int j;
1301    int c;
1302
1303    off_t file_size = 1 * 1024 * 1024 * 1024;
1304    int first_stage = WRITE;
1305    struct io_oper *oper;
1306    int status = 0;
1307    int num_files = 0;
1308    int open_fds = 0;
1309    struct thread_info *t;
1310
1311    page_size_mask = getpagesize() - 1;
1312
1313    while (1) {
1314	c = getopt(ac, av, "a:b:c:C:m:s:r:d:i:I:o:t:lLnhOSxvu");
1315	if  (c < 0)
1316	    break;
1317
1318        switch(c) {
1319	case 'a':
1320	    page_size_mask = parse_size(optarg, 1024);
1321	    page_size_mask--;
1322	    break;
1323	case 'c':
1324	    num_contexts = atoi(optarg);
1325	    break;
1326	case 'C':
1327	    context_offset = parse_size(optarg, 1024 * 1024);
1328	case 'b':
1329	    max_io_submit = atoi(optarg);
1330	    break;
1331	case 's':
1332	    file_size = parse_size(optarg, 1024 * 1024);
1333	    break;
1334	case 'd':
1335	    depth = atoi(optarg);
1336	    break;
1337	case 'r':
1338	    rec_len = parse_size(optarg, 1024);
1339	    break;
1340	case 'i':
1341	    io_iter = atoi(optarg);
1342	    break;
1343        case 'I':
1344          iterations = atoi(optarg);
1345        break;
1346	case 'n':
1347	    fsync_stages = 0;
1348	    break;
1349	case 'l':
1350	    latency_stats = 1;
1351	    break;
1352	case 'L':
1353	    completion_latency_stats = 1;
1354	    break;
1355	case 'm':
1356	    if (!strcmp(optarg, "shm")) {
1357		fprintf(stderr, "using ipc shm\n");
1358	        use_shm = USE_SHM;
1359	    } else if (!strcmp(optarg, "shmfs")) {
1360	        fprintf(stderr, "using /dev/shm for buffers\n");
1361		use_shm = USE_SHMFS;
1362	    }
1363	    break;
1364	case 'o':
1365	    i = atoi(optarg);
1366	    stages |= 1 << i;
1367	    fprintf(stderr, "adding stage %s\n", stage_name(i));
1368	    break;
1369	case 'O':
1370	    o_direct = O_DIRECT;
1371	    break;
1372	case 'S':
1373	    o_sync = O_SYNC;
1374	    break;
1375	case 't':
1376	    num_threads = atoi(optarg);
1377	    break;
1378	case 'x':
1379	    stonewall = 0;
1380	    break;
1381	case 'u':
1382	    unlink_files = 1;
1383	    break;
1384	case 'v':
1385	    verify = 1;
1386	    break;
1387	case 'h':
1388	default:
1389	    print_usage();
1390	    exit(1);
1391	}
1392    }
1393
1394    /*
1395     * make sure we don't try to submit more ios than we have allocated
1396     * memory for
1397     */
1398    if (depth < io_iter) {
1399	io_iter = depth;
1400        fprintf(stderr, "dropping io_iter to %d\n", io_iter);
1401    }
1402
1403    if (optind >= ac) {
1404	print_usage();
1405	exit(1);
1406    }
1407
1408    num_files = ac - optind;
1409
1410    if (num_threads > (num_files * num_contexts)) {
1411        num_threads = num_files * num_contexts;
1412	fprintf(stderr, "dropping thread count to the number of contexts %d\n",
1413	        num_threads);
1414    }
1415
1416    t = malloc(num_threads * sizeof(*t));
1417    if (!t) {
1418        perror("malloc");
1419	exit(1);
1420    }
1421    global_thread_info = t;
1422
1423    /* by default, allow a huge number of iocbs to be sent towards
1424     * io_submit
1425     */
1426    if (!max_io_submit)
1427        max_io_submit = num_files * io_iter * num_contexts;
1428
1429    /*
1430     * make sure we don't try to submit more ios than max_io_submit allows
1431     */
1432    if (max_io_submit < io_iter) {
1433        io_iter = max_io_submit;
1434	fprintf(stderr, "dropping io_iter to %d\n", io_iter);
1435    }
1436
1437    if (!stages) {
1438        stages = (1 << WRITE) | (1 << READ) | (1 << RREAD) | (1 << RWRITE);
1439    } else {
1440        for (i = 0 ; i < LAST_STAGE; i++) {
1441	    if (stages & (1 << i)) {
1442	        first_stage = i;
1443		fprintf(stderr, "starting with %s\n", stage_name(i));
1444		break;
1445	    }
1446	}
1447    }
1448
1449    if (file_size < num_contexts * context_offset) {
1450        fprintf(stderr, "file size %Lu too small for %d contexts\n",
1451	        file_size, num_contexts);
1452	exit(1);
1453    }
1454
1455    fprintf(stderr, "file size %LuMB, record size %luKB, depth %d, ios per iteration %d\n", file_size / (1024 * 1024), rec_len / 1024, depth, io_iter);
1456    fprintf(stderr, "max io_submit %d, buffer alignment set to %luKB\n",
1457            max_io_submit, (page_size_mask + 1)/1024);
1458    fprintf(stderr, "threads %d files %d contexts %d context offset %LuMB verification %s\n",
1459            num_threads, num_files, num_contexts,
1460	    context_offset / (1024 * 1024), verify ? "on" : "off");
1461    /* open all the files and do any required setup for them */
1462    for (i = optind ; i < ac ; i++) {
1463	int thread_index;
1464	for (j = 0 ; j < num_contexts ; j++) {
1465	    thread_index = open_fds % num_threads;
1466	    open_fds++;
1467
1468	    rwfd = open(av[i], O_CREAT | O_RDWR | o_direct | o_sync, 0600);
1469	    if (rwfd == -1) {
1470		fprintf(stderr, "error while creating file %s: %s", av[i], strerror(errno));
1471		exit(1);
1472	    }
1473
1474
1475	    oper = create_oper(rwfd, first_stage, j * context_offset,
1476	                       file_size - j * context_offset, rec_len,
1477			       depth, io_iter, av[i]);
1478	    if (!oper) {
1479		fprintf(stderr, "error in create_oper\n");
1480		exit(-1);
1481	    }
1482	    oper_list_add(oper, &t[thread_index].active_opers);
1483	    t[thread_index].num_files++;
1484	}
1485    }
1486    if (setup_shared_mem(num_threads, num_files * num_contexts,
1487                         depth, rec_len, max_io_submit))
1488    {
1489        exit(1);
1490    }
1491    for (i = 0 ; i < num_threads ; i++) {
1492	if (setup_ious(&t[i], t[i].num_files, depth, rec_len, max_io_submit))
1493		exit(1);
1494    }
1495    if (num_threads > 1) {
1496        printf("Running multi thread version num_threads:%d\n", num_threads);
1497        run_workers(t, num_threads);
1498    } else {
1499        printf("Running single thread version \n");
1500	status = worker(t);
1501    }
1502    if (unlink_files) {
1503	for (i = optind ; i < ac ; i++) {
1504	    printf("Cleaning up file %s \n", av[i]);
1505	    unlink(av[i]);
1506	}
1507    }
1508
1509    if (status) {
1510	exit(1);
1511    }
1512    return status;
1513}
1514