1#!/usr/bin/env python
2
3#
4# Unit tests for the multiprocessing package
5#
6
7import unittest
8import Queue
9import time
10import sys
11import os
12import gc
13import signal
14import array
15import socket
16import random
17import logging
18import errno
19import test.script_helper
20from test import test_support
21from StringIO import StringIO
22_multiprocessing = test_support.import_module('_multiprocessing')
23# import threading after _multiprocessing to raise a more relevant error
24# message: "No module named _multiprocessing". _multiprocessing is not compiled
25# without thread support.
26import threading
27
28# Work around broken sem_open implementations
29test_support.import_module('multiprocessing.synchronize')
30
31import multiprocessing.dummy
32import multiprocessing.connection
33import multiprocessing.managers
34import multiprocessing.heap
35import multiprocessing.pool
36
37from multiprocessing import util
38
39try:
40    from multiprocessing import reduction
41    HAS_REDUCTION = True
42except ImportError:
43    HAS_REDUCTION = False
44
45try:
46    from multiprocessing.sharedctypes import Value, copy
47    HAS_SHAREDCTYPES = True
48except ImportError:
49    HAS_SHAREDCTYPES = False
50
51try:
52    import msvcrt
53except ImportError:
54    msvcrt = None
55
56#
57#
58#
59
latin = str   # under Python 2 plain str is already latin-capable byte text

#
# Constants
#

LOG_LEVEL = util.SUBWARNING
#LOG_LEVEL = logging.DEBUG

DELTA = 0.1
CHECK_TIMINGS = False     # making true makes tests take a lot longer
                          # and can sometimes cause some non-serious
                          # failures because some calls block a bit
                          # longer than expected
if CHECK_TIMINGS:
    TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.82, 0.35, 1.4
else:
    TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.1, 0.1, 0.1

HAVE_GETVALUE = not getattr(_multiprocessing,
                            'HAVE_BROKEN_SEM_GETVALUE', False)

WIN32 = (sys.platform == "win32")

# Highest usable file descriptor number.  Narrowed from a bare "except:"
# which also swallowed KeyboardInterrupt/SystemExit: sysconf is missing on
# some platforms (AttributeError), may not know the name (ValueError), or
# may fail at the OS level (OSError).
try:
    MAXFD = os.sysconf("SC_OPEN_MAX")
except (AttributeError, ValueError, OSError):
    MAXFD = 256

#
# Some tests require ctypes
#

try:
    from ctypes import Structure, c_int, c_double
except ImportError:
    Structure = object
    c_int = c_double = None
98
99
def check_enough_semaphores():
    """Skip the running test if the OS provides too few POSIX semaphores."""
    nsems_min = 256  # minimum number of semaphores required by POSIX
    try:
        nsems = os.sysconf("SC_SEM_NSEMS_MAX")
    except (AttributeError, ValueError):
        # sysconf not available, or this setting unknown: assume enough.
        return
    # -1 means "no limit"; anything at or above the POSIX minimum is fine.
    if nsems != -1 and nsems < nsems_min:
        raise unittest.SkipTest("The OS doesn't support enough semaphores "
                                "to run the test (required: %d)." % nsems_min)
113
114
115#
116# Creates a wrapper for a function which records the time it takes to finish
117#
118
class TimingWrapper(object):
    """Callable proxy that records how long each invocation of *func* takes.

    After a call, the duration in seconds is available as ``self.elapsed``
    (``None`` until the first call).
    """

    def __init__(self, func):
        self.func = func
        self.elapsed = None

    def __call__(self, *args, **kwds):
        start = time.time()
        try:
            return self.func(*args, **kwds)
        finally:
            # Record the duration even when func raises.
            self.elapsed = time.time() - start
131
132#
133# Base class for test cases
134#
135
class BaseTestCase(object):
    """Mixin of helpers shared by every multiprocessing test case.

    Concrete subclasses are mixed with a TestCase and a flavour
    ('processes', 'manager' or 'threads') elsewhere in the file.
    """

    ALLOWED_TYPES = ('processes', 'manager', 'threads')

    def assertTimingAlmostEqual(self, a, b):
        # Timing assertions are opt-in: they are slow and inherently flaky.
        if not CHECK_TIMINGS:
            return
        self.assertAlmostEqual(a, b, 1)

    def assertReturnsIfImplemented(self, value, func, *args):
        # Silently accept platforms where the operation is unimplemented.
        try:
            res = func(*args)
        except NotImplementedError:
            return
        return self.assertEqual(value, res)

    # For the sanity of Windows users, rather than crashing or freezing in
    # multiple ways: refuse to be pickled at all.
    def __reduce__(self, *args):
        raise NotImplementedError("shouldn't try to pickle a test case")

    __reduce_ex__ = __reduce__
158
159#
160# Return the value of a semaphore
161#
162
def get_value(self):
    """Return the current value of a semaphore-like object.

    Tries, in order: the public ``get_value()`` method, then the private
    value attribute of ``threading.Semaphore`` (old and new spellings).
    Raises NotImplementedError when none of them exists.
    """
    try:
        return self.get_value()
    except AttributeError:
        pass
    # Fall back on the name-mangled / private attributes used by
    # threading.Semaphore in various Python versions.
    for attr in ('_Semaphore__value', '_value'):
        try:
            return getattr(self, attr)
        except AttributeError:
            pass
    raise NotImplementedError
174
175#
176# Testcases
177#
178
class _TestProcess(BaseTestCase):
    """Tests of the core Process API: start/join/terminate, identity,
    recursion and exit codes.  Runs for both process and thread flavours."""

    ALLOWED_TYPES = ('processes', 'threads')

    def test_current(self):
        """current_process() must accurately describe the main process."""
        if self.TYPE == 'threads':
            return

        current = self.current_process()
        authkey = current.authkey

        self.assertTrue(current.is_alive())
        self.assertTrue(not current.daemon)
        self.assertIsInstance(authkey, bytes)
        self.assertTrue(len(authkey) > 0)
        self.assertEqual(current.ident, os.getpid())
        self.assertEqual(current.exitcode, None)

    @classmethod
    def _test(cls, q, *args, **kwds):
        # Child-side helper: echo back args/kwds plus the child's identity.
        current = cls.current_process()
        q.put(args)
        q.put(kwds)
        q.put(current.name)
        if cls.TYPE != 'threads':
            q.put(bytes(current.authkey))
            q.put(current.pid)

    def test_process(self):
        """A child sees the args, kwargs, name and authkey it was given."""
        q = self.Queue(1)
        e = self.Event()
        args = (q, 1, 2)
        kwargs = {'hello':23, 'bye':2.54}
        name = 'SomeProcess'
        p = self.Process(
            target=self._test, args=args, kwargs=kwargs, name=name
            )
        p.daemon = True
        current = self.current_process()

        if self.TYPE != 'threads':
            # The authkey is inherited from the parent by default.
            self.assertEqual(p.authkey, current.authkey)
        self.assertEqual(p.is_alive(), False)
        self.assertEqual(p.daemon, True)
        self.assertNotIn(p, self.active_children())
        self.assertTrue(type(self.active_children()) is list)
        self.assertEqual(p.exitcode, None)

        p.start()

        self.assertEqual(p.exitcode, None)
        self.assertEqual(p.is_alive(), True)
        self.assertIn(p, self.active_children())

        # args[1:] because the queue itself was args[0].
        self.assertEqual(q.get(), args[1:])
        self.assertEqual(q.get(), kwargs)
        self.assertEqual(q.get(), p.name)
        if self.TYPE != 'threads':
            self.assertEqual(q.get(), current.authkey)
            self.assertEqual(q.get(), p.pid)

        p.join()

        self.assertEqual(p.exitcode, 0)
        self.assertEqual(p.is_alive(), False)
        self.assertNotIn(p, self.active_children())

    @classmethod
    def _test_terminate(cls):
        # Sleep "forever"; the parent is expected to terminate us.
        time.sleep(1000)

    def test_terminate(self):
        """terminate() must kill a child promptly and update bookkeeping."""
        if self.TYPE == 'threads':
            return

        p = self.Process(target=self._test_terminate)
        p.daemon = True
        p.start()

        self.assertEqual(p.is_alive(), True)
        self.assertIn(p, self.active_children())
        self.assertEqual(p.exitcode, None)

        p.terminate()

        # join() should return almost immediately after terminate().
        join = TimingWrapper(p.join)
        self.assertEqual(join(), None)
        self.assertTimingAlmostEqual(join.elapsed, 0.0)

        self.assertEqual(p.is_alive(), False)
        self.assertNotIn(p, self.active_children())

        p.join()

        # XXX sometimes get p.exitcode == 0 on Windows ...
        #self.assertEqual(p.exitcode, -signal.SIGTERM)

    def test_cpu_count(self):
        """cpu_count() returns a positive int (or is unimplemented)."""
        try:
            cpus = multiprocessing.cpu_count()
        except NotImplementedError:
            cpus = 1
        self.assertTrue(type(cpus) is int)
        self.assertTrue(cpus >= 1)

    def test_active_children(self):
        """active_children() tracks children across their lifetime."""
        self.assertEqual(type(self.active_children()), list)

        p = self.Process(target=time.sleep, args=(DELTA,))
        self.assertNotIn(p, self.active_children())

        p.daemon = True
        p.start()
        self.assertIn(p, self.active_children())

        p.join()
        self.assertNotIn(p, self.active_children())

    @classmethod
    def _test_recursion(cls, wconn, id):
        # Each child reports its id and spawns two grandchildren, two
        # levels deep, producing a depth-first traversal on the pipe.
        from multiprocessing import forking
        wconn.send(id)
        if len(id) < 2:
            for i in range(2):
                p = cls.Process(
                    target=cls._test_recursion, args=(wconn, id+[i])
                    )
                p.start()
                p.join()

    def test_recursion(self):
        """Processes can be created from within child processes."""
        rconn, wconn = self.Pipe(duplex=False)
        self._test_recursion(wconn, [])

        time.sleep(DELTA)
        result = []
        while rconn.poll():
            result.append(rconn.recv())

        # Depth-first order of the recursion tree above.
        expected = [
            [],
              [0],
                [0, 0],
                [0, 1],
              [1],
                [1, 0],
                [1, 1]
            ]
        self.assertEqual(result, expected)

    @classmethod
    def _test_sys_exit(cls, reason, testfn):
        # Redirect stderr to a file so the parent can inspect the message.
        sys.stderr = open(testfn, 'w')
        sys.exit(reason)

    def test_sys_exit(self):
        """sys.exit() in a child maps its argument to the right exitcode."""
        # See Issue 13854
        if self.TYPE == 'threads':
            return

        testfn = test_support.TESTFN
        self.addCleanup(test_support.unlink, testfn)

        # Non-int arguments: message printed to stderr, exitcode 1 for a
        # "true" reason, 0 for exit handled as an error-free message.
        for reason, code in (([1, 2, 3], 1), ('ignore this', 0)):
            p = self.Process(target=self._test_sys_exit, args=(reason, testfn))
            p.daemon = True
            p.start()
            p.join(5)
            self.assertEqual(p.exitcode, code)

            with open(testfn, 'r') as f:
                self.assertEqual(f.read().rstrip(), str(reason))

        # Bool/int arguments become the exit status directly.
        for reason in (True, False, 8):
            p = self.Process(target=sys.exit, args=(reason,))
            p.daemon = True
            p.start()
            p.join(5)
            self.assertEqual(p.exitcode, reason)
