tracing_muxer_impl: Split producer and consumer backends
Instead of having the `consumer_enabled` boolean flag for each backend,
we can use two different interfaces: one for producer backends, one for
consumer backends.
This will allow future CLs to:
* Remove the SystemProducerOnlyBackend, which aborts in some cases.
* Initialize separately the consumer and the producer backends only when
needed.
Change-Id: I6f46be8b442dd98df4a0626739dff20c17c59260
diff --git a/include/perfetto/tracing/internal/basic_types.h b/include/perfetto/tracing/internal/basic_types.h
index 81f5353..48d46a7 100644
--- a/include/perfetto/tracing/internal/basic_types.h
+++ b/include/perfetto/tracing/internal/basic_types.h
@@ -27,7 +27,7 @@
// with the definition in tracing/core/basic_types.h
using BufferId = uint16_t;
-// This is a direct index in the TracingMuxer::backends_ vector.
+// This is a direct index in the TracingMuxer::producer_backends_ vector.
// Backends are only added and never removed.
using TracingBackendId = size_t;
diff --git a/include/perfetto/tracing/tracing_backend.h b/include/perfetto/tracing/tracing_backend.h
index 99b1e52..965d726 100644
--- a/include/perfetto/tracing/tracing_backend.h
+++ b/include/perfetto/tracing/tracing_backend.h
@@ -43,9 +43,10 @@
class Producer;
class ProducerEndpoint;
-class PERFETTO_EXPORT_COMPONENT TracingBackend {
+// Responsible for connecting to the producer.
+class PERFETTO_EXPORT_COMPONENT TracingProducerBackend {
public:
- virtual ~TracingBackend();
+ virtual ~TracingProducerBackend();
// Connects a Producer instance and obtains a ProducerEndpoint, which is
// essentially a 1:1 channel between one Producer and the Service.
@@ -77,6 +78,12 @@
virtual std::unique_ptr<ProducerEndpoint> ConnectProducer(
const ConnectProducerArgs&) = 0;
+};
+
+// Responsible for connecting to the consumer.
+class PERFETTO_EXPORT_COMPONENT TracingConsumerBackend {
+ public:
+ virtual ~TracingConsumerBackend();
// As above, for the Consumer-side.
struct ConnectConsumerArgs {
@@ -91,6 +98,12 @@
const ConnectConsumerArgs&) = 0;
};
+class PERFETTO_EXPORT_COMPONENT TracingBackend : public TracingProducerBackend,
+ public TracingConsumerBackend {
+ public:
+ ~TracingBackend() override;
+};
+
} // namespace perfetto
#endif // INCLUDE_PERFETTO_TRACING_TRACING_BACKEND_H_
diff --git a/src/tracing/internal/tracing_muxer_impl.cc b/src/tracing/internal/tracing_muxer_impl.cc
index 38cd239..83e8dae 100644
--- a/src/tracing/internal/tracing_muxer_impl.cc
+++ b/src/tracing/internal/tracing_muxer_impl.cc
@@ -353,12 +353,8 @@
// ----- Begin of TracingMuxerImpl::ConsumerImpl
TracingMuxerImpl::ConsumerImpl::ConsumerImpl(TracingMuxerImpl* muxer,
BackendType backend_type,
- TracingBackendId backend_id,
TracingSessionGlobalID session_id)
- : muxer_(muxer),
- backend_type_(backend_type),
- backend_id_(backend_id),
- session_id_(session_id) {}
+ : muxer_(muxer), backend_type_(backend_type), session_id_(session_id) {}
TracingMuxerImpl::ConsumerImpl::~ConsumerImpl() {
muxer_ = nullptr;
@@ -862,20 +858,20 @@
supports_multiple_data_source_instances_ =
args.supports_multiple_data_source_instances;
- auto add_backend = [this, &args](TracingBackend* backend, BackendType type) {
+ auto add_producer_backend = [this, &args](TracingProducerBackend* backend,
+ BackendType type) {
if (!backend) {
// We skip the log in release builds because the *_backend_fake.cc code
// has already an ELOG before returning a nullptr.
PERFETTO_DLOG("Backend creation failed, type %d", static_cast<int>(type));
return;
}
- TracingBackendId backend_id = backends_.size();
- backends_.emplace_back();
- RegisteredBackend& rb = backends_.back();
+ TracingBackendId backend_id = producer_backends_.size();
+ producer_backends_.emplace_back();
+ RegisteredProducerBackend& rb = producer_backends_.back();
rb.backend = backend;
rb.id = backend_id;
rb.type = type;
- rb.consumer_enabled = type != kSystemBackend || args.enable_system_consumer;
rb.producer.reset(new ProducerImpl(this, backend_id,
args.shmem_batch_commits_duration_ms));
rb.producer_conn_args.producer = rb.producer.get();
@@ -888,19 +884,36 @@
rb.producer->Initialize(rb.backend->ConnectProducer(rb.producer_conn_args));
};
+ auto add_consumer_backend = [this](TracingConsumerBackend* backend,
+ BackendType type) {
+ if (!backend) {
+ return;
+ }
+ consumer_backends_.emplace_back();
+ RegisteredConsumerBackend& rb = consumer_backends_.back();
+ rb.backend = backend;
+ rb.type = type;
+ };
+
if (args.backends & kSystemBackend) {
PERFETTO_CHECK(args.system_backend_factory_);
- add_backend(args.system_backend_factory_(), kSystemBackend);
+ auto* b = args.system_backend_factory_();
+ add_producer_backend(b, kSystemBackend);
+ if (args.enable_system_consumer)
+ add_consumer_backend(b, kSystemBackend);
}
if (args.backends & kInProcessBackend) {
PERFETTO_CHECK(args.in_process_backend_factory_);
- add_backend(args.in_process_backend_factory_(), kInProcessBackend);
+ auto* b = args.in_process_backend_factory_();
+ add_producer_backend(b, kInProcessBackend);
+ add_consumer_backend(b, kInProcessBackend);
}
if (args.backends & kCustomBackend) {
PERFETTO_CHECK(args.custom_backend);
- add_backend(args.custom_backend, kCustomBackend);
+ add_producer_backend(args.custom_backend, kCustomBackend);
+ add_consumer_backend(args.custom_backend, kCustomBackend);
}
if (args.backends & ~(kSystemBackend | kInProcessBackend | kCustomBackend)) {
@@ -910,8 +923,10 @@
// Fallback backend for consumer creation for an unsupported backend type.
// This backend simply fails any attempt to start a tracing session.
// NOTE: This backend instance has to be added last.
- add_backend(internal::TracingBackendFake::GetInstance(),
- BackendType::kUnspecifiedBackend);
+ add_producer_backend(internal::TracingBackendFake::GetInstance(),
+ BackendType::kUnspecifiedBackend);
+ add_consumer_backend(internal::TracingBackendFake::GetInstance(),
+ BackendType::kUnspecifiedBackend);
}
// Can be called from any thread (but not concurrently).
@@ -1019,7 +1034,7 @@
base::TimeMillis expire_time =
base::GetWallTimeMs() + base::TimeMillis(ttl_ms);
task_runner_->PostTask([this, triggers, expire_time] {
- for (RegisteredBackend& backend : backends_) {
+ for (RegisteredProducerBackend& backend : producer_backends_) {
if (backend.producer->connected_) {
backend.producer->service_->ActivateTriggers(triggers);
} else {
@@ -1171,7 +1186,7 @@
internal_state->muxer_id_for_testing = muxer_id_for_testing_;
if (startup_session_id) {
- RegisteredBackend& backend = backends_[backend_id];
+ RegisteredProducerBackend& backend = producer_backends_[backend_id];
uint16_t& last_reservation =
backend.producer->last_startup_target_buffer_reservation_;
if (last_reservation == std::numeric_limits<uint16_t>::max()) {
@@ -1257,7 +1272,7 @@
ds.internal_state->startup_target_buffer_reservation.load(
std::memory_order_relaxed);
if (startup_reservation) {
- RegisteredBackend& backend = backends_[backend_id];
+ RegisteredProducerBackend& backend = producer_backends_[backend_id];
TracingSessionGlobalID session_id = ds.internal_state->startup_session_id;
auto session_it = std::find_if(
backend.startup_sessions.begin(), backend.startup_sessions.end(),
@@ -1424,10 +1439,10 @@
TracingMuxer::generation_++;
- // |backends_| is append-only, Backend instances are always valid.
- PERFETTO_CHECK(backend_id < backends_.size());
- RegisteredBackend& backend = backends_[backend_id];
- ProducerImpl* producer = backends_[backend_id].producer.get();
+ // |producer_backends_| is append-only, Backend instances are always valid.
+ PERFETTO_CHECK(backend_id < producer_backends_.size());
+ RegisteredProducerBackend& backend = producer_backends_[backend_id];
+ ProducerImpl* producer = producer_backends_[backend_id].producer.get();
if (!producer)
return;
@@ -1517,9 +1532,9 @@
task_runner_->PostTask([this, &mutex, &cv, &countdown] {
{
std::unique_lock<std::mutex> countdown_lock(mutex);
- countdown = backends_.size();
+ countdown = producer_backends_.size();
}
- for (auto& backend : backends_) {
+ for (auto& backend : producer_backends_) {
auto* producer = backend.producer.get();
producer->service_->Sync([&mutex, &cv, &countdown] {
std::unique_lock<std::mutex> countdown_lock(mutex);
@@ -1539,7 +1554,7 @@
bool done = false;
bool all_producers_connected = true;
task_runner_->PostTask([this, &mutex, &cv, &done, &all_producers_connected] {
- for (auto& backend : backends_)
+ for (auto& backend : producer_backends_)
all_producers_connected &= backend.producer->connected_;
std::unique_lock<std::mutex> lock(mutex);
done = true;
@@ -1610,7 +1625,7 @@
void TracingMuxerImpl::UpdateDataSourceOnAllBackends(RegisteredDataSource& rds,
bool is_changed) {
PERFETTO_DCHECK_THREAD(thread_checker_);
- for (RegisteredBackend& backend : backends_) {
+ for (RegisteredProducerBackend& backend : producer_backends_) {
// We cannot call RegisterDataSource on the backend before it connects.
if (!backend.producer->connected_)
continue;
@@ -1754,7 +1769,7 @@
void TracingMuxerImpl::DestroyTracingSession(
TracingSessionGlobalID session_id) {
PERFETTO_DCHECK_THREAD(thread_checker_);
- for (RegisteredBackend& backend : backends_) {
+ for (RegisteredConsumerBackend& backend : consumer_backends_) {
// We need to find the consumer (if any) and call Disconnect as we destroy
// the tracing session. We can't call Disconnect() inside this for loop
// because in the in-process case this will end up to a synchronous call to
@@ -1847,7 +1862,7 @@
void TracingMuxerImpl::SetBatchCommitsDurationForTesting(
uint32_t batch_commits_duration_ms,
BackendType backend_type) {
- for (RegisteredBackend& backend : backends_) {
+ for (RegisteredProducerBackend& backend : producer_backends_) {
if (backend.producer && backend.producer->connected_ &&
backend.type == backend_type) {
backend.producer->service_->MaybeSharedMemoryArbiter()
@@ -1858,7 +1873,7 @@
bool TracingMuxerImpl::EnableDirectSMBPatchingForTesting(
BackendType backend_type) {
- for (RegisteredBackend& backend : backends_) {
+ for (RegisteredProducerBackend& backend : producer_backends_) {
if (backend.producer && backend.producer->connected_ &&
backend.type == backend_type &&
!backend.producer->service_->MaybeSharedMemoryArbiter()
@@ -1872,27 +1887,31 @@
TracingMuxerImpl::ConsumerImpl* TracingMuxerImpl::FindConsumer(
TracingSessionGlobalID session_id) {
PERFETTO_DCHECK_THREAD(thread_checker_);
- for (RegisteredBackend& backend : backends_) {
+ return FindConsumerAndBackend(session_id).first;
+}
+
+std::pair<TracingMuxerImpl::ConsumerImpl*,
+ TracingMuxerImpl::RegisteredConsumerBackend*>
+TracingMuxerImpl::FindConsumerAndBackend(TracingSessionGlobalID session_id) {
+ PERFETTO_DCHECK_THREAD(thread_checker_);
+ for (RegisteredConsumerBackend& backend : consumer_backends_) {
for (auto& consumer : backend.consumers) {
if (consumer->session_id_ == session_id) {
- return consumer.get();
+ return {consumer.get(), &backend};
}
}
}
- return nullptr;
+ return {nullptr, nullptr};
}
void TracingMuxerImpl::InitializeConsumer(TracingSessionGlobalID session_id) {
PERFETTO_DCHECK_THREAD(thread_checker_);
- auto* consumer = FindConsumer(session_id);
- if (!consumer)
+ auto res = FindConsumerAndBackend(session_id);
+ if (!res.first || !res.second)
return;
-
- TracingBackendId backend_id = consumer->backend_id_;
- // |backends_| is append-only, Backend instances are always valid.
- PERFETTO_CHECK(backend_id < backends_.size());
- RegisteredBackend& backend = backends_[backend_id];
+ TracingMuxerImpl::ConsumerImpl* consumer = res.first;
+ RegisteredConsumerBackend& backend = *res.second;
TracingBackend::ConnectConsumerArgs conn_args;
conn_args.consumer = consumer;
@@ -1902,7 +1921,7 @@
void TracingMuxerImpl::OnConsumerDisconnected(ConsumerImpl* consumer) {
PERFETTO_DCHECK_THREAD(thread_checker_);
- for (RegisteredBackend& backend : backends_) {
+ for (RegisteredConsumerBackend& backend : consumer_backends_) {
auto pred = [consumer](const std::unique_ptr<ConsumerImpl>& con) {
return con.get() == consumer;
};
@@ -1918,7 +1937,7 @@
void TracingMuxerImpl::OnProducerDisconnected(ProducerImpl* producer) {
PERFETTO_DCHECK_THREAD(thread_checker_);
- for (RegisteredBackend& backend : backends_) {
+ for (RegisteredProducerBackend& backend : producer_backends_) {
if (backend.producer.get() != producer)
continue;
// Try reconnecting the disconnected producer. If the connection succeeds,
@@ -1961,7 +1980,7 @@
TracingBackendId backend_id,
DataSourceInstanceID instance_id) {
PERFETTO_DCHECK_THREAD(thread_checker_);
- auto& backend = backends_[backend_id];
+ auto& backend = producer_backends_[backend_id];
for (const auto& rds : data_sources_) {
DataSourceStaticState* static_state = rds.static_state;
for (uint32_t i = 0; i < kMaxDataSourceInstances; i++) {
@@ -1997,7 +2016,8 @@
interceptor.tls_factory(static_state, data_source_instance_index),
interceptor.packet_callback, static_state, data_source_instance_index));
}
- ProducerImpl* producer = backends_[data_source->backend_id].producer.get();
+ ProducerImpl* producer =
+ producer_backends_[data_source->backend_id].producer.get();
// Atomically load the current service endpoint. We keep the pointer as a
// shared pointer on the stack to guard against it from being concurrently
// modified on the thread by ProducerImpl::Initialize() swapping in a
@@ -2048,26 +2068,20 @@
// Capturing |this| is fine because the TracingMuxer is a leaky singleton.
task_runner_->PostTask([this, requested_backend_type, session_id] {
- for (RegisteredBackend& backend : backends_) {
+ for (RegisteredConsumerBackend& backend : consumer_backends_) {
if (requested_backend_type && backend.type &&
backend.type != requested_backend_type) {
continue;
}
- if (!backend.consumer_enabled) {
- continue;
- }
-
- TracingBackendId backend_id = backend.id;
-
// Create the consumer now, even if we have to ask the embedder below, so
// that any other tasks executing after this one can find the consumer and
// change its pending attributes.
backend.consumers.emplace_back(
- new ConsumerImpl(this, backend.type, backend.id, session_id));
+ new ConsumerImpl(this, backend.type, session_id));
- // The last registered backend in |backends_| is the unsupported backend
- // without a valid type.
+ // The last registered backend in |consumer_backends_| is the unsupported
+ // backend without a valid type.
if (!backend.type) {
PERFETTO_ELOG(
"No tracing backend ready for type=%d, consumer will disconnect",
@@ -2083,10 +2097,11 @@
return;
}
+ BackendType type = backend.type;
TracingPolicy::ShouldAllowConsumerSessionArgs args;
args.backend_type = backend.type;
- args.result_callback = [this, backend_id, session_id](bool allow) {
- task_runner_->PostTask([this, backend_id, session_id, allow] {
+ args.result_callback = [this, type, session_id](bool allow) {
+ task_runner_->PostTask([this, type, session_id, allow] {
if (allow) {
InitializeConsumer(session_id);
return;
@@ -2095,7 +2110,7 @@
PERFETTO_ELOG(
"Consumer session for backend type type=%d forbidden, "
"consumer will disconnect",
- backends_[backend_id].type);
+ type);
auto* consumer = FindConsumer(session_id);
if (!consumer)
@@ -2131,15 +2146,15 @@
// Capturing |this| is fine because the TracingMuxer is a leaky singleton.
task_runner_->PostTask([this, config, opts, backend_type, session_id] {
- for (RegisteredBackend& backend : backends_) {
+ for (RegisteredProducerBackend& backend : producer_backends_) {
if (backend_type && backend.type && backend.type != backend_type) {
continue;
}
TracingBackendId backend_id = backend.id;
- // The last registered backend in |backends_| is the unsupported backend
- // without a valid type.
+ // The last registered backend in |producer_backends_| is the unsupported
+ // backend without a valid type.
if (!backend.type) {
PERFETTO_ELOG(
"No tracing backend initialized for type=%d, startup tracing "
@@ -2259,7 +2274,7 @@
BackendType backend_type) {
PERFETTO_DCHECK_THREAD(thread_checker_);
- for (RegisteredBackend& backend : backends_) {
+ for (RegisteredProducerBackend& backend : producer_backends_) {
if (backend_type != backend.type)
continue;
@@ -2370,21 +2385,24 @@
// Unregister all data sources so they don't interfere with any future
// tracing sessions.
for (RegisteredDataSource& rds : muxer->data_sources_) {
- for (RegisteredBackend& backend : muxer->backends_) {
+ for (RegisteredProducerBackend& backend : muxer->producer_backends_) {
if (!backend.producer->service_)
continue;
backend.producer->service_->UnregisterDataSource(rds.descriptor.name());
}
}
- for (auto& backend : muxer->backends_) {
+ for (auto& backend : muxer->consumer_backends_) {
// Check that no consumer session is currently active on any backend.
for (auto& consumer : backend.consumers)
PERFETTO_CHECK(!consumer->service_);
+ }
+ for (auto& backend : muxer->producer_backends_) {
backend.producer->muxer_ = nullptr;
backend.producer->DisposeConnection();
muxer->dead_backends_.push_back(std::move(backend));
}
- muxer->backends_.clear();
+ muxer->consumer_backends_.clear();
+ muxer->producer_backends_.clear();
muxer->interceptors_.clear();
for (auto& ds : muxer->data_sources_) {
@@ -2433,7 +2451,7 @@
// Check that no consumer session is currently active on any backend.
// Producers will be automatically disconnected as a part of deleting the
// muxer below.
- for (auto& backend : muxer->backends_) {
+ for (auto& backend : muxer->consumer_backends_) {
for (auto& consumer : backend.consumers) {
PERFETTO_CHECK(!consumer->service_);
}
diff --git a/src/tracing/internal/tracing_muxer_impl.h b/src/tracing/internal/tracing_muxer_impl.h
index 4d83ab9..5bec783 100644
--- a/src/tracing/internal/tracing_muxer_impl.h
+++ b/src/tracing/internal/tracing_muxer_impl.h
@@ -26,6 +26,7 @@
#include <list>
#include <map>
#include <memory>
+#include <utility>
#include <vector>
#include "perfetto/base/time.h"
@@ -272,10 +273,7 @@
// tracing sessions.
class ConsumerImpl : public Consumer {
public:
- ConsumerImpl(TracingMuxerImpl*,
- BackendType,
- TracingBackendId,
- TracingSessionGlobalID);
+ ConsumerImpl(TracingMuxerImpl*, BackendType, TracingSessionGlobalID);
~ConsumerImpl() override;
void Initialize(std::unique_ptr<ConsumerEndpoint> endpoint);
@@ -300,7 +298,6 @@
TracingMuxerImpl* muxer_;
BackendType const backend_type_;
- TracingBackendId const backend_id_;
TracingSessionGlobalID const session_id_;
bool connected_ = false;
@@ -425,22 +422,25 @@
std::function<void()> on_adopted;
};
- struct RegisteredBackend {
+ struct RegisteredProducerBackend {
// Backends are supposed to have static lifetime.
- TracingBackend* backend = nullptr;
+ TracingProducerBackend* backend = nullptr;
TracingBackendId id = 0;
BackendType type{};
TracingBackend::ConnectProducerArgs producer_conn_args;
std::unique_ptr<ProducerImpl> producer;
+ std::vector<RegisteredStartupSession> startup_sessions;
+ };
+
+ struct RegisteredConsumerBackend {
+ // Backends are supposed to have static lifetime.
+ TracingConsumerBackend* backend = nullptr;
+ BackendType type{};
// The calling code can request more than one concurrently active tracing
// session for the same backend. We need to create one consumer per session.
std::vector<std::unique_ptr<ConsumerImpl>> consumers;
-
- std::vector<RegisteredStartupSession> startup_sessions;
-
- bool consumer_enabled = true;
};
void UpdateDataSourceOnAllBackends(RegisteredDataSource& rds,
@@ -448,6 +448,8 @@
explicit TracingMuxerImpl(const TracingInitArgs&);
void Initialize(const TracingInitArgs& args);
ConsumerImpl* FindConsumer(TracingSessionGlobalID session_id);
+ std::pair<ConsumerImpl*, RegisteredConsumerBackend*> FindConsumerAndBackend(
+ TracingSessionGlobalID session_id);
void InitializeConsumer(TracingSessionGlobalID session_id);
void OnConsumerDisconnected(ConsumerImpl* consumer);
void OnProducerDisconnected(ProducerImpl* producer);
@@ -493,7 +495,8 @@
// WARNING: If you add new state here, be sure to update ResetForTesting.
std::unique_ptr<base::TaskRunner> task_runner_;
std::vector<RegisteredDataSource> data_sources_;
- std::vector<RegisteredBackend> backends_;
+ std::vector<RegisteredProducerBackend> producer_backends_;
+ std::vector<RegisteredConsumerBackend> consumer_backends_;
std::vector<RegisteredInterceptor> interceptors_;
TracingPolicy* policy_ = nullptr;
@@ -512,7 +515,7 @@
// After ResetForTesting() is called, holds tracing backends which needs to be
// kept alive until all inbound references have gone away. See
// SweepDeadBackends().
- std::list<RegisteredBackend> dead_backends_;
+ std::list<RegisteredProducerBackend> dead_backends_;
PERFETTO_THREAD_CHECKER(thread_checker_)
};
diff --git a/src/tracing/test/api_test_support.cc b/src/tracing/test/api_test_support.cc
index 064f259..9e7b4ee 100644
--- a/src/tracing/test/api_test_support.cc
+++ b/src/tracing/test/api_test_support.cc
@@ -168,15 +168,16 @@
// static
bool TracingMuxerImplInternalsForTest::DoesSystemBackendHaveSMB() {
- using RegisteredBackend = TracingMuxerImpl::RegisteredBackend;
+ using RegisteredProducerBackend = TracingMuxerImpl::RegisteredProducerBackend;
// Ideally we should be doing dynamic_cast and a DCHECK(muxer != nullptr);
auto* muxer =
reinterpret_cast<TracingMuxerImpl*>(TracingMuxerImpl::instance_);
- const auto& backends = muxer->backends_;
- const auto& backend = std::find_if(backends.begin(), backends.end(),
- [](const RegisteredBackend& r_backend) {
- return r_backend.type == kSystemBackend;
- });
+ const auto& backends = muxer->producer_backends_;
+ const auto& backend =
+ std::find_if(backends.begin(), backends.end(),
+ [](const RegisteredProducerBackend& r_backend) {
+ return r_backend.type == kSystemBackend;
+ });
if (backend == backends.end())
return false;
const auto& service = backend->producer->service_;
diff --git a/src/tracing/virtual_destructors.cc b/src/tracing/virtual_destructors.cc
index 9a555fe..19620dd 100644
--- a/src/tracing/virtual_destructors.cc
+++ b/src/tracing/virtual_destructors.cc
@@ -45,6 +45,8 @@
} // namespace internal
+TracingProducerBackend::~TracingProducerBackend() = default;
+TracingConsumerBackend::~TracingConsumerBackend() = default;
TracingBackend::~TracingBackend() = default;
} // namespace perfetto