Reland: "traced_probes: Add statsd atoms datasource"

This reverts commit 54664088dca1cc0774f78b646d640689aed74a08.

Change-Id: I6dda39ebbaf18d84370bd9f3d1400e9f0b153126
diff --git a/Android.bp b/Android.bp
index cedbd47..c83aef7 100644
--- a/Android.bp
+++ b/Android.bp
@@ -525,12 +525,14 @@
         ":perfetto_protos_perfetto_trace_power_zero_gen",
         ":perfetto_protos_perfetto_trace_profiling_zero_gen",
         ":perfetto_protos_perfetto_trace_ps_zero_gen",
+        ":perfetto_protos_perfetto_trace_statsd_cpp_gen",
         ":perfetto_protos_perfetto_trace_statsd_zero_gen",
         ":perfetto_protos_perfetto_trace_sys_stats_zero_gen",
         ":perfetto_protos_perfetto_trace_system_info_zero_gen",
         ":perfetto_protos_perfetto_trace_track_event_cpp_gen",
         ":perfetto_protos_perfetto_trace_track_event_zero_gen",
         ":perfetto_protos_perfetto_trace_translation_zero_gen",
+        ":perfetto_protos_third_party_statsd_config_zero_gen",
         ":perfetto_src_android_internal_headers",
         ":perfetto_src_android_internal_lazy_library_loader",
         ":perfetto_src_android_stats_android_stats",
@@ -545,6 +547,7 @@
         ":perfetto_src_protozero_filtering_bytecode_common",
         ":perfetto_src_protozero_filtering_bytecode_parser",
         ":perfetto_src_protozero_filtering_message_filter",
+        ":perfetto_src_protozero_proto_ring_buffer",
         ":perfetto_src_protozero_protozero",
         ":perfetto_src_traced_probes_android_game_intervention_list_android_game_intervention_list",
         ":perfetto_src_traced_probes_android_log_android_log",
@@ -563,6 +566,7 @@
         ":perfetto_src_traced_probes_probes",
         ":perfetto_src_traced_probes_probes_src",
         ":perfetto_src_traced_probes_ps_ps",
+        ":perfetto_src_traced_probes_statsd_client_statsd_client",
         ":perfetto_src_traced_probes_sys_stats_sys_stats",
         ":perfetto_src_traced_probes_system_info_system_info",
         ":perfetto_src_traced_service_service",
@@ -621,12 +625,14 @@
         "perfetto_protos_perfetto_trace_power_zero_gen_headers",
         "perfetto_protos_perfetto_trace_profiling_zero_gen_headers",
         "perfetto_protos_perfetto_trace_ps_zero_gen_headers",
+        "perfetto_protos_perfetto_trace_statsd_cpp_gen_headers",
         "perfetto_protos_perfetto_trace_statsd_zero_gen_headers",
         "perfetto_protos_perfetto_trace_sys_stats_zero_gen_headers",
         "perfetto_protos_perfetto_trace_system_info_zero_gen_headers",
         "perfetto_protos_perfetto_trace_track_event_cpp_gen_headers",
         "perfetto_protos_perfetto_trace_track_event_zero_gen_headers",
         "perfetto_protos_perfetto_trace_translation_zero_gen_headers",
