traced_probes: Use libstatspull to collect statsd atoms

See go/ww-atom-subscriber-api

adb push test/configs/statsd.cfg /data/misc/perfetto-configs/statsd.cfg
adb shell perfetto --txt -c /data/misc/perfetto-configs/statsd.cfg -o /data/misc/perfetto-traces/statsd.pb
and adb pull /data/misc/perfetto-traces/statsd.pb statsd.pb
out/linux_clang_debug/traceconv text statsd.pb

Test: See above
Test: m
Bug: 268661096
Bug: 247858731
Bug: 268822860
Bug: 272619721
Change-Id: I0b649018f449187d4ed8a95036c31e9890f03e75
diff --git a/Android.bp b/Android.bp
index 439fd7b..336e0e8 100644
--- a/Android.bp
+++ b/Android.bp
@@ -8089,6 +8089,7 @@
     name: "perfetto_protos_third_party_statsd_config_zero_gen",
     srcs: [
         "protos/third_party/statsd/shell_config.proto",
+        "protos/third_party/statsd/shell_data.proto",
     ],
     tools: [
         "aprotoc",
@@ -8097,6 +8098,7 @@
     cmd: "mkdir -p $(genDir)/external/perfetto/ && $(location aprotoc) --proto_path=external/perfetto --plugin=protoc-gen-plugin=$(location protozero_plugin) --plugin_out=wrapper_namespace=pbzero:$(genDir)/external/perfetto/ $(in)",
     out: [
         "external/perfetto/protos/third_party/statsd/shell_config.pbzero.cc",
+        "external/perfetto/protos/third_party/statsd/shell_data.pbzero.cc",
     ],
 }
 
@@ -8105,6 +8107,7 @@
     name: "perfetto_protos_third_party_statsd_config_zero_gen_headers",
     srcs: [
         "protos/third_party/statsd/shell_config.proto",
+        "protos/third_party/statsd/shell_data.proto",
     ],
     tools: [
         "aprotoc",
@@ -8113,6 +8116,7 @@
     cmd: "mkdir -p $(genDir)/external/perfetto/ && $(location aprotoc) --proto_path=external/perfetto --plugin=protoc-gen-plugin=$(location protozero_plugin) --plugin_out=wrapper_namespace=pbzero:$(genDir)/external/perfetto/ $(in)",
     out: [
         "external/perfetto/protos/third_party/statsd/shell_config.pbzero.h",
+        "external/perfetto/protos/third_party/statsd/shell_data.pbzero.h",
     ],
     export_include_dirs: [
         ".",
@@ -10810,6 +10814,7 @@
     name: "perfetto_src_traced_probes_statsd_client_statsd_client",
     srcs: [
         "src/traced/probes/statsd_client/common.cc",
+        "src/traced/probes/statsd_client/statsd_binder_data_source.cc",
         "src/traced/probes/statsd_client/statsd_exec_data_source.cc",
     ],
 }
diff --git a/BUILD b/BUILD
index d2df1ad..692c44d 100644
--- a/BUILD
+++ b/BUILD
@@ -707,6 +707,7 @@
         "src/android_internal/health_hal.h",
         "src/android_internal/incident_service.h",
         "src/android_internal/power_stats.h",
+        "src/android_internal/statsd.h",
         "src/android_internal/statsd_logging.h",
         "src/android_internal/tracing_service_proxy.h",
     ],
@@ -2461,6 +2462,8 @@
     srcs = [
         "src/traced/probes/statsd_client/common.cc",
         "src/traced/probes/statsd_client/common.h",
+        "src/traced/probes/statsd_client/statsd_binder_data_source.cc",
+        "src/traced/probes/statsd_client/statsd_binder_data_source.h",
         "src/traced/probes/statsd_client/statsd_exec_data_source.cc",
         "src/traced/probes/statsd_client/statsd_exec_data_source.h",
     ],
@@ -4368,6 +4371,7 @@
     name = "protos_third_party_statsd_config_protos",
     srcs = [
         "protos/third_party/statsd/shell_config.proto",
+        "protos/third_party/statsd/shell_data.proto",
     ],
     visibility = [
         PERFETTO_CONFIG.proto_library_visibility,
diff --git a/protos/third_party/statsd/BUILD.gn b/protos/third_party/statsd/BUILD.gn
index 5906f06..5558d7b 100644
--- a/protos/third_party/statsd/BUILD.gn
+++ b/protos/third_party/statsd/BUILD.gn
@@ -19,5 +19,8 @@
     "zero",
     "source_set",
   ]