358
359#
360#
361#
362
363class _UpperCaser(multiprocessing.Process):
364
365    def __init__(self):
366        multiprocessing.Process.__init__(self)
367        self.child_conn, self.parent_conn = multiprocessing.Pipe()
368
369    def run(self):
370        self.parent_conn.close()
371        for s in iter(self.child_conn.recv, None):
372            self.child_conn.send(s.upper())
373        self.child_conn.close()
374
375    def submit(self, s):
376        assert type(s) is str
377        self.parent_conn.send(s)
378        return self.parent_conn.recv()
379
380    def stop(self):
381        self.parent_conn.send(None)
382        self.parent_conn.close()
383        self.child_conn.close()
384
class _TestSubclassingProcess(BaseTestCase):
    """Check that subclassing multiprocessing.Process works end to end."""

    ALLOWED_TYPES = ('processes',)

    def test_subclassing(self):
        """An _UpperCaser subclass round-trips requests over its pipe."""
        uppercaser = _UpperCaser()
        uppercaser.daemon = True
        uppercaser.start()
        self.assertEqual(uppercaser.submit('hello'), 'HELLO')
        self.assertEqual(uppercaser.submit('world'), 'WORLD')
        uppercaser.stop()
        uppercaser.join()
397
398#
399#
400#
401
def queue_empty(q):
    """Return True when *q* holds no items.

    Falls back on qsize() for queue proxies that lack an empty() method.
    """
    empty = getattr(q, 'empty', None)
    if empty is not None:
        return empty()
    return q.qsize() == 0
407
def queue_full(q, maxsize):
    """Return True when *q* holds *maxsize* items.

    Falls back on qsize() for queue proxies that lack a full() method.
    """
    full = getattr(q, 'full', None)
    if full is not None:
        return full()
    return q.qsize() == maxsize
413
414
class _TestQueue(BaseTestCase):
    """Tests of Queue/JoinableQueue: put/get semantics, timeouts, qsize,
    fork interaction and task_done accounting."""

    @classmethod
    def _test_put(cls, queue, child_can_start, parent_can_continue):
        # Child-side: once allowed to start, drain the six queued items.
        child_can_start.wait()
        for i in range(6):
            queue.get()
        parent_can_continue.set()

    def test_put(self):
        """put() honours block/timeout arguments and Full is raised."""
        MAXSIZE = 6
        queue = self.Queue(maxsize=MAXSIZE)
        child_can_start = self.Event()
        parent_can_continue = self.Event()

        proc = self.Process(
            target=self._test_put,
            args=(queue, child_can_start, parent_can_continue)
            )
        proc.daemon = True
        proc.start()

        self.assertEqual(queue_empty(queue), True)
        self.assertEqual(queue_full(queue, MAXSIZE), False)

        # Exercise every put() calling convention.
        queue.put(1)
        queue.put(2, True)
        queue.put(3, True, None)
        queue.put(4, False)
        queue.put(5, False, None)
        queue.put_nowait(6)

        # the values may be in buffer but not yet in pipe so sleep a bit
        time.sleep(DELTA)

        self.assertEqual(queue_empty(queue), False)
        self.assertEqual(queue_full(queue, MAXSIZE), True)

        put = TimingWrapper(queue.put)
        put_nowait = TimingWrapper(queue.put_nowait)

        # Non-blocking puts on a full queue fail immediately...
        self.assertRaises(Queue.Full, put, 7, False)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(Queue.Full, put, 7, False, None)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(Queue.Full, put_nowait, 7)
        self.assertTimingAlmostEqual(put_nowait.elapsed, 0)

        # ...while blocking puts wait roughly the requested timeout.
        self.assertRaises(Queue.Full, put, 7, True, TIMEOUT1)
        self.assertTimingAlmostEqual(put.elapsed, TIMEOUT1)

        self.assertRaises(Queue.Full, put, 7, False, TIMEOUT2)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(Queue.Full, put, 7, True, timeout=TIMEOUT3)
        self.assertTimingAlmostEqual(put.elapsed, TIMEOUT3)

        # Let the child drain the queue, then re-check emptiness.
        child_can_start.set()
        parent_can_continue.wait()

        self.assertEqual(queue_empty(queue), True)
        self.assertEqual(queue_full(queue, MAXSIZE), False)

        proc.join()

    @classmethod
    def _test_get(cls, queue, child_can_start, parent_can_continue):
        # Child-side: once allowed to start, enqueue test values.
        child_can_start.wait()
        #queue.put(1)
        queue.put(2)
        queue.put(3)
        queue.put(4)
        queue.put(5)
        parent_can_continue.set()

    def test_get(self):
        """get() honours block/timeout arguments and Empty is raised."""
        queue = self.Queue()
        child_can_start = self.Event()
        parent_can_continue = self.Event()

        proc = self.Process(
            target=self._test_get,
            args=(queue, child_can_start, parent_can_continue)
            )
        proc.daemon = True
        proc.start()

        self.assertEqual(queue_empty(queue), True)

        child_can_start.set()
        parent_can_continue.wait()

        time.sleep(DELTA)
        self.assertEqual(queue_empty(queue), False)

        # Hangs unexpectedly, remove for now
        #self.assertEqual(queue.get(), 1)
        # Exercise every get() calling convention.
        self.assertEqual(queue.get(True, None), 2)
        self.assertEqual(queue.get(True), 3)
        self.assertEqual(queue.get(timeout=1), 4)
        self.assertEqual(queue.get_nowait(), 5)

        self.assertEqual(queue_empty(queue), True)

        get = TimingWrapper(queue.get)
        get_nowait = TimingWrapper(queue.get_nowait)

        # Non-blocking gets on an empty queue fail immediately...
        self.assertRaises(Queue.Empty, get, False)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(Queue.Empty, get, False, None)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(Queue.Empty, get_nowait)
        self.assertTimingAlmostEqual(get_nowait.elapsed, 0)

        # ...while blocking gets wait roughly the requested timeout.
        self.assertRaises(Queue.Empty, get, True, TIMEOUT1)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)

        self.assertRaises(Queue.Empty, get, False, TIMEOUT2)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(Queue.Empty, get, timeout=TIMEOUT3)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT3)

        proc.join()

    @classmethod
    def _test_fork(cls, queue):
        for i in range(10, 20):
            queue.put(i)
        # note that at this point the items may only be buffered, so the
        # process cannot shutdown until the feeder thread has finished
        # pushing items onto the pipe.

    def test_fork(self):
        """A forked child gets its own feeder thread for the queue."""
        # Old versions of Queue would fail to create a new feeder
        # thread for a forked process if the original process had its
        # own feeder thread.  This test checks that this no longer
        # happens.

        queue = self.Queue()

        # put items on queue so that main process starts a feeder thread
        for i in range(10):
            queue.put(i)

        # wait to make sure thread starts before we fork a new process
        time.sleep(DELTA)

        # fork process
        p = self.Process(target=self._test_fork, args=(queue,))
        p.daemon = True
        p.start()

        # check that all expected items are in the queue
        for i in range(20):
            self.assertEqual(queue.get(), i)
        self.assertRaises(Queue.Empty, queue.get, False)

        p.join()

    def test_qsize(self):
        """qsize() tracks puts and gets (skipped where unimplemented)."""
        q = self.Queue()
        try:
            self.assertEqual(q.qsize(), 0)
        except NotImplementedError:
            # qsize() is unavailable on some platforms (e.g. OS X).
            return
        q.put(1)
        self.assertEqual(q.qsize(), 1)
        q.put(5)
        self.assertEqual(q.qsize(), 2)
        q.get()
        self.assertEqual(q.qsize(), 1)
        q.get()
        self.assertEqual(q.qsize(), 0)

    @classmethod
    def _test_task_done(cls, q):
        # Worker: acknowledge each item until the None sentinel.
        for obj in iter(q.get, None):
            time.sleep(DELTA)
            q.task_done()

    def test_task_done(self):
        """JoinableQueue.join() returns once all items are task_done()."""
        queue = self.JoinableQueue()

        if sys.version_info < (2, 5) and not hasattr(queue, 'task_done'):
            self.skipTest("requires 'queue.task_done()' method")

        workers = [self.Process(target=self._test_task_done, args=(queue,))
                   for i in xrange(4)]

        for p in workers:
            p.daemon = True
            p.start()

        for i in xrange(10):
            queue.put(i)

        queue.join()

        # One sentinel per worker shuts them all down.
        for p in workers:
            queue.put(None)

        for p in workers:
            p.join()
624
625#
626#
627#
628
class _TestLock(BaseTestCase):
    """Tests for Lock and RLock acquire/release semantics."""

    def test_lock(self):
        """A Lock acquires once; releasing an unheld lock raises."""
        lock = self.Lock()
        self.assertEqual(lock.acquire(), True)
        self.assertEqual(lock.acquire(False), False)
        self.assertEqual(lock.release(), None)
        # The exact exception type differs between flavours.
        self.assertRaises((ValueError, threading.ThreadError), lock.release)

    def test_rlock(self):
        """An RLock nests: each acquire needs a matching release."""
        lock = self.RLock()
        self.assertEqual(lock.acquire(), True)
        self.assertEqual(lock.acquire(), True)
        self.assertEqual(lock.acquire(), True)
        self.assertEqual(lock.release(), None)
        self.assertEqual(lock.release(), None)
        self.assertEqual(lock.release(), None)
        self.assertRaises((AssertionError, RuntimeError), lock.release)

    def test_lock_context(self):
        """Locks support the context-manager protocol."""
        with self.Lock():
            pass
651
652
class _TestSemaphore(BaseTestCase):
    """Tests for Semaphore and BoundedSemaphore counting and timeouts."""

    def _test_semaphore(self, sem):
        # Shared body: walk a 2-valued semaphore down to 0 and back up,
        # checking the counter (where the platform can report it).
        self.assertReturnsIfImplemented(2, get_value, sem)
        self.assertEqual(sem.acquire(), True)
        self.assertReturnsIfImplemented(1, get_value, sem)
        self.assertEqual(sem.acquire(), True)
        self.assertReturnsIfImplemented(0, get_value, sem)
        self.assertEqual(sem.acquire(False), False)
        self.assertReturnsIfImplemented(0, get_value, sem)
        self.assertEqual(sem.release(), None)
        self.assertReturnsIfImplemented(1, get_value, sem)
        self.assertEqual(sem.release(), None)
        self.assertReturnsIfImplemented(2, get_value, sem)

    def test_semaphore(self):
        """A plain Semaphore may be released above its initial value."""
        sem = self.Semaphore(2)
        self._test_semaphore(sem)
        self.assertEqual(sem.release(), None)
        self.assertReturnsIfImplemented(3, get_value, sem)
        self.assertEqual(sem.release(), None)
        self.assertReturnsIfImplemented(4, get_value, sem)

    def test_bounded_semaphore(self):
        """A BoundedSemaphore behaves like Semaphore within its bound."""
        sem = self.BoundedSemaphore(2)
        self._test_semaphore(sem)
        # Currently fails on OS/X
        #if HAVE_GETVALUE:
        #    self.assertRaises(ValueError, sem.release)
        #    self.assertReturnsIfImplemented(2, get_value, sem)

    def test_timeout(self):
        """acquire() honours block/timeout on an exhausted semaphore."""
        if self.TYPE != 'processes':
            return

        sem = self.Semaphore(0)
        acquire = TimingWrapper(sem.acquire)

        # Non-blocking attempts return immediately...
        self.assertEqual(acquire(False), False)
        self.assertTimingAlmostEqual(acquire.elapsed, 0.0)

        self.assertEqual(acquire(False, None), False)
        self.assertTimingAlmostEqual(acquire.elapsed, 0.0)

        self.assertEqual(acquire(False, TIMEOUT1), False)
        self.assertTimingAlmostEqual(acquire.elapsed, 0)

        # ...blocking attempts wait roughly the requested timeout.
        self.assertEqual(acquire(True, TIMEOUT2), False)
        self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT2)

        self.assertEqual(acquire(timeout=TIMEOUT3), False)
        self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT3)
