1e46c9386c4f79aa40185f79a19fc5b2a7ef528b3Patrick Scott#!/usr/bin/python2.4
2e46c9386c4f79aa40185f79a19fc5b2a7ef528b3Patrick Scott# Copyright (c) 2010 The Chromium Authors. All rights reserved.
3e46c9386c4f79aa40185f79a19fc5b2a7ef528b3Patrick Scott# Use of this source code is governed by a BSD-style license that can be
4e46c9386c4f79aa40185f79a19fc5b2a7ef528b3Patrick Scott# found in the LICENSE file.
5e46c9386c4f79aa40185f79a19fc5b2a7ef528b3Patrick Scott
6e46c9386c4f79aa40185f79a19fc5b2a7ef528b3Patrick Scott"""Tests exercising the various classes in xmppserver.py."""
7e46c9386c4f79aa40185f79a19fc5b2a7ef528b3Patrick Scott
8e46c9386c4f79aa40185f79a19fc5b2a7ef528b3Patrick Scottimport unittest
9e46c9386c4f79aa40185f79a19fc5b2a7ef528b3Patrick Scott
10e46c9386c4f79aa40185f79a19fc5b2a7ef528b3Patrick Scottimport base64
11e46c9386c4f79aa40185f79a19fc5b2a7ef528b3Patrick Scottimport xmppserver
12e46c9386c4f79aa40185f79a19fc5b2a7ef528b3Patrick Scott
class XmlUtilsTest(unittest.TestCase):
  """Tests for the XML helper functions ParseXml() and CloneXml()."""

  def testParseXml(self):
    # Serializing a parsed element must reproduce the input text exactly,
    # including the explicit empty xmlns attributes.
    xml_text = """<foo xmlns=""><bar xmlns=""><baz/></bar></foo>"""
    xml = xmppserver.ParseXml(xml_text)
    self.assertEqual(xml.toxml(), xml_text)

  def testCloneXml(self):
    xml_text = '<foo/>'
    xml = xmppserver.ParseXml(xml_text)
    xml_clone = xmppserver.CloneXml(xml)
    xml_clone.setAttribute('bar', 'baz')
    self.assertEqual(xml, xml)
    self.assertEqual(xml_clone, xml_clone)
    # The clone must be a separate node object from the original.
    self.assertNotEqual(xml, xml_clone)
    # Comparing the node objects does not look at their contents
    # (NOTE(review): minidom-style nodes appear to compare by identity --
    # confirm), so also verify that the attribute set on the clone landed
    # on the clone and did not leak back into the original.
    self.assertEqual(xml_clone.getAttribute('bar'), 'baz')
    self.assertEqual(xml.toxml(), xml_text)

  def testCloneXmlUnlink(self):
    # Unlinking the original must not affect a clone taken beforehand.
    xml_text = '<foo/>'
    xml = xmppserver.ParseXml(xml_text)
    xml_clone = xmppserver.CloneXml(xml)
    xml.unlink()
    self.assertEqual(xml.parentNode, None)
    self.assertNotEqual(xml_clone.parentNode, None)
    self.assertEqual(xml_clone.toxml(), xml_text)
36e46c9386c4f79aa40185f79a19fc5b2a7ef528b3Patrick Scott
class StanzaParserTest(unittest.TestCase):
  """Tests for xmppserver.StanzaParser.

  The test case passes itself to the StanzaParser constructor and records
  every stanza handed to its FeedStanza() method.
  """

  def setUp(self):
    # Serialized text of each stanza received so far, in arrival order.
    self.stanzas = []

  def FeedStanza(self, stanza):
    # Store the serialized text instead of the node itself, because the
    # stanza is unlinked after this callback.
    stanza_text = stanza.toxml()
    self.stanzas.append(stanza_text)

  def testBasic(self):
    # An element split across two feeds is delivered only once complete.
    stanza_parser = xmppserver.StanzaParser(self)
    stanza_parser.FeedString('<foo')
    self.assertEqual(len(self.stanzas), 0)

    stanza_parser.FeedString('/><bar></bar>')
    self.assertEqual(self.stanzas[0], '<foo/>')
    self.assertEqual(self.stanzas[1], '<bar/>')

  def testStream(self):
    # The opening stream tag is delivered as a stanza of its own even
    # though it is never closed in the input.
    stanza_parser = xmppserver.StanzaParser(self)
    stanza_parser.FeedString('<stream')
    self.assertEqual(len(self.stanzas), 0)

    stanza_parser.FeedString(':stream foo="bar" xmlns:stream="baz">')
    expected_stanza = '<stream:stream foo="bar" xmlns:stream="baz"/>'
    self.assertEqual(self.stanzas[0], expected_stanza)

  def testNested(self):
    # A stanza with attributes and children arrives as a single unit.
    stanza_parser = xmppserver.StanzaParser(self)
    stanza_parser.FeedString('<foo')
    self.assertEqual(len(self.stanzas), 0)

    stanza_parser.FeedString(' bar="baz"')
    stanza_parser.FeedString('><baz/><blah>meh</blah></foo>')
    expected_stanza = '<foo bar="baz"><baz/><blah>meh</blah></foo>'
    self.assertEqual(self.stanzas[0], expected_stanza)
