1#
2# Module which supports allocation of ctypes objects from shared memory
3#
4# multiprocessing/sharedctypes.py
5#
6# Copyright (c) 2006-2008, R Oudkerk
7# All rights reserved.
8#
9# Redistribution and use in source and binary forms, with or without
10# modification, are permitted provided that the following conditions
11# are met:
12#
13# 1. Redistributions of source code must retain the above copyright
14#    notice, this list of conditions and the following disclaimer.
15# 2. Redistributions in binary form must reproduce the above copyright
16#    notice, this list of conditions and the following disclaimer in the
17#    documentation and/or other materials provided with the distribution.
18# 3. Neither the name of author nor the names of any contributors may be
19#    used to endorse or promote products derived from this software
20#    without specific prior written permission.
21#
22# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND
23# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
24# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
25# ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
26# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
27# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
28# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
29# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
30# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
31# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
32# SUCH DAMAGE.
33#
34
35import sys
36import ctypes
37import weakref
38
39from multiprocessing import heap, RLock
40from multiprocessing.forking import assert_spawning, ForkingPickler
41
# Public names exported by a star-import of this module.
__all__ = ['RawValue', 'RawArray', 'Value', 'Array', 'copy', 'synchronized']
43
44#
45#
46#
47
# Maps one-character typecodes to the ctypes type used for them.  RawValue()
# and RawArray() look their first argument up here and fall back to using the
# argument itself as the ctypes type when it is not a key of this table.
typecode_to_type = {
    'c': ctypes.c_char,     'u': ctypes.c_wchar,
    'b': ctypes.c_byte,     'B': ctypes.c_ubyte,
    'h': ctypes.c_short,    'H': ctypes.c_ushort,
    'i': ctypes.c_int,      'I': ctypes.c_uint,
    'l': ctypes.c_long,     'L': ctypes.c_ulong,
    # 'q'/'Q' give access to (unsigned) long long by typecode as well
    'q': ctypes.c_longlong, 'Q': ctypes.c_ulonglong,
    'f': ctypes.c_float,    'd': ctypes.c_double
    }
56
#
# Functions for creating shared ctypes objects and synchronized wrappers
#
60
def _new_value(type_):
    '''
    Return an instance of ctypes type `type_` backed by a heap.BufferWrapper

    The buffer is sized with ctypes.sizeof(type_) and the instance is built
    by rebuild_ctype(); the memory is not initialised here.
    '''
    nbytes = ctypes.sizeof(type_)
    return rebuild_ctype(type_, heap.BufferWrapper(nbytes), None)
65
def RawValue(typecode_or_type, *args):
    '''
    Returns a ctypes object allocated from shared memory

    `typecode_or_type` is either a key of typecode_to_type or is used
    directly as the ctypes type.  The new object is zero-filled and then
    initialised by calling its __init__ with `args`.
    '''
    ctype = typecode_to_type.get(typecode_or_type, typecode_or_type)
    value = _new_value(ctype)
    # Clear the freshly allocated memory before running the initialiser.
    ctypes.memset(ctypes.addressof(value), 0, ctypes.sizeof(value))
    value.__init__(*args)
    return value
75
def RawArray(typecode_or_type, size_or_initializer):
    '''
    Returns a ctypes array allocated from shared memory

    `typecode_or_type` is either a key of typecode_to_type or is used
    directly as the element type.  If `size_or_initializer` is an integer it
    is the length of a zero-filled array; otherwise it must support len()
    and its items become the initial contents of the array.
    '''
    element_type = typecode_to_type.get(typecode_or_type, typecode_or_type)

    if not isinstance(size_or_initializer, (int, long)):
        # Initialiser given: the array is as long as the initialiser and
        # every element is set from it.
        array = _new_value(element_type * len(size_or_initializer))
        array.__init__(*size_or_initializer)
        return array

    # Only a length given: hand back zero-filled memory.
    array = _new_value(element_type * size_or_initializer)
    ctypes.memset(ctypes.addressof(array), 0, ctypes.sizeof(array))
    return array
