# Source blob: 971ef8d788799fc3ec197d485c7ffe3b4633dd29 [file] [log] [blame]
# Protocol Buffers - Google's data interchange format
# Copyright 2008 Google Inc. All rights reserved.
#
# Use of this source code is governed by a BSD-style
# license that can be found in the LICENSE file or at
# https://developers.google.com/open-source/licenses/bsd
"""Test for google.protobuf.internal.well_known_types."""
__author__ = 'jieluo@google.com (Jie Luo)'
import collections.abc as collections_abc
import datetime
import unittest
from google.protobuf import text_format
from google.protobuf.internal import any_test_pb2
from google.protobuf.internal import more_messages_pb2
from google.protobuf.internal import well_known_types
from google.protobuf import any_pb2
from google.protobuf import duration_pb2
from google.protobuf import struct_pb2
from google.protobuf import timestamp_pb2
from google.protobuf.internal import _parameterized
from google.protobuf import unittest_pb2
# Pick the time zones used by the timezone-aware tests below.  Real IANA zones
# are preferred because they carry DST rules; otherwise fixed-offset stand-ins
# are used and `has_zoneinfo` is cleared so DST-dependent tests can be skipped.
try:
  # New module in Python 3.9:
  import zoneinfo  # pylint:disable=g-import-not-at-top

  _TZ_JAPAN = zoneinfo.ZoneInfo('Japan')
  _TZ_PACIFIC = zoneinfo.ZoneInfo('US/Pacific')
  has_zoneinfo = True
except (ImportError, KeyError):
  # ImportError: the interpreter has no zoneinfo module.
  # KeyError: zoneinfo.ZoneInfoNotFoundError subclasses KeyError and is raised
  # when the module imports but the requested zone cannot be found (no system
  # tz database and no tzdata package).  Either way fall back to fixed offsets
  # instead of failing at import time.
  _TZ_JAPAN = datetime.timezone(datetime.timedelta(hours=9), 'Japan')
  _TZ_PACIFIC = datetime.timezone(datetime.timedelta(hours=-8), 'US/Pacific')
  has_zoneinfo = False
class TimeUtilTestBase(_parameterized.TestCase):
  """Base class providing JSON round-trip checks for time messages."""

  def CheckTimestampConversion(self, message, text):
    """Asserts that a Timestamp and its JSON string convert both ways.

    Args:
      message: The Timestamp message expected to serialize to `text`.
      text: The JSON string expected to parse back into `message`.
    """
    # Serialization direction first, so a formatting failure is reported
    # before any parsing is attempted.
    self.assertEqual(text, message.ToJsonString())
    round_tripped = timestamp_pb2.Timestamp()
    round_tripped.FromJsonString(text)
    self.assertEqual(message, round_tripped)

  def CheckDurationConversion(self, message, text):
    """Asserts that a Duration and its JSON string convert both ways.

    Args:
      message: The Duration message expected to serialize to `text`.
      text: The JSON string expected to parse back into `message`.
    """
    self.assertEqual(text, message.ToJsonString())
    round_tripped = duration_pb2.Duration()
    round_tripped.FromJsonString(text)
    self.assertEqual(message, round_tripped)