705
706
class _TestCondition(BaseTestCase):
    """Tests for Condition: notify, notify_all, wait timeouts."""

    @classmethod
    def f(cls, cond, sleeping, woken, timeout=None):
        # Sleeper: signal that we are about to wait, wait (possibly with a
        # timeout), then signal that we have woken.
        cond.acquire()
        sleeping.release()
        cond.wait(timeout)
        woken.release()
        cond.release()

    def check_invariant(self, cond):
        # this is only supposed to succeed when there are no sleepers
        if self.TYPE == 'processes':
            try:
                sleepers = (cond._sleeping_count.get_value() -
                            cond._woken_count.get_value())
                self.assertEqual(sleepers, 0)
                self.assertEqual(cond._wait_semaphore.get_value(), 0)
            except NotImplementedError:
                pass

    def test_notify(self):
        """notify() wakes exactly one sleeper per call."""
        cond = self.Condition()
        sleeping = self.Semaphore(0)
        woken = self.Semaphore(0)

        p = self.Process(target=self.f, args=(cond, sleeping, woken))
        p.daemon = True
        p.start()

        p = threading.Thread(target=self.f, args=(cond, sleeping, woken))
        p.daemon = True
        p.start()

        # wait for both children to start sleeping
        sleeping.acquire()
        sleeping.acquire()

        # check no process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(0, get_value, woken)

        # wake up one process/thread
        cond.acquire()
        cond.notify()
        cond.release()

        # check one process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(1, get_value, woken)

        # wake up another
        cond.acquire()
        cond.notify()
        cond.release()

        # check other has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(2, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)
        p.join()

    def test_notify_all(self):
        """wait() times out on its own; notify_all() wakes every sleeper."""
        cond = self.Condition()
        sleeping = self.Semaphore(0)
        woken = self.Semaphore(0)

        # start some threads/processes which will timeout
        for i in range(3):
            p = self.Process(target=self.f,
                             args=(cond, sleeping, woken, TIMEOUT1))
            p.daemon = True
            p.start()

            t = threading.Thread(target=self.f,
                                 args=(cond, sleeping, woken, TIMEOUT1))
            t.daemon = True
            t.start()

        # wait for them all to sleep
        for i in xrange(6):
            sleeping.acquire()

        # check they have all timed out
        for i in xrange(6):
            woken.acquire()
        self.assertReturnsIfImplemented(0, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

        # start some more threads/processes
        for i in range(3):
            p = self.Process(target=self.f, args=(cond, sleeping, woken))
            p.daemon = True
            p.start()

            t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
            t.daemon = True
            t.start()

        # wait for them to all sleep
        for i in xrange(6):
            sleeping.acquire()

        # check no process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(0, get_value, woken)

        # wake them all up
        cond.acquire()
        cond.notify_all()
        cond.release()

        # check they have all woken
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(6, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

    def test_timeout(self):
        """wait(timeout) returns after roughly the requested delay."""
        cond = self.Condition()
        wait = TimingWrapper(cond.wait)
        cond.acquire()
        res = wait(TIMEOUT1)
        cond.release()
        self.assertEqual(res, None)
        self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
838
839
class _TestEvent(BaseTestCase):
    """Tests for Event: set/clear/is_set and wait timeouts."""

    @classmethod
    def _test_event(cls, event):
        # Child-side: set the event after a short delay so the parent's
        # blocking wait() has something to wake on.
        time.sleep(TIMEOUT2)
        event.set()

    def test_event(self):
        """wait() obeys timeouts and returns True once the event is set."""
        event = self.Event()
        wait = TimingWrapper(event.wait)

        # Removed temporarily, due to API shear, this does not
        # work with threading._Event objects. is_set == isSet
        self.assertEqual(event.is_set(), False)

        # Removed, threading.Event.wait() will return the value of the __flag
        # instead of None. API Shear with the semaphore backed mp.Event
        self.assertEqual(wait(0.0), False)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        self.assertEqual(wait(TIMEOUT1), False)
        self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)

        event.set()

        # See note above on the API differences
        self.assertEqual(event.is_set(), True)
        self.assertEqual(wait(), True)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        self.assertEqual(wait(TIMEOUT1), True)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        # self.assertEqual(event.is_set(), True)

        event.clear()

        #self.assertEqual(event.is_set(), False)

        # A child sets the event shortly; wait() should then return True.
        p = self.Process(target=self._test_event, args=(event,))
        p.daemon = True
        p.start()
        self.assertEqual(wait(), True)
880
881#
882#
883#
884
class _TestValue(BaseTestCase):
    """Tests for sharedctypes Value/RawValue."""

    ALLOWED_TYPES = ('processes',)

    # (typecode, initial value, value written by the child)
    codes_values = [
        ('i', 4343, 24234),
        ('d', 3.625, -4.25),
        ('h', -232, 234),
        ('c', latin('x'), latin('y'))
        ]

    def setUp(self):
        if not HAS_SHAREDCTYPES:
            self.skipTest("requires multiprocessing.sharedctypes")

    @classmethod
    def _test(cls, values):
        # Child-side: overwrite each shared value with its third column.
        for sv, cv in zip(values, cls.codes_values):
            sv.value = cv[2]


    def test_value(self, raw=False):
        """Writes by a child process are visible through shared values."""
        if raw:
            values = [self.RawValue(code, value)
                      for code, value, _ in self.codes_values]
        else:
            values = [self.Value(code, value)
                      for code, value, _ in self.codes_values]

        for sv, cv in zip(values, self.codes_values):
            self.assertEqual(sv.value, cv[1])

        proc = self.Process(target=self._test, args=(values,))
        proc.daemon = True
        proc.start()
        proc.join()

        for sv, cv in zip(values, self.codes_values):
            self.assertEqual(sv.value, cv[2])

    def test_rawvalue(self):
        """Same as test_value but through the lock-free RawValue."""
        self.test_value(raw=True)

    def test_getobj_getlock(self):
        """Value exposes get_lock/get_obj per its 'lock' argument."""
        val1 = self.Value('i', 5)
        lock1 = val1.get_lock()
        obj1 = val1.get_obj()

        # lock=None means "create a fresh lock", same as the default.
        val2 = self.Value('i', 5, lock=None)
        lock2 = val2.get_lock()
        obj2 = val2.get_obj()

        # A caller-supplied lock is used as-is.
        lock = self.Lock()
        val3 = self.Value('i', 5, lock=lock)
        lock3 = val3.get_lock()
        obj3 = val3.get_obj()
        self.assertEqual(lock, lock3)

        # lock=False yields a raw wrapper without the accessor methods.
        arr4 = self.Value('i', 5, lock=False)
        self.assertFalse(hasattr(arr4, 'get_lock'))
        self.assertFalse(hasattr(arr4, 'get_obj'))

        self.assertRaises(AttributeError, self.Value, 'i', 5, lock='navalue')

        arr5 = self.RawValue('i', 5)
        self.assertFalse(hasattr(arr5, 'get_lock'))
        self.assertFalse(hasattr(arr5, 'get_obj'))
952
953
class _TestArray(BaseTestCase):
    """Tests for sharedctypes Array/RawArray."""

    ALLOWED_TYPES = ('processes',)

    @classmethod
    def f(cls, seq):
        # In-place prefix sum; mutations must be visible to the parent
        # when seq is a shared array.
        for i in range(1, len(seq)):
            seq[i] += seq[i-1]

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_array(self, raw=False):
        """Shared arrays support indexing, slicing and child mutation."""
        seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831]
        if raw:
            arr = self.RawArray('i', seq)
        else:
            arr = self.Array('i', seq)

        self.assertEqual(len(arr), len(seq))
        self.assertEqual(arr[3], seq[3])
        self.assertEqual(list(arr[2:7]), list(seq[2:7]))

        arr[4:8] = seq[4:8] = array.array('i', [1, 2, 3, 4])

        self.assertEqual(list(arr[:]), seq)

        # Apply the same transformation to the plain list and, in a child
        # process, to the shared array; the results must agree.
        self.f(seq)

        p = self.Process(target=self.f, args=(arr,))
        p.daemon = True
        p.start()
        p.join()

        self.assertEqual(list(arr[:]), seq)

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_array_from_size(self):
        """An Array built from a size is zero-initialized."""
        size = 10
        # Test for zeroing (see issue #11675).
        # The repetition below strengthens the test by increasing the chances
        # of previously allocated non-zero memory being used for the new array
        # on the 2nd and 3rd loops.
        for _ in range(3):
            arr = self.Array('i', size)
            self.assertEqual(len(arr), size)
            self.assertEqual(list(arr), [0] * size)
            arr[:] = range(10)
            self.assertEqual(list(arr), range(10))
            del arr

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_rawarray(self):
        """Same as test_array but through the lock-free RawArray."""
        self.test_array(raw=True)

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_array_accepts_long(self):
        """Array/RawArray accept a Python 2 long as the size."""
        arr = self.Array('i', 10L)
        self.assertEqual(len(arr), 10)
        raw_arr = self.RawArray('i', 10L)
        self.assertEqual(len(raw_arr), 10)

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_getobj_getlock_obj(self):
        """Array exposes get_lock/get_obj per its 'lock' argument."""
        arr1 = self.Array('i', range(10))
        lock1 = arr1.get_lock()
        obj1 = arr1.get_obj()

        # lock=None means "create a fresh lock", same as the default.
        arr2 = self.Array('i', range(10), lock=None)
        lock2 = arr2.get_lock()
        obj2 = arr2.get_obj()

        # A caller-supplied lock is used as-is.
        lock = self.Lock()
        arr3 = self.Array('i', range(10), lock=lock)
        lock3 = arr3.get_lock()
        obj3 = arr3.get_obj()
        self.assertEqual(lock, lock3)

        # lock=False yields a raw wrapper without the accessor methods.
        arr4 = self.Array('i', range(10), lock=False)
        self.assertFalse(hasattr(arr4, 'get_lock'))
        self.assertFalse(hasattr(arr4, 'get_obj'))
        self.assertRaises(AttributeError,
                          self.Array, 'i', range(10), lock='notalock')

        arr5 = self.RawArray('i', range(10))
        self.assertFalse(hasattr(arr5, 'get_lock'))
        self.assertFalse(hasattr(arr5, 'get_obj'))
