import os
import sys
import threading

from . import process
from . import reduction

__all__ = []            # things are copied from here to __init__.py

#
# Exceptions
#

class ProcessError(Exception):
    pass

class BufferTooShort(ProcessError):
    pass

class TimeoutError(ProcessError):
    pass

class AuthenticationError(ProcessError):
    pass

#
# Base type for contexts
#

class BaseContext(object):

    ProcessError = ProcessError
    BufferTooShort = BufferTooShort
    TimeoutError = TimeoutError
    AuthenticationError = AuthenticationError

    current_process = staticmethod(process.current_process)
    active_children = staticmethod(process.active_children)

    def cpu_count(self):
        '''Returns the number of CPUs in the system'''
        num = os.cpu_count()
        if num is None:
            raise NotImplementedError('cannot determine number of cpus')
        else:
            return num

    def Manager(self):
        '''Returns a manager associated with a running server process

        The manager's methods such as `Lock()`, `Condition()` and `Queue()`
        can be used to create shared objects.
        '''
        from .managers import SyncManager
        m = SyncManager(ctx=self.get_context())
        m.start()
        return m

    def Pipe(self, duplex=True):
        '''Returns two connection objects connected by a pipe'''
        from .connection import Pipe
        return Pipe(duplex)

    def Lock(self):
        '''Returns a non-recursive lock object'''
        from .synchronize import Lock
        return Lock(ctx=self.get_context())

    def RLock(self):
        '''Returns a recursive lock object'''
        from .synchronize import RLock
        return RLock(ctx=self.get_context())

    def Condition(self, lock=None):
        '''Returns a condition object'''
        from .synchronize import Condition
        return Condition(lock, ctx=self.get_context())

    def Semaphore(self, value=1):
        '''Returns a semaphore object'''
        from .synchronize import Semaphore
        return Semaphore(value, ctx=self.get_context())

    def BoundedSemaphore(self, value=1):
        '''Returns a bounded semaphore object'''
        from .synchronize import BoundedSemaphore
        return BoundedSemaphore(value, ctx=self.get_context())

    def Event(self):
        '''Returns an event object'''
        from .synchronize import Event
        return Event(ctx=self.get_context())

    def Barrier(self, parties, action=None, timeout=None):
        '''Returns a barrier object'''
        from .synchronize import Barrier
        return Barrier(parties, action, timeout, ctx=self.get_context())

    def Queue(self, maxsize=0):
        '''Returns a queue object'''
        from .queues import Queue
        return Queue(maxsize, ctx=self.get_context())

    def JoinableQueue(self, maxsize=0):
        '''Returns a joinable queue object'''
        from .queues import JoinableQueue
        return JoinableQueue(maxsize, ctx=self.get_context())

    def SimpleQueue(self):
        '''Returns a simple queue object'''
        from .queues import SimpleQueue
        return SimpleQueue(ctx=self.get_context())

    def Pool(self, processes=None, initializer=None, initargs=(),
             maxtasksperchild=None):
        '''Returns a process pool object'''
        from .pool import Pool
        return Pool(processes, initializer, initargs, maxtasksperchild,
                    context=self.get_context())

    def RawValue(self, typecode_or_type, *args):
        '''Returns a shared object'''
        from .sharedctypes import RawValue
        return RawValue(typecode_or_type, *args)

    def RawArray(self, typecode_or_type, size_or_initializer):
        '''Returns a shared array'''
        from .sharedctypes import RawArray
        return RawArray(typecode_or_type, size_or_initializer)

    def Value(self, typecode_or_type, *args, lock=True):
        '''Returns a synchronized shared object'''
        from .sharedctypes import Value
        return Value(typecode_or_type, *args, lock=lock,
                     ctx=self.get_context())

    def Array(self, typecode_or_type, size_or_initializer, *, lock=True):
        '''Returns a synchronized shared array'''
        from .sharedctypes import Array
        return Array(typecode_or_type, size_or_initializer, lock=lock,
                     ctx=self.get_context())

    def freeze_support(self):
        '''Check whether this is a fake forked process in a frozen executable.
        If so, run the code specified on the command line and exit.
        '''
        if sys.platform == 'win32' and getattr(sys, 'frozen', False):
            from .spawn import freeze_support
            freeze_support()
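
    # A minimal usage sketch (illustrative, not part of this module): in a
    # script frozen into a Windows executable, freeze_support() should be the
    # first multiprocessing call made under the __main__ guard, e.g.
    #
    #     import multiprocessing
    #
    #     def worker():
    #         print('hello from the child process')
    #
    #     if __name__ == '__main__':
    #         multiprocessing.freeze_support()
    #         multiprocessing.Process(target=worker).start()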

    def get_logger(self):
        '''Return package logger -- if it does not already exist then
        it is created.
        '''
        from .util import get_logger
        return get_logger()

    def log_to_stderr(self, level=None):
        '''Turn on logging and add a handler which prints to stderr'''
        from .util import log_to_stderr
        return log_to_stderr(level)

    def allow_connection_pickling(self):
        '''Install support for sending connections and sockets
        between processes
        '''
        # This is undocumented.  In previous versions of multiprocessing
        # its only effect was to make socket objects inheritable on Windows.
        from . import connection

    def set_executable(self, executable):
        '''Sets the path to a python.exe or pythonw.exe binary used to run
        child processes instead of sys.executable when using the 'spawn'
        start method.  Useful for people embedding Python.
        '''
        from .spawn import set_executable
        set_executable(executable)

    def set_forkserver_preload(self, module_names):
        '''Set list of module names to try to load in forkserver process.
        This is really just a hint.
        '''
        from .forkserver import set_forkserver_preload
        set_forkserver_preload(module_names)

    def get_context(self, method=None):
        if method is None:
            return self
        try:
            ctx = _concrete_contexts[method]
        except KeyError:
            raise ValueError('cannot find context for %r' % method)
        ctx._check_available()
        return ctx

    def get_start_method(self, allow_none=False):
        return self._name

    def set_start_method(self, method, force=False):
        raise ValueError('cannot set start method of concrete context')

    @property
    def reducer(self):
        '''Controls how objects will be reduced to a form that can be
        shared with other processes.'''
        return globals().get('reduction')

    @reducer.setter
    def reducer(self, reduction):
        globals()['reduction'] = reduction

    def _check_available(self):
        pass

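# A minimal usage sketch (illustrative, not part of this module): a concrete
# context returned by get_context() offers the same constructors as the
# top-level multiprocessing module, and objects created through it are tied
# to that context's start method, e.g.
#
#     import multiprocessing as mp
#
#     def worker(q):
#         q.put('hello')
#
#     if __name__ == '__main__':
#         ctx = mp.get_context('spawn')
#         queue = ctx.Queue()
#         proc = ctx.Process(target=worker, args=(queue,))
#         proc.start()
#         print(queue.get())
#         proc.join()
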
#
# Type of default context -- underlying context can be set at most once
#

class Process(process.BaseProcess):
    _start_method = None
    @staticmethod
    def _Popen(process_obj):
        return _default_context.get_context().Process._Popen(process_obj)

class DefaultContext(BaseContext):
    Process = Process

    def __init__(self, context):
        self._default_context = context
        self._actual_context = None

    def get_context(self, method=None):
        if method is None:
            if self._actual_context is None:
                self._actual_context = self._default_context
            return self._actual_context
        else:
            return super().get_context(method)

    def set_start_method(self, method, force=False):
        if self._actual_context is not None and not force:
            raise RuntimeError('context has already been set')
        if method is None and force:
            self._actual_context = None
            return
        self._actual_context = self.get_context(method)

    def get_start_method(self, allow_none=False):
        if self._actual_context is None:
            if allow_none:
                return None
            self._actual_context = self._default_context
        return self._actual_context._name

    def get_all_start_methods(self):
        if sys.platform == 'win32':
            return ['spawn']
        else:
            if reduction.HAVE_SEND_HANDLE:
                return ['fork', 'spawn', 'forkserver']
            else:
                return ['fork', 'spawn']

DefaultContext.__all__ = list(x for x in dir(DefaultContext) if x[0] != '_')
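
# A minimal usage sketch (illustrative, not part of this module): the
# DefaultContext methods above back the module-level helpers re-exported by
# multiprocessing, so the start method is typically chosen like this:
#
#     import multiprocessing as mp
#
#     if __name__ == '__main__':
#         print(mp.get_all_start_methods())  # e.g. ['fork', 'spawn', 'forkserver']
#         mp.set_start_method('spawn')       # may only be set once per program
#         print(mp.get_start_method())       # 'spawn'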

#
# Context types for fixed start method
#

if sys.platform != 'win32':

    class ForkProcess(process.BaseProcess):
        _start_method = 'fork'
        @staticmethod
        def _Popen(process_obj):
            from .popen_fork import Popen
            return Popen(process_obj)

    class SpawnProcess(process.BaseProcess):
        _start_method = 'spawn'
        @staticmethod
        def _Popen(process_obj):
            from .popen_spawn_posix import Popen
            return Popen(process_obj)

    class ForkServerProcess(process.BaseProcess):
        _start_method = 'forkserver'
        @staticmethod
        def _Popen(process_obj):
            from .popen_forkserver import Popen
            return Popen(process_obj)

    class ForkContext(BaseContext):
        _name = 'fork'
        Process = ForkProcess

    class SpawnContext(BaseContext):
        _name = 'spawn'
        Process = SpawnProcess

    class ForkServerContext(BaseContext):
        _name = 'forkserver'
        Process = ForkServerProcess
        def _check_available(self):
            if not reduction.HAVE_SEND_HANDLE:
                raise ValueError('forkserver start method not available')

    _concrete_contexts = {
        'fork': ForkContext(),
        'spawn': SpawnContext(),
        'forkserver': ForkServerContext(),
    }
    _default_context = DefaultContext(_concrete_contexts['fork'])

else:

    class SpawnProcess(process.BaseProcess):
        _start_method = 'spawn'
        @staticmethod
        def _Popen(process_obj):
            from .popen_spawn_win32 import Popen
            return Popen(process_obj)

    class SpawnContext(BaseContext):
        _name = 'spawn'
        Process = SpawnProcess

    _concrete_contexts = {
        'spawn': SpawnContext(),
    }
    _default_context = DefaultContext(_concrete_contexts['spawn'])

#
# Force the start method
#

def _force_start_method(method):
    _default_context._actual_context = _concrete_contexts[method]

#
# Check that the current thread is spawning a child process
#

_tls = threading.local()

def get_spawning_popen():
    return getattr(_tls, 'spawning_popen', None)

def set_spawning_popen(popen):
    _tls.spawning_popen = popen

def assert_spawning(obj):
    if get_spawning_popen() is None:
        raise RuntimeError(
            '%s objects should only be shared between processes'
            ' through inheritance' % type(obj).__name__
            )

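# A minimal usage sketch (illustrative, with a hypothetical SharedThing class):
# objects meant to be shared only by inheritance typically call
# assert_spawning() from __getstate__ or __reduce__, so pickling them outside
# of process creation fails with a clear error, e.g.
#
#     from multiprocessing.context import assert_spawning
#
#     class SharedThing(object):
#         def __getstate__(self):
#             assert_spawning(self)   # raises unless a child is being spawned
#             return self.__dict__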