#!/usr/bin/env python3
"""
    Utility for converting *_clat_hist* files generated by fio into latency statistics.

    Example usage:

            $ fiologparser_hist.py *_clat_hist*
            end-time, samples, min, avg, median, 90%, 95%, 99%, max
            1000, 15, 192, 1678.107, 1788.859, 1856.076, 1880.040, 1899.208, 1888.000
            2000, 43, 152, 1642.368, 1714.099, 1816.659, 1845.552, 1888.131, 1888.000
            4000, 39, 1152, 1546.962, 1545.785, 1627.192, 1640.019, 1691.204, 1744
            ...

    @author Karl Cronburg <karl.cronburg@gmail.com>
"""
import sys
import pandas
import numpy as np

err = sys.stderr.write

def weighted_percentile(percs, vs, ws):
    """ Use linear interpolation to calculate the weighted percentile.

        Value and weight arrays are first sorted by value. The cumulative
        distribution function (cdf) is then computed, after which np.interp
        finds the two values closest to our desired weighted percentile(s)
        and linearly interpolates them.

        percs  :: List of percentiles we want to calculate
        vs     :: Array of values we are computing the percentile of
        ws     :: Array of weights for our corresponding values
        return :: Array of percentiles
    """
    idx = np.argsort(vs)
    vs, ws = vs[idx], ws[idx] # weights and values sorted by value
    cdf = 100 * (ws.cumsum() - ws / 2.0) / ws.sum()
    return np.interp(percs, cdf, vs) # linear interpolation
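
# A quick worked example of the percentile math above (illustrative only):
# for vs = [1, 2, 4] with ws = [1, 1, 2] the cdf evaluates to
# [12.5, 37.5, 75.0], so the weighted 50th percentile interpolates a third
# of the way from 2 to 4:
#
#   weighted_percentile([50], np.array([1., 2., 4.]), np.array([1., 1., 2.]))
#   # -> approximately [2.667]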

def weights(start_ts, end_ts, start, end):
    """ Calculate weights based on the fraction of each sample falling in the
        given interval [start,end]. Weights are computed using vector / array
        operations instead of for-loops.

        Note that samples with zero time length are effectively ignored
        (we set their weight to zero).

        start_ts :: Array of start times for a set of samples
        end_ts   :: Array of end times for a set of samples
        start    :: int
        end      :: int
        return   :: Array of weights
    """
    sbounds = np.maximum(start_ts, start).astype(float)
    ebounds = np.minimum(end_ts,   end).astype(float)
    ws = (ebounds - sbounds) / (end_ts - start_ts)
    if np.any(np.isnan(ws)):
        err("WARNING: zero-length sample(s) detected. Log file corrupt"
            " / bad time values? Ignoring these samples.\n")
    ws[np.isnan(ws)] = 0.0
    return ws
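
# For instance (an illustrative sketch): a sample spanning [500, 1500] ms has
# half of its duration inside the interval [1000, 2000] ms, so it receives
# weight (1500 - 1000) / (1500 - 500) = 0.5:
#
#   weights(np.array([500]), np.array([1500]), 1000, 2000)   # -> [0.5]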

def weighted_average(vs, ws):
    return np.sum(vs * ws) / np.sum(ws)

columns = ["end-time", "samples", "min", "avg", "median", "90%", "95%", "99%", "max"]
percs   = [50, 90, 95, 99]

def fmt_float_list(ctx, num=1):
    """ Return a comma separated list of float formatters to the required number
        of decimal places. For instance, with ctx.decimals == 4:

            fmt_float_list(ctx, num=3) == "%.4f, %.4f, %.4f"
    """
    return ', '.join(["%%.%df" % ctx.decimals] * num)

# Default values - see beginning of main() for how we detect the number of
# columns in the input files:
__HIST_COLUMNS = 1216
__NON_HIST_COLUMNS = 3
__TOTAL_COLUMNS = __HIST_COLUMNS + __NON_HIST_COLUMNS

def read_chunk(rdr, sz):
    """ Read the next chunk of size sz from the given reader. """
    try:
        # StopIteration occurs when the pandas reader is empty, and
        # AttributeError occurs if rdr is None because the file was empty.
        new_arr = rdr.read().values
    except (StopIteration, AttributeError):
        return None

    # Extract an array of just the times, and the histogram matrix without the
    # non-histogram columns (the direction and block-size columns are unused):
    times = new_arr[:,0]
    hists = new_arr[:,__NON_HIST_COLUMNS:]
    times = times.reshape((len(times),1))
    arr = np.append(times, hists, axis=1)

    return arr

def get_min(fps, arrs):
    """ Find the file whose current first row has the smallest start time. """
    return min([fp for fp in fps if arrs[fp] is not None], key=lambda fp: arrs[fp][0][0])

def histogram_generator(ctx, fps, sz):

    # Create a chunked pandas reader for each of the files:
    rdrs = {}
    for fp in fps:
        try:
            rdrs[fp] = pandas.read_csv(fp, dtype=int, header=None, chunksize=sz)
        except ValueError as e:
            if 'No columns to parse from file' in str(e):
                if ctx.warn: sys.stderr.write("WARNING: Empty input file encountered.\n")
                rdrs[fp] = None
            else:
                raise

    # Initial histograms from disk:
    arrs = {fp: read_chunk(rdr, sz) for fp,rdr in rdrs.items()}
    while True:

        try:
            # get_min() raises ValueError when there is nothing more to read:
            fp = get_min(fps, arrs)
        except ValueError:
            return
        arr = arrs[fp]
        yield np.insert(arr[0], 1, fps.index(fp))
        arrs[fp] = arr[1:]

        if arrs[fp].shape[0] == 0:
            arrs[fp] = read_chunk(rdrs[fp], sz)

def _plat_idx_to_val(idx, edge=0.5, FIO_IO_U_PLAT_BITS=6, FIO_IO_U_PLAT_VAL=64):
    """ Taken from fio's stat.c for calculating the latency value of a bin
        from that bin's index.

            idx  : the value of the index into the histogram bins
            edge : fractional value in the range [0,1]** indicating how far into
            the bin we wish to compute the latency value of.

        ** edge = 0.0 and 1.0 computes the lower and upper latency bounds
           respectively of the given bin index. """

    # MSB <= (FIO_IO_U_PLAT_BITS-1), cannot be rounded off. Use
    # all bits of the sample as index
    if (idx < (FIO_IO_U_PLAT_VAL << 1)):
        return idx

    # Find the group and compute the minimum value of that group
    error_bits = (idx >> FIO_IO_U_PLAT_BITS) - 1
    base = 1 << (error_bits + FIO_IO_U_PLAT_BITS)

    # Find the bucket number within the group
    k = idx % FIO_IO_U_PLAT_VAL

    # Return the mean (if edge=0.5) of the range of the bucket
    return base + ((k + edge) * (1 << error_bits))
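
# A small worked example of the bin math above (illustrative only): the first
# index past the linear region is idx = 128, giving error_bits = 1, base = 128
# and k = 0, so with the default edge = 0.5 the bin midpoint is
# 128 + 0.5 * 2 = 129.0:
#
#   _plat_idx_to_val(128)   # -> 129.0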

def plat_idx_to_val_coarse(idx, coarseness, edge=0.5):
    """ Converts the given *coarse* index into a non-coarse index as used by fio
        in stat.h:plat_idx_to_val(), subsequently computing the appropriate
        latency value for that bin.
        """

    # Multiply the index by the power-of-2 coarseness to get the non-coarse
    # bin index, with a max of 1536 bins (FIO_IO_U_PLAT_GROUP_NR = 24 in stat.h)
    stride = 1 << coarseness
    idx = idx * stride
    lower = _plat_idx_to_val(idx, edge=0.0)
    upper = _plat_idx_to_val(idx + stride, edge=1.0)
    return lower + (upper - lower) * edge
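
# For example (illustrative only): with coarseness = 2, coarse bin 1 spans the
# non-coarse bins 4..7, i.e. the latency range [4, 8), so the default
# edge = 0.5 yields the midpoint:
#
#   plat_idx_to_val_coarse(1, 2)   # -> 4 + (8 - 4) * 0.5 = 6.0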

def print_all_stats(ctx, end, mn, ss_cnt, vs, ws, mx):
    ps = weighted_percentile(percs, vs, ws)

    avg = weighted_average(vs, ws)
    values = [mn, avg] + list(ps) + [mx]
    row = [end, ss_cnt] + [float(x) / ctx.divisor for x in values]
    fmt = "%d, %d, %d, " + fmt_float_list(ctx, 5) + ", %d"
    print(fmt % tuple(row))

def update_extreme(val, fncn, new_val):
    """ Calculate min / max in the presence of None values """
    if val is None: return new_val
    else: return fncn(val, new_val)

# See beginning of main() for how bin_vals are computed
bin_vals = []
lower_bin_vals = [] # lower edge of each bin
upper_bin_vals = [] # upper edge of each bin

def process_interval(ctx, samples, iStart, iEnd):
    """ Construct the weighted histogram for the given interval by scanning
        through all the histograms and figuring out which of their bins have
        samples with latencies which overlap with the given interval
        [iStart,iEnd].
    """

    times, files, hists = samples[:,0], samples[:,1], samples[:,2:]
    iHist = np.zeros(__HIST_COLUMNS)
    ss_cnt = 0 # number of samples affecting this interval
    mn_bin_val, mx_bin_val = None, None

    for end_time, _, hist in zip(times, files, hists):

        # Only look at bins of the current histogram sample which
        # started before the end of the current time interval [iStart,iEnd]
        start_times = (end_time - 0.5 * ctx.interval) - bin_vals / 1000.0
        idx = np.where(start_times < iEnd)
        s_ts, l_bvs, u_bvs, hs = start_times[idx], lower_bin_vals[idx], upper_bin_vals[idx], hist[idx]

        # Increment current interval histogram by weighted values of future histogram:
        ws = hs * weights(s_ts, end_time, iStart, iEnd)
        iHist[idx] += ws

        # Update total number of samples affecting current interval histogram:
        ss_cnt += np.sum(hs)

        # Update min and max bin values seen if necessary:
        idx = np.where(hs != 0)[0]
        if idx.size > 0:
            mn_bin_val = update_extreme(mn_bin_val, min, l_bvs[max(0,           idx[0]  - 1)])
            mx_bin_val = update_extreme(mx_bin_val, max, u_bvs[min(len(hs) - 1, idx[-1] + 1)])

    if ss_cnt > 0: print_all_stats(ctx, iEnd, mn_bin_val, ss_cnt, bin_vals, iHist, mx_bin_val)
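
# To make the weighting concrete (an illustrative sketch with made-up
# numbers): with ctx.interval = 1000 ms, a histogram sample logged at
# end_time = 1250 ms containing a bin whose latency value is 250,000 us is
# treated as starting at (1250 - 500) - 250 = 500 ms. Against the interval
# [0, 1000], weights() then gives the counts in that bin a weight of
# (1000 - 500) / (1250 - 500) = 2/3.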

def guess_max_from_bins(ctx, hist_cols):
    """ Try to guess the number of non-coarse histogram bins from the number
        of histogram columns seen in an input file. """
    max_coarse = 8
    if ctx.group_nr < 19 or ctx.group_nr > 26:
        bins = [ctx.group_nr * (1 << 6)]
    else:
        bins = [1216,1280,1344,1408,1472,1536,1600,1664]
    coarses = range(max_coarse + 1)
    fncn = lambda z: list(map(lambda x: z // 2**x if z % 2**x == 0 else -10, coarses))

    arr = np.transpose(list(map(fncn, bins)))
    idx = np.where(arr == hist_cols)
    if len(idx[1]) == 0:
        table = repr(arr.astype(int)).replace('-10', 'N/A').replace('array','     ')
        err("Unable to determine bin values from input clat_hist files. Namely \n"
            "the first line of file '%s' " % ctx.FILE[0] + "has %d \n" % (__TOTAL_COLUMNS,) +
            "columns of which we assume %d " % (hist_cols,) + "correspond to histogram bins. \n"
            "This number needs to be equal to one of the following numbers:\n\n"
            + table + "\n\n"
            "Possible reasons and corresponding solutions:\n"
            "  - Input file(s) does not contain histograms.\n"
            "  - You recompiled fio with a different GROUP_NR. If so please specify this\n"
            "    new GROUP_NR on the command line with --group_nr\n")
        sys.exit(1)
    return bins[idx[1][0]]
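
# For example (illustrative only): with the default --group_nr of 19 there are
# 19 * 64 = 1216 non-coarse bins. A log generated with fio's
# log_hist_coarseness=1 has 608 histogram columns, which this function maps
# back to 1216 so that main() can recover coarseness = log2(1216 / 608) = 1.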

def main(ctx):

    if ctx.job_file:
        from configparser import ConfigParser, NoOptionError

        cp = ConfigParser(allow_no_value=True)
        with open(ctx.job_file, 'r') as fp:
            cp.read_file(fp)

        if ctx.interval is None:
            # Auto-detect the --interval value from the job file:
            for s in cp.sections():
                try:
                    hist_msec = cp.get(s, 'log_hist_msec')
                    if hist_msec is not None:
                        ctx.interval = int(hist_msec)
                except NoOptionError:
                    pass

    if ctx.interval is None:
        ctx.interval = 1000

    # Automatically detect how many columns are in the input files,
    # calculate the corresponding 'coarseness' parameter used to generate
    # those files, and calculate the appropriate bin latency values:
    with open(ctx.FILE[0], 'r') as fp:
        global bin_vals,lower_bin_vals,upper_bin_vals,__HIST_COLUMNS,__TOTAL_COLUMNS
        __TOTAL_COLUMNS = len(fp.readline().split(','))
        __HIST_COLUMNS = __TOTAL_COLUMNS - __NON_HIST_COLUMNS

        max_cols = guess_max_from_bins(ctx, __HIST_COLUMNS)
        coarseness = int(np.log2(float(max_cols) / __HIST_COLUMNS))
        idxs = np.arange(__HIST_COLUMNS)
        bin_vals = np.array([plat_idx_to_val_coarse(x, coarseness) for x in idxs], dtype=float)
        lower_bin_vals = np.array([plat_idx_to_val_coarse(x, coarseness, 0.0) for x in idxs], dtype=float)
        upper_bin_vals = np.array([plat_idx_to_val_coarse(x, coarseness, 1.0) for x in idxs], dtype=float)

    fps = [open(f, 'r') for f in ctx.FILE]
    gen = histogram_generator(ctx, fps, ctx.buff_size)

    print(', '.join(columns))

    try:
        start, end = 0, ctx.interval
        arr = np.empty(shape=(0,__TOTAL_COLUMNS - 1))
        more_data = True
        while more_data or len(arr) > 0:

            # Read up to ctx.max_latency (default 20 seconds) of data from end of current interval.
            while len(arr) == 0 or arr[-1][0] < ctx.max_latency * 1000 + end:
                try:
                    new_arr = next(gen)
                except StopIteration:
                    more_data = False
                    break
                arr = np.append(arr, new_arr.reshape((1,__TOTAL_COLUMNS - 1)), axis=0)
            arr = arr.astype(int)

            if arr.size > 0:
                # Jump immediately to the start of the input, rounding
                # down to the nearest multiple of the interval (useful when --log_unix_epoch
                # was used to create these histograms):
                if start == 0 and arr[0][0] - ctx.max_latency * 1000 > end:
                    start = arr[0][0] - ctx.max_latency * 1000
                    start = start - (start % ctx.interval)
                    end = start + ctx.interval

                process_interval(ctx, arr, start, end)

                # Update arr to throw away samples we no longer need - samples which
                # end before the start of the next interval, i.e. the end of the
                # current interval:
                idx = np.where(arr[:,0] > end)
                arr = arr[idx]

            start += ctx.interval
            end = start + ctx.interval
    finally:
        for f in fps:
            f.close()


if __name__ == '__main__':
    import argparse
    p = argparse.ArgumentParser()
    arg = p.add_argument
    arg("FILE", help='space separated list of latency log filenames', nargs='+')
    arg('--buff_size',
        default=10000,
        type=int,
        help='number of samples to buffer into numpy at a time')

    arg('--max_latency',
        default=20,
        type=float,
        help='number of seconds of data to process at a time')

    arg('-i', '--interval',
        type=int,
        help='interval width (ms), default 1000 ms')

    arg('-d', '--divisor',
        required=False,
        type=int,
        default=1,
        help='divide the results by this value.')

    arg('--decimals',
        default=3,
        type=int,
        help='number of decimal places to print floats to')

    arg('--warn',
        dest='warn',
        action='store_true',
        default=False,
        help='print warning messages to stderr')

    arg('--group_nr',
        default=19,
        type=int,
        help='FIO_IO_U_PLAT_GROUP_NR as defined in stat.h')

    arg('--job-file',
        default=None,
        type=str,
        help='Optional argument pointing to the job file used to create the '
             'given histogram files. Useful for auto-detecting --log_hist_msec and '
             '--log_unix_epoch (in fio) values.')

    main(p.parse_args())