blob: 04832f1945ad88c392439ba0e3bf3e5a2b31c480 [file] [log] [blame]
/*
* Copyright (C) 2019 The Android Open Source Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "src/tracing/internal/tracing_muxer_impl.h"
#include <algorithm>
#include <atomic>
#include <mutex>
#include <optional>
#include <vector>
#include "perfetto/base/build_config.h"
#include "perfetto/base/logging.h"
#include "perfetto/base/task_runner.h"
#include "perfetto/base/time.h"
#include "perfetto/ext/base/hash.h"
#include "perfetto/ext/base/thread_checker.h"
#include "perfetto/ext/base/waitable_event.h"
#include "perfetto/ext/tracing/core/shared_memory_arbiter.h"
#include "perfetto/ext/tracing/core/trace_packet.h"
#include "perfetto/ext/tracing/core/trace_stats.h"
#include "perfetto/ext/tracing/core/trace_writer.h"
#include "perfetto/ext/tracing/core/tracing_service.h"
#include "perfetto/tracing/buffer_exhausted_policy.h"
#include "perfetto/tracing/core/data_source_config.h"
#include "perfetto/tracing/core/tracing_service_state.h"
#include "perfetto/tracing/data_source.h"
#include "perfetto/tracing/internal/data_source_internal.h"
#include "perfetto/tracing/internal/interceptor_trace_writer.h"
#include "perfetto/tracing/internal/tracing_backend_fake.h"
#include "perfetto/tracing/trace_writer_base.h"
#include "perfetto/tracing/tracing.h"
#include "perfetto/tracing/tracing_backend.h"
#include "src/tracing/core/null_trace_writer.h"
#include "src/tracing/internal/tracing_muxer_fake.h"
#include "protos/perfetto/config/interceptor_config.gen.h"
#if PERFETTO_BUILDFLAG(PERFETTO_OS_WIN)
#include <io.h> // For dup()
#else
#include <unistd.h> // For dup()
#endif
namespace perfetto {
namespace internal {
namespace {
// File-local shorthand for TracingMuxerImpl::RegisteredDataSource.
using RegisteredDataSource = TracingMuxerImpl::RegisteredDataSource;
// A task runner which prevents calls to DataSource::Trace() while an operation
// is in progress. Used to guard against unexpected re-entrancy where the
// user-provided task runner implementation tries to enter a trace point under
// the hood.
class NonReentrantTaskRunner : public base::TaskRunner {
public:
NonReentrantTaskRunner(TracingMuxer* muxer,
std::unique_ptr<base::TaskRunner> task_runner)
: muxer_(muxer), task_runner_(std::move(task_runner)) {}
// base::TaskRunner implementation.
void PostTask(std::function<void()> task) override {
CallWithGuard([&] { task_runner_->PostTask(std::move(task)); });
}
void PostDelayedTask(std::function<void()> task, uint32_t delay_ms) override {
CallWithGuard(
[&] { task_runner_->PostDelayedTask(std::move(task), delay_ms); });
}
void AddFileDescriptorWatch(base::PlatformHandle fd,
std::function<void()> callback) override {
CallWithGuard(
[&] { task_runner_->AddFileDescriptorWatch(fd, std::move(callback)); });
}
void RemoveFileDescriptorWatch(base::PlatformHandle fd) override {
CallWithGuard([&] { task_runner_->RemoveFileDescriptorWatch(fd); });
}
bool RunsTasksOnCurrentThread() const override {
bool result;
CallWithGuard([&] { result = task_runner_->RunsTasksOnCurrentThread(); });
return result;
}
private:
template <typename T>
void CallWithGuard(T lambda) const {
auto* root_tls = muxer_->GetOrCreateTracingTLS();
if (PERFETTO_UNLIKELY(root_tls->is_in_trace_point)) {
lambda();
return;
}
ScopedReentrancyAnnotator scoped_annotator(*root_tls);
lambda();
}
TracingMuxer* const muxer_;
std::unique_ptr<base::TaskRunner> task_runner_;
};
// Concrete DataSourceBase::StopArgs that carries the closure to run once an
// asynchronous stop has been completed.
class StopArgsImpl : public DataSourceBase::StopArgs {
 public:
  // Transfers the stored closure to the caller and leaves
  // |async_stop_closure| empty.
  std::function<void()> HandleStopAsynchronously() const override {
    std::function<void()> taken;
    taken.swap(async_stop_closure);
    return taken;
  }

  // Mutable because it is consumed from the const accessor above.
  mutable std::function<void()> async_stop_closure;
};
// Concrete DataSourceBase::FlushArgs that carries the closure to run once an
// asynchronous flush has been completed.
class FlushArgsImpl : public DataSourceBase::FlushArgs {
 public:
  // Transfers the stored closure to the caller and leaves
  // |async_flush_closure| empty.
  std::function<void()> HandleFlushAsynchronously() const override {
    std::function<void()> taken;
    taken.swap(async_flush_closure);
    return taken;
  }

  // Mutable because it is consumed from the const accessor above.
  mutable std::function<void()> async_flush_closure;
};
// Holds an earlier TracingMuxerImpl instance after ResetForTesting() is called.
// Value-initialized, i.e. starts out as nullptr.
static TracingMuxerImpl* g_prev_instance{};
// Comparator for locating the insertion point of a backend of a given type
// inside a list of registered backends that is kept sorted by type.
template <typename RegisteredBackend>
struct CompareBackendByType {
  // Maps a backend type to its sort rank; lower ranks sort first.
  static int BackendTypePriority(BackendType type) {
    // kUnspecifiedBackend keeps the largest rank so that TracingBackendFake
    // is the last one on the backend lists.
    int rank = 3;
    switch (type) {
      case kSystemBackend:
        rank = 0;
        break;
      case kInProcessBackend:
        rank = 1;
        break;
      case kCustomBackend:
        rank = 2;
        break;
      case kUnspecifiedBackend:
        break;
    }
    return rank;
  }

  // True when |type| ranks strictly before the type of |b|.
  bool operator()(BackendType type, const RegisteredBackend& b) {
    const int wanted_rank = BackendTypePriority(type);
    const int existing_rank = BackendTypePriority(b.type);
    return wanted_rank < existing_rank;
  }
};
} // namespace
// ----- Begin of TracingMuxerImpl::ProducerImpl
// Stores the owning muxer, the id of the backend this producer belongs to and
// the shared-memory settings that OnTracingSetup() later applies to the
// arbiter. No connection is made here; see Initialize().
TracingMuxerImpl::ProducerImpl::ProducerImpl(
TracingMuxerImpl* muxer,
TracingBackendId backend_id,
uint32_t shmem_batch_commits_duration_ms,
bool shmem_direct_patching_enabled)
: muxer_(muxer),
backend_id_(backend_id),
shmem_batch_commits_duration_ms_(shmem_batch_commits_duration_ms),
shmem_direct_patching_enabled_(shmem_direct_patching_enabled) {}
// Clears |muxer_| so that callbacks which test it (e.g. OnDisconnect()) bail
// out if they are invoked while the object is being destroyed.
TracingMuxerImpl::ProducerImpl::~ProducerImpl() {
muxer_ = nullptr;
}
void TracingMuxerImpl::ProducerImpl::Initialize(
std::unique_ptr<ProducerEndpoint> endpoint) {
PERFETTO_DCHECK_THREAD(thread_checker_);
PERFETTO_DCHECK(!connected_);
connection_id_.fetch_add(1, std::memory_order_relaxed);
is_producer_provided_smb_ = endpoint->shared_memory();
last_startup_target_buffer_reservation_ = 0;
// Adopt the endpoint into a shared pointer so that we can safely share it
// across threads that create trace writers. The custom deleter function
// ensures that the endpoint is always destroyed on the muxer's thread. (Note
// that |task_runner| is assumed to outlive tracing sessions on all threads.)
auto* task_runner = muxer_->task_runner_.get();
auto deleter = [task_runner](ProducerEndpoint* e) {
if (task_runner->RunsTasksOnCurrentThread()) {
delete e;
return;
}
task_runner->PostTask([e] { delete e; });
};
std::shared_ptr<ProducerEndpoint> service(endpoint.release(), deleter);
// This atomic store is needed because another thread might be concurrently
// creating a trace writer using the previous (disconnected) |service_|. See
// CreateTraceWriter().
std::atomic_store(&service_, std::move(service));
// Don't try to use the service here since it may not have connected yet. See
// OnConnect().
}
// Called once the producer endpoint is connected. Marks the producer as
// connected and refreshes data sources, unless a producer-provided SMB was
// offered but the service did not adopt it, in which case the connection is
// dropped.
void TracingMuxerImpl::ProducerImpl::OnConnect() {
  PERFETTO_DLOG("Producer connected");
  PERFETTO_DCHECK_THREAD(thread_checker_);
  PERFETTO_DCHECK(!connected_);

  const bool smb_rejected_by_service =
      is_producer_provided_smb_ && !service_->IsShmemProvidedByProducer();
  if (!smb_rejected_by_service) {
    connected_ = true;
    muxer_->UpdateDataSourcesOnAllBackends();
    SendOnConnectTriggers();
    return;
  }

  PERFETTO_ELOG(
      "The service likely doesn't support producer-provided SMBs. Preventing "
      "future attempts to use producer-provided SMB again with this "
      "backend.");
  producer_provided_smb_failed_ = true;
  // Will call OnDisconnect() and cause a reconnect without producer-provided
  // SMB.
  service_->Disconnect();
}
// Called when the connection to the service is lost. Drops the
// per-connection state and asks the muxer to handle the disconnection.
void TracingMuxerImpl::ProducerImpl::OnDisconnect() {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  if (!muxer_) {
    // We're being destroyed; nothing left to do.
    return;
  }

  connected_ = false;

  // Active data sources for this producer will be stopped by
  // DestroyStoppedTraceWritersForCurrentThread() since the reconnected
  // producer will have a different connection id (even before it has finished
  // connecting).
  registered_data_sources_.reset();
  DisposeConnection();

  // Try reconnecting the producer.
  muxer_->OnProducerDisconnected(this);
}
// Releases the current service connection, or parks it in |dead_services_|
// when tracing has ever been set up through this producer.
void TracingMuxerImpl::ProducerImpl::DisposeConnection() {
  const bool tracing_was_set_up =
      did_setup_tracing_ || did_setup_startup_tracing_;
  if (!tracing_was_set_up) {
    service_.reset();
    return;
  }
  // Keep the old service around as a dead connection in case it has active
  // trace writers. |service_| can't be cleared here because other threads may
  // be concurrently creating new trace writers. Any reconnection attempt will
  // atomically swap the new service in place of the old one.
  dead_services_.push_back(service_);
}
// Records that tracing has been set up and applies the configured batching
// and direct-patching settings to the service's shared memory arbiter.
void TracingMuxerImpl::ProducerImpl::OnTracingSetup() {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  did_setup_tracing_ = true;

  auto* arbiter = service_->MaybeSharedMemoryArbiter();
  arbiter->SetBatchCommitsDuration(shmem_batch_commits_duration_ms_);
  if (!shmem_direct_patching_enabled_)
    return;
  service_->MaybeSharedMemoryArbiter()->EnableDirectSMBPatching();
}
// Records that startup tracing has been set up on this producer. The flag is
// consulted by DisposeConnection() when deciding whether the old service must
// be kept alive.
void TracingMuxerImpl::ProducerImpl::OnStartupTracingSetup() {
PERFETTO_DCHECK_THREAD(thread_checker_);
did_setup_startup_tracing_ = true;
}
// Forwards a data source setup request to the muxer, tagged with this
// producer's backend id and current connection id. No-op during destruction.
void TracingMuxerImpl::ProducerImpl::SetupDataSource(
    DataSourceInstanceID id,
    const DataSourceConfig& cfg) {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  if (muxer_) {
    const auto current_connection =
        connection_id_.load(std::memory_order_relaxed);
    muxer_->SetupDataSource(backend_id_, current_connection, id, cfg);
  }
}
// Starts the data source instance through the muxer and then tells the
// service that it has started. The config argument is unused. No-op during
// destruction.
void TracingMuxerImpl::ProducerImpl::StartDataSource(DataSourceInstanceID id,
                                                     const DataSourceConfig&) {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  if (muxer_) {
    muxer_->StartDataSource(backend_id_, id);
    service_->NotifyDataSourceStarted(id);
  }
}
// Kicks off the asynchronous stop of a data source instance via the muxer.
// No-op during destruction.
void TracingMuxerImpl::ProducerImpl::StopDataSource(DataSourceInstanceID id) {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  if (muxer_) {
    muxer_->StopDataSource_AsyncBegin(backend_id_, id);
  }
}
// Begins a flush on each listed data source instance. Instances for which
// FlushDataSource_AsyncBegin() returns false are recorded in
// |pending_flushes_|; if none were recorded (including when the muxer is
// gone), the flush is acknowledged to the service right away.
void TracingMuxerImpl::ProducerImpl::Flush(
    FlushRequestID flush_id,
    const DataSourceInstanceID* instances,
    size_t instance_count,
    FlushFlags flush_flags) {
  PERFETTO_DCHECK_THREAD(thread_checker_);

  bool any_pending = false;
  if (muxer_) {
    for (size_t idx = 0; idx < instance_count; ++idx) {
      const DataSourceInstanceID instance_id = instances[idx];
      if (muxer_->FlushDataSource_AsyncBegin(backend_id_, instance_id, flush_id,
                                             flush_flags)) {
        continue;
      }
      pending_flushes_[flush_id].insert(instance_id);
      any_pending = true;
    }
  }

  if (!any_pending) {
    service_->NotifyFlushComplete(flush_id);
  }
}
// Asks the muxer to clear the incremental state of every listed data source
// instance. No-op during destruction.
void TracingMuxerImpl::ProducerImpl::ClearIncrementalState(
    const DataSourceInstanceID* instances,
    size_t instance_count) {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  if (muxer_) {
    for (size_t idx = 0; idx < instance_count; ++idx) {
      muxer_->ClearDataSourceIncrementalState(backend_id_, instances[idx]);
    }
  }
}
// Drops every dead service that either has no shared memory arbiter or whose
// arbiter reports a successful TryShutdown(). Returns true when no dead
// services remain afterwards.
bool TracingMuxerImpl::ProducerImpl::SweepDeadServices() {
  PERFETTO_DCHECK_THREAD(thread_checker_);

  auto can_be_dropped = [](const std::shared_ptr<ProducerEndpoint>& endpoint) {
    auto* arbiter = endpoint->MaybeSharedMemoryArbiter();
    if (!arbiter)
      return true;
    return arbiter->TryShutdown();
  };

  auto it = dead_services_.begin();
  while (it != dead_services_.end()) {
    if (can_be_dropped(*it)) {
      // erase() hands back the iterator following the removed element.
      it = dead_services_.erase(it);
    } else {
      ++it;
    }
  }
  return dead_services_.empty();
}
void TracingMuxerImpl::ProducerImpl::SendOnConnectTriggers() {
PERFETTO_DCHECK_THREAD(thread_checker_);
base::TimeMillis now = base::GetWallTimeMs();
std::vector<std::string> triggers;
while (!on_connect_triggers_.empty()) {
// Skip if we passed TTL.
if (on_connect_triggers_.front().second > now) {
triggers.push_back(std::move(on_connect_triggers_.front().first));
}
on_connect_triggers_.pop_front();
}
if (!triggers.empty()) {
service_->ActivateTriggers(triggers);
}
}
// Marks the flush |flush_id| as done for data source instance |ds_id|.
//
// Does nothing while disconnected or when |flush_id| is not being tracked.
// Otherwise removes |ds_id| from the set of instances still owing an ack for
// that flush, then walks |pending_flushes_| from its beginning, erasing every
// leading entry that has no outstanding instances. If at least one entry was
// erased, the service is notified with the id of the last one erased.
// NOTE(review): this relies on |pending_flushes_| iterating in increasing
// flush id order (presumably a std::map) - confirm against the header.
void TracingMuxerImpl::ProducerImpl::NotifyFlushForDataSourceDone(
    DataSourceInstanceID ds_id,
    FlushRequestID flush_id) {
  if (!connected_) {
    return;
  }

  {
    auto it = pending_flushes_.find(flush_id);
    if (it == pending_flushes_.end()) {
      return;
    }
    std::set<DataSourceInstanceID>& ds_ids = it->second;
    ds_ids.erase(ds_id);
  }

  // Holds a flush request id (the map key), so it is typed as FlushRequestID
  // rather than DataSourceInstanceID.
  std::optional<FlushRequestID> biggest_flush_id;
  for (auto it = pending_flushes_.begin(); it != pending_flushes_.end();) {
    if (!it->second.empty()) {
      // First flush that still has outstanding instances; stop here.
      break;
    }
    biggest_flush_id = it->first;
    it = pending_flushes_.erase(it);
  }

  if (biggest_flush_id) {
    service_->NotifyFlushComplete(*biggest_flush_id);
  }
}
// ----- End of TracingMuxerImpl::ProducerImpl methods.
// ----- Begin of TracingMuxerImpl::ConsumerImpl
// Stores the owning muxer, the backend type and the id of the tracing session
// this consumer serves. The endpoint is attached later via Initialize().
TracingMuxerImpl::ConsumerImpl::ConsumerImpl(TracingMuxerImpl* muxer,
BackendType backend_type,
TracingSessionGlobalID session_id)
: muxer_(muxer), backend_type_(backend_type), session_id_(session_id) {}
// Clears |muxer_| so that OnDisconnect() bails out if it is invoked while the
// object is being destroyed.
TracingMuxerImpl::ConsumerImpl::~ConsumerImpl() {
muxer_ = nullptr;
}
// Takes ownership of the consumer endpoint and stores it in |service_|.
void TracingMuxerImpl::ConsumerImpl::Initialize(
std::unique_ptr<ConsumerEndpoint> endpoint) {
PERFETTO_DCHECK_THREAD(thread_checker_);
service_ = std::move(endpoint);
// Don't try to use the service here since it may not have connected yet. See
// OnConnect().
}
// Called once the consumer endpoint is connected. Subscribes to data source
// instance events and then replays, in order, any setup / start / stats /
// service-state / stop requests that were issued before the connection was
// established.
void TracingMuxerImpl::ConsumerImpl::OnConnect() {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  PERFETTO_DCHECK(!connected_);
  connected_ = true;

  // Observe data source instance events so we get notified when tracing
  // starts.
  service_->ObserveEvents(ObservableEvents::TYPE_DATA_SOURCES_INSTANCES |
                          ObservableEvents::TYPE_ALL_DATA_SOURCES_STARTED);

  // If the API client configured and started tracing before we connected,
  // tell the backend about it now.
  if (trace_config_) {
    muxer_->SetupTracingSession(session_id_, trace_config_);
  }
  if (start_pending_) {
    muxer_->StartTracingSession(session_id_);
  }
  if (get_trace_stats_pending_) {
    // Detach the stored callback before handing it back to the muxer.
    auto stats_cb = std::move(get_trace_stats_callback_);
    get_trace_stats_callback_ = nullptr;
    muxer_->GetTraceStats(session_id_, std::move(stats_cb));
  }
  if (query_service_state_callback_) {
    auto state_cb = std::move(query_service_state_callback_);
    query_service_state_callback_ = nullptr;
    muxer_->QueryServiceState(session_id_, std::move(state_cb));
  }
  if (stop_pending_) {
    muxer_->StopTracingSession(session_id_);
  }
}
// Called when the consumer connection goes away. Reports the disconnection to
// the client, releases any pending start/stop waiters and finally tells the
// muxer that this object may be destroyed.
void TracingMuxerImpl::ConsumerImpl::OnDisconnect() {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  if (!muxer_) {
    // We're being destroyed; nothing left to do.
    return;
  }

#if PERFETTO_BUILDFLAG(PERFETTO_OS_ANDROID)
  // Disconnected without ever having connected to the system backend.
  if (!connected_ && backend_type_ == kSystemBackend) {
    PERFETTO_ELOG(
        "Unable to connect to the system tracing service as a consumer. On "
        "Android, use the \"perfetto\" command line tool instead to start "
        "system-wide tracing sessions");
  }
#endif  // PERFETTO_BUILDFLAG(PERFETTO_OS_ANDROID)

  // Notify the client about disconnection.
  NotifyError(TracingError{TracingError::kDisconnected, "Peer disconnected"});

  // Make sure the client doesn't hang in a blocking start/stop because of the
  // disconnection.
  NotifyStartComplete();
  NotifyStopComplete();

  // There should be no need to call StopTracingSession: getting this call
  // means the service shut down before us, so asking it to stop the session
  // is pointless. We just have to remember to clean up the consumer vector.
  connected_ = false;

  // Notify the muxer that it is safe to destroy |this|. This is needed
  // because the ConsumerEndpoint stored in |service_| requires that |this| be
  // safe to access until OnDisconnect() is called.
  muxer_->OnConsumerDisconnected(this);
}
// Starts tearing down the consumer connection by releasing the endpoint.
void TracingMuxerImpl::ConsumerImpl::Disconnect() {
  // Why only a reset() here:
  //
  // ConnectConsumer on the service returned a ConsumerEndpoint, which is held
  // in |service_|. That endpoint keeps a pointer back to this ConsumerImpl,
  // and the TracingService::ConnectConsumer contract says the ConsumerImpl
  // must stay valid until ConsumerImpl::OnDisconnect is called. So all we do
  // is drop the ConsumerEndpoint; eventually this results in
  // ConsumerImpl::OnDisconnect being called, where we tell the muxer that it
  // is safe to destroy |this|.
  service_.reset();
}
// Called when tracing has been disabled. Marks the session as stopped,
// reports |error| (when non-empty) and releases start/stop waiters.
void TracingMuxerImpl::ConsumerImpl::OnTracingDisabled(
    const std::string& error) {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  PERFETTO_DCHECK(!stopped_);
  stopped_ = true;

  const bool has_error = !error.empty();
  if (has_error) {
    NotifyError(TracingError{TracingError::kTracingFailed, error});
  }

  // The start event may still be pending at this point, e.g. when the session
  // has no active data sources; fire it now.
  NotifyStartComplete();
  NotifyStopComplete();
}
void TracingMuxerImpl::ConsumerImpl::NotifyStartComplete() {
PERFETTO_DCHECK_THREAD(thread_checker_);
if (start_complete_callback_) {
muxer_->task_runner_->PostTask(std::move(start_complete_callback_));
start_complete_callback_ = nullptr;
}
if (blocking_start_complete_callback_) {
muxer_->task_runner_->PostTask(
std::move(blocking_start_complete_callback_));
blocking_start_complete_callback_ = nullptr;
}
}
void TracingMuxerImpl::ConsumerImpl::NotifyError(const TracingError& error) {
PERFETTO_DCHECK_THREAD(thread_checker_);
if (error_callback_) {
muxer_->task_runner_->PostTask(
std::bind(std::move(error_callback_), error));
}
}
void TracingMuxerImpl::ConsumerImpl::NotifyStopComplete() {
PERFETTO_DCHECK_THREAD(thread_checker_);
if (stop_complete_callback_) {
muxer_->task_runner_->PostTask(std::move(stop_complete_callback_));
stop_complete_callback_ = nullptr;
}
if (blocking_stop_complete_callback_) {
muxer_->task_runner_->PostTask(std::move(blocking_stop_complete_callback_));
blocking_stop_complete_callback_ = nullptr;
}
}
// Receives a batch of trace packets, flattens them (preamble followed by
// slices, packet after packet) into one contiguous buffer and posts the read
// callback with that buffer. The read callback is dropped after the final
// batch (|has_more| == false). Ignored when no read callback is registered.
void TracingMuxerImpl::ConsumerImpl::OnTraceData(
    std::vector<TracePacket> packets,
    bool has_more) {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  if (!read_trace_callback_)
    return;

  // Upper bound on the flattened size; 16 is an over-estimation of the proto
  // preamble size.
  size_t estimated_size = 0;
  for (const auto& packet : packets) {
    estimated_size += packet.size() + 16;
  }

  // Held by shared_ptr so that PostTask-ing doesn't copy the buffer.
  auto flattened = std::make_shared<std::vector<char>>();
  flattened->reserve(estimated_size);
  for (auto& packet : packets) {
    auto [preamble, preamble_size] = packet.GetProtoPreamble();
    flattened->insert(flattened->end(), preamble, preamble + preamble_size);
    for (auto& slice : packet.slices()) {
      const auto* bytes = reinterpret_cast<const char*>(slice.start);
      flattened->insert(flattened->end(), bytes, bytes + slice.size);
    }
  }

  auto callback = read_trace_callback_;
  muxer_->task_runner_->PostTask([callback, flattened, has_more] {
    TracingSession::ReadTraceCallbackArgs callback_arg{};
    callback_arg.data = flattened->empty() ? nullptr : flattened->data();
    callback_arg.size = flattened->size();
    callback_arg.has_more = has_more;
    callback(callback_arg);
  });

  if (!has_more) {
    read_trace_callback_ = nullptr;
  }
}
// Handles observable events reported by the service.
//
// Every instance state change updates |data_source_states_| (keyed by
// producer name + data source name) with whether that instance is in the
// STARTED state. Then, if a start callback is still pending and every known
// data source is started, the start-complete notification is fired.
void TracingMuxerImpl::ConsumerImpl::OnObservableEvents(
    const ObservableEvents& events) {
  if (events.instance_state_changes_size()) {
    for (const auto& state_change : events.instance_state_changes()) {
      DataSourceHandle handle{state_change.producer_name(),
                              state_change.data_source_name()};
      data_source_states_[handle] =
          state_change.state() ==
          ObservableEvents::DATA_SOURCE_INSTANCE_STATE_STARTED;
    }
  }

  if (events.instance_state_changes_size() ||
      events.all_data_sources_started()) {
    // Data sources are first reported as being stopped before starting, so
    // once all the data sources we know about have started we can declare
    // tracing begun. In the case where there are no matching data sources for
    // the session, the service will report the all_data_sources_started()
    // event without adding any instances (only since Android S / Perfetto
    // v10.0).
    if (start_complete_callback_ || blocking_start_complete_callback_) {
      // The predicate takes each entry by const reference: taking it by value
      // would copy the handle of every element on each call.
      bool all_data_sources_started = std::all_of(
          data_source_states_.cbegin(), data_source_states_.cend(),
          [](const auto& state) { return state.second; });
      if (all_data_sources_started)
        NotifyStartComplete();
    }
  }
}
// Not expected to be invoked: trips a debug check if it is. The argument is
// ignored.
void TracingMuxerImpl::ConsumerImpl::OnSessionCloned(
const OnSessionClonedArgs&) {
// CloneSession is not exposed in the SDK. This should never happen.
PERFETTO_DCHECK(false);
}
void TracingMuxerImpl::ConsumerImpl::OnTraceStats(
bool success,
const TraceStats& trace_stats) {
if (!get_trace_stats_callback_)
return;
TracingSession::GetTraceStatsCallbackArgs callback_arg{};
callback_arg.success = success;
callback_arg.trace_stats_data = trace_stats.SerializeAsArray();
muxer_->task_runner_->PostTask(
std::bind(std::move(get_trace_stats_callback_), std::move(callback_arg)));
get_trace_stats_callback_ = nullptr;
}
// The callbacks below are not used.
// Both are intentionally empty and ignore their arguments.
void TracingMuxerImpl::ConsumerImpl::OnDetach(bool) {}
void TracingMuxerImpl::ConsumerImpl::OnAttach(bool, const TraceConfig&) {}
// ----- End of TracingMuxerImpl::ConsumerImpl
// ----- Begin of TracingMuxerImpl::TracingSessionImpl
// TracingSessionImpl is the RAII object returned to API clients when they
// invoke Tracing::CreateTracingSession. They use it for starting/stopping
// tracing.
// The constructor only records the muxer, session id and backend type.
TracingMuxerImpl::TracingSessionImpl::TracingSessionImpl(
TracingMuxerImpl* muxer,
TracingSessionGlobalID session_id,
BackendType backend_type)
: muxer_(muxer), session_id_(session_id), backend_type_(backend_type) {}
// Can be destroyed from any thread: the actual session teardown is posted to
// the muxer's task runner. Only copies of the muxer pointer and session id
// are captured, never |this|.
TracingMuxerImpl::TracingSessionImpl::~TracingSessionImpl() {
  muxer_->task_runner_->PostTask([muxer = muxer_, session_id = session_id_] {
    muxer->DestroyTracingSession(session_id);
  });
}
// Can be called from any thread.
void TracingMuxerImpl::TracingSessionImpl::Setup(const TraceConfig& cfg,
int fd) {
auto* muxer = muxer_;
auto session_id = session_id_;
std::shared_ptr<TraceConfig> trace_config(new TraceConfig(cfg));
if (fd >= 0) {
base::ignore_result(backend_type_); // For -Wunused in the amalgamation.
#if PERFETTO_BUILDFLAG(PERFETTO_OS_WIN)
if (backend_type_ != kInProcessBackend) {
PERFETTO_FATAL(
"Passing a file descriptor to TracingSession::Setup() is only "
"supported with the kInProcessBackend on Windows. Use "
"TracingSession::ReadTrace() instead");
}
#endif
trace_config->set_write_into_file(true);
fd = dup(fd);
}
muxer->task_runner_->PostTask([muxer, session_id, trace_config, fd] {
muxer->SetupTracingSession(session_id, trace_config, base::ScopedFile(fd));
});
}
// Can be called from any thread.
void TracingMuxerImpl::TracingSessionImpl::Start() {
auto* muxer = muxer_;
auto session_id = session_id_;
muxer->task_runner_->PostTask(
[muxer, session_id] { muxer->StartTracingSession(session_id); });
}
// Can be called from any thread.
void TracingMuxerImpl::TracingSessionImpl::ChangeTraceConfig(
const TraceConfig& cfg) {
auto* muxer = muxer_;
auto session_id = session_id_;
muxer->task_runner_->PostTask([muxer, session_id, cfg] {
muxer->ChangeTracingSessionConfig(session_id, cfg);
});
}
// Can be called from any thread except the service thread.
void TracingMuxerImpl::TracingSessionImpl::StartBlocking() {
PERFETTO_DCHECK(!muxer_->task_runner_->RunsTasksOnCurrentThread());
auto* muxer = muxer_;
auto session_id = session_id_;
base::WaitableEvent tracing_started;
muxer->task_runner_->PostTask([muxer, session_id, &tracing_started] {
auto* consumer = muxer->FindConsumer(session_id);
if (!consumer) {
// TODO(skyostil): Signal an error to the user.
tracing_started.Notify();
return;
}
PERFETTO_DCHECK(!consumer->blocking_start_complete_callback_);
consumer->blocking_start_complete_callback_ = [&] {
tracing_started.Notify();
};
muxer->StartTracingSession(session_id);
});
tracing_started.Wait();
}
// Can be called from any thread.
// Posts a flush of this session to the muxer's task runner. |user_callback|
// is invoked with false if the session no longer has a consumer; otherwise it
// is handed to FlushTracingSession() together with |timeout_ms|.
void TracingMuxerImpl::TracingSessionImpl::Flush(
    std::function<void(bool)> user_callback,
    uint32_t timeout_ms) {
  auto* muxer = muxer_;
  auto session_id = session_id_;
  // The callback is moved into the closure and the closure is mutable, so the
  // std::move() calls below really move. With a by-copy capture in a
  // non-mutable lambda the capture is const and std::move() silently copies.
  muxer->task_runner_->PostTask(
      [muxer, session_id, timeout_ms,
       user_callback = std::move(user_callback)]() mutable {
        auto* consumer = muxer->FindConsumer(session_id);
        if (!consumer) {
          std::move(user_callback)(false);
          return;
        }
        muxer->FlushTracingSession(session_id, timeout_ms,
                                   std::move(user_callback));
      });
}
// Can be called from any thread.
void TracingMuxerImpl::TracingSessionImpl::Stop() {
auto* muxer = muxer_;
auto session_id = session_id_;
muxer->task_runner_->PostTask(
[muxer, session_id] { muxer->StopTracingSession(session_id); });
}
// Can be called from any thread except the service thread.
void TracingMuxerImpl::TracingSessionImpl::StopBlocking() {
PERFETTO_DCHECK(!muxer_->task_runner_->RunsTasksOnCurrentThread());
auto* muxer = muxer_;
auto session_id = session_id_;
base::WaitableEvent tracing_stopped;
muxer->task_runner_->PostTask([muxer, session_id, &tracing_stopped] {
auto* consumer = muxer->FindConsumer(session_id);
if (!consumer) {
// TODO(skyostil): Signal an error to the user.
tracing_stopped.Notify();
return;
}
PERFETTO_DCHECK(!consumer->blocking_stop_complete_callback_);
consumer->blocking_stop_complete_callback_ = [&] {
tracing_stopped.Notify();
};
muxer->StopTracingSession(session_id);
});
tracing_stopped.Wait();
}
// Can be called from any thread.
// Posts a request to read this session's trace data; |cb| is forwarded to
// ReadTracingSessionData() on the muxer's task runner.
void TracingMuxerImpl::TracingSessionImpl::ReadTrace(ReadTraceCallback cb) {
  auto* muxer = muxer_;
  auto session_id = session_id_;
  // |cb| is moved into a mutable closure so that the inner std::move() really
  // moves; on a const by-copy capture it would silently copy.
  muxer->task_runner_->PostTask(
      [muxer, session_id, cb = std::move(cb)]() mutable {
        muxer->ReadTracingSessionData(session_id, std::move(cb));
      });
}
// Can be called from any thread.
void TracingMuxerImpl::TracingSessionImpl::SetOnStartCallback(
std::function<void()> cb) {
auto* muxer = muxer_;
auto session_id = session_id_;
muxer->task_runner_->PostTask([muxer, session_id, cb] {
auto* consumer = muxer->FindConsumer(session_id);
if (!consumer)
return;
consumer->start_complete_callback_ = cb;
});
}
// Can be called from any thread
void TracingMuxerImpl::TracingSessionImpl::SetOnErrorCallback(
std::function<void(TracingError)> cb) {
auto* muxer = muxer_;
auto session_id = session_id_;
muxer->task_runner_->PostTask([muxer, session_id, cb] {
auto* consumer = muxer->FindConsumer(session_id);
if (!consumer) {
// Notify the client about concurrent disconnection of the session.
if (cb)
cb(TracingError{TracingError::kDisconnected, "Peer disconnected"});
return;
}
consumer->error_callback_ = cb;
});
}
// Can be called from any thread.
void TracingMuxerImpl::TracingSessionImpl::SetOnStopCallback(
std::function<void()> cb) {
auto* muxer = muxer_;
auto session_id = session_id_;
muxer->task_runner_->PostTask([muxer, session_id, cb] {
auto* consumer = muxer->FindConsumer(session_id);
if (!consumer)
return;
consumer->stop_complete_callback_ = cb;
});
}
// Can be called from any thread.
// Posts a trace stats request for this session; |cb| is forwarded to the
// muxer's GetTraceStats() on its task runner.
void TracingMuxerImpl::TracingSessionImpl::GetTraceStats(
    GetTraceStatsCallback cb) {
  auto* muxer = muxer_;
  auto session_id = session_id_;
  // |cb| is moved into a mutable closure so that the inner std::move() really
  // moves; on a const by-copy capture it would silently copy.
  muxer->task_runner_->PostTask(
      [muxer, session_id, cb = std::move(cb)]() mutable {
        muxer->GetTraceStats(session_id, std::move(cb));
      });
}
// Can be called from any thread.
// Posts a service state query for this session; |cb| is forwarded to the
// muxer's QueryServiceState() on its task runner.
void TracingMuxerImpl::TracingSessionImpl::QueryServiceState(
    QueryServiceStateCallback cb) {
  auto* muxer = muxer_;
  auto session_id = session_id_;
  // |cb| is moved into a mutable closure so that the inner std::move() really
  // moves; on a const by-copy capture it would silently copy.
  muxer->task_runner_->PostTask(
      [muxer, session_id, cb = std::move(cb)]() mutable {
        muxer->QueryServiceState(session_id, std::move(cb));
      });
}
// ----- End of TracingMuxerImpl::TracingSessionImpl
// ----- Begin of TracingMuxerImpl::StartupTracingSessionImpl
// Records the muxer, session id and backend type that Abort() and
// AbortBlocking() later pass to the muxer.
TracingMuxerImpl::StartupTracingSessionImpl::StartupTracingSessionImpl(
TracingMuxerImpl* muxer,
TracingSessionGlobalID session_id,
BackendType backend_type)
: muxer_(muxer), session_id_(session_id), backend_type_(backend_type) {}
// Can be destroyed from any thread.
// Defaulted: destruction itself does not post anything to the muxer.
TracingMuxerImpl::StartupTracingSessionImpl::~StartupTracingSessionImpl() =
default;
void TracingMuxerImpl::StartupTracingSessionImpl::Abort() {
auto* muxer = muxer_;
auto session_id = session_id_;
auto backend_type = backend_type_;
muxer->task_runner_->PostTask([muxer, session_id, backend_type] {
muxer->AbortStartupTracingSession(session_id, backend_type);
});
}
// Must not be called from the SDK's internal thread.
void TracingMuxerImpl::StartupTracingSessionImpl::AbortBlocking() {
auto* muxer = muxer_;
auto session_id = session_id_;
auto backend_type = backend_type_;
PERFETTO_CHECK(!muxer->task_runner_->RunsTasksOnCurrentThread());
base::WaitableEvent event;
muxer->task_runner_->PostTask([muxer, session_id, backend_type, &event] {
muxer->AbortStartupTracingSession(session_id, backend_type);
event.Notify();
});
event.Wait();
}
// ----- End of TracingMuxerImpl::StartupTracingSessionImpl
// static
// Initially points at whatever TracingMuxerFake::Get() returns; the
// TracingMuxerImpl constructor replaces it with the real muxer.
TracingMuxer* TracingMuxer::instance_ = TracingMuxerFake::Get();
// This is called by perfetto::Tracing::Initialize().
// Can be called on any thread. Typically, but not necessarily, that will be
// the embedder's main thread.
// Publishes |this| as the global muxer instance, creates the muxer's task
// runner and posts the remaining initialization onto it.
TracingMuxerImpl::TracingMuxerImpl(const TracingInitArgs& args)
    : TracingMuxer(args.platform ? args.platform
                                 : Platform::GetDefaultPlatform()) {
  PERFETTO_DETACH_FROM_THREAD(thread_checker_);
  instance_ = this;

  // Create the thread where muxer, producers and service will live. The
  // platform-provided runner is wrapped in a NonReentrantTaskRunner.
  Platform::CreateTaskRunnerArgs tr_args{/*name_for_debugging=*/"TracingMuxer"};
  auto platform_runner = platform_->CreateTaskRunner(std::move(tr_args));
  task_runner_.reset(
      new NonReentrantTaskRunner(this, std::move(platform_runner)));

  // Run the initializer on that thread. |args| is copied into the task.
  task_runner_->PostTask([this, args] {
    Initialize(args);
    AddBackends(args);
  });
}
// Runs on the muxer's task runner. Copies the policy-related init args and
// registers TracingBackendFake as the kUnspecifiedBackend fallback on both
// the producer and the consumer side. Both backend lists must be empty on
// entry.
void TracingMuxerImpl::Initialize(const TracingInitArgs& args) {
  PERFETTO_DCHECK_THREAD(thread_checker_);  // Rebind the thread checker.

  policy_ = args.tracing_policy;
  supports_multiple_data_source_instances_ =
      args.supports_multiple_data_source_instances;

  // Producer side: fallback used when a producer is requested for an
  // unsupported backend type.
  PERFETTO_CHECK(producer_backends_.empty());
  AddProducerBackend(internal::TracingBackendFake::GetInstance(),
                     BackendType::kUnspecifiedBackend, args);

  // Consumer side: fallback used for an unsupported backend type. This
  // backend simply fails any attempt to start a tracing session.
  PERFETTO_CHECK(consumer_backends_.empty());
  AddConsumerBackend(internal::TracingBackendFake::GetInstance(),
                     BackendType::kUnspecifiedBackend);
}
// Registers |backend| as the consumer backend for |type|, inserting it at the
// position that keeps |consumer_backends_| sorted by type. A null |backend|
// is ignored (with a debug log).
void TracingMuxerImpl::AddConsumerBackend(TracingConsumerBackend* backend,
                                          BackendType type) {
  if (!backend) {
    // We skip the log in release builds because the *_backend_fake.cc code
    // has already an ELOG before returning a nullptr.
    PERFETTO_DLOG("Consumer backend creation failed, type %d",
                  static_cast<int>(type));
    return;
  }

  // Keep the backends sorted by type.
  auto insert_pos =
      std::upper_bound(consumer_backends_.begin(), consumer_backends_.end(),
                       type, CompareBackendByType<RegisteredConsumerBackend>());
  RegisteredConsumerBackend& registered =
      *consumer_backends_.emplace(insert_pos);
  registered.backend = backend;
  registered.type = type;
}
// Registers |backend| as the producer backend for |type|: inserts it into
// |producer_backends_| (kept sorted by type), creates its ProducerImpl, fills
// in the connection arguments from |args| and starts the producer connection.
// A null |backend| is ignored (with a debug log).
void TracingMuxerImpl::AddProducerBackend(TracingProducerBackend* backend,
                                          BackendType type,
                                          const TracingInitArgs& args) {
  if (!backend) {
    // We skip the log in release builds because the *_backend_fake.cc code
    // has already an ELOG before returning a nullptr.
    PERFETTO_DLOG("Producer backend creation failed, type %d",
                  static_cast<int>(type));
    return;
  }

  // The id is the number of backends registered before this one.
  TracingBackendId backend_id = producer_backends_.size();

  // Keep the backends sorted by type.
  auto insert_pos =
      std::upper_bound(producer_backends_.begin(), producer_backends_.end(),
                       type, CompareBackendByType<RegisteredProducerBackend>());
  RegisteredProducerBackend& registered =
      *producer_backends_.emplace(insert_pos);
  registered.backend = backend;
  registered.id = backend_id;
  registered.type = type;
  registered.producer.reset(
      new ProducerImpl(this, backend_id, args.shmem_batch_commits_duration_ms,
                       args.shmem_direct_patching_enabled));

  auto& conn_args = registered.producer_conn_args;
  conn_args.producer = registered.producer.get();
  conn_args.producer_name = platform_->GetCurrentProcessName();
  conn_args.task_runner = task_runner_.get();
  // The size hints are given in KiB and converted to bytes here.
  conn_args.shmem_size_hint_bytes = args.shmem_size_hint_kb * 1024;
  conn_args.shmem_page_size_hint_bytes = args.shmem_page_size_hint_kb * 1024;
  conn_args.create_socket_async = args.create_socket_async;

  registered.producer->Initialize(
      registered.backend->ConnectProducer(registered.producer_conn_args));
}
// Returns the first registered producer backend whose id equals |id|, or
// nullptr if there is none.
TracingMuxerImpl::RegisteredProducerBackend*
TracingMuxerImpl::FindProducerBackendById(TracingBackendId id) {
  auto match = std::find_if(
      producer_backends_.begin(), producer_backends_.end(),
      [id](const RegisteredProducerBackend& rb) { return rb.id == id; });
  return match == producer_backends_.end() ? nullptr : &*match;
}
// Returns the first registered producer backend of the given |type|, or
// nullptr if there is none.
TracingMuxerImpl::RegisteredProducerBackend*
TracingMuxerImpl::FindProducerBackendByType(BackendType type) {
  auto match = std::find_if(
      producer_backends_.begin(), producer_backends_.end(),
      [type](const RegisteredProducerBackend& rb) { return rb.type == type; });
  return match == producer_backends_.end() ? nullptr : &*match;
}
// Returns the first registered consumer backend of the given |type|, or
// nullptr if there is none.
TracingMuxerImpl::RegisteredConsumerBackend*
TracingMuxerImpl::FindConsumerBackendByType(BackendType type) {
  auto match = std::find_if(
      consumer_backends_.begin(), consumer_backends_.end(),
      [type](const RegisteredConsumerBackend& rb) { return rb.type == type; });
  return match == consumer_backends_.end() ? nullptr : &*match;
}
void TracingMuxerImpl::AddBackends(const TracingInitArgs& args) {
if (args.backends & kSystemBackend) {
PERFETTO_CHECK(args.system_producer_backend_factory_);
if (FindProducerBackendByType(kSystemBackend) == nullptr) {
AddProducerBackend(args.system_producer_backend_factory_(),
kSystemBackend, args);
}
if (args.enable_system_consumer) {
PERFETTO_CHECK(args.system_consumer_backend_factory_);
if (FindConsumerBackendByType(kSystemBackend) == nullptr) {
AddConsumerBackend(args.system_consumer_backend_factory_(),
kSystemBackend);
}
}
}
if (args.backends & kInProcessBackend) {
TracingBackend* b = nullptr;
if (FindProducerBackendByType(kInProcessBackend) == nullptr) {
if (!b) {
PERFETTO_CHECK(args.in_process_backend_factory_);
b = args.in_process_backend_factory_();
}
AddProducerBackend(b, kInProcessBackend, args);
}
if (FindConsumerBackendByType(kInProcessBackend) == nullptr) {
if (!b) {
PERFETTO_CHECK(args.in_process_backend_factory_);
b = args.in_process_backend_factory_();
}
AddConsumerBackend(b, kInProcessBackend);
}
}
if (args.backends & kCustomBackend) {
PERFETTO_CHECK(args.custom_backend);
if (FindProducerBackendByType(kCustomBackend) == nullptr) {
AddProducerBackend(args.custom_backend, kCustomBackend, args);
}
if (FindConsumerBackendByType(kCustomBackend) == nullptr) {
AddConsumerBackend(args.custom_backend, kCustomBackend);
}
}
if (args.backends & ~(kSystemBackend | kInProcessBackend | kCustomBackend)) {
PERFETTO_FATAL("Unsupported tracing backend type");
}
}
// Can be called from any thread (but not concurrently).
bool TracingMuxerImpl::RegisterDataSource(
const DataSourceDescriptor& descriptor,
DataSourceFactory factory,
DataSourceParams params,
bool no_flush,
DataSourceStaticState* static_state) {
// Ignore repeated registrations.
if (static_state->index != kMaxDataSources)
return true;
uint32_t new_index = next_data_source_index_++;
if (new_index >= kMaxDataSources) {
PERFETTO_DLOG(
"RegisterDataSource failed: too many data sources already registered");
return false;
}
// Initialize the static state.
static_assert(sizeof(static_state->instances[0]) >= sizeof(DataSourceState),
"instances[] size mismatch");
for (size_t i = 0; i < static_state->instances.size(); i++)
new (&static_state->instances[i]) DataSourceState{};
static_state->index = new_index;
// Generate a semi-unique id for this data source.
base::Hasher hash;
hash.Update(reinterpret_cast<intptr_t>(static_state));
hash.Update(base::GetWallTimeNs().count());
static_state->id = hash.digest() ? hash.digest() : 1;
task_runner_->PostTask([this, descriptor, factory, static_state, params,
no_flush] {
data_sources_.emplace_back();
RegisteredDataSource& rds = data_sources_.back();
rds.descriptor = descriptor;
rds.factory = factory;
rds.supports_multiple_instances =
supports_multiple_data_source_instances_ &&
params.supports_multiple_instances;
rds.requires_callbacks_under_lock = params.requires_callbacks_under_lock;
rds.static_state = static_state;
rds.no_flush = no_flush;
UpdateDataSourceOnAllBackends(rds, /*is_changed=*/false);
});
return true;
}
// Can be called from any thread (but not concurrently).
void TracingMuxerImpl::UpdateDataSourceDescriptor(
const DataSourceDescriptor& descriptor,
const DataSourceStaticState* static_state) {
task_runner_->PostTask([this, descriptor, static_state] {
for (auto& rds : data_sources_) {
if (rds.static_state == static_state) {
PERFETTO_CHECK(rds.descriptor.name() == descriptor.name());
rds.descriptor = descriptor;
rds.descriptor.set_id(static_state->id);
UpdateDataSourceOnAllBackends(rds, /*is_changed=*/true);
return;
}
}
});
}
// Can be called from any thread (but not concurrently).
void TracingMuxerImpl::RegisterInterceptor(
const InterceptorDescriptor& descriptor,
InterceptorFactory factory,
InterceptorBase::TLSFactory tls_factory,
InterceptorBase::TracePacketCallback packet_callback) {
task_runner_->PostTask([this, descriptor, factory, tls_factory,
packet_callback] {
// Ignore repeated registrations.
for (const auto& interceptor : interceptors_) {
if (interceptor.descriptor.name() == descriptor.name()) {
PERFETTO_DCHECK(interceptor.tls_factory == tls_factory);
PERFETTO_DCHECK(interceptor.packet_callback == packet_callback);
return;
}
}
// Only allow certain interceptors for now.
if (descriptor.name() != "test_interceptor" &&
descriptor.name() != "console" && descriptor.name() != "etwexport") {
PERFETTO_ELOG(
"Interceptors are experimental. If you want to use them, please "
"get in touch with the project maintainers "
"(https://perfetto.dev/docs/contributing/"
"getting-started#community).");
return;
}
interceptors_.emplace_back();
RegisteredInterceptor& interceptor = interceptors_.back();
interceptor.descriptor = descriptor;
interceptor.factory = factory;
interceptor.tls_factory = tls_factory;
interceptor.packet_callback = packet_callback;
});
}
void TracingMuxerImpl::ActivateTriggers(
const std::vector<std::string>& triggers,
uint32_t ttl_ms) {
base::TimeMillis expire_time =
base::GetWallTimeMs() + base::TimeMillis(ttl_ms);
task_runner_->PostTask([this, triggers, expire_time] {
for (RegisteredProducerBackend& backend : producer_backends_) {
if (backend.producer->connected_) {
backend.producer->service_->ActivateTriggers(triggers);
} else {
for (const std::string& trigger : triggers) {
backend.producer->on_connect_triggers_.emplace_back(trigger,
expire_time);
}
}
}
});
}
// Looks for a data source instance that was set up for startup tracing and
// matches a new SetupDataSource call. On a match, the instance is associated
// with this tracing session (instance id, target buffer and config) and true
// is returned; otherwise returns false and nothing is modified.
static bool MaybeAdoptStartupTracingInDataSource(
    TracingBackendId backend_id,
    uint32_t backend_connection_id,
    DataSourceInstanceID instance_id,
    const DataSourceConfig& cfg,
    const std::vector<RegisteredDataSource>& data_sources) {
  for (const auto& rds : data_sources) {
    DataSourceStaticState* static_state = rds.static_state;
    for (uint32_t slot = 0; slot < kMaxDataSourceInstances; slot++) {
      auto* state = static_state->TryGet(slot);

      // Adoptable means: the slot is live, still holds a startup buffer
      // reservation, has no service-assigned instance id yet, belongs to the
      // same backend connection, and the data source itself accepts |cfg|.
      const bool adoptable =
          state &&
          state->startup_target_buffer_reservation.load(
              std::memory_order_relaxed) &&
          state->data_source_instance_id == 0 &&
          state->backend_id == backend_id &&
          state->backend_connection_id == backend_connection_id &&
          state->config &&
          state->data_source->CanAdoptStartupSession(*state->config, cfg);
      if (!adoptable)
        continue;

      PERFETTO_DLOG("Setting up data source %" PRIu64
                    " %s by adopting it from a startup tracing session",
                    instance_id, cfg.name().c_str());

      std::lock_guard<std::recursive_mutex> guard(state->lock);
      // Set the associations. The actual takeover happens in
      // StartDataSource().
      state->data_source_instance_id = instance_id;
      state->buffer_id = static_cast<internal::BufferId>(cfg.target_buffer());
      state->config.reset(new DataSourceConfig(cfg));
      // TODO(eseckler): Should the data source config provided by the service
      // be allowed to specify additional interceptors / additional data
      // source params?
      return true;
    }
  }
  return false;
}
// Called by the service of one of the backends.
// Either adopts a matching startup-tracing instance, or sets up a new instance
// on the first registered data source named |cfg.name()| that is not already
// active with this exact config.
void TracingMuxerImpl::SetupDataSource(TracingBackendId backend_id,
                                       uint32_t backend_connection_id,
                                       DataSourceInstanceID instance_id,
                                       const DataSourceConfig& cfg) {
  PERFETTO_DLOG("Setting up data source %" PRIu64 " %s", instance_id,
                cfg.name().c_str());
  PERFETTO_DCHECK_THREAD(thread_checker_);

  // First check if there is any matching startup tracing data source instance.
  if (MaybeAdoptStartupTracingInDataSource(backend_id, backend_connection_id,
                                           instance_id, cfg, data_sources_)) {
    return;
  }

  // True if some live instance of |static_state| already runs |cfg| for this
  // backend connection.
  auto is_active_for_config = [&](DataSourceStaticState& static_state) {
    for (uint32_t slot = 0; slot < kMaxDataSourceInstances; slot++) {
      if (!static_state.TryGet(slot))
        continue;
      auto* state =
          reinterpret_cast<DataSourceState*>(&static_state.instances[slot]);
      if (state->backend_id == backend_id &&
          state->backend_connection_id == backend_connection_id &&
          state->config && *state->config == cfg) {
        return true;
      }
    }
    return false;
  };

  for (const auto& rds : data_sources_) {
    if (rds.descriptor.name() != cfg.name())
      continue;

    // If this data source is already active for this exact config, don't start
    // another instance. This happens when we have several data sources with
    // the same name, in which case the service sends one SetupDataSource event
    // for each one. Since we can't map which event maps to which data source,
    // we ensure each event only starts one data source instance.
    // TODO(skyostil): Register a unique id with each data source to the
    // service to disambiguate.
    if (is_active_for_config(*rds.static_state)) {
      PERFETTO_DLOG(
          "Data source %s is already active with this config, skipping",
          cfg.name().c_str());
      continue;
    }

    SetupDataSourceImpl(rds, backend_id, backend_connection_id, instance_id,
                        cfg, /*startup_session_id=*/0);
    return;
  }
}
// Claims the first free instance slot of |rds|, fills in its DataSourceState
// for the given backend / connection / instance, attaches the interceptor named
// in |cfg| (if any), publishes the slot in |valid_instances| and finally calls
// the data source's OnSetup().
// A non-zero |startup_session_id| additionally allocates a startup target
// buffer reservation on the backend's producer.
// Returns a default-constructed FindDataSourceRes when nothing was set up:
// a single-instance data source is already active, the startup reservations
// are exhausted, or every instance slot is taken.
TracingMuxerImpl::FindDataSourceRes TracingMuxerImpl::SetupDataSourceImpl(
const RegisteredDataSource& rds,
TracingBackendId backend_id,
uint32_t backend_connection_id,
DataSourceInstanceID instance_id,
const DataSourceConfig& cfg,
TracingSessionGlobalID startup_session_id) {
PERFETTO_DCHECK_THREAD(thread_checker_);
DataSourceStaticState& static_state = *rds.static_state;
// If any bit is set in `static_state.valid_instances` then at least one
// other instance of data source is running.
if (!rds.supports_multiple_instances &&
static_state.valid_instances.load(std::memory_order_acquire) != 0) {
PERFETTO_ELOG(
"Failed to setup data source because some another instance of this "
"data source is already active");
return FindDataSourceRes();
}
for (uint32_t i = 0; i < kMaxDataSourceInstances; i++) {
// Find a free slot.
if (static_state.TryGet(i))
continue;
auto* internal_state =
reinterpret_cast<DataSourceState*>(&static_state.instances[i]);
// The slot's state is filled in under its own lock.
std::unique_lock<std::recursive_mutex> lock(internal_state->lock);
static_assert(
std::is_same<decltype(internal_state->data_source_instance_id),
DataSourceInstanceID>::value,
"data_source_instance_id type mismatch");
internal_state->muxer_id_for_testing = muxer_id_for_testing_;
// NOTE(review): the lookup result is dereferenced unchecked, so |backend_id|
// must refer to a registered producer backend.
RegisteredProducerBackend& backend = *FindProducerBackendById(backend_id);
if (startup_session_id) {
// Startup tracing: hand out the next reservation id from the producer's
// 16-bit counter, failing once the counter has reached its maximum.
uint16_t& last_reservation =
backend.producer->last_startup_target_buffer_reservation_;
if (last_reservation == std::numeric_limits<uint16_t>::max()) {
PERFETTO_ELOG(
"Startup buffer reservations exhausted, dropping data source");
return FindDataSourceRes();
}
internal_state->startup_target_buffer_reservation.store(
++last_reservation, std::memory_order_relaxed);
} else {
// Regular (service-initiated) setup: no reservation.
internal_state->startup_target_buffer_reservation.store(
0, std::memory_order_relaxed);
}
internal_state->backend_id = backend_id;
internal_state->backend_connection_id = backend_connection_id;
internal_state->data_source_instance_id = instance_id;
internal_state->buffer_id =
static_cast<internal::BufferId>(cfg.target_buffer());
internal_state->config.reset(new DataSourceConfig(cfg));
internal_state->startup_session_id = startup_session_id;
internal_state->data_source = rds.factory();
internal_state->interceptor = nullptr;
internal_state->interceptor_id = 0;
internal_state->will_notify_on_stop = rds.descriptor.will_notify_on_stop();
if (cfg.has_interceptor_config()) {
// Look the configured interceptor up by name among the registered ones.
for (size_t j = 0; j < interceptors_.size(); j++) {
if (cfg.interceptor_config().name() ==
interceptors_[j].descriptor.name()) {
PERFETTO_DLOG("Intercepting data source %" PRIu64
" \"%s\" into \"%s\"",
instance_id, cfg.name().c_str(),
cfg.interceptor_config().name().c_str());
// The stored id is the registration index plus one, so that 0 keeps
// meaning "no interceptor".
internal_state->interceptor_id = static_cast<uint32_t>(j + 1);
internal_state->interceptor = interceptors_[j].factory();
internal_state->interceptor->OnSetup({cfg});
break;
}
}
// An unknown interceptor name is only logged; setup continues without an
// interceptor.
if (!internal_state->interceptor_id) {
PERFETTO_ELOG("Unknown interceptor configured for data source: %s",
cfg.interceptor_config().name().c_str());
}
}
// This must be made at the end. See matching acquire-load in
// DataSource::Trace().
static_state.valid_instances.fetch_or(1 << i, std::memory_order_release);
DataSourceBase::SetupArgs setup_args;
setup_args.config = &cfg;
setup_args.backend_type = backend.type;
setup_args.internal_instance_index = i;
// OnSetup() runs with the instance lock held only if the data source asked
// for callbacks under lock.
if (!rds.requires_callbacks_under_lock)
lock.unlock();
internal_state->data_source->OnSetup(setup_args);
return FindDataSourceRes(&static_state, internal_state, i,
rds.requires_callbacks_under_lock);
}
PERFETTO_ELOG(
"Maximum number of data source instances exhausted. "
"Dropping data source %" PRIu64,
instance_id);
return FindDataSourceRes();
}
// Called by the service of one of the backends.
// Starts the data source instance identified by (|backend_id|, |instance_id|).
// If the instance still holds a startup target buffer reservation it is already
// running for startup tracing: in that case the reservation is bound to the
// instance's buffer and the startup session bookkeeping is updated, instead of
// calling OnStart() again.
void TracingMuxerImpl::StartDataSource(TracingBackendId backend_id,
DataSourceInstanceID instance_id) {
PERFETTO_DLOG("Starting data source %" PRIu64, instance_id);
PERFETTO_DCHECK_THREAD(thread_checker_);
auto ds = FindDataSource(backend_id, instance_id);
if (!ds) {
PERFETTO_ELOG("Could not find data source to start");
return;
}
// Check if the data source was already started for startup tracing.
uint16_t startup_reservation =
ds.internal_state->startup_target_buffer_reservation.load(
std::memory_order_relaxed);
if (startup_reservation) {
RegisteredProducerBackend& backend = *FindProducerBackendById(backend_id);
TracingSessionGlobalID session_id = ds.internal_state->startup_session_id;
// Locate the startup session this instance belongs to.
auto session_it = std::find_if(
backend.startup_sessions.begin(), backend.startup_sessions.end(),
[session_id](const RegisteredStartupSession& session) {
return session.session_id == session_id;
});
// NOTE(review): the session is expected to exist; this is only verified in
// debug builds before |session_it| is dereferenced.
PERFETTO_DCHECK(session_it != backend.startup_sessions.end());
if (session_it->is_aborting) {
PERFETTO_DLOG("Data source %" PRIu64
" was already aborted for startup tracing, not starting it",
instance_id);
return;
}
PERFETTO_DLOG(
"Data source %" PRIu64
" was already started for startup tracing, binding its target buffer",
instance_id);
backend.producer->service_->MaybeSharedMemoryArbiter()
->BindStartupTargetBuffer(startup_reservation,
ds.internal_state->buffer_id);
// The reservation ID can be used even after binding it, so there's no need
// for any barriers here - we just need atomicity.
ds.internal_state->startup_target_buffer_reservation.store(
0, std::memory_order_relaxed);
// TODO(eseckler): Should we reset incremental state at this point, or
// notify the data source some other way?
// The session should not have been fully bound yet (or aborted).
PERFETTO_DCHECK(session_it->num_unbound_data_sources > 0);
session_it->num_unbound_data_sources--;
// Once the last data source of the session is bound, post the session's
// |on_adopted| callback (if set) and drop the session entry.
if (session_it->num_unbound_data_sources == 0) {
if (session_it->on_adopted)
task_runner_->PostTask(session_it->on_adopted);
backend.startup_sessions.erase(session_it);
}
return;
}
// Regular case: the instance has not been started yet.
StartDataSourceImpl(ds);
}
// Starts the instance described by |ds|: notifies its interceptor (if any),
// enables the trace lambda and calls the data source's OnStart(). The instance
// lock is kept across OnStart() only if the data source requires callbacks
// under lock.
void TracingMuxerImpl::StartDataSourceImpl(const FindDataSourceRes& ds) {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  auto* state = ds.internal_state;

  DataSourceBase::StartArgs start_args{};
  start_args.internal_instance_index = ds.instance_idx;

  std::unique_lock<std::recursive_mutex> state_lock(state->lock);
  if (state->interceptor) {
    state->interceptor->OnStart({});
  }
  state->trace_lambda_enabled.store(true, std::memory_order_relaxed);
  PERFETTO_DCHECK(state->data_source != nullptr);

  if (!ds.requires_callbacks_under_lock) {
    state_lock.unlock();
  }
  state->data_source->OnStart(start_args);
}
// Called by the service of one of the backends.
// Looks up the instance for (|backend_id|, |instance_id|) and begins stopping
// it. Logs an error and returns if no such instance exists.
void TracingMuxerImpl::StopDataSource_AsyncBegin(
    TracingBackendId backend_id,
    DataSourceInstanceID instance_id) {
  PERFETTO_DLOG("Stopping data source %" PRIu64, instance_id);
  PERFETTO_DCHECK_THREAD(thread_checker_);
  auto found = FindDataSource(backend_id, instance_id);
  if (found) {
    StopDataSource_AsyncBeginImpl(found);
    return;
  }
  PERFETTO_ELOG("Could not find data source to stop");
}
// First half of stopping a data source instance: notifies the interceptor and
// calls the data source's OnStop(), handing it a closure that completes the
// stop via StopDataSource_AsyncEnd(). If the data source leaves the closure in
// the stop args, it is run right away from here.
void TracingMuxerImpl::StopDataSource_AsyncBeginImpl(
    const FindDataSourceRes& ds) {
  auto* state = ds.internal_state;

  // Snapshot the instance identity; StopDataSource_AsyncEnd() receives these
  // values together with |ds|.
  TracingBackendId backend_id = state->backend_id;
  uint32_t connection_id = state->backend_connection_id;
  DataSourceInstanceID instance_id = state->data_source_instance_id;

  StopArgsImpl stop_args{};
  stop_args.internal_instance_index = ds.instance_idx;
  stop_args.async_stop_closure = [this, backend_id, connection_id, instance_id,
                                  ds] {
    // TracingMuxerImpl is long lived, capturing |this| is okay.
    // The notification closure can be moved out of the StopArgs by the
    // embedder to handle stop asynchronously. The embedder might then
    // call the closure on a different thread than the current one, hence
    // this nested PostTask().
    task_runner_->PostTask(
        [this, backend_id, connection_id, instance_id, ds] {
          StopDataSource_AsyncEnd(backend_id, connection_id, instance_id, ds);
        });
  };

  {
    std::unique_lock<std::recursive_mutex> state_lock(state->lock);

    // Don't call OnStop again if the datasource is already stopping.
    if (state->async_stop_in_progress)
      return;
    state->async_stop_in_progress = true;

    if (state->interceptor) {
      state->interceptor->OnStop({});
    }

    // OnStop() runs under the instance lock only if the data source asked
    // for it.
    if (!ds.requires_callbacks_under_lock) {
      state_lock.unlock();
    }
    state->data_source->OnStop(stop_args);
  }

  // If the embedder hasn't called StopArgs.HandleStopAsynchronously() run the
  // async closure here. In theory we could avoid the PostTask and call the
  // completion step directly. We keep the PostTask to reduce divergencies
  // between the deferred-stop vs non-deferred-stop code paths.
  if (stop_args.async_stop_closure) {
    std::move(stop_args.async_stop_closure)();
  }
}
// Second half of stopping a data source instance. Verifies that |ds| still
// refers to the same (backend, connection, instance) it did when the stop
// began, then clears its bit in |valid_instances|, destroys the data source,
// interceptor and config, aborts any startup buffer reservation still held, and
// (if still connected on the same connection) flushes pending commits and
// notifies the service that the data source stopped.
void TracingMuxerImpl::StopDataSource_AsyncEnd(TracingBackendId backend_id,
uint32_t backend_connection_id,
DataSourceInstanceID instance_id,
const FindDataSourceRes& ds) {
PERFETTO_DLOG("Ending async stop of data source %" PRIu64, instance_id);
PERFETTO_DCHECK_THREAD(thread_checker_);
// Check that the data source instance is still active and was not modified
// while it was being stopped.
if (!ds.static_state->TryGet(ds.instance_idx) ||
ds.internal_state->backend_id != backend_id ||
ds.internal_state->backend_connection_id != backend_connection_id ||
ds.internal_state->data_source_instance_id != instance_id) {
PERFETTO_ELOG(
"Async stop of data source %" PRIu64
" failed. This might be due to calling the async_stop_closure twice.",
instance_id);
return;
}
// Clear this instance's bit so that it is no longer reported as valid.
const uint32_t mask = ~(1 << ds.instance_idx);
ds.static_state->valid_instances.fetch_and(mask, std::memory_order_acq_rel);
bool will_notify_on_stop;
// Take the mutex to prevent that the data source is in the middle of
// a Trace() execution where it called GetDataSourceLocked() while we
// destroy it.
uint16_t startup_buffer_reservation;
TracingSessionGlobalID startup_session_id;
{
std::lock_guard<std::recursive_mutex> guard(ds.internal_state->lock);
ds.internal_state->trace_lambda_enabled.store(false,
std::memory_order_relaxed);
ds.internal_state->data_source.reset();
ds.internal_state->interceptor.reset();
ds.internal_state->config.reset();
ds.internal_state->async_stop_in_progress = false;
// Copy out what is still needed below, while the lock is held.
will_notify_on_stop = ds.internal_state->will_notify_on_stop;
startup_buffer_reservation =
ds.internal_state->startup_target_buffer_reservation.load(
std::memory_order_relaxed);
startup_session_id = ds.internal_state->startup_session_id;
}
// The other fields of internal_state are deliberately *not* cleared.
// See races-related comments of DataSource::Trace().
TracingMuxer::generation_++;
// |producer_backends_| is append-only, Backend instances are always valid.
PERFETTO_CHECK(backend_id < producer_backends_.size());
RegisteredProducerBackend& backend = *FindProducerBackendById(backend_id);
ProducerImpl* producer = backend.producer.get();
if (!producer)
return;
// If the data source instance still has a startup buffer reservation, it was
// only active for startup tracing and never started by the service. Discard
// the startup buffer reservation.
if (startup_buffer_reservation) {
PERFETTO_DCHECK(startup_session_id);
if (producer->service_ && producer->service_->MaybeSharedMemoryArbiter()) {
producer->service_->MaybeSharedMemoryArbiter()
->AbortStartupTracingForReservation(startup_buffer_reservation);
}
auto session_it = std::find_if(
backend.startup_sessions.begin(), backend.startup_sessions.end(),
[startup_session_id](const RegisteredStartupSession& session) {
return session.session_id == startup_session_id;
});
// Session should not be removed until abortion of all data source instances
// is complete.
PERFETTO_DCHECK(session_it != backend.startup_sessions.end());
session_it->num_aborting_data_sources--;
// When the last aborting data source of the session is gone, post the
// session's |on_aborted| callback (if set) and drop the session entry.
if (session_it->num_aborting_data_sources == 0) {
if (session_it->on_aborted)
task_runner_->PostTask(session_it->on_aborted);
backend.startup_sessions.erase(session_it);
}
}
// Only talk to the service if the producer is connected and this is still the
// connection the instance was set up on.
if (producer->connected_ &&
backend.producer->connection_id_.load(std::memory_order_relaxed) ==
backend_connection_id) {
// Flush any commits that might have been batched by SharedMemoryArbiter.
producer->service_->MaybeSharedMemoryArbiter()
->FlushPendingCommitDataRequests();
// The stop notification is sent only for instances with a non-zero instance
// id whose descriptor asked for it.
if (instance_id && will_notify_on_stop)
producer->service_->NotifyDataSourceStopped(instance_id);
}
producer->SweepDeadServices();
}
// Tells the data source instance (|backend_id|, |instance_id|) that its
// incremental state is about to be cleared, then bumps the data source's
// incremental state generation. Logs an error if the instance is unknown.
void TracingMuxerImpl::ClearDataSourceIncrementalState(
    TracingBackendId backend_id,
    DataSourceInstanceID instance_id) {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  PERFETTO_DLOG("Clearing incremental state for data source %" PRIu64,
                instance_id);
  auto found = FindDataSource(backend_id, instance_id);
  if (!found) {
    PERFETTO_ELOG("Could not find data source to clear incremental state for");
    return;
  }

  DataSourceBase::ClearIncrementalStateArgs clear_args;
  clear_args.internal_instance_index = found.instance_idx;
  {
    // The instance lock is taken only when the data source requires its
    // callbacks to run under lock.
    std::unique_lock<std::recursive_mutex> maybe_lock(
        found.internal_state->lock, std::defer_lock);
    if (found.requires_callbacks_under_lock) {
      maybe_lock.lock();
    }
    found.internal_state->data_source->WillClearIncrementalState(clear_args);
  }

  // Make DataSource::TraceContext::GetIncrementalState() eventually notice that
  // the incremental state should be cleared.
  found.static_state->incremental_state_generation.fetch_add(
      1, std::memory_order_relaxed);
}
// Asks the data source instance (|backend_id|, |instance_id|) to flush by
// calling its OnFlush(). Returns true when the flush needs no further
// completion step (the instance was not found, or the data source left the
// async closure in the flush args), and false when the data source took the
// closure to finish the flush asynchronously.
bool TracingMuxerImpl::FlushDataSource_AsyncBegin(
    TracingBackendId backend_id,
    DataSourceInstanceID instance_id,
    FlushRequestID flush_id,
    FlushFlags flush_flags) {
  PERFETTO_DLOG("Flushing data source %" PRIu64, instance_id);
  auto found = FindDataSource(backend_id, instance_id);
  if (!found) {
    PERFETTO_ELOG("Could not find data source to flush");
    return true;
  }

  uint32_t connection_id = found.internal_state->backend_connection_id;

  FlushArgsImpl flush_args;
  flush_args.flush_flags = flush_flags;
  flush_args.internal_instance_index = found.instance_idx;
  flush_args.async_flush_closure = [this, backend_id, connection_id,
                                    instance_id, ds = found, flush_id] {
    // TracingMuxerImpl is long lived, capturing |this| is okay.
    // The notification closure can be moved out of the flush args by the
    // embedder to handle the flush asynchronously. The embedder might then
    // call the closure on a different thread than the current one, hence
    // this nested PostTask().
    task_runner_->PostTask(
        [this, backend_id, connection_id, instance_id, ds, flush_id] {
          FlushDataSource_AsyncEnd(backend_id, connection_id, instance_id, ds,
                                   flush_id);
        });
  };

  {
    // The instance lock is taken only when the data source requires its
    // callbacks to run under lock.
    std::unique_lock<std::recursive_mutex> maybe_lock(
        found.internal_state->lock, std::defer_lock);
    if (found.requires_callbacks_under_lock) {
      maybe_lock.lock();
    }
    found.internal_state->data_source->OnFlush(flush_args);
  }

  // |async_flush_closure| is moved out of |flush_args| if the producer
  // requested to handle the flush asynchronously.
  return static_cast<bool>(flush_args.async_flush_closure);
}
// Completes an asynchronous flush: if |ds| still refers to the same
// (backend, connection, instance) and the producer is still connected on that
// connection, reports the flush of |instance_id| for |flush_id| as done.
void TracingMuxerImpl::FlushDataSource_AsyncEnd(
    TracingBackendId backend_id,
    uint32_t backend_connection_id,
    DataSourceInstanceID instance_id,
    const FindDataSourceRes& ds,
    FlushRequestID flush_id) {
  PERFETTO_DLOG("Ending async flush of data source %" PRIu64, instance_id);
  PERFETTO_DCHECK_THREAD(thread_checker_);

  // Check that the data source instance is still active and was not modified
  // while it was being flushed.
  const bool instance_changed =
      !ds.static_state->TryGet(ds.instance_idx) ||
      ds.internal_state->backend_id != backend_id ||
      ds.internal_state->backend_connection_id != backend_connection_id ||
      ds.internal_state->data_source_instance_id != instance_id;
  if (instance_changed) {
    PERFETTO_ELOG("Async flush of data source %" PRIu64
                  " failed. This might be due to the data source being stopped "
                  "in the meantime",
                  instance_id);
    return;
  }

  // |producer_backends_| is append-only, Backend instances are always valid.
  PERFETTO_CHECK(backend_id < producer_backends_.size());
  RegisteredProducerBackend& backend = *FindProducerBackendById(backend_id);
  ProducerImpl* producer = backend.producer.get();
  if (!producer)
    return;

  // If the tracing service disconnects and reconnects while a data source is
  // handling a flush request, there's no point is sending the flush reply to
  // the newly reconnected producer.
  if (!producer->connected_)
    return;
  if (producer->connection_id_.load(std::memory_order_relaxed) !=
      backend_connection_id) {
    return;
  }
  producer->NotifyFlushForDataSourceDone(instance_id, flush_id);
}
// Test-only helper. Blocks the calling thread until every producer backend has
// completed two Sync() round-trips with its service, then checks (in debug
// builds) that all producers are connected. The work is posted to
// |task_runner_| and this function waits for it on a condition variable.
void TracingMuxerImpl::SyncProducersForTesting() {
  std::mutex mutex;
  std::condition_variable cv;

  // IPC-based producers don't report connection errors explicitly for each
  // command, but instead with an asynchronous callback
  // (ProducerImpl::OnDisconnected). This means that the sync command below
  // may have completed but failed to reach the service because of a
  // disconnection, but we can't tell until the disconnection message comes
  // through. To guard against this, we run two whole rounds of sync round-trips
  // before returning; the first one will detect any disconnected producers and
  // the second one will ensure any reconnections have completed and all data
  // sources are registered in the service again.
  for (size_t i = 0; i < 2; i++) {
    // Starts at a non-zero sentinel so that the wait below cannot complete
    // before the posted task has stored the real number of backends.
    size_t countdown = std::numeric_limits<size_t>::max();
    task_runner_->PostTask([this, &mutex, &cv, &countdown] {
      {
        std::unique_lock<std::mutex> countdown_lock(mutex);
        countdown = producer_backends_.size();
        // With no producer backends no Sync() callback will ever run, so
        // nothing else would wake a waiter that is already blocked in
        // cv.wait(). Signal it here to avoid hanging forever.
        if (countdown == 0)
          cv.notify_one();
      }
      for (auto& backend : producer_backends_) {
        auto* producer = backend.producer.get();
        producer->service_->Sync([&mutex, &cv, &countdown] {
          std::unique_lock<std::mutex> countdown_lock(mutex);
          countdown--;
          cv.notify_one();
        });
      }
    });

    {
      std::unique_lock<std::mutex> countdown_lock(mutex);
      cv.wait(countdown_lock, [&countdown] { return !countdown; });
    }
  }

  // Check that all producers are indeed connected.
  bool done = false;
  bool all_producers_connected = true;
  task_runner_->PostTask([this, &mutex, &cv, &done, &all_producers_connected] {
    for (auto& backend : producer_backends_)
      all_producers_connected &= backend.producer->connected_;
    std::unique_lock<std::mutex> lock(mutex);
    done = true;
    cv.notify_one();
  });

  {
    std::unique_lock<std::mutex> lock(mutex);
    cv.wait(lock, [&done] { return done; });
  }
  PERFETTO_DCHECK(all_producers_connected);
}
void TracingMuxerImpl::DestroyStoppedTraceWritersForCurrentThread() {
// Iterate across all possible data source types.
auto cur_generation = generation_.load(std::memory_order_acquire);
auto* root_tls = GetOrCreateTracingTLS();
auto destroy_stopped_instances = [](DataSourceThreadLocalState& tls) {
// |tls| has a vector of per-data-source-instance thread-local state.
DataSourceStaticState* static_state = tls.static_state;
if (!static_state)
return; // Slot not used.
// Iterate across all possible instances for this data source.
for (uint32_t inst = 0; inst < kMaxDataSourceInstances; inst++) {
DataSourceInstanceThreadLocalState& ds_tls = tls.per_instance[inst];
if (!ds_tls.trace_writer)
continue;
DataSourceState* ds_state = static_state->TryGet(inst);
if (ds_state &&
ds_state->muxer_id_for_testing == ds_tls.muxer_id_for_testing &&
ds_state->backend_id == ds_tls.backend_id &&
ds_state->backend_connection_id == ds_tls.backend_connection_id &&
ds_state->startup_target_buffer_reservation.load(
std::memory_order_relaxed) ==
ds_tls.startup_target_buffer_reservation &&
ds_state->buffer_id == ds_tls.buffer_id &&
ds_state->data_source_instance_id == ds_tls.data_source_instance_id) {
continue;
}
// The DataSource instance has been destroyed or recycled.
ds_tls.Reset(); // Will also destroy the |ds_tls.trace_writer|.
}
};
for (size_t ds_idx = 0; ds_idx < kMaxDataSources; ds_idx++) {
// |tls| has a vector of per-data-source-instance thread-local state.
DataSourceThreadLocalState& tls = root_tls->data_sources_tls[ds_idx];
destroy_stopped_instances(tls);
}
destroy_stopped_instances(root_tls->track_event_tls);
root_tls->generation = cur_generation;
}
// Called both when a new data source is registered or when a new backend
// connects. In both cases we want to be sure we reflected the data source
// registrations on the backends.
void TracingMuxerImpl::UpdateDataSourcesOnAllBackends() {
PERFETTO_DCHECK_THREAD(thread_checker_);
for (RegisteredDataSource& rds : data_sources_) {
UpdateDataSourceOnAllBackends(rds, /*is_changed=*/false);
}
}
// Registers |rds| with every connected producer backend that doesn't know it
// yet. When |is_changed| is true, backends that already know it get an update
// instead. The descriptor in |rds| is amended in place with the flags and id
// this muxer adds before it is sent.
void TracingMuxerImpl::UpdateDataSourceOnAllBackends(RegisteredDataSource& rds,
                                                     bool is_changed) {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  for (RegisteredProducerBackend& backend : producer_backends_) {
    auto& producer = *backend.producer;

    // We cannot call RegisterDataSource on the backend before it connects.
    if (!producer.connected_)
      continue;

    PERFETTO_DCHECK(rds.static_state->index < kMaxDataSources);
    const bool already_registered =
        producer.registered_data_sources_.test(rds.static_state->index);
    // Nothing to do for a backend that already has an unchanged descriptor.
    if (already_registered && !is_changed)
      continue;

    // |no_flush| set on the descriptor itself wins; otherwise use the value
    // passed at registration time.
    if (!rds.descriptor.no_flush()) {
      rds.descriptor.set_no_flush(rds.no_flush);
    }
    rds.descriptor.set_will_notify_on_start(true);
    // Default to notifying on stop unless the descriptor set it explicitly.
    if (!rds.descriptor.has_will_notify_on_stop()) {
      rds.descriptor.set_will_notify_on_stop(true);
    }
    rds.descriptor.set_handles_incremental_state_clear(true);
    rds.descriptor.set_id(rds.static_state->id);

    if (already_registered) {
      producer.service_->UpdateDataSource(rds.descriptor);
    } else {
      producer.service_->RegisterDataSource(rds.descriptor);
    }
    producer.registered_data_sources_.set(rds.static_state->index);
  }
}
// Stores |trace_config| (and |trace_fd|, if valid) on the consumer of
// |session_id|. If the consumer is already connected and the config asks for a
// deferred start, tracing is enabled on the service right away.
void TracingMuxerImpl::SetupTracingSession(
    TracingSessionGlobalID session_id,
    const std::shared_ptr<TraceConfig>& trace_config,
    base::ScopedFile trace_fd) {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  // A file descriptor only makes sense when the config writes into a file.
  PERFETTO_CHECK(!trace_fd || trace_config->write_into_file());

  auto* session_consumer = FindConsumer(session_id);
  if (!session_consumer)
    return;

  session_consumer->trace_config_ = trace_config;
  if (trace_fd) {
    session_consumer->trace_fd_ = std::move(trace_fd);
  }

  // Only used in the deferred start mode.
  const bool enable_now =
      session_consumer->connected_ && trace_config->deferred_start();
  if (enable_now) {
    session_consumer->service_->EnableTracing(
        *trace_config, std::move(session_consumer->trace_fd_));
  }
}
// Starts the tracing session |session_id|. Requires a config to have been set
// up first. If the consumer is not connected yet, the start is only marked as
// pending.
void TracingMuxerImpl::StartTracingSession(TracingSessionGlobalID session_id) {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  auto* session_consumer = FindConsumer(session_id);
  if (!session_consumer)
    return;

  if (!session_consumer->trace_config_) {
    PERFETTO_ELOG("Must call Setup(config) first");
    return;
  }

  // Remember whether the start still has to happen once connected.
  const bool is_connected = session_consumer->connected_;
  session_consumer->start_pending_ = !is_connected;
  if (!is_connected)
    return;

  // With deferred start the config was already sent during setup, so only the
  // start command is needed; otherwise tracing is enabled now.
  if (session_consumer->trace_config_->deferred_start()) {
    session_consumer->service_->StartTracing();
  } else {
    session_consumer->service_->EnableTracing(
        *session_consumer->trace_config_,
        std::move(session_consumer->trace_fd_));
  }
  // TODO implement support for the deferred-start + fast-triggering case.
}
// Replaces the config of the session |session_id| with a copy of
// |trace_config| and, if the consumer is connected, forwards the change to the
// service. Requires the session to have a config already.
void TracingMuxerImpl::ChangeTracingSessionConfig(
    TracingSessionGlobalID session_id,
    const TraceConfig& trace_config) {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  auto* session_consumer = FindConsumer(session_id);
  if (!session_consumer)
    return;

  const bool has_config = static_cast<bool>(session_consumer->trace_config_);
  if (!has_config) {
    // Changing the config is only supported for started sessions.
    PERFETTO_ELOG("Must call Setup(config) and Start() first");
    return;
  }

  session_consumer->trace_config_ =
      std::make_shared<TraceConfig>(trace_config);
  if (session_consumer->connected_) {
    session_consumer->service_->ChangeTraceConfig(trace_config);
  }
}
// Requests a flush of the session |session_id| with the given |timeout_ms|.
// |callback| is invoked with false straight away when the session has no
// consumer, has a start or stop still pending, or has no config stored.
// Otherwise |callback| is handed over to the service's Flush().
void TracingMuxerImpl::FlushTracingSession(TracingSessionGlobalID session_id,
                                           uint32_t timeout_ms,
                                           std::function<void(bool)> callback) {
  PERFETTO_DCHECK_THREAD(thread_checker_);

  ConsumerImpl* const target = FindConsumer(session_id);
  const bool flushable = target != nullptr && !target->start_pending_ &&
                         !target->stop_pending_ &&
                         target->trace_config_ != nullptr;
  if (!flushable) {
    PERFETTO_ELOG("Flush() can be called only after Start() and before Stop()");
    std::move(callback)(false);
    return;
  }

  // For now we don't want to expose the flush reason to the consumer-side SDK
  // users to avoid misuses until there is a strong need.
  const FlushFlags flush_flags(FlushFlags::Initiator::kConsumerSdk,
                               FlushFlags::Reason::kExplicit);
  target->service_->Flush(timeout_ms, std::move(callback), flush_flags);
}
// Stops the session identified by |session_id|.
// - Start still pending: only records |stop_pending_| and returns.
// - Already stopped: notifies stop completion and drops the stored config.
// - No config stored: logs an error and returns.
// - Otherwise: asks the service to disable tracing and drops the stored
//   config.
void TracingMuxerImpl::StopTracingSession(TracingSessionGlobalID session_id) {
  PERFETTO_DCHECK_THREAD(thread_checker_);

  ConsumerImpl* const target = FindConsumer(session_id);
  if (target == nullptr) {
    return;
  }

  // If the session hasn't started yet, wait until it does before stopping.
  if (target->start_pending_) {
    target->stop_pending_ = true;
    return;
  }
  target->stop_pending_ = false;

  // If the session was already stopped (e.g., it failed to start), don't try
  // stopping again.
  if (target->stopped_) {
    target->NotifyStopComplete();
    target->trace_config_.reset();
    return;
  }

  if (!target->trace_config_) {
    PERFETTO_ELOG("Must call Setup(config) and Start() first");
    return;
  }

  target->service_->DisableTracing();
  target->trace_config_.reset();
}
// Disconnects, on every consumer backend, the consumer that belongs to
// |session_id| (if there is one).
void TracingMuxerImpl::DestroyTracingSession(
    TracingSessionGlobalID session_id) {
  PERFETTO_DCHECK_THREAD(thread_checker_);

  const auto belongs_to_session =
      [session_id](const std::unique_ptr<ConsumerImpl>& candidate) {
        return candidate->session_id_ == session_id;
      };

  for (RegisteredConsumerBackend& backend : consumer_backends_) {
    auto& consumers = backend.consumers;

    // The lookup is finished before Disconnect() is called: in the in-process
    // case Disconnect() ends up in a synchronous call to
    // OnConsumerDisconnect, which invalidates all the iterators to
    // |backend.consumers|. Only the raw pointer is used past this point.
    const auto match =
        std::find_if(consumers.begin(), consumers.end(), belongs_to_session);
    if (match == consumers.end()) {
      continue;
    }
    ConsumerImpl* const doomed = match->get();

    // Stopping at the first match assumes that each backend has a single
    // consumer per session. This DCHECK ensures that this is the case.
    PERFETTO_DCHECK(std::count_if(consumers.begin(), consumers.end(),
                                  belongs_to_session) == 1u);
    doomed->Disconnect();
  }
}
// Asks the service to read back the buffers of session |session_id|.
// |callback| is stored on the consumer before ReadBuffers() is issued. If the
// session has no consumer, |callback| is invoked once with value-initialized
// arguments instead.
void TracingMuxerImpl::ReadTracingSessionData(
    TracingSessionGlobalID session_id,
    std::function<void(TracingSession::ReadTraceCallbackArgs)> callback) {
  PERFETTO_DCHECK_THREAD(thread_checker_);

  ConsumerImpl* const reader = FindConsumer(session_id);
  if (reader == nullptr) {
    // TODO(skyostil): Signal an error to the user.
    TracingSession::ReadTraceCallbackArgs empty_args{};
    callback(empty_args);
    return;
  }

  // Only one read may be outstanding at a time.
  PERFETTO_DCHECK(!reader->read_trace_callback_);
  reader->read_trace_callback_ = std::move(callback);
  reader->service_->ReadBuffers();
}
// Requests the trace stats of session |session_id|.
// Without a consumer, |callback| is invoked immediately with success == false.
// Otherwise |callback| is stored on the consumer; the request is sent to the
// service if the consumer is connected and is marked as pending if it is not.
void TracingMuxerImpl::GetTraceStats(
    TracingSessionGlobalID session_id,
    TracingSession::GetTraceStatsCallback callback) {
  PERFETTO_DCHECK_THREAD(thread_checker_);

  ConsumerImpl* const target = FindConsumer(session_id);
  if (target == nullptr) {
    TracingSession::GetTraceStatsCallbackArgs failure{};
    failure.success = false;
    callback(std::move(failure));
    return;
  }

  PERFETTO_DCHECK(!target->get_trace_stats_callback_);
  target->get_trace_stats_callback_ = std::move(callback);

  const bool is_connected = target->connected_;
  target->get_trace_stats_pending_ = !is_connected;
  if (is_connected) {
    target->service_->GetTraceStats();
  }
}
// Queries the tracing service state on behalf of session |session_id|.
// Without a consumer, |callback| is invoked immediately with success == false.
// If the consumer is not connected, |callback| is only stored on it.
// Otherwise the query is sent to the service and the reply is forwarded to
// |callback| with the state serialized into |service_state_data|.
void TracingMuxerImpl::QueryServiceState(
    TracingSessionGlobalID session_id,
    TracingSession::QueryServiceStateCallback callback) {
  PERFETTO_DCHECK_THREAD(thread_checker_);

  ConsumerImpl* const target = FindConsumer(session_id);
  if (target == nullptr) {
    TracingSession::QueryServiceStateCallbackArgs failure{};
    failure.success = false;
    callback(std::move(failure));
    return;
  }

  PERFETTO_DCHECK(!target->query_service_state_callback_);
  if (!target->connected_) {
    target->query_service_state_callback_ = std::move(callback);
    return;
  }

  // Adapts the service's (success, state) reply to the public callback args.
  auto on_service_reply = [callback](bool success,
                                     protos::gen::TracingServiceState state) {
    TracingSession::QueryServiceStateCallbackArgs reply{};
    reply.success = success;
    reply.service_state_data = state.SerializeAsArray();
    callback(std::move(reply));
  };
  target->service_->QueryServiceState({}, std::move(on_service_reply));
}
// Test-only: applies |batch_commits_duration_ms| to the shared memory arbiter
// of every connected producer whose backend is of type |backend_type|.
void TracingMuxerImpl::SetBatchCommitsDurationForTesting(
    uint32_t batch_commits_duration_ms,
    BackendType backend_type) {
  for (RegisteredProducerBackend& backend : producer_backends_) {
    // Skip backends without a connected producer or of a different type.
    if (!backend.producer || !backend.producer->connected_ ||
        backend.type != backend_type) {
      continue;
    }
    auto* const arbiter = backend.producer->service_->MaybeSharedMemoryArbiter();
    arbiter->SetBatchCommitsDuration(batch_commits_duration_ms);
  }
}
// Test-only: enables direct SMB patching on the shared memory arbiter of every
// connected producer whose backend is of type |backend_type|.
// Returns false as soon as one arbiter refuses (later backends are then not
// visited), true otherwise.
bool TracingMuxerImpl::EnableDirectSMBPatchingForTesting(
    BackendType backend_type) {
  for (RegisteredProducerBackend& backend : producer_backends_) {
    // Skip backends without a connected producer or of a different type.
    if (!backend.producer || !backend.producer->connected_ ||
        backend.type != backend_type) {
      continue;
    }
    auto* const arbiter = backend.producer->service_->MaybeSharedMemoryArbiter();
    if (!arbiter->EnableDirectSMBPatching()) {
      return false;
    }
  }
  return true;
}
// Returns the consumer registered for |session_id|, or nullptr if none of the
// consumer backends has one.
TracingMuxerImpl::ConsumerImpl* TracingMuxerImpl::FindConsumer(
    TracingSessionGlobalID session_id) {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  const auto consumer_and_backend = FindConsumerAndBackend(session_id);
  return consumer_and_backend.first;
}
// Looks up the consumer registered for |session_id| together with the backend
// that holds it. Backends are scanned in registration order and the first
// match wins. Returns {nullptr, nullptr} if there is no match.
std::pair<TracingMuxerImpl::ConsumerImpl*,
          TracingMuxerImpl::RegisteredConsumerBackend*>
