traced_perf: move unwinding to a dedicated thread
This splits the single-threaded traced_perf into two threads:
* a primary thread that handles the initial kernel buffer reading, the
final interning/serialization, and IPC.
* an unwinder thread that sits between those two stages in the dataflow.
The reasoning is that unwinding latency is very long-tailed, and we don't
want to starve the producer's kernel buffer reading and IPC while stuck
in a 1s-long unwind on Android.
The unwinder uses a ring queue for the input samples, and is woken up by
the primary thread after it pushes a batch of samples. Once a sample is
unwound, it's posted directly back to the primary thread (we might need
to batch here as well if we don't want many staggered wakeups).
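As a rough illustration of the ring queue described above, here is a minimal single-writer/single-reader sketch. Only the write-side shape (BeginWrite, CommitWrite, at, and a WriteView with valid and write_pos) mirrors the call site in perf_producer.cc in this patch. The class name, the read-side methods, and all internals are assumptions, since unwind_queue.h is not part of the shown diff.

```cpp
#include <array>
#include <atomic>
#include <cassert>
#include <cstddef>
#include <cstdint>

// Hypothetical stand-in for the queue in unwind_queue.h (not shown here).
struct WriteView {
  bool valid;          // false if the queue is full
  uint64_t write_pos;  // slot to fill via at() when valid
};

struct ReadView {
  uint64_t read_pos;   // first unconsumed position
  uint64_t write_pos;  // one past the last committed position
};

template <typename T, uint32_t QueueSize>
class SketchRingQueue {
 public:
  static_assert(QueueSize != 0 && (QueueSize & (QueueSize - 1)) == 0,
                "QueueSize must be a power of two");

  // Positions grow monotonically; the slot index is the position modulo size.
  T& at(uint64_t pos) { return data_[pos % QueueSize]; }

  // Writer (primary thread) only: reserve the next slot if there is room.
  WriteView BeginWrite() {
    uint64_t rd = rd_pos_.load(std::memory_order_acquire);
    uint64_t wr = wr_pos_.load(std::memory_order_relaxed);
    if (wr - rd >= QueueSize)
      return WriteView{false, 0};  // full: the caller drops the sample
    return WriteView{true, wr};
  }

  // Writer only: publish the slot that was filled after BeginWrite().
  void CommitWrite() { wr_pos_.fetch_add(1, std::memory_order_release); }

  // Reader (unwinder thread) only: snapshot the readable range.
  ReadView BeginRead() {
    return ReadView{rd_pos_.load(std::memory_order_relaxed),
                    wr_pos_.load(std::memory_order_acquire)};
  }

  // Reader only: release all slots before |pos| back to the writer.
  void CommitNewReadPosition(uint64_t pos) {
    assert(pos <= wr_pos_.load(std::memory_order_relaxed));
    rd_pos_.store(pos, std::memory_order_release);
  }

 private:
  std::array<T, QueueSize> data_{};
  std::atomic<uint64_t> wr_pos_{0};
  std::atomic<uint64_t> rd_pos_{0};
};
```

In this sketch a full queue makes BeginWrite() return an invalid view, which corresponds to the "silently drop" behaviour flagged by the TODO in ReadAndParsePerCpuBuffer.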
Note on the unwinding queue: this approach enqueues parsed samples
(with complex members such as a unique_ptr and a vector). An alternative
considered was to keep the queue entries in their original kernel
format (so a direct memcpy from the kernel ring buffer). I can see a
variety of pros and cons for both approaches (which I won't summarize
here), but ultimately decided on keeping the early parsing into a
"complex" type, and dealing with just that type post-EventReader.
Pid-tracking is done mostly on the primary thread, but a subset of the
updates (essentially ready vs expired) is replicated to the Unwinder,
which acts as a listener and does not push any updates of its own.
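To make the replication concrete, here is a small sketch of what the unwinder-side view of pid state could look like. All names below are hypothetical. The patch itself only shows the primary thread posting PostAdoptProcDescriptors and PostRecordTimedOutProcDescriptors, so the classification logic is an assumption modelled on the removed ProcessUnwindQueue.

```cpp
#include <cassert>
#include <cstdint>
#include <map>

// Hypothetical: the two states the unwinder learns about from the primary
// thread. Anything it hasn't heard about yet is treated as still resolving.
enum class SketchPidStatus { kResolved, kExpired };

// Hypothetical: what to do with a queued sample for a given pid.
enum class SketchAction { kKeepWaiting, kUnwind, kSkip };

class SketchPidListener {
 public:
  // Primary thread reported that the proc-fds for |pid| are available.
  void OnDescriptorsAdopted(int32_t pid) {
    assert(pid > 0);
    states_[pid] = SketchPidStatus::kResolved;
  }

  // Primary thread reported that the proc-fd lookup for |pid| timed out.
  void OnDescriptorsTimedOut(int32_t pid) {
    assert(pid > 0);
    states_[pid] = SketchPidStatus::kExpired;
  }

  // The listener never originates a state change, it only reads its copy.
  SketchAction Classify(int32_t pid) const {
    auto it = states_.find(pid);
    if (it == states_.end())
      return SketchAction::kKeepWaiting;  // no update yet: keep in queue
    return it->second == SketchPidStatus::kResolved ? SketchAction::kUnwind
                                                    : SketchAction::kSkip;
  }

 private:
  std::map<int32_t, SketchPidStatus> states_;
};
```

The rejected and initial states never reach this view in the sketch, since the primary thread filters those samples out before enqueueing them.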
I considered two approaches for the unwinding queues: a single shared
queue (as posted), and per-DataSource queues that would be created by
the primary thread, and adopted by the unwinder. There's an argument
that separate queues would be fairer when there are concurrent data
sources and the load is too high. On the other hand, we will still
ultimately want a process-wide cap on the number of in-flight samples, so
a single queue shortcuts to that (at the expense of some fairness).
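The fairness trade-off can be illustrated with a toy model. This is a sketch under simplifying assumptions: it uses a std::deque with a fixed cap rather than the actual ring queue, and every name in it is hypothetical. It shows that a single shared queue enforces one process-wide cap, but lets a busy data source crowd out a quiet one.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <deque>

// Hypothetical queue entry, tagged with the owning data source.
struct SketchEntry {
  uint64_t data_source_id;
  uint64_t sample_id;
};

// Toy shared queue: one cap for all data sources in the process.
class SketchSharedQueue {
 public:
  explicit SketchSharedQueue(size_t cap) : cap_(cap) { assert(cap > 0); }

  // Returns false (sample dropped) once the process-wide cap is reached.
  bool TryPush(SketchEntry entry) {
    if (entries_.size() >= cap_)
      return false;
    entries_.push_back(entry);
    return true;
  }

  // Number of queued entries that belong to |ds_id|.
  size_t CountFor(uint64_t ds_id) const {
    size_t count = 0;
    for (const SketchEntry& e : entries_)
      count += (e.data_source_id == ds_id) ? 1 : 0;
    return count;
  }

 private:
  size_t cap_;
  std::deque<SketchEntry> entries_;
};

// Pushes |n| samples for |ds_id| and returns how many were accepted.
inline size_t SketchPushBurst(SketchSharedQueue* queue,
                              uint64_t ds_id,
                              size_t n) {
  size_t accepted = 0;
  for (size_t i = 0; i < n; i++)
    accepted += queue->TryPush(SketchEntry{ds_id, i}) ? 1 : 0;
  return accepted;
}
```

With per-DataSource queues, the second source would keep its own capacity, but the total number of in-flight samples would then need a separate process-wide limit.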
UnwindingHandle is a temporary copy-paste. I'm hoping to get rid of it
within a week (but it's a separate conversation on
base::ThreadTaskRunner API that I don't want blocking this patch).
Unwindstack caching is removed temporarily, since during reconnects we
might be moving unwinding between threads while recreating the
Unwinder. This will be fixed in a follow-up.
Note to reviewer: I'm not very confident about most of the file/class
naming choices. Please criticize the inconsistencies without reservation.
Bug: 144281346
Change-Id: I4f59d1b4d52cf589fbe60e78ad4c1ee0b9994c0a
diff --git a/Android.bp b/Android.bp
index 6f56224..40057d4 100644
--- a/Android.bp
+++ b/Android.bp
@@ -5926,6 +5926,11 @@
],
}
+// GN: //src/profiling/perf:common_types
+filegroup {
+ name: "perfetto_src_profiling_perf_common_types",
+}
+
// GN: //src/profiling/perf:proc_descriptors
filegroup {
name: "perfetto_src_profiling_perf_proc_descriptors",
@@ -5971,6 +5976,9 @@
// GN: //src/profiling/perf:unwinding
filegroup {
name: "perfetto_src_profiling_perf_unwinding",
+ srcs: [
+ "src/profiling/perf/unwinding.cc",
+ ],
}
// GN: //src/profiling/symbolizer:symbolize_database
@@ -7239,6 +7247,7 @@
":perfetto_src_profiling_memory_scoped_spinlock",
":perfetto_src_profiling_memory_unittests",
":perfetto_src_profiling_memory_wire_protocol",
+ ":perfetto_src_profiling_perf_common_types",
":perfetto_src_profiling_perf_proc_descriptors",
":perfetto_src_profiling_perf_producer",
":perfetto_src_profiling_perf_producer_unittests",
@@ -7749,10 +7758,12 @@
":perfetto_src_profiling_common_interning_output",
":perfetto_src_profiling_common_proc_utils",
":perfetto_src_profiling_common_unwind_support",
+ ":perfetto_src_profiling_perf_common_types",
":perfetto_src_profiling_perf_proc_descriptors",
":perfetto_src_profiling_perf_producer",
":perfetto_src_profiling_perf_regs_parsing",
":perfetto_src_profiling_perf_traced_perf_main",
+ ":perfetto_src_profiling_perf_unwinding",
":perfetto_src_protozero_protozero",
":perfetto_src_tracing_common",
":perfetto_src_tracing_core_core",
diff --git a/src/profiling/common/unwind_support.cc b/src/profiling/common/unwind_support.cc
index cb791bc..ebe965f 100644
--- a/src/profiling/common/unwind_support.cc
+++ b/src/profiling/common/unwind_support.cc
@@ -29,7 +29,7 @@
StackOverlayMemory::StackOverlayMemory(std::shared_ptr<unwindstack::Memory> mem,
uint64_t sp,
- uint8_t* stack,
+ const uint8_t* stack,
size_t size)
: mem_(std::move(mem)), sp_(sp), stack_end_(sp + size), stack_(stack) {}
diff --git a/src/profiling/common/unwind_support.h b/src/profiling/common/unwind_support.h
index 732531c..76a2652 100644
--- a/src/profiling/common/unwind_support.h
+++ b/src/profiling/common/unwind_support.h
@@ -89,15 +89,15 @@
public:
StackOverlayMemory(std::shared_ptr<unwindstack::Memory> mem,
uint64_t sp,
- uint8_t* stack,
+ const uint8_t* stack,
size_t size);
size_t Read(uint64_t addr, void* dst, size_t size) override;
private:
std::shared_ptr<unwindstack::Memory> mem_;
- uint64_t sp_;
- uint64_t stack_end_;
- uint8_t* stack_;
+ const uint64_t sp_;
+ const uint64_t stack_end_;
+ const uint8_t* const stack_;
};
struct UnwindingMetadata {
diff --git a/src/profiling/perf/BUILD.gn b/src/profiling/perf/BUILD.gn
index dcf57d7..3425945 100644
--- a/src/profiling/perf/BUILD.gn
+++ b/src/profiling/perf/BUILD.gn
@@ -42,7 +42,9 @@
source_set("producer") {
public_deps = [
+ ":common_types",
":regs_parsing",
+ ":unwinding",
"../../../include/perfetto/tracing/core",
"../../../src/tracing/core:service", # for metatrace
]
@@ -61,7 +63,6 @@
"../common:interner",
"../common:interning_output",
"../common:proc_utils",
- "../common:unwind_support",
]
sources = [
"event_config.h",
@@ -72,6 +73,34 @@
]
}
+source_set("common_types") {
+ public_deps = [ "../../../gn:libunwindstack" ]
+ deps = [
+ "../../../gn:default_deps",
+ "../common:unwind_support",
+ ]
+ sources = [ "common_types.h" ]
+}
+
+source_set("unwinding") {
+ public_deps = [
+ "../../../gn:libunwindstack",
+ "../../../src/tracing/core:service", # for metatrace
+ ]
+ deps = [
+ ":common_types",
+ "../../../gn:default_deps",
+ "../../../include/perfetto/ext/tracing/core",
+ "../../../src/base",
+ "../common:unwind_support",
+ ]
+ sources = [
+ "unwind_queue.h",
+ "unwinding.cc",
+ "unwinding.h",
+ ]
+}
+
source_set("regs_parsing") {
public_deps = [ "../../../gn:libunwindstack" ]
deps = [
@@ -96,14 +125,6 @@
]
}
-source_set("unwinding") {
- deps = [
- "../../../gn:default_deps",
- "../../../src/base",
- ]
- sources = [ "unwind_queue.h" ]
-}
-
source_set("producer_unittests") {
testonly = true
deps = [
diff --git a/src/profiling/perf/common_types.h b/src/profiling/perf/common_types.h
new file mode 100644
index 0000000..9ed0f41
--- /dev/null
+++ b/src/profiling/perf/common_types.h
@@ -0,0 +1,87 @@
+/*
+ * Copyright (C) 2020 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef SRC_PROFILING_PERF_COMMON_TYPES_H_
+#define SRC_PROFILING_PERF_COMMON_TYPES_H_
+
+#include <memory>
+#include <vector>
+
+#include <linux/perf_event.h>
+#include <stdint.h>
+
+#include <unwindstack/Error.h>
+#include <unwindstack/Regs.h>
+
+#include "src/profiling/common/unwind_support.h"
+
+namespace perfetto {
+namespace profiling {
+
+// A parsed perf sample record (PERF_RECORD_SAMPLE from the kernel buffer).
+// Self-contained, used as input to the callstack unwinding.
+struct ParsedSample {
+ uint16_t cpu_mode = PERF_RECORD_MISC_CPUMODE_UNKNOWN;
+ uint32_t cpu = 0;
+ pid_t pid = 0;
+ pid_t tid = 0;
+ uint64_t timestamp = 0;
+ std::unique_ptr<unwindstack::Regs> regs;
+ std::vector<char> stack;
+};
+
+// Entry in an unwinding queue. Either a sample that requires unwinding, or a
+// tombstoned entry (valid == false).
+struct UnwindEntry {
+ UnwindEntry() = default; // for initial unwinding queue entries' state
+
+ UnwindEntry(uint64_t _data_source_id, ParsedSample _sample)
+ : valid(true),
+ data_source_id(_data_source_id),
+ sample(std::move(_sample)) {}
+
+ bool valid = false;
+ uint64_t data_source_id = 0;
+ ParsedSample sample;
+};
+
+// Fully processed sample that is ready for output.
+struct CompletedSample {
+ // move-only
+ CompletedSample() = default;
+ CompletedSample(const CompletedSample&) = delete;
+ CompletedSample& operator=(const CompletedSample&) = delete;
+ CompletedSample(CompletedSample&&) = default;
+ CompletedSample& operator=(CompletedSample&&) = default;
+
+ uint16_t cpu_mode = PERF_RECORD_MISC_CPUMODE_UNKNOWN;
+ uint32_t cpu = 0;
+ pid_t pid = 0;
+ pid_t tid = 0;
+ uint64_t timestamp = 0;
+ std::vector<FrameData> frames;
+ unwindstack::ErrorCode unwind_error = unwindstack::ERROR_NONE;
+};
+
+enum class ProfilerStage {
+ kRead = 0,
+ kUnwind,
+};
+
+} // namespace profiling
+} // namespace perfetto
+
+#endif // SRC_PROFILING_PERF_COMMON_TYPES_H_
diff --git a/src/profiling/perf/event_reader.cc b/src/profiling/perf/event_reader.cc
index 90edfe1..701991e 100644
--- a/src/profiling/perf/event_reader.cc
+++ b/src/profiling/perf/event_reader.cc
@@ -249,10 +249,6 @@
return base::nullopt; // caught up with the writer
auto* event_hdr = reinterpret_cast<const perf_event_header*>(event);
- PERFETTO_DLOG("record header: [%zu][%zu][%zu]",
- static_cast<size_t>(event_hdr->type),
- static_cast<size_t>(event_hdr->misc),
- static_cast<size_t>(event_hdr->size));
if (event_hdr->type == PERF_RECORD_SAMPLE) {
ParsedSample sample = ParseSampleRecord(cpu_, event);
@@ -333,7 +329,6 @@
if (event_attr_.sample_type & PERF_SAMPLE_STACK_USER) {
uint64_t max_stack_size; // the requested size
parse_pos = ReadValue(&max_stack_size, parse_pos);
- PERFETTO_DLOG("max_stack_size: %" PRIu64 "", max_stack_size);
const char* stack_start = parse_pos;
parse_pos += max_stack_size; // skip to dyn_size
@@ -343,7 +338,8 @@
if (max_stack_size > 0) {
uint64_t filled_stack_size;
parse_pos = ReadValue(&filled_stack_size, parse_pos);
- PERFETTO_DLOG("filled_stack_size: %" PRIu64 "", filled_stack_size);
+ PERFETTO_DLOG("sampled stack size: %" PRIu64 " / %" PRIu64 "",
+ filled_stack_size, max_stack_size);
// copy stack bytes into a vector
size_t payload_sz = static_cast<size_t>(filled_stack_size);
diff --git a/src/profiling/perf/event_reader.h b/src/profiling/perf/event_reader.h
index fe67a4e..a72e674 100644
--- a/src/profiling/perf/event_reader.h
+++ b/src/profiling/perf/event_reader.h
@@ -25,6 +25,7 @@
#include "perfetto/ext/base/optional.h"
#include "perfetto/ext/base/scoped_file.h"
#include "perfetto/ext/tracing/core/basic_types.h"
+#include "src/profiling/perf/common_types.h"
#include "src/profiling/perf/event_config.h"
namespace perfetto {
@@ -68,16 +69,6 @@
alignas(uint64_t) char reconstructed_record_[kMaxPerfRecordSize];
};
-struct ParsedSample {
- uint32_t cpu = 0;
- pid_t pid = 0;
- pid_t tid = 0;
- uint64_t timestamp = 0;
- uint16_t cpu_mode = PERF_RECORD_MISC_CPUMODE_UNKNOWN;
- std::unique_ptr<unwindstack::Regs> regs;
- std::vector<char> stack;
-};
-
class EventReader {
public:
// Allow base::Optional<EventReader> without making the constructor public.
diff --git a/src/profiling/perf/perf_producer.cc b/src/profiling/perf/perf_producer.cc
index d8b1208..21b64ab 100644
--- a/src/profiling/perf/perf_producer.cc
+++ b/src/profiling/perf/perf_producer.cc
@@ -36,6 +36,7 @@
#include "src/profiling/common/callstack_trie.h"
#include "src/profiling/common/proc_utils.h"
#include "src/profiling/common/unwind_support.h"
+#include "src/profiling/perf/common_types.h"
#include "src/profiling/perf/event_reader.h"
#include "protos/perfetto/config/profiling/perf_event_config.pbzero.h"
@@ -46,18 +47,13 @@
namespace profiling {
namespace {
-// TODO(rsavitski): for low sampling rates, look into epoll to detect samples.
constexpr uint32_t kReadTickPeriodMs = 200;
-constexpr uint32_t kUnwindTickPeriodMs = 200;
// TODO(rsavitski): this is better calculated (at setup) from the buffer and
// sample sizes.
-constexpr size_t kMaxSamplesPerCpuPerReadTick = 32;
-// TODO(rsavitski): consider making this part of the config (for slow testing
-// platforms).
+constexpr size_t kMaxSamplesPerCpuPerReadTick = 64;
+// TODO(rsavitski): make this part of the config (for slow test platforms).
constexpr uint32_t kProcDescriptorTimeoutMs = 400;
-constexpr size_t kUnwindingMaxFrames = 1000;
-
constexpr uint32_t kInitialConnectionBackoffMs = 100;
constexpr uint32_t kMaxConnectionBackoffMs = 30 * 1000;
@@ -147,15 +143,9 @@
base::TaskRunner* task_runner)
: task_runner_(task_runner),
proc_fd_getter_(proc_fd_getter),
+ unwinding_worker_(this),
weak_factory_(this) {
proc_fd_getter->SetDelegate(this);
-
- // Enable the static unwinding cache, clearing it first in case we're
- // reconstructing the class in |Restart|.
- // TODO(rsavitski): the toggling needs to be done on the same thread as
- // unwinding (right now this is on the same primary thread).
- unwindstack::Elf::SetCachingEnabled(false);
- unwindstack::Elf::SetCachingEnabled(true);
}
// TODO(rsavitski): consider configure at setup + enable at start instead.
@@ -206,7 +196,7 @@
auto writer = endpoint_->CreateTraceWriter(buffer_id);
// Construct the data source instance.
- std::map<DataSourceInstanceID, DataSource>::iterator ds_it;
+ std::map<DataSourceInstanceID, DataSourceState>::iterator ds_it;
bool inserted;
std::tie(ds_it, inserted) = data_sources_.emplace(
std::piecewise_construct, std::forward_as_tuple(instance_id),
@@ -218,6 +208,9 @@
InterningOutputTracker::WriteFixedInterningsPacket(
ds_it->second.trace_writer.get());
+ // Inform unwinder of the new data source instance.
+ unwinding_worker_->PostStartDataSource(instance_id);
+
// Kick off periodic read task.
auto weak_this = weak_factory_.GetWeakPtr();
task_runner_->PostDelayedTask(
@@ -226,15 +219,6 @@
weak_this->TickDataSourceRead(instance_id);
},
kReadTickPeriodMs - (NowMs() % kReadTickPeriodMs));
-
- // Set up unwind queue and kick off a periodic task to process it.
- unwind_queues_.emplace(instance_id, std::deque<UnwindEntry>{});
- task_runner_->PostDelayedTask(
- [weak_this, instance_id] {
- if (weak_this)
- weak_this->TickDataSourceUnwind(instance_id);
- },
- kUnwindTickPeriodMs - (NowMs() % kUnwindTickPeriodMs));
}
void PerfProducer::StopDataSource(DataSourceInstanceID instance_id) {
@@ -255,7 +239,7 @@
// Start shutting down the reading frontend, which will propagate the stop
// further as the intermediate buffers are cleared.
- DataSource& ds = ds_it->second;
+ DataSourceState& ds = ds_it->second;
InitiateReaderStop(&ds);
}
@@ -292,7 +276,7 @@
static_cast<size_t>(ds_id));
return;
}
- DataSource& ds = it->second;
+ DataSourceState& ds = it->second;
PERFETTO_METATRACE_SCOPED(TAG_PRODUCER, PROFILER_READ_TICK);
@@ -305,8 +289,12 @@
}
}
- if (PERFETTO_UNLIKELY(ds.reader_stopping) && !more_records_available) {
- InitiateUnwindStop(&ds);
+ // Wake up the unwinder as we've (likely) pushed samples into its queue.
+ unwinding_worker_->PostProcessQueue();
+
+ if (PERFETTO_UNLIKELY(ds.status == DataSourceState::Status::kShuttingDown) &&
+ !more_records_available) {
+ unwinding_worker_->PostInitiateDataSourceStop(ds_id);
} else {
// otherwise, keep reading
auto weak_this = weak_factory_.GetWeakPtr();
@@ -322,8 +310,7 @@
bool PerfProducer::ReadAndParsePerCpuBuffer(EventReader* reader,
size_t max_samples,
DataSourceInstanceID ds_id,
- DataSource* ds) {
- using Status = DataSource::ProcDescriptors::Status;
+ DataSourceState* ds) {
PERFETTO_METATRACE_SCOPED(TAG_PRODUCER, PROFILER_READ_CPU);
// If the kernel ring buffer dropped data, record it in the trace.
@@ -349,28 +336,9 @@
// Request proc-fds for the process if this is the first time we see it.
pid_t pid = sample->pid;
- auto& fd_entry = ds->proc_fds[pid]; // created if absent
+ auto& process_state = ds->process_states[pid]; // insert if new
- // Seeing pid for the first time.
- if (fd_entry.status == Status::kInitial) {
- PERFETTO_DLOG("New pid: [%d]", static_cast<int>(pid));
-
- // Check whether samples for this new process should be
- // dropped due to the target whitelist/blacklist.
- const TargetFilter& filter = ds->event_cfg.filter();
- if (ShouldRejectDueToFilter(pid, filter)) {
- fd_entry.status = Status::kRejected;
- continue;
- }
-
- // At this point, sampled process is known to be of interest, so start
- // resolving the proc-fds.
- fd_entry.status = Status::kResolving;
- proc_fd_getter_->GetDescriptorsForPid(pid); // response is async
- PostDescriptorLookupTimeout(ds_id, pid, kProcDescriptorTimeoutMs);
- }
-
- if (fd_entry.status == Status::kExpired) {
+ if (process_state == ProcessTrackingStatus::kExpired) {
PERFETTO_DLOG("Skipping sample for previously expired pid [%d]",
static_cast<int>(pid));
PostEmitSkippedSample(ds_id, ProfilerStage::kRead,
@@ -379,42 +347,70 @@
}
// Previously failed the target filter check.
- if (fd_entry.status == Status::kRejected) {
+ if (process_state == ProcessTrackingStatus::kRejected) {
PERFETTO_DLOG("Skipping sample for pid [%d] due to target filter",
static_cast<int>(pid));
continue;
}
- // Push the sample into a dedicated unwinding queue.
- unwind_queues_[ds_id].emplace_back(std::move(sample.value()));
+ // Seeing pid for the first time.
+ if (process_state == ProcessTrackingStatus::kInitial) {
+ PERFETTO_DLOG("New pid: [%d]", static_cast<int>(pid));
- // Metatrace: counter sensible only when there's a single active source.
- PERFETTO_METATRACE_COUNTER(TAG_PRODUCER, PROFILER_UNWIND_QUEUE_SZ,
- unwind_queues_[ds_id].size());
+ // Check whether samples for this new process should be
+ // dropped due to the target whitelist/blacklist.
+ const TargetFilter& filter = ds->event_cfg.filter();
+ if (ShouldRejectDueToFilter(pid, filter)) {
+ process_state = ProcessTrackingStatus::kRejected;
+ continue;
+ }
+
+ // At this point, sampled process is known to be of interest, so start
+ // resolving the proc-fds.
+ process_state = ProcessTrackingStatus::kResolving;
+ proc_fd_getter_->GetDescriptorsForPid(pid); // response is async
+ PostDescriptorLookupTimeout(ds_id, pid, kProcDescriptorTimeoutMs);
+ }
+
+ PERFETTO_CHECK(process_state == ProcessTrackingStatus::kResolved ||
+ process_state == ProcessTrackingStatus::kResolving);
+
+ // Push the sample into the unwinding queue if there is room.
+ // TODO(rsavitski): this can silently drop entries. We should either record
+ // them (e.g. using |PostEmitSkippedSample|), or rearchitect the kernel
+ // buffer reading s.t. we can retain the blocked samples (putting
+ // back-pressure on the kernel ring buffer instead).
+ auto& queue = unwinding_worker_->unwind_queue();
+ WriteView write_view = queue.BeginWrite();
+ if (write_view.valid) {
+ queue.at(write_view.write_pos) =
+ UnwindEntry{ds_id, std::move(sample.value())};
+ queue.CommitWrite();
+ }
}
- // Most likely more events in the buffer. Though we might be exactly on the
- // boundary due to |max_samples|.
+ // Most likely more events in the kernel buffer. Though we might be exactly on
+ // the boundary due to |max_samples|.
return true;
}
-// TODO(rsavitski): first-fit makes descriptor request fulfillment not true
-// FIFO.
+// Note: first-fit makes descriptor request fulfillment not true FIFO. But the
+// edge-cases where it matters are very unlikely.
void PerfProducer::OnProcDescriptors(pid_t pid,
base::ScopedFile maps_fd,
base::ScopedFile mem_fd) {
- using Status = DataSource::ProcDescriptors::Status;
// Find first fit data source that is waiting on descriptors for the process.
for (auto& it : data_sources_) {
- DataSource& ds = it.second;
- auto proc_fd_it = ds.proc_fds.find(pid);
- if (proc_fd_it != ds.proc_fds.end() &&
- proc_fd_it->second.status == Status::kResolving) {
- proc_fd_it->second.status = Status::kResolved;
- proc_fd_it->second.unwind_state =
- UnwindingMetadata{std::move(maps_fd), std::move(mem_fd)};
- PERFETTO_DLOG("Handed off proc-fds for pid [%d] to DS [%zu]",
+ DataSourceState& ds = it.second;
+ auto proc_status_it = ds.process_states.find(pid);
+ if (proc_status_it != ds.process_states.end() &&
+ proc_status_it->second == ProcessTrackingStatus::kResolving) {
+ PERFETTO_DLOG("Handing off proc-fds for pid [%d] to DS [%zu]",
static_cast<int>(pid), static_cast<size_t>(it.first));
+
+ proc_status_it->second = ProcessTrackingStatus::kResolved;
+ unwinding_worker_->PostAdoptProcDescriptors(
+ it.first, pid, std::move(maps_fd), std::move(mem_fd));
return; // done
}
}
@@ -437,201 +433,25 @@
void PerfProducer::DescriptorLookupTimeout(DataSourceInstanceID ds_id,
pid_t pid) {
- using Status = DataSource::ProcDescriptors::Status;
auto ds_it = data_sources_.find(ds_id);
if (ds_it == data_sources_.end())
return;
// If the request is still outstanding, poison the pid for this source.
- DataSource& ds = ds_it->second;
- auto proc_fd_it = ds.proc_fds.find(pid);
- if (proc_fd_it != ds.proc_fds.end() &&
- proc_fd_it->second.status == Status::kResolving) {
- proc_fd_it->second.status = Status::kExpired;
+ DataSourceState& ds = ds_it->second;
+ auto proc_status_it = ds.process_states.find(pid);
+ if (proc_status_it != ds.process_states.end() &&
+ proc_status_it->second == ProcessTrackingStatus::kResolving) {
PERFETTO_DLOG("Descriptor lookup timeout of pid [%d] for DS [%zu]",
static_cast<int>(pid), static_cast<size_t>(ds_it->first));
+
+ proc_status_it->second = ProcessTrackingStatus::kExpired;
+ // Also inform the unwinder of the state change (so that it can discard any
+ // of the already-enqueued samples).
+ unwinding_worker_->PostRecordTimedOutProcDescriptors(ds_id, pid);
}
}
-void PerfProducer::TickDataSourceUnwind(DataSourceInstanceID ds_id) {
- auto ds_it = data_sources_.find(ds_id);
- if (ds_it == data_sources_.end()) {
- PERFETTO_DLOG("TickDataSourceUnwind(%zu): source gone",
- static_cast<size_t>(ds_id));
- return;
- }
- auto unwind_it = unwind_queues_.find(ds_id);
- PERFETTO_CHECK(unwind_it != unwind_queues_.end());
-
- PERFETTO_METATRACE_SCOPED(TAG_PRODUCER, PROFILER_UNWIND_TICK);
-
- bool queue_active =
- ProcessUnwindQueue(ds_id, &unwind_it->second, &ds_it->second);
-
- auto weak_this = weak_factory_.GetWeakPtr();
- if (!queue_active) {
- // Done with unwindings, push the source teardown to the end of the task
- // queue (to still process enqueued sampled).
- // TODO(rsavitski): under a dedicated unwinder thread, teardown of unwinding
- // state will happen here.
- task_runner_->PostTask([weak_this, ds_id] {
- if (weak_this)
- weak_this->FinishDataSourceStop(ds_id);
- });
- } else {
- // Otherwise, keep unwinding.
- task_runner_->PostDelayedTask(
- [weak_this, ds_id] {
- if (weak_this)
- weak_this->TickDataSourceUnwind(ds_id);
- },
- kUnwindTickPeriodMs - (NowMs() % kUnwindTickPeriodMs));
- }
-}
-
-// TODO(rsavitski): if we want to put a bound on the queue size (not as a
-// function of proc-fd timeout), then the reader could purge kResolving entries
-// from the start beyond that threshold.
-// TODO(rsavitski): DataSource input won't be needed once fd-tracking in the
-// unwinder is separated from fd-tracking in the reading frontend.
-bool PerfProducer::ProcessUnwindQueue(DataSourceInstanceID ds_id,
- std::deque<UnwindEntry>* input_queue,
- DataSource* ds_ptr) {
- using Status = DataSource::ProcDescriptors::Status;
- auto& queue = *input_queue;
- auto& ds = *ds_ptr;
-
- // Iterate over the queue, handling unwindable samples, and then marking them
- // as processed.
- size_t num_samples = queue.size();
- for (size_t i = 0; i < num_samples; i++) {
- UnwindEntry& entry = queue[i];
- if (!entry.valid)
- continue; // already processed
-
- ParsedSample& sample = entry.sample;
- auto proc_fd_it = ds.proc_fds.find(sample.pid);
- PERFETTO_CHECK(proc_fd_it != ds.proc_fds.end()); // must be present
-
- auto fd_status = proc_fd_it->second.status;
- PERFETTO_CHECK(fd_status != Status::kInitial);
- PERFETTO_CHECK(fd_status != Status::kRejected);
-
- // Giving up on the sample (proc-fd lookup timed out).
- if (fd_status == Status::kExpired) {
- PERFETTO_DLOG("Skipping sample for pid [%d]",
- static_cast<int>(sample.pid));
- PostEmitSkippedSample(ds_id, ProfilerStage::kUnwind,
- std::move(entry.sample));
- entry.valid = false;
- continue;
- }
-
- // Still waiting on the proc-fds.
- if (fd_status == Status::kResolving) {
- PERFETTO_DLOG("Still resolving sample for pid [%d]",
- static_cast<int>(sample.pid));
- continue;
- }
-
- // Sample ready - process it.
- if (fd_status == Status::kResolved) {
- PERFETTO_METATRACE_SCOPED(TAG_PRODUCER, PROFILER_UNWIND_SAMPLE);
-
- PerfProducer::CompletedSample unwound_sample =
- UnwindSample(std::move(sample), &proc_fd_it->second);
-
- PostEmitSample(ds_id, std::move(unwound_sample));
- entry.valid = false;
- continue;
- }
- }
-
- // Pop all leading processed entries.
- for (size_t i = 0; i < num_samples; i++) {
- PERFETTO_DCHECK(queue.size() > 0);
- if (queue.front().valid)
- break;
- queue.pop_front();
- }
-
- // Metatrace: counter sensible only when there's a single active source.
- PERFETTO_METATRACE_COUNTER(TAG_PRODUCER, PROFILER_UNWIND_QUEUE_SZ,
- queue.size());
- PERFETTO_DLOG("Unwind queue drain: [%zu]->[%zu]", num_samples, queue.size());
-
- // Return whether we're done with unwindings for this source.
- if (PERFETTO_UNLIKELY(ds.unwind_stopping) && queue.empty()) {
- return false;
- }
- return true;
-}
-
-PerfProducer::CompletedSample PerfProducer::UnwindSample(
- ParsedSample sample,
- DataSource::ProcDescriptors* process_state) {
- PerfProducer::CompletedSample ret;
- ret.cpu = sample.cpu;
- ret.pid = sample.pid;
- ret.tid = sample.tid;
- ret.timestamp = sample.timestamp;
- ret.cpu_mode = sample.cpu_mode;
-
- auto& unwind_state = process_state->unwind_state;
-
- // Overlay the stack bytes over /proc/<pid>/mem.
- std::shared_ptr<unwindstack::Memory> overlay_memory =
- std::make_shared<StackOverlayMemory>(
- unwind_state.fd_mem, sample.regs->sp(),
- reinterpret_cast<uint8_t*>(sample.stack.data()), sample.stack.size());
-
- // Unwindstack clobbers registers, so make a copy in case we need to retry.
- auto working_regs = std::unique_ptr<unwindstack::Regs>{sample.regs->Clone()};
-
- unwindstack::ErrorCode error_code = unwindstack::ERROR_NONE;
- unwindstack::Unwinder unwinder(kUnwindingMaxFrames, &unwind_state.fd_maps,
- working_regs.get(), overlay_memory);
-
- for (int attempt = 0; attempt < 2; attempt++) {
-#if PERFETTO_BUILDFLAG(PERFETTO_ANDROID_BUILD)
- unwinder.SetJitDebug(unwind_state.jit_debug.get(), working_regs->Arch());
- unwinder.SetDexFiles(unwind_state.dex_files.get(), working_regs->Arch());
-#endif
- unwinder.Unwind(/*initial_map_names_to_skip=*/nullptr,
- /*map_suffixes_to_ignore=*/nullptr);
- error_code = unwinder.LastErrorCode();
- if (error_code != unwindstack::ERROR_INVALID_MAP)
- break;
-
- // Otherwise, reparse the maps, and possibly retry the unwind.
- PERFETTO_DLOG("Reparsing maps");
- unwind_state.ReparseMaps();
- }
-
- PERFETTO_DLOG("Frames from unwindstack:");
- std::vector<unwindstack::FrameData> frames = unwinder.ConsumeFrames();
- for (unwindstack::FrameData& frame : frames) {
- if (PERFETTO_DLOG_IS_ON())
- PERFETTO_DLOG("%s", unwinder.FormatFrame(frame).c_str());
-
- ret.frames.emplace_back(unwind_state.AnnotateFrame(std::move(frame)));
- }
-
- // In case of an unwinding error, add a synthetic error frame (which will
- // appear as a caller of the partially-unwound fragment), for easier
- // visualization of errors.
- if (error_code != unwindstack::ERROR_NONE) {
- PERFETTO_DLOG("Unwinding error %" PRIu8, error_code);
- unwindstack::FrameData frame_data{};
- frame_data.function_name = "ERROR " + std::to_string(error_code);
- frame_data.map_name = "ERROR";
- ret.frames.emplace_back(std::move(frame_data), /*build_id=*/"");
- ret.unwind_error = error_code;
- }
-
- return ret;
-}
-
void PerfProducer::PostEmitSample(DataSourceInstanceID ds_id,
CompletedSample sample) {
// hack: c++11 lambdas can't be moved into, so stash the sample on the heap.
@@ -647,11 +467,8 @@
void PerfProducer::EmitSample(DataSourceInstanceID ds_id,
CompletedSample sample) {
auto ds_it = data_sources_.find(ds_id);
- if (ds_it == data_sources_.end()) {
- PERFETTO_DLOG("EmitSample(%zu): source gone", static_cast<size_t>(ds_id));
- return;
- }
- DataSource& ds = ds_it->second;
+ PERFETTO_CHECK(ds_it != data_sources_.end());
+ DataSourceState& ds = ds_it->second;
// intern callsite
GlobalCallstackTrie::Node* callstack_root =
@@ -683,12 +500,8 @@
size_t cpu,
uint64_t records_lost) {
auto ds_it = data_sources_.find(ds_id);
- if (ds_it == data_sources_.end()) {
- PERFETTO_DLOG("EmitRingBufferLoss(%zu): source gone",
- static_cast<size_t>(ds_id));
- return;
- }
- DataSource& ds = ds_it->second;
+ PERFETTO_CHECK(ds_it != data_sources_.end());
+ DataSourceState& ds = ds_it->second;
PERFETTO_DLOG("DataSource(%zu): cpu%zu lost [%" PRIu64 "] records",
static_cast<size_t>(ds_id), cpu, records_lost);
@@ -723,12 +536,8 @@
ProfilerStage stage,
ParsedSample sample) {
auto ds_it = data_sources_.find(ds_id);
- if (ds_it == data_sources_.end()) {
- PERFETTO_DLOG("EmitSkippedSample(%zu): source gone",
- static_cast<size_t>(ds_id));
- return;
- }
- DataSource& ds = ds_it->second;
+ PERFETTO_CHECK(ds_it != data_sources_.end());
+ DataSourceState& ds = ds_it->second;
auto packet = ds.trace_writer->NewTracePacket();
packet->set_timestamp(sample.timestamp);
@@ -749,42 +558,39 @@
}
}
-void PerfProducer::InitiateReaderStop(DataSource* ds) {
+void PerfProducer::InitiateReaderStop(DataSourceState* ds) {
PERFETTO_DLOG("InitiateReaderStop");
- ds->reader_stopping = true;
+ PERFETTO_CHECK(ds->status != DataSourceState::Status::kShuttingDown);
+
+ ds->status = DataSourceState::Status::kShuttingDown;
for (auto& event_reader : ds->per_cpu_readers) {
event_reader.PauseEvents();
}
}
-void PerfProducer::InitiateUnwindStop(DataSource* ds) {
- PERFETTO_DLOG("InitiateUnwindStop");
- PERFETTO_CHECK(ds->reader_stopping);
- ds->unwind_stopping = true;
+void PerfProducer::PostFinishDataSourceStop(DataSourceInstanceID ds_id) {
+ auto weak_producer = weak_factory_.GetWeakPtr();
+ task_runner_->PostTask([weak_producer, ds_id] {
+ if (weak_producer)
+ weak_producer->FinishDataSourceStop(ds_id);
+ });
}
void PerfProducer::FinishDataSourceStop(DataSourceInstanceID ds_id) {
PERFETTO_LOG("FinishDataSourceStop(%zu)", static_cast<size_t>(ds_id));
auto ds_it = data_sources_.find(ds_id);
PERFETTO_CHECK(ds_it != data_sources_.end());
- DataSource& ds = ds_it->second;
-
- PERFETTO_CHECK(ds.reader_stopping);
- PERFETTO_CHECK(ds.unwind_stopping);
+ DataSourceState& ds = ds_it->second;
+ PERFETTO_CHECK(ds.status == DataSourceState::Status::kShuttingDown);
ds.trace_writer->Flush();
- data_sources_.erase(ds_id);
- unwind_queues_.erase(ds_id);
+ data_sources_.erase(ds_it);
endpoint_->NotifyDataSourceStopped(ds_id);
// Clean up resources if there are no more active sources.
if (data_sources_.empty()) {
- // purge internings
- callstack_trie_.ClearTrie();
- // clear and re-enable libunwindstack's cache
- unwindstack::Elf::SetCachingEnabled(false);
- unwindstack::Elf::SetCachingEnabled(true);
+ callstack_trie_.ClearTrie(); // purge internings
}
}
diff --git a/src/profiling/perf/perf_producer.h b/src/profiling/perf/perf_producer.h
index 4d3f295..50f696a 100644
--- a/src/profiling/perf/perf_producer.h
+++ b/src/profiling/perf/perf_producer.h
@@ -38,16 +38,24 @@
#include "src/profiling/common/callstack_trie.h"
#include "src/profiling/common/interning_output.h"
#include "src/profiling/common/unwind_support.h"
+#include "src/profiling/perf/common_types.h"
#include "src/profiling/perf/event_config.h"
#include "src/profiling/perf/event_reader.h"
#include "src/profiling/perf/proc_descriptors.h"
+#include "src/profiling/perf/unwinding.h"
#include "src/tracing/core/metatrace_writer.h"
namespace perfetto {
namespace profiling {
-// TODO(b/144281346): work in progress. Do not use.
-class PerfProducer : public Producer, public ProcDescriptorDelegate {
+// TODO(rsavitski): describe the high-level architecture and threading. Rough
+// summary in the meantime: three stages: (1) kernel buffer reader that parses
+// the samples -> (2) callstack unwinder -> (3) interning and serialization of
+// samples. This class handles stages (1) and (3) on the main thread. Unwinding
+// is done by |Unwinder| on a dedicated thread.
+class PerfProducer : public Producer,
+ public ProcDescriptorDelegate,
+ public Unwinder::Delegate {
public:
PerfProducer(ProcDescriptorGetter* proc_fd_getter,
base::TaskRunner* task_runner);
@@ -81,8 +89,16 @@
base::ScopedFile maps_fd,
base::ScopedFile mem_fd) override;
+ // Unwinder::Delegate impl (callbacks from unwinder):
+ void PostEmitSample(DataSourceInstanceID ds_id,
+ CompletedSample sample) override;
+ void PostEmitSkippedSample(DataSourceInstanceID ds_id,
+ ProfilerStage stage,
+ ParsedSample sample) override;
+ void PostFinishDataSourceStop(DataSourceInstanceID ds_id) override;
+
private:
- // State of the connection to tracing service (traced).
+ // State of the producer's connection to tracing service (traced).
enum State {
kNotStarted = 0,
kNotConnected,
@@ -90,79 +106,41 @@
kConnected,
};
- struct DataSource {
- DataSource(EventConfig _event_cfg,
- std::unique_ptr<TraceWriter> _trace_writer,
- std::vector<EventReader> _per_cpu_readers)
+ // Represents the data source scoped view of a process. Specifically:
+ // * whether the process is in scope of the tracing session (if the latter
+ // specifies a target filter).
+ // * the state of the (possibly asynchronous) lookup of /proc/<pid>/{maps,mem}
+ // file descriptors, which are necessary for callstack unwinding of samples.
+ enum class ProcessTrackingStatus {
+ kInitial,
+ kResolving, // waiting on proc-fd lookup
+ kResolved, // proc-fds obtained, and process considered relevant
+ kExpired, // proc-fd lookup timed out
+ kRejected // process not considered relevant for the data source
+ };
+
+ struct DataSourceState {
+ enum class Status { kActive, kShuttingDown };
+
+ DataSourceState(EventConfig _event_cfg,
+ std::unique_ptr<TraceWriter> _trace_writer,
+ std::vector<EventReader> _per_cpu_readers)
: event_cfg(std::move(_event_cfg)),
trace_writer(std::move(_trace_writer)),
per_cpu_readers(std::move(_per_cpu_readers)) {}
+ Status status = Status::kActive;
const EventConfig event_cfg;
-
std::unique_ptr<TraceWriter> trace_writer;
// Indexed by cpu, vector never resized.
std::vector<EventReader> per_cpu_readers;
// Tracks the incremental state for interned entries.
InterningOutputTracker interning_output;
-
- bool reader_stopping = false;
- bool unwind_stopping = false;
-
- // TODO(rsavitski): under a single-threaded model, directly shared between
- // the reader and the "unwinder". If/when lifting unwinding into a separate
- // thread(s), the FDs will become owned by the unwinder, but the tracking
- // will need to be done by both sides (frontend needs to know whether to
- // resolve the pid, and the unwinder needs to know whether the fd is
- // ready/poisoned).
- // TODO(rsavitski): find a more descriptive name.
- struct ProcDescriptors {
- enum class Status {
- kInitial,
- kResolving,
- kResolved,
- kExpired,
- kRejected
- };
-
- Status status = Status::kInitial;
- UnwindingMetadata unwind_state{/*maps_fd=*/base::ScopedFile{},
- /*mem_fd=*/base::ScopedFile{}};
- };
- std::map<pid_t, ProcDescriptors> proc_fds; // keyed by pid
- };
-
- // Entry in an unwinding queue. Either a sample that requires unwinding, or a
- // tombstoned entry (valid == false).
- struct UnwindEntry {
- UnwindEntry(ParsedSample _sample)
- : valid(true), sample(std::move(_sample)) {}
-
- bool valid = false;
- ParsedSample sample;
- };
-
- // Fully processed sample that is ready for output.
- struct CompletedSample {
- // move-only
- CompletedSample() = default;
- CompletedSample(const CompletedSample&) = delete;
- CompletedSample& operator=(const CompletedSample&) = delete;
- CompletedSample(CompletedSample&&) = default;
- CompletedSample& operator=(CompletedSample&&) = default;
-
- uint32_t cpu = 0;
- pid_t pid = 0;
- pid_t tid = 0;
- uint64_t timestamp = 0;
- uint16_t cpu_mode = PERF_RECORD_MISC_CPUMODE_UNKNOWN;
- std::vector<FrameData> frames;
- unwindstack::ErrorCode unwind_error = unwindstack::ERROR_NONE;
- };
-
- enum class ProfilerStage {
- kRead = 0,
- kUnwind,
+ // Producer thread's view of sampled processes. This is the primary tracking
+ // structure, but a subset of updates is replicated to a similar structure
+ // in the |Unwinder|, which needs to track whether the necessary unwinding
+ // inputs for a given process' samples are ready.
+ std::map<pid_t, ProcessTrackingStatus> process_states;
};
void ConnectService();
@@ -170,6 +148,8 @@
void ResetConnectionBackoff();
void IncreaseConnectionBackoff();
+ // Periodic read task which reads a batch of samples from all kernel ring
+ // buffers associated with the given data source.
void TickDataSourceRead(DataSourceInstanceID ds_id);
// Returns *false* if the reader has caught up with the writer position, true
// otherwise. Return value is only useful if the underlying perf_event has
@@ -179,44 +159,32 @@
bool ReadAndParsePerCpuBuffer(EventReader* reader,
size_t max_samples,
DataSourceInstanceID ds_id,
- DataSource* ds);
+ DataSourceState* ds);
void PostDescriptorLookupTimeout(DataSourceInstanceID ds_id,
pid_t pid,
uint32_t timeout_ms);
void DescriptorLookupTimeout(DataSourceInstanceID ds_id, pid_t pid);
- void TickDataSourceUnwind(DataSourceInstanceID ds_id);
- // Returns true if we should keep processing the queue (i.e. we should
- // continue the unwinder ticks).
- bool ProcessUnwindQueue(DataSourceInstanceID ds_id,
- std::deque<UnwindEntry>* input_queue,
- DataSource* ds_ptr);
- CompletedSample UnwindSample(ParsedSample sample,
- DataSource::ProcDescriptors* process_state);
-
- void PostEmitSample(DataSourceInstanceID ds_id, CompletedSample sample);
void EmitSample(DataSourceInstanceID ds_id, CompletedSample sample);
void EmitRingBufferLoss(DataSourceInstanceID ds_id,
size_t cpu,
uint64_t records_lost);
// Emit a packet indicating that a sample was relevant, but skipped as it was
// considered to be not unwindable (e.g. the process no longer exists).
- void PostEmitSkippedSample(DataSourceInstanceID ds_id,
- ProfilerStage stage,
- ParsedSample sample);
void EmitSkippedSample(DataSourceInstanceID ds_id,
ProfilerStage stage,
ParsedSample sample);
- // Starts the shutdown of the given data source instance, starting with the
- // reader frontend.
- void InitiateReaderStop(DataSource* ds);
- // TODO(rsavitski): under a dedicated unwind thread, this becomes a PostTask
- // for the instance id.
- void InitiateUnwindStop(DataSource* ds);
- // Destroys the state belonging to this instance, and notifies the tracing
- // service of the stop.
+ // Starts the shutdown of the given data source instance, starting with
+ // pausing the reader frontend. Once the reader reaches the point where all
+ // kernel buffers have been fully consumed, it will notify the |Unwinder| to
+ // proceed with the shutdown sequence. The unwinder in turn will call back to
+ // this producer once there are no more outstanding samples for the data
+ // source at the unwinding stage.
+ void InitiateReaderStop(DataSourceState* ds);
+ // Destroys the state belonging to this instance, and acks the stop to the
+ // tracing service.
void FinishDataSourceStop(DataSourceInstanceID ds_id);
void StartMetatraceSource(DataSourceInstanceID ds_id, BufferID target_buffer);
@@ -247,8 +215,11 @@
// sequences.
GlobalCallstackTrie callstack_trie_;
- std::map<DataSourceInstanceID, DataSource> data_sources_;
- std::map<DataSourceInstanceID, std::deque<UnwindEntry>> unwind_queues_;
+ // State associated with perf-sampling data sources.
+ std::map<DataSourceInstanceID, DataSourceState> data_sources_;
+
+ // Unwinding stage, running on a dedicated thread.
+ UnwinderHandle unwinding_worker_;
base::WeakPtrFactory<PerfProducer> weak_factory_; // keep last
};
diff --git a/src/profiling/perf/unwind_queue.h b/src/profiling/perf/unwind_queue.h
index 8231251..b49b176 100644
--- a/src/profiling/perf/unwind_queue.h
+++ b/src/profiling/perf/unwind_queue.h
@@ -38,7 +38,7 @@
// Single-writer, single-reader ring buffer of fixed-size entries (of any
// default-constructible type). Size of the buffer is static for the lifetime of
// UnwindQueue, and must be a power of two.
-// Writer side appends entries once at a time, and must stop if there
+// Writer side appends entries one at a time, and must stop if there
// is no available capacity.
// Reader side sees all unconsumed entries, and can advance the reader position
// by any amount.
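
Note on the UnwindQueue comment above: the contract it describes (single writer, single reader, power-of-two capacity, writer stops when full, reader may advance by any amount) can be illustrated with a small standalone sketch. This is not the UnwindQueue implementation. The names BeginRead, at and CommitNewReadPosition mirror the calls visible in unwinding.cc, while SketchRingQueue, TryPush and the chosen memory orderings are assumptions made for illustration.

```cpp
#include <array>
#include <atomic>
#include <cassert>
#include <cstdint>
#include <utility>

// Hypothetical single-writer, single-reader ring buffer sketch.
template <typename T, uint32_t kCapacity>
class SketchRingQueue {
  static_assert(kCapacity != 0 && (kCapacity & (kCapacity - 1)) == 0,
                "capacity must be a power of two");

 public:
  struct ReadView {
    uint64_t read_pos;
    uint64_t write_pos;
  };

  // Writer side: appends one entry, or returns false if there is no capacity.
  bool TryPush(T value) {
    uint64_t rd = read_pos_.load(std::memory_order_acquire);
    uint64_t wr = write_pos_.load(std::memory_order_relaxed);
    if (wr - rd >= kCapacity)
      return false;
    slots_[wr & (kCapacity - 1)] = std::move(value);
    write_pos_.store(wr + 1, std::memory_order_release);
    return true;
  }

  // Reader side: snapshot of the unconsumed range [read_pos, write_pos).
  ReadView BeginRead() {
    return ReadView{read_pos_.load(std::memory_order_relaxed),
                    write_pos_.load(std::memory_order_acquire)};
  }

  // Positions grow monotonically; the mask maps them onto the slot array.
  T& at(uint64_t pos) { return slots_[pos & (kCapacity - 1)]; }

  // Reader side: releases every slot before |pos| back to the writer.
  void CommitNewReadPosition(uint64_t pos) {
    read_pos_.store(pos, std::memory_order_release);
  }

 private:
  std::array<T, kCapacity> slots_{};
  std::atomic<uint64_t> read_pos_{0};
  std::atomic<uint64_t> write_pos_{0};
};
```

The power-of-two requirement is what lets the monotonically increasing positions be reduced to a slot index with a single bitmask.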
diff --git a/src/profiling/perf/unwinding.cc b/src/profiling/perf/unwinding.cc
new file mode 100644
index 0000000..c51c57e
--- /dev/null
+++ b/src/profiling/perf/unwinding.cc
@@ -0,0 +1,362 @@
+/*
+ * Copyright (C) 2020 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "src/profiling/perf/unwinding.h"
+
+#include <inttypes.h>
+
+#include "perfetto/ext/base/metatrace.h"
+
+namespace {
+constexpr size_t kUnwindingMaxFrames = 1000;
+constexpr uint32_t kDataSourceShutdownRetryDelayMs = 400;
+} // namespace
+
+namespace perfetto {
+namespace profiling {
+
+Unwinder::Delegate::~Delegate() = default;
+
+void Unwinder::PostStartDataSource(DataSourceInstanceID ds_id) {
+ // No need for a weak pointer as the associated task runner quits (stops
+ // running tasks) strictly before the Unwinder's destruction.
+ task_runner_->PostTask([this, ds_id] { StartDataSource(ds_id); });
+}
+
+void Unwinder::StartDataSource(DataSourceInstanceID ds_id) {
+ PERFETTO_DCHECK_THREAD(thread_checker_);
+ PERFETTO_DLOG("Unwinder::StartDataSource(%zu)", static_cast<size_t>(ds_id));
+
+ auto it_and_inserted = data_sources_.emplace(ds_id, DataSourceState{});
+ PERFETTO_DCHECK(it_and_inserted.second);
+}
+
+// c++11: use shared_ptr to transfer resource handles, so that the resources get
+// released even if the task runner is destroyed with pending tasks.
+// "Cleverness" warning:
+// the task will be executed on a different thread, and will mutate the
+// pointed-to memory. It may be the case that this posting thread will not
+// decrement its shared_ptr refcount until *after* the task has executed. In
+// that scenario, the destruction of the pointed-to memory will be happening on
+// the posting thread. This implies a data race between the mutation on the task
+// thread, and the destruction on the posting thread. *However*, we assume that
+// there is no race in practice due to refcount decrements having
+// release-acquire semantics. The refcount decrements pair with each other, and
+// therefore also serve as a memory barrier between the destructor, and any
+// previous modifications of the pointed-to memory.
+// TODO(rsavitski): present a more convincing argument, or reimplement
+// without relying on shared_ptr implementation details.
+void Unwinder::PostAdoptProcDescriptors(DataSourceInstanceID ds_id,
+ pid_t pid,
+ base::ScopedFile maps_fd,
+ base::ScopedFile mem_fd) {
+ auto shared_maps = std::make_shared<base::ScopedFile>(std::move(maps_fd));
+ auto shared_mem = std::make_shared<base::ScopedFile>(std::move(mem_fd));
+ task_runner_->PostTask([this, ds_id, pid, shared_maps, shared_mem] {
+ base::ScopedFile maps = std::move(*shared_maps.get());
+ base::ScopedFile mem = std::move(*shared_mem.get());
+ AdoptProcDescriptors(ds_id, pid, std::move(maps), std::move(mem));
+ });
+}
+
+void Unwinder::AdoptProcDescriptors(DataSourceInstanceID ds_id,
+ pid_t pid,
+ base::ScopedFile maps_fd,
+ base::ScopedFile mem_fd) {
+ PERFETTO_DCHECK_THREAD(thread_checker_);
+ PERFETTO_DLOG("Unwinder::AdoptProcDescriptors(%zu, %d, %d, %d)",
+ static_cast<size_t>(ds_id), static_cast<int>(pid),
+ maps_fd.get(), mem_fd.get());
+
+ auto it = data_sources_.find(ds_id);
+ PERFETTO_CHECK(it != data_sources_.end());
+ DataSourceState& ds = it->second;
+
+ ProcessState& proc_state = ds.process_states[pid]; // insert if new
+ PERFETTO_DCHECK(proc_state.status != ProcessState::Status::kResolved);
+
+ proc_state.status = ProcessState::Status::kResolved;
+ proc_state.unwind_state =
+ UnwindingMetadata{std::move(maps_fd), std::move(mem_fd)};
+}
+
+void Unwinder::PostRecordTimedOutProcDescriptors(DataSourceInstanceID ds_id,
+ pid_t pid) {
+ task_runner_->PostTask(
+ [this, ds_id, pid] { RecordTimedOutProcDescriptors(ds_id, pid); });
+}
+
+void Unwinder::RecordTimedOutProcDescriptors(DataSourceInstanceID ds_id,
+ pid_t pid) {
+ PERFETTO_DCHECK_THREAD(thread_checker_);
+ PERFETTO_DLOG("Unwinder::RecordTimedOutProcDescriptors(%zu, %d)",
+ static_cast<size_t>(ds_id), static_cast<int>(pid));
+
+ auto it = data_sources_.find(ds_id);
+ PERFETTO_CHECK(it != data_sources_.end());
+ DataSourceState& ds = it->second;
+
+ ProcessState& proc_state = ds.process_states[pid]; // insert if new
+ PERFETTO_DCHECK(proc_state.status == ProcessState::Status::kResolving);
+
+ proc_state.status = ProcessState::Status::kExpired;
+ // proc_state.unwind_state is clear as the preceding state is kResolving
+}
+
+void Unwinder::PostProcessQueue() {
+ task_runner_->PostTask([this] { ProcessQueue(); });
+}
+
+// Note: we always walk the queue in order. So if there are multiple data
+// sources, one of which is shutting down, its shutdown can be delayed by
+// unwinding of other sources' samples. Instead, we could scan the queue
+// multiple times, prioritizing the samples for shutting-down sources. At the
+// time of writing, the former (in-order walk) is considered fair enough.
+void Unwinder::ProcessQueue() {
+ PERFETTO_DCHECK_THREAD(thread_checker_);
+ PERFETTO_METATRACE_SCOPED(TAG_PRODUCER, PROFILER_UNWIND_TICK);
+ PERFETTO_DLOG("Unwinder::ProcessQueue");
+
+ base::FlatSet<DataSourceInstanceID> pending_sample_sources =
+ ConsumeAndUnwindReadySamples();
+
+ // Deal with the possibility of data sources that are shutting down.
+ bool post_delayed_reprocess = false;
+ base::FlatSet<DataSourceInstanceID> sources_to_stop;
+ for (auto& id_and_ds : data_sources_) {
+ DataSourceInstanceID ds_id = id_and_ds.first;
+ const DataSourceState& ds = id_and_ds.second;
+
+ if (ds.status == DataSourceState::Status::kActive)
+ continue;
+
+ // Data source that is shutting down. If we're still waiting on proc-fds (or
+ // the lookup to time out) for samples in the queue - repost a later
+ // attempt (as there is no guarantee that there are any readers waking up
+ // the unwinder anymore).
+ if (pending_sample_sources.count(ds_id)) {
+ PERFETTO_DLOG(
+ "Unwinder delaying DS(%zu) stop: waiting on a pending sample",
+ static_cast<size_t>(ds_id));
+ post_delayed_reprocess = true;
+ } else {
+ // Otherwise, proceed with tearing down data source state (after
+ // completing the loop, to avoid invalidating the iterator).
+ sources_to_stop.insert(ds_id);
+ }
+ }
+
+ for (auto ds_id : sources_to_stop)
+ FinishDataSourceStop(ds_id);
+
+ if (post_delayed_reprocess)
+ task_runner_->PostDelayedTask([this] { ProcessQueue(); },
+ kDataSourceShutdownRetryDelayMs);
+}
+
+base::FlatSet<DataSourceInstanceID> Unwinder::ConsumeAndUnwindReadySamples() {
+ PERFETTO_DCHECK_THREAD(thread_checker_);
+ base::FlatSet<DataSourceInstanceID> pending_sample_sources;
+
+ // Use a single snapshot of the ring buffer pointers.
+ ReadView read_view = unwind_queue_.BeginRead();
+
+ PERFETTO_METATRACE_COUNTER(
+ TAG_PRODUCER, PROFILER_UNWIND_QUEUE_SZ,
+ static_cast<int32_t>(read_view.write_pos - read_view.read_pos));
+
+ if (read_view.read_pos == read_view.write_pos)
+ return pending_sample_sources;
+
+ // Walk the queue.
+ for (auto read_pos = read_view.read_pos; read_pos < read_view.write_pos;
+ read_pos++) {
+ UnwindEntry& entry = unwind_queue_.at(read_pos);
+
+ if (!entry.valid)
+ continue; // already processed
+
+ auto it = data_sources_.find(entry.data_source_id);
+ PERFETTO_CHECK(it != data_sources_.end());
+ DataSourceState& ds = it->second;
+
+ pid_t pid = entry.sample.pid;
+ ProcessState& proc_state = ds.process_states[pid]; // insert if new
+
+ // Giving up on the sample (proc-fd lookup timed out).
+ if (proc_state.status == ProcessState::Status::kExpired) {
+ PERFETTO_DLOG("Unwinder skipping sample for pid [%d]",
+ static_cast<int>(pid));
+
+ delegate_->PostEmitSkippedSample(entry.data_source_id,
+ ProfilerStage::kUnwind,
+ std::move(entry.sample));
+ entry.valid = false;
+ continue;
+ }
+
+ // Still waiting on the proc-fds.
+ if (proc_state.status == ProcessState::Status::kResolving) {
+ PERFETTO_DLOG("Unwinder deferring sample for pid [%d]",
+ static_cast<int>(pid));
+
+ pending_sample_sources.insert(entry.data_source_id);
+ continue;
+ }
+
+ // Sample ready - process it.
+ if (proc_state.status == ProcessState::Status::kResolved) {
+ PERFETTO_METATRACE_SCOPED(TAG_PRODUCER, PROFILER_UNWIND_SAMPLE);
+
+ CompletedSample unwound_sample =
+ UnwindSample(entry.sample, &proc_state.unwind_state);
+
+ delegate_->PostEmitSample(entry.data_source_id,
+ std::move(unwound_sample));
+ entry.valid = false;
+ continue;
+ }
+ }
+
+ // Consume all leading processed entries in the queue.
+ auto new_read_pos = read_view.read_pos;
+ for (; new_read_pos < read_view.write_pos; new_read_pos++) {
+ UnwindEntry& entry = unwind_queue_.at(new_read_pos);
+ if (entry.valid)
+ break;
+ }
+ if (new_read_pos != read_view.read_pos)
+ unwind_queue_.CommitNewReadPosition(new_read_pos);
+
+ PERFETTO_METATRACE_COUNTER(
+ TAG_PRODUCER, PROFILER_UNWIND_QUEUE_SZ,
+ static_cast<int32_t>(read_view.write_pos - new_read_pos));
+
+ PERFETTO_DLOG("Unwind queue drain: [%" PRIu64 "]->[%" PRIu64 "]",
+ read_view.write_pos - read_view.read_pos,
+ read_view.write_pos - new_read_pos);
+
+ return pending_sample_sources;
+}
+
+CompletedSample Unwinder::UnwindSample(const ParsedSample& sample,
+ UnwindingMetadata* unwind_state) {
+ PERFETTO_DCHECK_THREAD(thread_checker_);
+ PERFETTO_DCHECK(unwind_state);
+
+ CompletedSample ret;
+ ret.cpu = sample.cpu;
+ ret.pid = sample.pid;
+ ret.tid = sample.tid;
+ ret.timestamp = sample.timestamp;
+ ret.cpu_mode = sample.cpu_mode;
+
+ // Overlay the stack bytes over /proc/<pid>/mem.
+ std::shared_ptr<unwindstack::Memory> overlay_memory =
+ std::make_shared<StackOverlayMemory>(
+ unwind_state->fd_mem, sample.regs->sp(),
+ reinterpret_cast<const uint8_t*>(sample.stack.data()),
+ sample.stack.size());
+
+ // Unwindstack clobbers registers, so make a copy in case we need to retry.
+ auto regs_copy = std::unique_ptr<unwindstack::Regs>{sample.regs->Clone()};
+
+ unwindstack::ErrorCode error_code = unwindstack::ERROR_NONE;
+ unwindstack::Unwinder unwinder(kUnwindingMaxFrames, &unwind_state->fd_maps,
+ regs_copy.get(), overlay_memory);
+
+ // TODO(rsavitski): consider rate-limiting unwind retries.
+ for (int attempt = 0; attempt < 2; attempt++) {
+#if PERFETTO_BUILDFLAG(PERFETTO_ANDROID_BUILD)
+ unwinder.SetJitDebug(unwind_state->jit_debug.get(), regs_copy->Arch());
+ unwinder.SetDexFiles(unwind_state->dex_files.get(), regs_copy->Arch());
+#endif
+ unwinder.Unwind(/*initial_map_names_to_skip=*/nullptr,
+ /*map_suffixes_to_ignore=*/nullptr);
+ error_code = unwinder.LastErrorCode();
+ if (error_code != unwindstack::ERROR_INVALID_MAP)
+ break;
+
+ // Otherwise, reparse the maps, and possibly retry the unwind.
+ PERFETTO_DLOG("Reparsing maps for pid [%d]", static_cast<int>(sample.pid));
+ unwind_state->ReparseMaps();
+ }
+
+ PERFETTO_DLOG("Frames from unwindstack for pid [%d]:",
+ static_cast<int>(sample.pid));
+ std::vector<unwindstack::FrameData> frames = unwinder.ConsumeFrames();
+ for (unwindstack::FrameData& frame : frames) {
+ if (PERFETTO_DLOG_IS_ON())
+ PERFETTO_DLOG("%s", unwinder.FormatFrame(frame).c_str());
+
+ ret.frames.emplace_back(unwind_state->AnnotateFrame(std::move(frame)));
+ }
+
+ // In case of an unwinding error, add a synthetic error frame (which will
+ // appear as a caller of the partially-unwound fragment), for easier
+ // visualization of errors.
+ if (error_code != unwindstack::ERROR_NONE) {
+ PERFETTO_DLOG("Unwinding error %" PRIu8, error_code);
+ unwindstack::FrameData frame_data{};
+ frame_data.function_name = "ERROR " + std::to_string(error_code);
+ frame_data.map_name = "ERROR";
+ ret.frames.emplace_back(std::move(frame_data), /*build_id=*/"");
+ ret.unwind_error = error_code;
+ }
+
+ return ret;
+}
+
+void Unwinder::PostInitiateDataSourceStop(DataSourceInstanceID ds_id) {
+ task_runner_->PostTask([this, ds_id] { InitiateDataSourceStop(ds_id); });
+}
+
+void Unwinder::InitiateDataSourceStop(DataSourceInstanceID ds_id) {
+ PERFETTO_DCHECK_THREAD(thread_checker_);
+ PERFETTO_DLOG("Unwinder::InitiateDataSourceStop(%zu)",
+ static_cast<size_t>(ds_id));
+
+ auto it = data_sources_.find(ds_id);
+ PERFETTO_CHECK(it != data_sources_.end());
+ DataSourceState& ds = it->second;
+
+ PERFETTO_CHECK(ds.status == DataSourceState::Status::kActive);
+ ds.status = DataSourceState::Status::kShuttingDown;
+
+ // Make sure that there's an outstanding task to process the unwinding queue,
+ // as it is the point that evaluates the stop condition.
+ PostProcessQueue();
+}
+
+void Unwinder::FinishDataSourceStop(DataSourceInstanceID ds_id) {
+ PERFETTO_DCHECK_THREAD(thread_checker_);
+ PERFETTO_DLOG("Unwinder::FinishDataSourceStop(%zu)",
+ static_cast<size_t>(ds_id));
+
+ auto it = data_sources_.find(ds_id);
+ PERFETTO_CHECK(it != data_sources_.end());
+ DataSourceState& ds = it->second;
+
+ // Drop unwinder's state tied to the source.
+ PERFETTO_CHECK(ds.status == DataSourceState::Status::kShuttingDown);
+ data_sources_.erase(it);
+
+ // Inform service thread that the unwinder is done with the source.
+ delegate_->PostFinishDataSourceStop(ds_id);
+}
+
+} // namespace profiling
+} // namespace perfetto
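
Note on ConsumeAndUnwindReadySamples above: entries are handled out of order by tombstoning them in place, and the read position only moves past the leading run of tombstones. The two-pass idea can be sketched in isolation as below. This is a simplified illustration under stated assumptions, not the unwinder's code: a std::vector stands in for the ring queue, and SketchStatus, SketchEntry and DrainReadyEntries are hypothetical names.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <vector>

enum class SketchStatus { kResolving, kResolved, kExpired };

struct SketchEntry {
  bool valid = true;  // false once the entry has been processed (tombstone)
  int pid = 0;
};

// Handles every ready entry at or after |read_pos| and returns the new read
// position. Pids without a recorded status are treated as still resolving.
size_t DrainReadyEntries(std::vector<SketchEntry>& entries,
                         size_t read_pos,
                         const std::map<int, SketchStatus>& status_by_pid,
                         std::vector<int>* processed_pids) {
  // Pass 1: walk in order, handling whatever is ready (resolved or expired).
  for (size_t pos = read_pos; pos < entries.size(); pos++) {
    SketchEntry& entry = entries[pos];
    if (!entry.valid)
      continue;  // already processed by an earlier call
    auto it = status_by_pid.find(entry.pid);
    SketchStatus status =
        it == status_by_pid.end() ? SketchStatus::kResolving : it->second;
    if (status == SketchStatus::kResolving)
      continue;  // leave in place, retry on a later call
    processed_pids->push_back(entry.pid);  // stands in for unwind or skip
    entry.valid = false;
  }
  // Pass 2: only the leading run of tombstones can be released.
  size_t new_read_pos = read_pos;
  while (new_read_pos < entries.size() && !entries[new_read_pos].valid)
    new_read_pos++;
  return new_read_pos;
}
```

A single entry that is still waiting on its descriptors therefore pins the read position, but it does not prevent later entries from being processed.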
diff --git a/src/profiling/perf/unwinding.h b/src/profiling/perf/unwinding.h
new file mode 100644
index 0000000..c77de5e
--- /dev/null
+++ b/src/profiling/perf/unwinding.h
@@ -0,0 +1,234 @@
+/*
+ * Copyright (C) 2020 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef SRC_PROFILING_PERF_UNWINDING_H_
+#define SRC_PROFILING_PERF_UNWINDING_H_
+
+#include <condition_variable>
+#include <functional>
+#include <map>
+#include <mutex>
+#include <thread>
+
+#include <linux/perf_event.h>
+#include <stdint.h>
+
+#include <unwindstack/Error.h>
+
+#include "perfetto/base/flat_set.h"
+#include "perfetto/base/logging.h"
+#include "perfetto/ext/base/thread_checker.h"
+#include "perfetto/ext/base/unix_task_runner.h"
+#include "perfetto/ext/tracing/core/basic_types.h"
+#include "src/profiling/common/unwind_support.h"
+#include "src/profiling/perf/common_types.h"
+#include "src/profiling/perf/unwind_queue.h"
+
+namespace {
+constexpr static uint32_t kUnwindQueueCapacity = 2048;
+}
+
+namespace perfetto {
+namespace profiling {
+
+// Unwinds callstacks based on the sampled stack and register state (see
+// |ParsedSample|). Has a single unwinding ring queue, shared across
+// all data sources.
+//
+// Samples cannot be unwound without having /proc/<pid>/{maps,mem} file
+// descriptors for that process. This lookup can be asynchronous (e.g. on
+// Android), so the unwinder might have to wait before it can process (or
+// discard) some of the enqueued samples. To avoid blocking the entire queue,
+// the unwinder is allowed to process the entries out of order.
+//
+// Besides the queue, all interactions between the unwinder and the rest of the
+// producer logic are through posted tasks.
+//
+// As unwinding times are long-tailed (example measurements: median <1ms,
+// worst-case ~1000ms), the unwinder runs on a dedicated thread to avoid
+// starving the rest of the producer's work (including IPC and consumption of
+// records from the kernel ring buffers).
+//
+// This class should not be instantiated directly, use the |UnwinderHandle|
+// below instead.
+//
+// TODO(rsavitski): while the inputs to the unwinder are batched as a result of
+// the reader posting a wakeup only after consuming a batch of kernel samples,
+// the Unwinder might be staggering wakeups for the producer thread by posting a
+// task every time a sample has been unwound. Evaluate how bad these wakeups are
+// in practice, and consider also implementing a batching strategy for the
+// unwinder->serialization handoff (which isn't very latency-sensitive).
+// TODO(rsavitski): re-enable unwindstack's cache with fixed / external locking
+// (the enabling and disabling is not thread-safe).
+class Unwinder {
+ public:
+ friend class UnwinderHandle;
+
+ // Callbacks from the unwinder to the primary producer thread.
+ class Delegate {
+ public:
+ virtual void PostEmitSample(DataSourceInstanceID ds_id,
+ CompletedSample sample) = 0;
+ virtual void PostEmitSkippedSample(DataSourceInstanceID ds_id,
+ ProfilerStage stage,
+ ParsedSample sample) = 0;
+ virtual void PostFinishDataSourceStop(DataSourceInstanceID ds_id) = 0;
+
+ virtual ~Delegate();
+ };
+
+ ~Unwinder() { PERFETTO_DCHECK_THREAD(thread_checker_); }
+
+ void PostStartDataSource(DataSourceInstanceID ds_id);
+ void PostAdoptProcDescriptors(DataSourceInstanceID ds_id,
+ pid_t pid,
+ base::ScopedFile maps_fd,
+ base::ScopedFile mem_fd);
+ void PostRecordTimedOutProcDescriptors(DataSourceInstanceID ds_id, pid_t pid);
+ void PostProcessQueue();
+ void PostInitiateDataSourceStop(DataSourceInstanceID ds_id);
+
+ UnwindQueue<UnwindEntry, kUnwindQueueCapacity>& unwind_queue() {
+ return unwind_queue_;
+ }
+
+ private:
+ struct ProcessState {
+ enum class Status {
+ kResolving, // unwinder waiting on proc-fds for the process
+ kResolved, // proc-fds available, can unwind samples
+ kExpired // proc-fd lookup timed out, will discard samples
+ };
+
+ Status status = Status::kResolving;
+ UnwindingMetadata unwind_state{/*maps_fd=*/base::ScopedFile{},
+ /*mem_fd=*/base::ScopedFile{}};
+ };
+
+ struct DataSourceState {
+ enum class Status { kActive, kShuttingDown };
+
+ Status status = Status::kActive;
+ std::map<pid_t, ProcessState> process_states;
+ };
+
+ // Must be instantiated via the |UnwinderHandle|.
+ Unwinder(Delegate* delegate, base::UnixTaskRunner* task_runner)
+ : task_runner_(task_runner), delegate_(delegate) {}
+
+ // Marks the data source as valid and active at the unwinding stage.
+ void StartDataSource(DataSourceInstanceID ds_id);
+
+ void AdoptProcDescriptors(DataSourceInstanceID ds_id,
+ pid_t pid,
+ base::ScopedFile maps_fd,
+ base::ScopedFile mem_fd);
+ void RecordTimedOutProcDescriptors(DataSourceInstanceID ds_id, pid_t pid);
+
+ // Primary task. Processes the enqueued samples using
+ // |ConsumeAndUnwindReadySamples|, and re-evaluates data source state.
+ void ProcessQueue();
+
+ // Processes the enqueued samples for which all unwinding inputs are ready.
+ // Returns the set of data source instances which still have samples pending
+ // (i.e. waiting on the proc-fds).
+ base::FlatSet<DataSourceInstanceID> ConsumeAndUnwindReadySamples();
+
+ CompletedSample UnwindSample(const ParsedSample& sample,
+ UnwindingMetadata* unwind_state);
+
+ // Marks the data source as shutting down at the unwinding stage. It is known
+ // that no new samples for this source will be pushed into the queue, but we
+ // need to delay the unwinder state teardown until all previously-enqueued
+ // samples for this source are processed.
+ void InitiateDataSourceStop(DataSourceInstanceID ds_id);
+
+ // Tears down unwinding state for the data source without any outstanding
+ // samples, and informs the service that it can continue the shutdown
+ // sequence.
+ void FinishDataSourceStop(DataSourceInstanceID ds_id);
+
+ base::UnixTaskRunner* const task_runner_;
+ Delegate* const delegate_;
+ UnwindQueue<UnwindEntry, kUnwindQueueCapacity> unwind_queue_;
+ std::map<DataSourceInstanceID, DataSourceState> data_sources_;
+
+ PERFETTO_THREAD_CHECKER(thread_checker_)
+};
+
+// Owning resource handle for an |Unwinder| with a dedicated task thread.
+// Ensures that the |Unwinder| is constructed and destructed on the task thread.
+// TODO(rsavitski): update base::ThreadTaskRunner to allow for this pattern of
+// owned state, and consolidate.
+class UnwinderHandle {
+ public:
+ UnwinderHandle(Unwinder::Delegate* delegate) {
+ std::mutex init_lock;
+ std::condition_variable init_cv;
+
+ std::function<void(base::UnixTaskRunner*, Unwinder*)> initializer =
+ [this, &init_lock, &init_cv](base::UnixTaskRunner* task_runner,
+ Unwinder* unwinder) {
+ std::lock_guard<std::mutex> lock(init_lock);
+ task_runner_ = task_runner;
+ unwinder_ = unwinder;
+ // Notify while still holding the lock, as init_cv ceases to exist as
+ // soon as the main thread observes a non-null task_runner_, and it
+ // can wake up spuriously (i.e. before the notify if we had unlocked
+ // before notifying).
+ init_cv.notify_one();
+ };
+
+ thread_ = std::thread(&UnwinderHandle::RunTaskThread, this,
+ std::move(initializer), delegate);
+
+ std::unique_lock<std::mutex> lock(init_lock);
+ init_cv.wait(lock, [this] { return !!task_runner_ && !!unwinder_; });
+ }
+
+ ~UnwinderHandle() {
+ if (task_runner_) {
+ PERFETTO_CHECK(!task_runner_->QuitCalled());
+ task_runner_->Quit();
+
+ PERFETTO_DCHECK(thread_.joinable());
+ }
+ if (thread_.joinable())
+ thread_.join();
+ }
+
+ Unwinder* operator->() { return unwinder_; }
+
+ private:
+ void RunTaskThread(
+ std::function<void(base::UnixTaskRunner*, Unwinder*)> initializer,
+ Unwinder::Delegate* delegate) {
+ base::UnixTaskRunner task_runner;
+ Unwinder unwinder(delegate, &task_runner);
+ task_runner.PostTask(
+ std::bind(std::move(initializer), &task_runner, &unwinder));
+ task_runner.Run();
+ }
+
+ std::thread thread_;
+ base::UnixTaskRunner* task_runner_ = nullptr;
+ Unwinder* unwinder_ = nullptr;
+};
+
+} // namespace profiling
+} // namespace perfetto
+
+#endif // SRC_PROFILING_PERF_UNWINDING_H_
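
Note on UnwinderHandle above: the handle guarantees that the owned object is both constructed and destroyed on its dedicated thread, and the constructor blocks until the object is ready. A reduced standalone sketch of that handshake follows. It is an illustration only: it has no task runner, it keeps the mutex and condition variable as members rather than constructor locals, and SketchWorker and SketchWorkerHandle are hypothetical names.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>

// Hypothetical worker that records which thread constructed it.
struct SketchWorker {
  std::thread::id owner = std::this_thread::get_id();
};

// Owns a worker that lives entirely on a dedicated thread.
class SketchWorkerHandle {
 public:
  SketchWorkerHandle() {
    thread_ = std::thread([this] { RunThread(); });
    // Block until the worker thread has published the worker pointer.
    std::unique_lock<std::mutex> lock(mutex_);
    cv_.wait(lock, [this] { return worker_ != nullptr; });
  }

  ~SketchWorkerHandle() {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      quit_ = true;
    }
    cv_.notify_all();
    thread_.join();
  }

  SketchWorker* operator->() { return worker_; }
  std::thread::id thread_id() const { return thread_.get_id(); }

 private:
  void RunThread() {
    SketchWorker worker;  // constructed and destroyed on this thread
    std::unique_lock<std::mutex> lock(mutex_);
    worker_ = &worker;
    cv_.notify_all();
    // Stands in for running a task loop until asked to quit.
    cv_.wait(lock, [this] { return quit_; });
    worker_ = nullptr;
  }

  std::mutex mutex_;
  std::condition_variable cv_;
  std::thread thread_;
  SketchWorker* worker_ = nullptr;
  bool quit_ = false;
};
```

Because the synchronization objects here are members, they outlive the handshake, so the sketch does not need the notify-while-locked precaution that the constructor-local variables in the patch call for.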