10a8c90248264a8b26970b4473770bcc3df8515fJosh Gao#!/usr/bin/env python
20a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
30a8c90248264a8b26970b4473770bcc3df8515fJosh Gao#
40a8c90248264a8b26970b4473770bcc3df8515fJosh Gao# Unit tests for the multiprocessing package
50a8c90248264a8b26970b4473770bcc3df8515fJosh Gao#
60a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
70a8c90248264a8b26970b4473770bcc3df8515fJosh Gaoimport unittest
80a8c90248264a8b26970b4473770bcc3df8515fJosh Gaoimport Queue
90a8c90248264a8b26970b4473770bcc3df8515fJosh Gaoimport time
100a8c90248264a8b26970b4473770bcc3df8515fJosh Gaoimport sys
110a8c90248264a8b26970b4473770bcc3df8515fJosh Gaoimport os
120a8c90248264a8b26970b4473770bcc3df8515fJosh Gaoimport gc
130a8c90248264a8b26970b4473770bcc3df8515fJosh Gaoimport signal
140a8c90248264a8b26970b4473770bcc3df8515fJosh Gaoimport array
150a8c90248264a8b26970b4473770bcc3df8515fJosh Gaoimport socket
160a8c90248264a8b26970b4473770bcc3df8515fJosh Gaoimport random
170a8c90248264a8b26970b4473770bcc3df8515fJosh Gaoimport logging
180a8c90248264a8b26970b4473770bcc3df8515fJosh Gaoimport errno
190a8c90248264a8b26970b4473770bcc3df8515fJosh Gaoimport test.script_helper
200a8c90248264a8b26970b4473770bcc3df8515fJosh Gaofrom test import test_support
210a8c90248264a8b26970b4473770bcc3df8515fJosh Gaofrom StringIO import StringIO
220a8c90248264a8b26970b4473770bcc3df8515fJosh Gao_multiprocessing = test_support.import_module('_multiprocessing')
230a8c90248264a8b26970b4473770bcc3df8515fJosh Gao# import threading after _multiprocessing to raise a more relevant error
240a8c90248264a8b26970b4473770bcc3df8515fJosh Gao# message: "No module named _multiprocessing". _multiprocessing is not compiled
250a8c90248264a8b26970b4473770bcc3df8515fJosh Gao# without thread support.
260a8c90248264a8b26970b4473770bcc3df8515fJosh Gaoimport threading
270a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
280a8c90248264a8b26970b4473770bcc3df8515fJosh Gao# Work around broken sem_open implementations
290a8c90248264a8b26970b4473770bcc3df8515fJosh Gaotest_support.import_module('multiprocessing.synchronize')
300a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
310a8c90248264a8b26970b4473770bcc3df8515fJosh Gaoimport multiprocessing.dummy
320a8c90248264a8b26970b4473770bcc3df8515fJosh Gaoimport multiprocessing.connection
330a8c90248264a8b26970b4473770bcc3df8515fJosh Gaoimport multiprocessing.managers
340a8c90248264a8b26970b4473770bcc3df8515fJosh Gaoimport multiprocessing.heap
350a8c90248264a8b26970b4473770bcc3df8515fJosh Gaoimport multiprocessing.pool
360a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
370a8c90248264a8b26970b4473770bcc3df8515fJosh Gaofrom multiprocessing import util
380a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
390a8c90248264a8b26970b4473770bcc3df8515fJosh Gaotry:
400a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    from multiprocessing import reduction
410a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    HAS_REDUCTION = True
420a8c90248264a8b26970b4473770bcc3df8515fJosh Gaoexcept ImportError:
430a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    HAS_REDUCTION = False
440a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
450a8c90248264a8b26970b4473770bcc3df8515fJosh Gaotry:
460a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    from multiprocessing.sharedctypes import Value, copy
470a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    HAS_SHAREDCTYPES = True
480a8c90248264a8b26970b4473770bcc3df8515fJosh Gaoexcept ImportError:
490a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    HAS_SHAREDCTYPES = False
500a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
510a8c90248264a8b26970b4473770bcc3df8515fJosh Gaotry:
520a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    import msvcrt
530a8c90248264a8b26970b4473770bcc3df8515fJosh Gaoexcept ImportError:
540a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    msvcrt = None
550a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
560a8c90248264a8b26970b4473770bcc3df8515fJosh Gao#
570a8c90248264a8b26970b4473770bcc3df8515fJosh Gao#
580a8c90248264a8b26970b4473770bcc3df8515fJosh Gao#
590a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
600a8c90248264a8b26970b4473770bcc3df8515fJosh Gaolatin = str
610a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
620a8c90248264a8b26970b4473770bcc3df8515fJosh Gao#
630a8c90248264a8b26970b4473770bcc3df8515fJosh Gao# Constants
640a8c90248264a8b26970b4473770bcc3df8515fJosh Gao#
650a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
660a8c90248264a8b26970b4473770bcc3df8515fJosh GaoLOG_LEVEL = util.SUBWARNING
670a8c90248264a8b26970b4473770bcc3df8515fJosh Gao#LOG_LEVEL = logging.DEBUG
680a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
690a8c90248264a8b26970b4473770bcc3df8515fJosh GaoDELTA = 0.1
700a8c90248264a8b26970b4473770bcc3df8515fJosh GaoCHECK_TIMINGS = False     # making true makes tests take a lot longer
710a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                          # and can sometimes cause some non-serious
720a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                          # failures because some calls block a bit
730a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                          # longer than expected
740a8c90248264a8b26970b4473770bcc3df8515fJosh Gaoif CHECK_TIMINGS:
750a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.82, 0.35, 1.4
760a8c90248264a8b26970b4473770bcc3df8515fJosh Gaoelse:
770a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.1, 0.1, 0.1
780a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
790a8c90248264a8b26970b4473770bcc3df8515fJosh GaoHAVE_GETVALUE = not getattr(_multiprocessing,
800a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                            'HAVE_BROKEN_SEM_GETVALUE', False)
810a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
820a8c90248264a8b26970b4473770bcc3df8515fJosh GaoWIN32 = (sys.platform == "win32")
830a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
840a8c90248264a8b26970b4473770bcc3df8515fJosh Gaotry:
850a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    MAXFD = os.sysconf("SC_OPEN_MAX")
860a8c90248264a8b26970b4473770bcc3df8515fJosh Gaoexcept:
870a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    MAXFD = 256
880a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
890a8c90248264a8b26970b4473770bcc3df8515fJosh Gao#
900a8c90248264a8b26970b4473770bcc3df8515fJosh Gao# Some tests require ctypes
910a8c90248264a8b26970b4473770bcc3df8515fJosh Gao#
920a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
930a8c90248264a8b26970b4473770bcc3df8515fJosh Gaotry:
940a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    from ctypes import Structure, c_int, c_double
950a8c90248264a8b26970b4473770bcc3df8515fJosh Gaoexcept ImportError:
960a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    Structure = object
970a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    c_int = c_double = None
980a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
990a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
1000a8c90248264a8b26970b4473770bcc3df8515fJosh Gaodef check_enough_semaphores():
1010a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    """Check that the system supports enough semaphores to run the test."""
1020a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    # minimum number of semaphores available according to POSIX
1030a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    nsems_min = 256
1040a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    try:
1050a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        nsems = os.sysconf("SC_SEM_NSEMS_MAX")
1060a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    except (AttributeError, ValueError):
1070a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        # sysconf not available or setting not available
1080a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        return
1090a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    if nsems == -1 or nsems >= nsems_min:
1100a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        return
1110a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    raise unittest.SkipTest("The OS doesn't support enough semaphores "
1120a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                            "to run the test (required: %d)." % nsems_min)
1130a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
1140a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
1150a8c90248264a8b26970b4473770bcc3df8515fJosh Gao#
1160a8c90248264a8b26970b4473770bcc3df8515fJosh Gao# Creates a wrapper for a function which records the time it takes to finish
1170a8c90248264a8b26970b4473770bcc3df8515fJosh Gao#
1180a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
1190a8c90248264a8b26970b4473770bcc3df8515fJosh Gaoclass TimingWrapper(object):
1200a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
1210a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def __init__(self, func):
1220a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.func = func
1230a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.elapsed = None
1240a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
1250a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def __call__(self, *args, **kwds):
1260a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        t = time.time()
1270a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        try:
1280a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            return self.func(*args, **kwds)
1290a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        finally:
1300a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            self.elapsed = time.time() - t
1310a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
1320a8c90248264a8b26970b4473770bcc3df8515fJosh Gao#
1330a8c90248264a8b26970b4473770bcc3df8515fJosh Gao# Base class for test cases
1340a8c90248264a8b26970b4473770bcc3df8515fJosh Gao#
1350a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
1360a8c90248264a8b26970b4473770bcc3df8515fJosh Gaoclass BaseTestCase(object):
1370a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
1380a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    ALLOWED_TYPES = ('processes', 'manager', 'threads')
1390a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
1400a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def assertTimingAlmostEqual(self, a, b):
1410a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        if CHECK_TIMINGS:
1420a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            self.assertAlmostEqual(a, b, 1)
1430a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
1440a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def assertReturnsIfImplemented(self, value, func, *args):
1450a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        try:
1460a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            res = func(*args)
1470a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        except NotImplementedError:
1480a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            pass
1490a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        else:
1500a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            return self.assertEqual(value, res)
1510a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
1520a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    # For the sanity of Windows users, rather than crashing or freezing in
1530a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    # multiple ways.
1540a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def __reduce__(self, *args):
1550a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        raise NotImplementedError("shouldn't try to pickle a test case")
1560a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
1570a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    __reduce_ex__ = __reduce__
1580a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
1590a8c90248264a8b26970b4473770bcc3df8515fJosh Gao#
1600a8c90248264a8b26970b4473770bcc3df8515fJosh Gao# Return the value of a semaphore
1610a8c90248264a8b26970b4473770bcc3df8515fJosh Gao#
1620a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
1630a8c90248264a8b26970b4473770bcc3df8515fJosh Gaodef get_value(self):
1640a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    try:
1650a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        return self.get_value()
1660a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    except AttributeError:
1670a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        try:
1680a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            return self._Semaphore__value
1690a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        except AttributeError:
1700a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            try:
1710a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                return self._value
1720a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            except AttributeError:
1730a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                raise NotImplementedError
1740a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
1750a8c90248264a8b26970b4473770bcc3df8515fJosh Gao#
1760a8c90248264a8b26970b4473770bcc3df8515fJosh Gao# Testcases
1770a8c90248264a8b26970b4473770bcc3df8515fJosh Gao#
1780a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
1790a8c90248264a8b26970b4473770bcc3df8515fJosh Gaoclass _TestProcess(BaseTestCase):
1800a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
1810a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    ALLOWED_TYPES = ('processes', 'threads')
1820a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
1830a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def test_current(self):
1840a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        if self.TYPE == 'threads':
1850a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            return
1860a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
1870a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        current = self.current_process()
1880a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        authkey = current.authkey
1890a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
1900a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertTrue(current.is_alive())
1910a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertTrue(not current.daemon)
1920a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertIsInstance(authkey, bytes)
1930a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertTrue(len(authkey) > 0)
1940a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertEqual(current.ident, os.getpid())
1950a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertEqual(current.exitcode, None)
1960a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
1970a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    @classmethod
1980a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def _test(cls, q, *args, **kwds):
1990a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        current = cls.current_process()
2000a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        q.put(args)
2010a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        q.put(kwds)
2020a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        q.put(current.name)
2030a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        if cls.TYPE != 'threads':
2040a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            q.put(bytes(current.authkey))
2050a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            q.put(current.pid)
2060a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
2070a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def test_process(self):
2080a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        q = self.Queue(1)
2090a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        e = self.Event()
2100a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        args = (q, 1, 2)
2110a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        kwargs = {'hello':23, 'bye':2.54}
2120a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        name = 'SomeProcess'
2130a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        p = self.Process(
2140a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            target=self._test, args=args, kwargs=kwargs, name=name
2150a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            )
2160a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        p.daemon = True
2170a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        current = self.current_process()
2180a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
2190a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        if self.TYPE != 'threads':
2200a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            self.assertEqual(p.authkey, current.authkey)
2210a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertEqual(p.is_alive(), False)
2220a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertEqual(p.daemon, True)
2230a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertNotIn(p, self.active_children())
2240a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertTrue(type(self.active_children()) is list)
2250a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertEqual(p.exitcode, None)
2260a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
2270a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        p.start()
2280a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
2290a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertEqual(p.exitcode, None)
2300a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertEqual(p.is_alive(), True)
2310a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertIn(p, self.active_children())
2320a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
2330a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertEqual(q.get(), args[1:])
2340a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertEqual(q.get(), kwargs)
2350a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertEqual(q.get(), p.name)
2360a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        if self.TYPE != 'threads':
2370a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            self.assertEqual(q.get(), current.authkey)
2380a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            self.assertEqual(q.get(), p.pid)
2390a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
2400a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        p.join()
2410a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
2420a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertEqual(p.exitcode, 0)
2430a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertEqual(p.is_alive(), False)
2440a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertNotIn(p, self.active_children())
2450a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
2460a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    @classmethod
2470a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def _test_terminate(cls):
2480a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        time.sleep(1000)
2490a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
2500a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def test_terminate(self):
2510a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        if self.TYPE == 'threads':
2520a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            return
2530a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
2540a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        p = self.Process(target=self._test_terminate)
2550a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        p.daemon = True
2560a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        p.start()
2570a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
2580a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertEqual(p.is_alive(), True)
2590a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertIn(p, self.active_children())
2600a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertEqual(p.exitcode, None)
2610a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
2620a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        p.terminate()
2630a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
2640a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        join = TimingWrapper(p.join)
2650a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertEqual(join(), None)
2660a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertTimingAlmostEqual(join.elapsed, 0.0)
2670a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
2680a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertEqual(p.is_alive(), False)
2690a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertNotIn(p, self.active_children())
2700a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
2710a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        p.join()
2720a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
2730a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        # XXX sometimes get p.exitcode == 0 on Windows ...
2740a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        #self.assertEqual(p.exitcode, -signal.SIGTERM)
2750a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
2760a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def test_cpu_count(self):
2770a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        try:
2780a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            cpus = multiprocessing.cpu_count()
2790a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        except NotImplementedError:
2800a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            cpus = 1
2810a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertTrue(type(cpus) is int)
2820a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertTrue(cpus >= 1)
2830a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
2840a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def test_active_children(self):
2850a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertEqual(type(self.active_children()), list)
2860a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
2870a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        p = self.Process(target=time.sleep, args=(DELTA,))
2880a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertNotIn(p, self.active_children())
2890a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
2900a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        p.daemon = True
2910a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        p.start()
2920a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertIn(p, self.active_children())
2930a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
2940a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        p.join()
2950a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertNotIn(p, self.active_children())
2960a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
2970a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    @classmethod
2980a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def _test_recursion(cls, wconn, id):
2990a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        from multiprocessing import forking
3000a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        wconn.send(id)
3010a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        if len(id) < 2:
3020a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            for i in range(2):
3030a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                p = cls.Process(
3040a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                    target=cls._test_recursion, args=(wconn, id+[i])
3050a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                    )
3060a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                p.start()
3070a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                p.join()
3080a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
3090a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def test_recursion(self):
3100a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        rconn, wconn = self.Pipe(duplex=False)
3110a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self._test_recursion(wconn, [])
3120a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
3130a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        time.sleep(DELTA)
3140a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        result = []
3150a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        while rconn.poll():
3160a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            result.append(rconn.recv())
3170a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
3180a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        expected = [
3190a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            [],
3200a8c90248264a8b26970b4473770bcc3df8515fJosh Gao              [0],
3210a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                [0, 0],
3220a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                [0, 1],
3230a8c90248264a8b26970b4473770bcc3df8515fJosh Gao              [1],
3240a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                [1, 0],
3250a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                [1, 1]
3260a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            ]
3270a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertEqual(result, expected)
3280a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
3290a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    @classmethod
3300a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def _test_sys_exit(cls, reason, testfn):
3310a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        sys.stderr = open(testfn, 'w')
3320a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        sys.exit(reason)
3330a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
3340a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def test_sys_exit(self):
3350a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        # See Issue 13854
3360a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        if self.TYPE == 'threads':
3370a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            return
3380a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
3390a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        testfn = test_support.TESTFN
3400a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.addCleanup(test_support.unlink, testfn)
3410a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
3420a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        for reason, code in (([1, 2, 3], 1), ('ignore this', 0)):
3430a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            p = self.Process(target=self._test_sys_exit, args=(reason, testfn))
3440a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            p.daemon = True
3450a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            p.start()
3460a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            p.join(5)
3470a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            self.assertEqual(p.exitcode, code)
3480a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
3490a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            with open(testfn, 'r') as f:
3500a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                self.assertEqual(f.read().rstrip(), str(reason))
3510a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
3520a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        for reason in (True, False, 8):
3530a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            p = self.Process(target=sys.exit, args=(reason,))
3540a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            p.daemon = True
3550a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            p.start()
3560a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            p.join(5)
3570a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            self.assertEqual(p.exitcode, reason)
3580a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
3590a8c90248264a8b26970b4473770bcc3df8515fJosh Gao#
3600a8c90248264a8b26970b4473770bcc3df8515fJosh Gao#
3610a8c90248264a8b26970b4473770bcc3df8515fJosh Gao#
3620a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
3630a8c90248264a8b26970b4473770bcc3df8515fJosh Gaoclass _UpperCaser(multiprocessing.Process):
3640a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
3650a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def __init__(self):
3660a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        multiprocessing.Process.__init__(self)
3670a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.child_conn, self.parent_conn = multiprocessing.Pipe()
3680a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
3690a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def run(self):
3700a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.parent_conn.close()
3710a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        for s in iter(self.child_conn.recv, None):
3720a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            self.child_conn.send(s.upper())
3730a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.child_conn.close()
3740a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
3750a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def submit(self, s):
3760a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        assert type(s) is str
3770a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.parent_conn.send(s)
3780a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        return self.parent_conn.recv()
3790a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
3800a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def stop(self):
3810a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.parent_conn.send(None)
3820a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.parent_conn.close()
3830a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.child_conn.close()
3840a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
3850a8c90248264a8b26970b4473770bcc3df8515fJosh Gaoclass _TestSubclassingProcess(BaseTestCase):
3860a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
3870a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    ALLOWED_TYPES = ('processes',)
3880a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
3890a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def test_subclassing(self):
3900a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        uppercaser = _UpperCaser()
3910a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        uppercaser.daemon = True
3920a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        uppercaser.start()
3930a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertEqual(uppercaser.submit('hello'), 'HELLO')
3940a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertEqual(uppercaser.submit('world'), 'WORLD')
3950a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        uppercaser.stop()
3960a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        uppercaser.join()
3970a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
3980a8c90248264a8b26970b4473770bcc3df8515fJosh Gao#
3990a8c90248264a8b26970b4473770bcc3df8515fJosh Gao#
4000a8c90248264a8b26970b4473770bcc3df8515fJosh Gao#
4010a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
4020a8c90248264a8b26970b4473770bcc3df8515fJosh Gaodef queue_empty(q):
4030a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    if hasattr(q, 'empty'):
4040a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        return q.empty()
4050a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    else:
4060a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        return q.qsize() == 0
4070a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
4080a8c90248264a8b26970b4473770bcc3df8515fJosh Gaodef queue_full(q, maxsize):
4090a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    if hasattr(q, 'full'):
4100a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        return q.full()
4110a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    else:
4120a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        return q.qsize() == maxsize
4130a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
4140a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
4150a8c90248264a8b26970b4473770bcc3df8515fJosh Gaoclass _TestQueue(BaseTestCase):
4160a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
4170a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
4180a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    @classmethod
4190a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def _test_put(cls, queue, child_can_start, parent_can_continue):
4200a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        child_can_start.wait()
4210a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        for i in range(6):
4220a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            queue.get()
4230a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        parent_can_continue.set()
4240a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
4250a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def test_put(self):
4260a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        MAXSIZE = 6
4270a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        queue = self.Queue(maxsize=MAXSIZE)
4280a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        child_can_start = self.Event()
4290a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        parent_can_continue = self.Event()
4300a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
4310a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        proc = self.Process(
4320a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            target=self._test_put,
4330a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            args=(queue, child_can_start, parent_can_continue)
4340a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            )
4350a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        proc.daemon = True
4360a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        proc.start()
4370a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
4380a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertEqual(queue_empty(queue), True)
4390a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertEqual(queue_full(queue, MAXSIZE), False)
4400a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
4410a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        queue.put(1)
4420a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        queue.put(2, True)
4430a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        queue.put(3, True, None)
4440a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        queue.put(4, False)
4450a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        queue.put(5, False, None)
4460a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        queue.put_nowait(6)
4470a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
4480a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        # the values may be in buffer but not yet in pipe so sleep a bit
4490a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        time.sleep(DELTA)
4500a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
4510a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertEqual(queue_empty(queue), False)
4520a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertEqual(queue_full(queue, MAXSIZE), True)
4530a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
4540a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        put = TimingWrapper(queue.put)
4550a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        put_nowait = TimingWrapper(queue.put_nowait)
4560a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
4570a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertRaises(Queue.Full, put, 7, False)
4580a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertTimingAlmostEqual(put.elapsed, 0)
4590a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
4600a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertRaises(Queue.Full, put, 7, False, None)
4610a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertTimingAlmostEqual(put.elapsed, 0)
4620a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
4630a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertRaises(Queue.Full, put_nowait, 7)
4640a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertTimingAlmostEqual(put_nowait.elapsed, 0)
4650a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
4660a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertRaises(Queue.Full, put, 7, True, TIMEOUT1)
4670a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertTimingAlmostEqual(put.elapsed, TIMEOUT1)
4680a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
4690a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertRaises(Queue.Full, put, 7, False, TIMEOUT2)
4700a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertTimingAlmostEqual(put.elapsed, 0)
4710a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
4720a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertRaises(Queue.Full, put, 7, True, timeout=TIMEOUT3)
4730a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertTimingAlmostEqual(put.elapsed, TIMEOUT3)
4740a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
4750a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        child_can_start.set()
4760a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        parent_can_continue.wait()
4770a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
4780a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertEqual(queue_empty(queue), True)
4790a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertEqual(queue_full(queue, MAXSIZE), False)
4800a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
4810a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        proc.join()
4820a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
4830a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    @classmethod
4840a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def _test_get(cls, queue, child_can_start, parent_can_continue):
4850a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        child_can_start.wait()
4860a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        #queue.put(1)
4870a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        queue.put(2)
4880a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        queue.put(3)
4890a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        queue.put(4)
4900a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        queue.put(5)
4910a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        parent_can_continue.set()
4920a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
4930a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def test_get(self):
4940a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        queue = self.Queue()
4950a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        child_can_start = self.Event()
4960a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        parent_can_continue = self.Event()
4970a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
4980a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        proc = self.Process(
4990a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            target=self._test_get,
5000a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            args=(queue, child_can_start, parent_can_continue)
5010a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            )
5020a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        proc.daemon = True
5030a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        proc.start()
5040a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
5050a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertEqual(queue_empty(queue), True)
5060a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
5070a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        child_can_start.set()
5080a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        parent_can_continue.wait()
5090a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
5100a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        time.sleep(DELTA)
5110a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertEqual(queue_empty(queue), False)
5120a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
5130a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        # Hangs unexpectedly, remove for now
5140a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        #self.assertEqual(queue.get(), 1)
5150a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertEqual(queue.get(True, None), 2)
5160a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertEqual(queue.get(True), 3)
5170a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertEqual(queue.get(timeout=1), 4)
5180a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertEqual(queue.get_nowait(), 5)
5190a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
5200a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertEqual(queue_empty(queue), True)
5210a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
5220a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        get = TimingWrapper(queue.get)
5230a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        get_nowait = TimingWrapper(queue.get_nowait)
5240a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
5250a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertRaises(Queue.Empty, get, False)
5260a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertTimingAlmostEqual(get.elapsed, 0)
5270a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
5280a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertRaises(Queue.Empty, get, False, None)
5290a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertTimingAlmostEqual(get.elapsed, 0)
5300a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
5310a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertRaises(Queue.Empty, get_nowait)
5320a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertTimingAlmostEqual(get_nowait.elapsed, 0)
5330a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
5340a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertRaises(Queue.Empty, get, True, TIMEOUT1)
5350a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
5360a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
5370a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertRaises(Queue.Empty, get, False, TIMEOUT2)
5380a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertTimingAlmostEqual(get.elapsed, 0)
5390a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
5400a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertRaises(Queue.Empty, get, timeout=TIMEOUT3)
5410a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT3)
5420a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
5430a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        proc.join()
5440a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
5450a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    @classmethod
5460a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def _test_fork(cls, queue):
5470a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        for i in range(10, 20):
5480a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            queue.put(i)
5490a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        # note that at this point the items may only be buffered, so the
5500a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        # process cannot shutdown until the feeder thread has finished
5510a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        # pushing items onto the pipe.
5520a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
5530a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def test_fork(self):
5540a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        # Old versions of Queue would fail to create a new feeder
5550a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        # thread for a forked process if the original process had its
5560a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        # own feeder thread.  This test checks that this no longer
5570a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        # happens.
5580a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
5590a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        queue = self.Queue()
5600a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
5610a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        # put items on queue so that main process starts a feeder thread
5620a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        for i in range(10):
5630a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            queue.put(i)
5640a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
5650a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        # wait to make sure thread starts before we fork a new process
5660a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        time.sleep(DELTA)
5670a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
5680a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        # fork process
5690a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        p = self.Process(target=self._test_fork, args=(queue,))
5700a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        p.daemon = True
5710a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        p.start()
5720a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
5730a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        # check that all expected items are in the queue
5740a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        for i in range(20):
5750a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            self.assertEqual(queue.get(), i)
5760a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertRaises(Queue.Empty, queue.get, False)
5770a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
5780a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        p.join()
5790a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
5800a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def test_qsize(self):
5810a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        q = self.Queue()
5820a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        try:
5830a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            self.assertEqual(q.qsize(), 0)
5840a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        except NotImplementedError:
5850a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            return
5860a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        q.put(1)
5870a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertEqual(q.qsize(), 1)
5880a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        q.put(5)
5890a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertEqual(q.qsize(), 2)
5900a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        q.get()
5910a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertEqual(q.qsize(), 1)
5920a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        q.get()
5930a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertEqual(q.qsize(), 0)
5940a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
5950a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    @classmethod
5960a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def _test_task_done(cls, q):
5970a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        for obj in iter(q.get, None):
5980a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            time.sleep(DELTA)
5990a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            q.task_done()
6000a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
6010a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def test_task_done(self):
6020a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        queue = self.JoinableQueue()
6030a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
6040a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        if sys.version_info < (2, 5) and not hasattr(queue, 'task_done'):
6050a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            self.skipTest("requires 'queue.task_done()' method")
6060a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
6070a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        workers = [self.Process(target=self._test_task_done, args=(queue,))
6080a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                   for i in xrange(4)]
6090a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
6100a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        for p in workers:
6110a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            p.daemon = True
6120a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            p.start()
6130a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
6140a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        for i in xrange(10):
6150a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            queue.put(i)
6160a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
6170a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        queue.join()
6180a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
6190a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        for p in workers:
6200a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            queue.put(None)
6210a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
6220a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        for p in workers:
6230a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            p.join()
6240a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
6250a8c90248264a8b26970b4473770bcc3df8515fJosh Gao#
6260a8c90248264a8b26970b4473770bcc3df8515fJosh Gao#
6270a8c90248264a8b26970b4473770bcc3df8515fJosh Gao#
6280a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
6290a8c90248264a8b26970b4473770bcc3df8515fJosh Gaoclass _TestLock(BaseTestCase):
6300a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
6310a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def test_lock(self):
6320a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        lock = self.Lock()
6330a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertEqual(lock.acquire(), True)
6340a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertEqual(lock.acquire(False), False)
6350a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertEqual(lock.release(), None)
6360a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertRaises((ValueError, threading.ThreadError), lock.release)
6370a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
6380a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def test_rlock(self):
6390a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        lock = self.RLock()
6400a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertEqual(lock.acquire(), True)
6410a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertEqual(lock.acquire(), True)
6420a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertEqual(lock.acquire(), True)
6430a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertEqual(lock.release(), None)
6440a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertEqual(lock.release(), None)
6450a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertEqual(lock.release(), None)
6460a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertRaises((AssertionError, RuntimeError), lock.release)
6470a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
6480a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def test_lock_context(self):
6490a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        with self.Lock():
6500a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            pass
6510a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
6520a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
6530a8c90248264a8b26970b4473770bcc3df8515fJosh Gaoclass _TestSemaphore(BaseTestCase):
6540a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
6550a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def _test_semaphore(self, sem):
6560a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertReturnsIfImplemented(2, get_value, sem)
6570a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertEqual(sem.acquire(), True)
6580a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertReturnsIfImplemented(1, get_value, sem)
6590a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertEqual(sem.acquire(), True)
6600a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertReturnsIfImplemented(0, get_value, sem)
6610a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertEqual(sem.acquire(False), False)
6620a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertReturnsIfImplemented(0, get_value, sem)
6630a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertEqual(sem.release(), None)
6640a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertReturnsIfImplemented(1, get_value, sem)
6650a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertEqual(sem.release(), None)
6660a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertReturnsIfImplemented(2, get_value, sem)
6670a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
6680a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def test_semaphore(self):
6690a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        sem = self.Semaphore(2)
6700a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self._test_semaphore(sem)
6710a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertEqual(sem.release(), None)
6720a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertReturnsIfImplemented(3, get_value, sem)
6730a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertEqual(sem.release(), None)
6740a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertReturnsIfImplemented(4, get_value, sem)
6750a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
6760a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def test_bounded_semaphore(self):
6770a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        sem = self.BoundedSemaphore(2)
6780a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self._test_semaphore(sem)
6790a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        # Currently fails on OS/X
6800a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        #if HAVE_GETVALUE:
6810a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        #    self.assertRaises(ValueError, sem.release)
6820a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        #    self.assertReturnsIfImplemented(2, get_value, sem)
6830a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
6840a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def test_timeout(self):
6850a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        if self.TYPE != 'processes':
6860a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            return
6870a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
6880a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        sem = self.Semaphore(0)
6890a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        acquire = TimingWrapper(sem.acquire)
6900a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
6910a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertEqual(acquire(False), False)
6920a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertTimingAlmostEqual(acquire.elapsed, 0.0)
6930a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
6940a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertEqual(acquire(False, None), False)
6950a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertTimingAlmostEqual(acquire.elapsed, 0.0)
6960a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
6970a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertEqual(acquire(False, TIMEOUT1), False)
6980a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertTimingAlmostEqual(acquire.elapsed, 0)
6990a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
7000a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertEqual(acquire(True, TIMEOUT2), False)
7010a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT2)
7020a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
7030a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertEqual(acquire(timeout=TIMEOUT3), False)
7040a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT3)
7050a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
7060a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
7070a8c90248264a8b26970b4473770bcc3df8515fJosh Gaoclass _TestCondition(BaseTestCase):
7080a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
7090a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    @classmethod
7100a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def f(cls, cond, sleeping, woken, timeout=None):
7110a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        cond.acquire()
7120a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        sleeping.release()
7130a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        cond.wait(timeout)
7140a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        woken.release()
7150a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        cond.release()
7160a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
7170a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def check_invariant(self, cond):
7180a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        # this is only supposed to succeed when there are no sleepers
7190a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        if self.TYPE == 'processes':
7200a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            try:
7210a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                sleepers = (cond._sleeping_count.get_value() -
7220a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                            cond._woken_count.get_value())
7230a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                self.assertEqual(sleepers, 0)
7240a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                self.assertEqual(cond._wait_semaphore.get_value(), 0)
7250a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            except NotImplementedError:
7260a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                pass
7270a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
7280a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def test_notify(self):
7290a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        cond = self.Condition()
7300a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        sleeping = self.Semaphore(0)
7310a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        woken = self.Semaphore(0)
7320a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
7330a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        p = self.Process(target=self.f, args=(cond, sleeping, woken))
7340a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        p.daemon = True
7350a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        p.start()
7360a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
7370a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        p = threading.Thread(target=self.f, args=(cond, sleeping, woken))
7380a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        p.daemon = True
7390a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        p.start()
7400a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
7410a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        # wait for both children to start sleeping
7420a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        sleeping.acquire()
7430a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        sleeping.acquire()
7440a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
7450a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        # check no process/thread has woken up
7460a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        time.sleep(DELTA)
7470a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertReturnsIfImplemented(0, get_value, woken)
7480a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
7490a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        # wake up one process/thread
7500a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        cond.acquire()
7510a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        cond.notify()
7520a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        cond.release()
7530a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
7540a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        # check one process/thread has woken up
7550a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        time.sleep(DELTA)
7560a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertReturnsIfImplemented(1, get_value, woken)
7570a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
7580a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        # wake up another
7590a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        cond.acquire()
7600a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        cond.notify()
7610a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        cond.release()
7620a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
7630a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        # check other has woken up
7640a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        time.sleep(DELTA)
7650a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertReturnsIfImplemented(2, get_value, woken)
7660a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
7670a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        # check state is not mucked up
7680a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.check_invariant(cond)
7690a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        p.join()
7700a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
7710a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def test_notify_all(self):
7720a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        cond = self.Condition()
7730a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        sleeping = self.Semaphore(0)
7740a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        woken = self.Semaphore(0)
7750a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
7760a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        # start some threads/processes which will timeout
7770a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        for i in range(3):
7780a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            p = self.Process(target=self.f,
7790a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                             args=(cond, sleeping, woken, TIMEOUT1))
7800a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            p.daemon = True
7810a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            p.start()
7820a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
7830a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            t = threading.Thread(target=self.f,
7840a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                                 args=(cond, sleeping, woken, TIMEOUT1))
7850a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            t.daemon = True
7860a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            t.start()
7870a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
7880a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        # wait for them all to sleep
7890a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        for i in xrange(6):
7900a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            sleeping.acquire()
7910a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
7920a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        # check they have all timed out
7930a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        for i in xrange(6):
7940a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            woken.acquire()
7950a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertReturnsIfImplemented(0, get_value, woken)
7960a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
7970a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        # check state is not mucked up
7980a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.check_invariant(cond)
7990a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
8000a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        # start some more threads/processes
8010a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        for i in range(3):
8020a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            p = self.Process(target=self.f, args=(cond, sleeping, woken))
8030a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            p.daemon = True
8040a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            p.start()
8050a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
8060a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
8070a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            t.daemon = True
8080a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            t.start()
8090a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
8100a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        # wait for them to all sleep
8110a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        for i in xrange(6):
8120a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            sleeping.acquire()
8130a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
8140a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        # check no process/thread has woken up
8150a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        time.sleep(DELTA)
8160a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertReturnsIfImplemented(0, get_value, woken)
8170a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
8180a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        # wake them all up
8190a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        cond.acquire()
8200a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        cond.notify_all()
8210a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        cond.release()
8220a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
8230a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        # check they have all woken
8240a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        time.sleep(DELTA)
8250a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertReturnsIfImplemented(6, get_value, woken)
8260a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
8270a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        # check state is not mucked up
8280a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.check_invariant(cond)
8290a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
8300a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def test_timeout(self):
8310a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        cond = self.Condition()
8320a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        wait = TimingWrapper(cond.wait)
8330a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        cond.acquire()
8340a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        res = wait(TIMEOUT1)
8350a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        cond.release()
8360a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertEqual(res, None)
8370a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
8380a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
8390a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
8400a8c90248264a8b26970b4473770bcc3df8515fJosh Gaoclass _TestEvent(BaseTestCase):
8410a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
8420a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    @classmethod
8430a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def _test_event(cls, event):
8440a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        time.sleep(TIMEOUT2)
8450a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        event.set()
8460a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
8470a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def test_event(self):
8480a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        event = self.Event()
8490a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        wait = TimingWrapper(event.wait)
8500a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
8510a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        # Removed temporarily, due to API shear, this does not
8520a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        # work with threading._Event objects. is_set == isSet
8530a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertEqual(event.is_set(), False)
8540a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
8550a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        # Removed, threading.Event.wait() will return the value of the __flag
8560a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        # instead of None. API Shear with the semaphore backed mp.Event
8570a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertEqual(wait(0.0), False)
8580a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
8590a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertEqual(wait(TIMEOUT1), False)
8600a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
8610a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
8620a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        event.set()
8630a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
8640a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        # See note above on the API differences
8650a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertEqual(event.is_set(), True)
8660a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertEqual(wait(), True)
8670a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
8680a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertEqual(wait(TIMEOUT1), True)
8690a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
8700a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        # self.assertEqual(event.is_set(), True)
8710a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
8720a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        event.clear()
8730a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
8740a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        #self.assertEqual(event.is_set(), False)
8750a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
8760a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        p = self.Process(target=self._test_event, args=(event,))
8770a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        p.daemon = True
8780a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        p.start()
8790a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertEqual(wait(), True)
8800a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
8810a8c90248264a8b26970b4473770bcc3df8515fJosh Gao#
8820a8c90248264a8b26970b4473770bcc3df8515fJosh Gao#
8830a8c90248264a8b26970b4473770bcc3df8515fJosh Gao#
8840a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
8850a8c90248264a8b26970b4473770bcc3df8515fJosh Gaoclass _TestValue(BaseTestCase):
8860a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
8870a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    ALLOWED_TYPES = ('processes',)
8880a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
8890a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    codes_values = [
8900a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        ('i', 4343, 24234),
8910a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        ('d', 3.625, -4.25),
8920a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        ('h', -232, 234),
8930a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        ('c', latin('x'), latin('y'))
8940a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        ]
8950a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
8960a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def setUp(self):
8970a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        if not HAS_SHAREDCTYPES:
8980a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            self.skipTest("requires multiprocessing.sharedctypes")
8990a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
9000a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    @classmethod
9010a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def _test(cls, values):
9020a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        for sv, cv in zip(values, cls.codes_values):
9030a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            sv.value = cv[2]
9040a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
9050a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
9060a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def test_value(self, raw=False):
9070a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        if raw:
9080a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            values = [self.RawValue(code, value)
9090a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                      for code, value, _ in self.codes_values]
9100a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        else:
9110a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            values = [self.Value(code, value)
9120a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                      for code, value, _ in self.codes_values]
9130a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
9140a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        for sv, cv in zip(values, self.codes_values):
9150a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            self.assertEqual(sv.value, cv[1])
9160a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
9170a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        proc = self.Process(target=self._test, args=(values,))
9180a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        proc.daemon = True
9190a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        proc.start()
9200a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        proc.join()
9210a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
9220a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        for sv, cv in zip(values, self.codes_values):
9230a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            self.assertEqual(sv.value, cv[2])
9240a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
9250a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def test_rawvalue(self):
9260a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.test_value(raw=True)
9270a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
9280a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def test_getobj_getlock(self):
9290a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        val1 = self.Value('i', 5)
9300a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        lock1 = val1.get_lock()
9310a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        obj1 = val1.get_obj()
9320a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
9330a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        val2 = self.Value('i', 5, lock=None)
9340a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        lock2 = val2.get_lock()
9350a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        obj2 = val2.get_obj()
9360a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
9370a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        lock = self.Lock()
9380a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        val3 = self.Value('i', 5, lock=lock)
9390a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        lock3 = val3.get_lock()
9400a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        obj3 = val3.get_obj()
9410a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertEqual(lock, lock3)
9420a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
9430a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        arr4 = self.Value('i', 5, lock=False)
9440a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertFalse(hasattr(arr4, 'get_lock'))
9450a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertFalse(hasattr(arr4, 'get_obj'))
9460a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
9470a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertRaises(AttributeError, self.Value, 'i', 5, lock='navalue')
9480a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
9490a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        arr5 = self.RawValue('i', 5)
9500a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertFalse(hasattr(arr5, 'get_lock'))
9510a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertFalse(hasattr(arr5, 'get_obj'))
9520a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
9530a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
9540a8c90248264a8b26970b4473770bcc3df8515fJosh Gaoclass _TestArray(BaseTestCase):
9550a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
9560a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    ALLOWED_TYPES = ('processes',)
9570a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
9580a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    @classmethod
9590a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def f(cls, seq):
9600a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        for i in range(1, len(seq)):
9610a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            seq[i] += seq[i-1]
9620a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
9630a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    @unittest.skipIf(c_int is None, "requires _ctypes")
9640a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def test_array(self, raw=False):
9650a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831]
9660a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        if raw:
9670a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            arr = self.RawArray('i', seq)
9680a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        else:
9690a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            arr = self.Array('i', seq)
9700a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
9710a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertEqual(len(arr), len(seq))
9720a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertEqual(arr[3], seq[3])
9730a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertEqual(list(arr[2:7]), list(seq[2:7]))
9740a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
9750a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        arr[4:8] = seq[4:8] = array.array('i', [1, 2, 3, 4])
9760a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
9770a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertEqual(list(arr[:]), seq)
9780a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
9790a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.f(seq)
9800a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
9810a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        p = self.Process(target=self.f, args=(arr,))
9820a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        p.daemon = True
9830a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        p.start()
9840a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        p.join()
9850a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
9860a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertEqual(list(arr[:]), seq)
9870a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
9880a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    @unittest.skipIf(c_int is None, "requires _ctypes")
9890a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def test_array_from_size(self):
9900a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        size = 10
9910a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        # Test for zeroing (see issue #11675).
9920a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        # The repetition below strengthens the test by increasing the chances
9930a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        # of previously allocated non-zero memory being used for the new array
9940a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        # on the 2nd and 3rd loops.
9950a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        for _ in range(3):
9960a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            arr = self.Array('i', size)
9970a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            self.assertEqual(len(arr), size)
9980a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            self.assertEqual(list(arr), [0] * size)
9990a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            arr[:] = range(10)
10000a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            self.assertEqual(list(arr), range(10))
10010a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            del arr
10020a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
10030a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    @unittest.skipIf(c_int is None, "requires _ctypes")
10040a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def test_rawarray(self):
10050a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.test_array(raw=True)
10060a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
10070a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    @unittest.skipIf(c_int is None, "requires _ctypes")
10080a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def test_array_accepts_long(self):
10090a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        arr = self.Array('i', 10L)
10100a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertEqual(len(arr), 10)
10110a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        raw_arr = self.RawArray('i', 10L)
10120a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertEqual(len(raw_arr), 10)
10130a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
10140a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    @unittest.skipIf(c_int is None, "requires _ctypes")
10150a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def test_getobj_getlock_obj(self):
10160a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        arr1 = self.Array('i', range(10))
10170a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        lock1 = arr1.get_lock()
10180a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        obj1 = arr1.get_obj()
10190a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
10200a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        arr2 = self.Array('i', range(10), lock=None)
10210a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        lock2 = arr2.get_lock()
10220a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        obj2 = arr2.get_obj()
10230a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
10240a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        lock = self.Lock()
10250a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        arr3 = self.Array('i', range(10), lock=lock)
10260a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        lock3 = arr3.get_lock()
10270a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        obj3 = arr3.get_obj()
10280a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertEqual(lock, lock3)
10290a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
10300a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        arr4 = self.Array('i', range(10), lock=False)
10310a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertFalse(hasattr(arr4, 'get_lock'))
10320a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertFalse(hasattr(arr4, 'get_obj'))
10330a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertRaises(AttributeError,
10340a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                          self.Array, 'i', range(10), lock='notalock')
10350a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
10360a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        arr5 = self.RawArray('i', range(10))
10370a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertFalse(hasattr(arr5, 'get_lock'))
10380a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertFalse(hasattr(arr5, 'get_obj'))
10390a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
10400a8c90248264a8b26970b4473770bcc3df8515fJosh Gao#
10410a8c90248264a8b26970b4473770bcc3df8515fJosh Gao#
10420a8c90248264a8b26970b4473770bcc3df8515fJosh Gao#
10430a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
10440a8c90248264a8b26970b4473770bcc3df8515fJosh Gaoclass _TestContainers(BaseTestCase):
10450a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
10460a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    ALLOWED_TYPES = ('manager',)
10470a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
10480a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def test_list(self):
10490a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        a = self.list(range(10))
10500a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertEqual(a[:], range(10))
10510a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
10520a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        b = self.list()
10530a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertEqual(b[:], [])
10540a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
10550a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        b.extend(range(5))
10560a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertEqual(b[:], range(5))
10570a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
10580a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertEqual(b[2], 2)
10590a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertEqual(b[2:10], [2,3,4])
10600a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
10610a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        b *= 2
10620a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertEqual(b[:], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4])
10630a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
10640a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertEqual(b + [5, 6], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 5, 6])
10650a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
10660a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertEqual(a[:], range(10))
10670a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
10680a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        d = [a, b]
10690a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        e = self.list(d)
10700a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertEqual(
10710a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            e[:],
10720a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]]
10730a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            )
10740a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
10750a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        f = self.list([a])
10760a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        a.append('hello')
10770a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertEqual(f[:], [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 'hello']])
10780a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
10790a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def test_dict(self):
10800a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        d = self.dict()
10810a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        indices = range(65, 70)
10820a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        for i in indices:
10830a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            d[i] = chr(i)
10840a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertEqual(d.copy(), dict((i, chr(i)) for i in indices))
10850a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertEqual(sorted(d.keys()), indices)
10860a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertEqual(sorted(d.values()), [chr(i) for i in indices])
10870a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertEqual(sorted(d.items()), [(i, chr(i)) for i in indices])
10880a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
10890a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def test_namespace(self):
10900a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        n = self.Namespace()
10910a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        n.name = 'Bob'
10920a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        n.job = 'Builder'
10930a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        n._hidden = 'hidden'
10940a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertEqual((n.name, n.job), ('Bob', 'Builder'))
10950a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        del n.job
10960a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertEqual(str(n), "Namespace(name='Bob')")
10970a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertTrue(hasattr(n, 'name'))
10980a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertTrue(not hasattr(n, 'job'))
10990a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
11000a8c90248264a8b26970b4473770bcc3df8515fJosh Gao#
11010a8c90248264a8b26970b4473770bcc3df8515fJosh Gao#
11020a8c90248264a8b26970b4473770bcc3df8515fJosh Gao#
11030a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
11040a8c90248264a8b26970b4473770bcc3df8515fJosh Gaodef sqr(x, wait=0.0):
11050a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    time.sleep(wait)
11060a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    return x*x
11070a8c90248264a8b26970b4473770bcc3df8515fJosh Gaoclass _TestPool(BaseTestCase):
11080a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
11090a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def test_apply(self):
11100a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        papply = self.pool.apply
11110a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertEqual(papply(sqr, (5,)), sqr(5))
11120a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertEqual(papply(sqr, (), {'x':3}), sqr(x=3))
11130a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
11140a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def test_map(self):
11150a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        pmap = self.pool.map
11160a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertEqual(pmap(sqr, range(10)), map(sqr, range(10)))
11170a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertEqual(pmap(sqr, range(100), chunksize=20),
11180a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                         map(sqr, range(100)))
11190a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
11200a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def test_map_chunksize(self):
11210a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        try:
11220a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            self.pool.map_async(sqr, [], chunksize=1).get(timeout=TIMEOUT1)
11230a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        except multiprocessing.TimeoutError:
11240a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            self.fail("pool.map_async with chunksize stalled on null list")
11250a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
11260a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def test_async(self):
11270a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        res = self.pool.apply_async(sqr, (7, TIMEOUT1,))
11280a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        get = TimingWrapper(res.get)
11290a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertEqual(get(), 49)
11300a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
11310a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
11320a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def test_async_timeout(self):
11330a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        res = self.pool.apply_async(sqr, (6, TIMEOUT2 + 0.2))
11340a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        get = TimingWrapper(res.get)
11350a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertRaises(multiprocessing.TimeoutError, get, timeout=TIMEOUT2)
11360a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT2)
11370a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
11380a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def test_imap(self):
11390a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        it = self.pool.imap(sqr, range(10))
11400a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertEqual(list(it), map(sqr, range(10)))
11410a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
11420a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        it = self.pool.imap(sqr, range(10))
11430a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        for i in range(10):
11440a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            self.assertEqual(it.next(), i*i)
11450a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertRaises(StopIteration, it.next)
11460a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
11470a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        it = self.pool.imap(sqr, range(1000), chunksize=100)
11480a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        for i in range(1000):
11490a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            self.assertEqual(it.next(), i*i)
11500a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertRaises(StopIteration, it.next)
11510a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
11520a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def test_imap_unordered(self):
11530a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        it = self.pool.imap_unordered(sqr, range(1000))
11540a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertEqual(sorted(it), map(sqr, range(1000)))
11550a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
11560a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        it = self.pool.imap_unordered(sqr, range(1000), chunksize=53)
11570a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertEqual(sorted(it), map(sqr, range(1000)))
11580a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
11590a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def test_make_pool(self):
11600a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertRaises(ValueError, multiprocessing.Pool, -1)
11610a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertRaises(ValueError, multiprocessing.Pool, 0)
11620a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
11630a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        p = multiprocessing.Pool(3)
11640a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertEqual(3, len(p._pool))
11650a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        p.close()
11660a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        p.join()
11670a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
11680a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def test_terminate(self):
11690a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        if self.TYPE == 'manager':
11700a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            # On Unix a forked process increfs each shared object to
11710a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            # which its parent process held a reference.  If the
11720a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            # forked process gets terminated then there is likely to
11730a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            # be a reference leak.  So to prevent
11740a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            # _TestZZZNumberOfObjects from failing we skip this test
11750a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            # when using a manager.
11760a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            return
11770a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
11780a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        result = self.pool.map_async(
11790a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            time.sleep, [0.1 for i in range(10000)], chunksize=1
11800a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            )
11810a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.pool.terminate()
11820a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        join = TimingWrapper(self.pool.join)
11830a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        join()
11840a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertTrue(join.elapsed < 0.2)
11850a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
11860a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def test_empty_iterable(self):
11870a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        # See Issue 12157
11880a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        p = self.Pool(1)
11890a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
11900a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertEqual(p.map(sqr, []), [])
11910a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertEqual(list(p.imap(sqr, [])), [])
11920a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertEqual(list(p.imap_unordered(sqr, [])), [])
11930a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertEqual(p.map_async(sqr, []).get(), [])
11940a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
11950a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        p.close()
11960a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        p.join()
11970a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
11980a8c90248264a8b26970b4473770bcc3df8515fJosh Gaodef unpickleable_result():
11990a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    return lambda: 42
12000a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
12010a8c90248264a8b26970b4473770bcc3df8515fJosh Gaoclass _TestPoolWorkerErrors(BaseTestCase):
12020a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    ALLOWED_TYPES = ('processes', )
12030a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
12040a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def test_unpickleable_result(self):
12050a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        from multiprocessing.pool import MaybeEncodingError
12060a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        p = multiprocessing.Pool(2)
12070a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
12080a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        # Make sure we don't lose pool processes because of encoding errors.
12090a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        for iteration in range(20):
12100a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            res = p.apply_async(unpickleable_result)
12110a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            self.assertRaises(MaybeEncodingError, res.get)
12120a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
12130a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        p.close()
12140a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        p.join()
12150a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
12160a8c90248264a8b26970b4473770bcc3df8515fJosh Gaoclass _TestPoolWorkerLifetime(BaseTestCase):
12170a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
12180a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    ALLOWED_TYPES = ('processes', )
12190a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def test_pool_worker_lifetime(self):
12200a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        p = multiprocessing.Pool(3, maxtasksperchild=10)
12210a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertEqual(3, len(p._pool))
12220a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        origworkerpids = [w.pid for w in p._pool]
12230a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        # Run many tasks so each worker gets replaced (hopefully)
12240a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        results = []
12250a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        for i in range(100):
12260a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            results.append(p.apply_async(sqr, (i, )))
12270a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        # Fetch the results and verify we got the right answers,
12280a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        # also ensuring all the tasks have completed.
12290a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        for (j, res) in enumerate(results):
12300a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            self.assertEqual(res.get(), sqr(j))
12310a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        # Refill the pool
12320a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        p._repopulate_pool()
12330a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        # Wait until all workers are alive
12340a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        # (countdown * DELTA = 5 seconds max startup process time)
12350a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        countdown = 50
12360a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        while countdown and not all(w.is_alive() for w in p._pool):
12370a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            countdown -= 1
12380a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            time.sleep(DELTA)
12390a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        finalworkerpids = [w.pid for w in p._pool]
12400a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        # All pids should be assigned.  See issue #7805.
12410a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertNotIn(None, origworkerpids)
12420a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertNotIn(None, finalworkerpids)
12430a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        # Finally, check that the worker pids have changed
12440a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertNotEqual(sorted(origworkerpids), sorted(finalworkerpids))
12450a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        p.close()
12460a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        p.join()
12470a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
12480a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def test_pool_worker_lifetime_early_close(self):
12490a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        # Issue #10332: closing a pool whose workers have limited lifetimes
12500a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        # before all the tasks completed would make join() hang.
12510a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        p = multiprocessing.Pool(3, maxtasksperchild=1)
12520a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        results = []
12530a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        for i in range(6):
12540a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            results.append(p.apply_async(sqr, (i, 0.3)))
12550a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        p.close()
12560a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        p.join()
12570a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        # check the results
12580a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        for (j, res) in enumerate(results):
12590a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            self.assertEqual(res.get(), sqr(j))
12600a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
12610a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
12620a8c90248264a8b26970b4473770bcc3df8515fJosh Gao#
12630a8c90248264a8b26970b4473770bcc3df8515fJosh Gao# Test that manager has expected number of shared objects left
12640a8c90248264a8b26970b4473770bcc3df8515fJosh Gao#
12650a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
12660a8c90248264a8b26970b4473770bcc3df8515fJosh Gaoclass _TestZZZNumberOfObjects(BaseTestCase):
12670a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    # Because test cases are sorted alphabetically, this one will get
12680a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    # run after all the other tests for the manager.  It tests that
12690a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    # there have been no "reference leaks" for the manager's shared
12700a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    # objects.  Note the comment in _TestPool.test_terminate().
12710a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    ALLOWED_TYPES = ('manager',)
12720a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
12730a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def test_number_of_objects(self):
12740a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        EXPECTED_NUMBER = 1                # the pool object is still alive
12750a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        multiprocessing.active_children()  # discard dead process objs
12760a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        gc.collect()                       # do garbage collection
12770a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        refs = self.manager._number_of_objects()
12780a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        debug_info = self.manager._debug_info()
12790a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        if refs != EXPECTED_NUMBER:
12800a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            print self.manager._debug_info()
12810a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            print debug_info
12820a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
12830a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertEqual(refs, EXPECTED_NUMBER)
12840a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
12850a8c90248264a8b26970b4473770bcc3df8515fJosh Gao#
12860a8c90248264a8b26970b4473770bcc3df8515fJosh Gao# Test of creating a customized manager class
12870a8c90248264a8b26970b4473770bcc3df8515fJosh Gao#
12880a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
12890a8c90248264a8b26970b4473770bcc3df8515fJosh Gaofrom multiprocessing.managers import BaseManager, BaseProxy, RemoteError
12900a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
12910a8c90248264a8b26970b4473770bcc3df8515fJosh Gaoclass FooBar(object):
12920a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def f(self):
12930a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        return 'f()'
12940a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def g(self):
12950a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        raise ValueError
12960a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def _h(self):
12970a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        return '_h()'
12980a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
12990a8c90248264a8b26970b4473770bcc3df8515fJosh Gaodef baz():
13000a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    for i in xrange(10):
13010a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        yield i*i
13020a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
13030a8c90248264a8b26970b4473770bcc3df8515fJosh Gaoclass IteratorProxy(BaseProxy):
13040a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    _exposed_ = ('next', '__next__')
13050a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def __iter__(self):
13060a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        return self
13070a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def next(self):
13080a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        return self._callmethod('next')
13090a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def __next__(self):
13100a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        return self._callmethod('__next__')
13110a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
13120a8c90248264a8b26970b4473770bcc3df8515fJosh Gaoclass MyManager(BaseManager):
13130a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    pass
13140a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
13150a8c90248264a8b26970b4473770bcc3df8515fJosh GaoMyManager.register('Foo', callable=FooBar)
13160a8c90248264a8b26970b4473770bcc3df8515fJosh GaoMyManager.register('Bar', callable=FooBar, exposed=('f', '_h'))
13170a8c90248264a8b26970b4473770bcc3df8515fJosh GaoMyManager.register('baz', callable=baz, proxytype=IteratorProxy)
13180a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
13190a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
13200a8c90248264a8b26970b4473770bcc3df8515fJosh Gaoclass _TestMyManager(BaseTestCase):
13210a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
13220a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    ALLOWED_TYPES = ('manager',)
13230a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
13240a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def test_mymanager(self):
13250a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        manager = MyManager()
13260a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        manager.start()
13270a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
13280a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        foo = manager.Foo()
13290a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        bar = manager.Bar()
13300a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        baz = manager.baz()
13310a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
13320a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        foo_methods = [name for name in ('f', 'g', '_h') if hasattr(foo, name)]
13330a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        bar_methods = [name for name in ('f', 'g', '_h') if hasattr(bar, name)]
13340a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
13350a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertEqual(foo_methods, ['f', 'g'])
13360a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertEqual(bar_methods, ['f', '_h'])
13370a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
13380a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertEqual(foo.f(), 'f()')
13390a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertRaises(ValueError, foo.g)
13400a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertEqual(foo._callmethod('f'), 'f()')
13410a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertRaises(RemoteError, foo._callmethod, '_h')
13420a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
13430a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertEqual(bar.f(), 'f()')
13440a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertEqual(bar._h(), '_h()')
13450a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertEqual(bar._callmethod('f'), 'f()')
13460a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertEqual(bar._callmethod('_h'), '_h()')
13470a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
13480a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertEqual(list(baz), [i*i for i in range(10)])
13490a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
13500a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        manager.shutdown()
13510a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
13520a8c90248264a8b26970b4473770bcc3df8515fJosh Gao#
13530a8c90248264a8b26970b4473770bcc3df8515fJosh Gao# Test of connecting to a remote server and using xmlrpclib for serialization
13540a8c90248264a8b26970b4473770bcc3df8515fJosh Gao#
13550a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
13560a8c90248264a8b26970b4473770bcc3df8515fJosh Gao_queue = Queue.Queue()
13570a8c90248264a8b26970b4473770bcc3df8515fJosh Gaodef get_queue():
13580a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    return _queue
13590a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
13600a8c90248264a8b26970b4473770bcc3df8515fJosh Gaoclass QueueManager(BaseManager):
13610a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    '''manager class used by server process'''
13620a8c90248264a8b26970b4473770bcc3df8515fJosh GaoQueueManager.register('get_queue', callable=get_queue)
13630a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
13640a8c90248264a8b26970b4473770bcc3df8515fJosh Gaoclass QueueManager2(BaseManager):
13650a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    '''manager class which specifies the same interface as QueueManager'''
13660a8c90248264a8b26970b4473770bcc3df8515fJosh GaoQueueManager2.register('get_queue')
13670a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
13680a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
13690a8c90248264a8b26970b4473770bcc3df8515fJosh GaoSERIALIZER = 'xmlrpclib'
13700a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
13710a8c90248264a8b26970b4473770bcc3df8515fJosh Gaoclass _TestRemoteManager(BaseTestCase):
13720a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
13730a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    ALLOWED_TYPES = ('manager',)
13740a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
13750a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    @classmethod
13760a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def _putter(cls, address, authkey):
13770a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        manager = QueueManager2(
13780a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            address=address, authkey=authkey, serializer=SERIALIZER
13790a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            )
13800a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        manager.connect()
13810a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        queue = manager.get_queue()
13820a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        queue.put(('hello world', None, True, 2.25))
13830a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
13840a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def test_remote(self):
13850a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        authkey = os.urandom(32)
13860a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
13870a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        manager = QueueManager(
13880a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            address=('localhost', 0), authkey=authkey, serializer=SERIALIZER
13890a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            )
13900a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        manager.start()
13910a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
13920a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        p = self.Process(target=self._putter, args=(manager.address, authkey))
13930a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        p.daemon = True
13940a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        p.start()
13950a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
13960a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        manager2 = QueueManager2(
13970a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            address=manager.address, authkey=authkey, serializer=SERIALIZER
13980a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            )
13990a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        manager2.connect()
14000a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        queue = manager2.get_queue()
14010a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
14020a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        # Note that xmlrpclib will deserialize object as a list not a tuple
14030a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertEqual(queue.get(), ['hello world', None, True, 2.25])
14040a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
14050a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        # Because we are using xmlrpclib for serialization instead of
14060a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        # pickle this will cause a serialization error.
14070a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertRaises(Exception, queue.put, time.sleep)
14080a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
14090a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        # Make queue finalizer run before the server is stopped
14100a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        del queue
14110a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        manager.shutdown()
14120a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
14130a8c90248264a8b26970b4473770bcc3df8515fJosh Gaoclass _TestManagerRestart(BaseTestCase):
14140a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
14150a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    @classmethod
14160a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def _putter(cls, address, authkey):
14170a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        manager = QueueManager(
14180a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            address=address, authkey=authkey, serializer=SERIALIZER)
14190a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        manager.connect()
14200a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        queue = manager.get_queue()
14210a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        queue.put('hello world')
14220a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
14230a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def test_rapid_restart(self):
14240a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        authkey = os.urandom(32)
14250a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        manager = QueueManager(
14260a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            address=('localhost', 0), authkey=authkey, serializer=SERIALIZER)
14270a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        srvr = manager.get_server()
14280a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        addr = srvr.address
14290a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        # Close the connection.Listener socket which gets opened as a part
14300a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        # of manager.get_server(). It's not needed for the test.
14310a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        srvr.listener.close()
14320a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        manager.start()
14330a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
14340a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        p = self.Process(target=self._putter, args=(manager.address, authkey))
14350a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        p.daemon = True
14360a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        p.start()
14370a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        queue = manager.get_queue()
14380a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertEqual(queue.get(), 'hello world')
14390a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        del queue
14400a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        manager.shutdown()
14410a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        manager = QueueManager(
14420a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            address=addr, authkey=authkey, serializer=SERIALIZER)
14430a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        manager.start()
14440a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        manager.shutdown()
14450a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
14460a8c90248264a8b26970b4473770bcc3df8515fJosh Gao#
14470a8c90248264a8b26970b4473770bcc3df8515fJosh Gao#
14480a8c90248264a8b26970b4473770bcc3df8515fJosh Gao#
14490a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
14500a8c90248264a8b26970b4473770bcc3df8515fJosh GaoSENTINEL = latin('')
14510a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
14520a8c90248264a8b26970b4473770bcc3df8515fJosh Gaoclass _TestConnection(BaseTestCase):
14530a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
14540a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    ALLOWED_TYPES = ('processes', 'threads')
14550a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
14560a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    @classmethod
14570a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def _echo(cls, conn):
14580a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        for msg in iter(conn.recv_bytes, SENTINEL):
14590a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            conn.send_bytes(msg)
14600a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        conn.close()
14610a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
14620a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def test_connection(self):
14630a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        conn, child_conn = self.Pipe()
14640a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
14650a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        p = self.Process(target=self._echo, args=(child_conn,))
14660a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        p.daemon = True
14670a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        p.start()
14680a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
14690a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        seq = [1, 2.25, None]
14700a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        msg = latin('hello world')
14710a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        longmsg = msg * 10
14720a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        arr = array.array('i', range(4))
14730a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
14740a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        if self.TYPE == 'processes':
14750a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            self.assertEqual(type(conn.fileno()), int)
14760a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
14770a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertEqual(conn.send(seq), None)
14780a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertEqual(conn.recv(), seq)
14790a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
14800a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertEqual(conn.send_bytes(msg), None)
14810a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertEqual(conn.recv_bytes(), msg)
14820a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
14830a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        if self.TYPE == 'processes':
14840a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            buffer = array.array('i', [0]*10)
14850a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            expected = list(arr) + [0] * (10 - len(arr))
14860a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            self.assertEqual(conn.send_bytes(arr), None)
14870a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            self.assertEqual(conn.recv_bytes_into(buffer),
14880a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                             len(arr) * buffer.itemsize)
14890a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            self.assertEqual(list(buffer), expected)
14900a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
14910a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            buffer = array.array('i', [0]*10)
14920a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            expected = [0] * 3 + list(arr) + [0] * (10 - 3 - len(arr))
14930a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            self.assertEqual(conn.send_bytes(arr), None)
14940a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            self.assertEqual(conn.recv_bytes_into(buffer, 3 * buffer.itemsize),
14950a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                             len(arr) * buffer.itemsize)
14960a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            self.assertEqual(list(buffer), expected)
14970a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
14980a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            buffer = bytearray(latin(' ' * 40))
14990a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            self.assertEqual(conn.send_bytes(longmsg), None)
15000a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            try:
15010a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                res = conn.recv_bytes_into(buffer)
15020a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            except multiprocessing.BufferTooShort, e:
15030a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                self.assertEqual(e.args, (longmsg,))
15040a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            else:
15050a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                self.fail('expected BufferTooShort, got %s' % res)
15060a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
15070a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        poll = TimingWrapper(conn.poll)
15080a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
15090a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertEqual(poll(), False)
15100a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertTimingAlmostEqual(poll.elapsed, 0)
15110a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
15120a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertEqual(poll(TIMEOUT1), False)
15130a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertTimingAlmostEqual(poll.elapsed, TIMEOUT1)
15140a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
15150a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        conn.send(None)
15160a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        time.sleep(.1)
15170a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
15180a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertEqual(poll(TIMEOUT1), True)
15190a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertTimingAlmostEqual(poll.elapsed, 0)
15200a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
15210a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertEqual(conn.recv(), None)
15220a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
15230a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        really_big_msg = latin('X') * (1024 * 1024 * 16)   # 16Mb
15240a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        conn.send_bytes(really_big_msg)
15250a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertEqual(conn.recv_bytes(), really_big_msg)
15260a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
15270a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        conn.send_bytes(SENTINEL)                          # tell child to quit
15280a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        child_conn.close()
15290a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
15300a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        if self.TYPE == 'processes':
15310a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            self.assertEqual(conn.readable, True)
15320a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            self.assertEqual(conn.writable, True)
15330a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            self.assertRaises(EOFError, conn.recv)
15340a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            self.assertRaises(EOFError, conn.recv_bytes)
15350a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
15360a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        p.join()
15370a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
15380a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def test_duplex_false(self):
15390a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        reader, writer = self.Pipe(duplex=False)
15400a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertEqual(writer.send(1), None)
15410a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertEqual(reader.recv(), 1)
15420a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        if self.TYPE == 'processes':
15430a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            self.assertEqual(reader.readable, True)
15440a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            self.assertEqual(reader.writable, False)
15450a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            self.assertEqual(writer.readable, False)
15460a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            self.assertEqual(writer.writable, True)
15470a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            self.assertRaises(IOError, reader.send, 2)
15480a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            self.assertRaises(IOError, writer.recv)
15490a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            self.assertRaises(IOError, writer.poll)
15500a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
15510a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def test_spawn_close(self):
15520a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        # We test that a pipe connection can be closed by parent
15530a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        # process immediately after child is spawned.  On Windows this
15540a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        # would have sometimes failed on old versions because
15550a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        # child_conn would be closed before the child got a chance to
15560a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        # duplicate it.
15570a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        conn, child_conn = self.Pipe()
15580a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
15590a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        p = self.Process(target=self._echo, args=(child_conn,))
15600a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        p.daemon = True
15610a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        p.start()
15620a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        child_conn.close()    # this might complete before child initializes
15630a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
15640a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        msg = latin('hello')
15650a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        conn.send_bytes(msg)
15660a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertEqual(conn.recv_bytes(), msg)
15670a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
15680a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        conn.send_bytes(SENTINEL)
15690a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        conn.close()
15700a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        p.join()
15710a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
15720a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def test_sendbytes(self):
15730a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        if self.TYPE != 'processes':
15740a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            return
15750a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
15760a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        msg = latin('abcdefghijklmnopqrstuvwxyz')
15770a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        a, b = self.Pipe()
15780a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
15790a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        a.send_bytes(msg)
15800a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertEqual(b.recv_bytes(), msg)
15810a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
15820a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        a.send_bytes(msg, 5)
15830a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertEqual(b.recv_bytes(), msg[5:])
15840a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
15850a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        a.send_bytes(msg, 7, 8)
15860a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertEqual(b.recv_bytes(), msg[7:7+8])
15870a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
15880a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        a.send_bytes(msg, 26)
15890a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertEqual(b.recv_bytes(), latin(''))
15900a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
15910a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        a.send_bytes(msg, 26, 0)
15920a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertEqual(b.recv_bytes(), latin(''))
15930a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
15940a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertRaises(ValueError, a.send_bytes, msg, 27)
15950a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
15960a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertRaises(ValueError, a.send_bytes, msg, 22, 5)
15970a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
15980a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertRaises(ValueError, a.send_bytes, msg, 26, 1)
15990a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
16000a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertRaises(ValueError, a.send_bytes, msg, -1)
16010a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
16020a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertRaises(ValueError, a.send_bytes, msg, 4, -1)
16030a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
16040a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    @classmethod
16050a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def _is_fd_assigned(cls, fd):
16060a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        try:
16070a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            os.fstat(fd)
16080a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        except OSError as e:
16090a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            if e.errno == errno.EBADF:
16100a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                return False
16110a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            raise
16120a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        else:
16130a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            return True
16140a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
16150a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    @classmethod
16160a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def _writefd(cls, conn, data, create_dummy_fds=False):
16170a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        if create_dummy_fds:
16180a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            for i in range(0, 256):
16190a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                if not cls._is_fd_assigned(i):
16200a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                    os.dup2(conn.fileno(), i)
16210a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        fd = reduction.recv_handle(conn)
16220a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        if msvcrt:
16230a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            fd = msvcrt.open_osfhandle(fd, os.O_WRONLY)
16240a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        os.write(fd, data)
16250a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        os.close(fd)
16260a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
16270a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
16280a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def test_fd_transfer(self):
16290a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        if self.TYPE != 'processes':
16300a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            self.skipTest("only makes sense with processes")
16310a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        conn, child_conn = self.Pipe(duplex=True)
16320a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
16330a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        p = self.Process(target=self._writefd, args=(child_conn, b"foo"))
16340a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        p.daemon = True
16350a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        p.start()
16360a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        with open(test_support.TESTFN, "wb") as f:
16370a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            fd = f.fileno()
16380a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            if msvcrt:
16390a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                fd = msvcrt.get_osfhandle(fd)
16400a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            reduction.send_handle(conn, fd, p.pid)
16410a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        p.join()
16420a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        with open(test_support.TESTFN, "rb") as f:
16430a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            self.assertEqual(f.read(), b"foo")
16440a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
16450a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
16460a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    @unittest.skipIf(sys.platform == "win32",
16470a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                     "test semantics don't make sense on Windows")
16480a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    @unittest.skipIf(MAXFD <= 256,
16490a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                     "largest assignable fd number is too small")
16500a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    @unittest.skipUnless(hasattr(os, "dup2"),
16510a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                         "test needs os.dup2()")
16520a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def test_large_fd_transfer(self):
16530a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        # With fd > 256 (issue #11657)
16540a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        if self.TYPE != 'processes':
16550a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            self.skipTest("only makes sense with processes")
16560a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        conn, child_conn = self.Pipe(duplex=True)
16570a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
16580a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        p = self.Process(target=self._writefd, args=(child_conn, b"bar", True))
16590a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        p.daemon = True
16600a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        p.start()
16610a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        with open(test_support.TESTFN, "wb") as f:
16620a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            fd = f.fileno()
16630a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            for newfd in range(256, MAXFD):
16640a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                if not self._is_fd_assigned(newfd):
16650a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                    break
16660a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            else:
16670a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                self.fail("could not find an unassigned large file descriptor")
16680a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            os.dup2(fd, newfd)
16690a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            try:
16700a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                reduction.send_handle(conn, newfd, p.pid)
16710a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            finally:
16720a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                os.close(newfd)
16730a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        p.join()
16740a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        with open(test_support.TESTFN, "rb") as f:
16750a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            self.assertEqual(f.read(), b"bar")
16760a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
16770a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    @classmethod
16780a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def _send_data_without_fd(self, conn):
16790a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        os.write(conn.fileno(), b"\0")
16800a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
16810a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
16820a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    @unittest.skipIf(sys.platform == "win32", "doesn't make sense on Windows")
16830a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def test_missing_fd_transfer(self):
16840a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        # Check that exception is raised when received data is not
16850a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        # accompanied by a file descriptor in ancillary data.
16860a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        if self.TYPE != 'processes':
16870a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            self.skipTest("only makes sense with processes")
16880a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        conn, child_conn = self.Pipe(duplex=True)
16890a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
16900a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        p = self.Process(target=self._send_data_without_fd, args=(child_conn,))
16910a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        p.daemon = True
16920a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        p.start()
16930a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertRaises(RuntimeError, reduction.recv_handle, conn)
16940a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        p.join()
16950a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
16960a8c90248264a8b26970b4473770bcc3df8515fJosh Gaoclass _TestListenerClient(BaseTestCase):
16970a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
16980a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    ALLOWED_TYPES = ('processes', 'threads')
16990a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
17000a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    @classmethod
17010a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def _test(cls, address):
17020a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        conn = cls.connection.Client(address)
17030a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        conn.send('hello')
17040a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        conn.close()
17050a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
17060a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def test_listener_client(self):
17070a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        for family in self.connection.families:
17080a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            l = self.connection.Listener(family=family)
17090a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            p = self.Process(target=self._test, args=(l.address,))
17100a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            p.daemon = True
17110a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            p.start()
17120a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            conn = l.accept()
17130a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            self.assertEqual(conn.recv(), 'hello')
17140a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            p.join()
17150a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            l.close()
17160a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
17170a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def test_issue14725(self):
17180a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        l = self.connection.Listener()
17190a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        p = self.Process(target=self._test, args=(l.address,))
17200a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        p.daemon = True
17210a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        p.start()
17220a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        time.sleep(1)
17230a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        # On Windows the client process should by now have connected,
17240a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        # written data and closed the pipe handle by now.  This causes
17250a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        # ConnectNamdedPipe() to fail with ERROR_NO_DATA.  See Issue
17260a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        # 14725.
17270a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        conn = l.accept()
17280a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertEqual(conn.recv(), 'hello')
17290a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        conn.close()
17300a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        p.join()
17310a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        l.close()
17320a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
17330a8c90248264a8b26970b4473770bcc3df8515fJosh Gao#
17340a8c90248264a8b26970b4473770bcc3df8515fJosh Gao# Test of sending connection and socket objects between processes
17350a8c90248264a8b26970b4473770bcc3df8515fJosh Gao#
17360a8c90248264a8b26970b4473770bcc3df8515fJosh Gao"""
17370a8c90248264a8b26970b4473770bcc3df8515fJosh Gaoclass _TestPicklingConnections(BaseTestCase):
17380a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
17390a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    ALLOWED_TYPES = ('processes',)
17400a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
17410a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def _listener(self, conn, families):
17420a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        for fam in families:
17430a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            l = self.connection.Listener(family=fam)
17440a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            conn.send(l.address)
17450a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            new_conn = l.accept()
17460a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            conn.send(new_conn)
17470a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
17480a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        if self.TYPE == 'processes':
17490a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            l = socket.socket()
17500a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            l.bind(('localhost', 0))
17510a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            conn.send(l.getsockname())
17520a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            l.listen(1)
17530a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            new_conn, addr = l.accept()
17540a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            conn.send(new_conn)
17550a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
17560a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        conn.recv()
17570a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
17580a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def _remote(self, conn):
17590a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        for (address, msg) in iter(conn.recv, None):
17600a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            client = self.connection.Client(address)
17610a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            client.send(msg.upper())
17620a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            client.close()
17630a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
17640a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        if self.TYPE == 'processes':
17650a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            address, msg = conn.recv()
17660a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            client = socket.socket()
17670a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            client.connect(address)
17680a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            client.sendall(msg.upper())
17690a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            client.close()
17700a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
17710a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        conn.close()
17720a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
17730a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def test_pickling(self):
17740a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        try:
17750a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            multiprocessing.allow_connection_pickling()
17760a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        except ImportError:
17770a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            return
17780a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
17790a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        families = self.connection.families
17800a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
17810a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        lconn, lconn0 = self.Pipe()
17820a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        lp = self.Process(target=self._listener, args=(lconn0, families))
17830a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        lp.daemon = True
17840a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        lp.start()
17850a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        lconn0.close()
17860a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
17870a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        rconn, rconn0 = self.Pipe()
17880a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        rp = self.Process(target=self._remote, args=(rconn0,))
17890a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        rp.daemon = True
17900a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        rp.start()
17910a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        rconn0.close()
17920a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
17930a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        for fam in families:
17940a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            msg = ('This connection uses family %s' % fam).encode('ascii')
17950a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            address = lconn.recv()
17960a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            rconn.send((address, msg))
17970a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            new_conn = lconn.recv()
17980a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            self.assertEqual(new_conn.recv(), msg.upper())
17990a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
18000a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        rconn.send(None)
18010a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
18020a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        if self.TYPE == 'processes':
18030a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            msg = latin('This connection uses a normal socket')
18040a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            address = lconn.recv()
18050a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            rconn.send((address, msg))
18060a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            if hasattr(socket, 'fromfd'):
18070a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                new_conn = lconn.recv()
18080a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                self.assertEqual(new_conn.recv(100), msg.upper())
18090a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            else:
18100a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                # XXX On Windows with Py2.6 need to backport fromfd()
18110a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                discard = lconn.recv_bytes()
18120a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
18130a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        lconn.send(None)
18140a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
18150a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        rconn.close()
18160a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        lconn.close()
18170a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
18180a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        lp.join()
18190a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        rp.join()
18200a8c90248264a8b26970b4473770bcc3df8515fJosh Gao"""
18210a8c90248264a8b26970b4473770bcc3df8515fJosh Gao#
18220a8c90248264a8b26970b4473770bcc3df8515fJosh Gao#
18230a8c90248264a8b26970b4473770bcc3df8515fJosh Gao#
18240a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
18250a8c90248264a8b26970b4473770bcc3df8515fJosh Gaoclass _TestHeap(BaseTestCase):
18260a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
18270a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    ALLOWED_TYPES = ('processes',)
18280a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
18290a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def test_heap(self):
18300a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        iterations = 5000
18310a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        maxblocks = 50
18320a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        blocks = []
18330a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
18340a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        # create and destroy lots of blocks of different sizes
18350a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        for i in xrange(iterations):
18360a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            size = int(random.lognormvariate(0, 1) * 1000)
18370a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            b = multiprocessing.heap.BufferWrapper(size)
18380a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            blocks.append(b)
18390a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            if len(blocks) > maxblocks:
18400a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                i = random.randrange(maxblocks)
18410a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                del blocks[i]
18420a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
18430a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        # get the heap object
18440a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        heap = multiprocessing.heap.BufferWrapper._heap
18450a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
18460a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        # verify the state of the heap
18470a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        all = []
18480a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        occupied = 0
18490a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        heap._lock.acquire()
18500a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.addCleanup(heap._lock.release)
18510a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        for L in heap._len_to_seq.values():
18520a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            for arena, start, stop in L:
18530a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                all.append((heap._arenas.index(arena), start, stop,
18540a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                            stop-start, 'free'))
18550a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        for arena, start, stop in heap._allocated_blocks:
18560a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            all.append((heap._arenas.index(arena), start, stop,
18570a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                        stop-start, 'occupied'))
18580a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            occupied += (stop-start)
18590a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
18600a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        all.sort()
18610a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
18620a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        for i in range(len(all)-1):
18630a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            (arena, start, stop) = all[i][:3]
18640a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            (narena, nstart, nstop) = all[i+1][:3]
18650a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            self.assertTrue((arena != narena and nstart == 0) or
18660a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                            (stop == nstart))
18670a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
18680a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def test_free_from_gc(self):
18690a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        # Check that freeing of blocks by the garbage collector doesn't deadlock
18700a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        # (issue #12352).
18710a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        # Make sure the GC is enabled, and set lower collection thresholds to
18720a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        # make collections more frequent (and increase the probability of
18730a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        # deadlock).
18740a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        if not gc.isenabled():
18750a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            gc.enable()
18760a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            self.addCleanup(gc.disable)
18770a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        thresholds = gc.get_threshold()
18780a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.addCleanup(gc.set_threshold, *thresholds)
18790a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        gc.set_threshold(10)
18800a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
18810a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        # perform numerous block allocations, with cyclic references to make
18820a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        # sure objects are collected asynchronously by the gc
18830a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        for i in range(5000):
18840a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            a = multiprocessing.heap.BufferWrapper(1)
18850a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            b = multiprocessing.heap.BufferWrapper(1)
18860a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            # circular references
18870a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            a.buddy = b
18880a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            b.buddy = a
18890a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
18900a8c90248264a8b26970b4473770bcc3df8515fJosh Gao#
18910a8c90248264a8b26970b4473770bcc3df8515fJosh Gao#
18920a8c90248264a8b26970b4473770bcc3df8515fJosh Gao#
18930a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
18940a8c90248264a8b26970b4473770bcc3df8515fJosh Gaoclass _Foo(Structure):
18950a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    _fields_ = [
18960a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        ('x', c_int),
18970a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        ('y', c_double)
18980a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        ]
18990a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
19000a8c90248264a8b26970b4473770bcc3df8515fJosh Gaoclass _TestSharedCTypes(BaseTestCase):
19010a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
19020a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    ALLOWED_TYPES = ('processes',)
19030a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
19040a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def setUp(self):
19050a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        if not HAS_SHAREDCTYPES:
19060a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            self.skipTest("requires multiprocessing.sharedctypes")
19070a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
19080a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    @classmethod
19090a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def _double(cls, x, y, foo, arr, string):
19100a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        x.value *= 2
19110a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        y.value *= 2
19120a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        foo.x *= 2
19130a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        foo.y *= 2
19140a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        string.value *= 2
19150a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        for i in range(len(arr)):
19160a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            arr[i] *= 2
19170a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
19180a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def test_sharedctypes(self, lock=False):
19190a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        x = Value('i', 7, lock=lock)
19200a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        y = Value(c_double, 1.0/3.0, lock=lock)
19210a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        foo = Value(_Foo, 3, 2, lock=lock)
19220a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        arr = self.Array('d', range(10), lock=lock)
19230a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        string = self.Array('c', 20, lock=lock)
19240a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        string.value = latin('hello')
19250a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
19260a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        p = self.Process(target=self._double, args=(x, y, foo, arr, string))
19270a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        p.daemon = True
19280a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        p.start()
19290a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        p.join()
19300a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
19310a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertEqual(x.value, 14)
19320a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertAlmostEqual(y.value, 2.0/3.0)
19330a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertEqual(foo.x, 6)
19340a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertAlmostEqual(foo.y, 4.0)
19350a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        for i in range(10):
19360a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            self.assertAlmostEqual(arr[i], i*2)
19370a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertEqual(string.value, latin('hellohello'))
19380a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
19390a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def test_synchronize(self):
19400a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.test_sharedctypes(lock=True)
19410a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
19420a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def test_copy(self):
19430a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        foo = _Foo(2, 5.0)
19440a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        bar = copy(foo)
19450a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        foo.x = 0
19460a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        foo.y = 0
19470a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertEqual(bar.x, 2)
19480a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertAlmostEqual(bar.y, 5.0)
19490a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
19500a8c90248264a8b26970b4473770bcc3df8515fJosh Gao#
19510a8c90248264a8b26970b4473770bcc3df8515fJosh Gao#
19520a8c90248264a8b26970b4473770bcc3df8515fJosh Gao#
19530a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
19540a8c90248264a8b26970b4473770bcc3df8515fJosh Gaoclass _TestFinalize(BaseTestCase):
19550a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
19560a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    ALLOWED_TYPES = ('processes',)
19570a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
19580a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    @classmethod
19590a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def _test_finalize(cls, conn):
19600a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        class Foo(object):
19610a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            pass
19620a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
19630a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        a = Foo()
19640a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        util.Finalize(a, conn.send, args=('a',))
19650a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        del a           # triggers callback for a
19660a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
19670a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        b = Foo()
19680a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        close_b = util.Finalize(b, conn.send, args=('b',))
19690a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        close_b()       # triggers callback for b
19700a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        close_b()       # does nothing because callback has already been called
19710a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        del b           # does nothing because callback has already been called
19720a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
19730a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        c = Foo()
19740a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        util.Finalize(c, conn.send, args=('c',))
19750a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
19760a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        d10 = Foo()
19770a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        util.Finalize(d10, conn.send, args=('d10',), exitpriority=1)
19780a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
19790a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        d01 = Foo()
19800a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        util.Finalize(d01, conn.send, args=('d01',), exitpriority=0)
19810a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        d02 = Foo()
19820a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        util.Finalize(d02, conn.send, args=('d02',), exitpriority=0)
19830a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        d03 = Foo()
19840a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        util.Finalize(d03, conn.send, args=('d03',), exitpriority=0)
19850a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
19860a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        util.Finalize(None, conn.send, args=('e',), exitpriority=-10)
19870a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
19880a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        util.Finalize(None, conn.send, args=('STOP',), exitpriority=-100)
19890a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
19900a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        # call multiprocessing's cleanup function then exit process without
19910a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        # garbage collecting locals
19920a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        util._exit_function()
19930a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        conn.close()
19940a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        os._exit(0)
19950a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
19960a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def test_finalize(self):
19970a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        conn, child_conn = self.Pipe()
19980a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
19990a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        p = self.Process(target=self._test_finalize, args=(child_conn,))
20000a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        p.daemon = True
20010a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        p.start()
20020a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        p.join()
20030a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
20040a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        result = [obj for obj in iter(conn.recv, 'STOP')]
20050a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertEqual(result, ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e'])
20060a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
20070a8c90248264a8b26970b4473770bcc3df8515fJosh Gao#
20080a8c90248264a8b26970b4473770bcc3df8515fJosh Gao# Test that from ... import * works for each module
20090a8c90248264a8b26970b4473770bcc3df8515fJosh Gao#
20100a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
20110a8c90248264a8b26970b4473770bcc3df8515fJosh Gaoclass _TestImportStar(BaseTestCase):
20120a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
20130a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    ALLOWED_TYPES = ('processes',)
20140a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
20150a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def test_import(self):
20160a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        modules = [
20170a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            'multiprocessing', 'multiprocessing.connection',
20180a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            'multiprocessing.heap', 'multiprocessing.managers',
20190a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            'multiprocessing.pool', 'multiprocessing.process',
20200a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            'multiprocessing.synchronize', 'multiprocessing.util'
20210a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            ]
20220a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
20230a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        if HAS_REDUCTION:
20240a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            modules.append('multiprocessing.reduction')
20250a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
20260a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        if c_int is not None:
20270a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            # This module requires _ctypes
20280a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            modules.append('multiprocessing.sharedctypes')
20290a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
20300a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        for name in modules:
20310a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            __import__(name)
20320a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            mod = sys.modules[name]
20330a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
20340a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            for attr in getattr(mod, '__all__', ()):
20350a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                self.assertTrue(
20360a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                    hasattr(mod, attr),
20370a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                    '%r does not have attribute %r' % (mod, attr)
20380a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                    )
20390a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
20400a8c90248264a8b26970b4473770bcc3df8515fJosh Gao#
20410a8c90248264a8b26970b4473770bcc3df8515fJosh Gao# Quick test that logging works -- does not test logging output
20420a8c90248264a8b26970b4473770bcc3df8515fJosh Gao#
20430a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
20440a8c90248264a8b26970b4473770bcc3df8515fJosh Gaoclass _TestLogging(BaseTestCase):
20450a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
20460a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    ALLOWED_TYPES = ('processes',)
20470a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
20480a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def test_enable_logging(self):
20490a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        logger = multiprocessing.get_logger()
20500a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        logger.setLevel(util.SUBWARNING)
20510a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertTrue(logger is not None)
20520a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        logger.debug('this will not be printed')
20530a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        logger.info('nor will this')
20540a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        logger.setLevel(LOG_LEVEL)
20550a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
20560a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    @classmethod
20570a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def _test_level(cls, conn):
20580a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        logger = multiprocessing.get_logger()
20590a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        conn.send(logger.getEffectiveLevel())
20600a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
20610a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def test_level(self):
20620a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        LEVEL1 = 32
20630a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        LEVEL2 = 37
20640a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
20650a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        logger = multiprocessing.get_logger()
20660a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        root_logger = logging.getLogger()
20670a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        root_level = root_logger.level
20680a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
20690a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        reader, writer = multiprocessing.Pipe(duplex=False)
20700a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
20710a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        logger.setLevel(LEVEL1)
20720a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        p = self.Process(target=self._test_level, args=(writer,))
20730a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        p.daemon = True
20740a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        p.start()
20750a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertEqual(LEVEL1, reader.recv())
20760a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
20770a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        logger.setLevel(logging.NOTSET)
20780a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        root_logger.setLevel(LEVEL2)
20790a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        p = self.Process(target=self._test_level, args=(writer,))
20800a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        p.daemon = True
20810a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        p.start()
20820a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertEqual(LEVEL2, reader.recv())
20830a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
20840a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        root_logger.setLevel(root_level)
20850a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        logger.setLevel(level=LOG_LEVEL)
20860a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
20870a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
20880a8c90248264a8b26970b4473770bcc3df8515fJosh Gao# class _TestLoggingProcessName(BaseTestCase):
20890a8c90248264a8b26970b4473770bcc3df8515fJosh Gao#
20900a8c90248264a8b26970b4473770bcc3df8515fJosh Gao#     def handle(self, record):
20910a8c90248264a8b26970b4473770bcc3df8515fJosh Gao#         assert record.processName == multiprocessing.current_process().name
20920a8c90248264a8b26970b4473770bcc3df8515fJosh Gao#         self.__handled = True
20930a8c90248264a8b26970b4473770bcc3df8515fJosh Gao#
20940a8c90248264a8b26970b4473770bcc3df8515fJosh Gao#     def test_logging(self):
20950a8c90248264a8b26970b4473770bcc3df8515fJosh Gao#         handler = logging.Handler()
20960a8c90248264a8b26970b4473770bcc3df8515fJosh Gao#         handler.handle = self.handle
20970a8c90248264a8b26970b4473770bcc3df8515fJosh Gao#         self.__handled = False
20980a8c90248264a8b26970b4473770bcc3df8515fJosh Gao#         # Bypass getLogger() and side-effects
20990a8c90248264a8b26970b4473770bcc3df8515fJosh Gao#         logger = logging.getLoggerClass()(
21000a8c90248264a8b26970b4473770bcc3df8515fJosh Gao#                 'multiprocessing.test.TestLoggingProcessName')
21010a8c90248264a8b26970b4473770bcc3df8515fJosh Gao#         logger.addHandler(handler)
21020a8c90248264a8b26970b4473770bcc3df8515fJosh Gao#         logger.propagate = False
21030a8c90248264a8b26970b4473770bcc3df8515fJosh Gao#
21040a8c90248264a8b26970b4473770bcc3df8515fJosh Gao#         logger.warn('foo')
21050a8c90248264a8b26970b4473770bcc3df8515fJosh Gao#         assert self.__handled
21060a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
21070a8c90248264a8b26970b4473770bcc3df8515fJosh Gao#
21080a8c90248264a8b26970b4473770bcc3df8515fJosh Gao# Check that Process.join() retries if os.waitpid() fails with EINTR
21090a8c90248264a8b26970b4473770bcc3df8515fJosh Gao#
21100a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
21110a8c90248264a8b26970b4473770bcc3df8515fJosh Gaoclass _TestPollEintr(BaseTestCase):
21120a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
21130a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    ALLOWED_TYPES = ('processes',)
21140a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
21150a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    @classmethod
21160a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def _killer(cls, pid):
21170a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        time.sleep(0.5)
21180a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        os.kill(pid, signal.SIGUSR1)
21190a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
21200a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1')
21210a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def test_poll_eintr(self):
21220a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        got_signal = [False]
21230a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        def record(*args):
21240a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            got_signal[0] = True
21250a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        pid = os.getpid()
21260a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        oldhandler = signal.signal(signal.SIGUSR1, record)
21270a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        try:
21280a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            killer = self.Process(target=self._killer, args=(pid,))
21290a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            killer.start()
21300a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            p = self.Process(target=time.sleep, args=(1,))
21310a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            p.start()
21320a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            p.join()
21330a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            self.assertTrue(got_signal[0])
21340a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            self.assertEqual(p.exitcode, 0)
21350a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            killer.join()
21360a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        finally:
21370a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            signal.signal(signal.SIGUSR1, oldhandler)
21380a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
21390a8c90248264a8b26970b4473770bcc3df8515fJosh Gao#
21400a8c90248264a8b26970b4473770bcc3df8515fJosh Gao# Test to verify handle verification, see issue 3321
21410a8c90248264a8b26970b4473770bcc3df8515fJosh Gao#
21420a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
21430a8c90248264a8b26970b4473770bcc3df8515fJosh Gaoclass TestInvalidHandle(unittest.TestCase):
21440a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
21450a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    @unittest.skipIf(WIN32, "skipped on Windows")
21460a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def test_invalid_handles(self):
21470a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        conn = _multiprocessing.Connection(44977608)
21480a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertRaises(IOError, conn.poll)
21490a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertRaises(IOError, _multiprocessing.Connection, -1)
21500a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
21510a8c90248264a8b26970b4473770bcc3df8515fJosh Gao#
21520a8c90248264a8b26970b4473770bcc3df8515fJosh Gao# Functions used to create test cases from the base ones in this module
21530a8c90248264a8b26970b4473770bcc3df8515fJosh Gao#
21540a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
21550a8c90248264a8b26970b4473770bcc3df8515fJosh Gaodef get_attributes(Source, names):
21560a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    d = {}
21570a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    for name in names:
21580a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        obj = getattr(Source, name)
21590a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        if type(obj) == type(get_attributes):
21600a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            obj = staticmethod(obj)
21610a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        d[name] = obj
21620a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    return d
21630a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
21640a8c90248264a8b26970b4473770bcc3df8515fJosh Gaodef create_test_cases(Mixin, type):
21650a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    result = {}
21660a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    glob = globals()
21670a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    Type = type.capitalize()
21680a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
21690a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    for name in glob.keys():
21700a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        if name.startswith('_Test'):
21710a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            base = glob[name]
21720a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            if type in base.ALLOWED_TYPES:
21730a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                newname = 'With' + Type + name[1:]
21740a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                class Temp(base, unittest.TestCase, Mixin):
21750a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                    pass
21760a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                result[newname] = Temp
21770a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                Temp.__name__ = newname
21780a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                Temp.__module__ = Mixin.__module__
21790a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    return result
21800a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
21810a8c90248264a8b26970b4473770bcc3df8515fJosh Gao#
21820a8c90248264a8b26970b4473770bcc3df8515fJosh Gao# Create test cases
21830a8c90248264a8b26970b4473770bcc3df8515fJosh Gao#
21840a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
21850a8c90248264a8b26970b4473770bcc3df8515fJosh Gaoclass ProcessesMixin(object):
21860a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    TYPE = 'processes'
21870a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    Process = multiprocessing.Process
21880a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    locals().update(get_attributes(multiprocessing, (
21890a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
21900a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        'Condition', 'Event', 'Value', 'Array', 'RawValue',
21910a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        'RawArray', 'current_process', 'active_children', 'Pipe',
21920a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        'connection', 'JoinableQueue', 'Pool'
21930a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        )))
21940a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
21950a8c90248264a8b26970b4473770bcc3df8515fJosh Gaotestcases_processes = create_test_cases(ProcessesMixin, type='processes')
21960a8c90248264a8b26970b4473770bcc3df8515fJosh Gaoglobals().update(testcases_processes)
21970a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
21980a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
21990a8c90248264a8b26970b4473770bcc3df8515fJosh Gaoclass ManagerMixin(object):
22000a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    TYPE = 'manager'
22010a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    Process = multiprocessing.Process
22020a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    manager = object.__new__(multiprocessing.managers.SyncManager)
22030a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    locals().update(get_attributes(manager, (
22040a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
22050a8c90248264a8b26970b4473770bcc3df8515fJosh Gao       'Condition', 'Event', 'Value', 'Array', 'list', 'dict',
22060a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        'Namespace', 'JoinableQueue', 'Pool'
22070a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        )))
22080a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
22090a8c90248264a8b26970b4473770bcc3df8515fJosh Gaotestcases_manager = create_test_cases(ManagerMixin, type='manager')
22100a8c90248264a8b26970b4473770bcc3df8515fJosh Gaoglobals().update(testcases_manager)
22110a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
22120a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
22130a8c90248264a8b26970b4473770bcc3df8515fJosh Gaoclass ThreadsMixin(object):
22140a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    TYPE = 'threads'
22150a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    Process = multiprocessing.dummy.Process
22160a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    locals().update(get_attributes(multiprocessing.dummy, (
22170a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
22180a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        'Condition', 'Event', 'Value', 'Array', 'current_process',
22190a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        'active_children', 'Pipe', 'connection', 'dict', 'list',
22200a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        'Namespace', 'JoinableQueue', 'Pool'
22210a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        )))
22220a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
22230a8c90248264a8b26970b4473770bcc3df8515fJosh Gaotestcases_threads = create_test_cases(ThreadsMixin, type='threads')
22240a8c90248264a8b26970b4473770bcc3df8515fJosh Gaoglobals().update(testcases_threads)
22250a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
22260a8c90248264a8b26970b4473770bcc3df8515fJosh Gaoclass OtherTest(unittest.TestCase):
22270a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    # TODO: add more tests for deliver/answer challenge.
22280a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def test_deliver_challenge_auth_failure(self):
22290a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        class _FakeConnection(object):
22300a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            def recv_bytes(self, size):
22310a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                return b'something bogus'
22320a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            def send_bytes(self, data):
22330a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                pass
22340a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertRaises(multiprocessing.AuthenticationError,
22350a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                          multiprocessing.connection.deliver_challenge,
22360a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                          _FakeConnection(), b'abc')
22370a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
22380a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def test_answer_challenge_auth_failure(self):
22390a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        class _FakeConnection(object):
22400a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            def __init__(self):
22410a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                self.count = 0
22420a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            def recv_bytes(self, size):
22430a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                self.count += 1
22440a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                if self.count == 1:
22450a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                    return multiprocessing.connection.CHALLENGE
22460a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                elif self.count == 2:
22470a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                    return b'something bogus'
22480a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                return b''
22490a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            def send_bytes(self, data):
22500a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                pass
22510a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertRaises(multiprocessing.AuthenticationError,
22520a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                          multiprocessing.connection.answer_challenge,
22530a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                          _FakeConnection(), b'abc')
22540a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
22550a8c90248264a8b26970b4473770bcc3df8515fJosh Gao#
22560a8c90248264a8b26970b4473770bcc3df8515fJosh Gao# Test Manager.start()/Pool.__init__() initializer feature - see issue 5585
22570a8c90248264a8b26970b4473770bcc3df8515fJosh Gao#
22580a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
22590a8c90248264a8b26970b4473770bcc3df8515fJosh Gaodef initializer(ns):
22600a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    ns.test += 1
22610a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
22620a8c90248264a8b26970b4473770bcc3df8515fJosh Gaoclass TestInitializers(unittest.TestCase):
22630a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def setUp(self):
22640a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.mgr = multiprocessing.Manager()
22650a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.ns = self.mgr.Namespace()
22660a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.ns.test = 0
22670a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
22680a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def tearDown(self):
22690a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.mgr.shutdown()
22700a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
22710a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def test_manager_initializer(self):
22720a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        m = multiprocessing.managers.SyncManager()
22730a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertRaises(TypeError, m.start, 1)
22740a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        m.start(initializer, (self.ns,))
22750a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertEqual(self.ns.test, 1)
22760a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        m.shutdown()
22770a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
22780a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def test_pool_initializer(self):
22790a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertRaises(TypeError, multiprocessing.Pool, initializer=1)
22800a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        p = multiprocessing.Pool(1, initializer, (self.ns,))
22810a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        p.close()
22820a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        p.join()
22830a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertEqual(self.ns.test, 1)
22840a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
22850a8c90248264a8b26970b4473770bcc3df8515fJosh Gao#
22860a8c90248264a8b26970b4473770bcc3df8515fJosh Gao# Issue 5155, 5313, 5331: Test process in processes
22870a8c90248264a8b26970b4473770bcc3df8515fJosh Gao# Verifies os.close(sys.stdin.fileno) vs. sys.stdin.close() behavior
22880a8c90248264a8b26970b4473770bcc3df8515fJosh Gao#
22890a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
22900a8c90248264a8b26970b4473770bcc3df8515fJosh Gaodef _ThisSubProcess(q):
22910a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    try:
22920a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        item = q.get(block=False)
22930a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    except Queue.Empty:
22940a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        pass
22950a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
22960a8c90248264a8b26970b4473770bcc3df8515fJosh Gaodef _TestProcess(q):
22970a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    queue = multiprocessing.Queue()
22980a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    subProc = multiprocessing.Process(target=_ThisSubProcess, args=(queue,))
22990a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    subProc.daemon = True
23000a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    subProc.start()
23010a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    subProc.join()
23020a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
23030a8c90248264a8b26970b4473770bcc3df8515fJosh Gaodef _afunc(x):
23040a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    return x*x
23050a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
23060a8c90248264a8b26970b4473770bcc3df8515fJosh Gaodef pool_in_process():
23070a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    pool = multiprocessing.Pool(processes=4)
23080a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    x = pool.map(_afunc, [1, 2, 3, 4, 5, 6, 7])
23090a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
23100a8c90248264a8b26970b4473770bcc3df8515fJosh Gaoclass _file_like(object):
23110a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def __init__(self, delegate):
23120a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self._delegate = delegate
23130a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self._pid = None
23140a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
23150a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    @property
23160a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def cache(self):
23170a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        pid = os.getpid()
23180a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        # There are no race conditions since fork keeps only the running thread
23190a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        if pid != self._pid:
23200a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            self._pid = pid
23210a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            self._cache = []
23220a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        return self._cache
23230a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
23240a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def write(self, data):
23250a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.cache.append(data)
23260a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
23270a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def flush(self):
23280a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self._delegate.write(''.join(self.cache))
23290a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self._cache = []
23300a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
23310a8c90248264a8b26970b4473770bcc3df8515fJosh Gaoclass TestStdinBadfiledescriptor(unittest.TestCase):
23320a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
23330a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def test_queue_in_process(self):
23340a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        queue = multiprocessing.Queue()
23350a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        proc = multiprocessing.Process(target=_TestProcess, args=(queue,))
23360a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        proc.start()
23370a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        proc.join()
23380a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
23390a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def test_pool_in_process(self):
23400a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        p = multiprocessing.Process(target=pool_in_process)
23410a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        p.start()
23420a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        p.join()
23430a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
23440a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def test_flushing(self):
23450a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        sio = StringIO()
23460a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        flike = _file_like(sio)
23470a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        flike.write('foo')
23480a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        proc = multiprocessing.Process(target=lambda: flike.flush())
23490a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        flike.flush()
23500a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        assert sio.getvalue() == 'foo'
23510a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
23520a8c90248264a8b26970b4473770bcc3df8515fJosh Gao#
23530a8c90248264a8b26970b4473770bcc3df8515fJosh Gao# Test interaction with socket timeouts - see Issue #6056
23540a8c90248264a8b26970b4473770bcc3df8515fJosh Gao#
23550a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
23560a8c90248264a8b26970b4473770bcc3df8515fJosh Gaoclass TestTimeouts(unittest.TestCase):
23570a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    @classmethod
23580a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def _test_timeout(cls, child, address):
23590a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        time.sleep(1)
23600a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        child.send(123)
23610a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        child.close()
23620a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        conn = multiprocessing.connection.Client(address)
23630a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        conn.send(456)
23640a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        conn.close()
23650a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
23660a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def test_timeout(self):
23670a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        old_timeout = socket.getdefaulttimeout()
23680a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        try:
23690a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            socket.setdefaulttimeout(0.1)
23700a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            parent, child = multiprocessing.Pipe(duplex=True)
23710a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            l = multiprocessing.connection.Listener(family='AF_INET')
23720a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            p = multiprocessing.Process(target=self._test_timeout,
23730a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                                        args=(child, l.address))
23740a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            p.start()
23750a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            child.close()
23760a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            self.assertEqual(parent.recv(), 123)
23770a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            parent.close()
23780a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            conn = l.accept()
23790a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            self.assertEqual(conn.recv(), 456)
23800a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            conn.close()
23810a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            l.close()
23820a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            p.join(10)
23830a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        finally:
23840a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            socket.setdefaulttimeout(old_timeout)
23850a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
23860a8c90248264a8b26970b4473770bcc3df8515fJosh Gao#
23870a8c90248264a8b26970b4473770bcc3df8515fJosh Gao# Test what happens with no "if __name__ == '__main__'"
23880a8c90248264a8b26970b4473770bcc3df8515fJosh Gao#
23890a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
23900a8c90248264a8b26970b4473770bcc3df8515fJosh Gaoclass TestNoForkBomb(unittest.TestCase):
23910a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def test_noforkbomb(self):
23920a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        name = os.path.join(os.path.dirname(__file__), 'mp_fork_bomb.py')
23930a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        if WIN32:
23940a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            rc, out, err = test.script_helper.assert_python_failure(name)
23950a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            self.assertEqual('', out.decode('ascii'))
23960a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            self.assertIn('RuntimeError', err.decode('ascii'))
23970a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        else:
23980a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            rc, out, err = test.script_helper.assert_python_ok(name)
23990a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            self.assertEqual('123', out.decode('ascii').rstrip())
24000a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            self.assertEqual('', err.decode('ascii'))
24010a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
24020a8c90248264a8b26970b4473770bcc3df8515fJosh Gao#
24030a8c90248264a8b26970b4473770bcc3df8515fJosh Gao# Issue 12098: check sys.flags of child matches that for parent
24040a8c90248264a8b26970b4473770bcc3df8515fJosh Gao#
24050a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
24060a8c90248264a8b26970b4473770bcc3df8515fJosh Gaoclass TestFlags(unittest.TestCase):
24070a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    @classmethod
24080a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def run_in_grandchild(cls, conn):
24090a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        conn.send(tuple(sys.flags))
24100a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
24110a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    @classmethod
24120a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def run_in_child(cls):
24130a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        import json
24140a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        r, w = multiprocessing.Pipe(duplex=False)
24150a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        p = multiprocessing.Process(target=cls.run_in_grandchild, args=(w,))
24160a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        p.start()
24170a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        grandchild_flags = r.recv()
24180a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        p.join()
24190a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        r.close()
24200a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        w.close()
24210a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        flags = (tuple(sys.flags), grandchild_flags)
24220a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        print(json.dumps(flags))
24230a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
24240a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def test_flags(self):
24250a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        import json, subprocess
24260a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        # start child process using unusual flags
24270a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        prog = ('from test.test_multiprocessing import TestFlags; ' +
24280a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                'TestFlags.run_in_child()')
24290a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        data = subprocess.check_output(
24300a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            [sys.executable, '-E', '-B', '-O', '-c', prog])
24310a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        child_flags, grandchild_flags = json.loads(data.decode('ascii'))
24320a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertEqual(child_flags, grandchild_flags)
24330a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
24340a8c90248264a8b26970b4473770bcc3df8515fJosh Gao#
24350a8c90248264a8b26970b4473770bcc3df8515fJosh Gao# Issue #17555: ForkAwareThreadLock
24360a8c90248264a8b26970b4473770bcc3df8515fJosh Gao#
24370a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
24380a8c90248264a8b26970b4473770bcc3df8515fJosh Gaoclass TestForkAwareThreadLock(unittest.TestCase):
24390a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    # We recurisvely start processes.  Issue #17555 meant that the
24400a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    # after fork registry would get duplicate entries for the same
24410a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    # lock.  The size of the registry at generation n was ~2**n.
24420a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
24430a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    @classmethod
24440a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def child(cls, n, conn):
24450a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        if n > 1:
24460a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            p = multiprocessing.Process(target=cls.child, args=(n-1, conn))
24470a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            p.start()
24480a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            p.join()
24490a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        else:
24500a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            conn.send(len(util._afterfork_registry))
24510a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        conn.close()
24520a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
24530a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def test_lock(self):
24540a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        r, w = multiprocessing.Pipe(False)
24550a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        l = util.ForkAwareThreadLock()
24560a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        old_size = len(util._afterfork_registry)
24570a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        p = multiprocessing.Process(target=self.child, args=(5, w))
24580a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        p.start()
24590a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        new_size = r.recv()
24600a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        p.join()
24610a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertLessEqual(new_size, old_size)
24620a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
24630a8c90248264a8b26970b4473770bcc3df8515fJosh Gao#
24640a8c90248264a8b26970b4473770bcc3df8515fJosh Gao#
24650a8c90248264a8b26970b4473770bcc3df8515fJosh Gao#
24660a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
24670a8c90248264a8b26970b4473770bcc3df8515fJosh Gaotestcases_other = [OtherTest, TestInvalidHandle, TestInitializers,
24680a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                   TestStdinBadfiledescriptor, TestTimeouts, TestNoForkBomb,
24690a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                   TestFlags, TestForkAwareThreadLock]
24700a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
24710a8c90248264a8b26970b4473770bcc3df8515fJosh Gao#
24720a8c90248264a8b26970b4473770bcc3df8515fJosh Gao#
24730a8c90248264a8b26970b4473770bcc3df8515fJosh Gao#
24740a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
24750a8c90248264a8b26970b4473770bcc3df8515fJosh Gaodef test_main(run=None):
24760a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    if sys.platform.startswith("linux"):
24770a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        try:
24780a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            lock = multiprocessing.RLock()
24790a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        except OSError:
24800a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            raise unittest.SkipTest("OSError raises on RLock creation, see issue 3111!")
24810a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
24820a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    check_enough_semaphores()
24830a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
24840a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    if run is None:
24850a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        from test.test_support import run_unittest as run
24860a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
24870a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    util.get_temp_dir()     # creates temp directory for use by all processes
24880a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
24890a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    multiprocessing.get_logger().setLevel(LOG_LEVEL)
24900a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
24910a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    ProcessesMixin.pool = multiprocessing.Pool(4)
24920a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    ThreadsMixin.pool = multiprocessing.dummy.Pool(4)
24930a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    ManagerMixin.manager.__init__()
24940a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    ManagerMixin.manager.start()
24950a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    ManagerMixin.pool = ManagerMixin.manager.Pool(4)
24960a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
24970a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    testcases = (
24980a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        sorted(testcases_processes.values(), key=lambda tc:tc.__name__) +
24990a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        sorted(testcases_threads.values(), key=lambda tc:tc.__name__) +
25000a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        sorted(testcases_manager.values(), key=lambda tc:tc.__name__) +
25010a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        testcases_other
25020a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        )
25030a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
25040a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    loadTestsFromTestCase = unittest.defaultTestLoader.loadTestsFromTestCase
25050a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    suite = unittest.TestSuite(loadTestsFromTestCase(tc) for tc in testcases)
25060a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    # (ncoghlan): Whether or not sys.exc_clear is executed by the threading
25070a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    # module during these tests is at least platform dependent and possibly
25080a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    # non-deterministic on any given platform. So we don't mind if the listed
25090a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    # warnings aren't actually raised.
25100a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    with test_support.check_py3k_warnings(
25110a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            (".+__(get|set)slice__ has been removed", DeprecationWarning),
25120a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            (r"sys.exc_clear\(\) not supported", DeprecationWarning),
25130a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            quiet=True):
25140a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        run(suite)
25150a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
25160a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    ThreadsMixin.pool.terminate()
25170a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    ProcessesMixin.pool.terminate()
25180a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    ManagerMixin.pool.terminate()
25190a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    ManagerMixin.manager.shutdown()
25200a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
25210a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    del ProcessesMixin.pool, ThreadsMixin.pool, ManagerMixin.pool
25220a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
25230a8c90248264a8b26970b4473770bcc3df8515fJosh Gaodef main():
25240a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    test_main(unittest.TextTestRunner(verbosity=2).run)
25250a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
25260a8c90248264a8b26970b4473770bcc3df8515fJosh Gaoif __name__ == '__main__':
25270a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    main()
2528