aio-stress.c revision 5f7ff66f177d38f24c37fca2b8b8e4a7d8852b90
1/*
2 * Copyright (c) 2004 SuSE, Inc.  All Rights Reserved.
3 *
4 * This program is free software; you can redistribute it and/or modify it
5 * under the terms of version 2 of the GNU General Public License as
6 * published by the Free Software Foundation.
7 *
8 * This program is distributed in the hope that it would be useful, but
9 * WITHOUT ANY WARRANTY; without even the implied warranty of
10 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
11 *
12 * Further, this software is distributed without any warranty that it is
13 * free of the rightful claim of any third person regarding infringement
14 * or the like.  Any license provided herein, whether implied or
15 * otherwise, applies only to this software file.  Patent licenses, if
16 * any, provided herein do not apply to combinations of this program with
17 * other software, or any other product whatsoever.
18 *
19 * You should have received a copy of the GNU General Public License along
20 * with this program; if not, write the Free Software Foundation, Inc., 59
21 * Temple Place - Suite 330, Boston MA 02111-1307, USA.
22 *
23 * Contact information: Silicon Graphics, Inc., 1600 Amphitheatre Pkwy,
24 * Mountain View, CA  94043, or:
25 *
26 *
27 * aio-stress
28 *
29 * will open or create each file on the command line, and start a series
30 * of aio to it.
31 *
32 * aio is done in a rotating loop.  first file1 gets 8 requests, then
33 * file2, then file3 etc.  As each file finishes writing, it is switched
34 * to reads
35 *
36 * io buffers are aligned in case you want to do raw io
37 *
38 * compile with gcc -Wall -laio -lpthread -o aio-stress aio-stress.c
39 *
40 * run aio-stress -h to see the options
41 *
42 * Please mail Chris Mason (mason@suse.com) with bug reports or patches
43 */
44#define _FILE_OFFSET_BITS 64
45#define PROG_VERSION "0.18"
46#define NEW_GETEVENTS
47
48#include <stdio.h>
49#include <errno.h>
50#include <assert.h>
51#include <stdlib.h>
52
53#include <sys/types.h>
54#include <sys/stat.h>
55#include <fcntl.h>
56#include <unistd.h>
57#include <sys/time.h>
58#include <libaio.h>
59#include <sys/ipc.h>
60#include <sys/shm.h>
61#include <sys/mman.h>
62#include <string.h>
63#include <pthread.h>
64
65#define IO_FREE 0
66#define IO_PENDING 1
67#define RUN_FOREVER -1
68
69#ifndef O_DIRECT
70#define O_DIRECT         040000 /* direct disk access hint */
71#endif
72
/*
 * io stages, run in this order; the global "stages" bitmask records
 * which ones were requested on the command line
 */
enum {
    WRITE,
    READ,
    RWRITE,	/* random write */
    RREAD,	/* random read */
    LAST_STAGE,
};
80
81#define USE_MALLOC 0
82#define USE_SHM 1
83#define USE_SHMFS 2
84
85/*
86 * various globals, these are effectively read only by the time the threads
87 * are started
88 */
89long stages = 0;
90unsigned long page_size_mask;
91int o_direct = 0;
92int o_sync = 0;
93int latency_stats = 0;
94int io_iter = 8;
95int iterations = RUN_FOREVER;
96int max_io_submit = 0;
97long rec_len = 64 * 1024;
98int depth = 64;
99int num_threads = 1;
100int num_contexts = 1;
101off_t context_offset = 2 * 1024 * 1024;
102int fsync_stages = 1;
103int use_shm = 0;
104int shm_id;
105char *unaligned_buffer = NULL;
106char *aligned_buffer = NULL;
107int padded_reclen = 0;
108int stonewall = 1;
109int verify = 0;
110char *verify_buf = NULL;
111
112struct io_unit;
113struct thread_info;
114
115/* pthread mutexes and other globals for keeping the threads in sync */
116pthread_cond_t stage_cond = PTHREAD_COND_INITIALIZER;
117pthread_mutex_t stage_mutex = PTHREAD_MUTEX_INITIALIZER;
118int threads_ending = 0;
119int threads_starting = 0;
120struct timeval global_stage_start_time;
121struct thread_info *global_thread_info;
122
123/*
124 * latencies during io_submit are measured, these are the
125 * granularities for deviations
126 */
127#define DEVIATIONS 6
128int deviations[DEVIATIONS] = { 100, 250, 500, 1000, 5000, 10000 };
/* running io_submit latency stats, kept per thread */
struct io_latency {
    double max;			/* slowest sample seen (ms) */
    double min;			/* fastest non-zero sample seen (ms) */
    double total_io;		/* number of samples recorded */
    double total_lat;		/* sum of all samples, for the average */
    double deviations[DEVIATIONS]; /* histogram; buckets defined by deviations[] */
};
136
/* container for a series of operations to a file */
struct io_oper {
    /* already open file descriptor, valid for whatever operation you want */
    int fd;

    /* starting byte of the operation */
    off_t start;

    /* ending byte of the operation */
    off_t end;

    /* size of the read/write buffer */
    int reclen;

    /* max number of pending requests before a wait is triggered */
    int depth;

    /* current number of pending requests */
    int num_pending;

    /* last error, zero if there were none (negative errno otherwise) */
    int last_err;

    /* total number of errors hit. */
    int num_err;

    /* read,write, random, etc */
    int rw;

    /* number of ios that will get sent to aio */
    int total_ios;

    /* number of ios we've already sent */
    int started_ios;

    /* last offset used in an io operation */
    off_t last_offset;

    /* stonewalled = 1 when we got cut off before submitting all our ios */
    int stonewalled;

    /* list management (circular doubly linked, see oper_list_add/del) */
    struct io_oper *next;
    struct io_oper *prev;

    /* set when the first io of a stage is built, used for throughput */
    struct timeval start_time;

    /* not owned by this struct; finish_oper() does not free it */
    char *file_name;
};
186
/* a single io, and all the tracking needed for it */
struct io_unit {
    /* note, iocb must go first!  completion events carry the iocb
     * pointer, and it is cast straight back to a struct io_unit */
    struct iocb iocb;

    /* pointer to parent io operation struct */
    struct io_oper *io_oper;

    /* aligned buffer */
    char *buf;

    /* size of the aligned buffer (record size) */
    int buf_size;

