Aggregate samples when generating profiles

This can potentially make profiles much smaller.

Change-Id: I810d3b5eabbed941e288ab6506244895c4a31af9
diff --git a/src/trace_processor/prelude/functions/pprof_functions.cc b/src/trace_processor/prelude/functions/pprof_functions.cc
index 23b2f23..98df2d1 100644
--- a/src/trace_processor/prelude/functions/pprof_functions.cc
+++ b/src/trace_processor/prelude/functions/pprof_functions.cc
@@ -81,7 +81,7 @@
       return sqlite_utils::ToInvalidArgumentError(
           "stack", 0, base::ErrStatus("failed to deserialize Stack proto"));
     }
-    if (!builder_.AddSample(stack, sample_value_)) {
+    if (!builder_.AddSample(stack, sample_values_)) {
       return base::ErrStatus("Failed to add callstack");
     }
     return util::OkStatus();
@@ -129,29 +129,30 @@
   AggregateContext(TraceProcessorContext* tp_context,
                    const std::vector<GProfileBuilder::ValueType>& sample_types)
       : builder_(tp_context, sample_types) {
-    sample_value_.Append(1);
+    sample_values_.resize(sample_types.size(), 1);
   }
 
   base::Status UpdateSampleValue(size_t argc, sqlite3_value** argv) {
     if (argc == 1) {
+      PERFETTO_CHECK(sample_values_.size() == 1);
       return base::OkStatus();
     }
 
-    sample_value_.Reset();
-    for (size_t i = 3; i < argc; i += 3) {
+    PERFETTO_CHECK(argc == 1 + (sample_values_.size() * 3));
+    for (size_t i = 0; i < sample_values_.size(); ++i) {
       base::StatusOr<SqlValue> value = sqlite_utils::ExtractArgument(
-          argc, argv, "sample_value", i, SqlValue::kLong);
+          argc, argv, "sample_value", 3 + i * 3, SqlValue::kLong);
       if (!value.ok()) {
         return value.status();
       }
-      sample_value_.Append(value->AsLong());
+      sample_values_[i] = value->AsLong();
     }
 
     return base::OkStatus();
   }
 
   GProfileBuilder builder_;
-  protozero::PackedVarInt sample_value_;
+  std::vector<int64_t> sample_values_;
 };
 
 static base::Status Step(sqlite3_context* ctx,
diff --git a/src/trace_processor/util/profile_builder.cc b/src/trace_processor/util/profile_builder.cc
index 8856825..951472b 100644
--- a/src/trace_processor/util/profile_builder.cc
+++ b/src/trace_processor/util/profile_builder.cc
@@ -27,6 +27,7 @@
 #include "perfetto/ext/base/string_utils.h"
 #include "perfetto/ext/base/string_view.h"
 #include "perfetto/ext/trace_processor/demangle.h"
+#include "protos/third_party/pprof/profile.pbzero.h"
 #include "src/trace_processor/containers/null_term_string_view.h"
 #include "src/trace_processor/containers/string_pool.h"
 #include "src/trace_processor/storage/trace_storage.h"
@@ -38,6 +39,8 @@
 namespace {
 
 using protos::pbzero::Stack;
+using third_party::perftools::profiles::pbzero::Profile;
+using third_party::perftools::profiles::pbzero::Sample;
 
 base::StringView ToString(CallsiteAnnotation annotation) {
   switch (annotation) {
@@ -195,13 +198,47 @@
   return score;
 }
 
+bool GProfileBuilder::SampleAggregator::AddSample(
+    const protozero::PackedVarInt& location_ids,
+    const std::vector<int64_t>& values) {
+  SerializedLocationId key(location_ids.data(),
+                           location_ids.data() + location_ids.size());
+  std::vector<int64_t>* agg_values = samples_.Find(key);
+  if (!agg_values) {
+    samples_.Insert(std::move(key), values);
+    return true;
+  }
+  // All samples must have the same number of values.
+  if (values.size() != agg_values->size()) {
+    return false;
+  }
+  std::transform(values.begin(), values.end(), agg_values->begin(),
+                 agg_values->begin(), std::plus<int64_t>());
+  return true;
+}
+
+void GProfileBuilder::SampleAggregator::WriteTo(Profile& profile) {
+  protozero::PackedVarInt values;
+  for (auto it = samples_.GetIterator(); it; ++it) {
+    values.Reset();
+    for (int64_t value : it.value()) {
+      values.Append(value);
+    }
+    Sample* sample = profile.add_sample();
+    sample->set_value(values);
+    // Map key is the serialized varint. Just append the bytes.
+    sample->AppendBytes(Sample::kLocationIdFieldNumber, it.key().data(),
+                        it.key().size());
+  }
+}
+
 GProfileBuilder::GProfileBuilder(const TraceProcessorContext* context,
                                  const std::vector<ValueType>& sample_types)
     : context_(*context),
       string_table_(&result_, &context->storage->string_pool()),
       annotations_(context) {
-  // Make sure the empty function always gets id 0 which will be ignored when
-  // writing the proto file.
+  // Make sure the empty function always gets id 0 which will be ignored
+  // when writing the proto file.
   functions_.insert(
       {Function{kEmptyStringIndex, kEmptyStringIndex, kEmptyStringIndex},
        kNullFunctionId});
@@ -218,28 +255,16 @@
         string_table_.InternString(base::StringView(value_type.type));
     int64_t unit =
         string_table_.InternString(base::StringView(value_type.unit));
-    // Add message later, remember protozero does not allow you to interleave
-    // these write calls.
+    // Add message later, remember protozero does not allow you to
+    // interleave these write calls.
     auto* sample_type = result_->add_sample_type();
     sample_type->set_type(type);
     sample_type->set_unit(unit);
   }
 }
 
-bool GProfileBuilder::AddSample(const protozero::PackedVarInt& location_ids,
-                                const protozero::PackedVarInt& values) {
-  PERFETTO_CHECK(!finalized_);
-  if (location_ids.size() == 0) {
-    return false;
-  }
-  auto* sample = result_->add_sample();
-  sample->set_value(values);
-  sample->set_location_id(location_ids);
-  return true;
-}
-
 bool GProfileBuilder::AddSample(const Stack::Decoder& stack,
-                                const protozero::PackedVarInt& values) {
+                                const std::vector<int64_t>& values) {
   PERFETTO_CHECK(!finalized_);
 
   auto it = stack.entries();
@@ -256,7 +281,7 @@
       uint32_t callsite_id = entry.has_callsite_id()
                                  ? entry.callsite_id()
                                  : entry.annotated_callsite_id();
-      return AddSample(
+      return samples_.AddSample(
           GetLocationIdsForCallsite(CallsiteId(callsite_id), annotated),
           values);
     }
@@ -288,7 +313,7 @@
                                                 CallsiteAnnotation::kNone));
     }
   }