+        "perfetto_protos_third_party_statsd_config_zero_gen_headers",
         "perfetto_src_base_version_gen_h",
     ],
     defaults: [
@@ -1121,6 +1127,7 @@
         ":perfetto_protos_perfetto_trace_track_event_zero_gen",
         ":perfetto_protos_perfetto_trace_translation_cpp_gen",
         ":perfetto_protos_perfetto_trace_translation_zero_gen",
+        ":perfetto_protos_third_party_statsd_config_zero_gen",
         ":perfetto_src_android_internal_headers",
         ":perfetto_src_android_internal_lazy_library_loader",
         ":perfetto_src_android_stats_android_stats",
@@ -1138,6 +1145,7 @@
         ":perfetto_src_protozero_filtering_bytecode_generator",
         ":perfetto_src_protozero_filtering_bytecode_parser",
         ":perfetto_src_protozero_filtering_message_filter",
+        ":perfetto_src_protozero_proto_ring_buffer",
         ":perfetto_src_protozero_protozero",
         ":perfetto_src_traced_probes_android_game_intervention_list_android_game_intervention_list",
         ":perfetto_src_traced_probes_android_log_android_log",
@@ -1155,6 +1163,7 @@
         ":perfetto_src_traced_probes_power_power",
         ":perfetto_src_traced_probes_probes_src",
         ":perfetto_src_traced_probes_ps_ps",
+        ":perfetto_src_traced_probes_statsd_client_statsd_client",
         ":perfetto_src_traced_probes_sys_stats_sys_stats",
         ":perfetto_src_traced_probes_system_info_system_info",
         ":perfetto_src_tracing_common",
@@ -1245,6 +1254,7 @@
         "perfetto_protos_perfetto_trace_track_event_zero_gen_headers",
         "perfetto_protos_perfetto_trace_translation_cpp_gen_headers",
         "perfetto_protos_perfetto_trace_translation_zero_gen_headers",
+        "perfetto_protos_third_party_statsd_config_zero_gen_headers",
         "perfetto_src_base_version_gen_h",
     ],
     export_generated_headers: [
@@ -1311,6 +1321,7 @@
         "perfetto_protos_perfetto_trace_track_event_zero_gen_headers",
         "perfetto_protos_perfetto_trace_translation_cpp_gen_headers",
         "perfetto_protos_perfetto_trace_translation_zero_gen_headers",
+        "perfetto_protos_third_party_statsd_config_zero_gen_headers",
         "perfetto_src_base_version_gen_h",
     ],
     defaults: [
@@ -1399,6 +1410,7 @@
         ":perfetto_protos_perfetto_trace_track_event_zero_gen",
         ":perfetto_protos_perfetto_trace_translation_cpp_gen",
         ":perfetto_protos_perfetto_trace_translation_zero_gen",
+        ":perfetto_protos_third_party_statsd_config_zero_gen",
         ":perfetto_src_android_internal_headers",
         ":perfetto_src_android_internal_lazy_library_loader",
         ":perfetto_src_android_stats_android_stats",
@@ -1415,6 +1427,7 @@
         ":perfetto_src_protozero_filtering_bytecode_common",
         ":perfetto_src_protozero_filtering_bytecode_parser",
         ":perfetto_src_protozero_filtering_message_filter",
+        ":perfetto_src_protozero_proto_ring_buffer",
         ":perfetto_src_protozero_protozero",
         ":perfetto_src_traced_probes_android_game_intervention_list_android_game_intervention_list",
         ":perfetto_src_traced_probes_android_log_android_log",
@@ -1432,6 +1445,7 @@
         ":perfetto_src_traced_probes_power_power",
         ":perfetto_src_traced_probes_probes_src",
         ":perfetto_src_traced_probes_ps_ps",
+        ":perfetto_src_traced_probes_statsd_client_statsd_client",
         ":perfetto_src_traced_probes_sys_stats_sys_stats",
         ":perfetto_src_traced_probes_system_info_system_info",
         ":perfetto_src_tracing_common",
@@ -1508,6 +1522,7 @@
         "perfetto_protos_perfetto_trace_track_event_zero_gen_headers",
         "perfetto_protos_perfetto_trace_translation_cpp_gen_headers",
         "perfetto_protos_perfetto_trace_translation_zero_gen_headers",
+        "perfetto_protos_third_party_statsd_config_zero_gen_headers",
         "perfetto_src_base_version_gen_h",
     ],
     export_generated_headers: [
@@ -1574,6 +1589,7 @@
         "perfetto_protos_perfetto_trace_track_event_zero_gen_headers",
         "perfetto_protos_perfetto_trace_translation_cpp_gen_headers",
         "perfetto_protos_perfetto_trace_translation_zero_gen_headers",
+        "perfetto_protos_third_party_statsd_config_zero_gen_headers",
         "perfetto_src_base_version_gen_h",
     ],
     defaults: [
@@ -1861,6 +1877,7 @@
         ":perfetto_protos_perfetto_trace_translation_cpp_gen",
         ":perfetto_protos_perfetto_trace_translation_lite_gen",
         ":perfetto_protos_perfetto_trace_translation_zero_gen",
+        ":perfetto_protos_third_party_statsd_config_zero_gen",
         ":perfetto_src_android_internal_headers",
         ":perfetto_src_android_internal_lazy_library_loader",
         ":perfetto_src_android_stats_android_stats",
@@ -1892,6 +1909,7 @@
         ":perfetto_src_protozero_filtering_bytecode_generator",
         ":perfetto_src_protozero_filtering_bytecode_parser",
         ":perfetto_src_protozero_filtering_message_filter",
+        ":perfetto_src_protozero_proto_ring_buffer",
         ":perfetto_src_protozero_protozero",
         ":perfetto_src_trace_processor_analysis_analysis",
         ":perfetto_src_trace_processor_containers_containers",
@@ -1938,6 +1956,7 @@
         ":perfetto_src_traced_probes_power_power",
         ":perfetto_src_traced_probes_probes_src",
         ":perfetto_src_traced_probes_ps_ps",
+        ":perfetto_src_traced_probes_statsd_client_statsd_client",
         ":perfetto_src_traced_probes_sys_stats_sys_stats",
         ":perfetto_src_traced_probes_system_info_system_info",
         ":perfetto_src_tracing_client_api_without_backends",
@@ -2077,6 +2096,7 @@
         "perfetto_protos_perfetto_trace_translation_cpp_gen_headers",
         "perfetto_protos_perfetto_trace_translation_lite_gen_headers",
         "perfetto_protos_perfetto_trace_translation_zero_gen_headers",
+        "perfetto_protos_third_party_statsd_config_zero_gen_headers",
         "perfetto_src_base_version_gen_h",
         "perfetto_src_trace_processor_importers_gen_cc_chrome_track_event_descriptor",
         "perfetto_src_trace_processor_importers_gen_cc_config_descriptor",
@@ -7588,6 +7608,42 @@
     ],
 }
 
+// GN: //protos/third_party/statsd:config_zero
+genrule {
+    name: "perfetto_protos_third_party_statsd_config_zero_gen",
+    srcs: [
+        "protos/third_party/statsd/shell_config.proto",
+    ],
+    tools: [
+        "aprotoc",
+        "protozero_plugin",
+    ],
+    cmd: "mkdir -p $(genDir)/external/perfetto/ && $(location aprotoc) --proto_path=external/perfetto --plugin=protoc-gen-plugin=$(location protozero_plugin) --plugin_out=wrapper_namespace=pbzero:$(genDir)/external/perfetto/ $(in)",
+    out: [
+        "external/perfetto/protos/third_party/statsd/shell_config.pbzero.cc",
+    ],
+}
+
+// GN: //protos/third_party/statsd:config_zero
+genrule {
+    name: "perfetto_protos_third_party_statsd_config_zero_gen_headers",
+    srcs: [
+        "protos/third_party/statsd/shell_config.proto",
+    ],
+    tools: [
+        "aprotoc",
+        "protozero_plugin",
+    ],
+    cmd: "mkdir -p $(genDir)/external/perfetto/ && $(location aprotoc) --proto_path=external/perfetto --plugin=protoc-gen-plugin=$(location protozero_plugin) --plugin_out=wrapper_namespace=pbzero:$(genDir)/external/perfetto/ $(in)",
+    out: [
+        "external/perfetto/protos/third_party/statsd/shell_config.pbzero.h",
+    ],
+    export_include_dirs: [
+        ".",
+        "protos",
+    ],
+}
+
 // GN: //src/android_internal:headers
 filegroup {
     name: "perfetto_src_android_internal_headers",
@@ -9755,6 +9811,22 @@
     ],
 }
 
