traced_perf: process sharding (choosing a random subset of processes for callstacks)

The config specifies N shards for one or more data sources; we then
need to choose a random value in [0, N) to use as the shard for all of
those data sources. We know which sources share a session (i.e. a
config) via the tracing_session_id supplied by traced.
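
For reference, a minimal sketch of how a config could opt into this,
using the generated C++ protos as the unit tests below do (the setter
name is inferred from the new process_shard_count field read in
perf_producer.cc):

    // Hypothetical config sketch: split the process population into 8
    // shards, of which one (chosen at random per tracing session) gets
    // its callstacks unwound.
    protos::gen::PerfEventConfig cfg;
    cfg.mutable_callstack_sampling()->mutable_scope()->set_process_shard_count(8);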

Instead of introducing session-scoped data and linking that to the
lifetime of data sources, I've opted to simply replicate this state in
each DataSourceState. The extra plumbing through EventConfig::Create is
a bit clumsy, but ultimately TargetFilter is a logical place to host
this state.
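
The resulting per-pid decision reduces to a modulo check; a sketch
mirroring the logic added to ShouldRejectDueToFilter below (the helper
name here is illustrative only):

    // Sketch: a process is kept for unwinding only if its pid lands in
    // the shard chosen once per tracing session, i.e. a value in [0, N).
    bool InChosenShard(pid_t pid, const ProcessSharding& sharding) {
      return static_cast<uint32_t>(pid) % sharding.shard_count ==
             sharding.chosen_shard;
    }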

Doc: go/tperf-sharding
Bug: 223773242
Change-Id: I84982c819ef541735bf3c6821c30efed2122ae94
diff --git a/Android.bp b/Android.bp
index 71b26e4..335b330 100644
--- a/Android.bp
+++ b/Android.bp
@@ -8278,6 +8278,7 @@
     name: "perfetto_src_profiling_perf_producer_unittests",
     srcs: [
         "src/profiling/perf/event_config_unittest.cc",
+        "src/profiling/perf/perf_producer_unittest.cc",
         "src/profiling/perf/unwind_queue_unittest.cc",
     ],
 }
diff --git a/src/profiling/perf/BUILD.gn b/src/profiling/perf/BUILD.gn
index 341bd3a..ff602a7 100644
--- a/src/profiling/perf/BUILD.gn
+++ b/src/profiling/perf/BUILD.gn
@@ -147,6 +147,7 @@
   ]
   sources = [
     "event_config_unittest.cc",
+    "perf_producer_unittest.cc",
     "unwind_queue_unittest.cc",
   ]
 }
diff --git a/src/profiling/perf/event_config.cc b/src/profiling/perf/event_config.cc
index 4263dc5..a89d0c9 100644
--- a/src/profiling/perf/event_config.cc
+++ b/src/profiling/perf/event_config.cc
@@ -89,7 +89,9 @@
 // regardless of whether we're parsing an old-style config. The overall outcome
 // shouldn't change for almost all existing uses.
 template <typename T>
-TargetFilter ParseTargetFilter(const T& cfg) {
+TargetFilter ParseTargetFilter(
+    const T& cfg,
+    base::Optional<ProcessSharding> process_sharding) {
   TargetFilter filter;
   for (const auto& str : cfg.target_cmdline()) {
     filter.cmdlines.push_back(str);
@@ -104,6 +106,7 @@
     filter.exclude_pids.insert(pid);
   }
   filter.additional_cmdline_count = cfg.additional_cmdline_count();
+  filter.process_sharding = process_sharding;
   return filter;
 }
 
@@ -292,19 +295,9 @@
 
 // static
 base::Optional<EventConfig> EventConfig::Create(
-    const DataSourceConfig& ds_config,
-    tracepoint_id_fn_t tracepoint_id_lookup) {
-  protos::gen::PerfEventConfig pb_config;
-  if (!pb_config.ParseFromString(ds_config.perf_event_config_raw()))
-    return base::nullopt;
-
-  return EventConfig::Create(pb_config, ds_config, tracepoint_id_lookup);
-}
-
-// static
-base::Optional<EventConfig> EventConfig::Create(
     const protos::gen::PerfEventConfig& pb_config,
     const DataSourceConfig& raw_ds_config,
+    base::Optional<ProcessSharding> process_sharding,
     tracepoint_id_fn_t tracepoint_id_lookup) {
   // Timebase: sampling interval.
   uint64_t sampling_frequency = 0;
@@ -380,11 +373,14 @@
         return base::nullopt;
     }
 
-    // Process scoping.
+    // Process scoping. Sharding parameter is supplied from outside as it is
+    // shared by all data sources within a tracing session.
     target_filter =
         pb_config.callstack_sampling().has_scope()
-            ? ParseTargetFilter(pb_config.callstack_sampling().scope())
-            : ParseTargetFilter(pb_config);  // backwards compatibility
+            ? ParseTargetFilter(pb_config.callstack_sampling().scope(),
+                                process_sharding)
+            : ParseTargetFilter(pb_config,
+                                process_sharding);  // backwards compatibility
 
     // Kernel callstacks.
     kernel_frames = pb_config.callstack_sampling().kernel_frames() ||
diff --git a/src/profiling/perf/event_config.h b/src/profiling/perf/event_config.h
index 6249177..515a102 100644
--- a/src/profiling/perf/event_config.h
+++ b/src/profiling/perf/event_config.h
@@ -41,6 +41,13 @@
 
 namespace profiling {
 
+// Callstack sampling parameter for unwinding only a fraction of seen processes
+// (without enumerating them in the config).
+struct ProcessSharding {
+  uint32_t shard_count = 0;
+  uint32_t chosen_shard = 0;
+};
+
 // Parsed allow/deny-list for filtering samples.
 // An empty allow-list means that all targets are allowed unless explicitly
 // denied.
@@ -49,6 +56,7 @@
   std::vector<std::string> exclude_cmdlines;
   base::FlatSet<pid_t> pids;
   base::FlatSet<pid_t> exclude_pids;
+  base::Optional<ProcessSharding> process_sharding;
   uint32_t additional_cmdline_count = 0;
 };
 
@@ -111,13 +119,9 @@
       std::function<uint32_t(const std::string&, const std::string&)>;
 
   static base::Optional<EventConfig> Create(
-      const DataSourceConfig& ds_config,
-      tracepoint_id_fn_t tracepoint_id_lookup =
-          [](const std::string&, const std::string&) { return 0; });
-
-  static base::Optional<EventConfig> Create(
       const protos::gen::PerfEventConfig& pb_config,
       const DataSourceConfig& raw_ds_config,
+      base::Optional<ProcessSharding> process_sharding,
       tracepoint_id_fn_t tracepoint_id_lookup);
 
   uint32_t ring_buffer_pages() const { return ring_buffer_pages_; }
@@ -168,8 +172,6 @@
   // ioctl after creating the event.
   const PerfCounter timebase_event_;
 
-  // TODO(rsavitski): consider adding an Optional<CallstackSampling> once the
-  // complexity warrants it.
   // If true, include userspace frames in sampled callstacks.
   const bool user_frames_;
 
diff --git a/src/profiling/perf/event_config_unittest.cc b/src/profiling/perf/event_config_unittest.cc
index d744390..95c0a9b 100644
--- a/src/profiling/perf/event_config_unittest.cc
+++ b/src/profiling/perf/event_config_unittest.cc
@@ -38,17 +38,20 @@
   return (v != 0 && ((v & (v - 1)) == 0));
 }
 
-static DataSourceConfig AsDataSourceConfig(
-    const protos::gen::PerfEventConfig& perf_cfg) {
+base::Optional<EventConfig> CreateEventConfig(
+    const protos::gen::PerfEventConfig& perf_cfg,
+    EventConfig::tracepoint_id_fn_t tracepoint_id_lookup =
+        [](const std::string&, const std::string&) { return 0; }) {
   protos::gen::DataSourceConfig ds_cfg;
   ds_cfg.set_perf_event_config_raw(perf_cfg.SerializeAsString());
-  return ds_cfg;
+  return EventConfig::Create(perf_cfg, ds_cfg,
+                             /*process_sharding=*/base::nullopt,
+                             tracepoint_id_lookup);
 }
 
 TEST(EventConfigTest, AttrStructConstructed) {
   protos::gen::PerfEventConfig cfg;
-  base::Optional<EventConfig> event_config =
-      EventConfig::Create(AsDataSourceConfig(cfg));
+  base::Optional<EventConfig> event_config = CreateEventConfig(cfg);
 
   ASSERT_TRUE(event_config.has_value());
   ASSERT_TRUE(event_config->perf_attr() != nullptr);
@@ -57,8 +60,7 @@
 TEST(EventConfigTest, RingBufferPagesValidated) {
   {  // if unset, a default is used
     protos::gen::PerfEventConfig cfg;
-    base::Optional<EventConfig> event_config =
-        EventConfig::Create(AsDataSourceConfig(cfg));
+    base::Optional<EventConfig> event_config = CreateEventConfig(cfg);
 
     ASSERT_TRUE(event_config.has_value());
     ASSERT_GT(event_config->ring_buffer_pages(), 0u);
@@ -68,8 +70,7 @@
     uint32_t num_pages = 128;
     protos::gen::PerfEventConfig cfg;
     cfg.set_ring_buffer_pages(num_pages);
-    base::Optional<EventConfig> event_config =
-        EventConfig::Create(AsDataSourceConfig(cfg));
+    base::Optional<EventConfig> event_config = CreateEventConfig(cfg);
 
     ASSERT_TRUE(event_config.has_value());
     ASSERT_EQ(event_config->ring_buffer_pages(), num_pages);
@@ -77,8 +78,7 @@
   {  // entire config rejected if not a power of two of pages
     protos::gen::PerfEventConfig cfg;
     cfg.set_ring_buffer_pages(7);
-    base::Optional<EventConfig> event_config =
-        EventConfig::Create(AsDataSourceConfig(cfg));
+    base::Optional<EventConfig> event_config = CreateEventConfig(cfg);
 
     ASSERT_FALSE(event_config.has_value());
   }
@@ -87,8 +87,7 @@
 TEST(EventConfigTest, ReadTickPeriodDefaultedIfUnset) {
   {  // if unset, a default is used
     protos::gen::PerfEventConfig cfg;
-    base::Optional<EventConfig> event_config =
-        EventConfig::Create(AsDataSourceConfig(cfg));
+    base::Optional<EventConfig> event_config = CreateEventConfig(cfg);
 
     ASSERT_TRUE(event_config.has_value());
     ASSERT_GT(event_config->read_tick_period_ms(), 0u);
@@ -97,8 +96,7 @@
     uint32_t period_ms = 250;
     protos::gen::PerfEventConfig cfg;
     cfg.set_ring_buffer_read_period_ms(period_ms);
-    base::Optional<EventConfig> event_config =
-        EventConfig::Create(AsDataSourceConfig(cfg));
+    base::Optional<EventConfig> event_config = CreateEventConfig(cfg);
 
     ASSERT_TRUE(event_config.has_value());
     ASSERT_EQ(event_config->read_tick_period_ms(), period_ms);
@@ -108,8 +106,7 @@
 TEST(EventConfigTest, RemotePeriodTimeoutDefaultedIfUnset) {
   {  // if unset, a default is used
     protos::gen::PerfEventConfig cfg;
-    base::Optional<EventConfig> event_config =
-        EventConfig::Create(AsDataSourceConfig(cfg));
+    base::Optional<EventConfig> event_config = CreateEventConfig(cfg);
 
     ASSERT_TRUE(event_config.has_value());
     ASSERT_GT(event_config->remote_descriptor_timeout_ms(), 0u);
@@ -118,8 +115,7 @@
     uint32_t timeout_ms = 300;
     protos::gen::PerfEventConfig cfg;
     cfg.set_remote_descriptor_timeout_ms(timeout_ms);
-    base::Optional<EventConfig> event_config =
-        EventConfig::Create(AsDataSourceConfig(cfg));
+    base::Optional<EventConfig> event_config = CreateEventConfig(cfg);
 
     ASSERT_TRUE(event_config.has_value());
     ASSERT_EQ(event_config->remote_descriptor_timeout_ms(), timeout_ms);
@@ -130,8 +126,7 @@
   {  // period:
     protos::gen::PerfEventConfig cfg;
     cfg.mutable_timebase()->set_period(100);
-    base::Optional<EventConfig> event_config =
-        EventConfig::Create(AsDataSourceConfig(cfg));
+    base::Optional<EventConfig> event_config = CreateEventConfig(cfg);
 
     ASSERT_TRUE(event_config.has_value());
     EXPECT_FALSE(event_config->perf_attr()->freq);
@@ -140,8 +135,7 @@
   {  // frequency:
     protos::gen::PerfEventConfig cfg;
     cfg.mutable_timebase()->set_frequency(4000);
-    base::Optional<EventConfig> event_config =
-        EventConfig::Create(AsDataSourceConfig(cfg));
+    base::Optional<EventConfig> event_config = CreateEventConfig(cfg);
 
     ASSERT_TRUE(event_config.has_value());
     EXPECT_TRUE(event_config->perf_attr()->freq);
@@ -150,8 +144,7 @@
   {  // legacy frequency field:
     protos::gen::PerfEventConfig cfg;
     cfg.set_sampling_frequency(5000);
-    base::Optional<EventConfig> event_config =
-        EventConfig::Create(AsDataSourceConfig(cfg));
+    base::Optional<EventConfig> event_config = CreateEventConfig(cfg);
 
     ASSERT_TRUE(event_config.has_value());
     EXPECT_TRUE(event_config->perf_attr()->freq);
@@ -159,8 +152,7 @@
   }
   {  // default is 10 Hz (implementation-defined)
     protos::gen::PerfEventConfig cfg;
-    base::Optional<EventConfig> event_config =
-        EventConfig::Create(AsDataSourceConfig(cfg));
+    base::Optional<EventConfig> event_config = CreateEventConfig(cfg);
 
     ASSERT_TRUE(event_config.has_value());
     EXPECT_TRUE(event_config->perf_attr()->freq);
@@ -180,7 +172,7 @@
     mutable_tracepoint->set_name("sched:sched_switch");
 
     base::Optional<EventConfig> event_config =
-        EventConfig::Create(AsDataSourceConfig(cfg), id_lookup);
+        CreateEventConfig(cfg, id_lookup);
 
     ASSERT_TRUE(event_config.has_value());
     EXPECT_EQ(event_config->perf_attr()->type, PERF_TYPE_TRACEPOINT);
@@ -189,8 +181,7 @@
   {  // default is the CPU timer:
     protos::gen::PerfEventConfig cfg;
     cfg.mutable_timebase()->set_frequency(1000);
-    base::Optional<EventConfig> event_config =
-        EventConfig::Create(AsDataSourceConfig(cfg));
+    base::Optional<EventConfig> event_config = CreateEventConfig(cfg);
 
     ASSERT_TRUE(event_config.has_value());
     EXPECT_EQ(event_config->perf_attr()->type, PERF_TYPE_SOFTWARE);
@@ -208,8 +199,7 @@
     mutable_scope->set_additional_cmdline_count(3);
     mutable_scope->add_exclude_cmdline("heapprofd");
 
-    base::Optional<EventConfig> event_config =
-        EventConfig::Create(AsDataSourceConfig(cfg));
+    base::Optional<EventConfig> event_config = CreateEventConfig(cfg);
 
     ASSERT_TRUE(event_config.has_value());
     const auto& filter = event_config->filter();
@@ -230,8 +220,7 @@
     cfg.set_additional_cmdline_count(3);
     cfg.add_exclude_cmdline("heapprofd");
 
-    base::Optional<EventConfig> event_config =
-        EventConfig::Create(AsDataSourceConfig(cfg));
+    base::Optional<EventConfig> event_config = CreateEventConfig(cfg);
 
     ASSERT_TRUE(event_config.has_value());
     const auto& filter = event_config->filter();
@@ -252,8 +241,7 @@
     mutable_timebase->set_period(500);
     mutable_timebase->set_counter(protos::gen::PerfEvents::HW_CPU_CYCLES);
 
-    base::Optional<EventConfig> event_config =
-        EventConfig::Create(AsDataSourceConfig(cfg));
+    base::Optional<EventConfig> event_config = CreateEventConfig(cfg);
 
     ASSERT_TRUE(event_config.has_value());
     EXPECT_EQ(event_config->perf_attr()->type, PERF_TYPE_HARDWARE);
@@ -268,8 +256,7 @@
     mutable_timebase->set_period(500);
     mutable_timebase->set_counter(protos::gen::PerfEvents::SW_PAGE_FAULTS);
 
-    base::Optional<EventConfig> event_config =
-        EventConfig::Create(AsDataSourceConfig(cfg));
+    base::Optional<EventConfig> event_config = CreateEventConfig(cfg);
 
     ASSERT_TRUE(event_config.has_value());
     EXPECT_EQ(event_config->perf_attr()->type, PERF_TYPE_SOFTWARE);
@@ -285,8 +272,7 @@
     protos::gen::PerfEventConfig cfg;
     cfg.mutable_callstack_sampling();  // set field
 
-    base::Optional<EventConfig> event_config =
-        EventConfig::Create(AsDataSourceConfig(cfg));
+    base::Optional<EventConfig> event_config = CreateEventConfig(cfg);
 
     ASSERT_TRUE(event_config.has_value());
     EXPECT_TRUE(event_config->sample_callstacks());
@@ -306,8 +292,7 @@
     cfg.mutable_callstack_sampling()->set_user_frames(
         protos::gen::PerfEventConfig::UNWIND_SKIP);
 
-    base::Optional<EventConfig> event_config =
-        EventConfig::Create(AsDataSourceConfig(cfg));
+    base::Optional<EventConfig> event_config = CreateEventConfig(cfg);
 
     ASSERT_TRUE(event_config.has_value());
     EXPECT_TRUE(event_config->sample_callstacks());
@@ -330,8 +315,7 @@
   {
     protos::gen::PerfEventConfig cfg;
     cfg.mutable_callstack_sampling()->set_kernel_frames(true);
-    base::Optional<EventConfig> event_config =
-        EventConfig::Create(AsDataSourceConfig(cfg));
+    base::Optional<EventConfig> event_config = CreateEventConfig(cfg);
 
     ASSERT_TRUE(event_config.has_value());
     EXPECT_TRUE(event_config->kernel_frames());
@@ -340,16 +324,14 @@
     protos::gen::PerfEventConfig cfg;
     cfg.set_all_cpus(true);  // used to detect compat mode
     cfg.set_kernel_frames(true);
-    base::Optional<EventConfig> event_config =
-        EventConfig::Create(AsDataSourceConfig(cfg));
+    base::Optional<EventConfig> event_config = CreateEventConfig(cfg);
 
     ASSERT_TRUE(event_config.has_value());
     EXPECT_TRUE(event_config->kernel_frames());
   }
   {  // default is false
     protos::gen::PerfEventConfig cfg;
-    base::Optional<EventConfig> event_config =
-        EventConfig::Create(AsDataSourceConfig(cfg));
+    base::Optional<EventConfig> event_config = CreateEventConfig(cfg);
 
     ASSERT_TRUE(event_config.has_value());
     EXPECT_FALSE(event_config->kernel_frames());
@@ -359,8 +341,7 @@
 TEST(EventConfigTest, TimestampClockId) {
   {  // if unset, a default is used
     protos::gen::PerfEventConfig cfg;
-    base::Optional<EventConfig> event_config =
-        EventConfig::Create(AsDataSourceConfig(cfg));
+    base::Optional<EventConfig> event_config = CreateEventConfig(cfg);
 
     ASSERT_TRUE(event_config.has_value());
     EXPECT_TRUE(event_config->perf_attr()->use_clockid);
@@ -370,8 +351,7 @@
     protos::gen::PerfEventConfig cfg;
     cfg.mutable_timebase()->set_timestamp_clock(
         protos::gen::PerfEvents::PERF_CLOCK_BOOTTIME);
-    base::Optional<EventConfig> event_config =
-        EventConfig::Create(AsDataSourceConfig(cfg));
+    base::Optional<EventConfig> event_config = CreateEventConfig(cfg);
 
     ASSERT_TRUE(event_config.has_value());
     EXPECT_TRUE(event_config->perf_attr()->use_clockid);
@@ -381,8 +361,7 @@
     protos::gen::PerfEventConfig cfg;
     cfg.mutable_timebase()->set_timestamp_clock(
         protos::gen::PerfEvents::PERF_CLOCK_MONOTONIC);
-    base::Optional<EventConfig> event_config =
-        EventConfig::Create(AsDataSourceConfig(cfg));
+    base::Optional<EventConfig> event_config = CreateEventConfig(cfg);
 
     ASSERT_TRUE(event_config.has_value());
     EXPECT_TRUE(event_config->perf_attr()->use_clockid);
diff --git a/src/profiling/perf/perf_producer.cc b/src/profiling/perf/perf_producer.cc
index 951cb4b..092ac49 100644
--- a/src/profiling/perf/perf_producer.cc
+++ b/src/profiling/perf/perf_producer.cc
@@ -161,6 +161,16 @@
 
   // Not setting timebase.timestamp_clock since the field that matters during
   // parsing is the root timestamp_clock_id set above.
+
+  // Record the random shard we've chosen so that the post-processing can infer
+  // which processes would've been unwound if sampled. In particular this lets
+  // us distinguish between "running but not chosen" and "running and chosen,
+  // but not sampled" cases.
+  const auto& process_sharding = event_config.filter().process_sharding;
+  if (process_sharding.has_value()) {
+    perf_defaults->set_process_shard_count(process_sharding->shard_count);
+    perf_defaults->set_chosen_process_shard(process_sharding->chosen_shard);
+  }
 }
 
 uint32_t TimeToNextReadTickMs(DataSourceInstanceID ds_id, uint32_t period_ms) {
@@ -176,73 +186,6 @@
   return period_ms - ((now_ms - ds_period_offset) % period_ms);
 }
 
-bool ShouldRejectDueToFilter(pid_t pid,
-                             const TargetFilter& filter,
-                             bool skip_cmdline,
-                             base::FlatSet<std::string>* additional_cmdlines) {
-  PERFETTO_CHECK(additional_cmdlines);
-
-  std::string cmdline;
-  bool have_cmdline = false;
-  if (!skip_cmdline)
-    have_cmdline = glob_aware::ReadProcCmdlineForPID(pid, &cmdline);
-
-  const char* binname = "";
-  if (have_cmdline) {
-    binname = glob_aware::FindBinaryName(cmdline.c_str(), cmdline.size());
-  }
-
-  auto has_matching_pattern = [](const std::vector<std::string>& patterns,
-                                 const char* cmd, const char* name) {
-    for (const std::string& pattern : patterns) {
-      if (glob_aware::MatchGlobPattern(pattern.c_str(), cmd, name)) {
-        return true;
-      }
-    }
-    return false;
-  };
-
-  if (have_cmdline &&
-      has_matching_pattern(filter.exclude_cmdlines, cmdline.c_str(), binname)) {
-    PERFETTO_DLOG("Explicitly rejecting samples for pid [%d] due to cmdline",
-                  static_cast<int>(pid));
-    return true;
-  }
-  if (filter.exclude_pids.count(pid)) {
-    PERFETTO_DLOG("Explicitly rejecting samples for pid [%d] due to pid",
-                  static_cast<int>(pid));
-    return true;
-  }
-
-  if (have_cmdline &&
-      has_matching_pattern(filter.cmdlines, cmdline.c_str(), binname)) {
-    return false;
-  }
-  if (filter.pids.count(pid)) {
-    return false;
-  }
-
-  // Empty allow filter means keep everything that isn't explicitly excluded.
-  if (filter.cmdlines.empty() && filter.pids.empty() &&
-      !filter.additional_cmdline_count) {
-    return false;
-  }
-
-  // Config option that allows to profile just the N first seen cmdlines.
-  if (have_cmdline) {
-    if (additional_cmdlines->count(cmdline)) {
-      return false;
-    }
-    if (additional_cmdlines->size() < filter.additional_cmdline_count) {
-      additional_cmdlines->insert(cmdline);
-      return false;
-    }
-  }
-
-  PERFETTO_DLOG("Rejecting samples for pid [%d]", static_cast<int>(pid));
-  return true;
-}
-
 protos::pbzero::Profiling::CpuMode ToCpuModeEnum(uint16_t perf_cpu_mode) {
   using Profiling = protos::pbzero::Profiling;
   switch (perf_cpu_mode) {
@@ -299,6 +242,94 @@
 
 }  // namespace
 
+// static
+bool PerfProducer::ShouldRejectDueToFilter(
+    pid_t pid,
+    const TargetFilter& filter,
+    bool skip_cmdline,
+    base::FlatSet<std::string>* additional_cmdlines,
+    std::function<bool(std::string*)> read_proc_pid_cmdline) {
+  PERFETTO_CHECK(additional_cmdlines);
+
+  std::string cmdline;
+  bool have_cmdline = false;
+  if (!skip_cmdline)
+    have_cmdline = read_proc_pid_cmdline(&cmdline);
+
+  const char* binname = "";
+  if (have_cmdline) {
+    binname = glob_aware::FindBinaryName(cmdline.c_str(), cmdline.size());
+  }
+
+  auto has_matching_pattern = [](const std::vector<std::string>& patterns,
+                                 const char* cmd, const char* name) {
+    for (const std::string& pattern : patterns) {
+      if (glob_aware::MatchGlobPattern(pattern.c_str(), cmd, name)) {
+        return true;
+      }
+    }
+    return false;
+  };
+
+  if (have_cmdline &&
+      has_matching_pattern(filter.exclude_cmdlines, cmdline.c_str(), binname)) {
+    PERFETTO_DLOG("Explicitly rejecting samples for pid [%d] due to cmdline",
+                  static_cast<int>(pid));
+    return true;
+  }
+  if (filter.exclude_pids.count(pid)) {
+    PERFETTO_DLOG("Explicitly rejecting samples for pid [%d] due to pid",
+                  static_cast<int>(pid));
+    return true;
+  }
+
+  if (have_cmdline &&
+      has_matching_pattern(filter.cmdlines, cmdline.c_str(), binname)) {
+    return false;
+  }
+  if (filter.pids.count(pid)) {
+    return false;
+  }
+
+  // Empty allow filter means keep everything that isn't explicitly excluded.
+  if (filter.cmdlines.empty() && filter.pids.empty() &&
+      !filter.additional_cmdline_count &&
+      !filter.process_sharding.has_value()) {
+    return false;
+  }
+
+  // Niche option: process sharding to amortise systemwide unwinding costs.
+  // Selects a subset of all processes by using the low order bits of their pid.
+  if (filter.process_sharding.has_value()) {
+    uint32_t upid = static_cast<uint32_t>(pid);
+    if (upid % filter.process_sharding->shard_count ==
+        filter.process_sharding->chosen_shard) {
+      PERFETTO_DLOG("Process sharding: keeping pid [%d]",
+                    static_cast<int>(pid));
+      return false;
+    } else {
+      PERFETTO_DLOG("Process sharding: rejecting pid [%d]",
+                    static_cast<int>(pid));
+      return true;
+    }
+  }
+
+  // Niche option: additionally remember the first seen N process cmdlines, and
+  // keep all processes with those names.
+  if (have_cmdline) {
+    if (additional_cmdlines->count(cmdline)) {
+      return false;
+    }
+    if (additional_cmdlines->size() < filter.additional_cmdline_count) {
+      additional_cmdlines->insert(cmdline);
+      return false;
+    }
+  }
+
+  PERFETTO_DLOG("Rejecting samples for pid [%d]", static_cast<int>(pid));
+  return true;
+}
+
 PerfProducer::PerfProducer(ProcDescriptorGetter* proc_fd_getter,
                            base::TaskRunner* task_runner)
     : task_runner_(task_runner),
@@ -313,7 +344,9 @@
 
 void PerfProducer::StartDataSource(DataSourceInstanceID ds_id,
                                    const DataSourceConfig& config) {
-  PERFETTO_LOG("StartDataSource(%zu, %s)", static_cast<size_t>(ds_id),
+  uint64_t tracing_session_id = config.tracing_session_id();
+  PERFETTO_LOG("StartDataSource(ds %zu, session %" PRIu64 ", name %s)",
+               static_cast<size_t>(ds_id), tracing_session_id,
                config.name().c_str());
 
   if (config.name() == MetatraceWriter::kDataSourceName) {
@@ -340,8 +373,20 @@
     PERFETTO_ELOG("PerfEventConfig could not be parsed.");
     return;
   }
-  base::Optional<EventConfig> event_config =
-      EventConfig::Create(event_config_pb, config, tracepoint_id_lookup);
+
+  // Unlikely: handle a callstack sampling option that shares a random decision
+  // between all data sources within a tracing session. Instead of introducing
+  // session-scoped data, we replicate the decision in each per-DS EventConfig.
+  base::Optional<ProcessSharding> process_sharding;
+  uint32_t shard_count =
+      event_config_pb.callstack_sampling().scope().process_shard_count();
+  if (shard_count > 0) {
+    process_sharding =
+        GetOrChooseCallstackProcessShard(tracing_session_id, shard_count);
+  }
+
+  base::Optional<EventConfig> event_config = EventConfig::Create(
+      event_config_pb, config, process_sharding, tracepoint_id_lookup);
   if (!event_config.has_value()) {
     PERFETTO_ELOG("PerfEventConfig rejected.");
     return;
@@ -369,8 +414,8 @@
   bool inserted;
   std::tie(ds_it, inserted) = data_sources_.emplace(
       std::piecewise_construct, std::forward_as_tuple(ds_id),
-      std::forward_as_tuple(event_config.value(), std::move(writer),
-                            std::move(per_cpu_readers)));
+      std::forward_as_tuple(event_config.value(), tracing_session_id,
+                            std::move(writer), std::move(per_cpu_readers)));
   PERFETTO_CHECK(inserted);
   DataSourceState& ds = ds_it->second;
 
@@ -647,8 +692,11 @@
       // Check whether samples for this new process should be dropped due to
       // the target filtering. Kernel threads don't have a cmdline, but we
       // still check against pid inclusion/exclusion.
-      if (ShouldRejectDueToFilter(pid, event_config.filter(), is_kthread,
-                                  &ds->additional_cmdlines)) {
+      if (ShouldRejectDueToFilter(
+              pid, event_config.filter(), is_kthread, &ds->additional_cmdlines,
+              [pid](std::string* cmdline) {
+                return glob_aware::ReadProcCmdlineForPID(pid, cmdline);
+              })) {
         process_state = ProcessTrackingStatus::kRejected;
         continue;
       }
@@ -1027,6 +1075,45 @@
   }
 }
 
+// Either:
+// * choose a random number up to |shard_count|.
+// * reuse a choice made previously by a data source within this tracing
+//   session. The config option requires that all data sources within one config
+//   have the same shard count.
+base::Optional<ProcessSharding> PerfProducer::GetOrChooseCallstackProcessShard(
+    uint64_t tracing_session_id,
+    uint32_t shard_count) {
+  for (auto& it : data_sources_) {
+    const DataSourceState& ds = it.second;
+    const auto& sharding = ds.event_config.filter().process_sharding;
+    if ((ds.tracing_session_id != tracing_session_id) || !sharding.has_value())
+      continue;
+
+    // Found existing data source, reuse its decision while doing best-effort
+    // error reporting (logging) if the shard count is not the same.
+    if (sharding->shard_count != shard_count) {
+      PERFETTO_ELOG(
+          "Mismatch of process_shard_count between data sources in tracing "
+          "session %" PRIu64 ". Overriding shard count to match.",
+          tracing_session_id);
+    }
+    return sharding;
+  }
+
+  // First data source in this session, choose random shard.
+  std::random_device r;
+  std::minstd_rand minstd(r());
+  std::uniform_int_distribution<uint32_t> dist(0, shard_count - 1);
+  uint32_t chosen_shard = dist(minstd);
+
+  ProcessSharding ret;
+  ret.shard_count = shard_count;
+  ret.chosen_shard = chosen_shard;
+
+  PERFETTO_DCHECK(ret.shard_count && ret.chosen_shard < ret.shard_count);
+  return ret;
+}
+
 void PerfProducer::StartMetatraceSource(DataSourceInstanceID ds_id,
                                         BufferID target_buffer) {
   auto writer = endpoint_->CreateTraceWriter(target_buffer);
diff --git a/src/profiling/perf/perf_producer.h b/src/profiling/perf/perf_producer.h
index d9c6a18..31d5ff5 100644
--- a/src/profiling/perf/perf_producer.h
+++ b/src/profiling/perf/perf_producer.h
@@ -101,6 +101,14 @@
     all_data_sources_registered_cb_ = cb;
   }
 
+  // public for testing:
+  static bool ShouldRejectDueToFilter(
+      pid_t pid,
+      const TargetFilter& filter,
+      bool skip_cmdline,
+      base::FlatSet<std::string>* additional_cmdlines,
+      std::function<bool(std::string*)> read_proc_pid_cmdline);
+
  private:
   // State of the producer's connection to tracing service (traced).
   enum State {
@@ -136,14 +144,17 @@
     enum class Status { kActive, kShuttingDown };
 
     DataSourceState(EventConfig _event_config,
+                    uint64_t _tracing_session_id,
                     std::unique_ptr<TraceWriter> _trace_writer,
                     std::vector<EventReader> _per_cpu_readers)
         : event_config(std::move(_event_config)),
+          tracing_session_id(_tracing_session_id),
           trace_writer(std::move(_trace_writer)),
           per_cpu_readers(std::move(_per_cpu_readers)) {}
 
     Status status = Status::kActive;
     const EventConfig event_config;
+    uint64_t tracing_session_id;
     std::unique_ptr<TraceWriter> trace_writer;
     // Indexed by cpu, vector never resized.
     std::vector<EventReader> per_cpu_readers;
@@ -154,9 +165,8 @@
     // in the |Unwinder|, which needs to track whether the necessary unwinding
     // inputs for a given process' samples are ready.
     std::map<pid_t, ProcessTrackingStatus> process_states;
-
-    // Command lines we have decided to unwind, up to a total of
-    // additional_cmdline_count values.
+    // Additional state for EventConfig.TargetFilter: command lines we have
+    // decided to unwind, up to a total of additional_cmdline_count values.
     base::FlatSet<std::string> additional_cmdlines;
   };
 
@@ -228,6 +238,12 @@
   void CheckMemoryFootprintPeriodic(DataSourceInstanceID ds_id,
                                     uint32_t max_daemon_memory_kb);
 
+  // Chooses a random parameter for a callstack sampling option. Done at this
+  // level as the choice is shared by all data sources within a tracing session.
+  base::Optional<ProcessSharding> GetOrChooseCallstackProcessShard(
+      uint64_t tracing_session_id,
+      uint32_t shard_count);
+
   void StartMetatraceSource(DataSourceInstanceID ds_id, BufferID target_buffer);
 
   // Task runner owned by the main thread.
diff --git a/src/profiling/perf/perf_producer_unittest.cc b/src/profiling/perf/perf_producer_unittest.cc
new file mode 100644
index 0000000..22f4002
--- /dev/null
+++ b/src/profiling/perf/perf_producer_unittest.cc
@@ -0,0 +1,202 @@
+/*
+ * Copyright (C) 2022 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "src/profiling/perf/perf_producer.h"
+
+#include <stdint.h>
+
+#include "perfetto/base/logging.h"
+#include "perfetto/ext/base/optional.h"
+#include "test/gtest_and_gmock.h"
+
+namespace perfetto {
+namespace profiling {
+namespace {
+
+bool ShouldReject(pid_t pid,
+                  std::string cmdline,
+                  const TargetFilter& filter,
+                  bool skip_cmd,
+                  base::FlatSet<std::string>* additional_cmdlines) {
+  return PerfProducer::ShouldRejectDueToFilter(
+      pid, filter, skip_cmd, additional_cmdlines, [cmdline](std::string* out) {
+        *out = cmdline;
+        return true;
+      });
+}
+
+TEST(TargetFilterTest, EmptyFilter) {
+  {
+    bool skip_cmd = false;
+    base::FlatSet<std::string> extra_cmds;
+    TargetFilter filter;
+
+    // empty filter allows everything
+    EXPECT_FALSE(ShouldReject(42, "/bin/echo", filter, skip_cmd, &extra_cmds));
+    EXPECT_FALSE(ShouldReject(77, "/bin/echo", filter, skip_cmd, &extra_cmds));
+  }
+  {
+    bool skip_cmd = false;
+    base::FlatSet<std::string> extra_cmds;
+    TargetFilter filter;
+    filter.exclude_pids.insert(1);
+
+    // allow everything besides the explicit exclusions
+    EXPECT_FALSE(ShouldReject(42, "/bin/echo", filter, skip_cmd, &extra_cmds));
+    EXPECT_FALSE(ShouldReject(77, "/bin/echo", filter, skip_cmd, &extra_cmds));
+
+    EXPECT_TRUE(ShouldReject(1, "/sbin/init", filter, skip_cmd, &extra_cmds));
+  }
+}
+
+TEST(TargetFilterTest, TargetPids) {
+  bool skip_cmd = false;
+  base::FlatSet<std::string> extra_cmds;
+  TargetFilter filter;
+  filter.pids.insert(32);
+  filter.pids.insert(42);
+
+  EXPECT_FALSE(ShouldReject(32, "/bin/echo", filter, skip_cmd, &extra_cmds));
+  EXPECT_FALSE(ShouldReject(42, "/bin/echo", filter, skip_cmd, &extra_cmds));
+
+  EXPECT_TRUE(ShouldReject(77, "/bin/echo", filter, skip_cmd, &extra_cmds));
+}
+
+TEST(TargetFilterTest, ExcludePids) {
+  bool skip_cmd = false;
+  base::FlatSet<std::string> extra_cmds;
+  TargetFilter filter;
+  filter.exclude_pids.insert(32);
+  filter.exclude_pids.insert(42);
+
+  EXPECT_FALSE(ShouldReject(77, "/bin/echo", filter, skip_cmd, &extra_cmds));
+
+  EXPECT_TRUE(ShouldReject(32, "/bin/echo", filter, skip_cmd, &extra_cmds));
+  EXPECT_TRUE(ShouldReject(42, "/bin/echo", filter, skip_cmd, &extra_cmds));
+}
+
+TEST(TargetFilterTest, TargetCmdlines) {
+  {
+    bool skip_cmd = false;
+    base::FlatSet<std::string> extra_cmds;
+    TargetFilter filter;
+    filter.cmdlines.emplace_back("echo");
+    filter.cmdlines.emplace_back("/bin/cat");
+
+    EXPECT_FALSE(ShouldReject(42, "/bin/echo", filter, skip_cmd, &extra_cmds));
+    EXPECT_FALSE(ShouldReject(42, "/bin/cat", filter, skip_cmd, &extra_cmds));
+
+    EXPECT_TRUE(ShouldReject(42, "/bin/top", filter, skip_cmd, &extra_cmds));
+  }
+  {
+    bool skip_cmd = true;
+    base::FlatSet<std::string> extra_cmds;
+    TargetFilter filter;
+    filter.cmdlines.emplace_back("echo");
+    filter.cmdlines.emplace_back("/bin/cat");
+
+    // As above but with |skip_cmd| making none of the cmdline checks apply.
+    // Therefore everything gets rejected because it's still considered to be a
+    // filter that only requested specific targets (and none of these match).
+    EXPECT_TRUE(ShouldReject(42, "/bin/echo", filter, skip_cmd, &extra_cmds));
+    EXPECT_TRUE(ShouldReject(42, "/bin/cat", filter, skip_cmd, &extra_cmds));
+    EXPECT_TRUE(ShouldReject(42, "/bin/top", filter, skip_cmd, &extra_cmds));
+  }
+}
+
+TEST(TargetFilterTest, ExcludeCmdlines) {
+  bool skip_cmd = false;
+  base::FlatSet<std::string> extra_cmds;
+  TargetFilter filter;
+  filter.exclude_cmdlines.emplace_back("echo");
+  filter.exclude_cmdlines.emplace_back("/bin/cat");
+
+  EXPECT_FALSE(ShouldReject(42, "/bin/top", filter, skip_cmd, &extra_cmds));
+
+  EXPECT_TRUE(ShouldReject(42, "/bin/echo", filter, skip_cmd, &extra_cmds));
+  EXPECT_TRUE(ShouldReject(42, "/bin/cat", filter, skip_cmd, &extra_cmds));
+}
+
+TEST(TargetFilterTest, ExclusionPrioritised) {
+  bool skip_cmd = false;
+  base::FlatSet<std::string> extra_cmds;
+  TargetFilter filter;
+  filter.pids.insert(42);
+  filter.exclude_pids.insert(42);
+  filter.cmdlines.push_back("echo");
+  filter.exclude_cmdlines.push_back("echo");
+
+  EXPECT_TRUE(ShouldReject(42, "/bin/cat", filter, skip_cmd, &extra_cmds));
+  EXPECT_TRUE(ShouldReject(100, "/bin/echo", filter, skip_cmd, &extra_cmds));
+}
+
+TEST(TargetFilterTest, ProcessSharding) {
+  {
+    bool skip_cmd = false;
+    base::FlatSet<std::string> extra_cmds;
+    TargetFilter filter;
+    filter.process_sharding =
+        ProcessSharding{/*shard_count=*/4, /*chosen_shard=*/1};
+
+    EXPECT_FALSE(ShouldReject(1, "/bin/echo", filter, skip_cmd, &extra_cmds));
+    EXPECT_FALSE(ShouldReject(41, "/bin/echo", filter, skip_cmd, &extra_cmds));
+
+    EXPECT_TRUE(ShouldReject(0, "/bin/echo", filter, skip_cmd, &extra_cmds));
+    EXPECT_TRUE(ShouldReject(42, "/bin/echo", filter, skip_cmd, &extra_cmds));
+  }
+  {
+    // as above but with an explicit exclude_pid
+    bool skip_cmd = false;
+    base::FlatSet<std::string> extra_cmds;
+    TargetFilter filter;
+    filter.exclude_pids.insert(41);
+    filter.process_sharding =
+        ProcessSharding{/*shard_count=*/4, /*chosen_shard=*/1};
+
+    EXPECT_FALSE(ShouldReject(1, "/bin/echo", filter, skip_cmd, &extra_cmds));
+
+    // explicit exclusion applies even if pid is in the accepted shard
+    EXPECT_TRUE(ShouldReject(41, "/bin/echo", filter, skip_cmd, &extra_cmds));
+    EXPECT_TRUE(ShouldReject(42, "/bin/echo", filter, skip_cmd, &extra_cmds));
+  }
+}
+
+TEST(TargetFilterTest, AdditionalCmdlines) {
+  bool skip_cmd = false;
+  base::FlatSet<std::string> extra_cmds;
+  TargetFilter filter;
+  filter.additional_cmdline_count = 2;
+
+  // first two distinct cmdlines remembered and allowed:
+  EXPECT_FALSE(ShouldReject(42, "/bin/echo", filter, skip_cmd, &extra_cmds));
+  EXPECT_FALSE(ShouldReject(43, "/bin/echo", filter, skip_cmd, &extra_cmds));
+  EXPECT_FALSE(ShouldReject(44, "/bin/cat", filter, skip_cmd, &extra_cmds));
+
+  // further cmdlines rejected:
+  EXPECT_TRUE(ShouldReject(45, "/bin/top", filter, skip_cmd, &extra_cmds));
+
+  // remembered cmdlines still allowed:
+  EXPECT_FALSE(ShouldReject(46, "/bin/echo", filter, skip_cmd, &extra_cmds));
+
+  EXPECT_EQ(extra_cmds.size(), 2u);
+  EXPECT_EQ(extra_cmds.count("/bin/echo"), 1u);
+  EXPECT_EQ(extra_cmds.count("/bin/cat"), 1u);
+  EXPECT_EQ(extra_cmds.count("/bin/top"), 0u);
+}
+
+}  // namespace
+}  // namespace profiling
+}  // namespace perfetto