#! python
#
# Python Serial Port Extension for Win32, Linux, BSD, Jython
# see __init__.py
#
# This module implements a loop back connection receiving itself what it sent.
#
# The purpose of this module is.. well... You can run the unit tests with it.
# and it was so easy to implement ;-)
#
# (C) 2001-2011 Chris Liechti <cliechti@gmx.net>
# this is distributed under a free software license, see license.txt
#
# URL format: loop://[option[/option...]]
# options:
# - "logging=<level>" enable diagnostic messages; <level> is one of the keys
#   of LOGGER_LEVELS below (debug, info, warning, error)

from serial.serialutil import *
import threading
import time
import logging

# map log level names to constants. used in fromURL()
LOGGER_LEVELS = {
    'debug': logging.DEBUG,
    'info': logging.INFO,
    'warning': logging.WARNING,
    'error': logging.ERROR,
    }

# integer types accepted as a baud rate. ``long`` only exists on Python 2, so
# referencing it unguarded would raise NameError on Python 3.
try:
    _INTEGER_TYPES = (int, long)
except NameError:
    _INTEGER_TYPES = (int,)


class LoopbackSerial(SerialBase):
    """Serial port implementation that simulates a loop back connection in plain software."""

    BAUDRATES = (50, 75, 110, 134, 150, 200, 300, 600, 1200, 1800, 2400, 4800,
                 9600, 19200, 38400, 57600, 115200)

    def open(self):
        """Open port with current settings.

        Creates the (empty) loop buffer and its lock, applies the options
        found in the port URL, validates the settings and marks the port
        as open.

        Raises SerialException if the port is already open, if no port has
        been configured, or if the URL contains invalid options.
        """
        if self._isOpen:
            raise SerialException("Port is already open.")
        self.logger = None
        self.buffer_lock = threading.Lock()
        self.loop_buffer = bytearray()
        # simulated input lines; they mirror RTS / DTR (see setRTS, setDTR)
        self.cts = False
        self.dsr = False

        if self._port is None:
            raise SerialException("Port must be configured before it can be used.")
        # not that there is anything to open, but the function applies the
        # options found in the URL
        self.fromURL(self.port)

        # not that there is anything to configure...
        self._reconfigurePort()
        # everything is set up, now get a clean start
        self._isOpen = True
        if not self._rtscts:
            self.setRTS(True)
            self.setDTR(True)
        self.flushInput()
        self.flushOutput()

    def _reconfigurePort(self):
        """Set communication parameters on opened port.

        For the loop:// protocol all settings are ignored; only the baud
        rate is validated. Raises ValueError if the baud rate is not an
        integer in the range 1 .. 2**32-1.
        """
        # not that it's of any real use, but it helps in the unit tests
        if not isinstance(self._baudrate, _INTEGER_TYPES) or not 0 < self._baudrate < 2**32:
            raise ValueError("invalid baudrate: %r" % (self._baudrate))
        if self.logger:
            self.logger.info('_reconfigurePort()')

    def close(self):
        """Close port. Does nothing if the port is not open."""
        if self._isOpen:
            self._isOpen = False
            # in case of quick reconnects, give the server some time
            # NOTE(review): there is no server behind loop://; the delay is
            # kept so that the timing seen by callers does not change.
            time.sleep(0.3)

    def makeDeviceName(self, port):
        """Not supported for URLs; always raises SerialException."""
        raise SerialException("there is no sensible way to turn numbers into URLs")

    def fromURL(self, url):
        """Apply the options contained in a loop:// URL to this instance.

        url -- string of the form "[loop://][option[/option...]]". The only
               supported option is "logging=<level>" where <level> is a key
               of LOGGER_LEVELS. Empty options are ignored.

        Raises SerialException for an unknown option and for a missing or
        unknown logging level.
        """
        bad_url = 'expected a string in the form "[loop://][option[/option...]]": %s'
        if url.lower().startswith("loop://"):
            url = url[7:]
        # process options now, directly altering self
        for option in url.split('/'):
            if '=' in option:
                option, value = option.split('=', 1)
            else:
                value = None
            if not option:
                continue
            if option != 'logging':
                raise SerialException(bad_url % ('unknown option: %r' % (option,),))
            # validate before touching the logging configuration; a plain
            # dictionary lookup would leak a KeyError to the caller
            if value not in LOGGER_LEVELS:
                raise SerialException(bad_url % ('unknown logging level: %r' % (value,),))
            logging.basicConfig()   # XXX is that good to call it here?
            self.logger = logging.getLogger('pySerial.loop')
            self.logger.setLevel(LOGGER_LEVELS[value])
            self.logger.debug('enabled logging')

    # - - - - - - - - - - - - - - - - - - - - - - - -

    def inWaiting(self):
        """Return the number of characters currently in the input buffer."""
        if not self._isOpen:
            raise portNotOpenError
        # read the length once so that the logged and the returned value
        # agree even when another thread modifies the buffer
        waiting = len(self.loop_buffer)
        if self.logger:
            self.logger.debug('inWaiting() -> %d' % (waiting,))
        return waiting

    def read(self, size=1):
        """Read size bytes from the serial port.

        If a timeout is set it may return fewer characters than requested.
        With no timeout it will block until the requested number of bytes
        is read. Returns the data as bytes.
        """
        if not self._isOpen:
            raise portNotOpenError
        if self._timeout is not None:
            deadline = time.time() + self._timeout
        else:
            deadline = None
        data = bytearray()
        while size > 0:
            self.buffer_lock.acquire()
            try:
                block = to_bytes(self.loop_buffer[:size])
                del self.loop_buffer[:size]
            finally:
                self.buffer_lock.release()
            data += block
            size -= len(block)
            # check for timeout now, after data has been read.
            # useful for timeout = 0 (non blocking) read
            if deadline is not None and time.time() > deadline:
                break
            # nothing was available: yield briefly instead of spinning at
            # full CPU load until a writer adds data
            if size > 0 and not block:
                time.sleep(0.001)
        return bytes(data)

    def write(self, data):
        """Output the given string over the serial port.

        The data is appended to the loop buffer so that it can be read back.
        Returns the number of bytes written.

        Raises writeTimeoutError (after waiting for the write timeout) if a
        write timeout is configured and the estimated transmission time at
        the current baud rate exceeds it; nothing is written in that case.
        """
        if not self._isOpen:
            raise portNotOpenError
        # ensure we're working with bytes
        data = to_bytes(data)
        # calculate approximate time that would be used to send the data
        # (10 bit times per byte -- presumably start + 8 data + stop bit)
        time_used_to_send = 10.0*len(data) / self._baudrate
        # when a write timeout is configured check if we would be successful
        # (not sending anything, not even the part that would have time)
        if self._writeTimeout is not None and time_used_to_send > self._writeTimeout:
            time.sleep(self._writeTimeout)  # must wait so that unit test succeeds
            raise writeTimeoutError
        self.buffer_lock.acquire()
        try:
            self.loop_buffer += data
        finally:
            self.buffer_lock.release()
        return len(data)

    def flushInput(self):
        """Clear input buffer, discarding all that is in the buffer."""
        if not self._isOpen:
            raise portNotOpenError
        if self.logger:
            self.logger.info('flushInput()')
        self.buffer_lock.acquire()
        try:
            del self.loop_buffer[:]
        finally:
            self.buffer_lock.release()

    def flushOutput(self):
        """Clear output buffer, aborting the current output and
        discarding all that is in the buffer. Written data goes straight
        into the loop buffer, so there is nothing to discard here."""
        if not self._isOpen:
            raise portNotOpenError
        if self.logger:
            self.logger.info('flushOutput()')

    def sendBreak(self, duration=0.25):
        """Send break condition. Timed, returns to idle state after given
        duration. The loop back implementation only checks that the port
        is open; duration is ignored."""
        if not self._isOpen:
            raise portNotOpenError

    def setBreak(self, level=True):
        """Set break: Controls TXD. When active, no transmitting is
        possible. The loop back implementation only logs the request."""
        if not self._isOpen:
            raise portNotOpenError
        if self.logger:
            self.logger.info('setBreak(%r)' % (level,))

    def setRTS(self, level=True):
        """Set terminal status line: Request To Send.
        The level is looped back and reported by getCTS()."""
        if not self._isOpen:
            raise portNotOpenError
        if self.logger:
            self.logger.info('setRTS(%r) -> state of CTS' % (level,))
        self.cts = level

    def setDTR(self, level=True):
        """Set terminal status line: Data Terminal Ready.
        The level is looped back and reported by getDSR()."""
        if not self._isOpen:
            raise portNotOpenError
        if self.logger:
            self.logger.info('setDTR(%r) -> state of DSR' % (level,))
        self.dsr = level

    def getCTS(self):
        """Read terminal status line: Clear To Send.
        Returns the level last set with setRTS()."""
        if not self._isOpen:
            raise portNotOpenError
        if self.logger:
            self.logger.info('getCTS() -> state of RTS (%r)' % (self.cts,))
        return self.cts

    def getDSR(self):
        """Read terminal status line: Data Set Ready.
        Returns the level last set with setDTR()."""
        if not self._isOpen:
            raise portNotOpenError
        if self.logger:
            self.logger.info('getDSR() -> state of DTR (%r)' % (self.dsr,))
        return self.dsr

    def getRI(self):
        """Read terminal status line: Ring Indicator (dummy, always False)."""
        if not self._isOpen:
            raise portNotOpenError
        if self.logger:
            self.logger.info('returning dummy for getRI()')
        return False

    def getCD(self):
        """Read terminal status line: Carrier Detect (dummy, always True)."""
        if not self._isOpen:
            raise portNotOpenError
        if self.logger:
            self.logger.info('returning dummy for getCD()')
        return True

    # - - - platform specific - - -
    # None so far


# assemble Serial class with the platform specific implementation and the base
# for file-like behavior. for Python 2.6 and newer, that provide the new I/O
# library, derive from io.RawIOBase
try:
    import io
except ImportError:
    # classic version with our own file-like emulation
    class Serial(LoopbackSerial, FileLike):
        pass
else:
    # io library present
    class Serial(LoopbackSerial, io.RawIOBase):
        pass


# simple client test
if __name__ == '__main__':
    import sys
    s = Serial('loop://')
    sys.stdout.write('%s\n' % s)

    sys.stdout.write("write...\n")
    s.write("hello\n")
    s.flush()
    sys.stdout.write("read: %s\n" % s.read(5))

    s.close()