backend.c revision 3b44e8bb2a96110472c3b5132d5115f903e354ed
1/*
2 * fio - the flexible io tester
3 *
4 * Copyright (C) 2005 Jens Axboe <axboe@suse.de>
5 * Copyright (C) 2006-2012 Jens Axboe <axboe@kernel.dk>
6 *
7 * The license below covers all files distributed with fio unless otherwise
8 * noted in the file itself.
9 *
10 *  This program is free software; you can redistribute it and/or modify
11 *  it under the terms of the GNU General Public License version 2 as
12 *  published by the Free Software Foundation.
13 *
14 *  This program is distributed in the hope that it will be useful,
15 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
16 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17 *  GNU General Public License for more details.
18 *
19 *  You should have received a copy of the GNU General Public License
20 *  along with this program; if not, write to the Free Software
21 *  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
22 *
23 */
24#include <unistd.h>
25#include <fcntl.h>
26#include <string.h>
27#include <limits.h>
28#include <signal.h>
29#include <time.h>
30#include <locale.h>
31#include <assert.h>
33#include <inttypes.h>
34#include <sys/stat.h>
35#include <sys/wait.h>
36#include <sys/ipc.h>
37#include <sys/mman.h>
38
39#include "fio.h"
40#ifndef FIO_NO_HAVE_SHM_H
41#include <sys/shm.h>
42#endif
43#include "hash.h"
44#include "smalloc.h"
45#include "verify.h"
46#include "trim.h"
47#include "diskutil.h"
48#include "cgroup.h"
49#include "profile.h"
50#include "lib/rand.h"
51#include "memalign.h"
52#include "server.h"
53#include "lib/getrusage.h"
54#include "idletime.h"
55#include "err.h"
56#include "lib/tp.h"
57
58static pthread_t disk_util_thread;
59static struct fio_mutex *disk_thread_mutex;
60static struct fio_mutex *startup_mutex;
61static struct flist_head *cgroup_list;
62static char *cgroup_mnt;
63static int exit_value;
64static volatile int fio_abort;
65static unsigned int nr_process = 0;
66static unsigned int nr_thread = 0;
67
68struct io_log *agg_io_log[DDIR_RWDIR_CNT];
69
70int groupid = 0;
71unsigned int thread_number = 0;
72unsigned int stat_number = 0;
73int shm_id = 0;
74int temp_stall_ts;
75unsigned long done_secs = 0;
76volatile int disk_util_exit = 0;
77
78#define PAGE_ALIGN(buf)	\
79	(char *) (((uintptr_t) (buf) + page_mask) & ~page_mask)
80
81#define JOB_START_TIMEOUT	(5 * 1000)
82
83static void sig_int(int sig)
84{
85	if (threads) {
86		if (is_backend)
87			fio_server_got_signal(sig);
88		else {
89			log_info("\nfio: terminating on signal %d\n", sig);
90			fflush(stdout);
91			exit_value = 128;
92		}
93
94		fio_terminate_threads(TERMINATE_ALL);
95	}
96}
97
98static void sig_show_status(int sig)
99{
100	show_running_run_stats();
101}
102
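/*
 * Install the signal handlers used for job termination and for dumping
 * the current run status.
 */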
103static void set_sig_handlers(void)
104{
105	struct sigaction act;
106
107	memset(&act, 0, sizeof(act));
108	act.sa_handler = sig_int;
109	act.sa_flags = SA_RESTART;
110	sigaction(SIGINT, &act, NULL);
111
112	memset(&act, 0, sizeof(act));
113	act.sa_handler = sig_int;
114	act.sa_flags = SA_RESTART;
115	sigaction(SIGTERM, &act, NULL);
116
117/* Windows uses SIGBREAK as a quit signal from other applications */
118#ifdef WIN32
119	memset(&act, 0, sizeof(act));
120	act.sa_handler = sig_int;
121	act.sa_flags = SA_RESTART;
122	sigaction(SIGBREAK, &act, NULL);
123#endif
124
125	memset(&act, 0, sizeof(act));
126	act.sa_handler = sig_show_status;
127	act.sa_flags = SA_RESTART;
128	sigaction(SIGUSR1, &act, NULL);
129
130	if (is_backend) {
131		memset(&act, 0, sizeof(act));
132		act.sa_handler = sig_int;
133		act.sa_flags = SA_RESTART;
134		sigaction(SIGPIPE, &act, NULL);
135	}
136}
137
138/*
139 * Check if we are above the minimum rate given.
140 */
141static int __check_min_rate(struct thread_data *td, struct timeval *now,
142			    enum fio_ddir ddir)
143{
144	unsigned long long bytes = 0;
145	unsigned long iops = 0;
146	unsigned long spent;
147	unsigned long rate;
148	unsigned int ratemin = 0;
149	unsigned int rate_iops = 0;
150	unsigned int rate_iops_min = 0;
151
152	assert(ddir_rw(ddir));
153
154	if (!td->o.ratemin[ddir] && !td->o.rate_iops_min[ddir])
155		return 0;
156
157	/*
158	 * allow a 2 second settle period in the beginning
159	 */
160	if (mtime_since(&td->start, now) < 2000)
161		return 0;
162
163	iops += td->this_io_blocks[ddir];
164	bytes += td->this_io_bytes[ddir];
165	ratemin += td->o.ratemin[ddir];
166	rate_iops += td->o.rate_iops[ddir];
167	rate_iops_min += td->o.rate_iops_min[ddir];
168
169	/*
170	 * if rate_bytes or rate_blocks is set, the sample is running
171	 */
172	if (td->rate_bytes[ddir] || td->rate_blocks[ddir]) {
173		spent = mtime_since(&td->lastrate[ddir], now);
174		if (spent < td->o.ratecycle)
175			return 0;
176
177		if (td->o.rate[ddir]) {
178			/*
179			 * check bandwidth specified rate
180			 */
181			if (bytes < td->rate_bytes[ddir]) {
182				log_err("%s: min rate %u not met\n", td->o.name,
183								ratemin);
184				return 1;
185			} else {
186				if (spent)
187					rate = ((bytes - td->rate_bytes[ddir]) * 1000) / spent;
188				else
189					rate = 0;
190
191				if (rate < ratemin ||
192				    bytes < td->rate_bytes[ddir]) {
193					log_err("%s: min rate %u not met, got"
194						" %luKB/sec\n", td->o.name,
195							ratemin, rate);
196					return 1;
197				}
198			}
199		} else {
200			/*
201			 * checks iops specified rate
202			 */
203			if (iops < rate_iops) {
204				log_err("%s: min iops rate %u not met\n",
205						td->o.name, rate_iops);
206				return 1;
207			} else {
208				if (spent)
209					rate = ((iops - td->rate_blocks[ddir]) * 1000) / spent;
210				else
211					rate = 0;
212
213				if (rate < rate_iops_min ||
214				    iops < td->rate_blocks[ddir]) {
215					log_err("%s: min iops rate %u not met,"
216						" got %lu\n", td->o.name,
217							rate_iops_min, rate);
					return 1;
218				}
219			}
220		}
221	}
222
223	td->rate_bytes[ddir] = bytes;
224	td->rate_blocks[ddir] = iops;
225	memcpy(&td->lastrate[ddir], now, sizeof(*now));
226	return 0;
227}
228
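/*
 * Check the minimum rate for each data direction that completed IO.
 */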
229static int check_min_rate(struct thread_data *td, struct timeval *now,
230			  uint64_t *bytes_done)
231{
232	int ret = 0;
233
234	if (bytes_done[DDIR_READ])
235		ret |= __check_min_rate(td, now, DDIR_READ);
236	if (bytes_done[DDIR_WRITE])
237		ret |= __check_min_rate(td, now, DDIR_WRITE);
238	if (bytes_done[DDIR_TRIM])
239		ret |= __check_min_rate(td, now, DDIR_TRIM);
240
241	return ret;
242}
243
244/*
245 * When job exits, we can cancel the in-flight IO if we are using async
246 * io. Attempt to do so.
247 */
248static void cleanup_pending_aio(struct thread_data *td)
249{
250	int r;
251
252	/*
253	 * get immediately available events, if any
254	 */
255	r = io_u_queued_complete(td, 0, NULL);
256	if (r < 0)
257		return;
258
259	/*
260	 * now cancel remaining active events
261	 */
262	if (td->io_ops->cancel) {
263		struct io_u *io_u;
264		int i;
265
266		io_u_qiter(&td->io_u_all, io_u, i) {
267			if (io_u->flags & IO_U_F_FLIGHT) {
268				r = td->io_ops->cancel(td, io_u);
269				if (!r)
270					put_io_u(td, io_u);
271			}
272		}
273	}
274
275	if (td->cur_depth)
276		r = io_u_queued_complete(td, td->cur_depth, NULL);
277}
278
279/*
280 * Helper to handle the final sync of a file. Works just like the normal
281 * io path, but does everything synchronously.
282 */
283static int fio_io_sync(struct thread_data *td, struct fio_file *f)
284{
285	struct io_u *io_u = __get_io_u(td);
286	int ret;
287
288	if (!io_u)
289		return 1;
290
291	io_u->ddir = DDIR_SYNC;
292	io_u->file = f;
293
294	if (td_io_prep(td, io_u)) {
295		put_io_u(td, io_u);
296		return 1;
297	}
298
299requeue:
300	ret = td_io_queue(td, io_u);
301	if (ret < 0) {
302		td_verror(td, io_u->error, "td_io_queue");
303		put_io_u(td, io_u);
304		return 1;
305	} else if (ret == FIO_Q_QUEUED) {
306		if (io_u_queued_complete(td, 1, NULL) < 0)
307			return 1;
308	} else if (ret == FIO_Q_COMPLETED) {
309		if (io_u->error) {
310			td_verror(td, io_u->error, "td_io_queue");
311			return 1;
312		}
313
314		if (io_u_sync_complete(td, io_u, NULL) < 0)
315			return 1;
316	} else if (ret == FIO_Q_BUSY) {
317		if (td_io_commit(td))
318			return 1;
319		goto requeue;
320	}
321
322	return 0;
323}
324
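/*
 * Sync the given file, opening (and later closing) it if it is not
 * already open.
 */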
325static int fio_file_fsync(struct thread_data *td, struct fio_file *f)
326{
327	int ret;
328
329	if (fio_file_open(f))
330		return fio_io_sync(td, f);
331
332	if (td_io_open_file(td, f))
333		return 1;
334
335	ret = fio_io_sync(td, f);
336	td_io_close_file(td, f);
337	return ret;
338}
339
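/*
 * Cached time helpers: __update_tv_cache() reads the clock, while
 * update_tv_cache() only does so once every tv_cache_mask + 1 calls.
 */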
340static inline void __update_tv_cache(struct thread_data *td)
341{
342	fio_gettime(&td->tv_cache, NULL);
343}
344
345static inline void update_tv_cache(struct thread_data *td)
346{
347	if ((++td->tv_cache_nr & td->tv_cache_mask) == td->tv_cache_mask)
348		__update_tv_cache(td);
349}
350
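/*
 * Returns 1 if the job has exceeded its runtime limit. Ramp time is
 * excluded, and jobs without a timeout never expire here.
 */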
351static inline int runtime_exceeded(struct thread_data *td, struct timeval *t)
352{
353	if (in_ramp_time(td))
354		return 0;
355	if (!td->o.timeout)
356		return 0;
357	if (utime_since(&td->epoch, t) >= td->o.timeout)
358		return 1;
359
360	return 0;
361}
362
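/*
 * Decide whether an IO error should stop the job, honoring the
 * continue_on_error and fill_device options.
 */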
363static int break_on_this_error(struct thread_data *td, enum fio_ddir ddir,
364			       int *retptr)
365{
366	int ret = *retptr;
367
368	if (ret < 0 || td->error) {
369		int err = td->error;
370		enum error_type_bit eb;
371
372		if (ret < 0)
373			err = -ret;
374
375		eb = td_error_type(ddir, err);
376		if (!(td->o.continue_on_error & (1 << eb)))
377			return 1;
378
379		if (td_non_fatal_error(td, eb, err)) {
380			/*
381			 * Continue with the I/Os in case of
382			 * a non-fatal error.
383			 */
384			update_error_count(td, err);
385			td_clear_error(td);
386			*retptr = 0;
387			return 0;
388		} else if (td->o.fill_device && err == ENOSPC) {
389			/*
390			 * We expect to hit this error if
391			 * fill_device option is set.
392			 */
393			td_clear_error(td);
394			fio_mark_td_terminate(td);
395			return 1;
396		} else {
397			/*
398			 * Stop the I/O in case of a fatal
399			 * error.
400			 */
401			update_error_count(td, err);
402			return 1;
403		}
404	}
405
406	return 0;
407}
408
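/*
 * If an update of the resource usage stats was requested, perform it
 * and wake up the waiter.
 */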
409static void check_update_rusage(struct thread_data *td)
410{
411	if (td->update_rusage) {
412		td->update_rusage = 0;
413		update_rusage_stat(td);
414		fio_mutex_up(td->rusage_sem);
415	}
416}
417
418/*
419 * The main verify engine. Runs over the writes we previously submitted,
420 * reads the blocks back in, and checks the crc/md5 of the data.
421 */
422static void do_verify(struct thread_data *td, uint64_t verify_bytes)
423{
424	uint64_t bytes_done[DDIR_RWDIR_CNT] = { 0, 0, 0 };
425	struct fio_file *f;
426	struct io_u *io_u;
427	int ret, min_events;
428	unsigned int i;
429
430	dprint(FD_VERIFY, "starting loop\n");
431
432	/*
433	 * sync io first and invalidate cache, to make sure we really
434	 * read from disk.
435	 */
436	for_each_file(td, f, i) {
437		if (!fio_file_open(f))
438			continue;
439		if (fio_io_sync(td, f))
440			break;
441		if (file_invalidate_cache(td, f))
442			break;
443	}
444
445	check_update_rusage(td);
446
447	if (td->error)
448		return;
449
450	td_set_runstate(td, TD_VERIFYING);
451
452	io_u = NULL;
453	while (!td->terminate) {
454		enum fio_ddir ddir;
455		int ret2, full;
456
457		update_tv_cache(td);
458		check_update_rusage(td);
459
460		if (runtime_exceeded(td, &td->tv_cache)) {
461			__update_tv_cache(td);
462			if (runtime_exceeded(td, &td->tv_cache)) {
463				fio_mark_td_terminate(td);
464				break;
465			}
466		}
467
468		if (flow_threshold_exceeded(td))
469			continue;
470
471		if (!td->o.experimental_verify) {
472			io_u = __get_io_u(td);
473			if (!io_u)
474				break;
475
476			if (get_next_verify(td, io_u)) {
477				put_io_u(td, io_u);
478				break;
479			}
480
481			if (td_io_prep(td, io_u)) {
482				put_io_u(td, io_u);
483				break;
484			}
485		} else {
486			if (ddir_rw_sum(bytes_done) + td->o.rw_min_bs > verify_bytes)
487				break;
488
489			while ((io_u = get_io_u(td)) != NULL) {
490				if (IS_ERR(io_u)) {
491					io_u = NULL;
492					ret = FIO_Q_BUSY;
493					goto reap;
494				}
495
496				/*
497				 * We are only interested in the places where
498				 * we wrote or trimmed IOs. Turn those into
499				 * reads for verification purposes.
500				 */
501				if (io_u->ddir == DDIR_READ) {
502					/*
503					 * Pretend we issued it for rwmix
504					 * accounting
505					 */
506					td->io_issues[DDIR_READ]++;
507					put_io_u(td, io_u);
508					continue;
509				} else if (io_u->ddir == DDIR_TRIM) {
510					io_u->ddir = DDIR_READ;
511					io_u->flags |= IO_U_F_TRIMMED;
512					break;
513				} else if (io_u->ddir == DDIR_WRITE) {
514					io_u->ddir = DDIR_READ;
515					break;
516				} else {
517					put_io_u(td, io_u);
518					continue;
519				}
520			}
521
522			if (!io_u)
523				break;
524		}
525
526		if (td->o.verify_async)
527			io_u->end_io = verify_io_u_async;
528		else
529			io_u->end_io = verify_io_u;
530
531		ddir = io_u->ddir;
532
533		ret = td_io_queue(td, io_u);
534		switch (ret) {
535		case FIO_Q_COMPLETED:
536			if (io_u->error) {
537				ret = -io_u->error;
538				clear_io_u(td, io_u);
539			} else if (io_u->resid) {
540				int bytes = io_u->xfer_buflen - io_u->resid;
541
542				/*
543				 * zero read, fail
544				 */
545				if (!bytes) {
546					td_verror(td, EIO, "full resid");
547					put_io_u(td, io_u);
548					break;
549				}
550
551				io_u->xfer_buflen = io_u->resid;
552				io_u->xfer_buf += bytes;
553				io_u->offset += bytes;
554
555				if (ddir_rw(io_u->ddir))
556					td->ts.short_io_u[io_u->ddir]++;
557
558				f = io_u->file;
559				if (io_u->offset == f->real_file_size)
560					goto sync_done;
561
562				requeue_io_u(td, &io_u);
563			} else {
564sync_done:
565				ret = io_u_sync_complete(td, io_u, bytes_done);
566				if (ret < 0)
567					break;
568			}
569			continue;
570		case FIO_Q_QUEUED:
571			break;
572		case FIO_Q_BUSY:
573			requeue_io_u(td, &io_u);
574			ret2 = td_io_commit(td);
575			if (ret2 < 0)
576				ret = ret2;
577			break;
578		default:
579			assert(ret < 0);
580			td_verror(td, -ret, "td_io_queue");
581			break;
582		}
583
584		if (break_on_this_error(td, ddir, &ret))
585			break;
586
587		/*
588		 * if we can queue more, do so. but check if there are
589		 * completed io_u's first. Note that we can get BUSY even
590		 * without IO queued, if the system is resource starved.
591		 */
592reap:
593		full = queue_full(td) || (ret == FIO_Q_BUSY && td->cur_depth);
594		if (full || !td->o.iodepth_batch_complete) {
595			min_events = min(td->o.iodepth_batch_complete,
596					 td->cur_depth);
597			/*
598			 * if the queue is full, we MUST reap at least 1 event
599			 */
600			if (full && !min_events)
601				min_events = 1;
602
603			do {
604				/*
605				 * Reap required number of io units, if any,
606				 * and do the verification on them through
607				 * the callback handler
608				 */
609				if (io_u_queued_complete(td, min_events, bytes_done) < 0) {
610					ret = -1;
611					break;
612				}
613			} while (full && (td->cur_depth > td->o.iodepth_low));
614		}
615		if (ret < 0)
616			break;
617	}
618
619	check_update_rusage(td);
620
621	if (!td->error) {
622		min_events = td->cur_depth;
623
624		if (min_events)
625			ret = io_u_queued_complete(td, min_events, NULL);
626	} else
627		cleanup_pending_aio(td);
628
629	td_set_runstate(td, TD_RUNNING);
630
631	dprint(FD_VERIFY, "exiting loop\n");
632}
633
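/*
 * Returns non-zero when the job has reached its number_ios limit,
 * counting queued and in-flight IO as well.
 */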
634static unsigned int exceeds_number_ios(struct thread_data *td)
635{
636	unsigned long long number_ios;
637
638	if (!td->o.number_ios)
639		return 0;
640
641	number_ios = ddir_rw_sum(td->this_io_blocks);
642	number_ios += td->io_u_queued + td->io_u_in_flight;
643
644	return number_ios >= td->o.number_ios;
645}
646
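/*
 * Returns non-zero when the job has transferred io_limit (or size)
 * bytes, or has hit its number_ios limit.
 */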
647static int io_bytes_exceeded(struct thread_data *td)
648{
649	unsigned long long bytes, limit;
650
651	if (td_rw(td))
652		bytes = td->this_io_bytes[DDIR_READ] + td->this_io_bytes[DDIR_WRITE];
653	else if (td_write(td))
654		bytes = td->this_io_bytes[DDIR_WRITE];
655	else if (td_read(td))
656		bytes = td->this_io_bytes[DDIR_READ];
657	else
658		bytes = td->this_io_bytes[DDIR_TRIM];
659
660	if (td->o.io_limit)
661		limit = td->o.io_limit;
662	else
663		limit = td->o.size;
664
665	return bytes >= limit || exceeds_number_ios(td);
666}
667
668/*
669 * Main IO worker function. It retrieves io_u's to process, queues and
670 * reaps them, and checks for rate limits and errors along the way.
671 *
672 * Returns number of bytes written and trimmed.
673 */
674static uint64_t do_io(struct thread_data *td)
675{
676	uint64_t bytes_done[DDIR_RWDIR_CNT] = { 0, 0, 0 };
677	unsigned int i;
678	int ret = 0;
679	uint64_t total_bytes, bytes_issued = 0;
680
681	if (in_ramp_time(td))
682		td_set_runstate(td, TD_RAMP);
683	else
684		td_set_runstate(td, TD_RUNNING);
685
686	lat_target_init(td);
687
688	/*
689	 * If verify_backlog is enabled, we'll run the verify in this
690	 * handler as well. For that case, we may need up to twice the
691	 * amount of bytes.
692	 */
693	total_bytes = td->o.size;
694	if (td->o.verify != VERIFY_NONE &&
695	   (td_write(td) && td->o.verify_backlog))
696		total_bytes += td->o.size;
697
698	while ((td->o.read_iolog_file && !flist_empty(&td->io_log_list)) ||
699		(!flist_empty(&td->trim_list)) || !io_bytes_exceeded(td) ||
700		td->o.time_based) {
701		struct timeval comp_time;
702		int min_evts = 0;
703		struct io_u *io_u;
704		int ret2, full;
705		enum fio_ddir ddir;
706
707		check_update_rusage(td);
708
709		if (td->terminate || td->done)
710			break;
711
712		update_tv_cache(td);
713
714		if (runtime_exceeded(td, &td->tv_cache)) {
715			__update_tv_cache(td);
716			if (runtime_exceeded(td, &td->tv_cache)) {
717				fio_mark_td_terminate(td);
718				break;
719			}
720		}
721
722		if (flow_threshold_exceeded(td))
723			continue;
724
725		if (bytes_issued >= total_bytes)
726			break;
727
728		io_u = get_io_u(td);
729		if (IS_ERR_OR_NULL(io_u)) {
730			int err = PTR_ERR(io_u);
731
732			io_u = NULL;
733			if (err == -EBUSY) {
734				ret = FIO_Q_BUSY;
735				goto reap;
736			}
737			if (td->o.latency_target)
738				goto reap;
739			break;
740		}
741
742		ddir = io_u->ddir;
743
744		/*
745		 * Add verification end_io handler if:
746		 *	- Asked to verify (!td_rw(td))
747		 *	- Or the io_u is from our verify list (mixed write/ver)
748		 */
749		if (td->o.verify != VERIFY_NONE && io_u->ddir == DDIR_READ &&
750		    ((io_u->flags & IO_U_F_VER_LIST) || !td_rw(td))) {
751
752			if (!td->o.verify_pattern_bytes) {
753				io_u->rand_seed = __rand(&td->__verify_state);
754				if (sizeof(int) != sizeof(long *))
755					io_u->rand_seed *= __rand(&td->__verify_state);
756			}
757
758			if (td->o.verify_async)
759				io_u->end_io = verify_io_u_async;
760			else
761				io_u->end_io = verify_io_u;
762			td_set_runstate(td, TD_VERIFYING);
763		} else if (in_ramp_time(td))
764			td_set_runstate(td, TD_RAMP);
765		else
766			td_set_runstate(td, TD_RUNNING);
767
768		/*
769		 * Always log the IO before it is issued, so we know the exact
770		 * order in which it was issued. The logged unit will track when
771		 * the IO has completed.
772		 */
773		if (td_write(td) && io_u->ddir == DDIR_WRITE &&
774		    td->o.do_verify &&
775		    td->o.verify != VERIFY_NONE &&
776		    !td->o.experimental_verify)
777			log_io_piece(td, io_u);
778
779		ret = td_io_queue(td, io_u);
780		switch (ret) {
781		case FIO_Q_COMPLETED:
782			if (io_u->error) {
783				ret = -io_u->error;
784				unlog_io_piece(td, io_u);
785				clear_io_u(td, io_u);
786			} else if (io_u->resid) {
787				int bytes = io_u->xfer_buflen - io_u->resid;
788				struct fio_file *f = io_u->file;
789
790				bytes_issued += bytes;
791
792				trim_io_piece(td, io_u);
793
794				/*
795				 * zero read, fail
796				 */
797				if (!bytes) {
798					unlog_io_piece(td, io_u);
799					td_verror(td, EIO, "full resid");
800					put_io_u(td, io_u);
801					break;
802				}
803
804				io_u->xfer_buflen = io_u->resid;
805				io_u->xfer_buf += bytes;
806				io_u->offset += bytes;
807
808				if (ddir_rw(io_u->ddir))
809					td->ts.short_io_u[io_u->ddir]++;
810
811				if (io_u->offset == f->real_file_size)
812					goto sync_done;
813
814				requeue_io_u(td, &io_u);
815			} else {
816sync_done:
817				if (__should_check_rate(td, DDIR_READ) ||
818				    __should_check_rate(td, DDIR_WRITE) ||
819				    __should_check_rate(td, DDIR_TRIM))
820					fio_gettime(&comp_time, NULL);
821
822				ret = io_u_sync_complete(td, io_u, bytes_done);
823				if (ret < 0)
824					break;
825				bytes_issued += io_u->xfer_buflen;
826			}
827			break;
828		case FIO_Q_QUEUED:
829			/*
830			 * if the engine doesn't have a commit hook,
831			 * the io_u is really queued. if it does have such
832			 * a hook, it has to call io_u_queued() itself.
833			 */
834			if (td->io_ops->commit == NULL)
835				io_u_queued(td, io_u);
836			bytes_issued += io_u->xfer_buflen;
837			break;
838		case FIO_Q_BUSY:
839			unlog_io_piece(td, io_u);
840			requeue_io_u(td, &io_u);
841			ret2 = td_io_commit(td);
842			if (ret2 < 0)
843				ret = ret2;
844			break;
845		default:
846			assert(ret < 0);
847			put_io_u(td, io_u);
848			break;
849		}
850
851		if (break_on_this_error(td, ddir, &ret))
852			break;
853
854		/*
855		 * See if we need to complete some commands. Note that we
856		 * can get BUSY even without IO queued, if the system is
857		 * resource starved.
858		 */
859reap:
860		full = queue_full(td) || (ret == FIO_Q_BUSY && td->cur_depth);
861		if (full || !td->o.iodepth_batch_complete) {
862			min_evts = min(td->o.iodepth_batch_complete,
863					td->cur_depth);
864			/*
865			 * if the queue is full, we MUST reap at least 1 event
866			 */
867			if (full && !min_evts)
868				min_evts = 1;
869
870			if (__should_check_rate(td, DDIR_READ) ||
871			    __should_check_rate(td, DDIR_WRITE) ||
872			    __should_check_rate(td, DDIR_TRIM))
873				fio_gettime(&comp_time, NULL);
874
875			do {
876				ret = io_u_queued_complete(td, min_evts, bytes_done);
877				if (ret < 0)
878					break;
879
880			} while (full && (td->cur_depth > td->o.iodepth_low));
881		}
882
883		if (ret < 0)
884			break;
885		if (!ddir_rw_sum(bytes_done) && !(td->io_ops->flags & FIO_NOIO))
886			continue;
887
888		if (!in_ramp_time(td) && should_check_rate(td, bytes_done)) {
889			if (check_min_rate(td, &comp_time, bytes_done)) {
890				if (exitall_on_terminate)
891					fio_terminate_threads(td->groupid);
892				td_verror(td, EIO, "check_min_rate");
893				break;
894			}
895		}
896		if (!in_ramp_time(td) && td->o.latency_target)
897			lat_target_check(td);
898
899		if (td->o.thinktime) {
900			unsigned long long b;
901
902			b = ddir_rw_sum(td->io_blocks);
903			if (!(b % td->o.thinktime_blocks)) {
904				int left;
905
906				io_u_quiesce(td);
907
908				if (td->o.thinktime_spin)
909					usec_spin(td->o.thinktime_spin);
910
911				left = td->o.thinktime - td->o.thinktime_spin;
912				if (left)
913					usec_sleep(td, left);
914			}
915		}
916	}
917
918	check_update_rusage(td);
919
920	if (td->trim_entries)
921		log_err("fio: %lu trim entries leaked?\n", td->trim_entries);
922
923	if (td->o.fill_device && td->error == ENOSPC) {
924		td->error = 0;
925		fio_mark_td_terminate(td);
926	}
927	if (!td->error) {
928		struct fio_file *f;
929
930		i = td->cur_depth;
931		if (i) {
932			ret = io_u_queued_complete(td, i, bytes_done);
933			if (td->o.fill_device && td->error == ENOSPC)
934				td->error = 0;
935		}
936
937		if (should_fsync(td) && td->o.end_fsync) {
938			td_set_runstate(td, TD_FSYNCING);
939
940			for_each_file(td, f, i) {
941				if (!fio_file_fsync(td, f))
942					continue;
943
944				log_err("fio: end_fsync failed for file %s\n",
945								f->file_name);
946			}
947		}
948	} else
949		cleanup_pending_aio(td);
950
951	/*
952	 * stop job if we failed doing any IO
953	 */
954	if (!ddir_rw_sum(td->this_io_bytes))
955		td->done = 1;
956
957	return bytes_done[DDIR_WRITE] + bytes_done[DDIR_TRIM];
958}
959
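/*
 * Free the io_u structures, their data buffers and the io_u queues
 * set up by init_io_u().
 */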
960static void cleanup_io_u(struct thread_data *td)
961{
962	struct io_u *io_u;
963
964	while ((io_u = io_u_qpop(&td->io_u_freelist)) != NULL) {
965
966		if (td->io_ops->io_u_free)
967			td->io_ops->io_u_free(td, io_u);
968
969		fio_memfree(io_u, sizeof(*io_u));
970	}
971
972	free_io_mem(td);
973
974	io_u_rexit(&td->io_u_requeues);
975	io_u_qexit(&td->io_u_freelist);
976	io_u_qexit(&td->io_u_all);
977}
978
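/*
 * Allocate the IO buffer memory and the io_u structures, and set up
 * the requeue, freelist and all-io_u queues.
 */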
979static int init_io_u(struct thread_data *td)
980{
981	struct io_u *io_u;
982	unsigned int max_bs, min_write;
983	int cl_align, i, max_units;
984	int data_xfer = 1, err;
985	char *p;
986
987	max_units = td->o.iodepth;
988	max_bs = td_max_bs(td);
989	min_write = td->o.min_bs[DDIR_WRITE];
990	td->orig_buffer_size = (unsigned long long) max_bs
991					* (unsigned long long) max_units;
992
993	if ((td->io_ops->flags & FIO_NOIO) || !(td_read(td) || td_write(td)))
994		data_xfer = 0;
995
996	err = 0;
997	err += io_u_rinit(&td->io_u_requeues, td->o.iodepth);
998	err += io_u_qinit(&td->io_u_freelist, td->o.iodepth);
999	err += io_u_qinit(&td->io_u_all, td->o.iodepth);
1000
1001	if (err) {
1002		log_err("fio: failed setting up IO queues\n");
1003		return 1;
1004	}
1005
1006	/*
1007	 * if we may later need to do address alignment, then add any
1008	 * possible adjustment here so that we don't cause a buffer
1009	 * overflow later. this adjustment may be too much if we get
1010	 * lucky and the allocator gives us an aligned address.
1011	 */
1012	if (td->o.odirect || td->o.mem_align || td->o.oatomic ||
1013	    (td->io_ops->flags & FIO_RAWIO))
1014		td->orig_buffer_size += page_mask + td->o.mem_align;
1015
1016	if (td->o.mem_type == MEM_SHMHUGE || td->o.mem_type == MEM_MMAPHUGE) {
1017		unsigned long bs;
1018
1019		bs = td->orig_buffer_size + td->o.hugepage_size - 1;
1020		td->orig_buffer_size = bs & ~(td->o.hugepage_size - 1);
1021	}
1022
1023	if (td->orig_buffer_size != (size_t) td->orig_buffer_size) {
1024		log_err("fio: IO memory too large. Reduce max_bs or iodepth\n");
1025		return 1;
1026	}
1027
1028	if (data_xfer && allocate_io_mem(td))
1029		return 1;
1030
1031	if (td->o.odirect || td->o.mem_align || td->o.oatomic ||
1032	    (td->io_ops->flags & FIO_RAWIO))
1033		p = PAGE_ALIGN(td->orig_buffer) + td->o.mem_align;
1034	else
1035		p = td->orig_buffer;
1036
1037	cl_align = os_cache_line_size();
1038
1039	for (i = 0; i < max_units; i++) {
1040		void *ptr;
1041
1042		if (td->terminate)
1043			return 1;
1044
1045		ptr = fio_memalign(cl_align, sizeof(*io_u));
1046		if (!ptr) {
1047			log_err("fio: unable to allocate aligned memory\n");
1048			break;
1049		}
1050
1051		io_u = ptr;
1052		memset(io_u, 0, sizeof(*io_u));
1053		INIT_FLIST_HEAD(&io_u->verify_list);
1054		dprint(FD_MEM, "io_u alloc %p, index %u\n", io_u, i);
1055
1056		if (data_xfer) {
1057			io_u->buf = p;
1058			dprint(FD_MEM, "io_u %p, mem %p\n", io_u, io_u->buf);
1059
1060			if (td_write(td))
1061				io_u_fill_buffer(td, io_u, min_write, max_bs);
1062			if (td_write(td) && td->o.verify_pattern_bytes) {
1063				/*
1064				 * Fill the buffer with the pattern if we are
1065				 * going to be doing writes.
1066				 */
1067				fill_verify_pattern(td, io_u->buf, max_bs, io_u, 0, 0);
1068			}
1069		}
1070
1071		io_u->index = i;
1072		io_u->flags = IO_U_F_FREE;
1073		io_u_qpush(&td->io_u_freelist, io_u);
1074
1075		/*
1076		 * io_u never leaves this stack, used for iteration of all
1077		 * io_u buffers.
1078		 */
1079		io_u_qpush(&td->io_u_all, io_u);
1080
1081		if (td->io_ops->io_u_init) {
1082			int ret = td->io_ops->io_u_init(td, io_u);
1083
1084			if (ret) {
1085				log_err("fio: failed to init engine data: %d\n", ret);
1086				return 1;
1087			}
1088		}
1089
1090		p += max_bs;
1091	}
1092
1093	return 0;
1094}
1095
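/*
 * Switch the IO scheduler of the backing device through sysfs and
 * read it back to verify that the switch took effect.
 */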
1096static int switch_ioscheduler(struct thread_data *td)
1097{
1098	char tmp[256], tmp2[128];
1099	FILE *f;
1100	int ret;
1101
1102	if (td->io_ops->flags & FIO_DISKLESSIO)
1103		return 0;
1104
1105	sprintf(tmp, "%s/queue/scheduler", td->sysfs_root);
1106
1107	f = fopen(tmp, "r+");
1108	if (!f) {
1109		if (errno == ENOENT) {
1110			log_err("fio: os or kernel doesn't support IO scheduler"
1111				" switching\n");
1112			return 0;
1113		}
1114		td_verror(td, errno, "fopen iosched");
1115		return 1;
1116	}
1117
1118	/*
1119	 * Set io scheduler.
1120	 */
1121	ret = fwrite(td->o.ioscheduler, strlen(td->o.ioscheduler), 1, f);
1122	if (ferror(f) || ret != 1) {
1123		td_verror(td, errno, "fwrite");
1124		fclose(f);
1125		return 1;
1126	}
1127
1128	rewind(f);
1129
1130	/*
1131	 * Read back and check that the selected scheduler is now the default.
1132	 */
1133	ret = fread(tmp, sizeof(tmp), 1, f);
1134	if (ferror(f) || ret < 0) {
1135		td_verror(td, errno, "fread");
1136		fclose(f);
1137		return 1;
1138	}
1139	tmp[sizeof(tmp) - 1] = '\0';
1140
1141
1142	sprintf(tmp2, "[%s]", td->o.ioscheduler);
1143	if (!strstr(tmp, tmp2)) {
1144		log_err("fio: io scheduler %s not found\n", td->o.ioscheduler);
1145		td_verror(td, EINVAL, "iosched_switch");
1146		fclose(f);
1147		return 1;
1148	}
1149
1150	fclose(f);
1151	return 0;
1152}
1153
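/*
 * Decide if the job should keep running, honoring time_based, loops,
 * number_ios and the remaining size/io_limit.
 */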
1154static int keep_running(struct thread_data *td)
1155{
1156	unsigned long long limit;
1157
1158	if (td->done)
1159		return 0;
1160	if (td->o.time_based)
1161		return 1;
1162	if (td->o.loops) {
1163		td->o.loops--;
1164		return 1;
1165	}
1166	if (exceeds_number_ios(td))
1167		return 0;
1168
1169	if (td->o.io_limit)
1170		limit = td->o.io_limit;
1171	else
1172		limit = td->o.size;
1173
1174	if (limit != -1ULL && ddir_rw_sum(td->io_bytes) < limit) {
1175		uint64_t diff;
1176
1177		/*
1178		 * If what is left is smaller than the largest block size, we
1179		 * can't issue another full IO and are done.
1180		 */
1181		diff = limit - ddir_rw_sum(td->io_bytes);
1182		if (diff < td_max_bs(td))
1183			return 0;
1184
1185		if (fio_files_done(td))
1186			return 0;
1187
1188		return 1;
1189	}
1190
1191	return 0;
1192}
1193
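/*
 * Run an external command (exec_prerun/exec_postrun), saving its
 * output to <name>.<mode>.txt.
 */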
1194static int exec_string(struct thread_options *o, const char *string, const char *mode)
1195{
1196	int ret, newlen = strlen(string) + strlen(o->name) + strlen(mode) + 9 + 1;
1197	char *str;
1198
1199	str = malloc(newlen);
1200	sprintf(str, "%s &> %s.%s.txt", string, o->name, mode);
1201
1202	log_info("%s : Saving output of %s in %s.%s.txt\n",o->name, mode, o->name, mode);
1203	ret = system(str);
1204	if (ret == -1)
1205		log_err("fio: exec of cmd <%s> failed\n", str);
1206
1207	free(str);
1208	return ret;
1209}
1210
1211/*
1212 * Dry run to compute correct state of numberio for verification.
1213 */
1214static uint64_t do_dry_run(struct thread_data *td)
1215{
1216	uint64_t bytes_done[DDIR_RWDIR_CNT] = { 0, 0, 0 };
1217
1218	td_set_runstate(td, TD_RUNNING);
1219
1220	while ((td->o.read_iolog_file && !flist_empty(&td->io_log_list)) ||
1221		(!flist_empty(&td->trim_list)) || !io_bytes_exceeded(td)) {
1222		struct io_u *io_u;
1223		int ret;
1224
1225		if (td->terminate || td->done)
1226			break;
1227
1228		io_u = get_io_u(td);
1229		if (!io_u)
1230			break;
1231
1232		io_u->flags |= IO_U_F_FLIGHT;
1233		io_u->error = 0;
1234		io_u->resid = 0;
1235		if (ddir_rw(acct_ddir(io_u)))
1236			td->io_issues[acct_ddir(io_u)]++;
1237		if (ddir_rw(io_u->ddir)) {
1238			io_u_mark_depth(td, 1);
1239			td->ts.total_io_u[io_u->ddir]++;
1240		}
1241
1242		if (td_write(td) && io_u->ddir == DDIR_WRITE &&
1243		    td->o.do_verify &&
1244		    td->o.verify != VERIFY_NONE &&
1245		    !td->o.experimental_verify)
1246			log_io_piece(td, io_u);
1247
1248		ret = io_u_sync_complete(td, io_u, bytes_done);
1249		(void) ret;
1250	}
1251
1252	return bytes_done[DDIR_WRITE] + bytes_done[DDIR_TRIM];
1253}
1254
1255/*
1256 * Entry point for the thread based jobs. The process based jobs end up
1257 * here as well, after a little setup.
1258 */
1259static void *thread_main(void *data)
1260{
1261	unsigned long long elapsed;
1262	struct thread_data *td = data;
1263	struct thread_options *o = &td->o;
1264	pthread_condattr_t attr;
1265	int clear_state;
1266	int ret;
1267
1268	if (!o->use_thread) {
1269		setsid();
1270		td->pid = getpid();
1271	} else
1272		td->pid = gettid();
1273
1274	fio_local_clock_init(o->use_thread);
1275
1276	dprint(FD_PROCESS, "jobs pid=%d started\n", (int) td->pid);
1277
1278	if (is_backend)
1279		fio_server_send_start(td);
1280
1281	INIT_FLIST_HEAD(&td->io_log_list);
1282	INIT_FLIST_HEAD(&td->io_hist_list);
1283	INIT_FLIST_HEAD(&td->verify_list);
1284	INIT_FLIST_HEAD(&td->trim_list);
1285	INIT_FLIST_HEAD(&td->next_rand_list);
1286	pthread_mutex_init(&td->io_u_lock, NULL);
1287	td->io_hist_tree = RB_ROOT;
1288
1289	pthread_condattr_init(&attr);
1290	pthread_cond_init(&td->verify_cond, &attr);
1291	pthread_cond_init(&td->free_cond, &attr);
1292
1293	td_set_runstate(td, TD_INITIALIZED);
1294	dprint(FD_MUTEX, "up startup_mutex\n");
1295	fio_mutex_up(startup_mutex);
1296	dprint(FD_MUTEX, "wait on td->mutex\n");
1297	fio_mutex_down(td->mutex);
1298	dprint(FD_MUTEX, "done waiting on td->mutex\n");
1299
1300	/*
1301	 * A new gid requires privilege, so we need to do this before setting
1302	 * the uid.
1303	 */
1304	if (o->gid != -1U && setgid(o->gid)) {
1305		td_verror(td, errno, "setgid");
1306		goto err;
1307	}
1308	if (o->uid != -1U && setuid(o->uid)) {
1309		td_verror(td, errno, "setuid");
1310		goto err;
1311	}
1312
1313	/*
1314	 * If we have a gettimeofday() thread, make sure we exclude that
1315	 * thread from this job
1316	 */
1317	if (o->gtod_cpu)
1318		fio_cpu_clear(&o->cpumask, o->gtod_cpu);
1319
1320	/*
1321	 * Set affinity first, in case it has an impact on the memory
1322	 * allocations.
1323	 */
1324	if (o->cpumask_set) {
1325		if (o->cpus_allowed_policy == FIO_CPUS_SPLIT) {
1326			ret = fio_cpus_split(&o->cpumask, td->thread_number - 1);
1327			if (!ret) {
1328				log_err("fio: no CPUs set\n");
1329				log_err("fio: Try increasing number of available CPUs\n");
1330				td_verror(td, EINVAL, "cpus_split");
1331				goto err;
1332			}
1333		}
1334		ret = fio_setaffinity(td->pid, o->cpumask);
1335		if (ret == -1) {
1336			td_verror(td, errno, "cpu_set_affinity");
1337			goto err;
1338		}
1339	}
1340
1341#ifdef CONFIG_LIBNUMA
1342	/* numa node setup */
1343	if (o->numa_cpumask_set || o->numa_memmask_set) {
1344		struct bitmask *mask;
1345		int ret;
1346
1347		if (numa_available() < 0) {
1348			td_verror(td, errno, "Does not support NUMA API\n");
1349			goto err;
1350		}
1351
1352		if (o->numa_cpumask_set) {
1353			mask = numa_parse_nodestring(o->numa_cpunodes);
1354			ret = numa_run_on_node_mask(mask);
1355			numa_free_nodemask(mask);
1356			if (ret == -1) {
1357				td_verror(td, errno,
1358					"numa_run_on_node_mask failed\n");
1359				goto err;
1360			}
1361		}
1362
1363		if (o->numa_memmask_set) {
1364
1365			mask = NULL;
1366			if (o->numa_memnodes)
1367				mask = numa_parse_nodestring(o->numa_memnodes);
1368
1369			switch (o->numa_mem_mode) {
1370			case MPOL_INTERLEAVE:
1371				numa_set_interleave_mask(mask);
1372				break;
1373			case MPOL_BIND:
1374				numa_set_membind(mask);
1375				break;
1376			case MPOL_LOCAL:
1377				numa_set_localalloc();
1378				break;
1379			case MPOL_PREFERRED:
1380				numa_set_preferred(o->numa_mem_prefer_node);
1381				break;
1382			case MPOL_DEFAULT:
1383			default:
1384				break;
1385			}
1386
1387			if (mask)
1388				numa_free_nodemask(mask);
1389
1390		}
1391	}
1392#endif
1393
1394	if (fio_pin_memory(td))
1395		goto err;
1396
1397	/*
1398	 * May alter parameters that init_io_u() will use, so we need to
1399	 * do this first.
1400	 */
1401	if (init_iolog(td))
1402		goto err;
1403
1404	if (init_io_u(td))
1405		goto err;
1406
1407	if (o->verify_async && verify_async_init(td))
1408		goto err;
1409
1410	if (o->ioprio) {
1411		ret = ioprio_set(IOPRIO_WHO_PROCESS, 0, o->ioprio_class, o->ioprio);
1412		if (ret == -1) {
1413			td_verror(td, errno, "ioprio_set");
1414			goto err;
1415		}
1416	}
1417
1418	if (o->cgroup && cgroup_setup(td, cgroup_list, &cgroup_mnt))
1419		goto err;
1420
1421	errno = 0;
1422	if (nice(o->nice) == -1 && errno != 0) {
1423		td_verror(td, errno, "nice");
1424		goto err;
1425	}
1426
1427	if (o->ioscheduler && switch_ioscheduler(td))
1428		goto err;
1429
1430	if (!o->create_serialize && setup_files(td))
1431		goto err;
1432
1433	if (td_io_init(td))
1434		goto err;
1435
1436	if (init_random_map(td))
1437		goto err;
1438
1439	if (o->exec_prerun && exec_string(o, o->exec_prerun, (const char *)"prerun"))
1440		goto err;
1441
1442	if (o->pre_read) {
1443		if (pre_read_files(td) < 0)
1444			goto err;
1445	}
1446
1447	if (td->flags & TD_F_COMPRESS_LOG)
1448		tp_init(&td->tp_data);
1449
1450	fio_verify_init(td);
1451
1452	fio_gettime(&td->epoch, NULL);
1453	fio_getrusage(&td->ru_start);
1454	clear_state = 0;
1455	while (keep_running(td)) {
1456		uint64_t verify_bytes;
1457
1458		fio_gettime(&td->start, NULL);
1459		memcpy(&td->bw_sample_time, &td->start, sizeof(td->start));
1460		memcpy(&td->iops_sample_time, &td->start, sizeof(td->start));
1461		memcpy(&td->tv_cache, &td->start, sizeof(td->start));
1462
1463		if (o->ratemin[DDIR_READ] || o->ratemin[DDIR_WRITE] ||
1464				o->ratemin[DDIR_TRIM]) {
1465			memcpy(&td->lastrate[DDIR_READ], &td->bw_sample_time,
1466						sizeof(td->bw_sample_time));
1467			memcpy(&td->lastrate[DDIR_WRITE], &td->bw_sample_time,
1468						sizeof(td->bw_sample_time));
1469			memcpy(&td->lastrate[DDIR_TRIM], &td->bw_sample_time,
1470						sizeof(td->bw_sample_time));
1471		}
1472
1473		if (clear_state)
1474			clear_io_state(td);
1475
1476		prune_io_piece_log(td);
1477
1478		if (td->o.verify_only && (td_write(td) || td_rw(td)))
1479			verify_bytes = do_dry_run(td);
1480		else
1481			verify_bytes = do_io(td);
1482
1483		clear_state = 1;
1484
1485		if (td_read(td) && td->io_bytes[DDIR_READ]) {
1486			elapsed = utime_since_now(&td->start);
1487			td->ts.runtime[DDIR_READ] += elapsed;
1488		}
1489		if (td_write(td) && td->io_bytes[DDIR_WRITE]) {
1490			elapsed = utime_since_now(&td->start);
1491			td->ts.runtime[DDIR_WRITE] += elapsed;
1492		}
1493		if (td_trim(td) && td->io_bytes[DDIR_TRIM]) {
1494			elapsed = utime_since_now(&td->start);
1495			td->ts.runtime[DDIR_TRIM] += elapsed;
1496		}
1497
1498		if (td->error || td->terminate)
1499			break;
1500
1501		if (!o->do_verify ||
1502		    o->verify == VERIFY_NONE ||
1503		    (td->io_ops->flags & FIO_UNIDIR))
1504			continue;
1505
1506		clear_io_state(td);
1507
1508		fio_gettime(&td->start, NULL);
1509
1510		do_verify(td, verify_bytes);
1511
1512		td->ts.runtime[DDIR_READ] += utime_since_now(&td->start);
1513
1514		if (td->error || td->terminate)
1515			break;
1516	}
1517
1518	update_rusage_stat(td);
1519	td->ts.runtime[DDIR_READ] = (td->ts.runtime[DDIR_READ] + 999) / 1000;
1520	td->ts.runtime[DDIR_WRITE] = (td->ts.runtime[DDIR_WRITE] + 999) / 1000;
1521	td->ts.runtime[DDIR_TRIM] = (td->ts.runtime[DDIR_TRIM] + 999) / 1000;
1522	td->ts.total_run_time = mtime_since_now(&td->epoch);
1523	td->ts.io_bytes[DDIR_READ] = td->io_bytes[DDIR_READ];
1524	td->ts.io_bytes[DDIR_WRITE] = td->io_bytes[DDIR_WRITE];
1525	td->ts.io_bytes[DDIR_TRIM] = td->io_bytes[DDIR_TRIM];
1526
1527	fio_unpin_memory(td);
1528
1529	fio_writeout_logs(td);
1530
1531	if (td->flags & TD_F_COMPRESS_LOG)
1532		tp_exit(&td->tp_data);
1533
1534	if (o->exec_postrun)
1535		exec_string(o, o->exec_postrun, (const char *)"postrun");
1536
1537	if (exitall_on_terminate)
1538		fio_terminate_threads(td->groupid);
1539
1540err:
1541	if (td->error)
1542		log_info("fio: pid=%d, err=%d/%s\n", (int) td->pid, td->error,
1543							td->verror);
1544
1545	if (o->verify_async)
1546		verify_async_exit(td);
1547
1548	close_and_free_files(td);
1549	cleanup_io_u(td);
1550	close_ioengine(td);
1551	cgroup_shutdown(td, &cgroup_mnt);
1552
1553	if (o->cpumask_set) {
1554		int ret = fio_cpuset_exit(&o->cpumask);
1555
1556		td_verror(td, ret, "fio_cpuset_exit");
1557	}
1558
1559	/*
1560	 * do this very late, it will log file closing as well
1561	 */
1562	if (o->write_iolog_file)
1563		write_iolog_close(td);
1564
1565	fio_mutex_remove(td->rusage_sem);
1566	td->rusage_sem = NULL;
1567
1568	fio_mutex_remove(td->mutex);
1569	td->mutex = NULL;
1570
1571	td_set_runstate(td, TD_EXITED);
1572	return (void *) (uintptr_t) td->error;
1573}
1574
1575
1576/*
1577 * We cannot pass the td data into a forked process, so attach the td and
1578 * pass it to the thread worker.
1579 */
1580static int fork_main(int shmid, int offset)
1581{
1582	struct thread_data *td;
1583	void *data, *ret;
1584
1585#if !defined(__hpux) && !defined(CONFIG_NO_SHM)
1586	data = shmat(shmid, NULL, 0);
1587	if (data == (void *) -1) {
1588		int __err = errno;
1589
1590		perror("shmat");
1591		return __err;
1592	}
1593#else
1594	/*
1595	 * HP-UX inherits shm mappings?
1596	 */
1597	data = threads;
1598#endif
1599
1600	td = data + offset * sizeof(struct thread_data);
1601	ret = thread_main(td);
1602	shmdt(data);
1603	return (int) (uintptr_t) ret;
1604}
1605
1606static void dump_td_info(struct thread_data *td)
1607{
1608	log_err("fio: job '%s' hasn't exited in %lu seconds, it appears to "
1609		"be stuck. Doing forceful exit of this job.\n", td->o.name,
1610			(unsigned long) time_since_now(&td->terminate_time));
1611}
1612
1613/*
1614 * Run over the job map and reap the threads that have exited, if any.
1615 */
1616static void reap_threads(unsigned int *nr_running, unsigned int *t_rate,
1617			 unsigned int *m_rate)
1618{
1619	struct thread_data *td;
1620	unsigned int cputhreads, realthreads, pending;
1621	int i, status, ret;
1622
1623	/*
1624	 * reap exited threads (TD_EXITED -> TD_REAPED)
1625	 */
1626	realthreads = pending = cputhreads = 0;
1627	for_each_td(td, i) {
1628		int flags = 0;
1629
1630		/*
1631		 * ->io_ops is NULL for a thread that has closed its
1632		 * io engine
1633		 */
1634		if (td->io_ops && !strcmp(td->io_ops->name, "cpuio"))
1635			cputhreads++;
1636		else
1637			realthreads++;
1638
1639		if (!td->pid) {
1640			pending++;
1641			continue;
1642		}
1643		if (td->runstate == TD_REAPED)
1644			continue;
1645		if (td->o.use_thread) {
1646			if (td->runstate == TD_EXITED) {
1647				td_set_runstate(td, TD_REAPED);
1648				goto reaped;
1649			}
1650			continue;
1651		}
1652
1653		flags = WNOHANG;
1654		if (td->runstate == TD_EXITED)
1655			flags = 0;
1656
1657		/*
1658		 * check if someone quit or got killed in an unusual way
1659		 */
1660		ret = waitpid(td->pid, &status, flags);
1661		if (ret < 0) {
1662			if (errno == ECHILD) {
1663				log_err("fio: pid=%d disappeared %d\n",
1664						(int) td->pid, td->runstate);
1665				td->sig = ECHILD;
1666				td_set_runstate(td, TD_REAPED);
1667				goto reaped;
1668			}
1669			perror("waitpid");
1670		} else if (ret == td->pid) {
1671			if (WIFSIGNALED(status)) {
1672				int sig = WTERMSIG(status);
1673
1674				if (sig != SIGTERM && sig != SIGUSR2)
1675					log_err("fio: pid=%d, got signal=%d\n",
1676							(int) td->pid, sig);
1677				td->sig = sig;
1678				td_set_runstate(td, TD_REAPED);
1679				goto reaped;
1680			}
1681			if (WIFEXITED(status)) {
1682				if (WEXITSTATUS(status) && !td->error)
1683					td->error = WEXITSTATUS(status);
1684
1685				td_set_runstate(td, TD_REAPED);
1686				goto reaped;
1687			}
1688		}
1689
1690		/*
1691		 * If the job is stuck, do a forceful timeout of it and
1692		 * move on.
1693		 */
1694		if (td->terminate &&
1695		    time_since_now(&td->terminate_time) >= FIO_REAP_TIMEOUT) {
1696			dump_td_info(td);
1697			td_set_runstate(td, TD_REAPED);
1698			goto reaped;
1699		}
1700
1701		/*
1702		 * thread is not dead, continue
1703		 */
1704		pending++;
1705		continue;
1706reaped:
1707		(*nr_running)--;
1708		(*m_rate) -= ddir_rw_sum(td->o.ratemin);
1709		(*t_rate) -= ddir_rw_sum(td->o.rate);
1710		if (!td->pid)
1711			pending--;
1712
1713		if (td->error)
1714			exit_value++;
1715
1716		done_secs += mtime_since_now(&td->epoch) / 1000;
1717		profile_td_exit(td);
1718	}
1719
1720	if (*nr_running == cputhreads && !pending && realthreads)
1721		fio_terminate_threads(TERMINATE_ALL);
1722}
1723
1724static void do_usleep(unsigned int usecs)
1725{
1726	check_for_running_stats();
1727	usleep(usecs);
1728}
1729
1730/*
1731 * Main function for kicking off and reaping jobs, as needed.
1732 */
1733static void run_threads(void)
1734{
1735	struct thread_data *td;
1736	unsigned int i, todo, nr_running, m_rate, t_rate, nr_started;
1737	uint64_t spent;
1738
1739	if (fio_gtod_offload && fio_start_gtod_thread())
1740		return;
1741
1742	fio_idle_prof_init();
1743
1744	set_sig_handlers();
1745
1746	nr_thread = nr_process = 0;
1747	for_each_td(td, i) {
1748		if (td->o.use_thread)
1749			nr_thread++;
1750		else
1751			nr_process++;
1752	}
1753
1754	if (output_format == FIO_OUTPUT_NORMAL) {
1755		log_info("Starting ");
1756		if (nr_thread)
1757			log_info("%d thread%s", nr_thread,
1758						nr_thread > 1 ? "s" : "");
1759		if (nr_process) {
1760			if (nr_thread)
1761				log_info(" and ");
1762			log_info("%d process%s", nr_process,
1763						nr_process > 1 ? "es" : "");
1764		}
1765		log_info("\n");
1766		fflush(stdout);
1767	}
1768
1769	todo = thread_number;
1770	nr_running = 0;
1771	nr_started = 0;
1772	m_rate = t_rate = 0;
1773
1774	for_each_td(td, i) {
1775		print_status_init(td->thread_number - 1);
1776
1777		if (!td->o.create_serialize)
1778			continue;
1779
1780		/*
1781		 * Do file setup here so it happens sequentially; we don't
1782		 * want X number of threads getting their client data
1783		 * interspersed on disk.
1784		 */
1785		if (setup_files(td)) {
1786			exit_value++;
1787			if (td->error)
1788				log_err("fio: pid=%d, err=%d/%s\n",
1789					(int) td->pid, td->error, td->verror);
1790			td_set_runstate(td, TD_REAPED);
1791			todo--;
1792		} else {
1793			struct fio_file *f;
1794			unsigned int j;
1795
1796			/*
1797			 * for sharing to work, each job must always open
1798			 * its own files. so close them, if we opened them
1799			 * for creation
1800			 */
1801			for_each_file(td, f, j) {
1802				if (fio_file_open(f))
1803					td_io_close_file(td, f);
1804			}
1805		}
1806	}
1807
1808	/* start idle threads before io threads start to run */
1809	fio_idle_prof_start();
1810
1811	set_genesis_time();
1812
1813	while (todo) {
1814		struct thread_data *map[REAL_MAX_JOBS];
1815		struct timeval this_start;
1816		int this_jobs = 0, left;
1817
1818		/*
1819		 * create threads (TD_NOT_CREATED -> TD_CREATED)
1820		 */
1821		for_each_td(td, i) {
1822			if (td->runstate != TD_NOT_CREATED)
1823				continue;
1824
1825			/*
1826			 * never got a chance to start, killed by other
1827			 * thread for some reason
1828			 */
1829			if (td->terminate) {
1830				todo--;
1831				continue;
1832			}
1833
1834			if (td->o.start_delay) {
1835				spent = utime_since_genesis();
1836
1837				if (td->o.start_delay > spent)
1838					continue;
1839			}
1840
1841			if (td->o.stonewall && (nr_started || nr_running)) {
1842				dprint(FD_PROCESS, "%s: stonewall wait\n",
1843							td->o.name);
1844				break;
1845			}
1846
1847			init_disk_util(td);
1848
1849			td->rusage_sem = fio_mutex_init(FIO_MUTEX_LOCKED);
1850			td->update_rusage = 0;
1851
1852			/*
1853			 * Set state to created. Thread will transition
1854			 * to TD_INITIALIZED when it's done setting up.
1855			 */
1856			td_set_runstate(td, TD_CREATED);
1857			map[this_jobs++] = td;
1858			nr_started++;
1859
1860			if (td->o.use_thread) {
1861				int ret;
1862
1863				dprint(FD_PROCESS, "will pthread_create\n");
1864				ret = pthread_create(&td->thread, NULL,
1865							thread_main, td);
1866				if (ret) {
1867					log_err("pthread_create: %s\n",
1868							strerror(ret));
1869					nr_started--;
1870					break;
1871				}
1872				ret = pthread_detach(td->thread);
1873				if (ret)
1874					log_err("pthread_detach: %s",
1875							strerror(ret));
1876			} else {
1877				pid_t pid;
1878				dprint(FD_PROCESS, "will fork\n");
1879				pid = fork();
1880				if (!pid) {
1881					int ret = fork_main(shm_id, i);
1882
1883					_exit(ret);
1884				} else if (i == fio_debug_jobno)
1885					*fio_debug_jobp = pid;
1886			}
1887			dprint(FD_MUTEX, "wait on startup_mutex\n");
1888			if (fio_mutex_down_timeout(startup_mutex, 10)) {
1889				log_err("fio: job startup hung? exiting.\n");
1890				fio_terminate_threads(TERMINATE_ALL);
1891				fio_abort = 1;
1892				nr_started--;
1893				break;
1894			}
1895			dprint(FD_MUTEX, "done waiting on startup_mutex\n");
1896		}
1897
1898		/*
1899		 * Wait for the started threads to transition to
1900		 * TD_INITIALIZED.
1901		 */
1902		fio_gettime(&this_start, NULL);
1903		left = this_jobs;
1904		while (left && !fio_abort) {
1905			if (mtime_since_now(&this_start) > JOB_START_TIMEOUT)
1906				break;
1907
1908			do_usleep(100000);
1909
1910			for (i = 0; i < this_jobs; i++) {
1911				td = map[i];
1912				if (!td)
1913					continue;
1914				if (td->runstate == TD_INITIALIZED) {
1915					map[i] = NULL;
1916					left--;
1917				} else if (td->runstate >= TD_EXITED) {
1918					map[i] = NULL;
1919					left--;
1920					todo--;
1921					nr_running++; /* work-around... */
1922				}
1923			}
1924		}
1925
1926		if (left) {
1927			log_err("fio: %d job%s failed to start\n", left,
1928					left > 1 ? "s" : "");
1929			for (i = 0; i < this_jobs; i++) {
1930				td = map[i];
1931				if (!td)
1932					continue;
1933				kill(td->pid, SIGTERM);
1934			}
1935			break;
1936		}
1937
1938		/*
1939		 * start created threads (TD_INITIALIZED -> TD_RUNNING).
1940		 */
1941		for_each_td(td, i) {
1942			if (td->runstate != TD_INITIALIZED)
1943				continue;
1944
1945			if (in_ramp_time(td))
1946				td_set_runstate(td, TD_RAMP);
1947			else
1948				td_set_runstate(td, TD_RUNNING);
1949			nr_running++;
1950			nr_started--;
1951			m_rate += ddir_rw_sum(td->o.ratemin);
1952			t_rate += ddir_rw_sum(td->o.rate);
1953			todo--;
1954			fio_mutex_up(td->mutex);
1955		}
1956
1957		reap_threads(&nr_running, &t_rate, &m_rate);
1958
1959		if (todo)
1960			do_usleep(100000);
1961	}
1962
1963	while (nr_running) {
1964		reap_threads(&nr_running, &t_rate, &m_rate);
1965		do_usleep(10000);
1966	}
1967
1968	fio_idle_prof_stop();
1969
1970	update_io_ticks();
1971}
1972
1973void wait_for_disk_thread_exit(void)
1974{
1975	fio_mutex_down(disk_thread_mutex);
1976}
1977
1978static void free_disk_util(void)
1979{
1980	disk_util_start_exit();
1981	wait_for_disk_thread_exit();
1982	disk_util_prune_entries();
1983}
1984
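/*
 * Helper thread that periodically updates the disk utilization stats
 * and, unless we are running as a backend, prints the thread status.
 */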
1985static void *disk_thread_main(void *data)
1986{
1987	int ret = 0;
1988
1989	fio_mutex_up(startup_mutex);
1990
1991	while (threads && !ret) {
1992		usleep(DISK_UTIL_MSEC * 1000);
1993		if (!threads)
1994			break;
1995		ret = update_io_ticks();
1996
1997		if (!is_backend)
1998			print_thread_status();
1999	}
2000
2001	fio_mutex_up(disk_thread_mutex);
2002	return NULL;
2003}
2004
2005static int create_disk_util_thread(void)
2006{
2007	int ret;
2008
2009	setup_disk_util();
2010
2011	disk_thread_mutex = fio_mutex_init(FIO_MUTEX_LOCKED);
2012
2013	ret = pthread_create(&disk_util_thread, NULL, disk_thread_main, NULL);
2014	if (ret) {
2015		fio_mutex_remove(disk_thread_mutex);
2016		log_err("Can't create disk util thread: %s\n", strerror(ret));
2017		return 1;
2018	}
2019
2020	ret = pthread_detach(disk_util_thread);
2021	if (ret) {
2022		fio_mutex_remove(disk_thread_mutex);
2023		log_err("Can't detach disk util thread: %s\n", strerror(ret));
2024		return 1;
2025	}
2026
2027	dprint(FD_MUTEX, "wait on startup_mutex\n");
2028	fio_mutex_down(startup_mutex);
2029	dprint(FD_MUTEX, "done waiting on startup_mutex\n");
2030	return 0;
2031}
2032
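/*
 * Main backend entry point: set up aggregate logs, the disk util
 * thread and the cgroup list, run all jobs, then dump stats and
 * clean up.
 */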
2033int fio_backend(void)
2034{
2035	struct thread_data *td;
2036	int i;
2037
2038	if (exec_profile) {
2039		if (load_profile(exec_profile))
2040			return 1;
2041		free(exec_profile);
2042		exec_profile = NULL;
2043	}
2044	if (!thread_number)
2045		return 0;
2046
2047	if (write_bw_log) {
2048		struct log_params p = {
2049			.log_type = IO_LOG_TYPE_BW,
2050		};
2051
2052		setup_log(&agg_io_log[DDIR_READ], &p, "agg-read_bw.log");
2053		setup_log(&agg_io_log[DDIR_WRITE], &p, "agg-write_bw.log");
2054		setup_log(&agg_io_log[DDIR_TRIM], &p, "agg-trim_bw.log");
2055	}
2056
2057	startup_mutex = fio_mutex_init(FIO_MUTEX_LOCKED);
2058	if (startup_mutex == NULL)
2059		return 1;
2060
2061	set_genesis_time();
2062	stat_init();
2063	create_disk_util_thread();
2064
2065	cgroup_list = smalloc(sizeof(*cgroup_list));
2066	INIT_FLIST_HEAD(cgroup_list);
2067
2068	run_threads();
2069
2070	if (!fio_abort) {
2071		show_run_stats();
2072		if (write_bw_log) {
2073			int i;
2074
2075			for (i = 0; i < DDIR_RWDIR_CNT; i++) {
2076				struct io_log *log = agg_io_log[i];
2077
2078				flush_log(log);
2079				free_log(log);
2080			}
2081		}
2082	}
2083
2084	for_each_td(td, i)
2085		fio_options_free(td);
2086
2087	free_disk_util();
2088	cgroup_kill(cgroup_list);
2089	sfree(cgroup_list);
2090	sfree(cgroup_mnt);
2091
2092	fio_mutex_remove(startup_mutex);
2093	fio_mutex_remove(disk_thread_mutex);
2094	stat_exit();
2095	return exit_value;
2096}
2097