class TimeUtilTest(TimeUtilTestBase):
  """Tests for the Timestamp and Duration well-known-type helpers.

  Exercises JSON string conversion, integer unit conversion, conversion to and
  from datetime/timedelta, the arithmetic operators, and assigning
  datetime/timedelta values directly to WKTMessage fields.
  """

  def testTimestampSerializeAndParse(self):
    message = timestamp_pb2.Timestamp()
    # Generated output should contain 3, 6, or 9 fractional digits.
    message.seconds = 0
    message.nanos = 0
    self.CheckTimestampConversion(message, '1970-01-01T00:00:00Z')
    message.nanos = 10000000
    self.CheckTimestampConversion(message, '1970-01-01T00:00:00.010Z')
    message.nanos = 10000
    self.CheckTimestampConversion(message, '1970-01-01T00:00:00.000010Z')
    message.nanos = 10
    self.CheckTimestampConversion(message, '1970-01-01T00:00:00.000000010Z')
    # Test min timestamps.
    message.seconds = -62135596800
    message.nanos = 0
    self.CheckTimestampConversion(message, '0001-01-01T00:00:00Z')
    # Test max timestamps.
    message.seconds = 253402300799
    message.nanos = 999999999
    self.CheckTimestampConversion(message, '9999-12-31T23:59:59.999999999Z')
    # Test negative timestamps.  nanos is still 999999999 from the max case.
    message.seconds = -1
    self.CheckTimestampConversion(message, '1969-12-31T23:59:59.999999999Z')

    # Parsing accepts any number of fractional digits as long as they fit into
    # nano precision.
    message.FromJsonString('1970-01-01T00:00:00.1Z')
    self.assertEqual(0, message.seconds)
    self.assertEqual(100000000, message.nanos)
    # Parsing accepts offsets.
    message.FromJsonString('1970-01-01T00:00:00-08:00')
    self.assertEqual(8 * 3600, message.seconds)
    self.assertEqual(0, message.nanos)

    # It is not easy to check with current time. For test coverage only.
    message.GetCurrentTime()
    self.assertNotEqual(8 * 3600, message.seconds)

  def testDurationSerializeAndParse(self):
    message = duration_pb2.Duration()
    # Generated output should contain 3, 6, or 9 fractional digits.
    message.seconds = 0
    message.nanos = 0
    self.CheckDurationConversion(message, '0s')
    message.nanos = 10000000
    self.CheckDurationConversion(message, '0.010s')
    message.nanos = 10000
    self.CheckDurationConversion(message, '0.000010s')
    message.nanos = 10
    self.CheckDurationConversion(message, '0.000000010s')

    # Test min and max
    message.seconds = 315576000000
    message.nanos = 999999999
    self.CheckDurationConversion(message, '315576000000.999999999s')
    message.seconds = -315576000000
    message.nanos = -999999999
    self.CheckDurationConversion(message, '-315576000000.999999999s')

    # Parsing accepts any number of fractional digits as long as they fit into
    # nano precision.
    message.FromJsonString('0.1s')
    self.assertEqual(100000000, message.nanos)
    message.FromJsonString('0.0000001s')
    self.assertEqual(100, message.nanos)

  def testTimestampIntegerConversion(self):
    message = timestamp_pb2.Timestamp()
    message.FromNanoseconds(1)
    self.assertEqual('1970-01-01T00:00:00.000000001Z', message.ToJsonString())
    self.assertEqual(1, message.ToNanoseconds())

    message.FromNanoseconds(-1)
    self.assertEqual('1969-12-31T23:59:59.999999999Z', message.ToJsonString())
    self.assertEqual(-1, message.ToNanoseconds())

    message.FromMicroseconds(1)
    self.assertEqual('1970-01-01T00:00:00.000001Z', message.ToJsonString())
    self.assertEqual(1, message.ToMicroseconds())

    message.FromMicroseconds(-1)
    self.assertEqual('1969-12-31T23:59:59.999999Z', message.ToJsonString())
    self.assertEqual(-1, message.ToMicroseconds())

    message.FromMilliseconds(1)
    self.assertEqual('1970-01-01T00:00:00.001Z', message.ToJsonString())
    self.assertEqual(1, message.ToMilliseconds())

    message.FromMilliseconds(-1)
    self.assertEqual('1969-12-31T23:59:59.999Z', message.ToJsonString())
    self.assertEqual(-1, message.ToMilliseconds())

    message.FromSeconds(1)
    self.assertEqual('1970-01-01T00:00:01Z', message.ToJsonString())
    self.assertEqual(1, message.ToSeconds())

    message.FromSeconds(-1)
    self.assertEqual('1969-12-31T23:59:59Z', message.ToJsonString())
    self.assertEqual(-1, message.ToSeconds())

    message.FromNanoseconds(1999)
    self.assertEqual(1, message.ToMicroseconds())
    # For negative values, Timestamp will be rounded down.
    # For example, "1969-12-31T23:59:59.5Z" (i.e., -0.5s) rounded to seconds
    # will be "1969-12-31T23:59:59Z" (i.e., -1s) rather than
    # "1970-01-01T00:00:00Z" (i.e., 0s).
    message.FromNanoseconds(-1999)
    self.assertEqual(-2, message.ToMicroseconds())

  def testDurationIntegerConversion(self):
    message = duration_pb2.Duration()
    message.FromNanoseconds(1)
    self.assertEqual('0.000000001s', message.ToJsonString())
    self.assertEqual(1, message.ToNanoseconds())

    message.FromNanoseconds(-1)
    self.assertEqual('-0.000000001s', message.ToJsonString())
    self.assertEqual(-1, message.ToNanoseconds())

    message.FromMicroseconds(1)
    self.assertEqual('0.000001s', message.ToJsonString())
    self.assertEqual(1, message.ToMicroseconds())

    message.FromMicroseconds(-1)
    self.assertEqual('-0.000001s', message.ToJsonString())
    self.assertEqual(-1, message.ToMicroseconds())

    message.FromMilliseconds(1)
    self.assertEqual('0.001s', message.ToJsonString())
    self.assertEqual(1, message.ToMilliseconds())

    message.FromMilliseconds(-1)
    self.assertEqual('-0.001s', message.ToJsonString())
    self.assertEqual(-1, message.ToMilliseconds())

    message.FromSeconds(1)
    self.assertEqual('1s', message.ToJsonString())
    self.assertEqual(1, message.ToSeconds())

    message.FromSeconds(-1)
    self.assertEqual('-1s', message.ToJsonString())
    self.assertEqual(-1, message.ToSeconds())

    # Test truncation behavior.
    message.FromNanoseconds(1999)
    self.assertEqual(1, message.ToMicroseconds())

    # For negative values, Duration will be rounded towards 0.
    message.FromNanoseconds(-1999)
    self.assertEqual(-1, message.ToMicroseconds())

  def testTimezoneNaiveDatetimeConversionNearEpoch(self):
    message = timestamp_pb2.Timestamp()
    naive_utc_epoch = datetime.datetime(1970, 1, 1)
    message.FromDatetime(naive_utc_epoch)
    self.assertEqual(0, message.seconds)
    self.assertEqual(0, message.nanos)

    self.assertEqual(naive_utc_epoch, message.ToDatetime())

    # One microsecond past 08:00 must surface as 1000 nanos.
    naive_epoch_morning = datetime.datetime(1970, 1, 1, 8, 0, 0, 1)
    message.FromDatetime(naive_epoch_morning)
    self.assertEqual(8 * 3600, message.seconds)
    self.assertEqual(1000, message.nanos)

    self.assertEqual(naive_epoch_morning, message.ToDatetime())

    message.FromMilliseconds(1999)
    self.assertEqual(1, message.seconds)
    self.assertEqual(999_000_000, message.nanos)

    self.assertEqual(
        datetime.datetime(1970, 1, 1, 0, 0, 1, 999000), message.ToDatetime()
    )

  def testTimezoneNaiveDatetimeConversionWhereTimestampLosesPrecision(self):
    ts = timestamp_pb2.Timestamp()
    naive_future = datetime.datetime(2555, 2, 22, 1, 2, 3, 456789)
    # The float timestamp for this datetime does not represent the integer
    # millisecond value with full precision.
    self.assertNotEqual(
        naive_future.astimezone(datetime.timezone.utc),
        datetime.datetime.fromtimestamp(
            naive_future.timestamp(), datetime.timezone.utc
        ),
    )
    # It still round-trips correctly.
    ts.FromDatetime(naive_future)
    self.assertEqual(naive_future, ts.ToDatetime())

  def testTimezoneNaiveMaxDatetimeConversion(self):
    ts = timestamp_pb2.Timestamp()
    naive_max_datetime = datetime.datetime(9999, 12, 31, 23, 59, 59, 999999)
    ts.FromDatetime(naive_max_datetime)
    self.assertEqual(naive_max_datetime, ts.ToDatetime())

  def testTimezoneNaiveMinDatetimeConversion(self):
    ts = timestamp_pb2.Timestamp()
    naive_min_datetime = datetime.datetime(1, 1, 1)
    ts.FromDatetime(naive_min_datetime)
    self.assertEqual(naive_min_datetime, ts.ToDatetime())

  # Two hours after the Unix Epoch, around the world.
  @_parameterized.named_parameters(
      ('London', [1970, 1, 1, 2], datetime.timezone.utc),
      ('Tokyo', [1970, 1, 1, 11], _TZ_JAPAN),
      ('LA', [1969, 12, 31, 18], _TZ_PACIFIC),
  )
  def testTimezoneAwareDatetimeConversion(self, date_parts, tzinfo):
    original_datetime = datetime.datetime(*date_parts, tzinfo=tzinfo)  # pylint:disable=g-tzinfo-datetime

    message = timestamp_pb2.Timestamp()
    message.FromDatetime(original_datetime)
    self.assertEqual(7200, message.seconds)
    self.assertEqual(0, message.nanos)

    # ToDatetime() with no parameters produces a naive UTC datetime, i.e. it not
    # only loses the original timezone information (e.g. US/Pacific) as it's
    # "normalised" to UTC, but also drops the information that the datetime
    # represents a UTC one.
    naive_datetime = message.ToDatetime()
    self.assertEqual(datetime.datetime(1970, 1, 1, 2), naive_datetime)
    self.assertIsNone(naive_datetime.tzinfo)
    self.assertNotEqual(original_datetime, naive_datetime)  # not even for UTC!

    # In contrast, ToDatetime(tzinfo=) produces an aware datetime in the given
    # timezone.
    aware_datetime = message.ToDatetime(tzinfo=tzinfo)
    self.assertEqual(original_datetime, aware_datetime)
    self.assertEqual(
        datetime.datetime(1970, 1, 1, 2, tzinfo=datetime.timezone.utc),
        aware_datetime)
    self.assertEqual(tzinfo, aware_datetime.tzinfo)

  @unittest.skipIf(
      not has_zoneinfo,
      'Versions without zoneinfo use a fixed-offset timezone that does not'
      ' demonstrate this problem.',
  )
  def testDatetimeConversionWithDifferentUtcOffsetThanEpoch(self):
    # This timezone has a different UTC offset at this date than at the epoch.
    # The datetime returned by ToDatetime needs to have the correct offset
    # for the moment represented.
    tz = _TZ_PACIFIC
    dt = datetime.datetime(2016, 6, 26, tzinfo=tz)
    epoch_dt = datetime.datetime.fromtimestamp(
        0, tz=datetime.timezone.utc
    ).astimezone(tz)
    # Precondition: the two instants really do have different offsets in tz.
    self.assertNotEqual(dt.utcoffset(), epoch_dt.utcoffset())
    ts = timestamp_pb2.Timestamp()
    ts.FromDatetime(dt)
    self.assertEqual(dt, ts.ToDatetime(tzinfo=dt.tzinfo))

  def testTimezoneAwareDatetimeConversionWhereTimestampLosesPrecision(self):
    tz = _TZ_PACIFIC
    ts = timestamp_pb2.Timestamp()
    tz_aware_future = datetime.datetime(2555, 2, 22, 1, 2, 3, 456789, tzinfo=tz)
    # The float timestamp for this datetime does not represent the integer
    # millisecond value with full precision.
    self.assertNotEqual(
        tz_aware_future,
        datetime.datetime.fromtimestamp(tz_aware_future.timestamp(), tz),
    )
    # It still round-trips correctly.
    ts.FromDatetime(tz_aware_future)
    self.assertEqual(tz_aware_future, ts.ToDatetime(tz))

  def testTimezoneAwareMaxDatetimeConversion(self):
    ts = timestamp_pb2.Timestamp()
    tz_aware_max_datetime = datetime.datetime(
        9999, 12, 31, 23, 59, 59, 999999, tzinfo=datetime.timezone.utc
    )
    ts.FromDatetime(tz_aware_max_datetime)
    self.assertEqual(
        tz_aware_max_datetime, ts.ToDatetime(datetime.timezone.utc)
    )

  def testTimezoneAwareMinDatetimeConversion(self):
    ts = timestamp_pb2.Timestamp()
    tz_aware_min_datetime = datetime.datetime(
        1, 1, 1, tzinfo=datetime.timezone.utc
    )
    ts.FromDatetime(tz_aware_min_datetime)
    self.assertEqual(
        tz_aware_min_datetime, ts.ToDatetime(datetime.timezone.utc)
    )

  # Two hours after the Unix Epoch, around the world.
  @_parameterized.named_parameters(
      ('London', [1970, 1, 1, 2], datetime.timezone.utc),
      ('Tokyo', [1970, 1, 1, 11], _TZ_JAPAN),
      ('LA', [1969, 12, 31, 18], _TZ_PACIFIC),
  )
  def testTimestampAssignment(self, date_parts, tzinfo):
    original_datetime = datetime.datetime(*date_parts, tzinfo=tzinfo)  # pylint:disable=g-tzinfo-datetime
    msg = more_messages_pb2.WKTMessage()
    # A datetime assigned to a Timestamp field is stored as seconds/nanos.
    msg.optional_timestamp = original_datetime
    self.assertEqual(7200, msg.optional_timestamp.seconds)
    self.assertEqual(0, msg.optional_timestamp.nanos)

  # Two hours after the Unix Epoch, around the world.
  @_parameterized.named_parameters(
      ('London', [1970, 1, 1, 2], datetime.timezone.utc),
      ('Tokyo', [1970, 1, 1, 11], _TZ_JAPAN),
      ('LA', [1969, 12, 31, 18], _TZ_PACIFIC),
  )
  def testTimestampCreation(self, date_parts, tzinfo):
    original_datetime = datetime.datetime(*date_parts, tzinfo=tzinfo)  # pylint:disable=g-tzinfo-datetime
    # Construct from a datetime keyword argument.
    msg = more_messages_pb2.WKTMessage(optional_timestamp=original_datetime)
    self.assertEqual(7200, msg.optional_timestamp.seconds)
    self.assertEqual(0, msg.optional_timestamp.nanos)

    # Construct from an existing Timestamp message.
    msg2 = more_messages_pb2.WKTMessage(
        optional_timestamp=msg.optional_timestamp
    )
    self.assertEqual(7200, msg2.optional_timestamp.seconds)
    self.assertEqual(0, msg2.optional_timestamp.nanos)

  @_parameterized.named_parameters(
      (
          'tz_aware_min_dt',
          datetime.datetime(1, 1, 1, tzinfo=datetime.timezone.utc),
          datetime.timedelta(hours=9),
          -62135564400,
          0,
      ),
      (
          'no_change',
          datetime.datetime(1970, 1, 1, 11, tzinfo=_TZ_JAPAN),
          datetime.timedelta(hours=0),
          7200,
          0,
      ),
  )
  def testTimestampAdd(self, old_time, time_delta, expected_sec, expected_nano):
    msg = more_messages_pb2.WKTMessage()
    msg.optional_timestamp = old_time

    # Timestamp + timedelta
    new_msg1 = more_messages_pb2.WKTMessage()
    new_msg1.optional_timestamp = msg.optional_timestamp + time_delta
    self.assertEqual(expected_sec, new_msg1.optional_timestamp.seconds)
    self.assertEqual(expected_nano, new_msg1.optional_timestamp.nanos)

    # timedelta + Timestamp
    new_msg2 = more_messages_pb2.WKTMessage()
    new_msg2.optional_timestamp = time_delta + msg.optional_timestamp
    self.assertEqual(expected_sec, new_msg2.optional_timestamp.seconds)
    self.assertEqual(expected_nano, new_msg2.optional_timestamp.nanos)

    # Timestamp + Duration
    msg.optional_duration.FromTimedelta(time_delta)
    new_msg3 = more_messages_pb2.WKTMessage()
    new_msg3.optional_timestamp = msg.optional_timestamp + msg.optional_duration
    self.assertEqual(expected_sec, new_msg3.optional_timestamp.seconds)
    self.assertEqual(expected_nano, new_msg3.optional_timestamp.nanos)

  @_parameterized.named_parameters(
      (
          'test1',
          datetime.datetime(999, 1, 1, tzinfo=datetime.timezone.utc),
          datetime.timedelta(hours=9),
          -30641792400,
          0,
      ),
      (
          'no_change',
          datetime.datetime(1970, 1, 1, 11, tzinfo=_TZ_JAPAN),
          datetime.timedelta(hours=0),
          7200,
          0,
      ),
  )
  def testTimestampSub(self, old_time, time_delta, expected_sec, expected_nano):
    msg = more_messages_pb2.WKTMessage()
    msg.optional_timestamp = old_time

    # Timestamp - timedelta
    new_msg1 = more_messages_pb2.WKTMessage()
    new_msg1.optional_timestamp = msg.optional_timestamp - time_delta
    self.assertEqual(expected_sec, new_msg1.optional_timestamp.seconds)
    self.assertEqual(expected_nano, new_msg1.optional_timestamp.nanos)

    # Timestamp - Duration
    msg.optional_duration = time_delta
    new_msg2 = more_messages_pb2.WKTMessage()
    new_msg2.optional_timestamp = msg.optional_timestamp - msg.optional_duration
    self.assertEqual(expected_sec, new_msg2.optional_timestamp.seconds)
    self.assertEqual(expected_nano, new_msg2.optional_timestamp.nanos)

    # Reference result computed with plain datetime arithmetic.
    result_msg = more_messages_pb2.WKTMessage()
    result_msg.optional_timestamp = old_time - time_delta
    # Timestamp - Timestamp
    td = msg.optional_timestamp - result_msg.optional_timestamp
    self.assertEqual(time_delta, td)

    # Timestamp - datetime
    td1 = msg.optional_timestamp - result_msg.optional_timestamp.ToDatetime()
    self.assertEqual(time_delta, td1)

    # datetime - Timestamp
    td2 = msg.optional_timestamp.ToDatetime() - result_msg.optional_timestamp
    self.assertEqual(time_delta, td2)

  def testNanosOneSecond(self):
    # nanos == 1e9 is outside the accepted range and must be rejected.
    ts = timestamp_pb2.Timestamp(nanos=1_000_000_000)
    self.assertRaisesRegex(ValueError, 'Timestamp is not valid',
                           ts.ToDatetime)

  def testNanosNegativeOneSecond(self):
    ts = timestamp_pb2.Timestamp(nanos=-1_000_000_000)
    self.assertRaisesRegex(ValueError, 'Timestamp is not valid',
                           ts.ToDatetime)

  def testTimedeltaConversion(self):
    message = duration_pb2.Duration()
    message.FromNanoseconds(1999999999)
    td = message.ToTimedelta()
    self.assertEqual(1, td.seconds)
    self.assertEqual(999999, td.microseconds)

    # Negative durations: checked through timedelta's normalized
    # days/seconds/microseconds fields.
    message.FromNanoseconds(-1999999999)
    td = message.ToTimedelta()
    self.assertEqual(-1, td.days)
    self.assertEqual(86398, td.seconds)
    self.assertEqual(1, td.microseconds)

    message.FromMicroseconds(-1)
    td = message.ToTimedelta()
    self.assertEqual(-1, td.days)
    self.assertEqual(86399, td.seconds)
    self.assertEqual(999999, td.microseconds)
    converted_message = duration_pb2.Duration()
    converted_message.FromTimedelta(td)
    self.assertEqual(message, converted_message)

  def testInvalidTimestamp(self):
    message = timestamp_pb2.Timestamp()
    self.assertRaisesRegex(
        ValueError, 'Failed to parse timestamp: missing valid timezone offset.',
        message.FromJsonString, '')
    self.assertRaisesRegex(
        ValueError, 'Failed to parse timestamp: invalid trailing data '
        '1970-01-01T00:00:01Ztrail.', message.FromJsonString,
        '1970-01-01T00:00:01Ztrail')
    self.assertRaisesRegex(
        ValueError, 'time data \'10000-01-01T00:00:00\' does not match'
        ' format \'%Y-%m-%dT%H:%M:%S\'', message.FromJsonString,
        '10000-01-01T00:00:00.00Z')
    self.assertRaisesRegex(
        ValueError, 'nanos 0123456789012 more than 9 fractional digits.',
        message.FromJsonString, '1970-01-01T00:00:00.0123456789012Z')
    self.assertRaisesRegex(
        ValueError,
        (r'Invalid timezone offset value: \+08.'),
        message.FromJsonString,
        '1972-01-01T01:00:00.01+08',
    )
    self.assertRaisesRegex(ValueError, 'year (0 )?is out of range',
                           message.FromJsonString, '0000-01-01T00:00:00Z')
    # One second past the largest value accepted in
    # testTimestampSerializeAndParse.
    message.seconds = 253402300800
    self.assertRaisesRegex(ValueError, 'Timestamp is not valid',
                           message.ToJsonString)
    # One second before the smallest accepted value.
    self.assertRaisesRegex(ValueError, 'Timestamp is not valid',
                           message.FromSeconds, -62135596801)
    msg = more_messages_pb2.WKTMessage()
    with self.assertRaises(AttributeError):
      msg.optional_timestamp = 1

    with self.assertRaises(AttributeError):
      more_messages_pb2.WKTMessage(optional_timestamp=1)

    with self.assertRaises(TypeError):
      msg.optional_timestamp + ''

    with self.assertRaises(TypeError):
      msg.optional_timestamp - 123

  def testInvalidDuration(self):
    message = duration_pb2.Duration()
    self.assertRaisesRegex(ValueError, 'Duration must end with letter "s": 1.',
                           message.FromJsonString, '1')
    self.assertRaisesRegex(ValueError, 'Couldn\'t parse duration: 1...2s.',
                           message.FromJsonString, '1...2s')
    text = '-315576000001.000000000s'
    self.assertRaisesRegex(
        ValueError,
        r'Duration is not valid\: Seconds -315576000001 must be in range'
        r' \[-315576000000\, 315576000000\].', message.FromJsonString, text)
    text = '315576000001.000000000s'
    self.assertRaisesRegex(
        ValueError,
        r'Duration is not valid\: Seconds 315576000001 must be in range'
        r' \[-315576000000\, 315576000000\].', message.FromJsonString, text)
    message.seconds = -315576000001
    message.nanos = 0
    self.assertRaisesRegex(
        ValueError,
        r'Duration is not valid\: Seconds -315576000001 must be in range'
        r' \[-315576000000\, 315576000000\].', message.ToJsonString)
    message.seconds = 0
    message.nanos = 999999999 + 1
    self.assertRaisesRegex(
        ValueError, r'Duration is not valid\: Nanos 1000000000 must be in range'
        r' \[-999999999\, 999999999\].', message.ToJsonString)
    # seconds and nanos with opposite signs are rejected.
    message.seconds = -1
    message.nanos = 1
    self.assertRaisesRegex(ValueError,
                           r'Duration is not valid\: Sign mismatch.',
                           message.ToJsonString)
    msg = more_messages_pb2.WKTMessage()
    with self.assertRaises(AttributeError):
      msg.optional_duration = 1

    with self.assertRaises(AttributeError):
      more_messages_pb2.WKTMessage(optional_duration=1)

    with self.assertRaises(TypeError):
      msg.optional_duration + ''

    with self.assertRaises(TypeError):
      123 - msg.optional_duration

  @_parameterized.named_parameters(
      ('test1', -1999999, -1, -999999000), ('test2', 1999999, 1, 999999000)
  )
  def testDurationAssignment(self, microseconds, expected_sec, expected_nano):
    message = more_messages_pb2.WKTMessage()
    expected_td = datetime.timedelta(microseconds=microseconds)
    # A timedelta assigned to a Duration field is stored as seconds/nanos.
    message.optional_duration = expected_td
    self.assertEqual(expected_td, message.optional_duration.ToTimedelta())
    self.assertEqual(expected_sec, message.optional_duration.seconds)
    self.assertEqual(expected_nano, message.optional_duration.nanos)

  @_parameterized.named_parameters(
      ('test1', -1999999, -1, -999999000), ('test2', 1999999, 1, 999999000)
  )
  def testDurationCreation(self, microseconds, expected_sec, expected_nano):
    message = more_messages_pb2.WKTMessage(
        optional_duration=datetime.timedelta(microseconds=microseconds)
    )
    expected_td = datetime.timedelta(microseconds=microseconds)
    self.assertEqual(expected_td, message.optional_duration.ToTimedelta())
    self.assertEqual(expected_sec, message.optional_duration.seconds)
    self.assertEqual(expected_nano, message.optional_duration.nanos)

  @_parameterized.named_parameters(
      (
          'tz_aware_min_dt',
          datetime.datetime(1, 1, 1, tzinfo=datetime.timezone.utc),
          datetime.timedelta(hours=9),
          -62135564400,
          0,
      ),
      (
          'no_change',
          datetime.datetime(1970, 1, 1, 11, tzinfo=_TZ_JAPAN),
          datetime.timedelta(hours=0),
          7200,
          0,
      ),
  )
  def testDurationAdd(self, old_time, time_delta, expected_sec, expected_nano):
    msg = more_messages_pb2.WKTMessage()
    msg.optional_duration = time_delta
    msg.optional_timestamp = old_time

    # Duration + datetime
    msg1 = more_messages_pb2.WKTMessage()
    msg1.optional_timestamp = msg.optional_duration + old_time
    self.assertEqual(expected_sec, msg1.optional_timestamp.seconds)
    self.assertEqual(expected_nano, msg1.optional_timestamp.nanos)

    # datetime + Duration
    msg2 = more_messages_pb2.WKTMessage()
    msg2.optional_timestamp = old_time + msg.optional_duration
    self.assertEqual(expected_sec, msg2.optional_timestamp.seconds)
    self.assertEqual(expected_nano, msg2.optional_timestamp.nanos)

    # Duration + Timestamp
    msg3 = more_messages_pb2.WKTMessage()
    msg3.optional_timestamp = msg.optional_duration + msg.optional_timestamp
    self.assertEqual(expected_sec, msg3.optional_timestamp.seconds)
    self.assertEqual(expected_nano, msg3.optional_timestamp.nanos)

  @_parameterized.named_parameters(
      (
          'test1',
          datetime.datetime(999, 1, 1, tzinfo=datetime.timezone.utc),
          datetime.timedelta(hours=9),
          -30641792400,
          0,
      ),
      (
          'no_change',
          datetime.datetime(1970, 1, 1, 11, tzinfo=_TZ_JAPAN),
          datetime.timedelta(hours=0),
          7200,
          0,
      ),
  )
  def testDurationSub(self, old_time, time_delta, expected_sec, expected_nano):
    msg = more_messages_pb2.WKTMessage()
    msg.optional_duration = time_delta

    # datetime - Duration
    msg.optional_timestamp = old_time - msg.optional_duration
    self.assertEqual(expected_sec, msg.optional_timestamp.seconds)
    self.assertEqual(expected_nano, msg.optional_timestamp.nanos)
