| /* | 
 |  * Copyright (C) 2021 The Android Open Source Project | 
 |  * | 
 |  * Licensed under the Apache License, Version 2.0 (the "License"); | 
 |  * you may not use this file except in compliance with the License. | 
 |  * You may obtain a copy of the License at | 
 |  * | 
 |  *      http://www.apache.org/licenses/LICENSE-2.0 | 
 |  * | 
 |  * Unless required by applicable law or agreed to in writing, software | 
 |  * distributed under the License is distributed on an "AS IS" BASIS, | 
 |  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | 
 |  * See the License for the specific language governing permissions and | 
 |  * limitations under the License. | 
 |  */ | 
 |  | 
 | #include "src/protozero/proto_ring_buffer.h" | 
 |  | 
 | #include "perfetto/base/logging.h" | 
 | #include "perfetto/ext/base/paged_memory.h" | 
 | #include "perfetto/protozero/proto_utils.h" | 
 |  | 
 | namespace protozero { | 
 |  | 
 | namespace { | 
 | constexpr size_t kGrowBytes = 128 * 1024; | 
 |  | 
 | inline ProtoRingBuffer::Message FramingError() { | 
 |   ProtoRingBuffer::Message msg{}; | 
 |   msg.fatal_framing_error = true; | 
 |   return msg; | 
 | } | 
 |  | 
 | // Tries to decode a length-delimited proto field from |start|. | 
 | // Returns a valid boundary if the preamble is valid and the length is within | 
 | // |end|, or an invalid message otherwise. | 
 | ProtoRingBuffer::Message TryReadProtoMessage(const uint8_t* start, | 
 |                                              const uint8_t* end) { | 
 |   namespace proto_utils = protozero::proto_utils; | 
 |   uint64_t field_tag = 0; | 
 |   auto* start_of_len = proto_utils::ParseVarInt(start, end, &field_tag); | 
 |   if (start_of_len == start) | 
 |     return ProtoRingBuffer::Message{};  // Not enough data. | 
 |  | 
 |   const uint32_t tag = field_tag & 0x07; | 
 |   if (tag != | 
 |       static_cast<uint32_t>(proto_utils::ProtoWireType::kLengthDelimited)) { | 
 |     PERFETTO_ELOG("RPC framing error, unexpected msg tag 0x%xu", tag); | 
 |     return FramingError(); | 
 |   } | 
 |  | 
 |   uint64_t msg_len = 0; | 
 |   auto* start_of_msg = proto_utils::ParseVarInt(start_of_len, end, &msg_len); | 
 |   if (start_of_msg == start_of_len) | 
 |     return ProtoRingBuffer::Message{};  // Not enough data. | 
 |  | 
 |   if (msg_len > ProtoRingBuffer::kMaxMsgSize) { | 
 |     PERFETTO_ELOG("RPC framing error, message too large (%" PRIu64 " > %zu)", | 
 |                   msg_len, ProtoRingBuffer::kMaxMsgSize); | 
 |     return FramingError(); | 
 |   } | 
 |  | 
 |   if (start_of_msg + msg_len > end) | 
 |     return ProtoRingBuffer::Message{};  // Not enough data. | 
 |  | 
 |   ProtoRingBuffer::Message msg{}; | 
 |   msg.start = start_of_msg; | 
 |   msg.len = static_cast<uint32_t>(msg_len); | 
 |   msg.field_id = static_cast<uint32_t>(field_tag >> 3); | 
 |   return msg; | 
 | } | 
 |  | 
 | }  // namespace | 
 |  | 
 | RingBufferMessageReader::RingBufferMessageReader() | 
 |     : buf_(perfetto::base::PagedMemory::Allocate(kGrowBytes)) {} | 
 | RingBufferMessageReader::~RingBufferMessageReader() = default; | 
 |  | 
 | void RingBufferMessageReader::Append(const void* data_void, size_t data_len) { | 
 |   if (failed_) | 
 |     return; | 
 |   const uint8_t* data = static_cast<const uint8_t*>(data_void); | 
 |   PERFETTO_DCHECK(wr_ <= buf_.size()); | 
 |   PERFETTO_DCHECK(wr_ >= rd_); | 
 |  | 
 |   // If the last call to ReadMessage() consumed all the data in the buffer and | 
 |   // there are no incomplete messages pending, restart from the beginning rather | 
 |   // than keep ringing. This is the most common case. | 
 |   if (rd_ == wr_) | 
 |     rd_ = wr_ = 0; | 
 |  | 
 |   // The caller is expected to always issue a ReadMessage() after each Append(). | 
 |   PERFETTO_CHECK(!fastpath_.valid()); | 
 |   if (rd_ == wr_) { | 
 |     auto msg = TryReadMessage(data, data + data_len); | 
 |     if (msg.valid() && msg.end() == (data + data_len)) { | 
 |       // Fastpath: in many cases, the underlying stream will effectively | 
 |       // preserve the atomicity of messages for most small messages. | 
 |       // In this case we can avoid the extra buf_ roundtrip and just pass a | 
 |       // pointer to |data| + (proto preamble len). | 
 |       // The next call to ReadMessage)= will return |fastpath_|. | 
 |       fastpath_ = std::move(msg); | 
 |       return; | 
 |     } | 
 |   } | 
 |  | 
 |   size_t avail = buf_.size() - wr_; | 
 |   if (data_len > avail) { | 
 |     // This whole section should be hit extremely rarely. | 
 |  | 
 |     // Try first just recompacting the buffer by moving everything to the left. | 
 |     // This can happen if we received "a message and a bit" on each Append call | 
 |     // so we ended pup in a situation like: | 
 |     // buf_: [unused space] [msg1 incomplete] | 
 |     //                      ^rd_             ^wr_ | 
 |     // | 
 |     // After recompaction: | 
 |     // buf_: [msg1 incomplete] | 
 |     //       ^rd_             ^wr_ | 
 |     uint8_t* buf = static_cast<uint8_t*>(buf_.Get()); | 
 |     memmove(&buf[0], &buf[rd_], wr_ - rd_); | 
 |     avail += rd_; | 
 |     wr_ -= rd_; | 
 |     rd_ = 0; | 
 |     if (data_len > avail) { | 
 |       // The compaction didn't free up enough space and we need to expand the | 
 |       // ring buffer. Yes, we could have detected this earlier and split the | 
 |       // code paths, rather than first compacting and then realizing it wasn't | 
 |       // sufficient. However, that would make the code harder to reason about, | 
 |       // creating code paths that are nearly never hit, hence making it more | 
 |       // likely to accumulate bugs in future. All this is very rare. | 
 |       size_t new_size = buf_.size(); | 
 |       while (data_len > new_size - wr_) | 
 |         new_size += kGrowBytes; | 
 |       if (new_size > kMaxMsgSize * 2) { | 
 |         failed_ = true; | 
 |         return; | 
 |       } | 
 |       auto new_buf = perfetto::base::PagedMemory::Allocate(new_size); | 
 |       memcpy(new_buf.Get(), buf_.Get(), buf_.size()); | 
 |       buf_ = std::move(new_buf); | 
 |       avail = new_size - wr_; | 
 |       // No need to touch rd_ / wr_ cursors. | 
 |     } | 
 |   } | 
 |  | 
 |   // Append the received data at the end of the ring buffer. | 
 |   uint8_t* buf = static_cast<uint8_t*>(buf_.Get()); | 
 |   memcpy(&buf[wr_], data, data_len); | 
 |   wr_ += data_len; | 
 | } | 
 |  | 
 | RingBufferMessageReader::Message RingBufferMessageReader::ReadMessage() { | 
 |   if (failed_) | 
 |     return FramingError(); | 
 |  | 
 |   if (fastpath_.valid()) { | 
 |     // The fastpath can only be hit when the buffer is empty. | 
 |     PERFETTO_CHECK(rd_ == wr_); | 
 |     auto msg = std::move(fastpath_); | 
 |     fastpath_ = Message{}; | 
 |     return msg; | 
 |   } | 
 |  | 
 |   uint8_t* buf = static_cast<uint8_t*>(buf_.Get()); | 
 |  | 
 |   PERFETTO_DCHECK(rd_ <= wr_); | 
 |   if (rd_ >= wr_) | 
 |     return Message{};  // Completely empty. | 
 |  | 
 |   auto msg = TryReadMessage(&buf[rd_], &buf[wr_]); | 
 |   if (!msg.valid()) { | 
 |     failed_ = failed_ || msg.fatal_framing_error; | 
 |     return msg;  // Return |msg| because it could be a framing error. | 
 |   } | 
 |  | 
 |   const uint8_t* msg_end = msg.start + msg.len; | 
 |   PERFETTO_CHECK(msg_end > &buf[rd_] && msg_end <= &buf[wr_]); | 
 |   auto msg_outer_len = static_cast<size_t>(msg_end - &buf[rd_]); | 
 |   rd_ += msg_outer_len; | 
 |   return msg; | 
 | } | 
 |  | 
 | ProtoRingBuffer::ProtoRingBuffer() = default; | 
 | ProtoRingBuffer::~ProtoRingBuffer() = default; | 
 |  | 
 | ProtoRingBuffer::Message ProtoRingBuffer::TryReadMessage(const uint8_t* start, | 
 |                                                          const uint8_t* end) { | 
 |   return TryReadProtoMessage(start, end); | 
 | } | 
 |  | 
 | }  // namespace protozero |