-  sources = [ "shell_config.proto" ]
+  sources = [
+    "shell_config.proto",
+    "shell_data.proto",
+  ]
 }
diff --git a/protos/third_party/statsd/shell_data.proto b/protos/third_party/statsd/shell_data.proto
new file mode 100644
index 0000000..e4c31bd
--- /dev/null
+++ b/protos/third_party/statsd/shell_data.proto
@@ -0,0 +1,27 @@
+/*
+ * Copyright (C) 2018 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+syntax = "proto2";
+
+package perfetto.proto;
+
+// This is a manual import of ShellData:
+// https://cs.android.com/android/platform/superproject/+/master:packages/modules/StatsD/statsd/src/shell/shell_data.proto;l=27;drc=d2e51ecdf08753688fb889b657dcba60adb994f3
+
+message ShellData {
+  repeated bytes atom = 1;
+  repeated int64 timestamp_nanos = 2 [packed = true];
+}
diff --git a/src/android_internal/BUILD.gn b/src/android_internal/BUILD.gn
index 5843748..8435eab 100644
--- a/src/android_internal/BUILD.gn
+++ b/src/android_internal/BUILD.gn
@@ -53,6 +53,7 @@
       "services",
       "tracingproxy",
       "utils",
+      "libstatspull",
     ]
 
     # This target should never depend on any other perfetto target to avoid ODR
@@ -75,6 +76,7 @@
     "health_hal.h",
     "incident_service.h",
     "power_stats.h",
+    "statsd.h",
     "statsd_logging.h",
     "tracing_service_proxy.h",
   ]
diff --git a/src/android_internal/statsd.h b/src/android_internal/statsd.h
new file mode 100644
index 0000000..8afa9f0
--- /dev/null
+++ b/src/android_internal/statsd.h
@@ -0,0 +1,62 @@
+/*
+ * Copyright (C) 2023 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef SRC_ANDROID_INTERNAL_STATSD_H_
+#define SRC_ANDROID_INTERNAL_STATSD_H_
+
+#include <stdint.h>
+
+// This header declares proxy functions defined in
+// libperfetto_android_internal.so that access internal android functions (e.g.
+// hwbinder).
+// Do not add any include to either perfetto headers or android headers. See
+// README.md for more.
+
+namespace perfetto {
+namespace android_internal {
+
+extern "C" {
+
+// These functions are not thread safe unless specified otherwise.
+
+const uint32_t kAtomCallbackReasonStatsdInitiated = 1;
+const uint32_t kAtomCallbackReasonFlushRequested = 2;
+const uint32_t kAtomCallbackReasonSubscriptionEnded = 3;
+
+typedef void (*AtomCallback)(int32_t subscription_id,
+                             uint32_t reason,
+                             uint8_t* payload,
+                             size_t num_bytes,
+                             void* cookie);
+
+int32_t __attribute__((visibility("default")))
+AddAtomSubscription(const uint8_t* subscription_config,
+                    size_t num_bytes,
+                    AtomCallback callback,
+                    void* cookie);
+
+void __attribute__((visibility("default")))
+RemoveAtomSubscription(int32_t subscription_id);
+
+void __attribute__((visibility("default")))
+FlushAtomSubscription(int32_t subscription_id);
+
+}  // extern "C"
+
+}  // namespace android_internal
+}  // namespace perfetto
+
+#endif  // SRC_ANDROID_INTERNAL_STATSD_H_
diff --git a/src/traced/probes/probes_producer.cc b/src/traced/probes/probes_producer.cc
index 45330a5..d80d021 100644
--- a/src/traced/probes/probes_producer.cc
+++ b/src/traced/probes/probes_producer.cc
@@ -48,6 +48,7 @@
 #include "src/traced/probes/power/linux_power_sysfs_data_source.h"
 #include "src/traced/probes/probes_data_source.h"
 #include "src/traced/probes/ps/process_stats_data_source.h"
+#include "src/traced/probes/statsd_client/statsd_binder_data_source.h"
 #include "src/traced/probes/statsd_client/statsd_exec_data_source.h"
 #include "src/traced/probes/sys_stats/sys_stats_data_source.h"
 #include "src/traced/probes/system_info/system_info_data_source.h"
@@ -189,6 +190,17 @@
 
 template <>
 std::unique_ptr<ProbesDataSource>
+ProbesProducer::CreateDSInstance<StatsdBinderDataSource>(
+    TracingSessionID session_id,
+    const DataSourceConfig& config) {
+  auto buffer_id = static_cast<BufferID>(config.target_buffer());
+  return std::unique_ptr<StatsdBinderDataSource>(new StatsdBinderDataSource(
+      task_runner_, session_id, endpoint_->CreateTraceWriter(buffer_id),
+      config));
+}
+
+template <>
+std::unique_ptr<ProbesDataSource>
 ProbesProducer::CreateDSInstance<AndroidPowerDataSource>(
     TracingSessionID session_id,
     const DataSourceConfig& config) {
@@ -325,7 +337,11 @@
     Ds<MetatraceDataSource>(),
     Ds<PackagesListDataSource>(),
     Ds<ProcessStatsDataSource>(),
+#if PERFETTO_BUILDFLAG(PERFETTO_ANDROID_BUILD)
+    Ds<StatsdBinderDataSource>(),
+#else
     Ds<StatsdExecDataSource>(),
+#endif
     Ds<SysStatsDataSource>(),
     Ds<SystemInfoDataSource>(),
 };
diff --git a/src/traced/probes/statsd_client/BUILD.gn b/src/traced/probes/statsd_client/BUILD.gn
index 4fa7f53..f36c993 100644
--- a/src/traced/probes/statsd_client/BUILD.gn
+++ b/src/traced/probes/statsd_client/BUILD.gn
@@ -26,6 +26,8 @@
     "../../../../protos/perfetto/trace/statsd:cpp",
     "../../../../protos/perfetto/trace/statsd:zero",
     "../../../../protos/third_party/statsd:config_zero",
+    "../../../android_internal:headers",
+    "../../../android_internal:lazy_library_loader",
     "../../../base",
     "../../../protozero:proto_ring_buffer",
     "../common",
@@ -33,6 +35,8 @@
   sources = [
     "common.cc",
     "common.h",
+    "statsd_binder_data_source.cc",
+    "statsd_binder_data_source.h",
     "statsd_exec_data_source.cc",
     "statsd_exec_data_source.h",
   ]
diff --git a/src/traced/probes/statsd_client/statsd_binder_data_source.cc b/src/traced/probes/statsd_client/statsd_binder_data_source.cc
new file mode 100644
index 0000000..b05451b
--- /dev/null
+++ b/src/traced/probes/statsd_client/statsd_binder_data_source.cc
@@ -0,0 +1,312 @@
+/*
+ * Copyright (C) 2023 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "statsd_binder_data_source.h"
+
+#include <unistd.h>
+
+#include <map>
+#include <mutex>
+
+#include "perfetto/base/time.h"
+#include "perfetto/ext/base/no_destructor.h"
+#include "perfetto/ext/base/optional.h"
+#include "perfetto/ext/base/string_utils.h"
+#include "perfetto/protozero/scattered_heap_buffer.h"
+#include "perfetto/tracing/core/data_source_config.h"
+#include "src/android_internal/lazy_library_loader.h"
+#include "src/android_internal/statsd.h"
+#include "src/traced/probes/statsd_client/common.h"
+
+#include "protos/perfetto/config/statsd/statsd_tracing_config.pbzero.h"
+#include "protos/perfetto/trace/statsd/statsd_atom.pbzero.h"
+#include "protos/perfetto/trace/trace_packet.pbzero.h"
+#include "protos/third_party/statsd/shell_config.pbzero.h"
+#include "protos/third_party/statsd/shell_data.pbzero.h"
+
+using ::perfetto::protos::pbzero::StatsdPullAtomConfig;
+using ::perfetto::protos::pbzero::StatsdShellSubscription;
+using ::perfetto::protos::pbzero::StatsdTracingConfig;
+
+using ShellDataDecoder = ::perfetto::proto::pbzero::ShellData_Decoder;
+
+namespace perfetto {
+namespace {
+
+int32_t AddAtomSubscription(const uint8_t* subscription_config,
+                            size_t num_bytes,
+                            android_internal::AtomCallback callback,
+                            void* cookie) {
+  PERFETTO_LAZY_LOAD(android_internal::AddAtomSubscription, fn);
+  if (fn) {
+    return fn(subscription_config, num_bytes, callback, cookie);
+  }
+  return -1;
+}
+
+bool RemoveAtomSubscription(int32_t subscription_id) {
+  PERFETTO_LAZY_LOAD(android_internal::RemoveAtomSubscription, fn);
+  if (fn) {
+    fn(subscription_id);
+    return true;
+  }
+  return false;
+}
+
+bool FlushAtomSubscription(int32_t subscription_id) {
+  PERFETTO_LAZY_LOAD(android_internal::FlushAtomSubscription, fn);
+  if (fn) {
+    fn(subscription_id);
+    return true;
+  }
+  return false;
+}
+
+// This is a singleton for mapping Statsd subscriptions to their data source.
+// It is needed to deal with all the threading weirdness binder introduces. The
+// AtomCallback from AddAtomSubscription can happen on any of a pool of binder
+// threads while StatsdBinderDatasource runs on the single main thread.
+// This means that StatsdBinderDatasource could be destroyed while a
+// AtomCallback is in progress. To guard against this all the mapping
+// to/from subscription_id/StatsdBinderDatasource happens under the lock
+// of SubscriptionTracker.
+class SubscriptionTracker {
+ public:
+  struct Entry {
+    base::TaskRunner* task_runner;
+    base::WeakPtr<StatsdBinderDataSource> data_source;
+  };
+
+  static SubscriptionTracker* Get();
+  void OnData(int32_t subscription_id,
+              uint32_t reason,
+              uint8_t* data,
+              size_t sz);
+  int32_t Register(base::TaskRunner* task_runner,
+                   base::WeakPtr<StatsdBinderDataSource> data_source,
+                   const std::string& config);
+  void Unregister(int32_t subscription_id);
+
+ private:
+  friend base::NoDestructor<SubscriptionTracker>;
+
+  SubscriptionTracker();
+  virtual ~SubscriptionTracker();
+  SubscriptionTracker(const SubscriptionTracker&) = delete;
+  SubscriptionTracker& operator=(const SubscriptionTracker&) = delete;
+
+  // lock_ guards access to subscriptions_
+  std::mutex lock_;
+  std::map<int32_t, Entry> subscriptions_;
+};
+
+// static
+SubscriptionTracker* SubscriptionTracker::Get() {
+  static base::NoDestructor<SubscriptionTracker> instance;
+  return &(instance.ref());
+}
+
+SubscriptionTracker::SubscriptionTracker() {}
+SubscriptionTracker::~SubscriptionTracker() = default;
+
+void SubscriptionTracker::OnData(int32_t subscription_id,
+                                 uint32_t reason,
+                                 uint8_t* data,
+                                 size_t sz) {
+  // Allocate and copy before we take the lock:
+  std::shared_ptr<uint8_t> copy(new uint8_t[sz],
+                                std::default_delete<uint8_t[]>());
+  memcpy(copy.get(), data, sz);
+
+  std::lock_guard<std::mutex> scoped_lock(lock_);
+
+  auto it = subscriptions_.find(subscription_id);
+  if (it == subscriptions_.end()) {
+    // This is very paranoid and should not be required (since
+    // ~StatsdBinderDataSource will call this) however it would be awful to get
+    // stuck in a situation where statsd is sending us data forever and we're
+    // immediately dropping it on the floor - so if nothing wants the data we
+    // end the subscription. In the case the subscription is already gone this
+    // is a noop in libstatspull.
+    RemoveAtomSubscription(subscription_id);
+    return;
+  }
+
+  base::TaskRunner* task_runner = it->second.task_runner;
+  base::WeakPtr<StatsdBinderDataSource> data_source = it->second.data_source;
+
+  task_runner->PostTask([data_source, reason, copy = std::move(copy), sz]() {
+    if (data_source) {
+      data_source->OnData(reason, copy.get(), sz);
+    }
+  });
+}
+
+int32_t SubscriptionTracker::Register(
+    base::TaskRunner* task_runner,
+    base::WeakPtr<StatsdBinderDataSource> data_source,
+    const std::string& config) {
+  std::lock_guard<std::mutex> scoped_lock(lock_);
+
+  // We do this here (as opposed to in StatsdBinderDataSource) so that
+  // we can hold the lock while we do and avoid the tiny race window between
+  // getting the subscription id and putting that id in the subscriptions_ map
+  auto* begin = reinterpret_cast<const uint8_t*>(config.data());
+  size_t size = config.size();
+  int32_t id = AddAtomSubscription(
+      begin, size,
+      [](int32_t subscription_id, uint32_t reason, uint8_t* payload,
+         size_t num_bytes, void*) {
+        SubscriptionTracker::Get()->OnData(subscription_id, reason, payload,
+                                           num_bytes);
+      },
+      nullptr);
+
+  if (id >= 0) {
+    subscriptions_[id] = Entry{task_runner, data_source};
+  }
+
+  return id;
+}
+
+void SubscriptionTracker::Unregister(int32_t subscription_id) {
+  std::lock_guard<std::mutex> scoped_lock(lock_);
+
+  auto it = subscriptions_.find(subscription_id);
+  if (it != subscriptions_.end()) {
+    subscriptions_.erase(it);
+  }
+
+  // Unregister is called both when the data source is finishing
+  // (~StatsdBinderDataSource) but also when we observe a
+  // kAtomCallbackReasonSubscriptionEnded message. In the latter
+  // case this call is unnecessary (the statsd subscription is already
+  // gone) but it doesn't hurt.
+  RemoveAtomSubscription(subscription_id);
+}
+
+}  // namespace
+
+// static
+const ProbesDataSource::Descriptor StatsdBinderDataSource::descriptor = {
+    /*name*/ "android.statsd",
+    /*flags*/ Descriptor::kFlagsNone,
+    /*fill_descriptor_func*/ nullptr,
+};
+
+StatsdBinderDataSource::StatsdBinderDataSource(
+    base::TaskRunner* task_runner,
+    TracingSessionID session_id,
+    std::unique_ptr<TraceWriter> writer,
+    const DataSourceConfig& ds_config)
+    : ProbesDataSource(session_id, &descriptor),
+      task_runner_(task_runner),
+      writer_(std::move(writer)),
+      shell_subscription_(CreateStatsdShellConfig(ds_config)),
+      weak_factory_(this) {}
+
+StatsdBinderDataSource::~StatsdBinderDataSource() {
+  if (subscription_id_ >= 0) {
+    SubscriptionTracker::Get()->Unregister(subscription_id_);
+    subscription_id_ = -1;
+  }
+}
+
+void StatsdBinderDataSource::Start() {
+  // Don't bother actually connecting to statsd if no pull/push atoms
+  // were configured:
+  if (shell_subscription_.empty()) {
+    PERFETTO_LOG("Empty statsd config. Not connecting to statsd.");
+    return;
+  }
+
+  auto weak_this = weak_factory_.GetWeakPtr();
+  subscription_id_ = SubscriptionTracker::Get()->Register(
+      task_runner_, weak_this, shell_subscription_);
+}
+
+void StatsdBinderDataSource::OnData(uint32_t reason,
+                                    const uint8_t* data,
+                                    size_t sz) {
+  ShellDataDecoder message(data, sz);
+
+  bool parse_error = false;
+  auto timestamps_it = message.timestamp_nanos(&parse_error);
+  std::vector<int64_t> timestamps;
+  if (!parse_error) {
+    for (; timestamps_it; ++timestamps_it) {
+      timestamps.push_back(*timestamps_it);
+    }
+
+    TraceWriter::TracePacketHandle packet;
+    size_t i = 0;
+    for (auto it = message.atom(); it; ++it) {
+      packet = writer_->NewTracePacket();
+      if (i < timestamps.size()) {
+        packet->set_timestamp(static_cast<uint64_t>(timestamps[i++]));
+      } else {
+        packet->set_timestamp(
+            static_cast<uint64_t>(base::GetBootTimeNs().count()));
+      }
+      auto* atom = packet->set_statsd_atom();
+      auto* nested = atom->add_nested();
+      nested->AppendRawProtoBytes(it->data(), it->size());
+      packet->Finalize();
+    }
+  }
+
+  // If we have the pending flush in progress resolve that:
+  if (reason == android_internal::kAtomCallbackReasonFlushRequested &&
+      pending_flush_callback_) {
+    writer_->Flush(pending_flush_callback_);
+    pending_flush_callback_ = nullptr;
+  }
+
+  if (reason == android_internal::kAtomCallbackReasonSubscriptionEnded) {
+    // This is the last packet so unregister self. It's not required to do this
+    // since we clean up in the destructor but it doesn't hurt.
+    SubscriptionTracker::Get()->Unregister(subscription_id_);
+    subscription_id_ = -1;
+  }
+}
+
+void StatsdBinderDataSource::Flush(FlushRequestID,
+                                   std::function<void()> callback) {
+  if (subscription_id_ < 0) {
+    writer_->Flush(callback);
+  } else {
+    // We don't want to queue up pending flushes to avoid a situation where
+    // we end up will giant queue of unresolved flushes if statsd never replies.
+    // To avoid this if there is already a flush in flight finish that one now:
+    if (pending_flush_callback_) {
+      writer_->Flush(pending_flush_callback_);
+    }
+
+    // Remember the callback for later.
+    pending_flush_callback_ = callback;
+
+    // Start the flush
+    if (!FlushAtomSubscription(subscription_id_)) {
+      // If it fails immediately we're done:
+      writer_->Flush(pending_flush_callback_);
+      pending_flush_callback_ = nullptr;
+    }
+  }
+}
+
+void StatsdBinderDataSource::ClearIncrementalState() {}
+
+}  // namespace perfetto
diff --git a/src/traced/probes/statsd_client/statsd_binder_data_source.h b/src/traced/probes/statsd_client/statsd_binder_data_source.h
new file mode 100644
index 0000000..6268eb7
--- /dev/null
+++ b/src/traced/probes/statsd_client/statsd_binder_data_source.h
@@ -0,0 +1,65 @@
+/*
+ * Copyright (C) 2023 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef SRC_TRACED_PROBES_STATSD_CLIENT_STATSD_BINDER_DATA_SOURCE_H_
+#define SRC_TRACED_PROBES_STATSD_CLIENT_STATSD_BINDER_DATA_SOURCE_H_
+
+#include <memory>
+
+#include "perfetto/base/task_runner.h"
+#include "perfetto/ext/base/utils.h"
+#include "perfetto/ext/base/weak_ptr.h"
+#include "perfetto/ext/tracing/core/basic_types.h"
+#include "perfetto/ext/tracing/core/trace_writer.h"
+#include "perfetto/tracing/core/forward_decls.h"
+#include "src/protozero/proto_ring_buffer.h"
+#include "src/traced/probes/probes_data_source.h"
+
+namespace perfetto {
+
+class StatsdBinderDataSource : public ProbesDataSource {
+ public:
+  static const ProbesDataSource::Descriptor descriptor;
+
+  StatsdBinderDataSource(base::TaskRunner*,
+                         TracingSessionID,
+                         std::unique_ptr<TraceWriter> writer,
+                         const DataSourceConfig&);
+  ~StatsdBinderDataSource() override;
+
+  // ProbesDataSource implementation.
+  void Start() override;
+  void Flush(FlushRequestID, std::function<void()> callback) override;
+  void ClearIncrementalState() override;
+
+  void OnData(uint32_t reason, const uint8_t* data, size_t sz);
+
+ private:
+  StatsdBinderDataSource(const StatsdBinderDataSource&) = delete;
+  StatsdBinderDataSource& operator=(const StatsdBinderDataSource&) = delete;
+
+  base::TaskRunner* const task_runner_;
+  std::unique_ptr<TraceWriter> writer_;
+  std::string shell_subscription_;
+  int32_t subscription_id_ = -1;
+  std::function<void()> pending_flush_callback_;
+
+  base::WeakPtrFactory<StatsdBinderDataSource> weak_factory_;  // Keep last.
+};
+
+}  // namespace perfetto
+
+#endif  // SRC_TRACED_PROBES_STATSD_CLIENT_STATSD_BINDER_DATA_SOURCE_H_
diff --git a/tools/gen_android_bp b/tools/gen_android_bp
index 32e277d..d0eb99f 100755
--- a/tools/gen_android_bp
+++ b/tools/gen_android_bp
@@ -147,6 +147,7 @@
     'statssocket',
     'tracingproxy',
     'utils',
+    'statspull',
 ]
 
 # Static libraries which are directly translated to Android system equivalents.