#!/usr/bin/python
# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import atexit
import errno
import logging
import re
import sys
import socket
import threading
import xmlrpclib

import rpm_controller
import rpm_logging_config

from config import rpm_config
from MultiThreadedXMLRPCServer import MultiThreadedXMLRPCServer
from rpm_infrastructure_exception import RPMInfrastructureException

import common
from autotest_lib.site_utils.rpm_control_system import utils

LOG_FILENAME_FORMAT = rpm_config.get('GENERAL', 'dispatcher_logname_format')

# Rack ids matching this pattern (e.g. 'rack12a') are served by a
# WebPoweredRPMController; see RPMDispatcher._create_rpm_controller.
_WEBPOWERED_RACK_ID = re.compile('rack[0-9]+[a-z]+')


def _describe_socket_error(er):
    """
    Returns a printable description of a socket error.

    The symbolic errno name (e.g. 'ECONNREFUSED') is used when one exists.
    socket.timeout carries no errno and socket.gaierror carries resolver
    codes that are not in errno.errorcode, so those fall back to str(er)
    rather than raising a KeyError that would hide the real failure.

    @param er: socket.error instance to describe.

    @return: String describing the error.
    """
    return errno.errorcode.get(er.errno, str(er))


class RPMDispatcher(object):
    """
    This class is the RPM dispatcher server and it is responsible for
    communicating directly to the RPM devices to change a DUT's outlet status.

    When an RPMDispatcher is initialized it registers itself with the frontend
    server, who will field out outlet requests to this dispatcher.

    Once a request is received the dispatcher looks up the RPMController
    instance for the given DUT and then queues up the request and blocks until
    it is processed.

    @var _address: IP address or Hostname of this dispatcher server.
    @var _frontend_server: URI of the frontend server.
    @var _lock: Lock used to synchronize access to _worker_dict.
    @var _port: Port assigned to this server instance.
    @var _worker_dict: Dictionary mapping RPM hostnames to RPMController
                       instances.
    """


    def __init__(self, address, port):
        """
        RPMDispatcher constructor.

        Initializes instance vars and registers this server with the frontend
        server.

        @param address: Address of this dispatcher server.
        @param port: Port assigned to this dispatcher server.

        @raise RPMInfrastructureException: Raised if the dispatch server is
                                           unable to register with the frontend
                                           server.
        """
        self._address = address
        self._port = port
        self._lock = threading.Lock()
        self._worker_dict = {}
        self._frontend_server = rpm_config.get('RPM_INFRASTRUCTURE',
                                               'frontend_uri')
        logging.info('Registering this rpm dispatcher with the frontend '
                     'server at %s.', self._frontend_server)
        client = xmlrpclib.ServerProxy(self._frontend_server)
        # De-register with the frontend when the dispatcher exits.
        atexit.register(self._unregister)
        try:
            client.register_dispatcher(self._get_serveruri())
        except socket.error as er:
            err_msg = ('Unable to register with frontend server. Error: %s.' %
                       _describe_socket_error(er))
            logging.error(err_msg)
            raise RPMInfrastructureException(err_msg)


    def _worker_dict_put(self, key, value):
        """
        Stores a value in _worker_dict while holding _lock.

        @param key: key value we are using to access _worker_dict.
        @param value: value we are putting into _worker_dict.
        """
        with self._lock:
            self._worker_dict[key] = value


    def _worker_dict_get(self, key):
        """
        Reads a value from _worker_dict while holding _lock.

        @param key: key value we are using to access _worker_dict.
        @return: value found when accessing _worker_dict, or None if the key
                 is not present.
        """
        with self._lock:
            return self._worker_dict.get(key)


    def is_up(self):
        """
        Allows the frontend server to see if the dispatcher server is up before
        attempting to queue requests.

        @return: True. If connection fails, the client proxy will throw a socket
                 error on the client side.
        """
        return True


    def queue_request(self, powerunit_info_dict, new_state):
        """
        Looks up the appropriate RPMController instance for the device and
        queues up the request.

        @param powerunit_info_dict: A dictionary, containing the
                                    attribute/values of an unmarshalled
                                    PowerUnitInfo instance.
        @param new_state: [ON, OFF, CYCLE] state we want to the change the
                          outlet to.
        @return: True if the attempt to change power state was successful,
                 False otherwise. False is also returned when no controller
                 can be looked up because the powerunit hostname is empty.
        """
        powerunit_info = utils.PowerUnitInfo(**powerunit_info_dict)
        logging.info('Received request to set device: %s to state: %s',
                     powerunit_info.device_hostname, new_state)
        controller = self._get_rpm_controller(
                powerunit_info.powerunit_hostname,
                powerunit_info.hydra_hostname)
        if controller is None:
            # _get_rpm_controller returns None for an empty powerunit
            # hostname; report failure instead of calling a method on None.
            logging.error('No powerunit hostname given for device: %s; '
                          'unable to set state: %s',
                          powerunit_info.device_hostname, new_state)
            return False
        return controller.queue_request(powerunit_info, new_state)


    def _get_rpm_controller(self, rpm_hostname, hydra_hostname=None):
        """
        Private method that retrieves the appropriate RPMController instance
        for this RPM Hostname or calls _create_rpm_controller if it does not
        already exist.

        @param rpm_hostname: hostname of the RPM whose RPMController we want.
        @param hydra_hostname: hydra hostname passed through to
                               _create_rpm_controller when a new controller
                               has to be created.

        @return: RPMController instance responsible for this RPM, or None if
                 rpm_hostname is empty.
        """
        if not rpm_hostname:
            return None
        # The lookup and the creation happen under a single lock acquisition
        # so two concurrent requests for the same RPM cannot each create
        # their own controller. _lock is not reentrant, hence the direct
        # dictionary access instead of _worker_dict_get/_worker_dict_put.
        with self._lock:
            controller = self._worker_dict.get(rpm_hostname)
            if controller is None:
                controller = self._create_rpm_controller(
                        rpm_hostname, hydra_hostname)
                self._worker_dict[rpm_hostname] = controller
            return controller


    def _create_rpm_controller(self, rpm_hostname, hydra_hostname):
        """
        Determines the type of RPMController required and initializes it.

        The decision is made from the second-to-last '-' separated element of
        the hostname.

        @param rpm_hostname: Hostname of the RPM we need to communicate with.
        @param hydra_hostname: hydra hostname handed to SentryRPMController;
                               unused for the other controller types.

        @return: RPMController instance responsible for this RPM.

        @raise IndexError: if rpm_hostname contains no '-'.
        """
        device_tag = rpm_hostname.split('-')[-2]
        if device_tag == 'poe':
            # POE switch hostname looks like 'chromeos2-poe-switch1'.
            logging.info('The controller is a Cisco POE switch.')
            return rpm_controller.CiscoPOEController(rpm_hostname)
        # The device is an RPM; device_tag is its rack id.
        if _WEBPOWERED_RACK_ID.match(device_tag):
            logging.info('RPM is a webpowered device.')
            return rpm_controller.WebPoweredRPMController(rpm_hostname)
        logging.info('RPM is a Sentry CDU device.')
        return rpm_controller.SentryRPMController(
                hostname=rpm_hostname,
                hydra_hostname=hydra_hostname)


    def _get_serveruri(self):
        """
        Formats the _address and _port into a meaningful URI string.

        @return: URI of this dispatch server.
        """
        return 'http://%s:%d' % (self._address, self._port)


    def _unregister(self):
        """
        Tells the frontend server that this dispatch server is shutting down and
        to unregister it.

        Called by atexit.

        @raise RPMInfrastructureException: Raised if the dispatch server is
                                           unable to unregister with the
                                           frontend server.
        """
        logging.info('Dispatch server shutting down. Unregistering with RPM '
                     'frontend server.')
        client = xmlrpclib.ServerProxy(self._frontend_server)
        try:
            client.unregister_dispatcher(self._get_serveruri())
        except socket.error as er:
            err_msg = ('Unable to unregister with frontend server. Error: %s.' %
                       _describe_socket_error(er))
            logging.error(err_msg)
            raise RPMInfrastructureException(err_msg)


