tp: add TraceTokenBuffer for storing metadata while sorting events

This CL introduces TraceTokenBuffer, a variadic data structure
optimized for storing the metadata for events in a highly
memory-efficient manner. This class is intended as a replacement for
VariadicQueue and related classes, as it is better tested and more
readable.

This class is built on top of BumpAllocator from a previous CL; a
follow-up CL will move TraceSorter over to this logic.
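
A minimal sketch of the intended round-trip (illustrative only; |tbv|
and |seq| are placeholder variables, see the unit tests in this CL for
real usage):

  TraceTokenBuffer buffer;
  TraceTokenBuffer::Id id =
      buffer.Append(TracePacketData{std::move(tbv), std::move(seq)});
  // Later, once TraceSorter decides this event is next:
  TracePacketData data = buffer.Extract<TracePacketData>(id);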

Change-Id: I40302a7d23ace01880e155834ccfb3b665caca38
diff --git a/Android.bp b/Android.bp
index ec8d768..efb6b35 100644
--- a/Android.bp
+++ b/Android.bp
@@ -2019,6 +2019,7 @@
         ":perfetto_src_trace_processor_storage_storage",
         ":perfetto_src_trace_processor_tables_tables",
         ":perfetto_src_trace_processor_types_types",
+        ":perfetto_src_trace_processor_util_bump_allocator",
         ":perfetto_src_trace_processor_util_descriptors",
         ":perfetto_src_trace_processor_util_glob",
         ":perfetto_src_trace_processor_util_gzip",
@@ -9906,6 +9907,7 @@
     name: "perfetto_src_trace_processor_sorter_sorter",
     srcs: [
         "src/trace_processor/sorter/trace_sorter.cc",
+        "src/trace_processor/sorter/trace_token_buffer.cc",
     ],
 }
 
@@ -9915,6 +9917,7 @@
     srcs: [
         "src/trace_processor/sorter/trace_sorter_queue_unittest.cc",
         "src/trace_processor/sorter/trace_sorter_unittest.cc",
+        "src/trace_processor/sorter/trace_token_buffer_unittest.cc",
     ],
 }
 
@@ -12234,6 +12237,7 @@
         ":perfetto_src_trace_processor_storage_storage",
         ":perfetto_src_trace_processor_tables_tables",
         ":perfetto_src_trace_processor_types_types",
+        ":perfetto_src_trace_processor_util_bump_allocator",
         ":perfetto_src_trace_processor_util_descriptors",
         ":perfetto_src_trace_processor_util_glob",
         ":perfetto_src_trace_processor_util_gzip",
@@ -12445,6 +12449,7 @@
         ":perfetto_src_trace_processor_storage_storage",
         ":perfetto_src_trace_processor_tables_tables",
         ":perfetto_src_trace_processor_types_types",
+        ":perfetto_src_trace_processor_util_bump_allocator",
         ":perfetto_src_trace_processor_util_descriptors",
         ":perfetto_src_trace_processor_util_glob",
         ":perfetto_src_trace_processor_util_gzip",
diff --git a/BUILD b/BUILD
index 9ab91e9..d7f4d06 100644
--- a/BUILD
+++ b/BUILD
@@ -1825,6 +1825,8 @@
         "src/trace_processor/sorter/trace_sorter.h",
         "src/trace_processor/sorter/trace_sorter_internal.h",
         "src/trace_processor/sorter/trace_sorter_queue.h",
+        "src/trace_processor/sorter/trace_token_buffer.cc",
+        "src/trace_processor/sorter/trace_token_buffer.h",
     ],
 )
 
@@ -1986,6 +1988,15 @@
     ],
 )
 
+# GN target: //src/trace_processor/util:bump_allocator
+perfetto_filegroup(
+    name = "src_trace_processor_util_bump_allocator",
+    srcs = [
+        "src/trace_processor/util/bump_allocator.cc",
+        "src/trace_processor/util/bump_allocator.h",
+    ],
+)
+
 # GN target: //src/trace_processor/util:descriptors
 perfetto_filegroup(
     name = "src_trace_processor_util_descriptors",
@@ -4581,6 +4592,7 @@
         ":src_trace_processor_tables_tables",
         ":src_trace_processor_tables_tables_python",
         ":src_trace_processor_types_types",
+        ":src_trace_processor_util_bump_allocator",
         ":src_trace_processor_util_descriptors",
         ":src_trace_processor_util_glob",
         ":src_trace_processor_util_gzip",
@@ -4731,6 +4743,7 @@
         ":src_trace_processor_tables_tables",
         ":src_trace_processor_tables_tables_python",
         ":src_trace_processor_types_types",
+        ":src_trace_processor_util_bump_allocator",
         ":src_trace_processor_util_descriptors",
         ":src_trace_processor_util_glob",
         ":src_trace_processor_util_gzip",
@@ -4936,6 +4949,7 @@
         ":src_trace_processor_tables_tables",
         ":src_trace_processor_tables_tables_python",
         ":src_trace_processor_types_types",
+        ":src_trace_processor_util_bump_allocator",
         ":src_trace_processor_util_descriptors",
         ":src_trace_processor_util_glob",
         ":src_trace_processor_util_gzip",
diff --git a/include/perfetto/trace_processor/ref_counted.h b/include/perfetto/trace_processor/ref_counted.h
index 02996f0..d753151 100644
--- a/include/perfetto/trace_processor/ref_counted.h
+++ b/include/perfetto/trace_processor/ref_counted.h
@@ -92,6 +92,7 @@
   // refcount. Callers *must* call |FromReleasedUnsafe| at a later date with
   // this pointer to avoid memory leaks.
   T* ReleaseUnsafe() {
+    PERFETTO_DCHECK(ptr_->refcount_ > 0);
     auto* old_ptr = ptr_;
     ptr_ = nullptr;
     return old_ptr;
@@ -100,6 +101,7 @@
   // Creates a RefPtr from a pointer returned by |ReleaseUnsafe|. Passing a
   // pointer from any other source results in undefined behaviour.
   static RefPtr<T> FromReleasedUnsafe(T* ptr) {
+    PERFETTO_DCHECK(ptr->refcount_ > 0);
     RefPtr<T> res;
     res.ptr_ = ptr;
     return res;
@@ -136,6 +138,10 @@
   bool operator==(const RefPtr<U>& rhs) const {
     return ptr_ == rhs.ptr_;
   }
+  template <typename U>
+  bool operator!=(const RefPtr<U>& rhs) const {
+    return !(*this == rhs);
+  }
 
   T* get() const { return ptr_; }
   T* operator->() const { return ptr_; }
diff --git a/include/perfetto/trace_processor/trace_blob_view.h b/include/perfetto/trace_processor/trace_blob_view.h
index f140866..3de4d7b 100644
--- a/include/perfetto/trace_processor/trace_blob_view.h
+++ b/include/perfetto/trace_processor/trace_blob_view.h
@@ -45,7 +45,7 @@
 //  - TraceBlob: writable, move-only, single-instance.
 //  - TraceBlobView: readable, copyable, multiple-instances can hold onto
 //                   (different sub-slices of) the same refcounted TraceBlob.
-class TraceBlobView {
+class alignas(8) TraceBlobView {
  public:
   // Takes ownership of the passed |blob|.
   static constexpr size_t kWholeBlob = std::numeric_limits<size_t>::max();
@@ -64,6 +64,11 @@
     blob_.reset(new TraceBlob(std::move(blob)));
   }
 
+  TraceBlobView(RefPtr<TraceBlob> blob, size_t offset, uint32_t length)
+      : blob_(std::move(blob)), data_(blob_->data() + offset), length_(length) {
+    PERFETTO_DCHECK(offset + length_ <= blob_->size());
+  }
+
   // Trivial empty ctor.
   TraceBlobView() : data_(nullptr), length_(0) {}
 
@@ -105,9 +110,10 @@
   bool operator!=(const TraceBlobView& rhs) const { return !(*this == rhs); }
 
   const uint8_t* data() const { return data_; }
-  // TODO(primiano): normalize length() vs size() usage.
+  size_t offset() const { return static_cast<size_t>(data_ - blob_->data()); }
   size_t length() const { return length_; }
   size_t size() const { return length_; }
+  RefPtr<TraceBlob> blob() const { return blob_; }
 
  private:
   TraceBlobView(RefPtr<TraceBlob> blob, const uint8_t* data, uint32_t length)
diff --git a/src/trace_processor/importers/common/parser_types.h b/src/trace_processor/importers/common/parser_types.h
index 9d5976c..5794a64 100644
--- a/src/trace_processor/importers/common/parser_types.h
+++ b/src/trace_processor/importers/common/parser_types.h
@@ -26,14 +26,14 @@
 namespace perfetto {
 namespace trace_processor {
 
-struct InlineSchedSwitch {
+struct alignas(8) InlineSchedSwitch {
   int64_t prev_state;
   int32_t next_pid;
   int32_t next_prio;
   StringPool::Id next_comm;
 };
 
-struct InlineSchedWaking {
+struct alignas(8) InlineSchedWaking {
   int32_t pid;
   int32_t target_cpu;
   int32_t prio;
@@ -53,7 +53,17 @@
   explicit TrackEventData(TracePacketData tpd)
       : trace_packet_data(std::move(tpd)) {}
 
-  static constexpr size_t kMaxNumExtraCounters = 8;
+  static constexpr uint8_t kMaxNumExtraCounters = 8;
+
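+  // Returns the number of leading non-zero entries in |extra_counter_values|;
+  // entries after the first zero are assumed to be unused.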
+  uint8_t CountExtraCounterValues() const {
+    for (uint8_t i = 0; i < TrackEventData::kMaxNumExtraCounters; ++i) {
+      if (std::equal_to<double>()(extra_counter_values[i], 0))
+        return i;
+    }
+    return TrackEventData::kMaxNumExtraCounters;
+  }
 
   TracePacketData trace_packet_data;
   base::Optional<int64_t> thread_timestamp;
diff --git a/src/trace_processor/sorter/BUILD.gn b/src/trace_processor/sorter/BUILD.gn
index d1d3069..baacc96 100644
--- a/src/trace_processor/sorter/BUILD.gn
+++ b/src/trace_processor/sorter/BUILD.gn
@@ -24,6 +24,8 @@
     "trace_sorter.h",
     "trace_sorter_internal.h",
     "trace_sorter_queue.h",
+    "trace_token_buffer.cc",
+    "trace_token_buffer.h",
   ]
   deps = [
     "../../../gn:default_deps",
@@ -35,6 +37,7 @@
     "../importers/systrace:systrace_line",
     "../storage",
     "../types",
+    "../util:bump_allocator",
   ]
 }
 
@@ -43,6 +46,7 @@
   sources = [
     "trace_sorter_queue_unittest.cc",
     "trace_sorter_unittest.cc",
+    "trace_token_buffer_unittest.cc",
   ]
   deps = [
     ":sorter",
diff --git a/src/trace_processor/sorter/trace_token_buffer.cc b/src/trace_processor/sorter/trace_token_buffer.cc
new file mode 100644
index 0000000..360041c
--- /dev/null
+++ b/src/trace_processor/sorter/trace_token_buffer.cc
@@ -0,0 +1,291 @@
+/*
+ * Copyright (C) 2023 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "src/trace_processor/sorter/trace_token_buffer.h"
+
+#include <stdint.h>
+#include <algorithm>
+#include <cstdint>
+#include <cstring>
+#include <functional>
+#include <limits>
+#include <type_traits>
+#include <utility>
+
+#include "perfetto/base/compiler.h"
+#include "perfetto/ext/base/optional.h"
+#include "perfetto/trace_processor/trace_blob.h"
+#include "perfetto/trace_processor/trace_blob_view.h"
+#include "src/trace_processor/importers/common/parser_types.h"
+#include "src/trace_processor/util/bump_allocator.h"
+
+namespace perfetto {
+namespace trace_processor {
+namespace {
+
+struct alignas(8) TrackEventDataDescriptor {
+  static constexpr uint8_t kMaxOffsetFromInternedBlobBits = 25;
+  static constexpr uint32_t kMaxOffsetFromInternedBlob =
+      (1Ul << kMaxOffsetFromInternedBlobBits) - 1;
+
+  static constexpr uint8_t kMaxExtraCountersBits = 4;
+  static constexpr uint8_t kMaxExtraCounters = (1 << kMaxExtraCountersBits) - 1;
+  static_assert(TrackEventData::kMaxNumExtraCounters <= kMaxExtraCounters,
+                "Counter bits must be able to fit TrackEventData counters");
+
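+  // Packed layout, 64 bits in total: a 16-bit blob index, a 16-bit sequence
+  // index, a 25-bit offset into the interned blob, three presence bits and a
+  // 4-bit extra-counter count.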
+  uint16_t intern_blob_index;
+  uint16_t intern_seq_index;
+  uint32_t intern_blob_offset : kMaxOffsetFromInternedBlobBits;
+  bool has_thread_timestamp : 1;
+  bool has_thread_instruction_count : 1;
+  bool has_counter_value : 1;
+  uint8_t extra_counter_count : kMaxExtraCountersBits;
+};
+static_assert(sizeof(TrackEventDataDescriptor) == 8,
+              "TrackEventDataDescriptor must be small");
+static_assert(alignof(TrackEventDataDescriptor) == 8,
+              "TrackEventDataDescriptor must be 8-aligned");
+
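+// Moves a |T| out of the untyped buffer at |*ptr|, destroys the moved-from
+// object in place and advances |*ptr| past it.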
+template <typename T>
+T ExtractFromPtr(uint8_t** ptr) {
+  T* typed_ptr = reinterpret_cast<T*>(*ptr);
+  T value(std::move(*typed_ptr));
+  typed_ptr->~T();
+  *ptr += sizeof(T);
+  return value;
+}
+
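+// Placement-news |value| into |ptr| and returns the address one past the
+// appended object.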
+template <typename T>
+uint8_t* AppendToPtr(uint8_t* ptr, T value) {
+  new (ptr) T(std::move(value));
+  return ptr + sizeof(T);
+}
+
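+// Returns the number of bytes needed to store the descriptor plus every
+// optional field which is actually present.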
+uint32_t GetAllocSize(const TrackEventDataDescriptor& desc) {
+  uint32_t alloc_size = sizeof(TrackEventDataDescriptor);
+  alloc_size += desc.has_thread_instruction_count * sizeof(int64_t);
+  alloc_size += desc.has_thread_timestamp * sizeof(int64_t);
+  alloc_size += desc.has_counter_value * sizeof(double);
+  alloc_size += desc.extra_counter_count * sizeof(double);
+  return alloc_size;
+}
+
+}  // namespace
+
+TraceTokenBuffer::Id TraceTokenBuffer::Append(TrackEventData ted) {
+  // TrackEventData (and TracePacketData) are two big contributors to the
+  // peak memory usage of the sorter. The main reasons for this are a) object
+  // padding and b) using more bits than necessary to store their contents.
+  //
+  // The purpose of this function is to "compress" the contents of
+  // TrackEventData by utilising techniques like bitpacking, interning and
+  // variable length encoding to ensure that only the data which really
+  // needs to be stored actually is.
+
+  // Compress all the booleans indicating the presence of a value into 4 bits
+  // instead of 4 bytes as they would take inside base::Optional.
+  TrackEventDataDescriptor desc;
+  desc.has_thread_instruction_count = ted.thread_instruction_count.has_value();
+  desc.has_thread_timestamp = ted.thread_timestamp.has_value();
+  desc.has_counter_value = std::not_equal_to<double>()(ted.counter_value, 0);
+  desc.extra_counter_count = ted.CountExtraCounterValues();
+
+  // Allocate enough memory using the BumpAllocator to store the data in |ted|.
+  // Also figure out the interned index.
+  BumpAllocator::AllocId alloc_id =
+      AllocAndResizeInternedVectors(GetAllocSize(desc));
+  InternedIndex interned_index = GetInternedIndex(alloc_id);
+
+  // Compute the interning information for the TrackBlob and the SequenceState.
+  TracePacketData& tpd = ted.trace_packet_data;
+  desc.intern_blob_offset = InternTraceBlob(interned_index, tpd.packet);
+  desc.intern_blob_index =
+      static_cast<uint16_t>(interned_blobs_.at(interned_index).size() - 1);
+  desc.intern_seq_index =
+      InternSeqState(interned_index, std::move(tpd.sequence_state));
+
+  // Add the "optional" fields of TrackEventData based on whether or not they
+  // are non-null.
+  uint8_t* ptr = static_cast<uint8_t*>(allocator_.GetPointer(alloc_id));
+  ptr = AppendToPtr(ptr, desc);
+  if (desc.has_thread_instruction_count) {
+    ptr = AppendToPtr(ptr, ted.thread_instruction_count.value());
+  }
+  if (desc.has_thread_timestamp) {
+    ptr = AppendToPtr(ptr, ted.thread_timestamp.value());
+  }
+  if (desc.has_counter_value) {
+    ptr = AppendToPtr(ptr, ted.counter_value);
+  }
+  for (uint32_t i = 0; i < desc.extra_counter_count; ++i) {
+    ptr = AppendToPtr(ptr, ted.extra_counter_values[i]);
+  }
+
+  // Store the packet size in the "out of band" bits.
+  uint32_t packet_size = static_cast<uint32_t>(tpd.packet.size());
+  PERFETTO_CHECK(packet_size <= protozero::proto_utils::kMaxMessageLength);
+  return Id{alloc_id, packet_size};
+}
+
+template <>
+TrackEventData TraceTokenBuffer::Extract<TrackEventData>(Id id) {
+  uint8_t* ptr = static_cast<uint8_t*>(allocator_.GetPointer(id.alloc_id));
+  TrackEventDataDescriptor desc =
+      ExtractFromPtr<TrackEventDataDescriptor>(&ptr);
+
+  InternedIndex interned_index = GetInternedIndex(id.alloc_id);
+  BlobWithOffset& bwo =
+      interned_blobs_.at(interned_index)[desc.intern_blob_index];
+  TraceBlobView tbv(RefPtr<TraceBlob>::FromReleasedUnsafe(bwo.blob),
+                    bwo.offset_in_blob + desc.intern_blob_offset,
+                    id.out_of_band);
+  auto seq = RefPtr<PacketSequenceStateGeneration>::FromReleasedUnsafe(
+      interned_seqs_.at(interned_index)[desc.intern_seq_index]);
+
+  TrackEventData ted{std::move(tbv), std::move(seq)};
+  if (desc.has_thread_instruction_count) {
+    ted.thread_instruction_count = ExtractFromPtr<int64_t>(&ptr);
+  }
+  if (desc.has_thread_timestamp) {
+    ted.thread_timestamp = ExtractFromPtr<int64_t>(&ptr);
+  }
+  if (desc.has_counter_value) {
+    ted.counter_value = ExtractFromPtr<double>(&ptr);
+  }
+  for (uint32_t i = 0; i < desc.extra_counter_count; ++i) {
+    ted.extra_counter_values[i] = ExtractFromPtr<double>(&ptr);
+  }
+  allocator_.Free(id.alloc_id);
+  return ted;
+}
+
+uint32_t TraceTokenBuffer::InternTraceBlob(InternedIndex interned_index,
+                                           const TraceBlobView& tbv) {
+  BlobWithOffsets& blobs = interned_blobs_.at(interned_index);
+  if (blobs.empty()) {
+    return AddTraceBlob(interned_index, tbv);
+  }
+
+  BlobWithOffset& last_blob = blobs.back();
+  if (last_blob.blob != tbv.blob().get()) {
+    return AddTraceBlob(interned_index, tbv);
+  }
+  PERFETTO_CHECK(last_blob.offset_in_blob <= tbv.offset());
+
+  // To allow the offset we store in the descriptor to fit in
+  // |kMaxOffsetFromInternedBlobBits| bits, we intern not only the TraceBlob
+  // pointer but also the offset. With this double indirection, we can always
+  // store a small relative offset at the cost of storing blobs here more
+  // often: this more than pays for itself as offsets are usually small anyway.
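+  // For example: if the last interned BlobWithOffset has offset_in_blob ==
+  // 4096 and this packet starts at offset 4100 in the same blob, we store
+  // rel_offset == 4 instead of adding a new blob entry.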
+  size_t rel_offset = tbv.offset() - last_blob.offset_in_blob;
+  if (rel_offset > TrackEventDataDescriptor::kMaxOffsetFromInternedBlob) {
+    return AddTraceBlob(interned_index, tbv);
+  }
+
+  // Intentionally "leak" this pointer. This essentially keeps the refcount
+  // of this TraceBlob one higher than the number of RefPtrs pointing to it.
+  // This allows us to avoid storing the same RefPtr n times.
+  //
+  // Calls to this function are paired to Extract<TrackEventData> which picks
+  // up this "leaked" pointer.
+  TraceBlob* leaked = tbv.blob().ReleaseUnsafe();
+  base::ignore_result(leaked);
+  return static_cast<uint32_t>(rel_offset);
+}
+
+uint16_t TraceTokenBuffer::InternSeqState(
+    InternedIndex interned_index,
+    RefPtr<PacketSequenceStateGeneration> ptr) {
+  // Look back at most 32 elements. This should be far enough in most cases
+  // unless either: a) we are essentially round-robining between >32 sequences
+  // b) we are churning through generations. Either case seems pathological.
+  SequenceStates& states = interned_seqs_.at(interned_index);
+  size_t lookback = std::min<size_t>(32u, states.size());
+  for (uint32_t i = 0; i < lookback; ++i) {
+    uint16_t idx = static_cast<uint16_t>(states.size() - 1 - i);
+    if (states[idx] == ptr.get()) {
+      // Intentionally "leak" this pointer. See |InternTraceBlob| for an
+      // explanation.
+      PacketSequenceStateGeneration* leaked = ptr.ReleaseUnsafe();
+      base::ignore_result(leaked);
+      return idx;
+    }
+  }
+  states.emplace_back(ptr.ReleaseUnsafe());
+  PERFETTO_CHECK(states.size() <= std::numeric_limits<uint16_t>::max());
+  return static_cast<uint16_t>(states.size() - 1);
+}
+
+uint32_t TraceTokenBuffer::AddTraceBlob(InternedIndex interned_index,
+                                        const TraceBlobView& tbv) {
+  BlobWithOffsets& blobs = interned_blobs_.at(interned_index);
+  blobs.emplace_back(BlobWithOffset{tbv.blob().ReleaseUnsafe(), tbv.offset()});
+  PERFETTO_CHECK(blobs.size() <= std::numeric_limits<uint16_t>::max());
+  return 0u;
+}
+
+void TraceTokenBuffer::FreeMemory() {
+  uint32_t erased = allocator_.EraseFrontFreeChunks();
+  interned_blobs_.erase_front(erased);
+  interned_seqs_.erase_front(erased);
+  PERFETTO_DCHECK(interned_blobs_.size() == interned_seqs_.size());
+}
+
+BumpAllocator::AllocId TraceTokenBuffer::AllocAndResizeInternedVectors(
+    uint32_t size) {
+  uint32_t erased = allocator_.erased_front_chunks_count();
+  BumpAllocator::AllocId alloc_id = allocator_.Alloc(size);
+  uint32_t allocator_chunks_size = alloc_id.chunk_index - erased + 1;
+
+  // The allocator should never "remove" chunks from being tracked.
+  PERFETTO_DCHECK(allocator_chunks_size >= interned_blobs_.size());
+
+  // We should add at most one chunk in the allocator.
+  size_t chunks_added = allocator_chunks_size - interned_blobs_.size();
+  PERFETTO_DCHECK(chunks_added <= 1);
+  PERFETTO_DCHECK(interned_blobs_.size() == interned_seqs_.size());
+  for (uint32_t i = 0; i < chunks_added; ++i) {
+    interned_blobs_.emplace_back();
+    interned_seqs_.emplace_back();
+  }
+  return alloc_id;
+}
+
+TraceTokenBuffer::InternedIndex TraceTokenBuffer::GetInternedIndex(
+    BumpAllocator::AllocId alloc_id) {
+  uint32_t interned_index =
+      alloc_id.chunk_index - allocator_.erased_front_chunks_count();
+  PERFETTO_DCHECK(interned_index < interned_blobs_.size());
+  PERFETTO_DCHECK(interned_index < interned_seqs_.size());
+  PERFETTO_DCHECK(interned_blobs_.size() == interned_seqs_.size());
+  return interned_index;
+}
+
+}  // namespace trace_processor
+}  // namespace perfetto
diff --git a/src/trace_processor/sorter/trace_token_buffer.h b/src/trace_processor/sorter/trace_token_buffer.h
new file mode 100644
index 0000000..47eec13
--- /dev/null
+++ b/src/trace_processor/sorter/trace_token_buffer.h
@@ -0,0 +1,165 @@
+/*
+ * Copyright (C) 2023 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef SRC_TRACE_PROCESSOR_SORTER_TRACE_TOKEN_BUFFER_H_
+#define SRC_TRACE_PROCESSOR_SORTER_TRACE_TOKEN_BUFFER_H_
+
+#include <cstdint>
+#include <limits>
+#include <utility>
+#include <vector>
+
+#include "perfetto/base/compiler.h"
+#include "perfetto/ext/base/circular_queue.h"
+#include "perfetto/ext/base/optional.h"
+#include "perfetto/ext/base/utils.h"
+#include "perfetto/trace_processor/trace_blob.h"
+#include "perfetto/trace_processor/trace_blob_view.h"
+#include "src/trace_processor/importers/common/parser_types.h"
+#include "src/trace_processor/util/bump_allocator.h"
+
+namespace perfetto {
+namespace trace_processor {
+
+// Helper class which stores tokenized objects while the corresponding events
+// are being sorted by TraceSorter.
+//
+// This class intrusively compresses the tokenized objects as much as possible
+// to reduce their memory footprint. This is important to reduce the peak memory
+// usage of TraceProcessor which is always hit at some point during sorting.
+// The tokenized objects make up the vast majority of this peak, so we trade
+// the complexity in this class for big reductions in peak memory use.
+//
+// go/perfetto-tp-memory-use gives an overview of trace processor memory usage.
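+//
+// Usage sketch (illustrative; TokenType stands for any type accepted by
+// |Append|):
+//   TraceTokenBuffer buffer;
+//   TraceTokenBuffer::Id id = buffer.Append(std::move(token));
+//   ...
+//   TokenType extracted = buffer.Extract<TokenType>(id);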
+class TraceTokenBuffer {
+ public:
+  // Identifier returned when appending items to this buffer. This id can
+  // later be passed to |Extract| to retrieve the event.
+  struct Id {
+    // The number of bits available for storing data "out-of-band" (i.e. by
+    // bitpacking in free bits available in another data structure). The value
+    // of this is set based on the available bits in
+    // TraceSorter::TimestampedDescriptor.
+    static constexpr uint32_t kOutOfBandBits = 28;
+    static constexpr uint32_t kMaxOutOfBandValue = (1u << kOutOfBandBits) - 1;
+
+    // The allocation id of the object in the buffer.
+    BumpAllocator::AllocId alloc_id;
+
+    // "Out-of-band" data for this object. This allows squeezing as much memory
+    // as possible by using free bits in other data structures to store parts
+    // of tokens.
+    // The size of this field (i.e. kOutOfBandBits) is chosen to match the
+    // available bits in TraceSorter: there is a static_assert in
+    // TraceSorter::TimestampedDescriptor.
+    uint32_t out_of_band : kOutOfBandBits;
+  };
+  static_assert(protozero::proto_utils::kMaxMessageLength <=
+                    Id::kMaxOutOfBandValue,
+                "Packet length must fit in out-of-band field");
+
+  // Appends an object of type |T| to the token buffer. Returns an id for
+  // looking up the object later using |Extract|.
+  template <typename T>
+  PERFETTO_WARN_UNUSED_RESULT Id Append(T object) {
+    static_assert(sizeof(T) % 8 == 0, "Size must be a multiple of 8");
+    static_assert(alignof(T) == 8, "Alignment must be 8");
+    BumpAllocator::AllocId id = AllocAndResizeInternedVectors(sizeof(T));
+    new (allocator_.GetPointer(id)) T(std::move(object));
+    return Id{id, 0};
+  }
+  PERFETTO_WARN_UNUSED_RESULT Id Append(TrackEventData);
+  PERFETTO_WARN_UNUSED_RESULT Id Append(TracePacketData data) {
+    // While in theory we could add a special case for TracePacketData, the
+    // judgement call we make is that the code complexity does not justify the
+    // micro-performance gain you might hope to see by avoiding the few if
+    // conditions in the |TracePacketData| path.
+    return Append(TrackEventData(std::move(data)));
+  }
+
+  // Extracts an object of type |T| from the token buffer using an id previously
+  // returned by |Append|. This type *must* match the type added using Append.
+  // Mismatching types will cause undefined behaviour.
+  template <typename T>
+  PERFETTO_WARN_UNUSED_RESULT T Extract(Id id) {
+    T* typed_ptr = static_cast<T*>(allocator_.GetPointer(id.alloc_id));
+    T object(std::move(*typed_ptr));
+    typed_ptr->~T();
+    allocator_.Free(id.alloc_id);
+    return object;
+  }
+
+  // Returns the "past-the-end" id from the underlying allocator.
+  // The main use of this function is to provide an id which is greater than
+  // all ids previously returned by |Append|.
+  //
+  // This is similar to the |end()| function in standard library vector classes.
+  BumpAllocator::AllocId PastTheEndAllocId() {
+    return allocator_.PastTheEndId();
+  }
+
+  // Attempts to free any memory retained by this buffer and the underlying
+  // allocator. The amount of memory freed is implementation-defined.
+  void FreeMemory();
+
+ private:
+  struct BlobWithOffset {
+    TraceBlob* blob;
+    size_t offset_in_blob;
+  };
+  using InternedIndex = uint32_t;
+  using BlobWithOffsets = std::vector<BlobWithOffset>;
+  using SequenceStates = std::vector<PacketSequenceStateGeneration*>;
+
+  // Functions to intern TraceBlob and PacketSequenceStateGeneration: as these
+  // are often shared between packets, we can significantly reduce memory use
+  // by only storing them once.
+  uint32_t InternTraceBlob(InternedIndex, const TraceBlobView&);
+  uint16_t InternSeqState(InternedIndex, RefPtr<PacketSequenceStateGeneration>);
+  uint32_t AddTraceBlob(InternedIndex, const TraceBlobView&);
+
+  BumpAllocator::AllocId AllocAndResizeInternedVectors(uint32_t size);
+  InternedIndex GetInternedIndex(BumpAllocator::AllocId);
+
+  BumpAllocator allocator_;
+  base::CircularQueue<BlobWithOffsets> interned_blobs_;
+  base::CircularQueue<SequenceStates> interned_seqs_;
+};
+
+// GCC7 does not like us declaring these inside the class so define these
+// out-of-line.
+template <>
+PERFETTO_WARN_UNUSED_RESULT TrackEventData
+    TraceTokenBuffer::Extract<TrackEventData>(Id);
+template <>
+PERFETTO_WARN_UNUSED_RESULT inline TracePacketData
+TraceTokenBuffer::Extract<TracePacketData>(Id id) {
+  // See the comment in Append(TracePacketData) for why we do this.
+  return Extract<TrackEventData>(id).trace_packet_data;
+}
+
+}  // namespace trace_processor
+}  // namespace perfetto
+
+#endif  // SRC_TRACE_PROCESSOR_SORTER_TRACE_TOKEN_BUFFER_H_
diff --git a/src/trace_processor/sorter/trace_token_buffer_unittest.cc b/src/trace_processor/sorter/trace_token_buffer_unittest.cc
new file mode 100644
index 0000000..6689dc5
--- /dev/null
+++ b/src/trace_processor/sorter/trace_token_buffer_unittest.cc
@@ -0,0 +1,174 @@
+/*
+ * Copyright (C) 2023 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "src/trace_processor/sorter/trace_token_buffer.h"
+
+#include <array>
+
+#include "perfetto/base/compiler.h"
+#include "perfetto/ext/base/optional.h"
+#include "perfetto/trace_processor/trace_blob.h"
+#include "perfetto/trace_processor/trace_blob_view.h"
+#include "src/trace_processor/importers/common/parser_types.h"
+#include "src/trace_processor/importers/proto/packet_sequence_state.h"
+#include "src/trace_processor/types/trace_processor_context.h"
+#include "test/gtest_and_gmock.h"
+
+namespace perfetto {
+namespace trace_processor {
+namespace {
+
+class TraceTokenBufferUnittest : public testing::Test {
+ protected:
+  TraceTokenBuffer store;
+  TraceProcessorContext context;
+  PacketSequenceState state{&context};
+};
+
+TEST_F(TraceTokenBufferUnittest, TracePacketDataInOut) {
+  TraceBlobView tbv(TraceBlob::Allocate(1024));
+  TracePacketData tpd{tbv.copy(), state.current_generation()};
+
+  TraceTokenBuffer::Id id = store.Append(std::move(tpd));
+  ASSERT_EQ(id.out_of_band, 1024u);
+
+  TracePacketData extracted = store.Extract<TracePacketData>(id);
+  ASSERT_EQ(extracted.packet, tbv);
+  ASSERT_EQ(extracted.sequence_state, state.current_generation());
+}
+
+TEST_F(TraceTokenBufferUnittest, PacketAppendMultipleBlobs) {
+  TraceBlobView tbv_1(TraceBlob::Allocate(1024));
+  TraceBlobView tbv_2(TraceBlob::Allocate(2048));
+  TraceBlobView tbv_3(TraceBlob::Allocate(4096));
+
+  TraceTokenBuffer::Id id_1 =
+      store.Append(TracePacketData{tbv_1.copy(), state.current_generation()});
+  TraceTokenBuffer::Id id_2 =
+      store.Append(TracePacketData{tbv_2.copy(), state.current_generation()});
+  ASSERT_EQ(store.Extract<TracePacketData>(id_1).packet, tbv_1);
+  ASSERT_EQ(store.Extract<TracePacketData>(id_2).packet, tbv_2);
+
+  TraceTokenBuffer::Id id_3 =
+      store.Append(TracePacketData{tbv_3.copy(), state.current_generation()});
+  ASSERT_EQ(store.Extract<TracePacketData>(id_3).packet, tbv_3);
+}
+
+TEST_F(TraceTokenBufferUnittest, BlobSharing) {
+  TraceBlobView root(TraceBlob::Allocate(2048));
+  TraceBlobView tbv_1 = root.slice_off(0, 1024);
+  TraceBlobView tbv_2 = root.slice_off(1024, 512);
+  TraceBlobView tbv_3 = root.slice_off(1536, 512);
+
+  TraceTokenBuffer::Id id_1 =
+      store.Append(TracePacketData{tbv_1.copy(), state.current_generation()});
+  TraceTokenBuffer::Id id_2 =
+      store.Append(TracePacketData{tbv_2.copy(), state.current_generation()});
+  ASSERT_EQ(store.Extract<TracePacketData>(id_1).packet, tbv_1);
+  ASSERT_EQ(store.Extract<TracePacketData>(id_2).packet, tbv_2);
+
+  TraceTokenBuffer::Id id_3 =
+      store.Append(TracePacketData{tbv_3.copy(), state.current_generation()});
+  ASSERT_EQ(store.Extract<TracePacketData>(id_3).packet, tbv_3);
+}
+
+TEST_F(TraceTokenBufferUnittest, SequenceStateSharing) {
+  TraceBlobView root(TraceBlob::Allocate(2048));
+  TraceBlobView tbv_1 = root.slice_off(0, 1024);
+  TraceBlobView tbv_2 = root.slice_off(1024, 512);
+
+  TraceTokenBuffer::Id id_1 =
+      store.Append(TracePacketData{tbv_1.copy(), state.current_generation()});
+  TraceTokenBuffer::Id id_2 =
+      store.Append(TracePacketData{tbv_2.copy(), state.current_generation()});
+  ASSERT_EQ(store.Extract<TracePacketData>(id_1).sequence_state,
+            state.current_generation());
+  ASSERT_EQ(store.Extract<TracePacketData>(id_2).sequence_state,
+            state.current_generation());
+}
+
+TEST_F(TraceTokenBufferUnittest, ManySequenceState) {
+  TraceBlobView root(TraceBlob::Allocate(1024));
+
+  std::array<TraceTokenBuffer::Id, 1024> ids;
+  std::array<PacketSequenceStateGeneration*, 1024> refs;
+  for (uint32_t i = 0; i < 1024; ++i) {
+    refs[i] = state.current_generation().get();
+    ids[i] = store.Append(
+        TracePacketData{root.slice_off(i, 1), state.current_generation()});
+    state.OnIncrementalStateCleared();
+  }
+
+  for (uint32_t i = 0; i < 1024; ++i) {
+    ASSERT_EQ(refs[i],
+              store.Extract<TracePacketData>(ids[i]).sequence_state.get());
+  }
+}
+
+TEST_F(TraceTokenBufferUnittest, PacketLargeOffset) {
+  TraceBlobView tbv(TraceBlob::Allocate(256ul * 1024));
+
+  TraceBlobView slice_1 = tbv.slice_off(0, 1024ul);
+  TraceTokenBuffer::Id id_1 =
+      store.Append(TracePacketData{slice_1.copy(), state.current_generation()});
+  TracePacketData out_1 = store.Extract<TracePacketData>(id_1);
+  ASSERT_EQ(out_1.packet, slice_1);
+  ASSERT_EQ(out_1.sequence_state, state.current_generation());
+
+  TraceBlobView slice_2 = tbv.slice_off(128ul * 1024, 1024ul);
+  TraceTokenBuffer::Id id_2 =
+      store.Append(TracePacketData{slice_2.copy(), state.current_generation()});
+  TracePacketData out_2 = store.Extract<TracePacketData>(id_2);
+  ASSERT_EQ(out_2.packet, slice_2);
+  ASSERT_EQ(out_2.sequence_state, state.current_generation());
+}
+
+TEST_F(TraceTokenBufferUnittest, TrackEventDataInOut) {
+  TraceBlobView tbv(TraceBlob::Allocate(1234));
+  TrackEventData ted(tbv.copy(), state.current_generation());
+  ted.thread_instruction_count = 123;
+  ted.extra_counter_values = {10, 2, 0, 0, 0, 0, 0, 0};
+  auto counter_array = ted.extra_counter_values;
+
+  TraceTokenBuffer::Id id = store.Append(std::move(ted));
+  ASSERT_EQ(id.out_of_band, 1234u);
+
+  TrackEventData extracted = store.Extract<TrackEventData>(id);
+  ASSERT_EQ(extracted.trace_packet_data.packet, tbv);
+  ASSERT_EQ(extracted.trace_packet_data.sequence_state,
+            state.current_generation());
+  ASSERT_EQ(extracted.thread_instruction_count, 123);
+  ASSERT_EQ(extracted.thread_timestamp, base::nullopt);
+  ASSERT_DOUBLE_EQ(extracted.counter_value, 0.0);
+  ASSERT_EQ(extracted.extra_counter_values, counter_array);
+}
+
+TEST_F(TraceTokenBufferUnittest, ExtractOrAppendAfterFreeMemory) {
+  auto unused_res = store.Extract<TraceBlobView>(
+      store.Append(TraceBlobView(TraceBlob::Allocate(1234))));
+  base::ignore_result(unused_res);
+
+  store.FreeMemory();
+
+  TraceTokenBuffer::Id id =
+      store.Append(TraceBlobView(TraceBlob::Allocate(4567)));
+  TraceBlobView tbv = store.Extract<TraceBlobView>(id);
+  ASSERT_EQ(tbv.size(), 4567u);
+}
+
+}  // namespace
+}  // namespace trace_processor
+}  // namespace perfetto