Support multiple producer socket names
This change adds support for multiple producer socket names by allowing
multiple comma-separated values in the environment variable
PERFETTO_PRODUCER_SOCK_NAME. Example:
PERFETTO_PRODUCER_SOCK_NAME=/run/perfetto/perfetto-producer.sock,\
/run/perfetto/perfetto-producer-2.sock traced
which launches a traced instance that uses 2 producer endpoints.
Bug: 284258446
Change-Id: Ibda874916cfbf8da410a0412c69e04ded549a601
diff --git a/test/test_helper.cc b/test/test_helper.cc
index 49a9961..6054f3f 100644
--- a/test/test_helper.cc
+++ b/test/test_helper.cc
@@ -15,8 +15,10 @@
*/
#include "test/test_helper.h"
+#include <string>
#include "perfetto/base/compiler.h"
+#include "perfetto/ext/base/string_utils.h"
#include "perfetto/ext/tracing/core/trace_packet.h"
#include "perfetto/ext/tracing/ipc/default_socket.h"
#include "perfetto/tracing/core/tracing_service_state.h"
@@ -63,16 +65,33 @@
#endif
TestHelper::TestHelper(base::TestTaskRunner* task_runner, Mode mode)
+ : TestHelper(task_runner, mode, ProducerSocketForMode(mode)) {}
+
+TestHelper::TestHelper(base::TestTaskRunner* task_runner,
+ Mode mode,
+ const char* producer_socket)
: instance_num_(next_instance_num_++),
task_runner_(task_runner),
mode_(mode),
- producer_socket_(ProducerSocketForMode(mode)),
+ producer_socket_(producer_socket),
consumer_socket_(ConsumerSocketForMode(mode)),
- service_thread_(producer_socket_, consumer_socket_),
- fake_producer_thread_(producer_socket_,
- WrapTask(CreateCheckpoint("producer.connect")),
- WrapTask(CreateCheckpoint("producer.setup")),
- WrapTask(CreateCheckpoint("producer.enabled"))) {}
+ service_thread_(producer_socket_, consumer_socket_) {
+ auto producer_sockets = TokenizeProducerSockets(producer_socket_);
+ static constexpr const char* kDefaultFakeProducerName =
+ "android.perfetto.FakeProducer";
+ for (size_t i = 0; i < producer_sockets.size(); i++) {
+ auto cp_connect = "producer." + std::to_string(i) + ".connect";
+ auto cp_setup = "producer." + std::to_string(i) + ".setup";
+ auto cp_enabled = "producer." + std::to_string(i) + ".enabled";
+ std::string producer_name =
+ (!!i) ? +kDefaultFakeProducerName + std::string(".") + std::to_string(i)
+ : kDefaultFakeProducerName;
+ fake_producer_threads_.emplace_back(std::make_unique<FakeProducerThread>(
+ producer_sockets[i], WrapTask(CreateCheckpoint(cp_connect)),
+ WrapTask(CreateCheckpoint(cp_setup)),
+ WrapTask(CreateCheckpoint(cp_enabled)), producer_name));
+ }
+}
void TestHelper::OnConnect() {
std::move(on_connect_callback_)();
@@ -122,12 +141,12 @@
service_thread_.Start();
}
-FakeProducer* TestHelper::ConnectFakeProducer() {
- fake_producer_thread_.Connect();
+FakeProducer* TestHelper::ConnectFakeProducer(size_t idx) {
+ fake_producer_threads_[idx]->Connect();
// This will wait until the service has seen the RegisterDataSource() call
// (because of the Sync() in FakeProducer::OnConnect()).
- RunUntilCheckpoint("producer.connect");
- return fake_producer_thread_.producer();
+ RunUntilCheckpoint("producer." + std::to_string(idx) + ".connect");
+ return fake_producer_threads_[idx]->producer();
}
void TestHelper::ConnectConsumer() {
@@ -157,19 +176,24 @@
}
void TestHelper::CreateProducerProvidedSmb() {
- fake_producer_thread_.CreateProducerProvidedSmb();
+ for (auto& thread : fake_producer_threads_)
+ thread->CreateProducerProvidedSmb();
}
-bool TestHelper::IsShmemProvidedByProducer() {
- return fake_producer_thread_.producer()->IsShmemProvidedByProducer();
+bool TestHelper::IsShmemProvidedByProducer(size_t i) {
+ return fake_producer_threads_[i]->producer()->IsShmemProvidedByProducer();
}
void TestHelper::ProduceStartupEventBatch(
const protos::gen::TestConfig& config) {
- auto on_data_written = CreateCheckpoint("startup_data_written");
- fake_producer_thread_.ProduceStartupEventBatch(config,
- WrapTask(on_data_written));
- RunUntilCheckpoint("startup_data_written");
+ for (size_t i = 0; i < fake_producer_threads_.size(); i++) {
+ auto checkpoint_name =
+ base::StackString<32>("startup_data_written.%zu", i).ToStdString();
+ auto on_data_written = CreateCheckpoint(checkpoint_name);
+ fake_producer_threads_[i]->ProduceStartupEventBatch(
+ config, WrapTask(on_data_written));
+ RunUntilCheckpoint(checkpoint_name);
+ }
}
void TestHelper::StartTracing(const TraceConfig& config,
@@ -207,12 +231,12 @@
RunUntilCheckpoint("consumer.connected." + std::to_string(cur_consumer_num_));
}
-void TestHelper::WaitForProducerSetup() {
- RunUntilCheckpoint("producer.setup");
+void TestHelper::WaitForProducerSetup(size_t idx) {
+ RunUntilCheckpoint("producer." + std::to_string(idx) + ".setup");
}
-void TestHelper::WaitForProducerEnabled() {
- RunUntilCheckpoint("producer.enabled");
+void TestHelper::WaitForProducerEnabled(size_t idx) {
+ RunUntilCheckpoint("producer." + std::to_string(idx) + ".enabled");
}
void TestHelper::WaitForTracingDisabled(uint32_t timeout_ms) {
@@ -248,11 +272,12 @@
WaitFor(predicate, "connection of data source " + ds_name);
}
-void TestHelper::SyncAndWaitProducer() {
+void TestHelper::SyncAndWaitProducer(size_t idx) {
static int sync_id = 0;
- std::string checkpoint_name = "producer_sync_" + std::to_string(++sync_id);
+ std::string checkpoint_name =
+ "producer_sync_" + std::to_string(idx) + "_" + std::to_string(++sync_id);
auto checkpoint = CreateCheckpoint(checkpoint_name);
- fake_producer_thread_.producer()->Sync(
+ fake_producer_threads_[idx]->producer()->Sync(
[this, &checkpoint] { task_runner_->PostTask(checkpoint); });
RunUntilCheckpoint(checkpoint_name);
}