def launch_server_on_unused_port():
    """
    Looks up an unused port on this host and launches the xmlrpc server.

    Useful for testing by running multiple dispatch servers on the same host.

    @return: server,port - server object and the port that which it is listening
             to.
    """
    address = socket.gethostbyname(socket.gethostname())
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        # Set this socket to allow reuse.
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        # Binding to port 0 lets the OS choose a free port for us.
        sock.bind(('', 0))
        port = sock.getsockname()[1]
        # The probe socket stays open until the server has been created on
        # the chosen port.
        server = MultiThreadedXMLRPCServer((address, port),
                                           allow_none=True)
    finally:
        # Close the probe socket even if bind or server creation failed.
        sock.close()
    return server, port


def main():
    """
    Main function used to launch the dispatch server. Creates an instance of
    RPMDispatcher and registers it to a MultiThreadedXMLRPCServer instance.
    """
    if len(sys.argv) > 1:
        print('Usage: ./%s, no arguments available.' % sys.argv[0])
        sys.exit(1)
    rpm_logging_config.start_log_server(LOG_FILENAME_FORMAT)
    rpm_logging_config.set_up_logging(use_log_server=True)

    # Get the local ip _address and set the server to utilize it.
    address = socket.gethostbyname(socket.gethostname())
    server, port = launch_server_on_unused_port()
    rpm_dispatcher = RPMDispatcher(address, port)
    server.register_instance(rpm_dispatcher)
    server.serve_forever()


if __name__ == '__main__':
    main()