+// GN: //src/traced/probes/statsd_client:statsd_client
+filegroup {
+    name: "perfetto_src_traced_probes_statsd_client_statsd_client",
+    srcs: [
+        "src/traced/probes/statsd_client/statsd_data_source.cc",
+    ],
+}
+
+// GN: //src/traced/probes/statsd_client:unittests
+filegroup {
+    name: "perfetto_src_traced_probes_statsd_client_unittests",
+    srcs: [
+        "src/traced/probes/statsd_client/statsd_data_source_unittest.cc",
+    ],
+}
+
 // GN: //src/traced/probes/sys_stats:sys_stats
 filegroup {
     name: "perfetto_src_traced_probes_sys_stats_sys_stats",
@@ -10467,6 +10539,7 @@
         ":perfetto_protos_perfetto_trace_translation_cpp_gen",
         ":perfetto_protos_perfetto_trace_translation_lite_gen",
         ":perfetto_protos_perfetto_trace_translation_zero_gen",
+        ":perfetto_protos_third_party_statsd_config_zero_gen",
         ":perfetto_src_android_internal_headers",
         ":perfetto_src_android_internal_lazy_library_loader",
         ":perfetto_src_android_stats_android_stats",
@@ -10601,6 +10674,8 @@
         ":perfetto_src_traced_probes_probes_src",
         ":perfetto_src_traced_probes_ps_ps",
         ":perfetto_src_traced_probes_ps_unittests",
+        ":perfetto_src_traced_probes_statsd_client_statsd_client",
+        ":perfetto_src_traced_probes_statsd_client_unittests",
         ":perfetto_src_traced_probes_sys_stats_sys_stats",
         ":perfetto_src_traced_probes_sys_stats_unittests",
         ":perfetto_src_traced_probes_system_info_system_info",
@@ -10741,6 +10816,7 @@
         "perfetto_protos_perfetto_trace_translation_cpp_gen_headers",
         "perfetto_protos_perfetto_trace_translation_lite_gen_headers",
         "perfetto_protos_perfetto_trace_translation_zero_gen_headers",
+        "perfetto_protos_third_party_statsd_config_zero_gen_headers",
         "perfetto_src_base_version_gen_h",
         "perfetto_src_ipc_test_messages_cpp_gen_headers",
         "perfetto_src_ipc_test_messages_ipc_gen_headers",
diff --git a/BUILD b/BUILD
index faee247..45d85cb 100644
--- a/BUILD
+++ b/BUILD
@@ -232,6 +232,7 @@
         ":src_protozero_filtering_bytecode_common",
         ":src_protozero_filtering_bytecode_parser",
         ":src_protozero_filtering_message_filter",
+        ":src_protozero_proto_ring_buffer",
         ":src_traced_probes_android_game_intervention_list_android_game_intervention_list",
         ":src_traced_probes_android_log_android_log",
         ":src_traced_probes_android_system_property_android_system_property",
@@ -249,6 +250,7 @@
         ":src_traced_probes_probes",
         ":src_traced_probes_probes_src",
         ":src_traced_probes_ps_ps",
+        ":src_traced_probes_statsd_client_statsd_client",
         ":src_traced_probes_sys_stats_sys_stats",
         ":src_traced_probes_system_info_system_info",
         ":src_traced_service_service",
@@ -317,12 +319,14 @@
         ":protos_perfetto_trace_power_zero",
         ":protos_perfetto_trace_profiling_zero",
         ":protos_perfetto_trace_ps_zero",
+        ":protos_perfetto_trace_statsd_cpp",
         ":protos_perfetto_trace_statsd_zero",
         ":protos_perfetto_trace_sys_stats_zero",
         ":protos_perfetto_trace_system_info_zero",
         ":protos_perfetto_trace_track_event_cpp",
         ":protos_perfetto_trace_track_event_zero",
         ":protos_perfetto_trace_translation_zero",
+        ":protos_third_party_statsd_config_zero",
         ":protozero",
         ":src_base_base",
         ":src_base_version",
@@ -1936,6 +1940,15 @@
     ],
 )
 
+# GN target: //src/traced/probes/statsd_client:statsd_client
+perfetto_filegroup(
+    name = "src_traced_probes_statsd_client_statsd_client",
+    srcs = [
+        "src/traced/probes/statsd_client/statsd_data_source.cc",
+        "src/traced/probes/statsd_client/statsd_data_source.h",
+    ],
+)
+
 # GN target: //src/traced/probes/sys_stats:sys_stats
 perfetto_filegroup(
     name = "src_traced_probes_sys_stats_sys_stats",
@@ -3565,6 +3578,15 @@
     ],
 )
 
