| /* |
| * Copyright (C) 2019 The Android Open Source Project |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| #include "src/tracing/internal/tracing_muxer_impl.h" |
| |
| #include <algorithm> |
| #include <atomic> |
| #include <mutex> |
| #include <optional> |
| #include <vector> |
| |
| #include "perfetto/base/build_config.h" |
| #include "perfetto/base/logging.h" |
| #include "perfetto/base/task_runner.h" |
| #include "perfetto/base/time.h" |
| #include "perfetto/ext/base/hash.h" |
| #include "perfetto/ext/base/thread_checker.h" |
| #include "perfetto/ext/base/waitable_event.h" |
| #include "perfetto/ext/tracing/core/shared_memory_arbiter.h" |
| #include "perfetto/ext/tracing/core/trace_packet.h" |
| #include "perfetto/ext/tracing/core/trace_stats.h" |
| #include "perfetto/ext/tracing/core/trace_writer.h" |
| #include "perfetto/ext/tracing/core/tracing_service.h" |
| #include "perfetto/tracing/buffer_exhausted_policy.h" |
| #include "perfetto/tracing/core/data_source_config.h" |
| #include "perfetto/tracing/core/tracing_service_state.h" |
| #include "perfetto/tracing/data_source.h" |
| #include "perfetto/tracing/internal/data_source_internal.h" |
| #include "perfetto/tracing/internal/interceptor_trace_writer.h" |
| #include "perfetto/tracing/internal/tracing_backend_fake.h" |
| #include "perfetto/tracing/trace_writer_base.h" |
| #include "perfetto/tracing/tracing.h" |
| #include "perfetto/tracing/tracing_backend.h" |
| #include "src/tracing/core/null_trace_writer.h" |
| #include "src/tracing/internal/tracing_muxer_fake.h" |
| |
| #include "protos/perfetto/config/interceptor_config.gen.h" |
| |
| #if PERFETTO_BUILDFLAG(PERFETTO_OS_WIN) |
| #include <io.h> // For dup() |
| #else |
| #include <unistd.h> // For dup() |
| #endif |
| |
| namespace perfetto { |
| namespace internal { |
| |
| namespace { |
| |
| using RegisteredDataSource = TracingMuxerImpl::RegisteredDataSource; |
| |
| // A task runner which prevents calls to DataSource::Trace() while an operation |
| // is in progress. Used to guard against unexpected re-entrancy where the |
| // user-provided task runner implementation tries to enter a trace point under |
| // the hood. |
| class NonReentrantTaskRunner : public base::TaskRunner { |
| public: |
| NonReentrantTaskRunner(TracingMuxer* muxer, |
| std::unique_ptr<base::TaskRunner> task_runner) |
| : muxer_(muxer), task_runner_(std::move(task_runner)) {} |
| |
| // base::TaskRunner implementation. |
| void PostTask(std::function<void()> task) override { |
| CallWithGuard([&] { task_runner_->PostTask(std::move(task)); }); |
| } |
| |
| void PostDelayedTask(std::function<void()> task, uint32_t delay_ms) override { |
| CallWithGuard( |
| [&] { task_runner_->PostDelayedTask(std::move(task), delay_ms); }); |
| } |
| |
| void AddFileDescriptorWatch(base::PlatformHandle fd, |
| std::function<void()> callback) override { |
| CallWithGuard( |
| [&] { task_runner_->AddFileDescriptorWatch(fd, std::move(callback)); }); |
| } |
| |
| void RemoveFileDescriptorWatch(base::PlatformHandle fd) override { |
| CallWithGuard([&] { task_runner_->RemoveFileDescriptorWatch(fd); }); |
| } |
| |
| bool RunsTasksOnCurrentThread() const override { |
| bool result; |
| CallWithGuard([&] { result = task_runner_->RunsTasksOnCurrentThread(); }); |
| return result; |
| } |
| |
| private: |
| template <typename T> |
| void CallWithGuard(T lambda) const { |
| auto* root_tls = muxer_->GetOrCreateTracingTLS(); |
| if (PERFETTO_UNLIKELY(root_tls->is_in_trace_point)) { |
| lambda(); |
| return; |
| } |
| ScopedReentrancyAnnotator scoped_annotator(*root_tls); |
| lambda(); |
| } |
| |
| TracingMuxer* const muxer_; |
| std::unique_ptr<base::TaskRunner> task_runner_; |
| }; |
| |
| class StopArgsImpl : public DataSourceBase::StopArgs { |
| public: |
| std::function<void()> HandleStopAsynchronously() const override { |
| auto closure = std::move(async_stop_closure); |
| async_stop_closure = std::function<void()>(); |
| return closure; |
| } |
| |
| mutable std::function<void()> async_stop_closure; |
| }; |
| |
| class FlushArgsImpl : public DataSourceBase::FlushArgs { |
| public: |
| std::function<void()> HandleFlushAsynchronously() const override { |
| auto closure = std::move(async_flush_closure); |
| async_flush_closure = std::function<void()>(); |
| return closure; |
| } |
| |
| mutable std::function<void()> async_flush_closure; |
| }; |
| |
| // Holds an earlier TracingMuxerImpl instance after ResetForTesting() is called. |
| static TracingMuxerImpl* g_prev_instance{}; |
| |
| template <typename RegisteredBackend> |
| struct CompareBackendByType { |
| static int BackendTypePriority(BackendType type) { |
| switch (type) { |
| case kSystemBackend: |
| return 0; |
| case kInProcessBackend: |
| return 1; |
| case kCustomBackend: |
| return 2; |
| // The UnspecifiedBackend has the highest priority so that |
| // TracingBackendFake is the last one on the backend lists. |
| case kUnspecifiedBackend: |
| break; |
| } |
| return 3; |
| } |
| bool operator()(BackendType type, const RegisteredBackend& b) { |
| return BackendTypePriority(type) < BackendTypePriority(b.type); |
| } |
| }; |
| |
| } // namespace |
| |
| // ----- Begin of TracingMuxerImpl::ProducerImpl |
| TracingMuxerImpl::ProducerImpl::ProducerImpl( |
| TracingMuxerImpl* muxer, |
| TracingBackendId backend_id, |
| uint32_t shmem_batch_commits_duration_ms, |
| bool shmem_direct_patching_enabled) |
| : muxer_(muxer), |
| backend_id_(backend_id), |
| shmem_batch_commits_duration_ms_(shmem_batch_commits_duration_ms), |
| shmem_direct_patching_enabled_(shmem_direct_patching_enabled) {} |
| |
| TracingMuxerImpl::ProducerImpl::~ProducerImpl() { |
| muxer_ = nullptr; |
| } |
| |
| void TracingMuxerImpl::ProducerImpl::Initialize( |
| std::unique_ptr<ProducerEndpoint> endpoint) { |
| PERFETTO_DCHECK_THREAD(thread_checker_); |
| PERFETTO_DCHECK(!connected_); |
| connection_id_.fetch_add(1, std::memory_order_relaxed); |
| is_producer_provided_smb_ = endpoint->shared_memory(); |
| last_startup_target_buffer_reservation_ = 0; |
| |
| // Adopt the endpoint into a shared pointer so that we can safely share it |
| // across threads that create trace writers. The custom deleter function |
| // ensures that the endpoint is always destroyed on the muxer's thread. (Note |
| // that |task_runner| is assumed to outlive tracing sessions on all threads.) |
| auto* task_runner = muxer_->task_runner_.get(); |
| auto deleter = [task_runner](ProducerEndpoint* e) { |
| if (task_runner->RunsTasksOnCurrentThread()) { |
| delete e; |
| return; |
| } |
| task_runner->PostTask([e] { delete e; }); |
| }; |
| std::shared_ptr<ProducerEndpoint> service(endpoint.release(), deleter); |
| // This atomic store is needed because another thread might be concurrently |
| // creating a trace writer using the previous (disconnected) |service_|. See |
| // CreateTraceWriter(). |
| std::atomic_store(&service_, std::move(service)); |
| // Don't try to use the service here since it may not have connected yet. See |
| // OnConnect(). |
| } |
| |
| void TracingMuxerImpl::ProducerImpl::OnConnect() { |
| PERFETTO_DLOG("Producer connected"); |
| PERFETTO_DCHECK_THREAD(thread_checker_); |
| PERFETTO_DCHECK(!connected_); |
| if (is_producer_provided_smb_ && !service_->IsShmemProvidedByProducer()) { |
| PERFETTO_ELOG( |
| "The service likely doesn't support producer-provided SMBs. Preventing " |
| "future attempts to use producer-provided SMB again with this " |
| "backend."); |
| producer_provided_smb_failed_ = true; |
| // Will call OnDisconnect() and cause a reconnect without producer-provided |
| // SMB. |
| service_->Disconnect(); |
| return; |
| } |
| connected_ = true; |
| muxer_->UpdateDataSourcesOnAllBackends(); |
| SendOnConnectTriggers(); |
| } |
| |
| void TracingMuxerImpl::ProducerImpl::OnDisconnect() { |
| PERFETTO_DCHECK_THREAD(thread_checker_); |
| // If we're being destroyed, bail out. |
| if (!muxer_) |
| return; |
| connected_ = false; |
| // Active data sources for this producer will be stopped by |
| // DestroyStoppedTraceWritersForCurrentThread() since the reconnected producer |
| // will have a different connection id (even before it has finished |
| // connecting). |
| registered_data_sources_.reset(); |
| DisposeConnection(); |
| |
| // Try reconnecting the producer. |
| muxer_->OnProducerDisconnected(this); |
| } |
| |
| void TracingMuxerImpl::ProducerImpl::DisposeConnection() { |
| // Keep the old service around as a dead connection in case it has active |
| // trace writers. If any tracing sessions were created, we can't clear |
| // |service_| here because other threads may be concurrently creating new |
| // trace writers. Any reconnection attempt will atomically swap the new |
| // service in place of the old one. |
| if (did_setup_tracing_ || did_setup_startup_tracing_) { |
| dead_services_.push_back(service_); |
| } else { |
| service_.reset(); |
| } |
| } |
| |
| void TracingMuxerImpl::ProducerImpl::OnTracingSetup() { |
| PERFETTO_DCHECK_THREAD(thread_checker_); |
| did_setup_tracing_ = true; |
| service_->MaybeSharedMemoryArbiter()->SetBatchCommitsDuration( |
| shmem_batch_commits_duration_ms_); |
| if (shmem_direct_patching_enabled_) { |
| service_->MaybeSharedMemoryArbiter()->EnableDirectSMBPatching(); |
| } |
| } |
| |
| void TracingMuxerImpl::ProducerImpl::OnStartupTracingSetup() { |
| PERFETTO_DCHECK_THREAD(thread_checker_); |
| did_setup_startup_tracing_ = true; |
| } |
| |
| void TracingMuxerImpl::ProducerImpl::SetupDataSource( |
| DataSourceInstanceID id, |
| const DataSourceConfig& cfg) { |
| PERFETTO_DCHECK_THREAD(thread_checker_); |
| if (!muxer_) |
| return; |
| muxer_->SetupDataSource( |
| backend_id_, connection_id_.load(std::memory_order_relaxed), id, cfg); |
| } |
| |
| void TracingMuxerImpl::ProducerImpl::StartDataSource(DataSourceInstanceID id, |
| const DataSourceConfig&) { |
| PERFETTO_DCHECK_THREAD(thread_checker_); |
| if (!muxer_) |
| return; |
| muxer_->StartDataSource(backend_id_, id); |
| service_->NotifyDataSourceStarted(id); |
| } |
| |
| void TracingMuxerImpl::ProducerImpl::StopDataSource(DataSourceInstanceID id) { |
| PERFETTO_DCHECK_THREAD(thread_checker_); |
| if (!muxer_) |
| return; |
| muxer_->StopDataSource_AsyncBegin(backend_id_, id); |
| } |
| |
| void TracingMuxerImpl::ProducerImpl::Flush( |
| FlushRequestID flush_id, |
| const DataSourceInstanceID* instances, |
| size_t instance_count, |
| FlushFlags flush_flags) { |
| PERFETTO_DCHECK_THREAD(thread_checker_); |
| bool all_handled = true; |
| if (muxer_) { |
| for (size_t i = 0; i < instance_count; i++) { |
| DataSourceInstanceID ds_id = instances[i]; |
| bool handled = muxer_->FlushDataSource_AsyncBegin(backend_id_, ds_id, |
| flush_id, flush_flags); |
| if (!handled) { |
| pending_flushes_[flush_id].insert(ds_id); |
| all_handled = false; |
| } |
| } |
| } |
| |
| if (all_handled) { |
| service_->NotifyFlushComplete(flush_id); |
| } |
| } |
| |
| void TracingMuxerImpl::ProducerImpl::ClearIncrementalState( |
| const DataSourceInstanceID* instances, |
| size_t instance_count) { |
| PERFETTO_DCHECK_THREAD(thread_checker_); |
| if (!muxer_) |
| return; |
| for (size_t inst_idx = 0; inst_idx < instance_count; inst_idx++) { |
| muxer_->ClearDataSourceIncrementalState(backend_id_, instances[inst_idx]); |
| } |
| } |
| |
| bool TracingMuxerImpl::ProducerImpl::SweepDeadServices() { |
| PERFETTO_DCHECK_THREAD(thread_checker_); |
| auto is_unused = [](const std::shared_ptr<ProducerEndpoint>& endpoint) { |
| auto* arbiter = endpoint->MaybeSharedMemoryArbiter(); |
| return !arbiter || arbiter->TryShutdown(); |
| }; |
| for (auto it = dead_services_.begin(); it != dead_services_.end();) { |
| auto next_it = it; |
| next_it++; |
| if (is_unused(*it)) { |
| dead_services_.erase(it); |
| } |
| it = next_it; |
| } |
| return dead_services_.empty(); |
| } |
| |
| void TracingMuxerImpl::ProducerImpl::SendOnConnectTriggers() { |
| PERFETTO_DCHECK_THREAD(thread_checker_); |
| base::TimeMillis now = base::GetWallTimeMs(); |
| std::vector<std::string> triggers; |
| while (!on_connect_triggers_.empty()) { |
| // Skip if we passed TTL. |
| if (on_connect_triggers_.front().second > now) { |
| triggers.push_back(std::move(on_connect_triggers_.front().first)); |
| } |
| on_connect_triggers_.pop_front(); |
| } |
| if (!triggers.empty()) { |
| service_->ActivateTriggers(triggers); |
| } |
| } |
| |
| void TracingMuxerImpl::ProducerImpl::NotifyFlushForDataSourceDone( |
| DataSourceInstanceID ds_id, |
| FlushRequestID flush_id) { |
| if (!connected_) { |
| return; |
| } |
| |
| { |
| auto it = pending_flushes_.find(flush_id); |
| if (it == pending_flushes_.end()) { |
| return; |
| } |
| std::set<DataSourceInstanceID>& ds_ids = it->second; |
| ds_ids.erase(ds_id); |
| } |
| |
| std::optional<DataSourceInstanceID> biggest_flush_id; |
| for (auto it = pending_flushes_.begin(); it != pending_flushes_.end();) { |
| if (it->second.empty()) { |
| biggest_flush_id = it->first; |
| it = pending_flushes_.erase(it); |
| } else { |
| break; |
| } |
| } |
| |
| if (biggest_flush_id) { |
| service_->NotifyFlushComplete(*biggest_flush_id); |
| } |
| } |
| |
| // ----- End of TracingMuxerImpl::ProducerImpl methods. |
| |
| // ----- Begin of TracingMuxerImpl::ConsumerImpl |
| TracingMuxerImpl::ConsumerImpl::ConsumerImpl(TracingMuxerImpl* muxer, |
| BackendType backend_type, |
| TracingSessionGlobalID session_id) |
| : muxer_(muxer), backend_type_(backend_type), session_id_(session_id) {} |
| |
| TracingMuxerImpl::ConsumerImpl::~ConsumerImpl() { |
| muxer_ = nullptr; |
| } |
| |
| void TracingMuxerImpl::ConsumerImpl::Initialize( |
| std::unique_ptr<ConsumerEndpoint> endpoint) { |
| PERFETTO_DCHECK_THREAD(thread_checker_); |
| service_ = std::move(endpoint); |
| // Don't try to use the service here since it may not have connected yet. See |
| // OnConnect(). |
| } |
| |
| void TracingMuxerImpl::ConsumerImpl::OnConnect() { |
| PERFETTO_DCHECK_THREAD(thread_checker_); |
| PERFETTO_DCHECK(!connected_); |
| connected_ = true; |
| |
| // Observe data source instance events so we get notified when tracing starts. |
| service_->ObserveEvents(ObservableEvents::TYPE_DATA_SOURCES_INSTANCES | |
| ObservableEvents::TYPE_ALL_DATA_SOURCES_STARTED); |
| |
| // If the API client configured and started tracing before we connected, |
| // tell the backend about it now. |
| if (trace_config_) |
| muxer_->SetupTracingSession(session_id_, trace_config_); |
| if (start_pending_) |
| muxer_->StartTracingSession(session_id_); |
| if (get_trace_stats_pending_) { |
| auto callback = std::move(get_trace_stats_callback_); |
| get_trace_stats_callback_ = nullptr; |
| muxer_->GetTraceStats(session_id_, std::move(callback)); |
| } |
| if (query_service_state_callback_) { |
| auto callback = std::move(query_service_state_callback_); |
| query_service_state_callback_ = nullptr; |
| muxer_->QueryServiceState(session_id_, std::move(callback)); |
| } |
| if (session_to_clone_) { |
| service_->CloneSession(*session_to_clone_); |
| session_to_clone_ = std::nullopt; |
| } |
| |
| if (stop_pending_) |
| muxer_->StopTracingSession(session_id_); |
| } |
| |
| void TracingMuxerImpl::ConsumerImpl::OnDisconnect() { |
| PERFETTO_DCHECK_THREAD(thread_checker_); |
| // If we're being destroyed, bail out. |
| if (!muxer_) |
| return; |
| #if PERFETTO_BUILDFLAG(PERFETTO_OS_ANDROID) |
| if (!connected_ && backend_type_ == kSystemBackend) { |
| PERFETTO_ELOG( |
| "Unable to connect to the system tracing service as a consumer. On " |
| "Android, use the \"perfetto\" command line tool instead to start " |
| "system-wide tracing sessions"); |
| } |
| #endif // PERFETTO_BUILDFLAG(PERFETTO_OS_ANDROID) |
| |
| // Notify the client about disconnection. |
| NotifyError(TracingError{TracingError::kDisconnected, "Peer disconnected"}); |
| |
| // Make sure the client doesn't hang in a blocking start/stop because of the |
| // disconnection. |
| NotifyStartComplete(); |
| NotifyStopComplete(); |
| |
| // It shouldn't be necessary to call StopTracingSession. If we get this call |
| // it means that the service did shutdown before us, so there is no point |
| // trying it to ask it to stop the session. We should just remember to cleanup |
| // the consumer vector. |
| connected_ = false; |
| |
| // Notify the muxer that it is safe to destroy |this|. This is needed because |
| // the ConsumerEndpoint stored in |service_| requires that |this| be safe to |
| // access until OnDisconnect() is called. |
| muxer_->OnConsumerDisconnected(this); |
| } |
| |
| void TracingMuxerImpl::ConsumerImpl::Disconnect() { |
| // This is weird and deserves a comment. |
| // |
| // When we called the ConnectConsumer method on the service it returns |
| // us a ConsumerEndpoint which we stored in |service_|, however this |
| // ConsumerEndpoint holds a pointer to the ConsumerImpl pointed to by |
| // |this|. Part of the API contract to TracingService::ConnectConsumer is that |
| // the ConsumerImpl pointer has to be valid until the |
| // ConsumerImpl::OnDisconnect method is called. Therefore we reset the |
| // ConsumerEndpoint |service_|. Eventually this will call |
| // ConsumerImpl::OnDisconnect and we will inform the muxer it is safe to |
| // call the destructor of |this|. |
| service_.reset(); |
| } |
| |
| void TracingMuxerImpl::ConsumerImpl::OnTracingDisabled( |
| const std::string& error) { |
| PERFETTO_DCHECK_THREAD(thread_checker_); |
| PERFETTO_DCHECK(!stopped_); |
| stopped_ = true; |
| |
| if (!error.empty()) |
| NotifyError(TracingError{TracingError::kTracingFailed, error}); |
| |
| // If we're still waiting for the start event, fire it now. This may happen if |
| // there are no active data sources in the session. |
| NotifyStartComplete(); |
| NotifyStopComplete(); |
| } |
| |
| void TracingMuxerImpl::ConsumerImpl::NotifyStartComplete() { |
| PERFETTO_DCHECK_THREAD(thread_checker_); |
| if (start_complete_callback_) { |
| muxer_->task_runner_->PostTask(std::move(start_complete_callback_)); |
| start_complete_callback_ = nullptr; |
| } |
| if (blocking_start_complete_callback_) { |
| muxer_->task_runner_->PostTask( |
| std::move(blocking_start_complete_callback_)); |
| blocking_start_complete_callback_ = nullptr; |
| } |
| } |
| |
| void TracingMuxerImpl::ConsumerImpl::NotifyError(const TracingError& error) { |
| PERFETTO_DCHECK_THREAD(thread_checker_); |
| if (error_callback_) { |
| muxer_->task_runner_->PostTask( |
| std::bind(std::move(error_callback_), error)); |
| } |
| } |
| |
| void TracingMuxerImpl::ConsumerImpl::NotifyStopComplete() { |
| PERFETTO_DCHECK_THREAD(thread_checker_); |
| if (stop_complete_callback_) { |
| muxer_->task_runner_->PostTask(std::move(stop_complete_callback_)); |
| stop_complete_callback_ = nullptr; |
| } |
| if (blocking_stop_complete_callback_) { |
| muxer_->task_runner_->PostTask(std::move(blocking_stop_complete_callback_)); |
| blocking_stop_complete_callback_ = nullptr; |
| } |
| } |
| |
| void TracingMuxerImpl::ConsumerImpl::OnTraceData( |
| std::vector<TracePacket> packets, |
| bool has_more) { |
| PERFETTO_DCHECK_THREAD(thread_checker_); |
| if (!read_trace_callback_) |
| return; |
| |
| size_t capacity = 0; |
| for (const auto& packet : packets) { |
| // 16 is an over-estimation of the proto preamble size |
| capacity += packet.size() + 16; |
| } |
| |
| // The shared_ptr is to avoid making a copy of the buffer when PostTask-ing. |
| std::shared_ptr<std::vector<char>> buf(new std::vector<char>()); |
| buf->reserve(capacity); |
| for (auto& packet : packets) { |
| char* start; |
| size_t size; |
| std::tie(start, size) = packet.GetProtoPreamble(); |
| buf->insert(buf->end(), start, start + size); |
| for (auto& slice : packet.slices()) { |
| const auto* slice_data = reinterpret_cast<const char*>(slice.start); |
| buf->insert(buf->end(), slice_data, slice_data + slice.size); |
| } |
| } |
| |
| auto callback = read_trace_callback_; |
| muxer_->task_runner_->PostTask([callback, buf, has_more] { |
| TracingSession::ReadTraceCallbackArgs callback_arg{}; |
| callback_arg.data = buf->empty() ? nullptr : &(*buf)[0]; |
| callback_arg.size = buf->size(); |
| callback_arg.has_more = has_more; |
| callback(callback_arg); |
| }); |
| |
| if (!has_more) |
| read_trace_callback_ = nullptr; |
| } |
| |
| void TracingMuxerImpl::ConsumerImpl::OnObservableEvents( |
| const ObservableEvents& events) { |
| if (events.instance_state_changes_size()) { |
| for (const auto& state_change : events.instance_state_changes()) { |
| DataSourceHandle handle{state_change.producer_name(), |
| state_change.data_source_name()}; |
| data_source_states_[handle] = |
| state_change.state() == |
| ObservableEvents::DATA_SOURCE_INSTANCE_STATE_STARTED; |
| } |
| } |
| |
| if (events.instance_state_changes_size() || |
| events.all_data_sources_started()) { |
| // Data sources are first reported as being stopped before starting, so once |
| // all the data sources we know about have started we can declare tracing |
| // begun. In the case where there are no matching data sources for the |
| // session, the service will report the all_data_sources_started() event |
| // without adding any instances (only since Android S / Perfetto v10.0). |
| if (start_complete_callback_ || blocking_start_complete_callback_) { |
| bool all_data_sources_started = std::all_of( |
| data_source_states_.cbegin(), data_source_states_.cend(), |
| [](std::pair<DataSourceHandle, bool> state) { return state.second; }); |
| if (all_data_sources_started) |
| NotifyStartComplete(); |
| } |
| } |
| } |
| |
| void TracingMuxerImpl::ConsumerImpl::OnSessionCloned( |
| const OnSessionClonedArgs& args) { |
| if (!clone_trace_callback_) |
| return; |
| TracingSession::CloneTraceCallbackArgs callback_arg{}; |
| callback_arg.success = args.success; |
| callback_arg.error = std::move(args.error); |
| callback_arg.uuid_msb = args.uuid.msb(); |
| callback_arg.uuid_lsb = args.uuid.lsb(); |
| muxer_->task_runner_->PostTask( |
| std::bind(std::move(clone_trace_callback_), std::move(callback_arg))); |
| clone_trace_callback_ = nullptr; |
| } |
| |
| void TracingMuxerImpl::ConsumerImpl::OnTraceStats( |
| bool success, |
| const TraceStats& trace_stats) { |
| if (!get_trace_stats_callback_) |
| return; |
| TracingSession::GetTraceStatsCallbackArgs callback_arg{}; |
| callback_arg.success = success; |
| callback_arg.trace_stats_data = trace_stats.SerializeAsArray(); |
| muxer_->task_runner_->PostTask( |
| std::bind(std::move(get_trace_stats_callback_), std::move(callback_arg))); |
| get_trace_stats_callback_ = nullptr; |
| } |
| |
| // The callbacks below are not used. |
| void TracingMuxerImpl::ConsumerImpl::OnDetach(bool) {} |
| void TracingMuxerImpl::ConsumerImpl::OnAttach(bool, const TraceConfig&) {} |
| // ----- End of TracingMuxerImpl::ConsumerImpl |
| |
| // ----- Begin of TracingMuxerImpl::TracingSessionImpl |
| |
| // TracingSessionImpl is the RAII object returned to API clients when they |
| // invoke Tracing::CreateTracingSession. They use it for starting/stopping |
| // tracing. |
| |
| TracingMuxerImpl::TracingSessionImpl::TracingSessionImpl( |
| TracingMuxerImpl* muxer, |
| TracingSessionGlobalID session_id, |
| BackendType backend_type) |
| : muxer_(muxer), session_id_(session_id), backend_type_(backend_type) {} |
| |
| // Can be destroyed from any thread. |
| TracingMuxerImpl::TracingSessionImpl::~TracingSessionImpl() { |
| auto* muxer = muxer_; |
| auto session_id = session_id_; |
| muxer->task_runner_->PostTask( |
| [muxer, session_id] { muxer->DestroyTracingSession(session_id); }); |
| } |
| |
| // Can be called from any thread. |
| void TracingMuxerImpl::TracingSessionImpl::Setup(const TraceConfig& cfg, |
| int fd) { |
| auto* muxer = muxer_; |
| auto session_id = session_id_; |
| std::shared_ptr<TraceConfig> trace_config(new TraceConfig(cfg)); |
| if (fd >= 0) { |
| base::ignore_result(backend_type_); // For -Wunused in the amalgamation. |
| #if PERFETTO_BUILDFLAG(PERFETTO_OS_WIN) |
| if (backend_type_ != kInProcessBackend) { |
| PERFETTO_FATAL( |
| "Passing a file descriptor to TracingSession::Setup() is only " |
| "supported with the kInProcessBackend on Windows. Use " |
| "TracingSession::ReadTrace() instead"); |
| } |
| #endif |
| trace_config->set_write_into_file(true); |
| fd = dup(fd); |
| } |
| muxer->task_runner_->PostTask([muxer, session_id, trace_config, fd] { |
| muxer->SetupTracingSession(session_id, trace_config, base::ScopedFile(fd)); |
| }); |
| } |
| |
| // Can be called from any thread. |
| void TracingMuxerImpl::TracingSessionImpl::Start() { |
| auto* muxer = muxer_; |
| auto session_id = session_id_; |
| muxer->task_runner_->PostTask( |
| [muxer, session_id] { muxer->StartTracingSession(session_id); }); |
| } |
| |
| void TracingMuxerImpl::TracingSessionImpl::CloneTrace(CloneTraceArgs args, |
| CloneTraceCallback cb) { |
| auto* muxer = muxer_; |
| auto session_id = session_id_; |
| muxer->task_runner_->PostTask([muxer, session_id, args, cb] { |
| muxer->CloneTracingSession(session_id, args, std::move(cb)); |
| }); |
| } |
| |
| // Can be called from any thread. |
| void TracingMuxerImpl::TracingSessionImpl::ChangeTraceConfig( |
| const TraceConfig& cfg) { |
| auto* muxer = muxer_; |
| auto session_id = session_id_; |
| muxer->task_runner_->PostTask([muxer, session_id, cfg] { |
| muxer->ChangeTracingSessionConfig(session_id, cfg); |
| }); |
| } |
| |
| // Can be called from any thread except the service thread. |
| void TracingMuxerImpl::TracingSessionImpl::StartBlocking() { |
| PERFETTO_DCHECK(!muxer_->task_runner_->RunsTasksOnCurrentThread()); |
| auto* muxer = muxer_; |
| auto session_id = session_id_; |
| base::WaitableEvent tracing_started; |
| muxer->task_runner_->PostTask([muxer, session_id, &tracing_started] { |
| auto* consumer = muxer->FindConsumer(session_id); |
| if (!consumer) { |
| // TODO(skyostil): Signal an error to the user. |
| tracing_started.Notify(); |
| return; |
| } |
| PERFETTO_DCHECK(!consumer->blocking_start_complete_callback_); |
| consumer->blocking_start_complete_callback_ = [&] { |
| tracing_started.Notify(); |
| }; |
| muxer->StartTracingSession(session_id); |
| }); |
| tracing_started.Wait(); |
| } |
| |
| // Can be called from any thread. |
| void TracingMuxerImpl::TracingSessionImpl::Flush( |
| std::function<void(bool)> user_callback, |
| uint32_t timeout_ms) { |
| auto* muxer = muxer_; |
| auto session_id = session_id_; |
| muxer->task_runner_->PostTask([muxer, session_id, timeout_ms, user_callback] { |
| auto* consumer = muxer->FindConsumer(session_id); |
| if (!consumer) { |
| std::move(user_callback)(false); |
| return; |
| } |
| muxer->FlushTracingSession(session_id, timeout_ms, |
| std::move(user_callback)); |
| }); |
| } |
| |
| // Can be called from any thread. |
| void TracingMuxerImpl::TracingSessionImpl::Stop() { |
| auto* muxer = muxer_; |
| auto session_id = session_id_; |
| muxer->task_runner_->PostTask( |
| [muxer, session_id] { muxer->StopTracingSession(session_id); }); |
| } |
| |
| // Can be called from any thread except the service thread. |
| void TracingMuxerImpl::TracingSessionImpl::StopBlocking() { |
| PERFETTO_DCHECK(!muxer_->task_runner_->RunsTasksOnCurrentThread()); |
| auto* muxer = muxer_; |
| auto session_id = session_id_; |
| base::WaitableEvent tracing_stopped; |
| muxer->task_runner_->PostTask([muxer, session_id, &tracing_stopped] { |
| auto* consumer = muxer->FindConsumer(session_id); |
| if (!consumer) { |
| // TODO(skyostil): Signal an error to the user. |
| tracing_stopped.Notify(); |
| return; |
| } |
| PERFETTO_DCHECK(!consumer->blocking_stop_complete_callback_); |
| consumer->blocking_stop_complete_callback_ = [&] { |
| tracing_stopped.Notify(); |
| }; |
| muxer->StopTracingSession(session_id); |
| }); |
| tracing_stopped.Wait(); |
| } |
| |
| // Can be called from any thread. |
| void TracingMuxerImpl::TracingSessionImpl::ReadTrace(ReadTraceCallback cb) { |
| auto* muxer = muxer_; |
| auto session_id = session_id_; |
| muxer->task_runner_->PostTask([muxer, session_id, cb] { |
| muxer->ReadTracingSessionData(session_id, std::move(cb)); |
| }); |
| } |
| |
| // Can be called from any thread. |
| void TracingMuxerImpl::TracingSessionImpl::SetOnStartCallback( |
| std::function<void()> cb) { |
| auto* muxer = muxer_; |
| auto session_id = session_id_; |
| muxer->task_runner_->PostTask([muxer, session_id, cb] { |
| auto* consumer = muxer->FindConsumer(session_id); |
| if (!consumer) |
| return; |
| consumer->start_complete_callback_ = cb; |
| }); |
| } |
| |
| // Can be called from any thread |
| void TracingMuxerImpl::TracingSessionImpl::SetOnErrorCallback( |
| std::function<void(TracingError)> cb) { |
| auto* muxer = muxer_; |
| auto session_id = session_id_; |
| muxer->task_runner_->PostTask([muxer, session_id, cb] { |
| auto* consumer = muxer->FindConsumer(session_id); |
| if (!consumer) { |
| // Notify the client about concurrent disconnection of the session. |
| if (cb) |
| cb(TracingError{TracingError::kDisconnected, "Peer disconnected"}); |
| return; |
| } |
| consumer->error_callback_ = cb; |
| }); |
| } |
| |
| // Can be called from any thread. |
| void TracingMuxerImpl::TracingSessionImpl::SetOnStopCallback( |
| std::function<void()> cb) { |
| auto* muxer = muxer_; |
| auto session_id = session_id_; |
| muxer->task_runner_->PostTask([muxer, session_id, cb] { |
| auto* consumer = muxer->FindConsumer(session_id); |
| if (!consumer) |
| return; |
| consumer->stop_complete_callback_ = cb; |
| }); |
| } |
| |
| // Can be called from any thread. |
| void TracingMuxerImpl::TracingSessionImpl::GetTraceStats( |
| GetTraceStatsCallback cb) { |
| auto* muxer = muxer_; |
| auto session_id = session_id_; |
| muxer->task_runner_->PostTask([muxer, session_id, cb] { |
| muxer->GetTraceStats(session_id, std::move(cb)); |
| }); |
| } |
| |
| // Can be called from any thread. |
| void TracingMuxerImpl::TracingSessionImpl::QueryServiceState( |
| QueryServiceStateCallback cb) { |
| auto* muxer = muxer_; |
| auto session_id = session_id_; |
| muxer->task_runner_->PostTask([muxer, session_id, cb] { |
| muxer->QueryServiceState(session_id, std::move(cb)); |
| }); |
| } |
| |
| // ----- End of TracingMuxerImpl::TracingSessionImpl |
| |
| // ----- Begin of TracingMuxerImpl::StartupTracingSessionImpl |
| |
| TracingMuxerImpl::StartupTracingSessionImpl::StartupTracingSessionImpl( |
| TracingMuxerImpl* muxer, |
| TracingSessionGlobalID session_id, |
| BackendType backend_type) |
| : muxer_(muxer), session_id_(session_id), backend_type_(backend_type) {} |
| |
| // Can be destroyed from any thread. |
| TracingMuxerImpl::StartupTracingSessionImpl::~StartupTracingSessionImpl() = |
| default; |
| |
| void TracingMuxerImpl::StartupTracingSessionImpl::Abort() { |
| auto* muxer = muxer_; |
| auto session_id = session_id_; |
| auto backend_type = backend_type_; |
| muxer->task_runner_->PostTask([muxer, session_id, backend_type] { |
| muxer->AbortStartupTracingSession(session_id, backend_type); |
| }); |
| } |
| |
| // Blocking flavour of Abort(): returns only after the abort task has run on |
| // the muxer's task runner. |
| // Must not be called from the SDK's internal thread (see the CHECK below). |
| void TracingMuxerImpl::StartupTracingSessionImpl::AbortBlocking() { |
|   auto* muxer = muxer_; |
|   auto sid = session_id_; |
|   auto type = backend_type_; |
|   PERFETTO_CHECK(!muxer->task_runner_->RunsTasksOnCurrentThread()); |
| |
|   // |done| is captured by reference; this frame stays parked in Wait() until |
|   // the task has called Notify(). |
|   base::WaitableEvent done; |
|   muxer->task_runner_->PostTask([muxer, sid, type, &done] { |
|     muxer->AbortStartupTracingSession(sid, type); |
|     done.Notify(); |
|   }); |
|   done.Wait(); |
| } |
| |
| // ----- End of TracingMuxerImpl::StartupTracingSessionImpl |
| |
| // static: starts as TracingMuxerFake::Get(); the TracingMuxerImpl constructor later repoints it. |
| TracingMuxer* TracingMuxer::instance_ = TracingMuxerFake::Get(); |
| |
| // Invoked by perfetto::Tracing::Initialize(), on whichever thread the embedder |
| // calls it from (typically, but not necessarily, its main thread). |
| // Only the task runner is created synchronously; Initialize() and |
| // AddBackends() are posted to it. |
| TracingMuxerImpl::TracingMuxerImpl(const TracingInitArgs& args) |
|     : TracingMuxer(args.platform ? args.platform |
|                                  : Platform::GetDefaultPlatform()) { |
|   PERFETTO_DETACH_FROM_THREAD(thread_checker_); |
|   instance_ = this; |
| |
|   // Spin up the thread on which the muxer, producers and service will live. |
|   Platform::CreateTaskRunnerArgs runner_args{ |
|       /*name_for_debugging=*/"TracingMuxer"}; |
|   task_runner_.reset(new NonReentrantTaskRunner( |
|       this, platform_->CreateTaskRunner(std::move(runner_args)))); |
| |
|   // The remaining setup runs on that thread. |args| is captured by copy, so |
|   // the task does not depend on the caller's object. |
|   task_runner_->PostTask([this, args] { |
|     Initialize(args); |
|     AddBackends(args); |
|   }); |
| } |
| |
| // First half of the initialization task posted by the constructor: copies the |
| // global options out of |args| and installs the fake fallback backends, which |
| // must be the first entry of their respective lists. |
| void TracingMuxerImpl::Initialize(const TracingInitArgs& args) { |
|   PERFETTO_DCHECK_THREAD(thread_checker_);  // Rebind the thread checker. |
| |
|   supports_multiple_data_source_instances_ = |
|       args.supports_multiple_data_source_instances; |
|   policy_ = args.tracing_policy; |
| |
|   const BackendType fallback_type = BackendType::kUnspecifiedBackend; |
| |
|   // Producer side: used when a producer is requested for an unsupported |
|   // backend type. |
|   PERFETTO_CHECK(producer_backends_.empty()); |
|   AddProducerBackend(internal::TracingBackendFake::GetInstance(), |
|                      fallback_type, args); |
| |
|   // Consumer side: used when a consumer is requested for an unsupported |
|   // backend type. This backend simply fails any attempt to start a tracing |
|   // session. |
|   PERFETTO_CHECK(consumer_backends_.empty()); |
|   AddConsumerBackend(internal::TracingBackendFake::GetInstance(), |
|                      fallback_type); |
| } |
| |
| // Registers |backend| as a consumer backend of the given |type|, keeping |
| // consumer_backends_ sorted by type. A null |backend| is ignored. |
| void TracingMuxerImpl::AddConsumerBackend(TracingConsumerBackend* backend, |
|                                           BackendType type) { |
|   if (backend == nullptr) { |
|     // We skip the log in release builds because the *_backend_fake.cc code |
|     // has already an ELOG before returning a nullptr. |
|     PERFETTO_DLOG("Consumer backend creation failed, type %d", |
|                   static_cast<int>(type)); |
|     return; |
|   } |
| |
|   // Insert at the position std::upper_bound reports for |type| so that the |
|   // list stays ordered. |
|   const auto insert_pos = |
|       std::upper_bound(consumer_backends_.begin(), consumer_backends_.end(), |
|                        type, CompareBackendByType<RegisteredConsumerBackend>()); |
|   RegisteredConsumerBackend& entry = *consumer_backends_.emplace(insert_pos); |
|   entry.type = type; |
|   entry.backend = backend; |
| } |
| |
| // Registers |backend| as a producer backend of the given |type|: inserts it |
| // into producer_backends_ (kept sorted by type), creates its ProducerImpl and |
| // connects that producer through the backend. A null |backend| is ignored. |
| void TracingMuxerImpl::AddProducerBackend(TracingProducerBackend* backend, |
|                                           BackendType type, |
|                                           const TracingInitArgs& args) { |
|   if (backend == nullptr) { |
|     // We skip the log in release builds because the *_backend_fake.cc code |
|     // has already an ELOG before returning a nullptr. |
|     PERFETTO_DLOG("Producer backend creation failed, type %d", |
|                   static_cast<int>(type)); |
|     return; |
|   } |
| |
|   // The id is the number of producer backends registered before this one. It |
|   // is independent of the entry's position in the type-sorted list. |
|   TracingBackendId new_id = producer_backends_.size(); |
| |
|   const auto insert_pos = |
|       std::upper_bound(producer_backends_.begin(), producer_backends_.end(), |
|                        type, CompareBackendByType<RegisteredProducerBackend>()); |
|   RegisteredProducerBackend& entry = *producer_backends_.emplace(insert_pos); |
|   entry.backend = backend; |
|   entry.id = new_id; |
|   entry.type = type; |
|   entry.producer.reset(new ProducerImpl(this, new_id, |
|                                         args.shmem_batch_commits_duration_ms, |
|                                         args.shmem_direct_patching_enabled)); |
| |
|   // Fill in the connection arguments and connect the new producer. |
|   auto& conn_args = entry.producer_conn_args; |
|   conn_args.producer = entry.producer.get(); |
|   conn_args.producer_name = platform_->GetCurrentProcessName(); |
|   conn_args.task_runner = task_runner_.get(); |
|   conn_args.shmem_size_hint_bytes = args.shmem_size_hint_kb * 1024; |
|   conn_args.shmem_page_size_hint_bytes = args.shmem_page_size_hint_kb * 1024; |
|   conn_args.create_socket_async = args.create_socket_async; |
|   entry.producer->Initialize(entry.backend->ConnectProducer(conn_args)); |
| } |
| |
| // Returns the registered producer backend whose id equals |id|, or nullptr if |
| // there is none. |
| TracingMuxerImpl::RegisteredProducerBackend* |
| TracingMuxerImpl::FindProducerBackendById(TracingBackendId id) { |
|   const auto match = std::find_if( |
|       producer_backends_.begin(), producer_backends_.end(), |
|       [id](const RegisteredProducerBackend& b) { return b.id == id; }); |
|   return match == producer_backends_.end() ? nullptr : &*match; |
| } |
| |
| // Returns the first registered producer backend of the given |type|, or |
| // nullptr if there is none. |
| TracingMuxerImpl::RegisteredProducerBackend* |
| TracingMuxerImpl::FindProducerBackendByType(BackendType type) { |
|   const auto match = std::find_if( |
|       producer_backends_.begin(), producer_backends_.end(), |
|       [type](const RegisteredProducerBackend& b) { return b.type == type; }); |
|   return match == producer_backends_.end() ? nullptr : &*match; |
| } |
| |
| // Returns the first registered consumer backend of the given |type|, or |
| // nullptr if there is none. |
| TracingMuxerImpl::RegisteredConsumerBackend* |
| TracingMuxerImpl::FindConsumerBackendByType(BackendType type) { |
|   const auto match = std::find_if( |
|       consumer_backends_.begin(), consumer_backends_.end(), |
|       [type](const RegisteredConsumerBackend& b) { return b.type == type; }); |
|   return match == consumer_backends_.end() ? nullptr : &*match; |
| } |
| |
| // Registers the producer/consumer backends selected by |args.backends|. |
| // A backend type that already has a registered producer (respectively |
| // consumer) is left untouched. Hits PERFETTO_FATAL, after the known types |
| // have been processed, if |args.backends| contains any other bit. |
| void TracingMuxerImpl::AddBackends(const TracingInitArgs& args) { |
|   const auto has_producer = [this](BackendType type) { |
|     return FindProducerBackendByType(type) != nullptr; |
|   }; |
|   const auto has_consumer = [this](BackendType type) { |
|     return FindConsumerBackendByType(type) != nullptr; |
|   }; |
| |
|   if (args.backends & kSystemBackend) { |
|     PERFETTO_CHECK(args.system_producer_backend_factory_); |
|     if (!has_producer(kSystemBackend)) { |
|       AddProducerBackend(args.system_producer_backend_factory_(), |
|                          kSystemBackend, args); |
|     } |
|     // The system consumer is opt-in. |
|     if (args.enable_system_consumer) { |
|       PERFETTO_CHECK(args.system_consumer_backend_factory_); |
|       if (!has_consumer(kSystemBackend)) { |
|         AddConsumerBackend(args.system_consumer_backend_factory_(), |
|                            kSystemBackend); |
|       } |
|     } |
|   } |
| |
|   if (args.backends & kInProcessBackend) { |
|     // Producer and consumer use the same backend object. It is requested from |
|     // the factory on first use, and requested again if it is still null. |
|     TracingBackend* in_process = nullptr; |
|     const auto lazy_backend = [&args, &in_process] { |
|       if (!in_process) { |
|         PERFETTO_CHECK(args.in_process_backend_factory_); |
|         in_process = args.in_process_backend_factory_(); |
|       } |
|       return in_process; |
|     }; |
|     if (!has_producer(kInProcessBackend)) |
|       AddProducerBackend(lazy_backend(), kInProcessBackend, args); |
|     if (!has_consumer(kInProcessBackend)) |
|       AddConsumerBackend(lazy_backend(), kInProcessBackend); |
|   } |
| |
|   if (args.backends & kCustomBackend) { |
|     PERFETTO_CHECK(args.custom_backend); |
|     if (!has_producer(kCustomBackend)) |
|       AddProducerBackend(args.custom_backend, kCustomBackend, args); |
|     if (!has_consumer(kCustomBackend)) |
|       AddConsumerBackend(args.custom_backend, kCustomBackend); |
|   } |
| |
|   const auto known_backends = |
|       kSystemBackend | kInProcessBackend | kCustomBackend; |
|   if (args.backends & ~known_backends) { |
|     PERFETTO_FATAL("Unsupported tracing backend type"); |
|   } |
| } |
| |
| // Can be called from any thread (but not concurrently). |
| // Synchronously assigns |static_state| a slot index and a semi-unique id, |
| // then posts a task that records the data source in data_sources_ and |
| // announces it via UpdateDataSourceOnAllBackends(). Returns false only when |
| // no slot index below kMaxDataSources is left. |
| bool TracingMuxerImpl::RegisterDataSource( |
|     const DataSourceDescriptor& descriptor, |
|     DataSourceFactory factory, |
|     DataSourceParams params, |
|     bool no_flush, |
|     DataSourceStaticState* static_state) { |
|   // An index different from kMaxDataSources means an earlier call already |
|   // registered this data source: repeated registrations are ignored. |
|   const bool already_registered = static_state->index != kMaxDataSources; |
|   if (already_registered) |
|     return true; |
| |
|   const uint32_t slot = next_data_source_index_++; |
|   if (slot >= kMaxDataSources) { |
|     PERFETTO_DLOG( |
|         "RegisterDataSource failed: too many data sources already registered"); |
|     return false; |
|   } |
| |
|   // Initialize the static state: placement-construct a fresh DataSourceState |
|   // in every instance slot. |
|   static_assert(sizeof(static_state->instances[0]) >= sizeof(DataSourceState), |
|                 "instances[] size mismatch"); |
|   for (size_t idx = 0; idx < static_state->instances.size(); idx++) |
|     new (&static_state->instances[idx]) DataSourceState{}; |
| |
|   static_state->index = slot; |
| |
|   // Semi-unique id: hash of the state's address and the current wall time. |
|   // A zero digest is replaced by 1. |
|   base::Hasher hasher; |
|   hasher.Update(reinterpret_cast<intptr_t>(static_state)); |
|   hasher.Update(base::GetWallTimeNs().count()); |
|   static_state->id = hasher.digest() ? hasher.digest() : 1; |
| |
|   task_runner_->PostTask( |
|       [this, descriptor, factory, static_state, params, no_flush] { |
|         data_sources_.emplace_back(); |
|         RegisteredDataSource& entry = data_sources_.back(); |
|         entry.static_state = static_state; |
|         entry.descriptor = descriptor; |
|         entry.factory = factory; |
|         entry.no_flush = no_flush; |
|         entry.requires_callbacks_under_lock = |
|             params.requires_callbacks_under_lock; |
|         // Multiple instances need both the global switch and the per-source |
|         // opt-in. |
|         entry.supports_multiple_instances = |
|             supports_multiple_data_source_instances_ && |
|             params.supports_multiple_instances; |
| |
|         UpdateDataSourceOnAllBackends(entry, /*is_changed=*/false); |
|       }); |
|   return true; |
| } |
| |
| // Can be called from any thread (but not concurrently). |
| // Posts a task that replaces the descriptor of the data source registered |
| // with |static_state| and pushes the change to all backends. Nothing happens |
| // if no such data source is registered; the name must not change. |
| void TracingMuxerImpl::UpdateDataSourceDescriptor( |
|     const DataSourceDescriptor& descriptor, |
|     const DataSourceStaticState* static_state) { |
|   task_runner_->PostTask([this, descriptor, static_state] { |
|     const auto match = |
|         std::find_if(data_sources_.begin(), data_sources_.end(), |
|                      [static_state](const auto& candidate) { |
|                        return candidate.static_state == static_state; |
|                      }); |
|     if (match == data_sources_.end()) |
|       return; |
| |
|     auto& rds = *match; |
|     PERFETTO_CHECK(rds.descriptor.name() == descriptor.name()); |
|     rds.descriptor = descriptor; |
|     rds.descriptor.set_id(static_state->id); |
|     UpdateDataSourceOnAllBackends(rds, /*is_changed=*/true); |
|   }); |
| } |
| |
| // Can be called from any thread (but not concurrently). |
| // Posts a task that appends the interceptor to interceptors_, unless one with |
| // the same name is already there or the name is not on the allow-list below. |
| void TracingMuxerImpl::RegisterInterceptor( |
|     const InterceptorDescriptor& descriptor, |
|     InterceptorFactory factory, |
|     InterceptorBase::TLSFactory tls_factory, |
|     InterceptorBase::TracePacketCallback packet_callback) { |
|   task_runner_->PostTask([this, descriptor, factory, tls_factory, |
|                           packet_callback] { |
|     // Ignore repeated registrations. |
|     const auto existing = |
|         std::find_if(interceptors_.begin(), interceptors_.end(), |
|                      [&descriptor](const auto& entry) { |
|                        return entry.descriptor.name() == descriptor.name(); |
|                      }); |
|     if (existing != interceptors_.end()) { |
|       PERFETTO_DCHECK(existing->tls_factory == tls_factory); |
|       PERFETTO_DCHECK(existing->packet_callback == packet_callback); |
|       return; |
|     } |
| |
|     // Only allow certain interceptors for now. |
|     const bool allowed = descriptor.name() == "test_interceptor" || |
|                          descriptor.name() == "console" || |
|                          descriptor.name() == "etwexport"; |
|     if (!allowed) { |
|       PERFETTO_ELOG( |
|           "Interceptors are experimental. If you want to use them, please " |
|           "get in touch with the project maintainers " |
|           "(https://perfetto.dev/docs/contributing/" |
|           "getting-started#community)."); |
|       return; |
|     } |
| |
|     interceptors_.emplace_back(); |
|     RegisteredInterceptor& added = interceptors_.back(); |
|     added.descriptor = descriptor; |
|     added.factory = factory; |
|     added.tls_factory = tls_factory; |
|     added.packet_callback = packet_callback; |
|   }); |
| } |
| |
| // Activates |triggers| on every producer backend. Connected producers get |
| // them right away; for the others each trigger is appended to |
| // on_connect_triggers_ together with its expiry time (now + |ttl_ms|). |
| void TracingMuxerImpl::ActivateTriggers( |
|     const std::vector<std::string>& triggers, |
|     uint32_t ttl_ms) { |
|   // Computed on the calling thread, i.e. before the task below is posted. |
|   base::TimeMillis deadline = base::GetWallTimeMs() + base::TimeMillis(ttl_ms); |
|   task_runner_->PostTask([this, triggers, deadline] { |
|     for (RegisteredProducerBackend& backend : producer_backends_) { |
|       auto& producer = *backend.producer; |
|       if (producer.connected_) { |
|         producer.service_->ActivateTriggers(triggers); |
|         continue; |
|       } |
|       for (const std::string& trigger : triggers) |
|         producer.on_connect_triggers_.emplace_back(trigger, deadline); |
|     } |
|   }); |
| } |
| |
| // Looks for a startup tracing data source instance that matches a new |
| // SetupDataSource call. On a match the instance is associated with this |
| // tracing session (and its target buffer) and true is returned; otherwise the |
| // function returns false and changes nothing. |
| static bool MaybeAdoptStartupTracingInDataSource( |
|     TracingBackendId backend_id, |
|     uint32_t backend_connection_id, |
|     DataSourceInstanceID instance_id, |
|     const DataSourceConfig& cfg, |
|     const std::vector<RegisteredDataSource>& data_sources) { |
|   for (const auto& rds : data_sources) { |
|     DataSourceStaticState* static_state = rds.static_state; |
|     for (uint32_t idx = 0; idx < kMaxDataSourceInstances; idx++) { |
|       auto* state = static_state->TryGet(idx); |
|       if (!state) |
|         continue; |
| |
|       // A candidate still holds a startup buffer reservation and has no |
|       // instance id assigned yet. |
|       if (!state->startup_target_buffer_reservation.load( |
|               std::memory_order_relaxed)) { |
|         continue; |
|       } |
|       if (state->data_source_instance_id != 0) |
|         continue; |
| |
|       // It must belong to the same backend and connection. |
|       if (state->backend_id != backend_id || |
|           state->backend_connection_id != backend_connection_id) { |
|         continue; |
|       } |
| |
|       // Finally, the data source itself decides whether the configs match. |
|       if (!state->config) |
|         continue; |
|       if (!state->data_source->CanAdoptStartupSession(*state->config, cfg)) |
|         continue; |
| |
|       PERFETTO_DLOG("Setting up data source %" PRIu64 |
|                     " %s by adopting it from a startup tracing session", |
|                     instance_id, cfg.name().c_str()); |
| |
|       std::lock_guard<std::recursive_mutex> lock(state->lock); |
|       // Set the associations. The actual takeover happens in |
|       // StartDataSource(). |
|       state->data_source_instance_id = instance_id; |
|       state->buffer_id = static_cast<internal::BufferId>(cfg.target_buffer()); |
|       state->config.reset(new DataSourceConfig(cfg)); |
| |
|       // TODO(eseckler): Should the data source config provided by the service |
|       // be allowed to specify additional interceptors / additional data |
|       // source params? |
| |
|       return true; |
|     } |
|   } |
|   return false; |
| } |
| |
| // Called by the service of one of the backends. |
| // Either adopts a matching startup tracing instance, or sets up a new |
| // instance of the first registered data source named |cfg.name()| that is not |
| // already active with this exact config on this backend connection. |
| void TracingMuxerImpl::SetupDataSource(TracingBackendId backend_id, |
|                                        uint32_t backend_connection_id, |
|                                        DataSourceInstanceID instance_id, |
|                                        const DataSourceConfig& cfg) { |
|   PERFETTO_DLOG("Setting up data source %" PRIu64 " %s", instance_id, |
|                 cfg.name().c_str()); |
|   PERFETTO_DCHECK_THREAD(thread_checker_); |
| |
|   // A matching startup tracing instance takes precedence. |
|   const bool adopted = MaybeAdoptStartupTracingInDataSource( |
|       backend_id, backend_connection_id, instance_id, cfg, data_sources_); |
|   if (adopted) |
|     return; |
| |
|   for (const auto& rds : data_sources_) { |
|     if (rds.descriptor.name() != cfg.name()) |
|       continue; |
|     DataSourceStaticState& static_state = *rds.static_state; |
| |
|     // If this data source is already active for this exact config, don't start |
|     // another instance. This happens when we have several data sources with |
|     // the same name, in which case the service sends one SetupDataSource event |
|     // for each one. Since we can't map which event maps to which data source, |
|     // we ensure each event only starts one data source instance. |
|     // TODO(skyostil): Register a unique id with each data source to the service |
|     // to disambiguate. |
|     bool already_active = false; |
|     for (uint32_t idx = 0; idx < kMaxDataSourceInstances && !already_active; |
|          idx++) { |
|       if (!static_state.TryGet(idx)) |
|         continue; |
|       auto* state = |
|           reinterpret_cast<DataSourceState*>(&static_state.instances[idx]); |
|       already_active = state->backend_id == backend_id && |
|                        state->backend_connection_id == backend_connection_id && |
|                        state->config && *state->config == cfg; |
|     } |
|     if (already_active) { |
|       PERFETTO_DLOG( |
|           "Data source %s is already active with this config, skipping", |
|           cfg.name().c_str()); |
|       continue; |
|     } |
| |
|     // Only one instance is started per event. |
|     SetupDataSourceImpl(rds, backend_id, backend_connection_id, instance_id, |
|                         cfg, /*startup_session_id=*/0); |
|     return; |
|   } |
| } |
| |
| // Claims the first free instance slot of |rds|, fills in its DataSourceState, |
| // marks the slot as valid and finally calls the data source's OnSetup(). |
| // A non-zero |startup_session_id| additionally takes a startup target buffer |
| // reservation from the backend's producer. |
| // Returns an empty FindDataSourceRes if the data source does not support a |
| // second instance, if the reservations are exhausted or if no slot is free. |
| TracingMuxerImpl::FindDataSourceRes TracingMuxerImpl::SetupDataSourceImpl( |
|     const RegisteredDataSource& rds, |
|     TracingBackendId backend_id, |
|     uint32_t backend_connection_id, |
|     DataSourceInstanceID instance_id, |
|     const DataSourceConfig& cfg, |
|     TracingSessionGlobalID startup_session_id) { |
|   PERFETTO_DCHECK_THREAD(thread_checker_); |
|   DataSourceStaticState& static_state = *rds.static_state; |
| |
|   // Any bit set in valid_instances means that at least one other instance of |
|   // this data source is running. |
|   const bool single_instance_only = !rds.supports_multiple_instances; |
|   if (single_instance_only && |
|       static_state.valid_instances.load(std::memory_order_acquire) != 0) { |
|     PERFETTO_ELOG( |
|         "Failed to setup data source because some another instance of this " |
|         "data source is already active"); |
|     return FindDataSourceRes(); |
|   } |
| |
|   for (uint32_t idx = 0; idx < kMaxDataSourceInstances; idx++) { |
|     // Skip slots that are in use. |
|     if (static_state.TryGet(idx)) |
|       continue; |
| |
|     auto* state = |
|         reinterpret_cast<DataSourceState*>(&static_state.instances[idx]); |
|     std::unique_lock<std::recursive_mutex> lock(state->lock); |
|     static_assert( |
|         std::is_same<decltype(state->data_source_instance_id), |
|                      DataSourceInstanceID>::value, |
|         "data_source_instance_id type mismatch"); |
|     state->muxer_id_for_testing = muxer_id_for_testing_; |
|     RegisteredProducerBackend& backend = *FindProducerBackendById(backend_id); |
| |
|     // Startup sessions get the next reservation id of the producer; regular |
|     // sessions store 0. |
|     uint16_t reservation = 0; |
|     if (startup_session_id) { |
|       uint16_t& last_reservation = |
|           backend.producer->last_startup_target_buffer_reservation_; |
|       if (last_reservation == std::numeric_limits<uint16_t>::max()) { |
|         PERFETTO_ELOG( |
|             "Startup buffer reservations exhausted, dropping data source"); |
|         return FindDataSourceRes(); |
|       } |
|       reservation = ++last_reservation; |
|     } |
|     state->startup_target_buffer_reservation.store(reservation, |
|                                                    std::memory_order_relaxed); |
| |
|     state->backend_id = backend_id; |
|     state->backend_connection_id = backend_connection_id; |
|     state->data_source_instance_id = instance_id; |
|     state->buffer_id = static_cast<internal::BufferId>(cfg.target_buffer()); |
|     state->config.reset(new DataSourceConfig(cfg)); |
|     state->startup_session_id = startup_session_id; |
|     state->data_source = rds.factory(); |
|     state->interceptor = nullptr; |
|     state->interceptor_id = 0; |
|     state->will_notify_on_stop = rds.descriptor.will_notify_on_stop(); |
| |
|     if (cfg.has_interceptor_config()) { |
|       // Attach the first registered interceptor with the configured name. The |
|       // stored id is the interceptor's index plus one, so 0 means "none". |
|       for (size_t j = 0; j < interceptors_.size(); j++) { |
|         if (cfg.interceptor_config().name() != |
|             interceptors_[j].descriptor.name()) { |
|           continue; |
|         } |
|         PERFETTO_DLOG("Intercepting data source %" PRIu64 |
|                       " \"%s\" into \"%s\"", |
|                       instance_id, cfg.name().c_str(), |
|                       cfg.interceptor_config().name().c_str()); |
|         state->interceptor_id = static_cast<uint32_t>(j + 1); |
|         state->interceptor = interceptors_[j].factory(); |
|         state->interceptor->OnSetup({cfg}); |
|         break; |
|       } |
|       if (!state->interceptor_id) { |
|         PERFETTO_ELOG("Unknown interceptor configured for data source: %s", |
|                       cfg.interceptor_config().name().c_str()); |
|       } |
|     } |
| |
|     // This must be made at the end. See matching acquire-load in |
|     // DataSource::Trace(). |
|     static_state.valid_instances.fetch_or(1 << idx, std::memory_order_release); |
| |
|     DataSourceBase::SetupArgs setup_args; |
|     setup_args.config = &cfg; |
|     setup_args.backend_type = backend.type; |
|     setup_args.internal_instance_index = idx; |
| |
|     // The instance lock stays held during OnSetup() only for data sources |
|     // that require callbacks under lock. |
|     if (!rds.requires_callbacks_under_lock) |
|       lock.unlock(); |
|     state->data_source->OnSetup(setup_args); |
| |
|     return FindDataSourceRes(&static_state, state, idx, |
|                              rds.requires_callbacks_under_lock); |
|   } |
| |
|   PERFETTO_ELOG( |
|       "Maximum number of data source instances exhausted. " |
|       "Dropping data source %" PRIu64, |
|       instance_id); |
|   return FindDataSourceRes(); |
| } |
| |
| // Called by the service of one of the backends. |
| // Starts the instance through StartDataSourceImpl(), unless it still holds a |
| // startup target buffer reservation. In that case the instance is already |
| // running for startup tracing and its reservation is bound to the instance's |
| // target buffer instead; nothing is done if that startup session is aborting. |
| void TracingMuxerImpl::StartDataSource(TracingBackendId backend_id, |
|                                        DataSourceInstanceID instance_id) { |
|   PERFETTO_DLOG("Starting data source %" PRIu64, instance_id); |
|   PERFETTO_DCHECK_THREAD(thread_checker_); |
| |
|   auto ds = FindDataSource(backend_id, instance_id); |
|   if (!ds) { |
|     PERFETTO_ELOG("Could not find data source to start"); |
|     return; |
|   } |
| |
|   // Check if the data source was already started for startup tracing. |
|   auto* state = ds.internal_state; |
|   uint16_t reservation = |
|       state->startup_target_buffer_reservation.load(std::memory_order_relaxed); |
|   if (!reservation) { |
|     // No reservation: regular start. |
|     StartDataSourceImpl(ds); |
|     return; |
|   } |
| |
|   RegisteredProducerBackend& backend = *FindProducerBackendById(backend_id); |
|   TracingSessionGlobalID session_id = state->startup_session_id; |
|   auto session_it = std::find_if( |
|       backend.startup_sessions.begin(), backend.startup_sessions.end(), |
|       [session_id](const RegisteredStartupSession& candidate) { |
|         return candidate.session_id == session_id; |
|       }); |
|   PERFETTO_DCHECK(session_it != backend.startup_sessions.end()); |
| |
|   if (session_it->is_aborting) { |
|     PERFETTO_DLOG("Data source %" PRIu64 |
|                   " was already aborted for startup tracing, not starting it", |
|                   instance_id); |
|     return; |
|   } |
| |
|   PERFETTO_DLOG( |
|       "Data source %" PRIu64 |
|       " was already started for startup tracing, binding its target buffer", |
|       instance_id); |
| |
|   backend.producer->service_->MaybeSharedMemoryArbiter() |
|       ->BindStartupTargetBuffer(reservation, state->buffer_id); |
| |
|   // The reservation ID can be used even after binding it, so there's no need |
|   // for any barriers here - we just need atomicity. |
|   state->startup_target_buffer_reservation.store(0, std::memory_order_relaxed); |
| |
|   // TODO(eseckler): Should we reset incremental state at this point, or |
|   // notify the data source some other way? |
| |
|   // The session should not have been fully bound yet (or aborted). |
|   PERFETTO_DCHECK(session_it->num_unbound_data_sources > 0); |
| |
|   // Once the last data source of the session is bound, notify the adoption |
|   // callback (if any) and drop the session. |
|   session_it->num_unbound_data_sources--; |
|   if (session_it->num_unbound_data_sources == 0) { |
|     if (session_it->on_adopted) |
|       task_runner_->PostTask(session_it->on_adopted); |
|     backend.startup_sessions.erase(session_it); |
|   } |
| } |
| |
| // Starts the instance described by |ds|: notifies its interceptor (if any), |
| // enables the trace lambda and calls the data source's OnStart(). |
| void TracingMuxerImpl::StartDataSourceImpl(const FindDataSourceRes& ds) { |
|   PERFETTO_DCHECK_THREAD(thread_checker_); |
| |
|   DataSourceBase::StartArgs start_args{}; |
|   start_args.internal_instance_index = ds.instance_idx; |
| |
|   auto* state = ds.internal_state; |
|   std::unique_lock<std::recursive_mutex> lock(state->lock); |
|   if (state->interceptor) |
|     state->interceptor->OnStart({}); |
|   state->trace_lambda_enabled.store(true, std::memory_order_relaxed); |
|   PERFETTO_DCHECK(state->data_source != nullptr); |
| |
|   // OnStart() runs under the instance lock only if the data source asked for |
|   // callbacks under lock. |
|   if (!ds.requires_callbacks_under_lock) |
|     lock.unlock(); |
|   state->data_source->OnStart(start_args); |
| } |
| |
| // Called by the service of one of the backends. |
| // Looks up the instance and begins stopping it. Logs an error and returns if |
| // the instance cannot be found. |
| void TracingMuxerImpl::StopDataSource_AsyncBegin( |
|     TracingBackendId backend_id, |
|     DataSourceInstanceID instance_id) { |
|   PERFETTO_DLOG("Stopping data source %" PRIu64, instance_id); |
|   PERFETTO_DCHECK_THREAD(thread_checker_); |
| |
|   auto ds = FindDataSource(backend_id, instance_id); |
|   if (ds) { |
|     StopDataSource_AsyncBeginImpl(ds); |
|     return; |
|   } |
|   PERFETTO_ELOG("Could not find data source to stop"); |
| } |
| |
| // Begins stopping |ds|: notifies the interceptor (if any) and the data source |
| // via OnStop(). The stop is completed by StopDataSource_AsyncEnd(), which is |
| // posted either right after OnStop() returns or later, when the embedder runs |
| // the closure it moved out of the stop args. |
| void TracingMuxerImpl::StopDataSource_AsyncBeginImpl( |
|     const FindDataSourceRes& ds) { |
|   auto* state = ds.internal_state; |
|   TracingBackendId backend_id = state->backend_id; |
|   uint32_t backend_connection_id = state->backend_connection_id; |
|   DataSourceInstanceID instance_id = state->data_source_instance_id; |
| |
|   StopArgsImpl stop_args{}; |
|   stop_args.internal_instance_index = ds.instance_idx; |
|   // TracingMuxerImpl is long lived, capturing |this| is okay. |
|   // The notification closure can be moved out of the StopArgs by the |
|   // embedder to handle stop asynchronously. The embedder might then |
|   // call the closure on a different thread than the current one, hence |
|   // the nested PostTask(). |
|   stop_args.async_stop_closure = [this, backend_id, backend_connection_id, |
|                                   instance_id, ds] { |
|     task_runner_->PostTask( |
|         [this, backend_id, backend_connection_id, instance_id, ds] { |
|           StopDataSource_AsyncEnd(backend_id, backend_connection_id, |
|                                   instance_id, ds); |
|         }); |
|   }; |
| |
|   { |
|     std::unique_lock<std::recursive_mutex> lock(state->lock); |
| |
|     // Don't call OnStop again if the datasource is already stopping. |
|     if (state->async_stop_in_progress) |
|       return; |
|     state->async_stop_in_progress = true; |
| |
|     if (state->interceptor) |
|       state->interceptor->OnStop({}); |
| |
|     // OnStop() runs under the instance lock only if the data source asked for |
|     // callbacks under lock. |
|     if (!ds.requires_callbacks_under_lock) |
|       lock.unlock(); |
|     state->data_source->OnStop(stop_args); |
|   } |
| |
|   // If the embedder hasn't called StopArgs.HandleStopAsynchronously() run the |
|   // async closure here. In theory we could avoid the PostTask and call |
|   // straight into StopDataSource_AsyncEnd(). We keep that to reduce |
|   // divergencies between the deferred-stop vs non-deferred-stop code paths. |
|   if (stop_args.async_stop_closure) |
|     std::move(stop_args.async_stop_closure)(); |
| } |
| |
| // Second half of stopping a data source instance: clears its valid bit, |
| // destroys the data source / interceptor / config under the instance lock, |
| // discards a still-pending startup buffer reservation and, if the producer is |
| // still on the same connection, flushes pending commits and notifies the |
| // service that the instance stopped. |
| void TracingMuxerImpl::StopDataSource_AsyncEnd(TracingBackendId backend_id, |
|                                                uint32_t backend_connection_id, |
|                                                DataSourceInstanceID instance_id, |
|                                                const FindDataSourceRes& ds) { |
|   PERFETTO_DLOG("Ending async stop of data source %" PRIu64, instance_id); |
|   PERFETTO_DCHECK_THREAD(thread_checker_); |
| |
|   auto* state = ds.internal_state; |
| |
|   // Check that the data source instance is still active and was not modified |
|   // while it was being stopped. |
|   const bool unchanged = |
|       ds.static_state->TryGet(ds.instance_idx) && |
|       state->backend_id == backend_id && |
|       state->backend_connection_id == backend_connection_id && |
|       state->data_source_instance_id == instance_id; |
|   if (!unchanged) { |
|     PERFETTO_ELOG( |
|         "Async stop of data source %" PRIu64 |
|         " failed. This might be due to calling the async_stop_closure twice.", |
|         instance_id); |
|     return; |
|   } |
| |
|   // Mark the slot as no longer valid. |
|   const uint32_t mask = ~(1 << ds.instance_idx); |
|   ds.static_state->valid_instances.fetch_and(mask, std::memory_order_acq_rel); |
| |
|   bool will_notify_on_stop; |
|   uint16_t startup_buffer_reservation; |
|   TracingSessionGlobalID startup_session_id; |
|   { |
|     // Take the mutex to prevent that the data source is in the middle of |
|     // a Trace() execution where it called GetDataSourceLocked() while we |
|     // destroy it. |
|     std::lock_guard<std::recursive_mutex> guard(state->lock); |
|     state->trace_lambda_enabled.store(false, std::memory_order_relaxed); |
|     state->data_source.reset(); |
|     state->interceptor.reset(); |
|     state->config.reset(); |
|     state->async_stop_in_progress = false; |
|     will_notify_on_stop = state->will_notify_on_stop; |
|     startup_buffer_reservation = |
|         state->startup_target_buffer_reservation.load( |
|             std::memory_order_relaxed); |
|     startup_session_id = state->startup_session_id; |
|   } |
| |
|   // The other fields of internal_state are deliberately *not* cleared. |
|   // See races-related comments of DataSource::Trace(). |
| |
|   TracingMuxer::generation_++; |
| |
|   // |producer_backends_| is append-only, Backend instances are always valid. |
|   PERFETTO_CHECK(backend_id < producer_backends_.size()); |
|   RegisteredProducerBackend& backend = *FindProducerBackendById(backend_id); |
|   ProducerImpl* producer = backend.producer.get(); |
|   if (!producer) |
|     return; |
| |
|   // If the data source instance still has a startup buffer reservation, it was |
|   // only active for startup tracing and never started by the service. Discard |
|   // the startup buffer reservation. |
|   if (startup_buffer_reservation) { |
|     PERFETTO_DCHECK(startup_session_id); |
| |
|     if (producer->service_ && producer->service_->MaybeSharedMemoryArbiter()) { |
|       producer->service_->MaybeSharedMemoryArbiter() |
|           ->AbortStartupTracingForReservation(startup_buffer_reservation); |
|     } |
| |
|     auto session_it = std::find_if( |
|         backend.startup_sessions.begin(), backend.startup_sessions.end(), |
|         [startup_session_id](const RegisteredStartupSession& candidate) { |
|           return candidate.session_id == startup_session_id; |
|         }); |
| |
|     // Session should not be removed until abortion of all data source |
|     // instances is complete. |
|     PERFETTO_DCHECK(session_it != backend.startup_sessions.end()); |
| |
|     // After the last aborting data source, notify the abort callback (if any) |
|     // and drop the session. |
|     session_it->num_aborting_data_sources--; |
|     if (session_it->num_aborting_data_sources == 0) { |
|       if (session_it->on_aborted) |
|         task_runner_->PostTask(session_it->on_aborted); |
| |
|       backend.startup_sessions.erase(session_it); |
|     } |
|   } |
| |
|   // Only talk to the service if the producer is connected and still on the |
|   // connection this instance was set up with. |
|   const bool same_connection = |
|       producer->connected_ && |
|       backend.producer->connection_id_.load(std::memory_order_relaxed) == |
|           backend_connection_id; |
|   if (same_connection) { |
|     // Flush any commits that might have been batched by SharedMemoryArbiter. |
|     producer->service_->MaybeSharedMemoryArbiter() |
|         ->FlushPendingCommitDataRequests(); |
|     if (instance_id && will_notify_on_stop) |
|       producer->service_->NotifyDataSourceStopped(instance_id); |
|   } |
|   producer->SweepDeadServices(); |
| } |
| |
| // Tells the data source instance that its incremental state is about to be |
| // cleared, then bumps the incremental state generation of the data source. |
| // Logs an error and returns if the instance cannot be found. |
| void TracingMuxerImpl::ClearDataSourceIncrementalState( |
|     TracingBackendId backend_id, |
|     DataSourceInstanceID instance_id) { |
|   PERFETTO_DCHECK_THREAD(thread_checker_); |
|   PERFETTO_DLOG("Clearing incremental state for data source %" PRIu64, |
|                 instance_id); |
|   auto ds = FindDataSource(backend_id, instance_id); |
|   if (!ds) { |
|     PERFETTO_ELOG("Could not find data source to clear incremental state for"); |
|     return; |
|   } |
| |
|   DataSourceBase::ClearIncrementalStateArgs args; |
|   args.internal_instance_index = ds.instance_idx; |
|   { |
|     // The instance lock is taken only for data sources that require |
|     // callbacks under lock. |
|     std::unique_lock<std::recursive_mutex> lock(ds.internal_state->lock, |
|                                                 std::defer_lock); |
|     if (ds.requires_callbacks_under_lock) |
|       lock.lock(); |
|     ds.internal_state->data_source->WillClearIncrementalState(args); |
|   } |
| |
|   // Make DataSource::TraceContext::GetIncrementalState() eventually notice that |
|   // the incremental state should be cleared. |
|   ds.static_state->incremental_state_generation.fetch_add( |
|       1, std::memory_order_relaxed); |
| } |
| |
| // Asks the data source instance to flush by calling its OnFlush(). |
| // Returns true if the flush is already done when this function returns: either |
| // the instance was not found, or the data source left the async closure in |
| // the flush args. Returns false if the data source moved the closure out, in |
| // which case FlushDataSource_AsyncEnd() runs once that closure is invoked. |
| bool TracingMuxerImpl::FlushDataSource_AsyncBegin( |
|     TracingBackendId backend_id, |
|     DataSourceInstanceID instance_id, |
|     FlushRequestID flush_id, |
|     FlushFlags flush_flags) { |
|   PERFETTO_DLOG("Flushing data source %" PRIu64, instance_id); |
|   auto ds = FindDataSource(backend_id, instance_id); |
|   if (!ds) { |
|     PERFETTO_ELOG("Could not find data source to flush"); |
|     return true; |
|   } |
| |
|   auto* state = ds.internal_state; |
|   uint32_t backend_connection_id = state->backend_connection_id; |
| |
|   FlushArgsImpl flush_args; |
|   flush_args.flush_flags = flush_flags; |
|   flush_args.internal_instance_index = ds.instance_idx; |
|   // TracingMuxerImpl is long lived, capturing |this| is okay. |
|   // The notification closure can be moved out of the flush args by the |
|   // embedder to handle the flush asynchronously. The embedder might then |
|   // call the closure on a different thread than the current one, hence |
|   // the nested PostTask(). |
|   flush_args.async_flush_closure = [this, backend_id, backend_connection_id, |
|                                     instance_id, ds, flush_id] { |
|     task_runner_->PostTask( |
|         [this, backend_id, backend_connection_id, instance_id, ds, flush_id] { |
|           FlushDataSource_AsyncEnd(backend_id, backend_connection_id, |
|                                    instance_id, ds, flush_id); |
|         }); |
|   }; |
|   { |
|     // The instance lock is taken only for data sources that require |
|     // callbacks under lock. |
|     std::unique_lock<std::recursive_mutex> lock(state->lock, std::defer_lock); |
|     if (ds.requires_callbacks_under_lock) |
|       lock.lock(); |
|     state->data_source->OnFlush(flush_args); |
|   } |
| |
|   // |async_flush_closure| is moved out of |flush_args| if the producer |
|   // requested to handle the flush asynchronously. |
|   return static_cast<bool>(flush_args.async_flush_closure); |
| } |
| |
| // Completes an asynchronous flush: if the instance is still the one that was |
| // asked to flush and the producer is still on the same connection, reports |
| // the flush for |instance_id| / |flush_id| as done to the producer. |
| void TracingMuxerImpl::FlushDataSource_AsyncEnd( |
|     TracingBackendId backend_id, |
|     uint32_t backend_connection_id, |
|     DataSourceInstanceID instance_id, |
|     const FindDataSourceRes& ds, |
|     FlushRequestID flush_id) { |
|   PERFETTO_DLOG("Ending async flush of data source %" PRIu64, instance_id); |
|   PERFETTO_DCHECK_THREAD(thread_checker_); |
| |
|   // Check that the data source instance is still active and was not modified |
|   // while it was being flushed. |
|   auto* state = ds.internal_state; |
|   const bool unchanged = |
|       ds.static_state->TryGet(ds.instance_idx) && |
|       state->backend_id == backend_id && |
|       state->backend_connection_id == backend_connection_id && |
|       state->data_source_instance_id == instance_id; |
|   if (!unchanged) { |
|     PERFETTO_ELOG("Async flush of data source %" PRIu64 |
|                   " failed. This might be due to the data source being stopped " |
|                   "in the meantime", |
|                   instance_id); |
|     return; |
|   } |
| |
|   // |producer_backends_| is append-only, Backend instances are always valid. |
|   PERFETTO_CHECK(backend_id < producer_backends_.size()); |
|   RegisteredProducerBackend& backend = *FindProducerBackendById(backend_id); |
| |
|   ProducerImpl* producer = backend.producer.get(); |
|   if (!producer) |
|     return; |
| |
|   // If the tracing service disconnects and reconnects while a data source is |
|   // handling a flush request, there's no point in sending the flush reply to |
|   // the newly reconnected producer. |
|   const bool same_connection = |
|       producer->connected_ && |
|       backend.producer->connection_id_.load(std::memory_order_relaxed) == |
|           backend_connection_id; |
|   if (same_connection) |
|     producer->NotifyFlushForDataSourceDone(instance_id, flush_id); |
| } |
| |
| void TracingMuxerImpl::SyncProducersForTesting() { |
| std::mutex mutex; |
| std::condition_variable cv; |
| |
| // IPC-based producers don't report connection errors explicitly for each |
| // command, but instead with an asynchronous callback |
| // (ProducerImpl::OnDisconnected). This means that the sync command below |
| // may have completed but failed to reach the service because of a |
| // disconnection, but we can't tell until the disconnection message comes |
| // through. To guard against this, we run two whole rounds of sync round-trips |
| // before returning; the first one will detect any disconnected producers and |
| // the second one will ensure any reconnections have completed and all data |
| // sources are registered in the service again. |
| for (size_t i = 0; i < 2; i++) { |
| size_t countdown = std::numeric_limits<size_t>::max(); |
| task_runner_->PostTask([this, &mutex, &cv, &countdown] { |
| { |
| std::unique_lock<std::mutex> countdown_lock(mutex); |
| countdown = producer_backends_.size(); |
| } |
| for (auto& backend : producer_backends_) { |
| auto* producer = backend.producer.get(); |
| producer->service_->Sync([&mutex, &cv, &countdown] { |
| std::unique_lock<std::mutex> countdown_lock(mutex); |
| countdown--; |
| cv.notify_one(); |
| }); |
| } |
| }); |
| |
| { |
| std::unique_lock<std::mutex> countdown_lock(mutex); |
| cv.wait(countdown_lock, [&countdown] { return !countdown; }); |
| } |
| } |
| |
| // Check that all producers are indeed connected. |
| bool done = false; |
| bool all_producers_connected = true; |
| task_runner_->PostTask([this, &mutex, &cv, &done, &all_producers_connected] { |
| for (auto& backend : producer_backends_) |
| all_producers_connected &= backend.producer->connected_; |
| std::unique_lock<std::mutex> lock(mutex); |
| done = true; |
| cv.notify_one(); |
| }); |
| |
| { |
| std::unique_lock<std::mutex> lock(mutex); |
| cv.wait(lock, [&done] { return done; }); |
| } |
| PERFETTO_DCHECK(all_producers_connected); |
| } |
| |
| void TracingMuxerImpl::DestroyStoppedTraceWritersForCurrentThread() { |
| // Iterate across all possible data source types. |
| auto cur_generation = generation_.load(std::memory_order_acquire); |
| auto* root_tls = GetOrCreateTracingTLS(); |
| |
| auto destroy_stopped_instances = [](DataSourceThreadLocalState& tls) { |
| // |tls| has a vector of per-data-source-instance thread-local state. |
| DataSourceStaticState* static_state = tls.static_state; |
| if (!static_state) |
| return; // Slot not used. |
| |
| // Iterate across all possible instances for this data source. |
| for (uint32_t inst = 0; inst < kMaxDataSourceInstances; inst++) { |
| DataSourceInstanceThreadLocalState& ds_tls = tls.per_instance[inst]; |
| if (!ds_tls.trace_writer) |
| continue; |
| |
| DataSourceState* ds_state = static_state->TryGet(inst); |
| if (ds_state && |
| ds_state->muxer_id_for_testing == ds_tls.muxer_id_for_testing && |
| ds_state->backend_id == ds_tls.backend_id && |
| ds_state->backend_connection_id == ds_tls.backend_connection_id && |
| ds_state->startup_target_buffer_reservation.load( |
| std::memory_order_relaxed) == |
| ds_tls.startup_target_buffer_reservation && |
| ds_state->buffer_id == ds_tls.buffer_id && |
| ds_state->data_source_instance_id == ds_tls.data_source_instance_id) { |
| continue; |
| } |
| |
| // The DataSource instance has been destroyed or recycled. |
| ds_tls.Reset(); // Will also destroy the |ds_tls.trace_writer|. |
| } |
| }; |
| |
| for (size_t ds_idx = 0; ds_idx < kMaxDataSources; ds_idx++) { |
| // |tls| has a vector of per-data-source-instance thread-local state. |
| DataSourceThreadLocalState& tls = root_tls->data_sources_tls[ds_idx]; |
| destroy_stopped_instances(tls); |
| } |
| destroy_stopped_instances(root_tls->track_event_tls); |
| root_tls->generation = cur_generation; |
| } |
| |
| // Called both when a new data source is registered or when a new backend |
| // connects. In both cases we want to be sure we reflected the data source |
| // registrations on the backends. |
| void TracingMuxerImpl::UpdateDataSourcesOnAllBackends() { |
| PERFETTO_DCHECK_THREAD(thread_checker_); |
| for (RegisteredDataSource& rds : data_sources_) { |
| UpdateDataSourceOnAllBackends(rds, /*is_changed=*/false); |
| } |
| } |
| |
| void TracingMuxerImpl::UpdateDataSourceOnAllBackends(RegisteredDataSource& rds, |
| bool is_changed) { |
| PERFETTO_DCHECK_THREAD(thread_checker_); |
| for (RegisteredProducerBackend& backend : producer_backends_) { |
| // We cannot call RegisterDataSource on the backend before it connects. |
| if (!backend.producer->connected_) |
| continue; |
| |
| PERFETTO_DCHECK(rds.static_state->index < kMaxDataSources); |
| bool is_registered = backend.producer->registered_data_sources_.test( |
| rds.static_state->index); |
| if (is_registered && !is_changed) |
| continue; |
| |
| if (!rds.descriptor.no_flush()) { |
| rds.descriptor.set_no_flush(rds.no_flush); |
| } |
| rds.descriptor.set_will_notify_on_start(true); |
| if (!rds.descriptor.has_will_notify_on_stop()) { |
| rds.descriptor.set_will_notify_on_stop(true); |
| } |
| |
| rds.descriptor.set_handles_incremental_state_clear(true); |
| rds.descriptor.set_id(rds.static_state->id); |
| if (is_registered) { |
| backend.producer->service_->UpdateDataSource(rds.descriptor); |
| } else { |
| backend.producer->service_->RegisterDataSource(rds.descriptor); |
| } |
| backend.producer->registered_data_sources_.set(rds.static_state->index); |
| } |
| } |
| |
| void TracingMuxerImpl::SetupTracingSession( |
| TracingSessionGlobalID session_id, |
| const std::shared_ptr<TraceConfig>& trace_config, |
| base::ScopedFile trace_fd) { |
| PERFETTO_DCHECK_THREAD(thread_checker_); |
| PERFETTO_CHECK(!trace_fd || trace_config->write_into_file()); |
| |
| auto* consumer = FindConsumer(session_id); |
| if (!consumer) |
| return; |
| |
| consumer->trace_config_ = trace_config; |
| if (trace_fd) |
| consumer->trace_fd_ = std::move(trace_fd); |
| |
| if (!consumer->connected_) |
| return; |
| |
| // Only used in the deferred start mode. |
| if (trace_config->deferred_start()) { |
| consumer->service_->EnableTracing(*trace_config, |
| std::move(consumer->trace_fd_)); |
| } |
| } |
| |
| void TracingMuxerImpl::StartTracingSession(TracingSessionGlobalID session_id) { |
| PERFETTO_DCHECK_THREAD(thread_checker_); |
| |
| auto* consumer = FindConsumer(session_id); |
| |
| if (!consumer) |
| return; |
| |
| if (!consumer->trace_config_) { |
| PERFETTO_ELOG("Must call Setup(config) first"); |
| return; |
| } |
| |
| if (!consumer->connected_) { |
| consumer->start_pending_ = true; |
| return; |
| } |
| |
| consumer->start_pending_ = false; |
| if (consumer->trace_config_->deferred_start()) { |
| consumer->service_->StartTracing(); |
| } else { |
| consumer->service_->EnableTracing(*consumer->trace_config_, |
| std::move(consumer->trace_fd_)); |
| } |
| |
| // TODO implement support for the deferred-start + fast-triggering case. |
| } |
| |
| void TracingMuxerImpl::CloneTracingSession( |
| TracingSessionGlobalID session_id, |
| TracingSession::CloneTraceArgs args, |
| TracingSession::CloneTraceCallback callback) { |
| PERFETTO_DCHECK_THREAD(thread_checker_); |
| auto* consumer = FindConsumer(session_id); |
| if (!consumer) { |
| TracingSession::CloneTraceCallbackArgs callback_arg{}; |
| callback_arg.success = false; |
| callback_arg.error = "Tracing session not found"; |
| callback(callback_arg); |
| return; |
| } |
| // Multiple concurrent cloning isn't supported. |
| PERFETTO_DCHECK(!consumer->clone_trace_callback_); |
| consumer->clone_trace_callback_ = std::move(callback); |
| ConsumerEndpoint::CloneSessionArgs consumer_args{}; |
| consumer_args.unique_session_name = args.unique_session_name; |
| if (!consumer->connected_) { |
| consumer->session_to_clone_ = std::move(consumer_args); |
| return; |
| } |
| consumer->session_to_clone_ = std::nullopt; |
| consumer->service_->CloneSession(consumer_args); |
| } |
| |
| void TracingMuxerImpl::ChangeTracingSessionConfig( |
| TracingSessionGlobalID session_id, |
| const TraceConfig& trace_config) { |
| PERFETTO_DCHECK_THREAD(thread_checker_); |
| |
| auto* consumer = FindConsumer(session_id); |
| |
| if (!consumer) |
| return; |
| |
| if (!consumer->trace_config_) { |
| // Changing the config is only supported for started sessions. |
| PERFETTO_ELOG("Must call Setup(config) and Start() first"); |
| return; |
| } |
| |
| consumer->trace_config_ = std::make_shared<TraceConfig>(trace_config); |
| if (consumer->connected_) |
| consumer->service_->ChangeTraceConfig(trace_config); |
| } |
| |
| void TracingMuxerImpl::FlushTracingSession(TracingSessionGlobalID session_id, |
| uint32_t timeout_ms, |
| std::function<void(bool)> callback) { |
| PERFETTO_DCHECK_THREAD(thread_checker_); |
| auto* consumer = FindConsumer(session_id); |
| if (!consumer || consumer->start_pending_ || consumer->stop_pending_ || |
| !consumer->trace_config_) { |
| PERFETTO_ELOG("Flush() can be called only after Start() and before Stop()"); |
| std::move(callback)(false); |
| return; |
| } |
| |
| // For now we don't want to expose the flush reason to the consumer-side SDK |
| // users to avoid misuses until there is a strong need. |
| consumer->service_->Flush(timeout_ms, std::move(callback), |
| FlushFlags(FlushFlags::Initiator::kConsumerSdk, |
| FlushFlags::Reason::kExplicit)); |
| } |
| |
| void TracingMuxerImpl::StopTracingSession(TracingSessionGlobalID session_id) { |
| PERFETTO_DCHECK_THREAD(thread_checker_); |
| auto* consumer = FindConsumer(session_id); |
| if (!consumer) |
| return; |
| |
| if (consumer->start_pending_) { |
| // If the session hasn't started yet, wait until it does before stopping. |
| consumer->stop_pending_ = true; |
| return; |
| } |
| |
| consumer->stop_pending_ = false; |
| if (consumer->stopped_) { |
| // If the session was already stopped (e.g., it failed to start), don't try |
| // stopping again. |
| consumer->NotifyStopComplete(); |
| } else if (!consumer->trace_config_) { |
| PERFETTO_ELOG("Must call Setup(config) and Start() first"); |
| return; |
| } else { |
| consumer->service_->DisableTracing(); |
| } |
| |
| consumer->trace_config_.reset(); |
| } |
| |
| void TracingMuxerImpl::DestroyTracingSession( |
| TracingSessionGlobalID session_id) { |
| PERFETTO_DCHECK_THREAD(thread_checker_); |
| for (RegisteredConsumerBackend& backend : consumer_backends_) { |
| // We need to find the consumer (if any) and call Disconnect as we destroy |
| // the tracing session. We can't call Disconnect() inside this for loop |
| // because in the in-process case this will end up to a synchronous call to |
| // OnConsumerDisconnect which will invalidate all the iterators to |
| // |backend.consumers|. |
| ConsumerImpl* consumer = nullptr; |
| for (auto& con : backend.consumers) { |
| if (con->session_id_ == session_id) { |
| consumer = con.get(); |
| break; |
| } |
| } |
| if (consumer) { |
| // We broke out of the loop above on the assumption that each backend will |
| // only have a single consumer per session. This DCHECK ensures that |
| // this is the case. |
| PERFETTO_DCHECK( |
| std::count_if(backend.consumers.begin(), backend.consumers.end(), |
| [session_id](const std::unique_ptr<ConsumerImpl>& con) { |
| return con->session_id_ == session_id; |
| }) == 1u); |
| consumer->Disconnect(); |
| } |
| } |
| } |
| |
| void TracingMuxerImpl::ReadTracingSessionData( |
| TracingSessionGlobalID session_id, |
| std::function<void(TracingSession::ReadTraceCallbackArgs)> callback) { |
| PERFETTO_DCHECK_THREAD(thread_checker_); |
| auto* consumer = FindConsumer(session_id); |
| if (!consumer) { |
| // TODO(skyostil): Signal an error to the user. |
| TracingSession::ReadTraceCallbackArgs callback_arg{}; |
| callback(callback_arg); |
| return; |
| } |
| PERFETTO_DCHECK(!consumer->read_trace_callback_); |
| consumer->read_trace_callback_ = std::move(callback); |
| consumer->service_->ReadBuffers(); |
| } |
| |
| void TracingMuxerImpl::GetTraceStats( |
| TracingSessionGlobalID session_id, |
| TracingSession::GetTraceStatsCallback callback) { |
| PERFETTO_DCHECK_THREAD(thread_checker_); |
| auto* consumer = FindConsumer(session_id); |
| if (!consumer) { |
| TracingSession::GetTraceStatsCallbackArgs callback_arg{}; |
| callback_arg.success = false; |
| callback(std::move(callback_arg)); |
| return; |
| } |
| PERFETTO_DCHECK(!consumer->get_trace_stats_callback_); |
| consumer->get_trace_stats_callback_ = std::move(callback); |
| if (!consumer->connected_) { |
| consumer->get_trace_stats_pending_ = true; |
| return; |
| } |
| consumer->get_trace_stats_pending_ = false; |
| consumer->service_->GetTraceStats(); |
| } |
| |
| void TracingMuxerImpl::QueryServiceState( |
| TracingSessionGlobalID session_id, |
| TracingSession::QueryServiceStateCallback callback) { |
| PERFETTO_DCHECK_THREAD(thread_checker_); |
| auto* consumer = FindConsumer(session_id); |
| if (!consumer) { |
| TracingSession::QueryServiceStateCallbackArgs callback_arg{}; |
| callback_arg.success = false; |
| callback(std::move(callback_arg)); |
| return; |
| } |
| PERFETTO_DCHECK(!consumer->query_service_state_callback_); |
| if (!consumer->connected_) { |
| consumer->query_service_state_callback_ = std::move(callback); |
| return; |
| } |
| auto callback_wrapper = [callback](bool success, |
| protos::gen::TracingServiceState state) { |
| TracingSession::QueryServiceStateCallbackArgs callback_arg{}; |
| callback_arg.success = success; |
| callback_arg.service_state_data = state.SerializeAsArray(); |
| callback(std::move(callback_arg)); |
| }; |
| consumer->service_->QueryServiceState({}, std::move(callback_wrapper)); |
| } |
| |
| void TracingMuxerImpl::SetBatchCommitsDurationForTesting( |
| uint32_t batch_commits_duration_ms, |
| BackendType backend_type) { |
| for (RegisteredProducerBackend& backend : producer_backends_) { |
| if (backend.producer && backend.producer->connected_ && |
| backend.type == backend_type) { |
| backend.producer->service_->MaybeSharedMemoryArbiter() |
| ->SetBatchCommitsDuration(batch_commits_duration_ms); |
| } |
| } |
| } |
| |
| bool TracingMuxerImpl::EnableDirectSMBPatchingForTesting( |
| BackendType backend_type) { |
| for (RegisteredProducerBackend& backend : producer_backends_) { |
| if (backend.producer && backend.producer->connected_ && |
| backend.type == backend_type && |
| !backend.producer->service_->MaybeSharedMemoryArbiter() |
| ->EnableDirectSMBPatching()) { |
| return false; |
| } |
| } |
| return true; |
| } |
| |
| TracingMuxerImpl::ConsumerImpl* TracingMuxerImpl::FindConsumer( |
| TracingSessionGlobalID session_id) { |
| PERFETTO_DCHECK_THREAD(thread_checker_); |
| return FindConsumerAndBackend(session_id).first; |
| } |
| |
| std::pair<TracingMuxerImpl::ConsumerImpl*, |
| TracingMuxerImpl::RegisteredConsumerBackend*> |
| TracingMuxerImpl::FindConsumerAndBackend(TracingSessionGlobalID session_id) { |
| PERFETTO_DCHECK_THREAD(thread_checker_); |
| for (RegisteredConsumerBackend& backend : consumer_backends_) { |
| for (auto& consumer : backend.consumers) { |
| if (consumer->session_id_ == session_id) { |
| return {consumer.get(), &backend}; |
| } |
| } |
| } |
| return {nullptr, nullptr}; |
| } |
| |
| void TracingMuxerImpl::InitializeConsumer(TracingSessionGlobalID session_id) { |
| PERFETTO_DCHECK_THREAD(thread_checker_); |
| |
| auto res = FindConsumerAndBackend(session_id); |
| if (!res.first || !res.second) |
| return; |
| TracingMuxerImpl::ConsumerImpl* consumer = res.first; |
| RegisteredConsumerBackend& backend = *res.second; |
| |
| TracingBackend::ConnectConsumerArgs conn_args; |
| conn_args.consumer = consumer; |
| conn_args.task_runner = task_runner_.get(); |
| consumer->Initialize(backend.backend->ConnectConsumer(conn_args)); |
| } |
| |
| void TracingMuxerImpl::OnConsumerDisconnected(ConsumerImpl* consumer) { |
| PERFETTO_DCHECK_THREAD(thread_checker_); |
| for (RegisteredConsumerBackend& backend : consumer_backends_) { |
| auto pred = [consumer](const std::unique_ptr<ConsumerImpl>& con) { |
| return con.get() == consumer; |
| }; |
| backend.consumers.erase(std::remove_if(backend.consumers.begin(), |
| backend.consumers.end(), pred), |
| backend.consumers.end()); |
| } |
| } |
| |
| void TracingMuxerImpl::SetMaxProducerReconnectionsForTesting(uint32_t count) { |
| max_producer_reconnections_.store(count); |
| } |
| |
| void TracingMuxerImpl::OnProducerDisconnected(ProducerImpl* producer) { |
| PERFETTO_DCHECK_THREAD(thread_checker_); |
| for (RegisteredProducerBackend& backend : producer_backends_) { |
| if (backend.producer.get() != producer) |
| continue; |
| |
| // The tracing service is disconnected. It does not make sense to keep |
| // tracing (we wouldn't be able to commit). On reconnection, the tracing |
| // service will restart the data sources. |
| for (const auto& rds : data_sources_) { |
| DataSourceStaticState* static_state = rds.static_state; |
| for (uint32_t i = 0; i < kMaxDataSourceInstances; i++) { |
| auto* internal_state = static_state->TryGet(i); |
| if (internal_state && internal_state->backend_id == backend.id && |
| internal_state->backend_connection_id == |
| backend.producer->connection_id_.load( |
| std::memory_order_relaxed)) { |
| StopDataSource_AsyncBeginImpl( |
| FindDataSourceRes(static_state, internal_state, i, |
| rds.requires_callbacks_under_lock)); |
| } |
| } |
| } |
| |
| // Try reconnecting the disconnected producer. If the connection succeeds, |
| // all the data sources will be automatically re-registered. |
| if (producer->connection_id_.load(std::memory_order_relaxed) > |
| max_producer_reconnections_.load()) { |
| // Avoid reconnecting a failing producer too many times. Instead we just |
| // leak the producer instead of trying to avoid further complicating |
| // cross-thread trace writer creation. |
| PERFETTO_ELOG("Producer disconnected too many times; not reconnecting"); |
| continue; |
| } |
| |
| backend.producer->Initialize( |
| backend.backend->ConnectProducer(backend.producer_conn_args)); |
| // Don't use producer-provided SMBs for the next connection unless startup |
| // tracing requires it again. |
| backend.producer_conn_args.use_producer_provided_smb = false; |
| } |
| } |
| |
| void TracingMuxerImpl::SweepDeadBackends() { |
| PERFETTO_DCHECK_THREAD(thread_checker_); |
| for (auto it = dead_backends_.begin(); it != dead_backends_.end();) { |
| auto next_it = it; |
| next_it++; |
| if (it->producer->SweepDeadServices()) |
| dead_backends_.erase(it); |
| it = next_it; |
| } |
| } |
| |
| TracingMuxerImpl::FindDataSourceRes TracingMuxerImpl::FindDataSource( |
| TracingBackendId backend_id, |
| DataSourceInstanceID instance_id) { |
| PERFETTO_DCHECK_THREAD(thread_checker_); |
| RegisteredProducerBackend& backend = *FindProducerBackendById(backend_id); |
| for (const auto& rds : data_sources_) { |
| DataSourceStaticState* static_state = rds.static_state; |
| for (uint32_t i = 0; i < kMaxDataSourceInstances; i++) { |
| auto* internal_state = static_state->TryGet(i); |
| if (internal_state && internal_state->backend_id == backend_id && |
| internal_state->backend_connection_id == |
| backend.producer->connection_id_.load( |
| std::memory_order_relaxed) && |
| internal_state->data_source_instance_id == instance_id) { |
| return FindDataSourceRes(static_state, internal_state, i, |
| rds.requires_callbacks_under_lock); |
| } |
| } |
| } |
| return FindDataSourceRes(); |
| } |
| |
| // Can be called from any thread. |
| std::unique_ptr<TraceWriterBase> TracingMuxerImpl::CreateTraceWriter( |
| DataSourceStaticState* static_state, |
| uint32_t data_source_instance_index, |
| DataSourceState* data_source, |
| BufferExhaustedPolicy buffer_exhausted_policy) { |
| if (PERFETTO_UNLIKELY(data_source->interceptor_id)) { |
| // If the session is being intercepted, return a heap-backed trace writer |
| // instead. This is safe because all the data given to the interceptor is |
| // either thread-local (|instance_index|), statically allocated |
| // (|static_state|) or constant after initialization (|interceptor|). Access |
| // to the interceptor instance itself through |data_source| is protected by |
| // a statically allocated lock (similarly to the data source instance). |
| auto& interceptor = interceptors_[data_source->interceptor_id - 1]; |
| return std::unique_ptr<TraceWriterBase>(new InterceptorTraceWriter( |
| interceptor.tls_factory(static_state, data_source_instance_index), |
| interceptor.packet_callback, static_state, data_source_instance_index)); |
| } |
| ProducerImpl* producer = |
| FindProducerBackendById(data_source->backend_id)->producer.get(); |
| // Atomically load the current service endpoint. We keep the pointer as a |
| // shared pointer on the stack to guard against it from being concurrently |
| // modified on the thread by ProducerImpl::Initialize() swapping in a |
| // reconnected service on the muxer task runner thread. |
| // |
| // The endpoint may also be concurrently modified by SweepDeadServices() |
| // clearing out old disconnected services. We guard against that by |
| // SharedMemoryArbiter keeping track of any outstanding trace writers. After |
| // shutdown has started, the trace writer created below will be a null one |
| // which will drop any written data. See SharedMemoryArbiter::TryShutdown(). |
| // |
| // We use an atomic pointer instead of holding a lock because |
| // CreateTraceWriter posts tasks under the hood. |
| std::shared_ptr<ProducerEndpoint> service = |
| std::atomic_load(&producer->service_); |
| |
| // The service may have been disconnected and reconnected concurrently after |
| // the data source was enabled, in which case we may not have an arbiter, or |
| // would be creating a TraceWriter for the wrong (a newer) connection / SMB. |
| // Instead, early-out now. A relaxed load is fine here because the atomic_load |
| // above ensures that the |service| isn't newer. |
| if (producer->connection_id_.load(std::memory_order_relaxed) != |
| data_source->backend_connection_id) { |
| return std::unique_ptr<TraceWriter>(new NullTraceWriter()); |
| } |
| |
| // We just need a relaxed atomic read here: We can use the reservation ID even |
| // after the buffer was bound, we just need to be sure to read it atomically. |
| uint16_t startup_buffer_reservation = |
| data_source->startup_target_buffer_reservation.load( |
| std::memory_order_relaxed); |
| if (startup_buffer_reservation) { |
| return service->MaybeSharedMemoryArbiter()->CreateStartupTraceWriter( |
| startup_buffer_reservation); |
| } |
| return service->CreateTraceWriter(data_source->buffer_id, |
| buffer_exhausted_policy); |
| } |
| |
| // This is called via the public API Tracing::NewTrace(). |
| // Can be called from any thread. |
| std::unique_ptr<TracingSession> TracingMuxerImpl::CreateTracingSession( |
| BackendType requested_backend_type, |
| TracingConsumerBackend* (*system_backend_factory)()) { |
| TracingSessionGlobalID session_id = ++next_tracing_session_id_; |
| |
| // |backend_type| can only specify one backend, not an OR-ed mask. |
| PERFETTO_CHECK((requested_backend_type & (requested_backend_type - 1)) == 0); |
| |
| // Capturing |this| is fine because the TracingMuxer is a leaky singleton. |
| task_runner_->PostTask([this, requested_backend_type, session_id, |
| system_backend_factory] { |
| if (requested_backend_type == kSystemBackend && system_backend_factory && |
| !FindConsumerBackendByType(kSystemBackend)) { |
| AddConsumerBackend(system_backend_factory(), kSystemBackend); |
| } |
| for (RegisteredConsumerBackend& backend : consumer_backends_) { |
| if (requested_backend_type && backend.type && |
| backend.type != requested_backend_type) { |
| continue; |
| } |
| |
| // Create the consumer now, even if we have to ask the embedder below, so |
| // that any other tasks executing after this one can find the consumer and |
| // change its pending attributes. |
| backend.consumers.emplace_back( |
| new ConsumerImpl(this, backend.type, session_id)); |
| |
| // The last registered backend in |consumer_backends_| is the unsupported |
| // backend without a valid type. |
| if (!backend.type) { |
| PERFETTO_ELOG( |
| "No tracing backend ready for type=%d, consumer will disconnect", |
| requested_backend_type); |
| InitializeConsumer(session_id); |
| return; |
| } |
| |
| // Check if the embedder wants to be asked for permission before |
| // connecting the consumer. |
| if (!policy_) { |
| InitializeConsumer(session_id); |
| return; |
| } |
| |
| BackendType type = backend.type; |
| TracingPolicy::ShouldAllowConsumerSessionArgs args; |
| args.backend_type = backend.type; |
| args.result_callback = [this, type, session_id](bool allow) { |
| task_runner_->PostTask([this, type, session_id, allow] { |
| if (allow) { |
| InitializeConsumer(session_id); |
| return; |
| } |
| |
| PERFETTO_ELOG( |
| "Consumer session for backend type type=%d forbidden, " |
| "consumer will disconnect", |
| type); |
| |
| auto* consumer = FindConsumer(session_id); |
| if (!consumer) |
| return; |
| |
| consumer->OnDisconnect(); |
| }); |
| }; |
| policy_->ShouldAllowConsumerSession(args); |
| return; |
| } |
| PERFETTO_DFATAL("Not reached"); |
| }); |
| |
| return std::unique_ptr<TracingSession>( |
| new TracingSessionImpl(this, session_id, requested_backend_type)); |
| } |
| |
| // static |
| // This is called via the public API Tracing::SetupStartupTracing(). |
| // Can be called from any thread. |
| std::unique_ptr<StartupTracingSession> |
| TracingMuxerImpl::CreateStartupTracingSession( |
| const TraceConfig& config, |
| Tracing::SetupStartupTracingOpts opts) { |
| BackendType backend_type = opts.backend; |
| // |backend_type| can only specify one backend, not an OR-ed mask. |
| PERFETTO_CHECK((backend_type & (backend_type - 1)) == 0); |
| // The in-process backend doesn't support startup tracing. |
| PERFETTO_CHECK(backend_type != BackendType::kInProcessBackend); |
| |
| TracingSessionGlobalID session_id = ++next_tracing_session_id_; |
| |
| // Capturing |this| is fine because the TracingMuxer is a leaky singleton. |
| task_runner_->PostTask([this, config, opts, backend_type, session_id] { |
| for (RegisteredProducerBackend& backend : producer_backends_) { |
| if (backend_type && backend.type && backend.type != backend_type) { |
| continue; |
| } |
| |
| TracingBackendId backend_id = backend.id; |
| |
| // The last registered backend in |producer_backends_| is the unsupported |
| // backend without a valid type. |
| if (!backend.type) { |
| PERFETTO_ELOG( |
| "No tracing backend initialized for type=%d, startup tracing " |
| "failed", |
| backend_type); |
| if (opts.on_setup) |
| opts.on_setup(Tracing::OnStartupTracingSetupCallbackArgs{ |
| 0 /* num_data_sources_started */}); |
| return; |
| } |
| |
| if (!backend.producer->service_ || |
| !backend.producer->service_->shared_memory()) { |
| // If we unsuccessfully attempted to use a producer-provided SMB in the |
| // past, don't try again. |
| if (backend.producer->producer_provided_smb_failed_) { |
| PERFETTO_ELOG( |
| "Backend %zu doesn't seem to support producer-provided " |
| "SMBs, startup tracing failed", |
| backend_id); |
| if (opts.on_setup) |
| opts.on_setup(Tracing::OnStartupTracingSetupCallbackArgs{ |
| 0 /* num_data_sources_started */}); |
| return; |
| } |
| |
| PERFETTO_DLOG("Reconnecting backend %zu for startup tracing", |
| backend_id); |
| backend.producer_conn_args.use_producer_provided_smb = true; |
| backend.producer->service_->Disconnect(); // Causes a reconnect. |
| PERFETTO_DCHECK(backend.producer->service_ && |
| backend.producer->service_->MaybeSharedMemoryArbiter()); |
| } |
| |
| RegisteredStartupSession session; |
| session.session_id = session_id; |
| session.on_aborted = opts.on_aborted; |
| session.on_adopted = opts.on_adopted; |
| |
| for (const TraceConfig::DataSource& ds_cfg : config.data_sources()) { |
| // Find all matching data sources and start one instance of each. |
| for (const auto& rds : data_sources_) { |
| if (rds.descriptor.name() != ds_cfg.config().name()) |
| continue; |
| |
| PERFETTO_DLOG( |
| "Setting up data source %s for startup tracing with target " |
| "buffer reservation %" PRIi32, |
| rds.descriptor.name().c_str(), |
| backend.producer->last_startup_target_buffer_reservation_ + 1u); |
| auto ds = SetupDataSourceImpl( |
| rds, backend_id, |
| backend.producer->connection_id_.load(std::memory_order_relaxed), |
| /*instance_id=*/0, ds_cfg.config(), |
| /*startup_session_id=*/session_id); |
| if (ds) { |
| StartDataSourceImpl(ds); |
| session.num_unbound_data_sources++; |
| } |
| } |
| } |
| |
| int num_ds = session.num_unbound_data_sources; |
| auto on_setup = opts.on_setup; |
| if (on_setup) { |
| backend.producer->OnStartupTracingSetup(); |
| task_runner_->PostTask([on_setup, num_ds] { |
| on_setup(Tracing::OnStartupTracingSetupCallbackArgs{num_ds}); |
| }); |
| } |
| |
| if (num_ds > 0) { |
| backend.startup_sessions.push_back(std::move(session)); |
| |
| if (opts.timeout_ms > 0) { |
| task_runner_->PostDelayedTask( |
| [this, session_id, backend_type] { |
| AbortStartupTracingSession(session_id, backend_type); |
| }, |
| opts.timeout_ms); |
| } |
| } |
| return; |
| } |
| PERFETTO_DFATAL("Invalid startup tracing session backend"); |
| }); |
| |
| return std::unique_ptr<StartupTracingSession>( |
| new StartupTracingSessionImpl(this, session_id, backend_type)); |
| } |
| |
| // Must not be called from the SDK's internal thread. |
| std::unique_ptr<StartupTracingSession> |
| TracingMuxerImpl::CreateStartupTracingSessionBlocking( |
| const TraceConfig& config, |
| Tracing::SetupStartupTracingOpts opts) { |
| auto previous_on_setup = std::move(opts.on_setup); |
| PERFETTO_CHECK(!task_runner_->RunsTasksOnCurrentThread()); |
| base::WaitableEvent event; |
| // It is safe to capture by reference because once on_setup is called only |
| // once before this method returns. |
| opts.on_setup = [&](Tracing::OnStartupTracingSetupCallbackArgs args) { |
| if (previous_on_setup) { |
| previous_on_setup(std::move(args)); |
| } |
| event.Notify(); |
| }; |
| auto session = CreateStartupTracingSession(config, std::move(opts)); |
| event.Wait(); |
| return session; |
| } |
| |
| void TracingMuxerImpl::AbortStartupTracingSession( |
| TracingSessionGlobalID session_id, |
| BackendType backend_type) { |
| PERFETTO_DCHECK_THREAD(thread_checker_); |
| |
| for (RegisteredProducerBackend& backend : producer_backends_) { |
| if (backend_type != backend.type) |
| continue; |
| |
| auto session_it = std::find_if( |
| backend.startup_sessions.begin(), backend.startup_sessions.end(), |
| [session_id](const RegisteredStartupSession& session) { |
| return session.session_id == session_id; |
| }); |
| |
| // The startup session may have already been aborted or fully adopted. |
| if (session_it == backend.startup_sessions.end()) |
| return; |
| if (session_it->is_aborting) |
| return; |
| |
| session_it->is_aborting = true; |
| |
| // Iterate all data sources and abort them if they weren't adopted yet. |
| for (const auto& rds : data_sources_) { |
| DataSourceStaticState* static_state = rds.static_state; |
| for (uint32_t i = 0; i < kMaxDataSourceInstances; i++) { |
| auto* internal_state = static_state->TryGet(i); |
| if (internal_state && |
| internal_state->startup_target_buffer_reservation.load( |
| std::memory_order_relaxed) && |
| internal_state->data_source_instance_id == 0 && |
| internal_state->startup_session_id == session_id) { |
| PERFETTO_DLOG( |
| "Aborting startup tracing for data source %s (target buffer " |
| "reservation %" PRIu16 ")", |
| rds.descriptor.name().c_str(), |
| internal_state->startup_target_buffer_reservation.load( |
| std::memory_order_relaxed)); |
| |
| // Abort the instance asynchronously by stopping it. From this point |
| // onwards, the service will not be able to adopt it via |
| // StartDataSource(). |
| session_it->num_aborting_data_sources++; |
| StopDataSource_AsyncBeginImpl( |
| FindDataSourceRes(static_state, internal_state, i, |
| rds.requires_callbacks_under_lock)); |
| } |
| } |
| } |
| |
| // If we did everything right, we should have aborted all still-unbound data |
| // source instances. |
| PERFETTO_DCHECK(session_it->num_unbound_data_sources == |
| session_it->num_aborting_data_sources); |
| |
| if (session_it->num_aborting_data_sources == 0) { |
| if (session_it->on_aborted) |
| task_runner_->PostTask(session_it->on_aborted); |
| |
| backend.startup_sessions.erase(session_it); |
| } |
| return; |
| } |
| // We might reach here in tests because when we start a trace, we post the |
| // Task(AbortStartupTrace, delay=timeout). When we do |
| // perfetto::ResetForTesting, we sweep dead backends, and we are not able to |
| // kill those delayed tasks because TaskRunner doesn't have support for |
| // deleting scheduled future tasks and TaskRunner doesn't have any API for us |
| // to wait for the completion of all the scheduled tasks (apart from |
| // deleting the TaskRunner) and we want to avoid doing that because we need |
| // a long running TaskRunner in muxer. |
| PERFETTO_DLOG("Invalid startup tracing session backend"); |
| } |
| |
| void TracingMuxerImpl::InitializeInstance(const TracingInitArgs& args) { |
| if (instance_ != TracingMuxerFake::Get()) { |
| // The tracing muxer was already initialized. We might need to initialize |
| // additional backends that were not configured earlier. |
| auto* muxer = static_cast<TracingMuxerImpl*>(instance_); |
| muxer->task_runner_->PostTask([muxer, args] { muxer->AddBackends(args); }); |
| return; |
| } |
| // If we previously had a TracingMuxerImpl instance which was reset, |
| // reinitialize and reuse it instead of trying to create a new one. See |
| // ResetForTesting(). |
| if (g_prev_instance) { |
| auto* muxer = g_prev_instance; |
| g_prev_instance = nullptr; |
| instance_ = muxer; |
| muxer->task_runner_->PostTask([muxer, args] { |
| muxer->Initialize(args); |
| muxer->AddBackends(args); |
| }); |
| } else { |
| new TracingMuxerImpl(args); |
| } |
| } |
| |
| // static |
| void TracingMuxerImpl::ResetForTesting() { |
| // Ideally we'd tear down the entire TracingMuxerImpl, but the lifetimes of |
| // various objects make that a non-starter. In particular: |
| // |
| // 1) Any thread that has entered a trace event has a TraceWriter, which holds |
| // a reference back to ProducerImpl::service_. |
| // |
| // 2) ProducerImpl::service_ has a reference back to the ProducerImpl. |
| // |
| // 3) ProducerImpl holds reference to TracingMuxerImpl::task_runner_, which in |
| // turn depends on TracingMuxerImpl itself. |
| // |
| // Because of this, it's not safe to deallocate TracingMuxerImpl until all |
| // threads have dropped their TraceWriters. Since we can't really ask the |
| // caller to guarantee this, we'll instead reset enough of the muxer's state |
| // so that it can be reinitialized later and ensure all necessary objects from |
| // the old state remain alive until all references have gone away. |
| auto* muxer = reinterpret_cast<TracingMuxerImpl*>(instance_); |
| |
| base::WaitableEvent reset_done; |
| auto do_reset = [muxer, &reset_done] { |
| muxer->DestroyStoppedTraceWritersForCurrentThread(); |
| // Unregister all data sources so they don't interfere with any future |
| // tracing sessions. |
| for (RegisteredDataSource& rds : muxer->data_sources_) { |
| for (RegisteredProducerBackend& backend : muxer->producer_backends_) { |
| if (!backend.producer->service_ || !backend.producer->connected_) |
| continue; |
| backend.producer->service_->UnregisterDataSource(rds.descriptor.name()); |
| } |
| } |
| for (auto& backend : muxer->consumer_backends_) { |
| // Check that no consumer session is currently active on any backend. |
| for (auto& consumer : backend.consumers) |
| PERFETTO_CHECK(!consumer->service_); |
| } |
| for (auto& backend : muxer->producer_backends_) { |
| backend.producer->muxer_ = nullptr; |
| backend.producer->DisposeConnection(); |
| muxer->dead_backends_.push_back(std::move(backend)); |
| } |
| muxer->consumer_backends_.clear(); |
| muxer->producer_backends_.clear(); |
| muxer->interceptors_.clear(); |
| |
| for (auto& ds : muxer->data_sources_) { |
| ds.static_state->ResetForTesting(); |
| } |
| |
| muxer->data_sources_.clear(); |
| muxer->next_data_source_index_ = 0; |
| |
| // Free all backends without active trace writers or other inbound |
| // references. Note that even if all the backends get swept, the muxer still |
| // needs to stay around since |task_runner_| is assumed to be long-lived. |
| muxer->SweepDeadBackends(); |
| |
| // Make sure we eventually discard any per-thread trace writers from the |
| // previous instance. |
| muxer->muxer_id_for_testing_++; |
| |
| g_prev_instance = muxer; |
| instance_ = TracingMuxerFake::Get(); |
| |
| // Call the user provided cleanups on the muxer thread. |
| for (auto& cb : muxer->reset_callbacks_) { |
| cb(); |
| } |
| |
| reset_done.Notify(); |
| }; |
| |
| // Some tests run the muxer and the test on the same thread. In these cases, |
| // we can reset synchronously. |
| if (muxer->task_runner_->RunsTasksOnCurrentThread()) { |
| do_reset(); |
| } else { |
| muxer->DestroyStoppedTraceWritersForCurrentThread(); |
| muxer->task_runner_->PostTask(std::move(do_reset)); |
| reset_done.Wait(); |
| // Call the user provided cleanups also on this thread. |
| for (auto& cb : muxer->reset_callbacks_) { |
| cb(); |
| } |
| } |
| muxer->reset_callbacks_.clear(); |
| } |
| |
| // static |
| void TracingMuxerImpl::Shutdown() { |
| auto* muxer = reinterpret_cast<TracingMuxerImpl*>(instance_); |
| |
| // Shutting down on the muxer thread would lead to a deadlock. |
| PERFETTO_CHECK(!muxer->task_runner_->RunsTasksOnCurrentThread()); |
| muxer->DestroyStoppedTraceWritersForCurrentThread(); |
| |
| std::unique_ptr<base::TaskRunner> owned_task_runner( |
| muxer->task_runner_.get()); |
| base::WaitableEvent shutdown_done; |
| owned_task_runner->PostTask([muxer, &shutdown_done] { |
| // Check that no consumer session is currently active on any backend. |
| // Producers will be automatically disconnected as a part of deleting the |
| // muxer below. |
| for (auto& backend : muxer->consumer_backends_) { |
| for (auto& consumer : backend.consumers) { |
| PERFETTO_CHECK(!consumer->service_); |
| } |
| } |
| // Make sure no trace writers are lingering around on the muxer thread. Note |
| // that we can't do this for any arbitrary thread in the process; it is the |
| // caller's responsibility to clean them up before shutting down Perfetto. |
| muxer->DestroyStoppedTraceWritersForCurrentThread(); |
| // The task runner must be deleted outside the muxer thread. This is done by |
| // `owned_task_runner` above. |
| muxer->task_runner_.release(); |
| auto* platform = muxer->platform_; |
| delete muxer; |
| instance_ = TracingMuxerFake::Get(); |
| platform->Shutdown(); |
| shutdown_done.Notify(); |
| }); |
| shutdown_done.Wait(); |
| } |
| |
| void TracingMuxerImpl::AppendResetForTestingCallback(std::function<void()> cb) { |
| reset_callbacks_.push_back(std::move(cb)); |
| } |
| |
| TracingMuxer::~TracingMuxer() = default; |
| |
| static_assert(std::is_same<internal::BufferId, BufferID>::value, |
| "public's BufferId and tracing/core's BufferID diverged"); |
| |
| } // namespace internal |
| } // namespace perfetto |