"""RPC Implementation, originally written for the Python Idle IDE
2ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh
3ffab958fd8d42ed7227d83007350e61555a1fa36Andrew HsiehFor security reasons, GvR requested that Idle's Python execution server process
4ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsiehconnect to the Idle process, which listens for the connection.  Since Idle has
5ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsiehonly one client per server, this was not a limitation.
6ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh
7ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh   +---------------------------------+ +-------------+
8ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh   | SocketServer.BaseRequestHandler | | SocketIO    |
9ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh   +---------------------------------+ +-------------+
10ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh                   ^                   | register()  |
11ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh                   |                   | unregister()|
12ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh                   |                   +-------------+
13ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh                   |                      ^  ^
14ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh                   |                      |  |
15ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh                   | + -------------------+  |
16ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh                   | |                       |
17ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh   +-------------------------+        +-----------------+
18ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh   | RPCHandler              |        | RPCClient       |
19ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh   | [attribute of RPCServer]|        |                 |
20ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh   +-------------------------+        +-----------------+
21ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh
22ffab958fd8d42ed7227d83007350e61555a1fa36Andrew HsiehThe RPCServer handler class is expected to provide register/unregister methods.
23ffab958fd8d42ed7227d83007350e61555a1fa36Andrew HsiehRPCHandler inherits the mix-in class SocketIO, which provides these methods.
24ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh
25ffab958fd8d42ed7227d83007350e61555a1fa36Andrew HsiehSee the Idle run.main() docstring for further information on how this was
26ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsiehaccomplished in Idle.
27ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh
28ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh"""
29ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh
30ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsiehimport sys
31ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsiehimport os
32ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsiehimport socket
33ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsiehimport select
34ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsiehimport SocketServer
35ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsiehimport struct
36ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsiehimport cPickle as pickle
37ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsiehimport threading
38ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsiehimport Queue
39ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsiehimport traceback
40ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsiehimport copy_reg
41ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsiehimport types
42ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsiehimport marshal
43ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh
44ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh
def unpickle_code(ms):
    """Rebuild a code object from its marshalled string *ms*.

    This is the constructor half of the copy_reg reduction for code
    objects; the unmarshalled value is asserted to be a code object.
    """
    code = marshal.loads(ms)
    assert isinstance(code, types.CodeType)
    return code
49ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh
def pickle_code(co):
    """Reduce code object *co* for pickling.

    Returns the pair (unpickle_code, (marshalled_code,)), i.e. the callable
    and the argument tuple from which the code object can be rebuilt.
    """
    assert isinstance(co, types.CodeType)
    return unpickle_code, (marshal.dumps(co),)
54ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh
55ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh# XXX KBK 24Aug02 function pickling capability not used in Idle
56ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh#  def unpickle_function(ms):
57ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh#      return ms
58ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh
59ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh#  def pickle_function(fn):
#      assert isinstance(fn, types.FunctionType)
61ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh#      return repr(fn)
62ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh
# Register pickle_code as the reduction function for code objects, so that
# pickle serializes them through marshal; unpickle_code rebuilds them.
copy_reg.pickle(types.CodeType, pickle_code, unpickle_code)
# copy_reg.pickle(types.FunctionType, pickle_function, unpickle_function)

