/*
 * posixaio engine
 *
 * IO engine that uses the POSIX-defined aio interface.
 *
 */
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <errno.h>
#include <fcntl.h>

#include "../fio.h"

struct posixaio_data {
	struct io_u **aio_events;
	unsigned int queued;
};

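/*
 * Fill @ts with the current time, preferring a monotonic clock when the
 * platform provides one. Returns 0 on success, 1 on failure.
 */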
static int fill_timespec(struct timespec *ts)
{
#ifdef CONFIG_CLOCK_GETTIME
#ifdef CONFIG_CLOCK_MONOTONIC
	clockid_t clk = CLOCK_MONOTONIC;
#else
	clockid_t clk = CLOCK_REALTIME;
#endif
	if (!clock_gettime(clk, ts))
		return 0;

	perror("clock_gettime");
	return 1;
#else
	struct timeval tv;

	gettimeofday(&tv, NULL);
	ts->tv_sec = tv.tv_sec;
	ts->tv_nsec = tv.tv_usec * 1000;
	return 0;
#endif
}

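/*
 * Return the number of microseconds elapsed since @t, or 0 if the current
 * time cannot be read.
 */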
static unsigned long long ts_utime_since_now(struct timespec *t)
{
	long long sec, nsec;
	struct timespec now;

	if (fill_timespec(&now))
		return 0;

	sec = now.tv_sec - t->tv_sec;
	nsec = now.tv_nsec - t->tv_nsec;
	if (sec > 0 && nsec < 0) {
		sec--;
		nsec += 1000000000;
	}

	sec *= 1000000;
	nsec /= 1000;
	return sec + nsec;
}

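/*
 * Cancel a single in-flight request. Success means the request was either
 * cancelled or had already completed.
 */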
static int fio_posixaio_cancel(struct thread_data fio_unused *td,
			       struct io_u *io_u)
{
	struct fio_file *f = io_u->file;
	int r = aio_cancel(f->fd, &io_u->aiocb);

	if (r == AIO_ALLDONE || r == AIO_CANCELED)
		return 0;

	return 1;
}

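/*
 * Fill in the aiocb for this io_u. Completion is detected by polling
 * aio_error() in getevents, so no signal notification is requested.
 */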
static int fio_posixaio_prep(struct thread_data fio_unused *td,
			     struct io_u *io_u)
{
	os_aiocb_t *aiocb = &io_u->aiocb;
	struct fio_file *f = io_u->file;

	aiocb->aio_fildes = f->fd;
	aiocb->aio_buf = io_u->xfer_buf;
	aiocb->aio_nbytes = io_u->xfer_buflen;
	aiocb->aio_offset = io_u->offset;
	aiocb->aio_sigevent.sigev_notify = SIGEV_NONE;

	io_u->seen = 0;
	return 0;
}

#define SUSPEND_ENTRIES	8

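/*
 * Reap completed requests: poll aio_error() on every in-flight io_u and
 * collect the ones that have finished. If fewer than @min completions are
 * found and the timeout (if any) has not expired, wait in aio_suspend() on
 * up to SUSPEND_ENTRIES still-pending requests and scan again.
 */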
static int fio_posixaio_getevents(struct thread_data *td, unsigned int min,
				  unsigned int max, const struct timespec *t)
{
	struct posixaio_data *pd = td->io_ops->data;
	os_aiocb_t *suspend_list[SUSPEND_ENTRIES];
	struct timespec start;
	int have_timeout = 0;
	int suspend_entries;
	struct io_u *io_u;
	unsigned int r;
	int i;

	if (t && !fill_timespec(&start))
		have_timeout = 1;
	else
		memset(&start, 0, sizeof(start));

	r = 0;
restart:
	memset(suspend_list, 0, sizeof(suspend_list));
	suspend_entries = 0;
	io_u_qiter(&td->io_u_all, io_u, i) {
		int err;

		if (io_u->seen || !(io_u->flags & IO_U_F_FLIGHT))
			continue;

		err = aio_error(&io_u->aiocb);
		if (err == EINPROGRESS) {
			if (suspend_entries < SUSPEND_ENTRIES) {
				suspend_list[suspend_entries] = &io_u->aiocb;
				suspend_entries++;
			}
			continue;
		}

		io_u->seen = 1;
		pd->queued--;
		pd->aio_events[r++] = io_u;

		if (err == ECANCELED)
			io_u->resid = io_u->xfer_buflen;
		else if (!err) {
			ssize_t retval = aio_return(&io_u->aiocb);

			io_u->resid = io_u->xfer_buflen - retval;
		} else
			io_u->error = err;
	}

	if (r >= min)
		return r;

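	/*
	 * A timed wait was requested and the timeout has expired, so
	 * return whatever has completed so far.
	 */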
	if (have_timeout) {
		unsigned long long usec;

		usec = (t->tv_sec * 1000000) + (t->tv_nsec / 1000);
		if (ts_utime_since_now(&start) > usec)
			return r;
	}

	/*
	 * must have some in-flight, wait for at least one
	 */
	aio_suspend((const os_aiocb_t * const *)suspend_list,
							suspend_entries, t);
	goto restart;
}

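/*
 * Return the io_u for a completed event previously reaped by getevents.
 */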
static struct io_u *fio_posixaio_event(struct thread_data *td, int event)
{
	struct posixaio_data *pd = td->io_ops->data;

	return pd->aio_events[event];
}

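/*
 * Queue one io_u: reads and writes are submitted with aio_read() and
 * aio_write(). Trims (and syncs, when aio_fsync() is not available) are
 * performed synchronously, but only once the async queue has drained.
 */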
static int fio_posixaio_queue(struct thread_data *td,
			      struct io_u *io_u)
{
	struct posixaio_data *pd = td->io_ops->data;
	os_aiocb_t *aiocb = &io_u->aiocb;
	int ret;

	fio_ro_check(td, io_u);

	if (io_u->ddir == DDIR_READ)
		ret = aio_read(aiocb);
	else if (io_u->ddir == DDIR_WRITE)
		ret = aio_write(aiocb);
	else if (io_u->ddir == DDIR_TRIM) {
		if (pd->queued)
			return FIO_Q_BUSY;

		do_io_u_trim(td, io_u);
		return FIO_Q_COMPLETED;
	} else {
#ifdef CONFIG_POSIXAIO_FSYNC
		ret = aio_fsync(O_SYNC, aiocb);
#else
		if (pd->queued)
			return FIO_Q_BUSY;

		do_io_u_sync(td, io_u);
		return FIO_Q_COMPLETED;
#endif
	}

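	/*
	 * Submission failed, consult aio_error() for the reason.
	 */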
	if (ret) {
		int aio_err = aio_error(aiocb);

		/*
		 * At least OSX has a very low limit on the number of pending
		 * IOs, so if it returns EAGAIN, we are out of resources
		 * to queue more. Just return FIO_Q_BUSY to naturally
		 * drop off at this depth.
		 */
		if (aio_err == EAGAIN)
			return FIO_Q_BUSY;

		io_u->error = aio_err;
		td_verror(td, io_u->error, "xfer");
		return FIO_Q_COMPLETED;
	}

	pd->queued++;
	return FIO_Q_QUEUED;
}

static void fio_posixaio_cleanup(struct thread_data *td)
{
	struct posixaio_data *pd = td->io_ops->data;

	if (pd) {
		free(pd->aio_events);
		free(pd);
	}
}

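/*
 * Allocate the per-thread engine data and an event array sized to the
 * configured queue depth.
 */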
static int fio_posixaio_init(struct thread_data *td)
{
	struct posixaio_data *pd = malloc(sizeof(*pd));

	memset(pd, 0, sizeof(*pd));
	pd->aio_events = malloc(td->o.iodepth * sizeof(struct io_u *));
	memset(pd->aio_events, 0, td->o.iodepth * sizeof(struct io_u *));

	td->io_ops->data = pd;
	return 0;
}

static struct ioengine_ops ioengine = {
	.name		= "posixaio",
	.version	= FIO_IOOPS_VERSION,
	.init		= fio_posixaio_init,
	.prep		= fio_posixaio_prep,
	.queue		= fio_posixaio_queue,
	.cancel		= fio_posixaio_cancel,
	.getevents	= fio_posixaio_getevents,
	.event		= fio_posixaio_event,
	.cleanup	= fio_posixaio_cleanup,
	.open_file	= generic_open_file,
	.close_file	= generic_close_file,
	.get_file_size	= generic_get_file_size,
};

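/*
 * Register/unregister the engine when the fio binary is loaded and exits.
 */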
static void fio_init fio_posixaio_register(void)
{
	register_ioengine(&ioengine);
}

static void fio_exit fio_posixaio_unregister(void)
{
	unregister_ioengine(&ioengine);
}