+# GN target: //protos/perfetto/trace/statsd:cpp
+perfetto_cc_protocpp_library(
+    name = "protos_perfetto_trace_statsd_cpp",
+    deps = [
+        ":protos_perfetto_common_cpp",
+        ":protos_perfetto_trace_statsd_protos",
+    ],
+)
+
 # GN target: //protos/perfetto/trace/statsd:source_set
 perfetto_proto_library(
     name = "protos_perfetto_trace_statsd_protos",
@@ -3757,6 +3779,25 @@
     ],
 )
 
+# GN target: //protos/third_party/statsd:config_source_set
+perfetto_proto_library(
+    name = "protos_third_party_statsd_config_protos",
+    srcs = [
+        "protos/third_party/statsd/shell_config.proto",
+    ],
+    visibility = [
+        PERFETTO_CONFIG.proto_library_visibility,
+    ],
+)
+
+# GN target: //protos/third_party/statsd:config_zero
+perfetto_cc_protozero_library(
+    name = "protos_third_party_statsd_config_zero",
+    deps = [
+        ":protos_third_party_statsd_config_protos",
+    ],
+)
+
 # GN target: //src/perfetto_cmd:protos_cpp
 perfetto_cc_protocpp_library(
     name = "src_perfetto_cmd_protos_cpp",
diff --git a/CHANGELOG b/CHANGELOG
index 8407784..e4f9aad 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,6 +1,6 @@
 Unreleased:
   Tracing service and probes:
-    *
+    * Add android.statsd datasource.
   Trace Processor:
     *
   UI:
diff --git a/src/traced/probes/BUILD.gn b/src/traced/probes/BUILD.gn
index 2d025a9..44ed22c 100644
--- a/src/traced/probes/BUILD.gn
+++ b/src/traced/probes/BUILD.gn
@@ -68,6 +68,7 @@
     "packages_list",
     "power",
     "ps",
+    "statsd_client",
     "sys_stats",
     "system_info",
   ]
@@ -109,6 +110,7 @@
     "packages_list:unittests",
     "power:unittests",
     "ps:unittests",
+    "statsd_client:unittests",
     "sys_stats:unittests",
     "system_info:unittests",
   ]
diff --git a/src/traced/probes/probes_producer.cc b/src/traced/probes/probes_producer.cc
index dfbf533..76469d3 100644
--- a/src/traced/probes/probes_producer.cc
+++ b/src/traced/probes/probes_producer.cc
@@ -48,6 +48,7 @@
 #include "src/traced/probes/power/linux_power_sysfs_data_source.h"
 #include "src/traced/probes/probes_data_source.h"
 #include "src/traced/probes/ps/process_stats_data_source.h"
+#include "src/traced/probes/statsd_client/statsd_data_source.h"
 #include "src/traced/probes/sys_stats/sys_stats_data_source.h"
 #include "src/traced/probes/system_info/system_info_data_source.h"
 
@@ -180,6 +181,17 @@
 
 template <>
 std::unique_ptr<ProbesDataSource>
+ProbesProducer::CreateDSInstance<StatsdDataSource>(
+    TracingSessionID session_id,
+    const DataSourceConfig& config) {
+  auto buffer_id = static_cast<BufferID>(config.target_buffer());
+  return std::unique_ptr<StatsdDataSource>(
+      new StatsdDataSource(task_runner_, session_id,
+                           endpoint_->CreateTraceWriter(buffer_id), config));
+}
+
+template <>
+std::unique_ptr<ProbesDataSource>
 ProbesProducer::CreateDSInstance<AndroidPowerDataSource>(
     TracingSessionID session_id,
     const DataSourceConfig& config) {
@@ -316,6 +328,7 @@
     Ds<MetatraceDataSource>(),
     Ds<PackagesListDataSource>(),
     Ds<ProcessStatsDataSource>(),
+    Ds<StatsdDataSource>(),
     Ds<SysStatsDataSource>(),
     Ds<SystemInfoDataSource>(),
 };
