| # Protocol Buffers - Google's data interchange format |
| # Copyright 2008 Google Inc. All rights reserved. |
| # |
| # Use of this source code is governed by a BSD-style |
| # license that can be found in the LICENSE file or at |
| # https://developers.google.com/open-source/licenses/bsd |
| |
| """Tests for google.protobuf.message_factory.""" |
| |
| __author__ = 'matthewtoia@google.com (Matt Toia)' |
| |
| import unittest |
| import gc |
| |
| from google.protobuf import descriptor_pb2 |
| from google.protobuf.internal import api_implementation |
| from google.protobuf.internal import factory_test1_pb2 |
| from google.protobuf.internal import factory_test2_pb2 |
| from google.protobuf.internal import testing_refleaks |
| from google.protobuf import descriptor_database |
| from google.protobuf import descriptor_pool |
| from google.protobuf import message_factory |
| from google.protobuf import descriptor |
| |
| @testing_refleaks.TestCase |
| class MessageFactoryTest(unittest.TestCase): |
| |
| def setUp(self): |
| self.factory_test1_fd = descriptor_pb2.FileDescriptorProto.FromString( |
| factory_test1_pb2.DESCRIPTOR.serialized_pb) |
| self.factory_test2_fd = descriptor_pb2.FileDescriptorProto.FromString( |
| factory_test2_pb2.DESCRIPTOR.serialized_pb) |
| |
| def _ExerciseDynamicClass(self, cls): |
| msg = cls() |
| msg.mandatory = 42 |
| msg.nested_factory_2_enum = 0 |
| msg.nested_factory_2_message.value = 'nested message value' |
| msg.factory_1_message.factory_1_enum = 1 |
| msg.factory_1_message.nested_factory_1_enum = 0 |
| msg.factory_1_message.nested_factory_1_message.value = ( |
| 'nested message value') |
| msg.factory_1_message.scalar_value = 22 |
| msg.factory_1_message.list_value.extend([u'one', u'two', u'three']) |
| msg.factory_1_message.list_value.append(u'four') |
| msg.factory_1_enum = 1 |
| msg.nested_factory_1_enum = 0 |
| msg.nested_factory_1_message.value = 'nested message value' |
| msg.circular_message.mandatory = 1 |
| msg.circular_message.circular_message.mandatory = 2 |
| msg.circular_message.scalar_value = 'one deep' |
| msg.scalar_value = 'zero deep' |
| msg.list_value.extend([u'four', u'three', u'two']) |
| msg.list_value.append(u'one') |
| msg.grouped.add() |
| msg.grouped[0].part_1 = 'hello' |
| msg.grouped[0].part_2 = 'world' |
| msg.grouped.add(part_1='testing', part_2='123') |
| msg.loop.loop.mandatory = 2 |
| msg.loop.loop.loop.loop.mandatory = 4 |
| serialized = msg.SerializeToString() |
| converted = factory_test2_pb2.Factory2Message.FromString(serialized) |
| reserialized = converted.SerializeToString() |
| self.assertEqual(serialized, reserialized) |
| result = cls.FromString(reserialized) |
| self.assertEqual(msg, result) |
| |
| def testGetPrototype(self): |
| db = descriptor_database.DescriptorDatabase() |
| pool = descriptor_pool.DescriptorPool(db) |
| db.Add(self.factory_test1_fd) |
| db.Add(self.factory_test2_fd) |
| cls = message_factory.GetMessageClass(pool.FindMessageTypeByName( |
| 'google.protobuf.python.internal.Factory2Message')) |
| self.assertFalse(cls is factory_test2_pb2.Factory2Message) |
| self._ExerciseDynamicClass(cls) |
| cls2 = message_factory.GetMessageClass(pool.FindMessageTypeByName( |
| 'google.protobuf.python.internal.Factory2Message')) |
| self.assertTrue(cls is cls2) |
| |
| def testGetExistingPrototype(self): |
| # Get Existing Prototype should not create a new class. |
| cls = message_factory.GetMessageClass( |
| descriptor=factory_test2_pb2.Factory2Message.DESCRIPTOR) |
| msg = factory_test2_pb2.Factory2Message() |
| self.assertIsInstance(msg, cls) |
| self.assertIsInstance(msg.factory_1_message, |
| factory_test1_pb2.Factory1Message) |
| |
| def testGetMessages(self): |
| # performed twice because multiple calls with the same input must be allowed |
| for _ in range(2): |
| # GetMessage should work regardless of the order the FileDescriptorProto |
| # are provided. In particular, the function should succeed when the files |
| # are not in the topological order of dependencies. |
| |
| # Assuming factory_test2_fd depends on factory_test1_fd. |
| self.assertIn(self.factory_test1_fd.name, |
| self.factory_test2_fd.dependency) |
| # Get messages should work when a file comes before its dependencies: |
| # factory_test2_fd comes before factory_test1_fd. |
| messages = message_factory.GetMessages([self.factory_test2_fd, |
| self.factory_test1_fd], |
| descriptor_pool.Default()) |
| self.assertTrue( |
| set(['google.protobuf.python.internal.Factory2Message', |
| 'google.protobuf.python.internal.Factory1Message'], |
| ).issubset(set(messages.keys()))) |
| self._ExerciseDynamicClass( |
| messages['google.protobuf.python.internal.Factory2Message']) |
| factory_msg1 = messages['google.protobuf.python.internal.Factory1Message'] |
| self.assertTrue(set( |
| ['google.protobuf.python.internal.Factory2Message.one_more_field', |
| 'google.protobuf.python.internal.another_field'],).issubset(set( |
| ext.full_name |
| for ext in factory_msg1.DESCRIPTOR.file.pool.FindAllExtensions( |
| factory_msg1.DESCRIPTOR)))) |
| msg1 = messages['google.protobuf.python.internal.Factory1Message']() |
| ext1 = msg1.Extensions._FindExtensionByName( |
| 'google.protobuf.python.internal.Factory2Message.one_more_field') |
| ext2 = msg1.Extensions._FindExtensionByName( |
| 'google.protobuf.python.internal.another_field') |
| self.assertEqual(0, len(msg1.Extensions)) |
| msg1.Extensions[ext1] = 'test1' |
| msg1.Extensions[ext2] = 'test2' |
| self.assertEqual('test1', msg1.Extensions[ext1]) |
| self.assertEqual('test2', msg1.Extensions[ext2]) |
| self.assertEqual(None, |
| msg1.Extensions._FindExtensionByNumber(12321)) |
| self.assertEqual(2, len(msg1.Extensions)) |
| if api_implementation.Type() == 'python': |
| self.assertEqual(None, |
| msg1.Extensions._FindExtensionByName(0)) |
| self.assertEqual(None, |
| msg1.Extensions._FindExtensionByNumber('')) |
| else: |
| self.assertRaises(TypeError, msg1.Extensions._FindExtensionByName, 0) |
| self.assertRaises(TypeError, msg1.Extensions._FindExtensionByNumber, '') |
| |
| def testDuplicateExtensionNumber(self): |
| pool = descriptor_pool.DescriptorPool() |
| |
| # Add Container message. |
| f = descriptor_pb2.FileDescriptorProto( |
| name='google/protobuf/internal/container.proto', |
| package='google.protobuf.python.internal') |
| f.message_type.add(name='Container').extension_range.add(start=1, end=10) |
| pool.Add(f) |
| msgs = message_factory.GetMessageClassesForFiles([f.name], pool) |
| self.assertIn('google.protobuf.python.internal.Container', msgs) |
| |
| # Extend container. |
| f = descriptor_pb2.FileDescriptorProto( |
| name='google/protobuf/internal/extension.proto', |
| package='google.protobuf.python.internal', |
| dependency=['google/protobuf/internal/container.proto']) |
| msg = f.message_type.add(name='Extension') |
| msg.extension.add( |
| name='extension_field', |
| number=2, |
| label=descriptor_pb2.FieldDescriptorProto.LABEL_OPTIONAL, |
| type_name='Extension', |
| extendee='Container', |
| ) |
| pool.Add(f) |
| msgs = message_factory.GetMessageClassesForFiles([f.name], pool) |
| self.assertIn('google.protobuf.python.internal.Extension', msgs) |
| |
| # Add Duplicate extending the same field number. |
| f = descriptor_pb2.FileDescriptorProto( |
| name='google/protobuf/internal/duplicate.proto', |
| package='google.protobuf.python.internal', |
| dependency=['google/protobuf/internal/container.proto']) |
| msg = f.message_type.add(name='Duplicate') |
| msg.extension.add( |
| name='extension_field', |
| number=2, |
| label=descriptor_pb2.FieldDescriptorProto.LABEL_OPTIONAL, |
| type_name='Duplicate', |
| extendee='Container', |
| ) |
| pool.Add(f) |
| |
| with self.assertRaises(Exception) as cm: |
| message_factory.GetMessageClassesForFiles([f.name], pool) |
| |
| self.assertIn(str(cm.exception), |
| ['Extensions ' |
| '"google.protobuf.python.internal.Duplicate.extension_field" and' |
| ' "google.protobuf.python.internal.Extension.extension_field"' |
| ' both try to extend message type' |
| ' "google.protobuf.python.internal.Container"' |
| ' with field number 2.', |
| 'Double registration of Extensions']) |
| |
| def testExtensionValueInDifferentFile(self): |
| # Add Container message. |
| f1 = descriptor_pb2.FileDescriptorProto( |
| name='google/protobuf/internal/container.proto', |
| package='google.protobuf.python.internal') |
| f1.message_type.add(name='Container').extension_range.add(start=1, end=10) |
| |
| # Add ValueType message. |
| f2 = descriptor_pb2.FileDescriptorProto( |
| name='google/protobuf/internal/value_type.proto', |
| package='google.protobuf.python.internal') |
| f2.message_type.add(name='ValueType').field.add( |
| name='setting', |
| number=1, |
| label=descriptor_pb2.FieldDescriptorProto.LABEL_OPTIONAL, |
| type=descriptor_pb2.FieldDescriptorProto.TYPE_INT32, |
| default_value='123') |
| |
| # Extend container with field of ValueType. |
| f3 = descriptor_pb2.FileDescriptorProto( |
| name='google/protobuf/internal/extension.proto', |
| package='google.protobuf.python.internal', |
| dependency=[f1.name, f2.name]) |
| f3.extension.add( |
| name='top_level_extension_field', |
| number=2, |
| label=descriptor_pb2.FieldDescriptorProto.LABEL_OPTIONAL, |
| type_name='ValueType', |
| extendee='Container', |
| ) |
| f3.message_type.add(name='Extension').extension.add( |
| name='nested_extension_field', |
| number=3, |
| label=descriptor_pb2.FieldDescriptorProto.LABEL_OPTIONAL, |
| type_name='ValueType', |
| extendee='Container', |
| ) |
| |
| pool = descriptor_pool.Default() |
| try: |
| pool.Add(f1) |
| pool.Add(f2) |
| pool.Add(f3) |
| except: |
| pass |
| msgs = message_factory.GetMessageClassesForFiles( |
| [f1.name, f3.name], pool) # Deliberately not f2. |
| msg = msgs['google.protobuf.python.internal.Container'] |
| desc = msgs['google.protobuf.python.internal.Extension'].DESCRIPTOR |
| ext1 = desc.file.extensions_by_name['top_level_extension_field'] |
| ext2 = desc.extensions_by_name['nested_extension_field'] |
| m = msg() |
| m.Extensions[ext1].setting = 234 |
| m.Extensions[ext2].setting = 345 |
| serialized = m.SerializeToString() |
| |
| f1.name='google/protobuf/internal/another/container.proto' |
| f1.package='google.protobuf.python.internal.another' |
| f2.name='google/protobuf/internal/another/value_type.proto' |
| f2.package='google.protobuf.python.internal.another' |
| f3.name='google/protobuf/internal/another/extension.proto' |
| f3.package='google.protobuf.python.internal.another' |
| f3.ClearField('dependency') |
| f3.dependency.extend([f1.name, f2.name]) |
| try: |
| pool.Add(f1) |
| pool.Add(f2) |
| pool.Add(f3) |
| except: |
| pass |
| msgs = message_factory.GetMessageClassesForFiles( |
| [f1.name, f3.name], pool) # Deliberately not f2. |
| msg = msgs['google.protobuf.python.internal.another.Container'] |
| desc = msgs['google.protobuf.python.internal.another.Extension'].DESCRIPTOR |
| ext1 = desc.file.extensions_by_name['top_level_extension_field'] |
| ext2 = desc.extensions_by_name['nested_extension_field'] |
| m = msg.FromString(serialized) |
| self.assertEqual(2, len(m.ListFields())) |
| self.assertEqual(234, m.Extensions[ext1].setting) |
| self.assertEqual(345, m.Extensions[ext2].setting) |
| |
| def testDescriptorKeepConcreteClass(self): |
| def loadFile(): |
| f= descriptor_pb2.FileDescriptorProto( |
| name='google/protobuf/internal/meta_class.proto', |
| package='google.protobuf.python.internal') |
| msg_proto = f.message_type.add(name='Empty') |
| msg_proto.nested_type.add(name='Nested') |
| msg_proto.field.add(name='nested_field', |
| number=1, |
| label=descriptor.FieldDescriptor.LABEL_REPEATED, |
| type=descriptor.FieldDescriptor.TYPE_MESSAGE, |
| type_name='Nested') |
| return message_factory.GetMessages([f]) |
| |
| messages = loadFile() |
| for des, meta_class in messages.items(): |
| message = meta_class() |
| nested_des = message.DESCRIPTOR.nested_types_by_name['Nested'] |
| nested_msg = nested_des._concrete_class() |
| |
| def testOndemandCreateMetaClass(self): |
| def loadFile(): |
| f = descriptor_pb2.FileDescriptorProto.FromString( |
| factory_test1_pb2.DESCRIPTOR.serialized_pb) |
| return message_factory.GetMessages([f]) |
| |
| messages = loadFile() |
| data = factory_test1_pb2.Factory1Message() |
| data.map_field['hello'] = 'welcome' |
| # Force GC to collect. UPB python will clean up the map entry class. |
| # cpp extension and pure python will still keep the map entry class. |
| gc.collect() |
| message = messages['google.protobuf.python.internal.Factory1Message']() |
| message.ParseFromString(data.SerializeToString()) |
| value = message.map_field |
| values = [ |
| # The entry class will be created on demand in upb python. |
| value.GetEntryClass()(key=k, value=value[k]) for k in sorted(value) |
| ] |
| gc.collect() |
| self.assertEqual(1, len(values)) |
| self.assertEqual('hello', values[0].key) |
| self.assertEqual('welcome', values[0].value) |
| |
| if __name__ == '__main__': |
| unittest.main() |