# Largest slice handed to one send() in SocketIO.putmessage and the size
# requested from one recv() in SocketIO.pollpacket.
BUFSIZE = 8*1024
# IPv4 loopback address.
LOCALHOST = '127.0.0.1'
68ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh
class RPCServer(SocketServer.TCPServer):
    """TCPServer that connects out to its peer instead of accepting.

    The bind() and listen() phases are overridden: the server socket is
    connect()ed to the address given to the constructor and that same
    socket is then returned as the request.
    """

    def __init__(self, addr, handlerclass=None):
        """Set up the server for *addr*; RPCHandler is the default handler."""
        handler = RPCHandler if handlerclass is None else handlerclass
        SocketServer.TCPServer.__init__(self, addr, handler)

    def server_bind(self):
        """Do nothing: the connecting entity has no bind() phase."""

    def server_activate(self):
        """connect() to self.server_address rather than listen().

        Because the connection is reversed, self.server_address is really
        the address of the Idle client being connected to.

        """
        self.socket.connect(self.server_address)

    def get_request(self):
        """Return the already connected socket and the peer address."""
        return (self.socket, self.server_address)

    def handle_error(self, request, client_address):
        """Report an unhandled exception and terminate the process.

        SystemExit is propagated untouched.  Any other exception is
        written, with its traceback, to sys.__stderr__ and the process
        then leaves through os._exit(0).

        """
        try:
            # Re-raise the exception being handled so that the except
            # clauses below can sort it by type.
            raise
        except SystemExit:
            raise
        except:
            rule = '-'*40
            err = sys.__stderr__
            print>>err, '\n' + rule
            print>>err, 'Unhandled server exception!'
            print>>err, 'Thread: %s' % threading.currentThread().getName()
            print>>err, 'Client Address: ', client_address
            print>>err, 'Request: ', repr(request)
            traceback.print_exc(file=err)
            print>>err, '\n*** Unrecoverable, server exiting!'
            print>>err, rule
            os._exit(0)
116ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh
117ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh#----------------- end class RPCServer --------------------
118ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh
# Default object table used by every SocketIO created without its own
# objtable argument; SocketIO.register stores objects in it by object id.
objecttable = {}
# SocketIO.localcall puts (seq, (method, args, kwargs)) here for requests
# of type 'QUEUE'.  A maxsize of 0 makes the queue unbounded.
request_queue = Queue.Queue(0)
# NOTE(review): presumably carries the results of queued calls back to the
# socket-handling thread; its use is not visible in this part of the
# file -- confirm.
response_queue = Queue.Queue(0)
122ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh
123ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh
124ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsiehclass SocketIO(object):
125ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh
126ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh    nextseq = 0
127ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh
128ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh    def __init__(self, sock, objtable=None, debugging=None):
129ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh        self.sockthread = threading.currentThread()
130ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh        if debugging is not None:
131ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh            self.debugging = debugging
132ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh        self.sock = sock
133ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh        if objtable is None:
134ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh            objtable = objecttable
135ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh        self.objtable = objtable
136ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh        self.responses = {}
137ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh        self.cvars = {}
138ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh
139ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh    def close(self):
140ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh        sock = self.sock
141ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh        self.sock = None
142ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh        if sock is not None:
143ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh            sock.close()
144ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh
145ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh    def exithook(self):
146ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh        "override for specific exit action"
147ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh        os._exit(0)
148ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh
149ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh    def debug(self, *args):
150ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh        if not self.debugging:
151ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh            return
152ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh        s = self.location + " " + str(threading.currentThread().getName())
153ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh        for a in args:
154ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh            s = s + " " + str(a)
155ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh        print>>sys.__stderr__, s
156ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh
157ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh    def register(self, oid, object):
158ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh        self.objtable[oid] = object
159ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh
160ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh    def unregister(self, oid):
161ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh        try:
162ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh            del self.objtable[oid]
163ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh        except KeyError:
164ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh            pass
165ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh
166ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh    def localcall(self, seq, request):
167ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh        self.debug("localcall:", request)
168ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh        try:
169ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh            how, (oid, methodname, args, kwargs) = request
170ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh        except TypeError:
171ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh            return ("ERROR", "Bad request format")
172ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh        if oid not in self.objtable:
173ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh            return ("ERROR", "Unknown object id: %r" % (oid,))
174ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh        obj = self.objtable[oid]
175ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh        if methodname == "__methods__":
176ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh            methods = {}
177ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh            _getmethods(obj, methods)
178ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh            return ("OK", methods)
179ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh        if methodname == "__attributes__":
180ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh            attributes = {}
181ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh            _getattributes(obj, attributes)
182ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh            return ("OK", attributes)
183ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh        if not hasattr(obj, methodname):
184ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh            return ("ERROR", "Unsupported method name: %r" % (methodname,))
185ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh        method = getattr(obj, methodname)
186ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh        try:
187ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh            if how == 'CALL':
188ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh                ret = method(*args, **kwargs)
189ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh                if isinstance(ret, RemoteObject):
190ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh                    ret = remoteref(ret)
191ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh                return ("OK", ret)
192ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh            elif how == 'QUEUE':
193ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh                request_queue.put((seq, (method, args, kwargs)))
194ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh                return("QUEUED", None)
195ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh            else:
196ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh                return ("ERROR", "Unsupported message type: %s" % how)
197ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh        except SystemExit:
198ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh            raise
199ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh        except socket.error:
200ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh            raise
201ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh        except:
202ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh            msg = "*** Internal Error: rpc.py:SocketIO.localcall()\n\n"\
203ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh                  " Object: %s \n Method: %s \n Args: %s\n"
204ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh            print>>sys.__stderr__, msg % (oid, method, args)
205ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh            traceback.print_exc(file=sys.__stderr__)
206ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh            return ("EXCEPTION", None)
207ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh
208ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh    def remotecall(self, oid, methodname, args, kwargs):
209ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh        self.debug("remotecall:asynccall: ", oid, methodname)
210ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh        seq = self.asynccall(oid, methodname, args, kwargs)
211ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh        return self.asyncreturn(seq)
212ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh
213ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh    def remotequeue(self, oid, methodname, args, kwargs):
214ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh        self.debug("remotequeue:asyncqueue: ", oid, methodname)
215ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh        seq = self.asyncqueue(oid, methodname, args, kwargs)
216ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh        return self.asyncreturn(seq)
217ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh
218ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh    def asynccall(self, oid, methodname, args, kwargs):
219ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh        request = ("CALL", (oid, methodname, args, kwargs))
220ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh        seq = self.newseq()
221ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh        if threading.currentThread() != self.sockthread:
222ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh            cvar = threading.Condition()
223ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh            self.cvars[seq] = cvar
224ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh        self.debug(("asynccall:%d:" % seq), oid, methodname, args, kwargs)
225ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh        self.putmessage((seq, request))
226ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh        return seq
227ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh
228ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh    def asyncqueue(self, oid, methodname, args, kwargs):
229ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh        request = ("QUEUE", (oid, methodname, args, kwargs))
230ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh        seq = self.newseq()
231ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh        if threading.currentThread() != self.sockthread:
232ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh            cvar = threading.Condition()
233ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh            self.cvars[seq] = cvar
234ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh        self.debug(("asyncqueue:%d:" % seq), oid, methodname, args, kwargs)
235ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh        self.putmessage((seq, request))
236ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh        return seq
237ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh
238ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh    def asyncreturn(self, seq):
239ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh        self.debug("asyncreturn:%d:call getresponse(): " % seq)
240ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh        response = self.getresponse(seq, wait=0.05)
241ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh        self.debug(("asyncreturn:%d:response: " % seq), response)
242ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh        return self.decoderesponse(response)
243ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh
244ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh    def decoderesponse(self, response):
245ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh        how, what = response
246ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh        if how == "OK":
247ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh            return what
248ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh        if how == "QUEUED":
249ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh            return None
250ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh        if how == "EXCEPTION":
251ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh            self.debug("decoderesponse: EXCEPTION")
252ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh            return None
253ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh        if how == "EOF":
254ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh            self.debug("decoderesponse: EOF")
255ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh            self.decode_interrupthook()
256ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh            return None
257ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh        if how == "ERROR":
258ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh            self.debug("decoderesponse: Internal ERROR:", what)
259ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh            raise RuntimeError, what
260ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh        raise SystemError, (how, what)
261ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh
262ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh    def decode_interrupthook(self):
263ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh        ""
264ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh        raise EOFError
265ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh
266ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh    def mainloop(self):
267ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh        """Listen on socket until I/O not ready or EOF
268ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh
269ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh        pollresponse() will loop looking for seq number None, which
270ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh        never comes, and exit on EOFError.
271ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh
272ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh        """
273ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh        try:
274ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh            self.getresponse(myseq=None, wait=0.05)
275ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh        except EOFError:
276ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh            self.debug("mainloop:return")
277ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh            return
278ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh
279ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh    def getresponse(self, myseq, wait):
280ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh        response = self._getresponse(myseq, wait)
281ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh        if response is not None:
282ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh            how, what = response
283ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh            if how == "OK":
284ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh                response = how, self._proxify(what)
285ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh        return response
286ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh
287ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh    def _proxify(self, obj):
288ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh        if isinstance(obj, RemoteProxy):
289ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh            return RPCProxy(self, obj.oid)
290ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh        if isinstance(obj, types.ListType):
291ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh            return map(self._proxify, obj)
292ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh        # XXX Check for other types -- not currently needed
293ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh        return obj
294ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh
295ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh    def _getresponse(self, myseq, wait):
296ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh        self.debug("_getresponse:myseq:", myseq)
297ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh        if threading.currentThread() is self.sockthread:
298ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh            # this thread does all reading of requests or responses
299ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh            while 1:
300ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh                response = self.pollresponse(myseq, wait)
301ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh                if response is not None:
302ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh                    return response
303ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh        else:
304ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh            # wait for notification from socket handling thread
305ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh            cvar = self.cvars[myseq]
306ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh            cvar.acquire()
307ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh            while myseq not in self.responses:
308ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh                cvar.wait()
309ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh            response = self.responses[myseq]
310ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh            self.debug("_getresponse:%s: thread woke up: response: %s" %
311ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh                       (myseq, response))
312ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh            del self.responses[myseq]
313ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh            del self.cvars[myseq]
314ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh            cvar.release()
315ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh            return response
316ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh
317ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh    def newseq(self):
318ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh        self.nextseq = seq = self.nextseq + 2
319ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh        return seq
320ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh
321ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh    def putmessage(self, message):
322ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh        self.debug("putmessage:%d:" % message[0])
323ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh        try:
324ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh            s = pickle.dumps(message)
325ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh        except pickle.PicklingError:
326ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh            print >>sys.__stderr__, "Cannot pickle:", repr(message)
327ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh            raise
328ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh        s = struct.pack("<i", len(s)) + s
329ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh        while len(s) > 0:
330ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh            try:
331ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh                r, w, x = select.select([], [self.sock], [])
332ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh                n = self.sock.send(s[:BUFSIZE])
333ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh            except (AttributeError, TypeError):
334ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh                raise IOError, "socket no longer exists"
335ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh            except socket.error:
336ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh                raise
337ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh            else:
338ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh                s = s[n:]
339ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh
    # Packet-reader state.  These are class-level defaults; pollpacket(),
    # _stage0() and _stage1() rebind them on the instance.
    # Data received from the socket that has not been consumed yet.
    buffer = ""
    # Number of bytes needed to finish the current stage: 4 for the length
    # prefix, otherwise the packet length decoded from that prefix.
    bufneed = 4
    bufstate = 0 # meaning: 0 => reading count; 1 => reading data
343ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh
344ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh    def pollpacket(self, wait):
345ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh        self._stage0()
346ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh        if len(self.buffer) < self.bufneed:
347ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh            r, w, x = select.select([self.sock.fileno()], [], [], wait)
348ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh            if len(r) == 0:
349ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh                return None
350ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh            try:
351ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh                s = self.sock.recv(BUFSIZE)
352ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh            except socket.error:
353ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh                raise EOFError
354ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh            if len(s) == 0:
355ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh                raise EOFError
356ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh            self.buffer += s
357ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh            self._stage0()
358ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh        return self._stage1()
359ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh
360ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh    def _stage0(self):
361ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh        if self.bufstate == 0 and len(self.buffer) >= 4:
362ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh            s = self.buffer[:4]
363ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh            self.buffer = self.buffer[4:]
364ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh            self.bufneed = struct.unpack("<i", s)[0]
365ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh            self.bufstate = 1
366ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh
367ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh    def _stage1(self):
368ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh        if self.bufstate == 1 and len(self.buffer) >= self.bufneed:
369ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh            packet = self.buffer[:self.bufneed]
370ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh            self.buffer = self.buffer[self.bufneed:]
371ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh            self.bufneed = 4
372ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh            self.bufstate = 0
373ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh            return packet
374ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh
375ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh    def pollmessage(self, wait):
376ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh        packet = self.pollpacket(wait)
377ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh        if packet is None:
378ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh            return None
379ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh        try:
380ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh            message = pickle.loads(packet)
381ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh        except pickle.UnpicklingError:
382ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh            print >>sys.__stderr__, "-----------------------"
383ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh            print >>sys.__stderr__, "cannot unpickle packet:", repr(packet)
384ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh            traceback.print_stack(file=sys.__stderr__)
385ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh            print >>sys.__stderr__, "-----------------------"
386ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh            raise
387ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh        return message
388ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh
389ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh    def pollresponse(self, myseq, wait):
390ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh        """Handle messages received on the socket.
391ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh
392ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh        Some messages received may be asynchronous 'call' or 'queue' requests,
393ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh        and some may be responses for other threads.
394ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh
395ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh        'call' requests are passed to self.localcall() with the expectation of
396ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh        immediate execution, during which time the socket is not serviced.
397ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh
398ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh        'queue' requests are used for tasks (which may block or hang) to be
399ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh        processed in a different thread.  These requests are fed into
400ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh        request_queue by self.localcall().  Responses to queued requests are
401ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh        taken from response_queue and sent across the link with the associated
402ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh        sequence numbers.  Messages in the queues are (sequence_number,
403ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh        request/response) tuples and code using this module removing messages
404ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh        from the request_queue is responsible for returning the correct
405ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh        sequence number in the response_queue.
406ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh
407ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh        pollresponse() will loop until a response message with the myseq
408ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh        sequence number is received, and will save other responses in
409ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh        self.responses and notify the owning thread.
410ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh
411ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh        """
412ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh        while 1:
413ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh            # send queued response if there is one available
414ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh            try:
415ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh                qmsg = response_queue.get(0)
416ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh            except Queue.Empty:
417ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh                pass
418ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh            else:
419ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh                seq, response = qmsg
420ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh                message = (seq, ('OK', response))
421ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh                self.putmessage(message)
422ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh            # poll for message on link
423ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh            try:
424ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh                message = self.pollmessage(wait)
425ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh                if message is None:  # socket not ready
426ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh                    return None
427ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh            except EOFError:
428ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh                self.handle_EOF()
429ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh                return None
430ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh            except AttributeError:
431ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh                return None
432ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh            seq, resq = message
433ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh            how = resq[0]
434ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh            self.debug("pollresponse:%d:myseq:%s" % (seq, myseq))
435ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh            # process or queue a request
436ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh            if how in ("CALL", "QUEUE"):
437ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh                self.debug("pollresponse:%d:localcall:call:" % seq)
438ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh                response = self.localcall(seq, resq)
439ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh                self.debug("pollresponse:%d:localcall:response:%s"
440ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh                           % (seq, response))
441ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh                if how == "CALL":
442ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh                    self.putmessage((seq, response))
443ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh                elif how == "QUEUE":
444ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh                    # don't acknowledge the 'queue' request!
445ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh                    pass
446ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh                continue
447ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh            # return if completed message transaction
448ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh            elif seq == myseq:
449ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh                return resq
450ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh            # must be a response for a different thread:
451ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh            else:
452ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh                cv = self.cvars.get(seq, None)
453ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh                # response involving unknown sequence number is discarded,
454ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh                # probably intended for prior incarnation of server
455ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh                if cv is not None:
456ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh                    cv.acquire()
457ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh                    self.responses[seq] = resq
458ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh                    cv.notify()
459ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh                    cv.release()
460ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh                continue
461ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh
462ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh    def handle_EOF(self):
463ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh        "action taken upon link being closed by peer"
464ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh        self.EOFhook()
465ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh        self.debug("handle_EOF")
466ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh        for key in self.cvars:
467ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh            cv = self.cvars[key]
468ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh            cv.acquire()
469ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh            self.responses[key] = ('EOF', None)
470ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh            cv.notify()
471ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh            cv.release()
472ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh        # call our (possibly overridden) exit function
473ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh        self.exithook()
474ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh
475ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh    def EOFhook(self):
476ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh        "Classes using rpc client/server can override to augment EOF action"
477ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh        pass
478ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh
479ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh#----------------- end class SocketIO --------------------
480ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh
class RemoteObject(object):
    """Token mix-in class; it defines no attributes or methods of its own."""
484ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh
def remoteref(obj):
    """Register obj in objecttable and return a RemoteProxy naming it.

    The object is stored under its id(), and the returned proxy carries that
    same id as its oid.
    """
    key = id(obj)
    objecttable[key] = obj
    return RemoteProxy(key)
489ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh
class RemoteProxy(object):
    """Plain holder for the object id (oid) of a registered object."""

    def __init__(self, oid):
        """Remember oid, the key under which the real object is stored."""
        self.oid = oid
494ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh
class RPCHandler(SocketServer.BaseRequestHandler, SocketIO):
    """Request handler that speaks the SocketIO protocol on the server side.

    Combines SocketServer's request-handler interface with the SocketIO
    message machinery for the connected socket.
    """

    debugging = False
    location = "#S"  # Server

    def __init__(self, sock, addr, svr):
        """Initialise both base classes for the accepted connection.

        sock, addr and svr are the request socket, client address and server
        that SocketServer passes to its request handlers.
        """
        # Let the server reach this handler through svr.current_handler.
        svr.current_handler = self ## cgt xxx
        # SocketIO must be set up first: BaseRequestHandler.__init__ runs
        # handle() (and therefore mainloop()) before it returns.
        SocketIO.__init__(self, sock)
        SocketServer.BaseRequestHandler.__init__(self, sock, addr, svr)

    def handle(self):
        """handle() method required by SocketServer; runs self.mainloop()."""
        self.mainloop()

    def get_remote_proxy(self, oid):
        """Return an RPCProxy for the peer's object oid, bound to this link."""
        return RPCProxy(self, oid)
511ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh
class RPCClient(SocketIO):
    """SocketIO endpoint that listens for, and accepts, a single connection.

    Constructing the object only creates the listening socket.  The SocketIO
    base class is initialised later, by accept(), once a peer has connected.
    """

    debugging = False
    location = "#C"  # Client

    nextseq = 1 # Requests coming from the client are odd numbered

    def __init__(self, address, family=socket.AF_INET, type=socket.SOCK_STREAM):
        """Create a socket bound to address and start listening on it.

        family and type are passed straight to socket.socket().  If bind()
        or listen() fails with socket.error, the socket is closed before the
        error is re-raised.
        """
        self.listening_sock = socket.socket(family, type)
        try:
            self.listening_sock.bind(address)
            self.listening_sock.listen(1)
        except socket.error:
            # don't leave a half-configured socket open behind the failure
            self.listening_sock.close()
            raise

    def accept(self):
        """Accept one connection and initialise SocketIO on it.

        Only a peer whose address is LOCALHOST is accepted.  Any other peer
        is reported on sys.__stderr__, its connection is closed, and
        socket.error is raised.
        """
        working_sock, address = self.listening_sock.accept()
        if self.debugging:
            print>>sys.__stderr__, "****** Connection request from ", address
        if address[0] != LOCALHOST:
            print>>sys.__stderr__, "** Invalid host: ", address
            # close the rejected connection instead of leaking it
            working_sock.close()
            raise socket.error
        SocketIO.__init__(self, working_sock)

    def get_remote_proxy(self, oid):
        """Return an RPCProxy for the peer's object oid, bound to this link."""
        return RPCProxy(self, oid)
536ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh
class RPCProxy(object):
    """Local stand-in for an object that lives on the other end of the link.

    Attribute lookups that fail locally are resolved against the remote
    object identified by oid, using sockio.remotecall().  The remote
    object's method and attribute name tables are fetched on first use and
    then cached on the proxy.
    """

    # Name tables of the remote object; None until fetched.
    __methods = None
    __attributes = None

    def __init__(self, sockio, oid):
        """Bind the proxy to the link sockio and the remote object id oid."""
        self.sockio = sockio
        self.oid = oid

    def __getattr__(self, name):
        """Resolve name on the remote object.

        Returns a MethodProxy if name is listed with a true value in the
        remote method table, or the value fetched through the remote
        object's __getattribute__ if name is in the remote attribute table.

        Raises AttributeError(name) if the remote object has neither.
        """
        if self.__methods is None:
            self.__getmethods()
        if self.__methods.get(name):
            return MethodProxy(self.sockio, self.oid, name)
        if self.__attributes is None:
            self.__getattributes()
        if name in self.__attributes:
            return self.sockio.remotecall(self.oid, '__getattribute__',
                                          (name,), {})
        # Call form rather than the Python 2-only "raise E, value" statement;
        # the exception raised is the same.
        raise AttributeError(name)

    def __getattributes(self):
        """Fetch and cache the remote object's attribute-name table."""
        self.__attributes = self.sockio.remotecall(self.oid,
                                                "__attributes__", (), {})

    def __getmethods(self):
        """Fetch and cache the remote object's method-name table."""
        self.__methods = self.sockio.remotecall(self.oid,
                                                "__methods__", (), {})
567ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh
def _getmethods(obj, methods):
    """Add the name of every callable attribute of obj to the dict methods.

    Each name found by dir(obj) whose value has a __call__ attribute is
    stored as methods[name] = 1.  For an old-style instance the scan is
    repeated on its class, and for an old-style class on each base class.
    """
    for attr_name in dir(obj):
        if hasattr(getattr(obj, attr_name), '__call__'):
            methods[attr_name] = 1
    kind = type(obj)
    if kind == types.InstanceType:
        _getmethods(obj.__class__, methods)
    if kind == types.ClassType:
        for base in obj.__bases__:
            _getmethods(base, methods)
580ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh
def _getattributes(obj, attributes):
    """Add the name of every non-callable attribute of obj to attributes.

    Each name found by dir(obj) whose value has no __call__ attribute is
    stored as attributes[name] = 1.
    """
    for attr_name in dir(obj):
        value = getattr(obj, attr_name)
        if hasattr(value, '__call__'):
            continue
        attributes[attr_name] = 1
586ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh
class MethodProxy(object):
    """Callable that forwards a call to one named method of a remote object."""

    def __init__(self, sockio, oid, name):
        """Remember the link, the remote object id and the method name."""
        self.sockio, self.oid, self.name = sockio, oid, name

    def __call__(self, *args, **kwargs):
        """Invoke the remote method with the given arguments; return its result."""
        return self.sockio.remotecall(self.oid, self.name, args, kwargs)
597ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh
598ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh
599ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh# XXX KBK 09Sep03  We need a proper unit test for this module.  Previously
600ffab958fd8d42ed7227d83007350e61555a1fa36Andrew Hsieh#                  existing test code was removed at Rev 1.27 (r34098).
601