desktopui_AudioFeedback.py revision 1b86376362ee5c994fe339f5364392429b096404
1# Copyright (c) 2012 The Chromium Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
import logging
import os
import re
import tempfile
import threading
import utils

from autotest_lib.client.bin import test, utils
from autotest_lib.client.common_lib import error
from autotest_lib.client.cros import cros_ui_test, httpd
10
# Names of mixer controls.  Each name carries its own single quotes because it
# is pasted verbatim into an `amixer ... cset name=<control> <value>` shell
# command line.
_CONTROL_MASTER = "'Master Playback Volume'"
_CONTROL_HEADPHONE = "'Headphone Playback Volume'"
_CONTROL_SPEAKER = "'Speaker Playback Volume'"
_CONTROL_SPEAKER_HP = "'HP/Speakers'"
_CONTROL_MIC_BOOST = "'Mic Boost Volume'"
_CONTROL_CAPTURE = "'Capture Volume'"
_CONTROL_PCM = "'PCM Playback Volume'"
_CONTROL_DIGITAL = "'Digital Capture Volume'"
_CONTROL_CAPTURE_SWITCH = "'Capture Switch'"

# Default test configuration.
_DEFAULT_CARD = '0'
_DEFAULT_FREQUENCY = 1000
# Mixer controls (and the value each is set to) applied before recording.
_DEFAULT_MIXER_SETTINGS = [{'name': _CONTROL_MASTER, 'value': "100%"},
                           {'name': _CONTROL_HEADPHONE, 'value': "100%"},
                           {'name': _CONTROL_MIC_BOOST, 'value': "50%"},
                           {'name': _CONTROL_PCM, 'value': "100%"},
                           {'name': _CONTROL_DIGITAL, 'value': "100%"},
                           {'name': _CONTROL_CAPTURE, 'value': "100%"},
                           {'name': _CONTROL_CAPTURE_SWITCH, 'value': "on"}]

# Boards whose speaker output is set to 0% through _CONTROL_SPEAKER.
_CONTROL_SPEAKER_DEVICE = ['x86-alex', 'x86-mario', 'x86-zgb']
# Boards whose speaker output is set to 0% through _CONTROL_SPEAKER_HP.
_CONTROL_SPEAKER_DEVICE_HP = ['stumpy', 'lumpy']

_DEFAULT_NUM_CHANNELS = 2
# Length of each recording, in seconds.
_DEFAULT_RECORD_DURATION = 15
# Minimum RMS value to consider a "pass".
_DEFAULT_SOX_RMS_THRESHOLD = 0.30

# Regexp parsing sox output.  Raw string so that `\s` reaches the regex engine
# as written instead of relying on Python leaving an unknown escape untouched.
_SOX_RMS_AMPLITUDE_RE = re.compile(r'RMS\s+amplitude:\s+(.+)')
# Format used in sox commands.
_SOX_FORMAT = '-t raw -b 16 -e signed -r 48000 -L'
45
46
class RecordSampleThread(threading.Thread):
    """Thread whose body makes a single record_sample() call.

    The `audio` object only needs to expose
    record_sample(duration, recordfile); the thread forwards the two values
    it was constructed with.
    """

    def __init__(self, audio, duration, recordfile):
        """Stores the recorder object and the arguments for its call.

        Args:
            audio: Object providing record_sample(duration, recordfile).
            duration: Passed through as the first record_sample argument.
            recordfile: Passed through as the second record_sample argument.
        """
        super(RecordSampleThread, self).__init__()
        self._recordfile = recordfile
        self._duration = duration
        self._audio = audio

    def run(self):
        """Thread entry point: performs the recording call."""
        recorder = self._audio
        recorder.record_sample(self._duration, self._recordfile)
