server_tester.py revision f81680c018729fd4499e1e200d04b48c4b90127c
1#!/usr/bin/python2.6 2# Copyright 2012 Google Inc. All Rights Reserved. 3# Author: mrdmnd@ (Matt Redmond) 4"""A unit test for sending data to Bartlett. Requires poster module.""" 5 6import cookielib 7import os 8import signal 9import subprocess 10import tempfile 11import time 12import unittest 13import urllib2 14 15from poster.encode import multipart_encode 16from poster.streaminghttp import register_openers 17 18 19SERVER_DIR = "../." 20SERVER_URL = "http://localhost:8080/" 21GET = "_ah/login?email=googler@google.com&action=Login&continue=%s" 22AUTH_URL = SERVER_URL + GET 23 24 25class ServerTest(unittest.TestCase): 26 """A unit test for the bartlett server. Tests upload, serve, and delete.""" 27 28 def setUp(self): 29 """Instantiate the files and server needed to test upload functionality.""" 30 self._server_proc = LaunchLocalServer() 31 self._jar = cookielib.LWPCookieJar() 32 self.opener = urllib2.build_opener(urllib2.HTTPCookieProcessor(self._jar)) 33 34 # We need these files to not delete when closed, because we have to reopen 35 # them in read mode after we write and close them. 36 self.profile_data = tempfile.NamedTemporaryFile(delete=False) 37 38 size = 16 * 1024 39 self.profile_data.write(os.urandom(size)) 40 41 def tearDown(self): 42 self.profile_data.close() 43 os.remove(self.profile_data.name) 44 os.kill(self._server_proc.pid, signal.SIGINT) 45 46 def testIntegration(self): # pylint: disable-msg=C6409 47 key = self._testUpload() 48 self._testListAll() 49 self._testServeKey(key) 50 self._testDelKey(key) 51 52 def _testUpload(self): # pylint: disable-msg=C6409 53 register_openers() 54 data = {"profile_data": self.profile_data, 55 "board": "x86-zgb", 56 "chromeos_version": "2409.0.2012_06_08_1114"} 57 datagen, headers = multipart_encode(data) 58 request = urllib2.Request(SERVER_URL + "upload", datagen, headers) 59 response = urllib2.urlopen(request).read() 60 self.assertTrue(response) 61 return response 62 63 def _testListAll(self): # pylint: disable-msg=C6409 64 request = urllib2.Request(AUTH_URL % (SERVER_URL + "serve")) 65 response = self.opener.open(request).read() 66 self.assertTrue(response) 67 68 def _testServeKey(self, key): # pylint: disable-msg=C6409 69 request = urllib2.Request(AUTH_URL % (SERVER_URL + "serve/" + key)) 70 response = self.opener.open(request).read() 71 self.assertTrue(response) 72 73 def _testDelKey(self, key): # pylint: disable-msg=C6409 74 # There is no response to a delete request. 75 # We will check the listAll page to ensure there is no data. 76 request = urllib2.Request(AUTH_URL % (SERVER_URL + "del/" + key)) 77 response = self.opener.open(request).read() 78 request = urllib2.Request(AUTH_URL % (SERVER_URL + "serve")) 79 response = self.opener.open(request).read() 80 self.assertFalse(response) 81 82 83def LaunchLocalServer(): 84 """Launch and store an authentication cookie with a local server.""" 85 proc = subprocess.Popen(["dev_appserver.py", "--clear_datastore", SERVER_DIR], 86 stdout=subprocess.PIPE, stderr=subprocess.PIPE) 87 # Wait for server to come up 88 while True: 89 time.sleep(1) 90 try: 91 request = urllib2.Request(SERVER_URL + "serve") 92 response = urllib2.urlopen(request).read() 93 if response: 94 break 95 except urllib2.URLError: 96 continue 97 return proc 98 99 100if __name__ == "__main__": 101 unittest.main() 102 103