    /* state of this io unit (free, pending, done) */
    int busy;

    /* result of last operation: byte count, or negative errno */
    long res;

    /* free-list link, see find_iou()/finish_io() */
    struct io_unit *next;
};
209
/* per worker thread state; arrays below are allocated in setup_ious() */
struct thread_info {
    io_context_t io_ctx;
    pthread_t tid;

    /* allocated array of io_unit structs */
    struct io_unit *ios;

    /* list of io units available for io */
    struct io_unit *free_ious;

    /* number of io units in the ios array */
    int num_global_ios;

    /* number of io units in flight */
    int num_global_pending;

    /* preallocated array of iocb pointers, only used in run_active */
    struct iocb **iocbs;

    /* preallocated array of events */
    struct io_event *events;

    /* size of the events array */
    int num_global_events;

    /* latency stats for io_submit */
    struct io_latency io_submit_latency;

    /* list of operations still in progress, and of those finished */
    struct io_oper *active_opers;
    struct io_oper *finished_opers;

    /* number of files this thread is doing io on */
    int num_files;

    /* how much io this thread did in the last stage */
    double stage_mb_trans;
};
248
249static double time_since(struct timeval *tv) {
250    double sec, usec;
251    double ret;
252    struct timeval stop;
253    gettimeofday(&stop, NULL);
254    sec = stop.tv_sec - tv->tv_sec;
255    usec = stop.tv_usec - tv->tv_usec;
256    if (sec > 0 && usec < 0) {
257        sec--;
258	usec += 1000000;
259    }
260    ret = sec + usec / (double)1000000;
261    if (ret < 0)
262        ret = 0;
263    return ret;
264}
265
266static void calc_latency(struct timeval *tv, struct io_latency *lat)
267{
268    double delta;
269    int i;
270    delta = time_since(tv);
271    delta = delta * 1000;
272
273    if (delta > lat->max)
274    	lat->max = delta;
275    if (!lat->min || delta < lat->min)
276    	lat->min = delta;
277    lat->total_io++;
278    lat->total_lat += delta;
279    for (i = 0 ; i < DEVIATIONS ; i++) {
280        if (delta < deviations[i]) {
281	    lat->deviations[i]++;
282	    break;
283	}
284    }
285}
286
287static void oper_list_add(struct io_oper *oper, struct io_oper **list)
288{
289    if (!*list) {
290        *list = oper;
291	oper->prev = oper->next = oper;
292	return;
293    }
294    oper->prev = (*list)->prev;
295    oper->next = *list;
296    (*list)->prev->next = oper;
297    (*list)->prev = oper;
298    return;
299}
300
301static void oper_list_del(struct io_oper *oper, struct io_oper **list)
302{
303    if ((*list)->next == (*list)->prev && *list == (*list)->next) {
304        *list = NULL;
305	return;
306    }
307    oper->prev->next = oper->next;
308    oper->next->prev = oper->prev;
309    if (*list == oper)
310        *list = oper->next;
311}
312
313/* worker func to check error fields in the io unit */
314static int check_finished_io(struct io_unit *io) {
315    int i;
316    if (io->res != io->buf_size) {
317        fprintf(stderr, "io err %lu (%s) op %d, size %d\n",
318		io->res, strerror(-io->res), io->iocb.aio_lio_opcode,
319		io->buf_size);
320        io->io_oper->last_err = io->res;
321        io->io_oper->num_err++;
322	return -1;
323    }
324    if (verify && io->io_oper->rw == READ) {
325        if (memcmp(io->buf, verify_buf, io->io_oper->reclen)) {
326	    fprintf(stderr, "verify error, file %s offset %Lu contents (offset:bad:good):\n",
327	            io->io_oper->file_name, io->iocb.u.c.offset);
328
329	    for (i = 0 ; i < io->io_oper->reclen ; i++) {
330	        if (io->buf[i] != verify_buf[i]) {
331		    fprintf(stderr, "%d:%c:%c ", i, io->buf[i], verify_buf[i]);
332		}
333	    }
334	    fprintf(stderr, "\n");
335	}
336
337    }
338    return 0;
339}
340
341/* worker func to check the busy bits and get an io unit ready for use */
342static int grab_iou(struct io_unit *io, struct io_oper *oper) {
343    if (io->busy == IO_PENDING)
344        return -1;
345
346    io->busy = IO_PENDING;
347    io->res = 0;
348    io->io_oper = oper;
349    return 0;
350}
351
352char *stage_name(int rw) {
353    switch(rw) {
354    case WRITE:
355        return "write";
356    case READ:
357        return "read";
358    case RWRITE:
359        return "random write";
360    case RREAD:
361        return "random read";
362    }
363    return "unknown";
364}
365
366static inline double oper_mb_trans(struct io_oper *oper) {
367    return ((double)oper->started_ios * (double)oper->reclen) /
368                (double)(1024 * 1024);
369}
370
371static void print_time(struct io_oper *oper) {
372    double runtime;
373    double tput;
374    double mb;
375
376    runtime = time_since(&oper->start_time);
377    mb = oper_mb_trans(oper);
378    tput = mb / runtime;
379    fprintf(stderr, "%s on %s (%.2f MB/s) %.2f MB in %.2fs\n",
380	    stage_name(oper->rw), oper->file_name, tput, mb, runtime);
381}
382
383static void print_latency(struct thread_info *t) {
384    struct io_latency *lat = &t->io_submit_latency;
385    double avg = lat->total_lat / lat->total_io;
386    int i;
387    double total_counted = 0;
388    fprintf(stderr, "latency min %.2f avg %.2f max %.2f\n\t",
389            lat->min, avg, lat->max);
390
391    for (i = 0 ; i < DEVIATIONS ; i++) {
392	fprintf(stderr, " %.0f < %d", lat->deviations[i], deviations[i]);
393	total_counted += lat->deviations[i];
394    }
395    if (total_counted && lat->total_io - total_counted)
396        fprintf(stderr, " < %.0f", lat->total_io - total_counted);
397    fprintf(stderr, "\n");
398    memset(&t->io_submit_latency, 0, sizeof(t->io_submit_latency));
399}
400
/*
 * updates the fields in the io operation struct that belongs to this
 * io unit, and make the io unit reusable again
 */