TracingMuxerImpl::FindConsumerAndBackend(TracingSessionGlobalID session_id) {
  PERFETTO_DCHECK_THREAD(thread_checker_);

  const auto belongs_to_session =
      [session_id](const std::unique_ptr<ConsumerImpl>& candidate) {
        return candidate->session_id_ == session_id;
      };

  for (RegisteredConsumerBackend& backend : consumer_backends_) {
    auto& consumers = backend.consumers;
    const auto match =
        std::find_if(consumers.begin(), consumers.end(), belongs_to_session);
    if (match != consumers.end()) {
      return {match->get(), &backend};
    }
  }
  return {nullptr, nullptr};
}
// Connects the consumer of session |session_id| to its backend and hands the
// resulting endpoint to ConsumerImpl::Initialize(). No-op if the consumer or
// its backend cannot be found.
void TracingMuxerImpl::InitializeConsumer(TracingSessionGlobalID session_id) {
  PERFETTO_DCHECK_THREAD(thread_checker_);

  const auto lookup = FindConsumerAndBackend(session_id);
  ConsumerImpl* const consumer = lookup.first;
  RegisteredConsumerBackend* const owning_backend = lookup.second;
  if (consumer == nullptr || owning_backend == nullptr) {
    return;
  }

  TracingBackend::ConnectConsumerArgs connect_args;
  connect_args.consumer = consumer;
  connect_args.task_runner = task_runner_.get();
  consumer->Initialize(owning_backend->backend->ConnectConsumer(connect_args));
}
// Removes |consumer| from the consumer list of every backend. The lists own
// their consumers through std::unique_ptr, so erasing the entry destroys the
// consumer.
void TracingMuxerImpl::OnConsumerDisconnected(ConsumerImpl* consumer) {
  PERFETTO_DCHECK_THREAD(thread_checker_);

  const auto is_disconnected =
      [consumer](const std::unique_ptr<ConsumerImpl>& candidate) {
        return candidate.get() == consumer;
      };

  for (RegisteredConsumerBackend& backend : consumer_backends_) {
    auto& consumers = backend.consumers;
    const auto first_removed =
        std::remove_if(consumers.begin(), consumers.end(), is_disconnected);
    consumers.erase(first_removed, consumers.end());
  }
}
// Test-only: overrides the limit that OnProducerDisconnected() compares a
// producer's connection id against before attempting a reconnection.
void TracingMuxerImpl::SetMaxProducerReconnectionsForTesting(uint32_t count) {
  max_producer_reconnections_.store(count, std::memory_order_seq_cst);
}
// Handles the disconnection of |producer|: stops every data source instance
// that was bound to the producer's current connection, then tries to reconnect
// the producer unless it has already reconnected too many times.
void TracingMuxerImpl::OnProducerDisconnected(ProducerImpl* producer) {
  PERFETTO_DCHECK_THREAD(thread_checker_);

  for (RegisteredProducerBackend& backend : producer_backends_) {
    if (backend.producer.get() != producer) {
      continue;
    }

    // The tracing service is disconnected. It does not make sense to keep
    // tracing (we wouldn't be able to commit). On reconnection, the tracing
    // service will restart the data sources.
    for (const auto& rds : data_sources_) {
      DataSourceStaticState* const static_state = rds.static_state;
      for (uint32_t idx = 0; idx < kMaxDataSourceInstances; ++idx) {
        auto* const instance = static_state->TryGet(idx);
        if (!instance || instance->backend_id != backend.id) {
          continue;
        }
        // Only instances created on the connection that just went away are
        // stopped.
        if (instance->backend_connection_id !=
            backend.producer->connection_id_.load(std::memory_order_relaxed)) {
          continue;
        }
        StopDataSource_AsyncBeginImpl(FindDataSourceRes(
            static_state, instance, idx, rds.requires_callbacks_under_lock));
      }
    }

    // Try reconnecting the disconnected producer. If the connection succeeds,
    // all the data sources will be automatically re-registered.
    const bool reconnect_budget_exhausted =
        producer->connection_id_.load(std::memory_order_relaxed) >
        max_producer_reconnections_.load();
    if (reconnect_budget_exhausted) {
      // Avoid reconnecting a failing producer too many times. Instead we just
      // leak the producer instead of trying to avoid further complicating
      // cross-thread trace writer creation.
      PERFETTO_ELOG("Producer disconnected too many times; not reconnecting");
      continue;
    }

    backend.producer->Initialize(
        backend.backend->ConnectProducer(backend.producer_conn_args));
    // Don't use producer-provided SMBs for the next connection unless startup
    // tracing requires it again.
    backend.producer_conn_args.use_producer_provided_smb = false;
  }
}
// Drops every entry of |dead_backends_| whose producer reports, through
// SweepDeadServices(), that it can be swept. Entries that cannot be swept yet
// stay in the list.
void TracingMuxerImpl::SweepDeadBackends() {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  auto it = dead_backends_.begin();
  while (it != dead_backends_.end()) {
    if (it->producer->SweepDeadServices()) {
      // erase() returns the iterator following the removed element.
      it = dead_backends_.erase(it);
    } else {
      ++it;
    }
  }
}
// Finds the data source instance identified by |instance_id| on the backend
// |backend_id|. Only instances bound to the producer's current connection are
// considered. Returns a default-constructed FindDataSourceRes if nothing
// matches.
TracingMuxerImpl::FindDataSourceRes TracingMuxerImpl::FindDataSource(
    TracingBackendId backend_id,
    DataSourceInstanceID instance_id) {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  RegisteredProducerBackend& backend = *FindProducerBackendById(backend_id);

  for (const auto& rds : data_sources_) {
    DataSourceStaticState* const static_state = rds.static_state;
    for (uint32_t idx = 0; idx < kMaxDataSourceInstances; ++idx) {
      auto* const instance = static_state->TryGet(idx);
      if (!instance || instance->backend_id != backend_id) {
        continue;
      }
      // Instances left over from a previous connection do not match.
      if (instance->backend_connection_id !=
          backend.producer->connection_id_.load(std::memory_order_relaxed)) {
        continue;
      }
      if (instance->data_source_instance_id != instance_id) {
        continue;
      }
      return FindDataSourceRes(static_state, instance, idx,
                               rds.requires_callbacks_under_lock);
    }
  }
  return FindDataSourceRes();
}
// Creates the trace writer used by one instance of a data source.
// Can be called from any thread.
//
// Three outcomes are possible:
// - an InterceptorTraceWriter, when |data_source| has a non-zero
//   |interceptor_id|;
// - a NullTraceWriter, when the producer's current connection id no longer
//   matches the one recorded in |data_source|;
// - otherwise a writer obtained from the producer's service endpoint: a
//   startup trace writer when a startup target buffer reservation is set, or
//   a regular one for |data_source->buffer_id| with |buffer_exhausted_policy|.
std::unique_ptr<TraceWriterBase> TracingMuxerImpl::CreateTraceWriter(
DataSourceStaticState* static_state,
uint32_t data_source_instance_index,
DataSourceState* data_source,
BufferExhaustedPolicy buffer_exhausted_policy) {
if (PERFETTO_UNLIKELY(data_source->interceptor_id)) {
// If the session is being intercepted, return a heap-backed trace writer
// instead. This is safe because all the data given to the interceptor is
// either thread-local (|instance_index|), statically allocated
// (|static_state|) or constant after initialization (|interceptor|). Access
// to the interceptor instance itself through |data_source| is protected by
// a statically allocated lock (similarly to the data source instance).
// |interceptor_id| is used 1-based here: 0 means "not intercepted" (see the
// check above), so id N maps to slot N - 1 of |interceptors_|.
auto& interceptor = interceptors_[data_source->interceptor_id - 1];
return std::unique_ptr<TraceWriterBase>(new InterceptorTraceWriter(
interceptor.tls_factory(static_state, data_source_instance_index),
interceptor.packet_callback, static_state, data_source_instance_index));
}
ProducerImpl* producer =
FindProducerBackendById(data_source->backend_id)->producer.get();
// Atomically load the current service endpoint. We keep the pointer as a
// shared pointer on the stack to guard against it being concurrently
// modified by ProducerImpl::Initialize() swapping in a reconnected service on
// the muxer task runner thread.
//
// The endpoint may also be concurrently modified by SweepDeadServices()
// clearing out old disconnected services. We guard against that by
// SharedMemoryArbiter keeping track of any outstanding trace writers. After
// shutdown has started, the trace writer created below will be a null one
// which will drop any written data. See SharedMemoryArbiter::TryShutdown().
//
// We use an atomic pointer instead of holding a lock because
// CreateTraceWriter posts tasks under the hood.
std::shared_ptr<ProducerEndpoint> service =
std::atomic_load(&producer->service_);
// The service may have been disconnected and reconnected concurrently after
// the data source was enabled, in which case we may not have an arbiter, or
// would be creating a TraceWriter for the wrong (a newer) connection / SMB.
// Instead, early-out now. A relaxed load is fine here because the atomic_load
// above ensures that the |service| isn't newer.
if (producer->connection_id_.load(std::memory_order_relaxed) !=
data_source->backend_connection_id) {
return std::unique_ptr<TraceWriter>(new NullTraceWriter());
}
// We just need a relaxed atomic read here: We can use the reservation ID even
// after the buffer was bound, we just need to be sure to read it atomically.
uint16_t startup_buffer_reservation =
data_source->startup_target_buffer_reservation.load(
std::memory_order_relaxed);
// A non-zero reservation selects the startup trace writer path.
if (startup_buffer_reservation) {
return service->MaybeSharedMemoryArbiter()->CreateStartupTraceWriter(
startup_buffer_reservation);
}
return service->CreateTraceWriter(data_source->buffer_id,
buffer_exhausted_policy);
}
// This is called via the public API Tracing::NewTrace().
// Can be called from any thread.
//
// Allocates a new session id and returns a TracingSessionImpl handle for it
// straight away. The consumer itself is created later, by a task posted to the
// muxer's task runner, on the first registered backend that is compatible with
// |requested_backend_type|. If |system_backend_factory| is provided and no
// system consumer backend is registered yet, the factory is used to add one
// when the system backend is requested.
std::unique_ptr<TracingSession> TracingMuxerImpl::CreateTracingSession(
    BackendType requested_backend_type,
    TracingConsumerBackend* (*system_backend_factory)()) {
  const TracingSessionGlobalID session_id = ++next_tracing_session_id_;

  // |backend_type| can only specify one backend, not an OR-ed mask.
  PERFETTO_CHECK((requested_backend_type & (requested_backend_type - 1)) == 0);

  // Capturing |this| is fine because the TracingMuxer is a leaky singleton.
  task_runner_->PostTask([this, requested_backend_type, session_id,
                          system_backend_factory] {
    const bool needs_system_backend =
        requested_backend_type == kSystemBackend && system_backend_factory &&
        !FindConsumerBackendByType(kSystemBackend);
    if (needs_system_backend) {
      AddConsumerBackend(system_backend_factory(), kSystemBackend);
    }

    for (RegisteredConsumerBackend& backend : consumer_backends_) {
      const bool type_mismatch = requested_backend_type && backend.type &&
                                 backend.type != requested_backend_type;
      if (type_mismatch) {
        continue;
      }

      // Create the consumer now, even if we have to ask the embedder below, so
      // that any other tasks executing after this one can find the consumer and
      // change its pending attributes.
      backend.consumers.emplace_back(
          new ConsumerImpl(this, backend.type, session_id));

      // The last registered backend in |consumer_backends_| is the unsupported
      // backend without a valid type.
      if (!backend.type) {
        PERFETTO_ELOG(
            "No tracing backend ready for type=%d, consumer will disconnect",
            requested_backend_type);
      }

      // Connect right away when the backend is the unsupported one, or when
      // no embedder policy has to be asked for permission first.
      if (!backend.type || !policy_) {
        InitializeConsumer(session_id);
        return;
      }

      // The embedder wants to be asked for permission before connecting the
      // consumer. Its verdict is handled in a task posted to the muxer's task
      // runner.
      const BackendType allowed_type = backend.type;
      TracingPolicy::ShouldAllowConsumerSessionArgs policy_args;
      policy_args.backend_type = backend.type;
      policy_args.result_callback = [this, allowed_type,
                                     session_id](bool allow) {
        task_runner_->PostTask([this, allowed_type, session_id, allow] {
          if (!allow) {
            PERFETTO_ELOG(
                "Consumer session for backend type type=%d forbidden, "
                "consumer will disconnect",
                allowed_type);
            ConsumerImpl* const rejected = FindConsumer(session_id);
            if (rejected != nullptr) {
              rejected->OnDisconnect();
            }
            return;
          }
          InitializeConsumer(session_id);
        });
      };
      policy_->ShouldAllowConsumerSession(policy_args);
      return;
    }
    PERFETTO_DFATAL("Not reached");
  });

  return std::unique_ptr<TracingSession>(
      new TracingSessionImpl(this, session_id, requested_backend_type));
}
// Sets up a startup tracing session for the data sources named in |config| on
// the backend selected by |opts.backend|.
// This is called via the public API Tracing::SetupStartupTracing().
// Can be called from any thread.
//
// A StartupTracingSessionImpl handle for the new session id is returned
// immediately; the actual setup runs in a task posted to |task_runner_|.
// That task invokes |opts.on_setup| (when set) with the number of data sources
// that were started, or with 0 when startup tracing could not be set up.
// When at least one data source was started and |opts.timeout_ms| is positive,
// a delayed task is posted that aborts the session.
std::unique_ptr<StartupTracingSession>
TracingMuxerImpl::CreateStartupTracingSession(
const TraceConfig& config,
Tracing::SetupStartupTracingOpts opts) {
BackendType backend_type = opts.backend;
// |backend_type| can only specify one backend, not an OR-ed mask.
PERFETTO_CHECK((backend_type & (backend_type - 1)) == 0);
// The in-process backend doesn't support startup tracing.
PERFETTO_CHECK(backend_type != BackendType::kInProcessBackend);
TracingSessionGlobalID session_id = ++next_tracing_session_id_;
// Capturing |this| is fine because the TracingMuxer is a leaky singleton.
task_runner_->PostTask([this, config, opts, backend_type, session_id] {
// Only the first compatible backend is used: every path through the loop
// body below ends with a return.
for (RegisteredProducerBackend& backend : producer_backends_) {
if (backend_type && backend.type && backend.type != backend_type) {
continue;
}
TracingBackendId backend_id = backend.id;
// The last registered backend in |producer_backends_| is the unsupported
// backend without a valid type.
if (!backend.type) {
PERFETTO_ELOG(
"No tracing backend initialized for type=%d, startup tracing "
"failed",
backend_type);
if (opts.on_setup)
opts.on_setup(Tracing::OnStartupTracingSetupCallbackArgs{
0 /* num_data_sources_started */});
return;
}
// Without a service endpoint that has shared memory, the producer is
// reconnected with a producer-provided SMB.
if (!backend.producer->service_ ||
!backend.producer->service_->shared_memory()) {
// If we unsuccessfully attempted to use a producer-provided SMB in the
// past, don't try again.
if (backend.producer->producer_provided_smb_failed_) {
PERFETTO_ELOG(
"Backend %zu doesn't seem to support producer-provided "
"SMBs, startup tracing failed",
backend_id);
if (opts.on_setup)
opts.on_setup(Tracing::OnStartupTracingSetupCallbackArgs{
0 /* num_data_sources_started */});
return;
}
PERFETTO_DLOG("Reconnecting backend %zu for startup tracing",
backend_id);
backend.producer_conn_args.use_producer_provided_smb = true;
// NOTE(review): this branch is also entered when |service_| is null (first
// operand of the condition above), in which case the call below would
// dereference a null pointer. Confirm that |service_| can never be null
// here.
backend.producer->service_->Disconnect(); // Causes a reconnect.
PERFETTO_DCHECK(backend.producer->service_ &&
backend.producer->service_->MaybeSharedMemoryArbiter());
}
RegisteredStartupSession session;
session.session_id = session_id;
session.on_aborted = opts.on_aborted;
session.on_adopted = opts.on_adopted;
for (const TraceConfig::DataSource& ds_cfg : config.data_sources()) {
// Find all matching data sources and start one instance of each.
for (const auto& rds : data_sources_) {
if (rds.descriptor.name() != ds_cfg.config().name())
continue;
PERFETTO_DLOG(
"Setting up data source %s for startup tracing with target "
"buffer reservation %" PRIi32,
rds.descriptor.name().c_str(),
backend.producer->last_startup_target_buffer_reservation_ + 1u);
auto ds = SetupDataSourceImpl(
rds, backend_id,
backend.producer->connection_id_.load(std::memory_order_relaxed),
/*instance_id=*/0, ds_cfg.config(),
/*startup_session_id=*/session_id);
// Only instances that were successfully set up are started and counted.
if (ds) {
StartDataSourceImpl(ds);
session.num_unbound_data_sources++;
}
}
}
int num_ds = session.num_unbound_data_sources;
auto on_setup = opts.on_setup;
// The user callback is not run inline: it is posted as a separate task.
if (on_setup) {
backend.producer->OnStartupTracingSetup();
task_runner_->PostTask([on_setup, num_ds] {
on_setup(Tracing::OnStartupTracingSetupCallbackArgs{num_ds});
});
}
// The session is only registered if at least one data source was started.
if (num_ds > 0) {
backend.startup_sessions.push_back(std::move(session));
if (opts.timeout_ms > 0) {
task_runner_->PostDelayedTask(
[this, session_id, backend_type] {
AbortStartupTracingSession(session_id, backend_type);
},
opts.timeout_ms);
}
}
return;
}
PERFETTO_DFATAL("Invalid startup tracing session backend");
});
return std::unique_ptr<StartupTracingSession>(
new StartupTracingSessionImpl(this, session_id, backend_type));
}
// Must not be called from the SDK's internal thread.
std::unique_ptr<StartupTracingSession>
TracingMuxerImpl::CreateStartupTracingSessionBlocking(
const TraceConfig& config,
Tracing::SetupStartupTracingOpts opts) {
auto previous_on_setup = std::move(opts.on_setup);
PERFETTO_CHECK(!task_runner_->RunsTasksOnCurrentThread());
base::WaitableEvent event;
// It is safe to capture by reference because once on_setup is called only
// once before this method returns.
opts.on_setup = [&](Tracing::OnStartupTracingSetupCallbackArgs args) {
if (previous_on_setup) {
previous_on_setup(std::move(args));
}
event.Notify();
};
auto session = CreateStartupTracingSession(config, std::move(opts));
event.Wait();
return session;
}
// Aborts the startup tracing session |session_id| registered on the backend of
// type |backend_type|: every data source instance of the session that still
// has a startup target buffer reservation is stopped asynchronously.
// If there is nothing to stop, |on_aborted| (when set) is posted and the
// session is removed right away. No-op if the session is unknown or is
// already being aborted.
void TracingMuxerImpl::AbortStartupTracingSession(
    TracingSessionGlobalID session_id,
    BackendType backend_type) {
  PERFETTO_DCHECK_THREAD(thread_checker_);

  for (RegisteredProducerBackend& backend : producer_backends_) {
    if (backend.type != backend_type) {
      continue;
    }

    auto& sessions = backend.startup_sessions;
    auto session_it =
        std::find_if(sessions.begin(), sessions.end(),
                     [session_id](const RegisteredStartupSession& candidate) {
                       return candidate.session_id == session_id;
                     });

    // The startup session may have already been aborted or fully adopted, or
    // an abort may already be in progress.
    if (session_it == sessions.end() || session_it->is_aborting) {
      return;
    }
    session_it->is_aborting = true;

    // Iterate all data sources and abort them if they weren't adopted yet.
    for (const auto& rds : data_sources_) {
      DataSourceStaticState* const static_state = rds.static_state;
      for (uint32_t idx = 0; idx < kMaxDataSourceInstances; ++idx) {
        auto* const instance = static_state->TryGet(idx);
        if (!instance) {
          continue;
        }
        if (!instance->startup_target_buffer_reservation.load(
                std::memory_order_relaxed)) {
          continue;
        }
        if (instance->data_source_instance_id != 0 ||
            instance->startup_session_id != session_id) {
          continue;
        }

        PERFETTO_DLOG(
            "Aborting startup tracing for data source %s (target buffer "
            "reservation %" PRIu16 ")",
            rds.descriptor.name().c_str(),
            instance->startup_target_buffer_reservation.load(
                std::memory_order_relaxed));

        // Abort the instance asynchronously by stopping it. From this point
        // onwards, the service will not be able to adopt it via
        // StartDataSource().
        session_it->num_aborting_data_sources++;
        StopDataSource_AsyncBeginImpl(FindDataSourceRes(
            static_state, instance, idx, rds.requires_callbacks_under_lock));
      }
    }

    // If we did everything right, we should have aborted all still-unbound data
    // source instances.
    PERFETTO_DCHECK(session_it->num_unbound_data_sources ==
                    session_it->num_aborting_data_sources);

    // With nothing left to stop, the abort is already complete.
    if (session_it->num_aborting_data_sources == 0) {
      if (session_it->on_aborted) {
        task_runner_->PostTask(session_it->on_aborted);
      }
      sessions.erase(session_it);
    }
    return;
  }

  // We might reach here in tests because when we start a trace, we post the
  // Task(AbortStartupTrace, delay=timeout). When we do
  // perfetto::ResetForTesting, we sweep dead backends, and we are not able to
  // kill those delayed tasks because TaskRunner doesn't have support for
  // deleting scheduled future tasks and TaskRunner doesn't have any API for us
  // to wait for the completion of all the scheduled tasks (apart from
  // deleting the TaskRunner) and we want to avoid doing that because we need
  // a long running TaskRunner in muxer.
  PERFETTO_DLOG("Invalid startup tracing session backend");
}
void TracingMuxerImpl::InitializeInstance(const TracingInitArgs& args) {
if (instance_ != TracingMuxerFake::Get()) {
// The tracing muxer was already initialized. We might need to initialize
// additional backends that were not configured earlier.
auto* muxer = static_cast<TracingMuxerImpl*>(instance_);
muxer->task_runner_->PostTask([muxer, args] { muxer->AddBackends(args); });
return;
}
// If we previously had a TracingMuxerImpl instance which was reset,
// reinitialize and reuse it instead of trying to create a new one. See
// ResetForTesting().
if (g_prev_instance) {
auto* muxer = g_prev_instance;
g_prev_instance = nullptr;
instance_ = muxer;
muxer->task_runner_->PostTask([muxer, args] {
muxer->Initialize(args);
muxer->AddBackends(args);
});
} else {
new TracingMuxerImpl(args);
}
}
// static
// Test-only: resets the state of the current muxer so that it can be
// initialized again later through InitializeInstance(). The muxer object is
// not destroyed; it is parked in |g_prev_instance| and the global instance is
// switched back to the fake muxer.
//
// The reset runs on the muxer's task runner. When called from another thread,
// this function blocks until the reset is done. The callbacks registered via
// AppendResetForTestingCallback() are run on the muxer thread and, when the
// caller is on a different thread, once more on the calling thread.
void TracingMuxerImpl::ResetForTesting() {
  // Ideally we'd tear down the entire TracingMuxerImpl, but the lifetimes of
  // various objects make that a non-starter. In particular:
  //
  // 1) Any thread that has entered a trace event has a TraceWriter, which holds
  //    a reference back to ProducerImpl::service_.
  //
  // 2) ProducerImpl::service_ has a reference back to the ProducerImpl.
  //
  // 3) ProducerImpl holds reference to TracingMuxerImpl::task_runner_, which in
  //    turn depends on TracingMuxerImpl itself.
  //
  // Because of this, it's not safe to deallocate TracingMuxerImpl until all
  // threads have dropped their TraceWriters. Since we can't really ask the
  // caller to guarantee this, we'll instead reset enough of the muxer's state
  // so that it can be reinitialized later and ensure all necessary objects from
  // the old state remain alive until all references have gone away.

  // Base-to-derived conversion: static_cast is the right tool here (it is
  // also what InitializeInstance() uses), reinterpret_cast is not.
  auto* muxer = static_cast<TracingMuxerImpl*>(instance_);

  base::WaitableEvent reset_done;
  auto do_reset = [muxer, &reset_done] {
    muxer->DestroyStoppedTraceWritersForCurrentThread();
    // Unregister all data sources so they don't interfere with any future
    // tracing sessions.
    for (RegisteredDataSource& rds : muxer->data_sources_) {
      for (RegisteredProducerBackend& backend : muxer->producer_backends_) {
        if (!backend.producer->service_ || !backend.producer->connected_)
          continue;
        backend.producer->service_->UnregisterDataSource(rds.descriptor.name());
      }
    }
    for (auto& backend : muxer->consumer_backends_) {
      // Check that no consumer session is currently active on any backend.
      for (auto& consumer : backend.consumers)
        PERFETTO_CHECK(!consumer->service_);
    }
    // Detach every producer from the muxer and park its backend in
    // |dead_backends_| until it can be swept.
    for (auto& backend : muxer->producer_backends_) {
      backend.producer->muxer_ = nullptr;
      backend.producer->DisposeConnection();
      muxer->dead_backends_.push_back(std::move(backend));
    }
    muxer->consumer_backends_.clear();
    muxer->producer_backends_.clear();
    muxer->interceptors_.clear();
    for (auto& ds : muxer->data_sources_) {
      ds.static_state->ResetForTesting();
    }
    muxer->data_sources_.clear();
    muxer->next_data_source_index_ = 0;
    // Free all backends without active trace writers or other inbound
    // references. Note that even if all the backends get swept, the muxer still
    // needs to stay around since |task_runner_| is assumed to be long-lived.
    muxer->SweepDeadBackends();
    // Make sure we eventually discard any per-thread trace writers from the
    // previous instance.
    muxer->muxer_id_for_testing_++;
    g_prev_instance = muxer;
    instance_ = TracingMuxerFake::Get();
    // Call the user provided cleanups on the muxer thread.
    for (auto& cb : muxer->reset_callbacks_) {
      cb();
    }
    reset_done.Notify();
  };

  // Some tests run the muxer and the test on the same thread. In these cases,
  // we can reset synchronously.
  if (muxer->task_runner_->RunsTasksOnCurrentThread()) {
    do_reset();
  } else {
    muxer->DestroyStoppedTraceWritersForCurrentThread();
    muxer->task_runner_->PostTask(std::move(do_reset));
    // |do_reset| captures |reset_done| by reference, so this frame must stay
    // alive until the posted task has signalled completion.
    reset_done.Wait();
    // Call the user provided cleanups also on this thread.
    for (auto& cb : muxer->reset_callbacks_) {
      cb();
    }
  }
  muxer->reset_callbacks_.clear();
}
// static
// Destroys the current muxer and switches the global instance back to the
// fake muxer. Blocks until the teardown task has run on the muxer thread.
// Must not be called on the muxer thread (enforced by a CHECK), and no
// consumer session may be active on any backend.
void TracingMuxerImpl::Shutdown() {
  // Base-to-derived conversion: static_cast is the right tool here (it is
  // also what InitializeInstance() uses), reinterpret_cast is not.
  auto* muxer = static_cast<TracingMuxerImpl*>(instance_);

  // Shutting down on the muxer thread would lead to a deadlock.
  PERFETTO_CHECK(!muxer->task_runner_->RunsTasksOnCurrentThread());
  muxer->DestroyStoppedTraceWritersForCurrentThread();

  // |owned_task_runner| takes over the task runner from the muxer: the muxer
  // releases its own pointer inside the task below, so the runner is deleted
  // exactly once, when |owned_task_runner| goes out of scope on this thread.
  std::unique_ptr<base::TaskRunner> owned_task_runner(
      muxer->task_runner_.get());
  base::WaitableEvent shutdown_done;
  owned_task_runner->PostTask([muxer, &shutdown_done] {
    // Check that no consumer session is currently active on any backend.
    // Producers will be automatically disconnected as a part of deleting the
    // muxer below.
    for (auto& backend : muxer->consumer_backends_) {
      for (auto& consumer : backend.consumers) {
        PERFETTO_CHECK(!consumer->service_);
      }
    }
    // Make sure no trace writers are lingering around on the muxer thread. Note
    // that we can't do this for any arbitrary thread in the process; it is the
    // caller's responsibility to clean them up before shutting down Perfetto.
    muxer->DestroyStoppedTraceWritersForCurrentThread();
    // The task runner must be deleted outside the muxer thread. This is done by
    // `owned_task_runner` above.
    muxer->task_runner_.release();
    // |platform_| is read before the muxer is deleted because it is still
    // needed afterwards.
    auto* platform = muxer->platform_;
    delete muxer;
    instance_ = TracingMuxerFake::Get();
    platform->Shutdown();
    shutdown_done.Notify();
  });
  shutdown_done.Wait();
}
// Registers |cb| to be run by ResetForTesting(). ResetForTesting() clears the
// registered callbacks once it is done.
void TracingMuxerImpl::AppendResetForTestingCallback(std::function<void()> cb) {
  reset_callbacks_.emplace_back(std::move(cb));
}
// Out-of-line definition of the defaulted TracingMuxer destructor.
TracingMuxer::~TracingMuxer() = default;
// Compile-time guard: internal::BufferId and BufferID must stay the same type.
static_assert(std::is_same<internal::BufferId, BufferID>::value,
"public's BufferId and tracing/core's BufferID diverged");
} // namespace internal
} // namespace perfetto