perfetto: add saturated and fixed-rate consumer benchmarks

Bug: 74380167
Change-Id: I73345df857c9efd899da272a1f570ba3396080b4
diff --git a/test/end_to_end_benchmark.cc b/test/end_to_end_benchmark.cc
index 100674e..f6f3d3d 100644
--- a/test/end_to_end_benchmark.cc
+++ b/test/end_to_end_benchmark.cc
@@ -36,7 +36,7 @@
   return getenv("BENCHMARK_FUNCTIONAL_TEST_ONLY") != nullptr;
 }
 
-void BenchmarkCommon(benchmark::State& state) {
+void BenchmarkProducer(benchmark::State& state) {
   base::TestTaskRunner task_runner;
 
   TestHelper helper(&task_runner);
@@ -87,10 +87,10 @@
   uint64_t wall_ns =
       static_cast<uint64_t>(base::GetWallTimeNs().count()) - wall_start_ns;
 
-  state.counters["Pro CPU"] = benchmark::Counter(100.0 * producer_ns / wall_ns);
   state.counters["Ser CPU"] = benchmark::Counter(100.0 * service_ns / wall_ns);
   state.counters["Ser ns/m"] =
       benchmark::Counter(1.0 * service_ns / message_count);
+  state.counters["Pro CPU"] = benchmark::Counter(100.0 * producer_ns / wall_ns);
   state.SetBytesProcessed(iterations * message_bytes * message_count);
 
   // Read back the buffer just to check correctness.
@@ -110,7 +110,104 @@
   }
 }
 
-void SaturateCpuArgs(benchmark::internal::Benchmark* b) {
+// Measures consumer read-back speed. state.range(0) is the message size in
+// bytes, state.range(1) the producer rate in MB/s (0 = saturated producer).
+static void BenchmarkConsumer(benchmark::State& state) {
+  base::TestTaskRunner task_runner;
+  TestHelper helper(&task_runner);
+  helper.StartServiceIfRequired();
+
+  FakeProducer* producer = helper.ConnectFakeProducer();
+  helper.ConnectConsumer();
+  helper.WaitForConsumerConnect();
+
+  TraceConfig trace_config;
+  static const uint32_t kBufferSizeBytes =
+      IsBenchmarkFunctionalOnly() ? 16 * 1024 : 2 * 1024 * 1024;
+  trace_config.add_buffers()->set_size_kb(kBufferSizeBytes / 1024);
+
+  static constexpr uint32_t kRandomSeed = 42;
+  uint32_t message_bytes = static_cast<uint32_t>(state.range(0));
+  uint32_t mb_per_s = static_cast<uint32_t>(state.range(1));
+  bool is_saturated_producer = mb_per_s == 0;
+
+  uint32_t message_count = kBufferSizeBytes / message_bytes;
+  uint32_t messages_per_s = mb_per_s * 1024 * 1024 / message_bytes;
+  uint32_t number_of_batches =
+      is_saturated_producer ? 0 : std::max(1u, message_count / messages_per_s);
+
+  auto* ds_config = trace_config.add_data_sources()->mutable_config();
+  ds_config->set_name("android.perfetto.FakeProducer");
+  ds_config->set_target_buffer(0);
+  ds_config->mutable_for_testing()->set_seed(kRandomSeed);
+  ds_config->mutable_for_testing()->set_message_count(message_count);
+  ds_config->mutable_for_testing()->set_message_size(message_bytes);
+  ds_config->mutable_for_testing()->set_max_messages_per_second(messages_per_s);
+
+  helper.StartTracing(trace_config);
+  helper.WaitForProducerEnabled();
+
+  uint64_t wall_start_ns = static_cast<uint64_t>(base::GetWallTimeNs().count());
+  uint64_t service_start_ns =
+      static_cast<uint64_t>(helper.service_thread()->GetThreadCPUTimeNs());
+  uint64_t consumer_start_ns =
+      static_cast<uint64_t>(base::GetThreadCPUTimeNs().count());
+  uint64_t read_time_taken_ns = 0;
+  uint64_t iterations = 0;
+  uint32_t counter = 0;
+  // Reads the trace buffer back once and adds the wall time spent doing so
+  // to |read_time_taken_ns|.
+  auto timed_read = [&] {
+    int64_t start = base::GetWallTimeNs().count();
+    helper.ReadData(counter);
+    helper.WaitForReadData(counter++);
+    read_time_taken_ns +=
+        static_cast<uint64_t>(base::GetWallTimeNs().count() - start);
+  };
+
+  for (auto _ : state) {
+    auto cname = "produced.and.committed." + std::to_string(iterations++);
+    auto on_produced_and_committed = task_runner.CreateCheckpoint(cname);
+    producer->ProduceEventBatch(helper.WrapTask(on_produced_and_committed));
+
+    if (is_saturated_producer) {
+      // Saturated producer: wait until it has flushed its data, then time
+      // how long it takes to read the data back.
+      task_runner.RunUntilCheckpoint(cname);
+      timed_read();
+    } else {
+      // Rate-limited producer: it sends a batch of data every second, so
+      // wait a second before each readback, once per batch.
+      for (uint32_t i = 0; i < number_of_batches; i++) {
+        auto batch_cname = "batch.checkpoint." + std::to_string(counter);
+        auto batch_checkpoint = task_runner.CreateCheckpoint(batch_cname);
+        task_runner.PostDelayedTask(batch_checkpoint, 1000);
+        task_runner.RunUntilCheckpoint(batch_cname);
+        timed_read();
+      }
+      // Wait for the whole batch to be committed before the next iteration
+      // asks for another one, so consecutive batches cannot overlap.
+      task_runner.RunUntilCheckpoint(cname);
+    }
+  }
+  uint64_t service_ns =
+      helper.service_thread()->GetThreadCPUTimeNs() - service_start_ns;
+  uint64_t consumer_ns =
+      static_cast<uint64_t>(base::GetThreadCPUTimeNs().count()) -
+      consumer_start_ns;
+  uint64_t wall_ns =
+      static_cast<uint64_t>(base::GetWallTimeNs().count()) - wall_start_ns;
+
+  state.counters["Ser CPU"] = benchmark::Counter(100.0 * service_ns / wall_ns);
+  state.counters["Ser ns/m"] =
+      benchmark::Counter(1.0 * service_ns / message_count);
+  state.counters["Con CPU"] = benchmark::Counter(100.0 * consumer_ns / wall_ns);
+  state.counters["Con Speed"] =
+      benchmark::Counter(iterations * 1000.0 * 1000 * 1000 * kBufferSizeBytes /
+                         read_time_taken_ns);
+}
+
+void SaturateCpuProducerArgs(benchmark::internal::Benchmark* b) {
   int min_message_count = 16;
   int max_message_count = IsBenchmarkFunctionalOnly() ? 1024 : 1024 * 1024;
   int min_payload = 8;
@@ -122,7 +219,7 @@
   }
 }
 
