tp: add TraceTokenBuffer for storing metadata while sorting events
This CL introduces TraceTokenBuffer, a data structure optimized for
storing the metadata of tokenized events in a highly memory-efficient
manner. This class is intended to replace VariadicQueue and related
classes, as it is better tested and more readable.
This class is built on top of the BumpAllocator introduced in a
previous CL; a follow-up CL will move TraceSorter over to this logic.
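For illustration, the intended usage looks roughly as follows (API names
are taken from trace_token_buffer.h in this CL; |ted| stands for a
TrackEventData token):

  TraceTokenBuffer buffer;
  TraceTokenBuffer::Id id = buffer.Append(std::move(ted));
  // ... TraceSorter orders the event using |id| ...
  TrackEventData out = buffer.Extract<TrackEventData>(id);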
Change-Id: I40302a7d23ace01880e155834ccfb3b665caca38
diff --git a/Android.bp b/Android.bp
index ec8d768..efb6b35 100644
--- a/Android.bp
+++ b/Android.bp
@@ -2019,6 +2019,7 @@
":perfetto_src_trace_processor_storage_storage",
":perfetto_src_trace_processor_tables_tables",
":perfetto_src_trace_processor_types_types",
+ ":perfetto_src_trace_processor_util_bump_allocator",
":perfetto_src_trace_processor_util_descriptors",
":perfetto_src_trace_processor_util_glob",
":perfetto_src_trace_processor_util_gzip",
@@ -9906,6 +9907,7 @@
name: "perfetto_src_trace_processor_sorter_sorter",
srcs: [
"src/trace_processor/sorter/trace_sorter.cc",
+ "src/trace_processor/sorter/trace_token_buffer.cc",
],
}
@@ -9915,6 +9917,7 @@
srcs: [
"src/trace_processor/sorter/trace_sorter_queue_unittest.cc",
"src/trace_processor/sorter/trace_sorter_unittest.cc",
+ "src/trace_processor/sorter/trace_token_buffer_unittest.cc",
],
}
@@ -12234,6 +12237,7 @@
":perfetto_src_trace_processor_storage_storage",
":perfetto_src_trace_processor_tables_tables",
":perfetto_src_trace_processor_types_types",
+ ":perfetto_src_trace_processor_util_bump_allocator",
":perfetto_src_trace_processor_util_descriptors",
":perfetto_src_trace_processor_util_glob",
":perfetto_src_trace_processor_util_gzip",
@@ -12445,6 +12449,7 @@
":perfetto_src_trace_processor_storage_storage",
":perfetto_src_trace_processor_tables_tables",
":perfetto_src_trace_processor_types_types",
+ ":perfetto_src_trace_processor_util_bump_allocator",
":perfetto_src_trace_processor_util_descriptors",
":perfetto_src_trace_processor_util_glob",
":perfetto_src_trace_processor_util_gzip",
diff --git a/BUILD b/BUILD
index 9ab91e9..d7f4d06 100644
--- a/BUILD
+++ b/BUILD
@@ -1825,6 +1825,8 @@
"src/trace_processor/sorter/trace_sorter.h",
"src/trace_processor/sorter/trace_sorter_internal.h",
"src/trace_processor/sorter/trace_sorter_queue.h",
+ "src/trace_processor/sorter/trace_token_buffer.cc",
+ "src/trace_processor/sorter/trace_token_buffer.h",
],
)
@@ -1986,6 +1988,15 @@
],
)
+# GN target: //src/trace_processor/util:bump_allocator
+perfetto_filegroup(
+ name = "src_trace_processor_util_bump_allocator",
+ srcs = [
+ "src/trace_processor/util/bump_allocator.cc",
+ "src/trace_processor/util/bump_allocator.h",
+ ],
+)
+
# GN target: //src/trace_processor/util:descriptors
perfetto_filegroup(
name = "src_trace_processor_util_descriptors",
@@ -4581,6 +4592,7 @@
":src_trace_processor_tables_tables",
":src_trace_processor_tables_tables_python",
":src_trace_processor_types_types",
+ ":src_trace_processor_util_bump_allocator",
":src_trace_processor_util_descriptors",
":src_trace_processor_util_glob",
":src_trace_processor_util_gzip",
@@ -4731,6 +4743,7 @@
":src_trace_processor_tables_tables",
":src_trace_processor_tables_tables_python",
":src_trace_processor_types_types",
+ ":src_trace_processor_util_bump_allocator",
":src_trace_processor_util_descriptors",
":src_trace_processor_util_glob",
":src_trace_processor_util_gzip",
@@ -4936,6 +4949,7 @@
":src_trace_processor_tables_tables",
":src_trace_processor_tables_tables_python",
":src_trace_processor_types_types",
+ ":src_trace_processor_util_bump_allocator",
":src_trace_processor_util_descriptors",
":src_trace_processor_util_glob",
":src_trace_processor_util_gzip",
diff --git a/include/perfetto/trace_processor/ref_counted.h b/include/perfetto/trace_processor/ref_counted.h
index 02996f0..d753151 100644
--- a/include/perfetto/trace_processor/ref_counted.h
+++ b/include/perfetto/trace_processor/ref_counted.h
@@ -92,6 +92,7 @@
// refcount. Callers *must* call |FromReleasedUnsafe| at a later date with
// this pointer to avoid memory leaks.
T* ReleaseUnsafe() {
+ PERFETTO_DCHECK(ptr_->refcount_ > 0);
auto* old_ptr = ptr_;
ptr_ = nullptr;
return old_ptr;
@@ -100,6 +101,7 @@
// Creates a RefPtr from a pointer returned by |ReleaseUnsafe|. Passing a
// pointer from any other source results in undefined behaviour.
static RefPtr<T> FromReleasedUnsafe(T* ptr) {
+ PERFETTO_DCHECK(ptr->refcount_ > 0);
RefPtr<T> res;
res.ptr_ = ptr;
return res;
@@ -136,6 +138,10 @@
bool operator==(const RefPtr<U>& rhs) const {
return ptr_ == rhs.ptr_;
}
+ template <typename U>
+ bool operator!=(const RefPtr<U>& rhs) const {
+ return !(*this == rhs);
+ }
T* get() const { return ptr_; }
T* operator->() const { return ptr_; }
diff --git a/include/perfetto/trace_processor/trace_blob_view.h b/include/perfetto/trace_processor/trace_blob_view.h
index f140866..3de4d7b 100644
--- a/include/perfetto/trace_processor/trace_blob_view.h
+++ b/include/perfetto/trace_processor/trace_blob_view.h
@@ -45,7 +45,7 @@
// - TraceBlob: writable, move-only, single-instance.
// - TraceBlobView: readable, copyable, multiple-instances can hold onto
// (different sub-slices of) the same refcounted TraceBlob.
-class TraceBlobView {
+class alignas(8) TraceBlobView {
public:
// Takes ownership of the passed |blob|.
static constexpr size_t kWholeBlob = std::numeric_limits<size_t>::max();
@@ -64,6 +64,11 @@
blob_.reset(new TraceBlob(std::move(blob)));
}
+ TraceBlobView(RefPtr<TraceBlob> blob, size_t offset, uint32_t length)
+ : blob_(std::move(blob)), data_(blob_->data() + offset), length_(length) {
+ PERFETTO_DCHECK(offset + length_ <= blob_->size());
+ }
+
// Trivial empty ctor.
TraceBlobView() : data_(nullptr), length_(0) {}
@@ -105,9 +110,10 @@
bool operator!=(const TraceBlobView& rhs) const { return !(*this == rhs); }
const uint8_t* data() const { return data_; }
- // TODO(primiano): normalize length() vs size() usage.
+ size_t offset() const { return static_cast<size_t>(data_ - blob_->data()); }
size_t length() const { return length_; }
size_t size() const { return length_; }
+ RefPtr<TraceBlob> blob() const { return blob_; }
private:
TraceBlobView(RefPtr<TraceBlob> blob, const uint8_t* data, uint32_t length)
diff --git a/src/trace_processor/importers/common/parser_types.h b/src/trace_processor/importers/common/parser_types.h
index 9d5976c..5794a64 100644
--- a/src/trace_processor/importers/common/parser_types.h
+++ b/src/trace_processor/importers/common/parser_types.h
@@ -26,14 +26,14 @@
namespace perfetto {
namespace trace_processor {
-struct InlineSchedSwitch {
+struct alignas(8) InlineSchedSwitch {
int64_t prev_state;
int32_t next_pid;
int32_t next_prio;
StringPool::Id next_comm;
};
-struct InlineSchedWaking {
+struct alignas(8) InlineSchedWaking {
int32_t pid;
int32_t target_cpu;
int32_t prio;
@@ -53,7 +53,15 @@
explicit TrackEventData(TracePacketData tpd)
: trace_packet_data(std::move(tpd)) {}
- static constexpr size_t kMaxNumExtraCounters = 8;
+ static constexpr uint8_t kMaxNumExtraCounters = 8;
+
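+ // Returns the number of leading non-zero entries in |extra_counter_values|.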
+ uint8_t CountExtraCounterValues() const {
+ for (uint8_t i = 0; i < TrackEventData::kMaxNumExtraCounters; ++i) {
+ if (std::equal_to<double>()(extra_counter_values[i], 0))
+ return i;
+ }
+ return TrackEventData::kMaxNumExtraCounters;
+ }
TracePacketData trace_packet_data;
base::Optional<int64_t> thread_timestamp;
diff --git a/src/trace_processor/sorter/BUILD.gn b/src/trace_processor/sorter/BUILD.gn
index d1d3069..baacc96 100644
--- a/src/trace_processor/sorter/BUILD.gn
+++ b/src/trace_processor/sorter/BUILD.gn
@@ -24,6 +24,8 @@
"trace_sorter.h",
"trace_sorter_internal.h",
"trace_sorter_queue.h",
+ "trace_token_buffer.cc",
+ "trace_token_buffer.h",
]
deps = [
"../../../gn:default_deps",
@@ -35,6 +37,7 @@
"../importers/systrace:systrace_line",
"../storage",
"../types",
+ "../util:bump_allocator",
]
}
@@ -43,6 +46,7 @@
sources = [
"trace_sorter_queue_unittest.cc",
"trace_sorter_unittest.cc",
+ "trace_token_buffer_unittest.cc",
]
deps = [
":sorter",
diff --git a/src/trace_processor/sorter/trace_token_buffer.cc b/src/trace_processor/sorter/trace_token_buffer.cc
new file mode 100644
index 0000000..360041c
--- /dev/null
+++ b/src/trace_processor/sorter/trace_token_buffer.cc
@@ -0,0 +1,278 @@
+/*
+ * Copyright (C) 2023 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "src/trace_processor/sorter/trace_token_buffer.h"
+
+#include <algorithm>
+#include <cstdint>
+#include <cstring>
+#include <functional>
+#include <limits>
+#include <type_traits>
+#include <utility>
+
+#include "perfetto/base/compiler.h"
+#include "perfetto/ext/base/optional.h"
+#include "perfetto/trace_processor/trace_blob.h"
+#include "perfetto/trace_processor/trace_blob_view.h"
+#include "src/trace_processor/importers/common/parser_types.h"
+#include "src/trace_processor/util/bump_allocator.h"
+
+namespace perfetto {
+namespace trace_processor {
+namespace {
+
+struct alignas(8) TrackEventDataDescriptor {
+ static constexpr uint8_t kMaxOffsetFromInternedBlobBits = 25;
+ static constexpr uint32_t kMaxOffsetFromInternedBlob =
+ (1Ul << kMaxOffsetFromInternedBlobBits) - 1;
+
+ static constexpr uint8_t kMaxExtraCountersBits = 4;
+ static constexpr uint8_t kMaxExtraCounters = (1 << kMaxExtraCountersBits) - 1;
+ static_assert(TrackEventData::kMaxNumExtraCounters <= kMaxExtraCounters,
+ "Counter bits must be able to fit TrackEventData counters");
+
+ uint16_t intern_blob_index;
+ uint16_t intern_seq_index;
+ uint32_t intern_blob_offset : kMaxOffsetFromInternedBlobBits;
+ bool has_thread_timestamp : 1;
+ bool has_thread_instruction_count : 1;
+ bool has_counter_value : 1;
+ uint8_t extra_counter_count : kMaxExtraCountersBits;
+};
+static_assert(sizeof(TrackEventDataDescriptor) == 8,
+ "TrackEventDataDescriptor must be small");
+static_assert(alignof(TrackEventDataDescriptor) == 8,
+ "TrackEventDataDescriptor must be 8-aligned");
+
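+// Moves a value of type |T| out of the byte buffer at |*ptr|, destroying the
+// stored instance and advancing |*ptr| past it.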
+template <typename T>
+T ExtractFromPtr(uint8_t** ptr) {
+ T* typed_ptr = reinterpret_cast<T*>(*ptr);
+ T value(std::move(*typed_ptr));
+ typed_ptr->~T();
+ *ptr += sizeof(T);
+ return value;
+}
+
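+// Placement-new-constructs |value| at |ptr| and returns the address one past
+// the stored value.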
+template <typename T>
+uint8_t* AppendToPtr(uint8_t* ptr, T value) {
+ new (ptr) T(std::move(value));
+ return ptr + sizeof(T);
+}
+
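+// Returns the number of bytes needed to store the descriptor itself plus
+// every optional field marked as present in |desc|.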
+uint32_t GetAllocSize(const TrackEventDataDescriptor& desc) {
+ uint32_t alloc_size = sizeof(TrackEventDataDescriptor);
+ alloc_size += desc.has_thread_instruction_count * sizeof(int64_t);
+ alloc_size += desc.has_thread_timestamp * sizeof(int64_t);
+ alloc_size += desc.has_counter_value * sizeof(double);
+ alloc_size += desc.extra_counter_count * sizeof(double);
+ return alloc_size;
+}
+
+} // namespace
+
+TraceTokenBuffer::Id TraceTokenBuffer::Append(TrackEventData ted) {
+ // TrackEventData (and TracePacketData) are two of the biggest contributors
+ // to the peak memory usage of the sorter. The main reasons for this are a)
+ // object padding and b) using more bits than necessary to store their
+ // contents.
+ //
+ // The purpose of this function is to "compress" the contents of
+ // TrackEventData using techniques like bitpacking, interning and
+ // variable-length encoding, ensuring we only store the data which really
+ // needs to be stored.
+
+ // Compress all the booleans indicating the presence of a value into 4 bits
+ // instead of 4 bytes as they would take inside base::Optional.
+ TrackEventDataDescriptor desc;
+ desc.has_thread_instruction_count = ted.thread_instruction_count.has_value();
+ desc.has_thread_timestamp = ted.thread_timestamp.has_value();
+ desc.has_counter_value = std::not_equal_to<double>()(ted.counter_value, 0);
+ desc.extra_counter_count = ted.CountExtraCounterValues();
+
+ // Allocate enough memory using the BumpAllocator to store the data in |ted|.
+ // Also figure out the interned index.
+ BumpAllocator::AllocId alloc_id =
+ AllocAndResizeInternedVectors(GetAllocSize(desc));
+ InternedIndex interned_index = GetInternedIndex(alloc_id);
+
+ // Compute the interning information for the TraceBlob and the SequenceState.
+ const TracePacketData& tpd = ted.trace_packet_data;
+ desc.intern_blob_offset = InternTraceBlob(interned_index, tpd.packet);
+ desc.intern_blob_index =
+ static_cast<uint16_t>(interned_blobs_.at(interned_index).size() - 1);
+ desc.intern_seq_index =
+ InternSeqState(interned_index, std::move(tpd.sequence_state));
+
+ // Add the "optional" fields of TrackEventData based on whether or not they
+ // are non-null.
+ uint8_t* ptr = static_cast<uint8_t*>(allocator_.GetPointer(alloc_id));
+ ptr = AppendToPtr(ptr, desc);
+ if (desc.has_thread_instruction_count) {
+ ptr = AppendToPtr(ptr, ted.thread_instruction_count.value());
+ }
+ if (desc.has_thread_timestamp) {
+ ptr = AppendToPtr(ptr, ted.thread_timestamp.value());
+ }
+ if (desc.has_counter_value) {
+ ptr = AppendToPtr(ptr, ted.counter_value);
+ }
+ for (uint32_t i = 0; i < desc.extra_counter_count; ++i) {
+ ptr = AppendToPtr(ptr, ted.extra_counter_values[i]);
+ }
+
+ // Store the packet size in the "out-of-band" bits.
+ uint32_t packet_size = static_cast<uint32_t>(tpd.packet.size());
+ PERFETTO_CHECK(packet_size <= protozero::proto_utils::kMaxMessageLength);
+ return Id{alloc_id, packet_size};
+}
+
+template <>
+TrackEventData TraceTokenBuffer::Extract<TrackEventData>(Id id) {
+ uint8_t* ptr = static_cast<uint8_t*>(allocator_.GetPointer(id.alloc_id));
+ TrackEventDataDescriptor desc =
+ ExtractFromPtr<TrackEventDataDescriptor>(&ptr);
+
+ InternedIndex interned_index = GetInternedIndex(id.alloc_id);
+ BlobWithOffset& bwo =
+ interned_blobs_.at(interned_index)[desc.intern_blob_index];
+ TraceBlobView tbv(RefPtr<TraceBlob>::FromReleasedUnsafe(bwo.blob),
+ bwo.offset_in_blob + desc.intern_blob_offset,
+ id.out_of_band);
+ auto seq = RefPtr<PacketSequenceStateGeneration>::FromReleasedUnsafe(
+ interned_seqs_.at(interned_index)[desc.intern_seq_index]);
+
+ TrackEventData ted{std::move(tbv), std::move(seq)};
+ if (desc.has_thread_instruction_count) {
+ ted.thread_instruction_count = ExtractFromPtr<int64_t>(&ptr);
+ }
+ if (desc.has_thread_timestamp) {
+ ted.thread_timestamp = ExtractFromPtr<int64_t>(&ptr);
+ }
+ if (desc.has_counter_value) {
+ ted.counter_value = ExtractFromPtr<double>(&ptr);
+ }
+ for (uint32_t i = 0; i < desc.extra_counter_count; ++i) {
+ ted.extra_counter_values[i] = ExtractFromPtr<double>(&ptr);
+ }
+ allocator_.Free(id.alloc_id);
+ return ted;
+}
+
+uint32_t TraceTokenBuffer::InternTraceBlob(InternedIndex interned_index,
+ const TraceBlobView& tbv) {
+ BlobWithOffsets& blobs = interned_blobs_.at(interned_index);
+ if (blobs.empty()) {
+ return AddTraceBlob(interned_index, tbv);
+ }
+
+ BlobWithOffset& last_blob = blobs.back();
+ if (last_blob.blob != tbv.blob().get()) {
+ return AddTraceBlob(interned_index, tbv);
+ }
+ PERFETTO_CHECK(last_blob.offset_in_blob <= tbv.offset());
+
+ // To allow our offsets in the store to be 16 bits, we intern not only the
+ // TraceBlob pointer but also the offset. By having this double indirection,
+ // we can always store the offset as a uint16 at the cost of storing blobs
+ // here more often: this more than pays for itself as, in the majority of
+ // cases, the offsets are small anyway.
+ size_t rel_offset = tbv.offset() - last_blob.offset_in_blob;
+ if (rel_offset > TrackEventDataDescriptor::kMaxOffsetFromInternedBlob) {
+ return AddTraceBlob(interned_index, tbv);
+ }
+
+ // Intentionally "leak" this pointer. This essentially keeps the refcount
+ // of this TraceBlob one higher than the number of RefPtrs pointing to it.
+ // This avoids storing the same RefPtr n times.
+ //
+ // Calls to this function are paired with Extract<TrackEventData>, which
+ // picks up this "leaked" pointer.
+ TraceBlob* leaked = tbv.blob().ReleaseUnsafe();
+ base::ignore_result(leaked);
+ return static_cast<uint32_t>(rel_offset);
+}
+
+uint16_t TraceTokenBuffer::InternSeqState(
+ InternedIndex interned_index,
+ RefPtr<PacketSequenceStateGeneration> ptr) {
+ // Look back at most 32 elements. This should be far enough in most cases
+ // unless either: a) we are essentially round-robining between >32 sequences,
+ // or b) we are churning through generations. Either case seems pathological.
+ SequenceStates& states = interned_seqs_.at(interned_index);
+ size_t lookback = std::min<size_t>(32u, states.size());
+ for (uint32_t i = 0; i < lookback; ++i) {
+ uint16_t idx = static_cast<uint16_t>(states.size() - 1 - i);
+ if (states[idx] == ptr.get()) {
+ // Intentionally "leak" this pointer. See |InternTraceBlob| for an
+ // explanation.
+ PacketSequenceStateGeneration* leaked = ptr.ReleaseUnsafe();
+ base::ignore_result(leaked);
+ return idx;
+ }
+ }
+ states.emplace_back(ptr.ReleaseUnsafe());
+ PERFETTO_CHECK(states.size() <= std::numeric_limits<uint16_t>::max());
+ return static_cast<uint16_t>(states.size() - 1);
+}
+
+uint32_t TraceTokenBuffer::AddTraceBlob(InternedIndex interned_index,
+ const TraceBlobView& tbv) {
+ BlobWithOffsets& blobs = interned_blobs_.at(interned_index);
+ blobs.emplace_back(BlobWithOffset{tbv.blob().ReleaseUnsafe(), tbv.offset()});
+ PERFETTO_CHECK(blobs.size() <= std::numeric_limits<uint16_t>::max());
+ return 0u;
+}
+
+void TraceTokenBuffer::FreeMemory() {
+ uint32_t erased = allocator_.EraseFrontFreeChunks();
+ interned_blobs_.erase_front(erased);
+ interned_seqs_.erase_front(erased);
+ PERFETTO_DCHECK(interned_blobs_.size() == interned_seqs_.size());
+}
+
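+// Allocates |size| bytes from the allocator and, if this caused a new chunk
+// to be created, grows the interned vectors to keep them in sync with the
+// allocator's chunks.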
+BumpAllocator::AllocId TraceTokenBuffer::AllocAndResizeInternedVectors(
+ uint32_t size) {
+ uint32_t erased = allocator_.erased_front_chunks_count();
+ BumpAllocator::AllocId alloc_id = allocator_.Alloc(size);
+ uint32_t allocator_chunks_size = alloc_id.chunk_index - erased + 1;
+
+ // The allocator should never "remove" chunks from being tracked.
+ PERFETTO_DCHECK(allocator_chunks_size >= interned_blobs_.size());
+
+ // We should have added at most one chunk to the allocator.
+ size_t chunks_added = allocator_chunks_size - interned_blobs_.size();
+ PERFETTO_DCHECK(chunks_added <= 1);
+ PERFETTO_DCHECK(interned_blobs_.size() == interned_seqs_.size());
+ for (uint32_t i = 0; i < chunks_added; ++i) {
+ interned_blobs_.emplace_back();
+ interned_seqs_.emplace_back();
+ }
+ return alloc_id;
+}
+
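+// Maps the chunk of |alloc_id| to an index into the interned vectors,
+// accounting for any chunks erased from the front by |FreeMemory|.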
+TraceTokenBuffer::InternedIndex TraceTokenBuffer::GetInternedIndex(
+ BumpAllocator::AllocId alloc_id) {
+ uint32_t interned_index =
+ alloc_id.chunk_index - allocator_.erased_front_chunks_count();
+ PERFETTO_DCHECK(interned_index < interned_blobs_.size());
+ PERFETTO_DCHECK(interned_index < interned_seqs_.size());
+ PERFETTO_DCHECK(interned_blobs_.size() == interned_seqs_.size());
+ return interned_index;
+}
+
+} // namespace trace_processor
+} // namespace perfetto
diff --git a/src/trace_processor/sorter/trace_token_buffer.h b/src/trace_processor/sorter/trace_token_buffer.h
new file mode 100644
index 0000000..47eec13
--- /dev/null
+++ b/src/trace_processor/sorter/trace_token_buffer.h
@@ -0,0 +1,157 @@
+/*
+ * Copyright (C) 2023 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef SRC_TRACE_PROCESSOR_SORTER_TRACE_TOKEN_BUFFER_H_
+#define SRC_TRACE_PROCESSOR_SORTER_TRACE_TOKEN_BUFFER_H_
+
+#include <cstdint>
+#include <limits>
+#include <utility>
+#include <vector>
+
+#include "perfetto/base/compiler.h"
+#include "perfetto/ext/base/circular_queue.h"
+#include "perfetto/ext/base/optional.h"
+#include "perfetto/ext/base/utils.h"
+#include "perfetto/trace_processor/trace_blob.h"
+#include "perfetto/trace_processor/trace_blob_view.h"
+#include "src/trace_processor/importers/common/parser_types.h"
+#include "src/trace_processor/util/bump_allocator.h"
+
+namespace perfetto {
+namespace trace_processor {
+
+// Helper class which stores tokenized objects while the corresponding events
+// are being sorted by TraceSorter.
+//
+// This class intrusively compresses the tokenized objects as much as possible
+// to reduce their memory footprint. This is important to reduce the peak memory
+// usage of TraceProcessor which is always hit at some point during sorting.
+// The tokenized objects make up the vast majority of this peak, so we trade
+// the complexity in this class for big reductions in peak memory use.
+//
+// go/perfetto-tp-memory-use gives an overview of trace processor memory usage.
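+//
+// Example usage (illustrative; |ted| is a TrackEventData token):
+//   TraceTokenBuffer buffer;
+//   TraceTokenBuffer::Id id = buffer.Append(std::move(ted));
+//   // ... sort the corresponding event using |id| ...
+//   TrackEventData out = buffer.Extract<TrackEventData>(id);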
+class TraceTokenBuffer {
+ public:
+ // Identifier returned when appending items to this buffer. This id can
+ // later be passed to |Extract| to retrieve the event.
+ struct Id {
+ // The number of bits available for storing data "out-of-band" (i.e. by
+ // bitpacking in free bits available in another data structure). The value
+ // of this is set based on the available bits in
+ // TraceSorter::TimestampedDescriptor.
+ static constexpr uint32_t kOutOfBandBits = 28;
+ static constexpr uint32_t kMaxOutOfBandValue = (1u << kOutOfBandBits) - 1;
+
+ // The allocation id of the object in the buffer.
+ BumpAllocator::AllocId alloc_id;
+
+ // "Out-of-band" data for this object. This allows squeezing as much memory
+ // as possible by using free bits in other data structures to store parts
+ // of tokens.
+ // The size of this field (i.e. kOutOfBandBits) is chosen to match the
+ // available bits in TraceSorter: there is a static_assert in
+ // TraceSorter::TimestampedDescriptor.
+ uint32_t out_of_band : kOutOfBandBits;
+ };
+ static_assert(protozero::proto_utils::kMaxMessageLength <=
+ Id::kMaxOutOfBandValue,
+ "Packet length must fit in out-of-band field");
+
+ // Appends an object of type |T| to the token buffer. Returns an id for
+ // looking up the object later using |Extract|.
+ template <typename T>
+ PERFETTO_WARN_UNUSED_RESULT Id Append(T object) {
+ static_assert(sizeof(T) % 8 == 0, "Size must be a multiple of 8");
+ static_assert(alignof(T) == 8, "Alignment must be 8");
+ BumpAllocator::AllocId id = AllocAndResizeInternedVectors(sizeof(T));
+ new (allocator_.GetPointer(id)) T(std::move(object));
+ return Id{id, 0};
+ }
+ PERFETTO_WARN_UNUSED_RESULT Id Append(TrackEventData);
+ PERFETTO_WARN_UNUSED_RESULT Id Append(TracePacketData data) {
+ // While in theory we could add a special case for TracePacketData, the
+ // judgement call we make is that the code complexity does not justify the
+ // micro-performance gain you might hope to see by avoiding the few if
+ // conditions in the |TracePacketData| path.
+ return Append(TrackEventData(std::move(data)));
+ }
+
+ // Extracts an object of type |T| from the token buffer using an id previously
+ // returned by |Append|. This type *must* match the type added using Append.
+ // Mismatching types will cause undefined behaviour.
+ template <typename T>
+ PERFETTO_WARN_UNUSED_RESULT T Extract(Id id) {
+ T* typed_ptr = static_cast<T*>(allocator_.GetPointer(id.alloc_id));
+ T object(std::move(*typed_ptr));
+ typed_ptr->~T();
+ allocator_.Free(id.alloc_id);
+ return object;
+ }
+
+ // Returns the "past-the-end" id from the underlying allocator.
+ // The main use of this function is to provide an id which is greater than
+ // all ids previously returned by |Append|.
+ //
+ // This is similar to the |end()| function in standard library vector classes.
+ BumpAllocator::AllocId PastTheEndAllocId() {
+ return allocator_.PastTheEndId();
+ }
+
+ // Attempts to free any memory retained by this buffer and the underlying
+ // allocator. The amount of memory freed is implementation defined.
+ void FreeMemory();
+
+ private:
+ struct BlobWithOffset {
+ TraceBlob* blob;
+ size_t offset_in_blob;
+ };
+ using InternedIndex = uint32_t;
+ using BlobWithOffsets = std::vector<BlobWithOffset>;
+ using SequenceStates = std::vector<PacketSequenceStateGeneration*>;
+
+ // Functions to intern TraceBlob and PacketSequenceStateGeneration: as these
+ // are often shared between packets, we can significantly reduce memory use
+ // by only storing them once.
+ uint32_t InternTraceBlob(InternedIndex, const TraceBlobView&);
+ uint16_t InternSeqState(InternedIndex, RefPtr<PacketSequenceStateGeneration>);
+ uint32_t AddTraceBlob(InternedIndex, const TraceBlobView&);
+
+ BumpAllocator::AllocId AllocAndResizeInternedVectors(uint32_t size);
+ InternedIndex GetInternedIndex(BumpAllocator::AllocId);
+
+ BumpAllocator allocator_;
+ base::CircularQueue<BlobWithOffsets> interned_blobs_;
+ base::CircularQueue<SequenceStates> interned_seqs_;
+};
+
+// GCC7 does not like us declaring these inside the class, so we define them
+// out-of-line.
+template <>
+PERFETTO_WARN_UNUSED_RESULT TrackEventData
+ TraceTokenBuffer::Extract<TrackEventData>(Id);
+template <>
+PERFETTO_WARN_UNUSED_RESULT inline TracePacketData
+TraceTokenBuffer::Extract<TracePacketData>(Id id) {
+ // See the comment in Append(TracePacketData) for why we do this.
+ return Extract<TrackEventData>(id).trace_packet_data;
+}
+
+} // namespace trace_processor
+} // namespace perfetto
+
+#endif // SRC_TRACE_PROCESSOR_SORTER_TRACE_TOKEN_BUFFER_H_
diff --git a/src/trace_processor/sorter/trace_token_buffer_unittest.cc b/src/trace_processor/sorter/trace_token_buffer_unittest.cc
new file mode 100644
index 0000000..6689dc5
--- /dev/null
+++ b/src/trace_processor/sorter/trace_token_buffer_unittest.cc
@@ -0,0 +1,172 @@
+/*
+ * Copyright (C) 2023 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "src/trace_processor/sorter/trace_token_buffer.h"
+
+#include "perfetto/base/compiler.h"
+#include "perfetto/ext/base/optional.h"
+#include "perfetto/trace_processor/trace_blob.h"
+#include "perfetto/trace_processor/trace_blob_view.h"
+#include "src/trace_processor/importers/common/parser_types.h"
+#include "src/trace_processor/importers/proto/packet_sequence_state.h"
+#include "src/trace_processor/types/trace_processor_context.h"
+#include "test/gtest_and_gmock.h"
+
+namespace perfetto {
+namespace trace_processor {
+namespace {
+
+class TraceTokenBufferUnittest : public testing::Test {
+ protected:
+ TraceTokenBuffer store;
+ TraceProcessorContext context;
+ PacketSequenceState state{&context};
+};
+
+TEST_F(TraceTokenBufferUnittest, TracePacketDataInOut) {
+ TraceBlobView tbv(TraceBlob::Allocate(1024));
+ TracePacketData tpd{tbv.copy(), state.current_generation()};
+
+ TraceTokenBuffer::Id id = store.Append(std::move(tpd));
+ ASSERT_EQ(id.out_of_band, 1024u);
+
+ TracePacketData extracted = store.Extract<TracePacketData>(id);
+ ASSERT_EQ(extracted.packet, tbv);
+ ASSERT_EQ(extracted.sequence_state, state.current_generation());
+}
+
+TEST_F(TraceTokenBufferUnittest, PacketAppendMultipleBlobs) {
+ TraceBlobView tbv_1(TraceBlob::Allocate(1024));
+ TraceBlobView tbv_2(TraceBlob::Allocate(2048));
+ TraceBlobView tbv_3(TraceBlob::Allocate(4096));
+
+ TraceTokenBuffer::Id id_1 =
+ store.Append(TracePacketData{tbv_1.copy(), state.current_generation()});
+ TraceTokenBuffer::Id id_2 =
+ store.Append(TracePacketData{tbv_2.copy(), state.current_generation()});
+ ASSERT_EQ(store.Extract<TracePacketData>(id_1).packet, tbv_1);
+ ASSERT_EQ(store.Extract<TracePacketData>(id_2).packet, tbv_2);
+
+ TraceTokenBuffer::Id id_3 =
+ store.Append(TracePacketData{tbv_3.copy(), state.current_generation()});
+ ASSERT_EQ(store.Extract<TracePacketData>(id_3).packet, tbv_3);
+}
+
+TEST_F(TraceTokenBufferUnittest, BlobSharing) {
+ TraceBlobView root(TraceBlob::Allocate(2048));
+ TraceBlobView tbv_1 = root.slice_off(0, 1024);
+ TraceBlobView tbv_2 = root.slice_off(1024, 512);
+ TraceBlobView tbv_3 = root.slice_off(1536, 512);
+
+ TraceTokenBuffer::Id id_1 =
+ store.Append(TracePacketData{tbv_1.copy(), state.current_generation()});
+ TraceTokenBuffer::Id id_2 =
+ store.Append(TracePacketData{tbv_2.copy(), state.current_generation()});
+ ASSERT_EQ(store.Extract<TracePacketData>(id_1).packet, tbv_1);
+ ASSERT_EQ(store.Extract<TracePacketData>(id_2).packet, tbv_2);
+
+ TraceTokenBuffer::Id id_3 =
+ store.Append(TracePacketData{tbv_3.copy(), state.current_generation()});
+ ASSERT_EQ(store.Extract<TracePacketData>(id_3).packet, tbv_3);
+}
+
+TEST_F(TraceTokenBufferUnittest, SequenceStateSharing) {
+ TraceBlobView root(TraceBlob::Allocate(2048));
+ TraceBlobView tbv_1 = root.slice_off(0, 1024);
+ TraceBlobView tbv_2 = root.slice_off(1024, 512);
+
+ TraceTokenBuffer::Id id_1 =
+ store.Append(TracePacketData{tbv_1.copy(), state.current_generation()});
+ TraceTokenBuffer::Id id_2 =
+ store.Append(TracePacketData{tbv_2.copy(), state.current_generation()});
+ ASSERT_EQ(store.Extract<TracePacketData>(id_1).sequence_state,
+ state.current_generation());
+ ASSERT_EQ(store.Extract<TracePacketData>(id_2).sequence_state,
+ state.current_generation());
+}
+
+TEST_F(TraceTokenBufferUnittest, ManySequenceState) {
+ TraceBlobView root(TraceBlob::Allocate(1024));
+
+ std::array<TraceTokenBuffer::Id, 1024> ids;
+ std::array<PacketSequenceStateGeneration*, 1024> refs;
+ for (uint32_t i = 0; i < 1024; ++i) {
+ refs[i] = state.current_generation().get();
+ ids[i] = store.Append(
+ TracePacketData{root.slice_off(i, 1), state.current_generation()});
+ state.OnIncrementalStateCleared();
+ }
+
+ for (uint32_t i = 0; i < 1024; ++i) {
+ ASSERT_EQ(refs[i],
+ store.Extract<TracePacketData>(ids[i]).sequence_state.get());
+ }
+}
+
+TEST_F(TraceTokenBufferUnittest, PacketLargeOffset) {
+ TraceBlobView tbv(TraceBlob::Allocate(256ul * 1024));
+
+ TraceBlobView slice_1 = tbv.slice_off(0, 1024ul);
+ TraceTokenBuffer::Id id_1 =
+ store.Append(TracePacketData{slice_1.copy(), state.current_generation()});
+ TracePacketData out_1 = store.Extract<TracePacketData>(id_1);
+ ASSERT_EQ(out_1.packet, slice_1);
+ ASSERT_EQ(out_1.sequence_state, state.current_generation());
+
+ TraceBlobView slice_2 = tbv.slice_off(128ul * 1024, 1024ul);
+ TraceTokenBuffer::Id id_2 =
+ store.Append(TracePacketData{slice_2.copy(), state.current_generation()});
+ TracePacketData out_2 = store.Extract<TracePacketData>(id_2);
+ ASSERT_EQ(out_2.packet, slice_2);
+ ASSERT_EQ(out_2.sequence_state, state.current_generation());
+}
+
+TEST_F(TraceTokenBufferUnittest, TrackEventDataInOut) {
+ TraceBlobView tbv(TraceBlob::Allocate(1234));
+ TrackEventData ted(tbv.copy(), state.current_generation());
+ ted.thread_instruction_count = 123;
+ ted.extra_counter_values = {10, 2, 0, 0, 0, 0, 0, 0};
+ auto counter_array = ted.extra_counter_values;
+
+ TraceTokenBuffer::Id id = store.Append(std::move(ted));
+ ASSERT_EQ(id.out_of_band, 1234u);
+
+ TrackEventData extracted = store.Extract<TrackEventData>(id);
+ ASSERT_EQ(extracted.trace_packet_data.packet, tbv);
+ ASSERT_EQ(extracted.trace_packet_data.sequence_state,
+ state.current_generation());
+ ASSERT_EQ(extracted.thread_instruction_count, 123);
+ ASSERT_EQ(extracted.thread_timestamp, base::nullopt);
+ ASSERT_DOUBLE_EQ(extracted.counter_value, 0.0);
+ ASSERT_EQ(extracted.extra_counter_values, counter_array);
+}
+
+TEST_F(TraceTokenBufferUnittest, ExtractOrAppendAfterFreeMemory) {
+ auto unused_res = store.Extract<TraceBlobView>(
+ store.Append(TraceBlobView(TraceBlob::Allocate(1234))));
+ base::ignore_result(unused_res);
+
+ store.FreeMemory();
+
+ TraceTokenBuffer::Id id =
+ store.Append(TraceBlobView(TraceBlob::Allocate(4567)));
+ TraceBlobView tbv = store.Extract<TraceBlobView>(id);
+ ASSERT_EQ(tbv.size(), 4567u);
+}
+
+} // namespace
+} // namespace trace_processor
+} // namespace perfetto