-  return AddSample(location_ids, values);
+  return samples_.AddSample(location_ids, values);
 }
 
 void GProfileBuilder::Finalize() {
@@ -298,6 +323,7 @@
   WriteMappings();
   WriteFunctions();
   WriteLocations();
+  samples_.WriteTo(*result_.get());
   finalized_ = true;
 }
 
@@ -599,8 +625,8 @@
 }
 
 void GProfileBuilder::WriteMappings() {
-  // The convention in pprof files is to write the mapping for the main binary
-  // first. So lets do just that.
+  // The convention in pprof files is to write the mapping for the main
+  // binary first. So lets do just that.
   std::optional<uint64_t> main_mapping_id = GuessMainBinary();
   if (main_mapping_id) {
     WriteMapping(*main_mapping_id);
diff --git a/src/trace_processor/util/profile_builder.h b/src/trace_processor/util/profile_builder.h
index cfd5406..7622578 100644
--- a/src/trace_processor/util/profile_builder.h
+++ b/src/trace_processor/util/profile_builder.h
@@ -19,6 +19,7 @@
 
 #include <optional>
 
+#include "perfetto/ext/base/flat_hash_map.h"
 #include "perfetto/ext/base/string_view.h"
 #include "perfetto/protozero/packed_repeated_fields.h"
 #include "perfetto/protozero/scattered_heap_buffer.h"
@@ -65,7 +66,7 @@
 
   // Returns false if the operation fails (e.g callsite_id was not found)
   bool AddSample(const protos::pbzero::Stack_Decoder& stack,
-                 const protozero::PackedVarInt& values);
+                 const std::vector<int64_t>& values);
 
   // Finalizes the profile and returns the serialized proto. Can be called
   // multiple times but after the first invocation `AddSample` calls will have
@@ -253,6 +254,32 @@
     }
   };
 
