# -*- Mode: Python; tab-width: 4 -*-
#       Id: asynchat.py,v 2.26 2000/09/07 22:29:26 rushing Exp
#       Author: Sam Rushing <rushing@nightmare.com>

# ======================================================================
# Copyright 1996 by Sam Rushing
#
#                         All Rights Reserved
#
# Permission to use, copy, modify, and distribute this software and
# its documentation for any purpose and without fee is hereby
# granted, provided that the above copyright notice appear in all
# copies and that both that copyright notice and this permission
# notice appear in supporting documentation, and that the name of Sam
# Rushing not be used in advertising or publicity pertaining to
# distribution of the software without specific, written prior
# permission.
#
# SAM RUSHING DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE,
# INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS, IN
# NO EVENT SHALL SAM RUSHING BE LIABLE FOR ANY SPECIAL, INDIRECT OR
# CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS
# OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
# NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN
# CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
# ======================================================================

r"""A class supporting chat-style (command/response) protocols.

This class adds support for 'chat' style protocols - where one side
sends a 'command', and the other sends a response (examples would be
the common internet protocols - smtp, nntp, ftp, etc..).

The handle_read() method looks at the input stream for the current
'terminator' (usually '\r\n' for single-line responses, '\r\n.\r\n'
for multi-line output), calling self.found_terminator() on its
receipt.

for example:
Say you build an async nntp client using this class.  At the start
of the connection, you'll have self.terminator set to '\r\n', in
order to process the single-line greeting.  Just before issuing a
'LIST' command you'll set it to '\r\n.\r\n'.  The output of the LIST
command will be accumulated (using your own 'collect_incoming_data'
method) up to the terminator, and then control will be returned to
you - by calling your self.found_terminator() method.
"""

import socket
import asyncore
from collections import deque
from sys import py3kwarning
from warnings import filterwarnings, catch_warnings


