/*
 * io_u.c, revision 9522850758ecad087891710b391b4e77f6bff839
 */
#include <unistd.h>
#include <fcntl.h>
#include <string.h>
#include <signal.h>
#include <time.h>
#include <assert.h>

#include "fio.h"
#include "hash.h"
#include "verify.h"
#include "lib/rand.h"

struct io_completion_data {
	int nr;				/* input */

	int error;			/* output */
	unsigned long bytes_done[2];	/* output */
	struct timeval time;		/* output */
};

/*
 * The ->file_map[] contains a map of blocks we have or have not done io
 * to yet. Used to make sure we cover the entire range in a fair fashion.
 */
static int random_map_free(struct fio_file *f, const unsigned long long block)
{
	unsigned int idx = RAND_MAP_IDX(f, block);
	unsigned int bit = RAND_MAP_BIT(f, block);

	dprint(FD_RANDOM, "free: b=%llu, idx=%u, bit=%u\n", block, idx, bit);

	return (f->file_map[idx] & (1 << bit)) == 0;
}

/*
 * Mark a given offset as used in the map.
 */
static void mark_random_map(struct thread_data *td, struct io_u *io_u)
{
	unsigned int min_bs = td->o.rw_min_bs;
	struct fio_file *f = io_u->file;
	unsigned long long block;
	unsigned int blocks, nr_blocks;

	block = (io_u->offset - f->file_offset) / (unsigned long long) min_bs;
	nr_blocks = (io_u->buflen + min_bs - 1) / min_bs;
	blocks = 0;

	while (nr_blocks) {
		unsigned int this_blocks, mask;
		unsigned int idx, bit;

		/*
		 * If we have a mixed random workload, we may
		 * encounter blocks we already did IO to.
		 */
		if ((td->o.ddir_nr == 1) && !random_map_free(f, block))
			break;

		idx = RAND_MAP_IDX(f, block);
		bit = RAND_MAP_BIT(f, block);

		fio_assert(td, idx < f->num_maps);

		this_blocks = nr_blocks;
		if (this_blocks + bit > BLOCKS_PER_MAP)
			this_blocks = BLOCKS_PER_MAP - bit;

		do {
			if (this_blocks == BLOCKS_PER_MAP)
				mask = -1U;
			else
				mask = ((1U << this_blocks) - 1) << bit;

			if (!(f->file_map[idx] & mask))
				break;

			this_blocks--;
		} while (this_blocks);

		if (!this_blocks)
			break;

		f->file_map[idx] |= mask;
		nr_blocks -= this_blocks;
		blocks += this_blocks;
		block += this_blocks;
	}

	if ((blocks * min_bs) < io_u->buflen)
		io_u->buflen = blocks * min_bs;
}

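/*
 * Return the number of blocks of size ->ba[ddir] that fit in the usable
 * size of the file (io_size, capped at real_file_size).
 */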
static unsigned long long last_block(struct thread_data *td, struct fio_file *f,
				     enum fio_ddir ddir)
{
	unsigned long long max_blocks;
	unsigned long long max_size;

	/*
	 * Hmm, should we make sure that ->io_size <= ->real_file_size?
	 */
	max_size = f->io_size;
	if (max_size > f->real_file_size)
		max_size = f->real_file_size;

	max_blocks = max_size / (unsigned long long) td->o.ba[ddir];
	if (!max_blocks)
		return 0;

	return max_blocks;
}

/*
 * Return the next free block in the map.
 */
static int get_next_free_block(struct thread_data *td, struct fio_file *f,
			       enum fio_ddir ddir, unsigned long long *b)
{
	unsigned long long min_bs = td->o.rw_min_bs;
	int i;

	i = f->last_free_lookup;
	*b = (i * BLOCKS_PER_MAP);
	while ((*b) * min_bs < f->real_file_size &&
		(*b) * min_bs < f->io_size) {
		if (f->file_map[i] != (unsigned int) -1) {
			*b += ffz(f->file_map[i]);
			if (*b > last_block(td, f, ddir))
				break;
			f->last_free_lookup = i;
			return 0;
		}

		*b += BLOCKS_PER_MAP;
		i++;
	}

	dprint(FD_IO, "failed finding a free block\n");
	return 1;
}

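/*
 * Pick a random block for this data direction. If a random map is kept,
 * retry a few times to hit a free block, then fall back to scanning the
 * map for the next free block.
 */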
static int get_next_rand_offset(struct thread_data *td, struct fio_file *f,
				enum fio_ddir ddir, unsigned long long *b)
{
	unsigned long long r;
	int loops = 5;

	do {
		r = os_random_long(&td->random_state);
		dprint(FD_RANDOM, "off rand %llu\n", r);
		*b = (last_block(td, f, ddir) - 1)
			* (r / ((unsigned long long) OS_RAND_MAX + 1.0));

		/*
		 * if we are not maintaining a random map, we are done.
		 */
		if (!file_randommap(td, f))
			return 0;

		/*
		 * calculate map offset and check if it's free
		 */
		if (random_map_free(f, *b))
			return 0;

		dprint(FD_RANDOM, "get_next_rand_offset: offset %llu busy\n",
									*b);
	} while (--loops);

	/*
	 * We get here if we didn't succeed in looking up a block. Generate
	 * a random start offset into the filemap, and find the first free
	 * block from there.
	 */
	loops = 10;
	do {
		f->last_free_lookup = (f->num_maps - 1) *
					(r / (OS_RAND_MAX + 1.0));
		if (!get_next_free_block(td, f, ddir, b))
			return 0;

		r = os_random_long(&td->random_state);
	} while (--loops);

	/*
	 * that didn't work either, try exhaustive search from the start
	 */
	f->last_free_lookup = 0;
	return get_next_free_block(td, f, ddir, b);
}

/*
 * For random io, generate a random new block and see if it's used. Repeat
 * until we find a free one. For sequential io, just return the end of
 * the last io issued.
 */
static int __get_next_offset(struct thread_data *td, struct io_u *io_u)
{
	struct fio_file *f = io_u->file;
	unsigned long long b;
	enum fio_ddir ddir = io_u->ddir;

	if (td_random(td) && (td->o.ddir_nr && !--td->ddir_nr)) {
		td->ddir_nr = td->o.ddir_nr;

		if (get_next_rand_offset(td, f, ddir, &b)) {
			dprint(FD_IO, "%s: getting rand offset failed\n",
				f->file_name);
			return 1;
		}
	} else {
		if (f->last_pos >= f->real_file_size) {
			if (!td_random(td) ||
			     get_next_rand_offset(td, f, ddir, &b)) {
				dprint(FD_IO, "%s: pos %llu > size %llu\n",
						f->file_name, f->last_pos,
						f->real_file_size);
				return 1;
			}
		} else
			b = (f->last_pos - f->file_offset) / td->o.min_bs[ddir];
	}

	io_u->offset = b * td->o.ba[ddir];
	if (io_u->offset >= f->io_size) {
		dprint(FD_IO, "get_next_offset: offset %llu >= io_size %llu\n",
					io_u->offset, f->io_size);
		return 1;
	}

	io_u->offset += f->file_offset;
	if (io_u->offset >= f->real_file_size) {
		dprint(FD_IO, "get_next_offset: offset %llu >= size %llu\n",
					io_u->offset, f->real_file_size);
		return 1;
	}

	return 0;
}

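/*
 * Wrapper around __get_next_offset() that lets a loaded profile override
 * how offsets are generated.
 */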
static int get_next_offset(struct thread_data *td, struct io_u *io_u)
{
	struct prof_io_ops *ops = &td->prof_io_ops;

	if (ops->fill_io_u_off)
		return ops->fill_io_u_off(td, io_u);

	return __get_next_offset(td, io_u);
}

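/*
 * Pick the buffer length for the next io_u: either the fixed block size,
 * a random size between min_bs and max_bs, or one chosen from the
 * bssplit distribution.
 */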
static unsigned int __get_next_buflen(struct thread_data *td, struct io_u *io_u)
{
	const int ddir = io_u->ddir;
	unsigned int uninitialized_var(buflen);
	unsigned int minbs, maxbs;
	long r;

	minbs = td->o.min_bs[ddir];
	maxbs = td->o.max_bs[ddir];

	if (minbs == maxbs)
		buflen = minbs;
	else {
		r = os_random_long(&td->bsrange_state);
		if (!td->o.bssplit_nr[ddir]) {
			buflen = 1 + (unsigned int) ((double) maxbs *
					(r / (OS_RAND_MAX + 1.0)));
			if (buflen < minbs)
				buflen = minbs;
		} else {
			long perc = 0;
			unsigned int i;

			for (i = 0; i < td->o.bssplit_nr[ddir]; i++) {
				struct bssplit *bsp = &td->o.bssplit[ddir][i];

				buflen = bsp->bs;
				perc += bsp->perc;
				if (r <= ((OS_RAND_MAX / 100L) * perc))
					break;
			}
		}
		if (!td->o.bs_unaligned && is_power_of_2(minbs))
			buflen = (buflen + minbs - 1) & ~(minbs - 1);
	}

	if (io_u->offset + buflen > io_u->file->real_file_size) {
		dprint(FD_IO, "lower buflen %u -> %u (ddir=%d)\n", buflen,
						minbs, ddir);
		buflen = minbs;
	}

	return buflen;
}

static unsigned int get_next_buflen(struct thread_data *td, struct io_u *io_u)
{
	struct prof_io_ops *ops = &td->prof_io_ops;

	if (ops->fill_io_u_size)
		return ops->fill_io_u_size(td, io_u);

	return __get_next_buflen(td, io_u);
}

static void set_rwmix_bytes(struct thread_data *td)
{
	unsigned int diff;

	/*
	 * we do time or byte based switch. this is needed because
	 * buffered writes may issue a lot quicker than they complete,
	 * whereas reads do not.
	 */
	diff = td->o.rwmix[td->rwmix_ddir ^ 1];
	td->rwmix_issues = (td->io_issues[td->rwmix_ddir] * diff) / 100;
}

static inline enum fio_ddir get_rand_ddir(struct thread_data *td)
{
	unsigned int v;
	long r;

	r = os_random_long(&td->rwmix_state);
	v = 1 + (int) (100.0 * (r / (OS_RAND_MAX + 1.0)));
	if (v <= td->o.rwmix[DDIR_READ])
		return DDIR_READ;

	return DDIR_WRITE;
}

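/*
 * If rate limiting has built up pending sleep time in this direction,
 * either switch to the other direction (for rw workloads) or sleep off
 * the owed time before continuing.
 */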
static enum fio_ddir rate_ddir(struct thread_data *td, enum fio_ddir ddir)
{
	enum fio_ddir odir = ddir ^ 1;
	struct timeval t;
	long usec;

	if (td->rate_pending_usleep[ddir] <= 0)
		return ddir;

	/*
	 * We have too much pending sleep in this direction. See if we
	 * should switch.
	 */
	if (td_rw(td)) {
		/*
		 * Other direction does not have too much pending, switch
		 */
		if (td->rate_pending_usleep[odir] < 100000)
			return odir;

		/*
		 * Both directions have pending sleep. Sleep the minimum time
		 * and deduct from both.
		 */
		if (td->rate_pending_usleep[ddir] <=
			td->rate_pending_usleep[odir]) {
			usec = td->rate_pending_usleep[ddir];
		} else {
			usec = td->rate_pending_usleep[odir];
			ddir = odir;
		}
	} else
		usec = td->rate_pending_usleep[ddir];

	fio_gettime(&t, NULL);
	usec_sleep(td, usec);
	usec = utime_since_now(&t);

	td->rate_pending_usleep[ddir] -= usec;

	odir = ddir ^ 1;
	if (td_rw(td) && __should_check_rate(td, odir))
		td->rate_pending_usleep[odir] -= usec;

	return ddir;
}

/*
 * Return the data direction for the next io_u. If the job is a
 * mixed read/write workload, check the rwmix cycle and switch if
 * necessary.
 */
static enum fio_ddir get_rw_ddir(struct thread_data *td)
{
	enum fio_ddir ddir;

	/*
	 * see if it's time to fsync
	 */
	if (td->o.fsync_blocks &&
	   !(td->io_issues[DDIR_WRITE] % td->o.fsync_blocks) &&
	     td->io_issues[DDIR_WRITE] && should_fsync(td))
		return DDIR_SYNC;

	/*
	 * see if it's time to fdatasync
	 */
	if (td->o.fdatasync_blocks &&
	   !(td->io_issues[DDIR_WRITE] % td->o.fdatasync_blocks) &&
	     td->io_issues[DDIR_WRITE] && should_fsync(td))
		return DDIR_DATASYNC;

	/*
	 * see if it's time to sync_file_range
	 */
	if (td->sync_file_range_nr &&
	   !(td->io_issues[DDIR_WRITE] % td->sync_file_range_nr) &&
	     td->io_issues[DDIR_WRITE] && should_fsync(td))
		return DDIR_SYNC_FILE_RANGE;

	if (td_rw(td)) {
		/*
		 * Check if it's time to seed a new data direction.
		 */
		if (td->io_issues[td->rwmix_ddir] >= td->rwmix_issues) {
			/*
			 * Put a top limit on how many bytes we do for
			 * one data direction, to avoid overflowing the
			 * ranges too much
			 */
			ddir = get_rand_ddir(td);

			if (ddir != td->rwmix_ddir)
				set_rwmix_bytes(td);

			td->rwmix_ddir = ddir;
		}
		ddir = td->rwmix_ddir;
	} else if (td_read(td))
		ddir = DDIR_READ;
	else
		ddir = DDIR_WRITE;

	td->rwmix_ddir = rate_ddir(td, ddir);
	return td->rwmix_ddir;
}

void put_file_log(struct thread_data *td, struct fio_file *f)
{
	int ret = put_file(td, f);

	if (ret)
		td_verror(td, ret, "file close");
}

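/*
 * Return an io_u to the free list and drop the reference to its file.
 */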
void put_io_u(struct thread_data *td, struct io_u *io_u)
{
	td_io_u_lock(td);

	io_u->flags |= IO_U_F_FREE;
	io_u->flags &= ~IO_U_F_FREE_DEF;

	if (io_u->file)
		put_file_log(td, io_u->file);

	io_u->file = NULL;
	if (io_u->flags & IO_U_F_IN_CUR_DEPTH)
		td->cur_depth--;
	flist_del_init(&io_u->list);
	flist_add(&io_u->list, &td->io_u_freelist);
	td_io_u_unlock(td);
	td_io_u_free_notify(td);
}

void clear_io_u(struct thread_data *td, struct io_u *io_u)
{
	io_u->flags &= ~IO_U_F_FLIGHT;
	put_io_u(td, io_u);
}

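/*
 * Put an io_u back on the requeue list so it gets resubmitted, undoing
 * the issue accounting for it.
 */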
void requeue_io_u(struct thread_data *td, struct io_u **io_u)
{
	struct io_u *__io_u = *io_u;

	dprint(FD_IO, "requeue %p\n", __io_u);

	td_io_u_lock(td);

	__io_u->flags |= IO_U_F_FREE;
	if ((__io_u->flags & IO_U_F_FLIGHT) && !ddir_sync(__io_u->ddir))
		td->io_issues[__io_u->ddir]--;

	__io_u->flags &= ~IO_U_F_FLIGHT;
	if (__io_u->flags & IO_U_F_IN_CUR_DEPTH)
		td->cur_depth--;
	flist_del(&__io_u->list);
	flist_add_tail(&__io_u->list, &td->io_u_requeues);
	td_io_u_unlock(td);
	*io_u = NULL;
}

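/*
 * Fill in the direction, offset and buffer length for an io_u, honoring
 * zone settings and the random map.
 */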
static int fill_io_u(struct thread_data *td, struct io_u *io_u)
{
	if (td->io_ops->flags & FIO_NOIO)
		goto out;

	io_u->ddir = get_rw_ddir(td);

	/*
	 * fsync() or fdatasync(), we are done
	 */
	if (ddir_sync(io_u->ddir))
		goto out;

	/*
	 * See if it's time to switch to a new zone
	 */
	if (td->zone_bytes >= td->o.zone_size) {
		td->zone_bytes = 0;
		io_u->file->last_pos += td->o.zone_skip;
		td->io_skip_bytes += td->o.zone_skip;
	}

	/*
	 * No log, let the seq/rand engine retrieve the next buflen and
	 * position.
	 */
	if (get_next_offset(td, io_u)) {
		dprint(FD_IO, "io_u %p, failed getting offset\n", io_u);
		return 1;
	}

	io_u->buflen = get_next_buflen(td, io_u);
	if (!io_u->buflen) {
		dprint(FD_IO, "io_u %p, failed getting buflen\n", io_u);
		return 1;
	}

	if (io_u->offset + io_u->buflen > io_u->file->real_file_size) {
		dprint(FD_IO, "io_u %p, offset too large\n", io_u);
		dprint(FD_IO, "  off=%llu/%lu > %llu\n", io_u->offset,
				io_u->buflen, io_u->file->real_file_size);
		return 1;
	}

	/*
	 * mark entry before potentially trimming io_u
	 */
	if (td_random(td) && file_randommap(td, io_u->file))
		mark_random_map(td, io_u);

	/*
	 * If using a write iolog, store this entry.
	 */
out:
	dprint_io_u(io_u, "fill_io_u");
	td->zone_bytes += io_u->buflen;
	log_io_u(td, io_u);
	return 0;
}

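/*
 * Bump the histogram bucket that covers 'nr' submitted/completed events.
 */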
static void __io_u_mark_map(unsigned int *map, unsigned int nr)
{
	int index = 0;

	switch (nr) {
	default:
		index = 6;
		break;
	case 33 ... 64:
		index = 5;
		break;
	case 17 ... 32:
		index = 4;
		break;
	case 9 ... 16:
		index = 3;
		break;
	case 5 ... 8:
		index = 2;
		break;
	case 1 ... 4:
		index = 1;
	case 0:
		break;
	}

	map[index]++;
}

void io_u_mark_submit(struct thread_data *td, unsigned int nr)
{
	__io_u_mark_map(td->ts.io_u_submit, nr);
	td->ts.total_submit++;
}

void io_u_mark_complete(struct thread_data *td, unsigned int nr)
{
	__io_u_mark_map(td->ts.io_u_complete, nr);
	td->ts.total_complete++;
}

void io_u_mark_depth(struct thread_data *td, unsigned int nr)
{
	int index = 0;

	switch (td->cur_depth) {
	default:
		index = 6;
		break;
	case 32 ... 63:
		index = 5;
		break;
	case 16 ... 31:
		index = 4;
		break;
	case 8 ... 15:
		index = 3;
		break;
	case 4 ... 7:
		index = 2;
		break;
	case 2 ... 3:
		index = 1;
	case 1:
		break;
	}

	td->ts.io_u_map[index] += nr;
}

static void io_u_mark_lat_usec(struct thread_data *td, unsigned long usec)
{
	int index = 0;

	assert(usec < 1000);

	switch (usec) {
	case 750 ... 999:
		index = 9;
		break;
	case 500 ... 749:
		index = 8;
		break;
	case 250 ... 499:
		index = 7;
		break;
	case 100 ... 249:
		index = 6;
		break;
	case 50 ... 99:
		index = 5;
		break;
	case 20 ... 49:
		index = 4;
		break;
	case 10 ... 19:
		index = 3;
		break;
	case 4 ... 9:
		index = 2;
		break;
	case 2 ... 3:
		index = 1;
	case 0 ... 1:
		break;
	}

	assert(index < FIO_IO_U_LAT_U_NR);
	td->ts.io_u_lat_u[index]++;
}

static void io_u_mark_lat_msec(struct thread_data *td, unsigned long msec)
{
	int index = 0;

	switch (msec) {
	default:
		index = 11;
		break;
	case 1000 ... 1999:
		index = 10;
		break;
	case 750 ... 999:
		index = 9;
		break;
	case 500 ... 749:
		index = 8;
		break;
	case 250 ... 499:
		index = 7;
		break;
	case 100 ... 249:
		index = 6;
		break;
	case 50 ... 99:
		index = 5;
		break;
	case 20 ... 49:
		index = 4;
		break;
	case 10 ... 19:
		index = 3;
		break;
	case 4 ... 9:
		index = 2;
		break;
	case 2 ... 3:
		index = 1;
	case 0 ... 1:
		break;
	}

	assert(index < FIO_IO_U_LAT_M_NR);
	td->ts.io_u_lat_m[index]++;
}

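/*
 * Account a completion latency in the microsecond or millisecond buckets.
 */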
static void io_u_mark_latency(struct thread_data *td, unsigned long usec)
{
	if (usec < 1000)
		io_u_mark_lat_usec(td, usec);
	else
		io_u_mark_lat_msec(td, usec / 1000);
}

/*
 * Get next file to service by choosing one at random
 */
static struct fio_file *get_next_file_rand(struct thread_data *td,
					   enum fio_file_flags goodf,
					   enum fio_file_flags badf)
{
	struct fio_file *f;
	int fno;

	do {
		long r = os_random_long(&td->next_file_state);
		int opened = 0;

		fno = (unsigned int) ((double) td->o.nr_files
			* (r / (OS_RAND_MAX + 1.0)));
		f = td->files[fno];
		if (fio_file_done(f))
			continue;

		if (!fio_file_open(f)) {
			int err;

			err = td_io_open_file(td, f);
			if (err)
				continue;
			opened = 1;
		}

		if ((!goodf || (f->flags & goodf)) && !(f->flags & badf)) {
			dprint(FD_FILE, "get_next_file_rand: %p\n", f);
			return f;
		}
		if (opened)
			td_io_close_file(td, f);
	} while (1);
}

/*
 * Get next file to service by doing round robin between all available ones
 */
static struct fio_file *get_next_file_rr(struct thread_data *td, int goodf,
					 int badf)
{
	unsigned int old_next_file = td->next_file;
	struct fio_file *f;

	do {
		int opened = 0;

		f = td->files[td->next_file];

		td->next_file++;
		if (td->next_file >= td->o.nr_files)
			td->next_file = 0;

		dprint(FD_FILE, "trying file %s %x\n", f->file_name, f->flags);
		if (fio_file_done(f)) {
			f = NULL;
			continue;
		}

		if (!fio_file_open(f)) {
			int err;

			err = td_io_open_file(td, f);
			if (err) {
				dprint(FD_FILE, "error %d on open of %s\n",
					err, f->file_name);
				f = NULL;
				continue;
			}
			opened = 1;
		}

		dprint(FD_FILE, "goodf=%x, badf=%x, ff=%x\n", goodf, badf,
								f->flags);
		if ((!goodf || (f->flags & goodf)) && !(f->flags & badf))
			break;

		if (opened)
			td_io_close_file(td, f);

		f = NULL;
	} while (td->next_file != old_next_file);

	dprint(FD_FILE, "get_next_file_rr: %p\n", f);
	return f;
}

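/*
 * Pick the next file to do IO against, either sticking with the current
 * service file or selecting a new one round robin or at random.
 */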
static struct fio_file *__get_next_file(struct thread_data *td)
{
	struct fio_file *f;

	assert(td->o.nr_files <= td->files_index);

	if (td->nr_done_files >= td->o.nr_files) {
		dprint(FD_FILE, "get_next_file: nr_open=%d, nr_done=%d,"
				" nr_files=%d\n", td->nr_open_files,
						  td->nr_done_files,
						  td->o.nr_files);
		return NULL;
	}

	f = td->file_service_file;
	if (f && fio_file_open(f) && !fio_file_closing(f)) {
		if (td->o.file_service_type == FIO_FSERVICE_SEQ)
			goto out;
		if (td->file_service_left--)
			goto out;
	}

	if (td->o.file_service_type == FIO_FSERVICE_RR ||
	    td->o.file_service_type == FIO_FSERVICE_SEQ)
		f = get_next_file_rr(td, FIO_FILE_open, FIO_FILE_closing);
	else
		f = get_next_file_rand(td, FIO_FILE_open, FIO_FILE_closing);

	td->file_service_file = f;
	td->file_service_left = td->file_service_nr - 1;
out:
	dprint(FD_FILE, "get_next_file: %p [%s]\n", f, f->file_name);
	return f;
}

static struct fio_file *get_next_file(struct thread_data *td)
{
	struct prof_io_ops *ops = &td->prof_io_ops;

	if (ops->get_next_file)
		return ops->get_next_file(td);

	return __get_next_file(td);
}

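/*
 * Attach a file to the io_u and fill it in; files that can no longer
 * serve IO are marked done and the next one is tried.
 */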
static int set_io_u_file(struct thread_data *td, struct io_u *io_u)
{
	struct fio_file *f;

	do {
		f = get_next_file(td);
		if (!f)
			return 1;

		io_u->file = f;
		get_file(f);

		if (!fill_io_u(td, io_u))
			break;

		put_file_log(td, f);
		td_io_close_file(td, f);
		io_u->file = NULL;
		fio_file_set_done(f);
		td->nr_done_files++;
		dprint(FD_FILE, "%s: is done (%d of %d)\n", f->file_name,
					td->nr_done_files, td->o.nr_files);
	} while (1);

	return 0;
}

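/*
 * Grab a free io_u, preferring requeued ones. May wait for async verify
 * threads to return one if the pool is exhausted.
 */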
struct io_u *__get_io_u(struct thread_data *td)
{
	struct io_u *io_u = NULL;

	td_io_u_lock(td);

again:
	if (!flist_empty(&td->io_u_requeues))
		io_u = flist_entry(td->io_u_requeues.next, struct io_u, list);
	else if (!queue_full(td)) {
		io_u = flist_entry(td->io_u_freelist.next, struct io_u, list);

		io_u->buflen = 0;
		io_u->resid = 0;
		io_u->file = NULL;
		io_u->end_io = NULL;
	}

	if (io_u) {
		assert(io_u->flags & IO_U_F_FREE);
		io_u->flags &= ~(IO_U_F_FREE | IO_U_F_FREE_DEF);

		io_u->error = 0;
		flist_del(&io_u->list);
		flist_add(&io_u->list, &td->io_u_busylist);
		td->cur_depth++;
		io_u->flags |= IO_U_F_IN_CUR_DEPTH;
	} else if (td->o.verify_async) {
		/*
		 * We ran out, wait for async verify threads to finish and
		 * return one
		 */
		pthread_cond_wait(&td->free_cond, &td->io_u_lock);
		goto again;
	}

	td_io_u_unlock(td);
	return io_u;
}

/*
 * Return an io_u to be processed. Gets a buflen and offset, sets direction,
 * etc. The returned io_u is fully ready to be prepped and submitted.
 */
struct io_u *get_io_u(struct thread_data *td)
{
	struct fio_file *f;
	struct io_u *io_u;

	io_u = __get_io_u(td);
	if (!io_u) {
		dprint(FD_IO, "__get_io_u failed\n");
		return NULL;
	}

	if (td->o.verify_backlog && td->io_hist_len) {
		int get_verify = 0;

		if (td->verify_batch) {
			td->verify_batch--;
			get_verify = 1;
		} else if (!(td->io_hist_len % td->o.verify_backlog) &&
			 td->last_ddir != DDIR_READ) {
			td->verify_batch = td->o.verify_batch;
			if (!td->verify_batch)
				td->verify_batch = td->o.verify_backlog;
			get_verify = 1;
		}

		if (get_verify && !get_next_verify(td, io_u))
			goto out;
	}

	/*
	 * from a requeue, io_u already setup
	 */
	if (io_u->file)
		goto out;

	/*
	 * If using an iolog, grab next piece if any available.
	 */
	if (td->o.read_iolog_file) {
		if (read_iolog_get(td, io_u))
			goto err_put;
	} else if (set_io_u_file(td, io_u)) {
		dprint(FD_IO, "io_u %p, setting file failed\n", io_u);
		goto err_put;
	}

	f = io_u->file;
	assert(fio_file_open(f));

	if (!ddir_sync(io_u->ddir)) {
		if (!io_u->buflen && !(td->io_ops->flags & FIO_NOIO)) {
			dprint(FD_IO, "get_io_u: zero buflen on %p\n", io_u);
			goto err_put;
		}

		f->last_pos = io_u->offset + io_u->buflen;

		if (td->o.verify != VERIFY_NONE && io_u->ddir == DDIR_WRITE)
			populate_verify_io_u(td, io_u);
		else if (td->o.refill_buffers && io_u->ddir == DDIR_WRITE)
			io_u_fill_buffer(td, io_u, io_u->xfer_buflen);
		else if (io_u->ddir == DDIR_READ) {
			/*
			 * Reset the buf_filled parameters so the buffer is
			 * refilled the next time it is used for a write.
			 */
			io_u->flags &= ~IO_U_F_FILLED;
			io_u->buf_filled_len = 0;
		}
	}

	/*
	 * Set io data pointers.
	 */
	io_u->xfer_buf = io_u->buf;
	io_u->xfer_buflen = io_u->buflen;

out:
	if (!td_io_prep(td, io_u)) {
		if (!td->o.disable_slat)
			fio_gettime(&io_u->start_time, NULL);
		return io_u;
	}
err_put:
	dprint(FD_IO, "get_io_u failed\n");
	put_io_u(td, io_u);
	return NULL;
}

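/*
 * Log the error attached to an io_u and record it on the thread if no
 * error has been set yet.
 */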
void io_u_log_error(struct thread_data *td, struct io_u *io_u)
{
	const char *msg[] = { "read", "write", "sync" };

	log_err("fio: io_u error");

	if (io_u->file)
		log_err(" on file %s", io_u->file->file_name);

	log_err(": %s\n", strerror(io_u->error));

	log_err("     %s offset=%llu, buflen=%lu\n", msg[io_u->ddir],
					io_u->offset, io_u->xfer_buflen);

	if (!td->error)
		td_verror(td, io_u->error, "io_u error");
}

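/*
 * Per-io_u completion handling: update byte and latency accounting, rate
 * limiting state, and note verify pieces for later checking.
 */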
static void io_completed(struct thread_data *td, struct io_u *io_u,
			 struct io_completion_data *icd)
{
	/*
	 * Older gccs don't realize that usec is always initialized before
	 * it is used; silence that warning.
	 */
	unsigned long uninitialized_var(usec);
	struct fio_file *f;

	dprint_io_u(io_u, "io complete");

	td_io_u_lock(td);
	assert(io_u->flags & IO_U_F_FLIGHT);
	io_u->flags &= ~IO_U_F_FLIGHT;
	td_io_u_unlock(td);

	if (ddir_sync(io_u->ddir)) {
		td->last_was_sync = 1;
		f = io_u->file;
		if (f) {
			f->first_write = -1ULL;
			f->last_write = -1ULL;
		}
		return;
	}

	td->last_was_sync = 0;
	td->last_ddir = io_u->ddir;

	if (!io_u->error) {
		unsigned int bytes = io_u->buflen - io_u->resid;
		const enum fio_ddir idx = io_u->ddir;
		const enum fio_ddir odx = io_u->ddir ^ 1;
		int ret;

		td->io_blocks[idx]++;
		td->io_bytes[idx] += bytes;
		td->this_io_bytes[idx] += bytes;

		if (idx == DDIR_WRITE) {
			f = io_u->file;
			if (f) {
				if (f->first_write == -1ULL ||
				    io_u->offset < f->first_write)
					f->first_write = io_u->offset;
				if (f->last_write == -1ULL ||
				    ((io_u->offset + bytes) > f->last_write))
					f->last_write = io_u->offset + bytes;
			}
		}

		if (ramp_time_over(td)) {
			unsigned long uninitialized_var(lusec);

			if (!td->o.disable_clat || !td->o.disable_bw)
				lusec = utime_since(&io_u->issue_time,
							&icd->time);
			if (!td->o.disable_lat) {
				unsigned long tusec;

				tusec = utime_since(&io_u->start_time,
							&icd->time);
				add_lat_sample(td, idx, tusec, bytes);
			}
			if (!td->o.disable_clat) {
				add_clat_sample(td, idx, lusec, bytes);
				io_u_mark_latency(td, lusec);
			}
			if (!td->o.disable_bw)
				add_bw_sample(td, idx, bytes, &icd->time);
			if (__should_check_rate(td, idx)) {
				td->rate_pending_usleep[idx] =
					((td->this_io_bytes[idx] *
					  td->rate_nsec_cycle[idx]) / 1000 -
					 utime_since_now(&td->start));
			}
			if (__should_check_rate(td, idx ^ 1))
				td->rate_pending_usleep[odx] =
					((td->this_io_bytes[odx] *
					  td->rate_nsec_cycle[odx]) / 1000 -
					 utime_since_now(&td->start));
		}

		if (td_write(td) && idx == DDIR_WRITE &&
		    td->o.do_verify &&
		    td->o.verify != VERIFY_NONE)
			log_io_piece(td, io_u);

		icd->bytes_done[idx] += bytes;

		if (io_u->end_io) {
			ret = io_u->end_io(td, io_u);
			if (ret && !icd->error)
				icd->error = ret;
		}
	} else {
		icd->error = io_u->error;
		io_u_log_error(td, io_u);
	}
	if (td->o.continue_on_error && icd->error &&
	    td_non_fatal_error(icd->error)) {
		/*
		 * If there is a non_fatal error, then add to the error count
		 * and clear all the errors.
		 */
		update_error_count(td, icd->error);
		td_clear_error(td);
		icd->error = 0;
		io_u->error = 0;
	}
}

static void init_icd(struct thread_data *td, struct io_completion_data *icd,
		     int nr)
{
	if (!td->o.disable_clat || !td->o.disable_bw)
		fio_gettime(&icd->time, NULL);

	icd->nr = nr;

	icd->error = 0;
	icd->bytes_done[0] = icd->bytes_done[1] = 0;
}

static void ios_completed(struct thread_data *td,
			  struct io_completion_data *icd)
{
	struct io_u *io_u;
	int i;

	for (i = 0; i < icd->nr; i++) {
		io_u = td->io_ops->event(td, i);

		io_completed(td, io_u, icd);

		if (!(io_u->flags & IO_U_F_FREE_DEF))
			put_io_u(td, io_u);
	}
}

/*
 * Complete a single io_u for the sync engines.
 */
int io_u_sync_complete(struct thread_data *td, struct io_u *io_u,
		       unsigned long *bytes)
{
	struct io_completion_data icd;

	init_icd(td, &icd, 1);
	io_completed(td, io_u, &icd);

	if (!(io_u->flags & IO_U_F_FREE_DEF))
		put_io_u(td, io_u);

	if (icd.error) {
		td_verror(td, icd.error, "io_u_sync_complete");
		return -1;
	}

	if (bytes) {
		bytes[0] += icd.bytes_done[0];
		bytes[1] += icd.bytes_done[1];
	}

	return 0;
}

/*
 * Called to complete at least min_evts IOs for the async engines.
 */
int io_u_queued_complete(struct thread_data *td, int min_evts,
			 unsigned long *bytes)
{
	struct io_completion_data icd;
	struct timespec *tvp = NULL;
	int ret;
	struct timespec ts = { .tv_sec = 0, .tv_nsec = 0, };

	dprint(FD_IO, "io_u_queued_completed: min=%d\n", min_evts);

	if (!min_evts)
		tvp = &ts;

	ret = td_io_getevents(td, min_evts, td->o.iodepth_batch_complete, tvp);
	if (ret < 0) {
		td_verror(td, -ret, "td_io_getevents");
		return ret;
	} else if (!ret)
		return ret;

	init_icd(td, &icd, ret);
	ios_completed(td, &icd);
	if (icd.error) {
		td_verror(td, icd.error, "io_u_queued_complete");
		return -1;
	}

	if (bytes) {
		bytes[0] += icd.bytes_done[0];
		bytes[1] += icd.bytes_done[1];
	}

	return 0;
}

/*
 * Call when io_u is really queued, to update the submission latency.
 */
void io_u_queued(struct thread_data *td, struct io_u *io_u)
{
	if (!td->o.disable_slat) {
		unsigned long slat_time;

		slat_time = utime_since(&io_u->start_time, &io_u->issue_time);
		add_slat_sample(td, io_u->ddir, slat_time, io_u->xfer_buflen);
	}
}

/*
 * "randomly" fill the buffer contents
 */
void io_u_fill_buffer(struct thread_data *td, struct io_u *io_u,
		      unsigned int max_bs)
{
	if (!td->o.zero_buffers)
		fill_random_buf(io_u->buf, max_bs);
	else
		memset(io_u->buf, 0, max_bs);
}