blob: 5a583ba4658ea952c3f7bbab31b4aafaeac7ddd6 [file] [log] [blame]
/*
* Copyright (C) 2017 The Android Open Source Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef SRC_TRACING_SERVICE_TRACING_SERVICE_IMPL_H_
#define SRC_TRACING_SERVICE_TRACING_SERVICE_IMPL_H_
#include <algorithm>
#include <functional>
#include <map>
#include <memory>
#include <mutex>
#include <optional>
#include <random>
#include <set>
#include <utility>
#include <vector>
#include "perfetto/base/logging.h"
#include "perfetto/base/status.h"
#include "perfetto/base/time.h"
#include "perfetto/ext/base/circular_queue.h"
#include "perfetto/ext/base/periodic_task.h"
#include "perfetto/ext/base/uuid.h"
#include "perfetto/ext/base/weak_ptr.h"
#include "perfetto/ext/tracing/core/basic_types.h"
#include "perfetto/ext/tracing/core/client_identity.h"
#include "perfetto/ext/tracing/core/commit_data_request.h"
#include "perfetto/ext/tracing/core/observable_events.h"
#include "perfetto/ext/tracing/core/shared_memory_abi.h"
#include "perfetto/ext/tracing/core/trace_stats.h"
#include "perfetto/ext/tracing/core/tracing_service.h"
#include "perfetto/tracing/core/data_source_config.h"
#include "perfetto/tracing/core/data_source_descriptor.h"
#include "perfetto/tracing/core/forward_decls.h"
#include "perfetto/tracing/core/trace_config.h"
#include "src/android_stats/perfetto_atoms.h"
#include "src/tracing/core/id_allocator.h"
namespace protozero {
class MessageFilter;
}
namespace perfetto {
namespace base {
class TaskRunner;
} // namespace base
namespace protos {
namespace gen {
enum TraceStats_FinalFlushOutcome : int;
}
} // namespace protos
class Consumer;
class Producer;
class SharedMemory;
class SharedMemoryArbiterImpl;
class TraceBuffer;
class TracePacket;
// The tracing service business logic.
class TracingServiceImpl : public TracingService {
private:
struct DataSourceInstance;
public:
static constexpr size_t kMaxShmSize = 32 * 1024 * 1024ul;
static constexpr uint32_t kDataSourceStopTimeoutMs = 5000;
static constexpr uint8_t kSyncMarker[] = {0x82, 0x47, 0x7a, 0x76, 0xb2, 0x8d,
0x42, 0xba, 0x81, 0xdc, 0x33, 0x32,
0x6d, 0x57, 0xa0, 0x79};
static constexpr size_t kMaxTracePacketSliceSize =
128 * 1024 - 512; // This is ipc::kIPCBufferSize - 512, see assertion in
// tracing_integration_test.cc and b/195065199
// This is a rough threshold to determine how many bytes to read from the
// buffers on each iteration when writing into a file. Since filtering and
// compression allocate memory, this effectively limits the amount of memory
// allocated.
static constexpr size_t kWriteIntoFileChunkSize = 1024 * 1024ul;
// The implementation behind the service endpoint exposed to each producer.
class ProducerEndpointImpl : public TracingService::ProducerEndpoint {
public:
ProducerEndpointImpl(ProducerID,
const ClientIdentity& client_identity,
TracingServiceImpl*,
base::TaskRunner*,
Producer*,
const std::string& producer_name,
const std::string& sdk_version,
bool in_process,
bool smb_scraping_enabled);
~ProducerEndpointImpl() override;
// TracingService::ProducerEndpoint implementation.
void Disconnect() override;
void RegisterDataSource(const DataSourceDescriptor&) override;
void UpdateDataSource(const DataSourceDescriptor&) override;
void UnregisterDataSource(const std::string& name) override;
void RegisterTraceWriter(uint32_t writer_id,
uint32_t target_buffer) override;
void UnregisterTraceWriter(uint32_t writer_id) override;
void CommitData(const CommitDataRequest&, CommitDataCallback) override;
void SetupSharedMemory(std::unique_ptr<SharedMemory>,
size_t page_size_bytes,
bool provided_by_producer);
std::unique_ptr<TraceWriter> CreateTraceWriter(
BufferID,
BufferExhaustedPolicy) override;
SharedMemoryArbiter* MaybeSharedMemoryArbiter() override;
bool IsShmemProvidedByProducer() const override;
void NotifyFlushComplete(FlushRequestID) override;
void NotifyDataSourceStarted(DataSourceInstanceID) override;
void NotifyDataSourceStopped(DataSourceInstanceID) override;
SharedMemory* shared_memory() const override;
size_t shared_buffer_page_size_kb() const override;
void ActivateTriggers(const std::vector<std::string>&) override;
void Sync(std::function<void()> callback) override;
void OnTracingSetup();
void SetupDataSource(DataSourceInstanceID, const DataSourceConfig&);
void StartDataSource(DataSourceInstanceID, const DataSourceConfig&);
void StopDataSource(DataSourceInstanceID);
void Flush(FlushRequestID,
const std::vector<DataSourceInstanceID>&,
FlushFlags);
void OnFreeBuffers(const std::vector<BufferID>& target_buffers);
void ClearIncrementalState(const std::vector<DataSourceInstanceID>&);
bool is_allowed_target_buffer(BufferID buffer_id) const {
return allowed_target_buffers_.count(buffer_id);
}
std::optional<BufferID> buffer_id_for_writer(WriterID writer_id) const {
const auto it = writers_.find(writer_id);
if (it != writers_.end())
return it->second;
return std::nullopt;
}
uid_t uid() const { return client_identity_.uid(); }
pid_t pid() const { return client_identity_.pid(); }
const ClientIdentity& client_identity() const { return client_identity_; }
private:
friend class TracingServiceImpl;
friend class TracingServiceImplTest;
friend class TracingIntegrationTest;
ProducerEndpointImpl(const ProducerEndpointImpl&) = delete;
ProducerEndpointImpl& operator=(const ProducerEndpointImpl&) = delete;
ProducerID const id_;
ClientIdentity const client_identity_;
TracingServiceImpl* const service_;
base::TaskRunner* const task_runner_;
Producer* producer_;
std::unique_ptr<SharedMemory> shared_memory_;
size_t shared_buffer_page_size_kb_ = 0;
SharedMemoryABI shmem_abi_;
size_t shmem_size_hint_bytes_ = 0;
size_t shmem_page_size_hint_bytes_ = 0;
bool is_shmem_provided_by_producer_ = false;
const std::string name_;
std::string sdk_version_;
bool in_process_;
bool smb_scraping_enabled_;
// Set of the global target_buffer IDs that the producer is configured to
// write into in any active tracing session.
std::set<BufferID> allowed_target_buffers_;
// Maps registered TraceWriter IDs to their target buffers as registered by
// the producer. Note that producers aren't required to register their
// writers, so we may see commits of chunks with WriterIDs that aren't
// contained in this map. However, if a producer does register a writer, the
// service will prevent the writer from writing into any other buffer than
// the one associated with it here. The BufferIDs stored in this map are
// untrusted, so need to be verified against |allowed_target_buffers_|
// before use.
std::map<WriterID, BufferID> writers_;
// This is used only in in-process configurations.
// SharedMemoryArbiterImpl methods themselves are thread-safe.
std::unique_ptr<SharedMemoryArbiterImpl> inproc_shmem_arbiter_;
PERFETTO_THREAD_CHECKER(thread_checker_)
base::WeakPtrFactory<ProducerEndpointImpl> weak_ptr_factory_; // Keep last.
};
// The implementation behind the service endpoint exposed to each consumer.
class ConsumerEndpointImpl : public TracingService::ConsumerEndpoint {
public:
ConsumerEndpointImpl(TracingServiceImpl*,
base::TaskRunner*,
Consumer*,
uid_t uid);
~ConsumerEndpointImpl() override;
void NotifyOnTracingDisabled(const std::string& error);
void NotifyCloneSnapshotTrigger();
// TracingService::ConsumerEndpoint implementation.
void EnableTracing(const TraceConfig&, base::ScopedFile) override;
void ChangeTraceConfig(const TraceConfig& cfg) override;
void StartTracing() override;
void DisableTracing() override;
void ReadBuffers() override;
void FreeBuffers() override;
void Flush(uint32_t timeout_ms, FlushCallback, FlushFlags) override;
void Detach(const std::string& key) override;
void Attach(const std::string& key) override;
void GetTraceStats() override;
void ObserveEvents(uint32_t enabled_event_types) override;
void QueryServiceState(QueryServiceStateArgs,
QueryServiceStateCallback) override;
void QueryCapabilities(QueryCapabilitiesCallback) override;
void SaveTraceForBugreport(SaveTraceForBugreportCallback) override;
void CloneSession(TracingSessionID, CloneSessionArgs) override;
// Will queue a task to notify the consumer about the state change.
void OnDataSourceInstanceStateChange(const ProducerEndpointImpl&,
const DataSourceInstance&);
void OnAllDataSourcesStarted();
base::WeakPtr<ConsumerEndpointImpl> GetWeakPtr() {
return weak_ptr_factory_.GetWeakPtr();
}
private:
friend class TracingServiceImpl;
ConsumerEndpointImpl(const ConsumerEndpointImpl&) = delete;
ConsumerEndpointImpl& operator=(const ConsumerEndpointImpl&) = delete;
// Returns a pointer to an ObservableEvents object that the caller can fill
// and schedules a task to send the ObservableEvents to the consumer.
ObservableEvents* AddObservableEvents();
base::TaskRunner* const task_runner_;
TracingServiceImpl* const service_;
Consumer* const consumer_;
uid_t const uid_;
TracingSessionID tracing_session_id_ = 0;
// Whether the consumer is interested in DataSourceInstance state change
// events.
uint32_t observable_events_mask_ = 0;
// ObservableEvents that will be sent to the consumer. If set, a task to
// flush the events to the consumer has been queued.
std::unique_ptr<ObservableEvents> observable_events_;
PERFETTO_THREAD_CHECKER(thread_checker_)
base::WeakPtrFactory<ConsumerEndpointImpl> weak_ptr_factory_; // Keep last.
};
class RelayEndpointImpl : public TracingService::RelayEndpoint {
public:
using SyncMode = RelayEndpoint::SyncMode;
struct SyncedClockSnapshots {
SyncedClockSnapshots(SyncMode _sync_mode,
ClockSnapshotVector _client_clocks,
ClockSnapshotVector _host_clocks)
: sync_mode(_sync_mode),
client_clocks(std::move(_client_clocks)),
host_clocks(std::move(_host_clocks)) {}
SyncMode sync_mode;
ClockSnapshotVector client_clocks;
ClockSnapshotVector host_clocks;
};
explicit RelayEndpointImpl(RelayClientID relay_client_id,
TracingServiceImpl* service);
~RelayEndpointImpl() override;
void SyncClocks(SyncMode sync_mode,
ClockSnapshotVector client_clocks,
ClockSnapshotVector host_clocks) override;
void Disconnect() override;
MachineID machine_id() const { return relay_client_id_.first; }
base::CircularQueue<SyncedClockSnapshots>& synced_clocks() {
return synced_clocks_;
}
private:
RelayEndpointImpl(const RelayEndpointImpl&) = delete;
RelayEndpointImpl& operator=(const RelayEndpointImpl&) = delete;
RelayClientID relay_client_id_;
TracingServiceImpl* const service_;
base::CircularQueue<SyncedClockSnapshots> synced_clocks_;
PERFETTO_THREAD_CHECKER(thread_checker_)
};
explicit TracingServiceImpl(std::unique_ptr<SharedMemory::Factory>,
base::TaskRunner*,
InitOpts = {});
~TracingServiceImpl() override;
// Called by ProducerEndpointImpl.
void DisconnectProducer(ProducerID);
void RegisterDataSource(ProducerID, const DataSourceDescriptor&);
void UpdateDataSource(ProducerID, const DataSourceDescriptor&);
void UnregisterDataSource(ProducerID, const std::string& name);
void CopyProducerPageIntoLogBuffer(ProducerID,
const ClientIdentity&,
WriterID,
ChunkID,
BufferID,
uint16_t num_fragments,
uint8_t chunk_flags,
bool chunk_complete,
const uint8_t* src,
size_t size);
void ApplyChunkPatches(ProducerID,
const std::vector<CommitDataRequest::ChunkToPatch>&);
void NotifyFlushDoneForProducer(ProducerID, FlushRequestID);
void NotifyDataSourceStarted(ProducerID, const DataSourceInstanceID);
void NotifyDataSourceStopped(ProducerID, const DataSourceInstanceID);
void ActivateTriggers(ProducerID, const std::vector<std::string>& triggers);
// Called by ConsumerEndpointImpl.
bool DetachConsumer(ConsumerEndpointImpl*, const std::string& key);
bool AttachConsumer(ConsumerEndpointImpl*, const std::string& key);
void DisconnectConsumer(ConsumerEndpointImpl*);
base::Status EnableTracing(ConsumerEndpointImpl*,
const TraceConfig&,
base::ScopedFile);
void ChangeTraceConfig(ConsumerEndpointImpl*, const TraceConfig&);
base::Status StartTracing(TracingSessionID);
void DisableTracing(TracingSessionID, bool disable_immediately = false);
void Flush(TracingSessionID tsid,
uint32_t timeout_ms,
ConsumerEndpoint::FlushCallback,
FlushFlags);
void FlushAndDisableTracing(TracingSessionID);
void FlushAndCloneSession(ConsumerEndpointImpl*,
TracingSessionID,
bool skip_filter,
bool for_bugreport);
// Starts reading the internal tracing buffers from the tracing session `tsid`
// and sends them to `*consumer` (which must be != nullptr).
//
// Only reads a limited amount of data in one call. If there's more data,
// immediately schedules itself on a PostTask.
//
// Returns false in case of error.
bool ReadBuffersIntoConsumer(TracingSessionID tsid,
ConsumerEndpointImpl* consumer);
// Reads all the tracing buffers from the tracing session `tsid` and writes
// them into the associated file.
//
// Reads all the data in the buffers (or until the file is full) before
// returning.
//
// If the tracing session write_period_ms is 0, the file is full or there has
// been an error, flushes the file and closes it. Otherwise, schedules itself
// to be executed after write_period_ms.
//
// Returns false in case of error.
bool ReadBuffersIntoFile(TracingSessionID);
void FreeBuffers(TracingSessionID);
// Service implementation.
std::unique_ptr<TracingService::ProducerEndpoint> ConnectProducer(
Producer*,
const ClientIdentity& client_identity,
const std::string& producer_name,
size_t shared_memory_size_hint_bytes = 0,
bool in_process = false,
ProducerSMBScrapingMode smb_scraping_mode =
ProducerSMBScrapingMode::kDefault,
size_t shared_memory_page_size_hint_bytes = 0,
std::unique_ptr<SharedMemory> shm = nullptr,
const std::string& sdk_version = {}) override;
std::unique_ptr<TracingService::ConsumerEndpoint> ConnectConsumer(
Consumer*,
uid_t) override;
std::unique_ptr<TracingService::RelayEndpoint> ConnectRelayClient(
RelayClientID) override;
void DisconnectRelayClient(RelayClientID);
// Set whether SMB scraping should be enabled by default or not. Producers can
// override this setting for their own SMBs.
void SetSMBScrapingEnabled(bool enabled) override {
smb_scraping_enabled_ = enabled;
}
// Exposed mainly for testing.
size_t num_producers() const { return producers_.size(); }
ProducerEndpointImpl* GetProducer(ProducerID) const;
private:
friend class TracingServiceImplTest;
friend class TracingIntegrationTest;
static constexpr int64_t kOneDayInNs = 24ll * 60 * 60 * 1000 * 1000 * 1000;
struct TriggerHistory {
int64_t timestamp_ns;
uint64_t name_hash;
bool operator<(const TriggerHistory& other) const {
return timestamp_ns < other.timestamp_ns;
}
};
struct RegisteredDataSource {
ProducerID producer_id;
DataSourceDescriptor descriptor;
};
// Represents an active data source for a tracing session.
struct DataSourceInstance {
DataSourceInstance(DataSourceInstanceID id,
const DataSourceConfig& cfg,
const std::string& ds_name,
bool notify_on_start,
bool notify_on_stop,
bool handles_incremental_state_invalidation,
bool no_flush_)
: instance_id(id),
config(cfg),
data_source_name(ds_name),
will_notify_on_start(notify_on_start),
will_notify_on_stop(notify_on_stop),
handles_incremental_state_clear(
handles_incremental_state_invalidation),
no_flush(no_flush_) {}
DataSourceInstance(const DataSourceInstance&) = delete;
DataSourceInstance& operator=(const DataSourceInstance&) = delete;
DataSourceInstanceID instance_id;
DataSourceConfig config;
std::string data_source_name;
bool will_notify_on_start;
bool will_notify_on_stop;
bool handles_incremental_state_clear;
bool no_flush;
enum DataSourceInstanceState {
CONFIGURED,
STARTING,
STARTED,
STOPPING,
STOPPED
};
DataSourceInstanceState state = CONFIGURED;
};
struct PendingFlush {
std::set<ProducerID> producers;
ConsumerEndpoint::FlushCallback callback;
explicit PendingFlush(decltype(callback) cb) : callback(std::move(cb)) {}
};
using PendingCloneID = uint64_t;
struct PendingClone {
size_t pending_flush_cnt = 0;
// This vector might not be populated all at once. Some buffers might be
// nullptr while flushing is not done.
std::vector<std::unique_ptr<TraceBuffer>> buffers;
bool flush_failed = false;
base::WeakPtr<ConsumerEndpointImpl> weak_consumer;
bool skip_trace_filter = false;
};
// Holds the state of a tracing session. A tracing session is uniquely bound
// a specific Consumer. Each Consumer can own one or more sessions.
struct TracingSession {
enum State {
DISABLED = 0,
CONFIGURED,
STARTED,
DISABLING_WAITING_STOP_ACKS,
CLONED_READ_ONLY,
};
TracingSession(TracingSessionID,
ConsumerEndpointImpl*,
const TraceConfig&,
base::TaskRunner*);
TracingSession(TracingSession&&) = delete;
TracingSession& operator=(TracingSession&&) = delete;
size_t num_buffers() const { return buffers_index.size(); }
uint32_t delay_to_next_write_period_ms() const {
PERFETTO_DCHECK(write_period_ms > 0);
return write_period_ms -
static_cast<uint32_t>(base::GetWallTimeMs().count() %
write_period_ms);
}
uint32_t flush_timeout_ms() {
uint32_t timeout_ms = config.flush_timeout_ms();
return timeout_ms ? timeout_ms : kDefaultFlushTimeoutMs;
}
uint32_t data_source_stop_timeout_ms() {
uint32_t timeout_ms = config.data_source_stop_timeout_ms();
return timeout_ms ? timeout_ms : kDataSourceStopTimeoutMs;
}
PacketSequenceID GetPacketSequenceID(MachineID machine_id,
ProducerID producer_id,
WriterID writer_id) {
auto key = std::make_tuple(machine_id, producer_id, writer_id);
auto it = packet_sequence_ids.find(key);
if (it != packet_sequence_ids.end())
return it->second;
// We shouldn't run out of sequence IDs (producer ID is 16 bit, writer IDs
// are limited to 1024).
static_assert(kMaxPacketSequenceID > kMaxProducerID * kMaxWriterID,
"PacketSequenceID value space doesn't cover service "
"sequence ID and all producer/writer ID combinations!");
PERFETTO_DCHECK(last_packet_sequence_id < kMaxPacketSequenceID);
PacketSequenceID sequence_id = ++last_packet_sequence_id;
packet_sequence_ids[key] = sequence_id;
return sequence_id;
}
DataSourceInstance* GetDataSourceInstance(
ProducerID producer_id,
DataSourceInstanceID instance_id) {
for (auto& inst_kv : data_source_instances) {
if (inst_kv.first != producer_id ||
inst_kv.second.instance_id != instance_id) {
continue;
}
return &inst_kv.second;
}
return nullptr;
}
bool AllDataSourceInstancesStarted() {
return std::all_of(
data_source_instances.begin(), data_source_instances.end(),
[](decltype(data_source_instances)::const_reference x) {
return x.second.state == DataSourceInstance::STARTED;
});
}
bool AllDataSourceInstancesStopped() {
return std::all_of(
data_source_instances.begin(), data_source_instances.end(),
[](decltype(data_source_instances)::const_reference x) {
return x.second.state == DataSourceInstance::STOPPED;
});
}
const TracingSessionID id;
// The consumer that started the session.
// Can be nullptr if the consumer detached from the session.
ConsumerEndpointImpl* consumer_maybe_null;
// Unix uid of the consumer. This is valid even after the consumer detaches
// and does not change for the entire duration of the session. It is used to
// prevent that a consumer re-attaches to a session from a different uid.
uid_t const consumer_uid;
// The list of triggers this session received while alive and the time they
// were received at. This is used to insert 'fake' packets back to the
// consumer so they can tell when some event happened. The order matches the
// order they were received.
struct TriggerInfo {
uint64_t boot_time_ns;
std::string trigger_name;
std::string producer_name;
uid_t producer_uid;
};
std::vector<TriggerInfo> received_triggers;
// The trace config provided by the Consumer when calling
// EnableTracing(), plus any updates performed by ChangeTraceConfig.
TraceConfig config;
// List of data source instances that have been enabled on the various
// producers for this tracing session.
std::multimap<ProducerID, DataSourceInstance> data_source_instances;
// For each Flush(N) request, keeps track of the set of producers for which
// we are still awaiting a NotifyFlushComplete(N) ack.
std::map<FlushRequestID, PendingFlush> pending_flushes;
// For each Clone request, keeps track of the flushes acknowledgement that
// we are still waiting for.
std::map<PendingCloneID, PendingClone> pending_clones;
PendingCloneID last_pending_clone_id_ = 0;
// Maps a per-trace-session buffer index into the corresponding global
// BufferID (shared namespace amongst all consumers). This vector has as
// many entries as |config.buffers_size()|.
std::vector<BufferID> buffers_index;
std::map<std::tuple<MachineID, ProducerID, WriterID>, PacketSequenceID>
packet_sequence_ids;
PacketSequenceID last_packet_sequence_id = kServicePacketSequenceID;
// Whether we should emit the trace stats next time we reach EOF while
// performing ReadBuffers.
bool should_emit_stats = false;
// Whether we should emit the sync marker the next time ReadBuffers() is
// called.
bool should_emit_sync_marker = false;
// Whether we put the initial packets (trace config, system info,
// etc.) into the trace output yet.
bool did_emit_initial_packets = false;
// Whether we emitted clock offsets for relay clients yet.
bool did_emit_remote_clock_sync_ = false;
// Whether we should compress TracePackets after reading them.
bool compress_deflate = false;
// The number of received triggers we've emitted into the trace output.
size_t num_triggers_emitted_into_trace = 0;
// Packets that failed validation of the TrustedPacket.
uint64_t invalid_packets = 0;
// Flush() stats. See comments in trace_stats.proto for more.
uint64_t flushes_requested = 0;
uint64_t flushes_succeeded = 0;
uint64_t flushes_failed = 0;
// Outcome of the final Flush() done by FlushAndDisableTracing().
protos::gen::TraceStats_FinalFlushOutcome final_flush_outcome{};
// Set to true on the first call to MaybeNotifyAllDataSourcesStarted().
bool did_notify_all_data_source_started = false;
// Stores all lifecycle events of a particular type (i.e. associated with a
// single field id in the TracingServiceEvent proto).
struct LifecycleEvent {
LifecycleEvent(uint32_t f_id, uint32_t m_size = 1)
: field_id(f_id), max_size(m_size), timestamps(m_size) {}
// The field id of the event in the TracingServiceEvent proto.
uint32_t field_id;
// Stores the max size of |timestamps|. Set to 1 by default (in
// the constructor) but can be overriden in TraceSession constructor
// if a larger size is required.
uint32_t max_size;
// Stores the timestamps emitted for each event type (in nanoseconds).
// Emitted into the trace and cleared when the consumer next calls
// ReadBuffers.
base::CircularQueue<int64_t> timestamps;
};
std::vector<LifecycleEvent> lifecycle_events;
using ClockSnapshotData = ClockSnapshotVector;
// Initial clock snapshot, captured at trace start time (when state goes to
// TracingSession::STARTED). Emitted into the trace when the consumer first
// calls ReadBuffers().
ClockSnapshotData initial_clock_snapshot;
// Stores clock snapshots to emit into the trace as a ring buffer. This
// buffer is populated both periodically and when lifecycle events happen
// but only when significant clock drift is detected. Emitted into the trace
// and cleared when the consumer next calls ReadBuffers().
base::CircularQueue<ClockSnapshotData> clock_snapshot_ring_buffer;
State state = DISABLED;
// If the consumer detached the session, this variable defines the key used
// for identifying the session later when reattaching.
std::string detach_key;
// This is set when the Consumer calls sets |write_into_file| == true in the
// TraceConfig. In this case this represents the file we should stream the
// trace packets into, rather than returning it to the consumer via
// OnTraceData().
base::ScopedFile write_into_file;
uint32_t write_period_ms = 0;
uint64_t max_file_size_bytes = 0;
uint64_t bytes_written_into_file = 0;
// Periodic task for snapshotting service events (e.g. clocks, sync markers
// etc)
base::PeriodicTask snapshot_periodic_task;
// Deferred task that stops the trace when |duration_ms| expires. This is
// to handle the case of |prefer_suspend_clock_for_duration| which cannot
// use PostDelayedTask.
base::PeriodicTask timed_stop_task;
// When non-NULL the packets should be post-processed using the filter.
std::unique_ptr<protozero::MessageFilter> trace_filter;
uint64_t filter_input_packets = 0;
uint64_t filter_input_bytes = 0;
uint64_t filter_output_bytes = 0;
uint64_t filter_errors = 0;
uint64_t filter_time_taken_ns = 0;
std::vector<uint64_t> filter_bytes_discarded_per_buffer;
// A randomly generated trace identifier. Note that this does NOT always
// match the requested TraceConfig.trace_uuid_msb/lsb. Spcifically, it does
// until a gap-less snapshot is requested. Each snapshot re-generates the
// uuid to avoid emitting two different traces with the same uuid.
base::Uuid trace_uuid;
// NOTE: when adding new fields here consider whether that state should be
// copied over in DoCloneSession() or not. Ask yourself: is this a
// "runtime state" (e.g. active data sources) or a "trace (meta)data state"?
// If the latter, it should be handled by DoCloneSession()).
};
TracingServiceImpl(const TracingServiceImpl&) = delete;
TracingServiceImpl& operator=(const TracingServiceImpl&) = delete;
DataSourceInstance* SetupDataSource(const TraceConfig::DataSource&,
const TraceConfig::ProducerConfig&,
const RegisteredDataSource&,
TracingSession*);
// Returns the next available ProducerID that is not in |producers_|.
ProducerID GetNextProducerID();
// Returns a pointer to the |tracing_sessions_| entry or nullptr if the
// session doesn't exists.
TracingSession* GetTracingSession(TracingSessionID);
// Returns a pointer to the tracing session that has the highest
// TraceConfig.bugreport_score, if any, or nullptr.
TracingSession* FindTracingSessionWithMaxBugreportScore();
// Returns a pointer to the |tracing_sessions_| entry, matching the given
// uid and detach key, or nullptr if no such session exists.
TracingSession* GetDetachedSession(uid_t, const std::string& key);
// Update the memory guard rail by using the latest information from the
// shared memory and trace buffers.
void UpdateMemoryGuardrail();
void StartDataSourceInstance(ProducerEndpointImpl*,
TracingSession*,
DataSourceInstance*);
void StopDataSourceInstance(ProducerEndpointImpl*,
TracingSession*,
DataSourceInstance*,
bool disable_immediately);
void PeriodicSnapshotTask(TracingSessionID);
void MaybeSnapshotClocksIntoRingBuffer(TracingSession*);
bool SnapshotClocks(TracingSession::ClockSnapshotData*);
void SnapshotLifecyleEvent(TracingSession*,
uint32_t field_id,
bool snapshot_clocks);
void EmitClockSnapshot(TracingSession*,
TracingSession::ClockSnapshotData,
std::vector<TracePacket>*);
void EmitSyncMarker(std::vector<TracePacket>*);
void EmitStats(TracingSession*, std::vector<TracePacket>*);
TraceStats GetTraceStats(TracingSession*);
void EmitLifecycleEvents(TracingSession*, std::vector<TracePacket>*);
void EmitUuid(TracingSession*, std::vector<TracePacket>*);
void MaybeEmitTraceConfig(TracingSession*, std::vector<TracePacket>*);
void EmitSystemInfo(std::vector<TracePacket>*);
void MaybeEmitReceivedTriggers(TracingSession*, std::vector<TracePacket>*);
void MaybeEmitRemoteClockSync(TracingSession*, std::vector<TracePacket>*);
void MaybeNotifyAllDataSourcesStarted(TracingSession*);
void OnFlushTimeout(TracingSessionID, FlushRequestID);
void OnDisableTracingTimeout(TracingSessionID);
void DisableTracingNotifyConsumerAndFlushFile(TracingSession*);
void PeriodicFlushTask(TracingSessionID, bool post_next_only);
void CompleteFlush(TracingSessionID tsid,
ConsumerEndpoint::FlushCallback callback,
bool success);
void ScrapeSharedMemoryBuffers(TracingSession*, ProducerEndpointImpl*);
void PeriodicClearIncrementalStateTask(TracingSessionID, bool post_next_only);
TraceBuffer* GetBufferByID(BufferID);
void FlushDataSourceInstances(
TracingSession*,
uint32_t timeout_ms,
std::map<ProducerID, std::vector<DataSourceInstanceID>>,
ConsumerEndpoint::FlushCallback,
FlushFlags);
std::map<ProducerID, std::vector<DataSourceInstanceID>>
GetFlushableDataSourceInstancesForBuffers(TracingSession*,
std::set<BufferID>);
bool DoCloneBuffers(TracingSession*,
std::set<BufferID>,
std::vector<std::unique_ptr<TraceBuffer>>*);
base::Status FinishCloneSession(ConsumerEndpointImpl*,
TracingSessionID,
std::vector<std::unique_ptr<TraceBuffer>>,
bool skip_filter,
bool final_flush_outcome,
base::Uuid*);
void OnFlushDoneForClone(TracingSessionID src_tsid,
PendingCloneID clone_id,
std::set<BufferID> buf_ids,
bool final_flush_outcome);
// Returns true if `*tracing_session` is waiting for a trigger that hasn't
// happened.
static bool IsWaitingForTrigger(TracingSession* tracing_session);
// Reads the buffers from `*tracing_session` and returns them (along with some
// metadata packets).
//
// The function stops when the cumulative size of the return packets exceeds
// `threshold` (so it's not a strict upper bound) and sets `*has_more` to
// true, or when there are no more packets (and sets `*has_more` to false).
std::vector<TracePacket> ReadBuffers(TracingSession* tracing_session,
size_t threshold,
bool* has_more);
// If `*tracing_session` has a filter, applies it to `*packets`. Doesn't
// change the number of `*packets`, only their content.
void MaybeFilterPackets(TracingSession* tracing_session,
std::vector<TracePacket>* packets);
// If `*tracing_session` has compression enabled, compress `*packets`.
void MaybeCompressPackets(TracingSession* tracing_session,
std::vector<TracePacket>* packets);
// If `*tracing_session` is configured to write into a file, writes `packets`
// into the file.
//
// Returns true if the file should be closed (because it's full or there has
// been an error), false otherwise.
bool WriteIntoFile(TracingSession* tracing_session,
std::vector<TracePacket> packets);
void OnStartTriggersTimeout(TracingSessionID tsid);
void MaybeLogUploadEvent(const TraceConfig&,
const base::Uuid&,
PerfettoStatsdAtom atom,
const std::string& trigger_name = "");
void MaybeLogTriggerEvent(const TraceConfig&,
PerfettoTriggerAtom atom,
const std::string& trigger_name);
size_t PurgeExpiredAndCountTriggerInWindow(int64_t now_ns,
uint64_t trigger_name_hash);
static void StopOnDurationMsExpiry(base::WeakPtr<TracingServiceImpl>,
TracingSessionID);
base::TaskRunner* const task_runner_;
const InitOpts init_opts_;
std::unique_ptr<SharedMemory::Factory> shm_factory_;
ProducerID last_producer_id_ = 0;
DataSourceInstanceID last_data_source_instance_id_ = 0;
TracingSessionID last_tracing_session_id_ = 0;
FlushRequestID last_flush_request_id_ = 0;
uid_t uid_ = 0;
// Buffer IDs are global across all consumers (because a Producer can produce
// data for more than one trace session, hence more than one consumer).
IdAllocator<BufferID> buffer_ids_;
std::multimap<std::string /*name*/, RegisteredDataSource> data_sources_;
std::map<ProducerID, ProducerEndpointImpl*> producers_;
std::set<ConsumerEndpointImpl*> consumers_;
std::map<RelayClientID, RelayEndpointImpl*> relay_clients_;
std::map<TracingSessionID, TracingSession> tracing_sessions_;
std::map<BufferID, std::unique_ptr<TraceBuffer>> buffers_;
std::map<std::string, int64_t> session_to_last_trace_s_;
// Contains timestamps of triggers.
// The queue is sorted by timestamp and invocations older than
// |trigger_window_ns_| are purged when a trigger happens.
base::CircularQueue<TriggerHistory> trigger_history_;
bool smb_scraping_enabled_ = false;
bool lockdown_mode_ = false;
uint32_t min_write_period_ms_ = 100; // Overridable for testing.
int64_t trigger_window_ns_ = kOneDayInNs; // Overridable for testing.
std::minstd_rand trigger_probability_rand_;
std::uniform_real_distribution<> trigger_probability_dist_;
double trigger_rnd_override_for_testing_ = 0; // Overridable for testing.
uint8_t sync_marker_packet_[32]; // Lazily initialized.
size_t sync_marker_packet_size_ = 0;
// Stats.
uint64_t chunks_discarded_ = 0;
uint64_t patches_discarded_ = 0;
PERFETTO_THREAD_CHECKER(thread_checker_)
base::WeakPtrFactory<TracingServiceImpl>
weak_ptr_factory_; // Keep at the end.
};
} // namespace perfetto
#endif // SRC_TRACING_SERVICE_TRACING_SERVICE_IMPL_H_