-void ConstantRateArgs(benchmark::internal::Benchmark* b) {
+void ConstantRateProducerArgs(benchmark::internal::Benchmark* b) {
   int message_count = IsBenchmarkFunctionalOnly() ? 2 * 1024 : 128 * 1024;
   int min_speed = IsBenchmarkFunctionalOnly() ? 64 : 8;
   int max_speed = IsBenchmarkFunctionalOnly() ? 128 : 128;
@@ -131,23 +228,60 @@
     b->Args({message_count, 256, speed});
   }
 }
+
+void SaturateCpuConsumerArgs(benchmark::internal::Benchmark* b) {
+  // Payload sizes double from 8 bytes; speed 0 means a saturated producer.
+  const int max_payload = IsBenchmarkFunctionalOnly() ? 16 : 64 * 1024;
+  for (int size = 8; size <= max_payload; size *= 2) {
+    b->Args({size, 0 /* speed */});
+  }
+}
+
+void ConstantRateConsumerArgs(benchmark::internal::Benchmark* b) {
+  // Args are {message bytes, producer MB/s}; functional runs use 128 MB/s.
+  const int lo = IsBenchmarkFunctionalOnly() ? 128 : 1;
+  const int hi = IsBenchmarkFunctionalOnly() ? 128 : 2;
+  for (int mb_per_s = lo; mb_per_s <= hi; mb_per_s *= 2)
+    for (int bytes : {2, 4})
+      b->Args({bytes, mb_per_s});
+}
+
 }  // namespace
 
-static void BM_EndToEnd_SaturateCpu(benchmark::State& state) {
-  BenchmarkCommon(state);
+static void BM_EndToEnd_Producer_SaturateCpu(benchmark::State& state) {
+  BenchmarkProducer(state);  // Args from SaturateCpuProducerArgs.
 }
 
-BENCHMARK(BM_EndToEnd_SaturateCpu)
+BENCHMARK(BM_EndToEnd_Producer_SaturateCpu)
     ->Unit(benchmark::kMicrosecond)
     ->UseRealTime()
-    ->Apply(SaturateCpuArgs);
+    ->Apply(SaturateCpuProducerArgs);
 
-static void BM_EndToEnd_ConstantRate(benchmark::State& state) {
-  BenchmarkCommon(state);
+static void BM_EndToEnd_Producer_ConstantRate(benchmark::State& state) {
+  BenchmarkProducer(state);  // Args from ConstantRateProducerArgs.
 }
 
-BENCHMARK(BM_EndToEnd_ConstantRate)
+BENCHMARK(BM_EndToEnd_Producer_ConstantRate)
     ->Unit(benchmark::kMicrosecond)
     ->UseRealTime()
-    ->Apply(ConstantRateArgs);
+    ->Apply(ConstantRateProducerArgs);
+
+static void BM_EndToEnd_Consumer_SaturateCpu(benchmark::State& state) {
+  BenchmarkConsumer(state);  // Speed arg is 0: saturated producer.
+}
+
+BENCHMARK(BM_EndToEnd_Consumer_SaturateCpu)
+    ->UseRealTime()
+    ->Unit(benchmark::kMicrosecond)
+    ->Apply(SaturateCpuConsumerArgs);
+
+static void BM_EndToEnd_Consumer_ConstantRate(benchmark::State& state) {
+  BenchmarkConsumer(state);  // Producer rate-limited to range(1) MB/s.
+}
+
+BENCHMARK(BM_EndToEnd_Consumer_ConstantRate)
+    ->UseRealTime()
+    ->Unit(benchmark::kMillisecond)
+    ->Apply(ConstantRateConsumerArgs);
+
 }  // namespace perfetto