| /* |
| * Copyright (C) 2021 The Android Open Source Project |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| #include "src/protozero/proto_ring_buffer.h" |
| |
| #include "perfetto/base/logging.h" |
| #include "perfetto/ext/base/paged_memory.h" |
| #include "perfetto/protozero/proto_utils.h" |
| |
| namespace protozero { |
| |
| namespace { |
// Size of the initial buffer allocation and the step by which Append() enlarges
// the buffer when the incoming data does not fit.
constexpr size_t kGrowBytes = size_t{128} * 1024;  // 128 KiB.
| |
| inline ProtoRingBuffer::Message FramingError() { |
| ProtoRingBuffer::Message msg{}; |
| msg.fatal_framing_error = true; |
| return msg; |
| } |
| |
| // Tries to decode a length-delimited proto field from |start|. |
| // Returns a valid boundary if the preamble is valid and the length is within |
| // |end|, or an invalid message otherwise. |
| ProtoRingBuffer::Message TryReadProtoMessage(const uint8_t* start, |
| const uint8_t* end) { |
| namespace proto_utils = protozero::proto_utils; |
| uint64_t field_tag = 0; |
| auto* start_of_len = proto_utils::ParseVarInt(start, end, &field_tag); |
| if (start_of_len == start) |
| return ProtoRingBuffer::Message{}; // Not enough data. |
| |
| const uint32_t tag = field_tag & 0x07; |
| if (tag != |
| static_cast<uint32_t>(proto_utils::ProtoWireType::kLengthDelimited)) { |
| PERFETTO_ELOG("RPC framing error, unexpected msg tag 0x%xu", tag); |
| return FramingError(); |
| } |
| |
| uint64_t msg_len = 0; |
| auto* start_of_msg = proto_utils::ParseVarInt(start_of_len, end, &msg_len); |
| if (start_of_msg == start_of_len) |
| return ProtoRingBuffer::Message{}; // Not enough data. |
| |
| if (msg_len > ProtoRingBuffer::kMaxMsgSize) { |
| PERFETTO_ELOG("RPC framing error, message too large (%" PRIu64 " > %zu)", |
| msg_len, ProtoRingBuffer::kMaxMsgSize); |
| return FramingError(); |
| } |
| |
| if (start_of_msg + msg_len > end) |
| return ProtoRingBuffer::Message{}; // Not enough data. |
| |
| ProtoRingBuffer::Message msg{}; |
| msg.start = start_of_msg; |
| msg.len = static_cast<uint32_t>(msg_len); |
| msg.field_id = static_cast<uint32_t>(field_tag >> 3); |
| return msg; |
| } |
| |
| } // namespace |
| |
| RingBufferMessageReader::RingBufferMessageReader() |
| : buf_(perfetto::base::PagedMemory::Allocate(kGrowBytes)) {} |
| RingBufferMessageReader::~RingBufferMessageReader() = default; |
| |
| void RingBufferMessageReader::Append(const void* data_void, size_t data_len) { |
| if (failed_) |
| return; |
| const uint8_t* data = static_cast<const uint8_t*>(data_void); |
| PERFETTO_DCHECK(wr_ <= buf_.size()); |
| PERFETTO_DCHECK(wr_ >= rd_); |
| |
| // If the last call to ReadMessage() consumed all the data in the buffer and |
| // there are no incomplete messages pending, restart from the beginning rather |
| // than keep ringing. This is the most common case. |
| if (rd_ == wr_) |
| rd_ = wr_ = 0; |
| |
| // The caller is expected to always issue a ReadMessage() after each Append(). |
| PERFETTO_CHECK(!fastpath_.valid()); |
| if (rd_ == wr_) { |
| auto msg = TryReadMessage(data, data + data_len); |
| if (msg.valid() && msg.end() == (data + data_len)) { |
| // Fastpath: in many cases, the underlying stream will effectively |
| // preserve the atomicity of messages for most small messages. |
| // In this case we can avoid the extra buf_ roundtrip and just pass a |
| // pointer to |data| + (proto preamble len). |
| // The next call to ReadMessage)= will return |fastpath_|. |
| fastpath_ = std::move(msg); |
| return; |
| } |
| } |
| |
| size_t avail = buf_.size() - wr_; |
| if (data_len > avail) { |
| // This whole section should be hit extremely rarely. |
| |
| // Try first just recompacting the buffer by moving everything to the left. |
| // This can happen if we received "a message and a bit" on each Append call |
| // so we ended pup in a situation like: |
| // buf_: [unused space] [msg1 incomplete] |
| // ^rd_ ^wr_ |
| // |
| // After recompaction: |
| // buf_: [msg1 incomplete] |
| // ^rd_ ^wr_ |
| uint8_t* buf = static_cast<uint8_t*>(buf_.Get()); |
| memmove(&buf[0], &buf[rd_], wr_ - rd_); |
| avail += rd_; |
| wr_ -= rd_; |
| rd_ = 0; |
| if (data_len > avail) { |
| // The compaction didn't free up enough space and we need to expand the |
| // ring buffer. Yes, we could have detected this earlier and split the |
| // code paths, rather than first compacting and then realizing it wasn't |
| // sufficient. However, that would make the code harder to reason about, |
| // creating code paths that are nearly never hit, hence making it more |
| // likely to accumulate bugs in future. All this is very rare. |
| size_t new_size = buf_.size(); |
| while (data_len > new_size - wr_) |
| new_size += kGrowBytes; |
| if (new_size > kMaxMsgSize * 2) { |
| failed_ = true; |
| return; |
| } |
| auto new_buf = perfetto::base::PagedMemory::Allocate(new_size); |
| memcpy(new_buf.Get(), buf_.Get(), buf_.size()); |
| buf_ = std::move(new_buf); |
| avail = new_size - wr_; |
| // No need to touch rd_ / wr_ cursors. |
| } |
| } |
| |
| // Append the received data at the end of the ring buffer. |
| uint8_t* buf = static_cast<uint8_t*>(buf_.Get()); |
| memcpy(&buf[wr_], data, data_len); |
| wr_ += data_len; |
| } |
| |
| RingBufferMessageReader::Message RingBufferMessageReader::ReadMessage() { |
| if (failed_) |
| return FramingError(); |
| |
| if (fastpath_.valid()) { |
| // The fastpath can only be hit when the buffer is empty. |
| PERFETTO_CHECK(rd_ == wr_); |
| auto msg = std::move(fastpath_); |
| fastpath_ = Message{}; |
| return msg; |
| } |
| |
| uint8_t* buf = static_cast<uint8_t*>(buf_.Get()); |
| |
| PERFETTO_DCHECK(rd_ <= wr_); |
| if (rd_ >= wr_) |
| return Message{}; // Completely empty. |
| |
| auto msg = TryReadMessage(&buf[rd_], &buf[wr_]); |
| if (!msg.valid()) { |
| failed_ = failed_ || msg.fatal_framing_error; |
| return msg; // Return |msg| because it could be a framing error. |
| } |
| |
| const uint8_t* msg_end = msg.start + msg.len; |
| PERFETTO_CHECK(msg_end > &buf[rd_] && msg_end <= &buf[wr_]); |
| auto msg_outer_len = static_cast<size_t>(msg_end - &buf[rd_]); |
| rd_ += msg_outer_len; |
| return msg; |
| } |
| |
| ProtoRingBuffer::ProtoRingBuffer() = default; |
| ProtoRingBuffer::~ProtoRingBuffer() = default; |
| |
| ProtoRingBuffer::Message ProtoRingBuffer::TryReadMessage(const uint8_t* start, |
| const uint8_t* end) { |
| return TryReadProtoMessage(start, end); |
| } |
| |
| } // namespace protozero |