# xdrlib.py revision b921a84405a9c9db7c01ca19533bd98556c7375c
"""Implements (a subset of) Sun XDR -- eXternal Data Representation.

See: RFC 1014

"""
6
7import struct
8try:
9    from cStringIO import StringIO as _StringIO
10except ImportError:
11    from StringIO import StringIO as _StringIO
12
13__all__ = ["Error", "Packer", "Unpacker", "ConversionError"]
14
15# exceptions
class Error(Exception):
    """Exception class for this module. Use:

    except xdrlib.Error, var:
        # var has the Error instance for the exception

    Public ivars:
        msg -- contains the message

    """
    def __init__(self, msg):
        # Hand the message to the base class too: this fills in ``args``,
        # which copy/pickle use to rebuild the instance.  Without it they
        # call Error() with no arguments and fail.
        Exception.__init__(self, msg)
        self.msg = msg

    def __repr__(self):
        return repr(self.msg)

    def __str__(self):
        return str(self.msg)
32
33
class ConversionError(Error):
    """Error subclass signalling a failed conversion while packing or
    unpacking."""
36
37
38
class Packer:
    """Pack various data representations into a buffer.

    Each pack_* method appends the encoding of its argument to an internal
    buffer; get_buffer() returns everything packed since the last reset().
    """

    def __init__(self):
        self.reset()

    def reset(self):
        """Throw away anything packed so far and start an empty buffer."""
        self.__buf = _StringIO()

    def get_buffer(self):
        """Return the data packed since the last reset()."""
        return self.__buf.getvalue()
    # backwards compatibility
    get_buf = get_buffer

    def pack_uint(self, x):
        """Append x as a 4-byte big-endian unsigned integer."""
        self.__buf.write(struct.pack('>L', x))

    def pack_int(self, x):
        """Append x as a 4-byte big-endian integer.

        Negative values use the signed struct format, matching what
        Unpacker.unpack_int() reads back.
        """
        # Non-negative values stay on the unsigned format, so every input
        # this method accepted before still produces the same bytes.
        if x < 0:
            fmt = '>l'
        else:
            fmt = '>L'
        self.__buf.write(struct.pack(fmt, x))

    pack_enum = pack_int

    def pack_bool(self, x):
        """Append a 4-byte 1 if x is true, otherwise a 4-byte 0."""
        if x:
            self.__buf.write(b'\0\0\0\1')
        else:
            self.__buf.write(b'\0\0\0\0')

    def pack_uhyper(self, x):
        """Append x as two 4-byte words, high word first."""
        # Masking after the shift keeps each half within unsigned range,
        # which also makes this usable for negative values (pack_hyper).
        self.pack_uint(x >> 32 & 0xffffffff)
        self.pack_uint(x & 0xffffffff)

    pack_hyper = pack_uhyper

    def pack_float(self, x):
        """Append x as a 4-byte big-endian float ('>f').

        Raises ConversionError if struct cannot pack x.
        """
        try:
            self.__buf.write(struct.pack('>f', x))
        except struct.error as msg:
            raise ConversionError(msg)

    def pack_double(self, x):
        """Append x as an 8-byte big-endian float ('>d').

        Raises ConversionError if struct cannot pack x.
        """
        try:
            self.__buf.write(struct.pack('>d', x))
        except struct.error as msg:
            raise ConversionError(msg)

    def pack_fstring(self, n, s):
        """Append the first n bytes of s, NUL-padded to a multiple of four.

        If s is shorter than n the missing bytes are padding as well.
        Raises ValueError if n is negative.
        """
        if n < 0:
            raise ValueError('fstring size must be nonnegative')
        data = s[:n]
        # Round the field width up to the next multiple of four.
        n = ((n + 3) // 4) * 4
        data = data + (n - len(data)) * b'\0'
        self.__buf.write(data)

    pack_fopaque = pack_fstring

    def pack_string(self, s):
        """Append len(s) as an unsigned integer, then s as a padded string."""
        n = len(s)
        self.pack_uint(n)
        self.pack_fstring(n, s)

    pack_opaque = pack_string
    pack_bytes = pack_string

    def pack_list(self, list, pack_item):
        """Append each item preceded by a 1 marker, then a closing 0 marker.

        pack_item is called with each item and is expected to pack it.
        """
        for item in list:
            self.pack_uint(1)
            pack_item(item)
        self.pack_uint(0)

    def pack_farray(self, n, list, pack_item):
        """Append every item of list via pack_item, with no length prefix.

        Raises ValueError if len(list) differs from n.
        """
        if len(list) != n:
            raise ValueError('wrong array size')
        for item in list:
            pack_item(item)

    def pack_array(self, list, pack_item):
        """Append len(list) as an unsigned integer, then the items."""
        n = len(list)
        self.pack_uint(n)
        self.pack_farray(n, list, pack_item)
113
114
115
class Unpacker:
    """Unpacks various data representations from the given buffer.

    The instance keeps a read position into the buffer; every unpack_*
    method decodes one item at that position and moves past it.
    """

    def __init__(self, data):
        self.reset(data)

    def reset(self, data):
        """Replace the buffer with data and rewind to position 0."""
        self.__buf = data
        self.__pos = 0

    def get_position(self):
        """Return the current read position."""
        return self.__pos

    def set_position(self, position):
        """Move the read position; the value is stored without checks."""
        self.__pos = position

    def get_buffer(self):
        """Return the buffer being unpacked."""
        return self.__buf

    def done(self):
        """Raise Error if the position has not reached the buffer's end."""
        if self.__pos < len(self.__buf):
            raise Error('unextracted data remains')

    def __read(self, size):
        """Return the next size bytes and advance past them.

        Raises EOFError if fewer than size bytes remain.  The position is
        only moved once the read has succeeded, so a failed read leaves the
        unpacker where it was.
        """
        start = self.__pos
        end = start + size
        data = self.__buf[start:end]
        if len(data) < size:
            raise EOFError
        self.__pos = end
        return data

    def unpack_uint(self):
        """Decode a 4-byte big-endian unsigned integer.

        Raises EOFError if fewer than four bytes remain.
        """
        x = struct.unpack('>L', self.__read(4))[0]
        try:
            return int(x)
        except OverflowError:
            return x

    def unpack_int(self):
        """Decode a 4-byte big-endian signed integer.

        Raises EOFError if fewer than four bytes remain.
        """
        return struct.unpack('>l', self.__read(4))[0]

    unpack_enum = unpack_int

    def unpack_bool(self):
        """Decode a signed integer and return its truth value."""
        return bool(self.unpack_int())

    def unpack_uhyper(self):
        """Decode two unsigned words, high word first, as one 64-bit value."""
        hi = self.unpack_uint()
        lo = self.unpack_uint()
        return hi << 32 | lo

    def unpack_hyper(self):
        """Decode a 64-bit value, mapping the upper half to negatives."""
        x = self.unpack_uhyper()
        if x >= 0x8000000000000000:
            x = x - 0x10000000000000000
        return x

    def unpack_float(self):
        """Decode a 4-byte big-endian float ('>f').

        Raises EOFError if fewer than four bytes remain.
        """
        return struct.unpack('>f', self.__read(4))[0]

    def unpack_double(self):
        """Decode an 8-byte big-endian float ('>d').

        Raises EOFError if fewer than eight bytes remain.
        """
        return struct.unpack('>d', self.__read(8))[0]

    def unpack_fstring(self, n):
        """Return the next n bytes and skip the padding that follows.

        The field occupies n rounded up to a multiple of four bytes.
        Raises ValueError if n is negative and EOFError if the padded field
        does not fit in the buffer; the position is unchanged in both cases.
        """
        if n < 0:
            raise ValueError('fstring size must be nonnegative')
        i = self.__pos
        j = i + (n + 3) // 4 * 4
        if j > len(self.__buf):
            raise EOFError
        self.__pos = j
        return self.__buf[i:i + n]

    unpack_fopaque = unpack_fstring

    def unpack_string(self):
        """Decode an unsigned length followed by that many padded bytes."""
        n = self.unpack_uint()
        return self.unpack_fstring(n)

    unpack_opaque = unpack_string
    unpack_bytes = unpack_string

    def unpack_list(self, unpack_item):
        """Decode marker-prefixed items until a 0 marker; return them.

        Each item is preceded by an unsigned 1; unpack_item() is called to
        decode it.  Raises ConversionError for any other marker value.
        """
        items = []
        while True:
            x = self.unpack_uint()
            if x == 0:
                break
            if x != 1:
                raise ConversionError('0 or 1 expected, got %r' % (x,))
            items.append(unpack_item())
        return items

    def unpack_farray(self, n, unpack_item):
        """Return a list of n results of calling unpack_item()."""
        # Count against the result instead of iterating range(n): n can come
        # straight off the wire (see unpack_array) and on Python 2 range()
        # would build an n-element list before a single item is read.
        items = []
        while len(items) < n:
            items.append(unpack_item())
        return items

    def unpack_array(self, unpack_item):
        """Decode an unsigned count, then that many items."""
        n = self.unpack_uint()
        return self.unpack_farray(n, unpack_item)
230
231
232# test suite
233def _test():
234    p = Packer()
235    packtest = [
236        (p.pack_uint,    (9,)),
237        (p.pack_bool,    (True,)),
238        (p.pack_bool,    (False,)),
239        (p.pack_uhyper,  (45L,)),
240        (p.pack_float,   (1.9,)),
241        (p.pack_double,  (1.9,)),
242        (p.pack_string,  ('hello world',)),
243        (p.pack_list,    (range(5), p.pack_uint)),
244        (p.pack_array,   (['what', 'is', 'hapnin', 'doctor'], p.pack_string)),
245        ]
246    succeedlist = [1] * len(packtest)
247    count = 0
248    for method, args in packtest:
249        print 'pack test', count,
250        try:
251            method(*args)
252            print 'succeeded'
253        except ConversionError, var:
254            print 'ConversionError:', var.msg
255            succeedlist[count] = 0
256        count = count + 1
257    data = p.get_buffer()
258    # now verify
259    up = Unpacker(data)
260    unpacktest = [
261        (up.unpack_uint,   (), lambda x: x == 9),
262        (up.unpack_bool,   (), lambda x: x is True),
263        (up.unpack_bool,   (), lambda x: x is False),
264        (up.unpack_uhyper, (), lambda x: x == 45L),
265        (up.unpack_float,  (), lambda x: 1.89 < x < 1.91),
266        (up.unpack_double, (), lambda x: 1.89 < x < 1.91),
267        (up.unpack_string, (), lambda x: x == 'hello world'),
268        (up.unpack_list,   (up.unpack_uint,), lambda x: x == range(5)),
269        (up.unpack_array,  (up.unpack_string,),
270         lambda x: x == ['what', 'is', 'hapnin', 'doctor']),
271        ]
272    count = 0
273    for method, args, pred in unpacktest:
274        print 'unpack test', count,
275        try:
276            if succeedlist[count]:
277                x = method(*args)
278                print pred(x) and 'succeeded' or 'failed', ':', x
279            else:
280                print 'skipping'
281        except ConversionError, var:
282            print 'ConversionError:', var.msg
283        count = count + 1
284
285
# Run the self-test when this file is executed as a script.
if __name__ == "__main__":
    _test()
288