15c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)# Copyright (C) 2011 Google Inc. All rights reserved. 25c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)# 35c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)# Redistribution and use in source and binary forms, with or without 45c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)# modification, are permitted provided that the following conditions are 55c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)# met: 65c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)# 75c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)# * Redistributions of source code must retain the above copyright 85c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)# notice, this list of conditions and the following disclaimer. 95c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)# * Redistributions in binary form must reproduce the above 105c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)# copyright notice, this list of conditions and the following disclaimer 115c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)# in the documentation and/or other materials provided with the 125c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)# distribution. 135c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)# * Neither the name of Google Inc. nor the names of its 145c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)# contributors may be used to endorse or promote products derived from 155c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)# this software without specific prior written permission. 165c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)# 175c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 185c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 195c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 205c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 215c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 225c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 235c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 245c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 255c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 265c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 275c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 285c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) 295c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)"""Module for handling messages and concurrency for run-webkit-tests 305c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)and test-webkitpy. This module follows the design for multiprocessing.Pool 315c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)and concurrency.futures.ProcessPoolExecutor, with the following differences: 325c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) 335c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)* Tasks are executed in stateful subprocesses via objects that implement the 345c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) Worker interface - this allows the workers to share state across tasks. 355c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)* The pool provides an asynchronous event-handling interface so the caller 365c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) may receive events as tasks are processed. 375c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) 385c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)If you don't need these features, use multiprocessing.Pool or concurrency.futures 395c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)intead. 405c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) 415c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)""" 425c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) 435c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)import cPickle 445c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)import logging 455c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)import multiprocessing 465c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)import Queue 475c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)import sys 485c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)import time 495c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)import traceback 505c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) 515c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) 525c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)from webkitpy.common.host import Host 535c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)from webkitpy.common.system import stack_utils 545c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) 555c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) 565c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)_log = logging.getLogger(__name__) 575c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) 585c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) 5919cde67944066db31e633d9e386f2aa9bf9fadb3Torne (Richard Coles)def get(caller, worker_factory, num_workers, host=None): 605c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) """Returns an object that exposes a run() method that takes a list of test shards and runs them in parallel.""" 6119cde67944066db31e633d9e386f2aa9bf9fadb3Torne (Richard Coles) return _MessagePool(caller, worker_factory, num_workers, host) 625c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) 635c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) 645c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)class _MessagePool(object): 6519cde67944066db31e633d9e386f2aa9bf9fadb3Torne (Richard Coles) def __init__(self, caller, worker_factory, num_workers, host=None): 665c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) self._caller = caller 675c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) self._worker_factory = worker_factory 685c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) self._num_workers = num_workers 695c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) self._workers = [] 705c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) self._workers_stopped = set() 715c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) self._host = host 725c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) self._name = 'manager' 735c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) self._running_inline = (self._num_workers == 1) 745c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) if self._running_inline: 755c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) self._messages_to_worker = Queue.Queue() 765c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) self._messages_to_manager = Queue.Queue() 775c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) else: 785c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) self._messages_to_worker = multiprocessing.Queue() 795c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) self._messages_to_manager = multiprocessing.Queue() 805c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) 815c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) def __enter__(self): 825c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) return self 835c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) 845c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) def __exit__(self, exc_type, exc_value, exc_traceback): 855c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) self._close() 865c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) return False 875c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) 885c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) def run(self, shards): 895c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) """Posts a list of messages to the pool and waits for them to complete.""" 905c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) for message in shards: 915c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) self._messages_to_worker.put(_Message(self._name, message[0], message[1:], from_user=True, logs=())) 925c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) 935c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) for _ in xrange(self._num_workers): 945c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) self._messages_to_worker.put(_Message(self._name, 'stop', message_args=(), from_user=False, logs=())) 955c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) 965c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) self.wait() 975c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) 985c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) def _start_workers(self): 995c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) assert not self._workers 1005c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) self._workers_stopped = set() 1015c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) host = None 1025c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) if self._running_inline or self._can_pickle(self._host): 1035c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) host = self._host 1045c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) 1055c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) for worker_number in xrange(self._num_workers): 1065c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) worker = _Worker(host, self._messages_to_manager, self._messages_to_worker, self._worker_factory, worker_number, self._running_inline, self if self._running_inline else None, self._worker_log_level()) 1075c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) self._workers.append(worker) 1085c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) worker.start() 1095c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) 1105c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) def _worker_log_level(self): 1115c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) log_level = logging.NOTSET 1125c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) for handler in logging.root.handlers: 1135c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) if handler.level != logging.NOTSET: 1145c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) if log_level == logging.NOTSET: 1155c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) log_level = handler.level 1165c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) else: 1175c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) log_level = min(log_level, handler.level) 1185c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) return log_level 1195c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) 1205c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) def wait(self): 1215c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) try: 1225c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) self._start_workers() 1235c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) if self._running_inline: 1245c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) self._workers[0].run() 1255c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) self._loop(block=False) 1265c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) else: 1275c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) self._loop(block=True) 1285c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) finally: 1295c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) self._close() 1305c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) 1315c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) def _close(self): 1325c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) for worker in self._workers: 1335c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) if worker.is_alive(): 1345c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) worker.terminate() 1355c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) worker.join() 1365c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) self._workers = [] 1375c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) if not self._running_inline: 1385c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) # FIXME: This is a hack to get multiprocessing to not log tracebacks during shutdown :(. 1395c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) multiprocessing.util._exiting = True 1405c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) if self._messages_to_worker: 1415c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) self._messages_to_worker.close() 1425c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) self._messages_to_worker = None 1435c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) if self._messages_to_manager: 1445c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) self._messages_to_manager.close() 1455c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) self._messages_to_manager = None 1465c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) 1475c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) def _log_messages(self, messages): 1485c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) for message in messages: 1495c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) logging.root.handle(message) 1505c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) 1515c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) def _handle_done(self, source): 1525c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) self._workers_stopped.add(source) 1535c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) 1545c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) @staticmethod 1555c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) def _handle_worker_exception(source, exception_type, exception_value, _): 1565c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) if exception_type == KeyboardInterrupt: 1575c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) raise exception_type(exception_value) 1585c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) raise WorkerException(str(exception_value)) 1595c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) 1605c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) def _can_pickle(self, host): 1615c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) try: 1625c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) cPickle.dumps(host) 1635c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) return True 1645c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) except TypeError: 1655c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) return False 1665c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) 1675c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) def _loop(self, block): 1685c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) try: 1695c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) while True: 1705c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) if len(self._workers_stopped) == len(self._workers): 1715c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) block = False 1725c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) message = self._messages_to_manager.get(block) 1735c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) self._log_messages(message.logs) 1745c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) if message.from_user: 1755c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) self._caller.handle(message.name, message.src, *message.args) 1765c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) continue 1775c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) method = getattr(self, '_handle_' + message.name) 1785c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) assert method, 'bad message %s' % repr(message) 1795c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) method(message.src, *message.args) 1805c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) except Queue.Empty: 1815c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) pass 1825c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) 1835c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) 184926b001d589ce2f10facb93dd4b87578ea35a855Torne (Richard Coles)class WorkerException(BaseException): 1855c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) """Raised when we receive an unexpected/unknown exception from a worker.""" 1865c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) pass 1875c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) 1885c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) 1895c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)class _Message(object): 1905c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) def __init__(self, src, message_name, message_args, from_user, logs): 1915c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) self.src = src 1925c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) self.name = message_name 1935c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) self.args = message_args 1945c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) self.from_user = from_user 1955c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) self.logs = logs 1965c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) 1975c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) def __repr__(self): 1985c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) return '_Message(src=%s, name=%s, args=%s, from_user=%s, logs=%s)' % (self.src, self.name, self.args, self.from_user, self.logs) 1995c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) 2005c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) 2015c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)class _Worker(multiprocessing.Process): 2025c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) def __init__(self, host, messages_to_manager, messages_to_worker, worker_factory, worker_number, running_inline, manager, log_level): 2035c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) super(_Worker, self).__init__() 2045c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) self.host = host 2055c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) self.worker_number = worker_number 2065c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) self.name = 'worker/%d' % worker_number 2075c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) self.log_messages = [] 2085c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) self.log_level = log_level 2091e202183a5dc46166763171984b285173f8585e5Torne (Richard Coles) self._running = False 2105c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) self._running_inline = running_inline 2115c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) self._manager = manager 2125c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) 2135c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) self._messages_to_manager = messages_to_manager 2145c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) self._messages_to_worker = messages_to_worker 2155c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) self._worker = worker_factory(self) 2165c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) self._logger = None 2175c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) self._log_handler = None 2185c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) 2195c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) def terminate(self): 2205c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) if self._worker: 2215c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) if hasattr(self._worker, 'stop'): 2225c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) self._worker.stop() 2235c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) self._worker = None 2245c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) if self.is_alive(): 2255c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) super(_Worker, self).terminate() 2265c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) 2275c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) def _close(self): 2285c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) if self._log_handler and self._logger: 2295c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) self._logger.removeHandler(self._log_handler) 2305c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) self._log_handler = None 2315c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) self._logger = None 2325c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) 2335c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) def start(self): 2345c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) if not self._running_inline: 2355c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) super(_Worker, self).start() 2365c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) 2375c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) def run(self): 2385c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) if not self.host: 2395c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) self.host = Host() 2405c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) if not self._running_inline: 2415c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) self._set_up_logging() 2425c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) 2435c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) worker = self._worker 2445c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) exception_msg = "" 2455c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) _log.debug("%s starting" % self.name) 2461e202183a5dc46166763171984b285173f8585e5Torne (Richard Coles) self._running = True 2475c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) 2485c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) try: 2495c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) if hasattr(worker, 'start'): 2505c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) worker.start() 2511e202183a5dc46166763171984b285173f8585e5Torne (Richard Coles) while self._running: 2525c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) message = self._messages_to_worker.get() 2535c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) if message.from_user: 2545c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) worker.handle(message.name, message.src, *message.args) 2555c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) self._yield_to_manager() 2565c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) else: 2575c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) assert message.name == 'stop', 'bad message %s' % repr(message) 2585c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) break 2595c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) 2605c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) _log.debug("%s exiting" % self.name) 2615c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) except Queue.Empty: 2625c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) assert False, '%s: ran out of messages in worker queue.' % self.name 2635c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) except KeyboardInterrupt, e: 2645c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) self._raise(sys.exc_info()) 2655c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) except Exception, e: 2665c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) self._raise(sys.exc_info()) 2675c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) finally: 2685c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) try: 2695c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) if hasattr(worker, 'stop'): 2705c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) worker.stop() 2715c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) finally: 2725c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) self._post(name='done', args=(), from_user=False) 2735c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) self._close() 2745c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) 2751e202183a5dc46166763171984b285173f8585e5Torne (Richard Coles) def stop_running(self): 2761e202183a5dc46166763171984b285173f8585e5Torne (Richard Coles) self._running = False 2771e202183a5dc46166763171984b285173f8585e5Torne (Richard Coles) 2785c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) def post(self, name, *args): 2795c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) self._post(name, args, from_user=True) 2805c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) self._yield_to_manager() 2815c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) 2825c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) def _yield_to_manager(self): 2835c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) if self._running_inline: 2845c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) self._manager._loop(block=False) 2855c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) 2865c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) def _post(self, name, args, from_user): 2875c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) log_messages = self.log_messages 2885c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) self.log_messages = [] 2895c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) self._messages_to_manager.put(_Message(self.name, name, args, from_user, log_messages)) 2905c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) 2915c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) def _raise(self, exc_info): 2925c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) exception_type, exception_value, exception_traceback = exc_info 2935c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) if self._running_inline: 2945c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) raise exception_type, exception_value, exception_traceback 2955c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) 2965c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) if exception_type == KeyboardInterrupt: 2975c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) _log.debug("%s: interrupted, exiting" % self.name) 2985c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) stack_utils.log_traceback(_log.debug, exception_traceback) 2995c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) else: 3005c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) _log.error("%s: %s('%s') raised:" % (self.name, exception_value.__class__.__name__, str(exception_value))) 3015c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) stack_utils.log_traceback(_log.error, exception_traceback) 3025c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) # Since tracebacks aren't picklable, send the extracted stack instead. 3035c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) stack = traceback.extract_tb(exception_traceback) 3045c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) self._post(name='worker_exception', args=(exception_type, exception_value, stack), from_user=False) 3055c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) 3065c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) def _set_up_logging(self): 3075c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) self._logger = logging.getLogger() 3085c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) 3095c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) # The unix multiprocessing implementation clones any log handlers into the child process, 3105c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) # so we remove them to avoid duplicate logging. 3115c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) for h in self._logger.handlers: 3125c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) self._logger.removeHandler(h) 3135c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) 3145c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) self._log_handler = _WorkerLogHandler(self) 3155c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) self._logger.addHandler(self._log_handler) 3165c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) self._logger.setLevel(self.log_level) 3175c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) 3185c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) 3195c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)class _WorkerLogHandler(logging.Handler): 3205c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) def __init__(self, worker): 3215c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) logging.Handler.__init__(self) 3225c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) self._worker = worker 3235c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) self.setLevel(worker.log_level) 3245c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) 3255c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) def emit(self, record): 3265c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles) self._worker.log_messages.append(record) 327