profiling: reuse interning output tracker in traced_perf

NB: this refactoring slightly changes the points at which heapprofd's
DumpState checks for the 500k packet chunking limit. I believe it
retains enough calls to |GetCurrentInternedData| to be fine, but please
double-check (in particular, WriteMap).

The InterningOutputTracker (formerly DumpState::InternState) isn't yet
fully untangled from a couple of heapprofd specifics; I'm looking for
advice on how you'd prefer them to be resolved.

The perf profiler is now writing StreamingProfilePackets as a
placeholder. Final flush coming soon.

Bug: 144281346
Change-Id: I7897fe6eae71d1745adb95550177a48832838590
diff --git a/Android.bp b/Android.bp
index 83ac1b6..781d421 100644
--- a/Android.bp
+++ b/Android.bp
@@ -112,6 +112,7 @@
     ":perfetto_src_ipc_common",
     ":perfetto_src_profiling_common_callstack_trie",
     ":perfetto_src_profiling_common_interner",
+    ":perfetto_src_profiling_common_interning_output",
     ":perfetto_src_profiling_common_unwind_support",
     ":perfetto_src_profiling_memory_daemon",
     ":perfetto_src_profiling_memory_proc_utils",
@@ -1346,6 +1347,7 @@
     ":perfetto_src_perfetto_cmd_perfetto_atoms",
     ":perfetto_src_profiling_common_callstack_trie",
     ":perfetto_src_profiling_common_interner",
+    ":perfetto_src_profiling_common_interning_output",
     ":perfetto_src_profiling_common_unwind_support",
     ":perfetto_src_profiling_memory_client",
     ":perfetto_src_profiling_memory_daemon",
@@ -5611,6 +5613,14 @@
   name: "perfetto_src_profiling_common_interner",
 }
 
+// GN: //src/profiling/common:interning_output
+filegroup {
+  name: "perfetto_src_profiling_common_interning_output",
+  srcs: [
+    "src/profiling/common/interning_output.cc",
+  ],
+}
+
 // GN: //src/profiling/common:unittests
 filegroup {
   name: "perfetto_src_profiling_common_unittests",
@@ -6996,6 +7006,7 @@
     ":perfetto_src_perfetto_cmd_unittests",
     ":perfetto_src_profiling_common_callstack_trie",
     ":perfetto_src_profiling_common_interner",
+    ":perfetto_src_profiling_common_interning_output",
     ":perfetto_src_profiling_common_unittests",
     ":perfetto_src_profiling_common_unwind_support",
     ":perfetto_src_profiling_deobfuscator",
@@ -7495,6 +7506,7 @@
     ":perfetto_src_ipc_common",
     ":perfetto_src_profiling_common_callstack_trie",
     ":perfetto_src_profiling_common_interner",
+    ":perfetto_src_profiling_common_interning_output",
     ":perfetto_src_profiling_common_unwind_support",
     ":perfetto_src_profiling_perf_proc_descriptors",
     ":perfetto_src_profiling_perf_producer",
diff --git a/src/profiling/common/BUILD.gn b/src/profiling/common/BUILD.gn
index 8bd7784..fc609c6 100644
--- a/src/profiling/common/BUILD.gn
+++ b/src/profiling/common/BUILD.gn
@@ -28,9 +28,9 @@
 }
 
 source_set("callstack_trie") {
+  public_deps = [ ":unwind_support" ]
   deps = [
     ":interner",
-    ":unwind_support",
     "../../../gn:default_deps",
     "../../../src/base",
   ]
@@ -48,6 +48,22 @@
   sources = [ "interner.h" ]
 }
 
