#!/usr/bin/env python

# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import at_channel

import fcntl
import functools
import glib
import logging
import mox
import os
import tempfile
import unittest

import task_loop

class ATChannelTestCase(unittest.TestCase):
    """
    Test fixture for ATChannel class.

    The channel's file descriptor is swapped for a temporary file so that the
    tests can play the role of the remote end by writing to / reading from
    that file, and the task loop is replaced by a mox mock so that dispatched
    commands can be verified.

    """

    def setUp(self):
        self.mox = mox.Mox()

        master, slave = os.openpty()
        self._at_channel = at_channel.ATChannel(
                self._recieve_command_local_callback, slave, 'test')

        # Replace the channel inside _at_channel with a tempfile.
        # We will use the tempfile to simulate a tty pair.
        os.close(master)
        os.close(slave)
        self._channel_file = tempfile.TemporaryFile(mode='w+')
        # These properties are a copy of the properties set in ATChannel for
        # the tty pair.
        flags = fcntl.fcntl(self._channel_file.fileno(), fcntl.F_GETFL)
        flags = flags | os.O_NONBLOCK
        fcntl.fcntl(self._channel_file.fileno(), fcntl.F_SETFL, flags)
        self._at_channel._channel = self._channel_file.fileno()
        # We need to seek() to the beginning of the file to simulate tty read.
        # So remember the head of the file.
        self._channel_file_head = self._channel_file.tell()

        # Also mock out the task_loop.
        self._mox_task_loop = self.mox.CreateMock(task_loop.TaskLoop)
        self._at_channel._task_loop = self._mox_task_loop


    def tearDown(self):
        self._channel_file.close()

    # ##########################################################################
    # Tests

    def test_successful_send(self):
        """
        Test that a single AT command can be sent on the channel.

        """
        payload = 'A not so huge AT+CEREG command.'
        self._at_channel.send(payload)
        received_command = self._recieve_command_remote()
        self.assertTrue(received_command.endswith('\r\n'))
        self.assertEqual(payload.strip(), received_command.strip())

        # Change the AT command guard strings and check again.
        self._at_channel.at_prefix = '$$'
        self._at_channel.at_suffix = '##'
        payload = 'A not so huge AT+CEREG command.'
        self._at_channel.send(payload)
        received_command = self._recieve_command_remote()
        self.assertTrue(received_command.startswith('$$'))
        self.assertTrue(received_command.endswith('##'))
        self.assertEqual(payload.strip(),
                         received_command.strip('$$').strip('##'))


    def test_recieve_single_at_command(self):
        """
        Test that a single AT command can be received together on the channel.

        """
        payload = 'We send you our AT+good wishes too!\r\n'
        self._check_received_commands([payload], [payload.strip()])


    def test_receive_at_commands_differet_terminators(self):
        """
        Test that AT commands are received correctly when different supported
        termination strings are being used.

        """
        # ; is a continuation marker. AT1;2 == AT1\r\nAT2
        payloads = ['AT1\r\nA', 'T2\rA', 'T3\nA', 'T4;', '5\r\n']
        self._check_received_commands(
                payloads, ['AT1', 'AT2', 'AT3', 'AT4', 'AT5'])


    def test_recieve_at_commands_in_parts(self):
        """
        Test that multiple AT commands can be received in parts on the
        channel.

        """
        payloads = ['AT1', '11\r\n', '\r\nAT22', '2\r\nAT333', '\r\n']
        self._check_received_commands(payloads, ['AT111', 'AT222', 'AT333'])


    def test_recieve_long_at_commands(self):
        """
        Test that AT commands longer than the channel read chunk size are
        received correctly.

        """
        payloads = ['AT1+',
                    '123456789\r\nAT2+123456789\r\nAT3+1234567',
                    '89\r\n']
        expected_commands = ['AT1+123456789', 'AT2+123456789', 'AT3+123456789']

        # CHANNEL_READ_CHUNK_SIZE is a module level value, so it must be put
        # back once this test is done. Otherwise the shrunken chunk size leaks
        # into whichever tests happen to run afterwards.
        original_chunk_size = at_channel.CHANNEL_READ_CHUNK_SIZE
        at_channel.CHANNEL_READ_CHUNK_SIZE = 4
        try:
            self._check_received_commands(payloads, expected_commands)
        finally:
            at_channel.CHANNEL_READ_CHUNK_SIZE = original_chunk_size

    # ##########################################################################
    # Helper functions

    def _check_received_commands(self, payloads, expected_commands):
        """
        Feed payloads to the channel and verify the commands dispatched.

        Records one expected post_task call on the mocked task loop for each
        entry of |expected_commands| (in order), then writes each payload to
        the simulated tty and invokes the channel's read handler, and finally
        verifies the recorded expectations.

        @param payloads: List of strings, each written to the simulated tty
                as a separate chunk before the read handler is invoked.
        @param expected_commands: List of command strings expected to be
                posted to the task loop.

        """
        callback = lambda channel, payload: None
        self._at_channel._receiver_callback = callback
        for command in expected_commands:
            self._mox_task_loop.post_task(callback, command)

        self.mox.ReplayAll()
        for payload in payloads:
            self._send_command_remote(payload)
            self._at_channel._handle_channel_cb(self._channel_file.fileno(),
                                                glib.IO_IN)
        self.mox.VerifyAll()


    def _clean_channel_file(self):
        """
        Clean the tempfile used to simulate tty, and reset the r/w head.

        """
        self._channel_file.truncate(0)
        # Record wherever the file position is now; subsequent writes start
        # here and readers seek back to this offset.
        self._channel_file_head = self._channel_file.tell()


    def _send_command_remote(self, payload):
        """
        Simulate a command being sent from the remote tty port.

        @param payload: The command to send.

        """
        self._clean_channel_file()
        self._channel_file.write(payload)
        self._channel_file.flush()
        # Rewind to the start of the payload so that the channel reads it.
        self._channel_file.seek(self._channel_file_head)


    def _recieve_command_remote(self):
        """
        Simulate a command being received at the remote tty port.

        @return: Everything found in the simulated tty from the remembered
                head up to the end of the file, as a single string.

        """
        self._channel_file.flush()
        self._channel_file.seek(self._channel_file_head)
        payload_list = []
        # Read in 128 character chunks until read() returns the empty string.
        for buf in iter(functools.partial(self._channel_file.read, 128), ''):
            payload_list.append(buf)
        self._clean_channel_file()
        return ''.join(payload_list)


    def _recieve_command_local_callback(self, payload):
        """
        Receiver callback handed to ATChannel in setUp; intentionally a no-op.

        @param payload: Ignored.

        """
        pass


if __name__ == '__main__':
    logging.basicConfig(level=logging.DEBUG)
    unittest.main()