1#! /usr/bin/python
2# -*- coding: utf-8 -*-
3#
4# Protocol Buffers - Google's data interchange format
5# Copyright 2008 Google Inc.  All rights reserved.
6# https://developers.google.com/protocol-buffers/
7#
8# Redistribution and use in source and binary forms, with or without
9# modification, are permitted provided that the following conditions are
10# met:
11#
12#     * Redistributions of source code must retain the above copyright
13# notice, this list of conditions and the following disclaimer.
14#     * Redistributions in binary form must reproduce the above
15# copyright notice, this list of conditions and the following disclaimer
16# in the documentation and/or other materials provided with the
17# distribution.
18#     * Neither the name of Google Inc. nor the names of its
19# contributors may be used to endorse or promote products derived from
20# this software without specific prior written permission.
21#
22# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
23# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
24# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
25# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
26# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
27# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
28# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
29# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
30# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
31# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
32# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
33
34"""Test for preservation of unknown fields in the pure Python implementation."""
35
36__author__ = 'bohdank@google.com (Bohdan Koval)'
37
38from google.apputils import basetest
39from google.protobuf import unittest_mset_pb2
40from google.protobuf import unittest_pb2
41from google.protobuf.internal import encoder
42from google.protobuf.internal import missing_enum_values_pb2
43from google.protobuf.internal import test_util
44from google.protobuf.internal import type_checkers
45
46
class UnknownFieldsTest(basetest.TestCase):
  """Exercises the _unknown_fields bookkeeping of the pure Python messages.

  A fully populated TestAllTypes is serialized and parsed back as a
  TestEmptyMessage; the tests then inspect the (tag_bytes, value) pairs
  stored in that message's _unknown_fields and how they behave under
  serialization, copying, merging, clearing and comparison.
  """

  def setUp(self):
    """Creates the populated message and its TestEmptyMessage re-parse."""
    populated = unittest_pb2.TestAllTypes()
    test_util.SetAllFields(populated)
    serialized = populated.SerializeToString()

    reparsed = unittest_pb2.TestEmptyMessage()
    reparsed.ParseFromString(serialized)

    self.descriptor = unittest_pb2.TestAllTypes.DESCRIPTOR
    self.all_fields = populated
    self.all_fields_data = serialized
    self.empty_message = reparsed
    self.unknown_fields = reparsed._unknown_fields

  def GetField(self, name):
    """Decodes the unknown-field entries that carry the field called `name`.

    Args:
      name: Name of a field declared on TestAllTypes.

    Returns:
      The value the TestAllTypes decoder stored for that field.

    Raises:
      KeyError: If `name` is not a TestAllTypes field, or if no entry of
        self.unknown_fields carries the field's tag.
    """
    field = self.descriptor.fields_by_name[name]
    expected_tag = encoder.TagBytes(
        field.number, type_checkers.FIELD_TYPE_TO_WIRE_TYPE[field.type])
    decoded = {}
    for tag, payload in self.unknown_fields:
      if tag != expected_tag:
        continue
      # Borrow the decoder TestAllTypes registered for this tag; it writes
      # its result into `decoded`, keyed by the field descriptor.
      decode = unittest_pb2.TestAllTypes._decoders_by_tag[tag][0]
      decode(payload, 0, len(payload), self.all_fields, decoded)
    return decoded[field]

  def _AssertUnknownFieldMatches(self, name):
    """Asserts the decoded unknown field equals the value on all_fields."""
    decoded = self.GetField(name)
    self.assertEqual(getattr(self.all_fields, name), decoded)

  def testEnum(self):
    self._AssertUnknownFieldMatches('optional_nested_enum')

  def testRepeatedEnum(self):
    self._AssertUnknownFieldMatches('repeated_nested_enum')

  def testVarint(self):
    self._AssertUnknownFieldMatches('optional_int32')

  def testFixed32(self):
    self._AssertUnknownFieldMatches('optional_fixed32')

  def testFixed64(self):
    self._AssertUnknownFieldMatches('optional_fixed64')

  def testLengthDelimited(self):
    self._AssertUnknownFieldMatches('optional_string')

  def testGroup(self):
    self._AssertUnknownFieldMatches('optionalgroup')

  def testSerialize(self):
    """Re-serializing the empty message must reproduce the original bytes."""
    reserialized = self.empty_message.SerializeToString()

    # Compare outside of assertEqual: on failure assertEqual would print
    # both raw binary blobs to stdout.
    identical = reserialized == self.all_fields_data
    self.assertTrue(identical)

  def testCopyFrom(self):
    """CopyFrom must carry the unknown fields over to the copy."""
    duplicate = unittest_pb2.TestEmptyMessage()
    duplicate.CopyFrom(self.empty_message)
    self.assertEqual(self.unknown_fields, duplicate._unknown_fields)

  def testMergeFrom(self):
    """MergeFrom must append the source's unknown fields to the target's."""
    typed = unittest_pb2.TestAllTypes()
    typed.optional_int32 = 1
    typed.optional_uint32 = 2
    source = unittest_pb2.TestEmptyMessage()
    source.ParseFromString(typed.SerializeToString())

    # Reuse the typed message with a different set of fields for the target.
    typed.ClearField('optional_int32')
    typed.optional_int64 = 3
    typed.optional_uint32 = 4
    destination = unittest_pb2.TestEmptyMessage()
    destination.ParseFromString(typed.SerializeToString())
    # Snapshot taken before the merge so the expectation is independent of it.
    before_merge = destination._unknown_fields[:]

    destination.MergeFrom(source)
    expected = before_merge + source._unknown_fields
    self.assertEqual(expected, destination._unknown_fields)

  def testClear(self):
    """Clear must drop every unknown field."""
    self.empty_message.Clear()
    remaining = len(self.empty_message._unknown_fields)
    self.assertEqual(0, remaining)

  def testByteSize(self):
    """Unknown fields must be counted by ByteSize."""
    expected_size = self.all_fields.ByteSize()
    self.assertEqual(expected_size, self.empty_message.ByteSize())

  def testUnknownExtensions(self):
    """A message with extensions keeps the same unknown fields."""
    extendable = unittest_pb2.TestEmptyMessageWithExtensions()
    extendable.ParseFromString(self.all_fields_data)
    self.assertEqual(self.empty_message._unknown_fields,
                     extendable._unknown_fields)

  def testListFields(self):
    """ListFields must not report unknown fields."""
    listed = self.empty_message.ListFields()
    self.assertEqual(0, len(listed))

  def testSerializeMessageSetWireFormatUnknownExtension(self):
    """An unknown message-set extension must survive a parse/serialize trip."""
    # Build the raw form of a message set holding one item.
    original = unittest_mset_pb2.RawMessageSet()

    # The item is added with type_id 1545009 and is expected to come back
    # unchanged after the round trip below.
    entry = original.item.add()
    entry.type_id = 1545009
    payload = unittest_mset_pb2.TestMessageSetExtension1()
    payload.i = 12345
    entry.message = payload.SerializeToString()

    wire_bytes = original.SerializeToString()

    # Read the bytes back through the message set wire format.
    message_set = unittest_mset_pb2.TestMessageSet()
    message_set.MergeFromString(wire_bytes)

    # Serializing again must yield an equivalent raw message set.
    round_tripped = unittest_mset_pb2.RawMessageSet()
    round_tripped.MergeFromString(message_set.SerializeToString())
    self.assertEqual(original, round_tripped)

  def testEquals(self):
    """Message equality must take unknown fields into account."""
    other = unittest_pb2.TestEmptyMessage()
    other.ParseFromString(self.all_fields_data)
    self.assertEqual(self.empty_message, other)

    # Dropping one field from the source changes the unknown fields of the
    # re-parsed message, so the two must no longer compare equal.
    self.all_fields.ClearField('optional_string')
    other.ParseFromString(self.all_fields.SerializeToString())
    self.assertNotEqual(self.empty_message, other)