1039
1040#
1041#
1042#
1043
class _TestContainers(BaseTestCase):
    """Tests for the list/dict/Namespace proxies provided by a manager."""

    ALLOWED_TYPES = ('manager',)

    def test_list(self):
        # A proxy list created from a sequence mirrors its contents.
        full = self.list(range(10))
        self.assertEqual(full[:], range(10))

        # An empty proxy list supports the usual mutating operations.
        grown = self.list()
        self.assertEqual(grown[:], [])

        grown.extend(range(5))
        self.assertEqual(grown[:], range(5))

        # Indexing and slicing behave like a plain list.
        self.assertEqual(grown[2], 2)
        self.assertEqual(grown[2:10], [2, 3, 4])

        # In-place repetition and concatenation.
        grown *= 2
        self.assertEqual(grown[:], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4])
        self.assertEqual(grown + [5, 6],
                         [0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 5, 6])

        # The first proxy is unaffected by operations on the second.
        self.assertEqual(full[:], range(10))

        # A proxy list built from other proxies reflects their values.
        nested = self.list([full, grown])
        self.assertEqual(
            nested[:],
            [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]]
            )

        # A proxy stored inside another proxy list still tracks mutations
        # of the inner proxy.
        wrapper = self.list([full])
        full.append('hello')
        self.assertEqual(wrapper[:],
                         [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 'hello']])

    def test_dict(self):
        # A proxy dict round-trips keys, values and items like a real dict.
        proxy = self.dict()
        codes = range(65, 70)
        for code in codes:
            proxy[code] = chr(code)
        self.assertEqual(proxy.copy(), dict((c, chr(c)) for c in codes))
        self.assertEqual(sorted(proxy.keys()), codes)
        self.assertEqual(sorted(proxy.values()), [chr(c) for c in codes])
        self.assertEqual(sorted(proxy.items()), [(c, chr(c)) for c in codes])

    def test_namespace(self):
        # Namespace proxies expose plain attributes; names beginning with
        # an underscore are omitted from the repr.
        ns = self.Namespace()
        ns.name = 'Bob'
        ns.job = 'Builder'
        ns._hidden = 'hidden'
        self.assertEqual((ns.name, ns.job), ('Bob', 'Builder'))
        del ns.job
        self.assertEqual(str(ns), "Namespace(name='Bob')")
        self.assertTrue(hasattr(ns, 'name'))
        self.assertTrue(not hasattr(ns, 'job'))
1099
1100#
1101#
1102#
1103
def sqr(x, wait=0.0):
    """Return *x* squared, optionally sleeping *wait* seconds first.

    The delay lets the pool tests control how long each task runs.
    """
    time.sleep(wait)
    return x * x
class _TestPool(BaseTestCase):
    """Tests for multiprocessing.Pool.

    ``self.pool`` is supplied by the test harness — presumably a pool
    matching self.TYPE; confirm against the BaseTestCase setup earlier
    in the file.
    """

    def test_apply(self):
        # apply() is synchronous: results must equal a direct call.
        papply = self.pool.apply
        self.assertEqual(papply(sqr, (5,)), sqr(5))
        self.assertEqual(papply(sqr, (), {'x':3}), sqr(x=3))

    def test_map(self):
        # map() must preserve input order, with and without an explicit
        # chunksize.
        pmap = self.pool.map
        self.assertEqual(pmap(sqr, range(10)), map(sqr, range(10)))
        self.assertEqual(pmap(sqr, range(100), chunksize=20),
                         map(sqr, range(100)))

    def test_map_chunksize(self):
        # An empty iterable with an explicit chunksize must not hang.
        try:
            self.pool.map_async(sqr, [], chunksize=1).get(timeout=TIMEOUT1)
        except multiprocessing.TimeoutError:
            self.fail("pool.map_async with chunksize stalled on null list")

    def test_async(self):
        # apply_async().get() blocks until the (delayed) task completes.
        res = self.pool.apply_async(sqr, (7, TIMEOUT1,))
        get = TimingWrapper(res.get)
        self.assertEqual(get(), 49)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)

    def test_async_timeout(self):
        # get(timeout=...) raises TimeoutError when the task is too slow.
        res = self.pool.apply_async(sqr, (6, TIMEOUT2 + 0.2))
        get = TimingWrapper(res.get)
        self.assertRaises(multiprocessing.TimeoutError, get, timeout=TIMEOUT2)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT2)

    def test_imap(self):
        # imap() yields results lazily but in input order.
        it = self.pool.imap(sqr, range(10))
        self.assertEqual(list(it), map(sqr, range(10)))

        it = self.pool.imap(sqr, range(10))
        for i in range(10):
            self.assertEqual(it.next(), i*i)
        self.assertRaises(StopIteration, it.next)

        # A larger input with chunking still preserves order and raises
        # StopIteration when exhausted.
        it = self.pool.imap(sqr, range(1000), chunksize=100)
        for i in range(1000):
            self.assertEqual(it.next(), i*i)
        self.assertRaises(StopIteration, it.next)

    def test_imap_unordered(self):
        # imap_unordered() may reorder results, so compare sorted lists.
        it = self.pool.imap_unordered(sqr, range(1000))
        self.assertEqual(sorted(it), map(sqr, range(1000)))

        it = self.pool.imap_unordered(sqr, range(1000), chunksize=53)
        self.assertEqual(sorted(it), map(sqr, range(1000)))

    def test_make_pool(self):
        # Non-positive worker counts are rejected; a positive count creates
        # exactly that many workers.
        self.assertRaises(ValueError, multiprocessing.Pool, -1)
        self.assertRaises(ValueError, multiprocessing.Pool, 0)

        p = multiprocessing.Pool(3)
        self.assertEqual(3, len(p._pool))
        p.close()
        p.join()

    def test_terminate(self):
        if self.TYPE == 'manager':
            # On Unix a forked process increfs each shared object to
            # which its parent process held a reference.  If the
            # forked process gets terminated then there is likely to
            # be a reference leak.  So to prevent
            # _TestZZZNumberOfObjects from failing we skip this test
            # when using a manager.
            return

        # Queue far more work than can finish, then terminate(); join()
        # afterwards must return almost immediately.
        result = self.pool.map_async(
            time.sleep, [0.1 for i in range(10000)], chunksize=1
            )
        self.pool.terminate()
        join = TimingWrapper(self.pool.join)
        join()
        self.assertTrue(join.elapsed < 0.2)

    def test_empty_iterable(self):
        # See Issue 12157
        p = self.Pool(1)

        self.assertEqual(p.map(sqr, []), [])
        self.assertEqual(list(p.imap(sqr, [])), [])
        self.assertEqual(list(p.imap_unordered(sqr, [])), [])
        self.assertEqual(p.map_async(sqr, []).get(), [])

        p.close()
        p.join()
1197
def unpickleable_result():
    """Return a callable that pickle cannot serialize.

    Functions are pickled by qualified name, so a function object created
    inside another function (unreachable by name at module level) makes
    pickling fail — exactly what _TestPoolWorkerErrors needs.
    """
    def _nested():
        return 42
    return _nested
1200
class _TestPoolWorkerErrors(BaseTestCase):
    """Pool behaviour when a worker produces an unpicklable result."""

    ALLOWED_TYPES = ('processes', )

    def test_unpickleable_result(self):
        from multiprocessing.pool import MaybeEncodingError
        pool = multiprocessing.Pool(2)

        # Each encoding failure must surface as MaybeEncodingError without
        # killing the worker; repeat enough times that a lost process would
        # exhaust the two-worker pool.
        for _ in range(20):
            job = pool.apply_async(unpickleable_result)
            self.assertRaises(MaybeEncodingError, job.get)

        pool.close()
        pool.join()
1215
class _TestPoolWorkerLifetime(BaseTestCase):
    """Tests for the Pool(maxtasksperchild=...) worker-recycling feature."""

    ALLOWED_TYPES = ('processes', )
    def test_pool_worker_lifetime(self):
        # Workers limited to 10 tasks each must be replaced by fresh
        # processes (different pids) after enough work.
        p = multiprocessing.Pool(3, maxtasksperchild=10)
        self.assertEqual(3, len(p._pool))
        origworkerpids = [w.pid for w in p._pool]
        # Run many tasks so each worker gets replaced (hopefully)
        results = []
        for i in range(100):
            results.append(p.apply_async(sqr, (i, )))
        # Fetch the results and verify we got the right answers,
        # also ensuring all the tasks have completed.
        for (j, res) in enumerate(results):
            self.assertEqual(res.get(), sqr(j))
        # Refill the pool
        p._repopulate_pool()
        # Wait until all workers are alive
        # (countdown * DELTA = 5 seconds max startup process time)
        countdown = 50
        while countdown and not all(w.is_alive() for w in p._pool):
            countdown -= 1
            time.sleep(DELTA)
        finalworkerpids = [w.pid for w in p._pool]
        # All pids should be assigned.  See issue #7805.
        self.assertNotIn(None, origworkerpids)
        self.assertNotIn(None, finalworkerpids)
        # Finally, check that the worker pids have changed
        self.assertNotEqual(sorted(origworkerpids), sorted(finalworkerpids))
        p.close()
        p.join()

    def test_pool_worker_lifetime_early_close(self):
        # Issue #10332: closing a pool whose workers have limited lifetimes
        # before all the tasks completed would make join() hang.
        p = multiprocessing.Pool(3, maxtasksperchild=1)
        results = []
        for i in range(6):
            results.append(p.apply_async(sqr, (i, 0.3)))
        p.close()
        p.join()
        # check the results
        for (j, res) in enumerate(results):
            self.assertEqual(res.get(), sqr(j))
1260
1261
1262#
1263# Test that manager has expected number of shared objects left
1264#
1265
1266class _TestZZZNumberOfObjects(BaseTestCase):
1267    # Because test cases are sorted alphabetically, this one will get
1268    # run after all the other tests for the manager.  It tests that
1269    # there have been no "reference leaks" for the manager's shared
1270    # objects.  Note the comment in _TestPool.test_terminate().
1271    ALLOWED_TYPES = ('manager',)
1272
1273    def test_number_of_objects(self):
1274        EXPECTED_NUMBER = 1                # the pool object is still alive
1275        multiprocessing.active_children()  # discard dead process objs
1276        gc.collect()                       # do garbage collection
1277        refs = self.manager._number_of_objects()
1278        debug_info = self.manager._debug_info()
1279        if refs != EXPECTED_NUMBER:
1280            print self.manager._debug_info()
1281            print debug_info
1282
1283        self.assertEqual(refs, EXPECTED_NUMBER)
1284
1285#
1286# Test of creating a customized manager class
1287#
1288
1289from multiprocessing.managers import BaseManager, BaseProxy, RemoteError
1290
class FooBar(object):
    """Fixture whose methods are selectively exposed through a manager."""

    def f(self):
        """Public method; exposed by default."""
        return 'f()'

    def g(self):
        """Public method that always fails."""
        raise ValueError

    def _h(self):
        """Underscore-prefixed, so only exposed when listed explicitly."""
        return '_h()'
1298
def baz():
    """Yield the squares 0, 1, 4, ..., 81.

    Registered with MyManager below so the tests can exercise a
    generator behind the custom IteratorProxy.
    """
    for n in xrange(10):
        yield n * n
1302
class IteratorProxy(BaseProxy):
    # Proxy type for iterators: forwards both the Python 2 ('next') and
    # Python 3 ('__next__') iteration protocols to the referent.
    _exposed_ = ('next', '__next__')
    def __iter__(self):
        # The proxy itself is iterable.
        return self
    def next(self):
        # Python 2 iteration protocol.
        return self._callmethod('next')
    def __next__(self):
        # Python 3 iteration protocol (harmless to define under Python 2).
        return self._callmethod('__next__')
1311
class MyManager(BaseManager):
    """Custom manager used to test BaseManager.register() options."""
    pass

