# Copyright 2014 Google Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
148d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
158d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoiimport copy
168d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoiimport multiprocessing
178d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoiimport pickle
188d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoiimport traceback
198d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
208d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoifrom typ.host import Host
218d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
228d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
def make_pool(host, jobs, callback, context, pre_fn, post_fn):
    """Build the pool object used to run `callback` over submitted messages.

    The arguments are first checked with _validate_args(). When more than
    one job is requested a _ProcessPool is returned; otherwise an
    _AsyncPool is returned. Both are constructed with the same arguments.
    """
    _validate_args(context, pre_fn, post_fn)
    pool_class = _ProcessPool if jobs > 1 else _AsyncPool
    return pool_class(host, jobs, callback, context, pre_fn, post_fn)
298d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
308d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
class _MessageType(object):
    """String tags placed first in each tuple put on the pool queues."""

    # Tags written to the request queue.
    Request = 'Request'
    Close = 'Close'

    # Tags written to the response queue.
    Response = 'Response'
    Done = 'Done'
    Error = 'Error'
    Interrupt = 'Interrupt'

    # Every known tag, in the module's canonical order.
    values = [
        Request,
        Response,
        Close,
        Done,
        Error,
        Interrupt,
    ]
408d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
418d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
def _validate_args(context, pre_fn, post_fn):
    """Check that the pool arguments can be pickled.

    Args:
        context: the context object handed to make_pool.
        pre_fn: the per-worker setup callable handed to make_pool.
        post_fn: the per-worker teardown callable handed to make_pool.

    Raises:
        ValueError: if pickle.dumps() fails on any of the three arguments.
            The message names the offending argument.
    """
    try:
        _ = pickle.dumps(context)
    except Exception as e:
        raise ValueError('context passed to make_pool is not picklable: %s'
                         % str(e))

    # pickle.dumps() does not only raise pickle.PickleError: unpicklable
    # objects can also surface as TypeError or AttributeError. Catch
    # Exception here, as is done for `context` above, so that callers always
    # see a ValueError.
    for arg_name, fn in (('pre_fn', pre_fn), ('post_fn', post_fn)):
        try:
            _ = pickle.dumps(fn)
        except Exception:
            raise ValueError('%s passed to make_pool is not picklable'
                             % arg_name)
568d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
578d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
class _ProcessPool(object):
    """Pool that hands requests to `jobs` worker processes.

    Each worker is a multiprocessing.Process running _loop(). Requests and
    responses travel over two multiprocessing queues shared by all workers.
    """

    def __init__(self, host, jobs, callback, context, pre_fn, post_fn):
        """Create the queues and start one worker process per job."""
        self.host = host
        self.jobs = jobs
        self.requests = multiprocessing.Queue()
        self.responses = multiprocessing.Queue()
        self.workers = []
        self.discarded_responses = []
        self.closed = False
        self.erred = False
        # Workers are numbered starting at 1 and are started immediately.
        for worker_num in range(1, jobs + 1):
            loop_args = (self.requests, self.responses, host.for_mp(),
                         worker_num, callback, context, pre_fn, post_fn)
            worker = multiprocessing.Process(target=_loop, args=loop_args)
            worker.start()
            self.workers.append(worker)

    def send(self, msg):
        """Queue `msg` as a request for the workers."""
        request = (_MessageType.Request, msg)
        self.requests.put(request)

    def get(self):
        """Block for the next response and return its payload.

        Raises KeyboardInterrupt if a worker reported an interrupt, and
        Exception (via _handle_error) if a worker reported an error.
        """
        msg_type, resp = self.responses.get()
        if msg_type == _MessageType.Interrupt:
            raise KeyboardInterrupt
        if msg_type == _MessageType.Error:
            self._handle_error(resp)
        assert msg_type == _MessageType.Response
        return resp

    def close(self):
        """Queue one Close message per worker and mark the pool closed."""
        close_request = (_MessageType.Close, None)
        for _ in range(len(self.workers)):
            self.requests.put(close_request)
        self.closed = True

    def join(self):
        """Wait for the workers to finish and return their final values.

        If close() was never called, the workers are terminated and an
        empty list is returned. Otherwise one terminal message (Done, Error
        or Interrupt) is collected per worker; any other response read along
        the way is kept in self.discarded_responses.
        """
        # TODO: one would think that we could close self.requests in close(),
        # and close self.responses at the end of this method, but if we do,
        # we get weird tracebacks in the daemon threads multiprocessing
        # starts up. Instead, we have to hack the innards of multiprocessing.
        # It seems likely that there's a bug somewhere, either in this module
        # or in multiprocessing.
        # pylint: disable=protected-access
        if self.host.is_python3:  # pragma: python3
            multiprocessing.queues.is_exiting = lambda: True
        else:  # pragma: python2
            multiprocessing.util._exiting = True

        if not self.closed:
            # We must be aborting; terminate the workers rather than
            # shutting down cleanly.
            for worker in self.workers:
                worker.terminate()
                worker.join()
            return []

        final_responses = []
        error = None
        interrupted = None
        # Keep reading until every worker has produced its terminal message.
        still_running = len(self.workers)
        while still_running:
            msg_type, resp = self.responses.get()
            if msg_type == _MessageType.Error:
                error = resp
            elif msg_type == _MessageType.Interrupt:
                interrupted = True
            elif msg_type == _MessageType.Done:
                final_responses.append(resp[1])
            else:
                self.discarded_responses.append(resp)
                continue
            still_running -= 1

        for worker in self.workers:
            worker.join()

        # TODO: See the comment at the beginning of this method for why
        # the following is commented out.
        # self.responses.close()

        if error:
            self._handle_error(error)
        if interrupted:
            raise KeyboardInterrupt
        return final_responses

    def _handle_error(self, msg):
        """Record that a worker failed and raise an Exception describing it.

        `msg` is the (worker_num, traceback_text) pair sent by the worker.
        """
        worker_num, tb = msg
        self.erred = True
        details = (worker_num, tb)
        raise Exception("Error from worker %d (traceback follows):\n%s" %
                        details)
