# Copyright (C) 2011 Google Inc. All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
#     * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
#     * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
#     * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

r"""Module for handling messaging for run-webkit-tests.

This module implements a simple message broker abstraction that will be
used to coordinate messages between the main run-webkit-tests thread
(aka TestRunner) and the individual worker threads (previously known as
dump_render_tree_threads).

The broker simply distributes messages onto topics (named queues); the actual
queues themselves are provided by the caller, as the queue's implementation
requirements vary depending on the desired concurrency model
(none/threads/processes).

In order for shared-nothing messaging between processes to be possible,
messages must be picklable.

The module defines one interface and two classes. Callers of this package
must implement the BrokerClient interface, and most callers will create
BrokerConnections as well as Brokers.

The classes relate to each other as:

    BrokerClient   ------>    BrokerConnection
         ^                         |
         |                         v
         \----------------      Broker

(The BrokerClient never calls broker directly after it is created, only
BrokerConnection.  BrokerConnection passes a reference to BrokerClient to
Broker, and Broker only invokes that reference, never talking directly to
BrokerConnection).
"""

try:
    import cPickle
except ImportError:  # Python 3: pickle already uses the C accelerator.
    import pickle as cPickle
import logging
try:
    import Queue
except ImportError:  # Python 3 renamed the module to "queue".
    import queue as Queue
import time


_log = logging.getLogger(__name__)

# Clock used for loop deadlines; prefer a monotonic clock where available so
# that wall-clock adjustments cannot lengthen or shorten a message loop.
_now = getattr(time, 'monotonic', time.time)


class BrokerClient(object):
    """Abstract base class / interface that all message broker clients must
    implement. In addition to the methods below, by convention clients
    implement routines of the signature type

        handle_MESSAGE_NAME(self, src, ...):

    where MESSAGE_NAME matches the string passed to post_message(), and
    src indicates the name of the sender. If the message contains values in
    the message body, those will be provided as optparams."""

    def __init__(self, *optargs, **kwargs):
        raise NotImplementedError

    def is_done(self):
        """Called from inside run_message_loop() to indicate whether to exit."""
        raise NotImplementedError

    def name(self):
        """Return a name that identifies the client."""
        raise NotImplementedError


class Broker(object):
    """Brokers provide the basic model of a set of topics. Clients can post a
    message to any topic using post_message(), and can process messages on one
    topic at a time using run_message_loop()."""

    def __init__(self, options, queue_maker):
        """Args:
            options: a runtime option class from optparse
            queue_maker: a factory method that returns objects implementing a
                Queue interface (put()/get()).
        """
        self._options = options
        self._queue_maker = queue_maker
        self._topics = {}

    def add_topic(self, topic_name):
        """Create the queue for topic_name unless the topic already exists."""
        if topic_name not in self._topics:
            self._topics[topic_name] = self._queue_maker()

    def _get_queue_for_topic(self, topic_name):
        """Return the queue for topic_name.

        Raises KeyError if the topic was never registered with add_topic()."""
        return self._topics[topic_name]

    def post_message(self, client, topic_name, message_name, *message_args):
        """Post a message to the appropriate topic name.

        Messages have a name and a tuple of optional arguments. Both must be
        picklable. The sender recorded in the message is client.name()."""
        message = _Message(client.name(), topic_name, message_name,
                           message_args)
        self._get_queue_for_topic(topic_name).put(message.dumps())

    def run_message_loop(self, topic_name, client, delay_secs=None):
        """Loop processing messages until client.is_done() or delay passes.

        delay_secs bounds the total time spent waiting for messages in this
        call. The loop also returns as soon as the queue raises Queue.Empty.

        To run indefinitely, set delay_secs to None."""
        assert delay_secs is None or delay_secs > 0
        self._run_loop(topic_name, client, block=True, delay_secs=delay_secs)

    def run_all_pending(self, topic_name, client):
        """Process messages until client.is_done() or caller would block."""
        self._run_loop(topic_name, client, block=False, delay_secs=None)

    def _run_loop(self, topic_name, client, block, delay_secs):
        """Fetch and dispatch messages from topic_name to client.

        Args:
            topic_name: name of a topic previously passed to add_topic().
            client: object whose is_done() is polled before each fetch and
                whose handle_* methods receive the messages.
            block: forwarded to queue.get(); False means return as soon as
                the queue is empty.
            delay_secs: when blocking, the overall time budget for this call
                (None means no limit).
        """
        queue = self._get_queue_for_topic(topic_name)

        # Use one deadline for the whole loop. Passing delay_secs to every
        # get() would restart the timer on each message, so a steady stream
        # of messages could keep the loop running long past the delay.
        deadline = None
        if block and delay_secs is not None:
            deadline = _now() + delay_secs

        while not client.is_done():
            timeout = delay_secs
            if deadline is not None:
                timeout = deadline - _now()
                if timeout <= 0:
                    # Budget exhausted; a non-positive timeout is not a valid
                    # argument for a blocking get().
                    return
            try:
                s = queue.get(block, timeout)
            except Queue.Empty:
                return
            self._dispatch_message(_Message.loads(s), client)

    def _dispatch_message(self, message, client):
        """Invoke client.handle_<message.name>(message.src, *message.args).

        Raises ValueError if the client has no such handler."""
        message_handler = getattr(client, 'handle_' + message.name, None)
        if message_handler is None:
            raise ValueError(
                "%s: received message '%s' it couldn't handle" %
                (client.name(), message.name))
        message_handler(message.src, *message.args)


class _Message(object):
    """A picklable envelope holding the sender, topic, name and arguments of
    a single message."""

    @staticmethod
    def loads(str):
        """Rebuild a _Message from the string produced by dumps().

        The parameter name is kept as-is for compatibility even though it
        shadows the builtin."""
        # NOTE: unpickling executes arbitrary code embedded in the data; the
        # queues handed to the Broker must only carry data from trusted
        # producers.
        obj = cPickle.loads(str)
        assert isinstance(obj, _Message)
        return obj

    def __init__(self, src, topic_name, message_name, message_args):
        self.src = src
        self.topic_name = topic_name
        self.name = message_name
        self.args = message_args

    def dumps(self):
        """Return this message serialized with pickle."""
        return cPickle.dumps(self)

    def __repr__(self):
        return ("_Message(from='%s', topic_name='%s', message_name='%s')" %
                (self.src, self.topic_name, self.name))


class BrokerConnection(object):
    """BrokerConnection provides a connection-oriented facade on top of a
    Broker, so that callers don't have to repeatedly pass the same topic
    names over and over."""

    def __init__(self, broker, client, run_topic, post_topic):
        """Create a BrokerConnection on top of a Broker. Note that the Broker
        is passed in rather than created so that a single Broker can be used
        by multiple BrokerConnections.

        Both run_topic and post_topic are registered with the broker."""
        self._broker = broker
        self._client = client
        self._post_topic = post_topic
        self._run_topic = run_topic
        broker.add_topic(run_topic)
        broker.add_topic(post_topic)

    def run_message_loop(self, delay_secs=None):
        """Process messages from run_topic; see Broker.run_message_loop()."""
        self._broker.run_message_loop(self._run_topic, self._client,
                                      delay_secs)

    def post_message(self, message_name, *message_args):
        """Post a message from this connection's client onto post_topic."""
        self._broker.post_message(self._client, self._post_topic,
                                  message_name, *message_args)