1#
2# Module for starting a process object using os.fork() or CreateProcess()
3#
4# multiprocessing/forking.py
5#
6# Copyright (c) 2006-2008, R Oudkerk
7# All rights reserved.
8#
9# Redistribution and use in source and binary forms, with or without
10# modification, are permitted provided that the following conditions
11# are met:
12#
13# 1. Redistributions of source code must retain the above copyright
14#    notice, this list of conditions and the following disclaimer.
15# 2. Redistributions in binary form must reproduce the above copyright
16#    notice, this list of conditions and the following disclaimer in the
17#    documentation and/or other materials provided with the distribution.
18# 3. Neither the name of author nor the names of any contributors may be
19#    used to endorse or promote products derived from this software
20#    without specific prior written permission.
21#
22# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND
23# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
24# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
25# ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
26# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
27# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
28# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
29# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
30# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
31# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
32# SUCH DAMAGE.
33#
34
35import os
36import sys
37import signal
38import errno
39
40from multiprocessing import util, process
41
42__all__ = ['Popen', 'assert_spawning', 'exit', 'duplicate', 'close', 'ForkingPickler']
43
44#
45# Check that the current thread is spawning a child process
46#
47
def assert_spawning(self):
    '''
    Raise RuntimeError unless the current thread is spawning a child process

    Meant to be called with the object that is about to be pickled; the
    name of its type is used in the error message.
    '''
    if Popen.thread_is_spawning():
        return
    raise RuntimeError(
        '%s objects should only be shared between processes'
        ' through inheritance' % type(self).__name__
        )
54
55#
56# Try making some callable types picklable
57#
58
59from pickle import Pickler
class ForkingPickler(Pickler):
    '''
    Pickler subclass that keeps its own table of extra reduction functions
    '''
    # Work on a copy so registrations never modify Pickler.dispatch itself
    dispatch = Pickler.dispatch.copy()

    @classmethod
    def register(cls, type, reduce):
        '''
        Make objects of `type` picklable by reducing them with `reduce`

        `reduce(obj)` must return a tuple that can be passed as the
        positional arguments of Pickler.save_reduce().
        '''
        def save_with_reducer(pickler, obj):
            reduced = reduce(obj)
            pickler.save_reduce(obj=obj, *reduced)
        cls.dispatch[type] = save_with_reducer
69
70def _reduce_method(m):
71    if m.im_self is None:
72        return getattr, (m.im_class, m.im_func.func_name)
73    else:
74        return getattr, (m.im_self, m.im_func.func_name)
# Methods (objects of the same type as ForkingPickler.save) are pickled
# through _reduce_method
ForkingPickler.register(type(ForkingPickler.save), _reduce_method)
76
77def _reduce_method_descriptor(m):
78    return getattr, (m.__objclass__, m.__name__)
# Register the descriptor types of list.append and int.__add__ (in CPython
# these are method_descriptor and wrapper_descriptor respectively)
ForkingPickler.register(type(list.append), _reduce_method_descriptor)
ForkingPickler.register(type(int.__add__), _reduce_method_descriptor)
81
82#def _reduce_builtin_function_or_method(m):
83#    return getattr, (m.__self__, m.__name__)
84#ForkingPickler.register(type(list().append), _reduce_builtin_function_or_method)
85#ForkingPickler.register(type(int().__add__), _reduce_builtin_function_or_method)
86
# Make functools.partial objects picklable when functools provides them
try:
    from functools import partial
except ImportError:
    pass
else:
    def _rebuild_partial(func, args, keywords):
        # Inverse of _reduce_partial: recreate the partial object
        return partial(func, *args, **keywords)

    def _reduce_partial(p):
        # p.keywords may be None/empty; always hand a dict to the rebuilder
        kwds = p.keywords
        if not kwds:
            kwds = {}
        return _rebuild_partial, (p.func, p.args, kwds)

    ForkingPickler.register(partial, _reduce_partial)
97
98#
99# Unix
100#
101
102if sys.platform != 'win32':
103    import time
104
    # On Unix the module-level helpers are plain aliases of os functions
    exit = os._exit
    duplicate = os.dup
    close = os.close
