1#!/usr/bin/python2.4
2# Copyright (c) 2010 The Chromium Authors. All rights reserved.
3# Use of this source code is governed by a BSD-style license that can be
4# found in the LICENSE file.
5
6"""A bare-bones and non-compliant XMPP server.
7
8Just enough of the protocol is implemented to get it to work with
9Chrome's sync notification system.
10"""
11
12import asynchat
13import asyncore
14import base64
15import re
16import socket
17from xml.dom import minidom
18
# pychecker complains about the use of fileno(), which is implemented
# by asyncore by forwarding to an internal object via __getattr__.
# The 'no-classattr' option below silences that complaint for this
# module (presumably by disabling pychecker's class-attribute check).
__pychecker__ = 'no-classattr'
22
23
class Error(Exception):
  """Base class for every exception raised by this module."""
27
28
class UnexpectedXml(Error):
  """Signals that an XML element arrived that was not expected.

  The exception arguments are the message 'Unexpected XML element'
  followed by the serialized (toxml()) form of the offending element.
  """

  def __init__(self, xml_element):
    # Error.__init__ is called directly (not via super()) so this keeps
    # working where exceptions are classic classes.
    Error.__init__(self, 'Unexpected XML element', xml_element.toxml())
35
36
def ParseXml(xml_string):
  """Parses xml_string with minidom and returns the root element.

  Args:
    xml_string: A complete XML document.

  Returns:
    The document's root element (a minidom Element).  Every element in
    the tree whose 'xmlns' attribute reads back as None has had it reset
    to the empty string.
  """
  root = minidom.parseString(xml_string).documentElement

  # Work around a minidom bug: an xmlns attribute can end up with a
  # value of None, which breaks toxml() and toprettyxml().  Visit the
  # root and every descendant element (parents before children, in
  # document order) and patch any such attribute.
  to_visit = [root]
  while to_visit:
    element = to_visit.pop()
    if element.getAttribute('xmlns') is None:
      element.setAttribute('xmlns', '')
    child_elements = [node for node in element.childNodes
                      if node.nodeType == node.ELEMENT_NODE]
    # Pushed in reverse so the first child is the next one popped.
    child_elements.reverse()
    to_visit.extend(child_elements)
  return root
59
60
def CloneXml(xml):
  """Makes a deep copy of a root XML element.

  The whole owning document is cloned, so the returned element belongs
  to a fresh document and can be modified without touching the original.

  Args:
    xml: The XML element, which should be something returned from
         ParseXml() (i.e., a root element).

  Returns:
    The root element of the cloned document.
  """
  document_copy = xml.ownerDocument.cloneNode(True)
  return document_copy.documentElement
69
70
class StanzaParser(object):
  """A hacky incremental XML parser.

  StanzaParser consumes data incrementally via FeedString() and feeds
  its delegate complete parsed stanzas (i.e., XML documents) via
  FeedStanza().  Any stanzas passed to FeedStanza() are unlinked after
  the callback is done, even if the callback raises.

  Use like so:

  class MyClass(object):
    ...
    def __init__(self, ...):
      ...
      self._parser = StanzaParser(self)
      ...

    def SomeFunction(self, ...):
      ...
      self._parser.FeedString(some_data)
      ...

    def FeedStanza(self, stanza):
      ...
      print(stanza.toprettyxml())
      ...
  """

  # NOTE(akalin): The following regexps are naive, but necessary since
  # none of the existing Python 2.4/2.5 XML libraries support
  # incremental parsing.  This works well enough for our purposes.
  #
  # The regexps below assume that any present XML element starts at
  # the beginning of the string, but there may be trailing whitespace.

  # Matches an opening stream tag (e.g., '<stream:stream foo="bar">')
  # (assumes that the stream XML namespace is defined in the tag).  Any
  # whitespace character may separate the tag name from its attributes.
  _stream_re = re.compile(r'^(<stream:stream\s[^>]*>)\s*')

  # Matches an empty element tag (e.g., '<foo bar="baz"/>').
  _empty_element_re = re.compile(r'^(<[^>]*/>)\s*')

  # Matches a non-empty element (e.g., '<foo bar="baz">quux</foo>').
  # The element name ends at the first whitespace character or '>'.
  # re.DOTALL lets the element body span several lines; without it a
  # stanza containing a newline would never match and the buffer would
  # stall.  Does *not* handle an element nested inside another element
  # with the same name.
  _non_empty_element_re = re.compile(r'^(<([^\s>]*)[^>]*>.*?</\2>)\s*',
                                     re.DOTALL)

  # The closing tag for a stream tag.  We have to insert this
  # ourselves since all XML stanzas are children of the stream tag,
  # which is never closed until the connection is closed.
  _stream_suffix = '</stream:stream>'

  def __init__(self, delegate):
    """Args:
      delegate: An object with a FeedStanza(stanza) method, called once
        for every complete stanza found in the data.
    """
    self._buffer = ''
    self._delegate = delegate

  def FeedString(self, data):
    """Consumes the given string data, possibly feeding one or more
    stanzas to the delegate.

    Incomplete trailing data stays buffered until more data arrives.
    """
    self._buffer += data
    while (self._ProcessBuffer(self._stream_re, self._stream_suffix) or
           self._ProcessBuffer(self._empty_element_re) or
           self._ProcessBuffer(self._non_empty_element_re)):
      pass

  def _ProcessBuffer(self, regexp, xml_suffix=''):
    """If the buffer matches the given regexp, removes the match from
    the buffer, appends the given suffix, parses it, and feeds it to
    the delegate.

    Returns:
      Whether or not the buffer matched the given regexp.
    """
    results = regexp.match(self._buffer)
    if not results:
      return False
    end = results.end()
    xml_text = self._buffer[:end] + xml_suffix
    self._buffer = self._buffer[end:]
    stanza = ParseXml(xml_text)
    try:
      self._delegate.FeedStanza(stanza)
    finally:
      # Needed because stanza may have cycles; done even when the
      # delegate raises so the documented contract always holds.
      stanza.unlink()
    return True
