base: add thread pool for CPU bound tasks
This is a very lightweight implementation building heavily on top of
ThreadTaskRunner and simply adding a queue of tasks which allows load
balancing across threads and a stack tracking the threads which are
immediately available for scheduling.
Change-Id: I8a7fa3fde327e16fd7aa9f6735cbe4db95b65959
diff --git a/Android.bp b/Android.bp
index e519dff..eadd256 100644
--- a/Android.bp
+++ b/Android.bp
@@ -1714,6 +1714,11 @@
name: "perfetto_include_perfetto_ext_base_http_http",
}
+// GN: //include/perfetto/ext/base/threading:threading
+filegroup {
+ name: "perfetto_include_perfetto_ext_base_threading_threading",
+}
+
// GN: //include/perfetto/ext/base:version
filegroup {
name: "perfetto_include_perfetto_ext_base_version",
@@ -8122,6 +8127,22 @@
],
}
+// GN: //src/base/threading:threading
+filegroup {
+ name: "perfetto_src_base_threading_threading",
+ srcs: [
+ "src/base/threading/thread_pool.cc",
+ ],
+}
+
+// GN: //src/base/threading:unittests
+filegroup {
+ name: "perfetto_src_base_threading_unittests",
+ srcs: [
+ "src/base/threading/thread_pool_unittest.cc",
+ ],
+}
+
// GN: //src/base:unittests
filegroup {
name: "perfetto_src_base_unittests",
@@ -11284,6 +11305,7 @@
":perfetto_include_perfetto_base_base",
":perfetto_include_perfetto_ext_base_base",
":perfetto_include_perfetto_ext_base_http_http",
+ ":perfetto_include_perfetto_ext_base_threading_threading",
":perfetto_include_perfetto_ext_base_version",
":perfetto_include_perfetto_ext_ipc_ipc",
":perfetto_include_perfetto_ext_trace_processor_demangle",
@@ -11411,6 +11433,8 @@
":perfetto_src_base_http_http",
":perfetto_src_base_http_unittests",
":perfetto_src_base_test_support",
+ ":perfetto_src_base_threading_threading",
+ ":perfetto_src_base_threading_unittests",
":perfetto_src_base_unittests",
":perfetto_src_base_unix_socket",
":perfetto_src_base_version",
diff --git a/gn/perfetto_unittests.gni b/gn/perfetto_unittests.gni
index 3af1962..76594fe 100644
--- a/gn/perfetto_unittests.gni
+++ b/gn/perfetto_unittests.gni
@@ -18,6 +18,7 @@
"gn:default_deps",
"gn:gtest_main",
"src/base:unittests",
+ "src/base/threading:unittests",
"src/protozero:unittests",
"src/tracing/core:unittests",
"src/tracing:unittests",
diff --git a/include/perfetto/ext/base/threading/BUILD.gn b/include/perfetto/ext/base/threading/BUILD.gn
new file mode 100644
index 0000000..ab12670
--- /dev/null
+++ b/include/perfetto/ext/base/threading/BUILD.gn
@@ -0,0 +1,23 @@
+# Copyright (C) 2023 The Android Open Source Project
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import("../../../../../gn/perfetto.gni")
+
+source_set("threading") {
+ sources = [ "thread_pool.h" ]
+ deps = [
+ "..:base",
+ "../../../../../gn:default_deps",
+ ]
+}
diff --git a/include/perfetto/ext/base/threading/thread_pool.h b/include/perfetto/ext/base/threading/thread_pool.h
new file mode 100644
index 0000000..1a4b153
--- /dev/null
+++ b/include/perfetto/ext/base/threading/thread_pool.h
@@ -0,0 +1,80 @@
+/*
+ * Copyright (C) 2023 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef INCLUDE_PERFETTO_EXT_BASE_THREADING_THREAD_POOL_H_
+#define INCLUDE_PERFETTO_EXT_BASE_THREADING_THREAD_POOL_H_
+
+#include <condition_variable>
+#include <functional>
+#include <list>
+#include <mutex>
+#include <thread>
+#include <vector>
+
+#include "perfetto/base/task_runner.h"
+#include "perfetto/ext/base/optional.h"
+
+namespace perfetto {
+namespace base {
+
+// Bounded thread pool designed for CPU-bound tasks.
+//
+// This is a classic bounded thread pool designed for running jobs which fully
+// occupy the CPU without blocking. IO bound tasks which block for long periods
+// of times will cause starvation for any other tasks which are waiting.
+// IO-heavy tasks should use base::TaskRunner and async-IO instead of using this
+// class.
+//
+// Threads are created when the thread pool is created and persist for the
+// lifetime of the ThreadPool. No new threads are created after construction.
+// When the ThreadPool is destroyed, any active tasks are completed and every
+// thread joined before returning from the destructor.
+//
+// Tasks are executed in a FIFO order without any notion of priority. If a
+// thread in the pool is free, it will be used to execute the task immediately.
+// Otherwise, it will be queued for execution when any thread becomes available.
+class ThreadPool {
+ public:
+ // Initializes this thread_pool |thread_count| threads.
+ explicit ThreadPool(uint32_t thread_count);
+ ~ThreadPool();
+
+ // Submits a task for execution by any thread in this thread pool.
+ //
+ // This task should not block for IO as this can cause starvation.
+ void PostTask(std::function<void()>);
+
+ private:
+ void RunThreadLoop();
+
+ ThreadPool(ThreadPool&&) = delete;
+ ThreadPool& operator=(ThreadPool&&) = delete;
+
+ // Start of mutex protected members.
+ std::mutex mutex_;
+ std::list<std::function<void()>> pending_tasks_;
+ std::condition_variable thread_waiter_;
+ uint32_t thread_waiting_count_ = 0;
+ bool quit_ = false;
+ // End of mutex protected members.
+
+ std::vector<std::thread> threads_;
+};
+
+} // namespace base
+} // namespace perfetto
+
+#endif // INCLUDE_PERFETTO_EXT_BASE_THREADING_THREAD_POOL_H_
diff --git a/src/base/threading/BUILD.gn b/src/base/threading/BUILD.gn
new file mode 100644
index 0000000..ba709e7
--- /dev/null
+++ b/src/base/threading/BUILD.gn
@@ -0,0 +1,33 @@
+# Copyright (C) 2023 The Android Open Source Project
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import("../../../gn/test.gni")
+
+source_set("threading") {
+ deps = [ "../../../gn:default_deps" ]
+ public_deps = [ "../../../include/perfetto/ext/base/threading" ]
+ sources = [ "thread_pool.cc" ]
+}
+
+perfetto_unittest_source_set("unittests") {
+ testonly = true
+ deps = [
+ ":threading",
+ "..:base",
+ "../../../gn:default_deps",
+ "../../../gn:gtest_and_gmock",
+ ]
+
+ sources = [ "thread_pool_unittest.cc" ]
+}
diff --git a/src/base/threading/thread_pool.cc b/src/base/threading/thread_pool.cc
new file mode 100644
index 0000000..2097126
--- /dev/null
+++ b/src/base/threading/thread_pool.cc
@@ -0,0 +1,75 @@
+/*
+ * Copyright (C) 2023 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "perfetto/ext/base/threading/thread_pool.h"
+#include <mutex>
+#include <thread>
+
+namespace perfetto {
+namespace base {
+
+ThreadPool::ThreadPool(uint32_t thread_count) {
+ for (uint32_t i = 0; i < thread_count; ++i) {
+ threads_.emplace_back(std::bind(&ThreadPool::RunThreadLoop, this));
+ }
+}
+
+ThreadPool::~ThreadPool() {
+ {
+ std::lock_guard<std::mutex> guard(mutex_);
+ quit_ = true;
+ }
+ thread_waiter_.notify_all();
+ for (auto& thread : threads_) {
+ thread.join();
+ }
+}
+
+void ThreadPool::PostTask(std::function<void()> fn) {
+ {
+ std::lock_guard<std::mutex> guard(mutex_);
+ pending_tasks_.emplace_back(std::move(fn));
+ if (thread_waiting_count_ == 0) {
+ return;
+ }
+ }
+ thread_waiter_.notify_one();
+}
+
+void ThreadPool::RunThreadLoop() {
+ for (;;) {
+ std::function<void()> fn;
+ {
+ std::unique_lock<std::mutex> guard(mutex_);
+ if (quit_) {
+ return;
+ }
+ if (pending_tasks_.empty()) {
+ thread_waiting_count_++;
+ thread_waiter_.wait(
+ guard, [this]() { return quit_ || !pending_tasks_.empty(); });
+ thread_waiting_count_--;
+ continue;
+ }
+ fn = std::move(pending_tasks_.front());
+ pending_tasks_.pop_front();
+ }
+ fn();
+ }
+}
+
+} // namespace base
+} // namespace perfetto
diff --git a/src/base/threading/thread_pool_unittest.cc b/src/base/threading/thread_pool_unittest.cc
new file mode 100644
index 0000000..f4aa0f7
--- /dev/null
+++ b/src/base/threading/thread_pool_unittest.cc
@@ -0,0 +1,107 @@
+/*
+ * Copyright (C) 2023 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "perfetto/ext/base//threading/thread_pool.h"
+#include <atomic>
+#include <condition_variable>
+#include <mutex>
+
+#include "perfetto/ext/base/waitable_event.h"
+#include "test/gtest_and_gmock.h"
+
+namespace perfetto {
+namespace base {
+namespace {
+
+struct ThreadLatch {
+ base::WaitableEvent notify;
+ base::WaitableEvent wait;
+ bool task_started = false;
+};
+
+TEST(ThreadPoolTest, SequentialQueueing) {
+ ThreadLatch first;
+ ThreadLatch second;
+ base::ThreadPool pool(1);
+
+ pool.PostTask([&first] {
+ first.task_started = true;
+ first.notify.Notify();
+ first.wait.Wait();
+ });
+
+ pool.PostTask([&second] {
+ second.task_started = true;
+ second.notify.Notify();
+ second.wait.Wait();
+ });
+
+ first.notify.Wait();
+ ASSERT_TRUE(first.task_started);
+ ASSERT_FALSE(second.task_started);
+ first.wait.Notify();
+
+ second.notify.Wait();
+ ASSERT_TRUE(second.task_started);
+ second.wait.Notify();
+}
+
+TEST(ThreadPoolTest, ParallelSecondFinishFirst) {
+ base::ThreadPool pool(2);
+
+ ThreadLatch first;
+ pool.PostTask([&first] {
+ first.wait.Wait();
+ first.task_started = true;
+ first.notify.Notify();
+ });
+
+ ThreadLatch second;
+ pool.PostTask([&second] {
+ second.wait.Wait();
+ second.task_started = true;
+ second.notify.Notify();
+ });
+
+ second.wait.Notify();
+ second.notify.Wait();
+ ASSERT_TRUE(second.task_started);
+
+ first.wait.Notify();
+ first.notify.Wait();
+ ASSERT_TRUE(first.task_started);
+}
+
+TEST(ThreadPoolTest, StressTest) {
+ std::atomic<uint32_t> atomic(0);
+ std::condition_variable cv;
+ base::ThreadPool pool(128);
+ for (uint32_t i = 0; i < 1024; ++i) {
+ pool.PostTask([&atomic, &cv] {
+ if (atomic.fetch_add(1) == 1023) {
+ cv.notify_one();
+ }
+ });
+ }
+
+ std::mutex mu;
+ std::unique_lock<std::mutex> lock(mu);
+ cv.wait(lock, [&atomic]() { return atomic.load() == 1024u; });
+}
+
+} // namespace
+} // namespace base
+} // namespace perfetto