91
def Value(typecode_or_type, *args, **kwds):
    '''
    Return a synchronization wrapper for a RawValue

    Positional arguments are passed on to RawValue().  The only keyword
    argument accepted is `lock`:

      * True or None (the default): a new RLock is created;
      * False: the raw, unsynchronized object is returned;
      * anything else: used as the lock, and must have an `acquire`
        attribute.

    Raises ValueError for any other keyword argument and AttributeError for
    a lock object without `acquire`.
    '''
    lock = kwds.pop('lock', None)
    if kwds:
        raise ValueError('unrecognized keyword argument(s): %s'
                         % list(kwds.keys()))

    # Validate the lock before allocating anything, so that a bad argument
    # does not cost a shared memory block.
    create_lock = False
    if lock is not False:
        create_lock = lock in (True, None)
        if not create_lock and not hasattr(lock, 'acquire'):
            # Wrap in a 1-tuple: a bare tuple `lock` would otherwise be
            # unpacked as the format arguments and raise TypeError.
            raise AttributeError("'%r' has no method 'acquire'" % (lock,))

    obj = RawValue(typecode_or_type, *args)
    if lock is False:
        return obj
    if create_lock:
        lock = RLock()
    return synchronized(obj, lock)
107
def Array(typecode_or_type, size_or_initializer, **kwds):
    '''
    Return a synchronization wrapper for a RawArray

    The two positional arguments are passed on to RawArray().  The only
    keyword argument accepted is `lock`:

      * True or None (the default): a new RLock is created;
      * False: the raw, unsynchronized array is returned;
      * anything else: used as the lock, and must have an `acquire`
        attribute.

    Raises ValueError for any other keyword argument and AttributeError for
    a lock object without `acquire`.
    '''
    lock = kwds.pop('lock', None)
    if kwds:
        raise ValueError('unrecognized keyword argument(s): %s'
                         % list(kwds.keys()))

    # Validate the lock before allocating anything, so that a bad argument
    # does not cost a shared memory block.
    create_lock = False
    if lock is not False:
        create_lock = lock in (True, None)
        if not create_lock and not hasattr(lock, 'acquire'):
            # Wrap in a 1-tuple: a bare tuple `lock` would otherwise be
            # unpacked as the format arguments and raise TypeError.
            raise AttributeError("'%r' has no method 'acquire'" % (lock,))

    obj = RawArray(typecode_or_type, size_or_initializer)
    if lock is False:
        return obj
    if create_lock:
        lock = RLock()
    return synchronized(obj, lock)
123
def copy(obj):
    '''
    Return a new object of the same ctypes type as `obj`, allocated with
    _new_value(), whose contents are assigned from `obj`
    '''
    duplicate = _new_value(type(obj))
    # Assigning through a pointer to the new object stores obj's contents
    # into it.
    target = ctypes.pointer(duplicate)
    target[0] = obj
    return duplicate
128
def synchronized(obj, lock=None):
    '''
    Return a lock-protected wrapper around the ctypes object `obj`

    Simple ctypes values get a Synchronized wrapper, arrays of c_char a
    SynchronizedString and other arrays a SynchronizedArray.  For any other
    object a SynchronizedBase subclass with one property per entry of the
    type's `_fields_` is created (and cached per type in class_cache).
    '''
    assert not isinstance(obj, SynchronizedBase), 'object already synchronized'

    if isinstance(obj, ctypes._SimpleCData):
        return Synchronized(obj, lock)

    if isinstance(obj, ctypes.Array):
        if obj._type_ is ctypes.c_char:
            wrapper_cls = SynchronizedString
        else:
            wrapper_cls = SynchronizedArray
        return wrapper_cls(obj, lock)

    ctype = type(obj)
    wrapper_cls = class_cache.get(ctype)
    if wrapper_cls is None:
        # First time this type is seen: build a wrapper class exposing each
        # field as a locked property, and remember it for later calls.
        field_names = [field[0] for field in ctype._fields_]
        attrs = {}
        for field_name in field_names:
            attrs[field_name] = make_property(field_name)
        wrapper_name = 'Synchronized' + ctype.__name__
        wrapper_cls = type(wrapper_name, (SynchronizedBase,), attrs)
        class_cache[ctype] = wrapper_cls
    return wrapper_cls(obj, lock)
148
149#
150# Functions for pickling/unpickling
151#
152
def reduce_ctype(obj):
    '''
    Reduction function for ctypes objects created by rebuild_ctype()

    Calls assert_spawning(obj) and then returns `(rebuild_ctype, args)`,
    where args carry the type, the object's `_wrapper` and, for arrays, the
    element type and length instead of the array type itself.
    '''
    assert_spawning(obj)
    if isinstance(obj, ctypes.Array):
        rebuild_args = (obj._type_, obj._wrapper, obj._length_)
    else:
        rebuild_args = (type(obj), obj._wrapper, None)
    return rebuild_ctype, rebuild_args