154
155
class Jid(object):
  """An XMPP jid: a username@domain address with an optional resource.

  str() renders 'username@domain', followed by '/resource' when a
  non-empty resource is set.
  """

  def __init__(self, username, domain, resource=''):
    self.username = username
    self.domain = domain
    self.resource = resource

  def __str__(self):
    text = "%s@%s" % (self.username, self.domain)
    if not self.resource:
      return text
    return text + ('/' + self.resource)

  def GetBareJid(self):
    """Returns a new Jid with the same username and domain but no
    resource."""
    bare = Jid(self.username, self.domain)
    return bare
174
175
class IdGenerator(object):
  """Hands out unique ids of the form '<prefix>.<n>' for XMPP messages.

  n starts at 0 and grows by one with each GetNextId() call.
  """

  def __init__(self, prefix):
    self._prefix = prefix
    self._id = 0

  def GetNextId(self):
    """Returns the next id and advances the counter."""
    counter = self._id
    result = "%s.%s" % (self._prefix, counter)
    self._id = counter + 1
    return result
187
188
class HandshakeTask(object):
  """Class to handle the initial handshake with a connected XMPP
  client.

  Each stanza received from the client is passed to FeedStanza(), which
  checks it against the current state, replies through the connection,
  and advances to the next state.  After the session step, the
  connection's HandshakeDone() is called with the client's full jid.
  """

  # The handshake states in order.
  (_INITIAL_STREAM_NEEDED,
   _AUTH_NEEDED,
   _AUTH_STREAM_NEEDED,
   _BIND_NEEDED,
   _SESSION_NEEDED,
   _FINISHED) = range(6)

  # Used when in the _INITIAL_STREAM_NEEDED and _AUTH_STREAM_NEEDED
  # states.  Not an XML object as it's only the opening tag.
  #
  # The from and id attributes are filled in later.
  _STREAM_DATA = (
    '<stream:stream from="%s" id="%s" '
    'version="1.0" xmlns:stream="http://etherx.jabber.org/streams" '
    'xmlns="jabber:client">')

  # Used when in the _INITIAL_STREAM_NEEDED state.
  _AUTH_STANZA = ParseXml(
    '<stream:features xmlns:stream="http://etherx.jabber.org/streams">'
    '  <mechanisms xmlns="urn:ietf:params:xml:ns:xmpp-sasl">'
    '    <mechanism>PLAIN</mechanism>'
    '    <mechanism>X-GOOGLE-TOKEN</mechanism>'
    '  </mechanisms>'
    '</stream:features>')

  # Used when in the _AUTH_NEEDED state.
  _AUTH_SUCCESS_STANZA = ParseXml(
    '<success xmlns="urn:ietf:params:xml:ns:xmpp-sasl"/>')

  # Used when in the _AUTH_STREAM_NEEDED state.
  _BIND_STANZA = ParseXml(
    '<stream:features xmlns:stream="http://etherx.jabber.org/streams">'
    '  <bind xmlns="urn:ietf:params:xml:ns:xmpp-bind"/>'
    '  <session xmlns="urn:ietf:params:xml:ns:xmpp-session"/>'
    '</stream:features>')

  # Used when in the _BIND_NEEDED state.
  #
  # The id and jid attributes are filled in later.
  _BIND_RESULT_STANZA = ParseXml(
    '<iq id="" type="result">'
    '  <bind xmlns="urn:ietf:params:xml:ns:xmpp-bind">'
    '    <jid/>'
    '  </bind>'
    '</iq>')

  # Used when in the _SESSION_NEEDED state.
  #
  # The id attribute is filled in later.
  _IQ_RESPONSE_STANZA = ParseXml('<iq id="" type="result"/>')

  def __init__(self, connection, resource_prefix):
    """Creates a task in the _INITIAL_STREAM_NEEDED state.

    Args:
      connection: Receives replies via SendData() and SendStanza(), and
        is told the client's jid via HandshakeDone() when done.
      resource_prefix: Prefix for generated stream ids and for the
        resource bound to the client.
    """
    self._connection = connection
    self._id_generator = IdGenerator(resource_prefix)
    self._username = ''
    self._domain = ''
    self._jid = None
    self._resource_prefix = resource_prefix
    self._state = self._INITIAL_STREAM_NEEDED

  def FeedStanza(self, stanza):
    """Inspects the given stanza and changes the handshake state if needed.

    Called when a stanza is received from the client.  Inspects the
    stanza to make sure it has the expected attributes given the
    current state, advances the state if needed, and sends a reply to
    the client if needed.

    Raises:
      UnexpectedXml: The stanza is not what the current state expects.
    """
    def ExpectStanza(stanza, name):
      if stanza.tagName != name:
        raise UnexpectedXml(stanza)

    def GetFirstChildElement(stanza):
      # Skips any text nodes (e.g. whitespace) before the first element;
      # returns None if there is no child element at all.
      for node in stanza.childNodes:
        if node.nodeType == node.ELEMENT_NODE:
          return node
      return None

    def ExpectIq(stanza, iq_type, name):
      ExpectStanza(stanza, 'iq')
      child = GetFirstChildElement(stanza)
      if (stanza.getAttribute('type') != iq_type or
          child is None or child.tagName != name):
        raise UnexpectedXml(stanza)

    def GetStanzaId(stanza):
      return stanza.getAttribute('id')

    def HandleStream(stanza):
      ExpectStanza(stanza, 'stream:stream')
      domain = stanza.getAttribute('to')
      if domain:
        self._domain = domain
      SendStreamData()

    def SendStreamData():
      next_id = self._id_generator.GetNextId()
      stream_data = self._STREAM_DATA % (self._domain, next_id)
      self._connection.SendData(stream_data)

    def GetUserDomain(stanza):
      # The auth payload is base64 text that decodes to three
      # NUL-separated fields; the middle one is 'username[@domain]'.
      payload_node = stanza.firstChild
      # An empty <auth/> (or one whose first child carries no character
      # data) is rejected as unexpected XML rather than crashing.
      if payload_node is None or not hasattr(payload_node, 'data'):
        raise UnexpectedXml(stanza)
      username_password = base64.b64decode(payload_node.data)
      fields = username_password.split('\0')
      if len(fields) != 3:
        raise UnexpectedXml(stanza)
      username_domain = fields[1]
      # The domain may be omitted.
      #
      # If we were using python 2.5, we'd be able to do:
      #
      #   username, _, domain = username_domain.partition('@')
      #   if not domain:
      #     domain = self._domain
      at_pos = username_domain.find('@')
      if at_pos != -1:
        username = username_domain[:at_pos]
        domain = username_domain[at_pos+1:]
      else:
        username = username_domain
        domain = self._domain
      return (username, domain)

    if self._state == self._INITIAL_STREAM_NEEDED:
      HandleStream(stanza)
      self._connection.SendStanza(self._AUTH_STANZA, False)
      self._state = self._AUTH_NEEDED

    elif self._state == self._AUTH_NEEDED:
      ExpectStanza(stanza, 'auth')
      (self._username, self._domain) = GetUserDomain(stanza)
      self._connection.SendStanza(self._AUTH_SUCCESS_STANZA, False)
      self._state = self._AUTH_STREAM_NEEDED

    elif self._state == self._AUTH_STREAM_NEEDED:
      HandleStream(stanza)
      self._connection.SendStanza(self._BIND_STANZA, False)
      self._state = self._BIND_NEEDED

    elif self._state == self._BIND_NEEDED:
      ExpectIq(stanza, 'set', 'bind')
      stanza_id = GetStanzaId(stanza)
      resource_elements = stanza.getElementsByTagName('resource')
      # This server never generates a resource itself, so the client
      # must supply a non-empty one.
      if (not resource_elements or
          resource_elements[0].firstChild is None):
        raise UnexpectedXml(stanza)
      resource = resource_elements[0].firstChild.data
      full_resource = '%s.%s' % (self._resource_prefix, resource)
      response = CloneXml(self._BIND_RESULT_STANZA)
      response.setAttribute('id', stanza_id)
      self._jid = Jid(self._username, self._domain, full_resource)
      jid_text = response.ownerDocument.createTextNode(str(self._jid))
      response.getElementsByTagName('jid')[0].appendChild(jid_text)
      self._connection.SendStanza(response)
      self._state = self._SESSION_NEEDED

    elif self._state == self._SESSION_NEEDED:
      ExpectIq(stanza, 'set', 'session')
      stanza_id = GetStanzaId(stanza)
      xml = CloneXml(self._IQ_RESPONSE_STANZA)
      xml.setAttribute('id', stanza_id)
      self._connection.SendStanza(xml)
      self._state = self._FINISHED
      self._connection.HandshakeDone(self._jid)
