backend.c revision fdb0da8028e156c0da43aca18e1423d1b300bdad
1/*
2 * fio - the flexible io tester
3 *
4 * Copyright (C) 2005 Jens Axboe <axboe@suse.de>
5 * Copyright (C) 2006-2012 Jens Axboe <axboe@kernel.dk>
6 *
7 * The license below covers all files distributed with fio unless otherwise
8 * noted in the file itself.
9 *
10 *  This program is free software; you can redistribute it and/or modify
11 *  it under the terms of the GNU General Public License version 2 as
12 *  published by the Free Software Foundation.
13 *
14 *  This program is distributed in the hope that it will be useful,
15 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
16 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17 *  GNU General Public License for more details.
18 *
19 *  You should have received a copy of the GNU General Public License
20 *  along with this program; if not, write to the Free Software
21 *  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
22 *
23 */
24#include <unistd.h>
25#include <fcntl.h>
26#include <string.h>
27#include <limits.h>
28#include <signal.h>
29#include <time.h>
30#include <locale.h>
31#include <assert.h>
33#include <inttypes.h>
34#include <sys/stat.h>
35#include <sys/wait.h>
36#include <sys/ipc.h>
37#include <sys/mman.h>
38
39#include "fio.h"
40#ifndef FIO_NO_HAVE_SHM_H
41#include <sys/shm.h>
42#endif
43#include "hash.h"
44#include "smalloc.h"
45#include "verify.h"
46#include "trim.h"
47#include "diskutil.h"
48#include "cgroup.h"
49#include "profile.h"
50#include "lib/rand.h"
51#include "memalign.h"
52#include "server.h"
53#include "lib/getrusage.h"
54#include "idletime.h"
55#include "err.h"
56#include "lib/tp.h"
57
58static pthread_t disk_util_thread;
59static pthread_cond_t du_cond;
60static pthread_mutex_t du_lock;
61
62static struct fio_mutex *startup_mutex;
63static struct flist_head *cgroup_list;
64static char *cgroup_mnt;
65static int exit_value;
66static volatile int fio_abort;
67static unsigned int nr_process = 0;
68static unsigned int nr_thread = 0;
69
70struct io_log *agg_io_log[DDIR_RWDIR_CNT];
71
72int groupid = 0;
73unsigned int thread_number = 0;
74unsigned int stat_number = 0;
75int shm_id = 0;
76int temp_stall_ts;
77unsigned long done_secs = 0;
78volatile int disk_util_exit = 0;
79
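/*
 * Round a buffer address up to the next page boundary. With 4096 byte
 * pages (page_mask == 4095), an address of 0x12345 becomes 0x13000.
 * Used below when buffers must be aligned for O_DIRECT/raw IO.
 */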
80#define PAGE_ALIGN(buf)	\
81	(char *) (((uintptr_t) (buf) + page_mask) & ~page_mask)
82
83#define JOB_START_TIMEOUT	(5 * 1000)
84
85static void sig_int(int sig)
86{
87	if (threads) {
88		if (is_backend)
89			fio_server_got_signal(sig);
90		else {
91			log_info("\nfio: terminating on signal %d\n", sig);
92			log_info_flush();
93			exit_value = 128;
94		}
95
96		fio_terminate_threads(TERMINATE_ALL);
97	}
98}
99
100static void sig_show_status(int sig)
101{
102	show_running_run_stats();
103}
104
105static void set_sig_handlers(void)
106{
107	struct sigaction act;
108
109	memset(&act, 0, sizeof(act));
110	act.sa_handler = sig_int;
111	act.sa_flags = SA_RESTART;
112	sigaction(SIGINT, &act, NULL);
113
114	memset(&act, 0, sizeof(act));
115	act.sa_handler = sig_int;
116	act.sa_flags = SA_RESTART;
117	sigaction(SIGTERM, &act, NULL);
118
119/* Windows uses SIGBREAK as a quit signal from other applications */
120#ifdef WIN32
121	memset(&act, 0, sizeof(act));
122	act.sa_handler = sig_int;
123	act.sa_flags = SA_RESTART;
124	sigaction(SIGBREAK, &act, NULL);
125#endif
126
127	memset(&act, 0, sizeof(act));
128	act.sa_handler = sig_show_status;
129	act.sa_flags = SA_RESTART;
130	sigaction(SIGUSR1, &act, NULL);
131
132	if (is_backend) {
133		memset(&act, 0, sizeof(act));
134		act.sa_handler = sig_int;
135		act.sa_flags = SA_RESTART;
136		sigaction(SIGPIPE, &act, NULL);
137	}
138}
139
140/*
141 * Check if we are above the minimum rate given.
142 */
143static int __check_min_rate(struct thread_data *td, struct timeval *now,
144			    enum fio_ddir ddir)
145{
146	unsigned long long bytes = 0;
147	unsigned long iops = 0;
148	unsigned long spent;
149	unsigned long rate;
150	unsigned int ratemin = 0;
151	unsigned int rate_iops = 0;
152	unsigned int rate_iops_min = 0;
153
154	assert(ddir_rw(ddir));
155
156	if (!td->o.ratemin[ddir] && !td->o.rate_iops_min[ddir])
157		return 0;
158
159	/*
160	 * allow a 2 second settle period in the beginning
161	 */
162	if (mtime_since(&td->start, now) < 2000)
163		return 0;
164
165	iops += td->this_io_blocks[ddir];
166	bytes += td->this_io_bytes[ddir];
167	ratemin += td->o.ratemin[ddir];
168	rate_iops += td->o.rate_iops[ddir];
169	rate_iops_min += td->o.rate_iops_min[ddir];
170
171	/*
172	 * if rate_bytes or rate_blocks is set, a sample is already running
173	 */
174	if (td->rate_bytes[ddir] || td->rate_blocks[ddir]) {
175		spent = mtime_since(&td->lastrate[ddir], now);
176		if (spent < td->o.ratecycle)
177			return 0;
178
179		if (td->o.rate[ddir]) {
180			/*
181			 * check the specified bandwidth rate
182			 */
183			if (bytes < td->rate_bytes[ddir]) {
184				log_err("%s: min rate %u not met\n", td->o.name,
185								ratemin);
186				return 1;
187			} else {
188				if (spent)
189					rate = ((bytes - td->rate_bytes[ddir]) * 1000) / spent;
190				else
191					rate = 0;
192
193				if (rate < ratemin ||
194				    bytes < td->rate_bytes[ddir]) {
195					log_err("%s: min rate %u not met, got"
196						" %luKB/sec\n", td->o.name,
197							ratemin, rate);
198					return 1;
199				}
200			}
201		} else {
202			/*
203			 * check the specified iops rate
204			 */
205			if (iops < rate_iops) {
206				log_err("%s: min iops rate %u not met\n",
207						td->o.name, rate_iops);
208				return 1;
209			} else {
210				if (spent)
211					rate = ((iops - td->rate_blocks[ddir]) * 1000) / spent;
212				else
213					rate = 0;
214
215				if (rate < rate_iops_min ||
216				    iops < td->rate_blocks[ddir]) {
217					log_err("%s: min iops rate %u not met,"
218						" got %lu\n", td->o.name,
219							rate_iops_min, rate);
					return 1;
220				}
221			}
222		}
223	}
224
225	td->rate_bytes[ddir] = bytes;
226	td->rate_blocks[ddir] = iops;
227	memcpy(&td->lastrate[ddir], now, sizeof(*now));
228	return 0;
229}
230
231static int check_min_rate(struct thread_data *td, struct timeval *now,
232			  uint64_t *bytes_done)
233{
234	int ret = 0;
235
236	if (bytes_done[DDIR_READ])
237		ret |= __check_min_rate(td, now, DDIR_READ);
238	if (bytes_done[DDIR_WRITE])
239		ret |= __check_min_rate(td, now, DDIR_WRITE);
240	if (bytes_done[DDIR_TRIM])
241		ret |= __check_min_rate(td, now, DDIR_TRIM);
242
243	return ret;
244}
245
246/*
247 * When the job exits, we can cancel any in-flight IO if we are using
248 * async IO. Attempt to do so.
249 */
250static void cleanup_pending_aio(struct thread_data *td)
251{
252	int r;
253
254	/*
255	 * get immediately available events, if any
256	 */
257	r = io_u_queued_complete(td, 0, NULL);
258	if (r < 0)
259		return;
260
261	/*
262	 * now cancel remaining active events
263	 */
264	if (td->io_ops->cancel) {
265		struct io_u *io_u;
266		int i;
267
268		io_u_qiter(&td->io_u_all, io_u, i) {
269			if (io_u->flags & IO_U_F_FLIGHT) {
270				r = td->io_ops->cancel(td, io_u);
271				if (!r)
272					put_io_u(td, io_u);
273			}
274		}
275	}
276
277	if (td->cur_depth)
278		r = io_u_queued_complete(td, td->cur_depth, NULL);
279}
280
281/*
282 * Helper to handle the final sync of a file. Works just like the normal
283 * io path, just does everything sync.
284 */
285static int fio_io_sync(struct thread_data *td, struct fio_file *f)
286{
287	struct io_u *io_u = __get_io_u(td);
288	int ret;
289
290	if (!io_u)
291		return 1;
292
293	io_u->ddir = DDIR_SYNC;
294	io_u->file = f;
295
296	if (td_io_prep(td, io_u)) {
297		put_io_u(td, io_u);
298		return 1;
299	}
300
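	/*
	 * td_io_queue() either completes the sync inline (FIO_Q_COMPLETED),
	 * queues it for async completion (FIO_Q_QUEUED), or reports the
	 * engine as busy (FIO_Q_BUSY), in which case we commit what is
	 * pending and try again.
	 */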
301requeue:
302	ret = td_io_queue(td, io_u);
303	if (ret < 0) {
304		td_verror(td, io_u->error, "td_io_queue");
305		put_io_u(td, io_u);
306		return 1;
307	} else if (ret == FIO_Q_QUEUED) {
308		if (io_u_queued_complete(td, 1, NULL) < 0)
309			return 1;
310	} else if (ret == FIO_Q_COMPLETED) {
311		if (io_u->error) {
312			td_verror(td, io_u->error, "td_io_queue");
313			return 1;
314		}
315
316		if (io_u_sync_complete(td, io_u, NULL) < 0)
317			return 1;
318	} else if (ret == FIO_Q_BUSY) {
319		if (td_io_commit(td))
320			return 1;
321		goto requeue;
322	}
323
324	return 0;
325}
326
327static int fio_file_fsync(struct thread_data *td, struct fio_file *f)
328{
329	int ret;
330
331	if (fio_file_open(f))
332		return fio_io_sync(td, f);
333
334	if (td_io_open_file(td, f))
335		return 1;
336
337	ret = fio_io_sync(td, f);
338	td_io_close_file(td, f);
339	return ret;
340}
341
342static inline void __update_tv_cache(struct thread_data *td)
343{
344	fio_gettime(&td->tv_cache, NULL);
345}
346
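/*
 * Only refresh the cached time roughly every (tv_cache_mask + 1) calls,
 * so the hot IO path doesn't pay for a full clock read on every io_u.
 */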
347static inline void update_tv_cache(struct thread_data *td)
348{
349	if ((++td->tv_cache_nr & td->tv_cache_mask) == td->tv_cache_mask)
350		__update_tv_cache(td);
351}
352
353static inline int runtime_exceeded(struct thread_data *td, struct timeval *t)
354{
355	if (in_ramp_time(td))
356		return 0;
357	if (!td->o.timeout)
358		return 0;
359	if (utime_since(&td->epoch, t) >= td->o.timeout)
360		return 1;
361
362	return 0;
363}
364
365static int break_on_this_error(struct thread_data *td, enum fio_ddir ddir,
366			       int *retptr)
367{
368	int ret = *retptr;
369
370	if (ret < 0 || td->error) {
371		int err = td->error;
372		enum error_type_bit eb;
373
374		if (ret < 0)
375			err = -ret;
376
377		eb = td_error_type(ddir, err);
378		if (!(td->o.continue_on_error & (1 << eb)))
379			return 1;
380
381		if (td_non_fatal_error(td, eb, err)) {
382			/*
383			 * Continue with the I/Os in case of
384			 * a non-fatal error.
385			 */
386			update_error_count(td, err);
387			td_clear_error(td);
388			*retptr = 0;
389			return 0;
390		} else if (td->o.fill_device && err == ENOSPC) {
391			/*
392			 * We expect to hit this error if
393			 * fill_device option is set.
394			 */
395			td_clear_error(td);
396			fio_mark_td_terminate(td);
397			return 1;
398		} else {
399			/*
400			 * Stop the I/O in case of a fatal
401			 * error.
402			 */
403			update_error_count(td, err);
404			return 1;
405		}
406	}
407
408	return 0;
409}
410
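/*
 * The stats code can ask a job to refresh its rusage numbers by setting
 * ->update_rusage. Do the update from the job thread itself and signal
 * completion through rusage_sem.
 */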
411static void check_update_rusage(struct thread_data *td)
412{
413	if (td->update_rusage) {
414		td->update_rusage = 0;
415		update_rusage_stat(td);
416		fio_mutex_up(td->rusage_sem);
417	}
418}
419
420/*
421 * The main verify engine. Runs over the writes we previously submitted,
422 * reads the blocks back in, and checks the crc/md5 of the data.
423 */
424static void do_verify(struct thread_data *td, uint64_t verify_bytes)
425{
426	uint64_t bytes_done[DDIR_RWDIR_CNT] = { 0, 0, 0 };
427	struct fio_file *f;
428	struct io_u *io_u;
429	int ret, min_events;
430	unsigned int i;
431
432	dprint(FD_VERIFY, "starting loop\n");
433
434	/*
435	 * sync io first and invalidate cache, to make sure we really
436	 * read from disk.
437	 */
438	for_each_file(td, f, i) {
439		if (!fio_file_open(f))
440			continue;
441		if (fio_io_sync(td, f))
442			break;
443		if (file_invalidate_cache(td, f))
444			break;
445	}
446
447	check_update_rusage(td);
448
449	if (td->error)
450		return;
451
452	td_set_runstate(td, TD_VERIFYING);
453
454	io_u = NULL;
455	while (!td->terminate) {
456		enum fio_ddir ddir;
457		int ret2, full;
458
459		update_tv_cache(td);
460		check_update_rusage(td);
461
462		if (runtime_exceeded(td, &td->tv_cache)) {
463			__update_tv_cache(td);
464			if (runtime_exceeded(td, &td->tv_cache)) {
465				fio_mark_td_terminate(td);
466				break;
467			}
468		}
469
470		if (flow_threshold_exceeded(td))
471			continue;
472
473		if (!td->o.experimental_verify) {
474			io_u = __get_io_u(td);
475			if (!io_u)
476				break;
477
478			if (get_next_verify(td, io_u)) {
479				put_io_u(td, io_u);
480				break;
481			}
482
483			if (td_io_prep(td, io_u)) {
484				put_io_u(td, io_u);
485				break;
486			}
487		} else {
488			if (ddir_rw_sum(bytes_done) + td->o.rw_min_bs > verify_bytes)
489				break;
490
491			while ((io_u = get_io_u(td)) != NULL) {
492				if (IS_ERR(io_u)) {
493					io_u = NULL;
494					ret = FIO_Q_BUSY;
495					goto reap;
496				}
497
498				/*
499				 * We are only interested in the places where
500				 * we wrote or trimmed IOs. Turn those into
501				 * reads for verification purposes.
502				 */
503				if (io_u->ddir == DDIR_READ) {
504					/*
505					 * Pretend we issued it for rwmix
506					 * accounting
507					 */
508					td->io_issues[DDIR_READ]++;
509					put_io_u(td, io_u);
510					continue;
511				} else if (io_u->ddir == DDIR_TRIM) {
512					io_u->ddir = DDIR_READ;
513					io_u->flags |= IO_U_F_TRIMMED;
514					break;
515				} else if (io_u->ddir == DDIR_WRITE) {
516					io_u->ddir = DDIR_READ;
517					break;
518				} else {
519					put_io_u(td, io_u);
520					continue;
521				}
522			}
523
524			if (!io_u)
525				break;
526		}
527
528		if (td->o.verify_async)
529			io_u->end_io = verify_io_u_async;
530		else
531			io_u->end_io = verify_io_u;
532
533		ddir = io_u->ddir;
534
535		ret = td_io_queue(td, io_u);
536		switch (ret) {
537		case FIO_Q_COMPLETED:
538			if (io_u->error) {
539				ret = -io_u->error;
540				clear_io_u(td, io_u);
541			} else if (io_u->resid) {
542				int bytes = io_u->xfer_buflen - io_u->resid;
543
544				/*
545				 * zero read, fail
546				 */
547				if (!bytes) {
548					td_verror(td, EIO, "full resid");
549					put_io_u(td, io_u);
550					break;
551				}
552
553				io_u->xfer_buflen = io_u->resid;
554				io_u->xfer_buf += bytes;
555				io_u->offset += bytes;
556
557				if (ddir_rw(io_u->ddir))
558					td->ts.short_io_u[io_u->ddir]++;
559
560				f = io_u->file;
561				if (io_u->offset == f->real_file_size)
562					goto sync_done;
563
564				requeue_io_u(td, &io_u);
565			} else {
566sync_done:
567				ret = io_u_sync_complete(td, io_u, bytes_done);
568				if (ret < 0)
569					break;
570			}
571			continue;
572		case FIO_Q_QUEUED:
573			break;
574		case FIO_Q_BUSY:
575			requeue_io_u(td, &io_u);
576			ret2 = td_io_commit(td);
577			if (ret2 < 0)
578				ret = ret2;
579			break;
580		default:
581			assert(ret < 0);
582			td_verror(td, -ret, "td_io_queue");
583			break;
584		}
585
586		if (break_on_this_error(td, ddir, &ret))
587			break;
588
589		/*
590		 * if we can queue more, do so. but check if there are
591		 * completed io_u's first. Note that we can get BUSY even
592		 * without IO queued, if the system is resource starved.
593		 */
594reap:
595		full = queue_full(td) || (ret == FIO_Q_BUSY && td->cur_depth);
596		if (full || !td->o.iodepth_batch_complete) {
597			min_events = min(td->o.iodepth_batch_complete,
598					 td->cur_depth);
599			/*
600			 * if the queue is full, we MUST reap at least 1 event
601			 */
602			if (full && !min_events)
603				min_events = 1;
604
605			do {
606				/*
607				 * Reap required number of io units, if any,
608				 * and do the verification on them through
609				 * the callback handler
610				 */
611				if (io_u_queued_complete(td, min_events, bytes_done) < 0) {
612					ret = -1;
613					break;
614				}
615			} while (full && (td->cur_depth > td->o.iodepth_low));
616		}
617		if (ret < 0)
618			break;
619	}
620
621	check_update_rusage(td);
622
623	if (!td->error) {
624		min_events = td->cur_depth;
625
626		if (min_events)
627			ret = io_u_queued_complete(td, min_events, NULL);
628	} else
629		cleanup_pending_aio(td);
630
631	td_set_runstate(td, TD_RUNNING);
632
633	dprint(FD_VERIFY, "exiting loop\n");
634}
635
636static unsigned int exceeds_number_ios(struct thread_data *td)
637{
638	unsigned long long number_ios;
639
640	if (!td->o.number_ios)
641		return 0;
642
643	number_ios = ddir_rw_sum(td->this_io_blocks);
644	number_ios += td->io_u_queued + td->io_u_in_flight;
645
646	return number_ios >= td->o.number_ios;
647}
648
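/*
 * Check whether this job has transferred the amount of data it was asked
 * to (io_limit if set, otherwise size), or has reached its number_ios
 * limit.
 */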
649static int io_bytes_exceeded(struct thread_data *td)
650{
651	unsigned long long bytes, limit;
652
653	if (td_rw(td))
654		bytes = td->this_io_bytes[DDIR_READ] + td->this_io_bytes[DDIR_WRITE];
655	else if (td_write(td))
656		bytes = td->this_io_bytes[DDIR_WRITE];
657	else if (td_read(td))
658		bytes = td->this_io_bytes[DDIR_READ];
659	else
660		bytes = td->this_io_bytes[DDIR_TRIM];
661
662	if (td->o.io_limit)
663		limit = td->o.io_limit;
664	else
665		limit = td->o.size;
666
667	return bytes >= limit || exceeds_number_ios(td);
668}
669
670/*
671 * Main IO worker function. It retrieves io_u's to process and queues
672 * and reaps them, checking for rate and errors along the way.
673 *
674 * Returns number of bytes written and trimmed.
675 */
676static uint64_t do_io(struct thread_data *td)
677{
678	uint64_t bytes_done[DDIR_RWDIR_CNT] = { 0, 0, 0 };
679	unsigned int i;
680	int ret = 0;
681	uint64_t total_bytes, bytes_issued = 0;
682
683	if (in_ramp_time(td))
684		td_set_runstate(td, TD_RAMP);
685	else
686		td_set_runstate(td, TD_RUNNING);
687
688	lat_target_init(td);
689
690	/*
691	 * If verify_backlog is enabled, we'll run the verify in this
692	 * handler as well. For that case, we may need up to twice the
693	 * amount of bytes.
694	 */
695	total_bytes = td->o.size;
696	if (td->o.verify != VERIFY_NONE &&
697	   (td_write(td) && td->o.verify_backlog))
698		total_bytes += td->o.size;
699
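	/*
	 * Keep issuing IO while there is iolog or trim backlog to process,
	 * the byte/IO budget has not been exhausted, or the job is time
	 * based (in which case runtime_exceeded() ends the loop).
	 */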
700	while ((td->o.read_iolog_file && !flist_empty(&td->io_log_list)) ||
701		(!flist_empty(&td->trim_list)) || !io_bytes_exceeded(td) ||
702		td->o.time_based) {
703		struct timeval comp_time;
704		int min_evts = 0;
705		struct io_u *io_u;
706		int ret2, full;
707		enum fio_ddir ddir;
708
709		check_update_rusage(td);
710
711		if (td->terminate || td->done)
712			break;
713
714		update_tv_cache(td);
715
716		if (runtime_exceeded(td, &td->tv_cache)) {
717			__update_tv_cache(td);
718			if (runtime_exceeded(td, &td->tv_cache)) {
719				fio_mark_td_terminate(td);
720				break;
721			}
722		}
723
724		if (flow_threshold_exceeded(td))
725			continue;
726
727		if (bytes_issued >= total_bytes)
728			break;
729
730		io_u = get_io_u(td);
731		if (IS_ERR_OR_NULL(io_u)) {
732			int err = PTR_ERR(io_u);
733
734			io_u = NULL;
735			if (err == -EBUSY) {
736				ret = FIO_Q_BUSY;
737				goto reap;
738			}
739			if (td->o.latency_target)
740				goto reap;
741			break;
742		}
743
744		ddir = io_u->ddir;
745
746		/*
747		 * Add verification end_io handler if:
748		 *	- Asked to verify (!td_rw(td))
749		 *	- Or the io_u is from our verify list (mixed write/ver)
750		 */
751		if (td->o.verify != VERIFY_NONE && io_u->ddir == DDIR_READ &&
752		    ((io_u->flags & IO_U_F_VER_LIST) || !td_rw(td))) {
753
754			if (!td->o.verify_pattern_bytes) {
755				io_u->rand_seed = __rand(&td->__verify_state);
756				if (sizeof(int) != sizeof(long *))
757					io_u->rand_seed *= __rand(&td->__verify_state);
758			}
759
760			if (td->o.verify_async)
761				io_u->end_io = verify_io_u_async;
762			else
763				io_u->end_io = verify_io_u;
764			td_set_runstate(td, TD_VERIFYING);
765		} else if (in_ramp_time(td))
766			td_set_runstate(td, TD_RAMP);
767		else
768			td_set_runstate(td, TD_RUNNING);
769
770		/*
771		 * Always log IO before it's issued, so we know the specific
772		 * order of it. The logged unit will track when the IO has
773		 * completed.
774		 */
775		if (td_write(td) && io_u->ddir == DDIR_WRITE &&
776		    td->o.do_verify &&
777		    td->o.verify != VERIFY_NONE &&
778		    !td->o.experimental_verify)
779			log_io_piece(td, io_u);
780
781		ret = td_io_queue(td, io_u);
782		switch (ret) {
783		case FIO_Q_COMPLETED:
784			if (io_u->error) {
785				ret = -io_u->error;
786				unlog_io_piece(td, io_u);
787				clear_io_u(td, io_u);
788			} else if (io_u->resid) {
789				int bytes = io_u->xfer_buflen - io_u->resid;
790				struct fio_file *f = io_u->file;
791
792				bytes_issued += bytes;
793
794				trim_io_piece(td, io_u);
795
796				/*
797				 * zero read, fail
798				 */
799				if (!bytes) {
800					unlog_io_piece(td, io_u);
801					td_verror(td, EIO, "full resid");
802					put_io_u(td, io_u);
803					break;
804				}
805
806				io_u->xfer_buflen = io_u->resid;
807				io_u->xfer_buf += bytes;
808				io_u->offset += bytes;
809
810				if (ddir_rw(io_u->ddir))
811					td->ts.short_io_u[io_u->ddir]++;
812
813				if (io_u->offset == f->real_file_size)
814					goto sync_done;
815
816				requeue_io_u(td, &io_u);
817			} else {
818sync_done:
819				if (__should_check_rate(td, DDIR_READ) ||
820				    __should_check_rate(td, DDIR_WRITE) ||
821				    __should_check_rate(td, DDIR_TRIM))
822					fio_gettime(&comp_time, NULL);
823
824				ret = io_u_sync_complete(td, io_u, bytes_done);
825				if (ret < 0)
826					break;
827				bytes_issued += io_u->xfer_buflen;
828			}
829			break;
830		case FIO_Q_QUEUED:
831			/*
832			 * if the engine doesn't have a commit hook,
833			 * the io_u is really queued. if it does have such
834			 * a hook, it has to call io_u_queued() itself.
835			 */
836			if (td->io_ops->commit == NULL)
837				io_u_queued(td, io_u);
838			bytes_issued += io_u->xfer_buflen;
839			break;
840		case FIO_Q_BUSY:
841			unlog_io_piece(td, io_u);
842			requeue_io_u(td, &io_u);
843			ret2 = td_io_commit(td);
844			if (ret2 < 0)
845				ret = ret2;
846			break;
847		default:
848			assert(ret < 0);
849			put_io_u(td, io_u);
850			break;
851		}
852
853		if (break_on_this_error(td, ddir, &ret))
854			break;
855
856		/*
857		 * See if we need to complete some commands. Note that we
858		 * can get BUSY even without IO queued, if the system is
859		 * resource starved.
860		 */
861reap:
862		full = queue_full(td) || (ret == FIO_Q_BUSY && td->cur_depth);
863		if (full || !td->o.iodepth_batch_complete) {
864			min_evts = min(td->o.iodepth_batch_complete,
865					td->cur_depth);
866			/*
867			 * if the queue is full, we MUST reap at least 1 event
868			 */
869			if (full && !min_evts)
870				min_evts = 1;
871
872			if (__should_check_rate(td, DDIR_READ) ||
873			    __should_check_rate(td, DDIR_WRITE) ||
874			    __should_check_rate(td, DDIR_TRIM))
875				fio_gettime(&comp_time, NULL);
876
877			do {
878				ret = io_u_queued_complete(td, min_evts, bytes_done);
879				if (ret < 0)
880					break;
881
882			} while (full && (td->cur_depth > td->o.iodepth_low));
883		}
884
885		if (ret < 0)
886			break;
887		if (!ddir_rw_sum(bytes_done) && !(td->io_ops->flags & FIO_NOIO))
888			continue;
889
890		if (!in_ramp_time(td) && should_check_rate(td, bytes_done)) {
891			if (check_min_rate(td, &comp_time, bytes_done)) {
892				if (exitall_on_terminate)
893					fio_terminate_threads(td->groupid);
894				td_verror(td, EIO, "check_min_rate");
895				break;
896			}
897		}
898		if (!in_ramp_time(td) && td->o.latency_target)
899			lat_target_check(td);
900
901		if (td->o.thinktime) {
902			unsigned long long b;
903
904			b = ddir_rw_sum(td->io_blocks);
905			if (!(b % td->o.thinktime_blocks)) {
906				int left;
907
908				io_u_quiesce(td);
909
910				if (td->o.thinktime_spin)
911					usec_spin(td->o.thinktime_spin);
912
913				left = td->o.thinktime - td->o.thinktime_spin;
914				if (left)
915					usec_sleep(td, left);
916			}
917		}
918	}
919
920	check_update_rusage(td);
921
922	if (td->trim_entries)
923		log_err("fio: %lu trim entries leaked?\n", td->trim_entries);
924
925	if (td->o.fill_device && td->error == ENOSPC) {
926		td->error = 0;
927		fio_mark_td_terminate(td);
928	}
929	if (!td->error) {
930		struct fio_file *f;
931
932		i = td->cur_depth;
933		if (i) {
934			ret = io_u_queued_complete(td, i, bytes_done);
935			if (td->o.fill_device && td->error == ENOSPC)
936				td->error = 0;
937		}
938
939		if (should_fsync(td) && td->o.end_fsync) {
940			td_set_runstate(td, TD_FSYNCING);
941
942			for_each_file(td, f, i) {
943				if (!fio_file_fsync(td, f))
944					continue;
945
946				log_err("fio: end_fsync failed for file %s\n",
947								f->file_name);
948			}
949		}
950	} else
951		cleanup_pending_aio(td);
952
953	/*
954	 * stop job if we failed doing any IO
955	 */
956	if (!ddir_rw_sum(td->this_io_bytes))
957		td->done = 1;
958
959	return bytes_done[DDIR_WRITE] + bytes_done[DDIR_TRIM];
960}
961
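/*
 * Free the io_u structures, their data buffers and the queues that were
 * set up in init_io_u().
 */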
962static void cleanup_io_u(struct thread_data *td)
963{
964	struct io_u *io_u;
965
966	while ((io_u = io_u_qpop(&td->io_u_freelist)) != NULL) {
967
968		if (td->io_ops->io_u_free)
969			td->io_ops->io_u_free(td, io_u);
970
971		fio_memfree(io_u, sizeof(*io_u));
972	}
973
974	free_io_mem(td);
975
976	io_u_rexit(&td->io_u_requeues);
977	io_u_qexit(&td->io_u_freelist);
978	io_u_qexit(&td->io_u_all);
979}
980
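/*
 * Allocate the io_u structures and the data buffers they point into,
 * sized by the job's iodepth and maximum block size, and push them onto
 * the free list.
 */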
981static int init_io_u(struct thread_data *td)
982{
983	struct io_u *io_u;
984	unsigned int max_bs, min_write;
985	int cl_align, i, max_units;
986	int data_xfer = 1, err;
987	char *p;
988
989	max_units = td->o.iodepth;
990	max_bs = td_max_bs(td);
991	min_write = td->o.min_bs[DDIR_WRITE];
992	td->orig_buffer_size = (unsigned long long) max_bs
993					* (unsigned long long) max_units;
994
995	if ((td->io_ops->flags & FIO_NOIO) || !(td_read(td) || td_write(td)))
996		data_xfer = 0;
997
998	err = 0;
999	err += io_u_rinit(&td->io_u_requeues, td->o.iodepth);
1000	err += io_u_qinit(&td->io_u_freelist, td->o.iodepth);
1001	err += io_u_qinit(&td->io_u_all, td->o.iodepth);
1002
1003	if (err) {
1004		log_err("fio: failed setting up IO queues\n");
1005		return 1;
1006	}
1007
1008	/*
1009	 * if we may later need to do address alignment, then add any
1010	 * possible adjustment here so that we don't cause a buffer
1011	 * overflow later. this adjustment may be too much if we get
1012	 * lucky and the allocator gives us an aligned address.
1013	 */
1014	if (td->o.odirect || td->o.mem_align || td->o.oatomic ||
1015	    (td->io_ops->flags & FIO_RAWIO))
1016		td->orig_buffer_size += page_mask + td->o.mem_align;
1017
1018	if (td->o.mem_type == MEM_SHMHUGE || td->o.mem_type == MEM_MMAPHUGE) {
1019		unsigned long bs;
1020
1021		bs = td->orig_buffer_size + td->o.hugepage_size - 1;
1022		td->orig_buffer_size = bs & ~(td->o.hugepage_size - 1);
1023	}
1024
1025	if (td->orig_buffer_size != (size_t) td->orig_buffer_size) {
1026		log_err("fio: IO memory too large. Reduce max_bs or iodepth\n");
1027		return 1;
1028	}
1029
1030	if (data_xfer && allocate_io_mem(td))
1031		return 1;
1032
1033	if (td->o.odirect || td->o.mem_align || td->o.oatomic ||
1034	    (td->io_ops->flags & FIO_RAWIO))
1035		p = PAGE_ALIGN(td->orig_buffer) + td->o.mem_align;
1036	else
1037		p = td->orig_buffer;
1038
1039	cl_align = os_cache_line_size();
1040
1041	for (i = 0; i < max_units; i++) {
1042		void *ptr;
1043
1044		if (td->terminate)
1045			return 1;
1046
1047		ptr = fio_memalign(cl_align, sizeof(*io_u));
1048		if (!ptr) {
1049			log_err("fio: unable to allocate aligned memory\n");
1050			break;
1051		}
1052
1053		io_u = ptr;
1054		memset(io_u, 0, sizeof(*io_u));
1055		INIT_FLIST_HEAD(&io_u->verify_list);
1056		dprint(FD_MEM, "io_u alloc %p, index %u\n", io_u, i);
1057
1058		if (data_xfer) {
1059			io_u->buf = p;
1060			dprint(FD_MEM, "io_u %p, mem %p\n", io_u, io_u->buf);
1061
1062			if (td_write(td))
1063				io_u_fill_buffer(td, io_u, min_write, max_bs);
1064			if (td_write(td) && td->o.verify_pattern_bytes) {
1065				/*
1066				 * Fill the buffer with the pattern if we are
1067				 * going to be doing writes.
1068				 */
1069				fill_verify_pattern(td, io_u->buf, max_bs, io_u, 0, 0);
1070			}
1071		}
1072
1073		io_u->index = i;
1074		io_u->flags = IO_U_F_FREE;
1075		io_u_qpush(&td->io_u_freelist, io_u);
1076
1077		/*
1078		 * io_u never leaves this stack, used for iteration of all
1079		 * io_u buffers.
1080		 */
1081		io_u_qpush(&td->io_u_all, io_u);
1082
1083		if (td->io_ops->io_u_init) {
1084			int ret = td->io_ops->io_u_init(td, io_u);
1085
1086			if (ret) {
1087				log_err("fio: failed to init engine data: %d\n", ret);
1088				return 1;
1089			}
1090		}
1091
1092		p += max_bs;
1093	}
1094
1095	return 0;
1096}
1097
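/*
 * Switch the IO scheduler of the device backing this job by writing the
 * requested name to its sysfs scheduler file, then read the file back to
 * verify that the switch took effect.
 */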
1098static int switch_ioscheduler(struct thread_data *td)
1099{
1100	char tmp[256], tmp2[128];
1101	FILE *f;
1102	int ret;
1103
1104	if (td->io_ops->flags & FIO_DISKLESSIO)
1105		return 0;
1106
1107	sprintf(tmp, "%s/queue/scheduler", td->sysfs_root);
1108
1109	f = fopen(tmp, "r+");
1110	if (!f) {
1111		if (errno == ENOENT) {
1112			log_err("fio: os or kernel doesn't support IO scheduler"
1113				" switching\n");
1114			return 0;
1115		}
1116		td_verror(td, errno, "fopen iosched");
1117		return 1;
1118	}
1119
1120	/*
1121	 * Set io scheduler.
1122	 */
1123	ret = fwrite(td->o.ioscheduler, strlen(td->o.ioscheduler), 1, f);
1124	if (ferror(f) || ret != 1) {
1125		td_verror(td, errno, "fwrite");
1126		fclose(f);
1127		return 1;
1128	}
1129
1130	rewind(f);
1131
1132	/*
1133	 * Read back and check that the selected scheduler is now the default.
1134	 */
1135	ret = fread(tmp, 1, sizeof(tmp) - 1, f);
1136	if (ferror(f) || ret < 0) {
1137		td_verror(td, errno, "fread");
1138		fclose(f);
1139		return 1;
1140	}
1141	tmp[ret] = '\0';
1142
1143
1144	sprintf(tmp2, "[%s]", td->o.ioscheduler);
1145	if (!strstr(tmp, tmp2)) {
1146		log_err("fio: io scheduler %s not found\n", td->o.ioscheduler);
1147		td_verror(td, EINVAL, "iosched_switch");
1148		fclose(f);
1149		return 1;
1150	}
1151
1152	fclose(f);
1153	return 0;
1154}
1155
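/*
 * Decide whether thread_main() should run another pass of the workload:
 * time_based jobs keep going until the runtime expires, the loops option
 * re-runs the job, and otherwise we stop once size/io_limit worth of IO
 * has been done.
 */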
1156static int keep_running(struct thread_data *td)
1157{
1158	unsigned long long limit;
1159
1160	if (td->done)
1161		return 0;
1162	if (td->o.time_based)
1163		return 1;
1164	if (td->o.loops) {
1165		td->o.loops--;
1166		return 1;
1167	}
1168	if (exceeds_number_ios(td))
1169		return 0;
1170
1171	if (td->o.io_limit)
1172		limit = td->o.io_limit;
1173	else
1174		limit = td->o.size;
1175
1176	if (limit != -1ULL && ddir_rw_sum(td->io_bytes) < limit) {
1177		uint64_t diff;
1178
1179		/*
1180		 * If the difference is less than the maximum IO size, we
1181		 * are done.
1182		 */
1183		diff = limit - ddir_rw_sum(td->io_bytes);
1184		if (diff < td_max_bs(td))
1185			return 0;
1186
1187		if (fio_files_done(td))
1188			return 0;
1189
1190		return 1;
1191	}
1192
1193	return 0;
1194}
1195
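/*
 * Run a user supplied command (exec_prerun/exec_postrun) through the
 * shell, redirecting its output to <jobname>.<mode>.txt.
 */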
1196static int exec_string(struct thread_options *o, const char *string, const char *mode)
1197{
1198	int ret, newlen = strlen(string) + strlen(o->name) + strlen(mode) + 9 + 1;
1199	char *str;
1200
1201	str = malloc(newlen);
1202	sprintf(str, "%s &> %s.%s.txt", string, o->name, mode);
1203
1204	log_info("%s : Saving output of %s in %s.%s.txt\n", o->name, mode, o->name, mode);
1205	ret = system(str);
1206	if (ret == -1)
1207		log_err("fio: exec of cmd <%s> failed\n", str);
1208
1209	free(str);
1210	return ret;
1211}
1212
1213/*
1214 * Dry run to compute correct state of numberio for verification.
1215 */
1216static uint64_t do_dry_run(struct thread_data *td)
1217{
1218	uint64_t bytes_done[DDIR_RWDIR_CNT] = { 0, 0, 0 };
1219
1220	td_set_runstate(td, TD_RUNNING);
1221
1222	while ((td->o.read_iolog_file && !flist_empty(&td->io_log_list)) ||
1223		(!flist_empty(&td->trim_list)) || !io_bytes_exceeded(td)) {
1224		struct io_u *io_u;
1225		int ret;
1226
1227		if (td->terminate || td->done)
1228			break;
1229
1230		io_u = get_io_u(td);
1231		if (!io_u)
1232			break;
1233
1234		io_u->flags |= IO_U_F_FLIGHT;
1235		io_u->error = 0;
1236		io_u->resid = 0;
1237		if (ddir_rw(acct_ddir(io_u)))
1238			td->io_issues[acct_ddir(io_u)]++;
1239		if (ddir_rw(io_u->ddir)) {
1240			io_u_mark_depth(td, 1);
1241			td->ts.total_io_u[io_u->ddir]++;
1242		}
1243
1244		if (td_write(td) && io_u->ddir == DDIR_WRITE &&
1245		    td->o.do_verify &&
1246		    td->o.verify != VERIFY_NONE &&
1247		    !td->o.experimental_verify)
1248			log_io_piece(td, io_u);
1249
1250		ret = io_u_sync_complete(td, io_u, bytes_done);
1251		(void) ret;
1252	}
1253
1254	return bytes_done[DDIR_WRITE] + bytes_done[DDIR_TRIM];
1255}
1256
1257/*
1258 * Entry point for the thread based jobs. The process based jobs end up
1259 * here as well, after a little setup.
1260 */
1261static void *thread_main(void *data)
1262{
1263	unsigned long long elapsed;
1264	struct thread_data *td = data;
1265	struct thread_options *o = &td->o;
1266	pthread_condattr_t attr;
1267	int clear_state;
1268	int ret;
1269
1270	if (!o->use_thread) {
1271		setsid();
1272		td->pid = getpid();
1273	} else
1274		td->pid = gettid();
1275
1276	fio_local_clock_init(o->use_thread);
1277
1278	dprint(FD_PROCESS, "jobs pid=%d started\n", (int) td->pid);
1279
1280	if (is_backend)
1281		fio_server_send_start(td);
1282
1283	INIT_FLIST_HEAD(&td->io_log_list);
1284	INIT_FLIST_HEAD(&td->io_hist_list);
1285	INIT_FLIST_HEAD(&td->verify_list);
1286	INIT_FLIST_HEAD(&td->trim_list);
1287	INIT_FLIST_HEAD(&td->next_rand_list);
1288	pthread_mutex_init(&td->io_u_lock, NULL);
1289	td->io_hist_tree = RB_ROOT;
1290
1291	pthread_condattr_init(&attr);
1292	pthread_cond_init(&td->verify_cond, &attr);
1293	pthread_cond_init(&td->free_cond, &attr);
1294
1295	td_set_runstate(td, TD_INITIALIZED);
1296	dprint(FD_MUTEX, "up startup_mutex\n");
1297	fio_mutex_up(startup_mutex);
1298	dprint(FD_MUTEX, "wait on td->mutex\n");
1299	fio_mutex_down(td->mutex);
1300	dprint(FD_MUTEX, "done waiting on td->mutex\n");
1301
1302	/*
1303	 * A new gid requires privilege, so we need to do this before setting
1304	 * the uid.
1305	 */
1306	if (o->gid != -1U && setgid(o->gid)) {
1307		td_verror(td, errno, "setgid");
1308		goto err;
1309	}
1310	if (o->uid != -1U && setuid(o->uid)) {
1311		td_verror(td, errno, "setuid");
1312		goto err;
1313	}
1314
1315	/*
1316	 * If we have a gettimeofday() thread, make sure we exclude that
1317	 * thread from this job
1318	 */
1319	if (o->gtod_cpu)
1320		fio_cpu_clear(&o->cpumask, o->gtod_cpu);
1321
1322	/*
1323	 * Set affinity first, in case it has an impact on the memory
1324	 * allocations.
1325	 */
1326	if (o->cpumask_set) {
1327		if (o->cpus_allowed_policy == FIO_CPUS_SPLIT) {
1328			ret = fio_cpus_split(&o->cpumask, td->thread_number - 1);
1329			if (!ret) {
1330				log_err("fio: no CPUs set\n");
1331				log_err("fio: Try increasing number of available CPUs\n");
1332				td_verror(td, EINVAL, "cpus_split");
1333				goto err;
1334			}
1335		}
1336		ret = fio_setaffinity(td->pid, o->cpumask);
1337		if (ret == -1) {
1338			td_verror(td, errno, "cpu_set_affinity");
1339			goto err;
1340		}
1341	}
1342
1343#ifdef CONFIG_LIBNUMA
1344	/* numa node setup */
1345	if (o->numa_cpumask_set || o->numa_memmask_set) {
1346		struct bitmask *mask;
1347		int ret;
1348
1349		if (numa_available() < 0) {
1350			td_verror(td, errno, "Does not support NUMA API\n");
1351			goto err;
1352		}
1353
1354		if (o->numa_cpumask_set) {
1355			mask = numa_parse_nodestring(o->numa_cpunodes);
1356			ret = numa_run_on_node_mask(mask);
1357			numa_free_nodemask(mask);
1358			if (ret == -1) {
1359				td_verror(td, errno,
1360					"numa_run_on_node_mask failed\n");
1361				goto err;
1362			}
1363		}
1364
1365		if (o->numa_memmask_set) {
1366
1367			mask = NULL;
1368			if (o->numa_memnodes)
1369				mask = numa_parse_nodestring(o->numa_memnodes);
1370
1371			switch (o->numa_mem_mode) {
1372			case MPOL_INTERLEAVE:
1373				numa_set_interleave_mask(mask);
1374				break;
1375			case MPOL_BIND:
1376				numa_set_membind(mask);
1377				break;
1378			case MPOL_LOCAL:
1379				numa_set_localalloc();
1380				break;
1381			case MPOL_PREFERRED:
1382				numa_set_preferred(o->numa_mem_prefer_node);
1383				break;
1384			case MPOL_DEFAULT:
1385			default:
1386				break;
1387			}
1388
1389			if (mask)
1390				numa_free_nodemask(mask);
1391
1392		}
1393	}
1394#endif
1395
1396	if (fio_pin_memory(td))
1397		goto err;
1398
1399	/*
1400	 * May alter parameters that init_io_u() will use, so we need to
1401	 * do this first.
1402	 */
1403	if (init_iolog(td))
1404		goto err;
1405
1406	if (init_io_u(td))
1407		goto err;
1408
1409	if (o->verify_async && verify_async_init(td))
1410		goto err;
1411
1412	if (o->ioprio) {
1413		ret = ioprio_set(IOPRIO_WHO_PROCESS, 0, o->ioprio_class, o->ioprio);
1414		if (ret == -1) {
1415			td_verror(td, errno, "ioprio_set");
1416			goto err;
1417		}
1418	}
1419
1420	if (o->cgroup && cgroup_setup(td, cgroup_list, &cgroup_mnt))
1421		goto err;
1422
1423	errno = 0;
1424	if (nice(o->nice) == -1 && errno != 0) {
1425		td_verror(td, errno, "nice");
1426		goto err;
1427	}
1428
1429	if (o->ioscheduler && switch_ioscheduler(td))
1430		goto err;
1431
1432	if (!o->create_serialize && setup_files(td))
1433		goto err;
1434
1435	if (td_io_init(td))
1436		goto err;
1437
1438	if (init_random_map(td))
1439		goto err;
1440
1441	if (o->exec_prerun && exec_string(o, o->exec_prerun, (const char *)"prerun"))
1442		goto err;
1443
1444	if (o->pre_read) {
1445		if (pre_read_files(td) < 0)
1446			goto err;
1447	}
1448
1449	if (td->flags & TD_F_COMPRESS_LOG)
1450		tp_init(&td->tp_data);
1451
1452	fio_verify_init(td);
1453
1454	fio_gettime(&td->epoch, NULL);
1455	fio_getrusage(&td->ru_start);
1456	clear_state = 0;
1457	while (keep_running(td)) {
1458		uint64_t verify_bytes;
1459
1460		fio_gettime(&td->start, NULL);
1461		memcpy(&td->bw_sample_time, &td->start, sizeof(td->start));
1462		memcpy(&td->iops_sample_time, &td->start, sizeof(td->start));
1463		memcpy(&td->tv_cache, &td->start, sizeof(td->start));
1464
1465		if (o->ratemin[DDIR_READ] || o->ratemin[DDIR_WRITE] ||
1466				o->ratemin[DDIR_TRIM]) {
1467		        memcpy(&td->lastrate[DDIR_READ], &td->bw_sample_time,
1468						sizeof(td->bw_sample_time));
1469		        memcpy(&td->lastrate[DDIR_WRITE], &td->bw_sample_time,
1470						sizeof(td->bw_sample_time));
1471		        memcpy(&td->lastrate[DDIR_TRIM], &td->bw_sample_time,
1472						sizeof(td->bw_sample_time));
1473		}
1474
1475		if (clear_state)
1476			clear_io_state(td);
1477
1478		prune_io_piece_log(td);
1479
1480		if (td->o.verify_only && (td_write(td) || td_rw(td)))
1481			verify_bytes = do_dry_run(td);
1482		else
1483			verify_bytes = do_io(td);
1484
1485		clear_state = 1;
1486
1487		if (td_read(td) && td->io_bytes[DDIR_READ]) {
1488			elapsed = utime_since_now(&td->start);
1489			td->ts.runtime[DDIR_READ] += elapsed;
1490		}
1491		if (td_write(td) && td->io_bytes[DDIR_WRITE]) {
1492			elapsed = utime_since_now(&td->start);
1493			td->ts.runtime[DDIR_WRITE] += elapsed;
1494		}
1495		if (td_trim(td) && td->io_bytes[DDIR_TRIM]) {
1496			elapsed = utime_since_now(&td->start);
1497			td->ts.runtime[DDIR_TRIM] += elapsed;
1498		}
1499
1500		if (td->error || td->terminate)
1501			break;
1502
1503		if (!o->do_verify ||
1504		    o->verify == VERIFY_NONE ||
1505		    (td->io_ops->flags & FIO_UNIDIR))
1506			continue;
1507
1508		clear_io_state(td);
1509
1510		fio_gettime(&td->start, NULL);
1511
1512		do_verify(td, verify_bytes);
1513
1514		td->ts.runtime[DDIR_READ] += utime_since_now(&td->start);
1515
1516		if (td->error || td->terminate)
1517			break;
1518	}
1519
1520	update_rusage_stat(td);
1521	td->ts.runtime[DDIR_READ] = (td->ts.runtime[DDIR_READ] + 999) / 1000;
1522	td->ts.runtime[DDIR_WRITE] = (td->ts.runtime[DDIR_WRITE] + 999) / 1000;
1523	td->ts.runtime[DDIR_TRIM] = (td->ts.runtime[DDIR_TRIM] + 999) / 1000;
1524	td->ts.total_run_time = mtime_since_now(&td->epoch);
1525	td->ts.io_bytes[DDIR_READ] = td->io_bytes[DDIR_READ];
1526	td->ts.io_bytes[DDIR_WRITE] = td->io_bytes[DDIR_WRITE];
1527	td->ts.io_bytes[DDIR_TRIM] = td->io_bytes[DDIR_TRIM];
1528
1529	fio_unpin_memory(td);
1530
1531	fio_writeout_logs(td);
1532
1533	if (td->flags & TD_F_COMPRESS_LOG)
1534		tp_exit(&td->tp_data);
1535
1536	if (o->exec_postrun)
1537		exec_string(o, o->exec_postrun, (const char *)"postrun");
1538
1539	if (exitall_on_terminate)
1540		fio_terminate_threads(td->groupid);
1541
1542err:
1543	if (td->error)
1544		log_info("fio: pid=%d, err=%d/%s\n", (int) td->pid, td->error,
1545							td->verror);
1546
1547	if (o->verify_async)
1548		verify_async_exit(td);
1549
1550	close_and_free_files(td);
1551	cleanup_io_u(td);
1552	close_ioengine(td);
1553	cgroup_shutdown(td, &cgroup_mnt);
1554
1555	if (o->cpumask_set) {
1556		int ret = fio_cpuset_exit(&o->cpumask);
1557
1558		td_verror(td, ret, "fio_cpuset_exit");
1559	}
1560
1561	/*
1562	 * do this very late, it will log file closing as well
1563	 */
1564	if (o->write_iolog_file)
1565		write_iolog_close(td);
1566
1567	fio_mutex_remove(td->rusage_sem);
1568	td->rusage_sem = NULL;
1569
1570	fio_mutex_remove(td->mutex);
1571	td->mutex = NULL;
1572
1573	td_set_runstate(td, TD_EXITED);
1574	return (void *) (uintptr_t) td->error;
1575}
1576
1577
1578/*
1579 * We cannot pass the td data into a forked process, so attach the td and
1580 * pass it to the thread worker.
1581 */
1582static int fork_main(int shmid, int offset)
1583{
1584	struct thread_data *td;
1585	void *data, *ret;
1586
1587#if !defined(__hpux) && !defined(CONFIG_NO_SHM)
1588	data = shmat(shmid, NULL, 0);
1589	if (data == (void *) -1) {
1590		int __err = errno;
1591
1592		perror("shmat");
1593		return __err;
1594	}
1595#else
1596	/*
1597	 * HP-UX inherits shm mappings?
1598	 */
1599	data = threads;
1600#endif
1601
1602	td = data + offset * sizeof(struct thread_data);
1603	ret = thread_main(td);
1604	shmdt(data);
1605	return (int) (uintptr_t) ret;
1606}
1607
1608static void dump_td_info(struct thread_data *td)
1609{
1610	log_err("fio: job '%s' hasn't exited in %lu seconds, it appears to "
1611		"be stuck. Doing forceful exit of this job.\n", td->o.name,
1612			(unsigned long) time_since_now(&td->terminate_time));
1613}
1614
1615/*
1616 * Run over the job map and reap the threads that have exited, if any.
1617 */
1618static void reap_threads(unsigned int *nr_running, unsigned int *t_rate,
1619			 unsigned int *m_rate)
1620{
1621	struct thread_data *td;
1622	unsigned int cputhreads, realthreads, pending;
1623	int i, status, ret;
1624
1625	/*
1626	 * reap exited threads (TD_EXITED -> TD_REAPED)
1627	 */
1628	realthreads = pending = cputhreads = 0;
1629	for_each_td(td, i) {
1630		int flags = 0;
1631
1632		/*
1633		 * ->io_ops is NULL for a thread that has closed its
1634		 * io engine
1635		 */
1636		if (td->io_ops && !strcmp(td->io_ops->name, "cpuio"))
1637			cputhreads++;
1638		else
1639			realthreads++;
1640
1641		if (!td->pid) {
1642			pending++;
1643			continue;
1644		}
1645		if (td->runstate == TD_REAPED)
1646			continue;
1647		if (td->o.use_thread) {
1648			if (td->runstate == TD_EXITED) {
1649				td_set_runstate(td, TD_REAPED);
1650				goto reaped;
1651			}
1652			continue;
1653		}
1654
1655		flags = WNOHANG;
1656		if (td->runstate == TD_EXITED)
1657			flags = 0;
1658
1659		/*
1660		 * check if someone quit or got killed in an unusual way
1661		 */
1662		ret = waitpid(td->pid, &status, flags);
1663		if (ret < 0) {
1664			if (errno == ECHILD) {
1665				log_err("fio: pid=%d disappeared %d\n",
1666						(int) td->pid, td->runstate);
1667				td->sig = ECHILD;
1668				td_set_runstate(td, TD_REAPED);
1669				goto reaped;
1670			}
1671			perror("waitpid");
1672		} else if (ret == td->pid) {
1673			if (WIFSIGNALED(status)) {
1674				int sig = WTERMSIG(status);
1675
1676				if (sig != SIGTERM && sig != SIGUSR2)
1677					log_err("fio: pid=%d, got signal=%d\n",
1678							(int) td->pid, sig);
1679				td->sig = sig;
1680				td_set_runstate(td, TD_REAPED);
1681				goto reaped;
1682			}
1683			if (WIFEXITED(status)) {
1684				if (WEXITSTATUS(status) && !td->error)
1685					td->error = WEXITSTATUS(status);
1686
1687				td_set_runstate(td, TD_REAPED);
1688				goto reaped;
1689			}
1690		}
1691
1692		/*
1693		 * If the job is stuck, do a forceful timeout of it and
1694		 * move on.
1695		 */
1696		if (td->terminate &&
1697		    time_since_now(&td->terminate_time) >= FIO_REAP_TIMEOUT) {
1698			dump_td_info(td);
1699			td_set_runstate(td, TD_REAPED);
1700			goto reaped;
1701		}
1702
1703		/*
1704		 * thread is not dead, continue
1705		 */
1706		pending++;
1707		continue;
1708reaped:
1709		(*nr_running)--;
1710		(*m_rate) -= ddir_rw_sum(td->o.ratemin);
1711		(*t_rate) -= ddir_rw_sum(td->o.rate);
1712		if (!td->pid)
1713			pending--;
1714
1715		if (td->error)
1716			exit_value++;
1717
1718		done_secs += mtime_since_now(&td->epoch) / 1000;
1719		profile_td_exit(td);
1720	}
1721
1722	if (*nr_running == cputhreads && !pending && realthreads)
1723		fio_terminate_threads(TERMINATE_ALL);
1724}
1725
1726static void do_usleep(unsigned int usecs)
1727{
1728	check_for_running_stats();
1729	usleep(usecs);
1730}
1731
1732/*
1733 * Main function for kicking off and reaping jobs, as needed.
1734 */
1735static void run_threads(void)
1736{
1737	struct thread_data *td;
1738	unsigned int i, todo, nr_running, m_rate, t_rate, nr_started;
1739	uint64_t spent;
1740
1741	if (fio_gtod_offload && fio_start_gtod_thread())
1742		return;
1743
1744	fio_idle_prof_init();
1745
1746	set_sig_handlers();
1747
1748	nr_thread = nr_process = 0;
1749	for_each_td(td, i) {
1750		if (td->o.use_thread)
1751			nr_thread++;
1752		else
1753			nr_process++;
1754	}
1755
1756	if (output_format == FIO_OUTPUT_NORMAL) {
1757		log_info("Starting ");
1758		if (nr_thread)
1759			log_info("%d thread%s", nr_thread,
1760						nr_thread > 1 ? "s" : "");
1761		if (nr_process) {
1762			if (nr_thread)
1763				log_info(" and ");
1764			log_info("%d process%s", nr_process,
1765						nr_process > 1 ? "es" : "");
1766		}
1767		log_info("\n");
1768		log_info_flush();
1769	}
1770
1771	todo = thread_number;
1772	nr_running = 0;
1773	nr_started = 0;
1774	m_rate = t_rate = 0;
1775
1776	for_each_td(td, i) {
1777		print_status_init(td->thread_number - 1);
1778
1779		if (!td->o.create_serialize)
1780			continue;
1781
1782		/*
1783		 * do file setup here so it happens sequentially,
1784		 * we don't want X number of threads getting their
1785		 * client data interspersed on disk
1786		 */
1787		if (setup_files(td)) {
1788			exit_value++;
1789			if (td->error)
1790				log_err("fio: pid=%d, err=%d/%s\n",
1791					(int) td->pid, td->error, td->verror);
1792			td_set_runstate(td, TD_REAPED);
1793			todo--;
1794		} else {
1795			struct fio_file *f;
1796			unsigned int j;
1797
1798			/*
1799			 * for sharing to work, each job must always open
1800			 * its own files. so close them, if we opened them
1801			 * for creation
1802			 */
1803			for_each_file(td, f, j) {
1804				if (fio_file_open(f))
1805					td_io_close_file(td, f);
1806			}
1807		}
1808	}
1809
1810	/* start idle threads before io threads start to run */
1811	fio_idle_prof_start();
1812
1813	set_genesis_time();
1814
1815	while (todo) {
1816		struct thread_data *map[REAL_MAX_JOBS];
1817		struct timeval this_start;
1818		int this_jobs = 0, left;
1819
1820		/*
1821		 * create threads (TD_NOT_CREATED -> TD_CREATED)
1822		 */
1823		for_each_td(td, i) {
1824			if (td->runstate != TD_NOT_CREATED)
1825				continue;
1826
1827			/*
1828			 * never got a chance to start, killed by other
1829			 * thread for some reason
1830			 */
1831			if (td->terminate) {
1832				todo--;
1833				continue;
1834			}
1835
1836			if (td->o.start_delay) {
1837				spent = utime_since_genesis();
1838
1839				if (td->o.start_delay > spent)
1840					continue;
1841			}
1842
1843			if (td->o.stonewall && (nr_started || nr_running)) {
1844				dprint(FD_PROCESS, "%s: stonewall wait\n",
1845							td->o.name);
1846				break;
1847			}
1848
1849			init_disk_util(td);
1850
1851			td->rusage_sem = fio_mutex_init(FIO_MUTEX_LOCKED);
1852			td->update_rusage = 0;
1853
1854			/*
1855			 * Set state to created. Thread will transition
1856			 * to TD_INITIALIZED when it's done setting up.
1857			 */
1858			td_set_runstate(td, TD_CREATED);
1859			map[this_jobs++] = td;
1860			nr_started++;
1861
1862			if (td->o.use_thread) {
1863				int ret;
1864
1865				dprint(FD_PROCESS, "will pthread_create\n");
1866				ret = pthread_create(&td->thread, NULL,
1867							thread_main, td);
1868				if (ret) {
1869					log_err("pthread_create: %s\n",
1870							strerror(ret));
1871					nr_started--;
1872					break;
1873				}
1874				ret = pthread_detach(td->thread);
1875				if (ret)
1876					log_err("pthread_detach: %s",
1877							strerror(ret));
1878			} else {
1879				pid_t pid;
1880				dprint(FD_PROCESS, "will fork\n");
1881				pid = fork();
1882				if (!pid) {
1883					int ret = fork_main(shm_id, i);
1884
1885					_exit(ret);
1886				} else if (i == fio_debug_jobno)
1887					*fio_debug_jobp = pid;
1888			}
1889			dprint(FD_MUTEX, "wait on startup_mutex\n");
1890			if (fio_mutex_down_timeout(startup_mutex, 10)) {
1891				log_err("fio: job startup hung? exiting.\n");
1892				fio_terminate_threads(TERMINATE_ALL);
1893				fio_abort = 1;
1894				nr_started--;
1895				break;
1896			}
1897			dprint(FD_MUTEX, "done waiting on startup_mutex\n");
1898		}
1899
1900		/*
1901		 * Wait for the started threads to transition to
1902		 * TD_INITIALIZED.
1903		 */
1904		fio_gettime(&this_start, NULL);
1905		left = this_jobs;
1906		while (left && !fio_abort) {
1907			if (mtime_since_now(&this_start) > JOB_START_TIMEOUT)
1908				break;
1909
1910			do_usleep(100000);
1911
1912			for (i = 0; i < this_jobs; i++) {
1913				td = map[i];
1914				if (!td)
1915					continue;
1916				if (td->runstate == TD_INITIALIZED) {
1917					map[i] = NULL;
1918					left--;
1919				} else if (td->runstate >= TD_EXITED) {
1920					map[i] = NULL;
1921					left--;
1922					todo--;
1923					nr_running++; /* work-around... */
1924				}
1925			}
1926		}
1927
1928		if (left) {
1929			log_err("fio: %d job%s failed to start\n", left,
1930					left > 1 ? "s" : "");
1931			for (i = 0; i < this_jobs; i++) {
1932				td = map[i];
1933				if (!td)
1934					continue;
1935				kill(td->pid, SIGTERM);
1936			}
1937			break;
1938		}
1939
1940		/*
1941		 * start created threads (TD_INITIALIZED -> TD_RUNNING).
1942		 */
1943		for_each_td(td, i) {
1944			if (td->runstate != TD_INITIALIZED)
1945				continue;
1946
1947			if (in_ramp_time(td))
1948				td_set_runstate(td, TD_RAMP);
1949			else
1950				td_set_runstate(td, TD_RUNNING);
1951			nr_running++;
1952			nr_started--;
1953			m_rate += ddir_rw_sum(td->o.ratemin);
1954			t_rate += ddir_rw_sum(td->o.rate);
1955			todo--;
1956			fio_mutex_up(td->mutex);
1957		}
1958
1959		reap_threads(&nr_running, &t_rate, &m_rate);
1960
1961		if (todo)
1962			do_usleep(100000);
1963	}
1964
1965	while (nr_running) {
1966		reap_threads(&nr_running, &t_rate, &m_rate);
1967		do_usleep(10000);
1968	}
1969
1970	fio_idle_prof_stop();
1971
1972	update_io_ticks();
1973}
1974
1975static void wait_for_disk_thread_exit(void)
1976{
1977	void *ret;
1978
1979	disk_util_start_exit();
1980	pthread_cond_signal(&du_cond);
1981	pthread_join(disk_util_thread, &ret);
1982}
1983
1984static void free_disk_util(void)
1985{
1986	disk_util_prune_entries();
1987
1988	pthread_cond_destroy(&du_cond);
1989}
1990
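/*
 * Background helper: wakes up roughly every DISK_UTIL_MSEC to update the
 * disk utilization stats and, when not running as a backend, print the
 * interval status line. It exits once wait_for_disk_thread_exit() wakes
 * it through du_cond.
 */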
1991static void *disk_thread_main(void *data)
1992{
1993	int ret = 0;
1994
1995	fio_mutex_up(startup_mutex);
1996
1997	while (!ret) {
1998		uint64_t sec = DISK_UTIL_MSEC / 1000;
1999		uint64_t nsec = (DISK_UTIL_MSEC % 1000) * 1000000;
2000		struct timespec ts;
2001		struct timeval tv;
2002
2003		gettimeofday(&tv, NULL);
2004		ts.tv_sec = tv.tv_sec + sec;
2005		ts.tv_nsec = (tv.tv_usec * 1000) + nsec;
2006		if (ts.tv_nsec >= 1000000000ULL) {
2007			ts.tv_nsec -= 1000000000ULL;
2008			ts.tv_sec++;
2009		}
2010
		pthread_mutex_lock(&du_lock);
2011		ret = pthread_cond_timedwait(&du_cond, &du_lock, &ts);
		pthread_mutex_unlock(&du_lock);
2012		if (ret != ETIMEDOUT)
2013			break;
2014
2015		ret = update_io_ticks();
2016
2017		if (!is_backend)
2018			print_thread_status();
2019	}
2020
2021	return NULL;
2022}
2023
2024static int create_disk_util_thread(void)
2025{
2026	int ret;
2027
2028	setup_disk_util();
2029
2030	pthread_cond_init(&du_cond, NULL);
2031	pthread_mutex_init(&du_lock, NULL);
2032
2033	ret = pthread_create(&disk_util_thread, NULL, disk_thread_main, NULL);
2034	if (ret) {
2035		log_err("Can't create disk util thread: %s\n", strerror(ret));
2036		return 1;
2037	}
2038
2039	dprint(FD_MUTEX, "wait on startup_mutex\n");
2040	fio_mutex_down(startup_mutex);
2041	dprint(FD_MUTEX, "done waiting on startup_mutex\n");
2042	return 0;
2043}
2044
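/*
 * Top level entry point for running the parsed jobs: set up the aggregate
 * bandwidth logs and helper threads, run all jobs to completion, then
 * show the final stats and tear everything down.
 */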
2045int fio_backend(void)
2046{
2047	struct thread_data *td;
2048	int i;
2049
2050	if (exec_profile) {
2051		if (load_profile(exec_profile))
2052			return 1;
2053		free(exec_profile);
2054		exec_profile = NULL;
2055	}
2056	if (!thread_number)
2057		return 0;
2058
2059	if (write_bw_log) {
2060		struct log_params p = {
2061			.log_type = IO_LOG_TYPE_BW,
2062		};
2063
2064		setup_log(&agg_io_log[DDIR_READ], &p, "agg-read_bw.log");
2065		setup_log(&agg_io_log[DDIR_WRITE], &p, "agg-write_bw.log");
2066		setup_log(&agg_io_log[DDIR_TRIM], &p, "agg-trim_bw.log");
2067	}
2068
2069	startup_mutex = fio_mutex_init(FIO_MUTEX_LOCKED);
2070	if (startup_mutex == NULL)
2071		return 1;
2072
2073	set_genesis_time();
2074	stat_init();
2075	create_disk_util_thread();
2076	create_status_interval_thread();
2077
2078	cgroup_list = smalloc(sizeof(*cgroup_list));
2079	INIT_FLIST_HEAD(cgroup_list);
2080
2081	run_threads();
2082
2083	wait_for_disk_thread_exit();
2084	wait_for_status_interval_thread_exit();
2085
2086	if (!fio_abort) {
2087		__show_run_stats();
2088		if (write_bw_log) {
2089			int i;
2090
2091			for (i = 0; i < DDIR_RWDIR_CNT; i++) {
2092				struct io_log *log = agg_io_log[i];
2093
2094				flush_log(log);
2095				free_log(log);
2096			}
2097		}
2098	}
2099
2100	for_each_td(td, i)
2101		fio_options_free(td);
2102
2103	free_disk_util();
2104	cgroup_kill(cgroup_list);
2105	sfree(cgroup_list);
2106	sfree(cgroup_mnt);
2107
2108	fio_mutex_remove(startup_mutex);
2109	stat_exit();
2110	return exit_value;
2111}
2112