1ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh"""RPC Implemention, originally written for the Python Idle IDE 2ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh 3ffab958fd8d42ed7227d83007350e61555a1fa36Andrew HsiehFor security reasons, GvR requested that Idle's Python execution server process 4ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsiehconnect to the Idle process, which listens for the connection. Since Idle has 5ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsiehonly one client per server, this was not a limitation. 6ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh 7ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh +---------------------------------+ +-------------+ 8ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh | SocketServer.BaseRequestHandler | | SocketIO | 9ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh +---------------------------------+ +-------------+ 10ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh ^ | register() | 11ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh | | unregister()| 12ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh | +-------------+ 13ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh | ^ ^ 14ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh | | | 15ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh | + -------------------+ | 16ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh | | | 17ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh +-------------------------+ +-----------------+ 18ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh | RPCHandler | | RPCClient | 19ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh | [attribute of RPCServer]| | | 20ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh +-------------------------+ +-----------------+ 21ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh 22ffab958fd8d42ed7227d83007350e61555a1fa36Andrew HsiehThe RPCServer handler class is expected to provide register/unregister methods. 23ffab958fd8d42ed7227d83007350e61555a1fa36Andrew HsiehRPCHandler inherits the mix-in class SocketIO, which provides these methods. 24ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh 25ffab958fd8d42ed7227d83007350e61555a1fa36Andrew HsiehSee the Idle run.main() docstring for further information on how this was 26ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsiehaccomplished in Idle. 27ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh 28ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh""" 29ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh 30ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsiehimport sys 31ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsiehimport os 32ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsiehimport socket 33ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsiehimport select 34ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsiehimport SocketServer 35ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsiehimport struct 36ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsiehimport cPickle as pickle 37ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsiehimport threading 38ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsiehimport Queue 39ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsiehimport traceback 40ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsiehimport copy_reg 41ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsiehimport types 42ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsiehimport marshal 43ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh 44ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh 45ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsiehdef unpickle_code(ms): 46ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh co = marshal.loads(ms) 47ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh assert isinstance(co, types.CodeType) 48ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh return co 49ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh 50ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsiehdef pickle_code(co): 51ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh assert isinstance(co, types.CodeType) 52ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh ms = marshal.dumps(co) 53ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh return unpickle_code, (ms,) 54ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh 55ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh# XXX KBK 24Aug02 function pickling capability not used in Idle 56ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh# def unpickle_function(ms): 57ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh# return ms 58ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh 59ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh# def pickle_function(fn): 60ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh# assert isinstance(fn, type.FunctionType) 61ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh# return repr(fn) 62ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh 63ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsiehcopy_reg.pickle(types.CodeType, pickle_code, unpickle_code) 64ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh# copy_reg.pickle(types.FunctionType, pickle_function, unpickle_function) 65ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh 66ffab958fd8d42ed7227d83007350e61555a1fa36Andrew HsiehBUFSIZE = 8*1024 67ffab958fd8d42ed7227d83007350e61555a1fa36Andrew HsiehLOCALHOST = '127.0.0.1' 68ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh 69ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsiehclass RPCServer(SocketServer.TCPServer): 70ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh 71ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh def __init__(self, addr, handlerclass=None): 72ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh if handlerclass is None: 73ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh handlerclass = RPCHandler 74ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh SocketServer.TCPServer.__init__(self, addr, handlerclass) 75ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh 76ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh def server_bind(self): 77ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh "Override TCPServer method, no bind() phase for connecting entity" 78ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh pass 79ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh 80ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh def server_activate(self): 81ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh """Override TCPServer method, connect() instead of listen() 82ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh 83ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh Due to the reversed connection, self.server_address is actually the 84ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh address of the Idle Client to which we are connecting. 85ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh 86ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh """ 87ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh self.socket.connect(self.server_address) 88ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh 89ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh def get_request(self): 90ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh "Override TCPServer method, return already connected socket" 91ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh return self.socket, self.server_address 92ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh 93ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh def handle_error(self, request, client_address): 94ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh """Override TCPServer method 95ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh 96ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh Error message goes to __stderr__. No error message if exiting 97ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh normally or socket raised EOF. Other exceptions not handled in 98ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh server code will cause os._exit. 99ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh 100ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh """ 101ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh try: 102ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh raise 103ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh except SystemExit: 104ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh raise 105ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh except: 106ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh erf = sys.__stderr__ 107ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh print>>erf, '\n' + '-'*40 108ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh print>>erf, 'Unhandled server exception!' 109ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh print>>erf, 'Thread: %s' % threading.currentThread().getName() 110ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh print>>erf, 'Client Address: ', client_address 111ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh print>>erf, 'Request: ', repr(request) 112ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh traceback.print_exc(file=erf) 113ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh print>>erf, '\n*** Unrecoverable, server exiting!' 114ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh print>>erf, '-'*40 115ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh os._exit(0) 116ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh 117ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh#----------------- end class RPCServer -------------------- 118ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh 119ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsiehobjecttable = {} 120ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsiehrequest_queue = Queue.Queue(0) 121ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsiehresponse_queue = Queue.Queue(0) 122ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh 123ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh 124ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsiehclass SocketIO(object): 125ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh 126ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh nextseq = 0 127ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh 128ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh def __init__(self, sock, objtable=None, debugging=None): 129ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh self.sockthread = threading.currentThread() 130ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh if debugging is not None: 131ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh self.debugging = debugging 132ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh self.sock = sock 133ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh if objtable is None: 134ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh objtable = objecttable 135ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh self.objtable = objtable 136ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh self.responses = {} 137ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh self.cvars = {} 138ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh 139ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh def close(self): 140ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh sock = self.sock 141ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh self.sock = None 142ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh if sock is not None: 143ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh sock.close() 144ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh 145ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh def exithook(self): 146ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh "override for specific exit action" 147ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh os._exit(0) 148ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh 149ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh def debug(self, *args): 150ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh if not self.debugging: 151ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh return 152ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh s = self.location + " " + str(threading.currentThread().getName()) 153ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh for a in args: 154ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh s = s + " " + str(a) 155ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh print>>sys.__stderr__, s 156ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh 157ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh def register(self, oid, object): 158ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh self.objtable[oid] = object 159ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh 160ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh def unregister(self, oid): 161ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh try: 162ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh del self.objtable[oid] 163ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh except KeyError: 164ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh pass 165ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh 166ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh def localcall(self, seq, request): 167ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh self.debug("localcall:", request) 168ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh try: 169ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh how, (oid, methodname, args, kwargs) = request 170ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh except TypeError: 171ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh return ("ERROR", "Bad request format") 172ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh if oid not in self.objtable: 173ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh return ("ERROR", "Unknown object id: %r" % (oid,)) 174ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh obj = self.objtable[oid] 175ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh if methodname == "__methods__": 176ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh methods = {} 177ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh _getmethods(obj, methods) 178ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh return ("OK", methods) 179ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh if methodname == "__attributes__": 180ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh attributes = {} 181ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh _getattributes(obj, attributes) 182ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh return ("OK", attributes) 183ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh if not hasattr(obj, methodname): 184ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh return ("ERROR", "Unsupported method name: %r" % (methodname,)) 185ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh method = getattr(obj, methodname) 186ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh try: 187ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh if how == 'CALL': 188ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh ret = method(*args, **kwargs) 189ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh if isinstance(ret, RemoteObject): 190ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh ret = remoteref(ret) 191ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh return ("OK", ret) 192ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh elif how == 'QUEUE': 193ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh request_queue.put((seq, (method, args, kwargs))) 194ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh return("QUEUED", None) 195ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh else: 196ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh return ("ERROR", "Unsupported message type: %s" % how) 197ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh except SystemExit: 198ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh raise 199ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh except socket.error: 200ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh raise 201ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh except: 202ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh msg = "*** Internal Error: rpc.py:SocketIO.localcall()\n\n"\ 203ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh " Object: %s \n Method: %s \n Args: %s\n" 204ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh print>>sys.__stderr__, msg % (oid, method, args) 205ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh traceback.print_exc(file=sys.__stderr__) 206ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh return ("EXCEPTION", None) 207ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh 208ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh def remotecall(self, oid, methodname, args, kwargs): 209ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh self.debug("remotecall:asynccall: ", oid, methodname) 210ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh seq = self.asynccall(oid, methodname, args, kwargs) 211ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh return self.asyncreturn(seq) 212ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh 213ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh def remotequeue(self, oid, methodname, args, kwargs): 214ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh self.debug("remotequeue:asyncqueue: ", oid, methodname) 215ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh seq = self.asyncqueue(oid, methodname, args, kwargs) 216ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh return self.asyncreturn(seq) 217ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh 218ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh def asynccall(self, oid, methodname, args, kwargs): 219ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh request = ("CALL", (oid, methodname, args, kwargs)) 220ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh seq = self.newseq() 221ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh if threading.currentThread() != self.sockthread: 222ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh cvar = threading.Condition() 223ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh self.cvars[seq] = cvar 224ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh self.debug(("asynccall:%d:" % seq), oid, methodname, args, kwargs) 225ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh self.putmessage((seq, request)) 226ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh return seq 227ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh 228ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh def asyncqueue(self, oid, methodname, args, kwargs): 229ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh request = ("QUEUE", (oid, methodname, args, kwargs)) 230ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh seq = self.newseq() 231ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh if threading.currentThread() != self.sockthread: 232ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh cvar = threading.Condition() 233ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh self.cvars[seq] = cvar 234ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh self.debug(("asyncqueue:%d:" % seq), oid, methodname, args, kwargs) 235ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh self.putmessage((seq, request)) 236ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh return seq 237ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh 238ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh def asyncreturn(self, seq): 239ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh self.debug("asyncreturn:%d:call getresponse(): " % seq) 240ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh response = self.getresponse(seq, wait=0.05) 241ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh self.debug(("asyncreturn:%d:response: " % seq), response) 242ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh return self.decoderesponse(response) 243ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh 244ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh def decoderesponse(self, response): 245ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh how, what = response 246ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh if how == "OK": 247ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh return what 248ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh if how == "QUEUED": 249ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh return None 250ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh if how == "EXCEPTION": 251ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh self.debug("decoderesponse: EXCEPTION") 252ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh return None 253ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh if how == "EOF": 254ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh self.debug("decoderesponse: EOF") 255ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh self.decode_interrupthook() 256ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh return None 257ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh if how == "ERROR": 258ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh self.debug("decoderesponse: Internal ERROR:", what) 259ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh raise RuntimeError, what 260ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh raise SystemError, (how, what) 261ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh 262ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh def decode_interrupthook(self): 263ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh "" 264ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh raise EOFError 265ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh 266ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh def mainloop(self): 267ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh """Listen on socket until I/O not ready or EOF 268ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh 269ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh pollresponse() will loop looking for seq number None, which 270ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh never comes, and exit on EOFError. 271ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh 272ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh """ 273ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh try: 274ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh self.getresponse(myseq=None, wait=0.05) 275ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh except EOFError: 276ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh self.debug("mainloop:return") 277ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh return 278ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh 279ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh def getresponse(self, myseq, wait): 280ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh response = self._getresponse(myseq, wait) 281ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh if response is not None: 282ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh how, what = response 283ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh if how == "OK": 284ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh response = how, self._proxify(what) 285ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh return response 286ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh 287ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh def _proxify(self, obj): 288ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh if isinstance(obj, RemoteProxy): 289ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh return RPCProxy(self, obj.oid) 290ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh if isinstance(obj, types.ListType): 291ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh return map(self._proxify, obj) 292ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh # XXX Check for other types -- not currently needed 293ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh return obj 294ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh 295ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh def _getresponse(self, myseq, wait): 296ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh self.debug("_getresponse:myseq:", myseq) 297ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh if threading.currentThread() is self.sockthread: 298ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh # this thread does all reading of requests or responses 299ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh while 1: 300ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh response = self.pollresponse(myseq, wait) 301ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh if response is not None: 302ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh return response 303ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh else: 304ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh # wait for notification from socket handling thread 305ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh cvar = self.cvars[myseq] 306ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh cvar.acquire() 307ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh while myseq not in self.responses: 308ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh cvar.wait() 309ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh response = self.responses[myseq] 310ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh self.debug("_getresponse:%s: thread woke up: response: %s" % 311ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh (myseq, response)) 312ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh del self.responses[myseq] 313ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh del self.cvars[myseq] 314ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh cvar.release() 315ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh return response 316ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh 317ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh def newseq(self): 318ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh self.nextseq = seq = self.nextseq + 2 319ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh return seq 320ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh 321ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh def putmessage(self, message): 322ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh self.debug("putmessage:%d:" % message[0]) 323ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh try: 324ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh s = pickle.dumps(message) 325ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh except pickle.PicklingError: 326ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh print >>sys.__stderr__, "Cannot pickle:", repr(message) 327ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh raise 328ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh s = struct.pack("<i", len(s)) + s 329ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh while len(s) > 0: 330ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh try: 331ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh r, w, x = select.select([], [self.sock], []) 332ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh n = self.sock.send(s[:BUFSIZE]) 333ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh except (AttributeError, TypeError): 334ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh raise IOError, "socket no longer exists" 335ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh except socket.error: 336ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh raise 337ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh else: 338ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh s = s[n:] 339ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh 340ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh buffer = "" 341ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh bufneed = 4 342ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh bufstate = 0 # meaning: 0 => reading count; 1 => reading data 343ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh 344ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh def pollpacket(self, wait): 345ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh self._stage0() 346ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh if len(self.buffer) < self.bufneed: 347ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh r, w, x = select.select([self.sock.fileno()], [], [], wait) 348ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh if len(r) == 0: 349ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh return None 350ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh try: 351ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh s = self.sock.recv(BUFSIZE) 352ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh except socket.error: 353ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh raise EOFError 354ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh if len(s) == 0: 355ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh raise EOFError 356ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh self.buffer += s 357ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh self._stage0() 358ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh return self._stage1() 359ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh 360ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh def _stage0(self): 361ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh if self.bufstate == 0 and len(self.buffer) >= 4: 362ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh s = self.buffer[:4] 363ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh self.buffer = self.buffer[4:] 364ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh self.bufneed = struct.unpack("<i", s)[0] 365ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh self.bufstate = 1 366ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh 367ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh def _stage1(self): 368ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh if self.bufstate == 1 and len(self.buffer) >= self.bufneed: 369ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh packet = self.buffer[:self.bufneed] 370ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh self.buffer = self.buffer[self.bufneed:] 371ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh self.bufneed = 4 372ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh self.bufstate = 0 373ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh return packet 374ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh 375ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh def pollmessage(self, wait): 376ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh packet = self.pollpacket(wait) 377ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh if packet is None: 378ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh return None 379ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh try: 380ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh message = pickle.loads(packet) 381ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh except pickle.UnpicklingError: 382ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh print >>sys.__stderr__, "-----------------------" 383ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh print >>sys.__stderr__, "cannot unpickle packet:", repr(packet) 384ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh traceback.print_stack(file=sys.__stderr__) 385ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh print >>sys.__stderr__, "-----------------------" 386ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh raise 387ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh return message 388ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh 389ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh def pollresponse(self, myseq, wait): 390ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh """Handle messages received on the socket. 391ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh 392ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh Some messages received may be asynchronous 'call' or 'queue' requests, 393ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh and some may be responses for other threads. 394ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh 395ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh 'call' requests are passed to self.localcall() with the expectation of 396ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh immediate execution, during which time the socket is not serviced. 397ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh 398ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh 'queue' requests are used for tasks (which may block or hang) to be 399ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh processed in a different thread. These requests are fed into 400ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh request_queue by self.localcall(). Responses to queued requests are 401ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh taken from response_queue and sent across the link with the associated 402ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh sequence numbers. Messages in the queues are (sequence_number, 403ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh request/response) tuples and code using this module removing messages 404ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh from the request_queue is responsible for returning the correct 405ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh sequence number in the response_queue. 406ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh 407ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh pollresponse() will loop until a response message with the myseq 408ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh sequence number is received, and will save other responses in 409ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh self.responses and notify the owning thread. 410ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh 411ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh """ 412ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh while 1: 413ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh # send queued response if there is one available 414ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh try: 415ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh qmsg = response_queue.get(0) 416ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh except Queue.Empty: 417ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh pass 418ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh else: 419ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh seq, response = qmsg 420ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh message = (seq, ('OK', response)) 421ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh self.putmessage(message) 422ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh # poll for message on link 423ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh try: 424ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh message = self.pollmessage(wait) 425ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh if message is None: # socket not ready 426ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh return None 427ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh except EOFError: 428ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh self.handle_EOF() 429ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh return None 430ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh except AttributeError: 431ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh return None 432ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh seq, resq = message 433ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh how = resq[0] 434ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh self.debug("pollresponse:%d:myseq:%s" % (seq, myseq)) 435ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh # process or queue a request 436ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh if how in ("CALL", "QUEUE"): 437ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh self.debug("pollresponse:%d:localcall:call:" % seq) 438ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh response = self.localcall(seq, resq) 439ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh self.debug("pollresponse:%d:localcall:response:%s" 440ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh % (seq, response)) 441ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh if how == "CALL": 442ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh self.putmessage((seq, response)) 443ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh elif how == "QUEUE": 444ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh # don't acknowledge the 'queue' request! 445ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh pass 446ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh continue 447ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh # return if completed message transaction 448ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh elif seq == myseq: 449ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh return resq 450ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh # must be a response for a different thread: 451ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh else: 452ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh cv = self.cvars.get(seq, None) 453ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh # response involving unknown sequence number is discarded, 454ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh # probably intended for prior incarnation of server 455ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh if cv is not None: 456ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh cv.acquire() 457ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh self.responses[seq] = resq 458ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh cv.notify() 459ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh cv.release() 460ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh continue 461ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh 462ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh def handle_EOF(self): 463ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh "action taken upon link being closed by peer" 464ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh self.EOFhook() 465ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh self.debug("handle_EOF") 466ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh for key in self.cvars: 467ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh cv = self.cvars[key] 468ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh cv.acquire() 469ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh self.responses[key] = ('EOF', None) 470ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh cv.notify() 471ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh cv.release() 472ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh # call our (possibly overridden) exit function 473ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh self.exithook() 474ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh 475ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh def EOFhook(self): 476ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh "Classes using rpc client/server can override to augment EOF action" 477ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh pass 478ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh 479ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh#----------------- end class SocketIO -------------------- 480ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh 481ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsiehclass RemoteObject(object): 482ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh # Token mix-in class 483ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh pass 484ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh 485ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsiehdef remoteref(obj): 486ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh oid = id(obj) 487ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh objecttable[oid] = obj 488ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh return RemoteProxy(oid) 489ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh 490ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsiehclass RemoteProxy(object): 491ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh 492ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh def __init__(self, oid): 493ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh self.oid = oid 494ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh 495ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsiehclass RPCHandler(SocketServer.BaseRequestHandler, SocketIO): 496ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh 497ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh debugging = False 498ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh location = "#S" # Server 499ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh 500ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh def __init__(self, sock, addr, svr): 501ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh svr.current_handler = self ## cgt xxx 502ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh SocketIO.__init__(self, sock) 503ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh SocketServer.BaseRequestHandler.__init__(self, sock, addr, svr) 504ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh 505ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh def handle(self): 506ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh "handle() method required by SocketServer" 507ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh self.mainloop() 508ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh 509ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh def get_remote_proxy(self, oid): 510ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh return RPCProxy(self, oid) 511ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh 512ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsiehclass RPCClient(SocketIO): 513ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh 514ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh debugging = False 515ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh location = "#C" # Client 516ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh 517ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh nextseq = 1 # Requests coming from the client are odd numbered 518ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh 519ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh def __init__(self, address, family=socket.AF_INET, type=socket.SOCK_STREAM): 520ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh self.listening_sock = socket.socket(family, type) 521ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh self.listening_sock.bind(address) 522ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh self.listening_sock.listen(1) 523ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh 524ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh def accept(self): 525ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh working_sock, address = self.listening_sock.accept() 526ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh if self.debugging: 527ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh print>>sys.__stderr__, "****** Connection request from ", address 528ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh if address[0] == LOCALHOST: 529ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh SocketIO.__init__(self, working_sock) 530ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh else: 531ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh print>>sys.__stderr__, "** Invalid host: ", address 532ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh raise socket.error 533ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh 534ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh def get_remote_proxy(self, oid): 535ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh return RPCProxy(self, oid) 536ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh 537ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsiehclass RPCProxy(object): 538ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh 539ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh __methods = None 540ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh __attributes = None 541ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh 542ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh def __init__(self, sockio, oid): 543ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh self.sockio = sockio 544ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh self.oid = oid 545ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh 546ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh def __getattr__(self, name): 547ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh if self.__methods is None: 548ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh self.__getmethods() 549ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh if self.__methods.get(name): 550ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh return MethodProxy(self.sockio, self.oid, name) 551ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh if self.__attributes is None: 552ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh self.__getattributes() 553ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh if name in self.__attributes: 554ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh value = self.sockio.remotecall(self.oid, '__getattribute__', 555ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh (name,), {}) 556ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh return value 557ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh else: 558ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh raise AttributeError, name 559ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh 560ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh def __getattributes(self): 561ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh self.__attributes = self.sockio.remotecall(self.oid, 562ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh "__attributes__", (), {}) 563ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh 564ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh def __getmethods(self): 565ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh self.__methods = self.sockio.remotecall(self.oid, 566ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh "__methods__", (), {}) 567ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh 568ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsiehdef _getmethods(obj, methods): 569ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh # Helper to get a list of methods from an object 570ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh # Adds names to dictionary argument 'methods' 571ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh for name in dir(obj): 572ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh attr = getattr(obj, name) 573ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh if hasattr(attr, '__call__'): 574ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh methods[name] = 1 575ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh if type(obj) == types.InstanceType: 576ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh _getmethods(obj.__class__, methods) 577ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh if type(obj) == types.ClassType: 578ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh for super in obj.__bases__: 579ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh _getmethods(super, methods) 580ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh 581ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsiehdef _getattributes(obj, attributes): 582ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh for name in dir(obj): 583ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh attr = getattr(obj, name) 584ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh if not hasattr(attr, '__call__'): 585ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh attributes[name] = 1 586ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh 587ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsiehclass MethodProxy(object): 588ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh 589ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh def __init__(self, sockio, oid, name): 590ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh self.sockio = sockio 591ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh self.oid = oid 592ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh self.name = name 593ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh 594ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh def __call__(self, *args, **kwargs): 595ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh value = self.sockio.remotecall(self.oid, self.name, args, kwargs) 596ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh return value 597ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh 598ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh 599ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh# XXX KBK 09Sep03 We need a proper unit test for this module. Previously 600ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh# existing test code was removed at Rev 1.27 (r34098). 601