| /* |
| * Copyright (C) 2021 The Android Open Source Project |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| #ifndef SRC_PROTOZERO_PROTO_RING_BUFFER_H_ |
| #define SRC_PROTOZERO_PROTO_RING_BUFFER_H_ |
| |
| #include <stdint.h> |
| |
| #include "perfetto/ext/base/paged_memory.h" |
| |
| namespace protozero { |
| |
| // This class buffers and tokenizes proto messages. |
| // |
| // From a logical level, it works with a sequence of protos like this. |
| // [ header 1 ] [ payload 1 ] |
| // [ header 2 ] [ payload 2 ] |
| // [ header 3 ] [ payload 3 ] |
| // Where [ header ] is a variable-length sequence of: |
| // [ Field ID = 1, type = length-delimited] [ length (varint) ]. |
| // |
| // The input to this class is byte-oriented, not message-oriented (like a TCP |
| // stream or a pipe). The caller is not required to respect the boundaries of |
| // each message; only guarantee that data is not lost or duplicated. The |
| // following sequence of inbound events is possible: |
| // 1. [ hdr 1 (incomplete) ... ] |
| // 2. [ ... hdr 1 ] [ payload 1 ] [ hdr 2 ] [ payoad 2 ] [ hdr 3 ] [ pay... ] |
| // 3. [ ...load 3 ] |
| // |
| // This class maintains inbound requests in a ring buffer. |
| // The expected usage is: |
| // ring_buf.Append(data, len); |
| // for (;;) { |
| // auto msg = ring_buf.ReadMessage(); |
| // if (!msg.valid()) |
| // break; |
| // Decode(msg); |
| // } |
| // |
| // After each call to Append, the caller is expected to call ReadMessage() until |
| // it returns an invalid message (signalling no more messages could be decoded). |
| // Note that a single Append can "unblock" > 1 messages, which is why the caller |
| // needs to keep calling ReadMessage in a loop. |
| // |
| // Internal architecture |
| // --------------------- |
| // Internally this is similar to a ring-buffer, with the caveat that it never |
| // wraps, it only expands. Expansions are rare. The deal is that in most cases |
| // the read cursor follows very closely the write cursor. For instance, if the |
| // underlying transport behaves as a dgram socket, after each Append, the read |
| // cursor will chase completely the write cursor. Even if the underlying stream |
| // is not always atomic, the expectation is that the read cursor will eventually |
| // reach the write one within few messages. |
| // A visual example, imagine we have four messages: 2it 4will 2be 4fine |
| // Visually: |
| // |
| // Append("2it4wi"): A message and a bit: |
| // [ 2it 4wi ] |
| // ^R ^W |
| // |
| // After the ReadMessage(), the 1st message will be read, but not the 2nd. |
| // [ 2it 4wi ] |
| // ^R ^W |
| // |
| // Append("ll2be4f") |
| // [ 2it 4will 2be 4f ] |
| // ^R ^W |
| // |
| // After the ReadMessage() loop: |
| // [ 2it 4will 2be 4f ] |
| // ^R ^W |
| // Append("ine") |
| // [ 2it 4will 2be 4fine ] |
| // ^R ^W |
| // |
| // In the next ReadMessage() the R cursor will chase the W cursor. When this |
| // happens (very frequent) we can just reset both cursors to 0 and restart. |
| // If we are unlucky and get to the end of the buffer, two things happen: |
| // 1. We try first to recompact the buffer, moving everything left by R. |
| // 2. If still there isn't enough space, we expand the buffer. |
| // Given that each message is expected to be at most kMaxMsgSize (64 MB), the |
| // expansion is bound at 2 * kMaxMsgSize. |
| |
| class RingBufferMessageReader { |
| public: |
| static constexpr size_t kMaxMsgSize = 64 * 1024 * 1024; |
| struct Message { |
| const uint8_t* start = nullptr; |
| uint32_t len = 0; |
| uint32_t field_id = 0; |
| bool fatal_framing_error = false; |
| const uint8_t* end() const { return start + len; } |
| inline bool valid() const { return !!start; } |
| }; |
| |
| RingBufferMessageReader(); |
| virtual ~RingBufferMessageReader(); |
| RingBufferMessageReader(const RingBufferMessageReader&) = delete; |
| RingBufferMessageReader& operator=(const RingBufferMessageReader&) = delete; |
| |
| // Appends data into the ring buffer, recompacting or resizing it if needed. |
| // Will invaildate the pointers previously handed out. |
| void Append(const void* data, size_t len); |
| |
| // If a message can be read, it returns the boundaries of the message |
| // (without including the preamble) and advances the read cursor. |
| // If no message is available, returns a null range. |
| // The returned pointer is only valid until the next call to Append(), as |
| // that can recompact or resize the underlying buffer. |
| Message ReadMessage(); |
| |
| // Exposed for testing. |
| size_t capacity() const { return buf_.size(); } |
| size_t avail() const { return buf_.size() - (wr_ - rd_); } |
| |
| protected: |
| // Subclasses must implement the header parsing. |
| virtual Message TryReadMessage(const uint8_t* start, const uint8_t* end) = 0; |
| |
| private: |
| perfetto::base::PagedMemory buf_; |
| Message fastpath_{}; |
| bool failed_ = false; // Set in case of an unrecoverable framing faiulre. |
| size_t rd_ = 0; // Offset of the read cursor in |buf_|. |
| size_t wr_ = 0; // Offset of the write cursor in |buf_|. |
| }; |
| |
| class ProtoRingBuffer final : public RingBufferMessageReader { |
| public: |
| ProtoRingBuffer(); |
| ~ProtoRingBuffer() override final; |
| |
| protected: |
| Message TryReadMessage(const uint8_t* start, |
| const uint8_t* end) override final; |
| }; |
| |
| } // namespace protozero |
| |
| #endif // SRC_PROTOZERO_PROTO_RING_BUFFER_H_ |