108
109    #
110    # We define a Popen class similar to the one from subprocess, but
111    # whose constructor takes a process object as its argument.
112    #
113
114    class Popen(object):
115
116        def __init__(self, process_obj):
117            sys.stdout.flush()
118            sys.stderr.flush()
119            self.returncode = None
120
121            self.pid = os.fork()
122            if self.pid == 0:
123                if 'random' in sys.modules:
124                    import random
125                    random.seed()
126                code = process_obj._bootstrap()
127                sys.stdout.flush()
128                sys.stderr.flush()
129                os._exit(code)
130
131        def poll(self, flag=os.WNOHANG):
132            if self.returncode is None:
133                while True:
134                    try:
135                        pid, sts = os.waitpid(self.pid, flag)
136                    except os.error as e:
137                        if e.errno == errno.EINTR:
138                            continue
139                        # Child process not yet created. See #1731717
140                        # e.errno == errno.ECHILD == 10
141                        return None
142                    else:
143                        break
144                if pid == self.pid:
145                    if os.WIFSIGNALED(sts):
146                        self.returncode = -os.WTERMSIG(sts)
147                    else:
148                        assert os.WIFEXITED(sts)
149                        self.returncode = os.WEXITSTATUS(sts)
150            return self.returncode
151
152        def wait(self, timeout=None):
153            if timeout is None:
154                return self.poll(0)
155            deadline = time.time() + timeout
156            delay = 0.0005
157            while 1:
158                res = self.poll()
159                if res is not None:
160                    break
161                remaining = deadline - time.time()
162                if remaining <= 0:
163                    break
164                delay = min(delay * 2, remaining, 0.05)
165                time.sleep(delay)
166            return res
167
168        def terminate(self):
169            if self.returncode is None:
170                try:
171                    os.kill(self.pid, signal.SIGTERM)
172                except OSError, e:
173                    if self.wait(timeout=0.1) is None:
174                        raise
175
176        @staticmethod
177        def thread_is_spawning():
178            return False
179
180#
181# Windows
182#
183
184else:
185    import thread
186    import msvcrt
187    import _subprocess
188    import time
189
190    from _multiprocessing import win32, Connection, PipeConnection
191    from .util import Finalize
192
193    #try:
194    #    from cPickle import dump, load, HIGHEST_PROTOCOL
195    #except ImportError:
196    from pickle import load, HIGHEST_PROTOCOL
197
198    def dump(obj, file, protocol=None):
199        ForkingPickler(file, protocol).dump(obj)
200
201    #
202    #
203    #
204
    # Exit code handed to TerminateProcess() by Popen.terminate();
    # Popen.wait() reports it as -signal.SIGTERM
    TERMINATE = 0x10000
    # Truthy when running on win32 with sys.frozen set
    WINEXE = (sys.platform == 'win32' and getattr(sys, 'frozen', False))
    # True when the running executable is named pythonservice.exe
    WINSERVICE = sys.executable.lower().endswith("pythonservice.exe")

    # Module-level helpers backed by the win32 API
    exit = win32.ExitProcess
    close = win32.CloseHandle

    #
    # _python_exe is the assumed path to the python executable.
    # People embedding Python want to modify it.
    #

    if WINSERVICE:
        # sys.executable is pythonservice.exe here, so point at the
        # python.exe under sys.exec_prefix instead
        _python_exe = os.path.join(sys.exec_prefix, 'python.exe')
    else:
        _python_exe = sys.executable
221
    def set_executable(exe):
        '''
        Set the path of the executable used to start child processes
        '''
        global _python_exe
        _python_exe = exe