+  // Aggregates samples with the same location_ids (i.e. stack) by computing the
+  // sum of their values. This helps keep the generated profiles small as it
+  // potentially removes a lot of duplication from having multiple samples with
+  // the same stack.
+  class SampleAggregator {
+   public:
+    bool AddSample(const protozero::PackedVarInt& location_ids,
+                   const std::vector<int64_t>& values);
+
+    void WriteTo(third_party::perftools::profiles::pbzero::Profile& profile);
+
+   private:
+    // Key holds the serialized value of the Sample::location_id proto field
+    // (packed varint).
+    using SerializedLocationId = std::vector<uint8_t>;
+    struct Hasher {
+      size_t operator()(const SerializedLocationId& data) const {
+        base::Hasher hasher;
+        hasher.Update(reinterpret_cast<const char*>(data.data()), data.size());
+        return static_cast<size_t>(hasher.digest());
+      }
+    };
+    base::FlatHashMap<SerializedLocationId, std::vector<int64_t>, Hasher>
+        samples_;
+  };
+
   const protozero::PackedVarInt& GetLocationIdsForCallsite(
       const CallsiteId& callsite_id,
       bool annotated);
@@ -309,9 +336,6 @@
   // most likely main binary.
   std::optional<uint64_t> GuessMainBinary() const;
 
-  bool AddSample(const protozero::PackedVarInt& location_ids,
-                 const protozero::PackedVarInt& values);
-
   // Profile proto being serialized.
   protozero::HeapBuffered<third_party::perftools::profiles::pbzero::Profile>
       result_;
@@ -363,6 +387,7 @@
   std::unordered_map<Function, uint64_t, Function::Hash> functions_;
   // Staging area for Mappings. mapping_id - 1 = index in the vector.
   std::vector<Mapping> mappings_;
+  SampleAggregator samples_;
 };
 
 }  // namespace trace_processor
diff --git a/test/trace_processor/diff_tests/functions/tests.py b/test/trace_processor/diff_tests/functions/tests.py
index c5f7918..f92cb29 100644
--- a/test/trace_processor/diff_tests/functions/tests.py
+++ b/test/trace_processor/diff_tests/functions/tests.py
@@ -567,6 +567,47 @@
                   A (0x0)
             """))
 
+  def test_profile_aggregates_samples(self):
+    return DiffTestBlueprint(
+        trace=DataPath("perf_sample.pb"),
+        query="""
+        WITH samples(stack, value) AS (
+        VALUES
+          (CAT_STACKS("A", "B"), 4),
+          (CAT_STACKS("A", "B"), 8),
+          (CAT_STACKS("A", "B"), 15),
+          (CAT_STACKS("A", "C"), 16),
+          (CAT_STACKS("C", "B"), 23),
+          (CAT_STACKS("C", "B"), 42)
+        )
+        SELECT HEX(
+          EXPERIMENTAL_PROFILE(
+            stack, "type", "units", value))
+        FROM samples
+        """,
+        out=BinaryProto(
+            message_type="perfetto.third_party.perftools.profiles.Profile",
+            post_processing=PrintProfileProto,
+            contents="""
+            Sample:
+              Values: 16
+              Stack:
+                C (0x0)
+                A (0x0)
+
+            Sample:
+              Values: 27
+              Stack:
+                B (0x0)
+                A (0x0)
+
+            Sample:
+              Values: 65
+              Stack:
+                B (0x0)
+                C (0x0)
+            """))
+
   def test_annotated_callstack(self):
     return DiffTestBlueprint(
         trace=DataPath("perf_sample_annotations.pftrace"),