1#
2# Unit tests for the multiprocessing package
3#
4
5import unittest
6import queue as pyqueue
7import time
8import io
9import itertools
10import sys
11import os
12import gc
13import errno
14import signal
15import array
16import socket
17import random
18import logging
19import struct
20import operator
21import test.support
22import test.support.script_helper
23
24
25# Skip tests if _multiprocessing wasn't built.
26_multiprocessing = test.support.import_module('_multiprocessing')
27# Skip tests if sem_open implementation is broken.
28test.support.import_module('multiprocessing.synchronize')
29# import threading after _multiprocessing to raise a more relevant error
30# message: "No module named _multiprocessing". _multiprocessing is not compiled
31# without thread support.
32import threading
33
34import multiprocessing.dummy
35import multiprocessing.connection
36import multiprocessing.managers
37import multiprocessing.heap
38import multiprocessing.pool
39
40from multiprocessing import util
41
42try:
43    from multiprocessing import reduction
44    HAS_REDUCTION = reduction.HAVE_SEND_HANDLE
45except ImportError:
46    HAS_REDUCTION = False
47
48try:
49    from multiprocessing.sharedctypes import Value, copy
50    HAS_SHAREDCTYPES = True
51except ImportError:
52    HAS_SHAREDCTYPES = False
53
54try:
55    import msvcrt
56except ImportError:
57    msvcrt = None
58
59#
60#
61#
62
def latin(s):
    """Encode the str *s* with the latin-1 codec and return the bytes."""
    return bytes(s, 'latin')
65
66#
67# Constants
68#
69
# Log level used for multiprocessing's util logger while testing.
LOG_LEVEL = util.SUBWARNING
#LOG_LEVEL = logging.DEBUG

# Generic short delay used to let children/feeder threads make progress.
DELTA = 0.1
CHECK_TIMINGS = False     # making true makes tests take a lot longer
                          # and can sometimes cause some non-serious
                          # failures because some calls block a bit
                          # longer than expected
if CHECK_TIMINGS:
    TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.82, 0.35, 1.4
else:
    TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.1, 0.1, 0.1

# Some platforms' semaphores cannot report their current value.
HAVE_GETVALUE = not getattr(_multiprocessing,
                            'HAVE_BROKEN_SEM_GETVALUE', False)

WIN32 = (sys.platform == "win32")
87
88from multiprocessing.connection import wait
89
def wait_for_handle(handle, timeout):
    """Wait up to *timeout* seconds for *handle* to become ready.

    A negative (or None) timeout means block until the handle is ready.
    Returns a truthy value iff the handle became ready in time.
    """
    block_forever = timeout is None or timeout < 0.0
    return wait([handle], None if block_forever else timeout)
94
# Highest file descriptor number the platform lets a process open; fall
# back to the conservative historical default when it cannot be queried.
try:
    MAXFD = os.sysconf("SC_OPEN_MAX")
except (AttributeError, ValueError, OSError):
    # AttributeError: os.sysconf does not exist (e.g. Windows).
    # ValueError/OSError: the "SC_OPEN_MAX" name is unsupported here.
    # The previous bare "except:" also swallowed KeyboardInterrupt and
    # SystemExit, which we never want to hide.
    MAXFD = 256

# To speed up tests when using the forkserver, we can preload these:
PRELOAD = ['__main__', 'test.test_multiprocessing_forkserver']
102
103#
104# Some tests require ctypes
105#
106
try:
    from ctypes import Structure, c_int, c_double
except ImportError:
    # ctypes is optional at build time; install harmless stand-ins so the
    # class definitions below still import, and let the tests that really
    # need ctypes skip themselves.
    Structure = object
    c_int = c_double = None
112
113
def check_enough_semaphores():
    """Skip the whole test run if the OS advertises too few semaphores.

    POSIX requires at least 256 semaphores; a reported value of -1 means
    the platform imposes no limit.
    """
    required = 256  # minimum number of semaphores available per POSIX
    try:
        available = os.sysconf("SC_SEM_NSEMS_MAX")
    except (AttributeError, ValueError):
        # sysconf is missing, or the name is unknown -- assume it's fine.
        return
    if available != -1 and available < required:
        raise unittest.SkipTest("The OS doesn't support enough semaphores "
                                "to run the test (required: %d)." % required)
127
128
129#
130# Creates a wrapper for a function which records the time it takes to finish
131#
132
class TimingWrapper(object):
    """Callable proxy around *func* that times each invocation.

    After every call, ``elapsed`` holds the wall-clock duration (seconds)
    of the most recent invocation -- even when the call raised.
    """

    def __init__(self, func):
        self.func = func
        self.elapsed = None  # duration of the last call, None before any call

    def __call__(self, *args, **kwds):
        started = time.time()
        try:
            result = self.func(*args, **kwds)
        finally:
            # Record the timing on both the success and the exception path.
            self.elapsed = time.time() - started
        return result
145
146#
147# Base class for test cases
148#
149
class BaseTestCase(object):
    """Mixin of helpers shared by the multiprocessing test cases.

    Concrete test classes combine this with a type mixin that supplies
    ``Process``, ``Queue`` etc. for the 'processes', 'manager' or
    'threads' flavour.
    """

    # Flavours a subclass is compatible with; subclasses may restrict this.
    ALLOWED_TYPES = ('processes', 'manager', 'threads')

    def assertTimingAlmostEqual(self, a, b):
        """Compare two timings, but only when CHECK_TIMINGS is enabled."""
        if not CHECK_TIMINGS:
            return
        self.assertAlmostEqual(a, b, 1)

    def assertReturnsIfImplemented(self, value, func, *args):
        """Assert func(*args) == value unless it raises NotImplementedError."""
        try:
            res = func(*args)
        except NotImplementedError:
            return
        self.assertEqual(value, res)

    # For the sanity of Windows users, rather than crashing or freezing in
    # multiple ways.
    def __reduce__(self, *args):
        raise NotImplementedError("shouldn't try to pickle a test case")

    __reduce_ex__ = __reduce__
172
173#
174# Return the value of a semaphore
175#
176
def get_value(self):
    """Return the current value of a semaphore-like object.

    Tries, in order: the object's own get_value() method, the private
    ``_Semaphore__value`` attribute (old threading), and ``_value``
    (current threading).  Raises NotImplementedError if none exist.
    """
    try:
        return self.get_value()
    except AttributeError:
        pass
    for attr in ('_Semaphore__value', '_value'):
        try:
            return getattr(self, attr)
        except AttributeError:
            continue
    raise NotImplementedError
188
189#
190# Testcases
191#
192
class _TestProcess(BaseTestCase):
    """Tests for the basic Process API.

    Run both with real processes and with threads (multiprocessing.dummy);
    manager flavours are excluded because managers do not provide a full
    Process object.
    """

    ALLOWED_TYPES = ('processes', 'threads')

    def test_current(self):
        """current_process() reflects the state of the running main process."""
        if self.TYPE == 'threads':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        current = self.current_process()
        authkey = current.authkey

        self.assertTrue(current.is_alive())
        self.assertTrue(not current.daemon)
        self.assertIsInstance(authkey, bytes)
        self.assertTrue(len(authkey) > 0)
        self.assertEqual(current.ident, os.getpid())
        self.assertEqual(current.exitcode, None)

    def test_daemon_argument(self):
        """The daemon= constructor argument overrides the inherited flag."""
        if self.TYPE == "threads":
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        # By default uses the current process's daemon flag.
        proc0 = self.Process(target=self._test)
        self.assertEqual(proc0.daemon, self.current_process().daemon)
        proc1 = self.Process(target=self._test, daemon=True)
        self.assertTrue(proc1.daemon)
        proc2 = self.Process(target=self._test, daemon=False)
        self.assertFalse(proc2.daemon)

    @classmethod
    def _test(cls, q, *args, **kwds):
        """Child side: echo arguments and identity info back through *q*."""
        current = cls.current_process()
        q.put(args)
        q.put(kwds)
        q.put(current.name)
        if cls.TYPE != 'threads':
            # authkey and pid only make sense for real processes.
            q.put(bytes(current.authkey))
            q.put(current.pid)

    def test_process(self):
        """Full lifecycle: create, start, communicate, join, check exitcode."""
        q = self.Queue(1)
        e = self.Event()   # NOTE(review): unused below -- possibly leftover
        args = (q, 1, 2)
        kwargs = {'hello':23, 'bye':2.54}
        name = 'SomeProcess'
        p = self.Process(
            target=self._test, args=args, kwargs=kwargs, name=name
            )
        p.daemon = True
        current = self.current_process()

        if self.TYPE != 'threads':
            # Children inherit the parent's authkey by default.
            self.assertEqual(p.authkey, current.authkey)
        self.assertEqual(p.is_alive(), False)
        self.assertEqual(p.daemon, True)
        self.assertNotIn(p, self.active_children())
        self.assertTrue(type(self.active_children()) is list)
        self.assertEqual(p.exitcode, None)

        p.start()

        self.assertEqual(p.exitcode, None)
        self.assertEqual(p.is_alive(), True)
        self.assertIn(p, self.active_children())

        # The child receives q as its first parameter, so its *args is
        # args[1:].
        self.assertEqual(q.get(), args[1:])
        self.assertEqual(q.get(), kwargs)
        self.assertEqual(q.get(), p.name)
        if self.TYPE != 'threads':
            self.assertEqual(q.get(), current.authkey)
            self.assertEqual(q.get(), p.pid)

        p.join()

        self.assertEqual(p.exitcode, 0)
        self.assertEqual(p.is_alive(), False)
        self.assertNotIn(p, self.active_children())

    @classmethod
    def _test_terminate(cls):
        """Child side: sleep long enough for the parent to terminate us."""
        time.sleep(100)

    def test_terminate(self):
        """terminate() kills a running child; join() then succeeds quickly."""
        if self.TYPE == 'threads':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        p = self.Process(target=self._test_terminate)
        p.daemon = True
        p.start()

        self.assertEqual(p.is_alive(), True)
        self.assertIn(p, self.active_children())
        self.assertEqual(p.exitcode, None)

        join = TimingWrapper(p.join)

        # join() with a zero or negative timeout must return immediately.
        self.assertEqual(join(0), None)
        self.assertTimingAlmostEqual(join.elapsed, 0.0)
        self.assertEqual(p.is_alive(), True)

        self.assertEqual(join(-1), None)
        self.assertTimingAlmostEqual(join.elapsed, 0.0)
        self.assertEqual(p.is_alive(), True)

        # XXX maybe terminating too soon causes the problems on Gentoo...
        time.sleep(1)

        p.terminate()

        if hasattr(signal, 'alarm'):
            # On the Gentoo buildbot waitpid() often seems to block forever.
            # We use alarm() to interrupt it if it blocks for too long.
            def handler(*args):
                raise RuntimeError('join took too long: %s' % p)
            old_handler = signal.signal(signal.SIGALRM, handler)
            try:
                signal.alarm(10)
                self.assertEqual(join(), None)
            finally:
                signal.alarm(0)
                signal.signal(signal.SIGALRM, old_handler)
        else:
            self.assertEqual(join(), None)

        self.assertTimingAlmostEqual(join.elapsed, 0.0)

        self.assertEqual(p.is_alive(), False)
        self.assertNotIn(p, self.active_children())

        p.join()

        # XXX sometimes get p.exitcode == 0 on Windows ...
        #self.assertEqual(p.exitcode, -signal.SIGTERM)

    def test_cpu_count(self):
        """cpu_count() returns a positive int (or is unimplemented)."""
        try:
            cpus = multiprocessing.cpu_count()
        except NotImplementedError:
            cpus = 1
        self.assertTrue(type(cpus) is int)
        self.assertTrue(cpus >= 1)

    def test_active_children(self):
        """A child appears in active_children() only while it is running."""
        self.assertEqual(type(self.active_children()), list)

        p = self.Process(target=time.sleep, args=(DELTA,))
        self.assertNotIn(p, self.active_children())

        p.daemon = True
        p.start()
        self.assertIn(p, self.active_children())

        p.join()
        self.assertNotIn(p, self.active_children())

    @classmethod
    def _test_recursion(cls, wconn, id):
        """Child side: report our id, then spawn two children while depth < 2."""
        wconn.send(id)
        if len(id) < 2:
            for i in range(2):
                p = cls.Process(
                    target=cls._test_recursion, args=(wconn, id+[i])
                    )
                p.start()
                p.join()

    def test_recursion(self):
        """Processes can be created from within child processes."""
        rconn, wconn = self.Pipe(duplex=False)
        self._test_recursion(wconn, [])

        time.sleep(DELTA)
        result = []
        while rconn.poll():
            result.append(rconn.recv())

        # Depth-first order is deterministic because each child is joined
        # before its sibling starts.
        expected = [
            [],
              [0],
                [0, 0],
                [0, 1],
              [1],
                [1, 0],
                [1, 1]
            ]
        self.assertEqual(result, expected)

    @classmethod
    def _test_sentinel(cls, event):
        """Child side: block until the parent sets *event* (10s safety cap)."""
        event.wait(10.0)

    def test_sentinel(self):
        """p.sentinel is valid only after start() and fires once p is dead."""
        if self.TYPE == "threads":
            self.skipTest('test not appropriate for {}'.format(self.TYPE))
        event = self.Event()
        p = self.Process(target=self._test_sentinel, args=(event,))
        with self.assertRaises(ValueError):
            # Accessing sentinel before start() must raise.
            p.sentinel
        p.start()
        self.addCleanup(p.join)
        sentinel = p.sentinel
        self.assertIsInstance(sentinel, int)
        self.assertFalse(wait_for_handle(sentinel, timeout=0.0))
        event.set()
        p.join()
        self.assertTrue(wait_for_handle(sentinel, timeout=1))
399
400#
401#
402#
403
404class _UpperCaser(multiprocessing.Process):
405
406    def __init__(self):
407        multiprocessing.Process.__init__(self)
408        self.child_conn, self.parent_conn = multiprocessing.Pipe()
409
410    def run(self):
411        self.parent_conn.close()
412        for s in iter(self.child_conn.recv, None):
413            self.child_conn.send(s.upper())
414        self.child_conn.close()
415
416    def submit(self, s):
417        assert type(s) is str
418        self.parent_conn.send(s)
419        return self.parent_conn.recv()
420
421    def stop(self):
422        self.parent_conn.send(None)
423        self.parent_conn.close()
424        self.child_conn.close()
425
class _TestSubclassingProcess(BaseTestCase):
    """Tests for Process subclasses and child interpreter shutdown."""

    ALLOWED_TYPES = ('processes',)

    def test_subclassing(self):
        """A Process subclass with its own run() works end to end."""
        uppercaser = _UpperCaser()
        uppercaser.daemon = True
        uppercaser.start()
        self.assertEqual(uppercaser.submit('hello'), 'HELLO')
        self.assertEqual(uppercaser.submit('world'), 'WORLD')
        uppercaser.stop()
        uppercaser.join()

    def test_stderr_flush(self):
        # sys.stderr is flushed at process shutdown (issue #13812)
        if self.TYPE == "threads":
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        testfn = test.support.TESTFN
        self.addCleanup(test.support.unlink, testfn)
        proc = self.Process(target=self._test_stderr_flush, args=(testfn,))
        proc.start()
        proc.join()
        with open(testfn, 'r') as f:
            err = f.read()
            # The whole traceback was printed
            self.assertIn("ZeroDivisionError", err)
            self.assertIn("test_multiprocessing.py", err)
            self.assertIn("1/0 # MARKER", err)

    @classmethod
    def _test_stderr_flush(cls, testfn):
        """Child side: point stderr at *testfn*, then die with a traceback."""
        fd = os.open(testfn, os.O_WRONLY | os.O_CREAT | os.O_EXCL)
        # closefd=False: the raw fd is not closed when the file object is.
        sys.stderr = open(fd, 'w', closefd=False)
        1/0 # MARKER


    @classmethod
    def _test_sys_exit(cls, reason, testfn):
        """Child side: redirect stderr to *testfn*, then sys.exit(reason)."""
        fd = os.open(testfn, os.O_WRONLY | os.O_CREAT | os.O_EXCL)
        sys.stderr = open(fd, 'w', closefd=False)
        sys.exit(reason)

    def test_sys_exit(self):
        # See Issue 13854
        if self.TYPE == 'threads':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        testfn = test.support.TESTFN
        self.addCleanup(test.support.unlink, testfn)

        # Non-int exit arguments are printed to stderr and the exit code
        # becomes 1.
        for reason in (
            [1, 2, 3],
            'ignore this',
        ):
            p = self.Process(target=self._test_sys_exit, args=(reason, testfn))
            p.daemon = True
            p.start()
            p.join(5)
            self.assertEqual(p.exitcode, 1)

            with open(testfn, 'r') as f:
                content = f.read()
            self.assertEqual(content.rstrip(), str(reason))

            os.unlink(testfn)

        # Int arguments (bool is a subclass of int) become the exit code.
        for reason in (True, False, 8):
            p = self.Process(target=sys.exit, args=(reason,))
            p.daemon = True
            p.start()
            p.join(5)
            self.assertEqual(p.exitcode, reason)
499
500#
501#
502#
503
def queue_empty(q):
    """Return True when *q* holds no items (some proxies lack empty())."""
    return q.empty() if hasattr(q, 'empty') else q.qsize() == 0
509
def queue_full(q, maxsize):
    """Return True when *q* holds *maxsize* items (some proxies lack full())."""
    return q.full() if hasattr(q, 'full') else q.qsize() == maxsize