class async_chat(asyncore.dispatcher):
    """This is an abstract class.  You must derive from this class, and add
    the two methods collect_incoming_data() and found_terminator()"""

    # these are overridable defaults

    ac_in_buffer_size = 4096
    ac_out_buffer_size = 4096

    def __init__(self, sock=None, map=None):
        """Set up the input buffer, the collector list and the output queue.

        `sock` and `map` are passed unchanged to asyncore.dispatcher.
        """
        # for string terminator matching
        self.ac_in_buffer = ''

        # we use a list here rather than cStringIO for a few reasons...
        # del lst[:] is faster than sio.truncate(0)
        # lst = [] is faster than sio.truncate(0)
        # cStringIO will be gaining unicode support in py3k, which
        # will negatively affect the performance of bytes compared to
        # a ''.join() equivalent
        self.incoming = []

        # we toss the use of the "simple producer" and replace it with
        # a pure deque, which the original fifo was a wrapping of
        self.producer_fifo = deque()
        asyncore.dispatcher.__init__(self, sock, map)

    def collect_incoming_data(self, data):
        """Receive a chunk of input; subclasses must override this."""
        raise NotImplementedError("must be implemented in subclass")

    def _collect_incoming_data(self, data):
        """Default collector helper: append `data` to self.incoming."""
        self.incoming.append(data)

    def _get_data(self):
        """Return everything gathered in self.incoming and empty the list."""
        d = ''.join(self.incoming)
        del self.incoming[:]
        return d

    def found_terminator(self):
        """Called when the terminator is reached; subclasses must override."""
        raise NotImplementedError("must be implemented in subclass")

    def set_terminator(self, term):
        """Set the input delimiter.

        Can be a fixed string of any length, an integer, or None.

        Raises ValueError for a negative integer: handle_read() slices
        the input buffer with the count, so a negative value would
        silently deliver the wrong data instead of failing.
        """
        if isinstance(term, (int, long)) and term < 0:
            raise ValueError('the number of received bytes must be positive')
        self.terminator = term

    def get_terminator(self):
        """Return the terminator last stored by set_terminator()."""
        return self.terminator

    # grab some more data from the socket,
    # throw it to the collector method,
    # check for the terminator,
    # if found, transition to the next state.

    def handle_read(self):
        """Read from the socket and dispatch data up to each terminator.

        Data is handed to collect_incoming_data(); found_terminator() is
        called each time the current terminator (string or byte count)
        is satisfied.  A falsy terminator means "collect everything".
        """
        try:
            data = self.recv(self.ac_in_buffer_size)
        except socket.error:
            self.handle_error()
            return

        self.ac_in_buffer = self.ac_in_buffer + data

        # Continue to search for self.terminator in self.ac_in_buffer,
        # while calling self.collect_incoming_data.  The while loop
        # is necessary because we might read several data+terminator
        # combos with a single recv(4096).

        while self.ac_in_buffer:
            lb = len(self.ac_in_buffer)
            terminator = self.get_terminator()
            if not terminator:
                # no terminator, collect it all
                self.collect_incoming_data(self.ac_in_buffer)
                self.ac_in_buffer = ''
            elif isinstance(terminator, (int, long)):
                # numeric terminator
                n = terminator
                if lb < n:
                    # not enough yet: hand over what we have and
                    # remember how many characters are still expected
                    self.collect_incoming_data(self.ac_in_buffer)
                    self.ac_in_buffer = ''
                    self.terminator = self.terminator - lb
                else:
                    self.collect_incoming_data(self.ac_in_buffer[:n])
                    self.ac_in_buffer = self.ac_in_buffer[n:]
                    self.terminator = 0
                    self.found_terminator()
            else:
                # 3 cases:
                # 1) end of buffer matches terminator exactly:
                #    collect data, transition
                # 2) end of buffer matches some prefix:
                #    collect data to the prefix
                # 3) end of buffer does not match any prefix:
                #    collect data
                terminator_len = len(terminator)
                index = self.ac_in_buffer.find(terminator)
                if index != -1:
                    # we found the terminator
                    if index > 0:
                        # don't bother reporting the empty string
                        # (source of subtle bugs)
                        self.collect_incoming_data(self.ac_in_buffer[:index])
                    self.ac_in_buffer = \
                        self.ac_in_buffer[index + terminator_len:]
                    # This does the Right Thing if the terminator
                    # is changed here.
                    self.found_terminator()
                else:
                    # check for a prefix of the terminator
                    index = find_prefix_at_end(self.ac_in_buffer, terminator)
                    if index:
                        if index != lb:
                            # we found a prefix, collect up to the prefix
                            self.collect_incoming_data(
                                self.ac_in_buffer[:-index])
                            self.ac_in_buffer = self.ac_in_buffer[-index:]
                        # keep the partial terminator buffered and wait
                        # for more input
                        break
                    else:
                        # no prefix, collect it all
                        self.collect_incoming_data(self.ac_in_buffer)
                        self.ac_in_buffer = ''

    def handle_write(self):
        """Write event: try to send queued output."""
        self.initiate_send()

    def handle_close(self):
        """Close event: close the channel."""
        self.close()

    def push(self, data):
        """Queue `data` for sending and start sending.

        Data longer than ac_out_buffer_size is queued as several chunks
        of at most that size.
        """
        sabs = self.ac_out_buffer_size
        if len(data) > sabs:
            for i in xrange(0, len(data), sabs):
                self.producer_fifo.append(data[i:i + sabs])
        else:
            self.producer_fifo.append(data)
        self.initiate_send()

    def push_with_producer(self, producer):
        """Queue a producer (an object with a more() method) and start
        sending."""
        self.producer_fifo.append(producer)
        self.initiate_send()

    def readable(self):
        "predicate for inclusion in the readable for select()"
        # cannot use the old predicate, it violates the claim of the
        # set_terminator method.

        # return (len(self.ac_in_buffer) <= self.ac_in_buffer_size)
        return 1

    def writable(self):
        "predicate for inclusion in the writable for select()"
        return self.producer_fifo or (not self.connected)

    def close_when_done(self):
        "automatically close this channel once the outgoing queue is empty"
        # None is the sentinel initiate_send() turns into handle_close()
        self.producer_fifo.append(None)

    def initiate_send(self):
        """Send the first sendable item of the output queue.

        Handles plain strings/buffers, producers (objects with more())
        and the None sentinel queued by close_when_done().  Returns
        after one send attempt of real data.
        """
        while self.producer_fifo and self.connected:
            first = self.producer_fifo[0]
            # handle empty string/buffer or None entry
            if not first:
                del self.producer_fifo[0]
                if first is None:
                    self.handle_close()
                    return
                # An empty entry has nothing to send.  Move on to the
                # next one rather than issuing a zero-length send, which
                # would end this call and leave later entries waiting
                # for the next write event.
                continue

            # handle classic producer behavior
            obs = self.ac_out_buffer_size
            try:
                with catch_warnings():
                    if py3kwarning:
                        filterwarnings("ignore", ".*buffer",
                                       DeprecationWarning)
                    data = buffer(first, 0, obs)
            except TypeError:
                # not a string/buffer: treat it as a producer
                data = first.more()
                if data:
                    self.producer_fifo.appendleft(data)
                else:
                    # producer exhausted
                    del self.producer_fifo[0]
                continue

            # send the data
            try:
                num_sent = self.send(data)
            except socket.error:
                self.handle_error()
                return

            if num_sent:
                if num_sent < len(data) or obs < len(first):
                    # part of the entry is still unsent; keep the rest
                    self.producer_fifo[0] = first[num_sent:]
                else:
                    del self.producer_fifo[0]
            # we tried to send some actual data
            return

    def discard_buffers(self):
        """Drop all buffered input and all queued output."""
        # Emergencies only!
        self.ac_in_buffer = ''
        del self.incoming[:]
        self.producer_fifo.clear()