void finish_io(struct thread_info *t, struct io_unit *io, long result) {
    struct io_oper *oper = io->io_oper;

    /* record the result and recycle the unit onto the thread free list.
     * check_finished_io() below only reads the unit, so recycling it
     * first is safe */
    io->res = result;
    io->busy = IO_FREE;
    io->next = t->free_ious;
    t->free_ious = io;
    oper->num_pending--;
    t->num_global_pending--;
    check_finished_io(io);
    /* last completion for this stage (or a stonewall cut it short):
     * report the per-file timing now */
    if (oper->num_pending == 0 &&
       (oper->started_ios == oper->total_ios || oper->stonewalled))
    {
        print_time(oper);
    }
}
421
/*
 * reaps completed events for thread t, waiting for at least min_nr of
 * them (capped at the number actually in flight).  Each completion is
 * handed to finish_io().  Returns the io_getevents result: number of
 * events reaped, or a negative errno
 */
int read_some_events(struct thread_info *t) {
    struct io_unit *event_io;
    struct io_event *event;
    int nr;
    int i;
    int min_nr = io_iter;

    /* never wait for more events than there are ios pending */
    if (t->num_global_pending < io_iter)
        min_nr = t->num_global_pending;

#ifdef NEW_GETEVENTS
    nr = io_getevents(t->io_ctx, min_nr, t->num_global_events, t->events,NULL);
#else
    nr = io_getevents(t->io_ctx, t->num_global_events, t->events, NULL);
#endif
    if (nr <= 0)
        return nr;

    for (i = 0 ; i < nr ; i++) {
	event = t->events + i;
	/* the iocb is first in struct io_unit, so obj points at the unit */
	event_io = (struct io_unit *)((unsigned long)event->obj);
	finish_io(t, event_io, event->res);
    }
    return nr;
}
447
448/*
449 * finds a free io unit, waiting for pending requests if required.  returns
450 * null if none could be found
451 */
452static struct io_unit *find_iou(struct thread_info *t, struct io_oper *oper)
453{
454    struct io_unit *event_io;
455    int nr;
456
457retry:
458    if (t->free_ious) {
459        event_io = t->free_ious;
460	t->free_ious = t->free_ious->next;
461	if (grab_iou(event_io, oper)) {
462	    fprintf(stderr, "io unit on free list but not free\n");
463	    abort();
464	}
465	return event_io;
466    }
467    nr = read_some_events(t);
468    if (nr > 0)
469    	goto retry;
470    else
471    	fprintf(stderr, "no free ious after read_some_events\n");
472    return NULL;
473}
474
/*
 * wait for all pending requests for this io operation to finish.
 * Completions belonging to other opers reaped along the way are
 * finished too, but only oper's own pending count ends the wait.
 * Always returns 0
 */
