#! /usr/bin/python
# -*- coding: utf-8 -*-
#
# Protocol Buffers - Google's data interchange format
# Copyright 2008 Google Inc.  All rights reserved.
# http://code.google.com/p/protobuf/
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
#     * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
#     * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
#     * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""Test for preservation of unknown fields in the pure Python implementation."""

__author__ = 'bohdank@google.com (Bohdan Koval)'

import unittest
from google.protobuf import unittest_mset_pb2
from google.protobuf import unittest_pb2
from google.protobuf.internal import encoder
from google.protobuf.internal import test_util
from google.protobuf.internal import type_checkers


class UnknownFieldsTest(unittest.TestCase):
  """Checks how messages store, copy, merge and re-emit unknown fields.

  Every test starts from a serialized, fully populated TestAllTypes message
  that has been parsed into a TestEmptyMessage (see setUp), and inspects the
  resulting `_unknown_fields` list.
  """

  def setUp(self):
    """Serializes a populated TestAllTypes and parses it as TestEmptyMessage."""
    self.descriptor = unittest_pb2.TestAllTypes.DESCRIPTOR
    self.all_fields = unittest_pb2.TestAllTypes()
    test_util.SetAllFields(self.all_fields)
    self.all_fields_data = self.all_fields.SerializeToString()
    self.empty_message = unittest_pb2.TestEmptyMessage()
    self.empty_message.ParseFromString(self.all_fields_data)
    # Iterated as (tag_bytes, value) pairs by GetField below.
    self.unknown_fields = self.empty_message._unknown_fields

  def GetField(self, name):
    """Decodes the unknown-field entry that corresponds to a TestAllTypes field.

    Args:
      name: Name of a field declared on TestAllTypes.

    Returns:
      The value produced by running the TestAllTypes decoder for that field's
      tag over the raw bytes stored in the unknown-field list.

    Raises:
      AssertionError: (via self.fail) if no unknown-field entry carries the
        tag of the requested field.
    """
    field_descriptor = self.descriptor.fields_by_name[name]
    wire_type = type_checkers.FIELD_TYPE_TO_WIRE_TYPE[field_descriptor.type]
    field_tag = encoder.TagBytes(field_descriptor.number, wire_type)
    for tag_bytes, value in self.unknown_fields:
      if tag_bytes == field_tag:
        decoder = unittest_pb2.TestAllTypes._decoders_by_tag[tag_bytes]
        # The decoder stores its result in the dict, keyed by the descriptor.
        result_dict = {}
        decoder(value, 0, len(value), self.all_fields, result_dict)
        return result_dict[field_descriptor]
    # Fail loudly instead of implicitly returning None, which would surface
    # in the caller as a confusing "<expected> != None" assertion.
    self.fail('Field %r was not found among the unknown fields.' % name)

  def testVarint(self):
    """The unknown entry for optional_int32 decodes to the original value."""
    value = self.GetField('optional_int32')
    self.assertEqual(self.all_fields.optional_int32, value)

  def testFixed32(self):
    """The unknown entry for optional_fixed32 decodes to the original value."""
    value = self.GetField('optional_fixed32')
    self.assertEqual(self.all_fields.optional_fixed32, value)

  def testFixed64(self):
    """The unknown entry for optional_fixed64 decodes to the original value."""
    value = self.GetField('optional_fixed64')
    self.assertEqual(self.all_fields.optional_fixed64, value)

  def testLengthDelimited(self):
    """The unknown entry for optional_string decodes to the original value."""
    value = self.GetField('optional_string')
    self.assertEqual(self.all_fields.optional_string, value)

  def testGroup(self):
    """The unknown entry for optionalgroup decodes to the original value."""
    value = self.GetField('optionalgroup')
    self.assertEqual(self.all_fields.optionalgroup, value)

  def testSerialize(self):
    """Re-serializing the empty message reproduces the input bytes."""
    data = self.empty_message.SerializeToString()

    # Don't use assertEqual because we don't want to dump raw binary data to
    # stdout.
    self.assertTrue(data == self.all_fields_data)

  def testCopyFrom(self):
    """CopyFrom carries the unknown fields over to the target message."""
    message = unittest_pb2.TestEmptyMessage()
    message.CopyFrom(self.empty_message)
    self.assertEqual(self.unknown_fields, message._unknown_fields)

  def testMergeFrom(self):
    """MergeFrom appends the source's unknown fields to the destination's."""
    message = unittest_pb2.TestAllTypes()
    message.optional_int32 = 1
    message.optional_uint32 = 2
    source = unittest_pb2.TestEmptyMessage()
    source.ParseFromString(message.SerializeToString())

    message.ClearField('optional_int32')
    message.optional_int64 = 3
    message.optional_uint32 = 4
    destination = unittest_pb2.TestEmptyMessage()
    destination.ParseFromString(message.SerializeToString())
    # Snapshot (copy) the destination's unknown fields before merging.
    unknown_fields = destination._unknown_fields[:]

    destination.MergeFrom(source)
    self.assertEqual(unknown_fields + source._unknown_fields,
                     destination._unknown_fields)

  def testClear(self):
    """Clear leaves the message with no unknown fields."""
    self.empty_message.Clear()
    self.assertEqual(0, len(self.empty_message._unknown_fields))

  def testByteSize(self):
    """ByteSize of the empty message matches that of the populated message."""
    self.assertEqual(self.all_fields.ByteSize(), self.empty_message.ByteSize())

  def testUnknownExtensions(self):
    """TestEmptyMessageWithExtensions keeps the same unknown fields."""
    message = unittest_pb2.TestEmptyMessageWithExtensions()
    message.ParseFromString(self.all_fields_data)
    self.assertEqual(self.empty_message._unknown_fields,
                     message._unknown_fields)

  def testListFields(self):
    """ListFields reports nothing for a message holding only unknown fields."""
    # Make sure ListFields doesn't return unknown fields.
    self.assertEqual(0, len(self.empty_message.ListFields()))

  def testSerializeMessageSetWireFormatUnknownExtension(self):
    """An unknown message-set item survives a parse/serialize round trip."""
    # Create a message using the message set wire format with an unknown
    # message.
    raw = unittest_mset_pb2.RawMessageSet()

    # Add an unknown extension.
    item = raw.item.add()
    item.type_id = 1545009
    message1 = unittest_mset_pb2.TestMessageSetExtension1()
    message1.i = 12345
    item.message = message1.SerializeToString()

    serialized = raw.SerializeToString()

    # Parse message using the message set wire format.
    proto = unittest_mset_pb2.TestMessageSet()
    proto.MergeFromString(serialized)

    # Verify that the unknown extension is serialized unchanged
    reserialized = proto.SerializeToString()
    new_raw = unittest_mset_pb2.RawMessageSet()
    new_raw.MergeFromString(reserialized)
    self.assertEqual(raw, new_raw)

  def testEquals(self):
    """Message equality takes the parsed unknown-field data into account."""
    message = unittest_pb2.TestEmptyMessage()
    message.ParseFromString(self.all_fields_data)
    self.assertEqual(self.empty_message, message)

    # Re-parse from data that lacks optional_string; the two messages must
    # no longer compare equal.
    self.all_fields.ClearField('optional_string')
    message.ParseFromString(self.all_fields.SerializeToString())
    self.assertNotEqual(self.empty_message, message)


if __name__ == '__main__':
  unittest.main()