class simple_producer:
    """Producer that hands out `data` in pieces of at most `buffer_size`."""

    def __init__(self, data, buffer_size=512):
        self.data = data
        self.buffer_size = buffer_size

    def more(self):
        """Return the next piece of data, or '' once it is exhausted."""
        if len(self.data) > self.buffer_size:
            result = self.data[:self.buffer_size]
            self.data = self.data[self.buffer_size:]
            return result
        else:
            result = self.data
            self.data = ''
            return result


class fifo:
    """Simple first-in first-out queue backed by a deque."""

    def __init__(self, list=None):
        # `list` shadows the builtin, but the name is part of the
        # public signature and is kept for keyword callers.
        if not list:
            self.list = deque()
        else:
            self.list = deque(list)

    def __len__(self):
        return len(self.list)

    def is_empty(self):
        """Return true when the queue holds no items."""
        return not self.list

    def first(self):
        """Return the oldest item without removing it."""
        return self.list[0]

    def push(self, data):
        """Append `data` to the end of the queue."""
        self.list.append(data)

    def pop(self):
        """Remove the oldest item.

        Returns (1, item) on success and (0, None) when the queue is
        empty.
        """
        if self.list:
            return (1, self.list.popleft())
        else:
            return (0, None)


# Given 'haystack', see if any prefix of 'needle' is at its end.  This
# assumes an exact match has already been checked.  Return the number of
# characters matched.
# for example:
# f_p_a_e ("qwerty\r", "\r\n") => 1
# f_p_a_e ("qwertydkjf", "\r\n") => 0
# f_p_a_e ("qwerty\r\n", "\r\n") => <undefined>

# this could maybe be made faster with a computed regex?
# [answer: no; circa Python-2.0, Jan 2001]
# new python:   28961/s
# old python:   18307/s
# re:        12820/s
# regex:     14035/s

def find_prefix_at_end(haystack, needle):
    """Return the length of the longest proper prefix of `needle` that
    `haystack` ends with, or 0 if there is none."""
    l = len(needle) - 1
    while l and not haystack.endswith(needle[:l]):
        l -= 1
    return l