#! /usr/bin/python
# -*- coding: utf-8 -*-
#
# Protocol Buffers - Google's data interchange format
# Copyright 2008 Google Inc. All rights reserved.
# https://developers.google.com/protocol-buffers/
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""Test for preservation of unknown fields in the pure Python implementation."""

__author__ = 'bohdank@google.com (Bohdan Koval)'

from google.apputils import basetest
from google.protobuf import unittest_mset_pb2
from google.protobuf import unittest_pb2
from google.protobuf.internal import encoder
from google.protobuf.internal import missing_enum_values_pb2
from google.protobuf.internal import test_util
from google.protobuf.internal import type_checkers


class UnknownFieldsTest(basetest.TestCase):
  """Unknown-field tests: TestAllTypes data parsed as TestEmptyMessage.

  setUp serializes a fully populated TestAllTypes message and parses those
  bytes into a TestEmptyMessage; the tests then inspect the resulting
  `_unknown_fields` list and the message-level operations built on it.
  """

  def setUp(self):
    """Builds the populated message, its bytes and the re-parsed message."""
    self.descriptor = unittest_pb2.TestAllTypes.DESCRIPTOR
    self.all_fields = unittest_pb2.TestAllTypes()
    test_util.SetAllFields(self.all_fields)
    self.all_fields_data = self.all_fields.SerializeToString()
    self.empty_message = unittest_pb2.TestEmptyMessage()
    self.empty_message.ParseFromString(self.all_fields_data)
    self.unknown_fields = self.empty_message._unknown_fields

  def GetField(self, name):
    """Decodes the TestAllTypes field `name` from the stored unknown fields.

    Args:
      name: Name of a field in the TestAllTypes descriptor.

    Returns:
      The entry stored under the field's descriptor after running the
      TestAllTypes decoder registered for the field's tag over every
      unknown-field entry carrying that tag.

    Raises:
      KeyError: If `name` is not a TestAllTypes field, or if nothing was
        stored for it (for instance no unknown-field entry had its tag).
    """
    field_descriptor = self.descriptor.fields_by_name[name]
    wire_type = type_checkers.FIELD_TYPE_TO_WIRE_TYPE[field_descriptor.type]
    field_tag = encoder.TagBytes(field_descriptor.number, wire_type)
    result_dict = {}
    for tag_bytes, value in self.unknown_fields:
      if tag_bytes == field_tag:
        decoder = unittest_pb2.TestAllTypes._decoders_by_tag[tag_bytes][0]
        decoder(value, 0, len(value), self.all_fields, result_dict)
    return result_dict[field_descriptor]

  def testEnum(self):
    value = self.GetField('optional_nested_enum')
    self.assertEqual(self.all_fields.optional_nested_enum, value)

  def testRepeatedEnum(self):
    value = self.GetField('repeated_nested_enum')
    self.assertEqual(self.all_fields.repeated_nested_enum, value)

  def testVarint(self):
    value = self.GetField('optional_int32')
    self.assertEqual(self.all_fields.optional_int32, value)

  def testFixed32(self):
    value = self.GetField('optional_fixed32')
    self.assertEqual(self.all_fields.optional_fixed32, value)

  def testFixed64(self):
    value = self.GetField('optional_fixed64')
    self.assertEqual(self.all_fields.optional_fixed64, value)

  def testLengthDelimited(self):
    value = self.GetField('optional_string')
    self.assertEqual(self.all_fields.optional_string, value)

  def testGroup(self):
    value = self.GetField('optionalgroup')
    self.assertEqual(self.all_fields.optionalgroup, value)

  def testSerialize(self):
    data = self.empty_message.SerializeToString()

    # Don't use assertEqual because we don't want to dump raw binary data to
    # stdout.
    self.assertTrue(data == self.all_fields_data)

  def testCopyFrom(self):
    message = unittest_pb2.TestEmptyMessage()
    message.CopyFrom(self.empty_message)
    self.assertEqual(self.unknown_fields, message._unknown_fields)

  def testMergeFrom(self):
    message = unittest_pb2.TestAllTypes()
    message.optional_int32 = 1
    message.optional_uint32 = 2
    source = unittest_pb2.TestEmptyMessage()
    source.ParseFromString(message.SerializeToString())

    message.ClearField('optional_int32')
    message.optional_int64 = 3
    message.optional_uint32 = 4
    destination = unittest_pb2.TestEmptyMessage()
    destination.ParseFromString(message.SerializeToString())
    # Snapshot the list before merging so the expected value below is built
    # from the pre-merge contents.
    unknown_fields = destination._unknown_fields[:]

    destination.MergeFrom(source)
    self.assertEqual(unknown_fields + source._unknown_fields,
                     destination._unknown_fields)

  def testClear(self):
    self.empty_message.Clear()
    self.assertEqual(0, len(self.empty_message._unknown_fields))

  def testByteSize(self):
    self.assertEqual(self.all_fields.ByteSize(), self.empty_message.ByteSize())

  def testUnknownExtensions(self):
    message = unittest_pb2.TestEmptyMessageWithExtensions()
    message.ParseFromString(self.all_fields_data)
    self.assertEqual(self.empty_message._unknown_fields,
                     message._unknown_fields)

  def testListFields(self):
    # Make sure ListFields doesn't return unknown fields.
    self.assertEqual(0, len(self.empty_message.ListFields()))

  def testSerializeMessageSetWireFormatUnknownExtension(self):
    # Create a message using the message set wire format with an unknown
    # message.
    raw = unittest_mset_pb2.RawMessageSet()

    # Add an unknown extension.
    item = raw.item.add()
    item.type_id = 1545009
    message1 = unittest_mset_pb2.TestMessageSetExtension1()
    message1.i = 12345
    item.message = message1.SerializeToString()

    serialized = raw.SerializeToString()

    # Parse message using the message set wire format.
    proto = unittest_mset_pb2.TestMessageSet()
    proto.MergeFromString(serialized)

    # Verify that the unknown extension is serialized unchanged
    reserialized = proto.SerializeToString()
    new_raw = unittest_mset_pb2.RawMessageSet()
    new_raw.MergeFromString(reserialized)
    self.assertEqual(raw, new_raw)

  def testEquals(self):
    message = unittest_pb2.TestEmptyMessage()
    message.ParseFromString(self.all_fields_data)
    self.assertEqual(self.empty_message, message)

    self.all_fields.ClearField('optional_string')
    message.ParseFromString(self.all_fields.SerializeToString())
    self.assertNotEqual(self.empty_message, message)


# NOTE: this class used to be declared as a second `UnknownFieldsTest`. That
# rebinding replaced the class above at module level, so its tests were never
# collected. The distinct name lets both classes be collected.
class UnknownEnumValuesTest(basetest.TestCase):
  """Unknown-field tests: TestEnumValues data parsed as TestMissingEnumValues.

  setUp serializes a TestEnumValues message whose optional, repeated and
  packed enum fields hold ZERO / ONE and parses those bytes into a
  TestMissingEnumValues message; the tests read the values back out of that
  message's `_unknown_fields` list.
  """

  def setUp(self):
    """Builds the source message, its bytes and the re-parsed message."""
    self.descriptor = missing_enum_values_pb2.TestEnumValues.DESCRIPTOR

    self.message = missing_enum_values_pb2.TestEnumValues()
    self.message.optional_nested_enum = (
        missing_enum_values_pb2.TestEnumValues.ZERO)
    self.message.repeated_nested_enum.extend([
        missing_enum_values_pb2.TestEnumValues.ZERO,
        missing_enum_values_pb2.TestEnumValues.ONE,
        ])
    self.message.packed_nested_enum.extend([
        missing_enum_values_pb2.TestEnumValues.ZERO,
        missing_enum_values_pb2.TestEnumValues.ONE,
        ])
    self.message_data = self.message.SerializeToString()
    self.missing_message = missing_enum_values_pb2.TestMissingEnumValues()
    self.missing_message.ParseFromString(self.message_data)
    self.unknown_fields = self.missing_message._unknown_fields

  def GetField(self, name):
    """Decodes the TestEnumValues field `name` from the stored unknown fields.

    Args:
      name: Name of a field in the TestEnumValues descriptor.

    Returns:
      The entry stored under the field's descriptor after running the
      TestEnumValues decoder registered for the field's tag over every
      unknown-field entry carrying that tag.

    Raises:
      KeyError: If `name` is not a TestEnumValues field, or if nothing was
        stored for it (for instance no unknown-field entry had its tag).
    """
    field_descriptor = self.descriptor.fields_by_name[name]
    wire_type = type_checkers.FIELD_TYPE_TO_WIRE_TYPE[field_descriptor.type]
    field_tag = encoder.TagBytes(field_descriptor.number, wire_type)
    result_dict = {}
    for tag_bytes, value in self.unknown_fields:
      if tag_bytes == field_tag:
        decoder = missing_enum_values_pb2.TestEnumValues._decoders_by_tag[
            tag_bytes][0]
        decoder(value, 0, len(value), self.message, result_dict)
    return result_dict[field_descriptor]

  def testUnknownEnumValue(self):
    self.assertFalse(self.missing_message.HasField('optional_nested_enum'))
    value = self.GetField('optional_nested_enum')
    self.assertEqual(self.message.optional_nested_enum, value)

  def testUnknownRepeatedEnumValue(self):
    value = self.GetField('repeated_nested_enum')
    self.assertEqual(self.message.repeated_nested_enum, value)

  def testUnknownPackedEnumValue(self):
    value = self.GetField('packed_nested_enum')
    self.assertEqual(self.message.packed_nested_enum, value)

  def testRoundTrip(self):
    new_message = missing_enum_values_pb2.TestEnumValues()
    new_message.ParseFromString(self.missing_message.SerializeToString())
    self.assertEqual(self.message, new_message)


if __name__ == '__main__':
  basetest.main()