1#!/usr/bin/python
2# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
3# Use of this source code is governed by a BSD-style license that can be
4# found in the LICENSE file.
5
6import atexit
7import errno
8import logging
9import re
10import sys
11import socket
12import threading
13import xmlrpclib
14
15import rpm_controller
16import rpm_logging_config
17
18from config import rpm_config
19from MultiThreadedXMLRPCServer import MultiThreadedXMLRPCServer
20from rpm_infrastructure_exception import RPMInfrastructureException
21
22import common
23from autotest_lib.site_utils.rpm_control_system import utils
24
# Log filename format for the dispatcher, read from the GENERAL section of the
# rpm config; handed to rpm_logging_config.start_log_server() at startup.
LOG_FILENAME_FORMAT = rpm_config.get('GENERAL', 'dispatcher_logname_format')
26
27
class RPMDispatcher(object):
    """
    This class is the RPM dispatcher server and it is responsible for
    communicating directly to the RPM devices to change a DUT's outlet status.

    When an RPMDispatcher is initialized it registers itself with the frontend
    server, which will hand outlet requests to this dispatcher.

    Once a request is received the dispatcher looks up the RPMController
    instance for the given DUT and then queues up the request and blocks until
    it is processed.

    @var _address: IP address or Hostname of this dispatcher server.
    @var _frontend_server: URI of the frontend server.
    @var _lock: Lock used to synchronize access to _worker_dict.
    @var _port: Port assigned to this server instance.
    @var _worker_dict: Dictionary mapping RPM hostnames to RPMController
                       instances.
    """

    # Rack ids of the form 'rack<digits><letters>' select the web-powered RPM
    # controller. Compiled once here instead of on every controller creation.
    _WEBPOWERED_RACK_ID = re.compile('rack[0-9]+[a-z]+')


    def __init__(self, address, port):
        """
        RPMDispatcher constructor.

        Initializes instance vars and registers this server with the frontend
        server.

        @param address: Address of this dispatcher server.
        @param port: Port assigned to this dispatcher server.

        @raise RPMInfrastructureException: Raised if the dispatch server is
                                           unable to register with the frontend
                                           server.
        """
        self._address = address
        self._port = port
        self._lock = threading.Lock()
        self._worker_dict = {}
        self._frontend_server = rpm_config.get('RPM_INFRASTRUCTURE',
                                               'frontend_uri')
        logging.info('Registering this rpm dispatcher with the frontend '
                     'server at %s.', self._frontend_server)
        client = xmlrpclib.ServerProxy(self._frontend_server)
        # De-register with the frontend when the dispatcher exits.
        atexit.register(self._unregister)
        try:
            client.register_dispatcher(self._get_serveruri())
        except socket.error as er:
            err_msg = ('Unable to register with frontend server. Error: %s.' %
                       self._describe_socket_error(er))
            logging.error(err_msg)
            raise RPMInfrastructureException(err_msg)


    @staticmethod
    def _describe_socket_error(error):
        """
        Builds a short description of a socket error for log/error messages.

        Not every socket error carries an errno that is present in
        errno.errorcode (timeouts have no errno, address lookup failures use
        their own codes), so a plain errno.errorcode[...] lookup could raise
        KeyError and hide the original failure.

        @param error: The socket.error instance that was caught.

        @return: The symbolic errno name (e.g. 'ECONNREFUSED') when known,
                 otherwise the string form of the error.
        """
        code = getattr(error, 'errno', None)
        return errno.errorcode.get(code) or str(error)


    def _worker_dict_put(self, key, value):
        """
        Private method used to synchronize access to _worker_dict.

        @param key: key value we are using to access _worker_dict.
        @param value: value we are putting into _worker_dict.
        """
        with self._lock:
            self._worker_dict[key] = value


    def _worker_dict_get(self, key):
        """
        Private method used to synchronize access to _worker_dict.

        @param key: key value we are using to access _worker_dict.
        @return: value found when accessing _worker_dict, or None if the key
                 is not present.
        """
        with self._lock:
            return self._worker_dict.get(key)


    def is_up(self):
        """
        Allows the frontend server to see if the dispatcher server is up before
        attempting to queue requests.

        @return: True. If connection fails, the client proxy will throw a socket
                 error on the client side.
        """
        return True


    def queue_request(self, powerunit_info_dict, new_state):
        """
        Looks up the appropriate RPMController instance for the device and queues
        up the request.

        @param powerunit_info_dict: A dictionary, containing the attribute/values
                                    of an unmarshalled PowerUnitInfo instance.
        @param new_state: [ON, OFF, CYCLE] state we want to the change the
                          outlet to.
        @return: True if the attempt to change power state was successful,
                 False otherwise (including when no controller could be
                 determined because the powerunit hostname is empty).
        """
        powerunit_info = utils.PowerUnitInfo(**powerunit_info_dict)
        logging.info('Received request to set device: %s to state: %s',
                     powerunit_info.device_hostname, new_state)
        controller = self._get_rpm_controller(
                powerunit_info.powerunit_hostname,
                powerunit_info.hydra_hostname)
        if controller is None:
            # _get_rpm_controller returns None for an empty powerunit
            # hostname; report a failed request instead of dereferencing None.
            logging.error('No powerunit hostname available for device: %s, '
                          'unable to set state: %s',
                          powerunit_info.device_hostname, new_state)
            return False
        return controller.queue_request(powerunit_info, new_state)


    def _get_rpm_controller(self, rpm_hostname, hydra_hostname=None):
        """
        Private method that retrieves the appropriate RPMController instance
        for this RPM Hostname or calls _create_rpm_controller if it does not
        already exist.

        The lookup and the creation happen under a single acquisition of
        _lock, so concurrent requests for the same RPM cannot each create
        their own controller. _create_rpm_controller must therefore not take
        _lock itself (it is not re-entrant).

        @param rpm_hostname: hostname of the RPM whose RPMController we want.
        @param hydra_hostname: hostname passed on to _create_rpm_controller
                               when a new controller has to be created.

        @return: RPMController instance responsible for this RPM, or None if
                 rpm_hostname is empty.
        """
        if not rpm_hostname:
            return None
        with self._lock:
            controller = self._worker_dict.get(rpm_hostname)
            if not controller:
                controller = self._create_rpm_controller(
                        rpm_hostname, hydra_hostname)
                self._worker_dict[rpm_hostname] = controller
        return controller


    def _create_rpm_controller(self, rpm_hostname, hydra_hostname):
        """
        Determines the type of RPMController required and initializes it.

        The type is chosen from the second-to-last '-'-separated element of
        the hostname: 'poe' selects the Cisco POE controller, an element
        matching 'rack<digits><letters>' selects the web-powered controller,
        anything else selects the Sentry controller.

        @param rpm_hostname: Hostname of the RPM we need to communicate with.
        @param hydra_hostname: Hydra hostname, only passed to the Sentry
                               controller.

        @return: RPMController instance responsible for this RPM.

        @raise IndexError: If rpm_hostname has fewer than two '-'-separated
                           elements.
        """
        type_element = rpm_hostname.split('-')[-2]
        if type_element == 'poe':
            # POE switch hostname looks like 'chromeos2-poe-switch1'.
            logging.info('The controller is a Cisco POE switch.')
            return rpm_controller.CiscoPOEController(rpm_hostname)
        # The device is an RPM; the element is its rack id.
        if self._WEBPOWERED_RACK_ID.match(type_element):
            logging.info('RPM is a webpowered device.')
            return rpm_controller.WebPoweredRPMController(rpm_hostname)
        logging.info('RPM is a Sentry CDU device.')
        return rpm_controller.SentryRPMController(
                hostname=rpm_hostname,
                hydra_hostname=hydra_hostname)


    def _get_serveruri(self):
        """
        Formats the _address and _port into a meaningful URI string.

        @return: URI of this dispatch server.
        """
        return 'http://%s:%d' % (self._address, self._port)


    def _unregister(self):
        """
        Tells the frontend server that this dispatch server is shutting down and
        to unregister it.

        Called by atexit.

        @raise RPMInfrastructureException: Raised if the dispatch server is
                                           unable to unregister with the
                                           frontend server.
        """
        logging.info('Dispatch server shutting down. Unregistering with RPM '
                     'frontend server.')
        client = xmlrpclib.ServerProxy(self._frontend_server)
        try:
            client.unregister_dispatcher(self._get_serveruri())
        except socket.error as er:
            err_msg = ('Unable to unregister with frontend server. Error: %s.' %
                       self._describe_socket_error(er))
            logging.error(err_msg)
            raise RPMInfrastructureException(err_msg)
