# xdrlib.py revision b921a84405a9c9db7c01ca19533bd98556c7375c
"""Implements (a subset of) Sun XDR -- eXternal Data Representation.

See: RFC 1014

"""

import struct
try:
    from cStringIO import StringIO as _StringIO
except ImportError:
    try:
        from StringIO import StringIO as _StringIO
    except ImportError:
        # Python 3: the pack buffer holds bytes, so use the bytes stream.
        from io import BytesIO as _StringIO

try:
    _long = long
except NameError:
    # Python 3 has a single, unbounded integer type.
    _long = int

__all__ = ["Error", "Packer", "Unpacker", "ConversionError"]


# exceptions
class Error(Exception):
    """Exception class for this module. Use:

    except xdrlib.Error as var:
        # var has the Error instance for the exception

    Public ivars:
        msg -- contains the message

    """
    def __init__(self, msg):
        self.msg = msg

    def __repr__(self):
        return repr(self.msg)

    def __str__(self):
        return str(self.msg)


class ConversionError(Error):
    """Raised when a value cannot be converted to or from its XDR form."""
    pass


class Packer:
    """Pack various data representations into a buffer."""

    def __init__(self):
        self.reset()

    def reset(self):
        """Discard everything packed so far and start with an empty buffer."""
        self.__buf = _StringIO()

    def get_buffer(self):
        """Return all data packed so far as a single byte string."""
        return self.__buf.getvalue()
    # backwards compatibility
    get_buf = get_buffer

    def __pack_struct(self, fmt, x):
        """Append struct.pack(fmt, x) to the buffer.

        Raises ConversionError if struct rejects the value, so every
        numeric pack_* method reports failure the same way.
        """
        try:
            packed = struct.pack(fmt, x)
        except struct.error as msg:
            raise ConversionError(msg)
        self.__buf.write(packed)

    def pack_uint(self, x):
        """Pack x as a big-endian unsigned 32-bit integer.

        Raises ConversionError if x does not fit.
        """
        self.__pack_struct('>L', x)

    def pack_int(self, x):
        """Pack x as a big-endian signed 32-bit integer.

        Raises ConversionError if x does not fit.
        """
        # Signed format: the unsigned '>L' used by pack_uint cannot
        # represent negative values.
        self.__pack_struct('>l', x)

    pack_enum = pack_int

    def pack_bool(self, x):
        """Pack the truth value of x as a 4-byte 1 or 0."""
        if x:
            self.__buf.write(b'\0\0\0\1')
        else:
            self.__buf.write(b'\0\0\0\0')

    def pack_uhyper(self, x):
        """Pack x as a 64-bit integer, high 32-bit word first.

        Each half is masked to 32 bits, so a negative x is written in
        its two's complement form (which is why pack_hyper can share
        this implementation).
        """
        self.pack_uint((x >> 32) & 0xffffffff)
        self.pack_uint(x & 0xffffffff)

    pack_hyper = pack_uhyper

    def pack_float(self, x):
        """Pack x as a big-endian 4-byte float.

        Raises ConversionError if struct rejects the value.
        """
        self.__pack_struct('>f', x)

    def pack_double(self, x):
        """Pack x as a big-endian 8-byte float.

        Raises ConversionError if struct rejects the value.
        """
        self.__pack_struct('>d', x)

    def pack_fstring(self, n, s):
        """Pack the first n bytes of s, NUL-padded to a multiple of 4.

        If s is shorter than n, the padding makes up the difference.
        Raises ValueError if n is negative.
        """
        if n < 0:
            raise ValueError('fstring size must be nonnegative')
        data = s[:n]
        # Round the field width up to the next 4-byte boundary.
        n = ((n + 3) // 4) * 4
        data = data + (n - len(data)) * b'\0'
        self.__buf.write(data)

    pack_fopaque = pack_fstring

    def pack_string(self, s):
        """Pack s as an unsigned length followed by the padded data."""
        n = len(s)
        self.pack_uint(n)
        self.pack_fstring(n, s)

    pack_opaque = pack_string
    pack_bytes = pack_string

    def pack_list(self, list, pack_item):
        """Pack each item preceded by an unsigned 1, then a closing 0.

        pack_item is called with one item at a time and is expected to
        write it to this packer.
        """
        for item in list:
            self.pack_uint(1)
            pack_item(item)
        self.pack_uint(0)

    def pack_farray(self, n, list, pack_item):
        """Pack exactly n items with pack_item, with no length prefix.

        Raises ValueError if len(list) differs from n.
        """
        if len(list) != n:
            raise ValueError('wrong array size')
        for item in list:
            pack_item(item)

    def pack_array(self, list, pack_item):
        """Pack an unsigned item count followed by the items themselves."""
        n = len(list)
        self.pack_uint(n)
        self.pack_farray(n, list, pack_item)


class Unpacker:
    """Unpacks various data representations from the given buffer."""

    def __init__(self, data):
        self.reset(data)

    def reset(self, data):
        """Replace the buffer with data and rewind to position 0."""
        self.__buf = data
        self.__pos = 0

    def get_position(self):
        """Return the current read offset into the buffer."""
        return self.__pos

    def set_position(self, position):
        """Set the read offset; the value is stored without validation."""
        self.__pos = position

    def get_buffer(self):
        """Return the buffer being unpacked."""
        return self.__buf

    def done(self):
        """Raise Error if the read offset has not reached the buffer end."""
        if self.__pos < len(self.__buf):
            raise Error('unextracted data remains')

    def __take(self, size):
        """Return the next size bytes and advance past them.

        Raises EOFError, leaving the read offset untouched, when fewer
        than size bytes remain.
        """
        start = self.__pos
        end = start + size
        chunk = self.__buf[start:end]
        if len(chunk) < size:
            raise EOFError
        # Advance only after the read is known to be complete.
        self.__pos = end
        return chunk

    def unpack_uint(self):
        """Unpack a big-endian unsigned 32-bit integer.

        Raises EOFError if fewer than 4 bytes remain.
        """
        # int() keeps small values as plain ints on Python 2, where
        # struct returns a long for the 'L' format.
        return int(struct.unpack('>L', self.__take(4))[0])

    def unpack_int(self):
        """Unpack a big-endian signed 32-bit integer.

        Raises EOFError if fewer than 4 bytes remain.
        """
        return struct.unpack('>l', self.__take(4))[0]

    unpack_enum = unpack_int

    def unpack_bool(self):
        """Unpack a signed 32-bit integer and return its truth value."""
        return bool(self.unpack_int())

    def unpack_uhyper(self):
        """Unpack an unsigned 64-bit integer stored high word first."""
        hi = self.unpack_uint()
        lo = self.unpack_uint()
        return _long(hi) << 32 | lo

    def unpack_hyper(self):
        """Unpack a signed 64-bit integer stored high word first."""
        x = self.unpack_uhyper()
        # Values with the top bit set are negative in two's complement.
        if x >= 0x8000000000000000:
            x = x - 0x10000000000000000
        return x

    def unpack_float(self):
        """Unpack a big-endian 4-byte float.

        Raises EOFError if fewer than 4 bytes remain.
        """
        return struct.unpack('>f', self.__take(4))[0]

    def unpack_double(self):
        """Unpack a big-endian 8-byte float.

        Raises EOFError if fewer than 8 bytes remain.
        """
        return struct.unpack('>d', self.__take(8))[0]

    def unpack_fstring(self, n):
        """Return the next n bytes and skip the padding to a 4-byte boundary.

        Raises ValueError if n is negative and EOFError if the padded
        field extends past the end of the buffer.
        """
        if n < 0:
            raise ValueError('fstring size must be nonnegative')
        i = self.__pos
        j = i + (n + 3) // 4 * 4
        if j > len(self.__buf):
            raise EOFError
        self.__pos = j
        return self.__buf[i:i + n]

    unpack_fopaque = unpack_fstring

    def unpack_string(self):
        """Unpack an unsigned length, then that many bytes of padded data."""
        n = self.unpack_uint()
        return self.unpack_fstring(n)

    unpack_opaque = unpack_string
    unpack_bytes = unpack_string

    def unpack_list(self, unpack_item):
        """Unpack items, each preceded by an unsigned 1, until a 0 is read.

        unpack_item is called with no arguments for each item.  Raises
        ConversionError if a marker is neither 0 nor 1.
        """
        items = []
        while True:
            x = self.unpack_uint()
            if x == 0:
                break
            if x != 1:
                raise ConversionError('0 or 1 expected, got %r' % (x,))
            items.append(unpack_item())
        return items

    def unpack_farray(self, n, unpack_item):
        """Return a list of n items, each produced by calling unpack_item()."""
        return [unpack_item() for _ in range(n)]

    def unpack_array(self, unpack_item):
        """Unpack an unsigned item count, then that many items."""
        n = self.unpack_uint()
        return self.unpack_farray(n, unpack_item)


# test suite
def _test():
    """Pack a sample of each supported type, unpack it, and print results."""
    p = Packer()
    packtest = [
        (p.pack_uint,   (9,)),
        (p.pack_bool,   (True,)),
        (p.pack_bool,   (False,)),
        (p.pack_uhyper, (45,)),
        (p.pack_float,  (1.9,)),
        (p.pack_double, (1.9,)),
        (p.pack_string, (b'hello world',)),
        (p.pack_list,   (range(5), p.pack_uint)),
        (p.pack_array,  ([b'what', b'is', b'hapnin', b'doctor'],
                         p.pack_string)),
        ]
    succeedlist = [1] * len(packtest)
    for count, (method, args) in enumerate(packtest):
        try:
            method(*args)
            outcome = 'succeeded'
        except ConversionError as var:
            outcome = 'ConversionError: %s' % (var.msg,)
            succeedlist[count] = 0
        print('pack test %d %s' % (count, outcome))
    data = p.get_buffer()
    # now verify
    up = Unpacker(data)
    unpacktest = [
        (up.unpack_uint,   (), lambda x: x == 9),
        (up.unpack_bool,   (), lambda x: x is True),
        (up.unpack_bool,   (), lambda x: x is False),
        (up.unpack_uhyper, (), lambda x: x == 45),
        (up.unpack_float,  (), lambda x: 1.89 < x < 1.91),
        (up.unpack_double, (), lambda x: 1.89 < x < 1.91),
        (up.unpack_string, (), lambda x: x == b'hello world'),
        (up.unpack_list,   (up.unpack_uint,), lambda x: x == list(range(5))),
        (up.unpack_array,  (up.unpack_string,),
         lambda x: x == [b'what', b'is', b'hapnin', b'doctor']),
        ]
    for count, (method, args, pred) in enumerate(unpacktest):
        try:
            if succeedlist[count]:
                x = method(*args)
                verdict = 'succeeded' if pred(x) else 'failed'
                outcome = '%s : %s' % (verdict, x)
            else:
                # The matching pack step failed, so there is nothing to read.
                outcome = 'skipping'
        except ConversionError as var:
            outcome = 'ConversionError: %s' % (var.msg,)
        print('unpack test %d %s' % (count, outcome))


if __name__ == '__main__':
    _test()