1518d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
1528d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
# 'Too many arguments' pylint: disable=R0913
1548d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
def _loop(requests, responses, host, worker_num,
          callback, context, pre_fn, post_fn, should_loop=True):
    """Serve requests from `requests`, writing results to `responses`.

    Args:
        requests: queue yielding (message_type, args) tuples.
        responses: queue that receives (message_type, payload) tuples.
        host: host object passed to pre_fn; a new Host() is created if falsy.
        worker_num: number identifying this worker in its messages.
        callback: called as callback(context_after_pre, args) per request.
        context: passed to pre_fn to build the per-worker context.
        pre_fn: called once as pre_fn(host, worker_num, context) at startup.
        post_fn: called as post_fn(context_after_pre) when Close arrives.
        should_loop: if false, stop after handling a single request.

    A Close message produces a Done response carrying
    (worker_num, post_fn result) and ends the loop. A KeyboardInterrupt is
    reported as an Interrupt response and any other Exception as an Error
    response carrying (worker_num, formatted traceback); neither is
    re-raised.
    """
    host = host or Host()
    try:
        context_after_pre = pre_fn(host, worker_num, context)
        keep_looping = True
        while keep_looping:
            message_type, args = requests.get(block=True)
            if message_type == _MessageType.Close:
                responses.put((_MessageType.Done,
                               (worker_num, post_fn(context_after_pre))))
                break
            assert message_type == _MessageType.Request
            resp = callback(context_after_pre, args)
            responses.put((_MessageType.Response, resp))
            keep_looping = should_loop
    except KeyboardInterrupt as e:
        responses.put((_MessageType.Interrupt, (worker_num, str(e))))
    except Exception:
        # format_exc() formats the exception currently being handled; its
        # first positional parameter is `limit`, so the exception object
        # must not be passed to it (doing so raises TypeError on Python 3,
        # which would prevent the Error message from being sent).
        responses.put((_MessageType.Error,
                       (worker_num, traceback.format_exc())))
1768d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
1778d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi
class _AsyncPool(object):
    """Pool that runs everything in the calling process.

    Messages passed to send() are stored, and the callback is only invoked
    when get() is called.
    """

    def __init__(self, host, jobs, callback, context, pre_fn, post_fn):
        """Store the arguments and run pre_fn on a deep copy of `context`."""
        self.host = host or Host()
        self.jobs = jobs
        self.callback = callback
        self.post_fn = post_fn
        self.msgs = []
        self.closed = False
        self.final_context = None
        # pre_fn gets a private copy of the context and worker number 1.
        self.context = copy.deepcopy(context)
        self.context_after_pre = pre_fn(self.host, 1, self.context)

    def send(self, msg):
        """Store `msg` until a later get() processes it."""
        self.msgs.append(msg)

    def get(self):
        """Run the callback on the oldest stored message; return its result."""
        oldest_msg = self.msgs.pop(0)
        return self.callback(self.context_after_pre, oldest_msg)

    def close(self):
        """Mark the pool closed and save post_fn's result."""
        self.closed = True
        self.final_context = self.post_fn(self.context_after_pre)

    def join(self):
        """Close the pool if needed; return [final_context]."""
        if not self.closed:
            self.close()
        return [self.final_context]
205