class StructTest(unittest.TestCase):
  """Tests for the dict-like Struct and list-like ListValue helpers."""

  def testStruct(self):
    struct = struct_pb2.Struct()
    self.assertIsInstance(struct, collections_abc.Mapping)
    self.assertEqual(0, len(struct))
    struct_class = struct.__class__

    struct['key1'] = 5
    struct['key2'] = 'abc'
    struct['key3'] = True
    struct.get_or_create_struct('key4')['subkey'] = 11.0
    struct_list = struct.get_or_create_list('key5')
    self.assertIsInstance(struct_list, collections_abc.Sequence)
    struct_list.extend([6, 'seven', True, False, None])
    struct_list.add_struct()['subkey2'] = 9
    struct['key6'] = {'subkey': {}}
    struct['key7'] = [2, False]

    self.assertEqual(7, len(struct))
    self.assertIsInstance(struct, well_known_types.Struct)
    self.assertEqual(5, struct['key1'])
    self.assertEqual('abc', struct['key2'])
    self.assertIs(True, struct['key3'])
    self.assertEqual(11, struct['key4']['subkey'])
    # Expected value of the struct appended to key5 via add_struct().
    inner_struct = struct_class()
    inner_struct['subkey2'] = 9
    self.assertEqual([6, 'seven', True, False, None, inner_struct],
                     list(struct['key5'].items()))
    self.assertEqual({}, dict(struct['key6']['subkey'].fields))
    self.assertEqual([2, False], list(struct['key7'].items()))

    # Binary round trip must preserve every entry.
    serialized = struct.SerializeToString()
    struct2 = struct_pb2.Struct()
    struct2.ParseFromString(serialized)

    self.assertEqual(struct, struct2)
    for key, value in struct.items():
      self.assertIn(key, struct)
      self.assertIn(key, struct2)
      self.assertEqual(value, struct2[key])

    self.assertEqual(7, len(struct.keys()))
    self.assertEqual(7, len(struct.values()))
    for key in struct.keys():
      self.assertIn(key, struct)
      self.assertIn(key, struct2)
      self.assertEqual(struct[key], struct2[key])

    # keys(), values() and items() must agree on their first element.
    item = (next(iter(struct.keys())), next(iter(struct.values())))
    self.assertEqual(item, next(iter(struct.items())))

    self.assertIsInstance(struct2, well_known_types.Struct)
    self.assertEqual(5, struct2['key1'])
    self.assertEqual('abc', struct2['key2'])
    self.assertIs(True, struct2['key3'])
    self.assertEqual(11, struct2['key4']['subkey'])
    self.assertEqual([6, 'seven', True, False, None, inner_struct],
                     list(struct2['key5'].items()))

    struct_list = struct2['key5']
    self.assertEqual(6, struct_list[0])
    self.assertEqual('seven', struct_list[1])
    self.assertEqual(True, struct_list[2])
    self.assertEqual(False, struct_list[3])
    self.assertIsNone(struct_list[4])
    self.assertEqual(inner_struct, struct_list[5])

    struct_list[1] = 7
    self.assertEqual(7, struct_list[1])

    struct_list.add_list().extend([1, 'two', True, False, None])
    self.assertEqual([1, 'two', True, False, None],
                     list(struct_list[6].items()))
    struct_list.extend([{'nested_struct': 30}, ['nested_list', 99], {}, []])
    self.assertEqual(11, len(struct_list.values))
    self.assertEqual(30, struct_list[7]['nested_struct'])
    self.assertEqual('nested_list', struct_list[8][0])
    self.assertEqual(99, struct_list[8][1])
    self.assertEqual({}, dict(struct_list[9].fields))
    self.assertEqual([], list(struct_list[10].items()))
    struct_list[0] = {'replace': 'set'}
    struct_list[1] = ['replace', 'set']
    self.assertEqual('set', struct_list[0]['replace'])
    self.assertEqual(['replace', 'set'], list(struct_list[1].items()))

    # Text-format round trip.
    text_serialized = str(struct)
    struct3 = struct_pb2.Struct()
    text_format.Merge(text_serialized, struct3)
    self.assertEqual(struct, struct3)

    # key3 currently holds a bool; get_or_create_struct replaces it.
    struct.get_or_create_struct('key3')['replace'] = 12
    self.assertEqual(12, struct['key3']['replace'])

    # Tests empty list.
    struct.get_or_create_list('empty_list')
    empty_list = struct['empty_list']
    self.assertEqual([], list(empty_list.items()))
    list2 = struct_pb2.ListValue()
    list2.add_list()
    empty_list = list2[0]
    self.assertEqual([], list(empty_list.items()))

    # Tests empty struct.
    struct.get_or_create_struct('empty_struct')
    empty_struct = struct['empty_struct']
    self.assertEqual({}, dict(empty_struct.fields))
    list2.add_struct()
    empty_struct = list2[1]
    self.assertEqual({}, dict(empty_struct.fields))

    # Deletion from both the struct and a nested list.
    self.assertEqual(9, len(struct))
    del struct['key3']
    del struct['key4']
    self.assertEqual(7, len(struct))
    self.assertEqual(6, len(struct['key5']))
    del struct['key5'][1]
    self.assertEqual(5, len(struct['key5']))
    self.assertEqual([6, True, False, None, inner_struct],
                     list(struct['key5'].items()))

  def testInOperator(self):
    # in operator for Struct
    struct = struct_pb2.Struct()
    struct['key'] = 5

    self.assertIn('key', struct)
    # 'fields' is the name of the underlying proto field, not a stored key.
    self.assertNotIn('fields', struct)
    with self.assertRaises(TypeError):
      1 in struct

    # in operator for ListValue
    struct_list = struct.get_or_create_list('key2')
    self.assertIsInstance(struct_list, collections_abc.Sequence)
    struct_list.extend([6, 'seven', True, False, None])
    struct_list.add_struct()['subkey'] = 9
    inner_struct = struct.__class__()
    inner_struct['subkey'] = 9

    self.assertIn(6, struct_list)
    self.assertIn('seven', struct_list)
    self.assertIn(True, struct_list)
    self.assertIn(False, struct_list)
    self.assertIn(None, struct_list)
    self.assertIn(inner_struct, struct_list)
    self.assertNotIn('values', struct_list)
    self.assertNotIn(10, struct_list)

    for item in struct_list:
      self.assertIn(item, struct_list)

  def testStructAssignment(self):
    # Tests struct assignment from another struct
    s1 = struct_pb2.Struct()
    s2 = struct_pb2.Struct()
    for value in [1, 'a', [1], ['a'], {'a': 'b'}]:
      s1['x'] = value
      s2['x'] = s1['x']
      self.assertEqual(s1['x'], s2['x'])

    dictionary = {
        'key1': 5.0,
        'key2': 'abc',
        'key3': {'subkey': 11.0, 'k': False},
    }
    msg = more_messages_pb2.WKTMessage()
    msg.optional_struct = dictionary
    self.assertEqual(msg.optional_struct, dictionary)

    # Tests assign is not merge
    dictionary2 = {
        'key4': {'subkey': 11.0, 'k': True},
    }
    msg.optional_struct = dictionary2
    self.assertEqual(msg.optional_struct, dictionary2)

    # Tests assign empty
    msg2 = more_messages_pb2.WKTMessage()
    self.assertNotIn('optional_struct', msg2)
    msg2.optional_struct = {}
    self.assertIn('optional_struct', msg2)
    self.assertEqual(msg2.optional_struct, {})

  def testListValueAssignment(self):
    list_value = [6, 'seven', True, False, None, {}]
    msg = more_messages_pb2.WKTMessage()
    msg.optional_list_value = list_value
    self.assertEqual(msg.optional_list_value, list_value)

  def testStructConstruction(self):
    dictionary = {
        'key1': 5.0,
        'key2': 'abc',
        'key3': {'subkey': 11.0, 'k': False},
    }
    list_value = [6, 'seven', True, False, None, dictionary]
    msg = more_messages_pb2.WKTMessage(
        optional_struct=dictionary, optional_list_value=list_value
    )
    self.assertEqual(len(msg.optional_struct), len(dictionary))
    self.assertEqual(msg.optional_struct, dictionary)
    self.assertEqual(len(msg.optional_list_value), len(list_value))
    self.assertEqual(msg.optional_list_value, list_value)

    # Empty containers still mark the fields as present.
    msg2 = more_messages_pb2.WKTMessage(
        optional_struct={}, optional_list_value=[]
    )
    self.assertIn('optional_struct', msg2)
    self.assertIn('optional_list_value', msg2)
    self.assertEqual(msg2.optional_struct, {})
    self.assertEqual(msg2.optional_list_value, [])

  def testSpecialStructConstruct(self):
    dictionary = {'key1': 6.0}
    msg = more_messages_pb2.WKTMessage(optional_struct=dictionary)
    self.assertEqual(msg.optional_struct, dictionary)

    # A key named 'fields' with a plain value is treated as an ordinary entry.
    dictionary2 = {'fields': 7.0}
    msg2 = more_messages_pb2.WKTMessage(optional_struct=dictionary2)
    self.assertEqual(msg2.optional_struct, dictionary2)

    # Construct Struct as normal message
    value_msg = struct_pb2.Value(number_value=5.0)
    dictionary3 = {'fields': {'key1': value_msg}}
    msg3 = more_messages_pb2.WKTMessage(optional_struct=dictionary3)
    self.assertEqual(msg3.optional_struct, {'key1': 5.0})

  def testMergeFrom(self):
    struct = struct_pb2.Struct()
    struct_class = struct.__class__

    dictionary = {
        'key1': 5,
        'key2': 'abc',
        'key3': True,
        'key4': {'subkey': 11.0},
        'key5': [6, 'seven', True, False, None, {'subkey2': 9}],
        'key6': [['nested_list', True]],
        'empty_struct': {},
        'empty_list': [],
        'tuple': ((3, 2), ()),
    }
    struct.update(dictionary)
    self.assertEqual(5, struct['key1'])
    self.assertEqual('abc', struct['key2'])
    self.assertIs(True, struct['key3'])
    self.assertEqual(11, struct['key4']['subkey'])
    inner_struct = struct_class()
    inner_struct['subkey2'] = 9
    self.assertEqual([6, 'seven', True, False, None, inner_struct],
                     list(struct['key5'].items()))
    self.assertEqual(2, len(struct['key6'][0].values))
    self.assertEqual('nested_list', struct['key6'][0][0])
    self.assertEqual(True, struct['key6'][0][1])
    empty_list = struct['empty_list']
    self.assertEqual([], list(empty_list.items()))
    empty_struct = struct['empty_struct']
    self.assertEqual({}, dict(empty_struct.fields))

    # According to documentation: "When parsing from the wire or when merging,
    # if there are duplicate map keys the last key seen is used".
    duplicate = {
        'key4': {'replace': 20},
        'key5': [[False, 5]],
    }
    struct.update(duplicate)
    self.assertEqual(1, len(struct['key4'].fields))
    self.assertEqual(20, struct['key4']['replace'])
    self.assertEqual(1, len(struct['key5'].values))
    self.assertEqual(False, struct['key5'][0][0])
    self.assertEqual(5, struct['key5'][0][1])
