base: add thread pool for CPU bound tasks

This is a very lightweight implementation building heavily on top of
ThreadTaskRunner and simply adding a queue of tasks which allows load
balancing across threads and a stack tracking the threads which are
immediately available for scheduling.

Change-Id: I8a7fa3fde327e16fd7aa9f6735cbe4db95b65959
diff --git a/Android.bp b/Android.bp
index e519dff..eadd256 100644
--- a/Android.bp
+++ b/Android.bp
@@ -1714,6 +1714,11 @@
     name: "perfetto_include_perfetto_ext_base_http_http",
 }
 
+// GN: //include/perfetto/ext/base/threading:threading
+filegroup {
+    name: "perfetto_include_perfetto_ext_base_threading_threading",
+}
+
 // GN: //include/perfetto/ext/base:version
 filegroup {
     name: "perfetto_include_perfetto_ext_base_version",
@@ -8122,6 +8127,22 @@
     ],
 }
 
+// GN: //src/base/threading:threading
+filegroup {
+    name: "perfetto_src_base_threading_threading",
+    srcs: [
+        "src/base/threading/thread_pool.cc",
+    ],
+}
+
+// GN: //src/base/threading:unittests
+filegroup {
+    name: "perfetto_src_base_threading_unittests",
+    srcs: [
+        "src/base/threading/thread_pool_unittest.cc",
+    ],
+}
+
 // GN: //src/base:unittests
 filegroup {
     name: "perfetto_src_base_unittests",
@@ -11284,6 +11305,7 @@
         ":perfetto_include_perfetto_base_base",
         ":perfetto_include_perfetto_ext_base_base",
         ":perfetto_include_perfetto_ext_base_http_http",
+        ":perfetto_include_perfetto_ext_base_threading_threading",
         ":perfetto_include_perfetto_ext_base_version",
         ":perfetto_include_perfetto_ext_ipc_ipc",
         ":perfetto_include_perfetto_ext_trace_processor_demangle",
@@ -11411,6 +11433,8 @@
         ":perfetto_src_base_http_http",
         ":perfetto_src_base_http_unittests",
         ":perfetto_src_base_test_support",
+        ":perfetto_src_base_threading_threading",
+        ":perfetto_src_base_threading_unittests",
         ":perfetto_src_base_unittests",
         ":perfetto_src_base_unix_socket",
         ":perfetto_src_base_version",
diff --git a/gn/perfetto_unittests.gni b/gn/perfetto_unittests.gni
index 3af1962..76594fe 100644
--- a/gn/perfetto_unittests.gni
+++ b/gn/perfetto_unittests.gni
@@ -18,6 +18,7 @@
   "gn:default_deps",
   "gn:gtest_main",
   "src/base:unittests",
+  "src/base/threading:unittests",
   "src/protozero:unittests",
   "src/tracing/core:unittests",
   "src/tracing:unittests",
diff --git a/include/perfetto/ext/base/threading/BUILD.gn b/include/perfetto/ext/base/threading/BUILD.gn
new file mode 100644
index 0000000..ab12670
--- /dev/null
+++ b/include/perfetto/ext/base/threading/BUILD.gn
@@ -0,0 +1,23 @@
+# Copyright (C) 2023 The Android Open Source Project
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import("../../../../../gn/perfetto.gni")
+
+source_set("threading") {
+  sources = [ "thread_pool.h" ]
+  deps = [
+    "..:base",
+    "../../../../../gn:default_deps",
+  ]
+}
diff --git a/include/perfetto/ext/base/threading/thread_pool.h b/include/perfetto/ext/base/threading/thread_pool.h
new file mode 100644
index 0000000..1a4b153
--- /dev/null
+++ b/include/perfetto/ext/base/threading/thread_pool.h
@@ -0,0 +1,80 @@
+/*
+ * Copyright (C) 2023 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef INCLUDE_PERFETTO_EXT_BASE_THREADING_THREAD_POOL_H_
+#define INCLUDE_PERFETTO_EXT_BASE_THREADING_THREAD_POOL_H_
+
+#include <condition_variable>
+#include <functional>
+#include <list>
+#include <mutex>
+#include <thread>
+#include <vector>
+
+#include "perfetto/base/task_runner.h"
+#include "perfetto/ext/base/optional.h"
+
+namespace perfetto {
+namespace base {
+
+// Bounded thread pool designed for CPU-bound tasks.
+//
+// This is a classic bounded thread pool designed for running jobs which fully
+// occupy the CPU without blocking. IO bound tasks which block for long periods
+// of times will cause starvation for any other tasks which are waiting.
+// IO-heavy tasks should use base::TaskRunner and async-IO instead of using this
+// class.
+//
+// Threads are created when the thread pool is created and persist for the
+// lifetime of the ThreadPool. No new threads are created after construction.
+// When the ThreadPool is destroyed, any active tasks are completed and every
+// thread joined before returning from the destructor.
+//
+// Tasks are executed in a FIFO order without any notion of priority. If a
+// thread in the pool is free, it will be used to execute the task immediately.
+// Otherwise, it will be queued for execution when any thread becomes available.
+class ThreadPool {
+ public:
+  // Initializes this thread_pool |thread_count| threads.
+  explicit ThreadPool(uint32_t thread_count);
+  ~ThreadPool();
+
+  // Submits a task for execution by any thread in this thread pool.
+  //
+  // This task should not block for IO as this can cause starvation.
+  void PostTask(std::function<void()>);
+
+ private:
+  void RunThreadLoop();
+
+  ThreadPool(ThreadPool&&) = delete;
+  ThreadPool& operator=(ThreadPool&&) = delete;
+
+  // Start of mutex protected members.
+  std::mutex mutex_;
+  std::list<std::function<void()>> pending_tasks_;
+  std::condition_variable thread_waiter_;
+  uint32_t thread_waiting_count_ = 0;
+  bool quit_ = false;
+  // End of mutex protected members.
+
+  std::vector<std::thread> threads_;
+};
+
+}  // namespace base
+}  // namespace perfetto
+
+#endif  // INCLUDE_PERFETTO_EXT_BASE_THREADING_THREAD_POOL_H_
diff --git a/src/base/threading/BUILD.gn b/src/base/threading/BUILD.gn
new file mode 100644
index 0000000..ba709e7
--- /dev/null
+++ b/src/base/threading/BUILD.gn
@@ -0,0 +1,33 @@
+# Copyright (C) 2023 The Android Open Source Project
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import("../../../gn/test.gni")
+
+source_set("threading") {
+  deps = [ "../../../gn:default_deps" ]
+  public_deps = [ "../../../include/perfetto/ext/base/threading" ]
+  sources = [ "thread_pool.cc" ]
+}
+
+perfetto_unittest_source_set("unittests") {
+  testonly = true
+  deps = [
+    ":threading",
+    "..:base",
+    "../../../gn:default_deps",
+    "../../../gn:gtest_and_gmock",
+  ]
+
+  sources = [ "thread_pool_unittest.cc" ]
+}
diff --git a/src/base/threading/thread_pool.cc b/src/base/threading/thread_pool.cc
new file mode 100644
index 0000000..2097126
--- /dev/null
+++ b/src/base/threading/thread_pool.cc
@@ -0,0 +1,75 @@
+/*
+ * Copyright (C) 2023 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "perfetto/ext/base/threading/thread_pool.h"
+#include <mutex>
+#include <thread>
+
+namespace perfetto {
+namespace base {
+
+ThreadPool::ThreadPool(uint32_t thread_count) {
+  for (uint32_t i = 0; i < thread_count; ++i) {
+    threads_.emplace_back(std::bind(&ThreadPool::RunThreadLoop, this));
+  }
+}
+
+ThreadPool::~ThreadPool() {
+  {
+    std::lock_guard<std::mutex> guard(mutex_);
+    quit_ = true;
+  }
+  thread_waiter_.notify_all();
+  for (auto& thread : threads_) {
+    thread.join();
+  }
+}
+
+void ThreadPool::PostTask(std::function<void()> fn) {
+  {
+    std::lock_guard<std::mutex> guard(mutex_);
+    pending_tasks_.emplace_back(std::move(fn));
+  if (thread_waiting_count_ == 0) {
+    return;
+  }
+  }
+  thread_waiter_.notify_one();
+}
+
+void ThreadPool::RunThreadLoop() {
+  for (;;) {
+    std::function<void()> fn;
+    {
+      std::unique_lock<std::mutex> guard(mutex_);
+      if (quit_) {
+        return;
+      }
+      if (pending_tasks_.empty()) {
+        thread_waiting_count_++;
+        thread_waiter_.wait(
+            guard, [this]() { return quit_ || !pending_tasks_.empty(); });
+        thread_waiting_count_--;
+        continue;
+      }
+      fn = std::move(pending_tasks_.front());
+      pending_tasks_.pop_front();
+    }
+    fn();
+  }
+}
+
+}  // namespace base
+}  // namespace perfetto
diff --git a/src/base/threading/thread_pool_unittest.cc b/src/base/threading/thread_pool_unittest.cc
new file mode 100644
index 0000000..f4aa0f7
--- /dev/null
+++ b/src/base/threading/thread_pool_unittest.cc
@@ -0,0 +1,107 @@
+/*
+ * Copyright (C) 2023 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "perfetto/ext/base//threading/thread_pool.h"
+#include <atomic>
+#include <condition_variable>
+#include <mutex>
+
+#include "perfetto/ext/base/waitable_event.h"
+#include "test/gtest_and_gmock.h"
+
+namespace perfetto {
+namespace base {
+namespace {
+
+struct ThreadLatch {
+  base::WaitableEvent notify;
+  base::WaitableEvent wait;
+  bool task_started = false;
+};
+
+TEST(ThreadPoolTest, SequentialQueueing) {
+  ThreadLatch first;
+  ThreadLatch second;
+  base::ThreadPool pool(1);
+
+  pool.PostTask([&first] {
+    first.task_started = true;
+    first.notify.Notify();
+    first.wait.Wait();
+  });
+
+  pool.PostTask([&second] {
+    second.task_started = true;
+    second.notify.Notify();
+    second.wait.Wait();
+  });
+
+  first.notify.Wait();
+  ASSERT_TRUE(first.task_started);
+  ASSERT_FALSE(second.task_started);
+  first.wait.Notify();
+
+  second.notify.Wait();
+  ASSERT_TRUE(second.task_started);
+  second.wait.Notify();
+}
+
+TEST(ThreadPoolTest, ParallelSecondFinishFirst) {
+  base::ThreadPool pool(2);
+
+  ThreadLatch first;
+  pool.PostTask([&first] {
+    first.wait.Wait();
+    first.task_started = true;
+    first.notify.Notify();
+  });
+
+  ThreadLatch second;
+  pool.PostTask([&second] {
+    second.wait.Wait();
+    second.task_started = true;
+    second.notify.Notify();
+  });
+
+  second.wait.Notify();
+  second.notify.Wait();
+  ASSERT_TRUE(second.task_started);
+
+  first.wait.Notify();
+  first.notify.Wait();
+  ASSERT_TRUE(first.task_started);
+}
+
+TEST(ThreadPoolTest, StressTest) {
+  std::atomic<uint32_t> atomic(0);
+  std::condition_variable cv;
+  base::ThreadPool pool(128);
+  for (uint32_t i = 0; i < 1024; ++i) {
+    pool.PostTask([&atomic, &cv] {
+      if (atomic.fetch_add(1) == 1023) {
+        cv.notify_one();
+      }
+    });
+  }
+
+  std::mutex mu;
+  std::unique_lock<std::mutex> lock(mu);
+  cv.wait(lock, [&atomic]() { return atomic.load() == 1024u; });
+}
+
+}  // namespace
+}  // namespace base
+}  // namespace perfetto