1#include <stdlib.h>
2
3#include "fio.h"
4#include "steadystate.h"
5#include "helper_thread.h"
6
/*
 * Set once any job configures a steady state duration (see
 * td_steadystate_init()); steadystate_setup() is a no-op while it is
 * clear. Static storage is zero-initialized, so it starts out false.
 */
bool steadystate_enabled;
8
9static void steadystate_alloc(struct thread_data *td)
10{
11	td->ss.bw_data = calloc(td->ss.dur, sizeof(uint64_t));
12	td->ss.iops_data = calloc(td->ss.dur, sizeof(uint64_t));
13
14	td->ss.state |= __FIO_SS_DATA;
15}
16
17void steadystate_setup(void)
18{
19	int i, prev_groupid;
20	struct thread_data *td, *prev_td;
21
22	if (!steadystate_enabled)
23		return;
24
25	/*
26	 * if group reporting is enabled, identify the last td
27	 * for each group and use it for storing steady state
28	 * data
29	 */
30	prev_groupid = -1;
31	prev_td = NULL;
32	for_each_td(td, i) {
33		if (!td->ss.dur)
34			continue;
35
36		if (!td->o.group_reporting) {
37			steadystate_alloc(td);
38			continue;
39		}
40
41		if (prev_groupid != td->groupid) {
42			if (prev_td != NULL) {
43				steadystate_alloc(prev_td);
44			}
45			prev_groupid = td->groupid;
46		}
47		prev_td = td;
48	}
49
50	if (prev_td != NULL && prev_td->o.group_reporting) {
51		steadystate_alloc(prev_td);
52	}
53}
54
55static bool steadystate_slope(uint64_t iops, uint64_t bw,
56			      struct thread_data *td)
57{
58	int i, j;
59	double result;
60	struct steadystate_data *ss = &td->ss;
61	uint64_t new_val;
62
63	ss->bw_data[ss->tail] = bw;
64	ss->iops_data[ss->tail] = iops;
65
66	if (ss->state & __FIO_SS_IOPS)
67		new_val = iops;
68	else
69		new_val = bw;
70
71	if (ss->state & __FIO_SS_BUFFER_FULL || ss->tail - ss->head == ss->dur - 1) {
72		if (!(ss->state & __FIO_SS_BUFFER_FULL)) {
73			/* first time through */
74			for(i = 0, ss->sum_y = 0; i < ss->dur; i++) {
75				if (ss->state & __FIO_SS_IOPS)
76					ss->sum_y += ss->iops_data[i];
77				else
78					ss->sum_y += ss->bw_data[i];
79				j = (ss->head + i) % ss->dur;
80				if (ss->state & __FIO_SS_IOPS)
81					ss->sum_xy += i * ss->iops_data[j];
82				else
83					ss->sum_xy += i * ss->bw_data[j];
84			}
85			ss->state |= __FIO_SS_BUFFER_FULL;
86		} else {		/* easy to update the sums */
87			ss->sum_y -= ss->oldest_y;
88			ss->sum_y += new_val;
89			ss->sum_xy = ss->sum_xy - ss->sum_y + ss->dur * new_val;
90		}
91
92		if (ss->state & __FIO_SS_IOPS)
93			ss->oldest_y = ss->iops_data[ss->head];
94		else
95			ss->oldest_y = ss->bw_data[ss->head];
96
97		/*
98		 * calculate slope as (sum_xy - sum_x * sum_y / n) / (sum_(x^2)
99		 * - (sum_x)^2 / n) This code assumes that all x values are
100		 * equally spaced when they are often off by a few milliseconds.
101		 * This assumption greatly simplifies the calculations.
102		 */
103		ss->slope = (ss->sum_xy - (double) ss->sum_x * ss->sum_y / ss->dur) /
104				(ss->sum_x_sq - (double) ss->sum_x * ss->sum_x / ss->dur);
105		if (ss->state & __FIO_SS_PCT)
106			ss->criterion = 100.0 * ss->slope / (ss->sum_y / ss->dur);
107		else
108			ss->criterion = ss->slope;
109
110		dprint(FD_STEADYSTATE, "sum_y: %llu, sum_xy: %llu, slope: %f, "
111					"criterion: %f, limit: %f\n",
112					(unsigned long long) ss->sum_y,
113					(unsigned long long) ss->sum_xy,
114					ss->slope, ss->criterion, ss->limit);
115
116		result = ss->criterion * (ss->criterion < 0.0 ? -1.0 : 1.0);
117		if (result < ss->limit)
118			return true;
119	}
120
121	ss->tail = (ss->tail + 1) % ss->dur;
122	if (ss->tail <= ss->head)
123		ss->head = (ss->head + 1) % ss->dur;
124
125	return false;
126}
127
128static bool steadystate_deviation(uint64_t iops, uint64_t bw,
129				  struct thread_data *td)
130{
131	int i;
132	double diff;
133	double mean;
134
135	struct steadystate_data *ss = &td->ss;
136
137	ss->bw_data[ss->tail] = bw;
138	ss->iops_data[ss->tail] = iops;
139
140	if (ss->state & __FIO_SS_BUFFER_FULL || ss->tail - ss->head == ss->dur - 1) {
141		if (!(ss->state & __FIO_SS_BUFFER_FULL)) {
142			/* first time through */
143			for(i = 0, ss->sum_y = 0; i < ss->dur; i++)
144				if (ss->state & __FIO_SS_IOPS)
145					ss->sum_y += ss->iops_data[i];
146				else
147					ss->sum_y += ss->bw_data[i];
148			ss->state |= __FIO_SS_BUFFER_FULL;
149		} else {		/* easy to update the sum */
150			ss->sum_y -= ss->oldest_y;
151			if (ss->state & __FIO_SS_IOPS)
152				ss->sum_y += ss->iops_data[ss->tail];
153			else
154				ss->sum_y += ss->bw_data[ss->tail];
155		}
156
157		if (ss->state & __FIO_SS_IOPS)
158			ss->oldest_y = ss->iops_data[ss->head];
159		else
160			ss->oldest_y = ss->bw_data[ss->head];
161
162		mean = (double) ss->sum_y / ss->dur;
163		ss->deviation = 0.0;
164
165		for (i = 0; i < ss->dur; i++) {
166			if (ss->state & __FIO_SS_IOPS)
167				diff = ss->iops_data[i] - mean;
168			else
169				diff = ss->bw_data[i] - mean;
170			ss->deviation = max(ss->deviation, diff * (diff < 0.0 ? -1.0 : 1.0));
171		}
172
173		if (ss->state & __FIO_SS_PCT)
174			ss->criterion = 100.0 * ss->deviation / mean;
175		else
176			ss->criterion = ss->deviation;
177
178		dprint(FD_STEADYSTATE, "sum_y: %llu, mean: %f, max diff: %f, "
179					"objective: %f, limit: %f\n",
180					(unsigned long long) ss->sum_y, mean,
181					ss->deviation, ss->criterion, ss->limit);
182
183		if (ss->criterion < ss->limit)
184			return true;
185	}
186
187	ss->tail = (ss->tail + 1) % ss->dur;
188	if (ss->tail <= ss->head)
189		ss->head = (ss->head + 1) % ss->dur;
190
191	return false;
192}
193
194void steadystate_check(void)
195{
196	int i, j, ddir, prev_groupid, group_ramp_time_over = 0;
197	unsigned long rate_time;
198	struct thread_data *td, *td2;
199	struct timeval now;
200	uint64_t group_bw = 0, group_iops = 0;
201	uint64_t td_iops, td_bytes;
202	bool ret;
203
204	prev_groupid = -1;
205	for_each_td(td, i) {
206		struct steadystate_data *ss = &td->ss;
207
208		if (!ss->dur || td->runstate <= TD_SETTING_UP ||
209		    td->runstate >= TD_EXITED || !ss->state ||
210		    ss->state & __FIO_SS_ATTAINED)
211			continue;
212
213		td_iops = 0;
214		td_bytes = 0;
215		if (!td->o.group_reporting ||
216		    (td->o.group_reporting && td->groupid != prev_groupid)) {
217			group_bw = 0;
218			group_iops = 0;
219			group_ramp_time_over = 0;
220		}
221		prev_groupid = td->groupid;
222
223		fio_gettime(&now, NULL);
224		if (ss->ramp_time && !(ss->state & __FIO_SS_RAMP_OVER)) {
225			/*
226			 * Begin recording data one second after ss->ramp_time
227			 * has elapsed
228			 */
229			if (utime_since(&td->epoch, &now) >= (ss->ramp_time + 1000000L))
230				ss->state |= __FIO_SS_RAMP_OVER;
231		}
232
233		td_io_u_lock(td);
234		for (ddir = 0; ddir < DDIR_RWDIR_CNT; ddir++) {
235			td_iops += td->io_blocks[ddir];
236			td_bytes += td->io_bytes[ddir];
237		}
238		td_io_u_unlock(td);
239
240		rate_time = mtime_since(&ss->prev_time, &now);
241		memcpy(&ss->prev_time, &now, sizeof(now));
242
243		/*
244		 * Begin monitoring when job starts but don't actually use
245		 * data in checking stopping criterion until ss->ramp_time is
246		 * over. This ensures that we will have a sane value in
247		 * prev_iops/bw the first time through after ss->ramp_time
248		 * is done.
249		 */
250		if (ss->state & __FIO_SS_RAMP_OVER) {
251			group_bw += 1000 * (td_bytes - ss->prev_bytes) / rate_time;
252			group_iops += 1000 * (td_iops - ss->prev_iops) / rate_time;
253			++group_ramp_time_over;
254		}
255		ss->prev_iops = td_iops;
256		ss->prev_bytes = td_bytes;
257
258		if (td->o.group_reporting && !(ss->state & __FIO_SS_DATA))
259			continue;
260
261		/*
262		 * Don't begin checking criterion until ss->ramp_time is over
263		 * for at least one thread in group
264		 */
265		if (!group_ramp_time_over)
266			continue;
267
268		dprint(FD_STEADYSTATE, "steadystate_check() thread: %d, "
269					"groupid: %u, rate_msec: %ld, "
270					"iops: %llu, bw: %llu, head: %d, tail: %d\n",
271					i, td->groupid, rate_time,
272					(unsigned long long) group_iops,
273					(unsigned long long) group_bw,
274					ss->head, ss->tail);
275
276		if (ss->state & __FIO_SS_SLOPE)
277			ret = steadystate_slope(group_iops, group_bw, td);
278		else
279			ret = steadystate_deviation(group_iops, group_bw, td);
280
281		if (ret) {
282			if (td->o.group_reporting) {
283				for_each_td(td2, j) {
284					if (td2->groupid == td->groupid) {
285						td2->ss.state |= __FIO_SS_ATTAINED;
286						fio_mark_td_terminate(td2);
287					}
288				}
289			} else {
290				ss->state |= __FIO_SS_ATTAINED;
291				fio_mark_td_terminate(td);
292			}
293		}
294	}
295}
296
297int td_steadystate_init(struct thread_data *td)
298{
299	struct steadystate_data *ss = &td->ss;
300	struct thread_options *o = &td->o;
301	struct thread_data *td2;
302	int j;
303
304	memset(ss, 0, sizeof(*ss));
305
306	if (o->ss_dur) {
307		steadystate_enabled = true;
308		o->ss_dur /= 1000000L;
309
310		/* put all steady state info in one place */
311		ss->dur = o->ss_dur;
312		ss->limit = o->ss_limit.u.f;
313		ss->ramp_time = o->ss_ramp_time;
314
315		ss->state = o->ss_state;
316		if (!td->ss.ramp_time)
317			ss->state |= __FIO_SS_RAMP_OVER;
318
319		ss->sum_x = o->ss_dur * (o->ss_dur - 1) / 2;
320		ss->sum_x_sq = (o->ss_dur - 1) * (o->ss_dur) * (2*o->ss_dur - 1) / 6;
321	}
322
323	/* make sure that ss options are consistent within reporting group */
324	for_each_td(td2, j) {
325		if (td2->groupid == td->groupid) {
326			struct steadystate_data *ss2 = &td2->ss;
327
328			if (ss2->dur != ss->dur ||
329			    ss2->limit != ss->limit ||
330			    ss2->ramp_time != ss->ramp_time ||
331			    ss2->state != ss->state ||
332			    ss2->sum_x != ss->sum_x ||
333			    ss2->sum_x_sq != ss->sum_x_sq) {
334				td_verror(td, EINVAL, "job rejected: steadystate options must be consistent within reporting groups");
335				return 1;
336			}
337		}
338	}
339
340	return 0;
341}
342
343uint64_t steadystate_bw_mean(struct thread_stat *ts)
344{
345	int i;
346	uint64_t sum;
347
348	for (i = 0, sum = 0; i < ts->ss_dur; i++)
349		sum += ts->ss_bw_data[i];
350
351	return sum / ts->ss_dur;
352}
353
354uint64_t steadystate_iops_mean(struct thread_stat *ts)
355{
356	int i;
357	uint64_t sum;
358
359	for (i = 0, sum = 0; i < ts->ss_dur; i++)
360		sum += ts->ss_iops_data[i];
361
362	return sum / ts->ss_dur;
363}
364