+source_set("interning_output") {
+  deps = [
+    ":callstack_trie",
+    ":interner",
+    "../../../gn:default_deps",
+    "../../../include/perfetto/ext/tracing/core",
+    "../../../protos/perfetto/trace:zero",
+    "../../../protos/perfetto/trace/interned_data:zero",
+    "../../../protos/perfetto/trace/profiling:zero",
+  ]
+  sources = [
+    "interning_output.cc",
+    "interning_output.h",
+  ]
+}
+
 perfetto_unittest_source_set("unittests") {
   testonly = true
   deps = [
diff --git a/src/profiling/common/callstack_trie.h b/src/profiling/common/callstack_trie.h
index 45b2786..81ef0a8 100644
--- a/src/profiling/common/callstack_trie.h
+++ b/src/profiling/common/callstack_trie.h
@@ -18,6 +18,7 @@
 #define SRC_PROFILING_COMMON_CALLSTACK_TRIE_H_
 
 #include <string>
+#include <typeindex>
 #include <vector>
 
 #include "perfetto/ext/base/lookup_set.h"
@@ -174,9 +175,8 @@
 }  // namespace profiling
 }  // namespace perfetto
 
-namespace std {
 template <>
-struct hash<::perfetto::profiling::Mapping> {
+struct std::hash<::perfetto::profiling::Mapping> {
   using argument_type = ::perfetto::profiling::Mapping;
   using result_type = size_t;
   result_type operator()(const argument_type& mapping) {
@@ -194,7 +194,7 @@
 };
 
 template <>
-struct hash<::perfetto::profiling::Frame> {
+struct std::hash<::perfetto::profiling::Frame> {
   using argument_type = ::perfetto::profiling::Frame;
   using result_type = size_t;
   result_type operator()(const argument_type& frame) {
@@ -204,6 +204,5 @@
     return h;
   }
 };
-}  // namespace std
 
 #endif  // SRC_PROFILING_COMMON_CALLSTACK_TRIE_H_
diff --git a/src/profiling/common/interning_output.cc b/src/profiling/common/interning_output.cc
new file mode 100644
index 0000000..dee216d
--- /dev/null
+++ b/src/profiling/common/interning_output.cc
@@ -0,0 +1,162 @@
+/*
+ * Copyright (C) 2020 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "src/profiling/common/interning_output.h"
+
+#include "protos/perfetto/trace/interned_data/interned_data.pbzero.h"
+#include "protos/perfetto/trace/profiling/profile_common.pbzero.h"
+#include "protos/perfetto/trace/profiling/profile_packet.pbzero.h"
+#include "protos/perfetto/trace/trace_packet.pbzero.h"
+
+namespace {
+// Flags used to distinguish distinct types of interned strings.
+constexpr int kDumpedBuildID = 1 << 0;
+constexpr int kDumpedMappingPath = 1 << 1;
+constexpr int kDumpedFunctionName = 1 << 2;
+}  // namespace
+
+namespace perfetto {
+namespace profiling {
+
+// static
+void InterningOutputTracker::WriteFixedInterningsPacket(
+    TraceWriter* trace_writer) {
+  constexpr const uint8_t kEmptyString[] = "";
+  // Explicitly reserve intern ID 0 for the empty string, so unset string
+  // fields get mapped to this.
+  auto packet = trace_writer->NewTracePacket();
+  auto* interned_data = packet->set_interned_data();
+  auto* interned_string = interned_data->add_build_ids();
+  interned_string->set_iid(0);
+  interned_string->set_str(kEmptyString, 0);
+
+  interned_string = interned_data->add_mapping_paths();
+  interned_string->set_iid(0);
+  interned_string->set_str(kEmptyString, 0);
+
+  interned_string = interned_data->add_function_names();
+  interned_string->set_iid(0);
+  interned_string->set_str(kEmptyString, 0);
+
+  packet->set_incremental_state_cleared(true);
+}
+
+void InterningOutputTracker::WriteMap(const Interned<Mapping> map,
+                                      protos::pbzero::InternedData* out) {
+  auto map_it_and_inserted = dumped_mappings_.emplace(map.id());
+  if (map_it_and_inserted.second) {
+    for (const Interned<std::string>& str : map->path_components)
+      WriteMappingPathString(str, out);
+
+    WriteBuildIDString(map->build_id, out);
+
+    protos::pbzero::Mapping* mapping = out->add_mappings();
+    mapping->set_iid(map.id());
+    mapping->set_exact_offset(map->exact_offset);
+    mapping->set_start_offset(map->start_offset);
+    mapping->set_start(map->start);
+    mapping->set_end(map->end);
+    mapping->set_load_bias(map->load_bias);
+    mapping->set_build_id(map->build_id.id());
+    for (const Interned<std::string>& str : map->path_components)
+      mapping->add_path_string_ids(str.id());
+  }
+}
+
+void InterningOutputTracker::WriteFrame(Interned<Frame> frame,
+                                        protos::pbzero::InternedData* out) {
+  WriteMap(frame->mapping, out);
+  WriteFunctionNameString(frame->function_name, out);
+  bool inserted;
+  std::tie(std::ignore, inserted) = dumped_frames_.emplace(frame.id());
+  if (inserted) {
+    protos::pbzero::Frame* frame_proto = out->add_frames();
+    frame_proto->set_iid(frame.id());
+    frame_proto->set_function_name_id(frame->function_name.id());
+    frame_proto->set_mapping_id(frame->mapping.id());
+    frame_proto->set_rel_pc(frame->rel_pc);
+  }
+}
+
+void InterningOutputTracker::WriteBuildIDString(
+    const Interned<std::string>& str,
+    protos::pbzero::InternedData* out) {
+  auto it_and_inserted = dumped_strings_.emplace(str.id(), 0);
+  auto it = it_and_inserted.first;
+  // This is for the rare case that the same string is used as two different
+  // types (e.g. a function name that matches a path segment). In that case
+  // we need to emit the string as all of its types.
+  if ((it->second & kDumpedBuildID) == 0) {
+    protos::pbzero::InternedString* interned_string = out->add_build_ids();
+    interned_string->set_iid(str.id());
+    interned_string->set_str(str.data());
+    it->second |= kDumpedBuildID;
+  }
+}
+
+void InterningOutputTracker::WriteMappingPathString(
+    const Interned<std::string>& str,
+    protos::pbzero::InternedData* out) {
+  auto it_and_inserted = dumped_strings_.emplace(str.id(), 0);
+  auto it = it_and_inserted.first;
+  // This is for the rare case that the same string is used as two different
+  // types (e.g. a function name that matches a path segment). In that case
+  // we need to emit the string as all of its types.
+  if ((it->second & kDumpedMappingPath) == 0) {
+    protos::pbzero::InternedString* interned_string = out->add_mapping_paths();
+    interned_string->set_iid(str.id());
+    interned_string->set_str(str.data());
+    it->second |= kDumpedMappingPath;
+  }
+}
+
+void InterningOutputTracker::WriteFunctionNameString(
+    const Interned<std::string>& str,
+    protos::pbzero::InternedData* out) {
+  auto it_and_inserted = dumped_strings_.emplace(str.id(), 0);
+  auto it = it_and_inserted.first;
+  // This is for the rare case that the same string is used as two different
+  // types (e.g. a function name that matches a path segment). In that case
+  // we need to emit the string as all of its types.
+  if ((it->second & kDumpedFunctionName) == 0) {
+    protos::pbzero::InternedString* interned_string = out->add_function_names();
+    interned_string->set_iid(str.id());
+    interned_string->set_str(str.data());
+    it->second |= kDumpedFunctionName;
+  }
+}
+
+void InterningOutputTracker::WriteCallstack(GlobalCallstackTrie::Node* node,
+                                            GlobalCallstackTrie* trie,
+                                            protos::pbzero::InternedData* out) {
+  bool inserted;
+  std::tie(std::ignore, inserted) = dumped_callstacks_.emplace(node->id());
+  if (inserted) {
+    // There need to be two separate loops over built_callstack because
+    // protozero cannot interleave different messages.
+    auto built_callstack = trie->BuildCallstack(node);
+    for (const Interned<Frame>& frame : built_callstack)
+      WriteFrame(frame, out);
+
+    protos::pbzero::Callstack* callstack = out->add_callstacks();
+    callstack->set_iid(node->id());
+    for (const Interned<Frame>& frame : built_callstack)
+      callstack->add_frame_ids(frame.id());
+  }
+}
+
+}  // namespace profiling
+}  // namespace perfetto
diff --git a/src/profiling/common/interning_output.h b/src/profiling/common/interning_output.h
new file mode 100644
index 0000000..3424500
--- /dev/null
+++ b/src/profiling/common/interning_output.h
@@ -0,0 +1,77 @@
+/*
+ * Copyright (C) 2020 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef SRC_PROFILING_COMMON_INTERNING_OUTPUT_H_
+#define SRC_PROFILING_COMMON_INTERNING_OUTPUT_H_
+
+#include <map>
+#include <set>
+
+#include <stdint.h>
+
+#include "perfetto/ext/tracing/core/trace_writer.h"
+#include "src/profiling/common/callstack_trie.h"
+#include "src/profiling/common/interner.h"
+
+#include "protos/perfetto/trace/interned_data/interned_data.pbzero.h"
+
+namespace perfetto {
+namespace profiling {
+
+class InterningOutputTracker {
+ public:
+  // Writes out a full packet containing the "empty" (zero) internings.
+  // NB: resulting packet has |incremental_state_cleared| set.
+  static void WriteFixedInterningsPacket(TraceWriter* trace_writer);
+
+  void WriteMap(const Interned<Mapping> map, protos::pbzero::InternedData* out);
+  void WriteFrame(Interned<Frame> frame, protos::pbzero::InternedData* out);
+  void WriteBuildIDString(const Interned<std::string>& str,
+                          protos::pbzero::InternedData* out);
+  void WriteMappingPathString(const Interned<std::string>& str,
+                              protos::pbzero::InternedData* out);
+  void WriteFunctionNameString(const Interned<std::string>& str,
+                               protos::pbzero::InternedData* out);
+
+  // Writes out the callstack represented by the given node.
+  void WriteCallstack(GlobalCallstackTrie::Node* node,
+                      GlobalCallstackTrie* trie,
+                      protos::pbzero::InternedData* out);
+
+  bool IsCallstackNew(uint64_t callstack_id) {
+    return dumped_callstacks_.find(callstack_id) == dumped_callstacks_.end();
+  }
+
+  // TODO(rsavitski): move elsewhere, used in heapprofd for orthogonal
+  // reasons. Shouldn't be cleared together with the rest of the incremental
+  // state.
+  uint64_t* HeapprofdNextIndexMutable() { return &next_index_; }
+
+ private:
+  // Map value is a bitfield distinguishing the distinct string fields
+  // the string can be emitted as, e.g. kDumpedBuildID.
+  std::map<InternID, int> dumped_strings_;
+  std::set<InternID> dumped_frames_;
+  std::set<InternID> dumped_mappings_;
+  std::set<uint64_t> dumped_callstacks_;  // uses callstack trie's node ids
+
+  uint64_t next_index_ = 0;
+};
+
+}  // namespace profiling
+}  // namespace perfetto
+
+#endif  // SRC_PROFILING_COMMON_INTERNING_OUTPUT_H_
diff --git a/src/profiling/memory/BUILD.gn b/src/profiling/memory/BUILD.gn
index c814b6b..ae46a26 100644
--- a/src/profiling/memory/BUILD.gn
+++ b/src/profiling/memory/BUILD.gn
@@ -142,6 +142,7 @@
     "../../tracing/ipc/producer",
     "../common:callstack_trie",
     "../common:interner",
+    "../common:interning_output",
     "../common:unwind_support",
   ]
   public_deps = [
diff --git a/src/profiling/memory/bookkeeping_dump.cc b/src/profiling/memory/bookkeeping_dump.cc
index 6f72074..514424b 100644
--- a/src/profiling/memory/bookkeeping_dump.cc
+++ b/src/profiling/memory/bookkeeping_dump.cc
@@ -19,7 +19,6 @@
 namespace perfetto {
 namespace profiling {
 namespace {
-using ::perfetto::protos::pbzero::Callstack;
 using ::perfetto::protos::pbzero::ProfilePacket;
 // This needs to be lower than the maximum acceptable chunk size, because this
 // is checked *before* writing another submessage. We conservatively assume
@@ -28,113 +27,31 @@
 uint32_t kPacketSizeThreshold = 400000;
 }  // namespace
 
-void WriteFixedInternings(TraceWriter* trace_writer) {
-  constexpr const uint8_t kEmptyString[] = "";
-  // Explicitly reserve intern ID 0 for the empty string, so unset string
-  // fields get mapped to this.
-  auto packet = trace_writer->NewTracePacket();
-  auto* interned_data = packet->set_interned_data();
-  auto interned_string = interned_data->add_build_ids();
-  interned_string->set_iid(0);
-  interned_string->set_str(kEmptyString, 0);
-
-  interned_string = interned_data->add_mapping_paths();
-  interned_string->set_iid(0);
-  interned_string->set_str(kEmptyString, 0);
-
-  interned_string = interned_data->add_function_names();
-  interned_string->set_iid(0);
-  interned_string->set_str(kEmptyString, 0);
-
-  packet->set_incremental_state_cleared(true);
-}
-
 void DumpState::WriteMap(const Interned<Mapping> map) {
-  auto map_it_and_inserted = intern_state_->dumped_mappings_.emplace(map.id());
-  if (map_it_and_inserted.second) {
-    for (const Interned<std::string>& str : map->path_components)
-      WriteMappingPathString(str);
-
-    WriteBuildIDString(map->build_id);
-
-    auto mapping = GetCurrentInternedData()->add_mappings();
-    mapping->set_iid(map.id());
-    mapping->set_exact_offset(map->exact_offset);
-    mapping->set_start_offset(map->start_offset);
-    mapping->set_start(map->start);
-    mapping->set_end(map->end);
-    mapping->set_load_bias(map->load_bias);
-    mapping->set_build_id(map->build_id.id());
-    for (const Interned<std::string>& str : map->path_components)
-      mapping->add_path_string_ids(str.id());
-  }
+  intern_state_->WriteMap(map, GetCurrentInternedData());
 }
 
 void DumpState::WriteFrame(Interned<Frame> frame) {
-  WriteMap(frame->mapping);
-  WriteFunctionNameString(frame->function_name);
-  bool inserted;
-  std::tie(std::ignore, inserted) =
-      intern_state_->dumped_frames_.emplace(frame.id());
-  if (inserted) {
-    auto frame_proto = GetCurrentInternedData()->add_frames();
-    frame_proto->set_iid(frame.id());
-    frame_proto->set_function_name_id(frame->function_name.id());
-    frame_proto->set_mapping_id(frame->mapping.id());
-    frame_proto->set_rel_pc(frame->rel_pc);
-  }
+  intern_state_->WriteFrame(frame, GetCurrentInternedData());
 }
 
 void DumpState::WriteBuildIDString(const Interned<std::string>& str) {
-  auto it_and_inserted = intern_state_->dumped_strings_.emplace(str.id(), 0);
-  auto it = it_and_inserted.first;
-  // This is for the rare case that the same string is used as two different
-  // types (e.g. a function name that matches a path segment). In that case
-  // we need to emit the string as all of its types.
-  if ((it->second & kDumpedBuildID) == 0) {
-    auto interned_string = GetCurrentInternedData()->add_build_ids();
-    interned_string->set_iid(str.id());
-    interned_string->set_str(reinterpret_cast<const uint8_t*>(str->c_str()),
-                             str->size());
-    it->second |= kDumpedBuildID;
-  }
+  intern_state_->WriteBuildIDString(str, GetCurrentInternedData());
 }
 
 void DumpState::WriteMappingPathString(const Interned<std::string>& str) {
-  auto it_and_inserted = intern_state_->dumped_strings_.emplace(str.id(), 0);
-  auto it = it_and_inserted.first;
-  // This is for the rare case that the same string is used as two different
-  // types (e.g. a function name that matches a path segment). In that case
-  // we need to emit the string as all of its types.
-  if ((it->second & kDumpedMappingPath) == 0) {
-    auto interned_string = GetCurrentInternedData()->add_mapping_paths();
-    interned_string->set_iid(str.id());
-    interned_string->set_str(reinterpret_cast<const uint8_t*>(str->c_str()),
-                             str->size());
-    it->second |= kDumpedMappingPath;
-  }
+  intern_state_->WriteMappingPathString(str, GetCurrentInternedData());
 }
 
 void DumpState::WriteFunctionNameString(const Interned<std::string>& str) {
-  auto it_and_inserted = intern_state_->dumped_strings_.emplace(str.id(), 0);
-  auto it = it_and_inserted.first;
-  // This is for the rare case that the same string is used as two different
-  // types (e.g. a function name that matches a path segment). In that case
-  // we need to emit the string as all of its types.
-  if ((it->second & kDumpedFunctionName) == 0) {
-    auto interned_string = GetCurrentInternedData()->add_function_names();
-    interned_string->set_iid(str.id());
-    interned_string->set_str(reinterpret_cast<const uint8_t*>(str->c_str()),
-                             str->size());
-    it->second |= kDumpedFunctionName;
-  }
+  intern_state_->WriteFunctionNameString(str, GetCurrentInternedData());
 }
 
 void DumpState::WriteAllocation(const HeapTracker::CallstackAllocations& alloc,
                                 bool dump_at_max_mode) {
-  if (intern_state_->dumped_callstacks_.find(alloc.node->id()) ==
-      intern_state_->dumped_callstacks_.end())
+  if (intern_state_->IsCallstackNew(alloc.node->id())) {
     callstacks_to_dump_.emplace(alloc.node);
+  }
 
   auto* heap_samples = GetCurrentProcessHeapSamples();
   ProfilePacket::HeapSample* sample = heap_samples->add_samples();
@@ -165,17 +82,7 @@
   if (current_trace_packet_)
     current_profile_packet_->set_continued(true);
   for (GlobalCallstackTrie::Node* node : callstacks_to_dump_) {
-    // There need to be two separate loops over built_callstack because
-    // protozero cannot interleave different messages.
-    auto built_callstack = callsites->BuildCallstack(node);
-    for (const Interned<Frame>& frame : built_callstack)
-      WriteFrame(frame);
-    Callstack* callstack = GetCurrentInternedData()->add_callstacks();
-    callstack->set_iid(node->id());
-    for (const Interned<Frame>& frame : built_callstack)
-      callstack->add_frame_ids(frame.id());
-
-    intern_state_->dumped_callstacks_.emplace(node->id());
+    intern_state_->WriteCallstack(node, callsites, GetCurrentInternedData());
   }
   MakeProfilePacket();
 }
diff --git a/src/profiling/memory/bookkeeping_dump.h b/src/profiling/memory/bookkeeping_dump.h
index a983781..bf19133 100644
--- a/src/profiling/memory/bookkeeping_dump.h
+++ b/src/profiling/memory/bookkeeping_dump.h
@@ -30,36 +30,19 @@
 #include "perfetto/ext/tracing/core/trace_writer.h"
 
 #include "src/profiling/common/interner.h"
+#include "src/profiling/common/interning_output.h"
 #include "src/profiling/memory/bookkeeping.h"
 
 namespace perfetto {
 namespace profiling {
 
-void WriteFixedInternings(TraceWriter* trace_writer);
-
-constexpr int kDumpedBuildID = 1 << 0;
-constexpr int kDumpedMappingPath = 1 << 1;
-constexpr int kDumpedFunctionName = 1 << 2;
-
 class DumpState {
  public:
-  class InternState {
-   private:
-    friend class DumpState;
-
-    std::map<InternID, int> dumped_strings_;
-    std::set<InternID> dumped_frames_;
-    std::set<InternID> dumped_mappings_;
-    std::set<uint64_t> dumped_callstacks_;
-
-    uint64_t next_index_ = 0;
-  };
-
   DumpState(
       TraceWriter* trace_writer,
       std::function<void(protos::pbzero::ProfilePacket::ProcessHeapSamples*)>
           process_fill_header,
-      InternState* intern_state)
+      InterningOutputTracker* intern_state)
       : trace_writer_(trace_writer),
         intern_state_(intern_state),
         current_process_fill_header_(std::move(process_fill_header)) {
@@ -103,7 +86,8 @@
     MakeTracePacket();
 
     current_profile_packet_ = current_trace_packet_->set_profile_packet();
-    current_profile_packet_->set_index(intern_state_->next_index_++);
+    uint64_t* next_index = intern_state_->HeapprofdNextIndexMutable();
+    current_profile_packet_->set_index((*next_index)++);
   }
 
   uint64_t currently_written() {
@@ -117,7 +101,7 @@
   std::set<GlobalCallstackTrie::Node*> callstacks_to_dump_;
 
   TraceWriter* trace_writer_;
-  InternState* intern_state_;
+  InterningOutputTracker* intern_state_;
 
   protos::pbzero::ProfilePacket* current_profile_packet_ = nullptr;
   protos::pbzero::InternedData* current_interned_data_ = nullptr;
diff --git a/src/profiling/memory/heapprofd_producer.cc b/src/profiling/memory/heapprofd_producer.cc
index 91db341..d1ed438 100644
--- a/src/profiling/memory/heapprofd_producer.cc
+++ b/src/profiling/memory/heapprofd_producer.cc
@@ -360,7 +360,8 @@
   data_source.normalized_cmdlines = std::move(normalized_cmdlines);
   data_source.stop_timeout_ms = ds_config.stop_timeout_ms();
 
-  WriteFixedInternings(data_source.trace_writer.get());
+  InterningOutputTracker::WriteFixedInterningsPacket(
+      data_source.trace_writer.get());
   data_sources_.emplace(id, std::move(data_source));
   PERFETTO_DLOG("Set up data source.");
 
diff --git a/src/profiling/memory/heapprofd_producer.h b/src/profiling/memory/heapprofd_producer.h
index e441507..06c0ea9 100644
--- a/src/profiling/memory/heapprofd_producer.h
+++ b/src/profiling/memory/heapprofd_producer.h
@@ -33,6 +33,7 @@
 #include "perfetto/ext/tracing/core/tracing_service.h"
 #include "perfetto/tracing/core/data_source_config.h"
 
+#include "src/profiling/common/interning_output.h"
 #include "src/profiling/memory/bookkeeping.h"
 #include "src/profiling/memory/bookkeeping_dump.h"
 #include "src/profiling/memory/page_idle_checker.h"
@@ -200,7 +201,7 @@
     std::set<pid_t> rejected_pids;
     std::map<pid_t, ProcessState> process_states;
     std::vector<std::string> normalized_cmdlines;
-    DumpState::InternState intern_state;
+    InterningOutputTracker intern_state;
     bool shutting_down = false;
     bool started = false;
     uint32_t stop_timeout_ms;
diff --git a/src/profiling/perf/BUILD.gn b/src/profiling/perf/BUILD.gn
index 6ca1259..9b4e2d1 100644
--- a/src/profiling/perf/BUILD.gn
+++ b/src/profiling/perf/BUILD.gn
@@ -51,11 +51,13 @@
     "../../../protos/perfetto/config:cpp",
     "../../../protos/perfetto/config/profiling:zero",
     "../../../protos/perfetto/trace:zero",
+    "../../../protos/perfetto/trace/profiling:zero",
     "../../../src/base",
     "../../../src/base:unix_socket",
     "../../../src/tracing/ipc/producer",
     "../common:callstack_trie",
     "../common:interner",
+    "../common:interning_output",
     "../common:unwind_support",
   ]
   sources = [
diff --git a/src/profiling/perf/perf_producer.cc b/src/profiling/perf/perf_producer.cc
index 4ad89aa..f8e1804 100644
--- a/src/profiling/perf/perf_producer.cc
+++ b/src/profiling/perf/perf_producer.cc
@@ -36,6 +36,7 @@
 #include "src/profiling/perf/event_reader.h"
 
 #include "protos/perfetto/config/profiling/perf_event_config.pbzero.h"
+#include "protos/perfetto/trace/profiling/profile_packet.pbzero.h"
 #include "protos/perfetto/trace/trace_packet.pbzero.h"
 
 namespace perfetto {
@@ -97,12 +98,18 @@
   auto writer = endpoint_->CreateTraceWriter(buffer_id);
 
   // Construct the data source instance.
-  auto it_inserted = data_sources_.emplace(
+
+  std::map<DataSourceInstanceID, DataSource>::iterator ds_it;
+  bool inserted;
+  std::tie(ds_it, inserted) = data_sources_.emplace(
       std::piecewise_construct, std::forward_as_tuple(instance_id),
       std::forward_as_tuple(std::move(writer),
                             std::move(event_reader.value())));
+  PERFETTO_CHECK(inserted);
 
-  PERFETTO_CHECK(it_inserted.second);
+  // Write out a packet to initialize the incremental state for this sequence.
+  InterningOutputTracker::WriteFixedInterningsPacket(
+      ds_it->second.trace_writer.get());
 
   // Kick off periodic read task.
   auto weak_this = weak_factory_.GetWeakPtr();
@@ -450,20 +457,26 @@
   while (!queue.empty()) {
     BookkeepingEntry& entry = queue.front();
 
-    GlobalCallstackTrie::Node* node =
+    // intern callsite
+    GlobalCallstackTrie::Node* callstack_root =
         callstack_trie_.CreateCallsite(entry.frames);
-    std::vector<Interned<Frame>> extracted =
-        callstack_trie_.BuildCallstack(node);
+    uint64_t callstack_iid = callstack_root->id();
 
-    PERFETTO_DLOG("Extracted from interner:");
-    for (const auto& f : extracted) {
-      PERFETTO_DLOG("%u -> %s", static_cast<unsigned>(f.id()),
-                    f->function_name->c_str());
-    }
-
+    // start packet
     auto packet = ds.trace_writer->NewTracePacket();
     packet->set_timestamp(entry.timestamp);
 
+    // write new interning data (if any)
+    protos::pbzero::InternedData* interned_out = packet->set_interned_data();
+    ds.interning_output.WriteCallstack(callstack_root, &callstack_trie_,
+                                       interned_out);
+
+    // TODO(rsavitski): placeholder packet type. Trace processor will ingest
+    // this mostly fine, but the timestamp assumptions are completely different.
+    auto* streaming_packet = packet->set_streaming_profile_packet();
+    streaming_packet->add_callstack_iid(callstack_iid);
+    streaming_packet->add_timestamp_delta_us(0);
+
     queue.pop();
   }
 
diff --git a/src/profiling/perf/perf_producer.h b/src/profiling/perf/perf_producer.h
index 1f72ee2..f7b055c 100644
--- a/src/profiling/perf/perf_producer.h
+++ b/src/profiling/perf/perf_producer.h
@@ -35,6 +35,7 @@
 #include "perfetto/ext/tracing/core/trace_writer.h"
 #include "perfetto/ext/tracing/core/tracing_service.h"
 #include "src/profiling/common/callstack_trie.h"
+#include "src/profiling/common/interning_output.h"
 #include "src/profiling/common/unwind_support.h"
 #include "src/profiling/perf/event_config.h"
 #include "src/profiling/perf/event_reader.h"
@@ -110,6 +111,9 @@
                                      /*mem_fd=*/base::ScopedFile{}};
     };
     std::map<pid_t, ProcDescriptors> proc_fds;  // keyed by pid
+
+    // Tracks the incremental state for interned entries.
+    InterningOutputTracker interning_output;
   };
 
   // Entry in an unwinding queue. Either a sample that requires unwinding, or a
@@ -179,8 +183,9 @@
   // grows too large (at the moment purged only when no sources are active).
   // TODO(rsavitski): interning sequences are monotonic for the lifetime of the
   // daemon. Consider resetting them at safe points - possible when no sources
-  // are active, and tricky otherwise, as it'll require emitting incremental
-  // sequence invalidation packets on all relevant sequences.
+  // are active, and tricky otherwise. In the latter case, it'll require
+  // emitting incremental sequence invalidation packets on all relevant
+  // sequences.
   GlobalCallstackTrie callstack_trie_;
 
   std::map<DataSourceInstanceID, DataSource> data_sources_;