1# Copyright 2015 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5import logging, threading
6
7from autotest_lib.client.common_lib import error
8from autotest_lib.client.cros.enterprise import enterprise_policy_base
9from SocketServer import ThreadingTCPServer, StreamRequestHandler
10
11
12class ProxyHandler(StreamRequestHandler):
13    """Provide request handler for the Threaded Proxy Listener."""
14
15    def handle(self):
16        """Get URL of request from first line.
17
18        Read the first line of the request, up to 40 characters, and look
19        for the URL of the request. If found, save it to the URL list.
20
21        Note: All requests are sent an HTTP 504 error.
22        """
23        # Capture URL in first 40 chars of request.
24        data = self.rfile.readline(40).strip()
25        logging.debug('ProxyHandler::handle(): <%s>', data)
26        self.server.store_requests_received(data)
27        self.wfile.write('HTTP/1.1 504 Gateway Timeout\r\n'
28                         'Connection: close\r\n\r\n')
29
30
31class ThreadedProxyServer(ThreadingTCPServer):
32    """Provide a Threaded Proxy Server to service and save requests.
33
34    Define a Threaded Proxy Server which services requests, and allows the
35    handler to save all requests.
36    """
37
38    def __init__(self, server_address, HandlerClass):
39        """Constructor.
40
41        @param server_address: tuple of server IP and port to listen on.
42        @param HandlerClass: the RequestHandler class to instantiate per req.
43        """
44        self.requests_received = []
45        ThreadingTCPServer.allow_reuse_address = True
46        ThreadingTCPServer.__init__(self, server_address, HandlerClass)
47
48    def store_requests_received(self, request):
49        """Add receieved request to list.
50
51        @param request: request received by the proxy server.
52        """
53        self.requests_received.append(request)
54
55
56class ProxyListener(object):
57    """Provide a Proxy Listener to detect connect requests.
58
59    Define a proxy listener to detect when a CONNECT request is seen at the
60    given |server_address|, and record all requests received. Requests
61    received are exposed to the caller.
62    """
63
64    def __init__(self, server_address):
65        """Constructor.
66
67        @param server_address: tuple of server IP and port to listen on.
68        """
69        self._server = ThreadedProxyServer(server_address, ProxyHandler)
70        self._thread = threading.Thread(target=self._server.serve_forever)
71
72    def run(self):
73        """Start the server by activating it's thread."""
74        self._thread.start()
75
76    def stop(self):
77        """Stop the server and its threads."""
78        self._server.server_close()
79        self._thread.join()
80
81    def get_requests_received(self):
82        """Get list of received requests."""
83        return self._server.requests_received
84
85    def reset_requests_received(self):
86        """Clear list of received requests."""
87        self._server.requests_received = []
88
89
90class policy_ProxySettings(enterprise_policy_base.EnterprisePolicyTest):
91    """Test effect of ProxySettings policy on Chrome OS behavior.
92
93    This test verifies the behavior of Chrome OS for specific configurations
94    of the ProxySettings use policy: None (undefined), ProxyMode=direct,
95    ProxyMode=fixed_servers, ProxyMode=pac_script. None means that the policy
96    value is not set. This induces the default behavior, equivalent to what is
97    seen by an un-managed user.
98
99    When ProxySettings is None (undefined) or ProxyMode=direct, then no proxy
100    server should be used. When ProxyMode=fixed_servers or pac_script, then
101    the proxy server address specified by the ProxyServer or ProxyPacUrl
102    entry should be used.
103    """
104    version = 1
105
106    def initialize(self, **kwargs):
107        self._initialize_test_constants()
108        super(policy_ProxySettings, self).initialize(**kwargs)
109        self._proxy_server = ProxyListener(('', self.PROXY_PORT))
110        self._proxy_server.run()
111        self.start_webserver()
112
113
114    def _initialize_test_constants(self):
115        """Initialize test-specific constants, some from class constants."""
116        self.POLICY_NAME = 'ProxySettings'
117        self.PROXY_PORT = 3128
118        self.PAC_FILE = 'proxy_test.pac'
119        self.FIXED_PROXY = {
120            'ProxyBypassList': 'www.google.com,www.googleapis.com',
121            'ProxyMode': 'fixed_servers',
122            'ProxyServer': 'localhost:%s' % self.PROXY_PORT
123        }
124        self.PAC_PROXY = {
125            'ProxyMode': 'pac_script',
126            'ProxyPacUrl': '%s/%s' % (self.WEB_HOST, self.PAC_FILE)
127        }
128        self.DIRECT_PROXY = {
129            'ProxyMode': 'direct'
130        }
131        self.TEST_URL = 'http://www.wired.com/'
132        self.TEST_CASES = {
133            'FixedProxy_UseFixedProxy': self.FIXED_PROXY,
134            'PacProxy_UsePacFile': self.PAC_PROXY,
135            'DirectProxy_UseNoProxy': self.DIRECT_PROXY,
136            'NotSet_UseNoProxy': None
137        }
138
139
140    def cleanup(self):
141        self._proxy_server.stop()
142        super(policy_ProxySettings, self).cleanup()
143
144
145    def _test_proxy_configuration(self, policy_value):
146        """Verify CrOS enforces the specified ProxySettings configuration.
147
148        @param policy_value: policy value expected.
149        """
150        self._proxy_server.reset_requests_received()
151        self.navigate_to_url(self.TEST_URL)
152        proxied_requests = self._proxy_server.get_requests_received()
153
154        # Determine whether TEST_URL is in |proxied_requests|. Comprehension
155        # is conceptually equivalent to `TEST_URL in proxied_requests`;
156        # however, we must do partial matching since TEST_URL and the
157        # elements inside |proxied_requests| are not necessarily equal, i.e.,
158        # TEST_URL is a substring of the received request.
159        matching_requests = [request for request in proxied_requests
160                             if self.TEST_URL in request]
161        logging.info('matching_requests: %s', matching_requests)
162
163        mode = policy_value['ProxyMode'] if policy_value else None
164        if mode is None or mode == 'direct':
165            if matching_requests:
166                raise error.TestFail('Requests should not have been sent '
167                                     'through the proxy server.')
168        elif mode == 'fixed_servers' or mode == 'pac_script':
169            if not matching_requests:
170                raise error.TestFail('Requests should have been sent '
171                                     'through the proxy server.')
172        else:
173            raise error.TestFail('Unrecognized Policy Value %s', policy_value)
174
175
176    def run_test_case(self, case):
177        """Setup and run the test configured for the specified test case.
178
179        @param case: Name of the test case to run.
180        """
181        case_value = self.TEST_CASES[case]
182        self.setup_case(self.POLICY_NAME, case_value)
183        self._test_proxy_configuration(case_value)
184