Merge "Make flush async in probes producer"
diff --git a/src/traced/probes/filesystem/inode_file_data_source.cc b/src/traced/probes/filesystem/inode_file_data_source.cc
index 9d3a098..02eed04 100644
--- a/src/traced/probes/filesystem/inode_file_data_source.cc
+++ b/src/traced/probes/filesystem/inode_file_data_source.cc
@@ -182,9 +182,10 @@
     PERFETTO_DLOG("%" PRIu64 " inodes found in cache", cache_found_count);
 }
 
-void InodeFileDataSource::Flush() {
+void InodeFileDataSource::Flush(FlushRequestID,
+                                std::function<void()> callback) {
   ResetTracePacket();
-  writer_->Flush();
+  writer_->Flush(callback);
 }
 
 void InodeFileDataSource::OnInodes(
diff --git a/src/traced/probes/filesystem/inode_file_data_source.h b/src/traced/probes/filesystem/inode_file_data_source.h
index 143df66..2afdeed 100644
--- a/src/traced/probes/filesystem/inode_file_data_source.h
+++ b/src/traced/probes/filesystem/inode_file_data_source.h
@@ -85,7 +85,7 @@
 
   // ProbesDataSource implementation.
   void Start() override;
-  void Flush() override;
+  void Flush(FlushRequestID, std::function<void()> callback) override;
 
  protected:
   std::multimap<BlockDeviceID, std::string> mount_points_;
diff --git a/src/traced/probes/ftrace/ftrace_data_source.cc b/src/traced/probes/ftrace/ftrace_data_source.cc
index d7bc843..0272cd4 100644
--- a/src/traced/probes/ftrace/ftrace_data_source.cc
+++ b/src/traced/probes/ftrace/ftrace_data_source.cc
@@ -65,7 +65,7 @@
     controller_weak_->DumpFtraceStats(stats);
 }
 
-void FtraceDataSource::Flush() {
+void FtraceDataSource::Flush(FlushRequestID, std::function<void()> callback) {
   // TODO(primiano): this still doesn't flush data from the kernel ftrace
   // buffers (see b/73886018). We should do that and delay the
   // NotifyFlushComplete() until the ftrace data has been drained from the
@@ -73,7 +73,7 @@
   if (!writer_)
     return;
   WriteStats();
-  writer_->Flush();
+  writer_->Flush(callback);
 }
 
 void FtraceDataSource::WriteStats() {
diff --git a/src/traced/probes/ftrace/ftrace_data_source.h b/src/traced/probes/ftrace/ftrace_data_source.h
index e576a5a..f15c1d1 100644
--- a/src/traced/probes/ftrace/ftrace_data_source.h
+++ b/src/traced/probes/ftrace/ftrace_data_source.h
@@ -67,7 +67,7 @@
 
   // Flushes the ftrace buffers into the userspace trace buffers and writes
   // also ftrace stats.
-  void Flush() override;
+  void Flush(FlushRequestID, std::function<void()> callback) override;
 
   FtraceConfigId config_id() const { return config_id_; }
   const FtraceConfig& config() const { return config_; }
diff --git a/src/traced/probes/probes_data_source.h b/src/traced/probes/probes_data_source.h
index 13d47e1..1e0e8cb 100644
--- a/src/traced/probes/probes_data_source.h
+++ b/src/traced/probes/probes_data_source.h
@@ -17,6 +17,8 @@
 #ifndef SRC_TRACED_PROBES_PROBES_DATA_SOURCE_H_
 #define SRC_TRACED_PROBES_PROBES_DATA_SOURCE_H_
 
+#include <functional>
+
 #include "perfetto/tracing/core/basic_types.h"
 
 namespace perfetto {
@@ -29,7 +31,7 @@
   virtual ~ProbesDataSource();
 
   virtual void Start() = 0;
-  virtual void Flush() = 0;
+  virtual void Flush(FlushRequestID, std::function<void()> callback) = 0;
 
   const TracingSessionID tracing_session_id;
   const int type_id;
diff --git a/src/traced/probes/probes_producer.cc b/src/traced/probes/probes_producer.cc
index c309180..b7d666c 100644
--- a/src/traced/probes/probes_producer.cc
+++ b/src/traced/probes/probes_producer.cc
@@ -47,6 +47,10 @@
 
 constexpr uint32_t kInitialConnectionBackoffMs = 100;
 constexpr uint32_t kMaxConnectionBackoffMs = 30 * 1000;
+
+// Should be larger than FtraceController::kFlushTimeoutMs.
+constexpr uint32_t kFlushTimeoutMs = 1000;
+
 constexpr char kFtraceSourceName[] = "linux.ftrace";
 constexpr char kProcessStatsSourceName[] = "linux.process_stats";
 constexpr char kInodeMapSourceName[] = "linux.inode_file_map";
@@ -280,12 +284,64 @@
 void ProbesProducer::Flush(FlushRequestID flush_request_id,
                            const DataSourceInstanceID* data_source_ids,
                            size_t num_data_sources) {
+  PERFETTO_DCHECK(flush_request_id);
+  auto weak_this = weak_factory_.GetWeakPtr();
+
+  // Issue a Flush() to all started data sources.
+  bool flush_queued = false;
   for (size_t i = 0; i < num_data_sources; i++) {
-    auto it = data_sources_.find(data_source_ids[i]);
+    DataSourceInstanceID ds_id = data_source_ids[i];
+    auto it = data_sources_.find(ds_id);
     if (it == data_sources_.end() || !it->second->started)
       continue;
-    it->second->Flush();
+    pending_flushes_.emplace(flush_request_id, ds_id);
+    flush_queued = true;
+    auto flush_callback = [weak_this, flush_request_id, ds_id] {
+      if (weak_this)
+        weak_this->OnDataSourceFlushComplete(flush_request_id, ds_id);
+    };
+    it->second->Flush(flush_request_id, flush_callback);
   }
+
+  // If there is nothing to flush, ack immediately.
+  if (!flush_queued) {
+    endpoint_->NotifyFlushComplete(flush_request_id);
+    return;
+  }
+
+  // Otherwise, post the timeout task.
+  task_runner_->PostDelayedTask(
+      [weak_this, flush_request_id] {
+        if (weak_this)
+          weak_this->OnFlushTimeout(flush_request_id);
+      },
+      kFlushTimeoutMs);
+}
+
+void ProbesProducer::OnDataSourceFlushComplete(FlushRequestID flush_request_id,
+                                               DataSourceInstanceID ds_id) {
+  PERFETTO_DLOG("Flush %" PRIu64 " acked by data source %" PRIu64,
+                flush_request_id, ds_id);
+  auto range = pending_flushes_.equal_range(flush_request_id);
+  for (auto it = range.first; it != range.second; it++) {
+    if (it->second == ds_id) {
+      pending_flushes_.erase(it);
+      break;
+    }
+  }
+
+  if (pending_flushes_.count(flush_request_id))
+    return;  // Still waiting for other data sources to ack.
+
+  PERFETTO_DLOG("All data sources acked to flush %" PRIu64, flush_request_id);
+  endpoint_->NotifyFlushComplete(flush_request_id);
+}
+
+void ProbesProducer::OnFlushTimeout(FlushRequestID flush_request_id) {
+  if (pending_flushes_.count(flush_request_id) == 0)
+    return;  // All acked.
+  PERFETTO_ELOG("Flush(%" PRIu64 ") timed out", flush_request_id);
+  pending_flushes_.erase(flush_request_id);
   endpoint_->NotifyFlushComplete(flush_request_id);
 }
 
diff --git a/src/traced/probes/probes_producer.h b/src/traced/probes/probes_producer.h
index 14737d6..44ac19e 100644
--- a/src/traced/probes/probes_producer.h
+++ b/src/traced/probes/probes_producer.h
@@ -95,6 +95,8 @@
   void Restart();
   void ResetConnectionBackoff();
   void IncreaseConnectionBackoff();
+  void OnDataSourceFlushComplete(FlushRequestID, DataSourceInstanceID);
+  void OnFlushTimeout(FlushRequestID);
 
   State state_ = kNotStarted;
   base::TaskRunner* task_runner_ = nullptr;
@@ -112,6 +114,9 @@
   std::unordered_multimap<TracingSessionID, ProbesDataSource*>
       session_data_sources_;
 
+  std::unordered_multimap<FlushRequestID, DataSourceInstanceID>
+      pending_flushes_;
+
   std::unordered_map<DataSourceInstanceID, base::Watchdog::Timer> watchdogs_;
   LRUInodeCache cache_{kLRUInodeCacheSize};
   std::map<BlockDeviceID, std::unordered_map<Inode, InodeMapValue>>
diff --git a/src/traced/probes/ps/process_stats_data_source.cc b/src/traced/probes/ps/process_stats_data_source.cc
index ddff880..858f25f 100644
--- a/src/traced/probes/ps/process_stats_data_source.cc
+++ b/src/traced/probes/ps/process_stats_data_source.cc
@@ -161,11 +161,12 @@
   FinalizeCurPacket();
 }
 
-void ProcessStatsDataSource::Flush() {
+void ProcessStatsDataSource::Flush(FlushRequestID,
+                                   std::function<void()> callback) {
   // We shouldn't get this in the middle of WriteAllProcesses() or OnPids().
   PERFETTO_DCHECK(!cur_ps_tree_);
   PERFETTO_DCHECK(!cur_ps_stats_);
-  writer_->Flush();
+  writer_->Flush(callback);
 }
 
 void ProcessStatsDataSource::WriteProcessOrThread(int32_t pid) {
diff --git a/src/traced/probes/ps/process_stats_data_source.h b/src/traced/probes/ps/process_stats_data_source.h
index 65b6e2e..f5329cf 100644
--- a/src/traced/probes/ps/process_stats_data_source.h
+++ b/src/traced/probes/ps/process_stats_data_source.h
@@ -57,7 +57,7 @@
 
   // ProbesDataSource implementation.
   void Start() override;
-  void Flush() override;
+  void Flush(FlushRequestID, std::function<void()> callback) override;
 
   bool on_demand_dumps_enabled() const { return enable_on_demand_dumps_; }
 
diff --git a/src/traced/probes/ps/process_stats_data_source_unittest.cc b/src/traced/probes/ps/process_stats_data_source_unittest.cc
index 4587cda..b985bcc 100644
--- a/src/traced/probes/ps/process_stats_data_source_unittest.cc
+++ b/src/traced/probes/ps/process_stats_data_source_unittest.cc
@@ -169,7 +169,7 @@
 
   data_source->Start();
   task_runner_.RunUntilCheckpoint("all_done");
-  data_source->Flush();
+  data_source->Flush(1 /* FlushRequestId */, []() {});
 
   // |packet| will contain the merge of all kNumIter packets written.
   std::unique_ptr<protos::TracePacket> packet = writer_raw_->ParseProto();
diff --git a/src/traced/probes/sys_stats/sys_stats_data_source.cc b/src/traced/probes/sys_stats/sys_stats_data_source.cc
index 61d869d..befe396 100644
--- a/src/traced/probes/sys_stats/sys_stats_data_source.cc
+++ b/src/traced/probes/sys_stats/sys_stats_data_source.cc
@@ -297,8 +297,8 @@
   return weak_factory_.GetWeakPtr();
 }
 
-void SysStatsDataSource::Flush() {
-  writer_->Flush();
+void SysStatsDataSource::Flush(FlushRequestID, std::function<void()> callback) {
+  writer_->Flush(callback);
 }
 
 size_t SysStatsDataSource::ReadFile(base::ScopedFile* fd, const char* path) {
diff --git a/src/traced/probes/sys_stats/sys_stats_data_source.h b/src/traced/probes/sys_stats/sys_stats_data_source.h
index bdb5e10..0fa085c 100644
--- a/src/traced/probes/sys_stats/sys_stats_data_source.h
+++ b/src/traced/probes/sys_stats/sys_stats_data_source.h
@@ -56,7 +56,7 @@
 
   // ProbesDataSource implementation.
   void Start() override;
-  void Flush() override;
+  void Flush(FlushRequestID, std::function<void()> callback) override;
 
   base::WeakPtr<SysStatsDataSource> GetWeakPtr() const;
 
diff --git a/src/tracing/core/trace_writer_impl.cc b/src/tracing/core/trace_writer_impl.cc
index bfdf1f4..0d0e3e2 100644
--- a/src/tracing/core/trace_writer_impl.cc
+++ b/src/tracing/core/trace_writer_impl.cc
@@ -68,10 +68,12 @@
   if (cur_chunk_.is_valid()) {
     shmem_arbiter_->ReturnCompletedChunk(std::move(cur_chunk_), target_buffer_,
                                          &patch_list_);
-    shmem_arbiter_->FlushPendingCommitDataRequests(callback);
   } else {
     PERFETTO_DCHECK(patch_list_.empty());
   }
+  // Always issue the Flush request, even if there is nothing to flush, just
+  // for the sake of getting the callback posted back.
+  shmem_arbiter_->FlushPendingCommitDataRequests(callback);
   protobuf_stream_writer_.Reset({nullptr, nullptr});
 }