176
177
class UnknownEnumValuesTest(basetest.TestCase):
  """Tests handling of enum values the parsing message type does not accept.

  A TestEnumValues message is serialized and parsed back as a
  TestMissingEnumValues; the tests expect the enum values to show up in that
  message's _unknown_fields instead of in regular fields.

  This class deliberately has its own name. Declaring it as a second
  `UnknownFieldsTest` rebinds the module-level name, which hides the
  UnknownFieldsTest suite defined earlier in this module from the test
  loader so that none of its tests run.
  """

  def setUp(self):
    """Builds the source message and its TestMissingEnumValues re-parse."""
    self.descriptor = missing_enum_values_pb2.TestEnumValues.DESCRIPTOR

    self.message = missing_enum_values_pb2.TestEnumValues()
    self.message.optional_nested_enum = (
      missing_enum_values_pb2.TestEnumValues.ZERO)
    self.message.repeated_nested_enum.extend([
      missing_enum_values_pb2.TestEnumValues.ZERO,
      missing_enum_values_pb2.TestEnumValues.ONE,
      ])
    self.message.packed_nested_enum.extend([
      missing_enum_values_pb2.TestEnumValues.ZERO,
      missing_enum_values_pb2.TestEnumValues.ONE,
      ])
    self.message_data = self.message.SerializeToString()
    self.missing_message = missing_enum_values_pb2.TestMissingEnumValues()
    self.missing_message.ParseFromString(self.message_data)
    self.unknown_fields = self.missing_message._unknown_fields

  def GetField(self, name):
    """Decodes the unknown-field entries that carry the field called `name`.

    Args:
      name: Name of a field declared on TestEnumValues.

    Returns:
      The value the TestEnumValues decoder stored for that field.

    Raises:
      KeyError: If `name` is not a TestEnumValues field, or if no entry of
        self.unknown_fields carries the field's tag.
    """
    field_descriptor = self.descriptor.fields_by_name[name]
    wire_type = type_checkers.FIELD_TYPE_TO_WIRE_TYPE[field_descriptor.type]
    field_tag = encoder.TagBytes(field_descriptor.number, wire_type)
    result_dict = {}
    for tag_bytes, value in self.unknown_fields:
      if tag_bytes == field_tag:
        # Use the decoder of the type that does know the enum values; it
        # writes its result into result_dict, keyed by the field descriptor.
        decoder = missing_enum_values_pb2.TestEnumValues._decoders_by_tag[
          tag_bytes][0]
        decoder(value, 0, len(value), self.message, result_dict)
    return result_dict[field_descriptor]

  def testUnknownEnumValue(self):
    """An unaccepted optional enum value is unset but kept as unknown."""
    self.assertFalse(self.missing_message.HasField('optional_nested_enum'))
    value = self.GetField('optional_nested_enum')
    self.assertEqual(self.message.optional_nested_enum, value)

  def testUnknownRepeatedEnumValue(self):
    """Repeated enum values are recoverable from the unknown fields."""
    value = self.GetField('repeated_nested_enum')
    self.assertEqual(self.message.repeated_nested_enum, value)

  def testUnknownPackedEnumValue(self):
    """Packed enum values are recoverable from the unknown fields."""
    value = self.GetField('packed_nested_enum')
    self.assertEqual(self.message.packed_nested_enum, value)

  def testRoundTrip(self):
    """Re-serializing and parsing as TestEnumValues restores the message."""
    new_message = missing_enum_values_pb2.TestEnumValues()
    new_message.ParseFromString(self.missing_message.SerializeToString())
    self.assertEqual(self.message, new_message)
228
229
# Script entry point: when this file is executed directly (not imported),
# hand control to the apputils basetest runner.
if __name__ == '__main__':
  basetest.main()
232