515
516
class _TestQueue(BaseTestCase):
    """Tests for Queue and JoinableQueue across all flavours."""


    @classmethod
    def _test_put(cls, queue, child_can_start, parent_can_continue):
        """Child side: once allowed to start, drain six items."""
        child_can_start.wait()
        for i in range(6):
            queue.get()
        parent_can_continue.set()

    def test_put(self):
        """put()/put_nowait() block, time out and raise Full correctly."""
        MAXSIZE = 6
        queue = self.Queue(maxsize=MAXSIZE)
        child_can_start = self.Event()
        parent_can_continue = self.Event()

        proc = self.Process(
            target=self._test_put,
            args=(queue, child_can_start, parent_can_continue)
            )
        proc.daemon = True
        proc.start()

        self.assertEqual(queue_empty(queue), True)
        self.assertEqual(queue_full(queue, MAXSIZE), False)

        # Exercise every put() signature while the child is not yet reading.
        queue.put(1)
        queue.put(2, True)
        queue.put(3, True, None)
        queue.put(4, False)
        queue.put(5, False, None)
        queue.put_nowait(6)

        # the values may be in buffer but not yet in pipe so sleep a bit
        time.sleep(DELTA)

        self.assertEqual(queue_empty(queue), False)
        self.assertEqual(queue_full(queue, MAXSIZE), True)

        put = TimingWrapper(queue.put)
        put_nowait = TimingWrapper(queue.put_nowait)

        # Non-blocking puts on a full queue must fail immediately...
        self.assertRaises(pyqueue.Full, put, 7, False)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(pyqueue.Full, put, 7, False, None)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(pyqueue.Full, put_nowait, 7)
        self.assertTimingAlmostEqual(put_nowait.elapsed, 0)

        # ...while blocking puts wait out their timeout first.
        self.assertRaises(pyqueue.Full, put, 7, True, TIMEOUT1)
        self.assertTimingAlmostEqual(put.elapsed, TIMEOUT1)

        self.assertRaises(pyqueue.Full, put, 7, False, TIMEOUT2)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(pyqueue.Full, put, 7, True, timeout=TIMEOUT3)
        self.assertTimingAlmostEqual(put.elapsed, TIMEOUT3)

        child_can_start.set()
        parent_can_continue.wait()

        self.assertEqual(queue_empty(queue), True)
        self.assertEqual(queue_full(queue, MAXSIZE), False)

        proc.join()

    @classmethod
    def _test_get(cls, queue, child_can_start, parent_can_continue):
        """Child side: once allowed to start, feed items 2..5."""
        child_can_start.wait()
        #queue.put(1)
        queue.put(2)
        queue.put(3)
        queue.put(4)
        queue.put(5)
        parent_can_continue.set()

    def test_get(self):
        """get()/get_nowait() block, time out and raise Empty correctly."""
        queue = self.Queue()
        child_can_start = self.Event()
        parent_can_continue = self.Event()

        proc = self.Process(
            target=self._test_get,
            args=(queue, child_can_start, parent_can_continue)
            )
        proc.daemon = True
        proc.start()

        self.assertEqual(queue_empty(queue), True)

        child_can_start.set()
        parent_can_continue.wait()

        time.sleep(DELTA)
        self.assertEqual(queue_empty(queue), False)

        # Hangs unexpectedly, remove for now
        #self.assertEqual(queue.get(), 1)
        self.assertEqual(queue.get(True, None), 2)
        self.assertEqual(queue.get(True), 3)
        self.assertEqual(queue.get(timeout=1), 4)
        self.assertEqual(queue.get_nowait(), 5)

        self.assertEqual(queue_empty(queue), True)

        get = TimingWrapper(queue.get)
        get_nowait = TimingWrapper(queue.get_nowait)

        # Non-blocking gets on an empty queue must fail immediately...
        self.assertRaises(pyqueue.Empty, get, False)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get, False, None)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get_nowait)
        self.assertTimingAlmostEqual(get_nowait.elapsed, 0)

        # ...while blocking gets wait out their timeout first.
        self.assertRaises(pyqueue.Empty, get, True, TIMEOUT1)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)

        self.assertRaises(pyqueue.Empty, get, False, TIMEOUT2)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get, timeout=TIMEOUT3)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT3)

        proc.join()

    @classmethod
    def _test_fork(cls, queue):
        """Child side: push items 10..19 onto the inherited queue."""
        for i in range(10, 20):
            queue.put(i)
        # note that at this point the items may only be buffered, so the
        # process cannot shutdown until the feeder thread has finished
        # pushing items onto the pipe.

    def test_fork(self):
        # Old versions of Queue would fail to create a new feeder
        # thread for a forked process if the original process had its
        # own feeder thread.  This test checks that this no longer
        # happens.

        queue = self.Queue()

        # put items on queue so that main process starts a feeder thread
        for i in range(10):
            queue.put(i)

        # wait to make sure thread starts before we fork a new process
        time.sleep(DELTA)

        # fork process
        p = self.Process(target=self._test_fork, args=(queue,))
        p.daemon = True
        p.start()

        # check that all expected items are in the queue
        for i in range(20):
            self.assertEqual(queue.get(), i)
        self.assertRaises(pyqueue.Empty, queue.get, False)

        p.join()

    def test_qsize(self):
        """qsize() tracks puts and gets (skipped where unimplemented)."""
        q = self.Queue()
        try:
            self.assertEqual(q.qsize(), 0)
        except NotImplementedError:
            self.skipTest('qsize method not implemented')
        q.put(1)
        self.assertEqual(q.qsize(), 1)
        q.put(5)
        self.assertEqual(q.qsize(), 2)
        q.get()
        self.assertEqual(q.qsize(), 1)
        q.get()
        self.assertEqual(q.qsize(), 0)

    @classmethod
    def _test_task_done(cls, q):
        """Worker: consume items until the None sentinel, acking each one."""
        for obj in iter(q.get, None):
            time.sleep(DELTA)
            q.task_done()

    def test_task_done(self):
        """JoinableQueue.join() waits until every item is task_done()'d."""
        queue = self.JoinableQueue()

        workers = [self.Process(target=self._test_task_done, args=(queue,))
                   for i in range(4)]

        for p in workers:
            p.daemon = True
            p.start()

        for i in range(10):
            queue.put(i)

        queue.join()

        # One sentinel per worker so all of them exit their consume loop.
        for p in workers:
            queue.put(None)

        for p in workers:
            p.join()

    def test_no_import_lock_contention(self):
        """Using a queue at import time must not deadlock (issue #22853)."""
        with test.support.temp_cwd():
            module_name = 'imported_by_an_imported_module'
            with open(module_name + '.py', 'w') as f:
                f.write("""if 1:
                    import multiprocessing

                    q = multiprocessing.Queue()
                    q.put('knock knock')
                    q.get(timeout=3)
                    q.close()
                    del q
                """)

            with test.support.DirsOnSysPath(os.getcwd()):
                try:
                    __import__(module_name)
                except pyqueue.Empty:
                    self.fail("Probable regression on import lock contention;"
                              " see Issue #22853")

    def test_timeout(self):
        """get(True, t) raises Empty only after roughly *t* seconds."""
        q = multiprocessing.Queue()
        start = time.time()
        self.assertRaises(pyqueue.Empty, q.get, True, 0.200)
        delta = time.time() - start
        # Tolerate a delta of 30 ms because of the bad clock resolution on
        # Windows (usually 15.6 ms)
        self.assertGreaterEqual(delta, 0.170)
753
754#
755#
756#
757
class _TestLock(BaseTestCase):
    """Exercise the Lock and RLock primitives for every flavour."""

    def test_lock(self):
        """A Lock can be taken once; re-acquire fails; over-release raises."""
        mutex = self.Lock()
        self.assertEqual(mutex.acquire(), True)
        self.assertEqual(mutex.acquire(False), False)
        self.assertEqual(mutex.release(), None)
        self.assertRaises((ValueError, threading.ThreadError), mutex.release)

    def test_rlock(self):
        """An RLock nests: three acquires need three releases."""
        rlock = self.RLock()
        for _ in range(3):
            self.assertEqual(rlock.acquire(), True)
        for _ in range(3):
            self.assertEqual(rlock.release(), None)
        # One release too many must fail.
        self.assertRaises((AssertionError, RuntimeError), rlock.release)

    def test_lock_context(self):
        """Locks support the context-manager protocol."""
        with self.Lock():
            pass
780
781
class _TestSemaphore(BaseTestCase):
    """Exercise Semaphore and BoundedSemaphore for every flavour."""

    def _test_semaphore(self, semaphore):
        # Shared checks for a semaphore created with an initial value of 2:
        # the counter decrements on acquire, refuses a non-blocking acquire
        # at zero, and increments back on release.
        self.assertReturnsIfImplemented(2, get_value, semaphore)
        self.assertEqual(semaphore.acquire(), True)
        self.assertReturnsIfImplemented(1, get_value, semaphore)
        self.assertEqual(semaphore.acquire(), True)
        self.assertReturnsIfImplemented(0, get_value, semaphore)
        self.assertEqual(semaphore.acquire(False), False)
        self.assertReturnsIfImplemented(0, get_value, semaphore)
        self.assertEqual(semaphore.release(), None)
        self.assertReturnsIfImplemented(1, get_value, semaphore)
        self.assertEqual(semaphore.release(), None)
        self.assertReturnsIfImplemented(2, get_value, semaphore)

    def test_semaphore(self):
        """A plain Semaphore may be released past its initial value."""
        semaphore = self.Semaphore(2)
        self._test_semaphore(semaphore)
        self.assertEqual(semaphore.release(), None)
        self.assertReturnsIfImplemented(3, get_value, semaphore)
        self.assertEqual(semaphore.release(), None)
        self.assertReturnsIfImplemented(4, get_value, semaphore)

    def test_bounded_semaphore(self):
        """A BoundedSemaphore passes the shared acquire/release checks."""
        semaphore = self.BoundedSemaphore(2)
        self._test_semaphore(semaphore)
        # Currently fails on OS/X
        #if HAVE_GETVALUE:
        #    self.assertRaises(ValueError, sem.release)
        #    self.assertReturnsIfImplemented(2, get_value, sem)

    def test_timeout(self):
        """acquire() honours its block/timeout arguments (processes only)."""
        if self.TYPE != 'processes':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        semaphore = self.Semaphore(0)
        acquire = TimingWrapper(semaphore.acquire)

        # Non-blocking attempts return immediately...
        self.assertEqual(acquire(False), False)
        self.assertTimingAlmostEqual(acquire.elapsed, 0.0)

        self.assertEqual(acquire(False, None), False)
        self.assertTimingAlmostEqual(acquire.elapsed, 0.0)

        # ...even when a timeout is passed alongside block=False.
        self.assertEqual(acquire(False, TIMEOUT1), False)
        self.assertTimingAlmostEqual(acquire.elapsed, 0)

        # Blocking attempts wait out the timeout before failing.
        self.assertEqual(acquire(True, TIMEOUT2), False)
        self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT2)

        self.assertEqual(acquire(timeout=TIMEOUT3), False)
        self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT3)
834
835
836class _TestCondition(BaseTestCase):
837
    @classmethod
    def f(cls, cond, sleeping, woken, timeout=None):
        """Worker: announce we sleep, wait on *cond*, announce we woke."""
        cond.acquire()
        sleeping.release()
        cond.wait(timeout)
        woken.release()
        cond.release()
845
    def check_invariant(self, cond):
        # this is only supposed to succeed when there are no sleepers
        if self.TYPE == 'processes':
            try:
                # Internal counters: every sleeper must have been woken and
                # the wait semaphore must be back at zero.
                sleepers = (cond._sleeping_count.get_value() -
                            cond._woken_count.get_value())
                self.assertEqual(sleepers, 0)
                self.assertEqual(cond._wait_semaphore.get_value(), 0)
            except NotImplementedError:
                # get_value() may be unimplemented on this platform.
                pass
856
    def test_notify(self):
        """notify() wakes exactly one waiter at a time."""
        cond = self.Condition()
        sleeping = self.Semaphore(0)
        woken = self.Semaphore(0)

        p = self.Process(target=self.f, args=(cond, sleeping, woken))
        p.daemon = True
        p.start()

        # A thread waiter alongside the process waiter.
        p = threading.Thread(target=self.f, args=(cond, sleeping, woken))
        p.daemon = True
        p.start()

        # wait for both children to start sleeping
        sleeping.acquire()
        sleeping.acquire()

        # check no process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(0, get_value, woken)

        # wake up one process/thread
        cond.acquire()
        cond.notify()
        cond.release()

        # check one process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(1, get_value, woken)

        # wake up another
        cond.acquire()
        cond.notify()
        cond.release()

        # check other has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(2, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)
        p.join()
899
    def test_notify_all(self):
        """notify_all() wakes every waiter; timed waits expire on their own."""
        cond = self.Condition()
        sleeping = self.Semaphore(0)
        woken = self.Semaphore(0)

        # start some threads/processes which will timeout
        for i in range(3):
            p = self.Process(target=self.f,
                             args=(cond, sleeping, woken, TIMEOUT1))
            p.daemon = True
            p.start()

            t = threading.Thread(target=self.f,
                                 args=(cond, sleeping, woken, TIMEOUT1))
            t.daemon = True
            t.start()

        # wait for them all to sleep
        for i in range(6):
            sleeping.acquire()

        # check they have all timed out
        for i in range(6):
            woken.acquire()
        self.assertReturnsIfImplemented(0, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

        # start some more threads/processes
        for i in range(3):
            p = self.Process(target=self.f, args=(cond, sleeping, woken))
            p.daemon = True
            p.start()

            t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
            t.daemon = True
            t.start()

        # wait for them to all sleep
        for i in range(6):
            sleeping.acquire()

        # check no process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(0, get_value, woken)

        # wake them all up
        cond.acquire()
        cond.notify_all()
        cond.release()

        # check they have all woken
        for i in range(10):
            try:
                if get_value(woken) == 6:
                    break
            except NotImplementedError:
                break
            time.sleep(DELTA)
        self.assertReturnsIfImplemented(6, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)
964
    def test_timeout(self):
        """wait(timeout) returns False after roughly *timeout* seconds."""
        cond = self.Condition()
        wait = TimingWrapper(cond.wait)
        cond.acquire()
        res = wait(TIMEOUT1)
        cond.release()
        self.assertEqual(res, False)
        self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
973
    @classmethod
    def _test_waitfor_f(cls, cond, state):
        """Child side: signal readiness, then wait until state reaches 4."""
        with cond:
            state.value = 0
            cond.notify()
            result = cond.wait_for(lambda : state.value==4)
            # Exit code 1 tells the parent the predicate never became true.
            if not result or state.value != 4:
                sys.exit(1)
982
    @unittest.skipUnless(HAS_SHAREDCTYPES, 'needs sharedctypes')
    def test_waitfor(self):
        # based on test in test/lock_tests.py
        cond = self.Condition()
        state = self.Value('i', -1)

        p = self.Process(target=self._test_waitfor_f, args=(cond, state))
        p.daemon = True
        p.start()

        # Wait for the child to reset state to 0 and notify us.
        with cond:
            result = cond.wait_for(lambda : state.value==0)
            self.assertTrue(result)
            self.assertEqual(state.value, 0)

        # Step the shared state up to 4, notifying the child each time.
        for i in range(4):
            time.sleep(0.01)
            with cond:
                state.value += 1
                cond.notify()

        p.join(5)
        self.assertFalse(p.is_alive())
        self.assertEqual(p.exitcode, 0)
1007
1008    @classmethod
1009    def _test_waitfor_timeout_f(cls, cond, state, success, sem):
1010        sem.release()
1011        with cond:
1012            expected = 0.1
1013            dt = time.time()
1014            result = cond.wait_for(lambda : state.value==4, timeout=expected)
1015            dt = time.time() - dt
1016            # borrow logic in assertTimeout() from test/lock_tests.py
1017            if not result and expected * 0.6 < dt < expected * 10.0:
1018                success.value = True
1019
    @unittest.skipUnless(HAS_SHAREDCTYPES, 'needs sharedctypes')
    def test_waitfor_timeout(self):
        """cond.wait_for(predicate, timeout) must time out when the
        predicate never becomes true."""
        # based on test in test/lock_tests.py
        cond = self.Condition()
        state = self.Value('i', 0)
        success = self.Value('i', False)
        sem = self.Semaphore(0)

        p = self.Process(target=self._test_waitfor_timeout_f,
                         args=(cond, state, success, sem))
        p.daemon = True
        p.start()
        # Wait until the child is about to enter its wait_for() call.
        self.assertTrue(sem.acquire(timeout=10))

        # Only increment 3 times, so state == 4 is never reached.
        for i in range(3):
            time.sleep(0.01)
            with cond:
                state.value += 1
                cond.notify()

        # The child records success only if wait_for() timed out within a
        # plausible window (see _test_waitfor_timeout_f).
        p.join(5)
        self.assertTrue(success.value)
1043
    @classmethod
    def _test_wait_result(cls, c, pid):
        # Child body for test_wait_result: wake the parent's wait(), then,
        # after giving it time to re-enter a second wait(), interrupt it
        # with SIGINT (POSIX process case only; pid is None otherwise).
        with c:
            c.notify()
        time.sleep(1)
        if pid is not None:
            os.kill(pid, signal.SIGINT)
1051
    def test_wait_result(self):
        """wait() returns True when notified; on POSIX, for real
        processes, an interrupted wait() must raise KeyboardInterrupt."""
        # SIGINT delivery is only exercised between real POSIX processes.
        if isinstance(self, ProcessesMixin) and sys.platform != 'win32':
            pid = os.getpid()
        else:
            pid = None

        c = self.Condition()
        with c:
            # No notifier yet: zero and short timeouts both report failure.
            self.assertFalse(c.wait(0))
            self.assertFalse(c.wait(0.1))

            p = self.Process(target=self._test_wait_result, args=(c, pid))
            p.start()

            # The child notifies once, so this wait() succeeds ...
            self.assertTrue(c.wait(10))
            if pid is not None:
                # ... and then sends SIGINT, interrupting the second wait().
                self.assertRaises(KeyboardInterrupt, c.wait, 10)

            p.join()
1071
1072
class _TestEvent(BaseTestCase):
    """Tests for Event objects (thread, process and manager variants)."""

    @classmethod
    def _test_event(cls, event):
        # Child: set the event after a short delay so the parent's final
        # wait() has to actually block.
        time.sleep(TIMEOUT2)
        event.set()

    def test_event(self):
        event = self.Event()
        wait = TimingWrapper(event.wait)

        # Removed temporarily, due to API shear, this does not
        # work with threading._Event objects. is_set == isSet
        self.assertEqual(event.is_set(), False)

        # Removed, threading.Event.wait() will return the value of the __flag
        # instead of None. API Shear with the semaphore backed mp.Event
        self.assertEqual(wait(0.0), False)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        self.assertEqual(wait(TIMEOUT1), False)
        self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)

        event.set()

        # See note above on the API differences
        self.assertEqual(event.is_set(), True)
        self.assertEqual(wait(), True)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        self.assertEqual(wait(TIMEOUT1), True)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        # self.assertEqual(event.is_set(), True)

        event.clear()

        #self.assertEqual(event.is_set(), False)

        # A cleared event blocks again until the child sets it.
        p = self.Process(target=self._test_event, args=(event,))
        p.daemon = True
        p.start()
        self.assertEqual(wait(), True)