346
347
def AddrString(addr):
  """Formats a socket address tuple as 'host:port'.

  Args:
    addr: A socket address tuple whose first two items are the host and
      the integer port, e.g. (host, port) for AF_INET or
      (host, port, flowinfo, scope_id) for AF_INET6.

  Returns:
    'host:port'.  A host containing ':' (an IPv6 literal) is wrapped in
    brackets, e.g. '[::1]:5222', so that the port stays unambiguous.
  """
  host = '%s' % (addr[0],)
  if ':' in host:
    host = '[%s]' % host
  return '%s:%d' % (host, addr[1])
350
351
class XmppConnection(asynchat.async_chat):
  """A single XMPP client connection.

  This class handles the connection to a single XMPP client (via a
  socket).  It does the XMPP handshake and also implements the (old)
  Google notification protocol.
  """

  # Used for acknowledgements to the client.
  #
  # The from and id attributes are filled in later.
  _IQ_RESPONSE_STANZA = ParseXml('<iq from="" id="" type="result"/>')

  def __init__(self, sock, socket_map, delegate, addr):
    """Starts up the xmpp connection.

    Args:
      sock: The socket to the client.
      socket_map: A map from sockets to their owning objects.
      delegate: The delegate, which is notified when the XMPP
        handshake is successful, when the connection is closed, and
        when a notification has to be broadcast.
      addr: The host/port of the client.
    """
    # We do this because in versions of python < 2.6,
    # async_chat.__init__ doesn't take a map argument nor pass it to
    # dispatcher.__init__.  We rely on the fact that
    # async_chat.__init__ calls dispatcher.__init__ as the last thing
    # it does, and that calling dispatcher.__init__ with socket=None
    # and map=None is essentially a no-op.
    asynchat.async_chat.__init__(self)
    asyncore.dispatcher.__init__(self, sock, socket_map)

    # No terminator: all incoming data goes to collect_incoming_data().
    self.set_terminator(None)

    self._delegate = delegate
    self._parser = StanzaParser(self)
    self._jid = None

    self._addr = addr
    addr_str = AddrString(self._addr)
    self._handshake_task = HandshakeTask(self, addr_str)
    print('Starting connection to %s' % self)

  def __str__(self):
    """Returns the client's jid once known, else its host:port."""
    if self._jid:
      return str(self._jid)
    else:
      return AddrString(self._addr)

  # async_chat implementation.

  def collect_incoming_data(self, data):
    self._parser.FeedString(data)

  # This is only here to make pychecker happy.
  def found_terminator(self):
    asynchat.async_chat.found_terminator(self)

  def close(self):
    print("Closing connection to %s" % self)
    self._delegate.OnXmppConnectionClosed(self)
    asynchat.async_chat.close(self)

  # Called by self._parser.FeedString().
  def FeedStanza(self, stanza):
    """Routes a stanza: to the handshake until it is done, and
    afterwards to the notification handling.

    Raises:
      UnexpectedXml: The stanza is neither an iq result nor a
        google:push command.
    """
    if self._handshake_task:
      self._handshake_task.FeedStanza(stanza)
    elif stanza.tagName == 'iq' and stanza.getAttribute('type') == 'result':
      # Ignore all client acks.
      pass
    elif (stanza.firstChild and
          stanza.firstChild.namespaceURI == 'google:push'):
      self._HandlePushCommand(stanza)
    else:
      raise UnexpectedXml(stanza)

  # Called by self._handshake_task.
  def HandshakeDone(self, jid):
    self._jid = jid
    self._handshake_task = None
    self._delegate.OnXmppHandshakeDone(self)
    print("Handshake done for %s" % self)

  def _HandlePushCommand(self, stanza):
    """Handles a google:push stanza: acks subscriptions and hands push
    messages to the delegate for broadcast.

    Raises:
      UnexpectedXml: The stanza is neither kind of push command.
    """
    if stanza.tagName == 'iq' and stanza.firstChild.tagName == 'subscribe':
      # Subscription request.
      self._SendIqResponseStanza(stanza)
    elif stanza.tagName == 'message' and stanza.firstChild.tagName == 'push':
      # Send notification request.
      self._delegate.ForwardNotification(self, stanza)
    else:
      # Report the stanza that was actually received.
      raise UnexpectedXml(stanza)

  def _SendIqResponseStanza(self, iq):
    """Acknowledges iq with a result stanza carrying the same id."""
    stanza = CloneXml(self._IQ_RESPONSE_STANZA)
    stanza.setAttribute('from', str(self._jid.GetBareJid()))
    stanza.setAttribute('id', iq.getAttribute('id'))
    self.SendStanza(stanza)

  def SendStanza(self, stanza, unlink=True):
    """Sends a stanza to the client.

    Args:
      stanza: The stanza to send.
      unlink: Whether to unlink stanza after sending it. (Pass in
      False if stanza is a constant.)
    """
    self.SendData(stanza.toxml())
    if unlink:
      stanza.unlink()

  def SendData(self, data):
    """Sends raw data to the client.
    """
    # We explicitly encode to ascii as that is what the client expects
    # (some minidom library functions return unicode strings).
    self.push(data.encode('ascii'))

  def ForwardNotification(self, notification_stanza):
    """Forwards a notification to the client.

    Overwrites the stanza's 'from' (bare jid) and 'to' (full jid)
    attributes in place; the stanza is not unlinked.
    """
    notification_stanza.setAttribute('from', str(self._jid.GetBareJid()))
    notification_stanza.setAttribute('to', str(self._jid))
    self.SendStanza(notification_stanza, False)