71
72
class JidTest(unittest.TestCase):
  """Tests for the string form of xmppserver.Jid."""

  def testBasic(self):
    # Without a resource the string form is "username@domain".
    bare_jid = xmppserver.Jid('foo', 'bar.com')
    self.assertEqual(str(bare_jid), 'foo@bar.com')

  def testResource(self):
    # A resource is appended after a slash.
    full_jid = xmppserver.Jid('foo', 'bar.com', 'resource')
    self.assertEqual(str(full_jid), 'foo@bar.com/resource')

  def testGetBareJid(self):
    # GetBareJid() yields a Jid whose string form has no resource part.
    full_jid = xmppserver.Jid('foo', 'bar.com', 'resource')
    bare_jid = full_jid.GetBareJid()
    self.assertEqual(str(bare_jid), 'foo@bar.com')
86
87
class IdGeneratorTest(unittest.TestCase):
  """Tests for xmppserver.IdGenerator."""

  def testBasic(self):
    # Successive ids are "<prefix>.<n>" with n counting up from zero.
    generator = xmppserver.IdGenerator('foo')
    for sequence_number in range(100):
      expected_id = 'foo.%d' % sequence_number
      self.assertEqual(expected_id, generator.GetNextId())
94
95
class HandshakeTaskTest(unittest.TestCase):
  """Tests for xmppserver.HandshakeTask.

  The test case passes itself to the HandshakeTask constructor, counts
  every SendData()/SendStanza() call made on it, and stores the Jid
  reported through HandshakeDone().
  """

  def setUp(self):
    # Number of SendData()/SendStanza() calls received so far.
    self.data_received = 0

  def SendData(self, _):
    # Only the number of sends matters here; the payload is ignored.
    self.data_received += 1

  def SendStanza(self, _, unused=True):
    # Counted exactly like SendData(); the stanza itself is ignored.
    self.data_received += 1

  def HandshakeDone(self, jid):
    # Keep the resulting Jid so that DoHandshake() can inspect it.
    self.jid = jid

  def DoHandshake(self, resource_prefix, resource, username,
                  initial_stream_domain, auth_domain, auth_stream_domain):
    """Runs one complete handshake and checks the resulting Jid.

    Feeds the stream, auth, second stream, bind and session stanzas to a
    new HandshakeTask, checking the running send count after each one.

    Args:
      resource_prefix: prefix passed to the HandshakeTask constructor.
      resource: resource name placed in the bind request.
      username: user name placed in the auth payload.
      initial_stream_domain: 'to' attribute of the first stream stanza.
      auth_domain: domain appended to the user name in the auth payload;
        when empty the bare user name is sent.
      auth_stream_domain: 'to' attribute of the second stream stanza.
    """
    stream_text = '<stream:stream xmlns:stream="foo"/>'

    self.data_received = 0
    task = xmppserver.HandshakeTask(self, resource_prefix)

    # Initial stream stanza: expected to trigger two sends.
    initial_stream = xmppserver.ParseXml(stream_text)
    initial_stream.setAttribute('to', initial_stream_domain)
    self.assertEqual(self.data_received, 0)
    task.FeedStanza(initial_stream)
    self.assertEqual(self.data_received, 2)

    # Auth stanza: base64 of '\0<user>[@<auth_domain>]\0bar'.
    auth_identity = username
    if auth_domain:
      auth_identity = '%s@%s' % (username, auth_domain)
    encoded_credentials = base64.b64encode('\0%s\0bar' % auth_identity)
    auth_stanza = xmppserver.ParseXml(
      '<auth>%s</auth>' % encoded_credentials)
    task.FeedStanza(auth_stanza)
    self.assertEqual(self.data_received, 3)

    # Second stream stanza: again expected to trigger two sends.
    auth_stream = xmppserver.ParseXml(stream_text)
    auth_stream.setAttribute('to', auth_stream_domain)
    task.FeedStanza(auth_stream)
    self.assertEqual(self.data_received, 5)

    # Bind request carrying the requested resource.
    bind_stanza = xmppserver.ParseXml(
      '<iq type="set"><bind><resource>%s</resource></bind></iq>' % resource)
    task.FeedStanza(bind_stanza)
    self.assertEqual(self.data_received, 6)

    # Session request.
    session_stanza = xmppserver.ParseXml(
      '<iq type="set"><session></session></iq>')
    task.FeedStanza(session_stanza)
    self.assertEqual(self.data_received, 7)

    # The Jid's domain is the first non-empty value among the second
    # stream's domain, the auth domain and the initial stream's domain.
    expected_domain = (auth_stream_domain or auth_domain or
                       initial_stream_domain)
    expected_resource = '%s.%s' % (resource_prefix, resource)
    self.assertEqual(self.jid.username, username)
    self.assertEqual(self.jid.domain, expected_domain)
    self.assertEqual(self.jid.resource, expected_resource)

  def testBasic(self):
    self.DoHandshake('resource_prefix', 'resource',
                     'foo', 'bar.com', 'baz.com', 'quux.com')

  def testDomainBehavior(self):
    # Blank out the domains one at a time, from the most specific
    # (second stream) to the least specific (initial stream).
    domain_combinations = [
      ('bar.com', 'baz.com', 'quux.com'),
      ('bar.com', 'baz.com', ''),
      ('bar.com', '', ''),
      ('', '', ''),
    ]
    for initial_domain, auth_domain, auth_stream_domain in domain_combinations:
      self.DoHandshake('resource_prefix', 'resource', 'foo',
                       initial_domain, auth_domain, auth_stream_domain)