1113
1114#
1115# Tests for Barrier - adapted from tests in test/lock_tests.py
1116#
1117
1118# Many of the tests for threading.Barrier use a list as an atomic
1119# counter: a value is appended to increment the counter, and the
1120# length of the list gives the value.  We use the class DummyList
1121# for the same purpose.
1122
1123class _DummyList(object):
1124
1125    def __init__(self):
1126        wrapper = multiprocessing.heap.BufferWrapper(struct.calcsize('i'))
1127        lock = multiprocessing.Lock()
1128        self.__setstate__((wrapper, lock))
1129        self._lengthbuf[0] = 0
1130
1131    def __setstate__(self, state):
1132        (self._wrapper, self._lock) = state
1133        self._lengthbuf = self._wrapper.create_memoryview().cast('i')
1134
1135    def __getstate__(self):
1136        return (self._wrapper, self._lock)
1137
1138    def append(self, _):
1139        with self._lock:
1140            self._lengthbuf[0] += 1
1141
1142    def __len__(self):
1143        with self._lock:
1144            return self._lengthbuf[0]
1145
1146def _wait():
1147    # A crude wait/yield function not relying on synchronization primitives.
1148    time.sleep(0.01)
1149
1150
class Bunch(object):
    """A group of `n` concurrently running workers.

    Each worker runs ``f(*args)``.  Progress is tracked through the two
    shared list-like counters ``started`` and ``finished``.  When
    ``wait_before_exit`` is true, workers block after finishing until
    do_finish() is called.
    """

    def __init__(self, namespace, f, args, n, wait_before_exit=False):
        self.f = f
        self.args = args
        self.n = n
        self.started = namespace.DummyList()
        self.finished = namespace.DummyList()
        self._can_exit = namespace.Event()
        if not wait_before_exit:
            # Workers may leave as soon as they are done.
            self._can_exit.set()
        for _ in range(n):
            worker = namespace.Process(target=self.task)
            worker.daemon = True
            worker.start()

    def task(self):
        me = os.getpid()
        self.started.append(me)
        try:
            self.f(*self.args)
        finally:
            self.finished.append(me)
            self._can_exit.wait(30)
            assert self._can_exit.is_set()

    def wait_for_started(self):
        while len(self.started) < self.n:
            _wait()

    def wait_for_finished(self):
        while len(self.finished) < self.n:
            _wait()

    def do_finish(self):
        # Release workers held back by wait_before_exit=True.
        self._can_exit.set()
1194
1195
class AppendTrue(object):
    """Picklable callable that appends True to a captured container;
    used as a Barrier `action` so invocations can be counted."""

    def __init__(self, obj):
        self.obj = obj

    def __call__(self):
        self.obj.append(True)
1201
1202
class _TestBarrier(BaseTestCase):
    """
    Tests for Barrier objects.
    """
    N = 5                  # number of parties per barrier
    defaultTimeout = 30.0  # XXX Slow Windows buildbots need generous timeout

    def setUp(self):
        self.barrier = self.Barrier(self.N, timeout=self.defaultTimeout)

    def tearDown(self):
        # Abort so that any still-waiting party is released before the
        # barrier is dropped.
        self.barrier.abort()
        self.barrier = None

    def DummyList(self):
        # Return the flavour of "atomic counter" list appropriate for the
        # concurrency type under test (see the module comment above).
        if self.TYPE == 'threads':
            return []
        elif self.TYPE == 'manager':
            return self.manager.list()
        else:
            return _DummyList()

    def run_threads(self, f, args):
        # Run f(*args) in N-1 background workers plus the current
        # thread/process, then wait for all workers to finish.
        b = Bunch(self, f, args, self.N-1)
        f(*args)
        b.wait_for_finished()

    @classmethod
    def multipass(cls, barrier, results, n):
        # Worker body for test_barrier: cross the barrier 2*n times,
        # checking after each crossing that all parties moved in lockstep.
        m = barrier.parties
        assert m == cls.N
        for i in range(n):
            results[0].append(True)
            assert len(results[1]) == i * m
            barrier.wait()
            results[1].append(True)
            assert len(results[0]) == (i + 1) * m
            barrier.wait()
        try:
            assert barrier.n_waiting == 0
        except NotImplementedError:
            pass
        assert not barrier.broken

    def test_barrier(self, passes=1):
        """
        Test that a barrier is passed in lockstep
        """
        results = [self.DummyList(), self.DummyList()]
        self.run_threads(self.multipass, (self.barrier, results, passes))

    def test_barrier_10(self):
        """
        Test that a barrier works for 10 consecutive runs
        """
        return self.test_barrier(10)

    @classmethod
    def _test_wait_return_f(cls, barrier, queue):
        res = barrier.wait()
        queue.put(res)

    def test_wait_return(self):
        """
        test the return value from barrier.wait
        """
        queue = self.Queue()
        self.run_threads(self._test_wait_return_f, (self.barrier, queue))
        results = [queue.get() for i in range(self.N)]
        # wait() hands out a unique index per party; exactly one gets 0.
        self.assertEqual(results.count(0), 1)

    @classmethod
    def _test_action_f(cls, barrier, results):
        barrier.wait()
        # The action must already have run (exactly once) by the time any
        # waiter is released.
        if len(results) != 1:
            raise RuntimeError

    def test_action(self):
        """
        Test the 'action' callback
        """
        results = self.DummyList()
        barrier = self.Barrier(self.N, action=AppendTrue(results))
        self.run_threads(self._test_action_f, (barrier, results))
        self.assertEqual(len(results), 1)

    @classmethod
    def _test_abort_f(cls, barrier, results1, results2):
        try:
            i = barrier.wait()
            if i == cls.N//2:
                # One designated worker breaks the barrier ...
                raise RuntimeError
            barrier.wait()
            results1.append(True)
        except threading.BrokenBarrierError:
            # ... so every other worker sees BrokenBarrierError.
            results2.append(True)
        except RuntimeError:
            barrier.abort()

    def test_abort(self):
        """
        Test that an abort will put the barrier in a broken state
        """
        results1 = self.DummyList()
        results2 = self.DummyList()
        self.run_threads(self._test_abort_f,
                         (self.barrier, results1, results2))
        self.assertEqual(len(results1), 0)
        self.assertEqual(len(results2), self.N-1)
        self.assertTrue(self.barrier.broken)

    @classmethod
    def _test_reset_f(cls, barrier, results1, results2, results3):
        i = barrier.wait()
        if i == cls.N//2:
            # Wait until the other threads are all in the barrier.
            while barrier.n_waiting < cls.N-1:
                time.sleep(0.001)
            barrier.reset()
        else:
            try:
                barrier.wait()
                results1.append(True)
            except threading.BrokenBarrierError:
                results2.append(True)
        # Now, pass the barrier again
        barrier.wait()
        results3.append(True)

    def test_reset(self):
        """
        Test that a 'reset' on a barrier frees the waiting threads
        """
        results1 = self.DummyList()
        results2 = self.DummyList()
        results3 = self.DummyList()
        self.run_threads(self._test_reset_f,
                         (self.barrier, results1, results2, results3))
        self.assertEqual(len(results1), 0)
        self.assertEqual(len(results2), self.N-1)
        self.assertEqual(len(results3), self.N)

    @classmethod
    def _test_abort_and_reset_f(cls, barrier, barrier2,
                                results1, results2, results3):
        try:
            i = barrier.wait()
            if i == cls.N//2:
                raise RuntimeError
            barrier.wait()
            results1.append(True)
        except threading.BrokenBarrierError:
            results2.append(True)
        except RuntimeError:
            barrier.abort()
        # Synchronize and reset the barrier.  Must synchronize first so
        # that everyone has left it when we reset, and after so that no
        # one enters it before the reset.
        if barrier2.wait() == cls.N//2:
            barrier.reset()
        barrier2.wait()
        barrier.wait()
        results3.append(True)

    def test_abort_and_reset(self):
        """
        Test that a barrier can be reset after being broken.
        """
        results1 = self.DummyList()
        results2 = self.DummyList()
        results3 = self.DummyList()
        barrier2 = self.Barrier(self.N)

        self.run_threads(self._test_abort_and_reset_f,
                         (self.barrier, barrier2, results1, results2, results3))
        self.assertEqual(len(results1), 0)
        self.assertEqual(len(results2), self.N-1)
        self.assertEqual(len(results3), self.N)

    @classmethod
    def _test_timeout_f(cls, barrier, results):
        i = barrier.wait()
        if i == cls.N//2:
            # One thread is late!
            time.sleep(1.0)
        try:
            barrier.wait(0.5)
        except threading.BrokenBarrierError:
            results.append(True)

    def test_timeout(self):
        """
        Test wait(timeout)
        """
        results = self.DummyList()
        self.run_threads(self._test_timeout_f, (self.barrier, results))
        # Every party must observe the broken (timed-out) barrier.
        self.assertEqual(len(results), self.barrier.parties)

    @classmethod
    def _test_default_timeout_f(cls, barrier, results):
        i = barrier.wait(cls.defaultTimeout)
        if i == cls.N//2:
            # One thread is later than the default timeout
            time.sleep(1.0)
        try:
            barrier.wait()
        except threading.BrokenBarrierError:
            results.append(True)

    def test_default_timeout(self):
        """
        Test the barrier's default timeout
        """
        barrier = self.Barrier(self.N, timeout=0.5)
        results = self.DummyList()
        self.run_threads(self._test_default_timeout_f, (barrier, results))
        self.assertEqual(len(results), barrier.parties)

    def test_single_thread(self):
        # A one-party barrier never blocks and may be reused immediately.
        b = self.Barrier(1)
        b.wait()
        b.wait()

    @classmethod
    def _test_thousand_f(cls, barrier, passes, conn, lock):
        for i in range(passes):
            barrier.wait()
            # The lock serializes writes so pipe messages don't interleave.
            with lock:
                conn.send(i)

    def test_thousand(self):
        if self.TYPE == 'manager':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))
        passes = 1000
        lock = self.Lock()
        conn, child_conn = self.Pipe(False)
        # NOTE(review): the worker processes are never join()ed here;
        # consider keeping the Process objects and joining them after the
        # receive loop.
        for j in range(self.N):
            p = self.Process(target=self._test_thousand_f,
                           args=(self.barrier, passes, child_conn, lock))
            p.start()

        # Per pass, every worker reports the same pass index in lockstep.
        for i in range(passes):
            for j in range(self.N):
                self.assertEqual(conn.recv(), i)
1447
1448#
1449#
1450#
1451
class _TestValue(BaseTestCase):
    """Tests for shared ctypes-backed Value/RawValue objects."""

    ALLOWED_TYPES = ('processes',)

    # (typecode, initial value, value the child process writes)
    codes_values = [
        ('i', 4343, 24234),
        ('d', 3.625, -4.25),
        ('h', -232, 234),
        ('c', latin('x'), latin('y'))
        ]

    def setUp(self):
        if not HAS_SHAREDCTYPES:
            self.skipTest("requires multiprocessing.sharedctypes")

    @classmethod
    def _test(cls, values):
        # Child: overwrite each shared value with the third element of the
        # corresponding codes_values entry.
        for sv, cv in zip(values, cls.codes_values):
            sv.value = cv[2]


    def test_value(self, raw=False):
        # Create one shared value per typecode ...
        if raw:
            values = [self.RawValue(code, value)
                      for code, value, _ in self.codes_values]
        else:
            values = [self.Value(code, value)
                      for code, value, _ in self.codes_values]

        # ... check the parent sees the initial values ...
        for sv, cv in zip(values, self.codes_values):
            self.assertEqual(sv.value, cv[1])

        # ... let a child process rewrite them ...
        proc = self.Process(target=self._test, args=(values,))
        proc.daemon = True
        proc.start()
        proc.join()

        # ... and check the child's writes are visible in the parent.
        for sv, cv in zip(values, self.codes_values):
            self.assertEqual(sv.value, cv[2])

    def test_rawvalue(self):
        self.test_value(raw=True)

    def test_getobj_getlock(self):
        # Default and lock=None both create an implicit lock.
        val1 = self.Value('i', 5)
        lock1 = val1.get_lock()
        obj1 = val1.get_obj()

        val2 = self.Value('i', 5, lock=None)
        lock2 = val2.get_lock()
        obj2 = val2.get_obj()

        # An explicitly supplied lock object is used as-is.
        lock = self.Lock()
        val3 = self.Value('i', 5, lock=lock)
        lock3 = val3.get_lock()
        obj3 = val3.get_obj()
        self.assertEqual(lock, lock3)

        # lock=False yields a raw (lockless) wrapper.
        arr4 = self.Value('i', 5, lock=False)
        self.assertFalse(hasattr(arr4, 'get_lock'))
        self.assertFalse(hasattr(arr4, 'get_obj'))

        # A non-lock, non-bool value for `lock` is rejected.
        self.assertRaises(AttributeError, self.Value, 'i', 5, lock='navalue')

        # RawValue never carries a lock.
        arr5 = self.RawValue('i', 5)
        self.assertFalse(hasattr(arr5, 'get_lock'))
        self.assertFalse(hasattr(arr5, 'get_obj'))
1519
1520
class _TestArray(BaseTestCase):
    """Tests for shared ctypes-backed Array/RawArray objects."""

    ALLOWED_TYPES = ('processes',)

    @classmethod
    def f(cls, seq):
        # In-place prefix sum; used both locally and in a child process so
        # parent-visible mutation of the shared array can be verified.
        for i in range(1, len(seq)):
            seq[i] += seq[i-1]

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_array(self, raw=False):
        seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831]
        if raw:
            arr = self.RawArray('i', seq)
        else:
            arr = self.Array('i', seq)

        # Basic sequence protocol: len, indexing, slicing.
        self.assertEqual(len(arr), len(seq))
        self.assertEqual(arr[3], seq[3])
        self.assertEqual(list(arr[2:7]), list(seq[2:7]))

        # Slice assignment (here from an array.array) mirrors list behaviour.
        arr[4:8] = seq[4:8] = array.array('i', [1, 2, 3, 4])

        self.assertEqual(list(arr[:]), seq)

        # Apply the same mutation to the plain list locally and to the
        # shared array in a child process; results must match.
        self.f(seq)

        p = self.Process(target=self.f, args=(arr,))
        p.daemon = True
        p.start()
        p.join()

        self.assertEqual(list(arr[:]), seq)

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_array_from_size(self):
        size = 10
        # Test for zeroing (see issue #11675).
        # The repetition below strengthens the test by increasing the chances
        # of previously allocated non-zero memory being used for the new array
        # on the 2nd and 3rd loops.
        for _ in range(3):
            arr = self.Array('i', size)
            self.assertEqual(len(arr), size)
            self.assertEqual(list(arr), [0] * size)
            arr[:] = range(10)
            self.assertEqual(list(arr), list(range(10)))
            del arr

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_rawarray(self):
        self.test_array(raw=True)

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_getobj_getlock_obj(self):
        # Default and lock=None both create an implicit lock.
        arr1 = self.Array('i', list(range(10)))
        lock1 = arr1.get_lock()
        obj1 = arr1.get_obj()

        arr2 = self.Array('i', list(range(10)), lock=None)
        lock2 = arr2.get_lock()
        obj2 = arr2.get_obj()

        # An explicitly supplied lock object is used as-is.
        lock = self.Lock()
        arr3 = self.Array('i', list(range(10)), lock=lock)
        lock3 = arr3.get_lock()
        obj3 = arr3.get_obj()
        self.assertEqual(lock, lock3)

        # lock=False yields a raw (lockless) wrapper.
        arr4 = self.Array('i', range(10), lock=False)
        self.assertFalse(hasattr(arr4, 'get_lock'))
        self.assertFalse(hasattr(arr4, 'get_obj'))
        # A non-lock, non-bool value for `lock` is rejected.
        self.assertRaises(AttributeError,
                          self.Array, 'i', range(10), lock='notalock')

        # RawArray never carries a lock.
        arr5 = self.RawArray('i', range(10))
        self.assertFalse(hasattr(arr5, 'get_lock'))
        self.assertFalse(hasattr(arr5, 'get_obj'))
