util.py revision ebdcd859e59ed16a79dea94291c0be3a87640a08
1# 2# Module providing various facilities to other parts of the package 3# 4# multiprocessing/util.py 5# 6# Copyright (c) 2006-2008, R Oudkerk 7# Licensed to PSF under a Contributor Agreement. 8# 9 10import sys 11import functools 12import itertools 13import weakref 14import atexit 15import threading # we want threading to install it's 16 # cleanup function before multiprocessing does 17from subprocess import _args_from_interpreter_flags 18 19from multiprocessing.process import current_process, active_children 20 21__all__ = [ 22 'sub_debug', 'debug', 'info', 'sub_warning', 'get_logger', 23 'log_to_stderr', 'get_temp_dir', 'register_after_fork', 24 'is_exiting', 'Finalize', 'ForkAwareThreadLock', 'ForkAwareLocal', 25 'SUBDEBUG', 'SUBWARNING', 26 ] 27 28# 29# Logging 30# 31 32NOTSET = 0 33SUBDEBUG = 5 34DEBUG = 10 35INFO = 20 36SUBWARNING = 25 37 38LOGGER_NAME = 'multiprocessing' 39DEFAULT_LOGGING_FORMAT = '[%(levelname)s/%(processName)s] %(message)s' 40 41_logger = None 42_log_to_stderr = False 43 44def sub_debug(msg, *args): 45 if _logger: 46 _logger.log(SUBDEBUG, msg, *args) 47 48def debug(msg, *args): 49 if _logger: 50 _logger.log(DEBUG, msg, *args) 51 52def info(msg, *args): 53 if _logger: 54 _logger.log(INFO, msg, *args) 55 56def sub_warning(msg, *args): 57 if _logger: 58 _logger.log(SUBWARNING, msg, *args) 59 60def get_logger(): 61 ''' 62 Returns logger used by multiprocessing 63 ''' 64 global _logger 65 import logging 66 67 logging._acquireLock() 68 try: 69 if not _logger: 70 71 _logger = logging.getLogger(LOGGER_NAME) 72 _logger.propagate = 0 73 logging.addLevelName(SUBDEBUG, 'SUBDEBUG') 74 logging.addLevelName(SUBWARNING, 'SUBWARNING') 75 76 # XXX multiprocessing should cleanup before logging 77 if hasattr(atexit, 'unregister'): 78 atexit.unregister(_exit_function) 79 atexit.register(_exit_function) 80 else: 81 atexit._exithandlers.remove((_exit_function, (), {})) 82 atexit._exithandlers.append((_exit_function, (), {})) 83 84 finally: 85 logging._releaseLock() 86 87 return _logger 88 89def log_to_stderr(level=None): 90 ''' 91 Turn on logging and add a handler which prints to stderr 92 ''' 93 global _log_to_stderr 94 import logging 95 96 logger = get_logger() 97 formatter = logging.Formatter(DEFAULT_LOGGING_FORMAT) 98 handler = logging.StreamHandler() 99 handler.setFormatter(formatter) 100 logger.addHandler(handler) 101 102 if level: 103 logger.setLevel(level) 104 _log_to_stderr = True 105 return _logger 106 107# 108# Function returning a temp directory which will be removed on exit 109# 110 111def get_temp_dir(): 112 # get name of a temp directory which will be automatically cleaned up 113 if current_process()._tempdir is None: 114 import shutil, tempfile 115 tempdir = tempfile.mkdtemp(prefix='pymp-') 116 info('created temp directory %s', tempdir) 117 Finalize(None, shutil.rmtree, args=[tempdir], exitpriority=-100) 118 current_process()._tempdir = tempdir 119 return current_process()._tempdir 120 121# 122# Support for reinitialization of objects when bootstrapping a child process 123# 124 125_afterfork_registry = weakref.WeakValueDictionary() 126_afterfork_counter = itertools.count() 127 128def _run_after_forkers(): 129 items = list(_afterfork_registry.items()) 130 items.sort() 131 for (index, ident, func), obj in items: 132 try: 133 func(obj) 134 except Exception as e: 135 info('after forker raised exception %s', e) 136 137def register_after_fork(obj, func): 138 _afterfork_registry[(next(_afterfork_counter), id(obj), func)] = obj 139 140# 141# Finalization using weakrefs 142# 143 144_finalizer_registry = {} 145_finalizer_counter = itertools.count() 146 147 148class Finalize(object): 149 ''' 150 Class which supports object finalization using weakrefs 151 ''' 152 def __init__(self, obj, callback, args=(), kwargs=None, exitpriority=None): 153 assert exitpriority is None or type(exitpriority) is int 154 155 if obj is not None: 156 self._weakref = weakref.ref(obj, self) 157 else: 158 assert exitpriority is not None 159 160 self._callback = callback 161 self._args = args 162 self._kwargs = kwargs or {} 163 self._key = (exitpriority, next(_finalizer_counter)) 164 165 _finalizer_registry[self._key] = self 166 167 def __call__(self, wr=None, 168 # Need to bind these locally because the globals can have 169 # been cleared at shutdown 170 _finalizer_registry=_finalizer_registry, 171 sub_debug=sub_debug): 172 ''' 173 Run the callback unless it has already been called or cancelled 174 ''' 175 try: 176 del _finalizer_registry[self._key] 177 except KeyError: 178 sub_debug('finalizer no longer registered') 179 else: 180 sub_debug('finalizer calling %s with args %s and kwargs %s', 181 self._callback, self._args, self._kwargs) 182 res = self._callback(*self._args, **self._kwargs) 183 self._weakref = self._callback = self._args = \ 184 self._kwargs = self._key = None 185 return res 186 187 def cancel(self): 188 ''' 189 Cancel finalization of the object 190 ''' 191 try: 192 del _finalizer_registry[self._key] 193 except KeyError: 194 pass 195 else: 196 self._weakref = self._callback = self._args = \ 197 self._kwargs = self._key = None 198 199 def still_active(self): 200 ''' 201 Return whether this finalizer is still waiting to invoke callback 202 ''' 203 return self._key in _finalizer_registry 204 205 def __repr__(self): 206 try: 207 obj = self._weakref() 208 except (AttributeError, TypeError): 209 obj = None 210 211 if obj is None: 212 return '<Finalize object, dead>' 213 214 x = '<Finalize object, callback=%s' % \ 215 getattr(self._callback, '__name__', self._callback) 216 if self._args: 217 x += ', args=' + str(self._args) 218 if self._kwargs: 219 x += ', kwargs=' + str(self._kwargs) 220 if self._key[0] is not None: 221 x += ', exitprority=' + str(self._key[0]) 222 return x + '>' 223 224 225def _run_finalizers(minpriority=None): 226 ''' 227 Run all finalizers whose exit priority is not None and at least minpriority 228 229 Finalizers with highest priority are called first; finalizers with 230 the same priority will be called in reverse order of creation. 231 ''' 232 if minpriority is None: 233 f = lambda p : p[0][0] is not None 234 else: 235 f = lambda p : p[0][0] is not None and p[0][0] >= minpriority 236 237 items = [x for x in list(_finalizer_registry.items()) if f(x)] 238 items.sort(reverse=True) 239 240 for key, finalizer in items: 241 sub_debug('calling %s', finalizer) 242 try: 243 finalizer() 244 except Exception: 245 import traceback 246 traceback.print_exc() 247 248 if minpriority is None: 249 _finalizer_registry.clear() 250 251# 252# Clean up on exit 253# 254 255def is_exiting(): 256 ''' 257 Returns true if the process is shutting down 258 ''' 259 return _exiting or _exiting is None 260 261_exiting = False 262 263def _exit_function(): 264 global _exiting 265 266 info('process shutting down') 267 debug('running all "atexit" finalizers with priority >= 0') 268 _run_finalizers(0) 269 270 for p in active_children(): 271 if p._daemonic: 272 info('calling terminate() for daemon %s', p.name) 273 p._popen.terminate() 274 275 for p in active_children(): 276 info('calling join() for process %s', p.name) 277 p.join() 278 279 debug('running the remaining "atexit" finalizers') 280 _run_finalizers() 281 282atexit.register(_exit_function) 283 284# 285# Some fork aware types 286# 287 288class ForkAwareThreadLock(object): 289 def __init__(self): 290 self._lock = threading.Lock() 291 self.acquire = self._lock.acquire 292 self.release = self._lock.release 293 register_after_fork(self, ForkAwareThreadLock.__init__) 294 295class ForkAwareLocal(threading.local): 296 def __init__(self): 297 register_after_fork(self, lambda obj : obj.__dict__.clear()) 298 def __reduce__(self): 299 return type(self), () 300 301