159
def rebuild_ctype(type_, wrapper, length):
    '''
    Build a ctypes object of `type_` located at wrapper.get_address()

    If `length` is not None the object is an array of `length` items of
    `type_`.  reduce_ctype is registered with ForkingPickler for the
    resulting type, and `wrapper` is stored on the object as `_wrapper`.
    '''
    ctype = type_ if length is None else type_ * length
    ForkingPickler.register(ctype, reduce_ctype)
    rebuilt = ctype.from_address(wrapper.get_address())
    # reduce_ctype() reads this attribute back when the object is pickled.
    rebuilt._wrapper = wrapper
    return rebuilt
167
168#
169# Function to create properties
170#
171
def make_property(name):
    '''
    Return a property which accesses `self._obj.<name>` under the lock

    The getter and setter are generated from `template`: each calls
    self.acquire(), reads or writes the attribute `name` of self._obj and
    then calls self.release().  Properties are cached in prop_cache, so the
    same property object is returned for repeated calls with one name.
    '''
    try:
        return prop_cache[name]
    except KeyError:
        d = {}
        # The parenthesised form of exec is accepted by Python 2 (as a
        # tuple, equivalent to `exec code in d`) and by Python 3 (as a
        # function call), unlike the `exec ... in ...` statement.
        exec(template % ((name,)*7), d)
        prop_cache[name] = d[name]
        return d[name]

# Source code for one property; make_property() fills every one of the
# seven placeholders with the attribute name.
template = '''
def get%s(self):
    self.acquire()
    try:
        return self._obj.%s
    finally:
        self.release()
def set%s(self, value):
    self.acquire()
    try:
        self._obj.%s = value
    finally:
        self.release()
%s = property(get%s, set%s)
'''

# attribute name -> property built by make_property()
prop_cache = {}
# ctypes type -> wrapper class built by synchronized(); keys are held weakly
class_cache = weakref.WeakKeyDictionary()
199
200#
201# Synchronized wrappers
202#
203
class SynchronizedBase(object):
    '''
    Base class for wrappers which pair an object with a lock

    The lock's `acquire` and `release` are exposed as attributes of the
    wrapper.  The wrapper can also be used in a `with` statement, which
    acquires the lock on entry and releases it on exit.
    '''

    def __init__(self, obj, lock=None):
        '''
        Wrap `obj`; a new RLock is created if `lock` is not given (or is
        false)
        '''
        self._obj = obj
        self._lock = lock or RLock()
        self.acquire = self._lock.acquire
        self.release = self._lock.release

    def __enter__(self):
        # Implemented with acquire()/release() rather than the lock's own
        # context manager methods so that any lock object accepted by
        # __init__ (only acquire and release are required) works.
        return self.acquire()

    def __exit__(self, *exc_info):
        # Returns None, so an exception raised in the with-body propagates.
        self.release()

    def __reduce__(self):
        # Pickling support: assert_spawning() is consulted first, and the
        # wrapper is recreated by synchronized() from the object and lock.
        assert_spawning(self)
        return synchronized, (self._obj, self._lock)

    def get_obj(self):
        '''
        Return the wrapped object
        '''
        return self._obj

    def get_lock(self):
        '''
        Return the lock used for synchronization
        '''
        return self._lock

    def __repr__(self):
        return '<%s wrapper for %s>' % (type(self).__name__, self._obj)
224
225
class Synchronized(SynchronizedBase):
    # Wrapper returned by synchronized() for ctypes._SimpleCData instances.
    # `value` is a property built by make_property(): it reads and writes
    # self._obj.value between self.acquire() and self.release().
    value = make_property('value')
228
229
class SynchronizedArray(SynchronizedBase):
    '''
    Wrapper giving sequence access to the wrapped array

    Item and slice access is performed between self.acquire() and
    self.release(); len() is taken without acquiring the lock.
    '''

    def __len__(self):
        return len(self._obj)

    def __getitem__(self, i):
        self.acquire()
        try:
            item = self._obj[i]
        finally:
            self.release()
        return item

    def __setitem__(self, i, value):
        self.acquire()
        try:
            self._obj[i] = value
        finally:
            # Always give the lock back, even if the assignment failed.
            self.release()

    def __getslice__(self, start, stop):
        self.acquire()
        try:
            items = self._obj[start:stop]
        finally:
            self.release()
        return items

    def __setslice__(self, start, stop, values):
        self.acquire()
        try:
            self._obj[start:stop] = values
        finally:
            # Always give the lock back, even if the assignment failed.
            self.release()
262
263
class SynchronizedString(SynchronizedArray):
    # Wrapper returned by synchronized() for arrays whose element type is
    # ctypes.c_char.  `value` and `raw` are properties built by
    # make_property(): each reads and writes the attribute of the same name
    # on self._obj between self.acquire() and self.release().
    value = make_property('value')
    raw = make_property('raw')
267