1599
1600#
1601#
1602#
1603
class _TestContainers(BaseTestCase):
    """Tests for manager-proxied containers (list, dict, Namespace)."""

    ALLOWED_TYPES = ('manager',)

    def test_list(self):
        a = self.list(list(range(10)))
        self.assertEqual(a[:], list(range(10)))

        b = self.list()
        self.assertEqual(b[:], [])

        b.extend(list(range(5)))
        self.assertEqual(b[:], list(range(5)))

        self.assertEqual(b[2], 2)
        self.assertEqual(b[2:10], [2,3,4])

        b *= 2
        self.assertEqual(b[:], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4])

        self.assertEqual(b + [5, 6], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 5, 6])

        self.assertEqual(a[:], list(range(10)))

        # Proxies nested inside another proxied list keep reference
        # semantics: mutating `a` below is visible through `f`.
        d = [a, b]
        e = self.list(d)
        self.assertEqual(
            [element[:] for element in e],
            [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]]
            )

        f = self.list([a])
        a.append('hello')
        self.assertEqual(f[0][:], [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 'hello'])

    def test_list_proxy_in_list(self):
        a = self.list([self.list(range(3)) for _i in range(3)])
        self.assertEqual([inner[:] for inner in a], [[0, 1, 2]] * 3)

        # Mutating one nested proxy must not affect its siblings.
        a[0][-1] = 55
        self.assertEqual(a[0][:], [0, 1, 55])
        for i in range(1, 3):
            self.assertEqual(a[i][:], [0, 1, 2])

        self.assertEqual(a[1].pop(), 2)
        self.assertEqual(len(a[1]), 2)
        for i in range(0, 3, 2):
            self.assertEqual(len(a[i]), 3)

        del a

        # A proxy list may even contain (a reference to) itself.
        b = self.list()
        b.append(b)
        del b

    def test_dict(self):
        d = self.dict()
        indices = list(range(65, 70))
        for i in indices:
            d[i] = chr(i)
        self.assertEqual(d.copy(), dict((i, chr(i)) for i in indices))
        self.assertEqual(sorted(d.keys()), indices)
        self.assertEqual(sorted(d.values()), [chr(i) for i in indices])
        self.assertEqual(sorted(d.items()), [(i, chr(i)) for i in indices])

    def test_dict_proxy_nested(self):
        pets = self.dict(ferrets=2, hamsters=4)
        supplies = self.dict(water=10, feed=3)
        d = self.dict(pets=pets, supplies=supplies)

        self.assertEqual(supplies['water'], 10)
        self.assertEqual(d['supplies']['water'], 10)

        # Writes through either the outer or the inner proxy are visible
        # through both (reference, not copy, semantics).
        d['supplies']['blankets'] = 5
        self.assertEqual(supplies['blankets'], 5)
        self.assertEqual(d['supplies']['blankets'], 5)

        d['supplies']['water'] = 7
        self.assertEqual(supplies['water'], 7)
        self.assertEqual(d['supplies']['water'], 7)

        # Dropping the local proxies must not invalidate the nested ones.
        del pets
        del supplies
        self.assertEqual(d['pets']['ferrets'], 2)
        d['supplies']['blankets'] = 11
        self.assertEqual(d['supplies']['blankets'], 11)

        pets = d['pets']
        supplies = d['supplies']
        supplies['water'] = 7
        self.assertEqual(supplies['water'], 7)
        self.assertEqual(d['supplies']['water'], 7)

        # Clearing the outer dict leaves the inner proxies alive.
        d.clear()
        self.assertEqual(len(d), 0)
        self.assertEqual(supplies['water'], 7)
        self.assertEqual(pets['hamsters'], 4)

        # Dict proxies nested inside a list proxy behave the same way.
        l = self.list([pets, supplies])
        l[0]['marmots'] = 1
        self.assertEqual(pets['marmots'], 1)
        self.assertEqual(l[0]['marmots'], 1)

        del pets
        del supplies
        self.assertEqual(l[0]['marmots'], 1)

        outer = self.list([[88, 99], l])
        self.assertIsInstance(outer[0], list)  # Not a ListProxy
        self.assertEqual(outer[-1][-1]['feed'], 3)

    def test_namespace(self):
        n = self.Namespace()
        n.name = 'Bob'
        n.job = 'Builder'
        n._hidden = 'hidden'
        self.assertEqual((n.name, n.job), ('Bob', 'Builder'))
        del n.job
        # Underscore-prefixed attributes are hidden from repr().
        self.assertEqual(str(n), "Namespace(name='Bob')")
        self.assertTrue(hasattr(n, 'name'))
        self.assertTrue(not hasattr(n, 'job'))
1725
1726#
1727#
1728#
1729
def sqr(x, wait=0.0):
    """Return x squared, after an optional delay of `wait` seconds."""
    time.sleep(wait)
    return x * x
1733
def mul(x, y):
    """Return the product of x and y (used as a two-argument starmap
    work function)."""
    return x * y
1736
def raise_large_valuerror(wait):
    """Sleep `wait` seconds, then raise a ValueError whose message is
    1 MiB long (exercises transport of large exception payloads)."""
    time.sleep(wait)
    raise ValueError("x" * 1024**2)
1740
class SayWhenError(ValueError):
    """Raised by exception_throwing_generator once `when` is reached."""
1742
def exception_throwing_generator(total, when):
    """Yield 0 .. total-1, but raise SayWhenError on reaching `when`."""
    for value in range(total):
        if value == when:
            raise SayWhenError("Somebody said when")
        yield value
1748
1749class _TestPool(BaseTestCase):
1750
    @classmethod
    def setUpClass(cls):
        # One shared 4-worker pool for every test in the class.
        super().setUpClass()
        cls.pool = cls.Pool(4)
1755
    @classmethod
    def tearDownClass(cls):
        # Terminate (rather than close) so leftover tasks don't delay
        # shutdown; join to reap the workers before dropping the ref.
        cls.pool.terminate()
        cls.pool.join()
        cls.pool = None
        super().tearDownClass()
1762
1763    def test_apply(self):
1764        papply = self.pool.apply
1765        self.assertEqual(papply(sqr, (5,)), sqr(5))
1766        self.assertEqual(papply(sqr, (), {'x':3}), sqr(x=3))
1767
1768    def test_map(self):
1769        pmap = self.pool.map
1770        self.assertEqual(pmap(sqr, list(range(10))), list(map(sqr, list(range(10)))))
1771        self.assertEqual(pmap(sqr, list(range(100)), chunksize=20),
1772                         list(map(sqr, list(range(100)))))
1773
1774    def test_starmap(self):
1775        psmap = self.pool.starmap
1776        tuples = list(zip(range(10), range(9,-1, -1)))
1777        self.assertEqual(psmap(mul, tuples),
1778                         list(itertools.starmap(mul, tuples)))
1779        tuples = list(zip(range(100), range(99,-1, -1)))
1780        self.assertEqual(psmap(mul, tuples, chunksize=20),
1781                         list(itertools.starmap(mul, tuples)))
1782
1783    def test_starmap_async(self):
1784        tuples = list(zip(range(100), range(99,-1, -1)))
1785        self.assertEqual(self.pool.starmap_async(mul, tuples).get(),
1786                         list(itertools.starmap(mul, tuples)))
1787
1788    def test_map_async(self):
1789        self.assertEqual(self.pool.map_async(sqr, list(range(10))).get(),
1790                         list(map(sqr, list(range(10)))))
1791
    def test_map_async_callbacks(self):
        """map_async() fires `callback` on success and `error_callback`
        on failure, each exactly once per map call."""
        call_args = self.manager.list() if self.TYPE == 'manager' else []
        # Successful map: only `callback` runs, receiving the result list.
        self.pool.map_async(int, ['1'],
                            callback=call_args.append,
                            error_callback=call_args.append).wait()
        self.assertEqual(1, len(call_args))
        self.assertEqual([1], call_args[0])
        # Failing map (int('a') raises): only `error_callback` runs,
        # receiving the exception instance.
        self.pool.map_async(int, ['a'],
                            callback=call_args.append,
                            error_callback=call_args.append).wait()
        self.assertEqual(2, len(call_args))
        self.assertIsInstance(call_args[1], ValueError)
1804
    def test_map_unplicklable(self):
        # Issue #19425 -- failure to pickle should not cause a hang
        # NOTE(review): "unplicklable" is a typo for "unpicklable"; kept to
        # avoid changing the externally visible test name.
        if self.TYPE == 'threads':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))
        class A(object):
            def __reduce__(self):
                # Pickling any instance fails deliberately.
                raise RuntimeError('cannot pickle')
        with self.assertRaises(RuntimeError):
            self.pool.map(sqr, [A()]*10)
1814
    def test_map_chunksize(self):
        # An explicit chunksize combined with an empty iterable must not
        # stall: the async result should be ready almost immediately.
        try:
            self.pool.map_async(sqr, [], chunksize=1).get(timeout=TIMEOUT1)
        except multiprocessing.TimeoutError:
            self.fail("pool.map_async with chunksize stalled on null list")
1820
1821    def test_async(self):
1822        res = self.pool.apply_async(sqr, (7, TIMEOUT1,))
1823        get = TimingWrapper(res.get)
1824        self.assertEqual(get(), 49)
1825        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
1826
1827    def test_async_timeout(self):
1828        res = self.pool.apply_async(sqr, (6, TIMEOUT2 + 1.0))
1829        get = TimingWrapper(res.get)
1830        self.assertRaises(multiprocessing.TimeoutError, get, timeout=TIMEOUT2)
1831        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT2)
1832
1833    def test_imap(self):
1834        it = self.pool.imap(sqr, list(range(10)))
1835        self.assertEqual(list(it), list(map(sqr, list(range(10)))))
1836
1837        it = self.pool.imap(sqr, list(range(10)))
1838        for i in range(10):
1839            self.assertEqual(next(it), i*i)
1840        self.assertRaises(StopIteration, it.__next__)
1841
1842        it = self.pool.imap(sqr, list(range(1000)), chunksize=100)
1843        for i in range(1000):
1844            self.assertEqual(next(it), i*i)
1845        self.assertRaises(StopIteration, it.__next__)
1846
    def test_imap_handle_iterable_exception(self):
        # An exception raised by the *input* iterable must surface from
        # the result iterator instead of hanging the pool.
        if self.TYPE == 'manager':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        it = self.pool.imap(sqr, exception_throwing_generator(10, 3), 1)
        for i in range(3):
            self.assertEqual(next(it), i*i)
        self.assertRaises(SayWhenError, it.__next__)

        # SayWhenError seen at start of problematic chunk's results
        it = self.pool.imap(sqr, exception_throwing_generator(20, 7), 2)
        for i in range(6):
            self.assertEqual(next(it), i*i)
        self.assertRaises(SayWhenError, it.__next__)
        it = self.pool.imap(sqr, exception_throwing_generator(20, 7), 4)
        for i in range(4):
            self.assertEqual(next(it), i*i)
        self.assertRaises(SayWhenError, it.__next__)
1865
1866    def test_imap_unordered(self):
1867        it = self.pool.imap_unordered(sqr, list(range(1000)))
1868        self.assertEqual(sorted(it), list(map(sqr, list(range(1000)))))
1869
1870        it = self.pool.imap_unordered(sqr, list(range(1000)), chunksize=53)
1871        self.assertEqual(sorted(it), list(map(sqr, list(range(1000)))))
1872
    def test_imap_unordered_handle_iterable_exception(self):
        # Like test_imap_handle_iterable_exception, but with unordered
        # results we only know the error appears at *some* point.
        if self.TYPE == 'manager':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        it = self.pool.imap_unordered(sqr,
                                      exception_throwing_generator(10, 3),
                                      1)
        expected_values = list(map(sqr, list(range(10))))
        with self.assertRaises(SayWhenError):
            # imap_unordered makes it difficult to anticipate the SayWhenError
            for i in range(10):
                value = next(it)
                self.assertIn(value, expected_values)
                expected_values.remove(value)

        it = self.pool.imap_unordered(sqr,
                                      exception_throwing_generator(20, 7),
                                      2)
        expected_values = list(map(sqr, list(range(20))))
        with self.assertRaises(SayWhenError):
            for i in range(20):
                value = next(it)
                self.assertIn(value, expected_values)
                expected_values.remove(value)
1897
1898    def test_make_pool(self):
1899        expected_error = (RemoteError if self.TYPE == 'manager'
1900                          else ValueError)
1901
1902        self.assertRaises(expected_error, self.Pool, -1)
1903        self.assertRaises(expected_error, self.Pool, 0)
1904
1905        if self.TYPE != 'manager':
1906            p = self.Pool(3)
1907            try:
1908                self.assertEqual(3, len(p._pool))
1909            finally:
1910                p.close()
1911                p.join()
1912
1913    def test_terminate(self):
1914        result = self.pool.map_async(
1915            time.sleep, [0.1 for i in range(10000)], chunksize=1
1916            )
1917        self.pool.terminate()
1918        join = TimingWrapper(self.pool.join)
1919        join()
1920        # Sanity check the pool didn't wait for all tasks to finish
1921        self.assertLess(join.elapsed, 2.0)
1922
1923    def test_empty_iterable(self):
1924        # See Issue 12157
1925        p = self.Pool(1)
1926
1927        self.assertEqual(p.map(sqr, []), [])
1928        self.assertEqual(list(p.imap(sqr, [])), [])
1929        self.assertEqual(list(p.imap_unordered(sqr, [])), [])
1930        self.assertEqual(p.map_async(sqr, []).get(), [])
1931
1932        p.close()
1933        p.join()
1934
1935    def test_context(self):
1936        if self.TYPE == 'processes':
1937            L = list(range(10))
1938            expected = [sqr(i) for i in L]
1939            with self.Pool(2) as p:
1940                r = p.map_async(sqr, L)
1941                self.assertEqual(r.get(), expected)
1942            self.assertRaises(ValueError, p.map_async, sqr, L)
1943
    @classmethod
    def _test_traceback(cls):
        # Runs in a worker process.  The trailing comment is load-bearing:
        # test_traceback asserts that this exact source line (comment
        # included) appears in the remote traceback text.
        raise RuntimeError(123) # some comment
1947
    def test_traceback(self):
        # We want ensure that the traceback from the child process is
        # contained in the traceback raised in the main process.
        if self.TYPE == 'processes':
            with self.Pool(1) as p:
                try:
                    p.apply(self._test_traceback)
                except Exception as e:
                    exc = e
                else:
                    raise AssertionError('expected RuntimeError')
            # The original exception type and args survive the round trip...
            self.assertIs(type(exc), RuntimeError)
            self.assertEqual(exc.args, (123,))
            cause = exc.__cause__
            # ...and the remote traceback text is attached via __cause__.
            self.assertIs(type(cause), multiprocessing.pool.RemoteTraceback)
            self.assertIn('raise RuntimeError(123) # some comment', cause.tb)

            # The default excepthook must also print the remote traceback.
            with test.support.captured_stderr() as f1:
                try:
                    raise exc
                except RuntimeError:
                    sys.excepthook(*sys.exc_info())
            self.assertIn('raise RuntimeError(123) # some comment',
                          f1.getvalue())
1972
    @classmethod
    def _test_wrapped_exception(cls):
        # Runs in a worker; test_wrapped_exception checks that the original
        # exception type (not a wrapper) reaches the parent.
        raise RuntimeError('foo')
1976
1977    def test_wrapped_exception(self):
1978        # Issue #20980: Should not wrap exception when using thread pool
1979        with self.Pool(1) as p:
1980            with self.assertRaises(RuntimeError):
1981                p.apply(self._test_wrapped_exception)
1982
1983    def test_map_no_failfast(self):
1984        # Issue #23992: the fail-fast behaviour when an exception is raised
1985        # during map() would make Pool.join() deadlock, because a worker
1986        # process would fill the result queue (after the result handler thread
1987        # terminated, hence not draining it anymore).
1988
1989        t_start = time.time()
1990
1991        with self.assertRaises(ValueError):
1992            with self.Pool(2) as p:
1993                try:
1994                    p.map(raise_large_valuerror, [0, 1])
1995                finally:
1996                    time.sleep(0.5)
1997                    p.close()
1998                    p.join()
1999
2000        # check that we indeed waited for all jobs
2001        self.assertGreater(time.time() - t_start, 0.9)
2002
2003
def raising():
    # Worker helper for error-callback tests: always fails with a picklable
    # exception that can be transferred back to the parent.
    raise KeyError("key")
2006
def unpickleable_result():
    # Lambdas cannot be pickled, so returning one from a worker forces a
    # result-encoding failure in the pool machinery.
    return lambda: 42
2009
class _TestPoolWorkerErrors(BaseTestCase):
    """Error propagation from pool workers via error_callback."""

    ALLOWED_TYPES = ('processes', )

    def test_async_error_callback(self):
        """error_callback must receive the exception raised by the worker."""
        p = multiprocessing.Pool(2)
        try:
            scratchpad = [None]
            def errback(exc):
                scratchpad[0] = exc

            res = p.apply_async(raising, error_callback=errback)
            self.assertRaises(KeyError, res.get)
            self.assertTrue(scratchpad[0])
            self.assertIsInstance(scratchpad[0], KeyError)
        finally:
            # Always release the workers, even if an assertion above fails;
            # otherwise the pool processes would outlive the test.
            p.close()
            p.join()

    def test_unpickleable_result(self):
        """An unpicklable result surfaces as MaybeEncodingError without
        killing the worker processes."""
        from multiprocessing.pool import MaybeEncodingError
        p = multiprocessing.Pool(2)
        try:
            # Make sure we don't lose pool processes because of encoding
            # errors.
            for iteration in range(20):

                scratchpad = [None]
                def errback(exc):
                    scratchpad[0] = exc

                res = p.apply_async(unpickleable_result,
                                    error_callback=errback)
                self.assertRaises(MaybeEncodingError, res.get)
                wrapped = scratchpad[0]
                self.assertTrue(wrapped)
                self.assertIsInstance(scratchpad[0], MaybeEncodingError)
                # The wrapper carries both the pickling error and the value
                # that failed to encode.
                self.assertIsNotNone(wrapped.exc)
                self.assertIsNotNone(wrapped.value)
        finally:
            p.close()
            p.join()
2049
class _TestPoolWorkerLifetime(BaseTestCase):
    """Tests for the maxtasksperchild worker-replacement machinery."""

    ALLOWED_TYPES = ('processes', )

    def test_pool_worker_lifetime(self):
        # With maxtasksperchild=10, 100 tasks should retire every original
        # worker at least once.
        p = multiprocessing.Pool(3, maxtasksperchild=10)
        self.assertEqual(3, len(p._pool))
        origworkerpids = [w.pid for w in p._pool]
        # Run many tasks so each worker gets replaced (hopefully)
        results = []
        for i in range(100):
            results.append(p.apply_async(sqr, (i, )))
        # Fetch the results and verify we got the right answers,
        # also ensuring all the tasks have completed.
        for (j, res) in enumerate(results):
            self.assertEqual(res.get(), sqr(j))
        # Refill the pool
        p._repopulate_pool()
        # Wait until all workers are alive
        # (countdown * DELTA = 5 seconds max startup process time)
        countdown = 50
        while countdown and not all(w.is_alive() for w in p._pool):
            countdown -= 1
            time.sleep(DELTA)
        finalworkerpids = [w.pid for w in p._pool]
        # All pids should be assigned.  See issue #7805.
        self.assertNotIn(None, origworkerpids)
        self.assertNotIn(None, finalworkerpids)
        # Finally, check that the worker pids have changed
        self.assertNotEqual(sorted(origworkerpids), sorted(finalworkerpids))
        p.close()
        p.join()

    def test_pool_worker_lifetime_early_close(self):
        # Issue #10332: closing a pool whose workers have limited lifetimes
        # before all the tasks completed would make join() hang.
        p = multiprocessing.Pool(3, maxtasksperchild=1)
        results = []
        for i in range(6):
            results.append(p.apply_async(sqr, (i, 0.3)))
        p.close()
        p.join()
        # check the results
        for (j, res) in enumerate(results):
            self.assertEqual(res.get(), sqr(j))