214
215
def launch_server_on_unused_port():
    """
    Looks up an unused port on this host and launches the xmlrpc server.

    Useful for testing by running multiple dispatch servers on the same host.

    A temporary probe socket is bound to port 0 to learn a free port number;
    the xmlrpc server is then created on that port while the probe socket is
    still bound. The probe socket is always closed, even if creating the
    server fails.

    @return: server,port - server object and the port on which it is
             listening.
    """
    address = socket.gethostbyname(socket.gethostname())
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        # Set this socket to allow reuse.
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        # Port 0 asks the OS to choose the port; read back what it picked.
        sock.bind(('', 0))
        port = sock.getsockname()[1]
        server = MultiThreadedXMLRPCServer((address, port),
                                           allow_none=True)
    finally:
        # Do not leak the probe socket if any of the steps above raised.
        sock.close()
    return server, port
235
236
if __name__ == '__main__':
    # Script entry point: launches the dispatch server. Creates an instance of
    # RPMDispatcher and registers it to a MultiThreadedXMLRPCServer instance.

    # This script accepts no command line arguments.
    if sys.argv[1:]:
        print('Usage: ./%s, no arguments available.' % sys.argv[0])
        sys.exit(1)

    rpm_logging_config.start_log_server(LOG_FILENAME_FORMAT)
    rpm_logging_config.set_up_logging(use_log_server=True)

    # Resolve the local ip address; the dispatcher advertises it together with
    # the port the xmlrpc server ended up listening on.
    local_address = socket.gethostbyname(socket.gethostname())
    xmlrpc_server, listen_port = launch_server_on_unused_port()
    dispatcher = RPMDispatcher(local_address, listen_port)
    xmlrpc_server.register_instance(dispatcher)
    xmlrpc_server.serve_forever()
254