18d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi# Copyright 2014 Google Inc. All rights reserved. 28d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi# 38d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi# Licensed under the Apache License, Version 2.0 (the "License"); 48d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi# you may not use this file except in compliance with the License. 58d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi# You may obtain a copy of the License at 68d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi# 78d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi# http://www.apache.org/licenses/LICENSE-2.0 88d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi# 98d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi# Unless required by applicable law or agreed to in writing, software 108d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi# distributed under the License is distributed on an "AS IS" BASIS, 118d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 128d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi# See the License for the specific language governing permissions and 138d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi# limitations under the License. 148d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 158d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoiimport copy 168d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoiimport multiprocessing 178d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoiimport pickle 188d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoiimport traceback 198d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 208d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoifrom typ.host import Host 218d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 228d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 238d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoidef make_pool(host, jobs, callback, context, pre_fn, post_fn): 248d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi _validate_args(context, pre_fn, post_fn) 258d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi if jobs > 1: 268d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi return _ProcessPool(host, jobs, callback, context, pre_fn, post_fn) 278d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi else: 288d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi return _AsyncPool(host, jobs, callback, context, pre_fn, post_fn) 298d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 308d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 318d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoiclass _MessageType(object): 328d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi Request = 'Request' 338d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi Response = 'Response' 348d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi Close = 'Close' 358d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi Done = 'Done' 368d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi Error = 'Error' 378d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi Interrupt = 'Interrupt' 388d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 398d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi values = [Request, Response, Close, Done, Error, Interrupt] 408d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 418d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 428d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoidef _validate_args(context, pre_fn, post_fn): 438d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi try: 448d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi _ = pickle.dumps(context) 458d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi except Exception as e: 468d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi raise ValueError('context passed to make_pool is not picklable: %s' 478d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi % str(e)) 488d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi try: 498d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi _ = pickle.dumps(pre_fn) 508d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi except pickle.PickleError: 518d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi raise ValueError('pre_fn passed to make_pool is not picklable') 528d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi try: 538d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi _ = pickle.dumps(post_fn) 548d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi except pickle.PickleError: 558d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi raise ValueError('post_fn passed to make_pool is not picklable') 568d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 578d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 588d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoiclass _ProcessPool(object): 598d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 608d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi def __init__(self, host, jobs, callback, context, pre_fn, post_fn): 618d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi self.host = host 628d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi self.jobs = jobs 638d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi self.requests = multiprocessing.Queue() 648d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi self.responses = multiprocessing.Queue() 658d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi self.workers = [] 668d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi self.discarded_responses = [] 678d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi self.closed = False 688d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi self.erred = False 698d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi for worker_num in range(1, jobs + 1): 708d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi w = multiprocessing.Process(target=_loop, 718d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi args=(self.requests, self.responses, 728d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi host.for_mp(), worker_num, 738d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi callback, context, 748d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi pre_fn, post_fn)) 758d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi w.start() 768d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi self.workers.append(w) 778d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 788d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi def send(self, msg): 798d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi self.requests.put((_MessageType.Request, msg)) 808d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 818d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi def get(self): 828d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi msg_type, resp = self.responses.get() 838d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi if msg_type == _MessageType.Error: 848d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi self._handle_error(resp) 858d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi elif msg_type == _MessageType.Interrupt: 868d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi raise KeyboardInterrupt 878d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi assert msg_type == _MessageType.Response 888d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi return resp 898d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 908d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi def close(self): 918d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi for _ in self.workers: 928d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi self.requests.put((_MessageType.Close, None)) 938d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi self.closed = True 948d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 958d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi def join(self): 968d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi # TODO: one would think that we could close self.requests in close(), 978d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi # above, and close self.responses below, but if we do, we get 988d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi # weird tracebacks in the daemon threads multiprocessing starts up. 998d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi # Instead, we have to hack the innards of multiprocessing. It 1008d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi # seems likely that there's a bug somewhere, either in this module or 1018d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi # in multiprocessing. 1028d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi # pylint: disable=protected-access 1038d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi if self.host.is_python3: # pragma: python3 1048d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi multiprocessing.queues.is_exiting = lambda: True 1058d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi else: # pragma: python2 1068d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi multiprocessing.util._exiting = True 1078d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 1088d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi if not self.closed: 1098d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi # We must be aborting; terminate the workers rather than 1108d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi # shutting down cleanly. 1118d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi for w in self.workers: 1128d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi w.terminate() 1138d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi w.join() 1148d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi return [] 1158d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 1168d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi final_responses = [] 1178d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi error = None 1188d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi interrupted = None 1198d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi for w in self.workers: 1208d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi while True: 1218d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi msg_type, resp = self.responses.get() 1228d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi if msg_type == _MessageType.Error: 1238d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi error = resp 1248d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi break 1258d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi if msg_type == _MessageType.Interrupt: 1268d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi interrupted = True 1278d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi break 1288d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi if msg_type == _MessageType.Done: 1298d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi final_responses.append(resp[1]) 1308d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi break 1318d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi self.discarded_responses.append(resp) 1328d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 1338d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi for w in self.workers: 1348d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi w.join() 1358d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 1368d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi # TODO: See comment above at the beginning of the function for 1378d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi # why this is commented out. 1388d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi # self.responses.close() 1398d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 1408d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi if error: 1418d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi self._handle_error(error) 1428d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi if interrupted: 1438d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi raise KeyboardInterrupt 1448d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi return final_responses 1458d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 1468d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi def _handle_error(self, msg): 1478d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi worker_num, tb = msg 1488d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi self.erred = True 1498d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi raise Exception("Error from worker %d (traceback follows):\n%s" % 1508d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi (worker_num, tb)) 1518d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 1528d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 1538d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi# 'Too many arguments' pylint: disable=R0913 1548d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 1558d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoidef _loop(requests, responses, host, worker_num, 1568d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi callback, context, pre_fn, post_fn, should_loop=True): 1578d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi host = host or Host() 1588d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi try: 1598d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi context_after_pre = pre_fn(host, worker_num, context) 1608d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi keep_looping = True 1618d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi while keep_looping: 1628d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi message_type, args = requests.get(block=True) 1638d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi if message_type == _MessageType.Close: 1648d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi responses.put((_MessageType.Done, 1658d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi (worker_num, post_fn(context_after_pre)))) 1668d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi break 1678d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi assert message_type == _MessageType.Request 1688d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi resp = callback(context_after_pre, args) 1698d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi responses.put((_MessageType.Response, resp)) 1708d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi keep_looping = should_loop 1718d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi except KeyboardInterrupt as e: 1728d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi responses.put((_MessageType.Interrupt, (worker_num, str(e)))) 1738d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi except Exception as e: 1748d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi responses.put((_MessageType.Error, 1758d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi (worker_num, traceback.format_exc(e)))) 1768d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 1778d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 1788d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoiclass _AsyncPool(object): 1798d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 1808d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi def __init__(self, host, jobs, callback, context, pre_fn, post_fn): 1818d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi self.host = host or Host() 1828d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi self.jobs = jobs 1838d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi self.callback = callback 1848d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi self.context = copy.deepcopy(context) 1858d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi self.msgs = [] 1868d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi self.closed = False 1878d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi self.post_fn = post_fn 1888d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi self.context_after_pre = pre_fn(self.host, 1, self.context) 1898d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi self.final_context = None 1908d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 1918d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi def send(self, msg): 1928d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi self.msgs.append(msg) 1938d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 1948d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi def get(self): 1958d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi return self.callback(self.context_after_pre, self.msgs.pop(0)) 1968d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 1978d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi def close(self): 1988d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi self.closed = True 1998d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi self.final_context = self.post_fn(self.context_after_pre) 2008d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi 2018d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi def join(self): 2028d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi if not self.closed: 2038d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi self.close() 2048d2b206a675ec20ea07100c35df34e65ee1e45e8Ruchi Kandoi return [self.final_context] 205