1#!/usr/bin/python
2#
3# Copyright Gregory P. Smith, Google Inc 2008
4# Released under the GPL v2
5
6"""Tests for server.frontend."""
7
8from cStringIO import StringIO
9import os, sys, unittest
10import common
11from autotest_lib.client.common_lib import global_config
12from autotest_lib.client.common_lib import utils
13from autotest_lib.client.common_lib.test_utils import mock
14from autotest_lib.frontend.afe import rpc_client_lib
15from autotest_lib.server import frontend
16
17GLOBAL_CONFIG = global_config.global_config
18
19
20class BaseRpcClientTest(unittest.TestCase):
21    def setUp(self):
22        self.god = mock.mock_god()
23        self.god.mock_up(rpc_client_lib, 'rpc_client_lib')
24        self.god.stub_function(utils, 'send_email')
25        self._saved_environ = dict(os.environ)
26        if 'AUTOTEST_WEB' in os.environ:
27            del os.environ['AUTOTEST_WEB']
28
29
30    def tearDown(self):
31        self.god.unstub_all()
32        os.environ.clear()
33        os.environ.update(self._saved_environ)
34
35
36class RpcClientTest(BaseRpcClientTest):
37    def test_init(self):
38        os.environ['LOGNAME'] = 'unittest-user'
39        GLOBAL_CONFIG.override_config_value('SERVER', 'hostname', 'test-host')
40        rpc_client_lib.get_proxy.expect_call(
41                'http://test-host/path',
42                headers={'AUTHORIZATION': 'unittest-user'})
43        frontend.RpcClient('/path', None, None, None, None, None)
44        self.god.check_playback()
45
46
47class AFETest(BaseRpcClientTest):
48    def test_result_notify(self):
49        class fake_job(object):
50            result = True
51            name = 'nameFoo'
52            id = 'idFoo'
53            results_platform_map = {'NORAD' : {'Seeking_Joshua': ['WOPR']}}
54        GLOBAL_CONFIG.override_config_value('SERVER', 'hostname', 'chess')
55        rpc_client_lib.get_proxy.expect_call(
56                'http://chess/afe/server/noauth/rpc/',
57                headers={'AUTHORIZATION': 'david'})
58        self.god.stub_function(utils, 'send_email')
59        utils.send_email.expect_any_call()
60
61        my_afe = frontend.AFE(user='david')
62
63        fake_stdout = StringIO()
64        self.god.stub_with(sys, 'stdout', fake_stdout)
65        my_afe.result_notify(fake_job, 'userA', 'userB')
66        self.god.unstub(sys, 'stdout')
67        fake_stdout = fake_stdout.getvalue()
68
69        self.god.check_playback()
70
71        self.assert_('PASSED' in fake_stdout)
72        self.assert_('WOPR' in fake_stdout)
73        self.assert_('http://chess/tko/compose_query.cgi?' in fake_stdout)
74        self.assert_('columns=test' in fake_stdout)
75        self.assert_('rows=machine_group' in fake_stdout)
76        self.assert_("condition=tag~'idFoo-%25'" in fake_stdout)
77        self.assert_('title=Report' in fake_stdout)
78        self.assert_('Sending email' in fake_stdout)
79
80
81
82if __name__ == '__main__':
83    unittest.main()
84