Thread Redaction - Create synthetic threads

In order to merge threads, new threads need to be created. These "fake"
threads will be called synthetic threads. A synthetic thread needs
to have a unique thread id, ensuring no collisions.

Because each CPU will get its own thread, all scheduling events on a
CPU will get moved into that CPU's thread.

The range of PIDs is known (0 to 2 ^ 22), so all synth thread ids will
be greater than 2 ^ 22.

Bug: 336807771
Change-Id: I29242f5c24dc478e9582d2a1e5eba3959e1af462
diff --git a/Android.bp b/Android.bp
index 94b84ff..f8e8b03 100644
--- a/Android.bp
+++ b/Android.bp
@@ -12906,6 +12906,7 @@
     name: "perfetto_src_trace_redaction_trace_redaction",
     srcs: [
         "src/trace_redaction/collect_frame_cookies.cc",
+        "src/trace_redaction/collect_system_info.cc",
         "src/trace_redaction/collect_timeline_events.cc",
         "src/trace_redaction/filter_ftrace_using_allowlist.cc",
         "src/trace_redaction/filter_packet_using_allowlist.cc",
@@ -12937,6 +12938,7 @@
     name: "perfetto_src_trace_redaction_unittests",
     srcs: [
         "src/trace_redaction/collect_frame_cookies_unittest.cc",
+        "src/trace_redaction/collect_system_info_unittest.cc",
         "src/trace_redaction/collect_timeline_events_unittest.cc",
         "src/trace_redaction/filter_ftrace_using_allowlist_unittest.cc",
         "src/trace_redaction/filter_packet_using_allowlist_unittest.cc",
diff --git a/src/trace_redaction/BUILD.gn b/src/trace_redaction/BUILD.gn
index 4f2fb5f..037545d 100644
--- a/src/trace_redaction/BUILD.gn
+++ b/src/trace_redaction/BUILD.gn
@@ -30,6 +30,8 @@
   sources = [
     "collect_frame_cookies.cc",
     "collect_frame_cookies.h",
+    "collect_system_info.cc",
+    "collect_system_info.h",
     "collect_timeline_events.cc",
     "collect_timeline_events.h",
     "filter_ftrace_using_allowlist.cc",
@@ -126,6 +128,7 @@
   testonly = true
   sources = [
     "collect_frame_cookies_unittest.cc",
+    "collect_system_info_unittest.cc",
     "collect_timeline_events_unittest.cc",
     "filter_ftrace_using_allowlist_unittest.cc",
     "filter_packet_using_allowlist_unittest.cc",
@@ -157,5 +160,6 @@
     "../../protos/perfetto/trace/ps:cpp",
     "../../protos/perfetto/trace/ps:zero",
     "../base:test_support",
+    "../trace_processor/util:util",
   ]
 }
diff --git a/src/trace_redaction/collect_system_info.cc b/src/trace_redaction/collect_system_info.cc
new file mode 100644
index 0000000..374f510
--- /dev/null
+++ b/src/trace_redaction/collect_system_info.cc
@@ -0,0 +1,81 @@
+/*
+ * Copyright (C) 2024 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "src/trace_redaction/collect_system_info.h"
+
+#include "perfetto/protozero/field.h"
+
+#include "protos/perfetto/trace/ftrace/ftrace_event_bundle.pbzero.h"
+
+namespace perfetto::trace_redaction {
+
+base::Status CollectSystemInfo::Begin(Context* context) const {
+  // Other primitives are allowed to push more data into the system info (e.g.
+  // another source of pids), so only create it if it does not exist yet.
+  if (!context->system_info.has_value()) {
+    context->system_info.emplace();
+  }
+
+  return base::OkStatus();
+}
+
+base::Status CollectSystemInfo::Collect(
+    const protos::pbzero::TracePacket::Decoder& packet,
+    Context* context) const {
+  auto* system_info = &context->system_info.value();
+
+  if (!packet.has_ftrace_events()) {
+    return base::OkStatus();
+  }
+
+  protozero::ProtoDecoder decoder(packet.ftrace_events());
+
+  // Find the bundle's cpu field and record it; ReserveCpu keeps the maximum.
+  auto field =
+      decoder.FindField(protos::pbzero::FtraceEventBundle::kCpuFieldNumber);
+  if (field.valid()) {
+    system_info->ReserveCpu(field.as_uint32());
+  }
+
+  return base::OkStatus();
+}
+
+base::Status BuildSyntheticThreads::Build(Context* context) const {
+  if (!context->system_info.has_value()) {
+    return base::ErrStatus("BuildSyntheticThreads: missing system info.");
+  }
+
+  if (context->synthetic_threads.has_value()) {
+    return base::ErrStatus(
+        "BuildSyntheticThreads: synthetic threads were already initialized.");
+  }
+
+  auto& system_info = context->system_info.value();
+  auto& synthetic_threads = context->synthetic_threads.emplace();
+
+  // CPU indices start at 0, so the highest index seen implies this many CPUs.
+  auto cpu_count = system_info.last_cpu() + 1;
+
+  // Allocate the group id first, then one synthetic thread id per CPU.
+  synthetic_threads.tgid = system_info.AllocateSynthThread();
+  synthetic_threads.tids.resize(cpu_count);
+  for (uint32_t cpu = 0; cpu < cpu_count; ++cpu) {
+    synthetic_threads.tids[cpu] = system_info.AllocateSynthThread();
+  }
+
+  return base::OkStatus();
+}
+
+}  // namespace perfetto::trace_redaction
diff --git a/src/trace_redaction/collect_system_info.h b/src/trace_redaction/collect_system_info.h
new file mode 100644
index 0000000..1dec21c
--- /dev/null
+++ b/src/trace_redaction/collect_system_info.h
@@ -0,0 +1,46 @@
+/*
+ * Copyright (C) 2024 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef SRC_TRACE_REDACTION_COLLECT_SYSTEM_INFO_H_
+#define SRC_TRACE_REDACTION_COLLECT_SYSTEM_INFO_H_
+
+#include "src/trace_redaction/trace_redaction_framework.h"
+
+namespace perfetto::trace_redaction {
+
+// Collects system info (currently the highest cpu index seen in ftrace event
+// bundles). This is the raw material needed by BuildSyntheticThreads.
+class CollectSystemInfo : public CollectPrimitive {
+ public:
+  base::Status Begin(Context*) const override;
+
+  base::Status Collect(const protos::pbzero::TracePacket::Decoder&,
+                       Context*) const override;
+};
+
+// Condenses system info into a query-focused structure, making it possible to
+// replace a thread with a synthetic thread.
+//
+// This is done here, and not in CollectSystemInfo::End, so that other collect
+// primitives can report additional system information.
+class BuildSyntheticThreads : public BuildPrimitive {
+ public:
+  base::Status Build(Context* context) const override;
+};
+
+}  // namespace perfetto::trace_redaction
+
+#endif  // SRC_TRACE_REDACTION_COLLECT_SYSTEM_INFO_H_
diff --git a/src/trace_redaction/collect_system_info_unittest.cc b/src/trace_redaction/collect_system_info_unittest.cc
new file mode 100644
index 0000000..bf0ced2
--- /dev/null
+++ b/src/trace_redaction/collect_system_info_unittest.cc
@@ -0,0 +1,97 @@
+/*
+ * Copyright (C) 2024 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "src/trace_redaction/collect_system_info.h"
+#include "src/base/test/status_matchers.h"
+#include "src/trace_processor/util/status_macros.h"
+#include "test/gtest_and_gmock.h"
+
+#include "protos/perfetto/trace/ftrace/ftrace_event.gen.h"
+#include "protos/perfetto/trace/ftrace/ftrace_event_bundle.gen.h"
+#include "protos/perfetto/trace/ftrace/sched.gen.h"
+#include "protos/perfetto/trace/trace_packet.gen.h"
+
+namespace perfetto::trace_redaction {
+
+class CollectSystemInfoTest : public testing::Test {
+ protected:
+  base::Status Collect() {
+    auto buffer = packet_.SerializeAsString();
+    protos::pbzero::TracePacket::Decoder decoder(buffer);
+
+    RETURN_IF_ERROR(collect_.Begin(&context_));
+    RETURN_IF_ERROR(collect_.Collect(decoder, &context_));
+    return collect_.End(&context_);
+  }
+
+  void AppendFtraceEvent(uint32_t event_cpu, uint32_t pid) {
+    auto* events = packet_.mutable_ftrace_events();
+    events->set_cpu(event_cpu);
+
+    auto* event = events->add_event();
+    event->set_pid(pid);
+  }
+
+  void AppendSchedSwitch(int32_t next_pid) {
+    auto& event = packet_.mutable_ftrace_events()->mutable_event()->back();
+
+    auto* sched_switch = event.mutable_sched_switch();
+    sched_switch->set_prev_pid(static_cast<int32_t>(event.pid()));
+    sched_switch->set_next_pid(next_pid);
+  }
+
+  protos::gen::TracePacket packet_;
+  Context context_;
+  CollectSystemInfo collect_;
+};
+
+TEST_F(CollectSystemInfoTest, UpdatesCpuCountUsingFtraceEvents) {
+  AppendFtraceEvent(7, 8);
+  AppendSchedSwitch(9);
+
+  ASSERT_OK(Collect());
+  ASSERT_EQ(context_.system_info->last_cpu(), 7u);
+
+  AppendFtraceEvent(11, 8);
+  AppendSchedSwitch(9);
+
+  ASSERT_OK(Collect());
+  ASSERT_EQ(context_.system_info->last_cpu(), 11u);
+}
+
+// The first synth thread pid should be beyond the range of valid pids.
+TEST(SystemInfoTest, FirstSynthThreadPidIsNotAValidPid) {
+  SystemInfo info;
+
+  auto pid = info.AllocateSynthThread();
+  ASSERT_GT(pid, 1 << 22);
+}
+
+TEST(BuildSyntheticThreadsTest, CreatesThreadsPerCpu) {
+  Context context;
+  context.system_info.emplace();
+
+  // The first CPU is always 0, so CPU 7 means there are 8 CPUs.
+  context.system_info->ReserveCpu(7);
+
+  BuildSyntheticThreads build;
+  ASSERT_OK(build.Build(&context));
+
+  ASSERT_NE(context.synthetic_threads->tgid, 0);
+  ASSERT_EQ(context.synthetic_threads->tids.size(), 8u);
+}
+
+}  // namespace perfetto::trace_redaction
diff --git a/src/trace_redaction/main.cc b/src/trace_redaction/main.cc
index 9582890..04cfa1e 100644
--- a/src/trace_redaction/main.cc
+++ b/src/trace_redaction/main.cc
@@ -17,6 +17,7 @@
 #include "perfetto/base/logging.h"
 #include "perfetto/base/status.h"
 #include "src/trace_redaction/collect_frame_cookies.h"
+#include "src/trace_redaction/collect_system_info.h"
 #include "src/trace_redaction/collect_timeline_events.h"
 #include "src/trace_redaction/filter_ftrace_using_allowlist.h"
 #include "src/trace_redaction/filter_packet_using_allowlist.h"
@@ -51,12 +52,14 @@
   redactor.emplace_collect<FindPackageUid>();
   redactor.emplace_collect<CollectTimelineEvents>();
   redactor.emplace_collect<CollectFrameCookies>();
+  redactor.emplace_collect<CollectSystemInfo>();
 
   // Add all builders.
   redactor.emplace_build<PopulateAllowlists>();
   redactor.emplace_build<AllowSuspendResume>();
   redactor.emplace_build<OptimizeTimeline>();
   redactor.emplace_build<ReduceFrameCookies>();
+  redactor.emplace_build<BuildSyntheticThreads>();
 
   // Add all transforms.
   auto* scrub_packet = redactor.emplace_transform<ScrubTracePacket>();
diff --git a/src/trace_redaction/trace_redaction_framework.h b/src/trace_redaction/trace_redaction_framework.h
index cc135a9..12f6eb2 100644
--- a/src/trace_redaction/trace_redaction_framework.h
+++ b/src/trace_redaction/trace_redaction_framework.h
@@ -40,6 +40,59 @@
   return uid % 1000000;
 }
 
+class SystemInfo {
+ public:
+  int32_t AllocateSynthThread() {
+    return (1 << kSynthShift) | (++next_synth_thread_);
+  }
+
+  uint32_t ReserveCpu(uint32_t cpu) {
+    last_cpu_ = std::max(last_cpu_, cpu);
+    return last_cpu_;
+  }
+
+  uint32_t last_cpu() const { return last_cpu_; }
+
+ private:
+  // This is the last allocated tid. Using a tid equal to or less than this tid
+  // risks a collision with another tid. If a tid is ever created (by a
+  // primitive) this should be advanced to the max between this value and the
+  // new tid.
+  //
+  // On a 64 bit machine, the max pid limit is 2^22 (approximately 4 million).
+  // Perfetto uses a 32-bit (signed) int for the pid. Even in this case, there
+  // is room for 2^9 synthetic threads (2 ^ (31 - 22) = 2 ^ 9).
+  //
+  // Furthermore, the Android source code returns 4194304 (2 ^ 22) on 64 bit
+  // devices.
+  //
+  //  /proc/sys/kernel/pid_max (since Linux 2.5.34)
+  //      This file specifies the value at which PIDs wrap around
+  //      (i.e., the value in this file is one greater than the
+  //      maximum PID).  PIDs greater than this value are not
+  //      allocated; thus, the value in this file also acts as a
+  //      system-wide limit on the total number of processes and
+  //      threads.  The default value for this file, 32768, results
+  //      in the same range of PIDs as on earlier kernels.  On
+  //      32-bit platforms, 32768 is the maximum value for pid_max.
+  //      On 64-bit systems, pid_max can be set to any value up to
+  //      2^22 (PID_MAX_LIMIT, approximately 4 million).
+  //
+  // SOURCE: https://man7.org/linux/man-pages/man5/proc.5.html
+  static constexpr auto kSynthShift = 22;
+  int32_t next_synth_thread_ = 0;
+
+  // The last CPU index seen. If this value is 7, it means there are at least
+  // 8 CPUs.
+  uint32_t last_cpu_ = 0;
+};
+
+class SyntheticThreadGroup {
+ public:
+  int32_t tgid;
+  std::vector<int32_t> tids;
+};
+
 // Primitives should be stateless. All state should be stored in the context.
 // Primitives should depend on data in the context, not the origin of the data.
 // This allows primitives to be swapped out or work together to populate data
@@ -217,6 +270,10 @@
   // values are unique within the scope of the trace, pid and time are no longer
   // needed and a set can be used for faster queries.
   std::unordered_set<int64_t> package_frame_cookies;
+
+  std::optional<SystemInfo> system_info;
+
+  std::optional<SyntheticThreadGroup> synthetic_threads;
 };
 
 // Extracts low-level data from the trace and writes it into the context. The