| /* |
| * Copyright (C) 2019 The Android Open Source Project |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| #ifndef SRC_TRACING_INTERNAL_TRACING_MUXER_IMPL_H_ |
| #define SRC_TRACING_INTERNAL_TRACING_MUXER_IMPL_H_ |
| |
| #include <stddef.h> |
| #include <stdint.h> |
| |
| #include <array> |
| #include <atomic> |
| #include <bitset> |
| #include <functional> |
| #include <list> |
| #include <map> |
| #include <memory> |
| #include <set> |
| #include <utility> |
| #include <vector> |
| |
| #include "perfetto/base/time.h" |
| #include "perfetto/ext/base/scoped_file.h" |
| #include "perfetto/ext/base/thread_checker.h" |
| #include "perfetto/ext/tracing/core/basic_types.h" |
| #include "perfetto/ext/tracing/core/consumer.h" |
| #include "perfetto/ext/tracing/core/producer.h" |
| #include "perfetto/tracing/backend_type.h" |
| #include "perfetto/tracing/core/data_source_descriptor.h" |
| #include "perfetto/tracing/core/forward_decls.h" |
| #include "perfetto/tracing/core/trace_config.h" |
| #include "perfetto/tracing/internal/basic_types.h" |
| #include "perfetto/tracing/internal/tracing_muxer.h" |
| #include "perfetto/tracing/tracing.h" |
| |
| #include "protos/perfetto/common/interceptor_descriptor.gen.h" |
| |
| namespace perfetto { |
| |
| // Forward declarations for types that this header only names in |
| // declarations (pointers, references, friends). |
| class ConsumerEndpoint; |
| class DataSourceBase; |
| class ProducerEndpoint; |
| class TraceWriterBase; |
| class TracingBackend; |
| class TracingSession; |
| struct TracingInitArgs; |
| |
| namespace base { |
| class TaskRunner; |
| } // namespace base |
| |
| // Declared so that it can be befriended by TracingMuxerImpl below. |
| namespace shlib { |
| void ResetForTesting(); |
| } // namespace shlib |
| |
| // Declared so that it can be befriended by TracingMuxerImpl below. |
| namespace test { |
| class TracingMuxerImplInternalsForTest; |
| } // namespace test |
| |
| namespace internal { |
| |
| struct DataSourceStaticState; |
| |
| // This class acts as a bridge between the public API and the TracingBackend(s). |
| // It exposes a simplified view of the world to the API methods handling all the |
| // bookkeeping to map data source instances and trace writers to the various |
| // backends. It deals with N data sources, M backends (1 backend == 1 tracing |
| // service == 1 producer connection) and T concurrent tracing sessions. |
| // |
| // Handling data source registration and start/stop flows [producer side]: |
| // ---------------------------------------------------------------------- |
| // 1. The API client subclasses perfetto::DataSource and calls |
| // DataSource::Register<MyDataSource>(). In turn this calls into the |
| // TracingMuxer. |
| // 2. The tracing muxer iterates through all the backends (1 backend == 1 |
| // service == 1 producer connection) and registers the data source on each |
| // backend. |
| // 3. When any (services behind a) backend starts tracing and requests to start |
| // that specific data source, the TracingMuxerImpl constructs a new instance |
| // of MyDataSource and calls the OnStart() method. |
| // |
| // Controlling trace and retrieving trace data [consumer side]: |
| // ------------------------------------------------------------ |
| // 1. The API client calls Tracing::NewTrace(), which returns a RAII |
| // TracingSession object. |
| // 2. NewTrace() calls into internal::TracingMuxer(Impl). TracingMuxer |
| // subclasses the TracingSession object (TracingSessionImpl) and returns it. |
| // 3. The tracing muxer identifies the backend (according to the args passed to |
| // NewTrace), creates a new Consumer and connects to it. |
| // 4. When the API client calls Start()/Stop()/ReadTrace() methods, the |
| // TracingMuxer forwards them to the consumer associated to the |
| // TracingSession. Likewise for callbacks coming from the consumer-side of |
| // the service. |
| class TracingMuxerImpl : public TracingMuxer { |
| public: |
| // Session identifier that is unique across all backends. This differs from |
| // TracingSessionID, which is unique only within the scope of one service. |
| using TracingSessionGlobalID = uint64_t; |
| |
| // One entry of the muxer's |data_sources_| list: the descriptor and factory |
| // of a data source plus the flags and static-state pointer stored with it. |
| struct RegisteredDataSource { |
| DataSourceDescriptor descriptor; |
| DataSourceFactory factory{}; |
| bool supports_multiple_instances{false}; |
| bool requires_callbacks_under_lock{false}; |
| bool no_flush{false}; |
| // NOTE(review): raw pointer, presumably not owned by this record - confirm. |
| DataSourceStaticState* static_state{nullptr}; |
| }; |
| |
| // Static lifecycle entry points; their definitions live outside this header. |
| static void InitializeInstance(const TracingInitArgs& args); |
| static void ResetForTesting(); |
| static void Shutdown(); |
| |
| // Overrides of the TracingMuxer interface. |
| bool RegisterDataSource(const DataSourceDescriptor& descriptor, |
| DataSourceFactory factory, |
| DataSourceParams params, |
| bool no_flush, |
| DataSourceStaticState* static_state) override; |
| void UpdateDataSourceDescriptor( |
| const DataSourceDescriptor& descriptor, |
| const DataSourceStaticState* static_state) override; |
| std::unique_ptr<TraceWriterBase> CreateTraceWriter( |
| DataSourceStaticState* static_state, |
| uint32_t data_source_instance_index, |
| DataSourceState* data_source_state, |
| BufferExhaustedPolicy buffer_exhausted_policy) override; |
| void DestroyStoppedTraceWritersForCurrentThread() override; |
| void RegisterInterceptor( |
| const InterceptorDescriptor& descriptor, |
| InterceptorFactory factory, |
| InterceptorBase::TLSFactory tls_factory, |
| InterceptorBase::TracePacketCallback packet_callback) override; |
| |
| // The trailing uint32_t stays unnamed, as before; see the TracingMuxer |
| // declaration for its meaning. |
| void ActivateTriggers(const std::vector<std::string>& triggers, |
| uint32_t) override; |
| |
| // Factories for the session objects that are returned to API clients. |
| // |system_backend_factory| is a plain function pointer that yields a |
| // TracingConsumerBackend*. |
| std::unique_ptr<TracingSession> CreateTracingSession( |
| BackendType backend_type, |
| TracingConsumerBackend* (*system_backend_factory)()); |
| std::unique_ptr<StartupTracingSession> CreateStartupTracingSession( |
| const TraceConfig& config, |
| Tracing::SetupStartupTracingOpts opts); |
| std::unique_ptr<StartupTracingSession> CreateStartupTracingSessionBlocking( |
| const TraceConfig& config, |
| Tracing::SetupStartupTracingOpts opts); |
| |
| // Producer-side bookkeeping methods. The per-instance ones identify their |
| // target by backend id plus data source instance id. |
| void UpdateDataSourcesOnAllBackends(); |
| void SetupDataSource(TracingBackendId backend_id, |
| uint32_t backend_connection_id, |
| DataSourceInstanceID instance_id, |
| const DataSourceConfig& config); |
| void StartDataSource(TracingBackendId backend_id, |
| DataSourceInstanceID instance_id); |
| void StopDataSource_AsyncBegin(TracingBackendId backend_id, |
| DataSourceInstanceID instance_id); |
| void ClearDataSourceIncrementalState(TracingBackendId backend_id, |
| DataSourceInstanceID instance_id); |
| void SyncProducersForTesting(); |
| |
| // Consumer-side bookkeeping methods. Each one takes the global id of the |
| // tracing session it operates on. |
| void SetupTracingSession(TracingSessionGlobalID session_id, |
| const std::shared_ptr<TraceConfig>& config, |
| base::ScopedFile trace_fd = base::ScopedFile()); |
| void StartTracingSession(TracingSessionGlobalID session_id); |
| void ChangeTracingSessionConfig(TracingSessionGlobalID session_id, |
| const TraceConfig& config); |
| void StopTracingSession(TracingSessionGlobalID session_id); |
| void DestroyTracingSession(TracingSessionGlobalID session_id); |
| // NOTE(review): the unnamed uint32_t is presumably the flush timeout in ms |
| // (compare TracingSessionImpl::Flush) - confirm against the definition. |
| void FlushTracingSession(TracingSessionGlobalID session_id, |
| uint32_t, |
| std::function<void(bool)> callback); |
| void ReadTracingSessionData( |
| TracingSessionGlobalID session_id, |
| std::function<void(TracingSession::ReadTraceCallbackArgs)> callback); |
| void GetTraceStats(TracingSessionGlobalID session_id, |
| TracingSession::GetTraceStatsCallback callback); |
| void QueryServiceState(TracingSessionGlobalID session_id, |
| TracingSession::QueryServiceStateCallback callback); |
| |
| // Test hook: applies |batch_commits_duration_ms| as the batching period on |
| // every backend whose type is |backend_type|. |
| void SetBatchCommitsDurationForTesting( |
| uint32_t batch_commits_duration_ms, |
| BackendType backend_type); |
| |
| // Test hook: turns on direct SMB patching (see |
| // SharedMemoryArbiter::EnableDirectSMBPatching) on every backend whose type |
| // is |backend_type|. The result is true only if the operation succeeded for |
| // all of those backends. |
| bool EnableDirectSMBPatchingForTesting( |
| BackendType backend_type); |
| |
| // Test hook. NOTE(review): presumably overrides the producer reconnection |
| // limit kept in |max_producer_reconnections_| - confirm in the definition. |
| void SetMaxProducerReconnectionsForTesting( |
| uint32_t count); |
| |
| private: |
| friend class test::TracingMuxerImplInternalsForTest; |
| friend void shlib::ResetForTesting(); |
| |
| // For each TracingBackend we create and register one ProducerImpl instance. |
| // This talks to the producer-side of the service, gets start/stop requests |
| // from it and routes them to the registered data sources. |
| // One ProducerImpl == one backend == one tracing service. |
| // This class is needed to disambiguate callbacks coming from different |
| // services. TracingMuxerImpl can't directly implement the Producer interface |
| // because the Producer virtual methods don't identify the calling service. |
| class ProducerImpl : public Producer { |
| public: |
| ProducerImpl(TracingMuxerImpl*, |
| TracingBackendId, |
| uint32_t shmem_batch_commits_duration_ms, |
| bool shmem_direct_patching_enabled); |
| ~ProducerImpl() override; |
| |
| void Initialize(std::unique_ptr<ProducerEndpoint> endpoint); |
| void RegisterDataSource(const DataSourceDescriptor&, |
| DataSourceFactory, |
| DataSourceStaticState*); |
| void DisposeConnection(); |
| |
| // perfetto::Producer implementation. |
| void OnConnect() override; |
| void OnDisconnect() override; |
| void OnTracingSetup() override; |
| void OnStartupTracingSetup() override; |
| void SetupDataSource(DataSourceInstanceID, |
| const DataSourceConfig&) override; |
| void StartDataSource(DataSourceInstanceID, |
| const DataSourceConfig&) override; |
| void StopDataSource(DataSourceInstanceID) override; |
| void Flush(FlushRequestID, |
| const DataSourceInstanceID*, |
| size_t, |
| FlushFlags) override; |
| void ClearIncrementalState(const DataSourceInstanceID*, size_t) override; |
| |
| bool SweepDeadServices(); |
| void SendOnConnectTriggers(); |
| void NotifyFlushForDataSourceDone(DataSourceInstanceID, FlushRequestID); |
| |
| PERFETTO_THREAD_CHECKER(thread_checker_) |
| TracingMuxerImpl* muxer_; |
| TracingBackendId const backend_id_; |
| bool connected_ = false; |
| bool did_setup_tracing_ = false; |
| bool did_setup_startup_tracing_ = false; |
| std::atomic<uint32_t> connection_id_{0}; |
| uint16_t last_startup_target_buffer_reservation_ = 0; |
| bool is_producer_provided_smb_ = false; |
| bool producer_provided_smb_failed_ = false; |
| |
| const uint32_t shmem_batch_commits_duration_ms_ = 0; |
| const bool shmem_direct_patching_enabled_ = false; |
| |
| // Set of data sources that have been actually registered on this producer. |
| // This can be a subset of the global |data_sources_|, because data sources |
| // can register before the producer is fully connected. |
| std::bitset<kMaxDataSources> registered_data_sources_{}; |
| |
| // A collection of disconnected service endpoints. Since trace writers on |
| // arbitrary threads might continue writing data to disconnected services, |
| // we keep the old services around and periodically try to clean up ones |
| // that no longer have any writers (see SweepDeadServices). |
| std::list<std::shared_ptr<ProducerEndpoint>> dead_services_; |
| |
| // Triggers that should be sent when the service connects, stored as |
| // (trigger_name, expiration) pairs. |
| std::list<std::pair<std::string, base::TimeMillis>> on_connect_triggers_; |
| |
| std::map<FlushRequestID, std::set<DataSourceInstanceID>> pending_flushes_; |
| |
| // The currently active service endpoint is maintained as an atomic shared |
| // pointer so it won't get deleted from underneath threads that are creating |
| // trace writers. At any given time one endpoint can be shared (and thus |
| // kept alive) by the |service_| pointer, an entry in |dead_services_| and |
| // as a pointer on the stack in CreateTraceWriter() (on an arbitrary |
| // thread). The endpoint is never shared outside ProducerImpl itself. |
| // |
| // WARNING: Any *write* access to this variable or any *read* access from a |
| // non-muxer thread must be done through std::atomic_{load,store} to avoid |
| // data races. |
| std::shared_ptr<ProducerEndpoint> service_; // Keep last. |
| }; |
| |
| // For each TracingSession created by the API client (Tracing::NewTrace()) we |
| // create and register one ConsumerImpl instance. |
| // This talks to the consumer-side of the service, gets end-of-trace and |
| // on-trace-data callbacks and routes them to the API client callbacks. |
| // This class is needed to disambiguate callbacks coming from different |
| // tracing sessions. |
| class ConsumerImpl : public Consumer { |
| public: |
| ConsumerImpl(TracingMuxerImpl*, BackendType, TracingSessionGlobalID); |
| ~ConsumerImpl() override; |
| |
| void Initialize(std::unique_ptr<ConsumerEndpoint> endpoint); |
| |
| // perfetto::Consumer implementation. |
| void OnConnect() override; |
| void OnDisconnect() override; |
| void OnTracingDisabled(const std::string& error) override; |
| void OnTraceData(std::vector<TracePacket>, bool has_more) override; |
| void OnDetach(bool success) override; |
| void OnAttach(bool success, const TraceConfig&) override; |
| void OnTraceStats(bool success, const TraceStats&) override; |
| void OnObservableEvents(const ObservableEvents&) override; |
| void OnSessionCloned(const OnSessionClonedArgs&) override; |
| |
| void NotifyStartComplete(); |
| void NotifyError(const TracingError&); |
| void NotifyStopComplete(); |
| |
| // Will eventually inform the |muxer_| when it is safe to remove |this|. |
| void Disconnect(); |
| |
| TracingMuxerImpl* muxer_; |
| BackendType const backend_type_; |
| TracingSessionGlobalID const session_id_; |
| bool connected_ = false; |
| |
| // This is to handle the case where the Setup call from the API client |
| // arrives before the consumer has connected. In this case we keep around |
| // the config and check if we have it after connection. |
| bool start_pending_ = false; |
| |
| // Similarly if the session is stopped before the consumer was connected, we |
| // need to wait until the session has started before stopping it. |
| bool stop_pending_ = false; |
| |
| // Similarly we need to buffer a call to get trace statistics if the |
| // consumer wasn't connected yet. |
| bool get_trace_stats_pending_ = false; |
| |
| // Whether this session was already stopped. This will happen in response to |
| // Stop{,Blocking}, but also if the service stops the session for us |
| // automatically (e.g., when there are no data sources). |
| bool stopped_ = false; |
| |
| // shared_ptr because it's posted across threads. This is to avoid copying |
| // it more than once. |
| std::shared_ptr<TraceConfig> trace_config_; |
| base::ScopedFile trace_fd_; |
| |
| // If the API client passes a callback to start, we should invoke this when |
| // NotifyStartComplete() is invoked. |
| std::function<void()> start_complete_callback_; |
| |
| // An internal callback used to implement StartBlocking(). |
| std::function<void()> blocking_start_complete_callback_; |
| |
| // If the API client passes a callback to get notification about the |
| // errors, we should invoke this when NotifyError() is invoked. |
| std::function<void(TracingError)> error_callback_; |
| |
| // If the API client passes a callback to stop, we should invoke this when |
| // OnTracingDisabled() is invoked. |
| std::function<void()> stop_complete_callback_; |
| |
| // An internal callback used to implement StopBlocking(). |
| std::function<void()> blocking_stop_complete_callback_; |
| |
| // Callback passed to ReadTrace(). |
| std::function<void(TracingSession::ReadTraceCallbackArgs)> |
| read_trace_callback_; |
| |
| // Callback passed to GetTraceStats(). |
| TracingSession::GetTraceStatsCallback get_trace_stats_callback_; |
| |
| // Callback for a pending call to QueryServiceState(). |
| TracingSession::QueryServiceStateCallback query_service_state_callback_; |
| |
| // The states of all data sources in this tracing session. |true| means the |
| // data source has started tracing. |
| using DataSourceHandle = std::pair<std::string, std::string>; |
| std::map<DataSourceHandle, bool> data_source_states_; |
| |
| std::unique_ptr<ConsumerEndpoint> service_; // Keep before last. |
| PERFETTO_THREAD_CHECKER(thread_checker_) // Keep last. |
| }; |
| |
| // Concrete TracingSession that is returned to API clients when they create a |
| // tracing session; see CreateTracingSession() and the Tracing::NewTrace() |
| // flow described at the top of the enclosing class. |
| // NOTE(review): the method bodies are not in this header; presumably each |
| // override forwards to |muxer_| using |session_id_| - confirm in the .cc. |
| class TracingSessionImpl : public TracingSession { |
| public: |
| TracingSessionImpl(TracingMuxerImpl* muxer, |
| TracingSessionGlobalID session_id, |
| BackendType backend_type); |
| ~TracingSessionImpl() override; |
| |
| // TracingSession implementation. |
| void Setup(const TraceConfig& config, int fd) override; |
| void Start() override; |
| void StartBlocking() override; |
| void SetOnStartCallback(std::function<void()> callback) override; |
| void SetOnErrorCallback( |
| std::function<void(TracingError)> callback) override; |
| void Stop() override; |
| void StopBlocking() override; |
| void Flush(std::function<void(bool)> callback, |
| uint32_t timeout_ms) override; |
| void ReadTrace(ReadTraceCallback callback) override; |
| void SetOnStopCallback(std::function<void()> callback) override; |
| void GetTraceStats(GetTraceStatsCallback callback) override; |
| void QueryServiceState(QueryServiceStateCallback callback) override; |
| void ChangeTraceConfig(const TraceConfig& config) override; |
| |
| private: |
| // All three fields are fixed at construction. |
| TracingMuxerImpl* const muxer_; |
| const TracingSessionGlobalID session_id_; |
| const BackendType backend_type_; |
| }; |
| |
| // This object is returned to API clients when they call |
| // Tracing::SetupStartupTracing(). |
| class StartupTracingSessionImpl : public StartupTracingSession { |
| public: |
| StartupTracingSessionImpl(TracingMuxerImpl* muxer, |
| TracingSessionGlobalID session_id, |
| BackendType backend_type); |
| ~StartupTracingSessionImpl() override; |
| |
| // StartupTracingSession implementation. |
| void Abort() override; |
| void AbortBlocking() override; |
| |
| private: |
| // All three fields are fixed at construction. |backend_type_| is now const |
| // like its siblings (and like TracingSessionImpl::backend_type_); the class |
| // was already non-assignable because of the other const members. |
| TracingMuxerImpl* const muxer_; |
| TracingSessionGlobalID const session_id_; |
| BackendType const backend_type_; |
| }; |
| |
| // One entry of the muxer's |interceptors_| list: an interceptor descriptor |
| // together with the three callables stored alongside it. |
| // NOTE(review): presumably filled in by RegisterInterceptor() - confirm. |
| struct RegisteredInterceptor { |
| protos::gen::InterceptorDescriptor descriptor; |
| InterceptorFactory factory{}; |
| InterceptorBase::TLSFactory tls_factory{}; |
| InterceptorBase::TracePacketCallback packet_callback{}; |
| }; |
| |
| // Record of one startup tracing session; instances are stored per backend |
| // in RegisteredProducerBackend::startup_sessions. |
| struct RegisteredStartupSession { |
| TracingSessionID session_id{0}; |
| // NOTE(review): presumably the number of data sources of this session that |
| // are not yet bound to a service-side session - confirm in the .cc. |
| int num_unbound_data_sources{0}; |
| |
| // Abort bookkeeping: a flag plus a data source counter. |
| bool is_aborting{false}; |
| int num_aborting_data_sources{0}; |
| |
| // NOTE(review): presumably invoked once the session has been aborted or |
| // adopted, respectively - confirm in the .cc. |
| std::function<void()> on_aborted; |
| std::function<void()> on_adopted; |
| }; |
| |
| // Everything the muxer keeps for one producer-side backend. |
| struct RegisteredProducerBackend { |
| // Backends are supposed to have static lifetime, hence the raw pointer. |
| TracingProducerBackend* backend{nullptr}; |
| TracingBackendId id{0}; |
| BackendType type{}; |
| |
| // Connection arguments and the ProducerImpl owned by this entry. |
| TracingBackend::ConnectProducerArgs producer_conn_args; |
| std::unique_ptr<ProducerImpl> producer; |
| |
| // Startup tracing sessions registered on this backend. |
| std::vector<RegisteredStartupSession> startup_sessions; |
| }; |
| |
| // Everything the muxer keeps for one consumer-side backend. |
| struct RegisteredConsumerBackend { |
| // Backends are supposed to have static lifetime, hence the raw pointer. |
| TracingConsumerBackend* backend{nullptr}; |
| BackendType type{}; |
| // One ConsumerImpl per tracing session: the calling code may request |
| // several concurrently active sessions for the same backend. |
| std::vector<std::unique_ptr<ConsumerImpl>> consumers; |
| }; |
| |
| // Construction and backend registration. |
| explicit TracingMuxerImpl(const TracingInitArgs& args); |
| void Initialize(const TracingInitArgs& args); |
| void AddBackends(const TracingInitArgs& args); |
| void AddConsumerBackend(TracingConsumerBackend* backend, BackendType type); |
| void AddProducerBackend(TracingProducerBackend* backend, |
| BackendType type, |
| const TracingInitArgs& args); |
| |
| // Data source propagation. |
| void UpdateDataSourceOnAllBackends(RegisteredDataSource& rds, |
| bool is_changed); |
| |
| // Lookup helpers. |
| ConsumerImpl* FindConsumer(TracingSessionGlobalID session_id); |
| std::pair<ConsumerImpl*, RegisteredConsumerBackend*> FindConsumerAndBackend( |
| TracingSessionGlobalID session_id); |
| RegisteredProducerBackend* FindProducerBackendById(TracingBackendId id); |
| RegisteredProducerBackend* FindProducerBackendByType(BackendType type); |
| RegisteredConsumerBackend* FindConsumerBackendByType(BackendType type); |
| |
| // Consumer setup and disconnection handling. |
| void InitializeConsumer(TracingSessionGlobalID session_id); |
| void OnConsumerDisconnected(ConsumerImpl* consumer); |
| void OnProducerDisconnected(ProducerImpl* producer); |
| |
| // Test only method; see the |dead_backends_| member. |
| void SweepDeadBackends(); |
| |
| // Value returned by FindDataSource() and SetupDataSourceImpl(). It converts |
| // to bool explicitly and is true only when |internal_state| is non-null; a |
| // default-constructed instance is therefore false. |
| struct FindDataSourceRes { |
| FindDataSourceRes() = default; |
| FindDataSourceRes(DataSourceStaticState* static_state_arg, |
| DataSourceState* internal_state_arg, |
| uint32_t instance_idx_arg, |
| bool requires_callbacks_under_lock_arg) |
| : static_state(static_state_arg), |
| internal_state(internal_state_arg), |
| instance_idx(instance_idx_arg), |
| requires_callbacks_under_lock(requires_callbacks_under_lock_arg) {} |
| |
| explicit operator bool() const { return internal_state != nullptr; } |
| |
| DataSourceStaticState* static_state = nullptr; |
| DataSourceState* internal_state = nullptr; |
| uint32_t instance_idx = 0; |
| bool requires_callbacks_under_lock = false; |
| }; |
| // Looks up a data source instance by backend id and instance id. A result |
| // that converts to false has a null |internal_state|. |
| FindDataSourceRes FindDataSource(TracingBackendId backend_id, |
| DataSourceInstanceID instance_id); |
| |
| // Implementation halves of the data source setup/start/stop/flush flows. |
| // The *_AsyncBegin / *_AsyncEnd names mark the two ends of an asynchronous |
| // operation; the bodies are not part of this header. |
| FindDataSourceRes SetupDataSourceImpl( |
| const RegisteredDataSource& rds, |
| TracingBackendId backend_id, |
| uint32_t backend_connection_id, |
| DataSourceInstanceID instance_id, |
| const DataSourceConfig& config, |
| TracingSessionGlobalID startup_session_id); |
| void StartDataSourceImpl(const FindDataSourceRes& ds); |
| void StopDataSource_AsyncBeginImpl(const FindDataSourceRes& ds); |
| void StopDataSource_AsyncEnd(TracingBackendId backend_id, |
| uint32_t backend_connection_id, |
| DataSourceInstanceID instance_id, |
| const FindDataSourceRes& ds); |
| bool FlushDataSource_AsyncBegin(TracingBackendId backend_id, |
| DataSourceInstanceID instance_id, |
| FlushRequestID flush_id, |
| FlushFlags flags); |
| void FlushDataSource_AsyncEnd(TracingBackendId backend_id, |
| uint32_t backend_connection_id, |
| DataSourceInstanceID instance_id, |
| const FindDataSourceRes& ds, |
| FlushRequestID flush_id); |
| void AbortStartupTracingSession(TracingSessionGlobalID session_id, |
| BackendType backend_type); |
| |
| // Registers `cb` to run when ResetForTesting() is executed; it will be |
| // called on the calling thread and on the muxer thread. |
| void AppendResetForTestingCallback(std::function<void()> cb); |
| |
| // WARNING: If you add new state here, be sure to update ResetForTesting(). |
| std::unique_ptr<base::TaskRunner> task_runner_; |
| std::vector<RegisteredDataSource> data_sources_; |
| // These lists can only have one backend per BackendType. The elements are |
| // sorted by BackendType priority (see BackendTypePriority). They always |
| // contain a fake low-priority kUnspecifiedBackend at the end. |
| std::list<RegisteredProducerBackend> producer_backends_; |
| std::list<RegisteredConsumerBackend> consumer_backends_; |
| std::vector<RegisteredInterceptor> interceptors_; |
| TracingPolicy* policy_ = nullptr; |
| |
| // See TracingInitArgs::supports_multiple_data_source_instances for details. |
| bool supports_multiple_data_source_instances_ = true; |
| |
| std::atomic<TracingSessionGlobalID> next_tracing_session_id_{}; |
| std::atomic<uint32_t> next_data_source_index_{}; |
| uint32_t muxer_id_for_testing_{}; |
| |
| // Maximum number of times we will try to reconnect a producer backend. |
| // Should only be modified for testing purposes. |
| std::atomic<uint32_t> max_producer_reconnections_{100u}; |
| |
| // Test only member. |
| // After ResetForTesting() is called, holds tracing backends which need to be |
| // kept alive until all inbound references have gone away. See |
| // SweepDeadBackends(). |
| std::list<RegisteredProducerBackend> dead_backends_; |
| |
| // Test only member. |
| // Cleanup functions that are executed on the calling thread and on the muxer |
| // thread when ResetForTesting() is called. |
| std::list<std::function<void()>> reset_callbacks_; |
| |
| PERFETTO_THREAD_CHECKER(thread_checker_) |
| }; |
| |
| } // namespace internal |
| } // namespace perfetto |
| |
| #endif // SRC_TRACING_INTERNAL_TRACING_MUXER_IMPL_H_ |