225
226    #
227    #
228    #
229
230    def duplicate(handle, target_process=None, inheritable=False):
231        if target_process is None:
232            target_process = _subprocess.GetCurrentProcess()
233        return _subprocess.DuplicateHandle(
234            _subprocess.GetCurrentProcess(), handle, target_process,
235            0, inheritable, _subprocess.DUPLICATE_SAME_ACCESS
236            ).Detach()
237
238    #
239    # We define a Popen class similar to the one from subprocess, but
240    # whose constructor takes a process object as its argument.
241    #
242
243    class Popen(object):
244        '''
245        Start a subprocess to run the code of a process object
246        '''
247        _tls = thread._local()
248
249        def __init__(self, process_obj):
250            # create pipe for communication with child
251            rfd, wfd = os.pipe()
252
253            # get handle for read end of the pipe and make it inheritable
254            rhandle = duplicate(msvcrt.get_osfhandle(rfd), inheritable=True)
255            os.close(rfd)
256
257            # start process
258            cmd = get_command_line() + [rhandle]
259            cmd = ' '.join('"%s"' % x for x in cmd)
260            hp, ht, pid, tid = _subprocess.CreateProcess(
261                _python_exe, cmd, None, None, 1, 0, None, None, None
262                )
263            ht.Close()
264            close(rhandle)
265
266            # set attributes of self
267            self.pid = pid
268            self.returncode = None
269            self._handle = hp
270
271            # send information to child
272            prep_data = get_preparation_data(process_obj._name)
273            to_child = os.fdopen(wfd, 'wb')
274            Popen._tls.process_handle = int(hp)
275            try:
276                dump(prep_data, to_child, HIGHEST_PROTOCOL)
277                dump(process_obj, to_child, HIGHEST_PROTOCOL)
278            finally:
279                del Popen._tls.process_handle
280                to_child.close()
281
282        @staticmethod
283        def thread_is_spawning():
284            return getattr(Popen._tls, 'process_handle', None) is not None
285
286        @staticmethod
287        def duplicate_for_child(handle):
288            return duplicate(handle, Popen._tls.process_handle)
289
290        def wait(self, timeout=None):
291            if self.returncode is None:
292                if timeout is None:
293                    msecs = _subprocess.INFINITE
294                else:
295                    msecs = max(0, int(timeout * 1000 + 0.5))
296
297                res = _subprocess.WaitForSingleObject(int(self._handle), msecs)
298                if res == _subprocess.WAIT_OBJECT_0:
299                    code = _subprocess.GetExitCodeProcess(self._handle)
300                    if code == TERMINATE:
301                        code = -signal.SIGTERM
302                    self.returncode = code
303
304            return self.returncode
305
306        def poll(self):
307            return self.wait(timeout=0)
308
309        def terminate(self):
310            if self.returncode is None:
311                try:
312                    _subprocess.TerminateProcess(int(self._handle), TERMINATE)
313                except WindowsError:
314                    if self.wait(timeout=0.1) is None:
315                        raise
316
317    #
318    #
319    #
320
321    def is_forking(argv):
322        '''
323        Return whether commandline indicates we are forking
324        '''
325        if len(argv) >= 2 and argv[1] == '--multiprocessing-fork':
326            assert len(argv) == 3
327            return True
328        else:
329            return False
330
331
332    def freeze_support():
333        '''
334        Run code for process object if this in not the main process
335        '''
336        if is_forking(sys.argv):
337            main()
338            sys.exit()
339
340
341    def get_command_line():
342        '''
343        Returns prefix of command line used for spawning a child process
344        '''
345        if getattr(process.current_process(), '_inheriting', False):
346            raise RuntimeError('''
347            Attempt to start a new process before the current process
348            has finished its bootstrapping phase.
349
350            This probably means that you are on Windows and you have
351            forgotten to use the proper idiom in the main module:
352
353                if __name__ == '__main__':
354                    freeze_support()
355                    ...
356
357            The "freeze_support()" line can be omitted if the program
358            is not going to be frozen to produce a Windows executable.''')
359
360        if getattr(sys, 'frozen', False):
361            return [sys.executable, '--multiprocessing-fork']
362        else:
363            prog = 'from multiprocessing.forking import main; main()'
364            opts = util._args_from_interpreter_flags()
365            return [_python_exe] + opts + ['-c', prog, '--multiprocessing-fork']
366
367
    def main():
        '''
        Run code specified by data received over pipe
        '''
        assert is_forking(sys.argv)

        # The last argument is the inheritable pipe handle that the parent
        # appended to the command line
        handle = int(sys.argv[-1])
        fd = msvcrt.open_osfhandle(handle, os.O_RDONLY)
        from_parent = os.fdopen(fd, 'rb')

        # Mark the bootstrapping phase: get_command_line() refuses to
        # spawn further processes while _inheriting is set
        process.current_process()._inheriting = True
        # The parent sends two pickles: the preparation data, then the
        # process object (which can only be unpickled after prepare())
        preparation_data = load(from_parent)
        prepare(preparation_data)
        self = load(from_parent)
        process.current_process()._inheriting = False

        from_parent.close()

        exitcode = self._bootstrap()
        exit(exitcode)
