Support multiple producer socket names

This change adds support for multiple producer socket names by allowing
the environment variable PERFETTO_PRODUCER_SOCK_NAME to hold multiple
values separated by commas. Example:

PERFETTO_PRODUCER_SOCK_NAME=/run/perfetto/perfetto-producer.sock,\
/run/perfetto/perfetto-producer-2.sock traced

The command above launches traced with 2 producer endpoints.

Bug: 284258446
Change-Id: Ibda874916cfbf8da410a0412c69e04ded549a601
diff --git a/test/test_helper.cc b/test/test_helper.cc
index 49a9961..6054f3f 100644
--- a/test/test_helper.cc
+++ b/test/test_helper.cc
@@ -15,8 +15,10 @@
  */
 
 #include "test/test_helper.h"
+#include <string>
 
 #include "perfetto/base/compiler.h"
+#include "perfetto/ext/base/string_utils.h"
 #include "perfetto/ext/tracing/core/trace_packet.h"
 #include "perfetto/ext/tracing/ipc/default_socket.h"
 #include "perfetto/tracing/core/tracing_service_state.h"
@@ -63,16 +65,33 @@
 #endif
 
 TestHelper::TestHelper(base::TestTaskRunner* task_runner, Mode mode)
+    : TestHelper(task_runner, mode, ProducerSocketForMode(mode)) {}
+
+TestHelper::TestHelper(base::TestTaskRunner* task_runner,
+                       Mode mode,
+                       const char* producer_socket)
     : instance_num_(next_instance_num_++),
       task_runner_(task_runner),
       mode_(mode),
-      producer_socket_(ProducerSocketForMode(mode)),
+      producer_socket_(producer_socket),
       consumer_socket_(ConsumerSocketForMode(mode)),
-      service_thread_(producer_socket_, consumer_socket_),
-      fake_producer_thread_(producer_socket_,
-                            WrapTask(CreateCheckpoint("producer.connect")),
-                            WrapTask(CreateCheckpoint("producer.setup")),
-                            WrapTask(CreateCheckpoint("producer.enabled"))) {}
+      service_thread_(producer_socket_, consumer_socket_) {
+  auto producer_sockets = TokenizeProducerSockets(producer_socket_);
+  static constexpr const char* kDefaultFakeProducerName =
+      "android.perfetto.FakeProducer";
+  for (size_t i = 0; i < producer_sockets.size(); i++) {
+    auto cp_connect = "producer." + std::to_string(i) + ".connect";
+    auto cp_setup = "producer." + std::to_string(i) + ".setup";
+    auto cp_enabled = "producer." + std::to_string(i) + ".enabled";
+    std::string producer_name =
+        (!!i) ? +kDefaultFakeProducerName + std::string(".") + std::to_string(i)
+              : kDefaultFakeProducerName;
+    fake_producer_threads_.emplace_back(std::make_unique<FakeProducerThread>(
+        producer_sockets[i], WrapTask(CreateCheckpoint(cp_connect)),
+        WrapTask(CreateCheckpoint(cp_setup)),
+        WrapTask(CreateCheckpoint(cp_enabled)), producer_name));
+  }
+}
 
 void TestHelper::OnConnect() {
   std::move(on_connect_callback_)();
@@ -122,12 +141,12 @@
   service_thread_.Start();
 }
 
-FakeProducer* TestHelper::ConnectFakeProducer() {
-  fake_producer_thread_.Connect();
+FakeProducer* TestHelper::ConnectFakeProducer(size_t idx) {
+  fake_producer_threads_[idx]->Connect();
   // This will wait until the service has seen the RegisterDataSource() call
   // (because of the Sync() in FakeProducer::OnConnect()).
-  RunUntilCheckpoint("producer.connect");
-  return fake_producer_thread_.producer();
+  RunUntilCheckpoint("producer." + std::to_string(idx) + ".connect");
+  return fake_producer_threads_[idx]->producer();
 }
 
 void TestHelper::ConnectConsumer() {
@@ -157,19 +176,24 @@
 }
 
 void TestHelper::CreateProducerProvidedSmb() {
-  fake_producer_thread_.CreateProducerProvidedSmb();
+  for (auto& thread : fake_producer_threads_)
+    thread->CreateProducerProvidedSmb();
 }
 
-bool TestHelper::IsShmemProvidedByProducer() {
-  return fake_producer_thread_.producer()->IsShmemProvidedByProducer();
+bool TestHelper::IsShmemProvidedByProducer(size_t i) {
+  return fake_producer_threads_[i]->producer()->IsShmemProvidedByProducer();
 }
 
 void TestHelper::ProduceStartupEventBatch(
     const protos::gen::TestConfig& config) {
-  auto on_data_written = CreateCheckpoint("startup_data_written");
-  fake_producer_thread_.ProduceStartupEventBatch(config,
-                                                 WrapTask(on_data_written));
-  RunUntilCheckpoint("startup_data_written");
+  for (size_t i = 0; i < fake_producer_threads_.size(); i++) {
+    auto checkpoint_name =
+        base::StackString<32>("startup_data_written.%zu", i).ToStdString();
+    auto on_data_written = CreateCheckpoint(checkpoint_name);
+    fake_producer_threads_[i]->ProduceStartupEventBatch(
+        config, WrapTask(on_data_written));
+    RunUntilCheckpoint(checkpoint_name);
+  }
 }
 
 void TestHelper::StartTracing(const TraceConfig& config,
@@ -207,12 +231,12 @@
   RunUntilCheckpoint("consumer.connected." + std::to_string(cur_consumer_num_));
 }
 
-void TestHelper::WaitForProducerSetup() {
-  RunUntilCheckpoint("producer.setup");
+void TestHelper::WaitForProducerSetup(size_t idx) {
+  RunUntilCheckpoint("producer." + std::to_string(idx) + ".setup");
 }
 
-void TestHelper::WaitForProducerEnabled() {
-  RunUntilCheckpoint("producer.enabled");
+void TestHelper::WaitForProducerEnabled(size_t idx) {
+  RunUntilCheckpoint("producer." + std::to_string(idx) + ".enabled");
 }
 
 void TestHelper::WaitForTracingDisabled(uint32_t timeout_ms) {
@@ -248,11 +272,12 @@
   WaitFor(predicate, "connection of data source " + ds_name);
 }
 
-void TestHelper::SyncAndWaitProducer() {
+void TestHelper::SyncAndWaitProducer(size_t idx) {
   static int sync_id = 0;
-  std::string checkpoint_name = "producer_sync_" + std::to_string(++sync_id);
+  std::string checkpoint_name =
+      "producer_sync_" + std::to_string(idx) + "_" + std::to_string(++sync_id);
   auto checkpoint = CreateCheckpoint(checkpoint_name);
-  fake_producer_thread_.producer()->Sync(
+  fake_producer_threads_[idx]->producer()->Sync(
       [this, &checkpoint] { task_runner_->PostTask(checkpoint); });
   RunUntilCheckpoint(checkpoint_name);
 }