2094
2095#
2096# Test of creating a customized manager class
2097#
2098
2099from multiprocessing.managers import BaseManager, BaseProxy, RemoteError
2100
class FooBar(object):
    # Referent class for the customized-manager tests: f() succeeds, g()
    # raises, and _h() is underscore-"private", so it is proxied only when
    # listed explicitly in register(..., exposed=...).
    def f(self):
        return 'f()'
    def g(self):
        raise ValueError
    def _h(self):
        return '_h()'
2108
def baz():
    """Yield the squares of 0..9; proxied via IteratorProxy in the tests."""
    yield from (n * n for n in range(10))
2112
class IteratorProxy(BaseProxy):
    # Proxy type that forwards the iterator protocol to the referent: the
    # proxy is its own iterator, and __next__ is relayed to the server.
    _exposed_ = ('__next__',)
    def __iter__(self):
        return self
    def __next__(self):
        return self._callmethod('__next__')
2119
class MyManager(BaseManager):
    # Customized manager; typed registrations are attached just below.
    pass
2122
# 'Foo' exposes FooBar's public methods only; 'Bar' overrides the exposed
# list to include the private _h() but not g(); 'baz' wraps a generator
# with a proxy type that supports iteration.
MyManager.register('Foo', callable=FooBar)
MyManager.register('Bar', callable=FooBar, exposed=('f', '_h'))
MyManager.register('baz', callable=baz, proxytype=IteratorProxy)
2126
2127
class _TestMyManager(BaseTestCase):
    """Customized-manager tests: method exposure rules, proxy calls and
    clean shutdown of the server process."""

    ALLOWED_TYPES = ('manager',)

    def test_mymanager(self):
        manager = MyManager()
        manager.start()
        self.common(manager)
        manager.shutdown()

        # If the manager process exited cleanly then the exitcode
        # will be zero.  Otherwise (after a short timeout)
        # terminate() is used, resulting in an exitcode of -SIGTERM.
        self.assertEqual(manager._process.exitcode, 0)

    def test_mymanager_context(self):
        # The context manager starts and shuts down the server process.
        with MyManager() as manager:
            self.common(manager)
        self.assertEqual(manager._process.exitcode, 0)

    def test_mymanager_context_prestarted(self):
        # Entering an already-started manager must not restart it.
        manager = MyManager()
        manager.start()
        with manager:
            self.common(manager)
        self.assertEqual(manager._process.exitcode, 0)

    def common(self, manager):
        # Shared assertions used by the three tests above.
        foo = manager.Foo()
        bar = manager.Bar()
        baz = manager.baz()

        foo_methods = [name for name in ('f', 'g', '_h') if hasattr(foo, name)]
        bar_methods = [name for name in ('f', 'g', '_h') if hasattr(bar, name)]

        # Default exposure: public methods only.  'Bar' was registered with
        # an explicit exposed list of ('f', '_h').
        self.assertEqual(foo_methods, ['f', 'g'])
        self.assertEqual(bar_methods, ['f', '_h'])

        self.assertEqual(foo.f(), 'f()')
        self.assertRaises(ValueError, foo.g)
        self.assertEqual(foo._callmethod('f'), 'f()')
        # _h is not exposed on 'Foo', so calling it remotely must fail.
        self.assertRaises(RemoteError, foo._callmethod, '_h')

        self.assertEqual(bar.f(), 'f()')
        self.assertEqual(bar._h(), '_h()')
        self.assertEqual(bar._callmethod('f'), 'f()')
        self.assertEqual(bar._callmethod('_h'), '_h()')

        # The generator proxy supports iteration via IteratorProxy.
        self.assertEqual(list(baz), [i*i for i in range(10)])
2177
2178
2179#
2180# Test of connecting to a remote server and using xmlrpclib for serialization
2181#
2182
# Queue living in the manager server process; exposed via get_queue().
_queue = pyqueue.Queue()
def get_queue():
    # Accessor registered with QueueManager below.
    return _queue
2186
class QueueManager(BaseManager):
    '''manager class used by server process'''
# Server side: expose the real queue object.
QueueManager.register('get_queue', callable=get_queue)
2190
class QueueManager2(BaseManager):
    '''manager class which specifies the same interface as QueueManager'''
# Client side: register the name only; calls are forwarded to the server.
QueueManager2.register('get_queue')
2194
2195
# Use the xmlrpclib-based serializer instead of pickle for the remote
# manager tests ('xmlrpclib' is a key understood by multiprocessing.managers).
SERIALIZER = 'xmlrpclib'
2197
class _TestRemoteManager(BaseTestCase):
    """Connect a second manager to a running server and exchange data
    serialized with xmlrpclib instead of pickle."""

    ALLOWED_TYPES = ('manager',)
    # Sample payloads, including non-ASCII text and latin-1 bytes.
    values = ['hello world', None, True, 2.25,
              'hall\xe5 v\xe4rlden',
              '\u043f\u0440\u0438\u0432\u0456\u0442 \u0441\u0432\u0456\u0442',
              b'hall\xe5 v\xe4rlden',
             ]
    result = values[:]

    @classmethod
    def _putter(cls, address, authkey):
        # Child process: connect to the server and enqueue the sample values.
        manager = QueueManager2(
            address=address, authkey=authkey, serializer=SERIALIZER
            )
        manager.connect()
        queue = manager.get_queue()
        # Note that xmlrpclib will deserialize object as a list not a tuple
        queue.put(tuple(cls.values))

    def test_remote(self):
        authkey = os.urandom(32)

        # Bind to an ephemeral port on the loopback interface.
        manager = QueueManager(
            address=(test.support.HOST, 0), authkey=authkey, serializer=SERIALIZER
            )
        manager.start()

        p = self.Process(target=self._putter, args=(manager.address, authkey))
        p.daemon = True
        p.start()

        manager2 = QueueManager2(
            address=manager.address, authkey=authkey, serializer=SERIALIZER
            )
        manager2.connect()
        queue = manager2.get_queue()

        self.assertEqual(queue.get(), self.result)

        # Because we are using xmlrpclib for serialization instead of
        # pickle this will cause a serialization error.
        self.assertRaises(Exception, queue.put, time.sleep)

        # Make queue finalizer run before the server is stopped
        del queue
        manager.shutdown()
2245
class _TestManagerRestart(BaseTestCase):
    """A manager must be able to restart quickly on the same address."""

    @classmethod
    def _putter(cls, address, authkey):
        # Child process: connect to the server at *address* and enqueue a
        # message for the parent to receive.
        manager = QueueManager(
            address=address, authkey=authkey, serializer=SERIALIZER)
        manager.connect()
        queue = manager.get_queue()
        queue.put('hello world')

    def test_rapid_restart(self):
        authkey = os.urandom(32)
        manager = QueueManager(
            address=(test.support.HOST, 0), authkey=authkey, serializer=SERIALIZER)
        srvr = manager.get_server()
        addr = srvr.address
        # Close the connection.Listener socket which gets opened as a part
        # of manager.get_server(). It's not needed for the test.
        srvr.listener.close()
        manager.start()

        p = self.Process(target=self._putter, args=(manager.address, authkey))
        p.daemon = True
        p.start()
        queue = manager.get_queue()
        self.assertEqual(queue.get(), 'hello world')
        del queue
        manager.shutdown()
        # Immediately rebind a fresh manager to the same address.
        manager = QueueManager(
            address=addr, authkey=authkey, serializer=SERIALIZER)
        try:
            manager.start()
        except OSError as e:
            if e.errno != errno.EADDRINUSE:
                raise
            # Retry after some time, in case the old socket was lingering
            # (sporadic failure on buildbots)
            time.sleep(1.0)
            manager = QueueManager(
                address=addr, authkey=authkey, serializer=SERIALIZER)
            # Bug fix: the recreated manager must be started, otherwise the
            # unconditional shutdown() below would run against a manager
            # whose server was never started.
            manager.start()
        manager.shutdown()
2287
2288#
2289#
2290#
2291
# Empty byte string used to tell the echoing child process to stop reading.
SENTINEL = latin('')
2293
class _TestConnection(BaseTestCase):
    """Exercise Connection objects created by Pipe(): object and byte
    transfer, polling, buffer handling, and fd/handle passing."""

    ALLOWED_TYPES = ('processes', 'threads')

    @classmethod
    def _echo(cls, conn):
        # Child: echo byte messages back until the empty SENTINEL arrives.
        for msg in iter(conn.recv_bytes, SENTINEL):
            conn.send_bytes(msg)
        conn.close()

    def test_connection(self):
        conn, child_conn = self.Pipe()

        p = self.Process(target=self._echo, args=(child_conn,))
        p.daemon = True
        p.start()

        seq = [1, 2.25, None]
        msg = latin('hello world')
        longmsg = msg * 10
        arr = array.array('i', list(range(4)))

        if self.TYPE == 'processes':
            self.assertEqual(type(conn.fileno()), int)

        # Round-trip a picklable object and a raw byte string.
        self.assertEqual(conn.send(seq), None)
        self.assertEqual(conn.recv(), seq)

        self.assertEqual(conn.send_bytes(msg), None)
        self.assertEqual(conn.recv_bytes(), msg)

        if self.TYPE == 'processes':
            # recv_bytes_into() with a buffer large enough for the message.
            buffer = array.array('i', [0]*10)
            expected = list(arr) + [0] * (10 - len(arr))
            self.assertEqual(conn.send_bytes(arr), None)
            self.assertEqual(conn.recv_bytes_into(buffer),
                             len(arr) * buffer.itemsize)
            self.assertEqual(list(buffer), expected)

            # recv_bytes_into() honouring a byte offset into the buffer.
            buffer = array.array('i', [0]*10)
            expected = [0] * 3 + list(arr) + [0] * (10 - 3 - len(arr))
            self.assertEqual(conn.send_bytes(arr), None)
            self.assertEqual(conn.recv_bytes_into(buffer, 3 * buffer.itemsize),
                             len(arr) * buffer.itemsize)
            self.assertEqual(list(buffer), expected)

            # A too-small buffer raises BufferTooShort carrying the message.
            buffer = bytearray(latin(' ' * 40))
            self.assertEqual(conn.send_bytes(longmsg), None)
            try:
                res = conn.recv_bytes_into(buffer)
            except multiprocessing.BufferTooShort as e:
                self.assertEqual(e.args, (longmsg,))
            else:
                self.fail('expected BufferTooShort, got %s' % res)

        poll = TimingWrapper(conn.poll)

        # poll() with no data pending: default, negative and positive
        # timeouts must all report False in the expected time.
        self.assertEqual(poll(), False)
        self.assertTimingAlmostEqual(poll.elapsed, 0)

        self.assertEqual(poll(-1), False)
        self.assertTimingAlmostEqual(poll.elapsed, 0)

        self.assertEqual(poll(TIMEOUT1), False)
        self.assertTimingAlmostEqual(poll.elapsed, TIMEOUT1)

        conn.send(None)
        time.sleep(.1)

        # With data pending, poll() returns True immediately.
        self.assertEqual(poll(TIMEOUT1), True)
        self.assertTimingAlmostEqual(poll.elapsed, 0)

        self.assertEqual(conn.recv(), None)

        really_big_msg = latin('X') * (1024 * 1024 * 16)   # 16Mb
        conn.send_bytes(really_big_msg)
        self.assertEqual(conn.recv_bytes(), really_big_msg)

        conn.send_bytes(SENTINEL)                          # tell child to quit
        child_conn.close()

        if self.TYPE == 'processes':
            self.assertEqual(conn.readable, True)
            self.assertEqual(conn.writable, True)
            # The child closed its end, so further reads hit EOF.
            self.assertRaises(EOFError, conn.recv)
            self.assertRaises(EOFError, conn.recv_bytes)

        p.join()

    def test_duplex_false(self):
        # Pipe(duplex=False) returns a read-only and a write-only end.
        reader, writer = self.Pipe(duplex=False)
        self.assertEqual(writer.send(1), None)
        self.assertEqual(reader.recv(), 1)
        if self.TYPE == 'processes':
            self.assertEqual(reader.readable, True)
            self.assertEqual(reader.writable, False)
            self.assertEqual(writer.readable, False)
            self.assertEqual(writer.writable, True)
            self.assertRaises(OSError, reader.send, 2)
            self.assertRaises(OSError, writer.recv)
            self.assertRaises(OSError, writer.poll)

    def test_spawn_close(self):
        # We test that a pipe connection can be closed by parent
        # process immediately after child is spawned.  On Windows this
        # would have sometimes failed on old versions because
        # child_conn would be closed before the child got a chance to
        # duplicate it.
        conn, child_conn = self.Pipe()

        p = self.Process(target=self._echo, args=(child_conn,))
        p.daemon = True
        p.start()
        child_conn.close()    # this might complete before child initializes

        msg = latin('hello')
        conn.send_bytes(msg)
        self.assertEqual(conn.recv_bytes(), msg)

        conn.send_bytes(SENTINEL)
        conn.close()
        p.join()

    def test_sendbytes(self):
        # Exercise the (buffer, offset, size) arguments of send_bytes().
        if self.TYPE != 'processes':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        msg = latin('abcdefghijklmnopqrstuvwxyz')
        a, b = self.Pipe()

        a.send_bytes(msg)
        self.assertEqual(b.recv_bytes(), msg)

        a.send_bytes(msg, 5)
        self.assertEqual(b.recv_bytes(), msg[5:])

        a.send_bytes(msg, 7, 8)
        self.assertEqual(b.recv_bytes(), msg[7:7+8])

        a.send_bytes(msg, 26)
        self.assertEqual(b.recv_bytes(), latin(''))

        a.send_bytes(msg, 26, 0)
        self.assertEqual(b.recv_bytes(), latin(''))

        # Out-of-range offset/size combinations must be rejected.
        self.assertRaises(ValueError, a.send_bytes, msg, 27)

        self.assertRaises(ValueError, a.send_bytes, msg, 22, 5)

        self.assertRaises(ValueError, a.send_bytes, msg, 26, 1)

        self.assertRaises(ValueError, a.send_bytes, msg, -1)

        self.assertRaises(ValueError, a.send_bytes, msg, 4, -1)

    @classmethod
    def _is_fd_assigned(cls, fd):
        # True if *fd* refers to an open file descriptor in this process.
        try:
            os.fstat(fd)
        except OSError as e:
            if e.errno == errno.EBADF:
                return False
            raise
        else:
            return True

    @classmethod
    def _writefd(cls, conn, data, create_dummy_fds=False):
        # Child: receive a file descriptor/handle over *conn* and write
        # *data* through it.  With create_dummy_fds, first occupy the
        # descriptors below 256 so the received fd is forced above 256
        # (see test_large_fd_transfer, issue #11657).
        if create_dummy_fds:
            for i in range(0, 256):
                if not cls._is_fd_assigned(i):
                    os.dup2(conn.fileno(), i)
        fd = reduction.recv_handle(conn)
        if msvcrt:
            # On Windows a handle is received; convert it to a C fd.
            fd = msvcrt.open_osfhandle(fd, os.O_WRONLY)
        os.write(fd, data)
        os.close(fd)

    @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
    def test_fd_transfer(self):
        if self.TYPE != 'processes':
            self.skipTest("only makes sense with processes")
        conn, child_conn = self.Pipe(duplex=True)

        p = self.Process(target=self._writefd, args=(child_conn, b"foo"))
        p.daemon = True
        p.start()
        self.addCleanup(test.support.unlink, test.support.TESTFN)
        with open(test.support.TESTFN, "wb") as f:
            fd = f.fileno()
            if msvcrt:
                fd = msvcrt.get_osfhandle(fd)
            reduction.send_handle(conn, fd, p.pid)
        p.join()
        # The child must have written through the transferred descriptor.
        with open(test.support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"foo")

    @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
    @unittest.skipIf(sys.platform == "win32",
                     "test semantics don't make sense on Windows")
    @unittest.skipIf(MAXFD <= 256,
                     "largest assignable fd number is too small")
    @unittest.skipUnless(hasattr(os, "dup2"),
                         "test needs os.dup2()")
    def test_large_fd_transfer(self):
        # With fd > 256 (issue #11657)
        if self.TYPE != 'processes':
            self.skipTest("only makes sense with processes")
        conn, child_conn = self.Pipe(duplex=True)

        p = self.Process(target=self._writefd, args=(child_conn, b"bar", True))
        p.daemon = True
        p.start()
        self.addCleanup(test.support.unlink, test.support.TESTFN)
        with open(test.support.TESTFN, "wb") as f:
            fd = f.fileno()
            # Find a free descriptor number above 256 to duplicate onto.
            for newfd in range(256, MAXFD):
                if not self._is_fd_assigned(newfd):
                    break
            else:
                self.fail("could not find an unassigned large file descriptor")
            os.dup2(fd, newfd)
            try:
                reduction.send_handle(conn, newfd, p.pid)
            finally:
                os.close(newfd)
        p.join()
        with open(test.support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"bar")

    @classmethod
    def _send_data_without_fd(self, conn):
        # Child: write one byte with no accompanying ancillary fd data.
        # NOTE(review): the first parameter of this classmethod is
        # conventionally named 'cls'; it is unused here, so behaviour is
        # unaffected.
        os.write(conn.fileno(), b"\0")

    @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
    @unittest.skipIf(sys.platform == "win32", "doesn't make sense on Windows")
    def test_missing_fd_transfer(self):
        # Check that exception is raised when received data is not
        # accompanied by a file descriptor in ancillary data.
        if self.TYPE != 'processes':
            self.skipTest("only makes sense with processes")
        conn, child_conn = self.Pipe(duplex=True)

        p = self.Process(target=self._send_data_without_fd, args=(child_conn,))
        p.daemon = True
        p.start()
        self.assertRaises(RuntimeError, reduction.recv_handle, conn)
        p.join()

    def test_context(self):
        # Connections support the context-manager protocol; __exit__ closes.
        a, b = self.Pipe()

        with a, b:
            a.send(1729)
            self.assertEqual(b.recv(), 1729)
            if self.TYPE == 'processes':
                self.assertFalse(a.closed)
                self.assertFalse(b.closed)

        if self.TYPE == 'processes':
            self.assertTrue(a.closed)
            self.assertTrue(b.closed)
            self.assertRaises(OSError, a.recv)
            self.assertRaises(OSError, b.recv)