476
477
class XmppServer(asyncore.dispatcher):
  """The main XMPP server class.

  The XMPP server starts accepting connections on the given address
  and spawns off XmppConnection objects for each one.

  Use like so:

    socket_map = {}
    xmpp_server = xmppserver.XmppServer(socket_map, ('127.0.0.1', 5222))
    asyncore.loop(30.0, False, socket_map)
  """

  def __init__(self, socket_map, addr):
    """Binds a listening TCP (IPv4) socket to addr.

    Args:
      socket_map: The asyncore socket map shared with the connections.
      addr: The (host, port) to listen on.
    """
    asyncore.dispatcher.__init__(self, None, socket_map)
    self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
    self.set_reuse_addr()
    self.bind(addr)
    self.listen(5)
    self._socket_map = socket_map
    self._connections = set()
    self._handshake_done_connections = set()

  def handle_accept(self):
    """Accepts a pending client and wraps it in an XmppConnection."""
    accepted = self.accept()
    # asyncore's accept() may return None when no connection actually
    # took place; just keep listening in that case.
    if accepted is None:
      return
    (sock, addr) = accepted
    xmpp_connection = XmppConnection(sock, self._socket_map, self, addr)
    self._connections.add(xmpp_connection)

  def close(self):
    # A copy is necessary since calling close on each connection
    # removes it from self._connections.
    for connection in self._connections.copy():
      connection.close()
    asyncore.dispatcher.close(self)

  # XmppConnection delegate methods.
  def OnXmppHandshakeDone(self, xmpp_connection):
    self._handshake_done_connections.add(xmpp_connection)

  def OnXmppConnectionClosed(self, xmpp_connection):
    self._connections.discard(xmpp_connection)
    self._handshake_done_connections.discard(xmpp_connection)

  def ForwardNotification(self, unused_xmpp_connection, notification_stanza):
    """Sends notification_stanza to every connection whose handshake is
    done (the sender included, if its handshake is done)."""
    # Iterate over a copy: if sending causes a connection to be closed,
    # OnXmppConnectionClosed() removes it from the set, which would
    # otherwise change the set during iteration.
    for connection in self._handshake_done_connections.copy():
      print('Sending notification to %s' % connection)
      connection.ForwardNotification(notification_stanza)
525