| # Protocol Buffers - Google's data interchange format |
| # Copyright 2008 Google Inc. All rights reserved. |
| # |
| # Use of this source code is governed by a BSD-style |
| # license that can be found in the LICENSE file or at |
| # https://developers.google.com/open-source/licenses/bsd |
| |
| """Contains the Nextgen Pythonic protobuf APIs.""" |
| |
| import io |
| from typing import Type, TypeVar |
| |
| from google.protobuf.internal import decoder |
| from google.protobuf.internal import encoder |
| from google.protobuf.message import Message |
| |
| _MESSAGE = TypeVar('_MESSAGE', bound='Message') |
| |
| |
| def serialize(message: _MESSAGE, deterministic: bool = None) -> bytes: |
| """Return the serialized proto. |
| |
| Args: |
| message: The proto message to be serialized. |
| deterministic: If true, requests deterministic serialization |
| of the protobuf, with predictable ordering of map keys. |
| |
| Returns: |
| A binary bytes representation of the message. |
| """ |
| return message.SerializeToString(deterministic=deterministic) |
| |
| |
| def parse(message_class: Type[_MESSAGE], payload: bytes) -> _MESSAGE: |
| """Given a serialized data in binary form, deserialize it into a Message. |
| |
| Args: |
| message_class: The message meta class. |
| payload: A serialized bytes in binary form. |
| |
| Returns: |
| A new message deserialized from payload. |
| """ |
| new_message = message_class() |
| new_message.ParseFromString(payload) |
| return new_message |
| |
| |
| def serialize_length_prefixed(message: _MESSAGE, output: io.BytesIO) -> None: |
| """Writes the size of the message as a varint and the serialized message. |
| |
| Writes the size of the message as a varint and then the serialized message. |
| This allows more data to be written to the output after the message. Use |
| parse_length_prefixed to parse messages written by this method. |
| |
| The output stream must be buffered, e.g. using |
| https://docs.python.org/3/library/io.html#buffered-streams. |
| |
| Example usage: |
| out = io.BytesIO() |
| for msg in message_list: |
| proto.serialize_length_prefixed(msg, out) |
| |
| Args: |
| message: The protocol buffer message that should be serialized. |
| output: BytesIO or custom buffered IO that data should be written to. |
| """ |
| size = message.ByteSize() |
| encoder._VarintEncoder()(output.write, size) |
| out_size = output.write(serialize(message)) |
| |
| if out_size != size: |
| raise TypeError( |
| 'Failed to write complete message (wrote: %d, expected: %d)' |
| '. Ensure output is using buffered IO.' % (out_size, size) |
| ) |
| |
| |
| def parse_length_prefixed( |
| message_class: Type[_MESSAGE], input_bytes: io.BytesIO |
| ) -> _MESSAGE: |
| """Parse a message from input_bytes. |
| |
| Args: |
| message_class: The protocol buffer message class that parser should parse. |
| input_bytes: A buffered input. |
| |
| Example usage: |
| while True: |
| msg = proto.parse_length_prefixed(message_class, input_bytes) |
| if msg is None: |
| break |
| ... |
| |
| Returns: |
| A parsed message if successful. None if input_bytes is at EOF. |
| """ |
| size = decoder._DecodeVarint(input_bytes) |
| if size is None: |
| # It is the end of buffered input. See example usage in the |
| # API description. |
| return None |
| |
| message = message_class() |
| |
| if size == 0: |
| return message |
| |
| parsed_size = message.ParseFromString(input_bytes.read(size)) |
| if parsed_size != size: |
| raise ValueError( |
| 'Truncated message or non-buffered input_bytes: ' |
| 'Expected {0} bytes but only {1} bytes parsed for ' |
| '{2}.'.format(size, parsed_size, message.DESCRIPTOR.name) |
| ) |
| return message |