1#
2# A test file for the `multiprocessing` package
3#
4# Copyright (c) 2006-2008, R Oudkerk
5# All rights reserved.
6#
7
8import time, sys, random
9from Queue import Empty
10
11import multiprocessing               # may get overwritten
12
13
14#### TEST_VALUE
15
16def value_func(running, mutex):
17    random.seed()
18    time.sleep(random.random()*4)
19
20    mutex.acquire()
21    print '\n\t\t\t' + str(multiprocessing.current_process()) + ' has finished'
22    running.value -= 1
23    mutex.release()
24
25def test_value():
26    TASKS = 10
27    running = multiprocessing.Value('i', TASKS)
28    mutex = multiprocessing.Lock()
29
30    for i in range(TASKS):
31        p = multiprocessing.Process(target=value_func, args=(running, mutex))
32        p.start()
33
34    while running.value > 0:
35        time.sleep(0.08)
36        mutex.acquire()
37        print running.value,
38        sys.stdout.flush()
39        mutex.release()
40
41    print
42    print 'No more running processes'
43
44
45#### TEST_QUEUE
46
47def queue_func(queue):
48    for i in range(30):
49        time.sleep(0.5 * random.random())
50        queue.put(i*i)
51    queue.put('STOP')
52
53def test_queue():
54    q = multiprocessing.Queue()
55
56    p = multiprocessing.Process(target=queue_func, args=(q,))
57    p.start()
58
59    o = None
60    while o != 'STOP':
61        try:
62            o = q.get(timeout=0.3)
63            print o,
64            sys.stdout.flush()
65        except Empty:
66            print 'TIMEOUT'
67
68    print
69
70
71#### TEST_CONDITION
72
73def condition_func(cond):
74    cond.acquire()
75    print '\t' + str(cond)
76    time.sleep(2)
77    print '\tchild is notifying'
78    print '\t' + str(cond)
79    cond.notify()
80    cond.release()
81
82def test_condition():
83    cond = multiprocessing.Condition()
84
85    p = multiprocessing.Process(target=condition_func, args=(cond,))
86    print cond
87
88    cond.acquire()
89    print cond
90    cond.acquire()
91    print cond
92
93    p.start()
94
95    print 'main is waiting'
96    cond.wait()
97    print 'main has woken up'
98
99    print cond
100    cond.release()
101    print cond
102    cond.release()
103
104    p.join()
105    print cond
106
107
108#### TEST_SEMAPHORE
109
110def semaphore_func(sema, mutex, running):
111    sema.acquire()
112
113    mutex.acquire()
114    running.value += 1
115    print running.value, 'tasks are running'
116    mutex.release()
117
118    random.seed()
119    time.sleep(random.random()*2)
120
121    mutex.acquire()
122    running.value -= 1
123    print '%s has finished' % multiprocessing.current_process()
124    mutex.release()
125
126    sema.release()
127
128def test_semaphore():
129    sema = multiprocessing.Semaphore(3)
130    mutex = multiprocessing.RLock()
131    running = multiprocessing.Value('i', 0)
132
133    processes = [
134        multiprocessing.Process(target=semaphore_func,
135                                args=(sema, mutex, running))
136        for i in range(10)
137        ]
138
139    for p in processes:
140        p.start()
141
142    for p in processes:
143        p.join()
144
145
146#### TEST_JOIN_TIMEOUT
147
148def join_timeout_func():
149    print '\tchild sleeping'
150    time.sleep(5.5)
151    print '\n\tchild terminating'
152
153def test_join_timeout():
154    p = multiprocessing.Process(target=join_timeout_func)
155    p.start()
156
157    print 'waiting for process to finish'
158
159    while 1:
160        p.join(timeout=1)
161        if not p.is_alive():
162            break
163        print '.',
164        sys.stdout.flush()
165
166
167#### TEST_EVENT
168
169def event_func(event):
170    print '\t%r is waiting' % multiprocessing.current_process()
171    event.wait()
172    print '\t%r has woken up' % multiprocessing.current_process()
173
174def test_event():
175    event = multiprocessing.Event()
176
177    processes = [multiprocessing.Process(target=event_func, args=(event,))
178                 for i in range(5)]
179
180    for p in processes:
181        p.start()
182
183    print 'main is sleeping'
184    time.sleep(2)
185
186    print 'main is setting event'
187    event.set()
188
189    for p in processes:
190        p.join()
191
192
193#### TEST_SHAREDVALUES
194
195def sharedvalues_func(values, arrays, shared_values, shared_arrays):
196    for i in range(len(values)):
197        v = values[i][1]
198        sv = shared_values[i].value
199        assert v == sv
200
201    for i in range(len(values)):
202        a = arrays[i][1]
203        sa = list(shared_arrays[i][:])
204        assert a == sa
205
206    print 'Tests passed'
207
208def test_sharedvalues():
209    values = [
210        ('i', 10),
211        ('h', -2),
212        ('d', 1.25)
213        ]
214    arrays = [
215        ('i', range(100)),
216        ('d', [0.25 * i for i in range(100)]),
217        ('H', range(1000))
218        ]
219
220    shared_values = [multiprocessing.Value(id, v) for id, v in values]
221    shared_arrays = [multiprocessing.Array(id, a) for id, a in arrays]
222
223    p = multiprocessing.Process(
224        target=sharedvalues_func,
225        args=(values, arrays, shared_values, shared_arrays)
226        )
227    p.start()
228    p.join()
229
230    assert p.exitcode == 0
231
232
233####
234
235def test(namespace=multiprocessing):
236    global multiprocessing
237
238    multiprocessing = namespace
239
240    for func in [ test_value, test_queue, test_condition,
241                  test_semaphore, test_join_timeout, test_event,
242                  test_sharedvalues ]:
243
244        print '\n\t######## %s\n' % func.__name__
245        func()
246
247    ignore = multiprocessing.active_children()      # cleanup any old processes
248    if hasattr(multiprocessing, '_debug_info'):
249        info = multiprocessing._debug_info()
250        if info:
251            print info
252            raise ValueError('there should be no positive refcounts left')
253
254
255if __name__ == '__main__':
256    multiprocessing.freeze_support()
257
258    assert len(sys.argv) in (1, 2)
259
260    if len(sys.argv) == 1 or sys.argv[1] == 'processes':
261        print ' Using processes '.center(79, '-')
262        namespace = multiprocessing
263    elif sys.argv[1] == 'manager':
264        print ' Using processes and a manager '.center(79, '-')
265        namespace = multiprocessing.Manager()
266        namespace.Process = multiprocessing.Process
267        namespace.current_process = multiprocessing.current_process
268        namespace.active_children = multiprocessing.active_children
269    elif sys.argv[1] == 'threads':
270        print ' Using threads '.center(79, '-')
271        import multiprocessing.dummy as namespace
272    else:
273        print 'Usage:\n\t%s [processes | manager | threads]' % sys.argv[0]
274        raise SystemExit(2)
275
276    test(namespace)
277