388
389
390    def get_preparation_data(name):
391        '''
392        Return info about parent needed by child to unpickle process object
393        '''
394        from .util import _logger, _log_to_stderr
395
396        d = dict(
397            name=name,
398            sys_path=sys.path,
399            sys_argv=sys.argv,
400            log_to_stderr=_log_to_stderr,
401            orig_dir=process.ORIGINAL_DIR,
402            authkey=process.current_process().authkey,
403            )
404
405        if _logger is not None:
406            d['log_level'] = _logger.getEffectiveLevel()
407
408        if not WINEXE and not WINSERVICE:
409            main_path = getattr(sys.modules['__main__'], '__file__', None)
410            if not main_path and sys.argv[0] not in ('', '-c'):
411                main_path = sys.argv[0]
412            if main_path is not None:
413                if not os.path.isabs(main_path) and \
414                                          process.ORIGINAL_DIR is not None:
415                    main_path = os.path.join(process.ORIGINAL_DIR, main_path)
416                d['main_path'] = os.path.normpath(main_path)
417
418        return d
419
420    #
421    # Make (Pipe)Connection picklable
422    #
423
424    def reduce_connection(conn):
425        if not Popen.thread_is_spawning():
426            raise RuntimeError(
427                'By default %s objects can only be shared between processes\n'
428                'using inheritance' % type(conn).__name__
429                )
430        return type(conn), (Popen.duplicate_for_child(conn.fileno()),
431                            conn.readable, conn.writable)
432
    # Both connection types are pickled through reduce_connection
    ForkingPickler.register(Connection, reduce_connection)
    ForkingPickler.register(PipeConnection, reduce_connection)
435
436#
437# Prepare current process
438#
439
# Every __main__ module seen by prepare() is appended here; presumably this
# keeps the replaced module referenced after sys.modules['__main__'] is
# rebound -- TODO confirm
old_main_modules = []

def prepare(data):
    '''
    Try to get current process ready to unpickle process object

    `data` is a dict; every key is optional.  Recognised keys are 'name',
    'authkey', 'log_to_stderr', 'log_level', 'sys_path', 'sys_argv',
    'dir', 'orig_dir' and 'main_path'.  When 'main_path' is given (and
    the main module is not called 'ipython') the file is imported under
    the name '__parents_main__' and then installed as '__main__'.
    '''
    old_main_modules.append(sys.modules['__main__'])

    if 'name' in data:
        process.current_process().name = data['name']

    if 'authkey' in data:
        process.current_process()._authkey = data['authkey']

    if data.get('log_to_stderr'):
        util.log_to_stderr()

    if 'log_level' in data:
        util.get_logger().setLevel(data['log_level'])

    if 'sys_path' in data:
        sys.path = data['sys_path']

    if 'sys_argv' in data:
        sys.argv = data['sys_argv']

    if 'dir' in data:
        os.chdir(data['dir'])

    if 'orig_dir' in data:
        process.ORIGINAL_DIR = data['orig_dir']

    if 'main_path' in data:
        main_path = data['main_path']
        main_name = os.path.splitext(os.path.basename(main_path))[0]
        if main_name == '__init__':
            # a package: the module name is the name of its directory
            main_name = os.path.basename(os.path.dirname(main_path))

        if main_name != 'ipython':
            import imp

            # main_path cannot be None here (os.path.basename() above
            # would already have failed), so only two cases remain
            if os.path.basename(main_path).startswith('__init__.py'):
                dirs = [os.path.dirname(os.path.dirname(main_path))]
            else:
                dirs = [os.path.dirname(main_path)]

            assert main_name not in sys.modules, main_name
            file, path_name, etc = imp.find_module(main_name, dirs)
            try:
                # We would like to do "imp.load_module('__main__', ...)"
                # here.  However, that would cause 'if __name__ ==
                # "__main__"' clauses to be executed.
                main_module = imp.load_module(
                    '__parents_main__', file, path_name, etc
                    )
            finally:
                if file:
                    file.close()

            sys.modules['__main__'] = main_module
            main_module.__name__ = '__main__'

            # Try to make the potentially picklable objects in
            # sys.modules['__main__'] realize they are in the main
            # module -- somewhat ugly.
            for obj in main_module.__dict__.values():
                try:
                    if obj.__module__ == '__parents_main__':
                        obj.__module__ = '__main__'
                except Exception:
                    # best effort: objects without a (writable)
                    # __module__ attribute are simply left alone
                    pass
513