Merge "Make flush async in probes producer"
diff --git a/src/traced/probes/filesystem/inode_file_data_source.cc b/src/traced/probes/filesystem/inode_file_data_source.cc
index 9d3a098..02eed04 100644
--- a/src/traced/probes/filesystem/inode_file_data_source.cc
+++ b/src/traced/probes/filesystem/inode_file_data_source.cc
@@ -182,9 +182,10 @@
PERFETTO_DLOG("%" PRIu64 " inodes found in cache", cache_found_count);
}
-void InodeFileDataSource::Flush() {
+void InodeFileDataSource::Flush(FlushRequestID,
+ std::function<void()> callback) {
ResetTracePacket();
- writer_->Flush();
+ writer_->Flush(callback);
}
void InodeFileDataSource::OnInodes(
diff --git a/src/traced/probes/filesystem/inode_file_data_source.h b/src/traced/probes/filesystem/inode_file_data_source.h
index 143df66..2afdeed 100644
--- a/src/traced/probes/filesystem/inode_file_data_source.h
+++ b/src/traced/probes/filesystem/inode_file_data_source.h
@@ -85,7 +85,7 @@
// ProbesDataSource implementation.
void Start() override;
- void Flush() override;
+ void Flush(FlushRequestID, std::function<void()> callback) override;
protected:
std::multimap<BlockDeviceID, std::string> mount_points_;
diff --git a/src/traced/probes/ftrace/ftrace_data_source.cc b/src/traced/probes/ftrace/ftrace_data_source.cc
index d7bc843..0272cd4 100644
--- a/src/traced/probes/ftrace/ftrace_data_source.cc
+++ b/src/traced/probes/ftrace/ftrace_data_source.cc
@@ -65,7 +65,7 @@
controller_weak_->DumpFtraceStats(stats);
}
-void FtraceDataSource::Flush() {
+void FtraceDataSource::Flush(FlushRequestID, std::function<void()> callback) {
// TODO(primiano): this still doesn't flush data from the kernel ftrace
// buffers (see b/73886018). We should do that and delay the
// NotifyFlushComplete() until the ftrace data has been drained from the
@@ -73,7 +73,7 @@
if (!writer_)
return;
WriteStats();
- writer_->Flush();
+ writer_->Flush(callback);
}
void FtraceDataSource::WriteStats() {
diff --git a/src/traced/probes/ftrace/ftrace_data_source.h b/src/traced/probes/ftrace/ftrace_data_source.h
index e576a5a..f15c1d1 100644
--- a/src/traced/probes/ftrace/ftrace_data_source.h
+++ b/src/traced/probes/ftrace/ftrace_data_source.h
@@ -67,7 +67,7 @@
// Flushes the ftrace buffers into the userspace trace buffers and writes
// also ftrace stats.
- void Flush() override;
+ void Flush(FlushRequestID, std::function<void()> callback) override;
FtraceConfigId config_id() const { return config_id_; }
const FtraceConfig& config() const { return config_; }
diff --git a/src/traced/probes/probes_data_source.h b/src/traced/probes/probes_data_source.h
index 13d47e1..1e0e8cb 100644
--- a/src/traced/probes/probes_data_source.h
+++ b/src/traced/probes/probes_data_source.h
@@ -17,6 +17,8 @@
#ifndef SRC_TRACED_PROBES_PROBES_DATA_SOURCE_H_
#define SRC_TRACED_PROBES_PROBES_DATA_SOURCE_H_
+#include <functional>
+
#include "perfetto/tracing/core/basic_types.h"
namespace perfetto {
@@ -29,7 +31,7 @@
virtual ~ProbesDataSource();
virtual void Start() = 0;
- virtual void Flush() = 0;
+ virtual void Flush(FlushRequestID, std::function<void()> callback) = 0;
const TracingSessionID tracing_session_id;
const int type_id;
diff --git a/src/traced/probes/probes_producer.cc b/src/traced/probes/probes_producer.cc
index c309180..b7d666c 100644
--- a/src/traced/probes/probes_producer.cc
+++ b/src/traced/probes/probes_producer.cc
@@ -47,6 +47,10 @@
constexpr uint32_t kInitialConnectionBackoffMs = 100;
constexpr uint32_t kMaxConnectionBackoffMs = 30 * 1000;
+
+// Should be larger than FtraceController::kFlushTimeoutMs.
+constexpr uint32_t kFlushTimeoutMs = 1000;
+
constexpr char kFtraceSourceName[] = "linux.ftrace";
constexpr char kProcessStatsSourceName[] = "linux.process_stats";
constexpr char kInodeMapSourceName[] = "linux.inode_file_map";
@@ -280,12 +284,64 @@
void ProbesProducer::Flush(FlushRequestID flush_request_id,
const DataSourceInstanceID* data_source_ids,
size_t num_data_sources) {
+ PERFETTO_DCHECK(flush_request_id);
+ auto weak_this = weak_factory_.GetWeakPtr();
+
+ // Issue a Flush() to all started data sources.
+ bool flush_queued = false;
for (size_t i = 0; i < num_data_sources; i++) {
- auto it = data_sources_.find(data_source_ids[i]);
+ DataSourceInstanceID ds_id = data_source_ids[i];
+ auto it = data_sources_.find(ds_id);
if (it == data_sources_.end() || !it->second->started)
continue;
- it->second->Flush();
+ pending_flushes_.emplace(flush_request_id, ds_id);
+ flush_queued = true;
+ auto flush_callback = [weak_this, flush_request_id, ds_id] {
+ if (weak_this)
+ weak_this->OnDataSourceFlushComplete(flush_request_id, ds_id);
+ };
+ it->second->Flush(flush_request_id, flush_callback);
}
+
+ // If there is nothing to flush, ack immediately.
+ if (!flush_queued) {
+ endpoint_->NotifyFlushComplete(flush_request_id);
+ return;
+ }
+
+ // Otherwise, post the timeout task.
+ task_runner_->PostDelayedTask(
+ [weak_this, flush_request_id] {
+ if (weak_this)
+ weak_this->OnFlushTimeout(flush_request_id);
+ },
+ kFlushTimeoutMs);
+}
+
+void ProbesProducer::OnDataSourceFlushComplete(FlushRequestID flush_request_id,
+ DataSourceInstanceID ds_id) {
+ PERFETTO_DLOG("Flush %" PRIu64 " acked by data source %" PRIu64,
+ flush_request_id, ds_id);
+ auto range = pending_flushes_.equal_range(flush_request_id);
+ for (auto it = range.first; it != range.second; it++) {
+ if (it->second == ds_id) {
+ pending_flushes_.erase(it);
+ break;
+ }
+ }
+
+ if (pending_flushes_.count(flush_request_id))
+ return; // Still waiting for other data sources to ack.
+
+ PERFETTO_DLOG("All data sources acked to flush %" PRIu64, flush_request_id);
+ endpoint_->NotifyFlushComplete(flush_request_id);
+}
+
+void ProbesProducer::OnFlushTimeout(FlushRequestID flush_request_id) {
+ if (pending_flushes_.count(flush_request_id) == 0)
+ return; // All acked.
+ PERFETTO_ELOG("Flush(%" PRIu64 ") timed out", flush_request_id);
+ pending_flushes_.erase(flush_request_id);
endpoint_->NotifyFlushComplete(flush_request_id);
}
diff --git a/src/traced/probes/probes_producer.h b/src/traced/probes/probes_producer.h
index 14737d6..44ac19e 100644
--- a/src/traced/probes/probes_producer.h
+++ b/src/traced/probes/probes_producer.h
@@ -95,6 +95,8 @@
void Restart();
void ResetConnectionBackoff();
void IncreaseConnectionBackoff();
+ void OnDataSourceFlushComplete(FlushRequestID, DataSourceInstanceID);
+ void OnFlushTimeout(FlushRequestID);
State state_ = kNotStarted;
base::TaskRunner* task_runner_ = nullptr;
@@ -112,6 +114,9 @@
std::unordered_multimap<TracingSessionID, ProbesDataSource*>
session_data_sources_;
+ std::unordered_multimap<FlushRequestID, DataSourceInstanceID>
+ pending_flushes_;
+
std::unordered_map<DataSourceInstanceID, base::Watchdog::Timer> watchdogs_;
LRUInodeCache cache_{kLRUInodeCacheSize};
std::map<BlockDeviceID, std::unordered_map<Inode, InodeMapValue>>
diff --git a/src/traced/probes/ps/process_stats_data_source.cc b/src/traced/probes/ps/process_stats_data_source.cc
index ddff880..858f25f 100644
--- a/src/traced/probes/ps/process_stats_data_source.cc
+++ b/src/traced/probes/ps/process_stats_data_source.cc
@@ -161,11 +161,12 @@
FinalizeCurPacket();
}
-void ProcessStatsDataSource::Flush() {
+void ProcessStatsDataSource::Flush(FlushRequestID,
+ std::function<void()> callback) {
// We shouldn't get this in the middle of WriteAllProcesses() or OnPids().
PERFETTO_DCHECK(!cur_ps_tree_);
PERFETTO_DCHECK(!cur_ps_stats_);
- writer_->Flush();
+ writer_->Flush(callback);
}
void ProcessStatsDataSource::WriteProcessOrThread(int32_t pid) {
diff --git a/src/traced/probes/ps/process_stats_data_source.h b/src/traced/probes/ps/process_stats_data_source.h
index 65b6e2e..f5329cf 100644
--- a/src/traced/probes/ps/process_stats_data_source.h
+++ b/src/traced/probes/ps/process_stats_data_source.h
@@ -57,7 +57,7 @@
// ProbesDataSource implementation.
void Start() override;
- void Flush() override;
+ void Flush(FlushRequestID, std::function<void()> callback) override;
bool on_demand_dumps_enabled() const { return enable_on_demand_dumps_; }
diff --git a/src/traced/probes/ps/process_stats_data_source_unittest.cc b/src/traced/probes/ps/process_stats_data_source_unittest.cc
index 4587cda..b985bcc 100644
--- a/src/traced/probes/ps/process_stats_data_source_unittest.cc
+++ b/src/traced/probes/ps/process_stats_data_source_unittest.cc
@@ -169,7 +169,7 @@
data_source->Start();
task_runner_.RunUntilCheckpoint("all_done");
- data_source->Flush();
+ data_source->Flush(1 /* FlushRequestId */, []() {});
// |packet| will contain the merge of all kNumIter packets written.
std::unique_ptr<protos::TracePacket> packet = writer_raw_->ParseProto();
diff --git a/src/traced/probes/sys_stats/sys_stats_data_source.cc b/src/traced/probes/sys_stats/sys_stats_data_source.cc
index 61d869d..befe396 100644
--- a/src/traced/probes/sys_stats/sys_stats_data_source.cc
+++ b/src/traced/probes/sys_stats/sys_stats_data_source.cc
@@ -297,8 +297,8 @@
return weak_factory_.GetWeakPtr();
}
-void SysStatsDataSource::Flush() {
- writer_->Flush();
+void SysStatsDataSource::Flush(FlushRequestID, std::function<void()> callback) {
+ writer_->Flush(callback);
}
size_t SysStatsDataSource::ReadFile(base::ScopedFile* fd, const char* path) {
diff --git a/src/traced/probes/sys_stats/sys_stats_data_source.h b/src/traced/probes/sys_stats/sys_stats_data_source.h
index bdb5e10..0fa085c 100644
--- a/src/traced/probes/sys_stats/sys_stats_data_source.h
+++ b/src/traced/probes/sys_stats/sys_stats_data_source.h
@@ -56,7 +56,7 @@
// ProbesDataSource implementation.
void Start() override;
- void Flush() override;
+ void Flush(FlushRequestID, std::function<void()> callback) override;
base::WeakPtr<SysStatsDataSource> GetWeakPtr() const;
diff --git a/src/tracing/core/trace_writer_impl.cc b/src/tracing/core/trace_writer_impl.cc
index bfdf1f4..0d0e3e2 100644
--- a/src/tracing/core/trace_writer_impl.cc
+++ b/src/tracing/core/trace_writer_impl.cc
@@ -68,10 +68,12 @@
if (cur_chunk_.is_valid()) {
shmem_arbiter_->ReturnCompletedChunk(std::move(cur_chunk_), target_buffer_,
&patch_list_);
- shmem_arbiter_->FlushPendingCommitDataRequests(callback);
} else {
PERFETTO_DCHECK(patch_list_.empty());
}
+ // Always issue the Flush request, even if there is nothing to flush, just
+ // for the sake of getting the callback posted back.
+ shmem_arbiter_->FlushPendingCommitDataRequests(callback);
protobuf_stream_writer_.Reset({nullptr, nullptr});
}