165
166
class XmppConnectionTest(unittest.TestCase):
  """Tests for xmppserver.XmppConnection.

  The test case stands in for both the socket and the delegate handed to
  XmppConnection: it records every chunk passed to send() and tracks the
  set of connections whose handshake has completed.
  """

  def setUp(self):
    # Connections reported via OnXmppHandshakeDone() and not yet closed.
    self.connections = set()
    # Every chunk of data passed to send(), in order.
    self.data = []

  # socket-like methods.
  def fileno(self):
    return 0

  def setblocking(self, int):
    pass

  def getpeername(self):
    return ('', 0)

  def send(self, data):
    self.data.append(data)
    # Mirror socket.send(), which returns the number of bytes sent. A
    # caller that trims its outgoing buffer by that count would otherwise
    # never drain it and would keep re-sending stale data.
    # NOTE(review): XmppConnection looks asynchat-based -- confirm.
    return len(data)

  def close(self):
    pass

  # XmppConnection delegate methods.
  def OnXmppHandshakeDone(self, xmpp_connection):
    self.connections.add(xmpp_connection)

  def OnXmppConnectionClosed(self, xmpp_connection):
    self.connections.discard(xmpp_connection)

  def ForwardNotification(self, unused_xmpp_connection, notification_stanza):
    # Relay the notification to every connection that finished its
    # handshake.
    for connection in self.connections:
      connection.ForwardNotification(notification_stanza)

  def testBasic(self):
    socket_map = {}
    xmpp_connection = xmppserver.XmppConnection(
      self, socket_map, self, ('', 0))
    self.assertEqual(len(socket_map), 1)
    self.assertEqual(len(self.connections), 0)
    xmpp_connection.HandshakeDone(xmppserver.Jid('foo', 'bar'))
    self.assertEqual(len(socket_map), 1)
    self.assertEqual(len(self.connections), 1)

    # Test subscription request.
    self.assertEqual(len(self.data), 0)
    xmpp_connection.collect_incoming_data(
      '<iq><subscribe xmlns="google:push"></subscribe></iq>')
    self.assertEqual(len(self.data), 1)

    # Test acks: they must not cause anything to be sent.
    xmpp_connection.collect_incoming_data('<iq type="result"/>')
    self.assertEqual(len(self.data), 1)

    # Test notification.
    xmpp_connection.collect_incoming_data(
      '<message><push xmlns="google:push"/></message>')
    self.assertEqual(len(self.data), 2)

    # Test unexpected stanza.
    self.assertRaises(xmppserver.UnexpectedXml,
                      xmpp_connection.collect_incoming_data,
                      '<foo/>')

    # Test unexpected notifier command.
    self.assertRaises(xmppserver.UnexpectedXml,
                      xmpp_connection.collect_incoming_data,
                      '<iq><foo xmlns="google:notifier"/></iq>')

    # Test close: the connection must leave both the socket map and the
    # delegate's connection set.
    xmpp_connection.close()
    self.assertEqual(len(socket_map), 0)
    self.assertEqual(len(self.connections), 0)
242
class XmppServerTest(unittest.TestCase):
  """Tests for xmppserver.XmppServer.

  The test case doubles as the socket returned by the fake server's
  accept() method.
  """

  # socket-like methods.
  def fileno(self):
    return 0

  def setblocking(self, int):
    pass

  def getpeername(self):
    return ('', 0)

  def close(self):
    pass

  def testBasic(self):
    test_case = self

    class FakeXmppServer(xmppserver.XmppServer):
      def accept(unused_server):
        # Hand back the test case itself as the accepted socket.
        return (test_case, ('', 0))

    socket_map = {}
    self.assertEqual(len(socket_map), 0)

    # Creating the server adds one entry to the socket map.
    xmpp_server = FakeXmppServer(socket_map, ('', 0))
    self.assertEqual(len(socket_map), 1)

    # Accepting a connection adds a second entry.
    xmpp_server.handle_accept()
    self.assertEqual(len(socket_map), 2)

    # Closing the server empties the socket map.
    xmpp_server.close()
    self.assertEqual(len(socket_map), 0)
271
272
# Run every test case defined in this module when the file is executed
# directly as a script.
if __name__ == '__main__':
  unittest.main()
275