1# Copyright 2015 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5"""This module provides utilities to do audio data analysis."""
6
7import logging
8import numpy
9import operator
10
11# Only peaks with coefficient greater than 0.01 of the first peak should be
12# considered. Note that this correspond to -40dB in the spectrum.
13DEFAULT_MIN_PEAK_RATIO = 0.01
14
15PEAK_WINDOW_SIZE_HZ = 20 # Window size for peak detection.
16
17# The minimum RMS value of meaningful audio data.
18MEANINGFUL_RMS_THRESHOLD = 0.001
19
20class RMSTooSmallError(Exception):
21    """Error when signal RMS is too small."""
22    pass
23
24
25class EmptyDataError(Exception):
26    """Error when signal is empty."""
27
28
29def normalize_signal(signal, saturate_value):
30    """Normalizes the signal with respect to the saturate value.
31
32    @param signal: A list for one-channel PCM data.
33    @param saturate_value: The maximum value that the PCM data might be.
34
35    @returns: A numpy array containing normalized signal. The normalized signal
36              has value -1 and 1 when it saturates.
37
38    """
39    signal = numpy.array(signal)
40    return signal / float(saturate_value)
41
42
43def spectral_analysis(signal, rate, min_peak_ratio=DEFAULT_MIN_PEAK_RATIO,
44                      peak_window_size_hz=PEAK_WINDOW_SIZE_HZ):
45    """Gets the dominant frequencies by spectral analysis.
46
47    @param signal: A list of numbers for one-channel PCM data. This should be
48                   normalized to [-1, 1] so the function can check if signal RMS
49                   is too small to be meaningful.
50    @param rate: Sampling rate.
51    @param min_peak_ratio: The minimum peak_0/peak_i ratio such that the
52                           peaks other than the greatest one should be
53                           considered.
54                           This is to ignore peaks that are too small compared
55                           to the first peak peak_0.
56    @param peak_window_size_hz: The window size in Hz to find the peaks.
57                                The minimum differences between found peaks will
58                                be half of this value.
59
60    @returns: A list of tuples:
61              [(peak_frequency_0, peak_coefficient_0),
62               (peak_frequency_1, peak_coefficient_1),
63               (peak_frequency_2, peak_coefficient_2), ...]
64              where the tuples are sorted by coefficients.
65              The last peak_coefficient will be no less than
66              peak_coefficient * min_peak_ratio.
67
68    """
69    # Checks the signal is meaningful.
70    if len(signal) == 0:
71        raise EmptyDataError('Signal data is empty')
72
73    signal_rms = numpy.linalg.norm(signal) / numpy.sqrt(len(signal))
74    logging.debug('signal RMS = %s', signal_rms)
75    if signal_rms < MEANINGFUL_RMS_THRESHOLD:
76        raise RMSTooSmallError(
77                'RMS %s is too small to be meaningful' % signal_rms)
78
79    logging.debug('Doing spectral analysis ...')
80
81    # First, pass signal through a window function to mitigate spectral leakage.
82    y_conv_w = signal * numpy.hanning(len(signal))
83
84    length = len(y_conv_w)
85
86    # x_f is the frequency in Hz, y_f is the transformed coefficient.
87    x_f = _rfft_freq(length, rate)
88    y_f = 2.0 / length * numpy.fft.rfft(y_conv_w)
89
90    # y_f is complex so consider its absolute value for magnitude.
91    abs_y_f = numpy.abs(y_f)
92    threshold = max(abs_y_f) * min_peak_ratio
93
94    # Suppresses all coefficients that are below threshold.
95    for i in xrange(len(abs_y_f)):
96        if abs_y_f[i] < threshold:
97            abs_y_f[i] = 0
98
99    # Gets the peak detection window size in indice.
100    # x_f[1] is the frequency difference per index.
101    peak_window_size = int(peak_window_size_hz / x_f[1])
102
103    # Detects peaks.
104    peaks = peak_detection(abs_y_f, peak_window_size)
105
106    # Transform back the peak location from index to frequency.
107    results = []
108    for index, value in peaks:
109        results.append((x_f[index], value))
110    return results
111
112
113def _rfft_freq(length, rate):
114    """Gets the frequency at each index of real FFT.
115
116    @param length: The window length of FFT.
117    @param rate: Sampling rate.
118
119    @returns: A numpy array containing frequency corresponding to
120              numpy.fft.rfft result at each index.
121
122    """
123    # The difference in Hz between each index.
124    val = rate / float(length)
125    # Only care half of frequencies for FFT on real signal.
126    result_length = length // 2 + 1
127    return numpy.linspace(0, (result_length - 1) * val, result_length)
128
129
130def peak_detection(array, window_size):
131    """Detects peaks in an array.
132
133    A point (i, array[i]) is a peak if array[i] is the maximum among
134    array[i - half_window_size] to array[i + half_window_size].
135    If array[i - half_window_size] to array[i + half_window_size] are all equal,
136    then there is no peak in this window.
137    Note that we only consider peak with value greater than 0.
138
139    @param window_size: The window to detect peaks.
140
141    @returns: A list of tuples:
142              [(peak_index_1, peak_value_1), (peak_index_2, peak_value_2), ...]
143              where the tuples are sorted by peak values.
144
145    """
146    half_window_size = window_size / 2
147    length = len(array)
148
149    def mid_is_peak(array, mid, left, right):
150        """Checks if value at mid is the largest among left to right in array.
151
152        @param array: A list of numbers.
153        @param mid: The mid index.
154        @param left: The left index.
155        @param rigth: The right index.
156
157        @returns: A tuple (is_peak, next_candidate)
158                  is_peak is True if array[index] is the maximum among numbers
159                  in array between index [left, right] inclusively.
160                  next_candidate is the index of next candidate for peak if
161                  is_peak is False. It is the index of maximum value in
162                  [mid + 1, right]. If is_peak is True, next_candidate is
163                  right + 1.
164
165        """
166        value_mid = array[mid]
167        is_peak = True
168        next_peak_candidate_index = None
169
170        # Check the left half window.
171        for index in xrange(left, mid):
172            if array[index] >= value_mid:
173                is_peak = False
174                break
175
176        # Mid is at the end of array.
177        if mid == right:
178            return is_peak, right + 1
179
180        # Check the right half window and also record next candidate.
181        # Favor the larger index for next_peak_candidate_index.
182        for index in xrange(right, mid, -1):
183            if (next_peak_candidate_index is None or
184                array[index] > array[next_peak_candidate_index]):
185                next_peak_candidate_index = index
186
187        if array[next_peak_candidate_index] >= value_mid:
188            is_peak = False
189
190        if is_peak:
191            next_peak_candidate_index = right + 1
192
193        return is_peak, next_peak_candidate_index
194
195    results = []
196    mid = 0
197    next_candidate_idx = None
198    while mid < length:
199        left = max(0, mid - half_window_size)
200        right = min(length - 1, mid + half_window_size)
201
202        # Only consider value greater than 0.
203        if array[mid] == 0:
204            mid = mid + 1
205            continue;
206
207        is_peak, next_candidate_idx = mid_is_peak(array, mid, left, right)
208
209        if is_peak:
210            results.append((mid, array[mid]))
211
212        # Use the next candidate found in [mid + 1, right], or right + 1.
213        mid = next_candidate_idx
214
215    # Sort the peaks by values.
216    return sorted(results, key=lambda x: x[1], reverse=True)
217
218
219# The default pattern mathing threshold. By experiment, this threshold
220# can tolerate normal noise of 0.3 amplitude when sine wave signal
221# amplitude is 1.
222PATTERN_MATCHING_THRESHOLD = 0.85
223
224# The default block size of pattern matching.
225ANOMALY_DETECTION_BLOCK_SIZE = 120
226
227def anomaly_detection(signal, rate, freq,
228                      block_size=ANOMALY_DETECTION_BLOCK_SIZE,
229                      threshold=PATTERN_MATCHING_THRESHOLD):
230    """Detects anomaly in a sine wave signal.
231
232    This method detects anomaly in a sine wave signal by matching
233    patterns of each block.
234    For each moving window of block in the test signal, checks if there
235    is any block in golden signal that is similar to this block of test signal.
236    If there is such a block in golden signal, then this block of test
237    signal is matched and there is no anomaly in this block of test signal.
238    If there is any block in test signal that is not matched, then this block
239    covers an anomaly.
240    The block of test signal starts from index 0, and proceeds in steps of
241    half block size. The overlapping of test signal blocks makes sure there must
242    be at least one block covering the transition from sine wave to anomaly.
243
244    @param signal: A 1-D array-like object for 1-channel PCM data.
245    @param rate: The sampling rate.
246    @param freq: The expected frequency of signal.
247    @param block_size: The block size in samples to detect anomaly.
248    @param threshold: The threshold of correlation index to be judge as matched.
249
250    @returns: A list containing detected anomaly time in seconds.
251
252    """
253    if len(signal) == 0:
254        raise EmptyDataError('Signal data is empty')
255
256    golden_y = _generate_golden_pattern(rate, freq, block_size)
257
258    results = []
259
260    for start in xrange(0, len(signal), block_size / 2):
261        end = start + block_size
262        test_signal = signal[start:end]
263        matched = _moving_pattern_matching(golden_y, test_signal, threshold)
264        if not matched:
265            results.append(start)
266
267    results = [float(x) / rate for x in results]
268
269    return results
270
271
272def _generate_golden_pattern(rate, freq, block_size):
273    """Generates a golden pattern of certain frequency.
274
275    The golden pattern must cover all the possibilities of waveforms in a
276    block. So, we need a golden pattern covering 1 period + 1 block size,
277    such that the test block can start anywhere in a period, and extends
278    a block size.
279
280    |period |1 bk|
281    |       |    |
282     . .     . .
283    .   .   .   .
284         . .     .
285
286    @param rate: The sampling rate.
287    @param freq: The frequency of golden pattern.
288    @param block_size: The block size in samples to detect anomaly.
289
290    @returns: A 1-D array for golden pattern.
291
292    """
293    samples_in_a_period = int(rate / freq) + 1
294    samples_in_golden_pattern = samples_in_a_period + block_size
295    golden_x = numpy.linspace(
296            0.0, (samples_in_golden_pattern - 1) * 1.0 / rate,
297            samples_in_golden_pattern)
298    golden_y = numpy.sin(freq * 2.0 * numpy.pi * golden_x)
299    return golden_y
300
301
302def _moving_pattern_matching(golden_signal, test_signal, threshold):
303    """Checks if test_signal is similar to any block of golden_signal.
304
305    Compares test signal with each block of golden signal by correlation
306    index. If there is any block of golden signal that is similar to
307    test signal, then it is matched.
308
309    @param golden_signal: A 1-D array for golden signal.
310    @param test_signal: A 1-D array for test signal.
311    @param threshold: The threshold of correlation index to be judge as matched.
312
313    @returns: True if there is a match. False otherwise.
314
315    @raises: ValueError: if test signal is longer than golden signal.
316
317    """
318    if len(golden_signal) < len(test_signal):
319        raise ValueError('Test signal is longer than golden signal')
320
321    block_length = len(test_signal)
322    number_of_movings = len(golden_signal) - block_length + 1
323    correlation_indices = []
324    for moving_index in xrange(number_of_movings):
325        # Cuts one block of golden signal from start index.
326        # The block length is the same as test signal.
327        start = moving_index
328        end = start + block_length
329        golden_signal_block = golden_signal[start:end]
330        try:
331            correlation_index = _get_correlation_index(
332                    golden_signal_block, test_signal)
333        except TestSignalNormTooSmallError:
334            logging.info('Caught one block of test signal that has no meaningful norm')
335            return False
336        correlation_indices.append(correlation_index)
337
338    # Checks if the maximum correlation index is high enough.
339    max_corr = max(correlation_indices)
340    if max_corr < threshold:
341        logging.debug('Got one unmatched block with max_corr: %s', max_corr)
342        return False
343    return True
344
345
346class GoldenSignalNormTooSmallError(Exception):
347    """Exception when golden signal norm is too small."""
348    pass
349
350
351class TestSignalNormTooSmallError(Exception):
352    """Exception when test signal norm is too small."""
353    pass
354
355
356_MINIMUM_SIGNAL_NORM = 0.001
357
358def _get_correlation_index(golden_signal, test_signal):
359    """Computes correlation index of two signal of same length.
360
361    @param golden_signal: An 1-D array-like object.
362    @param test_signal: An 1-D array-like object.
363
364    @raises: ValueError: if two signal have different lengths.
365    @raises: GoldenSignalNormTooSmallError: if golden signal norm is too small
366    @raises: TestSignalNormTooSmallError: if test signal norm is too small.
367
368    @returns: The correlation index.
369    """
370    if len(golden_signal) != len(test_signal):
371        raise ValueError(
372                'Only accepts signal of same length: %s, %s' % (
373                        len(golden_signal), len(test_signal)))
374
375    norm_golden = numpy.linalg.norm(golden_signal)
376    norm_test = numpy.linalg.norm(test_signal)
377    if norm_golden <= _MINIMUM_SIGNAL_NORM:
378        raise GoldenSignalNormTooSmallError(
379                'No meaningful data as norm is too small.')
380    if norm_test <= _MINIMUM_SIGNAL_NORM:
381        raise TestSignalNormTooSmallError(
382                'No meaningful data as norm is too small.')
383
384    # The 'valid' cross correlation result of two signals of same length will
385    # contain only one number.
386    correlation = numpy.correlate(golden_signal, test_signal, 'valid')[0]
387    return correlation / (norm_golden * norm_test)
388