1#
2# Copyright 2015 The Android Open Source Project
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16
17"""Script for sending testing parameters and commands to a Bluetooth device.
18
19This script provides a simple shell interface for sending data at run-time to a
20Bluetooth device. It is intended to be used in tandem with the test vendor
21library project.
22
23Usage:
24  Option A: Script
25    1. Run build_and_run.sh in scripts/ with the --test-channel flag set and the
26    port to use for the test channel.
27  Option B: Manual
28    1. Choose a port to use for the test channel. Use 'adb forward tcp:<port>
29    tcp:<port>' to forward the port to the device.
30    2. In a separate shell, build and push the test vendor library to the device
31    using the script mentioned in option A (i.e. without the --test-channel flag
32    set).
33    3. Once logcat has started, turn Bluetooth on from the device.
    4. Run this program in the shell from step 1, passing the port chosen in
    step 1 as its only argument.
36"""
37
38#!/usr/bin/env python
39
40import cmd
41import random
42import socket
43import string
44import struct
45import sys
46
DEVICE_NAME_LENGTH = 6
DEVICE_ADDRESS_LENGTH = 6

# Alphabets that fake device names and addresses are drawn from.
_NAME_ALPHABET = string.ascii_uppercase + string.digits
_ADDRESS_ALPHABET = string.digits

# A single generator shared by both helpers, so that a new SystemRandom object
# is not constructed for every generated character.
_SYSTEM_RANDOM = random.SystemRandom()

def _random_string(alphabet, length):
  """Returns `length` characters picked independently from `alphabet`."""
  return ''.join(_SYSTEM_RANDOM.choice(alphabet) for _ in range(length))

# Used to generate fake device names and addresses during discovery.
def generate_random_name(length=None):
  """Returns a random string of uppercase ASCII letters and digits.

  Args:
    length: Number of characters to generate. When omitted, the current value
      of DEVICE_NAME_LENGTH is used.
  """
  if length is None:
    length = DEVICE_NAME_LENGTH
  return _random_string(_NAME_ALPHABET, length)

def generate_random_address(length=None):
  """Returns a random string of decimal digits.

  Args:
    length: Number of digits to generate. When omitted, the current value of
      DEVICE_ADDRESS_LENGTH is used.
  """
  if length is None:
    length = DEVICE_ADDRESS_LENGTH
  return _random_string(_ADDRESS_ALPHABET, length)
58
class Connection(object):
  """Simple wrapper class for a socket object.

  Attributes:
    _socket: The underlying TCP socket, connected to 'localhost' on the port
      given to the constructor.
  """

  def __init__(self, port):
    """Opens a TCP connection to 'localhost' on the given port.

    Args:
      port: The port number to connect to.

    Raises:
      socket.error: If the connection cannot be established.
    """
    self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
      self._socket.connect(('localhost', port))
    except Exception:
      # Do not leave a half-initialized socket open when connecting fails.
      self._socket.close()
      raise

  def close(self):
    """Closes the underlying socket."""
    self._socket.close()

  def send(self, data):
    """Sends all of `data` over the socket."""
    self._socket.sendall(data)
75
class TestChannel(object):
  """Checks outgoing commands and sends them once verified.

  A command is sent as a sequence of length-prefixed fields: one octet holding
  the size of the command name followed by the name, one octet holding the
  number of arguments, and then every argument preceded by one octet holding
  its size. Sizes are counted in UTF-8 encoded bytes.

  Attributes:
    _connection: The connection to the test vendor library that commands are
      sent on.
    _discovered_devices: The DeviceManager that records every device created
      through discover_new_device.
  """

  # Largest value that fits in the single octet used for every size field.
  _MAX_FIELD_SIZE = 255

  def __init__(self, port):
    self._connection = Connection(port)
    self._discovered_devices = DeviceManager()

  def discover_new_device(self, name=None, address=None):
    """Creates a Device, records it as discovered and returns it."""
    device = Device(name, address)
    self._discovered_devices.add_device(device)
    return device

  def close(self):
    """Closes the underlying connection."""
    self._connection.close()

  def send_command(self, name, args):
    """Validates, encodes and sends one command.

    Args:
      name: The command name.
      args: A list of string arguments for the command.

    Raises:
      ValueError: If the command fails the checks in lint_command.
    """
    self.lint_command(name, args, len(name), len(args))
    fields = [self._pack_field(name), struct.pack('B', len(args))]
    fields.extend(self._pack_field(arg) for arg in args)
    self._connection.send(b''.join(fields))

  def lint_command(self, name, args, name_size, args_size):
    """Checks that a command can be represented on the wire.

    Args:
      name: The command name.
      args: A list of string arguments for the command.
      name_size: The declared name length; must equal len(name).
      args_size: The declared argument count; must equal len(args).

    Raises:
      ValueError: If a declared size does not match the data, if there are
        more than 255 arguments, or if the name or any argument is longer than
        255 bytes once encoded. UnicodeError (a ValueError subclass) is raised
        when the name or an argument cannot be encoded as UTF-8.
    """
    # Explicit check rather than an assert, which is stripped under -O.
    if name_size != len(name) or args_size != len(args):
      raise ValueError('Declared sizes do not match the command contents.')
    try:
      encoded_fields = [
          field.encode('utf-8') for field in [name] + list(args)]
    except UnicodeError:
      print('Unrecognized characters.')
      raise
    # Every size must be encodable in one octet.
    if args_size > self._MAX_FIELD_SIZE:
      raise ValueError('Too many arguments: %d (at most %d allowed).' %
                       (args_size, self._MAX_FIELD_SIZE))
    for encoded in encoded_fields:
      if len(encoded) > self._MAX_FIELD_SIZE:
        raise ValueError('Field is %d bytes long (at most %d allowed).' %
                         (len(encoded), self._MAX_FIELD_SIZE))

  @staticmethod
  def _pack_field(text):
    """Returns `text` as UTF-8 bytes preceded by a one-octet byte count."""
    encoded = text.encode('utf-8')
    return struct.pack('B', len(encoded)) + encoded
