io_u.c revision e1161c325f7866bae879e686d1c673ca32ab09ae
#include <unistd.h>
#include <fcntl.h>
#include <string.h>
#include <signal.h>
#include <time.h>
#include <assert.h>

#include "fio.h"
#include "os.h"

struct io_completion_data {
	int nr;				/* input */
	endio_handler *handler;		/* input */

	int error;			/* output */
	unsigned long bytes_done[2];	/* output */
	struct timeval time;		/* output */
};

/*
 * The ->file_map[] contains a map of blocks we have or have not done io
 * to yet. Used to make sure we cover the entire range in a fair fashion.
 */
static int random_map_free(struct thread_data *td, struct fio_file *f,
			   unsigned long long block)
{
	unsigned int idx = RAND_MAP_IDX(td, f, block);
	unsigned int bit = RAND_MAP_BIT(td, f, block);

	return (f->file_map[idx] & (1UL << bit)) == 0;
}
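
/*
 * Worked example for random_map_free() (illustrative only; assumes the
 * usual definitions where RAND_MAP_IDX() divides the block number by
 * BLOCKS_PER_MAP and RAND_MAP_BIT() takes the remainder, with
 * BLOCKS_PER_MAP being the bit width of an unsigned long): on a 64-bit
 * build, block 70 lands in file_map[1] at bit 6, so the test above
 * checks file_map[1] & (1UL << 6).
 */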

/*
 * Mark a given offset as used in the map.
 */
static void mark_random_map(struct thread_data *td, struct fio_file *f,
			    struct io_u *io_u)
{
	unsigned int min_bs = td->rw_min_bs;
	unsigned long long block;
	unsigned int blocks;
	unsigned int nr_blocks;

	block = io_u->offset / (unsigned long long) min_bs;
	blocks = 0;
	nr_blocks = (io_u->buflen + min_bs - 1) / min_bs;

	while (blocks < nr_blocks) {
		unsigned int idx, bit;

		if (!random_map_free(td, f, block))
			break;

		idx = RAND_MAP_IDX(td, f, block);
		bit = RAND_MAP_BIT(td, f, block);

		fio_assert(td, idx < f->num_maps);

		f->file_map[idx] |= (1UL << bit);
		block++;
		blocks++;
	}

	if ((blocks * min_bs) < io_u->buflen)
		io_u->buflen = blocks * min_bs;
}
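
/*
 * Note on the trim above (illustrative numbers): with rw_min_bs of 4k
 * and a 16k io_u, if the third covered block is already marked, only
 * two blocks get set and buflen is trimmed from 16k to 8k, so this io
 * never re-touches blocks that were already done.
 */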

/*
 * Return the next free block in the map.
 */
static int get_next_free_block(struct thread_data *td, struct fio_file *f,
			       unsigned long long *b)
{
	int i;

	i = f->last_free_lookup;
	*b = (i * BLOCKS_PER_MAP);
	while ((*b) * td->rw_min_bs < f->real_file_size) {
		if (f->file_map[i] != -1UL) {
			*b += ffz(f->file_map[i]);
			f->last_free_lookup = i;
			return 0;
		}

		*b += BLOCKS_PER_MAP;
		i++;
	}

	return 1;
}
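
/*
 * The scan above inspects one map word (BLOCKS_PER_MAP blocks) per
 * iteration: a word equal to -1UL is completely used, anything else
 * has a free block that ffz() (find first zero bit) locates directly.
 * Caching last_free_lookup means repeated calls resume where the
 * previous scan left off instead of rescanning from word zero.
 */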

/*
 * For random io, generate a random new block and see if it's used. Repeat
 * until we find a free one. For sequential io, just return the end of
 * the last io issued.
 */
static int get_next_offset(struct thread_data *td, struct fio_file *f,
			   struct io_u *io_u)
{
	const int ddir = io_u->ddir;
	unsigned long long b, rb;
	long r;

	if (!td->sequential) {
		unsigned long long max_blocks = f->file_size / td->min_bs[ddir];
		int loops = 5;

		do {
			r = os_random_long(&td->random_state);
			b = ((max_blocks - 1) * r / (unsigned long long) (RAND_MAX+1.0));
			if (td->norandommap)
				break;
			rb = b + (f->file_offset / td->min_bs[ddir]);
			loops--;
		} while (!random_map_free(td, f, rb) && loops);

		/*
		 * if we failed to retrieve a truly random offset within
		 * the loops assigned, see if there are free ones left at all
		 */
		if (!loops && get_next_free_block(td, f, &b))
			return 1;
	} else
		b = f->last_pos / td->min_bs[ddir];

	io_u->offset = (b * td->min_bs[ddir]) + f->file_offset;
	if (io_u->offset >= f->real_file_size)
		return 1;

	return 0;
}
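
/*
 * The scaling above maps r, uniform over [0, RAND_MAX], onto a block in
 * [0, max_blocks - 1): e.g. with max_blocks = 1000 and r = RAND_MAX,
 * b = 999 * RAND_MAX / (RAND_MAX + 1) = 998. Only 5 random probes are
 * attempted before falling back to the linear get_next_free_block()
 * scan, which bounds the worst case as the map fills up.
 */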

static unsigned int get_next_buflen(struct thread_data *td, struct fio_file *f,
				    struct io_u *io_u)
{
	const int ddir = io_u->ddir;
	unsigned int buflen;
	long r;

	if (td->min_bs[ddir] == td->max_bs[ddir])
		buflen = td->min_bs[ddir];
	else {
		r = os_random_long(&td->bsrange_state);
		buflen = (unsigned int) (1 + (double) (td->max_bs[ddir] - 1) * r / (RAND_MAX + 1.0));
		if (!td->bs_unaligned)
			buflen = (buflen + td->min_bs[ddir] - 1) & ~(td->min_bs[ddir] - 1);
	}

	while (buflen + io_u->offset > f->real_file_size) {
		if (buflen == td->min_bs[ddir])
			return 0;

		buflen = td->min_bs[ddir];
	}

	return buflen;
}
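
/*
 * The rounding in get_next_buflen() ((x + min_bs - 1) & ~(min_bs - 1))
 * rounds up to the next multiple of min_bs and is only valid when
 * min_bs is a power of two: e.g. with min_bs 4096 a raw buflen of 5000
 * becomes (5000 + 4095) & ~4095 = 8192. The bs_unaligned option skips
 * the rounding entirely.
 */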

/*
 * Return the data direction for the next io_u. If the job is a
 * mixed read/write workload, check the rwmix cycle and switch if
 * necessary.
 */
static enum fio_ddir get_rw_ddir(struct thread_data *td)
{
	if (td_rw(td)) {
		struct timeval now;
		unsigned long elapsed;

		fio_gettime(&now, NULL);
		elapsed = mtime_since_now(&td->rwmix_switch);

		/*
		 * Check if it's time to switch to a new data direction.
		 */
		if (elapsed >= td->rwmixcycle) {
			unsigned int v;
			long r;

			r = os_random_long(&td->rwmix_state);
			v = 1 + (int) (100.0 * (r / (RAND_MAX + 1.0)));
			if (v < td->rwmixread)
				td->rwmix_ddir = DDIR_READ;
			else
				td->rwmix_ddir = DDIR_WRITE;
			memcpy(&td->rwmix_switch, &now, sizeof(now));
		}
		return td->rwmix_ddir;
	} else if (td_read(td))
		return DDIR_READ;
	else
		return DDIR_WRITE;
}
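
/*
 * Direction choice above: v is uniform over [1, 100], so with e.g.
 * rwmixread=80 the draw picks reads roughly 80% of the time. Note that
 * the direction is only re-drawn once per rwmixcycle window; between
 * draws, every io_u goes the same way.
 */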

void put_io_u(struct thread_data *td, struct io_u *io_u)
{
	assert((io_u->flags & IO_U_F_FREE) == 0);
	io_u->flags |= IO_U_F_FREE;

	io_u->file = NULL;
	list_del(&io_u->list);
	list_add(&io_u->list, &td->io_u_freelist);
	td->cur_depth--;
}

void requeue_io_u(struct thread_data *td, struct io_u **io_u)
{
	struct io_u *__io_u = *io_u;

	list_del(&__io_u->list);
	list_add_tail(&__io_u->list, &td->io_u_requeues);
	td->cur_depth--;
	*io_u = NULL;
}

static int fill_io_u(struct thread_data *td, struct fio_file *f,
		     struct io_u *io_u)
{
	/*
	 * If using an iolog, grab next piece if any available.
	 */
	if (td->read_iolog)
		return read_iolog_get(td, io_u);

	/*
	 * see if it's time to sync
	 */
	if (td->fsync_blocks && !(td->io_issues[DDIR_WRITE] % td->fsync_blocks)
	    && td->io_issues[DDIR_WRITE] && should_fsync(td)) {
		io_u->ddir = DDIR_SYNC;
		io_u->file = f;
		return 0;
	}

	io_u->ddir = get_rw_ddir(td);

	/*
	 * No log, let the seq/rand engine retrieve the next buflen and
	 * position.
	 */
	if (get_next_offset(td, f, io_u))
		return 1;

	io_u->buflen = get_next_buflen(td, f, io_u);
	if (!io_u->buflen)
		return 1;

	/*
	 * mark entry before potentially trimming io_u
	 */
	if (!td->read_iolog && !td->sequential && !td->norandommap)
		mark_random_map(td, f, io_u);

	/*
	 * If using a write iolog, store this entry.
	 */
	if (td->write_iolog_file)
		write_iolog_put(td, io_u);

	io_u->file = f;
	return 0;
}
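
/*
 * Sync cadence in fill_io_u(): the modulus check fires when the write
 * issue count is a nonzero multiple of fsync_blocks, so e.g.
 * fsync_blocks=16 turns the io_u following the 16th, 32nd, ... write
 * into a DDIR_SYNC instead of a data transfer.
 */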

static void io_u_mark_depth(struct thread_data *td)
{
	int index = 0;

	switch (td->cur_depth) {
	default:
		index++;
	case 32 ... 63:
		index++;
	case 16 ... 31:
		index++;
	case 8 ... 15:
		index++;
	case 4 ... 7:
		index++;
	case 2 ... 3:
		index++;
	case 1:
		break;
	}

	td->io_u_map[index]++;
	td->total_io_u++;
}
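
/*
 * The deliberate switch fall-through above turns cur_depth into a
 * power-of-two histogram bucket: depth 1 -> slot 0, 2-3 -> 1, 4-7 -> 2,
 * 8-15 -> 3, 16-31 -> 4, 32-63 -> 5 and 64+ (default) -> 6. Each case
 * label adds one more increment on the way down to the break.
 */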

static void io_u_mark_latency(struct thread_data *td, unsigned long msec)
{
	int index = 0;

	switch (msec) {
	default:
		index++;
	case 1000 ... 1999:
		index++;
	case 750 ... 999:
		index++;
	case 500 ... 749:
		index++;
	case 250 ... 499:
		index++;
	case 100 ... 249:
		index++;
	case 50 ... 99:
		index++;
	case 20 ... 49:
		index++;
	case 10 ... 19:
		index++;
	case 4 ... 9:
		index++;
	case 2 ... 3:
		index++;
	case 0 ... 1:
		break;
	}

	td->io_u_lat[index]++;
}
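
/*
 * Same fall-through trick for completion latency, in milliseconds:
 * 0-1 msec -> slot 0 up through 1000-1999 msec -> slot 10, with 2000
 * and above collected in slot 11. A 300 msec completion, for instance,
 * matches the 250 ... 499 label and falls through to land in slot 7.
 */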

static struct fio_file *get_next_file(struct thread_data *td)
{
	unsigned int old_next_file = td->next_file;
	struct fio_file *f;

	do {
		f = &td->files[td->next_file];

		td->next_file++;
		if (td->next_file >= td->nr_files)
			td->next_file = 0;

		if (f->fd != -1)
			break;

		f = NULL;
	} while (td->next_file != old_next_file);

	return f;
}
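
/*
 * get_next_file() round-robins across td->files, skipping entries whose
 * fd is -1 (not currently open). If a full cycle comes back to the
 * starting index without finding an open file, it returns NULL and the
 * caller gives up on this io_u.
 */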

struct io_u *__get_io_u(struct thread_data *td)
{
	struct io_u *io_u = NULL;

	if (!list_empty(&td->io_u_requeues))
		io_u = list_entry(td->io_u_requeues.next, struct io_u, list);
	else if (!queue_full(td)) {
		io_u = list_entry(td->io_u_freelist.next, struct io_u, list);

		io_u->buflen = 0;
		io_u->resid = 0;
		io_u->file = NULL;
	}

	if (io_u) {
		assert(io_u->flags & IO_U_F_FREE);
		io_u->flags &= ~IO_U_F_FREE;

		io_u->error = 0;
		list_del(&io_u->list);
		list_add(&io_u->list, &td->io_u_busylist);
		td->cur_depth++;
		io_u_mark_depth(td);
	}

	return io_u;
}
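
/*
 * Requeued io_us take priority over fresh ones and keep their buflen,
 * offset and file intact; only freelist io_us are reset here. get_io_u()
 * below relies on that: a non-NULL io_u->file is its signal to skip the
 * offset/buflen setup and jump straight to the prep stage.
 */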

/*
 * Return an io_u to be processed. Gets a buflen and offset, sets direction,
 * etc. The returned io_u is fully ready to be prepped and submitted.
 */
struct io_u *get_io_u(struct thread_data *td)
{
	struct fio_file *f;
	struct io_u *io_u;

	io_u = __get_io_u(td);
	if (!io_u)
		return NULL;

	/*
	 * from a requeue, io_u is already set up
	 */
	if (io_u->file)
		goto out;

	f = get_next_file(td);
	if (!f) {
		put_io_u(td, io_u);
		return NULL;
	}

	io_u->file = f;

	if (td->zone_bytes >= td->zone_size) {
		td->zone_bytes = 0;
		f->last_pos += td->zone_skip;
	}

	if (fill_io_u(td, f, io_u)) {
		put_io_u(td, io_u);
		return NULL;
	}

	if (io_u->buflen + io_u->offset > f->real_file_size) {
		if (td->io_ops->flags & FIO_RAWIO) {
			put_io_u(td, io_u);
			return NULL;
		}

		io_u->buflen = f->real_file_size - io_u->offset;
	}

	if (io_u->ddir != DDIR_SYNC) {
		if (!io_u->buflen) {
			put_io_u(td, io_u);
			return NULL;
		}

		f->last_pos = io_u->offset + io_u->buflen;

		if (td->verify != VERIFY_NONE)
			populate_verify_io_u(td, io_u);
	}

	/*
	 * Set io data pointers.
	 */
out:
	io_u->xfer_buf = io_u->buf;
	io_u->xfer_buflen = io_u->buflen;

	if (td_io_prep(td, io_u)) {
		put_io_u(td, io_u);
		return NULL;
	}

	fio_gettime(&io_u->start_time, NULL);
	return io_u;
}
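
/*
 * xfer_buf/xfer_buflen are the pointers the io engines actually operate
 * on, and they start out covering the whole buffer. Keeping them
 * separate from buf/buflen presumably lets a requeued io_u resume a
 * partially completed transfer without losing the original request
 * size (resid tracks the leftover bytes on completion).
 */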

static void io_completed(struct thread_data *td, struct io_u *io_u,
			 struct io_completion_data *icd)
{
	unsigned long msec;

	assert(io_u->flags & IO_U_F_FLIGHT);
	io_u->flags &= ~IO_U_F_FLIGHT;

	if (io_u->ddir == DDIR_SYNC) {
		td->last_was_sync = 1;
		return;
	}

	td->last_was_sync = 0;

	if (!io_u->error) {
		unsigned int bytes = io_u->buflen - io_u->resid;
		const enum fio_ddir idx = io_u->ddir;
		int ret;

		td->io_blocks[idx]++;
		td->io_bytes[idx] += bytes;
		td->zone_bytes += bytes;
		td->this_io_bytes[idx] += bytes;

		io_u->file->last_completed_pos = io_u->offset + io_u->buflen;

		msec = mtime_since(&io_u->issue_time, &icd->time);

		add_clat_sample(td, idx, msec);
		add_bw_sample(td, idx, &icd->time);
		io_u_mark_latency(td, msec);

		if ((td_rw(td) || td_write(td)) && idx == DDIR_WRITE)
			log_io_piece(td, io_u);

		icd->bytes_done[idx] += bytes;

		if (icd->handler) {
			ret = icd->handler(io_u);
			if (ret && !icd->error)
				icd->error = ret;
		}
	} else
		icd->error = io_u->error;
}

static void init_icd(struct io_completion_data *icd, endio_handler *handler,
		     int nr)
{
	fio_gettime(&icd->time, NULL);

	icd->handler = handler;
	icd->nr = nr;

	icd->error = 0;
	icd->bytes_done[0] = icd->bytes_done[1] = 0;
}

static void ios_completed(struct thread_data *td,
			  struct io_completion_data *icd)
{
	struct io_u *io_u;
	int i;

	for (i = 0; i < icd->nr; i++) {
		io_u = td->io_ops->event(td, i);

		io_completed(td, io_u, icd);
		put_io_u(td, io_u);
	}
}

/*
 * Complete a single io_u for the sync engines.
 */
long io_u_sync_complete(struct thread_data *td, struct io_u *io_u,
			endio_handler *handler)
{
	struct io_completion_data icd;

	init_icd(&icd, handler, 1);
	io_completed(td, io_u, &icd);
	put_io_u(td, io_u);

	if (!icd.error)
		return icd.bytes_done[0] + icd.bytes_done[1];

	return -1;
}

/*
 * Called to complete min_events ios for the async engines.
 */
long io_u_queued_complete(struct thread_data *td, int min_events,
			  endio_handler *handler)
{
	struct io_completion_data icd;
	struct timespec *tvp = NULL;
	int ret;

	if (min_events > 0) {
		ret = td_io_commit(td);
		if (ret < 0) {
			td_verror(td, -ret, "td_io_commit");
			return ret;
		}
	} else {
		struct timespec ts = { .tv_sec = 0, .tv_nsec = 0, };

		tvp = &ts;
	}

	ret = td_io_getevents(td, min_events, td->cur_depth, tvp);
	if (ret < 0) {
		td_verror(td, -ret, "td_io_getevents");
		return ret;
	} else if (!ret)
		return ret;

	init_icd(&icd, handler, ret);
	ios_completed(td, &icd);
	if (!icd.error)
		return icd.bytes_done[0] + icd.bytes_done[1];

	return -1;
}
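
/*
 * Blocking behaviour above: with min_events > 0 the queued io is first
 * committed and td_io_getevents() may block until that many events are
 * reaped; with min_events == 0 a zeroed timespec is passed instead, so
 * the call is a non-blocking poll returning whatever has completed.
 */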

/*
 * Call when io_u is really queued, to update the submission latency.
 */
void io_u_queued(struct thread_data *td, struct io_u *io_u)
{
	unsigned long slat_time;

	slat_time = mtime_since(&io_u->start_time, &io_u->issue_time);
	add_slat_sample(td, io_u->ddir, slat_time);
}
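
/*
 * Latency bookkeeping, in summary: slat (here) is start_time ->
 * issue_time, i.e. how long the io_u waited between get_io_u() and
 * actually being issued; clat (in io_completed()) is issue_time ->
 * completion. Together they cover the io_u's whole lifetime.
 */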

#ifdef FIO_USE_TIMEOUT
void io_u_set_timeout(struct thread_data *td)
{
	assert(td->cur_depth);

	td->timer.it_interval.tv_sec = 0;
	td->timer.it_interval.tv_usec = 0;
	td->timer.it_value.tv_sec = IO_U_TIMEOUT + IO_U_TIMEOUT_INC;
	td->timer.it_value.tv_usec = 0;
	setitimer(ITIMER_REAL, &td->timer, NULL);
	fio_gettime(&td->timeout_end, NULL);
}
#else
void io_u_set_timeout(struct thread_data fio_unused *td)
{
}
#endif

#ifdef FIO_USE_TIMEOUT
static void io_u_timeout_handler(int fio_unused sig)
{
	struct thread_data *td, *__td;
	pid_t pid = getpid();
	int i;

	log_err("fio: io_u timeout\n");

	/*
	 * TLS would be nice...
	 */
	td = NULL;
	for_each_td(__td, i) {
		if (__td->pid == pid) {
			td = __td;
			break;
		}
	}

	if (!td) {
		log_err("fio: io_u timeout, can't find job\n");
		exit(1);
	}

	if (!td->cur_depth) {
		log_err("fio: timeout without pending work?\n");
		return;
	}

	log_err("fio: io_u timeout: job=%s, pid=%d\n", td->name, td->pid);
	td->error = ETIMEDOUT;
	exit(1);
}
#endif

void io_u_init_timeout(void)
{
#ifdef FIO_USE_TIMEOUT
	signal(SIGALRM, io_u_timeout_handler);
#endif
}