server_tester.py revision f81680c018729fd4499e1e200d04b48c4b90127c
1#!/usr/bin/python2.6
2# Copyright 2012 Google Inc. All Rights Reserved.
3# Author: mrdmnd@ (Matt Redmond)
4"""A unit test for sending data to Bartlett. Requires poster module."""
5
6import cookielib
7import os
8import signal
9import subprocess
10import tempfile
11import time
12import unittest
13import urllib2
14
15from poster.encode import multipart_encode
16from poster.streaminghttp import register_openers
17
18
19SERVER_DIR = "../."
20SERVER_URL = "http://localhost:8080/"
21GET = "_ah/login?email=googler@google.com&action=Login&continue=%s"
22AUTH_URL = SERVER_URL + GET
23
24
25class ServerTest(unittest.TestCase):
26  """A unit test for the bartlett server. Tests upload, serve, and delete."""
27
28  def setUp(self):
29    """Instantiate the files and server needed to test upload functionality."""
30    self._server_proc = LaunchLocalServer()
31    self._jar = cookielib.LWPCookieJar()
32    self.opener = urllib2.build_opener(urllib2.HTTPCookieProcessor(self._jar))
33
34    # We need these files to not delete when closed, because we have to reopen
35    # them in read mode after we write and close them.
36    self.profile_data = tempfile.NamedTemporaryFile(delete=False)
37
38    size = 16 * 1024
39    self.profile_data.write(os.urandom(size))
40
41  def tearDown(self):
42    self.profile_data.close()
43    os.remove(self.profile_data.name)
44    os.kill(self._server_proc.pid, signal.SIGINT)
45
46  def testIntegration(self):  # pylint: disable-msg=C6409
47    key = self._testUpload()
48    self._testListAll()
49    self._testServeKey(key)
50    self._testDelKey(key)
51
52  def _testUpload(self):  # pylint: disable-msg=C6409
53    register_openers()
54    data = {"profile_data": self.profile_data,
55            "board": "x86-zgb",
56            "chromeos_version": "2409.0.2012_06_08_1114"}
57    datagen, headers = multipart_encode(data)
58    request = urllib2.Request(SERVER_URL + "upload", datagen, headers)
59    response = urllib2.urlopen(request).read()
60    self.assertTrue(response)
61    return response
62
63  def _testListAll(self):  # pylint: disable-msg=C6409
64    request = urllib2.Request(AUTH_URL % (SERVER_URL + "serve"))
65    response = self.opener.open(request).read()
66    self.assertTrue(response)
67
68  def _testServeKey(self, key):  # pylint: disable-msg=C6409
69    request = urllib2.Request(AUTH_URL % (SERVER_URL + "serve/" + key))
70    response = self.opener.open(request).read()
71    self.assertTrue(response)
72
73  def _testDelKey(self, key):  # pylint: disable-msg=C6409
74    # There is no response to a delete request.
75    # We will check the listAll page to ensure there is no data.
76    request = urllib2.Request(AUTH_URL % (SERVER_URL + "del/" + key))
77    response = self.opener.open(request).read()
78    request = urllib2.Request(AUTH_URL % (SERVER_URL + "serve"))
79    response = self.opener.open(request).read()
80    self.assertFalse(response)
81
82
83def LaunchLocalServer():
84  """Launch and store an authentication cookie with a local server."""
85  proc = subprocess.Popen(["dev_appserver.py", "--clear_datastore", SERVER_DIR],
86                          stdout=subprocess.PIPE, stderr=subprocess.PIPE)
87  # Wait for server to come up
88  while True:
89    time.sleep(1)
90    try:
91      request = urllib2.Request(SERVER_URL + "serve")
92      response = urllib2.urlopen(request).read()
93      if response:
94        break
95    except urllib2.URLError:
96      continue
97  return proc
98
99
100if __name__ == "__main__":
101  unittest.main()
102
103