119
class DeviceManager(object):
  """Registry of the fake devices that have been "discovered" so far.

  Attributes:
    device_list: Dictionary keyed by device address whose values are the
      corresponding device objects.
  """

  def __init__(self):
    self.device_list = dict()

  def add_device(self, device):
    """Stores `device` under the address it reports via get_address()."""
    address = device.get_address()
    self.device_list[address] = device
132
class Device(object):
  """Fake remote device reported in inquiry and scan results.

  Whenever the caller leaves the name or the address unspecified (None), a
  randomly generated string is substituted for it.

  Attributes:
    _name: The device name for use in extended results.
    _address: The BD address of the device.
  """

  def __init__(self, name=None, address=None):
    # TODO(dennischeng): Generate device properties more robustly.
    if name is None:
      name = generate_random_name()
    if address is None:
      address = generate_random_address()
    self._name = name
    self._address = address

  def get_name(self):
    """Returns the name this device was created with."""
    return self._name

  def get_address(self):
    """Returns the address this device was created with."""
    return self._address
153
class TestChannelShell(cmd.Cmd):
  """Shell for sending test channel data to controller.

  Manages the test channel to the controller and defines a set of commands the
  user can send to the controller as well. These commands are processed parallel
  to commands sent from the device stack and used to provide additional
  debugging/testing capabilities.

  The docstrings of the do_* methods double as the text printed by the shell's
  built-in 'help' command.

  Attributes:
    _test_channel: The communication channel to send data to the controller.
  """

  def __init__(self, test_channel):
    print('Type \'help\' for more information.')
    cmd.Cmd.__init__(self)
    self._test_channel = test_channel

  def _send(self, name, args):
    """Sends one command, reporting rejected input instead of raising.

    A ValueError (which includes UnicodeError) from the test channel means the
    user typed something that cannot be sent; it is reported and the shell
    keeps running. Any other exception propagates unchanged.
    """
    try:
      self._test_channel.send_command(name, args)
    except ValueError as error:
      print('Command not sent: invalid name or arguments. %s' % error)

  def do_clear(self, args):
    """
    Arguments: None.
    Resets the controller to its original, unmodified state.
    """
    self._send('CLEAR', [])

  def do_clear_event_delay(self, args):
    """
    Arguments: None.
    Clears the response delay set by set_event_delay.
    """
    # Anything typed after the command name is still forwarded as arguments.
    self._send('CLEAR_EVENT_DELAY', args.split())

  def do_discover(self, args):
    """
    Arguments: name_1 name_2 ...
    Sends an inquiry result for named device(s). If no names are provided, a
    random name is used instead.
    """
    if not args:
      args = generate_random_name()
    # The command carries a flat list: name, address, name, address, ...
    names_and_addresses = []
    for name in args.split():
      device = self._test_channel.discover_new_device(name)
      names_and_addresses.append(device.get_name())
      names_and_addresses.append(device.get_address())
    self._send('DISCOVER', names_and_addresses)

  def do_set_event_delay(self, args):
    """
    Arguments: interval_in_ms
    Sets the response delay for all event packets sent from the controller back
    to the HCI.
    """
    self._send('SET_EVENT_DELAY', args.split())

  def do_timeout_all(self, args):
    """
    Arguments: None.
    Causes all HCI commands to timeout.
    """
    self._send('TIMEOUT_ALL', [])

  def do_quit(self, args):
    """
    Arguments: None.
    Exits the test channel.
    """
    try:
      self._test_channel.send_command('CLOSE_TEST_CHANNEL', [])
    finally:
      # Release the connection even when the farewell command cannot be sent.
      self._test_channel.close()
    print('Goodbye.')
    return True

  def do_EOF(self, args):
    """
    Arguments: None.
    Exits the test channel when the input ends (e.g. Ctrl-D).
    """
    # Without this handler cmd.Cmd reports "Unknown syntax: EOF" on every read
    # once the input is exhausted and never leaves the command loop.
    print('')
    return self.do_quit(args)
225
def main(argv):
  """Parses the port argument, opens the test channel and runs the shell.

  Problems with the arguments or with opening the channel are reported on
  standard output and end the function early.

  Args:
    argv: The full command line: the program name followed by the port.
  """
  if len(argv) != 2:
    print('Usage: python test_channel.py [port]')
    return
  try:
    port = int(argv[1])
  except ValueError:
    print('Error parsing port.')
    return
  try:
    test_channel = TestChannel(port)
  except socket.error as e:
    print('Error connecting to socket: %s' % e)
    return
  except Exception:
    # Deliberately not a bare except, so Ctrl-C and SystemExit still propagate.
    print('Error creating test channel (check argument).')
    return
  test_channel_shell = TestChannelShell(test_channel)
  test_channel_shell.prompt = '$ '
  try:
    test_channel_shell.cmdloop()
  finally:
    # Covers exits that bypass the shell's quit command (e.g. Ctrl-C). After a
    # normal quit this is a second close of the same socket, which is harmless.
    test_channel.close()

if __name__ == '__main__':
  main(sys.argv)
248