blob: 3354efed9425de537a7d85371ac77e13ad5a65e7 [file] [log] [blame]
/*
* Copyright (C) 2021 The Android Open Source Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "src/trace_processor/rpc/proto_ring_buffer.h"
#include "perfetto/base/logging.h"
#include "perfetto/ext/base/paged_memory.h"
#include "perfetto/protozero/proto_utils.h"
namespace perfetto {
namespace trace_processor {
namespace {
constexpr size_t kGrowBytes = 128 * 1024;
inline ProtoRingBuffer::Message FramingError() {
ProtoRingBuffer::Message msg{};
msg.fatal_framing_error = true;
return msg;
}
// Tries to decode a length-delimited proto field from |start|.
// Returns a valid boundary if the preamble is valid and the length is within
// |end|, or an invalid message otherwise.
ProtoRingBuffer::Message TryReadMessage(const uint8_t* start,
const uint8_t* end) {
namespace proto_utils = protozero::proto_utils;
uint64_t field_tag = 0;
auto* start_of_len = proto_utils::ParseVarInt(start, end, &field_tag);
if (start_of_len == start)
return ProtoRingBuffer::Message{}; // Not enough data.
const uint32_t tag = field_tag & 0x07;
if (tag !=
static_cast<uint32_t>(proto_utils::ProtoWireType::kLengthDelimited)) {
PERFETTO_ELOG("RPC framing error, unexpected msg tag 0x%xu", tag);
return FramingError();
}
uint64_t msg_len = 0;
auto* start_of_msg = proto_utils::ParseVarInt(start_of_len, end, &msg_len);
if (start_of_msg == start_of_len)
return ProtoRingBuffer::Message{}; // Not enough data.
if (msg_len > ProtoRingBuffer::kMaxMsgSize) {
PERFETTO_ELOG("RPC framing error, message too large (%" PRIu64 " > %zu)",
msg_len, ProtoRingBuffer::kMaxMsgSize);
return FramingError();
}
if (start_of_msg + msg_len > end)
return ProtoRingBuffer::Message{}; // Not enough data.
ProtoRingBuffer::Message msg{};
msg.start = start_of_msg;
msg.len = static_cast<uint32_t>(msg_len);
msg.field_id = static_cast<uint32_t>(field_tag >> 3);
return msg;
}
} // namespace
ProtoRingBuffer::ProtoRingBuffer()
: buf_(base::PagedMemory::Allocate(kGrowBytes)) {}
ProtoRingBuffer::~ProtoRingBuffer() = default;
void ProtoRingBuffer::Append(const void* data_void, size_t data_len) {
if (failed_)
return;
const uint8_t* data = static_cast<const uint8_t*>(data_void);
PERFETTO_DCHECK(wr_ <= buf_.size());
PERFETTO_DCHECK(wr_ >= rd_);
// If the last call to ReadMessage() consumed all the data in the buffer and
// there are no incomplete messages pending, restart from the beginning rather
// than keep ringing. This is the most common case.
if (rd_ == wr_)
rd_ = wr_ = 0;
// The caller is expected to always issue a ReadMessage() after each Append().
PERFETTO_CHECK(!fastpath_.valid());
if (rd_ == wr_) {
auto msg = TryReadMessage(data, data + data_len);
if (msg.valid() && msg.end() == (data + data_len)) {
// Fastpath: in many cases, the underlying stream will effectively
// preserve the atomicity of messages for most small messages.
// In this case we can avoid the extra buf_ roundtrip and just pass a
// pointer to |data| + (proto preamble len).
// The next call to ReadMessage)= will return |fastpath_|.
fastpath_ = std::move(msg);
return;
}
}
size_t avail = buf_.size() - wr_;
if (data_len > avail) {
// This whole section should be hit extremely rare.
// Try first just recompacting the buffer by moving everything to the left.
// This can happen if we received "a message and a bit" on each Append call
// so we ended pup in a situation like:
// buf_: [unused space] [msg1 incomplete]
// ^rd_ ^wr_
//
// After recompaction:
// buf_: [msg1 incomplete]
// ^rd_ ^wr_
uint8_t* buf = static_cast<uint8_t*>(buf_.Get());
memmove(&buf[0], &buf[rd_], wr_ - rd_);
avail += rd_;
wr_ -= rd_;
rd_ = 0;
if (data_len > avail) {
// The compaction didn't free up enough space and we need to expand the
// ring buffer. Yes, we could have detected this earlier and split the
// code paths, rather than first compacting and then realizing it wasn't
// sufficient. However, that would make the code harder to reason about,
// creating code paths that are nearly never hit, hence making it more
// likely to accumulate bugs in future. All this is very rare.
size_t new_size = buf_.size();
while (data_len > new_size - wr_)
new_size += kGrowBytes;
if (new_size > kMaxMsgSize * 2) {
failed_ = true;
return;
}
auto new_buf = base::PagedMemory::Allocate(new_size);
memcpy(new_buf.Get(), buf_.Get(), buf_.size());
buf_ = std::move(new_buf);
avail = new_size - wr_;
// No need to touch rd_ / wr_ cursors.
}
}
// Append the received data at the end of the ring buffer.
uint8_t* buf = static_cast<uint8_t*>(buf_.Get());
memcpy(&buf[wr_], data, data_len);
wr_ += data_len;
}
ProtoRingBuffer::Message ProtoRingBuffer::ReadMessage() {
if (failed_)
return FramingError();
if (fastpath_.valid()) {
// The fastpath can only be hit when the buffer is empty.
PERFETTO_CHECK(rd_ == wr_);
auto msg = std::move(fastpath_);
fastpath_ = Message{};
return msg;
}
uint8_t* buf = static_cast<uint8_t*>(buf_.Get());
PERFETTO_DCHECK(rd_ <= wr_);
if (rd_ >= wr_)
return Message{}; // Completely empty.
auto msg = TryReadMessage(&buf[rd_], &buf[wr_]);
if (!msg.valid()) {
failed_ = failed_ || msg.fatal_framing_error;
return msg; // Return |msg| because it could be a framing error.
}
// Note: msg.start is > buf[rd_], because it skips the proto preamble.
PERFETTO_DCHECK(msg.start > &buf[rd_]);
const uint8_t* msg_end = msg.start + msg.len;
PERFETTO_CHECK(msg_end > &buf[rd_] && msg_end <= &buf[wr_]);
auto msg_outer_len = static_cast<size_t>(msg_end - &buf[rd_]);
rd_ += msg_outer_len;
return msg;
}
} // namespace trace_processor
} // namespace perfetto