static int io_oper_wait(struct thread_info *t, struct io_oper *oper) {
    struct io_event event;
    struct io_unit *event_io;

    if (oper == NULL) {
        return 0;
    }

    if (oper->num_pending == 0)
        goto done;

    /* this func is not speed sensitive, no need to go wild reading
     * more than one event at a time
     */
#ifdef NEW_GETEVENTS
    while(io_getevents(t->io_ctx, 1, 1, &event, NULL) > 0) {
#else
    while(io_getevents(t->io_ctx, 1, &event, NULL) > 0) {
#endif
        /* the iocb is first in struct io_unit, so obj points at the unit */
        event_io = (struct io_unit *)((unsigned long)event.obj);

	finish_io(t, event_io, event.res);

	if (oper->num_pending == 0)
	    break;
    }
done:
    /* NOTE(review): num_err/last_err are ints; %u prints them as
     * unsigned, so a negative errno shows up as a huge number */
    if (oper->num_err) {
        fprintf(stderr, "%u errors on oper, last %u\n",
	        oper->num_err, oper->last_err);
    }
    return 0;
}
511
/*
 * picks a page aligned random offset inside [oper->start, oper->end)
 * for the random io stages.  Two steps: a random megabyte within the
 * range, then a random page aligned byte inside that megabyte, pulled
 * back if the record would run past oper->end
 */
off_t random_byte_offset(struct io_oper *oper) {
    off_t num;
    off_t rand_byte = oper->start;
    off_t range;
    off_t offset = 1;

    range = (oper->end - oper->start) / (1024 * 1024);
    /* with huge pages the alignment unit can exceed 1MB; shrink the
     * megabyte range so the aligned offset still fits */
    if ((page_size_mask+1) > (1024 * 1024))
        offset = (page_size_mask+1) / (1024 * 1024);
    if (range < offset)
        range = 0;
    else
        range -= offset;

    /* find a random mb offset */
    num = 1 + (int)((double)range * rand() / (RAND_MAX + 1.0 ));
    rand_byte += num * 1024 * 1024;

    /* find a random byte offset */
    num = 1 + (int)((double)(1024 * 1024) * rand() / (RAND_MAX + 1.0));

    /* page align */
    num = (num + page_size_mask) & ~page_size_mask;
    rand_byte += num;

    /* don't let the io run past the end of the region */
    if (rand_byte + oper->reclen > oper->end) {
	rand_byte -= oper->reclen;
    }
    return rand_byte;
}
542
/*
 * build an aio iocb for an operation, based on oper->rw and the
 * last offset used.  This finds the struct io_unit that will be attached
 * to the iocb, and things are ready for submission to aio after this
 * is called.
 *
 * returns null on error
 */
static struct io_unit *build_iocb(struct thread_info *t, struct io_oper *oper)
{
    struct io_unit *io;
    off_t rand_byte;

    io = find_iou(t, oper);
    if (!io) {
        fprintf(stderr, "unable to find io unit\n");
	return NULL;
    }

    switch(oper->rw) {
    case WRITE:
        /* sequential write: advance last_offset by one record */
        io_prep_pwrite(&io->iocb,oper->fd, io->buf, oper->reclen,
	               oper->last_offset);
	oper->last_offset += oper->reclen;
	break;
    case READ:
        /* sequential read */
        io_prep_pread(&io->iocb,oper->fd, io->buf, oper->reclen,
	              oper->last_offset);
	oper->last_offset += oper->reclen;
	break;
    case RREAD:
        /* random read: last_offset just records where we went */
	rand_byte = random_byte_offset(oper);
	oper->last_offset = rand_byte;
        io_prep_pread(&io->iocb,oper->fd, io->buf, oper->reclen,
	              rand_byte);
        break;
    case RWRITE:
        /* random write */
	rand_byte = random_byte_offset(oper);
	oper->last_offset = rand_byte;
        io_prep_pwrite(&io->iocb,oper->fd, io->buf, oper->reclen,
	              rand_byte);

        break;
    }

    return io;
}
590
591/*
592 * wait for any pending requests, and then free all ram associated with
593 * an operation.  returns the last error the operation hit (zero means none)
594 */
595static int
596finish_oper(struct thread_info *t, struct io_oper *oper)
597{
598    unsigned long last_err;
599
600    io_oper_wait(t, oper);
601    last_err = oper->last_err;
602    if (oper->num_pending > 0) {
603        fprintf(stderr, "oper num_pending is %d\n", oper->num_pending);
604    }
605    close(oper->fd);
606    free(oper);
607    return last_err;
608}
609
610/*
611 * allocates an io operation and fills in all the fields.  returns
612 * null on error
613 */
614static struct io_oper *
615create_oper(int fd, int rw, off_t start, off_t end, int reclen, int depth,
616            int iter, char *file_name)
617{
618    struct io_oper *oper;
619
620    oper = malloc (sizeof(*oper));
621    if (!oper) {
622	fprintf(stderr, "unable to allocate io oper\n");
623	return NULL;
624    }
625    memset(oper, 0, sizeof(*oper));
626
627    oper->depth = depth;
628    oper->start = start;
629    oper->end = end;
630    oper->last_offset = oper->start;
631    oper->fd = fd;
632    oper->reclen = reclen;
633    oper->rw = rw;
634    oper->total_ios = (oper->end - oper->start) / oper->reclen;
635    oper->file_name = file_name;
636
637    return oper;
638}
639
/*
 * does setup on num_ios worth of iocbs, but does not actually
 * start any io.  Fills my_iocbs and returns how many were built,
 * or -1 when no free io unit could be found
 */
int build_oper(struct thread_info *t, struct io_oper *oper, int num_ios,
               struct iocb **my_iocbs)
{
    int i;
    struct io_unit *io;

    /* the first io of a stage starts the throughput clock */
    if (oper->started_ios == 0)
	gettimeofday(&oper->start_time, NULL);

    /* num_ios == 0 means "everything this stage has" */
    if (num_ios == 0)
        num_ios = oper->total_ios;

    /* don't build more than the stage has left to send */
    if ((oper->started_ios + num_ios) > oper->total_ios)
        num_ios = oper->total_ios - oper->started_ios;

    for( i = 0 ; i < num_ios ; i++) {
	io = build_iocb(t, oper);
	if (!io) {
	    return -1;
	}
	my_iocbs[i] = &io->iocb;
    }
    return num_ios;
}
668
669/*
670 * runs through the iocbs in the array provided and updates
671 * counters in the associated oper struct
672 */
673static void update_iou_counters(struct iocb **my_iocbs, int nr)
674{
675    struct io_unit *io;
676    int i;
677    for (i = 0 ; i < nr ; i++) {
678	io = (struct io_unit *)(my_iocbs[i]);
679	io->io_oper->num_pending++;
680	io->io_oper->started_ios++;
681    }
682}
683
684/* starts some io for a given file, returns zero if all went well */
685int run_built(struct thread_info *t, int num_ios, struct iocb **my_iocbs)
686{
687    int ret;
688    struct timeval start_time;
689
690resubmit:
691    gettimeofday(&start_time, NULL);
692    ret = io_submit(t->io_ctx, num_ios, my_iocbs);
693    calc_latency(&start_time, &t->io_submit_latency);
694    if (ret != num_ios) {
695	/* some ios got through */
696	if (ret > 0) {
697	    update_iou_counters(my_iocbs, ret);
698	    my_iocbs += ret;
699	    t->num_global_pending += ret;
700	    num_ios -= ret;
701	}
702	/*
703	 * we've used all the requests allocated in aio_init, wait and
704	 * retry
705	 */
706	if (ret > 0 || ret == -EAGAIN) {
707	    if ((ret = read_some_events(t) > 0)) {
708		goto resubmit;
709	    }
710	}
711
712	fprintf(stderr, "ret %d (%s) on io_submit\n", ret, strerror(-ret));
713	return -1;
714    }
715    update_iou_counters(my_iocbs, ret);
716    t->num_global_pending += ret;
717    return 0;
718}
719
/*
 * changes oper->rw to the next in a command sequence, or returns zero
 * to say this operation is really, completely done for
 */
static int restart_oper(struct io_oper *oper) {
    int new_rw  = 0;
    /* an oper that hit an error is never restarted */
    if (oper->last_err)
        return 0;

    /* this switch falls through: each stage picks the first later
     * stage that was requested in the stages bitmask */
    switch(oper->rw) {
    case WRITE:
	if (stages & (1 << READ))
	    new_rw = READ;
	/* fall through */
    case READ:
	if (!new_rw && stages & (1 << RWRITE))
	    new_rw = RWRITE;
	/* fall through */
    case RWRITE:
	if (!new_rw && stages & (1 << RREAD))
	    new_rw = RREAD;
    }

    if (new_rw) {
	/* reset progress so the new stage starts from scratch */
	oper->started_ios = 0;
	oper->last_offset = oper->start;
	oper->stonewalled = 0;

	/*
	 * we're restarting an operation with pending requests, so the
	 * timing info won't be printed by finish_io.  Printing it here
	 */
	if (oper->num_pending)
	    print_time(oper);

	oper->rw = new_rw;
	return 1;
    }
    return 0;
}
759
760static int oper_runnable(struct io_oper *oper) {
761    struct stat buf;
762    int ret;
763
764    /* first context is always runnable, if started_ios > 0, no need to
765     * redo the calculations
766     */
767    if (oper->started_ios || oper->start == 0)
768        return 1;
769    /*
770     * only the sequential phases force delays in starting */
771    if (oper->rw >= RWRITE)
772        return 1;
773    ret = fstat(oper->fd, &buf);
774    if (ret < 0) {
775        perror("fstat");
776	exit(1);
777    }
778    if (S_ISREG(buf.st_mode) && buf.st_size < oper->start)
779        return 0;
780    return 1;
781}
782
/*
 * runs through all the io operations on the active list, and starts
 * a chunk of io on each.  If any io operations are completely finished,
 * it either switches them to the next stage or puts them on the
 * finished list.
 *
 * this function stops after max_io_submit iocbs are sent down the
 * pipe, even if it has not yet touched all the operations on the
 * active list.  Any operations that have finished are moved onto
 * the finished_opers list.
 */
static int run_active_list(struct thread_info *t,
			 int io_iter,
			 int max_io_submit)
{
    struct io_oper *oper;
    struct io_oper *built_opers = NULL;
    struct iocb **my_iocbs = t->iocbs;
    int ret = 0;
    int num_built = 0;

    oper = t->active_opers;
    while(oper) {
	/* skip opers whose region of the file doesn't exist yet */
	if (!oper_runnable(oper)) {
	    oper = oper->next;
	    /* wrapped all the way around: everything left is blocked */
	    if (oper == t->active_opers)
	        break;
	    continue;
	}
	ret = build_oper(t, oper, io_iter, my_iocbs);
	if (ret >= 0) {
	    my_iocbs += ret;
	    num_built += ret;
	    /* park the oper on built_opers so we don't build from it
	     * twice before the submit below */
	    oper_list_del(oper, &t->active_opers);
	    oper_list_add(oper, &built_opers);
	    oper = t->active_opers;
	    if (num_built + io_iter > max_io_submit)
	        break;
	} else
	    break;
    }
    if (num_built) {
	ret = run_built(t, num_built, t->iocbs);
	if (ret < 0) {
	    fprintf(stderr, "error %d on run_built\n", ret);
	    exit(1);
	}
	/* put the built opers back on the active list; any that have
	 * now started all their ios move to the finished list */
	while(built_opers) {
	    oper = built_opers;
	    oper_list_del(oper, &built_opers);
	    oper_list_add(oper, &t->active_opers);
	    if (oper->started_ios == oper->total_ios) {
		oper_list_del(oper, &t->active_opers);
		oper_list_add(oper, &t->finished_opers);
	    }
	}
    }
    return 0;
}
842
843void drop_shm() {
844    int ret;
845    struct shmid_ds ds;
846    if (use_shm != USE_SHM)
847        return;
848
849    ret = shmctl(shm_id, IPC_RMID, &ds);
850    if (ret) {
851        perror("shmctl IPC_RMID");
852    }
853}
854
855void aio_setup(io_context_t *io_ctx, int n)
856{
857    int res = io_queue_init(n, io_ctx);
858    if (res != 0) {
859	fprintf(stderr, "io_queue_setup(%d) returned %d (%s)\n",
860		n, res, strerror(-res));
861	exit(3);
862    }
863}
864
/*
 * allocate io operation and event arrays for a given thread.  Data
 * buffers are carved out of the global aligned_buffer, which is
 * advanced here, so this must be called once per thread in order.
 * Returns 0 on success, -1 after freeing any partial allocations
 */
int setup_ious(struct thread_info *t,
              int num_files, int depth,
	      int reclen, int max_io_submit) {
    int i;
    size_t bytes = num_files * depth * sizeof(*t->ios);

    t->ios = malloc(bytes);
    if (!t->ios) {
	fprintf(stderr, "unable to allocate io units\n");
	return -1;
    }
    memset(t->ios, 0, bytes);

    for (i = 0 ; i < depth * num_files; i++) {
	/* hand each unit a slice of the shared buffer */
	t->ios[i].buf = aligned_buffer;
	aligned_buffer += padded_reclen;
	t->ios[i].buf_size = reclen;
	/* 'b' is the pattern reads are verified against */
	if (verify)
	    memset(t->ios[i].buf, 'b', reclen);
	else
	    memset(t->ios[i].buf, 0, reclen);
	/* thread every unit onto the free list */
	t->ios[i].next = t->free_ious;
	t->free_ious = t->ios + i;
    }
    if (verify) {
        verify_buf = aligned_buffer;
        memset(verify_buf, 'b', reclen);
    }

    t->iocbs = malloc(sizeof(struct iocb *) * max_io_submit);
    if (!t->iocbs) {
        fprintf(stderr, "unable to allocate iocbs\n");
	goto free_buffers;
    }

    memset(t->iocbs, 0, max_io_submit * sizeof(struct iocb *));

    t->events = malloc(sizeof(struct io_event) * depth * num_files);
    if (!t->events) {
        fprintf(stderr, "unable to allocate ram for events\n");
	goto free_buffers;
    }
    memset(t->events, 0, num_files * sizeof(struct io_event)*depth);

    t->num_global_ios = num_files * depth;
    t->num_global_events = t->num_global_ios;
    return 0;

free_buffers:
    if (t->ios)
        free(t->ios);
    if (t->iocbs)
        free(t->iocbs);
    if (t->events)
        free(t->events);
    return -1;
}
925
926/*
927 * The buffers used for file data are allocated as a single big
928 * malloc, and then each thread and operation takes a piece and uses
929 * that for file data.  This lets us do a large shm or bigpages alloc
930 * and without trying to find a special place in each thread to map the
931 * buffers to
932 */
933int setup_shared_mem(int num_threads, int num_files, int depth,
934                     int reclen, int max_io_submit)
935{
936    char *p = NULL;
937    size_t total_ram;
938
939    padded_reclen = (reclen + page_size_mask) / (page_size_mask+1);
940    padded_reclen = padded_reclen * (page_size_mask+1);
941    total_ram = num_files * depth * padded_reclen + num_threads;
942    if (verify)
943    	total_ram += padded_reclen;
944
945    if (use_shm == USE_MALLOC) {
946	p = malloc(total_ram + page_size_mask);
947    } else if (use_shm == USE_SHM) {
948        shm_id = shmget(IPC_PRIVATE, total_ram, IPC_CREAT | 0700);
949	if (shm_id < 0) {
950	    perror("shmget");
951	    drop_shm();
952	    goto free_buffers;
953	}
954	p = shmat(shm_id, (char *)0x50000000, 0);
955        if ((long)p == -1) {
956	    perror("shmat");
957	    goto free_buffers;
958	}
959	/* won't really be dropped until we shmdt */
960	drop_shm();
961    } else if (use_shm == USE_SHMFS) {
962        char mmap_name[16]; /* /dev/shm/ + null + XXXXXX */
963	int fd;
964
965	strcpy(mmap_name, "/dev/shm/XXXXXX");
966	fd = mkstemp(mmap_name);
967        if (fd < 0) {
968	    perror("mkstemp");
969	    goto free_buffers;
970	}
971	unlink(mmap_name);
972	ftruncate(fd, total_ram);
973	shm_id = fd;
974	p = mmap((char *)0x50000000, total_ram,
975	         PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
976
977        if (p == MAP_FAILED) {
978	    perror("mmap");
979	    goto free_buffers;
980	}
981    }
982    if (!p) {
983        fprintf(stderr, "unable to allocate buffers\n");
984	goto free_buffers;
985    }
986    unaligned_buffer = p;
987    (unsigned long)p = ((unsigned long) (p + page_size_mask) & ~page_size_mask);
988    aligned_buffer = p;
989    return 0;
990
991free_buffers:
992    drop_shm();
993    if (unaligned_buffer)
994        free(unaligned_buffer);
995    return -1;
996}
997
998/*
999 * runs through all the thread_info structs and calculates a combined
1000 * throughput
1001 */
1002void global_thread_throughput(struct thread_info *t, char *this_stage) {
1003    int i;
1004    double runtime = time_since(&global_stage_start_time);
1005    double total_mb = 0;
1006    double min_trans = 0;
1007
1008    for (i = 0 ; i < num_threads ; i++) {
1009        total_mb += global_thread_info[i].stage_mb_trans;
1010	if (!min_trans || t->stage_mb_trans < min_trans)
1011	    min_trans = t->stage_mb_trans;
1012    }
1013    if (total_mb) {
1014	fprintf(stderr, "%s throughput (%.2f MB/s) ", this_stage,
1015	        total_mb / runtime);
1016	fprintf(stderr, "%.2f MB in %.2fs", total_mb, runtime);
1017        if (stonewall)
1018	    fprintf(stderr, " min transfer %.2fMB", min_trans);
1019        fprintf(stderr, "\n");
1020    }
1021}
1022
1023
1024/* this is the meat of the state machine.  There is a list of
1025 * active operations structs, and as each one finishes the required
1026 * io it is moved to a list of finished operations.  Once they have
1027 * all finished whatever stage they were in, they are given the chance
1028 * to restart and pick a different stage (read/write/random read etc)
1029 *
1030 * various timings are printed in between the stages, along with
1031 * thread synchronization if there are more than one threads.
1032 */
1033int worker(struct thread_info *t)
1034{
1035    struct io_oper *oper;
1036    char *this_stage = NULL;
1037    struct timeval stage_time;
1038    int status = 0;
1039    int iteration = 0;
1040    int cnt;
1041
1042    aio_setup(&t->io_ctx, 512);
1043
1044restart:
1045    printf("Starting %s iter:%d \n", __FUNCTION__,iteration);
1046    if (num_threads > 1) {
1047        printf("num_threads %d \n", num_threads);
1048        pthread_mutex_lock(&stage_mutex);
1049        threads_starting++;
1050        if (threads_starting == num_threads) {
1051            threads_ending = 0;
1052            gettimeofday(&global_stage_start_time, NULL);
1053            pthread_cond_broadcast(&stage_cond);
1054        }
1055        while (threads_starting != num_threads)
1056            pthread_cond_wait(&stage_cond, &stage_mutex);
1057        pthread_mutex_unlock(&stage_mutex);
1058    }
1059    if (t->active_opers) {
1060//        printf("active_opers %p line:%d\n", t->active_opers, __LINE__);
1061        this_stage = stage_name(t->active_opers->rw);
1062        gettimeofday(&stage_time, NULL);
1063        t->stage_mb_trans = 0;
1064    }
1065    cnt = 0;
1066    /* first we send everything through aio */
1067//    printf("cnt:%d max_iterations:%d oper:%p\n",cnt, iterations,oper);
1068
1069    while (t->active_opers && (cnt < iterations || iterations == RUN_FOREVER)) {
1070        printf("active_opers %p line:%d cnt:%d ", t->active_opers,__LINE__,cnt);
1071        if (stonewall && threads_ending) {
1072            oper = t->active_opers;
1073            oper->stonewalled = 1;
1074            oper_list_del(oper, &t->active_opers);
1075            oper_list_add(oper, &t->finished_opers);
1076            printf(" if branch\n");
1077        } else {
1078            run_active_list(t, io_iter,  max_io_submit);
1079            printf(" else branch\n");
1080        }
1081        cnt++;
1082    }
1083
1084    if (latency_stats)
1085        print_latency(t);
1086
1087    /* then we wait for all the operations to finish */
1088    oper = t->finished_opers;
1089//    printf("line:%d oper:%p\n", __LINE__, oper);
1090    do {
1091        io_oper_wait(t, oper);
1092        if (oper != NULL) {
1093            oper = oper->next;
1094        }
1095    } while (oper != t->finished_opers);
1096//    printf("finished_opers %p line:%d\n", t->finished_opers,__LINE__);
1097
1098    /* then we do an fsync to get the timing for any future operations
1099     * right, and check to see if any of these need to get restarted
1100     */
1101    oper = t->finished_opers;
1102//    printf("oper %p line:%d\n", oper,__LINE__);
1103    while (oper) {
1104        if (fsync_stages)
1105            fsync(oper->fd);
1106        t->stage_mb_trans += oper_mb_trans(oper);
1107        if (restart_oper(oper)) {
1108            oper_list_del(oper, &t->finished_opers);
1109            oper_list_add(oper, &t->active_opers);
1110            oper = t->finished_opers;
1111            continue;
1112        }
1113        oper = oper->next;
1114        if (oper == t->finished_opers)
1115            break;
1116    }
1117
1118    if (t->stage_mb_trans && t->num_files > 0) {
1119//        printf("num_files %d line:%d\n", t->num_files,__LINE__);
1120        double seconds = time_since(&stage_time);
1121        fprintf(stderr, "thread %d %s totals (%.2f MB/s) %.2f MB in %.2fs\n",
1122                t - global_thread_info, this_stage, t->stage_mb_trans/seconds,
1123                t->stage_mb_trans, seconds);
1124    }
1125
1126    if (num_threads > 1) {
1127//        printf("num_threads %d line:%d\n", num_threads,__LINE__);
1128        pthread_mutex_lock(&stage_mutex);
1129        threads_ending++;
1130        if (threads_ending == num_threads) {
1131            threads_starting = 0;
1132            pthread_cond_broadcast(&stage_cond);
1133            global_thread_throughput(t, this_stage);
1134        }
1135//        printf("threads_ending %d line:%d\n", threads_ending,__LINE__);
1136        while (threads_ending != num_threads)
1137            pthread_cond_wait(&stage_cond, &stage_mutex);
1138        pthread_mutex_unlock(&stage_mutex);
1139    }
1140
1141    /* someone got restarted, go back to the beginning */
1142    if (t->active_opers && (cnt < iterations || iterations == RUN_FOREVER)) {
1143        iteration++;
1144        goto restart;
1145    }
1146
1147    /* finally, free all the ram */
1148//    printf("finished_opers %p line:%d\n", t->finished_opers,__LINE__);
1149    while (t->finished_opers) {
1150        oper = t->finished_opers;
1151        oper_list_del(oper, &t->finished_opers);
1152        status = finish_oper(t, oper);
1153    }
1154
1155    if (t->num_global_pending) {
1156        fprintf(stderr, "global num pending is %d\n", t->num_global_pending);
1157    }
1158    io_queue_release(t->io_ctx);
1159
1160    return status;
1161}
1162
1163typedef void * (*start_routine)(void *);
1164int run_workers(struct thread_info *t, int num_threads)
1165{
1166    int ret;
1167    int thread_ret;
1168    int i;
1169
1170//    printf("%s num_threads %d line:%d\n", __FUNCTION__,num_threads,__LINE__);
1171    for(i = 0 ; i < num_threads ; i++) {
1172        ret = pthread_create(&t[i].tid, NULL, (start_routine)worker, t + i);
1173	if (ret) {
1174	    perror("pthread_create");
1175	    exit(1);
1176	}
1177    }
1178    for(i = 0 ; i < num_threads ; i++) {
1179        ret = pthread_join(t[i].tid, (void *)&thread_ret);
1180        if (ret) {
1181	    perror("pthread_join");
1182	    exit(1);
1183	}
1184    }
1185    return 0;
1186}
1187
/*
 * Parse a size argument of the form "<num>[kKmMgG]".
 *
 * size_arg: the option string; a recognized suffix is stripped in place
 *           (the original behavior — callers do not reuse the string).
 * mult:     default multiplier applied when no recognized suffix is given.
 *
 * Returns num * multiplier as an off_t.  Returns 0 for an empty or
 * non-numeric string (the old code read size_arg[-1] on "", which was
 * undefined behavior; atoi also gave no overflow detection — strtoll is
 * used now so large counts survive the 64-bit multiply).
 */
off_t parse_size(char *size_arg, off_t mult) {
    size_t len = strlen(size_arg);
    char c = len ? size_arg[len - 1] : '\0';
    long long num;

    switch (c) {
    case 'g':
    case 'G':
        mult = 1024LL * 1024 * 1024;
        break;
    case 'm':
    case 'M':
        mult = 1024 * 1024;
        break;
    case 'k':
    case 'K':
        mult = 1024;
        break;
    default:
        /* no recognized suffix: keep the caller-supplied multiplier */
        c = '\0';
        break;
    }
    if (c)
        size_arg[len - 1] = '\0';	/* strip the suffix before converting */
    num = strtoll(size_arg, NULL, 10);
    return (off_t)(mult * num);
}
1214
1215void print_usage(void) {
1216    printf("usage: aio-stress [-s size] [-r size] [-a size] [-d num] [-b num]\n");
1217    printf("                  [-i num] [-t num] [-c num] [-C size] [-nxhOS ]\n");
1218    printf("                  file1 [file2 ...]\n");
1219    printf("\t-a size in KB at which to align buffers\n");
1220    printf("\t-b max number of iocbs to give io_submit at once\n");
1221    printf("\t-c number of io contexts per file\n");
1222    printf("\t-C offset between contexts, default 2MB\n");
1223    printf("\t-s size in MB of the test file(s), default 1024MB\n");
1224    printf("\t-r record size in KB used for each io, default 64KB\n");
1225    printf("\t-d number of pending aio requests for each file, default 64\n");
1226    printf("\t-i number of ios per file sent before switching\n\t   to the next file, default 8\n");
1227    printf("\t-I total number of ayncs IOs the program will run, default is run until Cntl-C\n");
1228    printf("\t-O Use O_DIRECT (not available in 2.4 kernels),\n");
1229    printf("\t-S Use O_SYNC for writes\n");
1230    printf("\t-o add an operation to the list: write=0, read=1,\n");
1231    printf("\t   random write=2, random read=3.\n");
1232    printf("\t   repeat -o to specify multiple ops: -o 0 -o 1 etc.\n");
1233    printf("\t-m shm use ipc shared memory for io buffers instead of malloc\n");
1234    printf("\t-m shmfs mmap a file in /dev/shm for io buffers\n");
1235    printf("\t-n no fsyncs between write stage and read stage\n");
1236    printf("\t-l print io_submit latencies after each stage\n");
1237    printf("\t-t number of threads to run\n");
1238    printf("\t-v verification of bytes written\n");
1239    printf("\t-x turn off thread stonewalling\n");
1240    printf("\t-h this message\n");
1241    printf("\n\t   the size options (-a -s and -r) allow modifiers -s 400{k,m,g}\n");
1242    printf("\t   translate to 400KB, 400MB and 400GB\n");
1243    printf("version %s\n", PROG_VERSION);
1244}
1245
1246int main(int ac, char **av)
1247{
1248    int rwfd;
1249    int i;
1250    int j;
1251    int c;
1252
1253    off_t file_size = 1 * 1024 * 1024 * 1024;
1254    int first_stage = WRITE;
1255    struct io_oper *oper;
1256    int status = 0;
1257    int num_files = 0;
1258    int open_fds = 0;
1259    struct thread_info *t;
1260
1261    page_size_mask = getpagesize() - 1;
1262
1263    while(1) {
1264	c = getopt(ac, av, "a:b:c:C:m:s:r:d:i:I:o:t:lnhOSxv");
1265	if  (c < 0)
1266	    break;
1267
1268        switch(c) {
1269	case 'a':
1270	    page_size_mask = parse_size(optarg, 1024);
1271	    page_size_mask--;
1272	    break;
1273	case 'c':
1274	    num_contexts = atoi(optarg);
1275	    break;
1276	case 'C':
1277	    context_offset = parse_size(optarg, 1024 * 1024);
1278	case 'b':
1279	    max_io_submit = atoi(optarg);
1280	    break;
1281	case 's':
1282	    file_size = parse_size(optarg, 1024 * 1024);
1283	    break;
1284	case 'd':
1285	    depth = atoi(optarg);
1286	    break;
1287	case 'r':
1288	    rec_len = parse_size(optarg, 1024);
1289	    break;
1290	case 'i':
1291	    io_iter = atoi(optarg);
1292	    break;
1293        case 'I':
1294          iterations = atoi(optarg);
1295        break;
1296	case 'n':
1297	    fsync_stages = 0;
1298	    break;
1299	case 'l':
1300	    latency_stats = 1;
1301	    break;
1302	case 'm':
1303	    if (!strcmp(optarg, "shm")) {
1304		fprintf(stderr, "using ipc shm\n");
1305	        use_shm = USE_SHM;
1306	    } else if (!strcmp(optarg, "shmfs")) {
1307	        fprintf(stderr, "using /dev/shm for buffers\n");
1308		use_shm = USE_SHMFS;
1309	    }
1310	    break;
1311	case 'o':
1312	    i = atoi(optarg);
1313	    stages |= 1 << i;
1314	    fprintf(stderr, "adding stage %s\n", stage_name(i));
1315	    break;
1316	case 'O':
1317	    o_direct = O_DIRECT;
1318	    break;
1319	case 'S':
1320	    o_sync = O_SYNC;
1321	    break;
1322	case 't':
1323	    num_threads = atoi(optarg);
1324	    break;
1325	case 'x':
1326	    stonewall = 0;
1327	    break;
1328	case 'v':
1329	    verify = 1;
1330	    break;
1331	case 'h':
1332	default:
1333	    print_usage();
1334	    exit(1);
1335	}
1336    }
1337
1338    /*
1339     * make sure we don't try to submit more ios than we have allocated
1340     * memory for
1341     */
1342    if (depth < io_iter) {
1343	io_iter = depth;
1344        fprintf(stderr, "dropping io_iter to %d\n", io_iter);
1345    }
1346
1347    if (optind >= ac) {
1348	print_usage();
1349	exit(1);
1350    }
1351
1352    num_files = ac - optind;
1353
1354    if (num_threads > (num_files * num_contexts)) {
1355        num_threads = num_files * num_contexts;
1356	fprintf(stderr, "dropping thread count to the number of contexts %d\n",
1357	        num_threads);
1358    }
1359
1360    t = malloc(num_threads * sizeof(*t));
1361    if (!t) {
1362        perror("malloc");
1363	exit(1);
1364    }
1365    global_thread_info = t;
1366
1367    /* by default, allow a huge number of iocbs to be sent towards
1368     * io_submit
1369     */
1370    if (!max_io_submit)
1371        max_io_submit = num_files * io_iter * num_contexts;
1372
1373    /*
1374     * make sure we don't try to submit more ios than max_io_submit allows
1375     */
1376    if (max_io_submit < io_iter) {
1377        io_iter = max_io_submit;
1378	fprintf(stderr, "dropping io_iter to %d\n", io_iter);
1379    }
1380
1381    if (!stages) {
1382        stages = (1 << WRITE) | (1 << READ) | (1 << RREAD) | (1 << RWRITE);
1383    } else {
1384        for (i = 0 ; i < LAST_STAGE; i++) {
1385	    if (stages & (1 << i)) {
1386	        first_stage = i;
1387		fprintf(stderr, "starting with %s\n", stage_name(i));
1388		break;
1389	    }
1390	}
1391    }
1392
1393    if (file_size < num_contexts * context_offset) {
1394        fprintf(stderr, "file size %Lu too small for %d contexts\n",
1395	        file_size, num_contexts);
1396	exit(1);
1397    }
1398
1399    fprintf(stderr, "file size %LuMB, record size %luKB, depth %d, ios per iteration %d\n", file_size / (1024 * 1024), rec_len / 1024, depth, io_iter);
1400    fprintf(stderr, "max io_submit %d, buffer alignment set to %luKB\n",
1401            max_io_submit, (page_size_mask + 1)/1024);
1402    fprintf(stderr, "threads %d files %d contexts %d context offset %LuMB verification %s\n",
1403            num_threads, num_files, num_contexts,
1404	    context_offset / (1024 * 1024), verify ? "on" : "off");
1405    /* open all the files and do any required setup for them */
1406    for (i = optind ; i < ac ; i++) {
1407	int thread_index;
1408	for (j = 0 ; j < num_contexts ; j++) {
1409	    thread_index = open_fds % num_threads;
1410	    open_fds++;
1411	    fprintf(stderr, "adding file %s thread %d\n", av[i], thread_index);
1412
1413	    rwfd = open(av[i], O_CREAT | O_RDWR | o_direct | o_sync, 0600);
1414	    assert(rwfd != -1);
1415
1416	    oper = create_oper(rwfd, first_stage, j * context_offset,
1417	                       file_size - j * context_offset, rec_len,
1418			       depth, io_iter, av[i]);
1419	    if (!oper) {
1420		fprintf(stderr, "error in create_oper\n");
1421		exit(-1);
1422	    }
1423	    oper_list_add(oper, &t[thread_index].active_opers);
1424	    t[thread_index].num_files++;
1425	}
1426    }
1427    if (setup_shared_mem(num_threads, num_files * num_contexts,
1428                         depth, rec_len, max_io_submit))
1429    {
1430        exit(1);
1431    }
1432    for (i = 0 ; i < num_threads ; i++) {
1433	if (setup_ious(&t[i], t[i].num_files, depth, rec_len, max_io_submit))
1434		exit(1);
1435    }
1436    if (num_threads > 1){
1437        printf("Running multi thread version num_threads:%d\n", num_threads);
1438        run_workers(t, num_threads);
1439    }
1440    else {
1441        printf("Running single thread version \n");
1442        status = worker(t);
1443    }
1444
1445    if (status) {
1446    printf("non zero return %d \n", status);
1447	exit(1);
1448    }
1449    return status;
1450}
1451
1452