class AnyTest(unittest.TestCase):
  """Tests for the Pack/Unpack/Is/TypeName helpers on Any messages."""

  def testAnyMessage(self):
    # Creates and sets message.
    msg = any_test_pb2.TestAny()
    msg_descriptor = msg.DESCRIPTOR
    all_types = unittest_pb2.TestAllTypes()
    all_descriptor = all_types.DESCRIPTOR
    all_types.repeated_string.append(u'\u00fc\ua71f')
    # Packs to Any.
    msg.value.Pack(all_types)
    self.assertEqual(msg.value.type_url,
                     'type.googleapis.com/%s' % all_descriptor.full_name)
    self.assertEqual(msg.value.value,
                     all_types.SerializeToString())
    # Tests Is() method.
    self.assertTrue(msg.value.Is(all_descriptor))
    self.assertFalse(msg.value.Is(msg_descriptor))
    # Unpacks Any.
    unpacked_message = unittest_pb2.TestAllTypes()
    self.assertTrue(msg.value.Unpack(unpacked_message))
    self.assertEqual(all_types, unpacked_message)
    # Unpacks to different type.
    self.assertFalse(msg.value.Unpack(msg))
    # Only Any messages have Pack method.  assertRaises reports a missing
    # exception as a test failure; `msg` keeps the diagnostic text.
    with self.assertRaises(
        AttributeError,
        msg='%s should not have Pack method.' % msg_descriptor.full_name,
    ):
      msg.Pack(all_types)

  def testUnpackWithNoSlashInTypeUrl(self):
    msg = any_test_pb2.TestAny()
    all_types = unittest_pb2.TestAllTypes()
    all_descriptor = all_types.DESCRIPTOR
    msg.value.Pack(all_types)
    # Reset type_url to part of type_url after '/'
    msg.value.type_url = msg.value.TypeName()
    self.assertFalse(msg.value.Is(all_descriptor))
    unpacked_message = unittest_pb2.TestAllTypes()
    self.assertFalse(msg.value.Unpack(unpacked_message))

  def testMessageName(self):
    # Creates and sets message.
    submessage = any_test_pb2.TestAny()
    submessage.int_value = 12345
    msg = any_pb2.Any()
    msg.Pack(submessage)
    self.assertEqual(msg.TypeName(), 'google.protobuf.internal.TestAny')

  def testPackWithCustomTypeUrl(self):
    submessage = any_test_pb2.TestAny()
    submessage.int_value = 12345
    msg = any_pb2.Any()
    # Pack with a custom type URL prefix.
    msg.Pack(submessage, 'type.myservice.com')
    self.assertEqual(msg.type_url,
                     'type.myservice.com/%s' % submessage.DESCRIPTOR.full_name)
    # Pack with a custom type URL prefix ending with '/'.
    msg.Pack(submessage, 'type.myservice.com/')
    self.assertEqual(msg.type_url,
                     'type.myservice.com/%s' % submessage.DESCRIPTOR.full_name)
    # Pack with an empty type URL prefix.
    msg.Pack(submessage, '')
    self.assertEqual(msg.type_url,
                     '/%s' % submessage.DESCRIPTOR.full_name)
    # Test unpacking the type.
    unpacked_message = any_test_pb2.TestAny()
    self.assertTrue(msg.Unpack(unpacked_message))
    self.assertEqual(submessage, unpacked_message)

  def testPackDeterministic(self):
    submessage = any_test_pb2.TestAny()
    for i in range(10):
      submessage.map_value[str(i)] = i * 2
    msg = any_pb2.Any()
    msg.Pack(submessage, deterministic=True)
    serialized = msg.SerializeToString(deterministic=True)
    # Expected bytes: the type URL followed by the ten map entries in key
    # order '0'..'9'.
    golden = (b'\n4type.googleapis.com/google.protobuf.internal.TestAny\x12F'
              b'\x1a\x05\n\x010\x10\x00\x1a\x05\n\x011\x10\x02\x1a\x05\n\x01'
              b'2\x10\x04\x1a\x05\n\x013\x10\x06\x1a\x05\n\x014\x10\x08\x1a'
              b'\x05\n\x015\x10\n\x1a\x05\n\x016\x10\x0c\x1a\x05\n\x017\x10'
              b'\x0e\x1a\x05\n\x018\x10\x10\x1a\x05\n\x019\x10\x12')
    self.assertEqual(golden, serialized)
# Run every test in this module when the file is executed as a script.
if __name__ == '__main__':
  unittest.main()