2558
class _TestListener(BaseTestCase):
    """Listener semantics: rebinding a used address is refused, and the
    context-manager protocol closes the listener on exit."""

    ALLOWED_TYPES = ('processes',)

    def test_multiple_bind(self):
        # A second Listener on an address already in use must fail.
        for family in self.connection.families:
            listener = self.connection.Listener(family=family)
            self.addCleanup(listener.close)
            self.assertRaises(OSError, self.connection.Listener,
                              listener.address, family)

    def test_context(self):
        with self.connection.Listener() as listener:
            with self.connection.Client(listener.address) as client, \
                 listener.accept() as server_side:
                client.send(1729)
                self.assertEqual(server_side.recv(), 1729)

        if self.TYPE == 'processes':
            # After __exit__ the listener socket is closed.
            self.assertRaises(OSError, listener.accept)
2579
class _TestListenerClient(BaseTestCase):
    """Listener/Client round trips across all connection families."""

    ALLOWED_TYPES = ('processes', 'threads')

    @classmethod
    def _test(cls, address):
        # Child: connect to *address*, send one message and disconnect.
        conn = cls.connection.Client(address)
        conn.send('hello')
        conn.close()

    def test_listener_client(self):
        for family in self.connection.families:
            l = self.connection.Listener(family=family)
            p = self.Process(target=self._test, args=(l.address,))
            p.daemon = True
            p.start()
            conn = l.accept()
            self.assertEqual(conn.recv(), 'hello')
            p.join()
            l.close()

    def test_issue14725(self):
        l = self.connection.Listener()
        p = self.Process(target=self._test, args=(l.address,))
        p.daemon = True
        p.start()
        time.sleep(1)
        # On Windows the client process should by now have connected,
        # written data and closed the pipe handle.  This causes
        # ConnectNamedPipe() to fail with ERROR_NO_DATA.  See Issue
        # 14725.
        conn = l.accept()
        self.assertEqual(conn.recv(), 'hello')
        conn.close()
        p.join()
        l.close()

    def test_issue16955(self):
        # poll() on the client end must see data sent before accept()
        # returned (issue #16955).
        for fam in self.connection.families:
            l = self.connection.Listener(family=fam)
            c = self.connection.Client(l.address)
            a = l.accept()
            a.send_bytes(b"hello")
            self.assertTrue(c.poll(1))
            a.close()
            c.close()
            l.close()
2627
class _TestPoll(BaseTestCase):
    """poll() semantics and message-boundary guarantees for Pipe ends."""

    ALLOWED_TYPES = ('processes', 'threads')

    def test_empty_string(self):
        # An empty byte message still counts as readable data.
        a, b = self.Pipe()
        self.assertEqual(a.poll(), False)
        b.send_bytes(b'')
        self.assertEqual(a.poll(), True)
        self.assertEqual(a.poll(), True)

    @classmethod
    def _child_strings(cls, conn, strings):
        # Child: send each string with a small delay between them.
        for s in strings:
            time.sleep(0.1)
            conn.send_bytes(s)
        conn.close()

    def test_strings(self):
        strings = (b'hello', b'', b'a', b'b', b'', b'bye', b'', b'lop')
        a, b = self.Pipe()
        p = self.Process(target=self._child_strings, args=(b, strings))
        p.start()

        for s in strings:
            # Poll in short intervals (up to ~2s) until the message arrives.
            for i in range(200):
                if a.poll(0.01):
                    break
            x = a.recv_bytes()
            self.assertEqual(s, x)

        p.join()

    @classmethod
    def _child_boundaries(cls, r):
        # Polling may "pull" a message in to the child process, but we
        # don't want it to pull only part of a message, as that would
        # corrupt the pipe for any other processes which might later
        # read from it.
        r.poll(5)

    def test_boundaries(self):
        r, w = self.Pipe(False)
        p = self.Process(target=self._child_boundaries, args=(r,))
        p.start()
        time.sleep(2)
        L = [b"first", b"second"]
        for obj in L:
            w.send_bytes(obj)
        w.close()
        p.join()
        # Whichever message the parent reads, it must be an intact one.
        self.assertIn(r.recv_bytes(), L)

    @classmethod
    def _child_dont_merge(cls, b):
        # Child: send three separate messages back-to-back.
        b.send_bytes(b'a')
        b.send_bytes(b'b')
        b.send_bytes(b'cd')

    def test_dont_merge(self):
        # Distinct send_bytes() calls must stay distinct messages.
        a, b = self.Pipe()
        self.assertEqual(a.poll(0.0), False)
        self.assertEqual(a.poll(0.1), False)

        p = self.Process(target=self._child_dont_merge, args=(b,))
        p.start()

        self.assertEqual(a.recv_bytes(), b'a')
        self.assertEqual(a.poll(1.0), True)
        self.assertEqual(a.poll(1.0), True)
        self.assertEqual(a.recv_bytes(), b'b')
        self.assertEqual(a.poll(1.0), True)
        self.assertEqual(a.poll(1.0), True)
        self.assertEqual(a.poll(0.0), True)
        self.assertEqual(a.recv_bytes(), b'cd')

        p.join()
2705
2706#
2707# Test of sending connection and socket objects between processes
2708#
2709
2710@unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
2711class _TestPicklingConnections(BaseTestCase):
2712
2713    ALLOWED_TYPES = ('processes',)
2714
    @classmethod
    def tearDownClass(cls):
        # These tests pickle connections through the resource sharer; stop
        # its background thread so it does not outlive the test class.
        from multiprocessing import resource_sharer
        resource_sharer.stop(timeout=5)
2719
2720    @classmethod
2721    def _listener(cls, conn, families):
2722        for fam in families:
2723            l = cls.connection.Listener(family=fam)
2724            conn.send(l.address)
2725            new_conn = l.accept()
2726            conn.send(new_conn)
2727            new_conn.close()
2728            l.close()
2729
2730        l = socket.socket()
2731        l.bind((test.support.HOST, 0))
2732        l.listen()
2733        conn.send(l.getsockname())
2734        new_conn, addr = l.accept()
2735        conn.send(new_conn)
2736        new_conn.close()
2737        l.close()
2738
2739        conn.recv()
2740
2741    @classmethod
2742    def _remote(cls, conn):
2743        for (address, msg) in iter(conn.recv, None):
2744            client = cls.connection.Client(address)
2745            client.send(msg.upper())
2746            client.close()
2747
2748        address, msg = conn.recv()
2749        client = socket.socket()
2750        client.connect(address)
2751        client.sendall(msg.upper())
2752        client.close()
2753
2754        conn.close()
2755
    def test_pickling(self):
        """Connections accepted by a listener can be pickled and sent to
        other processes, for every family and for a raw socket."""
        families = self.connection.families

        # Child that listens and sends back accepted connections.
        lconn, lconn0 = self.Pipe()
        lp = self.Process(target=self._listener, args=(lconn0, families))
        lp.daemon = True
        lp.start()
        lconn0.close()

        # Child that connects to addresses it is given.
        rconn, rconn0 = self.Pipe()
        rp = self.Process(target=self._remote, args=(rconn0,))
        rp.daemon = True
        rp.start()
        rconn0.close()

        for fam in families:
            msg = ('This connection uses family %s' % fam).encode('ascii')
            address = lconn.recv()
            rconn.send((address, msg))
            # The accepted connection was pickled across processes; it must
            # still deliver the upper-cased reply.
            new_conn = lconn.recv()
            self.assertEqual(new_conn.recv(), msg.upper())

        rconn.send(None)

        # Same dance once more with a plain socket instead of a Connection.
        msg = latin('This connection uses a normal socket')
        address = lconn.recv()
        rconn.send((address, msg))
        new_conn = lconn.recv()
        buf = []
        while True:
            s = new_conn.recv(100)
            if not s:
                break
            buf.append(s)
        buf = b''.join(buf)
        self.assertEqual(buf, msg.upper())
        new_conn.close()

        lconn.send(None)

        rconn.close()
        lconn.close()

        lp.join()
        rp.join()
2801
2802    @classmethod
2803    def child_access(cls, conn):
2804        w = conn.recv()
2805        w.send('all is well')
2806        w.close()
2807
2808        r = conn.recv()
2809        msg = r.recv()
2810        conn.send(msg*2)
2811
2812        conn.close()
2813
2814    def test_access(self):
2815        # On Windows, if we do not specify a destination pid when
2816        # using DupHandle then we need to be careful to use the
2817        # correct access flags for DuplicateHandle(), or else
2818        # DupHandle.detach() will raise PermissionError.  For example,
2819        # for a read only pipe handle we should use
2820        # access=FILE_GENERIC_READ.  (Unfortunately
2821        # DUPLICATE_SAME_ACCESS does not work.)
2822        conn, child_conn = self.Pipe()
2823        p = self.Process(target=self.child_access, args=(child_conn,))
2824        p.daemon = True
2825        p.start()
2826        child_conn.close()
2827
2828        r, w = self.Pipe(duplex=False)
2829        conn.send(w)
2830        w.close()
2831        self.assertEqual(r.recv(), 'all is well')
2832        r.close()
2833
2834        r, w = self.Pipe(duplex=False)
2835        conn.send(r)
2836        r.close()
2837        w.send('foobar')
2838        w.close()
2839        self.assertEqual(conn.recv(), 'foobar'*2)
2840
2841#
2842#
2843#
2844
class _TestHeap(BaseTestCase):
    """Stress and consistency checks for multiprocessing.heap."""

    ALLOWED_TYPES = ('processes',)

    def test_heap(self):
        iterations = 5000
        maxblocks = 50
        blocks = []

        # create and destroy lots of blocks of different sizes
        for i in range(iterations):
            size = int(random.lognormvariate(0, 1) * 1000)
            b = multiprocessing.heap.BufferWrapper(size)
            blocks.append(b)
            if len(blocks) > maxblocks:
                i = random.randrange(maxblocks)
                del blocks[i]

        # get the heap object
        heap = multiprocessing.heap.BufferWrapper._heap

        # verify the state of the heap
        all = []
        occupied = 0
        # Hold the heap's lock while reading its private state; released
        # via addCleanup so it survives assertion failures too.
        heap._lock.acquire()
        self.addCleanup(heap._lock.release)
        for L in list(heap._len_to_seq.values()):
            for arena, start, stop in L:
                all.append((heap._arenas.index(arena), start, stop,
                            stop-start, 'free'))
        for arena, start, stop in heap._allocated_blocks:
            all.append((heap._arenas.index(arena), start, stop,
                        stop-start, 'occupied'))
            occupied += (stop-start)

        all.sort()

        # Adjacent entries must tile each arena exactly: a block either
        # starts where the previous one stopped, or opens a new arena at
        # offset 0 -- no gaps or overlaps between free/occupied blocks.
        for i in range(len(all)-1):
            (arena, start, stop) = all[i][:3]
            (narena, nstart, nstop) = all[i+1][:3]
            self.assertTrue((arena != narena and nstart == 0) or
                            (stop == nstart))

    def test_free_from_gc(self):
        # Check that freeing of blocks by the garbage collector doesn't deadlock
        # (issue #12352).
        # Make sure the GC is enabled, and set lower collection thresholds to
        # make collections more frequent (and increase the probability of
        # deadlock).
        if not gc.isenabled():
            gc.enable()
            self.addCleanup(gc.disable)
        thresholds = gc.get_threshold()
        self.addCleanup(gc.set_threshold, *thresholds)
        gc.set_threshold(10)

        # perform numerous block allocations, with cyclic references to make
        # sure objects are collected asynchronously by the gc
        for i in range(5000):
            a = multiprocessing.heap.BufferWrapper(1)
            b = multiprocessing.heap.BufferWrapper(1)
            # circular references
            a.buddy = b
            b.buddy = a
2909
2910#
2911#
2912#
2913
class _Foo(Structure):
    # Simple ctypes Structure used by the sharedctypes tests below.
    _fields_ = [
        ('x', c_int),
        ('y', c_double)
        ]
2919
class _TestSharedCTypes(BaseTestCase):
    """Tests for the multiprocessing.sharedctypes Value/Array wrappers."""

    ALLOWED_TYPES = ('processes',)

    def setUp(self):
        if not HAS_SHAREDCTYPES:
            self.skipTest("requires multiprocessing.sharedctypes")

    @classmethod
    def _double(cls, x, y, foo, arr, string):
        # Runs in the child: double every shared value in place.
        x.value *= 2
        y.value *= 2
        foo.x *= 2
        foo.y *= 2
        string.value *= 2
        for i, item in enumerate(arr):
            arr[i] = item * 2

    def test_sharedctypes(self, lock=False):
        x = Value('i', 7, lock=lock)
        y = Value(c_double, 1.0/3.0, lock=lock)
        foo = Value(_Foo, 3, 2, lock=lock)
        arr = self.Array('d', list(range(10)), lock=lock)
        string = self.Array('c', 20, lock=lock)
        string.value = latin('hello')

        # Mutate everything in a child process, then check the changes are
        # visible here through the shared memory.
        proc = self.Process(target=self._double,
                            args=(x, y, foo, arr, string))
        proc.daemon = True
        proc.start()
        proc.join()

        self.assertEqual(x.value, 14)
        self.assertAlmostEqual(y.value, 2.0/3.0)
        self.assertEqual(foo.x, 6)
        self.assertAlmostEqual(foo.y, 4.0)
        for i, value in enumerate(arr):
            self.assertAlmostEqual(value, i*2)
        self.assertEqual(string.value, latin('hellohello'))

    def test_synchronize(self):
        # Same checks, but with the synchronization wrapper enabled.
        self.test_sharedctypes(lock=True)

    def test_copy(self):
        # copy() must produce an independent snapshot of the structure.
        original = _Foo(2, 5.0)
        duplicate = copy(original)
        original.x = 0
        original.y = 0
        self.assertEqual(duplicate.x, 2)
        self.assertAlmostEqual(duplicate.y, 5.0)
