1# Copyright 2016 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5import logging
6import os
7import tempfile
8import time
9
10import common
11from autotest_lib.client.common_lib import error
12from autotest_lib.client.common_lib.feedback import client
13from autotest_lib.server import test
14from autotest_lib.server.brillo import host_utils
15
16
17# Number of channels to record.
18_DEFAULT_NUM_CHANNELS = 1
19# Recording sample rate (48kHz).
20_DEFAULT_SAMPLE_RATE = 48000
21# Recording sample format is signed 16-bit PCM (two bytes).
22_DEFAULT_SAMPLE_WIDTH = 2
23# Default sine wave frequency.
24_DEFAULT_SINE_FREQUENCY = 440
25# Default recording duration.
26_DEFAULT_DURATION_SECS = 10
27
28_REC_FILENAME = 'rec_file.wav'
29
30class brillo_RecordingAudioTest(test.test):
31    """Verify that audio recording works."""
32    version = 1
33
34
35    def __init__(self, *args, **kwargs):
36        super(brillo_RecordingAudioTest, self).__init__(*args, **kwargs)
37        self.host = None
38
39
40    def _get_recording_cmd(self, recording_method, duration_secs, sample_width,
41                           sample_rate, num_channels, rec_file):
42        """Get a recording command based on the method.
43
44        @param recording_method: A string specifying the recording method to
45                                 use.
46        @param duration_secs: Duration (in secs) to record audio for.
47        @param sample_width: Size of samples in bytes.
48        @param sample_rate: Recording sample rate in hertz.
49        @param num_channels: Number of channels to use for recording.
50        @param rec_file: WAV file to store recorded audio to.
51
52        @return: A string containing the command to record audio using the
53                 specified method.
54
55        @raises TestError: Invalid recording method.
56        """
57        if recording_method == 'libmedia':
58            return ('brillo_audio_test --record --libmedia '
59                    '--duration_secs=%d --num_channels=%d --sample_rate=%d '
60                    '--sample_size=%d --filename=%s' %
61                    (duration_secs, num_channels, sample_rate, sample_width,
62                     rec_file) )
63        elif recording_method == 'stagefright':
64            return ('brillo_audio_test --record --stagefright '
65                    '--duration_secs=%d --num_channels=%d --sample_rate=%d '
66                    '--filename=%s' %
67                    (duration_secs, num_channels, sample_rate, rec_file))
68        elif recording_method == 'opensles':
69            return ('slesTest_recBuffQueue -c%d -d%d -r%d -%d %s' %
70                    (num_channels, duration_secs, sample_rate, sample_width,
71                     rec_file))
72        else:
73            raise error.TestError('Test called with invalid recording method.')
74
75
76    def test_recording(self, fb_query, recording_method, sample_width,
77                       sample_rate, num_channels, duration_secs):
78        """Performs a recording test.
79
80        @param fb_query: A feedback query.
81        @param recording_method: A string representing a recording method to
82                                 use.
83        @param sample_width: Size of samples in bytes.
84        @param sample_rate: Recording sample rate in hertz.
85        @param num_channels: Number of channels to use for recording.
86        @param duration_secs: Duration to record for.
87
88        @raise error.TestError: An error occurred while executing the test.
89        @raise error.TestFail: The test failed.
90        """
91        dut_tmpdir = self.host.get_tmp_dir()
92        dut_rec_file = os.path.join(dut_tmpdir, _REC_FILENAME)
93        cmd = self._get_recording_cmd(recording_method=recording_method,
94                                      duration_secs=duration_secs,
95                                      sample_width=sample_width,
96                                      sample_rate=sample_rate,
97                                      num_channels=num_channels,
98                                      rec_file=dut_rec_file)
99        timeout = duration_secs + 5
100        fb_query.prepare(use_file=self.use_file,
101                         sample_width=sample_width,
102                         sample_rate=sample_rate,
103                         num_channels=num_channels,
104                         frequency=_DEFAULT_SINE_FREQUENCY)
105        logging.info('Recording command: %s', cmd)
106        logging.info('Beginning audio recording')
107        pid = host_utils.run_in_background(self.host, cmd)
108        logging.info('Requesting audio input')
109        fb_query.emit()
110        logging.info('Waiting for recording to terminate')
111        if not host_utils.wait_for_process(self.host, pid, timeout):
112            raise error.TestError(
113                    'Recording did not terminate within %d seconds' % timeout)
114        _, local_rec_file = tempfile.mkstemp(prefix='recording-',
115                                             suffix='.wav', dir=self.tmpdir)
116        self.host.get_file(dut_rec_file, local_rec_file, delete_dest=True)
117        logging.info('Validating recorded audio')
118        fb_query.validate(captured_audio_file=local_rec_file)
119
120
121    def run_once(self, host, fb_client, recording_method, use_file=False,
122                 sample_widths_arr=[_DEFAULT_SAMPLE_WIDTH],
123                 sample_rates_arr=[_DEFAULT_SAMPLE_RATE],
124                 num_channels_arr=[_DEFAULT_NUM_CHANNELS],
125                 duration_secs=_DEFAULT_DURATION_SECS):
126        """Runs the test.
127
128        @param host: A host object representing the DUT.
129        @param fb_client: A feedback client implementation.
130        @param recording_method: A string representing a playback method to use.
131                                 Either 'opensles', 'libmedia', or
132                                 'stagefright'.
133        @param use_file: Use a file to test audio. Must be used with
134                         playback_method 'opensles'.
135        @param sample_widths_arr: Array of sample widths to test record at.
136        @param sample_rates_arr: Array of sample rates to test record at.
137        @param num_channels_arr: Array of number of channels to test record with.
138        @param duration_secs: Duration to record for.
139
140
141        @raise TestError: Something went wrong while trying to execute the test.
142        @raise TestFail: The test failed.
143        """
144        self.host = host
145        self.duration_secs = duration_secs
146        self.use_file = use_file
147        self.temp_dir = tempfile.mkdtemp(dir=fb_client.tmp_dir)
148        failed_params = []
149        with fb_client.initialize(self, host):
150            for sample_rate in sample_rates_arr:
151                for sample_width in sample_widths_arr:
152                    for num_channels in num_channels_arr:
153                        fb_query = fb_client.new_query(
154                                client.QUERY_AUDIO_RECORDING)
155                        logging.info('Running test with the following params:')
156                        logging.info('Sample rate: %d', sample_rate)
157                        logging.info('Sample width: %d', sample_width)
158                        logging.info('Number of channels: %d', num_channels)
159
160                        try:
161                            self.test_recording(fb_query=fb_query,
162                                    recording_method=recording_method,
163                                    sample_width=sample_width,
164                                    sample_rate=sample_rate,
165                                    num_channels=num_channels,
166                                    duration_secs=duration_secs)
167                        except error.TestFail as e:
168                            logging.info('Test failed: %s.' % e)
169                            failed_params.append((sample_rate, sample_width,
170                                                  num_channels))
171                        finally:
172                            # Sleep to avoid conflict between different tests.
173                            time.sleep(duration_secs)
174
175        if failed_params == []:
176            logging.info('All tests successfully passed.')
177        else:
178            logging.error('The following combinations failed:')
179            for param in failed_params:
180                logging.error('Sample rate: %i, Sample width: %i, Num Channels '
181                              '%i', param[0], param[1], param[2])
182            raise error.TestFail('Some of the tests failed to pass.')
183