# 'Foo': default exposure — all public methods of FooBar (f and g, not _h).
MyManager.register('Foo', callable=FooBar)
# 'Bar': an explicit ``exposed`` list overrides the default, admitting _h.
MyManager.register('Bar', callable=FooBar, exposed=('f', '_h'))
# 'baz': a generator function served through the custom IteratorProxy.
MyManager.register('baz', callable=baz, proxytype=IteratorProxy)
1318
1319
class _TestMyManager(BaseTestCase):
    """End-to-end checks of the MyManager registrations above."""

    ALLOWED_TYPES = ('manager',)

    def test_mymanager(self):
        manager = MyManager()
        manager.start()

        foo_proxy = manager.Foo()
        bar_proxy = manager.Bar()
        baz_proxy = manager.baz()

        # Which of f/g/_h a proxy exposes depends on registration: Foo
        # exposes all public methods, Bar exactly ('f', '_h').
        candidates = ('f', 'g', '_h')
        self.assertEqual(
            [m for m in candidates if hasattr(foo_proxy, m)], ['f', 'g'])
        self.assertEqual(
            [m for m in candidates if hasattr(bar_proxy, m)], ['f', '_h'])

        # Calls work both via attribute access and via _callmethod().
        self.assertEqual(foo_proxy.f(), 'f()')
        self.assertRaises(ValueError, foo_proxy.g)
        self.assertEqual(foo_proxy._callmethod('f'), 'f()')
        # _h is not exposed on Foo, so the server refuses the call.
        self.assertRaises(RemoteError, foo_proxy._callmethod, '_h')

        self.assertEqual(bar_proxy.f(), 'f()')
        self.assertEqual(bar_proxy._h(), '_h()')
        self.assertEqual(bar_proxy._callmethod('f'), 'f()')
        self.assertEqual(bar_proxy._callmethod('_h'), '_h()')

        # The iterator proxy drains the generator transparently.
        self.assertEqual(list(baz_proxy), [i*i for i in range(10)])

        manager.shutdown()
1351
1352#
1353# Test of connecting to a remote server and using xmlrpclib for serialization
1354#
1355
# Queue shared by the remote-manager tests; get_queue() is registered as a
# manager callable below and always returns this single instance.
_queue = Queue.Queue()
def get_queue():
    # Return the module-level queue (one shared instance per process).
    return _queue
1359
class QueueManager(BaseManager):
    '''manager class used by server process'''
# Server side: expose get_queue(), which serves the module-level _queue.
QueueManager.register('get_queue', callable=get_queue)
1363
class QueueManager2(BaseManager):
    '''manager class which specifies the same interface as QueueManager'''
# Client side: no callable — only declares that the server offers get_queue().
QueueManager2.register('get_queue')
1367
1368
# Serialize with xmlrpclib instead of pickle so that type round-tripping
# and serialization failures can be exercised (see _TestRemoteManager).
SERIALIZER = 'xmlrpclib'
1370
class _TestRemoteManager(BaseTestCase):
    """Connect to a manager server from another process, serializing with
    xmlrpclib rather than pickle (see SERIALIZER above)."""

    ALLOWED_TYPES = ('manager',)

    @classmethod
    def _putter(cls, address, authkey):
        # Child process: connect with the interface-only manager class and
        # push one tuple onto the shared queue.
        manager = QueueManager2(
            address=address, authkey=authkey, serializer=SERIALIZER
            )
        manager.connect()
        queue = manager.get_queue()
        queue.put(('hello world', None, True, 2.25))

    def test_remote(self):
        authkey = os.urandom(32)

        # Server manager bound to an OS-assigned port on localhost.
        manager = QueueManager(
            address=('localhost', 0), authkey=authkey, serializer=SERIALIZER
            )
        manager.start()

        p = self.Process(target=self._putter, args=(manager.address, authkey))
        p.daemon = True
        p.start()

        # A second client in this process, talking to the same server.
        manager2 = QueueManager2(
            address=manager.address, authkey=authkey, serializer=SERIALIZER
            )
        manager2.connect()
        queue = manager2.get_queue()

        # Note that xmlrpclib will deserialize object as a list not a tuple
        self.assertEqual(queue.get(), ['hello world', None, True, 2.25])

        # Because we are using xmlrpclib for serialization instead of
        # pickle this will cause a serialization error.
        self.assertRaises(Exception, queue.put, time.sleep)

        # Make queue finalizer run before the server is stopped
        del queue
        manager.shutdown()
1412
class _TestManagerRestart(BaseTestCase):
    """A manager's address must be reusable right after shutdown()."""

    @classmethod
    def _putter(cls, address, authkey):
        # Child process: put a single item on the server's shared queue.
        manager = QueueManager(
            address=address, authkey=authkey, serializer=SERIALIZER)
        manager.connect()
        queue = manager.get_queue()
        queue.put('hello world')

    def test_rapid_restart(self):
        authkey = os.urandom(32)
        manager = QueueManager(
            address=('localhost', 0), authkey=authkey, serializer=SERIALIZER)
        # get_server() resolves the OS-assigned port so the same concrete
        # address can be re-bound at the end of the test.
        srvr = manager.get_server()
        addr = srvr.address
        # Close the connection.Listener socket which gets opened as a part
        # of manager.get_server(). It's not needed for the test.
        srvr.listener.close()
        manager.start()

        p = self.Process(target=self._putter, args=(manager.address, authkey))
        p.daemon = True
        p.start()
        queue = manager.get_queue()
        self.assertEqual(queue.get(), 'hello world')
        del queue
        manager.shutdown()
        # Restart on the very same address; this must not fail with
        # "address already in use".
        manager = QueueManager(
            address=addr, authkey=authkey, serializer=SERIALIZER)
        manager.start()
        manager.shutdown()
1445
1446#
1447#
1448#
1449
# Empty payload used to tell the echo child to stop (see _TestConnection).
SENTINEL = latin('')
1451
1452class _TestConnection(BaseTestCase):
1453
1454    ALLOWED_TYPES = ('processes', 'threads')
1455
1456    @classmethod
1457    def _echo(cls, conn):
1458        for msg in iter(conn.recv_bytes, SENTINEL):
1459            conn.send_bytes(msg)
1460        conn.close()
1461
1462    def test_connection(self):
1463        conn, child_conn = self.Pipe()
1464
1465        p = self.Process(target=self._echo, args=(child_conn,))
1466        p.daemon = True
1467        p.start()
1468
1469        seq = [1, 2.25, None]
1470        msg = latin('hello world')
1471        longmsg = msg * 10
1472        arr = array.array('i', range(4))
1473
1474        if self.TYPE == 'processes':
1475            self.assertEqual(type(conn.fileno()), int)
1476
1477        self.assertEqual(conn.send(seq), None)
1478        self.assertEqual(conn.recv(), seq)
1479
1480        self.assertEqual(conn.send_bytes(msg), None)
1481        self.assertEqual(conn.recv_bytes(), msg)
1482
1483        if self.TYPE == 'processes':
1484            buffer = array.array('i', [0]*10)
1485            expected = list(arr) + [0] * (10 - len(arr))
1486            self.assertEqual(conn.send_bytes(arr), None)
1487            self.assertEqual(conn.recv_bytes_into(buffer),
1488                             len(arr) * buffer.itemsize)
1489            self.assertEqual(list(buffer), expected)
1490
1491            buffer = array.array('i', [0]*10)
1492            expected = [0] * 3 + list(arr) + [0] * (10 - 3 - len(arr))
1493            self.assertEqual(conn.send_bytes(arr), None)
1494            self.assertEqual(conn.recv_bytes_into(buffer, 3 * buffer.itemsize),
1495                             len(arr) * buffer.itemsize)
1496            self.assertEqual(list(buffer), expected)
1497
1498            buffer = bytearray(latin(' ' * 40))
1499            self.assertEqual(conn.send_bytes(longmsg), None)
1500            try:
1501                res = conn.recv_bytes_into(buffer)
1502            except multiprocessing.BufferTooShort, e:
1503                self.assertEqual(e.args, (longmsg,))
1504            else:
1505                self.fail('expected BufferTooShort, got %s' % res)
1506
1507        poll = TimingWrapper(conn.poll)
1508
1509        self.assertEqual(poll(), False)
1510        self.assertTimingAlmostEqual(poll.elapsed, 0)
1511
1512        self.assertEqual(poll(TIMEOUT1), False)
1513        self.assertTimingAlmostEqual(poll.elapsed, TIMEOUT1)
1514
1515        conn.send(None)
1516        time.sleep(.1)
1517
1518        self.assertEqual(poll(TIMEOUT1), True)
1519        self.assertTimingAlmostEqual(poll.elapsed, 0)
1520
1521        self.assertEqual(conn.recv(), None)
1522
1523        really_big_msg = latin('X') * (1024 * 1024 * 16)   # 16Mb
1524        conn.send_bytes(really_big_msg)
1525        self.assertEqual(conn.recv_bytes(), really_big_msg)
1526
1527        conn.send_bytes(SENTINEL)                          # tell child to quit
1528        child_conn.close()
1529
1530        if self.TYPE == 'processes':
1531            self.assertEqual(conn.readable, True)
1532            self.assertEqual(conn.writable, True)
1533            self.assertRaises(EOFError, conn.recv)
1534            self.assertRaises(EOFError, conn.recv_bytes)
1535
1536        p.join()
1537
1538    def test_duplex_false(self):
1539        reader, writer = self.Pipe(duplex=False)
1540        self.assertEqual(writer.send(1), None)
1541        self.assertEqual(reader.recv(), 1)
1542        if self.TYPE == 'processes':
1543            self.assertEqual(reader.readable, True)
1544            self.assertEqual(reader.writable, False)
1545            self.assertEqual(writer.readable, False)
1546            self.assertEqual(writer.writable, True)
1547            self.assertRaises(IOError, reader.send, 2)
1548            self.assertRaises(IOError, writer.recv)
1549            self.assertRaises(IOError, writer.poll)
1550
1551    def test_spawn_close(self):
1552        # We test that a pipe connection can be closed by parent
1553        # process immediately after child is spawned.  On Windows this
1554        # would have sometimes failed on old versions because
1555        # child_conn would be closed before the child got a chance to
1556        # duplicate it.
1557        conn, child_conn = self.Pipe()
1558
1559        p = self.Process(target=self._echo, args=(child_conn,))
1560        p.daemon = True
1561        p.start()
1562        child_conn.close()    # this might complete before child initializes
1563
1564        msg = latin('hello')
1565        conn.send_bytes(msg)
1566        self.assertEqual(conn.recv_bytes(), msg)
1567
1568        conn.send_bytes(SENTINEL)
1569        conn.close()
1570        p.join()
1571
1572    def test_sendbytes(self):
1573        if self.TYPE != 'processes':
1574            return
1575
1576        msg = latin('abcdefghijklmnopqrstuvwxyz')
1577        a, b = self.Pipe()
1578
1579        a.send_bytes(msg)
1580        self.assertEqual(b.recv_bytes(), msg)
1581
1582        a.send_bytes(msg, 5)
1583        self.assertEqual(b.recv_bytes(), msg[5:])
1584
1585        a.send_bytes(msg, 7, 8)
1586        self.assertEqual(b.recv_bytes(), msg[7:7+8])
1587
1588        a.send_bytes(msg, 26)
1589        self.assertEqual(b.recv_bytes(), latin(''))
1590
1591        a.send_bytes(msg, 26, 0)
1592        self.assertEqual(b.recv_bytes(), latin(''))
1593
1594        self.assertRaises(ValueError, a.send_bytes, msg, 27)
1595
1596        self.assertRaises(ValueError, a.send_bytes, msg, 22, 5)
1597
1598        self.assertRaises(ValueError, a.send_bytes, msg, 26, 1)
1599
1600        self.assertRaises(ValueError, a.send_bytes, msg, -1)
1601
1602        self.assertRaises(ValueError, a.send_bytes, msg, 4, -1)
1603
1604    @classmethod
1605    def _is_fd_assigned(cls, fd):
1606        try:
1607            os.fstat(fd)
1608        except OSError as e:
1609            if e.errno == errno.EBADF:
1610                return False
1611            raise
1612        else:
1613            return True
1614
1615    @classmethod
1616    def _writefd(cls, conn, data, create_dummy_fds=False):
1617        if create_dummy_fds:
1618            for i in range(0, 256):
1619                if not cls._is_fd_assigned(i):
1620                    os.dup2(conn.fileno(), i)
1621        fd = reduction.recv_handle(conn)
1622        if msvcrt:
1623            fd = msvcrt.open_osfhandle(fd, os.O_WRONLY)
1624        os.write(fd, data)
1625        os.close(fd)
1626
1627    @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
1628    def test_fd_transfer(self):
1629        if self.TYPE != 'processes':
1630            self.skipTest("only makes sense with processes")
1631        conn, child_conn = self.Pipe(duplex=True)
1632
1633        p = self.Process(target=self._writefd, args=(child_conn, b"foo"))
1634        p.daemon = True
1635        p.start()
1636        with open(test_support.TESTFN, "wb") as f:
1637            fd = f.fileno()
1638            if msvcrt:
1639                fd = msvcrt.get_osfhandle(fd)
1640            reduction.send_handle(conn, fd, p.pid)
1641        p.join()
1642        with open(test_support.TESTFN, "rb") as f:
1643            self.assertEqual(f.read(), b"foo")
1644
1645    @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
1646    @unittest.skipIf(sys.platform == "win32",
1647                     "test semantics don't make sense on Windows")
1648    @unittest.skipIf(MAXFD <= 256,
1649                     "largest assignable fd number is too small")
1650    @unittest.skipUnless(hasattr(os, "dup2"),
1651                         "test needs os.dup2()")
1652    def test_large_fd_transfer(self):
1653        # With fd > 256 (issue #11657)
1654        if self.TYPE != 'processes':
1655            self.skipTest("only makes sense with processes")
1656        conn, child_conn = self.Pipe(duplex=True)
1657
1658        p = self.Process(target=self._writefd, args=(child_conn, b"bar", True))
1659        p.daemon = True
1660        p.start()
1661        with open(test_support.TESTFN, "wb") as f:
1662            fd = f.fileno()
1663            for newfd in range(256, MAXFD):
1664                if not self._is_fd_assigned(newfd):
1665                    break
1666            else:
1667                self.fail("could not find an unassigned large file descriptor")
1668            os.dup2(fd, newfd)
1669            try:
1670                reduction.send_handle(conn, newfd, p.pid)
1671            finally:
1672                os.close(newfd)
1673        p.join()
1674        with open(test_support.TESTFN, "rb") as f:
1675            self.assertEqual(f.read(), b"bar")
1676
1677    @classmethod
1678    def _send_data_without_fd(self, conn):
1679        os.write(conn.fileno(), b"\0")
1680
1681    @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
1682    @unittest.skipIf(sys.platform == "win32", "doesn't make sense on Windows")
1683    def test_missing_fd_transfer(self):
1684        # Check that exception is raised when received data is not
1685        # accompanied by a file descriptor in ancillary data.
1686        if self.TYPE != 'processes':
1687            self.skipTest("only makes sense with processes")
1688        conn, child_conn = self.Pipe(duplex=True)
1689
1690        p = self.Process(target=self._send_data_without_fd, args=(child_conn,))
1691        p.daemon = True
1692        p.start()
1693        self.assertRaises(RuntimeError, reduction.recv_handle, conn)
1694        p.join()
1695
class _TestListenerClient(BaseTestCase):
    """Tests for connection.Listener / connection.Client pairs."""

    ALLOWED_TYPES = ('processes', 'threads')

    @classmethod
    def _test(cls, address):
        # Child: connect to *address*, send one greeting and disconnect.
        conn = cls.connection.Client(address)
        conn.send('hello')
        conn.close()

    def test_listener_client(self):
        # Accept one client connection for every supported address family.
        for family in self.connection.families:
            l = self.connection.Listener(family=family)
            p = self.Process(target=self._test, args=(l.address,))
            p.daemon = True
            p.start()
            conn = l.accept()
            self.assertEqual(conn.recv(), 'hello')
            p.join()
            l.close()

    def test_issue14725(self):
        l = self.connection.Listener()
        p = self.Process(target=self._test, args=(l.address,))
        p.daemon = True
        p.start()
        time.sleep(1)
        # On Windows the client process should by now have connected,
        # written data and closed the pipe handle.  This causes
        # ConnectNamedPipe() to fail with ERROR_NO_DATA.  See Issue
        # 14725.
        conn = l.accept()
        self.assertEqual(conn.recv(), 'hello')
        conn.close()
        p.join()
        l.close()
