io_u.c revision 9520ebb9f4dd88d086e313ae97e37ebb6d4f240b
#include <unistd.h>
#include <fcntl.h>
#include <string.h>
#include <signal.h>
#include <time.h>
#include <assert.h>

#include "fio.h"
#include "hash.h"

struct io_completion_data {
	int nr;				/* input */

	int error;			/* output */
	unsigned long bytes_done[2];	/* output */
	struct timeval time;		/* output */
};

/*
 * The ->file_map[] contains a map of blocks we have or have not done io
 * to yet. Used to make sure we cover the entire range in a fair fashion.
 */
static int random_map_free(struct fio_file *f, const unsigned long long block)
{
	unsigned int idx = RAND_MAP_IDX(f, block);
	unsigned int bit = RAND_MAP_BIT(f, block);

	dprint(FD_RANDOM, "free: b=%llu, idx=%u, bit=%u\n", block, idx, bit);

	return (f->file_map[idx] & (1 << bit)) == 0;
}

/*
 * Mark a given offset as used in the map.
 */
static void mark_random_map(struct thread_data *td, struct io_u *io_u)
{
	unsigned int min_bs = td->o.rw_min_bs;
	struct fio_file *f = io_u->file;
	unsigned long long block;
	unsigned int blocks, nr_blocks;

	block = (io_u->offset - f->file_offset) / (unsigned long long) min_bs;
	nr_blocks = (io_u->buflen + min_bs - 1) / min_bs;
	blocks = 0;

	while (nr_blocks) {
		unsigned int this_blocks, mask;
		unsigned int idx, bit;

		/*
		 * If we have a mixed random workload, we may
		 * encounter blocks we already did IO to.
		 */
		if ((td->o.ddir_nr == 1) && !random_map_free(f, block)) {
			if (!blocks)
				blocks = 1;
			break;
		}

		idx = RAND_MAP_IDX(f, block);
		bit = RAND_MAP_BIT(f, block);

		fio_assert(td, idx < f->num_maps);

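		/*
		 * clamp this_blocks so we don't cross into the next map
		 * word, then build a bit mask covering the blocks we are
		 * marking; e.g. bit = 3 and this_blocks = 4 gives
		 * mask = 0xf << 3
		 */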
		this_blocks = nr_blocks;
		if (this_blocks + bit > BLOCKS_PER_MAP)
			this_blocks = BLOCKS_PER_MAP - bit;

		if (this_blocks == BLOCKS_PER_MAP)
			mask = -1U;
		else
			mask = ((1U << this_blocks) - 1) << bit;

		f->file_map[idx] |= mask;
		nr_blocks -= this_blocks;
		blocks += this_blocks;
		block += this_blocks;
	}

	if ((blocks * min_bs) < io_u->buflen)
		io_u->buflen = blocks * min_bs;
}

static unsigned long long last_block(struct thread_data *td, struct fio_file *f,
				     enum fio_ddir ddir)
{
	unsigned long long max_blocks;
	unsigned long long max_size;

	/*
	 * Hmm, should we make sure that ->io_size <= ->real_file_size?
	 */
	max_size = f->io_size;
	if (max_size > f->real_file_size)
		max_size = f->real_file_size;

	max_blocks = max_size / (unsigned long long) td->o.min_bs[ddir];
	if (!max_blocks)
		return 0;

	return max_blocks;
}

/*
 * Return the next free block in the map.
 */
static int get_next_free_block(struct thread_data *td, struct fio_file *f,
			       enum fio_ddir ddir, unsigned long long *b)
{
	unsigned long long min_bs = td->o.rw_min_bs;
	int i;

	i = f->last_free_lookup;
	*b = (i * BLOCKS_PER_MAP);
	while ((*b) * min_bs < f->real_file_size) {
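		/*
		 * an all-ones map word is completely used; otherwise
		 * ffz() gives the index of the first free (zero) bit
		 */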
		if (f->file_map[i] != (unsigned int) -1) {
			*b += ffz(f->file_map[i]);
			if (*b > last_block(td, f, ddir))
				break;
			f->last_free_lookup = i;
			return 0;
		}

		*b += BLOCKS_PER_MAP;
		i++;
	}

	dprint(FD_IO, "failed finding a free block\n");
	return 1;
}

static int get_next_rand_offset(struct thread_data *td, struct fio_file *f,
				enum fio_ddir ddir, unsigned long long *b)
{
	unsigned long long r;
	int loops = 5;

	do {
		r = os_random_long(&td->random_state);
		dprint(FD_RANDOM, "off rand %llu\n", r);
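		/*
		 * scale r from [0, OS_RAND_MAX] to a block index in
		 * [0, last_block - 1)
		 */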
		*b = (last_block(td, f, ddir) - 1)
			* (r / ((unsigned long long) OS_RAND_MAX + 1.0));

		/*
		 * if we are not maintaining a random map, we are done.
		 */
		if (!file_randommap(td, f))
			return 0;

		/*
		 * calculate map offset and check if it's free
		 */
		if (random_map_free(f, *b))
			return 0;

		dprint(FD_RANDOM, "get_next_rand_offset: offset %llu busy\n",
									*b);
	} while (--loops);

	/*
	 * we get here if we didn't succeed in looking up a block. generate
	 * a random start offset into the filemap, and find the first free
	 * block from there.
	 */
	loops = 10;
	do {
		f->last_free_lookup = (f->num_maps - 1) *
					(r / (OS_RAND_MAX + 1.0));
		if (!get_next_free_block(td, f, ddir, b))
			return 0;

		r = os_random_long(&td->random_state);
	} while (--loops);

	/*
	 * that didn't work either, try exhaustive search from the start
	 */
	f->last_free_lookup = 0;
	return get_next_free_block(td, f, ddir, b);
}

/*
 * For random io, generate a random new block and see if it's used. Repeat
 * until we find a free one. For sequential io, just return the end of
 * the last io issued.
 */
static int get_next_offset(struct thread_data *td, struct io_u *io_u)
{
	struct fio_file *f = io_u->file;
	unsigned long long b;
	enum fio_ddir ddir = io_u->ddir;

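	/*
	 * for random io, pick a new random block only when the ddir_nr
	 * counter runs out; otherwise (and for sequential io) continue
	 * from the last position
	 */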
	if (td_random(td) && (td->o.ddir_nr && !--td->ddir_nr)) {
		td->ddir_nr = td->o.ddir_nr;

		if (get_next_rand_offset(td, f, ddir, &b))
			return 1;
	} else {
		if (f->last_pos >= f->real_file_size) {
			if (!td_random(td) ||
			     get_next_rand_offset(td, f, ddir, &b))
				return 1;
		} else
			b = (f->last_pos - f->file_offset) / td->o.min_bs[ddir];
	}

	io_u->offset = b * td->o.min_bs[ddir];
	if (io_u->offset >= f->io_size) {
		dprint(FD_IO, "get_next_offset: offset %llu >= io_size %llu\n",
					io_u->offset, f->io_size);
		return 1;
	}

	io_u->offset += f->file_offset;
	if (io_u->offset >= f->real_file_size) {
		dprint(FD_IO, "get_next_offset: offset %llu >= size %llu\n",
					io_u->offset, f->real_file_size);
		return 1;
	}

	return 0;
}

static inline int is_power_of_2(unsigned int val)
{
	return (val != 0 && ((val & (val - 1)) == 0));
}

static unsigned int get_next_buflen(struct thread_data *td, struct io_u *io_u)
{
	const int ddir = io_u->ddir;
	unsigned int uninitialized_var(buflen);
	unsigned int minbs, maxbs;
	long r;

	minbs = td->o.min_bs[ddir];
	maxbs = td->o.max_bs[ddir];

	if (minbs == maxbs)
		buflen = minbs;
	else {
		r = os_random_long(&td->bsrange_state);
		if (!td->o.bssplit_nr) {
			buflen = 1 + (unsigned int) ((double) maxbs *
					(r / (OS_RAND_MAX + 1.0)));
			if (buflen < minbs)
				buflen = minbs;
		} else {
			long perc = 0;
			unsigned int i;

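			/*
			 * walk the bssplit table, accumulating the per-size
			 * percentages until r falls within the cumulative
			 * share; buflen ends up as that entry's block size
			 */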
			for (i = 0; i < td->o.bssplit_nr; i++) {
				struct bssplit *bsp = &td->o.bssplit[i];

				buflen = bsp->bs;
				perc += bsp->perc;
				if (r <= ((OS_RAND_MAX / 100L) * perc))
					break;
			}
		}
		if (!td->o.bs_unaligned && is_power_of_2(minbs))
			buflen = (buflen + minbs - 1) & ~(minbs - 1);
	}

	if (io_u->offset + buflen > io_u->file->real_file_size) {
		dprint(FD_IO, "lower buflen %u -> %u (ddir=%d)\n", buflen,
						minbs, ddir);
		buflen = minbs;
	}

	return buflen;
}

static void set_rwmix_bytes(struct thread_data *td)
{
	unsigned int diff;

	/*
	 * we do time or byte based switch. this is needed because
	 * buffered writes may issue a lot quicker than they complete,
	 * whereas reads do not.
	 */
	diff = td->o.rwmix[td->rwmix_ddir ^ 1];
	td->rwmix_issues = (td->io_issues[td->rwmix_ddir] * diff) / 100;
}

static inline enum fio_ddir get_rand_ddir(struct thread_data *td)
{
	unsigned int v;
	long r;

	r = os_random_long(&td->rwmix_state);
	v = 1 + (int) (100.0 * (r / (OS_RAND_MAX + 1.0)));
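	/*
	 * v is uniform in 1..100; return reads while it falls within
	 * the configured read percentage
	 */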
	if (v <= td->o.rwmix[DDIR_READ])
		return DDIR_READ;

	return DDIR_WRITE;
}

/*
 * Return the data direction for the next io_u. If the job is a
 * mixed read/write workload, check the rwmix cycle and switch if
 * necessary.
 */
static enum fio_ddir get_rw_ddir(struct thread_data *td)
{
	if (td_rw(td)) {
		/*
		 * Check if it's time to seed a new data direction.
		 */
		if (td->io_issues[td->rwmix_ddir] >= td->rwmix_issues) {
			unsigned long long max_bytes;
			enum fio_ddir ddir;

			/*
			 * Put a top limit on how many bytes we do for
			 * one data direction, to avoid overflowing the
			 * ranges too much
			 */
			ddir = get_rand_ddir(td);
			max_bytes = td->this_io_bytes[ddir];
			if (max_bytes >=
			    (td->o.size * td->o.rwmix[ddir] / 100)) {
				if (!td->rw_end_set[ddir]) {
					td->rw_end_set[ddir] = 1;
					fio_gettime(&td->rw_end[ddir], NULL);
				}

				ddir ^= 1;
			}

			if (ddir != td->rwmix_ddir)
				set_rwmix_bytes(td);

			td->rwmix_ddir = ddir;
		}
		return td->rwmix_ddir;
	} else if (td_read(td))
		return DDIR_READ;
	else
		return DDIR_WRITE;
}

static void put_file_log(struct thread_data *td, struct fio_file *f)
{
	int ret = put_file(td, f);

	if (ret)
		td_verror(td, ret, "file close");
}

void put_io_u(struct thread_data *td, struct io_u *io_u)
{
	assert((io_u->flags & IO_U_F_FREE) == 0);
	io_u->flags |= IO_U_F_FREE;

	if (io_u->file)
		put_file_log(td, io_u->file);

	io_u->file = NULL;
	flist_del(&io_u->list);
	flist_add(&io_u->list, &td->io_u_freelist);
	td->cur_depth--;
}

void requeue_io_u(struct thread_data *td, struct io_u **io_u)
{
	struct io_u *__io_u = *io_u;

	dprint(FD_IO, "requeue %p\n", __io_u);

	__io_u->flags |= IO_U_F_FREE;
	if ((__io_u->flags & IO_U_F_FLIGHT) && (__io_u->ddir != DDIR_SYNC))
		td->io_issues[__io_u->ddir]--;

	__io_u->flags &= ~IO_U_F_FLIGHT;

	flist_del(&__io_u->list);
	flist_add_tail(&__io_u->list, &td->io_u_requeues);
	td->cur_depth--;
	*io_u = NULL;
}

static int fill_io_u(struct thread_data *td, struct io_u *io_u)
{
	if (td->io_ops->flags & FIO_NOIO)
		goto out;

	/*
	 * see if it's time to sync
	 */
	if (td->o.fsync_blocks &&
	   !(td->io_issues[DDIR_WRITE] % td->o.fsync_blocks) &&
	     td->io_issues[DDIR_WRITE] && should_fsync(td)) {
		io_u->ddir = DDIR_SYNC;
		goto out;
	}

	io_u->ddir = get_rw_ddir(td);

	/*
	 * See if it's time to switch to a new zone
	 */
	if (td->zone_bytes >= td->o.zone_size) {
		td->zone_bytes = 0;
		io_u->file->last_pos += td->o.zone_skip;
		td->io_skip_bytes += td->o.zone_skip;
	}

	/*
	 * No log, let the seq/rand engine retrieve the next buflen and
	 * position.
	 */
	if (get_next_offset(td, io_u)) {
		dprint(FD_IO, "io_u %p, failed getting offset\n", io_u);
		return 1;
	}

	io_u->buflen = get_next_buflen(td, io_u);
	if (!io_u->buflen) {
		dprint(FD_IO, "io_u %p, failed getting buflen\n", io_u);
		return 1;
	}

	if (io_u->offset + io_u->buflen > io_u->file->real_file_size) {
		dprint(FD_IO, "io_u %p, offset too large\n", io_u);
		dprint(FD_IO, "  off=%llu/%lu > %llu\n", io_u->offset,
				io_u->buflen, io_u->file->real_file_size);
		return 1;
	}

	/*
	 * mark entry before potentially trimming io_u
	 */
	if (td_random(td) && file_randommap(td, io_u->file))
		mark_random_map(td, io_u);

	/*
	 * If using a write iolog, store this entry.
	 */
out:
	dprint_io_u(io_u, "fill_io_u");
	td->zone_bytes += io_u->buflen;
	log_io_u(td, io_u);
	return 0;
}

static void __io_u_mark_map(unsigned int *map, unsigned int nr)
{
	int index = 0;

	switch (nr) {
	default:
		index = 6;
		break;
	case 33 ... 64:
		index = 5;
		break;
	case 17 ... 32:
		index = 4;
		break;
	case 9 ... 16:
		index = 3;
		break;
	case 5 ... 8:
		index = 2;
		break;
	case 1 ... 4:
		index = 1;
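		/* fall through */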
	case 0:
		break;
	}

	map[index]++;
}

void io_u_mark_submit(struct thread_data *td, unsigned int nr)
{
	__io_u_mark_map(td->ts.io_u_submit, nr);
	td->ts.total_submit++;
}

void io_u_mark_complete(struct thread_data *td, unsigned int nr)
{
	__io_u_mark_map(td->ts.io_u_complete, nr);
	td->ts.total_complete++;
}

void io_u_mark_depth(struct thread_data *td, unsigned int nr)
{
	int index = 0;

	switch (td->cur_depth) {
	default:
		index = 6;
		break;
	case 32 ... 63:
		index = 5;
		break;
	case 16 ... 31:
		index = 4;
		break;
	case 8 ... 15:
		index = 3;
		break;
	case 4 ... 7:
		index = 2;
		break;
	case 2 ... 3:
		index = 1;
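		/* fall through */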
	case 1:
		break;
	}

	td->ts.io_u_map[index] += nr;
}

static void io_u_mark_lat_usec(struct thread_data *td, unsigned long usec)
{
	int index = 0;

	assert(usec < 1000);

	switch (usec) {
	case 750 ... 999:
		index = 9;
		break;
	case 500 ... 749:
		index = 8;
		break;
	case 250 ... 499:
		index = 7;
		break;
	case 100 ... 249:
		index = 6;
		break;
	case 50 ... 99:
		index = 5;
		break;
	case 20 ... 49:
		index = 4;
		break;
	case 10 ... 19:
		index = 3;
		break;
	case 4 ... 9:
		index = 2;
		break;
	case 2 ... 3:
		index = 1;
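		/* fall through */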
	case 0 ... 1:
		break;
	}

	assert(index < FIO_IO_U_LAT_U_NR);
	td->ts.io_u_lat_u[index]++;
}

static void io_u_mark_lat_msec(struct thread_data *td, unsigned long msec)
{
	int index = 0;

	switch (msec) {
	default:
		index = 11;
		break;
	case 1000 ... 1999:
		index = 10;
		break;
	case 750 ... 999:
		index = 9;
		break;
	case 500 ... 749:
		index = 8;
		break;
	case 250 ... 499:
		index = 7;
		break;
	case 100 ... 249:
		index = 6;
		break;
	case 50 ... 99:
		index = 5;
		break;
	case 20 ... 49:
		index = 4;
		break;
	case 10 ... 19:
		index = 3;
		break;
	case 4 ... 9:
		index = 2;
		break;
	case 2 ... 3:
		index = 1;
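		/* fall through */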
	case 0 ... 1:
		break;
	}

	assert(index < FIO_IO_U_LAT_M_NR);
	td->ts.io_u_lat_m[index]++;
}

static void io_u_mark_latency(struct thread_data *td, unsigned long usec)
{
	if (usec < 1000)
		io_u_mark_lat_usec(td, usec);
	else
		io_u_mark_lat_msec(td, usec / 1000);
}

/*
 * Get next file to service by choosing one at random
 */
static struct fio_file *get_next_file_rand(struct thread_data *td, int goodf,
					   int badf)
{
	struct fio_file *f;
	int fno;

	do {
		long r = os_random_long(&td->next_file_state);

		fno = (unsigned int) ((double) td->o.nr_files
			* (r / (OS_RAND_MAX + 1.0)));
		f = td->files[fno];
		if (f->flags & FIO_FILE_DONE)
			continue;

		if ((!goodf || (f->flags & goodf)) && !(f->flags & badf)) {
			dprint(FD_FILE, "get_next_file_rand: %p\n", f);
			return f;
		}
	} while (1);
}

/*
 * Get next file to service by doing round robin between all available ones
 */
static struct fio_file *get_next_file_rr(struct thread_data *td, int goodf,
					 int badf)
{
	unsigned int old_next_file = td->next_file;
	struct fio_file *f;

	do {
		f = td->files[td->next_file];

		td->next_file++;
		if (td->next_file >= td->o.nr_files)
			td->next_file = 0;

		if (f->flags & FIO_FILE_DONE) {
			f = NULL;
			continue;
		}

		if ((!goodf || (f->flags & goodf)) && !(f->flags & badf))
			break;

		f = NULL;
	} while (td->next_file != old_next_file);

	dprint(FD_FILE, "get_next_file_rr: %p\n", f);
	return f;
}

static struct fio_file *get_next_file(struct thread_data *td)
{
	struct fio_file *f;

	assert(td->o.nr_files <= td->files_index);

	if (!td->nr_open_files || td->nr_done_files >= td->o.nr_files) {
		dprint(FD_FILE, "get_next_file: nr_open=%d, nr_done=%d,"
				" nr_files=%d\n", td->nr_open_files,
						  td->nr_done_files,
						  td->o.nr_files);
		return NULL;
	}

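	/*
	 * keep serving the same file until file_service_left is
	 * exhausted, then pick a new one below
	 */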
	f = td->file_service_file;
	if (f && (f->flags & FIO_FILE_OPEN) && td->file_service_left--)
		goto out;

	if (td->o.file_service_type == FIO_FSERVICE_RR)
		f = get_next_file_rr(td, FIO_FILE_OPEN, FIO_FILE_CLOSING);
	else
		f = get_next_file_rand(td, FIO_FILE_OPEN, FIO_FILE_CLOSING);

	td->file_service_file = f;
	td->file_service_left = td->file_service_nr - 1;
out:
	dprint(FD_FILE, "get_next_file: %p\n", f);
	return f;
}

static struct fio_file *find_next_new_file(struct thread_data *td)
{
	struct fio_file *f;

	if (!td->nr_open_files || td->nr_done_files >= td->o.nr_files)
		return NULL;

	if (td->o.file_service_type == FIO_FSERVICE_RR)
		f = get_next_file_rr(td, 0, FIO_FILE_OPEN);
	else
		f = get_next_file_rand(td, 0, FIO_FILE_OPEN);

	return f;
}

static int set_io_u_file(struct thread_data *td, struct io_u *io_u)
{
	struct fio_file *f;

	do {
		f = get_next_file(td);
		if (!f)
			return 1;

set_file:
		io_u->file = f;
		get_file(f);

		if (!fill_io_u(td, io_u))
			break;

		/*
		 * optimization to prevent close/open of the same file. This
		 * way we preserve queueing etc.
		 */
		if (td->o.nr_files == 1 && td->o.time_based) {
			put_file_log(td, f);
			fio_file_reset(f);
			goto set_file;
		}

		/*
		 * td_io_close() does a put_file() as well, so no need to
		 * do that here.
		 */
		io_u->file = NULL;
		td_io_close_file(td, f);
		f->flags |= FIO_FILE_DONE;
		td->nr_done_files++;

		/*
		 * probably not the right place to do this, but see
		 * if we need to open a new file
		 */
		if (td->nr_open_files < td->o.open_files &&
		    td->o.open_files != td->o.nr_files) {
			f = find_next_new_file(td);

			if (!f || td_io_open_file(td, f))
				return 1;

			goto set_file;
		}
	} while (1);

	return 0;
}


struct io_u *__get_io_u(struct thread_data *td)
{
	struct io_u *io_u = NULL;

	if (!flist_empty(&td->io_u_requeues))
		io_u = flist_entry(td->io_u_requeues.next, struct io_u, list);
	else if (!queue_full(td)) {
		io_u = flist_entry(td->io_u_freelist.next, struct io_u, list);

		io_u->buflen = 0;
		io_u->resid = 0;
		io_u->file = NULL;
		io_u->end_io = NULL;
	}

	if (io_u) {
		assert(io_u->flags & IO_U_F_FREE);
		io_u->flags &= ~IO_U_F_FREE;

		io_u->error = 0;
		flist_del(&io_u->list);
		flist_add(&io_u->list, &td->io_u_busylist);
		td->cur_depth++;
	}

	return io_u;
}

/*
 * Return an io_u to be processed. Gets a buflen and offset, sets direction,
 * etc. The returned io_u is fully ready to be prepped and submitted.
 */
struct io_u *get_io_u(struct thread_data *td)
{
	struct fio_file *f;
	struct io_u *io_u;

	io_u = __get_io_u(td);
	if (!io_u) {
		dprint(FD_IO, "__get_io_u failed\n");
		return NULL;
	}

	/*
	 * from a requeue, io_u already setup
	 */
	if (io_u->file)
		goto out;

	/*
	 * If using an iolog, grab next piece if any available.
	 */
	if (td->o.read_iolog_file) {
		if (read_iolog_get(td, io_u))
			goto err_put;
	} else if (set_io_u_file(td, io_u)) {
		dprint(FD_IO, "io_u %p, setting file failed\n", io_u);
		goto err_put;
	}

	f = io_u->file;
	assert(f->flags & FIO_FILE_OPEN);

	if (io_u->ddir != DDIR_SYNC) {
		if (!io_u->buflen && !(td->io_ops->flags & FIO_NOIO)) {
			dprint(FD_IO, "get_io_u: zero buflen on %p\n", io_u);
			goto err_put;
		}

		f->last_pos = io_u->offset + io_u->buflen;

		if (td->o.verify != VERIFY_NONE)
			populate_verify_io_u(td, io_u);
		else if (td->o.refill_buffers && io_u->ddir == DDIR_WRITE)
			io_u_fill_buffer(td, io_u, io_u->xfer_buflen);
	}

	/*
	 * Set io data pointers.
	 */
	io_u->endpos = io_u->offset + io_u->buflen;
	io_u->xfer_buf = io_u->buf;
	io_u->xfer_buflen = io_u->buflen;

out:
	if (!td_io_prep(td, io_u)) {
		fio_gettime(&io_u->start_time, NULL);
		return io_u;
	}
err_put:
	dprint(FD_IO, "get_io_u failed\n");
	put_io_u(td, io_u);
	return NULL;
}

void io_u_log_error(struct thread_data *td, struct io_u *io_u)
{
	const char *msg[] = { "read", "write", "sync" };

	log_err("fio: io_u error");

	if (io_u->file)
		log_err(" on file %s", io_u->file->file_name);

	log_err(": %s\n", strerror(io_u->error));

	log_err("     %s offset=%llu, buflen=%lu\n", msg[io_u->ddir],
					io_u->offset, io_u->xfer_buflen);

	if (!td->error)
		td_verror(td, io_u->error, "io_u error");
}

static void io_completed(struct thread_data *td, struct io_u *io_u,
			 struct io_completion_data *icd)
{
	unsigned long usec;

	dprint_io_u(io_u, "io complete");

	assert(io_u->flags & IO_U_F_FLIGHT);
	io_u->flags &= ~IO_U_F_FLIGHT;

	if (io_u->ddir == DDIR_SYNC) {
		td->last_was_sync = 1;
		return;
	}

	td->last_was_sync = 0;

	if (!io_u->error) {
		unsigned int bytes = io_u->buflen - io_u->resid;
		const enum fio_ddir idx = io_u->ddir;
		int ret;

		td->io_blocks[idx]++;
		td->io_bytes[idx] += bytes;
		td->this_io_bytes[idx] += bytes;

		if (ramp_time_over(td)) {
			if (!td->o.disable_clat || !td->o.disable_bw)
				usec = utime_since(&io_u->issue_time,
							&icd->time);

			if (!td->o.disable_clat) {
				add_clat_sample(td, idx, usec);
				io_u_mark_latency(td, usec);
			}
			if (!td->o.disable_bw)
				add_bw_sample(td, idx, &icd->time);
		}

		if (td_write(td) && idx == DDIR_WRITE &&
		    td->o.do_verify &&
		    td->o.verify != VERIFY_NONE)
			log_io_piece(td, io_u);

		icd->bytes_done[idx] += bytes;

		if (io_u->end_io) {
			ret = io_u->end_io(td, io_u);
			if (ret && !icd->error)
				icd->error = ret;
		}
	} else {
		icd->error = io_u->error;
		io_u_log_error(td, io_u);
	}
}

static void init_icd(struct thread_data *td, struct io_completion_data *icd,
		     int nr)
{
	if (!td->o.disable_clat || !td->o.disable_bw)
		fio_gettime(&icd->time, NULL);

	icd->nr = nr;

	icd->error = 0;
	icd->bytes_done[0] = icd->bytes_done[1] = 0;
}

static void ios_completed(struct thread_data *td,
			  struct io_completion_data *icd)
{
	struct io_u *io_u;
	int i;

	for (i = 0; i < icd->nr; i++) {
		io_u = td->io_ops->event(td, i);

		io_completed(td, io_u, icd);
		put_io_u(td, io_u);
	}
}

/*
 * Complete a single io_u for the sync engines.
 */
long io_u_sync_complete(struct thread_data *td, struct io_u *io_u)
{
	struct io_completion_data icd;

	init_icd(td, &icd, 1);
	io_completed(td, io_u, &icd);
	put_io_u(td, io_u);

	if (!icd.error)
		return icd.bytes_done[0] + icd.bytes_done[1];

	td_verror(td, icd.error, "io_u_sync_complete");
	return -1;
}

/*
 * Called to complete min_events number of io for the async engines.
 */
long io_u_queued_complete(struct thread_data *td, int min_evts)
{
	struct io_completion_data icd;
	struct timespec *tvp = NULL;
	int ret;
	struct timespec ts = { .tv_sec = 0, .tv_nsec = 0, };

	dprint(FD_IO, "io_u_queued_completed: min=%d\n", min_evts);

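	/*
	 * with no minimum number of events to wait for, pass a zeroed
	 * timespec so the event reap can return immediately
	 */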
	if (!min_evts)
		tvp = &ts;

	ret = td_io_getevents(td, min_evts, td->o.iodepth_batch_complete, tvp);
	if (ret < 0) {
		td_verror(td, -ret, "td_io_getevents");
		return ret;
	} else if (!ret)
		return ret;

	init_icd(td, &icd, ret);
	ios_completed(td, &icd);
	if (!icd.error)
		return icd.bytes_done[0] + icd.bytes_done[1];

	td_verror(td, icd.error, "io_u_queued_complete");
	return -1;
}

/*
 * Call when io_u is really queued, to update the submission latency.
 */
void io_u_queued(struct thread_data *td, struct io_u *io_u)
{
	if (!td->o.disable_slat) {
		unsigned long slat_time;

		slat_time = utime_since(&io_u->start_time, &io_u->issue_time);
		add_slat_sample(td, io_u->ddir, slat_time);
	}
}

/*
 * "randomly" fill the buffer contents
 */
void io_u_fill_buffer(struct thread_data *td, struct io_u *io_u,
		      unsigned int max_bs)
{
	long *ptr = io_u->buf;

	if (!td->o.zero_buffers) {
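		/*
		 * fill the buffer one long at a time with scrambled
		 * random data until max_bs bytes have been written
		 */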
		while ((void *) ptr - io_u->buf < max_bs) {
			*ptr = rand() * GOLDEN_RATIO_PRIME;
			ptr++;
		}
	} else
		memset(ptr, 0, max_bs);
}
1042