1#!/usr/bin/env python
2
3# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
4# Use of this source code is governed by a BSD-style license that can be
5# found in the LICENSE file.
6
7import at_channel
8
9import fcntl
10import functools
11import glib
12import logging
13import mox
14import os
15import tempfile
16import unittest
17
18import task_loop
19
20class ATChannelTestCase(unittest.TestCase):
21    """
22    Test fixture for ATChannel class.
23
24    """
25
26    def setUp(self):
27        self.mox = mox.Mox()
28
29        master, slave = os.openpty()
30        self._at_channel = at_channel.ATChannel(
31                self._recieve_command_local_callback, slave, 'test')
32
33        # Replace the channel inside _at_channel with a tempfile
34        # We will use the tempfile to simulate a tty pair.
35        os.close(master)
36        os.close(slave)
37        self._channel_file = tempfile.TemporaryFile(mode = 'w+')
38        # These properties are a copy of the properties set in ATChannel for the
39        # tty pair.
40        flags = fcntl.fcntl(self._channel_file.fileno(), fcntl.F_GETFL)
41        flags = flags | os.O_NONBLOCK
42        fcntl.fcntl(self._channel_file.fileno(), fcntl.F_SETFL, flags)
43        self._at_channel._channel = self._channel_file.fileno()
44        # We need to seek() to the beginning of the file to simulate tty read.
45        # So remember the head of the file.
46        self._channel_file_head = self._channel_file.tell()
47
48        # Also mock out the task_loop
49        self._mox_task_loop = self.mox.CreateMock(task_loop.TaskLoop)
50        self._at_channel._task_loop = self._mox_task_loop
51
52
53    def tearDown(self):
54        self._channel_file.close()
55
56    # ##########################################################################
57    # Tests
58
59    def test_successful_send(self):
60        """
61        Test that a single AT command can be sent on the channel.
62
63        """
64        payload = 'A not so huge AT+CEREG command.'
65        self._at_channel.send(payload)
66        received_command = self._recieve_command_remote()
67        self.assertTrue(received_command.endswith('\r\n'))
68        self.assertEqual(payload.strip(), received_command.strip())
69
70        # Change the AT command guard strings and check again.
71        self._at_channel.at_prefix = '$$'
72        self._at_channel.at_suffix = '##'
73        payload = 'A not so huge AT+CEREG command.'
74        self._at_channel.send(payload)
75        received_command = self._recieve_command_remote()
76        self.assertTrue(received_command.startswith('$$'))
77        self.assertTrue(received_command.endswith('##'))
78        self.assertEqual(payload.strip(),
79                         received_command.strip('$$').strip('##'))
80
81
82    def test_recieve_single_at_command(self):
83        """
84        Test that a single AT command can be received together on the channel.
85
86        """
87        payload = 'We send you our AT+good wishes too!\r\n'
88        callback = lambda channel, payload: None
89        self._at_channel._receiver_callback = callback
90        self._mox_task_loop.post_task(callback, payload.strip())
91        self.mox.ReplayAll()
92        self._send_command_remote(payload)
93        self._at_channel._handle_channel_cb(self._channel_file.fileno(),
94                                          glib.IO_IN)
95        self.mox.VerifyAll()
96
97
98    def test_receive_at_commands_differet_terminators(self):
99        """
100        Test that AT commands are recieved correctly when different supported
101        termination strings are being used.
102
103        """
104        # ; is a continuation marker. AT1;2 == AT1\r\nAT2
105        payloads = ['AT1\r\nA', 'T2\rA', 'T3\nA', 'T4;', '5\r\n']
106        callback = lambda channel, payload: None
107        self._at_channel._receiver_callback = callback
108        self._mox_task_loop.post_task(callback, 'AT1')
109        self._mox_task_loop.post_task(callback, 'AT2')
110        self._mox_task_loop.post_task(callback, 'AT3')
111        self._mox_task_loop.post_task(callback, 'AT4')
112        self._mox_task_loop.post_task(callback, 'AT5')
113
114        self.mox.ReplayAll()
115        for payload in payloads:
116            self._send_command_remote(payload)
117            self._at_channel._handle_channel_cb(self._channel_file.fileno(),
118                                                glib.IO_IN)
119        self.mox.VerifyAll()
120
121
122    def test_recieve_at_commands_in_parts(self):
123        """
124        Test that a multiple AT commands can be received in parts on the
125        channel.
126
127        """
128        payloads = ['AT1', '11\r\n', '\r\nAT22', '2\r\nAT333', '\r\n']
129        callback = lambda channel, payload: None
130        self._at_channel._receiver_callback = callback
131        self._mox_task_loop.post_task(callback, 'AT111')
132        self._mox_task_loop.post_task(callback, 'AT222')
133        self._mox_task_loop.post_task(callback, 'AT333')
134
135        self.mox.ReplayAll()
136        for payload in payloads:
137            self._send_command_remote(payload)
138            self._at_channel._handle_channel_cb(self._channel_file.fileno(),
139                                                glib.IO_IN)
140        self.mox.VerifyAll()
141
142
143    def test_recieve_long_at_commands(self):
144        """
145        Test that a multiple AT commands can be received in parts on the
146        channel.
147
148        """
149        payloads = ['AT1+',
150                    '123456789\r\nAT2+123456789\r\nAT3+1234567',
151                    '89\r\n']
152        callback = lambda channel, payload: None
153        self._at_channel._receiver_callback = callback
154        self._mox_task_loop.post_task(callback, 'AT1+123456789')
155        self._mox_task_loop.post_task(callback, 'AT2+123456789')
156        self._mox_task_loop.post_task(callback, 'AT3+123456789')
157
158        self.mox.ReplayAll()
159        at_channel.CHANNEL_READ_CHUNK_SIZE = 4
160        for payload in payloads:
161            self._send_command_remote(payload)
162            self._at_channel._handle_channel_cb(self._channel_file.fileno(),
163                                                glib.IO_IN)
164        self.mox.VerifyAll()
165
166    # ##########################################################################
167    # Helper functions
168
169    def _clean_channel_file(self):
170        """
171        Clean the tempfile used to simulate tty, and reset the r/w head.
172
173        """
174        self._channel_file.truncate(0)
175        self._channel_file_head = self._channel_file.tell()
176
177
178    def _send_command_remote(self, payload):
179        """
180        Simulate a command being sent from the remote tty port.
181
182        @param payload: The command to send.
183
184        """
185        self._clean_channel_file()
186        self._channel_file.write(payload)
187        self._channel_file.flush()
188        self._channel_file.seek(self._channel_file_head)
189
190
191    def _recieve_command_remote(self):
192        """
193        Simluate a command being received at the remote tty port.
194
195        """
196        self._channel_file.flush()
197        self._channel_file.seek(self._channel_file_head)
198        payload_list = []
199        for buf in iter(functools.partial(self._channel_file.read, 128), ''):
200            payload_list.append(buf)
201        self._clean_channel_file()
202        return ''.join(payload_list)
203
204
205    def _recieve_command_local_callback(self, payload):
206        pass
207
208
if __name__ == "__main__":
    # When run as a script: turn on DEBUG level logging, then let unittest
    # discover and run the tests in this module.
    logging.basicConfig(level=logging.DEBUG)
    unittest.main()
212