1# Copyright (C) 2011 Google Inc. All rights reserved.
2#
3# Redistribution and use in source and binary forms, with or without
4# modification, are permitted provided that the following conditions are
5# met:
6#
7#     * Redistributions of source code must retain the above copyright
8# notice, this list of conditions and the following disclaimer.
9#     * Redistributions in binary form must reproduce the above
10# copyright notice, this list of conditions and the following disclaimer
11# in the documentation and/or other materials provided with the
12# distribution.
13#     * Neither the name of Google Inc. nor the names of its
14# contributors may be used to endorse or promote products derived from
15# this software without specific prior written permission.
16#
17# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
18# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
19# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
20# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
21# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
24# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
25# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
26# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
27# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
28
29"""Module for handling messaging for run-webkit-tests.
30
31This module implements a simple message broker abstraction that will be
32used to coordinate messages between the main run-webkit-tests thread
33(aka TestRunner) and the individual worker threads (previously known as
34dump_render_tree_threads).
35
36The broker simply distributes messages onto topics (named queues); the actual
37queues themselves are provided by the caller, as the queue's implementation
38requirements varies vary depending on the desired concurrency model
39(none/threads/processes).
40
41In order for shared-nothing messaging between processing to be possible,
42Messages must be picklable.
43
44The module defines one interface and two classes. Callers of this package
45must implement the BrokerClient interface, and most callers will create
46BrokerConnections as well as Brokers.
47
48The classes relate to each other as:
49
50    BrokerClient   ------>    BrokerConnection
51         ^                         |
52         |                         v
53         \----------------      Broker
54
55(The BrokerClient never calls broker directly after it is created, only
56BrokerConnection.  BrokerConnection passes a reference to BrokerClient to
57Broker, and Broker only invokes that reference, never talking directly to
58BrokerConnection).
59"""
60
61import cPickle
62import logging
63import Queue
64import time
65
66
67_log = logging.getLogger(__name__)
68
69
70class BrokerClient(object):
71    """Abstract base class / interface that all message broker clients must
72    implement. In addition to the methods below, by convention clients
73    implement routines of the signature type
74
75        handle_MESSAGE_NAME(self, src, ...):
76
77    where MESSAGE_NAME matches the string passed to post_message(), and
78    src indicates the name of the sender. If the message contains values in
79    the message body, those will be provided as optparams."""
80
81    def __init__(self, *optargs, **kwargs):
82        raise NotImplementedError
83
84    def is_done(self):
85        """Called from inside run_message_loop() to indicate whether to exit."""
86        raise NotImplementedError
87
88    def name(self):
89        """Return a name that identifies the client."""
90        raise NotImplementedError
91
92
93class Broker(object):
94    """Brokers provide the basic model of a set of topics. Clients can post a
95    message to any topic using post_message(), and can process messages on one
96    topic at a time using run_message_loop()."""
97
98    def __init__(self, options, queue_maker):
99        """Args:
100            options: a runtime option class from optparse
101            queue_maker: a factory method that returns objects implementing a
102                Queue interface (put()/get()).
103        """
104        self._options = options
105        self._queue_maker = queue_maker
106        self._topics = {}
107
108    def add_topic(self, topic_name):
109        if topic_name not in self._topics:
110            self._topics[topic_name] = self._queue_maker()
111
112    def _get_queue_for_topic(self, topic_name):
113        return self._topics[topic_name]
114
115    def post_message(self, client, topic_name, message_name, *message_args):
116        """Post a message to the appropriate topic name.
117
118        Messages have a name and a tuple of optional arguments. Both must be picklable."""
119        message = _Message(client.name(), topic_name, message_name, message_args)
120        queue = self._get_queue_for_topic(topic_name)
121        queue.put(_Message.dumps(message))
122
123    def run_message_loop(self, topic_name, client, delay_secs=None):
124        """Loop processing messages until client.is_done() or delay passes.
125
126        To run indefinitely, set delay_secs to None."""
127        assert delay_secs is None or delay_secs > 0
128        self._run_loop(topic_name, client, block=True, delay_secs=delay_secs)
129
130    def run_all_pending(self, topic_name, client):
131        """Process messages until client.is_done() or caller would block."""
132        self._run_loop(topic_name, client, block=False, delay_secs=None)
133
134    def _run_loop(self, topic_name, client, block, delay_secs):
135        queue = self._get_queue_for_topic(topic_name)
136        while not client.is_done():
137            try:
138                s = queue.get(block, delay_secs)
139            except Queue.Empty:
140                return
141            msg = _Message.loads(s)
142            self._dispatch_message(msg, client)
143
144    def _dispatch_message(self, message, client):
145        if not hasattr(client, 'handle_' + message.name):
146            raise ValueError(
147               "%s: received message '%s' it couldn't handle" %
148               (client.name(), message.name))
149        optargs = message.args
150        message_handler = getattr(client, 'handle_' + message.name)
151        message_handler(message.src, *optargs)
152
153
154class _Message(object):
155    @staticmethod
156    def loads(str):
157        obj = cPickle.loads(str)
158        assert(isinstance(obj, _Message))
159        return obj
160
161    def __init__(self, src, topic_name, message_name, message_args):
162        self.src = src
163        self.topic_name = topic_name
164        self.name = message_name
165        self.args = message_args
166
167    def dumps(self):
168        return cPickle.dumps(self)
169
170    def __repr__(self):
171        return ("_Message(from='%s', topic_name='%s', message_name='%s')" %
172                (self.src, self.topic_name, self.name))
173
174
175class BrokerConnection(object):
176    """BrokerConnection provides a connection-oriented facade on top of a
177    Broker, so that callers don't have to repeatedly pass the same topic
178    names over and over."""
179
180    def __init__(self, broker, client, run_topic, post_topic):
181        """Create a BrokerConnection on top of a Broker. Note that the Broker
182        is passed in rather than created so that a single Broker can be used
183        by multiple BrokerConnections."""
184        self._broker = broker
185        self._client = client
186        self._post_topic = post_topic
187        self._run_topic = run_topic
188        broker.add_topic(run_topic)
189        broker.add_topic(post_topic)
190
191    def run_message_loop(self, delay_secs=None):
192        self._broker.run_message_loop(self._run_topic, self._client, delay_secs)
193
194    def post_message(self, message_name, *message_args):
195        self._broker.post_message(self._client, self._post_topic,
196                                  message_name, *message_args)
197