2969
2970#
2971#
2972#
2973
class _TestFinalize(BaseTestCase):
    """Check util.Finalize callback triggering and exit-priority ordering."""

    ALLOWED_TYPES = ('processes',)

    @classmethod
    def _test_finalize(cls, conn):
        class Foo(object):
            pass

        a = Foo()
        util.Finalize(a, conn.send, args=('a',))
        del a           # triggers callback for a

        b = Foo()
        close_b = util.Finalize(b, conn.send, args=('b',))
        close_b()       # triggers callback for b
        close_b()       # does nothing because callback has already been called
        del b           # does nothing because callback has already been called

        # 'c' has no exitpriority, so _exit_function() will not run it.
        c = Foo()
        util.Finalize(c, conn.send, args=('c',))

        d10 = Foo()
        util.Finalize(d10, conn.send, args=('d10',), exitpriority=1)

        # Three finalizers with equal priority, registered in order
        # d01, d02, d03.
        d01 = Foo()
        util.Finalize(d01, conn.send, args=('d01',), exitpriority=0)
        d02 = Foo()
        util.Finalize(d02, conn.send, args=('d02',), exitpriority=0)
        d03 = Foo()
        util.Finalize(d03, conn.send, args=('d03',), exitpriority=0)

        util.Finalize(None, conn.send, args=('e',), exitpriority=-10)

        util.Finalize(None, conn.send, args=('STOP',), exitpriority=-100)

        # call multiprocessing's cleanup function then exit process without
        # garbage collecting locals
        util._exit_function()
        conn.close()
        os._exit(0)

    def test_finalize(self):
        conn, child_conn = self.Pipe()

        p = self.Process(target=self._test_finalize, args=(child_conn,))
        p.daemon = True
        p.start()
        p.join()

        # Exit finalizers run highest priority first; equal priorities run
        # in reverse registration order (d03, d02, d01); 'c' is absent
        # because it had no exitpriority.
        result = [obj for obj in iter(conn.recv, 'STOP')]
        self.assertEqual(result, ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e'])
3026
3027#
3028# Test that from ... import * works for each module
3029#
3030
class _TestImportStar(unittest.TestCase):
    """Verify each multiprocessing submodule declares a usable __all__."""

    def get_module_names(self):
        import glob
        package_dir = os.path.dirname(multiprocessing.__file__)
        names = []
        for path in glob.glob(os.path.join(package_dir, '*.py')):
            stem = os.path.splitext(os.path.basename(path))[0]
            names.append('multiprocessing.' + stem)
        names.remove('multiprocessing.__init__')
        names.append('multiprocessing')
        return names

    def test_import(self):
        modules = self.get_module_names()
        if sys.platform == 'win32':
            # POSIX-only start-method backends are absent on Windows.
            for unavailable in ('multiprocessing.popen_fork',
                                'multiprocessing.popen_forkserver',
                                'multiprocessing.popen_spawn_posix'):
                modules.remove(unavailable)
        else:
            modules.remove('multiprocessing.popen_spawn_win32')
            if not HAS_REDUCTION:
                modules.remove('multiprocessing.popen_forkserver')

        if c_int is None:
            # This module requires _ctypes
            modules.remove('multiprocessing.sharedctypes')

        for name in modules:
            __import__(name)
            mod = sys.modules[name]
            self.assertTrue(hasattr(mod, '__all__'), name)

            for attr in mod.__all__:
                self.assertTrue(
                    hasattr(mod, attr),
                    '%r does not have attribute %r' % (mod, attr)
                    )
3069
3070#
3071# Quick test that logging works -- does not test logging output
3072#
3073
class _TestLogging(BaseTestCase):
    """Quick smoke tests for multiprocessing's logger (not its output)."""

    ALLOWED_TYPES = ('processes',)

    def test_enable_logging(self):
        # Only checks the calls work; nothing is asserted about output.
        logger = multiprocessing.get_logger()
        logger.setLevel(util.SUBWARNING)
        self.assertTrue(logger is not None)
        logger.debug('this will not be printed')
        logger.info('nor will this')
        logger.setLevel(LOG_LEVEL)

    @classmethod
    def _test_level(cls, conn):
        # Child: report the effective level of the multiprocessing logger.
        logger = multiprocessing.get_logger()
        conn.send(logger.getEffectiveLevel())

    def test_level(self):
        LEVEL1 = 32
        LEVEL2 = 37

        logger = multiprocessing.get_logger()
        root_logger = logging.getLogger()
        root_level = root_logger.level

        reader, writer = multiprocessing.Pipe(duplex=False)

        # A level set directly on the multiprocessing logger is seen in
        # the child.
        logger.setLevel(LEVEL1)
        p = self.Process(target=self._test_level, args=(writer,))
        p.daemon = True
        p.start()
        self.assertEqual(LEVEL1, reader.recv())

        # With the logger unset, the root logger's level takes effect.
        logger.setLevel(logging.NOTSET)
        root_logger.setLevel(LEVEL2)
        p = self.Process(target=self._test_level, args=(writer,))
        p.daemon = True
        p.start()
        self.assertEqual(LEVEL2, reader.recv())

        # Restore global logging state for later tests.
        root_logger.setLevel(root_level)
        logger.setLevel(level=LOG_LEVEL)
3116
3117
3118# class _TestLoggingProcessName(BaseTestCase):
3119#
3120#     def handle(self, record):
3121#         assert record.processName == multiprocessing.current_process().name
3122#         self.__handled = True
3123#
3124#     def test_logging(self):
3125#         handler = logging.Handler()
3126#         handler.handle = self.handle
3127#         self.__handled = False
3128#         # Bypass getLogger() and side-effects
3129#         logger = logging.getLoggerClass()(
3130#                 'multiprocessing.test.TestLoggingProcessName')
3131#         logger.addHandler(handler)
3132#         logger.propagate = False
3133#
3134#         logger.warn('foo')
3135#         assert self.__handled
3136
3137#
3138# Check that Process.join() retries if os.waitpid() fails with EINTR
3139#
3140
class _TestPollEintr(BaseTestCase):
    """Check that Process.join() retries if os.waitpid() fails with EINTR."""

    ALLOWED_TYPES = ('processes',)

    @classmethod
    def _killer(cls, pid):
        # Child: give the parent time to block in join(), then signal it.
        time.sleep(0.1)
        os.kill(pid, signal.SIGUSR1)

    @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1')
    def test_poll_eintr(self):
        got_signal = [False]
        def record(*args):
            got_signal[0] = True
        pid = os.getpid()
        oldhandler = signal.signal(signal.SIGUSR1, record)
        try:
            killer = self.Process(target=self._killer, args=(pid,))
            killer.start()
            try:
                # join() is interrupted by SIGUSR1 while waiting; it must
                # retry instead of raising.
                p = self.Process(target=time.sleep, args=(2,))
                p.start()
                p.join()
            finally:
                killer.join()
            self.assertTrue(got_signal[0])
            self.assertEqual(p.exitcode, 0)
        finally:
            # Restore the previous SIGUSR1 handler for later tests.
            signal.signal(signal.SIGUSR1, oldhandler)
3170
3171#
3172# Test to verify handle verification, see issue 3321
3173#
3174
class TestInvalidHandle(unittest.TestCase):
    """Connections built around bogus handles must fail cleanly (issue 3321)."""

    @unittest.skipIf(WIN32, "skipped on Windows")
    def test_invalid_handles(self):
        conn = multiprocessing.connection.Connection(44977608)
        try:
            # poll() on a garbage descriptor may raise, but must not crash.
            conn.poll()
        except (ValueError, OSError):
            pass
        finally:
            # Clear the private handle so conn.__del__ does not print an
            # error while trying to close it.
            conn._handle = None
        with self.assertRaises((ValueError, OSError)):
            multiprocessing.connection.Connection(-1)
3191
3192
3193
class OtherTest(unittest.TestCase):
    """Authentication-challenge failure paths."""
    # TODO: add more tests for deliver/answer challenge.

    def test_deliver_challenge_auth_failure(self):
        # A peer answering the challenge with garbage must be rejected.
        class _BogusConnection(object):
            def recv_bytes(self, size):
                return b'something bogus'
            def send_bytes(self, data):
                pass
        with self.assertRaises(multiprocessing.AuthenticationError):
            multiprocessing.connection.deliver_challenge(
                _BogusConnection(), b'abc')

    def test_answer_challenge_auth_failure(self):
        # A server whose final response is not WELCOME must be rejected.
        class _BogusConnection(object):
            def __init__(self):
                self.count = 0
            def recv_bytes(self, size):
                self.count += 1
                if self.count == 1:
                    return multiprocessing.connection.CHALLENGE
                if self.count == 2:
                    return b'something bogus'
                return b''
            def send_bytes(self, data):
                pass
        with self.assertRaises(multiprocessing.AuthenticationError):
            multiprocessing.connection.answer_challenge(
                _BogusConnection(), b'abc')
3222
3223#
3224# Test Manager.start()/Pool.__init__() initializer feature - see issue 5585
3225#
3226
def initializer(ns):
    """Bump the shared counter so tests can see the initializer ran."""
    ns.test = ns.test + 1
3229
class TestInitializers(unittest.TestCase):
    """Issue 5585: Manager.start() and Pool() must run their initializer."""

    def setUp(self):
        self.mgr = multiprocessing.Manager()
        self.ns = self.mgr.Namespace()
        self.ns.test = 0

    def tearDown(self):
        self.mgr.shutdown()
        self.mgr.join()

    def test_manager_initializer(self):
        manager = multiprocessing.managers.SyncManager()
        # A non-callable initializer must be rejected up front.
        self.assertRaises(TypeError, manager.start, 1)
        manager.start(initializer, (self.ns,))
        self.assertEqual(self.ns.test, 1)
        manager.shutdown()
        manager.join()

    def test_pool_initializer(self):
        # A non-callable initializer must be rejected up front.
        self.assertRaises(TypeError, multiprocessing.Pool, initializer=1)
        pool = multiprocessing.Pool(1, initializer, (self.ns,))
        pool.close()
        pool.join()
        self.assertEqual(self.ns.test, 1)
3254
3255#
3256# Issue 5155, 5313, 5331: Test process in processes
3257# Verifies os.close(sys.stdin.fileno) vs. sys.stdin.close() behavior
3258#
3259
def _this_sub_process(q):
    """Try a non-blocking get from *q*; an empty queue is expected here."""
    try:
        q.get(block=False)
    except pyqueue.Empty:
        pass
3265
def _test_process(q):
    """Spawn a nested child that reads from a fresh (empty) queue."""
    inner_queue = multiprocessing.Queue()
    worker = multiprocessing.Process(target=_this_sub_process,
                                     args=(inner_queue,))
    worker.daemon = True
    worker.start()
    worker.join()
3272
def _afunc(x):
    """Return *x* squared (Pool.map helper)."""
    square = x * x
    return square
3275
def pool_in_process():
    """Exercise Pool.map from inside a child process."""
    workers = multiprocessing.Pool(processes=4)
    workers.map(_afunc, [1, 2, 3, 4, 5, 6, 7])
    workers.close()
    workers.join()
3281
class _file_like(object):
    """Buffered file-like wrapper whose buffer resets after a fork."""

    def __init__(self, delegate):
        self._delegate = delegate
        self._pid = None

    @property
    def cache(self):
        current_pid = os.getpid()
        # There are no race conditions since fork keeps only the running
        # thread; a new pid means we are in a freshly forked child, which
        # must start with an empty buffer of its own.
        if current_pid != self._pid:
            self._pid = current_pid
            self._cache = []
        return self._cache

    def write(self, data):
        self.cache.append(data)

    def flush(self):
        buffered = ''.join(self.cache)
        self._delegate.write(buffered)
        self._cache = []
3302
class TestStdinBadfiledescriptor(unittest.TestCase):
    """Issues 5155, 5313, 5331: queue/pool creation inside a child process,
    plus flushing of the fork-aware file-like wrapper above."""

    def test_queue_in_process(self):
        queue = multiprocessing.Queue()
        proc = multiprocessing.Process(target=_test_process, args=(queue,))
        proc.start()
        proc.join()

    def test_pool_in_process(self):
        p = multiprocessing.Process(target=pool_in_process)
        p.start()
        p.join()

    def test_flushing(self):
        sio = io.StringIO()
        flike = _file_like(sio)
        flike.write('foo')
        # NOTE(review): `proc` is created but never started or joined, so
        # only the parent-side flush below is exercised -- confirm whether
        # starting the process was intended.
        proc = multiprocessing.Process(target=lambda: flike.flush())
        flike.flush()
        assert sio.getvalue() == 'foo'
3323
3324
3325class TestWait(unittest.TestCase):
3326
3327    @classmethod
3328    def _child_test_wait(cls, w, slow):
3329        for i in range(10):
3330            if slow:
3331                time.sleep(random.random()*0.1)
3332            w.send((i, os.getpid()))
3333        w.close()
3334
3335    def test_wait(self, slow=False):
3336        from multiprocessing.connection import wait
3337        readers = []
3338        procs = []
3339        messages = []
3340
3341        for i in range(4):
3342            r, w = multiprocessing.Pipe(duplex=False)
3343            p = multiprocessing.Process(target=self._child_test_wait, args=(w, slow))
3344            p.daemon = True
3345            p.start()
3346            w.close()
3347            readers.append(r)
3348            procs.append(p)
3349            self.addCleanup(p.join)
3350
3351        while readers:
3352            for r in wait(readers):
3353                try:
3354                    msg = r.recv()
3355                except EOFError:
3356                    readers.remove(r)
3357                    r.close()
3358                else:
3359                    messages.append(msg)
3360
3361        messages.sort()
3362        expected = sorted((i, p.pid) for i in range(10) for p in procs)
3363        self.assertEqual(messages, expected)
3364
3365    @classmethod
3366    def _child_test_wait_socket(cls, address, slow):
3367        s = socket.socket()
3368        s.connect(address)
3369        for i in range(10):
3370            if slow:
3371                time.sleep(random.random()*0.1)
3372            s.sendall(('%s\n' % i).encode('ascii'))
3373        s.close()
3374
3375    def test_wait_socket(self, slow=False):
3376        from multiprocessing.connection import wait
3377        l = socket.socket()
3378        l.bind((test.support.HOST, 0))
3379        l.listen()
3380        addr = l.getsockname()
3381        readers = []
3382        procs = []
3383        dic = {}
3384
3385        for i in range(4):
3386            p = multiprocessing.Process(target=self._child_test_wait_socket,
3387                                        args=(addr, slow))
3388            p.daemon = True
3389            p.start()
3390            procs.append(p)
3391            self.addCleanup(p.join)
3392
3393        for i in range(4):
3394            r, _ = l.accept()
3395            readers.append(r)
3396            dic[r] = []
3397        l.close()
3398
3399        while readers:
3400            for r in wait(readers):
3401                msg = r.recv(32)
3402                if not msg:
3403                    readers.remove(r)
3404                    r.close()
3405                else:
3406                    dic[r].append(msg)
3407
3408        expected = ''.join('%s\n' % i for i in range(10)).encode('ascii')
3409        for v in dic.values():
3410            self.assertEqual(b''.join(v), expected)
3411
3412    def test_wait_slow(self):
3413        self.test_wait(True)
3414
3415    def test_wait_socket_slow(self):
3416        self.test_wait_socket(True)
3417
3418    def test_wait_timeout(self):
3419        from multiprocessing.connection import wait
3420
3421        expected = 5
3422        a, b = multiprocessing.Pipe()
3423
3424        start = time.time()
3425        res = wait([a, b], expected)
3426        delta = time.time() - start
3427
3428        self.assertEqual(res, [])
3429        self.assertLess(delta, expected * 2)
3430        self.assertGreater(delta, expected * 0.5)
3431
3432        b.send(None)
3433
3434        start = time.time()
3435        res = wait([a, b], 20)
3436        delta = time.time() - start
3437
3438        self.assertEqual(res, [a])
3439        self.assertLess(delta, 0.4)
3440
3441    @classmethod
3442    def signal_and_sleep(cls, sem, period):
3443        sem.release()
3444        time.sleep(period)
3445
3446    def test_wait_integer(self):
3447        from multiprocessing.connection import wait
3448
3449        expected = 3
3450        sorted_ = lambda l: sorted(l, key=lambda x: id(x))
3451        sem = multiprocessing.Semaphore(0)
3452        a, b = multiprocessing.Pipe()
3453        p = multiprocessing.Process(target=self.signal_and_sleep,
3454                                    args=(sem, expected))
3455
3456        p.start()
3457        self.assertIsInstance(p.sentinel, int)
3458        self.assertTrue(sem.acquire(timeout=20))
3459
3460        start = time.time()
3461        res = wait([a, p.sentinel, b], expected + 20)
3462        delta = time.time() - start
3463
3464        self.assertEqual(res, [p.sentinel])
3465        self.assertLess(delta, expected + 2)
3466        self.assertGreater(delta, expected - 2)
3467
3468        a.send(None)
3469
3470        start = time.time()
3471        res = wait([a, p.sentinel, b], 20)
3472        delta = time.time() - start
3473
3474        self.assertEqual(sorted_(res), sorted_([p.sentinel, b]))
3475        self.assertLess(delta, 0.4)
3476
3477        b.send(None)
3478
3479        start = time.time()
3480        res = wait([a, p.sentinel, b], 20)
3481        delta = time.time() - start
3482
3483        self.assertEqual(sorted_(res), sorted_([a, p.sentinel, b]))
3484        self.assertLess(delta, 0.4)
3485
3486        p.terminate()
3487        p.join()
3488
3489    def test_neg_timeout(self):
3490        from multiprocessing.connection import wait
3491        a, b = multiprocessing.Pipe()
3492        t = time.time()
3493        res = wait([a], timeout=-1)
3494        t = time.time() - t
3495        self.assertEqual(res, [])
3496        self.assertLess(t, 1)
3497        a.close()
3498        b.close()
3499
3500#
3501# Issue 14151: Test invalid family on invalid environment
3502#
3503
class TestInvalidFamily(unittest.TestCase):
    """Issue 14151: Listener must reject addresses whose implied family is
    unusable on the current platform."""

    @unittest.skipIf(WIN32, "skipped on Windows")
    def test_invalid_family(self):
        # A Windows named-pipe style address is meaningless on Unix.
        self.assertRaises(ValueError,
                          multiprocessing.connection.Listener, r'\\.\test')

    @unittest.skipUnless(WIN32, "skipped on non-Windows platforms")
    def test_invalid_family_win32(self):
        # A Unix path address is meaningless for Windows named pipes.
        self.assertRaises(ValueError,
                          multiprocessing.connection.Listener,
                          '/var/test.pipe')
3515
3516#
3517# Issue 12098: check sys.flags of child matches that for parent
3518#
3519
class TestFlags(unittest.TestCase):
    """Issue 12098: sys.flags of a child must match those of its parent."""

    @classmethod
    def run_in_grandchild(cls, conn):
        # Deepest process: just report its interpreter flags.
        conn.send(tuple(sys.flags))

    @classmethod
    def run_in_child(cls):
        import json
        reader, writer = multiprocessing.Pipe(duplex=False)
        child = multiprocessing.Process(target=cls.run_in_grandchild,
                                        args=(writer,))
        child.start()
        grandchild_flags = reader.recv()
        child.join()
        reader.close()
        writer.close()
        # Emit both flag tuples for the grandparent to compare.
        print(json.dumps((tuple(sys.flags), grandchild_flags)))

    def test_flags(self):
        import json, subprocess
        # start child process using unusual flags
        prog = ('from test._test_multiprocessing import TestFlags; ' +
                'TestFlags.run_in_child()')
        data = subprocess.check_output(
            [sys.executable, '-E', '-S', '-O', '-c', prog])
        child_flags, grandchild_flags = json.loads(data.decode('ascii'))
        self.assertEqual(child_flags, grandchild_flags)
3547
3548#
3549# Test interaction with socket timeouts - see Issue #6056
3550#
3551
class TestTimeouts(unittest.TestCase):
    """Issue #6056: multiprocessing connections must keep working when a
    global socket default timeout is set."""

    @classmethod
    def _test_timeout(cls, child, address):
        # Child: wait long enough for the parent's 0.1s default timeout to
        # come into play, then talk over both the pipe and a fresh Client.
        time.sleep(1)
        child.send(123)
        child.close()
        conn = multiprocessing.connection.Client(address)
        conn.send(456)
        conn.close()

    def test_timeout(self):
        old_timeout = socket.getdefaulttimeout()
        try:
            # Force all newly created sockets to time out quickly.
            socket.setdefaulttimeout(0.1)
            parent, child = multiprocessing.Pipe(duplex=True)
            l = multiprocessing.connection.Listener(family='AF_INET')
            p = multiprocessing.Process(target=self._test_timeout,
                                        args=(child, l.address))
            p.start()
            child.close()
            self.assertEqual(parent.recv(), 123)
            parent.close()
            conn = l.accept()
            self.assertEqual(conn.recv(), 456)
            conn.close()
            l.close()
            p.join(10)
        finally:
            # Restore the process-wide default for subsequent tests.
            socket.setdefaulttimeout(old_timeout)
3581
3582#
3583# Test what happens with no "if __name__ == '__main__'"
3584#
3585
class TestNoForkBomb(unittest.TestCase):
    """A script without an ``if __name__ == '__main__'`` guard must fail
    cleanly under spawn/forkserver instead of fork-bombing."""

    def test_noforkbomb(self):
        sm = multiprocessing.get_start_method()
        name = os.path.join(os.path.dirname(__file__), 'mp_fork_bomb.py')
        if sm == 'fork':
            # fork does not re-import __main__, so the script succeeds.
            rc, out, err = test.support.script_helper.assert_python_ok(name, sm)
            self.assertEqual(out.rstrip(), b'123')
            self.assertEqual(err, b'')
        else:
            # spawn/forkserver re-import __main__ and must refuse to run.
            rc, out, err = test.support.script_helper.assert_python_failure(name, sm)
            self.assertEqual(out, b'')
            self.assertIn(b'RuntimeError', err)
3598
3599#
3600# Issue #17555: ForkAwareThreadLock
3601#
3602
class TestForkAwareThreadLock(unittest.TestCase):
    # We recursively start processes.  Issue #17555 meant that the
    # after fork registry would get duplicate entries for the same
    # lock.  The size of the registry at generation n was ~2**n.

    @classmethod
    def child(cls, n, conn):
        # Recurse n generations deep; the deepest child reports how large
        # the after-fork registry has grown.
        if n > 1:
            p = multiprocessing.Process(target=cls.child, args=(n-1, conn))
            p.start()
            conn.close()
            p.join(timeout=5)
        else:
            conn.send(len(util._afterfork_registry))
        conn.close()

    def test_lock(self):
        r, w = multiprocessing.Pipe(False)
        l = util.ForkAwareThreadLock()
        old_size = len(util._afterfork_registry)
        p = multiprocessing.Process(target=self.child, args=(5, w))
        p.start()
        w.close()
        new_size = r.recv()
        p.join(timeout=5)
        # With the fix, the registry must not grow across generations.
        self.assertLessEqual(new_size, old_size)
3629
3630#
3631# Check that non-forked child processes do not inherit unneeded fds/handles
3632#
3633
class TestCloseFds(unittest.TestCase):
    """Check that non-forked child processes do not inherit unneeded
    fds/handles."""

    def get_high_socket_fd(self):
        # Return a detached socket fd (caller must close it) chosen so a
        # fresh child cannot accidentally own the same descriptor.
        if WIN32:
            # The child process will not have any socket handles, so
            # calling socket.fromfd() should produce WSAENOTSOCK even
            # if there is a handle of the same number.
            return socket.socket().detach()
        else:
            # We want to produce a socket with an fd high enough that a
            # freshly created child process will not have any fds as high.
            fd = socket.socket().detach()
            to_close = []
            while fd < 50:
                to_close.append(fd)
                fd = os.dup(fd)
            for x in to_close:
                os.close(x)
            return fd

    def close(self, fd):
        # Release a descriptor produced by get_high_socket_fd().
        if WIN32:
            socket.socket(fileno=fd).close()
        else:
            os.close(fd)

    @classmethod
    def _test_closefds(cls, conn, fd):
        # Child: report whether *fd* is usable as a socket -- None on
        # success, otherwise the exception that was raised.
        try:
            s = socket.fromfd(fd, socket.AF_INET, socket.SOCK_STREAM)
        except Exception as e:
            conn.send(e)
        else:
            s.close()
            conn.send(None)

    def test_closefd(self):
        if not HAS_REDUCTION:
            raise unittest.SkipTest('requires fd pickling')

        reader, writer = multiprocessing.Pipe()
        fd = self.get_high_socket_fd()
        try:
            p = multiprocessing.Process(target=self._test_closefds,
                                        args=(writer, fd))
            p.start()
            writer.close()
            e = reader.recv()
            p.join(timeout=5)
        finally:
            self.close(fd)
            writer.close()
            reader.close()

        if multiprocessing.get_start_method() == 'fork':
            # fork children share the parent's fd table, so the fd works.
            self.assertIs(e, None)
        else:
            # spawn/forkserver children must not have inherited the fd.
            WSAENOTSOCK = 10038
            self.assertIsInstance(e, OSError)
            self.assertTrue(e.errno == errno.EBADF or
                            e.winerror == WSAENOTSOCK, e)
3695
3696#
3697# Issue #17097: EINTR should be ignored by recv(), send(), accept() etc
3698#
3699
class TestIgnoreEINTR(unittest.TestCase):
    """Issue #17097: EINTR should be ignored by recv(), send(), accept()."""

    @classmethod
    def _test_ignore(cls, conn):
        # Install a no-op handler so SIGUSR1 interrupts blocking calls
        # with EINTR instead of terminating the process.
        def handler(signum, frame):
            pass
        signal.signal(signal.SIGUSR1, handler)
        conn.send('ready')
        x = conn.recv()
        conn.send(x)
        conn.send_bytes(b'x'*(1024*1024))   # sending 1 MB should block

    @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1')
    def test_ignore(self):
        conn, child_conn = multiprocessing.Pipe()
        try:
            p = multiprocessing.Process(target=self._test_ignore,
                                        args=(child_conn,))
            p.daemon = True
            p.start()
            child_conn.close()
            self.assertEqual(conn.recv(), 'ready')
            time.sleep(0.1)
            # Interrupt the child while it blocks in recv(); the call must
            # be retried transparently.
            os.kill(p.pid, signal.SIGUSR1)
            time.sleep(0.1)
            conn.send(1234)
            self.assertEqual(conn.recv(), 1234)
            time.sleep(0.1)
            # Interrupt the child while it blocks in the large send().
            os.kill(p.pid, signal.SIGUSR1)
            self.assertEqual(conn.recv_bytes(), b'x'*(1024*1024))
            time.sleep(0.1)
            p.join()
        finally:
            conn.close()

    @classmethod
    def _test_ignore_listener(cls, conn):
        # Same as _test_ignore, but blocking in Listener.accept().
        def handler(signum, frame):
            pass
        signal.signal(signal.SIGUSR1, handler)
        with multiprocessing.connection.Listener() as l:
            conn.send(l.address)
            a = l.accept()
            a.send('welcome')

    @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1')
    def test_ignore_listener(self):
        conn, child_conn = multiprocessing.Pipe()
        try:
            p = multiprocessing.Process(target=self._test_ignore_listener,
                                        args=(child_conn,))
            p.daemon = True
            p.start()
            child_conn.close()
            address = conn.recv()
            time.sleep(0.1)
            # Interrupt the child while it blocks in accept().
            os.kill(p.pid, signal.SIGUSR1)
            time.sleep(0.1)
            client = multiprocessing.connection.Client(address)
            self.assertEqual(client.recv(), 'welcome')
            p.join()
        finally:
            conn.close()
3763
3764class TestStartMethod(unittest.TestCase):
3765    @classmethod
3766    def _check_context(cls, conn):
3767        conn.send(multiprocessing.get_start_method())
3768
3769    def check_context(self, ctx):
3770        r, w = ctx.Pipe(duplex=False)
3771        p = ctx.Process(target=self._check_context, args=(w,))
3772        p.start()
3773        w.close()
3774        child_method = r.recv()
3775        r.close()
3776        p.join()
3777        self.assertEqual(child_method, ctx.get_start_method())
3778
3779    def test_context(self):
3780        for method in ('fork', 'spawn', 'forkserver'):
3781            try:
3782                ctx = multiprocessing.get_context(method)
3783            except ValueError:
3784                continue
3785            self.assertEqual(ctx.get_start_method(), method)
3786            self.assertIs(ctx.get_context(), ctx)
3787            self.assertRaises(ValueError, ctx.set_start_method, 'spawn')
3788            self.assertRaises(ValueError, ctx.set_start_method, None)
3789            self.check_context(ctx)
3790
3791    def test_set_get(self):
3792        multiprocessing.set_forkserver_preload(PRELOAD)
3793        count = 0
3794        old_method = multiprocessing.get_start_method()
3795        try:
3796            for method in ('fork', 'spawn', 'forkserver'):
3797                try:
3798                    multiprocessing.set_start_method(method, force=True)
3799                except ValueError:
3800                    continue
3801                self.assertEqual(multiprocessing.get_start_method(), method)
3802                ctx = multiprocessing.get_context()
3803                self.assertEqual(ctx.get_start_method(), method)
3804                self.assertTrue(type(ctx).__name__.lower().startswith(method))
3805                self.assertTrue(
3806                    ctx.Process.__name__.lower().startswith(method))
3807                self.check_context(multiprocessing)
3808                count += 1
3809        finally:
3810            multiprocessing.set_start_method(old_method, force=True)
3811        self.assertGreaterEqual(count, 1)
3812
3813    def test_get_all(self):
3814        methods = multiprocessing.get_all_start_methods()
3815        if sys.platform == 'win32':
3816            self.assertEqual(methods, ['spawn'])
3817        else:
3818            self.assertTrue(methods == ['fork', 'spawn'] or
3819                            methods == ['fork', 'spawn', 'forkserver'])
3820
3821    def test_preload_resources(self):
3822        if multiprocessing.get_start_method() != 'forkserver':
3823            self.skipTest("test only relevant for 'forkserver' method")
3824        name = os.path.join(os.path.dirname(__file__), 'mp_preload.py')
3825        rc, out, err = test.support.script_helper.assert_python_ok(name)
3826        out = out.decode()
3827        err = err.decode()
3828        if out.rstrip() != 'ok' or err != '':
3829            print(out)
3830            print(err)
3831            self.fail("failed spawning forkserver or grandchild")
3832
3833
3834#
3835# Check that killing process does not leak named semaphores
3836#
3837
@unittest.skipIf(sys.platform == "win32",
                 "test semantics don't make sense on Windows")
class TestSemaphoreTracker(unittest.TestCase):
    """Check that killing a process does not leak named semaphores."""

    def test_semaphore_tracker(self):
        import subprocess
        # Child interpreter: create two named semaphores, write their
        # names back over an inherited pipe fd, then sleep so the parent
        # can terminate it while both semaphores still exist.
        cmd = '''if 1:
            import multiprocessing as mp, time, os
            mp.set_start_method("spawn")
            lock1 = mp.Lock()
            lock2 = mp.Lock()
            os.write(%d, lock1._semlock.name.encode("ascii") + b"\\n")
            os.write(%d, lock2._semlock.name.encode("ascii") + b"\\n")
            time.sleep(10)
        '''
        r, w = os.pipe()
        p = subprocess.Popen([sys.executable,
                             '-c', cmd % (w, w)],
                             pass_fds=[w],
                             stderr=subprocess.PIPE)
        # Close our copy of the write end so readline() sees EOF if the
        # child dies early.
        os.close(w)
        with open(r, 'rb', closefd=True) as f:
            name1 = f.readline().rstrip().decode('ascii')
            name2 = f.readline().rstrip().decode('ascii')
        # Unlink the first semaphore ourselves; the tracker's later attempt
        # on it will fail (see the Errno assertion on stderr below).
        _multiprocessing.sem_unlink(name1)
        p.terminate()
        p.wait()
        # Give the semaphore tracker some time to clean up after the
        # killed child.  NOTE(review): fixed sleep — may race on very slow
        # machines; confirm 2 s is sufficient.
        time.sleep(2.0)
        # The second semaphore should have been unlinked by the tracker,
        # so unlinking it again must fail.
        with self.assertRaises(OSError) as ctx:
            _multiprocessing.sem_unlink(name2)
        # docs say it should be ENOENT, but OSX seems to give EINVAL
        self.assertIn(ctx.exception.errno, (errno.ENOENT, errno.EINVAL))
        err = p.stderr.read().decode('utf-8')
        p.stderr.close()
        expected = 'semaphore_tracker: There appear to be 2 leaked semaphores'
        self.assertRegex(err, expected)
        self.assertRegex(err, r'semaphore_tracker: %r: \[Errno' % name1)
3874
3875#
3876# Mixins
3877#
3878
class ProcessesMixin(object):
    """Mixin exposing the real process-based multiprocessing API.

    staticmethod() prevents the functions from being re-bound as methods
    when they are looked up on a test-case instance.
    """
    TYPE = 'processes'
    Process = multiprocessing.Process
    connection = multiprocessing.connection
    current_process = staticmethod(multiprocessing.current_process)
    active_children = staticmethod(multiprocessing.active_children)
    Pool = staticmethod(multiprocessing.Pool)
    Pipe = staticmethod(multiprocessing.Pipe)
    Queue = staticmethod(multiprocessing.Queue)
    JoinableQueue = staticmethod(multiprocessing.JoinableQueue)
    Lock = staticmethod(multiprocessing.Lock)
    RLock = staticmethod(multiprocessing.RLock)
    Semaphore = staticmethod(multiprocessing.Semaphore)
    BoundedSemaphore = staticmethod(multiprocessing.BoundedSemaphore)
    Condition = staticmethod(multiprocessing.Condition)
    Event = staticmethod(multiprocessing.Event)
    Barrier = staticmethod(multiprocessing.Barrier)
    Value = staticmethod(multiprocessing.Value)
    Array = staticmethod(multiprocessing.Array)
    RawValue = staticmethod(multiprocessing.RawValue)
    RawArray = staticmethod(multiprocessing.RawArray)
3900
3901
class ManagerMixin(object):
    """Mixin routing object creation through a shared Manager process.

    The properties defer attribute lookup to ``self.manager``, which only
    exists once setUpClass() has created the class-wide manager.
    """
    TYPE = 'manager'
    Process = multiprocessing.Process
    Queue = property(operator.attrgetter('manager.Queue'))
    JoinableQueue = property(operator.attrgetter('manager.JoinableQueue'))
    Lock = property(operator.attrgetter('manager.Lock'))
    RLock = property(operator.attrgetter('manager.RLock'))
    Semaphore = property(operator.attrgetter('manager.Semaphore'))
    BoundedSemaphore = property(operator.attrgetter('manager.BoundedSemaphore'))
    Condition = property(operator.attrgetter('manager.Condition'))
    Event = property(operator.attrgetter('manager.Event'))
    Barrier = property(operator.attrgetter('manager.Barrier'))
    Value = property(operator.attrgetter('manager.Value'))
    Array = property(operator.attrgetter('manager.Array'))
    list = property(operator.attrgetter('manager.list'))
    dict = property(operator.attrgetter('manager.dict'))
    Namespace = property(operator.attrgetter('manager.Namespace'))

    @classmethod
    def Pool(cls, *args, **kwds):
        # Pools must be created by the manager, so a plain attrgetter
        # property is not used here.
        return cls.manager.Pool(*args, **kwds)

    @classmethod
    def setUpClass(cls):
        # One manager process shared by every test in the class.
        cls.manager = multiprocessing.Manager()

    @classmethod
    def tearDownClass(cls):
        # only the manager process should be returned by active_children()
        # but this can take a bit on slow machines, so wait a few seconds
        # if there are other children too (see #17395)
        t = 0.01
        while len(multiprocessing.active_children()) > 1 and t < 5:
            # Exponential backoff while waiting for stray children to exit.
            time.sleep(t)
            t *= 2
        gc.collect()                       # do garbage collection
        if cls.manager._number_of_objects() != 0:
            # This is not really an error since some tests do not
            # ensure that all processes which hold a reference to a
            # managed object have been joined.
            print('Shared objects which still exist at manager shutdown:')
            print(cls.manager._debug_info())
        cls.manager.shutdown()
        cls.manager.join()
        cls.manager = None
3947
3948
class ThreadsMixin(object):
    """Mixin exposing the thread-based multiprocessing.dummy API.

    Same shape as ProcessesMixin, but every name resolves to the
    thread-backed implementations in multiprocessing.dummy.
    """
    TYPE = 'threads'
    Process = multiprocessing.dummy.Process
    connection = multiprocessing.dummy.connection
    current_process = staticmethod(multiprocessing.dummy.current_process)
    active_children = staticmethod(multiprocessing.dummy.active_children)
    Pool = staticmethod(multiprocessing.dummy.Pool)
    Pipe = staticmethod(multiprocessing.dummy.Pipe)
    Queue = staticmethod(multiprocessing.dummy.Queue)
    JoinableQueue = staticmethod(multiprocessing.dummy.JoinableQueue)
    Lock = staticmethod(multiprocessing.dummy.Lock)
    RLock = staticmethod(multiprocessing.dummy.RLock)
    Semaphore = staticmethod(multiprocessing.dummy.Semaphore)
    BoundedSemaphore = staticmethod(multiprocessing.dummy.BoundedSemaphore)
    Condition = staticmethod(multiprocessing.dummy.Condition)
    Event = staticmethod(multiprocessing.dummy.Event)
    Barrier = staticmethod(multiprocessing.dummy.Barrier)
    Value = staticmethod(multiprocessing.dummy.Value)
    Array = staticmethod(multiprocessing.dummy.Array)
3968
3969#
3970# Functions used to create test cases from the base ones in this module
3971#
3972
def install_tests_in_module_dict(remote_globs, start_method):
    """Clone this module's test cases into *remote_globs* for *start_method*.

    Every ``BaseTestCase`` subclass is instantiated once per entry in its
    ``ALLOWED_TYPES`` ('processes', 'threads', 'manager') by mixing in the
    matching ``*Mixin`` class; plain ``unittest.TestCase`` subclasses are
    re-exported unchanged.  setUpModule/tearDownModule hooks that switch
    the global start method (and report dangling processes/threads) are
    installed into *remote_globs* as well.
    """
    module_name = remote_globs['__name__']
    source_globs = globals()
    valid_types = {'processes', 'threads', 'manager'}

    for name, obj in source_globs.items():
        if not isinstance(obj, type):
            continue
        if issubclass(obj, BaseTestCase):
            if obj is BaseTestCase:
                continue
            assert set(obj.ALLOWED_TYPES) <= valid_types, obj.ALLOWED_TYPES
            for kind in obj.ALLOWED_TYPES:
                # e.g. '_TestFoo' + 'processes' -> 'WithProcessesTestFoo'
                new_name = 'With' + kind.capitalize() + name[1:]
                mixin = source_globs[kind.capitalize() + 'Mixin']
                class Temp(obj, mixin, unittest.TestCase):
                    pass
                Temp.__name__ = Temp.__qualname__ = new_name
                Temp.__module__ = module_name
                remote_globs[new_name] = Temp
        elif issubclass(obj, unittest.TestCase):
            class Temp(obj, object):
                pass
            Temp.__name__ = Temp.__qualname__ = name
            Temp.__module__ = module_name
            remote_globs[name] = Temp

    # Mutable cells shared between the two module-level hooks below.
    dangling = [None, None]
    saved_start_method = [None]

    def setUpModule():
        multiprocessing.set_forkserver_preload(PRELOAD)
        multiprocessing.process._cleanup()
        # Snapshot dangling processes/threads so teardown can report leaks.
        dangling[0] = multiprocessing.process._dangling.copy()
        dangling[1] = threading._dangling.copy()
        saved_start_method[0] = multiprocessing.get_start_method(
            allow_none=True)
        try:
            multiprocessing.set_start_method(start_method, force=True)
        except ValueError:
            raise unittest.SkipTest(start_method +
                                    ' start method not supported')

        if sys.platform.startswith("linux"):
            # Probe semaphore support up front (see issue 3111).
            try:
                lock = multiprocessing.RLock()
            except OSError:
                raise unittest.SkipTest("OSError raises on RLock creation, "
                                        "see issue 3111!")
        check_enough_semaphores()
        util.get_temp_dir()     # creates temp directory
        multiprocessing.get_logger().setLevel(LOG_LEVEL)

    def tearDownModule():
        multiprocessing.set_start_method(saved_start_method[0], force=True)
        # pause a bit so we don't get warning about dangling threads/processes
        time.sleep(0.5)
        multiprocessing.process._cleanup()
        gc.collect()
        leaked = set(multiprocessing.process._dangling) - set(dangling[0])
        if leaked:
            print('Dangling processes:', leaked, file=sys.stderr)
        del leaked
        leaked = set(threading._dangling) - set(dangling[1])
        if leaked:
            print('Dangling threads:', leaked, file=sys.stderr)

    remote_globs['setUpModule'] = setUpModule
    remote_globs['tearDownModule'] = tearDownModule