| /* |
| * Copyright (C) 2019 The Android Open Source Project |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| #include "src/profiling/perf/perf_producer.h" |
| |
| #include <optional> |
| #include <random> |
| #include <utility> |
| #include <vector> |
| |
| #include <unistd.h> |
| |
| #include <unwindstack/Error.h> |
| #include <unwindstack/Unwinder.h> |
| |
| #include "perfetto/base/logging.h" |
| #include "perfetto/base/task_runner.h" |
| #include "perfetto/ext/base/file_utils.h" |
| #include "perfetto/ext/base/metatrace.h" |
| #include "perfetto/ext/base/string_utils.h" |
| #include "perfetto/ext/base/utils.h" |
| #include "perfetto/ext/base/weak_ptr.h" |
| #include "perfetto/ext/tracing/core/basic_types.h" |
| #include "perfetto/ext/tracing/core/producer.h" |
| #include "perfetto/ext/tracing/core/tracing_service.h" |
| #include "perfetto/ext/tracing/ipc/producer_ipc_client.h" |
| #include "perfetto/tracing/core/data_source_config.h" |
| #include "perfetto/tracing/core/data_source_descriptor.h" |
| #include "src/profiling/common/callstack_trie.h" |
| #include "src/profiling/common/proc_cmdline.h" |
| #include "src/profiling/common/producer_support.h" |
| #include "src/profiling/common/profiler_guardrails.h" |
| #include "src/profiling/common/unwind_support.h" |
| #include "src/profiling/perf/common_types.h" |
| #include "src/profiling/perf/event_reader.h" |
| |
| #include "protos/perfetto/common/builtin_clock.pbzero.h" |
| #include "protos/perfetto/common/perf_events.gen.h" |
| #include "protos/perfetto/common/perf_events.pbzero.h" |
| #include "protos/perfetto/config/profiling/perf_event_config.gen.h" |
| #include "protos/perfetto/trace/profiling/profile_packet.pbzero.h" |
| #include "protos/perfetto/trace/trace_packet.pbzero.h" |
| #include "protos/perfetto/trace/trace_packet_defaults.pbzero.h" |
| |
| namespace perfetto { |
| namespace profiling { |
| namespace { |
| |
| // TODO(b/151835887): on Android, when using signals, there exists a vulnerable |
| // window between a process image being replaced by execve, and the new |
| // libc instance reinstalling the proper signal handlers. During this window, |
| // the signal disposition is defaulted to terminating the process. |
| // This is a best-effort mitigation from the daemon's side, using a heuristic |
| // that most execve calls follow a fork. So if we get a sample for a very fresh |
| // process, the grace period will give it a chance to get to |
| // a properly initialised state prior to getting signalled. This doesn't help |
| // cases when a mature process calls execve, or when the target gets descheduled |
| // (since this is a naive walltime wait). |
| // The proper fix is in the platform, see bug for progress. |
| constexpr uint32_t kProcDescriptorsAndroidDelayMs = 50; |
| |
| constexpr uint32_t kMemoryLimitCheckPeriodMs = 1000; |
| |
| constexpr uint32_t kInitialConnectionBackoffMs = 100; |
| constexpr uint32_t kMaxConnectionBackoffMs = 30 * 1000; |
| |
| constexpr char kProducerName[] = "perfetto.traced_perf"; |
| constexpr char kDataSourceName[] = "linux.perf"; |
| |
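| // Upper bound for the per-cpu scan in GetOnlineCpus(): _SC_NPROCESSORS_CONF |
| // counts all configured CPUs, including ones that are currently offline. |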
| size_t NumberOfCpus() { |
| return static_cast<size_t>(sysconf(_SC_NPROCESSORS_CONF)); |
| } |
| |
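| // Returns the set of currently-online CPUs by reading the per-cpu |
| // /sys/devices/system/cpu/cpuN/online nodes. An empty vector indicates |
| // failure, and callers treat it as such. |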
| std::vector<uint32_t> GetOnlineCpus() { |
| size_t cpu_count = NumberOfCpus(); |
| if (cpu_count == 0) { |
| return {}; |
| } |
| |
| static constexpr char kOnlineValue[] = "1\n"; |
| std::vector<uint32_t> online_cpus; |
| online_cpus.reserve(cpu_count); |
| for (uint32_t cpu = 0; cpu < cpu_count; ++cpu) { |
| std::string res; |
| base::StackString<1024> path("/sys/devices/system/cpu/cpu%u/online", cpu); |
| if (!base::ReadFile(path.c_str(), &res)) { |
| // Always consider CPU 0 to be online if the "online" file does not exist |
| // for it. There seem to be several assumptions in the kernel which make |
| // CPU 0 special so this is a pretty safe bet. |
| if (cpu != 0) { |
| return {}; |
| } |
| res = kOnlineValue; |
| } |
| if (res != kOnlineValue) { |
| continue; |
| } |
| online_cpus.push_back(cpu); |
| } |
| return online_cpus; |
| } |
| |
| int32_t ToBuiltinClock(int32_t clockid) { |
| switch (clockid) { |
| case CLOCK_REALTIME: |
| return protos::pbzero::BUILTIN_CLOCK_REALTIME; |
| case CLOCK_MONOTONIC: |
| return protos::pbzero::BUILTIN_CLOCK_MONOTONIC; |
| case CLOCK_MONOTONIC_RAW: |
| return protos::pbzero::BUILTIN_CLOCK_MONOTONIC_RAW; |
| case CLOCK_BOOTTIME: |
| return protos::pbzero::BUILTIN_CLOCK_BOOTTIME; |
| // Should never get invalid input here as otherwise the syscall itself |
| // would've failed earlier. |
| default: |
| return protos::pbzero::BUILTIN_CLOCK_UNKNOWN; |
| } |
| } |
| |
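| // Starts a packet on the data source's sequence and marks it as depending on |
| // incremental state, since samples reference interned data (callstacks, |
| // mappings, strings) emitted earlier on the same sequence. |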
| TraceWriter::TracePacketHandle StartTracePacket(TraceWriter* trace_writer) { |
| auto packet = trace_writer->NewTracePacket(); |
| packet->set_sequence_flags( |
| protos::pbzero::TracePacket::SEQ_NEEDS_INCREMENTAL_STATE); |
| return packet; |
| } |
| |
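| // Emits the TracePacketDefaults for this sequence: the default timestamp |
| // clock for samples, a description of the timebase and follower events, and |
| // the chosen process shard (if any). Also starts a new incremental state |
| // generation for the sequence. |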
| void WritePerfEventDefaultsPacket(const EventConfig& event_config, |
| TraceWriter* trace_writer) { |
| auto packet = trace_writer->NewTracePacket(); |
| packet->set_timestamp(static_cast<uint64_t>(base::GetBootTimeNs().count())); |
| packet->set_timestamp_clock_id(protos::pbzero::BUILTIN_CLOCK_BOOTTIME); |
| |
| // start new incremental state generation: |
| packet->set_sequence_flags( |
| protos::pbzero::TracePacket::SEQ_INCREMENTAL_STATE_CLEARED); |
| |
| // default packet timestamp clock for the samples: |
| perf_event_attr* perf_attr = event_config.perf_attr(); |
| auto* defaults = packet->set_trace_packet_defaults(); |
| int32_t builtin_clock = ToBuiltinClock(perf_attr->clockid); |
| defaults->set_timestamp_clock_id(static_cast<uint32_t>(builtin_clock)); |
| |
| auto* perf_defaults = defaults->set_perf_sample_defaults(); |
| auto* timebase_pb = perf_defaults->set_timebase(); |
| |
| // frequency/period: |
| if (perf_attr->freq) { |
| timebase_pb->set_frequency(perf_attr->sample_freq); |
| } else { |
| timebase_pb->set_period(perf_attr->sample_period); |
| } |
| |
| // timebase event: |
| const PerfCounter& timebase = event_config.timebase_event(); |
| switch (timebase.event_type()) { |
| case PerfCounter::Type::kBuiltinCounter: { |
| timebase_pb->set_counter( |
| static_cast<protos::pbzero::PerfEvents::Counter>(timebase.counter)); |
| break; |
| } |
| case PerfCounter::Type::kTracepoint: { |
| auto* tracepoint_pb = timebase_pb->set_tracepoint(); |
| tracepoint_pb->set_name(timebase.tracepoint_name); |
| tracepoint_pb->set_filter(timebase.tracepoint_filter); |
| break; |
| } |
| case PerfCounter::Type::kRawEvent: { |
| auto* raw_pb = timebase_pb->set_raw_event(); |
| raw_pb->set_type(timebase.attr_type); |
| raw_pb->set_config(timebase.attr_config); |
| raw_pb->set_config1(timebase.attr_config1); |
| raw_pb->set_config2(timebase.attr_config2); |
| break; |
| } |
| } |
| |
| // optional name to identify the counter during parsing: |
| if (!timebase.name.empty()) { |
| timebase_pb->set_name(timebase.name); |
| } |
| |
| // follower events: |
| for (const auto& e : event_config.follower_events()) { |
| auto* followers_pb = perf_defaults->add_followers(); |
| followers_pb->set_name(e.name); |
| |
| switch (e.event_type()) { |
| case PerfCounter::Type::kBuiltinCounter: { |
| followers_pb->set_counter( |
| static_cast<protos::pbzero::PerfEvents::Counter>(e.counter)); |
| break; |
| } |
| case PerfCounter::Type::kTracepoint: { |
| auto* tracepoint_pb = followers_pb->set_tracepoint(); |
| tracepoint_pb->set_name(e.tracepoint_name); |
| tracepoint_pb->set_filter(e.tracepoint_filter); |
| break; |
| } |
| case PerfCounter::Type::kRawEvent: { |
| auto* raw_pb = followers_pb->set_raw_event(); |
| raw_pb->set_type(e.attr_type); |
| raw_pb->set_config(e.attr_config); |
| raw_pb->set_config1(e.attr_config1); |
| raw_pb->set_config2(e.attr_config2); |
| break; |
| } |
| } |
| } |
| |
| // Not setting timebase.timestamp_clock since the field that matters during |
| // parsing is the root timestamp_clock_id set above. |
| |
| // Record the random shard we've chosen so that the post-processing can infer |
| // which processes would've been unwound if sampled. In particular this lets |
| // us distinguish between "running but not chosen" and "running and chosen, |
| // but not sampled" cases. |
| const auto& process_sharding = event_config.filter().process_sharding; |
| if (process_sharding.has_value()) { |
| perf_defaults->set_process_shard_count(process_sharding->shard_count); |
| perf_defaults->set_chosen_process_shard(process_sharding->chosen_shard); |
| } |
| } |
| |
| uint32_t TimeToNextReadTickMs(DataSourceInstanceID ds_id, uint32_t period_ms) { |
| // Normally, we'd schedule the next tick at the next |period_ms| |
| // boundary of the boot clock. However, to avoid aligning the read tasks of |
| // all concurrent data sources, we select a deterministic offset based on the |
| // data source id. |
| std::minstd_rand prng(static_cast<std::minstd_rand::result_type>(ds_id)); |
| std::uniform_int_distribution<uint32_t> dist(0, period_ms - 1); |
| uint32_t ds_period_offset = dist(prng); |
| |
| uint64_t now_ms = static_cast<uint64_t>(base::GetWallTimeMs().count()); |
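| // Example: with period_ms = 500, offset 120 and now_ms = 1420, this returns |
| // 500 - ((1420 - 120) % 500) = 200, so the next tick lands at 1620 ms, the |
| // next offset-shifted period boundary. |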
| return period_ms - ((now_ms - ds_period_offset) % period_ms); |
| } |
| |
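| // Maps the cpumode bits of the perf_event_header.misc field onto the |
| // profiling proto enum. |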
| protos::pbzero::Profiling::CpuMode ToCpuModeEnum(uint16_t perf_cpu_mode) { |
| using Profiling = protos::pbzero::Profiling; |
| switch (perf_cpu_mode) { |
| case PERF_RECORD_MISC_KERNEL: |
| return Profiling::MODE_KERNEL; |
| case PERF_RECORD_MISC_USER: |
| return Profiling::MODE_USER; |
| case PERF_RECORD_MISC_HYPERVISOR: |
| return Profiling::MODE_HYPERVISOR; |
| case PERF_RECORD_MISC_GUEST_KERNEL: |
| return Profiling::MODE_GUEST_KERNEL; |
| case PERF_RECORD_MISC_GUEST_USER: |
| return Profiling::MODE_GUEST_USER; |
| default: |
| return Profiling::MODE_UNKNOWN; |
| } |
| } |
| |
| protos::pbzero::Profiling::StackUnwindError ToProtoEnum( |
| unwindstack::ErrorCode error_code) { |
| using Profiling = protos::pbzero::Profiling; |
| switch (error_code) { |
| case unwindstack::ERROR_NONE: |
| return Profiling::UNWIND_ERROR_NONE; |
| case unwindstack::ERROR_MEMORY_INVALID: |
| return Profiling::UNWIND_ERROR_MEMORY_INVALID; |
| case unwindstack::ERROR_UNWIND_INFO: |
| return Profiling::UNWIND_ERROR_UNWIND_INFO; |
| case unwindstack::ERROR_UNSUPPORTED: |
| return Profiling::UNWIND_ERROR_UNSUPPORTED; |
| case unwindstack::ERROR_INVALID_MAP: |
| return Profiling::UNWIND_ERROR_INVALID_MAP; |
| case unwindstack::ERROR_MAX_FRAMES_EXCEEDED: |
| return Profiling::UNWIND_ERROR_MAX_FRAMES_EXCEEDED; |
| case unwindstack::ERROR_REPEATED_FRAME: |
| return Profiling::UNWIND_ERROR_REPEATED_FRAME; |
| case unwindstack::ERROR_INVALID_ELF: |
| return Profiling::UNWIND_ERROR_INVALID_ELF; |
| case unwindstack::ERROR_SYSTEM_CALL: |
| return Profiling::UNWIND_ERROR_SYSTEM_CALL; |
| case unwindstack::ERROR_THREAD_TIMEOUT: |
| return Profiling::UNWIND_ERROR_THREAD_TIMEOUT; |
| case unwindstack::ERROR_THREAD_DOES_NOT_EXIST: |
| return Profiling::UNWIND_ERROR_THREAD_DOES_NOT_EXIST; |
| case unwindstack::ERROR_BAD_ARCH: |
| return Profiling::UNWIND_ERROR_BAD_ARCH; |
| case unwindstack::ERROR_MAPS_PARSE: |
| return Profiling::UNWIND_ERROR_MAPS_PARSE; |
| case unwindstack::ERROR_INVALID_PARAMETER: |
| return Profiling::UNWIND_ERROR_INVALID_PARAMETER; |
| case unwindstack::ERROR_PTRACE_CALL: |
| return Profiling::UNWIND_ERROR_PTRACE_CALL; |
| } |
| return Profiling::UNWIND_ERROR_UNKNOWN; |
| } |
| |
| } // namespace |
| |
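| // Evaluation order: explicit exclusions (cmdline globs, then pids), explicit |
| // inclusions (cmdline globs, then pids), the accept-everything case when no |
| // allow-filtering options are set, process sharding, and finally the first-N |
| // additional cmdlines. Anything that falls through is rejected. |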
| // static |
| bool PerfProducer::ShouldRejectDueToFilter( |
| pid_t pid, |
| const TargetFilter& filter, |
| bool skip_cmdline, |
| base::FlatSet<std::string>* additional_cmdlines, |
| std::function<bool(std::string*)> read_proc_pid_cmdline) { |
| PERFETTO_CHECK(additional_cmdlines); |
| |
| std::string cmdline; |
| bool have_cmdline = false; |
| if (!skip_cmdline) |
| have_cmdline = read_proc_pid_cmdline(&cmdline); |
| |
| const char* binname = ""; |
| if (have_cmdline) { |
| binname = glob_aware::FindBinaryName(cmdline.c_str(), cmdline.size()); |
| } |
| |
| auto has_matching_pattern = [](const std::vector<std::string>& patterns, |
| const char* cmd, const char* name) { |
| for (const std::string& pattern : patterns) { |
| if (glob_aware::MatchGlobPattern(pattern.c_str(), cmd, name)) { |
| return true; |
| } |
| } |
| return false; |
| }; |
| |
| if (have_cmdline && |
| has_matching_pattern(filter.exclude_cmdlines, cmdline.c_str(), binname)) { |
| PERFETTO_DLOG("Explicitly rejecting samples for pid [%d] due to cmdline", |
| static_cast<int>(pid)); |
| return true; |
| } |
| if (filter.exclude_pids.count(pid)) { |
| PERFETTO_DLOG("Explicitly rejecting samples for pid [%d] due to pid", |
| static_cast<int>(pid)); |
| return true; |
| } |
| |
| if (have_cmdline && |
| has_matching_pattern(filter.cmdlines, cmdline.c_str(), binname)) { |
| return false; |
| } |
| if (filter.pids.count(pid)) { |
| return false; |
| } |
| |
| // Empty allow filter means keep everything that isn't explicitly excluded. |
| if (filter.cmdlines.empty() && filter.pids.empty() && |
| !filter.additional_cmdline_count && |
| !filter.process_sharding.has_value()) { |
| return false; |
| } |
| |
| // Niche option: process sharding to amortise systemwide unwinding costs. |
| // Selects a subset of all processes by taking the pid modulo the shard count. |
| if (filter.process_sharding.has_value()) { |
| uint32_t upid = static_cast<uint32_t>(pid); |
| if (upid % filter.process_sharding->shard_count == |
| filter.process_sharding->chosen_shard) { |
| PERFETTO_DLOG("Process sharding: keeping pid [%d]", |
| static_cast<int>(pid)); |
| return false; |
| } else { |
| PERFETTO_DLOG("Process sharding: rejecting pid [%d]", |
| static_cast<int>(pid)); |
| return true; |
| } |
| } |
| |
| // Niche option: additionally remember the first seen N process cmdlines, and |
| // keep all processes with those names. |
| if (have_cmdline) { |
| if (additional_cmdlines->count(cmdline)) { |
| return false; |
| } |
| if (additional_cmdlines->size() < filter.additional_cmdline_count) { |
| additional_cmdlines->insert(cmdline); |
| return false; |
| } |
| } |
| |
| PERFETTO_DLOG("Rejecting samples for pid [%d]", static_cast<int>(pid)); |
| return true; |
| } |
| |
| PerfProducer::PerfProducer(ProcDescriptorGetter* proc_fd_getter, |
| base::TaskRunner* task_runner) |
| : task_runner_(task_runner), |
| proc_fd_getter_(proc_fd_getter), |
| unwinding_worker_(this), |
| weak_factory_(this) { |
| proc_fd_getter->SetDelegate(this); |
| } |
| |
| void PerfProducer::SetupDataSource(DataSourceInstanceID, |
| const DataSourceConfig&) {} |
| |
| void PerfProducer::StartDataSource(DataSourceInstanceID ds_id, |
| const DataSourceConfig& config) { |
| uint64_t tracing_session_id = config.tracing_session_id(); |
| PERFETTO_LOG("StartDataSource(ds %zu, session %" PRIu64 ", name %s)", |
| static_cast<size_t>(ds_id), tracing_session_id, |
| config.name().c_str()); |
| |
| if (config.name() == MetatraceWriter::kDataSourceName) { |
| StartMetatraceSource(ds_id, static_cast<BufferID>(config.target_buffer())); |
| return; |
| } |
| |
| // linux.perf data source |
| if (config.name() != kDataSourceName) |
| return; |
| |
| // Tracepoint name -> id lookup in case the config asks for tracepoints: |
| auto tracepoint_id_lookup = [this](const std::string& group, |
| const std::string& name) { |
| if (!tracefs_) // lazy init or retry |
| tracefs_ = FtraceProcfs::CreateGuessingMountPoint(); |
| if (!tracefs_) // still didn't find an accessible tracefs |
| return 0u; |
| return tracefs_->ReadEventId(group, name); |
| }; |
| |
| protos::gen::PerfEventConfig event_config_pb; |
| if (!event_config_pb.ParseFromString(config.perf_event_config_raw())) { |
| PERFETTO_ELOG("PerfEventConfig could not be parsed."); |
| return; |
| } |
| |
| // Unlikely: handle a callstack sampling option that shares a random decision |
| // between all data sources within a tracing session. Instead of introducing |
| // session-scoped data, we replicate the decision in each per-DS EventConfig. |
| std::optional<ProcessSharding> process_sharding; |
| uint32_t shard_count = |
| event_config_pb.callstack_sampling().scope().process_shard_count(); |
| if (shard_count > 0) { |
| process_sharding = |
| GetOrChooseCallstackProcessShard(tracing_session_id, shard_count); |
| } |
| |
| std::optional<EventConfig> event_config = EventConfig::Create( |
| event_config_pb, config, process_sharding, tracepoint_id_lookup); |
| if (!event_config.has_value()) { |
| PERFETTO_ELOG("PerfEventConfig rejected."); |
| return; |
| } |
| |
| std::vector<uint32_t> online_cpus = GetOnlineCpus(); |
| if (online_cpus.empty()) { |
| PERFETTO_ELOG("No online CPUs found."); |
| return; |
| } |
| |
| std::vector<EventReader> per_cpu_readers; |
| for (uint32_t cpu : online_cpus) { |
| std::optional<EventReader> event_reader = |
| EventReader::ConfigureEvents(cpu, event_config.value()); |
| if (!event_reader.has_value()) { |
| PERFETTO_ELOG("Failed to set up perf events for cpu%" PRIu32 |
| ", discarding data source.", |
| cpu); |
| return; |
| } |
| per_cpu_readers.emplace_back(std::move(event_reader.value())); |
| } |
| |
| auto buffer_id = static_cast<BufferID>(config.target_buffer()); |
| auto writer = endpoint_->CreateTraceWriter(buffer_id); |
| |
| // Construct the data source instance. |
| std::map<DataSourceInstanceID, DataSourceState>::iterator ds_it; |
| bool inserted; |
| std::tie(ds_it, inserted) = data_sources_.emplace( |
| std::piecewise_construct, std::forward_as_tuple(ds_id), |
| std::forward_as_tuple(event_config.value(), tracing_session_id, |
| std::move(writer), std::move(per_cpu_readers))); |
| PERFETTO_CHECK(inserted); |
| DataSourceState& ds = ds_it->second; |
| |
| // Start the configured events. |
| for (auto& per_cpu_reader : ds.per_cpu_readers) { |
| per_cpu_reader.EnableEvents(); |
| } |
| |
| WritePerfEventDefaultsPacket(ds.event_config, ds.trace_writer.get()); |
| |
| InterningOutputTracker::WriteFixedInterningsPacket( |
| ds_it->second.trace_writer.get(), |
| protos::pbzero::TracePacket::SEQ_NEEDS_INCREMENTAL_STATE); |
| |
| // Inform unwinder of the new data source instance, and optionally start a |
| // periodic task to clear its cached state. |
| auto unwind_mode = (ds.event_config.unwind_mode() == |
| protos::gen::PerfEventConfig::UNWIND_FRAME_POINTER) |
| ? Unwinder::UnwindMode::kFramePointer |
| : Unwinder::UnwindMode::kUnwindStack; |
| unwinding_worker_->PostStartDataSource(ds_id, ds.event_config.kernel_frames(), |
| unwind_mode); |
| if (ds.event_config.unwind_state_clear_period_ms()) { |
| unwinding_worker_->PostClearCachedStatePeriodic( |
| ds_id, ds.event_config.unwind_state_clear_period_ms()); |
| } |
| |
| // Kick off periodic read task. |
| auto tick_period_ms = ds.event_config.read_tick_period_ms(); |
| auto weak_this = weak_factory_.GetWeakPtr(); |
| task_runner_->PostDelayedTask( |
| [weak_this, ds_id] { |
| if (weak_this) |
| weak_this->TickDataSourceRead(ds_id); |
| }, |
| TimeToNextReadTickMs(ds_id, tick_period_ms)); |
| |
| // Optionally kick off periodic memory footprint limit check. |
| uint32_t max_daemon_memory_kb = event_config_pb.max_daemon_memory_kb(); |
| if (max_daemon_memory_kb > 0) { |
| task_runner_->PostDelayedTask( |
| [weak_this, ds_id, max_daemon_memory_kb] { |
| if (weak_this) |
| weak_this->CheckMemoryFootprintPeriodic(ds_id, |
| max_daemon_memory_kb); |
| }, |
| kMemoryLimitCheckPeriodMs); |
| } |
| } |
| |
| void PerfProducer::CheckMemoryFootprintPeriodic(DataSourceInstanceID ds_id, |
| uint32_t max_daemon_memory_kb) { |
| auto ds_it = data_sources_.find(ds_id); |
| if (ds_it == data_sources_.end()) |
| return; // stop recurring |
| |
| GuardrailConfig gconfig = {}; |
| gconfig.memory_guardrail_kb = max_daemon_memory_kb; |
| |
| ProfilerMemoryGuardrails footprint_snapshot; |
| if (footprint_snapshot.IsOverMemoryThreshold(gconfig)) { |
| PurgeDataSource(ds_id); |
| return; // stop recurring |
| } |
| |
| // repost |
| auto weak_this = weak_factory_.GetWeakPtr(); |
| task_runner_->PostDelayedTask( |
| [weak_this, ds_id, max_daemon_memory_kb] { |
| if (weak_this) |
| weak_this->CheckMemoryFootprintPeriodic(ds_id, max_daemon_memory_kb); |
| }, |
| kMemoryLimitCheckPeriodMs); |
| } |
| |
| void PerfProducer::StopDataSource(DataSourceInstanceID ds_id) { |
| PERFETTO_LOG("StopDataSource(%zu)", static_cast<size_t>(ds_id)); |
| |
| // Metatrace: stop immediately (will miss the events from the |
| // asynchronous shutdown of the primary data source). |
| auto meta_it = metatrace_writers_.find(ds_id); |
| if (meta_it != metatrace_writers_.end()) { |
| meta_it->second.WriteAllAndFlushTraceWriter([] {}); |
| metatrace_writers_.erase(meta_it); |
| return; |
| } |
| |
| auto ds_it = data_sources_.find(ds_id); |
| if (ds_it == data_sources_.end()) { |
| // Most likely, the source is missing due to an abrupt stop (via |
| // |PurgeDataSource|). Tell the service that we've stopped the source now, |
| // so that it doesn't wait for the ack until the timeout. |
| endpoint_->NotifyDataSourceStopped(ds_id); |
| return; |
| } |
| |
| // Start shutting down the reading frontend, which will propagate the stop |
| // further as the intermediate buffers are cleared. |
| DataSourceState& ds = ds_it->second; |
| InitiateReaderStop(&ds); |
| } |
| |
| // The perf data sources ignore flush requests, as flushing would be |
| // unnecessarily complicated given out-of-order unwinding and proc-fd timeouts. |
| // Instead of responding to explicit flushes, we can ensure that we're otherwise |
| // well-behaved (do not reorder packets too much), and let the service scrape |
| // the SMB. |
| void PerfProducer::Flush(FlushRequestID flush_id, |
| const DataSourceInstanceID* data_source_ids, |
| size_t num_data_sources, |
| FlushFlags) { |
| // Flush metatracing if requested. |
| for (size_t i = 0; i < num_data_sources; i++) { |
| auto ds_id = data_source_ids[i]; |
| PERFETTO_DLOG("Flush(%zu)", static_cast<size_t>(ds_id)); |
| |
| auto meta_it = metatrace_writers_.find(ds_id); |
| if (meta_it != metatrace_writers_.end()) { |
| meta_it->second.WriteAllAndFlushTraceWriter([] {}); |
| } |
| } |
| |
| endpoint_->NotifyFlushComplete(flush_id); |
| } |
| |
| void PerfProducer::ClearIncrementalState( |
| const DataSourceInstanceID* data_source_ids, |
| size_t num_data_sources) { |
| for (size_t i = 0; i < num_data_sources; i++) { |
| auto ds_id = data_source_ids[i]; |
| PERFETTO_DLOG("ClearIncrementalState(%zu)", static_cast<size_t>(ds_id)); |
| |
| if (metatrace_writers_.find(ds_id) != metatrace_writers_.end()) |
| continue; |
| |
| auto ds_it = data_sources_.find(ds_id); |
| if (ds_it == data_sources_.end()) { |
| PERFETTO_DLOG("ClearIncrementalState(%zu): did not find matching entry", |
| static_cast<size_t>(ds_id)); |
| continue; |
| } |
| DataSourceState& ds = ds_it->second; |
| |
| WritePerfEventDefaultsPacket(ds.event_config, ds.trace_writer.get()); |
| |
| // Forget which incremental state we've emitted before. |
| ds.interning_output.ClearHistory(); |
| InterningOutputTracker::WriteFixedInterningsPacket( |
| ds.trace_writer.get(), |
| protos::pbzero::TracePacket::SEQ_NEEDS_INCREMENTAL_STATE); |
| |
| // Drop the cross-datasource callstack interning trie. This is not |
| // necessary for correctness (the preceding step is sufficient). However, |
| // incremental clearing is likely to be used in ring buffer traces, where |
| // it makes sense to reset the trie's size periodically, and this is a |
| // reasonable point to do so. The trie keeps the monotonic interning IDs, |
| // so there is no confusion for other concurrent data sources. We do not |
| // bother with clearing concurrent sources' interning output trackers as |
| // their footprint should be trivial. |
| callstack_trie_.ClearTrie(); |
| } |
| } |
| |
| void PerfProducer::TickDataSourceRead(DataSourceInstanceID ds_id) { |
| auto it = data_sources_.find(ds_id); |
| if (it == data_sources_.end()) { |
| PERFETTO_DLOG("TickDataSourceRead(%zu): source gone", |
| static_cast<size_t>(ds_id)); |
| return; |
| } |
| DataSourceState& ds = it->second; |
| |
| PERFETTO_METATRACE_SCOPED(TAG_PRODUCER, PROFILER_READ_TICK); |
| |
| // Make a pass over all per-cpu readers. |
| uint64_t max_samples = ds.event_config.samples_per_tick_limit(); |
| bool more_records_available = false; |
| for (EventReader& reader : ds.per_cpu_readers) { |
| if (ReadAndParsePerCpuBuffer(&reader, max_samples, ds_id, &ds)) { |
| more_records_available = true; |
| } |
| } |
| |
| // Wake up the unwinder as we've (likely) pushed samples into its queue. |
| unwinding_worker_->PostProcessQueue(); |
| |
| if (PERFETTO_UNLIKELY(ds.status == DataSourceState::Status::kShuttingDown) && |
| !more_records_available) { |
| unwinding_worker_->PostInitiateDataSourceStop(ds_id); |
| } else { |
| // otherwise, keep reading |
| auto tick_period_ms = it->second.event_config.read_tick_period_ms(); |
| auto weak_this = weak_factory_.GetWeakPtr(); |
| task_runner_->PostDelayedTask( |
| [weak_this, ds_id] { |
| if (weak_this) |
| weak_this->TickDataSourceRead(ds_id); |
| }, |
| TimeToNextReadTickMs(ds_id, tick_period_ms)); |
| } |
| } |
| |
| bool PerfProducer::ReadAndParsePerCpuBuffer(EventReader* reader, |
| uint64_t max_samples, |
| DataSourceInstanceID ds_id, |
| DataSourceState* ds) { |
| PERFETTO_METATRACE_SCOPED(TAG_PRODUCER, PROFILER_READ_CPU); |
| |
| // If the kernel ring buffer dropped data, record it in the trace. |
| size_t cpu = reader->cpu(); |
| auto records_lost_callback = [this, ds_id, cpu](uint64_t records_lost) { |
| auto weak_this = weak_factory_.GetWeakPtr(); |
| task_runner_->PostTask([weak_this, ds_id, cpu, records_lost] { |
| if (weak_this) |
| weak_this->EmitRingBufferLoss(ds_id, cpu, records_lost); |
| }); |
| }; |
| |
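| // Drain up to |max_samples| records from this cpu's ring buffer, stopping |
| // early if we catch up to the kernel's write position. |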
| for (uint64_t i = 0; i < max_samples; i++) { |
| std::optional<ParsedSample> sample = |
| reader->ReadUntilSample(records_lost_callback); |
| if (!sample) { |
| return false; // caught up to the writer |
| } |
| |
| // Counter-only mode: skip the unwinding stage, serialise the sample |
| // immediately. |
| const EventConfig& event_config = ds->event_config; |
| if (!event_config.sample_callstacks()) { |
| CompletedSample output; |
| output.common = sample->common; |
| EmitSample(ds_id, std::move(output)); |
| continue; |
| } |
| |
| // Sampling either or both of userspace and kernel callstacks. |
| pid_t pid = sample->common.pid; |
| auto& process_state = ds->process_states[pid]; // insert if new |
| |
| // Asynchronous proc-fd lookup timed out. |
| if (process_state == ProcessTrackingStatus::kFdsTimedOut) { |
| PERFETTO_DLOG("Skipping sample for pid [%d]: kFdsTimedOut", |
| static_cast<int>(pid)); |
| EmitSkippedSample(ds_id, std::move(sample.value()), |
| SampleSkipReason::kReadStage); |
| continue; |
| } |
| |
| // Previously excluded, e.g. due to failing the target filter check. |
| if (process_state == ProcessTrackingStatus::kRejected) { |
| PERFETTO_DLOG("Skipping sample for pid [%d]: kRejected", |
| static_cast<int>(pid)); |
| continue; |
| } |
| |
| // Seeing pid for the first time. We need to consider whether the process |
| // is a kernel thread, and which callstacks we're recording. |
| // |
| // {user} stacks -> user processes: signal for proc-fd lookup |
| // -> kthreads: reject |
| // |
| // {kernel} stacks -> user processes: accept without proc-fds |
| // -> kthreads: accept without proc-fds |
| // |
| // {kernel+user} stacks -> user processes: signal for proc-fd lookup |
| // -> kthreads: accept without proc-fds |
| // |
| if (process_state == ProcessTrackingStatus::kInitial) { |
| PERFETTO_DLOG("New pid: [%d]", static_cast<int>(pid)); |
| |
| // Kernel threads (which have no userspace state) are never relevant if |
| // we're not recording kernel callchains. |
| bool is_kthread = !sample->regs; // no userspace regs |
| if (is_kthread && !event_config.kernel_frames()) { |
| process_state = ProcessTrackingStatus::kRejected; |
| continue; |
| } |
| |
| // Check whether samples for this new process should be dropped due to |
| // the target filtering. Kernel threads don't have a cmdline, but we |
| // still check against pid inclusion/exclusion. |
| if (ShouldRejectDueToFilter( |
| pid, event_config.filter(), is_kthread, &ds->additional_cmdlines, |
| [pid](std::string* cmdline) { |
| return glob_aware::ReadProcCmdlineForPID(pid, cmdline); |
| })) { |
| process_state = ProcessTrackingStatus::kRejected; |
| continue; |
| } |
| |
| // At this point, the sampled process is known to be of interest. |
| if (!is_kthread && event_config.user_frames()) { |
| // Start resolving the proc-fds. Response is async. |
| process_state = ProcessTrackingStatus::kFdsResolving; |
| InitiateDescriptorLookup(ds_id, pid, |
| event_config.remote_descriptor_timeout_ms()); |
| // note: fallthrough |
| } else { |
| // Either a kernel thread (no need to obtain proc-fds), or a userspace |
| // process but we're not recording userspace callstacks. |
| process_state = ProcessTrackingStatus::kAccepted; |
| unwinding_worker_->PostRecordNoUserspaceProcess(ds_id, pid); |
| // note: fallthrough |
| } |
| } |
| |
| PERFETTO_CHECK(process_state == ProcessTrackingStatus::kAccepted || |
| process_state == ProcessTrackingStatus::kFdsResolving); |
| |
| // If we're only interested in the kernel callchains, then userspace |
| // process samples are relevant only if they were sampled during kernel |
| // context. |
| if (!event_config.user_frames() && |
| sample->common.cpu_mode == PERF_RECORD_MISC_USER) { |
| PERFETTO_DLOG("Skipping usermode sample for kernel-only config"); |
| continue; |
| } |
| |
| // Optionally: drop the sample if adding it would push the footprint of |
| // stacks already waiting in the unwinding queue above the configured limit. |
| uint64_t max_footprint_bytes = event_config.max_enqueued_footprint_bytes(); |
| uint64_t sample_stack_size = sample->stack.size(); |
| if (max_footprint_bytes) { |
| uint64_t footprint_bytes = unwinding_worker_->GetEnqueuedFootprint(); |
| if (footprint_bytes + sample_stack_size >= max_footprint_bytes) { |
| PERFETTO_DLOG("Skipping sample enqueueing due to footprint limit."); |
| EmitSkippedSample(ds_id, std::move(sample.value()), |
| SampleSkipReason::kUnwindEnqueue); |
| continue; |
| } |
| } |
| |
| // Push the sample into the unwinding queue if there is room. |
| auto& queue = unwinding_worker_->unwind_queue(); |
| WriteView write_view = queue.BeginWrite(); |
| if (write_view.valid) { |
| queue.at(write_view.write_pos) = |
| UnwindEntry{ds_id, std::move(sample.value())}; |
| queue.CommitWrite(); |
| unwinding_worker_->IncrementEnqueuedFootprint(sample_stack_size); |
| } else { |
| PERFETTO_DLOG("Unwinder queue full, skipping sample"); |
| EmitSkippedSample(ds_id, std::move(sample.value()), |
| SampleSkipReason::kUnwindEnqueue); |
| } |
| } // for (i < max_samples) |
| |
| // Most likely there are more events in the kernel buffer, though we might be |
| // exactly on the boundary due to |max_samples|. |
| return true; |
| } |
| |
| // Note: first-fit makes descriptor request fulfillment not true FIFO. But the |
| // edge-cases where it matters are very unlikely. |
| void PerfProducer::OnProcDescriptors(pid_t pid, |
| uid_t uid, |
| base::ScopedFile maps_fd, |
| base::ScopedFile mem_fd) { |
| // Find first-fit data source that requested descriptors for the process. |
| for (auto& it : data_sources_) { |
| DataSourceState& ds = it.second; |
| auto proc_status_it = ds.process_states.find(pid); |
| if (proc_status_it == ds.process_states.end()) |
| continue; |
| |
| // TODO(rsavitski): consider checking ProcessTrackingStatus before |
| // CanProfile. |
| if (!CanProfile(ds.event_config.raw_ds_config(), uid, |
| ds.event_config.target_installed_by())) { |
| PERFETTO_DLOG("Not profileable: pid [%d], uid [%d] for DS [%zu]", |
| static_cast<int>(pid), static_cast<int>(uid), |
| static_cast<size_t>(it.first)); |
| continue; |
| } |
| |
| // Match against either resolving, or expired state. In the latter |
| // case, it means that the async response was slow enough that we've marked |
| // the lookup as expired (but can now recover for future samples). |
| auto proc_status = proc_status_it->second; |
| if (proc_status == ProcessTrackingStatus::kFdsResolving || |
| proc_status == ProcessTrackingStatus::kFdsTimedOut) { |
| PERFETTO_DLOG("Handing off proc-fds for pid [%d] to DS [%zu]", |
| static_cast<int>(pid), static_cast<size_t>(it.first)); |
| |
| proc_status_it->second = ProcessTrackingStatus::kAccepted; |
| unwinding_worker_->PostAdoptProcDescriptors( |
| it.first, pid, std::move(maps_fd), std::move(mem_fd)); |
| return; // done |
| } |
| } |
| PERFETTO_DLOG( |
| "Discarding proc-fds for pid [%d] as found no outstanding requests.", |
| static_cast<int>(pid)); |
| } |
| |
| void PerfProducer::InitiateDescriptorLookup(DataSourceInstanceID ds_id, |
| pid_t pid, |
| uint32_t timeout_ms) { |
| if (!proc_fd_getter_->RequiresDelayedRequest()) { |
| StartDescriptorLookup(ds_id, pid, timeout_ms); |
| return; |
| } |
| |
| // Delay lookups on Android. See comment on |kProcDescriptorsAndroidDelayMs|. |
| auto weak_this = weak_factory_.GetWeakPtr(); |
| task_runner_->PostDelayedTask( |
| [weak_this, ds_id, pid, timeout_ms] { |
| if (weak_this) |
| weak_this->StartDescriptorLookup(ds_id, pid, timeout_ms); |
| }, |
| kProcDescriptorsAndroidDelayMs); |
| } |
| |
| void PerfProducer::StartDescriptorLookup(DataSourceInstanceID ds_id, |
| pid_t pid, |
| uint32_t timeout_ms) { |
| proc_fd_getter_->GetDescriptorsForPid(pid); |
| |
| auto weak_this = weak_factory_.GetWeakPtr(); |
| task_runner_->PostDelayedTask( |
| [weak_this, ds_id, pid] { |
| if (weak_this) |
| weak_this->EvaluateDescriptorLookupTimeout(ds_id, pid); |
| }, |
| timeout_ms); |
| } |
| |
| void PerfProducer::EvaluateDescriptorLookupTimeout(DataSourceInstanceID ds_id, |
| pid_t pid) { |
| auto ds_it = data_sources_.find(ds_id); |
| if (ds_it == data_sources_.end()) |
| return; |
| |
| DataSourceState& ds = ds_it->second; |
| auto proc_status_it = ds.process_states.find(pid); |
| if (proc_status_it == ds.process_states.end()) |
| return; |
| |
| // If the request is still outstanding, mark the process as expired (causing |
| // outstanding and future samples to be discarded). |
| auto proc_status = proc_status_it->second; |
| if (proc_status == ProcessTrackingStatus::kFdsResolving) { |
| PERFETTO_DLOG("Descriptor lookup timeout of pid [%d] for DS [%zu]", |
| static_cast<int>(pid), static_cast<size_t>(ds_it->first)); |
| |
| proc_status_it->second = ProcessTrackingStatus::kFdsTimedOut; |
| // Also inform the unwinder of the state change (so that it can discard any |
| // of the already-enqueued samples). |
| unwinding_worker_->PostRecordTimedOutProcDescriptors(ds_id, pid); |
| } |
| } |
| |
| void PerfProducer::PostEmitSample(DataSourceInstanceID ds_id, |
| CompletedSample sample) { |
| // hack: c++11 lambdas can't capture by move, so stash the sample on the heap. |
| CompletedSample* raw_sample = new CompletedSample(std::move(sample)); |
| auto weak_this = weak_factory_.GetWeakPtr(); |
| task_runner_->PostTask([weak_this, ds_id, raw_sample] { |
| if (weak_this) |
| weak_this->EmitSample(ds_id, std::move(*raw_sample)); |
| delete raw_sample; |
| }); |
| } |
| |
| void PerfProducer::EmitSample(DataSourceInstanceID ds_id, |
| CompletedSample sample) { |
| auto ds_it = data_sources_.find(ds_id); |
| if (ds_it == data_sources_.end()) { |
| PERFETTO_DLOG("EmitSample(ds: %zu): source gone", |
| static_cast<size_t>(ds_id)); |
| return; |
| } |
| DataSourceState& ds = ds_it->second; |
| |
| // intern callsite |
| GlobalCallstackTrie::Node* callstack_root = |
| callstack_trie_.CreateCallsite(sample.frames, sample.build_ids); |
| uint64_t callstack_iid = callstack_root->id(); |
| |
| // start packet, timestamp domain defaults to monotonic_raw |
| auto packet = StartTracePacket(ds.trace_writer.get()); |
| packet->set_timestamp(sample.common.timestamp); |
| |
| // write new interning data (if any) |
| protos::pbzero::InternedData* interned_out = packet->set_interned_data(); |
| ds.interning_output.WriteCallstack(callstack_root, &callstack_trie_, |
| interned_out); |
| |
| // write the sample itself |
| auto* perf_sample = packet->set_perf_sample(); |
| perf_sample->set_cpu(sample.common.cpu); |
| perf_sample->set_pid(static_cast<uint32_t>(sample.common.pid)); |
| perf_sample->set_tid(static_cast<uint32_t>(sample.common.tid)); |
| perf_sample->set_cpu_mode(ToCpuModeEnum(sample.common.cpu_mode)); |
| perf_sample->set_timebase_count(sample.common.timebase_count); |
| |
| for (size_t i = 0; i < sample.common.follower_counts.size(); ++i) { |
| perf_sample->add_follower_counts(sample.common.follower_counts[i]); |
| } |
| |
| perf_sample->set_callstack_iid(callstack_iid); |
| if (sample.unwind_error != unwindstack::ERROR_NONE) { |
| perf_sample->set_unwind_error(ToProtoEnum(sample.unwind_error)); |
| } |
| } |
| |
| void PerfProducer::EmitRingBufferLoss(DataSourceInstanceID ds_id, |
| size_t cpu, |
| uint64_t records_lost) { |
| auto ds_it = data_sources_.find(ds_id); |
| if (ds_it == data_sources_.end()) |
| return; |
| DataSourceState& ds = ds_it->second; |
| PERFETTO_DLOG("DataSource(%zu): cpu%zu lost [%" PRIu64 "] records", |
| static_cast<size_t>(ds_id), cpu, records_lost); |
| |
| // The data loss record relates to a single ring buffer, and indicates loss |
| // since the last successfully-written record in that buffer. Therefore the |
| // data loss record itself has no timestamp. |
| // We timestamp the packet with the boot clock for packet ordering purposes, |
| // but it no longer has a (precise) interpretation relative to the sample |
| // stream from that per-cpu buffer. See the proto comments for more details. |
| auto packet = StartTracePacket(ds.trace_writer.get()); |
| packet->set_timestamp(static_cast<uint64_t>(base::GetBootTimeNs().count())); |
| packet->set_timestamp_clock_id( |
| protos::pbzero::BuiltinClock::BUILTIN_CLOCK_BOOTTIME); |
| |
| auto* perf_sample = packet->set_perf_sample(); |
| perf_sample->set_cpu(static_cast<uint32_t>(cpu)); |
| perf_sample->set_kernel_records_lost(records_lost); |
| } |
| |
| void PerfProducer::PostEmitUnwinderSkippedSample(DataSourceInstanceID ds_id, |
| ParsedSample sample) { |
| PostEmitSkippedSample(ds_id, std::move(sample), |
| SampleSkipReason::kUnwindStage); |
| } |
| |
| void PerfProducer::PostEmitSkippedSample(DataSourceInstanceID ds_id, |
| ParsedSample sample, |
| SampleSkipReason reason) { |
| // hack: c++11 lambdas can't capture by move, so stash the sample on the heap. |
| ParsedSample* raw_sample = new ParsedSample(std::move(sample)); |
| auto weak_this = weak_factory_.GetWeakPtr(); |
| task_runner_->PostTask([weak_this, ds_id, raw_sample, reason] { |
| if (weak_this) |
| weak_this->EmitSkippedSample(ds_id, std::move(*raw_sample), reason); |
| delete raw_sample; |
| }); |
| } |
| |
| void PerfProducer::EmitSkippedSample(DataSourceInstanceID ds_id, |
| ParsedSample sample, |
| SampleSkipReason reason) { |
| auto ds_it = data_sources_.find(ds_id); |
| if (ds_it == data_sources_.end()) |
| return; |
| DataSourceState& ds = ds_it->second; |
| |
| // Note: timestamp defaults to the monotonic_raw domain. |
| auto packet = StartTracePacket(ds.trace_writer.get()); |
| packet->set_timestamp(sample.common.timestamp); |
| auto* perf_sample = packet->set_perf_sample(); |
| perf_sample->set_cpu(sample.common.cpu); |
| perf_sample->set_pid(static_cast<uint32_t>(sample.common.pid)); |
| perf_sample->set_tid(static_cast<uint32_t>(sample.common.tid)); |
| perf_sample->set_cpu_mode(ToCpuModeEnum(sample.common.cpu_mode)); |
| perf_sample->set_timebase_count(sample.common.timebase_count); |
| |
| for (size_t i = 0; i < sample.common.follower_counts.size(); ++i) { |
| perf_sample->add_follower_counts(sample.common.follower_counts[i]); |
| } |
| |
| using PerfSample = protos::pbzero::PerfSample; |
| switch (reason) { |
| case SampleSkipReason::kReadStage: |
| perf_sample->set_sample_skipped_reason( |
| PerfSample::PROFILER_SKIP_READ_STAGE); |
| break; |
| case SampleSkipReason::kUnwindEnqueue: |
| perf_sample->set_sample_skipped_reason( |
| PerfSample::PROFILER_SKIP_UNWIND_ENQUEUE); |
| break; |
| case SampleSkipReason::kUnwindStage: |
| perf_sample->set_sample_skipped_reason( |
| PerfSample::PROFILER_SKIP_UNWIND_STAGE); |
| break; |
| } |
| } |
| |
| void PerfProducer::InitiateReaderStop(DataSourceState* ds) { |
| PERFETTO_DLOG("InitiateReaderStop"); |
| PERFETTO_CHECK(ds->status != DataSourceState::Status::kShuttingDown); |
| |
| ds->status = DataSourceState::Status::kShuttingDown; |
| for (auto& event_reader : ds->per_cpu_readers) { |
| event_reader.DisableEvents(); |
| } |
| } |
| |
| void PerfProducer::PostFinishDataSourceStop(DataSourceInstanceID ds_id) { |
| auto weak_producer = weak_factory_.GetWeakPtr(); |
| task_runner_->PostTask([weak_producer, ds_id] { |
| if (weak_producer) |
| weak_producer->FinishDataSourceStop(ds_id); |
| }); |
| } |
| |
| void PerfProducer::FinishDataSourceStop(DataSourceInstanceID ds_id) { |
| PERFETTO_LOG("FinishDataSourceStop(%zu)", static_cast<size_t>(ds_id)); |
| auto ds_it = data_sources_.find(ds_id); |
| if (ds_it == data_sources_.end()) { |
| PERFETTO_DLOG("FinishDataSourceStop(%zu): source gone", |
| static_cast<size_t>(ds_id)); |
| return; |
| } |
| DataSourceState& ds = ds_it->second; |
| PERFETTO_CHECK(ds.status == DataSourceState::Status::kShuttingDown); |
| |
| ds.trace_writer->Flush(); |
| data_sources_.erase(ds_it); |
| |
| endpoint_->NotifyDataSourceStopped(ds_id); |
| |
| // Clean up resources if there are no more active sources. |
| if (data_sources_.empty()) { |
| callstack_trie_.ClearTrie(); // purge internings |
| base::MaybeReleaseAllocatorMemToOS(); |
| } |
| } |
| |
| // TODO(rsavitski): maybe make the tracing service respect premature |
| // producer-driven stops, and then issue a NotifyDataSourceStopped here. |
| // Alternatively (and at the expense of higher complexity) introduce a new data |
| // source status of "tombstoned", and propagate it until the source is stopped |
| // by the service (this would technically allow for stricter lifetime checking |
| // of data sources, and help with discarding periodic flushes). |
| // TODO(rsavitski): Purging while stopping will currently leave the stop |
| // unacknowledged. Consider checking whether the DS is stopping here, and if so, |
| // notifying immediately after erasing. |
| void PerfProducer::PurgeDataSource(DataSourceInstanceID ds_id) { |
| auto ds_it = data_sources_.find(ds_id); |
| if (ds_it == data_sources_.end()) |
| return; |
| DataSourceState& ds = ds_it->second; |
| |
| PERFETTO_LOG("Stopping DataSource(%zu) prematurely", |
| static_cast<size_t>(ds_id)); |
| |
| unwinding_worker_->PostPurgeDataSource(ds_id); |
| |
| // Write a packet indicating the abrupt stop. |
| { |
| auto packet = StartTracePacket(ds.trace_writer.get()); |
| packet->set_timestamp(static_cast<uint64_t>(base::GetBootTimeNs().count())); |
| packet->set_timestamp_clock_id( |
| protos::pbzero::BuiltinClock::BUILTIN_CLOCK_BOOTTIME); |
| auto* perf_sample = packet->set_perf_sample(); |
| auto* producer_event = perf_sample->set_producer_event(); |
| producer_event->set_source_stop_reason( |
| protos::pbzero::PerfSample::ProducerEvent::PROFILER_STOP_GUARDRAIL); |
| } |
| |
| ds.trace_writer->Flush(); |
| data_sources_.erase(ds_it); |
| |
| // Clean up resources if there are no more active sources. |
| if (data_sources_.empty()) { |
| callstack_trie_.ClearTrie(); // purge internings |
| base::MaybeReleaseAllocatorMemToOS(); |
| } |
| } |
| |
| // Either: |
| // * choose a random shard in [0, shard_count), or |
| // * reuse a choice made previously by a data source within this tracing |
| // session. The config option requires all data sources within one tracing |
| // session to use the same shard count. |
| std::optional<ProcessSharding> PerfProducer::GetOrChooseCallstackProcessShard( |
| uint64_t tracing_session_id, |
| uint32_t shard_count) { |
| for (auto& it : data_sources_) { |
| const DataSourceState& ds = it.second; |
| const auto& sharding = ds.event_config.filter().process_sharding; |
| if ((ds.tracing_session_id != tracing_session_id) || !sharding.has_value()) |
| continue; |
| |
| // Found existing data source, reuse its decision while doing best-effort |
| // error reporting (logging) if the shard count is not the same. |
| if (sharding->shard_count != shard_count) { |
| PERFETTO_ELOG( |
| "Mismatch of process_shard_count between data sources in tracing " |
| "session %" PRIu64 ". Overriding shard count to match.", |
| tracing_session_id); |
| } |
| return sharding; |
| } |
| |
| // First data source in this session, choose random shard. |
| std::random_device r; |
| std::minstd_rand minstd(r()); |
| std::uniform_int_distribution<uint32_t> dist(0, shard_count - 1); |
| uint32_t chosen_shard = dist(minstd); |
| |
| ProcessSharding ret; |
| ret.shard_count = shard_count; |
| ret.chosen_shard = chosen_shard; |
| |
| PERFETTO_DCHECK(ret.shard_count && ret.chosen_shard < ret.shard_count); |
| return ret; |
| } |
| |
| void PerfProducer::StartMetatraceSource(DataSourceInstanceID ds_id, |
| BufferID target_buffer) { |
| auto writer = endpoint_->CreateTraceWriter(target_buffer); |
| |
| auto it_and_inserted = metatrace_writers_.emplace( |
| std::piecewise_construct, std::make_tuple(ds_id), std::make_tuple()); |
| PERFETTO_DCHECK(it_and_inserted.second); |
| // Note: only the first concurrent writer will actually be active. |
| metatrace_writers_[ds_id].Enable(task_runner_, std::move(writer), |
| metatrace::TAG_ANY); |
| } |
| |
| void PerfProducer::ConnectWithRetries(const char* socket_name) { |
| PERFETTO_DCHECK(state_ == kNotStarted); |
| state_ = kNotConnected; |
| |
| ResetConnectionBackoff(); |
| producer_socket_name_ = socket_name; |
| ConnectService(); |
| } |
| |
| void PerfProducer::ConnectService() { |
| PERFETTO_DCHECK(state_ == kNotConnected); |
| state_ = kConnecting; |
| endpoint_ = ProducerIPCClient::Connect( |
| producer_socket_name_, this, kProducerName, task_runner_, |
| TracingService::ProducerSMBScrapingMode::kEnabled); |
| } |
| |
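| // Exponential backoff for reconnection attempts: the delay doubles on each |
| // failed attempt, capped at kMaxConnectionBackoffMs, and is reset once a |
| // connection succeeds. |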
| void PerfProducer::IncreaseConnectionBackoff() { |
| connection_backoff_ms_ *= 2; |
| if (connection_backoff_ms_ > kMaxConnectionBackoffMs) |
| connection_backoff_ms_ = kMaxConnectionBackoffMs; |
| } |
| |
| void PerfProducer::ResetConnectionBackoff() { |
| connection_backoff_ms_ = kInitialConnectionBackoffMs; |
| } |
| |
| void PerfProducer::OnConnect() { |
| PERFETTO_DCHECK(state_ == kConnecting); |
| state_ = kConnected; |
| ResetConnectionBackoff(); |
| PERFETTO_LOG("Connected to the service"); |
| |
| { |
| // linux.perf |
| DataSourceDescriptor desc; |
| desc.set_name(kDataSourceName); |
| desc.set_handles_incremental_state_clear(true); |
| desc.set_will_notify_on_stop(true); |
| endpoint_->RegisterDataSource(desc); |
| } |
| { |
| // metatrace |
| DataSourceDescriptor desc; |
| desc.set_name(MetatraceWriter::kDataSourceName); |
| endpoint_->RegisterDataSource(desc); |
| } |
| // Used by tracebox to synchronize with traced_perf being registered. |
| if (all_data_sources_registered_cb_) { |
| endpoint_->Sync(all_data_sources_registered_cb_); |
| } |
| } |
| |
| void PerfProducer::OnDisconnect() { |
| PERFETTO_DCHECK(state_ == kConnected || state_ == kConnecting); |
| PERFETTO_LOG("Disconnected from tracing service"); |
| |
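| // A disconnect after a successful connection means the service went away, so |
| // restart the producer from scratch. A disconnect while still connecting is a |
| // failed attempt, so retry with backoff. |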
| auto weak_producer = weak_factory_.GetWeakPtr(); |
| if (state_ == kConnected) |
| return task_runner_->PostTask([weak_producer] { |
| if (weak_producer) |
| weak_producer->Restart(); |
| }); |
| |
| state_ = kNotConnected; |
| IncreaseConnectionBackoff(); |
| task_runner_->PostDelayedTask( |
| [weak_producer] { |
| if (weak_producer) |
| weak_producer->ConnectService(); |
| }, |
| connection_backoff_ms_); |
| } |
| |
| void PerfProducer::Restart() { |
| // We lost the connection with the tracing service, so all data sources need |
| // to be reset. Doing that manually would be error-prone, so we simply destroy |
| // this instance and recreate it in place. |
| base::TaskRunner* task_runner = task_runner_; |
| const char* socket_name = producer_socket_name_; |
| ProcDescriptorGetter* proc_fd_getter = proc_fd_getter_; |
| |
| // Invoke destructor and then the constructor again. |
| this->~PerfProducer(); |
| new (this) PerfProducer(proc_fd_getter, task_runner); |
| |
| ConnectWithRetries(socket_name); |
| } |
| |
| } // namespace profiling |
| } // namespace perfetto |