1732
1733#
1734# Test of sending connection and socket objects between processes
1735#
1736"""
1737class _TestPicklingConnections(BaseTestCase):
1738
1739    ALLOWED_TYPES = ('processes',)
1740
1741    def _listener(self, conn, families):
1742        for fam in families:
1743            l = self.connection.Listener(family=fam)
1744            conn.send(l.address)
1745            new_conn = l.accept()
1746            conn.send(new_conn)
1747
1748        if self.TYPE == 'processes':
1749            l = socket.socket()
1750            l.bind(('localhost', 0))
1751            conn.send(l.getsockname())
1752            l.listen(1)
1753            new_conn, addr = l.accept()
1754            conn.send(new_conn)
1755
1756        conn.recv()
1757
1758    def _remote(self, conn):
1759        for (address, msg) in iter(conn.recv, None):
1760            client = self.connection.Client(address)
1761            client.send(msg.upper())
1762            client.close()
1763
1764        if self.TYPE == 'processes':
1765            address, msg = conn.recv()
1766            client = socket.socket()
1767            client.connect(address)
1768            client.sendall(msg.upper())
1769            client.close()
1770
1771        conn.close()
1772
1773    def test_pickling(self):
1774        try:
1775            multiprocessing.allow_connection_pickling()
1776        except ImportError:
1777            return
1778
1779        families = self.connection.families
1780
1781        lconn, lconn0 = self.Pipe()
1782        lp = self.Process(target=self._listener, args=(lconn0, families))
1783        lp.daemon = True
1784        lp.start()
1785        lconn0.close()
1786
1787        rconn, rconn0 = self.Pipe()
1788        rp = self.Process(target=self._remote, args=(rconn0,))
1789        rp.daemon = True
1790        rp.start()
1791        rconn0.close()
1792
1793        for fam in families:
1794            msg = ('This connection uses family %s' % fam).encode('ascii')
1795            address = lconn.recv()
1796            rconn.send((address, msg))
1797            new_conn = lconn.recv()
1798            self.assertEqual(new_conn.recv(), msg.upper())
1799
1800        rconn.send(None)
1801
1802        if self.TYPE == 'processes':
1803            msg = latin('This connection uses a normal socket')
1804            address = lconn.recv()
1805            rconn.send((address, msg))
1806            if hasattr(socket, 'fromfd'):
1807                new_conn = lconn.recv()
1808                self.assertEqual(new_conn.recv(100), msg.upper())
1809            else:
1810                # XXX On Windows with Py2.6 need to backport fromfd()
1811                discard = lconn.recv_bytes()
1812
1813        lconn.send(None)
1814
1815        rconn.close()
1816        lconn.close()
1817
1818        lp.join()
1819        rp.join()
1820"""
1821#
1822#
1823#
1824
class _TestHeap(BaseTestCase):
    """Sanity checks for the block allocator in multiprocessing.heap."""

    ALLOWED_TYPES = ('processes',)

    def test_heap(self):
        iterations = 5000
        maxblocks = 50
        blocks = []

        # create and destroy lots of blocks of different sizes
        for i in xrange(iterations):
            size = int(random.lognormvariate(0, 1) * 1000)
            b = multiprocessing.heap.BufferWrapper(size)
            blocks.append(b)
            if len(blocks) > maxblocks:
                i = random.randrange(maxblocks)
                del blocks[i]

        # get the heap object
        heap = multiprocessing.heap.BufferWrapper._heap

        # verify the state of the heap
        all = []
        occupied = 0
        heap._lock.acquire()
        self.addCleanup(heap._lock.release)
        # Collect every free and occupied block as
        # (arena-index, start, stop, length, state) tuples.
        for L in heap._len_to_seq.values():
            for arena, start, stop in L:
                all.append((heap._arenas.index(arena), start, stop,
                            stop-start, 'free'))
        for arena, start, stop in heap._allocated_blocks:
            all.append((heap._arenas.index(arena), start, stop,
                        stop-start, 'occupied'))
            occupied += (stop-start)

        all.sort()

        # Consecutive blocks must tile each arena exactly: either a block
        # ends where the next begins, or the next block opens a different
        # arena at offset 0 (no gaps, no overlaps).
        for i in range(len(all)-1):
            (arena, start, stop) = all[i][:3]
            (narena, nstart, nstop) = all[i+1][:3]
            self.assertTrue((arena != narena and nstart == 0) or
                            (stop == nstart))

    def test_free_from_gc(self):
        # Check that freeing of blocks by the garbage collector doesn't deadlock
        # (issue #12352).
        # Make sure the GC is enabled, and set lower collection thresholds to
        # make collections more frequent (and increase the probability of
        # deadlock).
        if not gc.isenabled():
            gc.enable()
            self.addCleanup(gc.disable)
        thresholds = gc.get_threshold()
        self.addCleanup(gc.set_threshold, *thresholds)
        gc.set_threshold(10)

        # perform numerous block allocations, with cyclic references to make
        # sure objects are collected asynchronously by the gc
        for i in range(5000):
            a = multiprocessing.heap.BufferWrapper(1)
            b = multiprocessing.heap.BufferWrapper(1)
            # circular references
            a.buddy = b
            b.buddy = a
1889
1890#
1891#
1892#
1893
class _Foo(Structure):
    # Simple ctypes structure used by the sharedctypes tests below.
    _fields_ = [
        ('x', c_int),
        ('y', c_double)
        ]
1899
class _TestSharedCTypes(BaseTestCase):
    """Tests for multiprocessing.sharedctypes (Value/Array/copy)."""

    ALLOWED_TYPES = ('processes',)

    def setUp(self):
        if not HAS_SHAREDCTYPES:
            self.skipTest("requires multiprocessing.sharedctypes")

    @classmethod
    def _double(cls, x, y, foo, arr, string):
        # Child process: double every shared value in place.
        x.value *= 2
        y.value *= 2
        foo.x *= 2
        foo.y *= 2
        string.value *= 2
        for i in range(len(arr)):
            arr[i] *= 2

    def test_sharedctypes(self, lock=False):
        # Mutations made by the child must be visible here after join().
        x = Value('i', 7, lock=lock)
        y = Value(c_double, 1.0/3.0, lock=lock)
        foo = Value(_Foo, 3, 2, lock=lock)
        arr = self.Array('d', range(10), lock=lock)
        string = self.Array('c', 20, lock=lock)
        string.value = latin('hello')

        p = self.Process(target=self._double, args=(x, y, foo, arr, string))
        p.daemon = True
        p.start()
        p.join()

        self.assertEqual(x.value, 14)
        self.assertAlmostEqual(y.value, 2.0/3.0)
        self.assertEqual(foo.x, 6)
        self.assertAlmostEqual(foo.y, 4.0)
        for i in range(10):
            self.assertAlmostEqual(arr[i], i*2)
        self.assertEqual(string.value, latin('hellohello'))

    def test_synchronize(self):
        # Same checks, but with synchronization wrappers enabled.
        self.test_sharedctypes(lock=True)

    def test_copy(self):
        # copy() must produce an independent snapshot of the structure.
        foo = _Foo(2, 5.0)
        bar = copy(foo)
        foo.x = 0
        foo.y = 0
        self.assertEqual(bar.x, 2)
        self.assertAlmostEqual(bar.y, 5.0)
