1# Copyright (c) 2013 The Chromium Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5"""This module provides cras audio utilities."""
6
7import logging
8import re
9
10from autotest_lib.client.cros.audio import cmd_utils
11
12_CRAS_TEST_CLIENT = '/usr/bin/cras_test_client'
13
14
15class CrasUtilsError(Exception):
16    pass
17
18
19def playback(blocking=True, *args, **kargs):
20    """A helper function to execute the playback_cmd.
21
22    @param blocking: Blocks this call until playback finishes.
23    @param args: args passed to playback_cmd.
24    @param kargs: kargs passed to playback_cmd.
25
26    @returns: The process running the playback command. Note that if the
27              blocking parameter is true, this will return a finished process.
28    """
29    process = cmd_utils.popen(playback_cmd(*args, **kargs))
30    if blocking:
31        cmd_utils.wait_and_check_returncode(process)
32    return process
33
34
35def capture(*args, **kargs):
36    """A helper function to execute the capture_cmd.
37
38    @param args: args passed to capture_cmd.
39    @param kargs: kargs passed to capture_cmd.
40
41    """
42    cmd_utils.execute(capture_cmd(*args, **kargs))
43
44
45def playback_cmd(playback_file, block_size=None, duration=None,
46                 channels=2, rate=48000):
47    """Gets a command to playback a file with given settings.
48
49    @param playback_file: the name of the file to play. '-' indicates to
50                          playback raw audio from the stdin.
51    @param block_size: the number of frames per callback(dictates latency).
52    @param duration: seconds to playback
53    @param channels: number of channels.
54    @param rate: the sampling rate
55
56    @returns: The command args put in a list of strings.
57
58    """
59    args = [_CRAS_TEST_CLIENT]
60    args += ['--playback_file', playback_file]
61    if block_size is not None:
62        args += ['--block_size', str(block_size)]
63    if duration is not None:
64        args += ['--duration', str(duration)]
65    args += ['--num_channels', str(channels)]
66    args += ['--rate', str(rate)]
67    return args
68
69
70def capture_cmd(
71        capture_file, block_size=None, duration=10, channels=1, rate=48000):
72    """Gets a command to capture the audio into the file with given settings.
73
74    @param capture_file: the name of file the audio to be stored in.
75    @param block_size: the number of frames per callback(dictates latency).
76    @param duration: seconds to record. If it is None, duration is not set,
77                     and command will keep capturing audio until it is
78                     terminated.
79    @param channels: number of channels.
80    @param rate: the sampling rate.
81
82    @returns: The command args put in a list of strings.
83
84    """
85    args = [_CRAS_TEST_CLIENT]
86    args += ['--capture_file', capture_file]
87    if block_size is not None:
88        args += ['--block_size', str(block_size)]
89    if duration is not None:
90        args += ['--duration', str(duration)]
91    args += ['--num_channels', str(channels)]
92    args += ['--rate', str(rate)]
93    return args
94
95
96def loopback(*args, **kargs):
97    """A helper function to execute loopback_cmd.
98
99    @param args: args passed to loopback_cmd.
100    @param kargs: kargs passed to loopback_cmd.
101
102    """
103
104    cmd_utils.execute(loopback_cmd(*args, **kargs))
105
106
107def loopback_cmd(output_file, duration=10, channels=2, rate=48000):
108    """Gets a command to record the loopback.
109
110    @param output_file: The name of the file the loopback to be stored in.
111    @param channels: The number of channels of the recorded audio.
112    @param duration: seconds to record.
113    @param rate: the sampling rate.
114
115    @returns: The command args put in a list of strings.
116
117    """
118    args = [_CRAS_TEST_CLIENT]
119    args += ['--loopback_file', output_file]
120    args += ['--duration_seconds', str(duration)]
121    args += ['--num_channels', str(channels)]
122    args += ['--rate', str(rate)]
123    return args
124
125
126def get_cras_nodes_cmd():
127    """Gets a command to query the nodes from Cras.
128
129    @returns: The command to query nodes information from Cras using dbus-send.
130
131    """
132    return ('dbus-send --system --type=method_call --print-reply '
133            '--dest=org.chromium.cras /org/chromium/cras '
134            'org.chromium.cras.Control.GetNodes')
135
136
137def set_system_volume(volume):
138    """Set the system volume.
139
140    @param volume: the system output vlume to be set(0 - 100).
141
142    """
143    get_cras_control_interface().SetOutputVolume(volume)
144
145
146def set_node_volume(node_id, volume):
147    """Set the volume of the given output node.
148
149    @param node_id: the id of the output node to be set the volume.
150    @param volume: the volume to be set(0-100).
151
152    """
153    get_cras_control_interface().SetOutputNodeVolume(node_id, volume)
154
155
156def set_capture_gain(gain):
157    """Set the system capture gain.
158
159    @param gain the capture gain in db*100 (100 = 1dB)
160
161    """
162    get_cras_control_interface().SetInputGain(gain)
163
164
165def get_cras_control_interface(private=False):
166    """Gets Cras DBus control interface.
167
168    @param private: Set to True to use a new instance for dbus.SystemBus
169                    instead of the shared instance.
170
171    @returns: A dBus.Interface object with Cras Control interface.
172
173    @raises: ImportError if this is not called on Cros device.
174
175    """
176    try:
177        import dbus
178    except ImportError, e:
179        logging.exception(
180                'Can not import dbus: %s. This method should only be '
181                'called on Cros device.', e)
182        raise
183    bus = dbus.SystemBus(private=private)
184    cras_object = bus.get_object('org.chromium.cras', '/org/chromium/cras')
185    return dbus.Interface(cras_object, 'org.chromium.cras.Control')
186
187
188def get_cras_nodes():
189    """Gets nodes information from Cras.
190
191    @returns: A dict containing information of each node.
192
193    """
194    return get_cras_control_interface().GetNodes()
195
196
197def get_selected_nodes():
198    """Gets selected output nodes and input nodes.
199
200    @returns: A tuple (output_nodes, input_nodes) where each
201              field is a list of selected node IDs returned from Cras DBus API.
202              Note that there may be multiple output/input nodes being selected
203              at the same time.
204
205    """
206    output_nodes = []
207    input_nodes = []
208    nodes = get_cras_nodes()
209    for node in nodes:
210        if node['Active']:
211            if node['IsInput']:
212                input_nodes.append(node['Id'])
213            else:
214                output_nodes.append(node['Id'])
215    return (output_nodes, input_nodes)
216
217
218def set_selected_output_node_volume(volume):
219    """Sets the selected output node volume.
220
221    @param volume: the volume to be set (0-100).
222
223    """
224    selected_output_node_ids, _ = get_selected_nodes()
225    for node_id in selected_output_node_ids:
226        set_node_volume(node_id, volume)
227
228
229def get_active_stream_count():
230    """Gets the number of active streams.
231
232    @returns: The number of active streams.
233
234    """
235    return int(get_cras_control_interface().GetNumberOfActiveStreams())
236
237
238def set_system_mute(is_mute):
239    """Sets the system mute switch.
240
241    @param is_mute: Set True to mute the system playback.
242
243    """
244    get_cras_control_interface().SetOutputMute(is_mute)
245
246
247def set_capture_mute(is_mute):
248    """Sets the capture mute switch.
249
250    @param is_mute: Set True to mute the capture.
251
252    """
253    get_cras_control_interface().SetInputMute(is_mute)
254
255
256def node_type_is_plugged(node_type, nodes_info):
257    """Determine if there is any node of node_type plugged.
258
259    This method is used in has_loopback_dongle in cros_host, where
260    the call is executed on autotest server. Use get_cras_nodes instead if
261    the call can be executed on Cros device.
262
263    Since Cras only reports the plugged node in GetNodes, we can
264    parse the return value to see if there is any node with the given type.
265    For example, if INTERNAL_MIC is of intereset, the pattern we are
266    looking for is:
267
268    dict entry(
269       string "Type"
270       variant             string "INTERNAL_MIC"
271    )
272
273    @param node_type: A str representing node type defined in CRAS_NODE_TYPES.
274    @param nodes_info: A str containing output of command get_nodes_cmd.
275
276    @returns: True if there is any node of node_type plugged. False otherwise.
277
278    """
279    match = re.search(r'string "Type"\s+variant\s+string "%s"' % node_type,
280                      nodes_info)
281    return True if match else False
282
283
284# Cras node types reported from Cras DBus control API.
285CRAS_OUTPUT_NODE_TYPES = ['HEADPHONE', 'INTERNAL_SPEAKER', 'HDMI', 'USB',
286                          'BLUETOOTH', 'LINEOUT', 'UNKNOWN']
287CRAS_INPUT_NODE_TYPES = ['MIC', 'INTERNAL_MIC', 'USB', 'BLUETOOTH',
288                         'POST_DSP_LOOPBACK', 'POST_MIX_LOOPBACK', 'UNKNOWN',
289                         'KEYBOARD_MIC', 'HOTWORD', 'FRONT_MIC', 'REAR_MIC']
290CRAS_NODE_TYPES = CRAS_OUTPUT_NODE_TYPES + CRAS_INPUT_NODE_TYPES
291
292
293def get_filtered_node_types(callback):
294    """Returns the pair of filtered output node types and input node types.
295
296    @param callback: A callback function which takes a node as input parameter
297                     and filter the node based on its return value.
298
299    @returns: A tuple (output_node_types, input_node_types) where each
300              field is a list of node types defined in CRAS_NODE_TYPES,
301              and their 'attribute_name' is True.
302
303    """
304    output_node_types = []
305    input_node_types = []
306    nodes = get_cras_nodes()
307    for node in nodes:
308        if callback(node):
309            node_type = str(node['Type'])
310            if node_type not in CRAS_NODE_TYPES:
311                raise RuntimeError(
312                        'node type %s is not valid' % node_type)
313            if node['IsInput']:
314                input_node_types.append(node_type)
315            else:
316                output_node_types.append(node_type)
317    return (output_node_types, input_node_types)
318
319
320def get_selected_node_types():
321    """Returns the pair of active output node types and input node types.
322
323    @returns: A tuple (output_node_types, input_node_types) where each
324              field is a list of selected node types defined in CRAS_NODE_TYPES.
325
326    """
327    def is_selected(node):
328        """Checks if a node is selected.
329
330        A node is selected if its Active attribute is True.
331
332        @returns: True is a node is selected, False otherwise.
333
334        """
335        return node['Active']
336
337    return get_filtered_node_types(is_selected)
338
339
340def get_selected_input_device_name():
341    """Returns the device name of the active input node.
342
343    @returns: device name string. E.g. kbl_r5514_5663_max: :0,1
344    """
345    nodes = get_cras_nodes()
346    for node in nodes:
347        if node['Active'] and node['IsInput']:
348            return node['DeviceName']
349    return None
350
351
352def get_selected_output_device_name():
353    """Returns the device name of the active output node.
354
355    @returns: device name string. E.g. mtk-rt5650: :0,0
356    """
357    nodes = get_cras_nodes()
358    for node in nodes:
359        if node['Active'] and not node['IsInput']:
360            return node['DeviceName']
361    return None
362
363
364def get_plugged_node_types():
365    """Returns the pair of plugged output node types and input node types.
366
367    @returns: A tuple (output_node_types, input_node_types) where each
368              field is a list of plugged node types defined in CRAS_NODE_TYPES.
369
370    """
371    def is_plugged(node):
372        """Checks if a node is plugged and is not unknown node.
373
374        Cras DBus API only reports plugged node, so every node reported by Cras
375        DBus API is plugged. However, we filter out UNKNOWN node here because
376        the existence of unknown node depends on the number of redundant
377        playback/record audio device created on audio card. Also, the user of
378        Cras will ignore unknown nodes.
379
380        @returns: True if a node is plugged and is not an UNKNOWN node.
381
382        """
383        return node['Type'] != 'UNKNOWN'
384
385    return get_filtered_node_types(is_plugged)
386
387
388def set_selected_node_types(output_node_types, input_node_types):
389    """Sets selected node types.
390
391    @param output_node_types: A list of output node types. None to skip setting.
392    @param input_node_types: A list of input node types. None to skip setting.
393
394    """
395    if len(output_node_types) == 1:
396        set_single_selected_output_node(output_node_types[0])
397    elif output_node_types:
398        set_selected_output_nodes(output_node_types)
399    if len(input_node_types) == 1:
400        set_single_selected_input_node(input_node_types[0])
401    elif input_node_types:
402        set_selected_input_nodes(input_node_types)
403
404
405def set_single_selected_output_node(node_type):
406    """Sets one selected output node.
407
408    Note that Chrome UI uses SetActiveOutputNode of Cras DBus API
409    to select one output node.
410
411    @param node_type: A node type.
412
413    """
414    nodes = get_cras_nodes()
415    for node in nodes:
416        if node['IsInput']:
417            continue
418        if node['Type'] == node_type:
419            set_active_output_node(node['Id'])
420
421
422def set_single_selected_input_node(node_type):
423    """Sets one selected input node.
424
425    Note that Chrome UI uses SetActiveInputNode of Cras DBus API
426    to select one input node.
427
428    @param node_type: A node type.
429
430    """
431    nodes = get_cras_nodes()
432    for node in nodes:
433        if not node['IsInput']:
434            continue
435        if node['Type'] == node_type:
436            set_active_input_node(node['Id'])
437
438
439def set_selected_output_nodes(types):
440    """Sets selected output node types.
441
442    Note that Chrome UI uses SetActiveOutputNode of Cras DBus API
443    to select one output node. Here we use add/remove active output node
444    to support multiple nodes.
445
446    @param types: A list of output node types.
447
448    """
449    nodes = get_cras_nodes()
450    for node in nodes:
451        if node['IsInput']:
452            continue
453        if node['Type'] in types:
454            add_active_output_node(node['Id'])
455        elif node['Active']:
456            remove_active_output_node(node['Id'])
457
458
459def set_selected_input_nodes(types):
460    """Sets selected input node types.
461
462    Note that Chrome UI uses SetActiveInputNode of Cras DBus API
463    to select one input node. Here we use add/remove active input node
464    to support multiple nodes.
465
466    @param types: A list of input node types.
467
468    """
469    nodes = get_cras_nodes()
470    for node in nodes:
471        if not node['IsInput']:
472            continue
473        if node['Type'] in types:
474            add_active_input_node(node['Id'])
475        elif node['Active']:
476            remove_active_input_node(node['Id'])
477
478
479def set_active_input_node(node_id):
480    """Sets one active input node.
481
482    @param node_id: node id.
483
484    """
485    get_cras_control_interface().SetActiveInputNode(node_id)
486
487
488def set_active_output_node(node_id):
489    """Sets one active output node.
490
491    @param node_id: node id.
492
493    """
494    get_cras_control_interface().SetActiveOutputNode(node_id)
495
496
497def add_active_output_node(node_id):
498    """Adds an active output node.
499
500    @param node_id: node id.
501
502    """
503    get_cras_control_interface().AddActiveOutputNode(node_id)
504
505
506def add_active_input_node(node_id):
507    """Adds an active input node.
508
509    @param node_id: node id.
510
511    """
512    get_cras_control_interface().AddActiveInputNode(node_id)
513
514
515def remove_active_output_node(node_id):
516    """Removes an active output node.
517
518    @param node_id: node id.
519
520    """
521    get_cras_control_interface().RemoveActiveOutputNode(node_id)
522
523
524def remove_active_input_node(node_id):
525    """Removes an active input node.
526
527    @param node_id: node id.
528
529    """
530    get_cras_control_interface().RemoveActiveInputNode(node_id)
531
532
533def get_node_id_from_node_type(node_type, is_input):
534    """Gets node id from node type.
535
536    @param types: A node type defined in CRAS_NODE_TYPES.
537    @param is_input: True if the node is input. False otherwise.
538
539    @returns: A string for node id.
540
541    @raises: CrasUtilsError: if unique node id can not be found.
542
543    """
544    nodes = get_cras_nodes()
545    find_ids = []
546    for node in nodes:
547        if node['Type'] == node_type and node['IsInput'] == is_input:
548            find_ids.append(node['Id'])
549    if len(find_ids) != 1:
550        raise CrasUtilsError(
551                'Can not find unique node id from node type %s' % node_type)
552    return find_ids[0]
553
554def get_active_node_volume():
555    """Returns volume from active node.
556
557    @returns: int for volume
558
559    @raises: CrasUtilsError: if node volume cannot be found.
560    """
561    nodes = get_cras_nodes()
562    for node in nodes:
563        if node['Active'] == 1 and node['IsInput'] == 0:
564            return int(node['NodeVolume'])
565    raise CrasUtilsError('Cannot find active node volume from nodes.')
566