1#! /usr/bin/python
2# -*- coding: utf-8 -*-
3#
4# Protocol Buffers - Google's data interchange format
5# Copyright 2008 Google Inc.  All rights reserved.
6# http://code.google.com/p/protobuf/
7#
8# Redistribution and use in source and binary forms, with or without
9# modification, are permitted provided that the following conditions are
10# met:
11#
12#     * Redistributions of source code must retain the above copyright
13# notice, this list of conditions and the following disclaimer.
14#     * Redistributions in binary form must reproduce the above
15# copyright notice, this list of conditions and the following disclaimer
16# in the documentation and/or other materials provided with the
17# distribution.
18#     * Neither the name of Google Inc. nor the names of its
19# contributors may be used to endorse or promote products derived from
20# this software without specific prior written permission.
21#
22# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
23# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
24# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
25# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
26# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
27# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
28# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
29# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
30# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
31# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
32# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
33
34"""Test for preservation of unknown fields in the pure Python implementation."""
35
36__author__ = 'bohdank@google.com (Bohdan Koval)'
37
38import unittest
39from google.protobuf import unittest_mset_pb2
40from google.protobuf import unittest_pb2
41from google.protobuf.internal import encoder
42from google.protobuf.internal import test_util
43from google.protobuf.internal import type_checkers
44
45
46class UnknownFieldsTest(unittest.TestCase):
47
48  def setUp(self):
49    self.descriptor = unittest_pb2.TestAllTypes.DESCRIPTOR
50    self.all_fields = unittest_pb2.TestAllTypes()
51    test_util.SetAllFields(self.all_fields)
52    self.all_fields_data = self.all_fields.SerializeToString()
53    self.empty_message = unittest_pb2.TestEmptyMessage()
54    self.empty_message.ParseFromString(self.all_fields_data)
55    self.unknown_fields = self.empty_message._unknown_fields
56
57  def GetField(self, name):
58    field_descriptor = self.descriptor.fields_by_name[name]
59    wire_type = type_checkers.FIELD_TYPE_TO_WIRE_TYPE[field_descriptor.type]
60    field_tag = encoder.TagBytes(field_descriptor.number, wire_type)
61    for tag_bytes, value in self.unknown_fields:
62      if tag_bytes == field_tag:
63        decoder = unittest_pb2.TestAllTypes._decoders_by_tag[tag_bytes]
64        result_dict = {}
65        decoder(value, 0, len(value), self.all_fields, result_dict)
66        return result_dict[field_descriptor]
67
68  def testVarint(self):
69    value = self.GetField('optional_int32')
70    self.assertEqual(self.all_fields.optional_int32, value)
71
72  def testFixed32(self):
73    value = self.GetField('optional_fixed32')
74    self.assertEqual(self.all_fields.optional_fixed32, value)
75
76  def testFixed64(self):
77    value = self.GetField('optional_fixed64')
78    self.assertEqual(self.all_fields.optional_fixed64, value)
79
80  def testLengthDelimited(self):
81    value = self.GetField('optional_string')
82    self.assertEqual(self.all_fields.optional_string, value)
83
84  def testGroup(self):
85    value = self.GetField('optionalgroup')
86    self.assertEqual(self.all_fields.optionalgroup, value)
87
88  def testSerialize(self):
89    data = self.empty_message.SerializeToString()
90
91    # Don't use assertEqual because we don't want to dump raw binary data to
92    # stdout.
93    self.assertTrue(data == self.all_fields_data)
94
95  def testCopyFrom(self):
96    message = unittest_pb2.TestEmptyMessage()
97    message.CopyFrom(self.empty_message)
98    self.assertEqual(self.unknown_fields, message._unknown_fields)
99
100  def testMergeFrom(self):
101    message = unittest_pb2.TestAllTypes()
102    message.optional_int32 = 1
103    message.optional_uint32 = 2
104    source = unittest_pb2.TestEmptyMessage()
105    source.ParseFromString(message.SerializeToString())
106
107    message.ClearField('optional_int32')
108    message.optional_int64 = 3
109    message.optional_uint32 = 4
110    destination = unittest_pb2.TestEmptyMessage()
111    destination.ParseFromString(message.SerializeToString())
112    unknown_fields = destination._unknown_fields[:]
113
114    destination.MergeFrom(source)
115    self.assertEqual(unknown_fields + source._unknown_fields,
116                     destination._unknown_fields)
117
118  def testClear(self):
119    self.empty_message.Clear()
120    self.assertEqual(0, len(self.empty_message._unknown_fields))
121
122  def testByteSize(self):
123    self.assertEqual(self.all_fields.ByteSize(), self.empty_message.ByteSize())
124
125  def testUnknownExtensions(self):
126    message = unittest_pb2.TestEmptyMessageWithExtensions()
127    message.ParseFromString(self.all_fields_data)
128    self.assertEqual(self.empty_message._unknown_fields,
129                     message._unknown_fields)
130
131  def testListFields(self):
132    # Make sure ListFields doesn't return unknown fields.
133    self.assertEqual(0, len(self.empty_message.ListFields()))
134
135  def testSerializeMessageSetWireFormatUnknownExtension(self):
136    # Create a message using the message set wire format with an unknown
137    # message.
138    raw = unittest_mset_pb2.RawMessageSet()
139
140    # Add an unknown extension.
141    item = raw.item.add()
142    item.type_id = 1545009
143    message1 = unittest_mset_pb2.TestMessageSetExtension1()
144    message1.i = 12345
145    item.message = message1.SerializeToString()
146
147    serialized = raw.SerializeToString()
148
149    # Parse message using the message set wire format.
150    proto = unittest_mset_pb2.TestMessageSet()
151    proto.MergeFromString(serialized)
152
153    # Verify that the unknown extension is serialized unchanged
154    reserialized = proto.SerializeToString()
155    new_raw = unittest_mset_pb2.RawMessageSet()
156    new_raw.MergeFromString(reserialized)
157    self.assertEqual(raw, new_raw)
158
159  def testEquals(self):
160    message = unittest_pb2.TestEmptyMessage()
161    message.ParseFromString(self.all_fields_data)
162    self.assertEqual(self.empty_message, message)
163
164    self.all_fields.ClearField('optional_string')
165    message.ParseFromString(self.all_fields.SerializeToString())
166    self.assertNotEqual(self.empty_message, message)
167
168
169if __name__ == '__main__':
170  unittest.main()
171