1949
1950#
1951#
1952#
1953
class _TestFinalize(BaseTestCase):
    """Check util.Finalize: firing on collection, on explicit call, and
    ordering by exitpriority at interpreter exit."""

    ALLOWED_TYPES = ('processes',)

    @classmethod
    def _test_finalize(cls, conn):
        # Runs in a child process; each finalizer reports its tag through
        # *conn* so the parent can verify the firing order.
        class Foo(object):
            pass

        a = Foo()
        util.Finalize(a, conn.send, args=('a',))
        del a           # triggers callback for a

        b = Foo()
        close_b = util.Finalize(b, conn.send, args=('b',))
        close_b()       # triggers callback for b
        close_b()       # does nothing because callback has already been called
        del b           # does nothing because callback has already been called

        # no exitpriority: not fired by _exit_function (absent from the
        # expected result list below)
        c = Foo()
        util.Finalize(c, conn.send, args=('c',))

        d10 = Foo()
        util.Finalize(d10, conn.send, args=('d10',), exitpriority=1)

        # equal priorities fire in reverse registration order: d03, d02, d01
        d01 = Foo()
        util.Finalize(d01, conn.send, args=('d01',), exitpriority=0)
        d02 = Foo()
        util.Finalize(d02, conn.send, args=('d02',), exitpriority=0)
        d03 = Foo()
        util.Finalize(d03, conn.send, args=('d03',), exitpriority=0)

        util.Finalize(None, conn.send, args=('e',), exitpriority=-10)

        util.Finalize(None, conn.send, args=('STOP',), exitpriority=-100)

        # call multiprocessing's cleanup function then exit process without
        # garbage collecting locals
        util._exit_function()
        conn.close()
        os._exit(0)

    def test_finalize(self):
        # Run the registrations in a child and check the order in which the
        # callbacks reported back: explicit/del first, then by descending
        # exitpriority.
        conn, child_conn = self.Pipe()

        p = self.Process(target=self._test_finalize, args=(child_conn,))
        p.daemon = True
        p.start()
        p.join()

        result = [obj for obj in iter(conn.recv, 'STOP')]
        self.assertEqual(result, ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e'])
2006
2007#
2008# Test that from ... import * works for each module
2009#
2010
class _TestImportStar(BaseTestCase):
    """Every name listed in a submodule's __all__ must actually exist, so
    that ``from ... import *`` works for each module."""

    ALLOWED_TYPES = ('processes',)

    def test_import(self):
        modules = ['multiprocessing']
        modules.extend('multiprocessing.' + sub for sub in [
            'connection', 'heap', 'managers', 'pool',
            'process', 'synchronize', 'util'])

        if HAS_REDUCTION:
            modules.append('multiprocessing.reduction')

        if c_int is not None:
            # This module requires _ctypes
            modules.append('multiprocessing.sharedctypes')

        for name in modules:
            __import__(name)
            mod = sys.modules[name]
            for attr in getattr(mod, '__all__', ()):
                self.assertTrue(
                    hasattr(mod, attr),
                    '%r does not have attribute %r' % (mod, attr)
                    )
2039
2040#
2041# Quick test that logging works -- does not test logging output
2042#
2043
class _TestLogging(BaseTestCase):
    """Quick smoke tests for multiprocessing's logger -- output itself is
    not checked."""

    ALLOWED_TYPES = ('processes',)

    def test_enable_logging(self):
        log = multiprocessing.get_logger()
        log.setLevel(util.SUBWARNING)
        self.assertTrue(log is not None)
        log.debug('this will not be printed')
        log.info('nor will this')
        log.setLevel(LOG_LEVEL)

    @classmethod
    def _test_level(cls, conn):
        # Child: report the effective level of the multiprocessing logger.
        conn.send(multiprocessing.get_logger().getEffectiveLevel())

    def test_level(self):
        LEVEL1 = 32
        LEVEL2 = 37

        logger = multiprocessing.get_logger()
        root_logger = logging.getLogger()
        saved_root_level = root_logger.level

        reader, writer = multiprocessing.Pipe(duplex=False)

        def child_effective_level():
            # Spawn a child and return the level it observed.
            proc = self.Process(target=self._test_level, args=(writer,))
            proc.daemon = True
            proc.start()
            return reader.recv()

        # An explicit level on the multiprocessing logger is inherited.
        logger.setLevel(LEVEL1)
        self.assertEqual(LEVEL1, child_effective_level())

        # With NOTSET, the root logger's level becomes effective.
        logger.setLevel(logging.NOTSET)
        root_logger.setLevel(LEVEL2)
        self.assertEqual(LEVEL2, child_effective_level())

        root_logger.setLevel(saved_root_level)
        logger.setLevel(level=LOG_LEVEL)
2086
2087
2088# class _TestLoggingProcessName(BaseTestCase):
2089#
2090#     def handle(self, record):
2091#         assert record.processName == multiprocessing.current_process().name
2092#         self.__handled = True
2093#
2094#     def test_logging(self):
2095#         handler = logging.Handler()
2096#         handler.handle = self.handle
2097#         self.__handled = False
2098#         # Bypass getLogger() and side-effects
2099#         logger = logging.getLoggerClass()(
2100#                 'multiprocessing.test.TestLoggingProcessName')
2101#         logger.addHandler(handler)
2102#         logger.propagate = False
2103#
2104#         logger.warn('foo')
2105#         assert self.__handled
2106
2107#
2108# Check that Process.join() retries if os.waitpid() fails with EINTR
2109#
2110
class _TestPollEintr(BaseTestCase):
    """Process.join() must retry if os.waitpid() fails with EINTR."""

    ALLOWED_TYPES = ('processes',)

    @classmethod
    def _killer(cls, pid):
        # Child: give the parent time to block in join(), then signal it.
        time.sleep(0.5)
        os.kill(pid, signal.SIGUSR1)

    @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1')
    def test_poll_eintr(self):
        got_signal = [False]
        def record(*args):
            got_signal[0] = True
        oldhandler = signal.signal(signal.SIGUSR1, record)
        try:
            interrupter = self.Process(target=self._killer,
                                       args=(os.getpid(),))
            interrupter.start()
            sleeper = self.Process(target=time.sleep, args=(1,))
            sleeper.start()
            # join() blocks across the SIGUSR1 delivery; it must retry
            # rather than fail with EINTR.
            sleeper.join()
            self.assertTrue(got_signal[0])
            self.assertEqual(sleeper.exitcode, 0)
            interrupter.join()
        finally:
            signal.signal(signal.SIGUSR1, oldhandler)
2138
2139#
2140# Test to verify handle verification, see issue 3321
2141#
2142
class TestInvalidHandle(unittest.TestCase):
    """Handle verification in _multiprocessing.Connection (issue 3321)."""

    @unittest.skipIf(WIN32, "skipped on Windows")
    def test_invalid_handles(self):
        # 44977608 is an arbitrary number very unlikely to be a valid fd;
        # the constructor accepts it, but polling must fail cleanly.
        conn = _multiprocessing.Connection(44977608)
        self.assertRaises(IOError, conn.poll)
        # A negative handle must be rejected outright.
        self.assertRaises(IOError, _multiprocessing.Connection, -1)
2150
2151#
2152# Functions used to create test cases from the base ones in this module
2153#
2154
def get_attributes(Source, names):
    """Collect the named attributes of *Source* into a dict.

    Plain functions are wrapped in staticmethod so that injecting the dict
    into a class body (via locals().update) does not turn them into methods.
    """
    def _prepare(obj):
        # Wrap plain functions; leave every other attribute untouched.
        if type(obj) == type(get_attributes):
            obj = staticmethod(obj)
        return obj
    return dict((name, _prepare(getattr(Source, name))) for name in names)
2163
def create_test_cases(Mixin, type):
    """Generate concrete TestCase classes from the _Test* bases.

    For every module-level class named _TestXxx whose ALLOWED_TYPES
    includes *type*, build a subclass combining it with unittest.TestCase
    and *Mixin*, named e.g. 'WithProcessesTestXxx'.  Returns a dict
    mapping the new names to the new classes.
    """
    result = {}
    Type = type.capitalize()

    for name, base in list(globals().items()):
        if not name.startswith('_Test'):
            continue
        if type not in base.ALLOWED_TYPES:
            continue
        newname = 'With' + Type + name[1:]
        class Temp(base, unittest.TestCase, Mixin):
            pass
        Temp.__name__ = newname
        Temp.__module__ = Mixin.__module__
        result[newname] = Temp
    return result
2180
2181#
2182# Create test cases
2183#
2184
class ProcessesMixin(object):
    """Mixin backing the WithProcesses* cases: primitives are copied
    straight from the multiprocessing module (self.Queue is
    multiprocessing.Queue, etc.)."""
    TYPE = 'processes'
    Process = multiprocessing.Process
    locals().update(get_attributes(multiprocessing, (
        'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
        'Condition', 'Event', 'Value', 'Array', 'RawValue',
        'RawArray', 'current_process', 'active_children', 'Pipe',
        'connection', 'JoinableQueue', 'Pool'
        )))
2194
# Generate the WithProcesses* test case classes and publish them at module
# level so the test loader (and test_main below) can find them.
testcases_processes = create_test_cases(ProcessesMixin, type='processes')
globals().update(testcases_processes)
2197
2198
class ManagerMixin(object):
    """Mixin backing the WithManager* cases: primitives come from a shared
    SyncManager instance.

    The manager is allocated with object.__new__ (uninitialized, not
    started) so that get_attributes() can read its method names at class
    creation time; test_main() later calls manager.__init__() and
    manager.start() before any test runs.
    """
    TYPE = 'manager'
    Process = multiprocessing.Process
    manager = object.__new__(multiprocessing.managers.SyncManager)
    locals().update(get_attributes(manager, (
        'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
        'Condition', 'Event', 'Value', 'Array', 'list', 'dict',
        'Namespace', 'JoinableQueue', 'Pool'
        )))
2208
# Generate the WithManager* test case classes and publish them at module
# level so the test loader (and test_main below) can find them.
testcases_manager = create_test_cases(ManagerMixin, type='manager')
globals().update(testcases_manager)
2211
2212
class ThreadsMixin(object):
    """Mixin backing the WithThreads* cases: primitives come from
    multiprocessing.dummy, the thread-based clone of the API."""
    TYPE = 'threads'
    Process = multiprocessing.dummy.Process
    locals().update(get_attributes(multiprocessing.dummy, (
        'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
        'Condition', 'Event', 'Value', 'Array', 'current_process',
        'active_children', 'Pipe', 'connection', 'dict', 'list',
        'Namespace', 'JoinableQueue', 'Pool'
        )))
2222
# Generate the WithThreads* test case classes and publish them at module
# level so the test loader (and test_main below) can find them.
testcases_threads = create_test_cases(ThreadsMixin, type='threads')
globals().update(testcases_threads)
2225
class OtherTest(unittest.TestCase):
    """Authentication handshake failures in multiprocessing.connection."""
    # TODO: add more tests for deliver/answer challenge.

    def test_deliver_challenge_auth_failure(self):
        class _FakeConnection(object):
            # Always answers the challenge with a bogus digest.
            def recv_bytes(self, size):
                return b'something bogus'
            def send_bytes(self, data):
                pass
        with self.assertRaises(multiprocessing.AuthenticationError):
            multiprocessing.connection.deliver_challenge(
                _FakeConnection(), b'abc')

    def test_answer_challenge_auth_failure(self):
        class _FakeConnection(object):
            def __init__(self):
                self.count = 0
            def recv_bytes(self, size):
                # First call: a well-formed challenge; afterwards: garbage.
                self.count += 1
                if self.count == 1:
                    return multiprocessing.connection.CHALLENGE
                if self.count == 2:
                    return b'something bogus'
                return b''
            def send_bytes(self, data):
                pass
        with self.assertRaises(multiprocessing.AuthenticationError):
            multiprocessing.connection.answer_challenge(
                _FakeConnection(), b'abc')
2254
2255#
2256# Test Manager.start()/Pool.__init__() initializer feature - see issue 5585
2257#
2258
def initializer(ns):
    """Bump the shared namespace's counter so tests can see the
    initializer actually ran in the worker."""
    ns.test = ns.test + 1
2261
class TestInitializers(unittest.TestCase):
    """Manager.start()/Pool() initializer support (issue 5585)."""

    def setUp(self):
        self.mgr = multiprocessing.Manager()
        self.ns = self.mgr.Namespace()
        self.ns.test = 0

    def tearDown(self):
        self.mgr.shutdown()

    def test_manager_initializer(self):
        manager = multiprocessing.managers.SyncManager()
        # A non-callable initializer must be rejected up front.
        self.assertRaises(TypeError, manager.start, 1)
        manager.start(initializer, (self.ns,))
        self.assertEqual(self.ns.test, 1)
        manager.shutdown()

    def test_pool_initializer(self):
        # A non-callable initializer must be rejected up front.
        self.assertRaises(TypeError, multiprocessing.Pool, initializer=1)
        pool = multiprocessing.Pool(1, initializer, (self.ns,))
        pool.close()
        pool.join()
        self.assertEqual(self.ns.test, 1)
2284
2285#
2286# Issue 5155, 5313, 5331: Test process in processes
2287# Verifies os.close(sys.stdin.fileno) vs. sys.stdin.close() behavior
2288#
2289
def _ThisSubProcess(q):
    """Grandchild body: try to drain one item from *q*, tolerating an
    empty queue."""
    try:
        q.get(block=False)
    except Queue.Empty:
        pass
2295
def _TestProcess(q):
    """Child body: spawn a daemonic grandchild with its own queue and wait
    for it (issues 5155, 5313, 5331)."""
    inner_queue = multiprocessing.Queue()
    grandchild = multiprocessing.Process(target=_ThisSubProcess,
                                         args=(inner_queue,))
    grandchild.daemon = True
    grandchild.start()
    grandchild.join()
2302
2303def _afunc(x):
2304    return x*x
2305
def pool_in_process():
    """Child body: create a Pool and run a map inside a subprocess."""
    pool = multiprocessing.Pool(processes=4)
    pool.map(_afunc, [1, 2, 3, 4, 5, 6, 7])
2309
2310class _file_like(object):
2311    def __init__(self, delegate):
2312        self._delegate = delegate
2313        self._pid = None
2314
2315    @property
2316    def cache(self):
2317        pid = os.getpid()
2318        # There are no race conditions since fork keeps only the running thread
2319        if pid != self._pid:
2320            self._pid = pid
2321            self._cache = []
2322        return self._cache
2323
2324    def write(self, data):
2325        self.cache.append(data)
2326
2327    def flush(self):
2328        self._delegate.write(''.join(self.cache))
2329        self._cache = []
2330
class TestStdinBadfiledescriptor(unittest.TestCase):
    """Issues 5155, 5313, 5331: processes spawned inside processes, and
    os.close(sys.stdin.fileno()) vs. sys.stdin.close() behavior."""

    def test_queue_in_process(self):
        # A child must be able to create and use its own nested process.
        queue = multiprocessing.Queue()
        proc = multiprocessing.Process(target=_TestProcess, args=(queue,))
        proc.start()
        proc.join()

    def test_pool_in_process(self):
        # A child must be able to create and use its own Pool.
        p = multiprocessing.Process(target=pool_in_process)
        p.start()
        p.join()

    def test_flushing(self):
        sio = StringIO()
        flike = _file_like(sio)
        flike.write('foo')
        # NOTE(review): proc is constructed but never started; presumably
        # this only exercises Process construction with a non-picklable
        # lambda target -- confirm before removing.
        proc = multiprocessing.Process(target=lambda: flike.flush())
        flike.flush()
        # Use a real unittest assertion instead of a bare ``assert`` so the
        # check survives ``python -O`` and failures carry a useful message.
        self.assertEqual(sio.getvalue(), 'foo')
2351
2352#
2353# Test interaction with socket timeouts - see Issue #6056
2354#
2355
class TestTimeouts(unittest.TestCase):
    """Issue #6056: a global socket default timeout must not break
    multiprocessing's Pipe/Listener/Client plumbing."""

    @classmethod
    def _test_timeout(cls, child, address):
        # Child: outlive the 0.1s default timeout, answer on the inherited
        # pipe end, then connect back to the parent's listener.
        time.sleep(1)
        child.send(123)
        child.close()
        conn = multiprocessing.connection.Client(address)
        conn.send(456)
        conn.close()

    def test_timeout(self):
        old_timeout = socket.getdefaulttimeout()
        try:
            # The child sleeps for 1s while the default timeout is 0.1s,
            # so the recv() calls below succeed only if multiprocessing
            # connections are unaffected by the socket default timeout.
            socket.setdefaulttimeout(0.1)
            parent, child = multiprocessing.Pipe(duplex=True)
            l = multiprocessing.connection.Listener(family='AF_INET')
            p = multiprocessing.Process(target=self._test_timeout,
                                        args=(child, l.address))
            p.start()
            child.close()
            self.assertEqual(parent.recv(), 123)
            parent.close()
            conn = l.accept()
            self.assertEqual(conn.recv(), 456)
            conn.close()
            l.close()
            p.join(10)
        finally:
            socket.setdefaulttimeout(old_timeout)
2385
2386#
2387# Test what happens with no "if __name__ == '__main__'"
2388#
2389
class TestNoForkBomb(unittest.TestCase):
    """Spawning from a script with no ``if __name__ == '__main__'`` guard
    must not fork-bomb."""

    def test_noforkbomb(self):
        script = os.path.join(os.path.dirname(__file__), 'mp_fork_bomb.py')
        if WIN32:
            # On Windows the unguarded script is expected to fail with a
            # RuntimeError and produce no output.
            rc, out, err = test.script_helper.assert_python_failure(script)
            self.assertEqual('', out.decode('ascii'))
            self.assertIn('RuntimeError', err.decode('ascii'))
        else:
            # Elsewhere it must run cleanly and print exactly '123'.
            rc, out, err = test.script_helper.assert_python_ok(script)
            self.assertEqual('123', out.decode('ascii').rstrip())
            self.assertEqual('', err.decode('ascii'))
2401
2402#
2403# Issue 12098: check sys.flags of child matches that for parent
2404#
2405
class TestFlags(unittest.TestCase):
    """Issue 12098: sys.flags of a child must match those of its parent."""

    @classmethod
    def run_in_grandchild(cls, conn):
        # Deepest process: report its interpreter flags.
        conn.send(tuple(sys.flags))

    @classmethod
    def run_in_child(cls):
        import json
        reader, writer = multiprocessing.Pipe(duplex=False)
        proc = multiprocessing.Process(target=cls.run_in_grandchild,
                                       args=(writer,))
        proc.start()
        grandchild_flags = reader.recv()
        proc.join()
        reader.close()
        writer.close()
        # Emit both flag tuples for the grandparent test to compare.
        print(json.dumps((tuple(sys.flags), grandchild_flags)))

    def test_flags(self):
        import json, subprocess
        # start child process using unusual flags
        prog = ('from test.test_multiprocessing import TestFlags; ' +
                'TestFlags.run_in_child()')
        data = subprocess.check_output(
            [sys.executable, '-E', '-B', '-O', '-c', prog])
        child_flags, grandchild_flags = json.loads(data.decode('ascii'))
        self.assertEqual(child_flags, grandchild_flags)
2433
2434#
2435# Issue #17555: ForkAwareThreadLock
2436#
2437
class TestForkAwareThreadLock(unittest.TestCase):
    # We recursively start processes.  Issue #17555 meant that the
    # after fork registry would get duplicate entries for the same
    # lock.  The size of the registry at generation n was ~2**n.

    @classmethod
    def child(cls, n, conn):
        if n > 1:
            # Recurse one fork generation deeper.
            proc = multiprocessing.Process(target=cls.child,
                                           args=(n - 1, conn))
            proc.start()
            proc.join()
        else:
            # Deepest generation: report the registry size back up.
            conn.send(len(util._afterfork_registry))
        conn.close()

    def test_lock(self):
        reader, writer = multiprocessing.Pipe(False)
        # Presumably creating the lock adds an after-fork registry entry;
        # the check below is that forking does not duplicate entries.
        lock = util.ForkAwareThreadLock()
        size_before = len(util._afterfork_registry)
        proc = multiprocessing.Process(target=self.child, args=(5, writer))
        proc.start()
        size_after = reader.recv()
        proc.join()
        self.assertLessEqual(size_after, size_before)
2462
2463#
2464#
2465#
2466
# Test cases that are not generated from the _Test* bases; test_main()
# appends these verbatim to the suite.
testcases_other = [OtherTest, TestInvalidHandle, TestInitializers,
                   TestStdinBadfiledescriptor, TestTimeouts, TestNoForkBomb,
                   TestFlags, TestForkAwareThreadLock]
2470
2471#
2472#
2473#
2474
def test_main(run=None):
    """Set up the shared pools/manager, run the whole suite, tear down.

    *run* is a callable taking a TestSuite; it defaults to
    test_support.run_unittest.
    """
    if sys.platform.startswith("linux"):
        try:
            lock = multiprocessing.RLock()
        except OSError:
            raise unittest.SkipTest("OSError raises on RLock creation, see issue 3111!")

    check_enough_semaphores()

    if run is None:
        from test.test_support import run_unittest as run

    util.get_temp_dir()     # creates temp directory for use by all processes

    multiprocessing.get_logger().setLevel(LOG_LEVEL)

    # Shared fixtures for the generated test cases.  ManagerMixin.manager
    # was allocated uninitialized (object.__new__) at class-creation time,
    # so it must be initialized and started here before use.
    ProcessesMixin.pool = multiprocessing.Pool(4)
    ThreadsMixin.pool = multiprocessing.dummy.Pool(4)
    ManagerMixin.manager.__init__()
    ManagerMixin.manager.start()
    ManagerMixin.pool = ManagerMixin.manager.Pool(4)

    testcases = (
        sorted(testcases_processes.values(), key=lambda tc:tc.__name__) +
        sorted(testcases_threads.values(), key=lambda tc:tc.__name__) +
        sorted(testcases_manager.values(), key=lambda tc:tc.__name__) +
        testcases_other
        )

    loadTestsFromTestCase = unittest.defaultTestLoader.loadTestsFromTestCase
    suite = unittest.TestSuite(loadTestsFromTestCase(tc) for tc in testcases)
    # (ncoghlan): Whether or not sys.exc_clear is executed by the threading
    # module during these tests is at least platform dependent and possibly
    # non-deterministic on any given platform. So we don't mind if the listed
    # warnings aren't actually raised.
    with test_support.check_py3k_warnings(
            (".+__(get|set)slice__ has been removed", DeprecationWarning),
            (r"sys.exc_clear\(\) not supported", DeprecationWarning),
            quiet=True):
        run(suite)

    # Tear down: terminate the pools before shutting the manager down.
    ThreadsMixin.pool.terminate()
    ProcessesMixin.pool.terminate()
    ManagerMixin.pool.terminate()
    ManagerMixin.manager.shutdown()

    del ProcessesMixin.pool, ThreadsMixin.pool, ManagerMixin.pool
2522
def main():
    """Entry point for direct execution: run the suite verbosely."""
    runner = unittest.TextTestRunner(verbosity=2)
    test_main(runner.run)

if __name__ == '__main__':
    main()
2528