backend.c revision de54cfd8b8e93d2a32a02961f1587b83f0763aa8
1/*
2 * fio - the flexible io tester
3 *
4 * Copyright (C) 2005 Jens Axboe <axboe@suse.de>
5 * Copyright (C) 2006-2012 Jens Axboe <axboe@kernel.dk>
6 *
7 * The license below covers all files distributed with fio unless otherwise
8 * noted in the file itself.
9 *
10 *  This program is free software; you can redistribute it and/or modify
11 *  it under the terms of the GNU General Public License version 2 as
12 *  published by the Free Software Foundation.
13 *
14 *  This program is distributed in the hope that it will be useful,
15 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
16 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17 *  GNU General Public License for more details.
18 *
19 *  You should have received a copy of the GNU General Public License
20 *  along with this program; if not, write to the Free Software
21 *  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
22 *
23 */
24#include <unistd.h>
25#include <fcntl.h>
26#include <string.h>
27#include <limits.h>
28#include <signal.h>
29#include <time.h>
30#include <locale.h>
31#include <assert.h>
33#include <inttypes.h>
34#include <sys/stat.h>
35#include <sys/wait.h>
36#include <sys/ipc.h>
37#include <sys/mman.h>
38
39#include "fio.h"
40#ifndef FIO_NO_HAVE_SHM_H
41#include <sys/shm.h>
42#endif
43#include "hash.h"
44#include "smalloc.h"
45#include "verify.h"
46#include "trim.h"
47#include "diskutil.h"
48#include "cgroup.h"
49#include "profile.h"
50#include "lib/rand.h"
51#include "memalign.h"
52#include "server.h"
53#include "lib/getrusage.h"
54#include "idletime.h"
55#include "err.h"
56#include "lib/tp.h"
57
58static pthread_t helper_thread;
59static pthread_mutex_t helper_lock;
60pthread_cond_t helper_cond;
61int helper_do_stat = 0;
62
63static struct fio_mutex *startup_mutex;
64static struct flist_head *cgroup_list;
65static char *cgroup_mnt;
66static int exit_value;
67static volatile int fio_abort;
68static unsigned int nr_process = 0;
69static unsigned int nr_thread = 0;
70
71struct io_log *agg_io_log[DDIR_RWDIR_CNT];
72
73int groupid = 0;
74unsigned int thread_number = 0;
75unsigned int stat_number = 0;
76int shm_id = 0;
77int temp_stall_ts;
78unsigned long done_secs = 0;
79volatile int helper_exit = 0;
80
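/*
 * Round a buffer address up to the next page boundary; page_mask is the
 * system page size minus one (e.g. with 4k pages, 0x1234 rounds up to 0x2000).
 */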
81#define PAGE_ALIGN(buf)	\
82	(char *) (((uintptr_t) (buf) + page_mask) & ~page_mask)
83
84#define JOB_START_TIMEOUT	(5 * 1000)
85
86static void sig_int(int sig)
87{
88	if (threads) {
89		if (is_backend)
90			fio_server_got_signal(sig);
91		else {
92			log_info("\nfio: terminating on signal %d\n", sig);
93			log_info_flush();
94			exit_value = 128;
95		}
96
97		fio_terminate_threads(TERMINATE_ALL);
98	}
99}
100
101static void sig_show_status(int sig)
102{
103	show_running_run_stats();
104}
105
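/*
 * Install the signal handlers: SIGINT/SIGTERM (and SIGBREAK on Windows)
 * terminate the run, SIGUSR1 dumps the current run stats, and SIGPIPE is
 * routed to the terminate handler when running as a backend.
 */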
106static void set_sig_handlers(void)
107{
108	struct sigaction act;
109
110	memset(&act, 0, sizeof(act));
111	act.sa_handler = sig_int;
112	act.sa_flags = SA_RESTART;
113	sigaction(SIGINT, &act, NULL);
114
115	memset(&act, 0, sizeof(act));
116	act.sa_handler = sig_int;
117	act.sa_flags = SA_RESTART;
118	sigaction(SIGTERM, &act, NULL);
119
120/* Windows uses SIGBREAK as a quit signal from other applications */
121#ifdef WIN32
122	memset(&act, 0, sizeof(act));
123	act.sa_handler = sig_int;
124	act.sa_flags = SA_RESTART;
125	sigaction(SIGBREAK, &act, NULL);
126#endif
127
128	memset(&act, 0, sizeof(act));
129	act.sa_handler = sig_show_status;
130	act.sa_flags = SA_RESTART;
131	sigaction(SIGUSR1, &act, NULL);
132
133	if (is_backend) {
134		memset(&act, 0, sizeof(act));
135		act.sa_handler = sig_int;
136		act.sa_flags = SA_RESTART;
137		sigaction(SIGPIPE, &act, NULL);
138	}
139}
140
141/*
142 * Check that we are meeting the minimum rate given; returns 1 if not.
143 */
144static int __check_min_rate(struct thread_data *td, struct timeval *now,
145			    enum fio_ddir ddir)
146{
147	unsigned long long bytes = 0;
148	unsigned long iops = 0;
149	unsigned long spent;
150	unsigned long rate;
151	unsigned int ratemin = 0;
152	unsigned int rate_iops = 0;
153	unsigned int rate_iops_min = 0;
154
155	assert(ddir_rw(ddir));
156
157	if (!td->o.ratemin[ddir] && !td->o.rate_iops_min[ddir])
158		return 0;
159
160	/*
161	 * allow a 2 second settle period in the beginning
162	 */
163	if (mtime_since(&td->start, now) < 2000)
164		return 0;
165
166	iops += td->this_io_blocks[ddir];
167	bytes += td->this_io_bytes[ddir];
168	ratemin += td->o.ratemin[ddir];
169	rate_iops += td->o.rate_iops[ddir];
170	rate_iops_min += td->o.rate_iops_min[ddir];
171
172	/*
173	 * if rate bytes or blocks are set, we have a previous sample to compare against
174	 */
175	if (td->rate_bytes[ddir] || td->rate_blocks[ddir]) {
176		spent = mtime_since(&td->lastrate[ddir], now);
177		if (spent < td->o.ratecycle)
178			return 0;
179
180		if (td->o.rate[ddir]) {
181			/*
182			 * check the specified bandwidth rate
183			 */
184			if (bytes < td->rate_bytes[ddir]) {
185				log_err("%s: min rate %u not met\n", td->o.name,
186								ratemin);
187				return 1;
188			} else {
189				if (spent)
190					rate = ((bytes - td->rate_bytes[ddir]) * 1000) / spent;
191				else
192					rate = 0;
193
194				if (rate < ratemin ||
195				    bytes < td->rate_bytes[ddir]) {
196					log_err("%s: min rate %u not met, got"
197						" %luKB/sec\n", td->o.name,
198							ratemin, rate);
199					return 1;
200				}
201			}
202		} else {
203			/*
204			 * check the specified iops rate
205			 */
206			if (iops < rate_iops) {
207				log_err("%s: min iops rate %u not met\n",
208						td->o.name, rate_iops);
209				return 1;
210			} else {
211				if (spent)
212					rate = ((iops - td->rate_blocks[ddir]) * 1000) / spent;
213				else
214					rate = 0;
215
216				if (rate < rate_iops_min ||
217				    iops < td->rate_blocks[ddir]) {
218					log_err("%s: min iops rate %u not met,"
219						" got %lu\n", td->o.name,
220							rate_iops_min, rate);
					return 1;
221				}
222			}
223		}
224	}
225
226	td->rate_bytes[ddir] = bytes;
227	td->rate_blocks[ddir] = iops;
228	memcpy(&td->lastrate[ddir], now, sizeof(*now));
229	return 0;
230}
231
232static int check_min_rate(struct thread_data *td, struct timeval *now,
233			  uint64_t *bytes_done)
234{
235	int ret = 0;
236
237	if (bytes_done[DDIR_READ])
238		ret |= __check_min_rate(td, now, DDIR_READ);
239	if (bytes_done[DDIR_WRITE])
240		ret |= __check_min_rate(td, now, DDIR_WRITE);
241	if (bytes_done[DDIR_TRIM])
242		ret |= __check_min_rate(td, now, DDIR_TRIM);
243
244	return ret;
245}
246
247/*
248 * When job exits, we can cancel the in-flight IO if we are using async
249 * io. Attempt to do so.
250 */
251static void cleanup_pending_aio(struct thread_data *td)
252{
253	int r;
254
255	/*
256	 * get immediately available events, if any
257	 */
258	r = io_u_queued_complete(td, 0, NULL);
259	if (r < 0)
260		return;
261
262	/*
263	 * now cancel remaining active events
264	 */
265	if (td->io_ops->cancel) {
266		struct io_u *io_u;
267		int i;
268
269		io_u_qiter(&td->io_u_all, io_u, i) {
270			if (io_u->flags & IO_U_F_FLIGHT) {
271				r = td->io_ops->cancel(td, io_u);
272				if (!r)
273					put_io_u(td, io_u);
274			}
275		}
276	}
277
278	if (td->cur_depth)
279		r = io_u_queued_complete(td, td->cur_depth, NULL);
280}
281
282/*
283 * Helper to handle the final sync of a file. Works just like the normal
284 * io path, just does everything sync.
285 */
286static int fio_io_sync(struct thread_data *td, struct fio_file *f)
287{
288	struct io_u *io_u = __get_io_u(td);
289	int ret;
290
291	if (!io_u)
292		return 1;
293
294	io_u->ddir = DDIR_SYNC;
295	io_u->file = f;
296
297	if (td_io_prep(td, io_u)) {
298		put_io_u(td, io_u);
299		return 1;
300	}
301
302requeue:
303	ret = td_io_queue(td, io_u);
304	if (ret < 0) {
305		td_verror(td, io_u->error, "td_io_queue");
306		put_io_u(td, io_u);
307		return 1;
308	} else if (ret == FIO_Q_QUEUED) {
309		if (io_u_queued_complete(td, 1, NULL) < 0)
310			return 1;
311	} else if (ret == FIO_Q_COMPLETED) {
312		if (io_u->error) {
313			td_verror(td, io_u->error, "td_io_queue");
314			return 1;
315		}
316
317		if (io_u_sync_complete(td, io_u, NULL) < 0)
318			return 1;
319	} else if (ret == FIO_Q_BUSY) {
320		if (td_io_commit(td))
321			return 1;
322		goto requeue;
323	}
324
325	return 0;
326}
327
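/*
 * Sync the file, opening it first if it isn't already open.
 */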
328static int fio_file_fsync(struct thread_data *td, struct fio_file *f)
329{
330	int ret;
331
332	if (fio_file_open(f))
333		return fio_io_sync(td, f);
334
335	if (td_io_open_file(td, f))
336		return 1;
337
338	ret = fio_io_sync(td, f);
339	td_io_close_file(td, f);
340	return ret;
341}
342
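/*
 * The cached time-of-day value is only refreshed every tv_cache_mask + 1
 * calls, to avoid hitting the clock source on every IO.
 */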
343static inline void __update_tv_cache(struct thread_data *td)
344{
345	fio_gettime(&td->tv_cache, NULL);
346}
347
348static inline void update_tv_cache(struct thread_data *td)
349{
350	if ((++td->tv_cache_nr & td->tv_cache_mask) == td->tv_cache_mask)
351		__update_tv_cache(td);
352}
353
354static inline int runtime_exceeded(struct thread_data *td, struct timeval *t)
355{
356	if (in_ramp_time(td))
357		return 0;
358	if (!td->o.timeout)
359		return 0;
360	if (utime_since(&td->epoch, t) >= td->o.timeout)
361		return 1;
362
363	return 0;
364}
365
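/*
 * Decide whether an IO error should end the job: non-fatal errors (as set
 * by continue_on_error) are counted and cleared, ENOSPC with fill_device
 * set marks the job as done, anything else is treated as fatal.
 */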
366static int break_on_this_error(struct thread_data *td, enum fio_ddir ddir,
367			       int *retptr)
368{
369	int ret = *retptr;
370
371	if (ret < 0 || td->error) {
372		int err = td->error;
373		enum error_type_bit eb;
374
375		if (ret < 0)
376			err = -ret;
377
378		eb = td_error_type(ddir, err);
379		if (!(td->o.continue_on_error & (1 << eb)))
380			return 1;
381
382		if (td_non_fatal_error(td, eb, err)) {
383			/*
384			 * Continue with the I/Os in case of
385			 * a non-fatal error.
386			 */
387			update_error_count(td, err);
388			td_clear_error(td);
389			*retptr = 0;
390			return 0;
391		} else if (td->o.fill_device && err == ENOSPC) {
392			/*
393			 * We expect to hit this error if
394			 * fill_device option is set.
395			 */
396			td_clear_error(td);
397			fio_mark_td_terminate(td);
398			return 1;
399		} else {
400			/*
401			 * Stop the I/O in case of a fatal
402			 * error.
403			 */
404			update_error_count(td, err);
405			return 1;
406		}
407	}
408
409	return 0;
410}
411
412static void check_update_rusage(struct thread_data *td)
413{
414	if (td->update_rusage) {
415		td->update_rusage = 0;
416		update_rusage_stat(td);
417		fio_mutex_up(td->rusage_sem);
418	}
419}
420
421/*
422 * The main verify engine. Runs over the writes we previously submitted,
423 * reads the blocks back in, and checks the crc/md5 of the data.
424 */
425static void do_verify(struct thread_data *td, uint64_t verify_bytes)
426{
427	uint64_t bytes_done[DDIR_RWDIR_CNT] = { 0, 0, 0 };
428	struct fio_file *f;
429	struct io_u *io_u;
430	int ret, min_events;
431	unsigned int i;
432
433	dprint(FD_VERIFY, "starting loop\n");
434
435	/*
436	 * sync io first and invalidate cache, to make sure we really
437	 * read from disk.
438	 */
439	for_each_file(td, f, i) {
440		if (!fio_file_open(f))
441			continue;
442		if (fio_io_sync(td, f))
443			break;
444		if (file_invalidate_cache(td, f))
445			break;
446	}
447
448	check_update_rusage(td);
449
450	if (td->error)
451		return;
452
453	td_set_runstate(td, TD_VERIFYING);
454
455	io_u = NULL;
456	while (!td->terminate) {
457		enum fio_ddir ddir;
458		int ret2, full;
459
460		update_tv_cache(td);
461		check_update_rusage(td);
462
463		if (runtime_exceeded(td, &td->tv_cache)) {
464			__update_tv_cache(td);
465			if (runtime_exceeded(td, &td->tv_cache)) {
466				fio_mark_td_terminate(td);
467				break;
468			}
469		}
470
471		if (flow_threshold_exceeded(td))
472			continue;
473
474		if (!td->o.experimental_verify) {
475			io_u = __get_io_u(td);
476			if (!io_u)
477				break;
478
479			if (get_next_verify(td, io_u)) {
480				put_io_u(td, io_u);
481				break;
482			}
483
484			if (td_io_prep(td, io_u)) {
485				put_io_u(td, io_u);
486				break;
487			}
488		} else {
489			if (ddir_rw_sum(bytes_done) + td->o.rw_min_bs > verify_bytes)
490				break;
491
492			while ((io_u = get_io_u(td)) != NULL) {
493				if (IS_ERR(io_u)) {
494					io_u = NULL;
495					ret = FIO_Q_BUSY;
496					goto reap;
497				}
498
499				/*
500				 * We are only interested in the places where
501				 * we wrote or trimmed IOs. Turn those into
502				 * reads for verification purposes.
503				 */
504				if (io_u->ddir == DDIR_READ) {
505					/*
506					 * Pretend we issued it for rwmix
507					 * accounting
508					 */
509					td->io_issues[DDIR_READ]++;
510					put_io_u(td, io_u);
511					continue;
512				} else if (io_u->ddir == DDIR_TRIM) {
513					io_u->ddir = DDIR_READ;
514					io_u->flags |= IO_U_F_TRIMMED;
515					break;
516				} else if (io_u->ddir == DDIR_WRITE) {
517					io_u->ddir = DDIR_READ;
518					break;
519				} else {
520					put_io_u(td, io_u);
521					continue;
522				}
523			}
524
525			if (!io_u)
526				break;
527		}
528
529		if (verify_state_should_stop(td, io_u)) {
530			put_io_u(td, io_u);
531			break;
532		}
533
534		if (td->o.verify_async)
535			io_u->end_io = verify_io_u_async;
536		else
537			io_u->end_io = verify_io_u;
538
539		ddir = io_u->ddir;
540
541		ret = td_io_queue(td, io_u);
542		switch (ret) {
543		case FIO_Q_COMPLETED:
544			if (io_u->error) {
545				ret = -io_u->error;
546				clear_io_u(td, io_u);
547			} else if (io_u->resid) {
548				int bytes = io_u->xfer_buflen - io_u->resid;
549
550				/*
551				 * zero read, fail
552				 */
553				if (!bytes) {
554					td_verror(td, EIO, "full resid");
555					put_io_u(td, io_u);
556					break;
557				}
558
559				io_u->xfer_buflen = io_u->resid;
560				io_u->xfer_buf += bytes;
561				io_u->offset += bytes;
562
563				if (ddir_rw(io_u->ddir))
564					td->ts.short_io_u[io_u->ddir]++;
565
566				f = io_u->file;
567				if (io_u->offset == f->real_file_size)
568					goto sync_done;
569
570				requeue_io_u(td, &io_u);
571			} else {
572sync_done:
573				ret = io_u_sync_complete(td, io_u, bytes_done);
574				if (ret < 0)
575					break;
576			}
577			continue;
578		case FIO_Q_QUEUED:
579			break;
580		case FIO_Q_BUSY:
581			requeue_io_u(td, &io_u);
582			ret2 = td_io_commit(td);
583			if (ret2 < 0)
584				ret = ret2;
585			break;
586		default:
587			assert(ret < 0);
588			td_verror(td, -ret, "td_io_queue");
589			break;
590		}
591
592		if (break_on_this_error(td, ddir, &ret))
593			break;
594
595		/*
596		 * if we can queue more, do so. but check if there are
597		 * completed io_u's first. Note that we can get BUSY even
598		 * without IO queued, if the system is resource starved.
599		 */
600reap:
601		full = queue_full(td) || (ret == FIO_Q_BUSY && td->cur_depth);
602		if (full || !td->o.iodepth_batch_complete) {
603			min_events = min(td->o.iodepth_batch_complete,
604					 td->cur_depth);
605			/*
606			 * if the queue is full, we MUST reap at least 1 event
607			 */
608			if (full && !min_events)
609				min_events = 1;
610
611			do {
612				/*
613				 * Reap required number of io units, if any,
614				 * and do the verification on them through
615				 * the callback handler
616				 */
617				if (io_u_queued_complete(td, min_events, bytes_done) < 0) {
618					ret = -1;
619					break;
620				}
621			} while (full && (td->cur_depth > td->o.iodepth_low));
622		}
623		if (ret < 0)
624			break;
625	}
626
627	check_update_rusage(td);
628
629	if (!td->error) {
630		min_events = td->cur_depth;
631
632		if (min_events)
633			ret = io_u_queued_complete(td, min_events, NULL);
634	} else
635		cleanup_pending_aio(td);
636
637	td_set_runstate(td, TD_RUNNING);
638
639	dprint(FD_VERIFY, "exiting loop\n");
640}
641
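/*
 * Returns true once the completed blocks plus queued and in-flight IOs
 * reach the number_ios limit, if one is set.
 */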
642static unsigned int exceeds_number_ios(struct thread_data *td)
643{
644	unsigned long long number_ios;
645
646	if (!td->o.number_ios)
647		return 0;
648
649	number_ios = ddir_rw_sum(td->this_io_blocks);
650	number_ios += td->io_u_queued + td->io_u_in_flight;
651
652	return number_ios >= td->o.number_ios;
653}
654
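/*
 * Returns true if the job has transferred io_limit (or size, if no limit
 * is set) bytes for its data direction(s), or has hit the number_ios limit.
 */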
655static int io_bytes_exceeded(struct thread_data *td)
656{
657	unsigned long long bytes, limit;
658
659	if (td_rw(td))
660		bytes = td->this_io_bytes[DDIR_READ] + td->this_io_bytes[DDIR_WRITE];
661	else if (td_write(td))
662		bytes = td->this_io_bytes[DDIR_WRITE];
663	else if (td_read(td))
664		bytes = td->this_io_bytes[DDIR_READ];
665	else
666		bytes = td->this_io_bytes[DDIR_TRIM];
667
668	if (td->o.io_limit)
669		limit = td->o.io_limit;
670	else
671		limit = td->o.size;
672
673	return bytes >= limit || exceeds_number_ios(td);
674}
675
676/*
677 * Main IO worker function. It retrieves io_u's to process and queues
678 * and reaps them, checking for rate and errors along the way.
679 *
680 * Returns number of bytes written and trimmed.
681 */
682static uint64_t do_io(struct thread_data *td)
683{
684	uint64_t bytes_done[DDIR_RWDIR_CNT] = { 0, 0, 0 };
685	unsigned int i;
686	int ret = 0;
687	uint64_t total_bytes, bytes_issued = 0;
688
689	if (in_ramp_time(td))
690		td_set_runstate(td, TD_RAMP);
691	else
692		td_set_runstate(td, TD_RUNNING);
693
694	lat_target_init(td);
695
696	/*
697	 * If verify_backlog is enabled, we'll run the verify in this
698	 * handler as well. For that case, we may need up to twice the
699	 * amount of bytes.
700	 */
701	total_bytes = td->o.size;
702	if (td->o.verify != VERIFY_NONE &&
703	   (td_write(td) && td->o.verify_backlog))
704		total_bytes += td->o.size;
705
706	while ((td->o.read_iolog_file && !flist_empty(&td->io_log_list)) ||
707		(!flist_empty(&td->trim_list)) || !io_bytes_exceeded(td) ||
708		td->o.time_based) {
709		struct timeval comp_time;
710		int min_evts = 0;
711		struct io_u *io_u;
712		int ret2, full;
713		enum fio_ddir ddir;
714
715		check_update_rusage(td);
716
717		if (td->terminate || td->done)
718			break;
719
720		update_tv_cache(td);
721
722		if (runtime_exceeded(td, &td->tv_cache)) {
723			__update_tv_cache(td);
724			if (runtime_exceeded(td, &td->tv_cache)) {
725				fio_mark_td_terminate(td);
726				break;
727			}
728		}
729
730		if (flow_threshold_exceeded(td))
731			continue;
732
733		if (bytes_issued >= total_bytes)
734			break;
735
736		io_u = get_io_u(td);
737		if (IS_ERR_OR_NULL(io_u)) {
738			int err = PTR_ERR(io_u);
739
740			io_u = NULL;
741			if (err == -EBUSY) {
742				ret = FIO_Q_BUSY;
743				goto reap;
744			}
745			if (td->o.latency_target)
746				goto reap;
747			break;
748		}
749
750		ddir = io_u->ddir;
751
752		/*
753		 * Add verification end_io handler if:
754		 *	- Asked to verify (!td_rw(td))
755		 *	- Or the io_u is from our verify list (mixed write/ver)
756		 */
757		if (td->o.verify != VERIFY_NONE && io_u->ddir == DDIR_READ &&
758		    ((io_u->flags & IO_U_F_VER_LIST) || !td_rw(td))) {
759
760			if (!td->o.verify_pattern_bytes) {
761				io_u->rand_seed = __rand(&td->verify_state);
762				if (sizeof(int) != sizeof(long *))
763					io_u->rand_seed *= __rand(&td->verify_state);
764			}
765
766			if (verify_state_should_stop(td, io_u)) {
767				put_io_u(td, io_u);
768				break;
769			}
770
771			if (td->o.verify_async)
772				io_u->end_io = verify_io_u_async;
773			else
774				io_u->end_io = verify_io_u;
775			td_set_runstate(td, TD_VERIFYING);
776		} else if (in_ramp_time(td))
777			td_set_runstate(td, TD_RAMP);
778		else
779			td_set_runstate(td, TD_RUNNING);
780
781		/*
782		 * Always log the IO before it is issued, so we know the exact
783		 * order it was issued in. The logged unit will track when the
784		 * IO has completed.
785		 */
786		if (td_write(td) && io_u->ddir == DDIR_WRITE &&
787		    td->o.do_verify &&
788		    td->o.verify != VERIFY_NONE &&
789		    !td->o.experimental_verify)
790			log_io_piece(td, io_u);
791
792		ret = td_io_queue(td, io_u);
793		switch (ret) {
794		case FIO_Q_COMPLETED:
795			if (io_u->error) {
796				ret = -io_u->error;
797				unlog_io_piece(td, io_u);
798				clear_io_u(td, io_u);
799			} else if (io_u->resid) {
800				int bytes = io_u->xfer_buflen - io_u->resid;
801				struct fio_file *f = io_u->file;
802
803				bytes_issued += bytes;
804
805				trim_io_piece(td, io_u);
806
807				/*
808				 * zero read, fail
809				 */
810				if (!bytes) {
811					unlog_io_piece(td, io_u);
812					td_verror(td, EIO, "full resid");
813					put_io_u(td, io_u);
814					break;
815				}
816
817				io_u->xfer_buflen = io_u->resid;
818				io_u->xfer_buf += bytes;
819				io_u->offset += bytes;
820
821				if (ddir_rw(io_u->ddir))
822					td->ts.short_io_u[io_u->ddir]++;
823
824				if (io_u->offset == f->real_file_size)
825					goto sync_done;
826
827				requeue_io_u(td, &io_u);
828			} else {
829sync_done:
830				if (__should_check_rate(td, DDIR_READ) ||
831				    __should_check_rate(td, DDIR_WRITE) ||
832				    __should_check_rate(td, DDIR_TRIM))
833					fio_gettime(&comp_time, NULL);
834
835				ret = io_u_sync_complete(td, io_u, bytes_done);
836				if (ret < 0)
837					break;
838				bytes_issued += io_u->xfer_buflen;
839			}
840			break;
841		case FIO_Q_QUEUED:
842			/*
843			 * if the engine doesn't have a commit hook,
844			 * the io_u is really queued. if it does have such
845			 * a hook, it has to call io_u_queued() itself.
846			 */
847			if (td->io_ops->commit == NULL)
848				io_u_queued(td, io_u);
849			bytes_issued += io_u->xfer_buflen;
850			break;
851		case FIO_Q_BUSY:
852			unlog_io_piece(td, io_u);
853			requeue_io_u(td, &io_u);
854			ret2 = td_io_commit(td);
855			if (ret2 < 0)
856				ret = ret2;
857			break;
858		default:
859			assert(ret < 0);
860			put_io_u(td, io_u);
861			break;
862		}
863
864		if (break_on_this_error(td, ddir, &ret))
865			break;
866
867		/*
868		 * See if we need to complete some commands. Note that we
869		 * can get BUSY even without IO queued, if the system is
870		 * resource starved.
871		 */
872reap:
873		full = queue_full(td) || (ret == FIO_Q_BUSY && td->cur_depth);
874		if (full || !td->o.iodepth_batch_complete) {
875			min_evts = min(td->o.iodepth_batch_complete,
876					td->cur_depth);
877			/*
878			 * if the queue is full, we MUST reap at least 1 event
879			 */
880			if (full && !min_evts)
881				min_evts = 1;
882
883			if (__should_check_rate(td, DDIR_READ) ||
884			    __should_check_rate(td, DDIR_WRITE) ||
885			    __should_check_rate(td, DDIR_TRIM))
886				fio_gettime(&comp_time, NULL);
887
888			do {
889				ret = io_u_queued_complete(td, min_evts, bytes_done);
890				if (ret < 0)
891					break;
892
893			} while (full && (td->cur_depth > td->o.iodepth_low));
894		}
895
896		if (ret < 0)
897			break;
898		if (!ddir_rw_sum(bytes_done) && !(td->io_ops->flags & FIO_NOIO))
899			continue;
900
901		if (!in_ramp_time(td) && should_check_rate(td, bytes_done)) {
902			if (check_min_rate(td, &comp_time, bytes_done)) {
903				if (exitall_on_terminate)
904					fio_terminate_threads(td->groupid);
905				td_verror(td, EIO, "check_min_rate");
906				break;
907			}
908		}
909		if (!in_ramp_time(td) && td->o.latency_target)
910			lat_target_check(td);
911
912		if (td->o.thinktime) {
913			unsigned long long b;
914
915			b = ddir_rw_sum(td->io_blocks);
916			if (!(b % td->o.thinktime_blocks)) {
917				int left;
918
919				io_u_quiesce(td);
920
921				if (td->o.thinktime_spin)
922					usec_spin(td->o.thinktime_spin);
923
924				left = td->o.thinktime - td->o.thinktime_spin;
925				if (left)
926					usec_sleep(td, left);
927			}
928		}
929	}
930
931	check_update_rusage(td);
932
933	if (td->trim_entries)
934		log_err("fio: %lu trim entries leaked?\n", td->trim_entries);
935
936	if (td->o.fill_device && td->error == ENOSPC) {
937		td->error = 0;
938		fio_mark_td_terminate(td);
939	}
940	if (!td->error) {
941		struct fio_file *f;
942
943		i = td->cur_depth;
944		if (i) {
945			ret = io_u_queued_complete(td, i, bytes_done);
946			if (td->o.fill_device && td->error == ENOSPC)
947				td->error = 0;
948		}
949
950		if (should_fsync(td) && td->o.end_fsync) {
951			td_set_runstate(td, TD_FSYNCING);
952
953			for_each_file(td, f, i) {
954				if (!fio_file_fsync(td, f))
955					continue;
956
957				log_err("fio: end_fsync failed for file %s\n",
958								f->file_name);
959			}
960		}
961	} else
962		cleanup_pending_aio(td);
963
964	/*
965	 * stop job if we failed doing any IO
966	 */
967	if (!ddir_rw_sum(td->this_io_bytes))
968		td->done = 1;
969
970	return bytes_done[DDIR_WRITE] + bytes_done[DDIR_TRIM];
971}
972
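/*
 * Free the io_u structures and their shared data buffer, and tear down the
 * requeue, freelist and all-io_u queues.
 */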
973static void cleanup_io_u(struct thread_data *td)
974{
975	struct io_u *io_u;
976
977	while ((io_u = io_u_qpop(&td->io_u_freelist)) != NULL) {
978
979		if (td->io_ops->io_u_free)
980			td->io_ops->io_u_free(td, io_u);
981
982		fio_memfree(io_u, sizeof(*io_u));
983	}
984
985	free_io_mem(td);
986
987	io_u_rexit(&td->io_u_requeues);
988	io_u_qexit(&td->io_u_freelist);
989	io_u_qexit(&td->io_u_all);
990
991	if (td->last_write_comp)
992		sfree(td->last_write_comp);
993}
994
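/*
 * Allocate the io_u structures for this job, plus one contiguous data
 * buffer that is carved up between them. Extra room is reserved when the
 * buffers may need alignment (O_DIRECT, raw IO, or an explicit mem_align).
 */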
995static int init_io_u(struct thread_data *td)
996{
997	struct io_u *io_u;
998	unsigned int max_bs, min_write;
999	int cl_align, i, max_units;
1000	int data_xfer = 1, err;
1001	char *p;
1002
1003	max_units = td->o.iodepth;
1004	max_bs = td_max_bs(td);
1005	min_write = td->o.min_bs[DDIR_WRITE];
1006	td->orig_buffer_size = (unsigned long long) max_bs
1007					* (unsigned long long) max_units;
1008
1009	if ((td->io_ops->flags & FIO_NOIO) || !(td_read(td) || td_write(td)))
1010		data_xfer = 0;
1011
1012	err = 0;
1013	err += io_u_rinit(&td->io_u_requeues, td->o.iodepth);
1014	err += io_u_qinit(&td->io_u_freelist, td->o.iodepth);
1015	err += io_u_qinit(&td->io_u_all, td->o.iodepth);
1016
1017	if (err) {
1018		log_err("fio: failed setting up IO queues\n");
1019		return 1;
1020	}
1021
1022	/*
1023	 * if we may later need to do address alignment, then add any
1024	 * possible adjustment here so that we don't cause a buffer
1025	 * overflow later. this adjustment may be too much if we get
1026	 * lucky and the allocator gives us an aligned address.
1027	 */
1028	if (td->o.odirect || td->o.mem_align || td->o.oatomic ||
1029	    (td->io_ops->flags & FIO_RAWIO))
1030		td->orig_buffer_size += page_mask + td->o.mem_align;
1031
1032	if (td->o.mem_type == MEM_SHMHUGE || td->o.mem_type == MEM_MMAPHUGE) {
1033		unsigned long bs;
1034
1035		bs = td->orig_buffer_size + td->o.hugepage_size - 1;
1036		td->orig_buffer_size = bs & ~(td->o.hugepage_size - 1);
1037	}
1038
1039	if (td->orig_buffer_size != (size_t) td->orig_buffer_size) {
1040		log_err("fio: IO memory too large. Reduce max_bs or iodepth\n");
1041		return 1;
1042	}
1043
1044	if (data_xfer && allocate_io_mem(td))
1045		return 1;
1046
1047	if (td->o.odirect || td->o.mem_align || td->o.oatomic ||
1048	    (td->io_ops->flags & FIO_RAWIO))
1049		p = PAGE_ALIGN(td->orig_buffer) + td->o.mem_align;
1050	else
1051		p = td->orig_buffer;
1052
1053	cl_align = os_cache_line_size();
1054
1055	for (i = 0; i < max_units; i++) {
1056		void *ptr;
1057
1058		if (td->terminate)
1059			return 1;
1060
1061		ptr = fio_memalign(cl_align, sizeof(*io_u));
1062		if (!ptr) {
1063			log_err("fio: unable to allocate aligned memory\n");
1064			break;
1065		}
1066
1067		io_u = ptr;
1068		memset(io_u, 0, sizeof(*io_u));
1069		INIT_FLIST_HEAD(&io_u->verify_list);
1070		dprint(FD_MEM, "io_u alloc %p, index %u\n", io_u, i);
1071
1072		if (data_xfer) {
1073			io_u->buf = p;
1074			dprint(FD_MEM, "io_u %p, mem %p\n", io_u, io_u->buf);
1075
1076			if (td_write(td))
1077				io_u_fill_buffer(td, io_u, min_write, max_bs);
1078			if (td_write(td) && td->o.verify_pattern_bytes) {
1079				/*
1080				 * Fill the buffer with the pattern if we are
1081				 * going to be doing writes.
1082				 */
1083				fill_verify_pattern(td, io_u->buf, max_bs, io_u, 0, 0);
1084			}
1085		}
1086
1087		io_u->index = i;
1088		io_u->flags = IO_U_F_FREE;
1089		io_u_qpush(&td->io_u_freelist, io_u);
1090
1091		/*
1092		 * io_u never leaves this stack, used for iteration of all
1093		 * io_u buffers.
1094		 */
1095		io_u_qpush(&td->io_u_all, io_u);
1096
1097		if (td->io_ops->io_u_init) {
1098			int ret = td->io_ops->io_u_init(td, io_u);
1099
1100			if (ret) {
1101				log_err("fio: failed to init engine data: %d\n", ret);
1102				return 1;
1103			}
1104		}
1105
1106		p += max_bs;
1107	}
1108
1109	if (td->o.verify != VERIFY_NONE) {
1110		td->last_write_comp = scalloc(max_units, sizeof(uint64_t));
1111		if (!td->last_write_comp) {
1112			log_err("fio: failed to alloc write comp data\n");
1113			return 1;
1114		}
1115	}
1116
1117	return 0;
1118}
1119
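/*
 * Switch the IO scheduler of the backing device through sysfs, then read
 * it back to verify that the kernel actually accepted the new scheduler.
 */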
1120static int switch_ioscheduler(struct thread_data *td)
1121{
1122	char tmp[256], tmp2[128];
1123	FILE *f;
1124	int ret;
1125
1126	if (td->io_ops->flags & FIO_DISKLESSIO)
1127		return 0;
1128
1129	sprintf(tmp, "%s/queue/scheduler", td->sysfs_root);
1130
1131	f = fopen(tmp, "r+");
1132	if (!f) {
1133		if (errno == ENOENT) {
1134			log_err("fio: os or kernel doesn't support IO scheduler"
1135				" switching\n");
1136			return 0;
1137		}
1138		td_verror(td, errno, "fopen iosched");
1139		return 1;
1140	}
1141
1142	/*
1143	 * Set io scheduler.
1144	 */
1145	ret = fwrite(td->o.ioscheduler, strlen(td->o.ioscheduler), 1, f);
1146	if (ferror(f) || ret != 1) {
1147		td_verror(td, errno, "fwrite");
1148		fclose(f);
1149		return 1;
1150	}
1151
1152	rewind(f);
1153
1154	/*
1155	 * Read back and check that the selected scheduler is now the default.
1156	 */
	/* zero-fill so a short read still leaves a NUL-terminated string */
	memset(tmp, 0, sizeof(tmp));
1157	ret = fread(tmp, sizeof(tmp), 1, f);
1158	if (ferror(f) || ret < 0) {
1159		td_verror(td, errno, "fread");
1160		fclose(f);
1161		return 1;
1162	}
1163	tmp[sizeof(tmp) - 1] = '\0';
1164
1165
1166	sprintf(tmp2, "[%s]", td->o.ioscheduler);
1167	if (!strstr(tmp, tmp2)) {
1168		log_err("fio: io scheduler %s not found\n", td->o.ioscheduler);
1169		td_verror(td, EINVAL, "iosched_switch");
1170		fclose(f);
1171		return 1;
1172	}
1173
1174	fclose(f);
1175	return 0;
1176}
1177
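/*
 * Decide whether the job should start another pass: time_based jobs and
 * remaining loops keep going, otherwise we stop once the io/size or
 * number_ios limits have been reached, or all files are done.
 */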
1178static int keep_running(struct thread_data *td)
1179{
1180	unsigned long long limit;
1181
1182	if (td->done)
1183		return 0;
1184	if (td->o.time_based)
1185		return 1;
1186	if (td->o.loops) {
1187		td->o.loops--;
1188		return 1;
1189	}
1190	if (exceeds_number_ios(td))
1191		return 0;
1192
1193	if (td->o.io_limit)
1194		limit = td->o.io_limit;
1195	else
1196		limit = td->o.size;
1197
1198	if (limit != -1ULL && ddir_rw_sum(td->io_bytes) < limit) {
1199		uint64_t diff;
1200
1201		/*
1202		 * If the remaining bytes are less than the maximum block size,
1203		 * we are done.
1204		 */
1205		diff = limit - ddir_rw_sum(td->io_bytes);
1206		if (diff < td_max_bs(td))
1207			return 0;
1208
1209		if (fio_files_done(td))
1210			return 0;
1211
1212		return 1;
1213	}
1214
1215	return 0;
1216}
1217
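/*
 * Run an external command (exec_prerun/exec_postrun), redirecting its
 * output to <jobname>.<mode>.txt.
 */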
1218static int exec_string(struct thread_options *o, const char *string, const char *mode)
1219{
1220	int ret, newlen = strlen(string) + strlen(o->name) + strlen(mode) + 9 + 1;
1221	char *str;
1222
1223	str = malloc(newlen);
1224	sprintf(str, "%s &> %s.%s.txt", string, o->name, mode);
1225
1226	log_info("%s : Saving output of %s in %s.%s.txt\n",o->name, mode, o->name, mode);
1227	ret = system(str);
1228	if (ret == -1)
1229		log_err("fio: exec of cmd <%s> failed\n", str);
1230
1231	free(str);
1232	return ret;
1233}
1234
1235/*
1236 * Dry run to compute correct state of numberio for verification.
1237 */
1238static uint64_t do_dry_run(struct thread_data *td)
1239{
1240	uint64_t bytes_done[DDIR_RWDIR_CNT] = { 0, 0, 0 };
1241
1242	td_set_runstate(td, TD_RUNNING);
1243
1244	while ((td->o.read_iolog_file && !flist_empty(&td->io_log_list)) ||
1245		(!flist_empty(&td->trim_list)) || !io_bytes_exceeded(td)) {
1246		struct io_u *io_u;
1247		int ret;
1248
1249		if (td->terminate || td->done)
1250			break;
1251
1252		io_u = get_io_u(td);
1253		if (!io_u)
1254			break;
1255
1256		io_u->flags |= IO_U_F_FLIGHT;
1257		io_u->error = 0;
1258		io_u->resid = 0;
1259		if (ddir_rw(acct_ddir(io_u)))
1260			td->io_issues[acct_ddir(io_u)]++;
1261		if (ddir_rw(io_u->ddir)) {
1262			io_u_mark_depth(td, 1);
1263			td->ts.total_io_u[io_u->ddir]++;
1264		}
1265
1266		if (td_write(td) && io_u->ddir == DDIR_WRITE &&
1267		    td->o.do_verify &&
1268		    td->o.verify != VERIFY_NONE &&
1269		    !td->o.experimental_verify)
1270			log_io_piece(td, io_u);
1271
1272		ret = io_u_sync_complete(td, io_u, bytes_done);
1273		(void) ret;
1274	}
1275
1276	return bytes_done[DDIR_WRITE] + bytes_done[DDIR_TRIM];
1277}
1278
1279/*
1280 * Entry point for the thread based jobs. The process based jobs end up
1281 * here as well, after a little setup.
1282 */
1283static void *thread_main(void *data)
1284{
1285	unsigned long long elapsed;
1286	struct thread_data *td = data;
1287	struct thread_options *o = &td->o;
1288	pthread_condattr_t attr;
1289	int clear_state;
1290	int ret;
1291
1292	if (!o->use_thread) {
1293		setsid();
1294		td->pid = getpid();
1295	} else
1296		td->pid = gettid();
1297
1298	fio_local_clock_init(o->use_thread);
1299
1300	dprint(FD_PROCESS, "jobs pid=%d started\n", (int) td->pid);
1301
1302	if (is_backend)
1303		fio_server_send_start(td);
1304
1305	INIT_FLIST_HEAD(&td->io_log_list);
1306	INIT_FLIST_HEAD(&td->io_hist_list);
1307	INIT_FLIST_HEAD(&td->verify_list);
1308	INIT_FLIST_HEAD(&td->trim_list);
1309	INIT_FLIST_HEAD(&td->next_rand_list);
1310	pthread_mutex_init(&td->io_u_lock, NULL);
1311	td->io_hist_tree = RB_ROOT;
1312
1313	pthread_condattr_init(&attr);
1314	pthread_cond_init(&td->verify_cond, &attr);
1315	pthread_cond_init(&td->free_cond, &attr);
1316
1317	td_set_runstate(td, TD_INITIALIZED);
1318	dprint(FD_MUTEX, "up startup_mutex\n");
1319	fio_mutex_up(startup_mutex);
1320	dprint(FD_MUTEX, "wait on td->mutex\n");
1321	fio_mutex_down(td->mutex);
1322	dprint(FD_MUTEX, "done waiting on td->mutex\n");
1323
1324	/*
1325	 * A new gid requires privilege, so we need to do this before setting
1326	 * the uid.
1327	 */
1328	if (o->gid != -1U && setgid(o->gid)) {
1329		td_verror(td, errno, "setgid");
1330		goto err;
1331	}
1332	if (o->uid != -1U && setuid(o->uid)) {
1333		td_verror(td, errno, "setuid");
1334		goto err;
1335	}
1336
1337	/*
1338	 * If we have a gettimeofday() thread, make sure we exclude that
1339	 * thread from this job
1340	 */
1341	if (o->gtod_cpu)
1342		fio_cpu_clear(&o->cpumask, o->gtod_cpu);
1343
1344	/*
1345	 * Set affinity first, in case it has an impact on the memory
1346	 * allocations.
1347	 */
1348	if (o->cpumask_set) {
1349		if (o->cpus_allowed_policy == FIO_CPUS_SPLIT) {
1350			ret = fio_cpus_split(&o->cpumask, td->thread_number - 1);
1351			if (!ret) {
1352				log_err("fio: no CPUs set\n");
1353				log_err("fio: Try increasing number of available CPUs\n");
1354				td_verror(td, EINVAL, "cpus_split");
1355				goto err;
1356			}
1357		}
1358		ret = fio_setaffinity(td->pid, o->cpumask);
1359		if (ret == -1) {
1360			td_verror(td, errno, "cpu_set_affinity");
1361			goto err;
1362		}
1363	}
1364
1365#ifdef CONFIG_LIBNUMA
1366	/* numa node setup */
1367	if (o->numa_cpumask_set || o->numa_memmask_set) {
1368		struct bitmask *mask;
1369
1370		if (numa_available() < 0) {
1371			td_verror(td, errno, "Does not support NUMA API\n");
1372			goto err;
1373		}
1374
1375		if (o->numa_cpumask_set) {
1376			mask = numa_parse_nodestring(o->numa_cpunodes);
1377			ret = numa_run_on_node_mask(mask);
1378			numa_free_nodemask(mask);
1379			if (ret == -1) {
1380				td_verror(td, errno,
1381					"numa_run_on_node_mask failed\n");
1382				goto err;
1383			}
1384		}
1385
1386		if (o->numa_memmask_set) {
1387
1388			mask = NULL;
1389			if (o->numa_memnodes)
1390				mask = numa_parse_nodestring(o->numa_memnodes);
1391
1392			switch (o->numa_mem_mode) {
1393			case MPOL_INTERLEAVE:
1394				numa_set_interleave_mask(mask);
1395				break;
1396			case MPOL_BIND:
1397				numa_set_membind(mask);
1398				break;
1399			case MPOL_LOCAL:
1400				numa_set_localalloc();
1401				break;
1402			case MPOL_PREFERRED:
1403				numa_set_preferred(o->numa_mem_prefer_node);
1404				break;
1405			case MPOL_DEFAULT:
1406			default:
1407				break;
1408			}
1409
1410			if (mask)
1411				numa_free_nodemask(mask);
1412
1413		}
1414	}
1415#endif
1416
1417	if (fio_pin_memory(td))
1418		goto err;
1419
1420	/*
1421	 * May alter parameters that init_io_u() will use, so we need to
1422	 * do this first.
1423	 */
1424	if (init_iolog(td))
1425		goto err;
1426
1427	if (init_io_u(td))
1428		goto err;
1429
1430	if (o->verify_async && verify_async_init(td))
1431		goto err;
1432
1433	if (o->ioprio) {
1434		ret = ioprio_set(IOPRIO_WHO_PROCESS, 0, o->ioprio_class, o->ioprio);
1435		if (ret == -1) {
1436			td_verror(td, errno, "ioprio_set");
1437			goto err;
1438		}
1439	}
1440
1441	if (o->cgroup && cgroup_setup(td, cgroup_list, &cgroup_mnt))
1442		goto err;
1443
1444	errno = 0;
1445	if (nice(o->nice) == -1 && errno != 0) {
1446		td_verror(td, errno, "nice");
1447		goto err;
1448	}
1449
1450	if (o->ioscheduler && switch_ioscheduler(td))
1451		goto err;
1452
1453	if (!o->create_serialize && setup_files(td))
1454		goto err;
1455
1456	if (td_io_init(td))
1457		goto err;
1458
1459	if (init_random_map(td))
1460		goto err;
1461
1462	if (o->exec_prerun && exec_string(o, o->exec_prerun, (const char *)"prerun"))
1463		goto err;
1464
1465	if (o->pre_read) {
1466		if (pre_read_files(td) < 0)
1467			goto err;
1468	}
1469
1470	if (td->flags & TD_F_COMPRESS_LOG)
1471		tp_init(&td->tp_data);
1472
1473	fio_verify_init(td);
1474
1475	fio_gettime(&td->epoch, NULL);
1476	fio_getrusage(&td->ru_start);
1477	clear_state = 0;
1478	while (keep_running(td)) {
1479		uint64_t verify_bytes;
1480
1481		fio_gettime(&td->start, NULL);
1482		memcpy(&td->bw_sample_time, &td->start, sizeof(td->start));
1483		memcpy(&td->iops_sample_time, &td->start, sizeof(td->start));
1484		memcpy(&td->tv_cache, &td->start, sizeof(td->start));
1485
1486		if (o->ratemin[DDIR_READ] || o->ratemin[DDIR_WRITE] ||
1487				o->ratemin[DDIR_TRIM]) {
1488			memcpy(&td->lastrate[DDIR_READ], &td->bw_sample_time,
1489						sizeof(td->bw_sample_time));
1490			memcpy(&td->lastrate[DDIR_WRITE], &td->bw_sample_time,
1491						sizeof(td->bw_sample_time));
1492			memcpy(&td->lastrate[DDIR_TRIM], &td->bw_sample_time,
1493						sizeof(td->bw_sample_time));
1494		}
1495
1496		if (clear_state)
1497			clear_io_state(td);
1498
1499		prune_io_piece_log(td);
1500
1501		if (td->o.verify_only && (td_write(td) || td_rw(td)))
1502			verify_bytes = do_dry_run(td);
1503		else
1504			verify_bytes = do_io(td);
1505
1506		clear_state = 1;
1507
1508		fio_mutex_down(stat_mutex);
1509		if (td_read(td) && td->io_bytes[DDIR_READ]) {
1510			elapsed = mtime_since_now(&td->start);
1511			td->ts.runtime[DDIR_READ] += elapsed;
1512		}
1513		if (td_write(td) && td->io_bytes[DDIR_WRITE]) {
1514			elapsed = mtime_since_now(&td->start);
1515			td->ts.runtime[DDIR_WRITE] += elapsed;
1516		}
1517		if (td_trim(td) && td->io_bytes[DDIR_TRIM]) {
1518			elapsed = mtime_since_now(&td->start);
1519			td->ts.runtime[DDIR_TRIM] += elapsed;
1520		}
1521		fio_gettime(&td->start, NULL);
1522		fio_mutex_up(stat_mutex);
1523
1524		if (td->error || td->terminate)
1525			break;
1526
1527		if (!o->do_verify ||
1528		    o->verify == VERIFY_NONE ||
1529		    (td->io_ops->flags & FIO_UNIDIR))
1530			continue;
1531
1532		clear_io_state(td);
1533
1534		fio_gettime(&td->start, NULL);
1535
1536		do_verify(td, verify_bytes);
1537
1538		fio_mutex_down(stat_mutex);
1539		td->ts.runtime[DDIR_READ] += mtime_since_now(&td->start);
1540		fio_gettime(&td->start, NULL);
1541		fio_mutex_up(stat_mutex);
1542
1543		if (td->error || td->terminate)
1544			break;
1545	}
1546
1547	update_rusage_stat(td);
1548	td->ts.total_run_time = mtime_since_now(&td->epoch);
1549	td->ts.io_bytes[DDIR_READ] = td->io_bytes[DDIR_READ];
1550	td->ts.io_bytes[DDIR_WRITE] = td->io_bytes[DDIR_WRITE];
1551	td->ts.io_bytes[DDIR_TRIM] = td->io_bytes[DDIR_TRIM];
1552
1553	if (td->o.verify_state_save && !(td->flags & TD_F_VSTATE_SAVED) &&
1554	    (td->o.verify != VERIFY_NONE && td_write(td))) {
1555		struct all_io_list *state;
1556		size_t sz;
1557
1558		state = get_all_io_list(td->thread_number, &sz);
1559		if (state) {
1560			__verify_save_state(state, "local");
1561			free(state);
1562		}
1563	}
1564
1565	fio_unpin_memory(td);
1566
1567	fio_writeout_logs(td);
1568
1569	if (td->flags & TD_F_COMPRESS_LOG)
1570		tp_exit(&td->tp_data);
1571
1572	if (o->exec_postrun)
1573		exec_string(o, o->exec_postrun, (const char *)"postrun");
1574
1575	if (exitall_on_terminate)
1576		fio_terminate_threads(td->groupid);
1577
1578err:
1579	if (td->error)
1580		log_info("fio: pid=%d, err=%d/%s\n", (int) td->pid, td->error,
1581							td->verror);
1582
1583	if (o->verify_async)
1584		verify_async_exit(td);
1585
1586	close_and_free_files(td);
1587	cleanup_io_u(td);
1588	close_ioengine(td);
1589	cgroup_shutdown(td, &cgroup_mnt);
1590	verify_free_state(td);
1591
1592	if (o->cpumask_set) {
1593		ret = fio_cpuset_exit(&o->cpumask);
1594		if (ret)
1595			td_verror(td, ret, "fio_cpuset_exit");
1596	}
1597
1598	/*
1599	 * do this very late, it will log file closing as well
1600	 */
1601	if (o->write_iolog_file)
1602		write_iolog_close(td);
1603
1604	fio_mutex_remove(td->mutex);
1605	td->mutex = NULL;
1606
1607	td_set_runstate(td, TD_EXITED);
1608
1609	/*
1610	 * Do this last after setting our runstate to exited, so we
1611	 * know that the stat thread is signaled.
1612	 */
1613	check_update_rusage(td);
1614
1615	return (void *) (uintptr_t) td->error;
1616}
1617
1618
1619/*
1620 * We cannot pass the td data into a forked process, so attach the td and
1621 * pass it to the thread worker.
1622 */
1623static int fork_main(int shmid, int offset)
1624{
1625	struct thread_data *td;
1626	void *data, *ret;
1627
1628#if !defined(__hpux) && !defined(CONFIG_NO_SHM)
1629	data = shmat(shmid, NULL, 0);
1630	if (data == (void *) -1) {
1631		int __err = errno;
1632
1633		perror("shmat");
1634		return __err;
1635	}
1636#else
1637	/*
1638	 * HP-UX inherits shm mappings?
1639	 */
1640	data = threads;
1641#endif
1642
1643	td = data + offset * sizeof(struct thread_data);
1644	ret = thread_main(td);
1645	shmdt(data);
1646	return (int) (uintptr_t) ret;
1647}
1648
1649static void dump_td_info(struct thread_data *td)
1650{
1651	log_err("fio: job '%s' hasn't exited in %lu seconds, it appears to "
1652		"be stuck. Doing forceful exit of this job.\n", td->o.name,
1653			(unsigned long) time_since_now(&td->terminate_time));
1654}
1655
1656/*
1657 * Run over the job map and reap the threads that have exited, if any.
1658 */
1659static void reap_threads(unsigned int *nr_running, unsigned int *t_rate,
1660			 unsigned int *m_rate)
1661{
1662	struct thread_data *td;
1663	unsigned int cputhreads, realthreads, pending;
1664	int i, status, ret;
1665
1666	/*
1667	 * reap exited threads (TD_EXITED -> TD_REAPED)
1668	 */
1669	realthreads = pending = cputhreads = 0;
1670	for_each_td(td, i) {
1671		int flags = 0;
1672
1673		/*
1674		 * ->io_ops is NULL for a thread that has closed its
1675		 * io engine
1676		 */
1677		if (td->io_ops && !strcmp(td->io_ops->name, "cpuio"))
1678			cputhreads++;
1679		else
1680			realthreads++;
1681
1682		if (!td->pid) {
1683			pending++;
1684			continue;
1685		}
1686		if (td->runstate == TD_REAPED)
1687			continue;
1688		if (td->o.use_thread) {
1689			if (td->runstate == TD_EXITED) {
1690				td_set_runstate(td, TD_REAPED);
1691				goto reaped;
1692			}
1693			continue;
1694		}
1695
1696		flags = WNOHANG;
1697		if (td->runstate == TD_EXITED)
1698			flags = 0;
1699
1700		/*
1701		 * check if someone quit or got killed in an unusual way
1702		 */
1703		ret = waitpid(td->pid, &status, flags);
1704		if (ret < 0) {
1705			if (errno == ECHILD) {
1706				log_err("fio: pid=%d disappeared %d\n",
1707						(int) td->pid, td->runstate);
1708				td->sig = ECHILD;
1709				td_set_runstate(td, TD_REAPED);
1710				goto reaped;
1711			}
1712			perror("waitpid");
1713		} else if (ret == td->pid) {
1714			if (WIFSIGNALED(status)) {
1715				int sig = WTERMSIG(status);
1716
1717				if (sig != SIGTERM && sig != SIGUSR2)
1718					log_err("fio: pid=%d, got signal=%d\n",
1719							(int) td->pid, sig);
1720				td->sig = sig;
1721				td_set_runstate(td, TD_REAPED);
1722				goto reaped;
1723			}
1724			if (WIFEXITED(status)) {
1725				if (WEXITSTATUS(status) && !td->error)
1726					td->error = WEXITSTATUS(status);
1727
1728				td_set_runstate(td, TD_REAPED);
1729				goto reaped;
1730			}
1731		}
1732
1733		/*
1734		 * If the job is stuck, do a forceful timeout of it and
1735		 * move on.
1736		 */
1737		if (td->terminate &&
1738		    time_since_now(&td->terminate_time) >= FIO_REAP_TIMEOUT) {
1739			dump_td_info(td);
1740			td_set_runstate(td, TD_REAPED);
1741			goto reaped;
1742		}
1743
1744		/*
1745		 * thread is not dead, continue
1746		 */
1747		pending++;
1748		continue;
1749reaped:
1750		(*nr_running)--;
1751		(*m_rate) -= ddir_rw_sum(td->o.ratemin);
1752		(*t_rate) -= ddir_rw_sum(td->o.rate);
1753		if (!td->pid)
1754			pending--;
1755
1756		if (td->error)
1757			exit_value++;
1758
1759		done_secs += mtime_since_now(&td->epoch) / 1000;
1760		profile_td_exit(td);
1761	}
1762
1763	if (*nr_running == cputhreads && !pending && realthreads)
1764		fio_terminate_threads(TERMINATE_ALL);
1765}
1766
1767static int __check_trigger_file(void)
1768{
1769	struct stat sb;
1770
1771	if (!trigger_file)
1772		return 0;
1773
1774	if (stat(trigger_file, &sb))
1775		return 0;
1776
1777	if (unlink(trigger_file) < 0)
1778		log_err("fio: failed to unlink %s: %s\n", trigger_file,
1779							strerror(errno));
1780
1781	return 1;
1782}
1783
1784static int trigger_timedout(void)
1785{
1786	if (trigger_timeout)
1787		return time_since_genesis() >= trigger_timeout;
1788
1789	return 0;
1790}
1791
1792void exec_trigger(const char *cmd)
1793{
1794	int ret;
1795
1796	if (!cmd)
1797		return;
1798
1799	ret = system(cmd);
1800	if (ret == -1)
1801		log_err("fio: failed executing %s trigger\n", cmd);
1802}
1803
1804void check_trigger_file(void)
1805{
1806	if (__check_trigger_file() || trigger_timedout()) {
1807		if (nr_clients)
1808			fio_clients_send_trigger(trigger_cmd);
1809		else {
1810			verify_save_state();
1811			fio_terminate_threads(TERMINATE_ALL);
1812			exec_trigger(trigger_cmd);
1813		}
1814	}
1815}
1816
1817static int fio_verify_load_state(struct thread_data *td)
1818{
1819	int ret;
1820
1821	if (!td->o.verify_state)
1822		return 0;
1823
1824	if (is_backend) {
1825		void *data;
1826
1827		ret = fio_server_get_verify_state(td->o.name,
1828					td->thread_number - 1, &data);
1829		if (!ret)
1830			verify_convert_assign_state(td, data);
1831	} else
1832		ret = verify_load_state(td, "local");
1833
1834	return ret;
1835}
1836
1837static void do_usleep(unsigned int usecs)
1838{
1839	check_for_running_stats();
1840	check_trigger_file();
1841	usleep(usecs);
1842}
1843
1844/*
1845 * Main function for kicking off and reaping jobs, as needed.
1846 */
1847static void run_threads(void)
1848{
1849	struct thread_data *td;
1850	unsigned int i, todo, nr_running, m_rate, t_rate, nr_started;
1851	uint64_t spent;
1852
1853	if (fio_gtod_offload && fio_start_gtod_thread())
1854		return;
1855
1856	fio_idle_prof_init();
1857
1858	set_sig_handlers();
1859
1860	nr_thread = nr_process = 0;
1861	for_each_td(td, i) {
1862		if (td->o.use_thread)
1863			nr_thread++;
1864		else
1865			nr_process++;
1866	}
1867
1868	if (output_format == FIO_OUTPUT_NORMAL) {
1869		log_info("Starting ");
1870		if (nr_thread)
1871			log_info("%d thread%s", nr_thread,
1872						nr_thread > 1 ? "s" : "");
1873		if (nr_process) {
1874			if (nr_thread)
1875				log_info(" and ");
1876			log_info("%d process%s", nr_process,
1877						nr_process > 1 ? "es" : "");
1878		}
1879		log_info("\n");
1880		log_info_flush();
1881	}
1882
1883	todo = thread_number;
1884	nr_running = 0;
1885	nr_started = 0;
1886	m_rate = t_rate = 0;
1887
1888	for_each_td(td, i) {
1889		print_status_init(td->thread_number - 1);
1890
1891		if (!td->o.create_serialize)
1892			continue;
1893
1894		if (fio_verify_load_state(td))
1895			goto reap;
1896
1897		/*
1898		 * do file setup here so it happens sequentially,
1899		 * we don't want X number of threads getting their
1900		 * client data interspersed on disk
1901		 */
1902		if (setup_files(td)) {
1903reap:
1904			exit_value++;
1905			if (td->error)
1906				log_err("fio: pid=%d, err=%d/%s\n",
1907					(int) td->pid, td->error, td->verror);
1908			td_set_runstate(td, TD_REAPED);
1909			todo--;
1910		} else {
1911			struct fio_file *f;
1912			unsigned int j;
1913
1914			/*
1915			 * for sharing to work, each job must always open
1916			 * its own files. so close them, if we opened them
1917			 * for creation
1918			 */
1919			for_each_file(td, f, j) {
1920				if (fio_file_open(f))
1921					td_io_close_file(td, f);
1922			}
1923		}
1924	}
1925
1926	/* start idle threads before io threads start to run */
1927	fio_idle_prof_start();
1928
1929	set_genesis_time();
1930
1931	while (todo) {
1932		struct thread_data *map[REAL_MAX_JOBS];
1933		struct timeval this_start;
1934		int this_jobs = 0, left;
1935
1936		/*
1937		 * create threads (TD_NOT_CREATED -> TD_CREATED)
1938		 */
1939		for_each_td(td, i) {
1940			if (td->runstate != TD_NOT_CREATED)
1941				continue;
1942
1943			/*
1944			 * never got a chance to start, killed by other
1945			 * thread for some reason
1946			 */
1947			if (td->terminate) {
1948				todo--;
1949				continue;
1950			}
1951
1952			if (td->o.start_delay) {
1953				spent = utime_since_genesis();
1954
1955				if (td->o.start_delay > spent)
1956					continue;
1957			}
1958
1959			if (td->o.stonewall && (nr_started || nr_running)) {
1960				dprint(FD_PROCESS, "%s: stonewall wait\n",
1961							td->o.name);
1962				break;
1963			}
1964
1965			init_disk_util(td);
1966
1967			td->rusage_sem = fio_mutex_init(FIO_MUTEX_LOCKED);
1968			td->update_rusage = 0;
1969
1970			/*
1971			 * Set state to created. Thread will transition
1972			 * to TD_INITIALIZED when it's done setting up.
1973			 */
1974			td_set_runstate(td, TD_CREATED);
1975			map[this_jobs++] = td;
1976			nr_started++;
1977
1978			if (td->o.use_thread) {
1979				int ret;
1980
1981				dprint(FD_PROCESS, "will pthread_create\n");
1982				ret = pthread_create(&td->thread, NULL,
1983							thread_main, td);
1984				if (ret) {
1985					log_err("pthread_create: %s\n",
1986							strerror(ret));
1987					nr_started--;
1988					break;
1989				}
1990				ret = pthread_detach(td->thread);
1991				if (ret)
1992					log_err("pthread_detach: %s",
1993							strerror(ret));
1994			} else {
1995				pid_t pid;
1996				dprint(FD_PROCESS, "will fork\n");
1997				pid = fork();
1998				if (!pid) {
1999					int ret = fork_main(shm_id, i);
2000
2001					_exit(ret);
2002				} else if (i == fio_debug_jobno)
2003					*fio_debug_jobp = pid;
2004			}
2005			dprint(FD_MUTEX, "wait on startup_mutex\n");
2006			if (fio_mutex_down_timeout(startup_mutex, 10)) {
2007				log_err("fio: job startup hung? exiting.\n");
2008				fio_terminate_threads(TERMINATE_ALL);
2009				fio_abort = 1;
2010				nr_started--;
2011				break;
2012			}
2013			dprint(FD_MUTEX, "done waiting on startup_mutex\n");
2014		}
2015
2016		/*
2017		 * Wait for the started threads to transition to
2018		 * TD_INITIALIZED.
2019		 */
2020		fio_gettime(&this_start, NULL);
2021		left = this_jobs;
2022		while (left && !fio_abort) {
2023			if (mtime_since_now(&this_start) > JOB_START_TIMEOUT)
2024				break;
2025
2026			do_usleep(100000);
2027
2028			for (i = 0; i < this_jobs; i++) {
2029				td = map[i];
2030				if (!td)
2031					continue;
2032				if (td->runstate == TD_INITIALIZED) {
2033					map[i] = NULL;
2034					left--;
2035				} else if (td->runstate >= TD_EXITED) {
2036					map[i] = NULL;
2037					left--;
2038					todo--;
2039					nr_running++; /* work-around... */
2040				}
2041			}
2042		}
2043
2044		if (left) {
2045			log_err("fio: %d job%s failed to start\n", left,
2046					left > 1 ? "s" : "");
2047			for (i = 0; i < this_jobs; i++) {
2048				td = map[i];
2049				if (!td)
2050					continue;
2051				kill(td->pid, SIGTERM);
2052			}
2053			break;
2054		}
2055
2056		/*
2057		 * start created threads (TD_INITIALIZED -> TD_RUNNING).
2058		 */
2059		for_each_td(td, i) {
2060			if (td->runstate != TD_INITIALIZED)
2061				continue;
2062
2063			if (in_ramp_time(td))
2064				td_set_runstate(td, TD_RAMP);
2065			else
2066				td_set_runstate(td, TD_RUNNING);
2067			nr_running++;
2068			nr_started--;
2069			m_rate += ddir_rw_sum(td->o.ratemin);
2070			t_rate += ddir_rw_sum(td->o.rate);
2071			todo--;
2072			fio_mutex_up(td->mutex);
2073		}
2074
2075		reap_threads(&nr_running, &t_rate, &m_rate);
2076
2077		if (todo)
2078			do_usleep(100000);
2079	}
2080
2081	while (nr_running) {
2082		reap_threads(&nr_running, &t_rate, &m_rate);
2083		do_usleep(10000);
2084	}
2085
2086	fio_idle_prof_stop();
2087
2088	update_io_ticks();
2089}
2090
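/*
 * Tell the helper thread to exit and wait for it to finish.
 */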
2091static void wait_for_helper_thread_exit(void)
2092{
2093	void *ret;
2094
2095	helper_exit = 1;
2096	pthread_cond_signal(&helper_cond);
2097	pthread_join(helper_thread, &ret);
2098}
2099
2100static void free_disk_util(void)
2101{
2102	disk_util_prune_entries();
2103
2104	pthread_cond_destroy(&helper_cond);
2105}
2106
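/*
 * The helper thread wakes up at least every DISK_UTIL_MSEC to update disk
 * utilization, run any pending on-demand stats request, and print the
 * status line when not running as a backend, until update_io_ticks()
 * signals it to stop.
 */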
2107static void *helper_thread_main(void *data)
2108{
2109	int ret = 0;
2110
2111	fio_mutex_up(startup_mutex);
2112
2113	while (!ret) {
2114		uint64_t sec = DISK_UTIL_MSEC / 1000;
2115		uint64_t nsec = (DISK_UTIL_MSEC % 1000) * 1000000;
2116		struct timespec ts;
2117		struct timeval tv;
2118
2119		gettimeofday(&tv, NULL);
2120		ts.tv_sec = tv.tv_sec + sec;
2121		ts.tv_nsec = (tv.tv_usec * 1000) + nsec;
2122		if (ts.tv_nsec >= 1000000000ULL) {
2123			ts.tv_nsec -= 1000000000ULL;
2124			ts.tv_sec++;
2125		}
2126
		/* the mutex must be held across the conditional wait */
		pthread_mutex_lock(&helper_lock);
2127		pthread_cond_timedwait(&helper_cond, &helper_lock, &ts);
		pthread_mutex_unlock(&helper_lock);
2128
2129		ret = update_io_ticks();
2130
2131		if (helper_do_stat) {
2132			helper_do_stat = 0;
2133			__show_running_run_stats();
2134		}
2135
2136		if (!is_backend)
2137			print_thread_status();
2138	}
2139
2140	return NULL;
2141}
2142
2143static int create_helper_thread(void)
2144{
2145	int ret;
2146
2147	setup_disk_util();
2148
2149	pthread_cond_init(&helper_cond, NULL);
2150	pthread_mutex_init(&helper_lock, NULL);
2151
2152	ret = pthread_create(&helper_thread, NULL, helper_thread_main, NULL);
2153	if (ret) {
2154		log_err("Can't create helper thread: %s\n", strerror(ret));
2155		return 1;
2156	}
2157
2158	dprint(FD_MUTEX, "wait on startup_mutex\n");
2159	fio_mutex_down(startup_mutex);
2160	dprint(FD_MUTEX, "done waiting on startup_mutex\n");
2161	return 0;
2162}
2163
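/*
 * Entry point for running jobs locally: set up the aggregate bandwidth
 * logs, the startup mutex and the helper thread, run all jobs, then report
 * the final stats and clean up.
 */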
2164int fio_backend(void)
2165{
2166	struct thread_data *td;
2167	int i;
2168
2169	if (exec_profile) {
2170		if (load_profile(exec_profile))
2171			return 1;
2172		free(exec_profile);
2173		exec_profile = NULL;
2174	}
2175	if (!thread_number)
2176		return 0;
2177
2178	if (write_bw_log) {
2179		struct log_params p = {
2180			.log_type = IO_LOG_TYPE_BW,
2181		};
2182
2183		setup_log(&agg_io_log[DDIR_READ], &p, "agg-read_bw.log");
2184		setup_log(&agg_io_log[DDIR_WRITE], &p, "agg-write_bw.log");
2185		setup_log(&agg_io_log[DDIR_TRIM], &p, "agg-trim_bw.log");
2186	}
2187
2188	startup_mutex = fio_mutex_init(FIO_MUTEX_LOCKED);
2189	if (startup_mutex == NULL)
2190		return 1;
2191
2192	set_genesis_time();
2193	stat_init();
2194	create_helper_thread();
2195
2196	cgroup_list = smalloc(sizeof(*cgroup_list));
2197	INIT_FLIST_HEAD(cgroup_list);
2198
2199	run_threads();
2200
2201	wait_for_helper_thread_exit();
2202
2203	if (!fio_abort) {
2204		__show_run_stats();
2205		if (write_bw_log) {
2206			for (i = 0; i < DDIR_RWDIR_CNT; i++) {
2207				struct io_log *log = agg_io_log[i];
2208
2209				flush_log(log);
2210				free_log(log);
2211			}
2212		}
2213	}
2214
2215	for_each_td(td, i) {
2216		fio_options_free(td);
2217		if (td->rusage_sem) {
2218			fio_mutex_remove(td->rusage_sem);
2219			td->rusage_sem = NULL;
2220		}
2221	}
2222
2223	free_disk_util();
2224	cgroup_kill(cgroup_list);
2225	sfree(cgroup_list);
2226	sfree(cgroup_mnt);
2227
2228	fio_mutex_remove(startup_mutex);
2229	stat_exit();
2230	return exit_value;
2231}
2232