1# Copyright (c) 2012 The Chromium Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5import logging
6import os
7import tempfile
8import time
9
10from autotest_lib.client.bin import utils
11from autotest_lib.client.common_lib import error
12from autotest_lib.client.cros.audio import audio_helper
13from autotest_lib.client.cros.audio import cmd_utils
14from autotest_lib.client.cros.audio import cras_utils
15from autotest_lib.client.cros.audio import sox_utils
16
17
18_TEST_TONE_ONE = 440
19_TEST_TONE_TWO = 523
20
21class audio_CRASFormatConversion(audio_helper.cras_rms_test):
22    version = 1
23
24
25    def play_sine_tone(self, frequence, rate):
26        """Plays a sine tone by cras and returns the processes.
27        Args:
28            frequence: the frequence of the sine wave.
29            rate: the sampling rate.
30        """
31        p1 = cmd_utils.popen(
32            sox_utils.generate_sine_tone_cmd(
33                    filename='-', rate=rate, frequence=frequence, gain=-6),
34            stdout=cmd_utils.PIPE)
35        p2 = cmd_utils.popen(
36            cras_utils.playback_cmd(playback_file='-', rate=rate),
37            stdin=p1.stdout)
38        return [p1, p2]
39
40
41    def wait_for_active_stream_count(self, expected_count):
42        utils.poll_for_condition(
43                lambda: cras_utils.get_active_stream_count() == expected_count,
44                exception=error.TestError(
45                        'Timeout waiting active stream count to become %d' %
46                        expected_count),
47                timeout=1, sleep_interval=0.05)
48
49    def loopback(self, noise_profile, primary, secondary):
50        """Gets the rms value of the recorded audio of playing two different
51        tones (the 440 and 523 Hz sine wave) at the specified sampling rate.
52
53        @param noise_profile: The noise profile which is used to reduce the
54                              noise of the recored audio.
55        @param primary: The first sample rate, HW will be set to this.
56        @param secondary: The second sample rate, will be SRC'd to the first.
57        """
58        popens = []
59
60        record_file = os.path.join(self.resultsdir,
61                'record-%s-%s.wav' % (primary, secondary))
62
63        # There should be no other active streams.
64        self.wait_for_active_stream_count(0)
65
66        # Start with the primary sample rate, then add the secondary.  This
67        # causes the secondary to be SRC'd to the primary rate.
68        try:
69            # Play the first audio stream and make sure it has been played
70            popens += self.play_sine_tone(_TEST_TONE_ONE, primary)
71            self.wait_for_active_stream_count(1)
72
73            # Play the second audio stream and make sure it has been played
74            popens += self.play_sine_tone(_TEST_TONE_TWO, secondary)
75            self.wait_for_active_stream_count(2)
76
77            cras_utils.capture(record_file, duration=1, rate=44100)
78
79            # Make sure the playback is still in good shape
80            if any(p.poll() is not None for p in popens):
81                # We will log more details later in finally.
82                raise error.TestFail('process unexpectly stopped')
83
84            reduced_file = tempfile.NamedTemporaryFile()
85            sox_utils.noise_reduce(
86                    record_file, reduced_file.name, noise_profile, rate=44100)
87
88            sox_stat = sox_utils.get_stat(reduced_file.name, rate=44100)
89
90            logging.info('The sox stat of (%d, %d) is %s',
91                         primary, secondary, str(sox_stat))
92
93            return sox_stat.rms
94
95        finally:
96            cmd_utils.kill_or_log_returncode(*popens)
97
98    def run_once(self, test_sample_rates):
99        """Runs the format conversion test.
100        """
101
102        rms_values = {}
103
104        # Record silence to use as the noise profile.
105        noise_file = os.path.join(self.resultsdir, "noise.wav")
106        noise_profile = tempfile.NamedTemporaryFile()
107        cras_utils.capture(noise_file, duration=1)
108        sox_utils.noise_profile(noise_file, noise_profile.name)
109
110        # Try all sample rate pairs.
111        for primary in test_sample_rates:
112            for secondary in test_sample_rates:
113                key = 'rms_value_%d_%d' % (primary, secondary)
114                rms_values[key] = self.loopback(
115                        noise_profile.name, primary, secondary)
116
117        # Record at all sample rates
118        record_file = tempfile.NamedTemporaryFile()
119        for rate in test_sample_rates:
120            cras_utils.capture(record_file.name, duration=1, rate=rate)
121
122        # Add min_rms_value to the result
123        rms_values['min_rms_value'] = min(rms_values.values())
124
125        self.write_perf_keyval(rms_values)
126