57
58
class desktopui_AudioFeedback(cros_ui_test.UITest):
    """Checks that audio played by a Youtube page can be recorded back.

    The test applies a set of mixer settings, records a noise sample, and
    then, once per channel, plays a video from a locally served page, records
    the input device with arecord, noise-reduces the recording with sox and
    requires the RMS amplitude reported by sox to reach a minimum value.
    """
    version = 1

    def setup(self):
        """Asks the job to set up the 'test_tones' and 'sox' deps."""
        self.job.setup_dep(['test_tones'])
        self.job.setup_dep(['sox'])

    def initialize(self,
                   card=_DEFAULT_CARD,
                   frequency=_DEFAULT_FREQUENCY,
                   mixer_settings=_DEFAULT_MIXER_SETTINGS,
                   num_channels=_DEFAULT_NUM_CHANNELS,
                   record_duration=_DEFAULT_RECORD_DURATION,
                   sox_min_rms=_DEFAULT_SOX_RMS_THRESHOLD):
        """Setup the deps for the test.

        Args:
            card: The index of the sound card to use.
            frequency: The frequency of the test tone that is looped back.
                Stored on the instance; not read by any method of this class.
            mixer_settings: Alsa control settings to apply to the mixer before
                starting the test.  A list of {'name': ..., 'value': ...}
                dicts; the list itself is copied, never modified.
            num_channels: The number of channels on the device to test.
            record_duration: How long of a sample to record, in seconds.
            sox_min_rms: The minimum RMS value to consider a pass.

        Raises:
            error.TestError if the deps can't be run.
        """
        self._card = card
        self._frequency = frequency
        # Private copy: the default argument is a module-level list shared by
        # every instance, so it must never be handed out for mutation.
        self._mixer_settings = list(mixer_settings)
        self._num_channels = num_channels
        self._record_duration = record_duration
        self._sox_min_rms = sox_min_rms

        dep = 'sox'
        dep_dir = os.path.join(self.autodir, 'deps', dep)
        self.job.install_pkg(dep, 'dep', dep_dir)
        self._sox_path = os.path.join(dep_dir, 'bin', dep)
        self._sox_lib_path = os.path.join(dep_dir, 'lib')
        if not (os.path.exists(self._sox_path) and
                os.access(self._sox_path, os.X_OK)):
            raise error.TestError(
                '%s is not an executable' % self._sox_path)

        super(desktopui_AudioFeedback, self).initialize()
        # Serve the test page (youtube.html in bindir) from a local listener.
        # NOTE(review): nothing in this class stops the listener; confirm
        # that it is shut down elsewhere when the test finishes.
        self._test_url = 'http://127.0.0.1:8000/youtube.html'
        self._testServer = httpd.HTTPListener(8000, docroot=self.bindir)
        self._testServer.run()

    def _new_temp_path(self):
        """Creates an empty scratch file inside self.tmpdir.

        mkstemp creates the file atomically under a unique name, so the
        returned path cannot collide with another file the way a name that
        was merely generated (and not yet created) could.

        Returns:
            The path of the newly created, empty file.
        """
        fd, path = tempfile.mkstemp(dir=self.tmpdir)
        os.close(fd)
        return path

    def run_once(self):
        """Records a noise profile, then runs the loopback check per channel.

        The noise sample is recorded for the configured record duration and
        its file is removed again whether or not the checks succeed.
        """
        self.set_mixer_controls()
        noise_file = self._new_temp_path()
        logging.info('Noise file: %s', noise_file)
        try:
            self.record_sample(self._record_duration, noise_file)
            for channel in range(self._num_channels):
                self.loopback_test_one_channel(channel, noise_file)
        finally:
            if os.path.isfile(noise_file):
                os.unlink(noise_file)

    def play_video(self):
        """Plays a Youtube video to record audio samples.

           Skipping initial 60 seconds so we can ignore initial silence
           in the video.

        Raises:
            error.TestError if the page never reports 'player ready'.
        """
        logging.info('Playing back youtube media file %s.', self._test_url)
        self.pyauto.NavigateToURL(self._test_url)
        # Poll the page's status element until the player reports ready.
        if not self.pyauto.WaitUntil(lambda: self.pyauto.ExecuteJavascript("""
                    player_status = document.getElementById('player_status');
                    window.domAutomationController.send(player_status.innerHTML);
               """), expect_retval='player ready'):
            raise error.TestError('Failed to load the Youtube player')
        self.pyauto.ExecuteJavascript("""
            ytplayer.pauseVideo();
            ytplayer.seekTo(60, true);
            ytplayer.playVideo();
            window.domAutomationController.send('');
        """)

    def loopback_test_one_channel(self, channel, noise_file):
        """Test loopback for a given channel.

        Starts video playback, records a sample, optionally noise-reduces it
        and checks the result.  Every scratch file created here is removed
        before returning, including when a step raises.

        Args:
            channel: The channel to test loopback on.
            noise_file: Noise profile to use for filtering, None to skip noise
                filtering.
        """
        raw_file = self._new_temp_path()
        test_file = raw_file
        try:
            record_thread = RecordSampleThread(self, self._record_duration,
                                               raw_file)
            self.play_video()
            # The thread is joined right after it is started, so recording
            # has finished by the time the next statement runs.
            record_thread.start()
            record_thread.join()

            if noise_file is not None:
                test_file = self.noise_reduce_file(raw_file, noise_file)
            self.check_recorded_audio(test_file, channel)
        finally:
            # raw_file and test_file are the same path when filtering is
            # skipped; the set avoids handling it twice.
            for path in set([raw_file, test_file]):
                if os.path.isfile(path):
                    os.unlink(path)

    def record_sample(self, duration, tmpfile):
        """Records a sample from the default input device.

        Args:
            duration: How long to record in seconds.
            tmpfile: The file to record to.
        """
        cmd_rec = 'arecord -d %f -f dat %s' % (duration, tmpfile)
        logging.info('Recording audio now for %f seconds.', duration)
        utils.system(cmd_rec)

    def set_mixer_controls(self):
        """Sets all mixer controls listed in the mixer settings on card.

        A board-specific speaker control set to 0% is added for the boards
        listed in _CONTROL_SPEAKER_DEVICE / _CONTROL_SPEAKER_DEVICE_HP.  The
        stored settings list is left untouched, so calling this repeatedly
        applies the same set of controls every time.
        """
        logging.info('Setting mixer control values on %s', self._card)

        # Build the list for this call only; appending to the stored list
        # would accumulate duplicate entries across calls.
        settings = list(self._mixer_settings)

        # Speaker control settings may differ from device to device.
        board = self.pyauto.ChromeOSBoard()
        if board in _CONTROL_SPEAKER_DEVICE:
            settings.append({'name': _CONTROL_SPEAKER, 'value': "0%"})
        elif board in _CONTROL_SPEAKER_DEVICE_HP:
            settings.append({'name': _CONTROL_SPEAKER_HP, 'value': "0%"})

        for item in settings:
            logging.info('Setting %s to %s on card %s',
                         item['name'], item['value'], self._card)
            cmd = 'amixer -c %s cset name=%s %s'
            cmd = cmd % (self._card, item['name'], item['value'])
            try:
                utils.system(cmd)
            except error.CmdError:
                # A card is allowed to not support all the controls, so don't
                # fail the test here if we get an error.
                logging.info('amixer command failed: %s', cmd)

    def check_recorded_audio(self, infile, channel):
        """Runs the sox command to check if we captured audio.

        Note: if we captured any sufficient loud audio which can generate
        the rms_value greater than the threshold value, test will pass.
        TODO (rohitbm) : find a way to compare the recorded audio with
                         an actual sample file.

        Args:
            infile: The file is to test for (strong) audio content via the RMS
                    method.
            channel: The audio channel to test.

        Raises:
            error.TestError if the RMS amplitude of the recording isn't above
                the threshold, or if sox reported no RMS amplitude at all.
        """
        # Build up a pan value string for the sox command: '1' for the
        # channel under test and '0' for every other one.  At least one
        # value is always emitted.
        pan_values = ','.join(
            '1' if index == channel else '0'
            for index in range(max(1, self._num_channels)))

        # Set up the sox commands.  The sox dep ships its own libraries, so
        # point the loader at them (this changes the process environment).
        os.environ['LD_LIBRARY_PATH'] = self._sox_lib_path
        sox_mixer_cmd = '%s -c 2 %s %s -c 1 %s - mixer %s'
        sox_mixer_cmd = sox_mixer_cmd % (self._sox_path, _SOX_FORMAT, infile,
                                         _SOX_FORMAT, pan_values)
        stat_cmd = '%s -c 1 %s - -n stat 2>&1' % (self._sox_path, _SOX_FORMAT)
        sox_cmd = '%s | %s' % (sox_mixer_cmd, stat_cmd)
        logging.info('running %s', sox_cmd)
        sox_output = utils.system_output(sox_cmd, retain_output=True)

        # Find the RMS value line and check that it is above threshold.
        rms_found = False
        for line in sox_output.split('\n'):
            m = _SOX_RMS_AMPLITUDE_RE.match(line)
            if m is None:
                continue
            rms_found = True
            rms_val = float(m.group(1))
            logging.info('Got audio RMS value of %f. Minimum pass is %f.',
                         rms_val, self._sox_min_rms)
            if rms_val < self._sox_min_rms:
                raise error.TestError(
                    'Audio RMS value %f too low. Minimum pass is %f.' %
                    (rms_val, self._sox_min_rms))
        # In case sox didn't return an RMS value.
        if not rms_found:
            raise error.TestError(
                'Failed to generate an audio RMS value from playback.')

    def noise_reduce_file(self, test_file, noise_file):
        """Runs the sox command to reduce the noise.

        Performs noise reduction on test_file using the noise profile from
        noise_file.  The profile is piped from one sox invocation (noiseprof)
        into a second one (noisered).

        Args:
            test_file: The file to noise reduce.
            noise_file: The file containing the noise profile.
                        This can be created by recording silence.

        Returns:
            The name of the file containing the noise-reduced data.  The
            caller is responsible for deleting it.
        """
        out_file = self._new_temp_path()
        os.environ['LD_LIBRARY_PATH'] = self._sox_lib_path
        prof_cmd = '%s -c 2 %s %s -n noiseprof' % (self._sox_path,
                                                   _SOX_FORMAT,
                                                   noise_file)
        reduce_cmd = ('%s -c 2 %s %s -c 2 %s %s noisered' %
                          (self._sox_path, _SOX_FORMAT, test_file, _SOX_FORMAT,
                           out_file))
        try:
            utils.system('%s | %s' % (prof_cmd, reduce_cmd))
        except Exception:
            # The caller never learns the output path when this raises, so
            # remove the scratch file here before propagating the error.
            if os.path.isfile(out_file):
                os.unlink(out_file)
            raise
        return out_file
276