diff --git a/src/traced/probes/statsd_client/BUILD.gn b/src/traced/probes/statsd_client/BUILD.gn
new file mode 100644
index 0000000..bd0f714
--- /dev/null
+++ b/src/traced/probes/statsd_client/BUILD.gn
@@ -0,0 +1,56 @@
+# Copyright (C) 2022 The Android Open Source Project
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import("../../../../gn/test.gni")
+
+source_set("statsd_client") {
+  public_deps = [ "../../../tracing/core" ]
+  deps = [
+    "..:data_source",
+    "../../../../gn:default_deps",
+    "../../../../include/perfetto/ext/traced",
+    "../../../../protos/perfetto/config/statsd:cpp",
+    "../../../../protos/perfetto/config/statsd:zero",
+    "../../../../protos/perfetto/trace:zero",
+    "../../../../protos/perfetto/trace/statsd:cpp",
+    "../../../../protos/perfetto/trace/statsd:zero",
+    "../../../../protos/third_party/statsd:config_zero",
+    "../../../base",
+    "../../../protozero:proto_ring_buffer",
+    "../common",
+  ]
+  sources = [
+    "statsd_data_source.cc",
+    "statsd_data_source.h",
+  ]
+}
+
+perfetto_unittest_source_set("unittests") {
+  testonly = true
+  deps = [
+    ":statsd_client",
+    "../../../../gn:default_deps",
+    "../../../../gn:gtest_and_gmock",
+    "../../../../protos/perfetto/config/statsd:cpp",
+    "../../../../protos/perfetto/config/statsd:zero",
+    "../../../../protos/perfetto/trace:zero",
+    "../../../../protos/perfetto/trace/statsd:cpp",
+    "../../../../protos/perfetto/trace/statsd:zero",
+    "../../../../protos/third_party/statsd:config_zero",
+    "../../../../src/base:test_support",
+    "../../../../src/tracing/test:test_support",
+    "../common:test_support",
+  ]
+  sources = [ "statsd_data_source_unittest.cc" ]
+}
diff --git a/src/traced/probes/statsd_client/statsd_data_source.cc b/src/traced/probes/statsd_client/statsd_data_source.cc
new file mode 100644
index 0000000..987db58
--- /dev/null
+++ b/src/traced/probes/statsd_client/statsd_data_source.cc
@@ -0,0 +1,310 @@
+/*
+ * Copyright (C) 2022 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "src/traced/probes/statsd_client/statsd_data_source.h"
+
+#include <stdlib.h>
+
+#include "perfetto/base/task_runner.h"
+#include "perfetto/base/time.h"
+#include "perfetto/ext/base/string_utils.h"
+#include "perfetto/ext/base/subprocess.h"
+#include "perfetto/protozero/scattered_heap_buffer.h"
+#include "perfetto/tracing/core/data_source_config.h"
+
+#include "protos/perfetto/config/statsd/statsd_tracing_config.pbzero.h"
+#include "protos/perfetto/trace/statsd/statsd_atom.pbzero.h"
+#include "protos/perfetto/trace/trace_packet.pbzero.h"
+#include "protos/third_party/statsd/shell_config.pbzero.h"
+
+using ::perfetto::protos::pbzero::StatsdPullAtomConfig;
+using ::perfetto::protos::pbzero::StatsdShellSubscription;
+using ::perfetto::protos::pbzero::StatsdTracingConfig;
+
+namespace perfetto {
+namespace {
+
+static constexpr const size_t kHeaderSize = sizeof(size_t);
+
+void AddPullAtoms(const StatsdPullAtomConfig::Decoder& cfg,
+                  protozero::RepeatedFieldIterator<int32_t> it,
+                  StatsdShellSubscription* msg) {
+  constexpr int32_t kDefaultPullFreqMs = 5000;
+  int32_t pull_freq_ms = kDefaultPullFreqMs;
+  if (cfg.has_pull_frequency_ms()) {
+    pull_freq_ms = cfg.pull_frequency_ms();
+  }
+
+  for (; it; ++it) {
+    auto* pulled_msg = msg->add_pulled();
+    pulled_msg->set_freq_millis(pull_freq_ms);
+
+    for (auto package = cfg.packages(); package; ++package) {
+      pulled_msg->add_packages(*package);
+    }
+
+    auto* matcher_msg = pulled_msg->set_matcher();
+    matcher_msg->set_atom_id(*it);
+  }
+}
+
+void AddPushAtoms(protozero::RepeatedFieldIterator<int32_t> it,
+                  StatsdShellSubscription* msg) {
+  for (; it; ++it) {
+    auto* matcher_msg = msg->add_pushed();
+    matcher_msg->set_atom_id(*it);
+  }
+}
+
+// Exec "cmd stats data-subscribe" and read/write stdin/stdout. This is
+// the only way to make this work when side loading but for in tree
+// builds this causes to many denials:
+// avc: denied { execute_no_trans } for comm="traced_probes"
+// path="/system/bin/cmd" dev="dm-0" ino=200 scontext=u:r:traced_probes:s0
+// tcontext=u:object_r:system_file:s0 tclass=file permissive=1 avc: denied {
+// call } for comm="cmd" scontext=u:r:traced_probes:s0 tcontext=u:r:statsd:s0
+// tclass=binder permissive=1 avc: denied { use } for comm="cmd"
+// path="pipe:[51149]" dev="pipefs" ino=51149 scontext=u:r:statsd:s0
+// tcontext=u:r:traced_probes:s0 tclass=fd permissive=1 avc: denied { read } for
+// comm="cmd" path="pipe:[51149]" dev="pipefs" ino=51149 scontext=u:r:statsd:s0
+// tcontext=u:r:traced_probes:s0 tclass=fifo_file permissive=1 avc: denied {
+// write } for comm="cmd" path="pipe:[51148]" dev="pipefs" ino=51148
+// scontext=u:r:statsd:s0 tcontext=u:r:traced_probes:s0 tclass=fifo_file
+// permissive=1 avc: denied { transfer } for comm="cmd"
+// scontext=u:r:traced_probes:s0 tcontext=u:r:statsd:s0 tclass=binder
+// permissive=1
+class ExecStatsdBackend : public StatsdBackend {
+ public:
+  ExecStatsdBackend(std::string input, base::ScopedFile output_wr);
+  virtual ~ExecStatsdBackend() override;
+
+ private:
+  base::Subprocess subprocess_;
+};
+
+ExecStatsdBackend::ExecStatsdBackend(std::string input,
+                                     base::ScopedFile output_wr)
+    : StatsdBackend(std::move(input), std::move(output_wr)),
+      subprocess_({"/system/bin/cmd", "stats", "data-subscribe"}) {
+  subprocess_.args.stdin_mode = base::Subprocess::InputMode::kBuffer;
+  subprocess_.args.stdout_mode = base::Subprocess::OutputMode::kFd;
+  subprocess_.args.stderr_mode = base::Subprocess::OutputMode::kInherit;
+  subprocess_.args.input = std::move(input_);
+  subprocess_.args.out_fd = std::move(output_wr_);
+  subprocess_.Start();
+  // Have to Poll at least once so the subprocess has a chance to
+  // consume the input.
+  // TODO(hjd): Might not manage to push the whole stdin here in which
+  // case we can be stuck here forever. We should re-posttask the Poll
+  // until the whole stdin is consumed.
+  subprocess_.Poll();
+}
+
+// virtual
+ExecStatsdBackend::~ExecStatsdBackend() = default;
+
+std::unique_ptr<StatsdBackend> GetStatsdBackend(std::string in,
+                                                base::ScopedFile&& out) {
+  return std::unique_ptr<StatsdBackend>(
+      new ExecStatsdBackend(std::move(in), std::move(out)));
+}
+
+}  // namespace
+
+StatsdBackend::StatsdBackend(std::string input, base::ScopedFile output_wr)
+    : input_(std::move(input)), output_wr_(std::move(output_wr)) {}
+
+StatsdBackend::~StatsdBackend() = default;
+
+SizetPrefixedMessageReader::SizetPrefixedMessageReader()
+    : RingBufferMessageReader() {}
+SizetPrefixedMessageReader::~SizetPrefixedMessageReader() {}
+
+SizetPrefixedMessageReader::Message SizetPrefixedMessageReader::TryReadMessage(
+    const uint8_t* start,
+    const uint8_t* end) {
+  SizetPrefixedMessageReader::Message msg{};
+  size_t available = static_cast<size_t>(end - start);
+  if (kHeaderSize <= available) {
+    size_t sz = 0;
+    static_assert(sizeof(sz) == kHeaderSize, "kHeaderSize must match size_t");
+    memcpy(&sz, start, kHeaderSize);
+    // It is valid for sz to be zero here and we must ensure we return
+    // a valid Message for this case.
+    if (kHeaderSize + sz <= available) {
+      PERFETTO_CHECK(kHeaderSize + sz > sz);
+      msg.start = start + kHeaderSize;
+      msg.len = static_cast<uint32_t>(sz);
+      msg.field_id = 0;
+    }
+  }
+  return msg;
+}
+
+// static
+const ProbesDataSource::Descriptor StatsdDataSource::descriptor = {
+    /*name*/ "android.statsd",
+    /*flags*/ Descriptor::kHandlesIncrementalState,
+    /*fill_descriptor_func*/ nullptr,
+};
+
+StatsdDataSource::StatsdDataSource(base::TaskRunner* task_runner,
+                                   TracingSessionID session_id,
+                                   std::unique_ptr<TraceWriter> writer,
+                                   const DataSourceConfig& ds_config)
+    : ProbesDataSource(session_id, &descriptor),
+      task_runner_(task_runner),
+      writer_(std::move(writer)),
+      output_(base::Pipe::Create(base::Pipe::Flags::kRdNonBlock)),
+      shell_subscription_(GenerateShellConfig(ds_config)),
+      weak_factory_(this) {}
+
+StatsdDataSource::~StatsdDataSource() {
+  if (output_.rd) {
+    task_runner_->RemoveFileDescriptorWatch(output_.rd.get());
+  }
+}
+
+void StatsdDataSource::Start() {
+  // Don't bother actually connecting to statsd if no pull/push atoms
+  // were configured:
+  if (shell_subscription_.empty()) {
+    PERFETTO_LOG("Empty statsd config. Not connecting to statsd.");
+    return;
+  }
+
+  // The binary protocol for talking to statsd is to write 'size_t'
+  // followed by a proto encoded ShellConfig. For now we assume that
+  // us and statsd are the same bitness & endianness.
+  std::string body = shell_subscription_;
+  size_t size = body.size();
+  static_assert(sizeof(size) == kHeaderSize, "kHeaderSize must match size_t");
+  std::string input(sizeof(size) + size, '\0');
+  memcpy(&input[0], &size, sizeof(size));
+  memcpy(&input[0] + sizeof(size), body.data(), size);
+
+  backend_ = GetStatsdBackend(std::move(input), std::move(output_.wr));
+
+  // Watch is removed on destruction.
+  auto weak_this = weak_factory_.GetWeakPtr();
+  task_runner_->AddFileDescriptorWatch(output_.rd.get(), [weak_this] {
+    if (weak_this) {
+      weak_this->OnStatsdWakeup();
+    }
+  });
+}
+
+// Once the pipe is available to read we want to drain it but we need
+// to split the work across multiple tasks to avoid statsd ddos'ing us
+// and causing us to hit the timeout. At the same time we don't want
+// multiple OnStatsdWakeup to cause 'concurrent' read cycles (we're
+// single threaded so we can't actually race but we could still end up
+// in some confused state) so:
+// - The first wakeup triggers DoRead and sets read_in_progress_
+// - Subsequent wakeups are ignored due to read_in_progress_
+// - DoRead does a single read and either:
+//    - No data = we're finished so unset read_in_progress_
+//    - Some data so PostTask another DoRead.
+void StatsdDataSource::OnStatsdWakeup() {
+  if (read_in_progress_) {
+    return;
+  }
+  read_in_progress_ = true;
+  DoRead();
+}
+
+// Do a single read. If there is potentially more data to read schedule
+// another DoRead.
+void StatsdDataSource::DoRead() {
+  PERFETTO_CHECK(read_in_progress_);
+
+  uint8_t data[4098];
+  // Read into the static buffer
+  ssize_t rd = PERFETTO_EINTR(read(output_.rd.get(), &data, sizeof(data)));
+  if (rd < 0) {
+    if (!base::IsAgain(errno)) {
+      PERFETTO_PLOG("Failed to read statsd pipe (ret: %zd)", rd);
+    }
+    // EAGAIN or otherwise we're done so re-enable the fd watch.
+    read_in_progress_ = false;
+    return;
+  } else if (rd == 0) {
+    // EOF so clean everything up.
+    read_in_progress_ = false;
+    task_runner_->RemoveFileDescriptorWatch(output_.rd.get());
+    backend_.reset();
+  }
+
+  buffer_.Append(data, static_cast<size_t>(rd));
+
+  TraceWriter::TracePacketHandle packet;
+  for (;;) {
+    SizetPrefixedMessageReader::Message msg = buffer_.ReadMessage();
+    // The whole packet is not available so we're done.
+    if (!msg.valid()) {
+      break;
+    }
+
+    // A heart beat packet with no content
+    if (msg.len == 0) {
+      continue;
+    }
+
+    packet = writer_->NewTracePacket();
+    // This is late. It's already been >=2 IPC hops since the client
+    // code actually produced the atom however we don't get any time
+    // stamp from statsd/the client so this is the best we can do:
+    packet->set_timestamp(static_cast<uint64_t>(base::GetBootTimeNs().count()));
+    auto* atom = packet->set_statsd_atom();
+    atom->AppendRawProtoBytes(msg.start, msg.len);
+    packet->Finalize();
+  }
+
+  // Potentially more to read so repost:
+  auto weak_this = weak_factory_.GetWeakPtr();
+  task_runner_->PostTask([weak_this] {
+    if (weak_this) {
+      weak_this->DoRead();
+    }
+  });
+}
+
+// static
+std::string StatsdDataSource::GenerateShellConfig(
+    const DataSourceConfig& config) {
+  StatsdTracingConfig::Decoder cfg(config.statsd_tracing_config_raw());
+  protozero::HeapBuffered<StatsdShellSubscription> msg;
+  for (auto pull_it = cfg.pull_config(); pull_it; ++pull_it) {
+    StatsdPullAtomConfig::Decoder pull_cfg(*pull_it);
+    AddPullAtoms(pull_cfg, pull_cfg.raw_pull_atom_id(), msg.get());
+    AddPullAtoms(pull_cfg, pull_cfg.pull_atom_id(), msg.get());
+  }
+  AddPushAtoms(cfg.push_atom_id(), msg.get());
+  AddPushAtoms(cfg.raw_push_atom_id(), msg.get());
+  return msg.SerializeAsString();
+}
+
+base::WeakPtr<StatsdDataSource> StatsdDataSource::GetWeakPtr() const {
+  return weak_factory_.GetWeakPtr();
+}
+
+void StatsdDataSource::Flush(FlushRequestID, std::function<void()> callback) {
+  writer_->Flush(callback);
+}
+
+void StatsdDataSource::ClearIncrementalState() {}
+
+}  // namespace perfetto
diff --git a/src/traced/probes/statsd_client/statsd_data_source.h b/src/traced/probes/statsd_client/statsd_data_source.h
new file mode 100644
index 0000000..853280f
--- /dev/null
+++ b/src/traced/probes/statsd_client/statsd_data_source.h
@@ -0,0 +1,123 @@
+/*
+ * Copyright (C) 2022 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef SRC_TRACED_PROBES_STATSD_CLIENT_STATSD_DATA_SOURCE_H_
+#define SRC_TRACED_PROBES_STATSD_CLIENT_STATSD_DATA_SOURCE_H_
+
+#include <array>
+#include <limits>
+#include <memory>
+#include <set>
+#include <unordered_map>
+#include <vector>
+
+#include "perfetto/ext/base/pipe.h"
+#include "perfetto/ext/base/subprocess.h"
+#include "perfetto/ext/base/weak_ptr.h"
+#include "perfetto/ext/tracing/core/basic_types.h"
+#include "perfetto/ext/tracing/core/trace_writer.h"
+#include "perfetto/tracing/core/forward_decls.h"
+#include "src/protozero/proto_ring_buffer.h"
+#include "src/traced/probes/probes_data_source.h"
+
+namespace perfetto {
+
+namespace base {
+class TaskRunner;
+}  // namespace base
+
+namespace protos {
+namespace pbzero {
+class ProcessTree;
+class Statsd;
+class Statsd_Process;
+}  // namespace pbzero
+}  // namespace protos
+
+// We have two ways to talk to statsd:
+// - via execing cmd
+// - via binder:
+// https://cs.android.com/android/platform/superproject/+/master:frameworks/native/libs/binder/ndk/include_cpp/android/binder_interface_utils.h;l=239?q=android%2Fbinder_interface_utils.h
+// TODO(hjd): Implement binder backend.
+class StatsdBackend {
+ public:
+  // output is a file descriptor that StatsdBackend will continously
+  // write to until the backend is destoied. Normally ths would be the
+  // 'write' side of a pipe.
+  StatsdBackend(std::string input, base::ScopedFile output_wr);
+  virtual ~StatsdBackend();
+
+ protected:
+  // Encoded ShellConfig which will be written to statsd stdin.
+  std::string input_;
+  // stdout file descriptor. Normally one end of a pipe.
+  base::ScopedFile output_wr_;
+};
+
+class SizetPrefixedMessageReader final
+    : public protozero::RingBufferMessageReader {
+ public:
+  SizetPrefixedMessageReader();
+  virtual ~SizetPrefixedMessageReader() override;
+
+ protected:
+  virtual SizetPrefixedMessageReader::Message TryReadMessage(
+      const uint8_t* start,
+      const uint8_t* end) override;
+};
+
+class StatsdDataSource : public ProbesDataSource {
+ public:
+  static const ProbesDataSource::Descriptor descriptor;
+
+  StatsdDataSource(base::TaskRunner*,
+                   TracingSessionID,
+                   std::unique_ptr<TraceWriter> writer,
+                   const DataSourceConfig&);
+  ~StatsdDataSource() override;
+
+  base::WeakPtr<StatsdDataSource> GetWeakPtr() const;
+
+  // ProbesDataSource implementation.
+  void Start() override;
+  void Flush(FlushRequestID, std::function<void()> callback) override;
+  void ClearIncrementalState() override;
+
+  // public for testing
+  static std::string GenerateShellConfig(const DataSourceConfig& config);
+
+ private:
+  // Common functions.
+  StatsdDataSource(const StatsdDataSource&) = delete;
+  StatsdDataSource& operator=(const StatsdDataSource&) = delete;
+
+  void OnStatsdWakeup();
+  void DoRead();
+
+  base::TaskRunner* const task_runner_;
+  std::unique_ptr<TraceWriter> writer_;
+  std::unique_ptr<StatsdBackend> backend_{};
+  base::Pipe output_;
+  std::string shell_subscription_;
+  bool read_in_progress_ = false;
+  SizetPrefixedMessageReader buffer_;
+
+  base::WeakPtrFactory<StatsdDataSource> weak_factory_;  // Keep last.
+};
+
+}  // namespace perfetto
+
+#endif  // SRC_TRACED_PROBES_STATSD_CLIENT_STATSD_DATA_SOURCE_H_
diff --git a/src/traced/probes/statsd_client/statsd_data_source_unittest.cc b/src/traced/probes/statsd_client/statsd_data_source_unittest.cc
new file mode 100644
index 0000000..57f4f19
--- /dev/null
+++ b/src/traced/probes/statsd_client/statsd_data_source_unittest.cc
@@ -0,0 +1,90 @@
+/*
+ * Copyright (C) 2022 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "src/traced/probes/statsd_client/statsd_data_source.h"
+
+#include "perfetto/protozero/scattered_heap_buffer.h"
+#include "perfetto/tracing/core/data_source_config.h"
+#include "src/base/test/test_task_runner.h"
+#include "src/tracing/core/trace_writer_for_testing.h"
+#include "test/gtest_and_gmock.h"
+
+#include "protos/perfetto/trace/statsd/statsd_atom.pbzero.h"
+#include "protos/perfetto/trace/trace_packet.pbzero.h"
+
+#include "protos/perfetto/config/statsd/statsd_tracing_config.gen.h"
+#include "protos/perfetto/trace/statsd/statsd_atom.gen.h"
+#include "protos/third_party/statsd/shell_config.pbzero.h"
+
+using ::perfetto::protos::gen::StatsdTracingConfig;
+using ::perfetto::protos::pbzero::StatsdShellSubscription;
+using ::perfetto::protos::pbzero::StatsdSimpleAtomMatcher;
+using ::testing::Mock;
+
+namespace perfetto {
+namespace {
+
+class TestStatsdDataSource : public StatsdDataSource {
+ public:
+  TestStatsdDataSource(base::TaskRunner* task_runner,
+                       TracingSessionID id,
+                       std::unique_ptr<TraceWriter> writer,
+                       const DataSourceConfig& config)
+      : StatsdDataSource(task_runner, id, std::move(writer), config) {}
+};
+
+class StatsdDataSourceTest : public ::testing::Test {
+ protected:
+  StatsdDataSourceTest() {}
+
+  std::unique_ptr<TestStatsdDataSource> GetStatsdDataSource(
+      const DataSourceConfig& cfg) {
+    auto writer =
+        std::unique_ptr<TraceWriterForTesting>(new TraceWriterForTesting());
+    writer_raw_ = writer.get();
+    return std::unique_ptr<TestStatsdDataSource>(
+        new TestStatsdDataSource(&task_runner_, 0, std::move(writer), cfg));
+  }
+
+  base::TestTaskRunner task_runner_;
+  TraceWriterForTesting* writer_raw_;
+};
+
+TEST_F(StatsdDataSourceTest, EmptyTest) {}
+
+TEST(StatsdDataSourceStaticTest, EmptyConfig) {
+  DataSourceConfig cfg{};
+  std::string s = StatsdDataSource::GenerateShellConfig(cfg);
+  EXPECT_EQ(s, "");
+}
+
+TEST(StatsdDataSourceStaticTest, PushOneAtom) {
+  StatsdTracingConfig cfg;
+  cfg.add_raw_push_atom_id(42);
+
+  DataSourceConfig ds_cfg;
+  ds_cfg.set_statsd_tracing_config_raw(cfg.SerializeAsString());
+
+  std::string s = StatsdDataSource::GenerateShellConfig(ds_cfg);
+  StatsdShellSubscription::Decoder subscription(s);
+
+  EXPECT_TRUE(subscription.has_pushed());
+  EXPECT_EQ(StatsdSimpleAtomMatcher::Decoder(*subscription.pushed()).atom_id(),
+            42);
+}
+
+}  // namespace
+}  // namespace perfetto
diff --git a/test/configs/statsd.cfg b/test/configs/statsd.cfg
new file mode 100644
index 0000000..376d768
--- /dev/null
+++ b/test/configs/statsd.cfg
@@ -0,0 +1,21 @@
+buffers {
+  size_kb: 100024
+  fill_policy: RING_BUFFER
+}
+
+data_sources {
+  config {
+    name: "android.statsd"
+    target_buffer: 0
+    statsd_tracing_config {
+      push_atom_id: FLASHLIGHT_STATE_CHANGED
+      pull_config {
+        pull_atom_id: SYSTEM_UPTIME
+        pull_frequency_ms: 500
+        packages: "SYSTEM_AID"
+      }
+    }
+  }
+}
+
+duration_ms: 10000