15c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)# Copyright (C) 2011 Google Inc. All rights reserved.
25c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)#
35c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)# Redistribution and use in source and binary forms, with or without
45c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)# modification, are permitted provided that the following conditions are
55c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)# met:
65c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)#
75c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)#     * Redistributions of source code must retain the above copyright
85c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)# notice, this list of conditions and the following disclaimer.
95c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)#     * Redistributions in binary form must reproduce the above
105c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)# copyright notice, this list of conditions and the following disclaimer
115c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)# in the documentation and/or other materials provided with the
125c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)# distribution.
135c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)#     * Neither the name of Google Inc. nor the names of its
145c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)# contributors may be used to endorse or promote products derived from
155c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)# this software without specific prior written permission.
165c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)#
175c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
185c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
195c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
205c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
215c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
225c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
235c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
245c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
255c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
265c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
275c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
285c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)
295c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)"""Module for handling messages and concurrency for run-webkit-tests
305c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)and test-webkitpy. This module follows the design for multiprocessing.Pool
315c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)and concurrency.futures.ProcessPoolExecutor, with the following differences:
325c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)
335c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)* Tasks are executed in stateful subprocesses via objects that implement the
345c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)  Worker interface - this allows the workers to share state across tasks.
355c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)* The pool provides an asynchronous event-handling interface so the caller
365c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)  may receive events as tasks are processed.
375c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)
385c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)If you don't need these features, use multiprocessing.Pool or concurrency.futures
395c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)intead.
405c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)
415c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)"""
425c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)
435c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)import cPickle
445c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)import logging
455c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)import multiprocessing
465c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)import Queue
475c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)import sys
485c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)import time
495c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)import traceback
505c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)
515c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)
525c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)from webkitpy.common.host import Host
535c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)from webkitpy.common.system import stack_utils
545c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)
555c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)
565c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)_log = logging.getLogger(__name__)
575c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)
585c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)
5919cde67944066db31e633d9e386f2aa9bf9fadb3Torne (Richard Coles)def get(caller, worker_factory, num_workers, host=None):
605c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)    """Returns an object that exposes a run() method that takes a list of test shards and runs them in parallel."""
6119cde67944066db31e633d9e386f2aa9bf9fadb3Torne (Richard Coles)    return _MessagePool(caller, worker_factory, num_workers, host)
625c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)
635c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)
645c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)class _MessagePool(object):
6519cde67944066db31e633d9e386f2aa9bf9fadb3Torne (Richard Coles)    def __init__(self, caller, worker_factory, num_workers, host=None):
665c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)        self._caller = caller
675c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)        self._worker_factory = worker_factory
685c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)        self._num_workers = num_workers
695c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)        self._workers = []
705c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)        self._workers_stopped = set()
715c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)        self._host = host
725c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)        self._name = 'manager'
735c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)        self._running_inline = (self._num_workers == 1)
745c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)        if self._running_inline:
755c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)            self._messages_to_worker = Queue.Queue()
765c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)            self._messages_to_manager = Queue.Queue()
775c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)        else:
785c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)            self._messages_to_worker = multiprocessing.Queue()
795c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)            self._messages_to_manager = multiprocessing.Queue()
805c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)
815c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)    def __enter__(self):
825c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)        return self
835c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)
845c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)    def __exit__(self, exc_type, exc_value, exc_traceback):
855c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)        self._close()
865c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)        return False
875c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)
885c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)    def run(self, shards):
895c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)        """Posts a list of messages to the pool and waits for them to complete."""
905c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)        for message in shards:
915c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)            self._messages_to_worker.put(_Message(self._name, message[0], message[1:], from_user=True, logs=()))
925c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)
935c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)        for _ in xrange(self._num_workers):
945c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)            self._messages_to_worker.put(_Message(self._name, 'stop', message_args=(), from_user=False, logs=()))
955c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)
965c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)        self.wait()
975c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)
985c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)    def _start_workers(self):
995c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)        assert not self._workers
1005c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)        self._workers_stopped = set()
1015c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)        host = None
1025c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)        if self._running_inline or self._can_pickle(self._host):
1035c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)            host = self._host
1045c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)
1055c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)        for worker_number in xrange(self._num_workers):
1065c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)            worker = _Worker(host, self._messages_to_manager, self._messages_to_worker, self._worker_factory, worker_number, self._running_inline, self if self._running_inline else None, self._worker_log_level())
1075c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)            self._workers.append(worker)
1085c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)            worker.start()
1095c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)
1105c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)    def _worker_log_level(self):
1115c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)        log_level = logging.NOTSET
1125c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)        for handler in logging.root.handlers:
1135c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)            if handler.level != logging.NOTSET:
1145c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)                if log_level == logging.NOTSET:
1155c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)                    log_level = handler.level
1165c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)                else:
1175c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)                    log_level = min(log_level, handler.level)
1185c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)        return log_level
1195c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)
1205c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)    def wait(self):
1215c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)        try:
1225c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)            self._start_workers()
1235c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)            if self._running_inline:
1245c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)                self._workers[0].run()
1255c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)                self._loop(block=False)
1265c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)            else:
1275c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)                self._loop(block=True)
1285c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)        finally:
1295c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)            self._close()
1305c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)
1315c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)    def _close(self):
1325c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)        for worker in self._workers:
1335c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)            if worker.is_alive():
1345c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)                worker.terminate()
1355c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)                worker.join()
1365c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)        self._workers = []
1375c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)        if not self._running_inline:
1385c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)            # FIXME: This is a hack to get multiprocessing to not log tracebacks during shutdown :(.
1395c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)            multiprocessing.util._exiting = True
1405c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)            if self._messages_to_worker:
1415c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)                self._messages_to_worker.close()
1425c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)                self._messages_to_worker = None
1435c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)            if self._messages_to_manager:
1445c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)                self._messages_to_manager.close()
1455c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)                self._messages_to_manager = None
1465c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)
1475c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)    def _log_messages(self, messages):
1485c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)        for message in messages:
1495c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)            logging.root.handle(message)
1505c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)
1515c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)    def _handle_done(self, source):
1525c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)        self._workers_stopped.add(source)
1535c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)
1545c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)    @staticmethod
1555c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)    def _handle_worker_exception(source, exception_type, exception_value, _):
1565c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)        if exception_type == KeyboardInterrupt:
1575c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)            raise exception_type(exception_value)
1585c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)        raise WorkerException(str(exception_value))
1595c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)
1605c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)    def _can_pickle(self, host):
1615c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)        try:
1625c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)            cPickle.dumps(host)
1635c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)            return True
1645c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)        except TypeError:
1655c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)            return False
1665c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)
1675c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)    def _loop(self, block):
1685c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)        try:
1695c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)            while True:
1705c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)                if len(self._workers_stopped) == len(self._workers):
1715c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)                    block = False
1725c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)                message = self._messages_to_manager.get(block)
1735c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)                self._log_messages(message.logs)
1745c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)                if message.from_user:
1755c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)                    self._caller.handle(message.name, message.src, *message.args)
1765c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)                    continue
1775c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)                method = getattr(self, '_handle_' + message.name)
1785c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)                assert method, 'bad message %s' % repr(message)
1795c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)                method(message.src, *message.args)
1805c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)        except Queue.Empty:
1815c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)            pass
1825c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)
1835c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)
184926b001d589ce2f10facb93dd4b87578ea35a855Torne (Richard Coles)class WorkerException(BaseException):
1855c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)    """Raised when we receive an unexpected/unknown exception from a worker."""
1865c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)    pass
1875c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)
1885c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)
1895c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)class _Message(object):
1905c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)    def __init__(self, src, message_name, message_args, from_user, logs):
1915c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)        self.src = src
1925c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)        self.name = message_name
1935c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)        self.args = message_args
1945c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)        self.from_user = from_user
1955c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)        self.logs = logs
1965c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)
1975c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)    def __repr__(self):
1985c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)        return '_Message(src=%s, name=%s, args=%s, from_user=%s, logs=%s)' % (self.src, self.name, self.args, self.from_user, self.logs)
1995c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)
2005c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)
2015c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)class _Worker(multiprocessing.Process):
2025c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)    def __init__(self, host, messages_to_manager, messages_to_worker, worker_factory, worker_number, running_inline, manager, log_level):
2035c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)        super(_Worker, self).__init__()
2045c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)        self.host = host
2055c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)        self.worker_number = worker_number
2065c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)        self.name = 'worker/%d' % worker_number
2075c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)        self.log_messages = []
2085c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)        self.log_level = log_level
2091e202183a5dc46166763171984b285173f8585e5Torne (Richard Coles)        self._running = False
2105c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)        self._running_inline = running_inline
2115c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)        self._manager = manager
2125c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)
2135c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)        self._messages_to_manager = messages_to_manager
2145c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)        self._messages_to_worker = messages_to_worker
2155c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)        self._worker = worker_factory(self)
2165c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)        self._logger = None
2175c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)        self._log_handler = None
2185c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)
2195c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)    def terminate(self):
2205c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)        if self._worker:
2215c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)            if hasattr(self._worker, 'stop'):
2225c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)                self._worker.stop()
2235c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)            self._worker = None
2245c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)        if self.is_alive():
2255c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)            super(_Worker, self).terminate()
2265c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)
2275c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)    def _close(self):
2285c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)        if self._log_handler and self._logger:
2295c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)            self._logger.removeHandler(self._log_handler)
2305c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)        self._log_handler = None
2315c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)        self._logger = None
2325c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)
2335c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)    def start(self):
2345c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)        if not self._running_inline:
2355c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)            super(_Worker, self).start()
2365c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)
2375c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)    def run(self):
2385c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)        if not self.host:
2395c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)            self.host = Host()
2405c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)        if not self._running_inline:
2415c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)            self._set_up_logging()
2425c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)
2435c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)        worker = self._worker
2445c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)        exception_msg = ""
2455c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)        _log.debug("%s starting" % self.name)
2461e202183a5dc46166763171984b285173f8585e5Torne (Richard Coles)        self._running = True
2475c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)
2485c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)        try:
2495c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)            if hasattr(worker, 'start'):
2505c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)                worker.start()
2511e202183a5dc46166763171984b285173f8585e5Torne (Richard Coles)            while self._running:
2525c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)                message = self._messages_to_worker.get()
2535c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)                if message.from_user:
2545c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)                    worker.handle(message.name, message.src, *message.args)
2555c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)                    self._yield_to_manager()
2565c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)                else:
2575c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)                    assert message.name == 'stop', 'bad message %s' % repr(message)
2585c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)                    break
2595c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)
2605c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)            _log.debug("%s exiting" % self.name)
2615c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)        except Queue.Empty:
2625c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)            assert False, '%s: ran out of messages in worker queue.' % self.name
2635c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)        except KeyboardInterrupt, e:
2645c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)            self._raise(sys.exc_info())
2655c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)        except Exception, e:
2665c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)            self._raise(sys.exc_info())
2675c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)        finally:
2685c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)            try:
2695c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)                if hasattr(worker, 'stop'):
2705c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)                    worker.stop()
2715c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)            finally:
2725c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)                self._post(name='done', args=(), from_user=False)
2735c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)            self._close()
2745c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)
2751e202183a5dc46166763171984b285173f8585e5Torne (Richard Coles)    def stop_running(self):
2761e202183a5dc46166763171984b285173f8585e5Torne (Richard Coles)        self._running = False
2771e202183a5dc46166763171984b285173f8585e5Torne (Richard Coles)
2785c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)    def post(self, name, *args):
2795c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)        self._post(name, args, from_user=True)
2805c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)        self._yield_to_manager()
2815c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)
2825c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)    def _yield_to_manager(self):
2835c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)        if self._running_inline:
2845c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)            self._manager._loop(block=False)
2855c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)
2865c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)    def _post(self, name, args, from_user):
2875c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)        log_messages = self.log_messages
2885c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)        self.log_messages = []
2895c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)        self._messages_to_manager.put(_Message(self.name, name, args, from_user, log_messages))
2905c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)
2915c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)    def _raise(self, exc_info):
2925c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)        exception_type, exception_value, exception_traceback = exc_info
2935c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)        if self._running_inline:
2945c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)            raise exception_type, exception_value, exception_traceback
2955c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)
2965c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)        if exception_type == KeyboardInterrupt:
2975c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)            _log.debug("%s: interrupted, exiting" % self.name)
2985c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)            stack_utils.log_traceback(_log.debug, exception_traceback)
2995c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)        else:
3005c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)            _log.error("%s: %s('%s') raised:" % (self.name, exception_value.__class__.__name__, str(exception_value)))
3015c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)            stack_utils.log_traceback(_log.error, exception_traceback)
3025c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)        # Since tracebacks aren't picklable, send the extracted stack instead.
3035c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)        stack = traceback.extract_tb(exception_traceback)
3045c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)        self._post(name='worker_exception', args=(exception_type, exception_value, stack), from_user=False)
3055c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)
3065c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)    def _set_up_logging(self):
3075c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)        self._logger = logging.getLogger()
3085c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)
3095c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)        # The unix multiprocessing implementation clones any log handlers into the child process,
3105c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)        # so we remove them to avoid duplicate logging.
3115c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)        for h in self._logger.handlers:
3125c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)            self._logger.removeHandler(h)
3135c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)
3145c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)        self._log_handler = _WorkerLogHandler(self)
3155c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)        self._logger.addHandler(self._log_handler)
3165c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)        self._logger.setLevel(self.log_level)
3175c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)
3185c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)
3195c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)class _WorkerLogHandler(logging.Handler):
3205c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)    def __init__(self, worker):
3215c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)        logging.Handler.__init__(self)
3225c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)        self._worker = worker
3235c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)        self.setLevel(worker.log_level)
3245c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)
3255c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)    def emit(self, record):
3265c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)        self._worker.log_messages.append(record)
327