Revert "perfetto: merge TaskRunnerThread and base::ThreadTaskRunner"

This reverts commit 2cc2bd6c78ce431cb579845fbab8e1d176f78b2b.

Reason for revert: broke the build

Change-Id: I4d1fc12ff964717b39d30930bff5a4d4998d87ab
diff --git a/Android.bp b/Android.bp
index cea7f13..2b79c85 100644
--- a/Android.bp
+++ b/Android.bp
@@ -773,6 +773,8 @@
     ":perfetto_src_tracing_ipc",
     ":perfetto_src_tracing_tracing",
     ":perfetto_test_end_to_end_integrationtests",
+    ":perfetto_test_task_runner_thread",
+    ":perfetto_test_task_runner_thread_delegates",
     ":perfetto_test_test_helper",
     "test/cts/device_feature_test_cts.cc",
     "test/cts/end_to_end_integrationtest_cts.cc",
@@ -1127,6 +1129,8 @@
     ":perfetto_src_tracing_test_api_test_support",
     ":perfetto_src_tracing_tracing",
     ":perfetto_test_end_to_end_integrationtests",
+    ":perfetto_test_task_runner_thread",
+    ":perfetto_test_task_runner_thread_delegates",
     ":perfetto_test_test_helper",
   ],
   shared_libs: [
@@ -6351,11 +6355,27 @@
   ],
 }
 
+// GN: //test:task_runner_thread
+filegroup {
+  name: "perfetto_test_task_runner_thread",
+  srcs: [
+    "test/task_runner_thread.cc",
+  ],
+}
+
+// GN: //test:task_runner_thread_delegates
+filegroup {
+  name: "perfetto_test_task_runner_thread_delegates",
+  srcs: [
+    "test/fake_producer.cc",
+    "test/task_runner_thread_delegates.cc",
+  ],
+}
+
 // GN: //test:test_helper
 filegroup {
   name: "perfetto_test_test_helper",
   srcs: [
-    "test/fake_producer.cc",
     "test/test_helper.cc",
   ],
 }
diff --git a/include/perfetto/ext/base/thread_task_runner.h b/include/perfetto/ext/base/thread_task_runner.h
index 6579fa5..fac4553 100644
--- a/include/perfetto/ext/base/thread_task_runner.h
+++ b/include/perfetto/ext/base/thread_task_runner.h
@@ -34,9 +34,7 @@
 //
 class ThreadTaskRunner {
  public:
-  static ThreadTaskRunner CreateAndStart(const std::string& name = "") {
-    return ThreadTaskRunner(name);
-  }
+  static ThreadTaskRunner CreateAndStart() { return ThreadTaskRunner(); }
 
   ThreadTaskRunner(const ThreadTaskRunner&) = delete;
   ThreadTaskRunner& operator=(const ThreadTaskRunner&) = delete;
@@ -45,14 +43,6 @@
   ThreadTaskRunner& operator=(ThreadTaskRunner&&);
   ~ThreadTaskRunner();
 
-  // Executes the given function on the task runner thread and blocks the caller
-  // thread until the function has run.
-  void PostTaskAndWaitForTesting(std::function<void()>);
-
-  // Can be called from another thread to get the CPU time of the thread the
-  // task-runner is executing on.
-  uint64_t GetThreadCPUTimeNsForTesting();
-
   // Returns a pointer to the UnixTaskRunner, which is valid for the lifetime of
   // this ThreadTaskRunner object (unless this object is moved-from, in which
   // case the pointer remains valid for the lifetime of the new owning
@@ -63,11 +53,10 @@
   UnixTaskRunner* get() const { return task_runner_; }
 
  private:
-  explicit ThreadTaskRunner(const std::string& name);
+  ThreadTaskRunner();
   void RunTaskThread(std::function<void(UnixTaskRunner*)> initializer);
 
   std::thread thread_;
-  std::string name_;
   UnixTaskRunner* task_runner_ = nullptr;
 };
 
diff --git a/src/base/thread_task_runner.cc b/src/base/thread_task_runner.cc
index af89e8c..0576ee9 100644
--- a/src/base/thread_task_runner.cc
+++ b/src/base/thread_task_runner.cc
@@ -19,7 +19,6 @@
 
 #include "perfetto/ext/base/thread_task_runner.h"
 
-#include <sys/prctl.h>
 #include <condition_variable>
 #include <functional>
 #include <mutex>
@@ -53,7 +52,7 @@
     thread_.join();
 }
 
-ThreadTaskRunner::ThreadTaskRunner(const std::string& name) : name_(name) {
+ThreadTaskRunner::ThreadTaskRunner() {
   std::mutex init_lock;
   std::condition_variable init_cv;
 
@@ -67,7 +66,6 @@
         // notifying).
         init_cv.notify_one();
       };
-
   thread_ = std::thread(&ThreadTaskRunner::RunTaskThread, this,
                         std::move(initializer));
 
@@ -77,43 +75,11 @@
 
 void ThreadTaskRunner::RunTaskThread(
     std::function<void(UnixTaskRunner*)> initializer) {
-  if (!name_.empty()) {
-#if PERFETTO_BUILDFLAG(PERFETTO_OS_MACOSX)
-    pthread_setname_np(name_.c_str());
-#else
-    prctl(PR_SET_NAME, name_.c_str());
-#endif
-  }
-
   UnixTaskRunner task_runner;
   task_runner.PostTask(std::bind(std::move(initializer), &task_runner));
   task_runner.Run();
 }
 
-void ThreadTaskRunner::PostTaskAndWaitForTesting(std::function<void()> fn) {
-  std::mutex mutex;
-  std::condition_variable cv;
-
-  std::unique_lock<std::mutex> lock(mutex);
-  bool done = false;
-  task_runner_->PostTask([&mutex, &cv, &done, &fn] {
-    fn();
-
-    std::lock_guard<std::mutex> inner_lock(mutex);
-    done = true;
-    cv.notify_one();
-  });
-  cv.wait(lock, [&done] { return done; });
-}
-
-uint64_t ThreadTaskRunner::GetThreadCPUTimeNsForTesting() {
-  uint64_t thread_time_ns = 0;
-  PostTaskAndWaitForTesting([&thread_time_ns] {
-    thread_time_ns = static_cast<uint64_t>(base::GetThreadCPUTimeNs().count());
-  });
-  return thread_time_ns;
-}
-
 }  // namespace base
 }  // namespace perfetto
 
diff --git a/src/profiling/memory/heapprofd_end_to_end_test.cc b/src/profiling/memory/heapprofd_end_to_end_test.cc
index 96d4f50..d698650 100644
--- a/src/profiling/memory/heapprofd_end_to_end_test.cc
+++ b/src/profiling/memory/heapprofd_end_to_end_test.cc
@@ -47,6 +47,23 @@
 using ::testing::Bool;
 using ::testing::Eq;
 
+class HeapprofdDelegate : public ThreadDelegate {
+ public:
+  HeapprofdDelegate(const std::string& producer_socket)
+      : producer_socket_(producer_socket) {}
+  ~HeapprofdDelegate() override = default;
+
+  void Initialize(base::TaskRunner* task_runner) override {
+    producer_.reset(
+        new HeapprofdProducer(HeapprofdMode::kCentral, task_runner));
+    producer_->ConnectWithRetries(producer_socket_.c_str());
+  }
+
+ private:
+  std::string producer_socket_;
+  std::unique_ptr<HeapprofdProducer> producer_;
+};
+
 constexpr const char* kHeapprofdModeProperty = "heapprofd.userdebug.mode";
 
 #if PERFETTO_BUILDFLAG(PERFETTO_OS_ANDROID)
diff --git a/test/BUILD.gn b/test/BUILD.gn
index 0fe00be..3388f02 100644
--- a/test/BUILD.gn
+++ b/test/BUILD.gn
@@ -19,6 +19,8 @@
 source_set("end_to_end_integrationtests") {
   testonly = true
   deps = [
+    ":task_runner_thread",
+    ":task_runner_thread_delegates",
     ":test_helper",
     "../gn:default_deps",
     "../gn:gtest_and_gmock",
@@ -71,6 +73,8 @@
   ]
   testonly = true
   deps = [
+    ":task_runner_thread",
+    ":task_runner_thread_delegates",
     ":test_helper",
     "../gn:default_deps",
     "../protos/perfetto/trace:zero",
@@ -94,17 +98,26 @@
   ]
 }
 
-source_set("test_helper") {
+source_set("task_runner_thread") {
   testonly = true
-  public_deps = [
-    "../protos/perfetto/trace:cpp",
-    "../src/tracing:ipc",
-  ]
   deps = [
     "../gn:default_deps",
+    "../src/base",
+    "../src/base:test_support",
+  ]
+  sources = [
+    "task_runner_thread.cc",
+    "task_runner_thread.h",
+  ]
+}
+
+source_set("task_runner_thread_delegates") {
+  testonly = true
+  deps = [
+    ":task_runner_thread",
+    "../gn:default_deps",
     "../include/perfetto/ext/traced",
     "../protos/perfetto/config:cpp",
-    "../protos/perfetto/trace:zero",
     "../src/base:test_support",
     "../src/traced/probes:probes_src",
     "../src/tracing:ipc",
@@ -112,6 +125,26 @@
   sources = [
     "fake_producer.cc",
     "fake_producer.h",
+    "task_runner_thread_delegates.cc",
+    "task_runner_thread_delegates.h",
+  ]
+}
+
+source_set("test_helper") {
+  testonly = true
+  public_deps = [
+    "../protos/perfetto/trace:cpp",
+    "../src/tracing:ipc",
+  ]
+  deps = [
+    ":task_runner_thread",
+    ":task_runner_thread_delegates",
+    "../gn:default_deps",
+    "../include/perfetto/ext/traced",
+    "../protos/perfetto/trace:zero",
+    "../src/base:test_support",
+  ]
+  sources = [
     "test_helper.cc",
     "test_helper.h",
   ]
@@ -124,6 +157,8 @@
   source_set("end_to_end_benchmarks") {
     testonly = true
     deps = [
+      ":task_runner_thread",
+      ":task_runner_thread_delegates",
       ":test_helper",
       "../gn:benchmark",
       "../gn:default_deps",
diff --git a/test/end_to_end_benchmark.cc b/test/end_to_end_benchmark.cc
index ff1fe74..0668487 100644
--- a/test/end_to_end_benchmark.cc
+++ b/test/end_to_end_benchmark.cc
@@ -22,6 +22,8 @@
 #include "perfetto/tracing/core/trace_config.h"
 #include "src/base/test/test_task_runner.h"
 #include "test/gtest_and_gmock.h"
+#include "test/task_runner_thread.h"
+#include "test/task_runner_thread_delegates.h"
 #include "test/test_helper.h"
 
 #include "protos/perfetto/config/test_config.gen.h"
@@ -71,10 +73,8 @@
   helper.WaitForProducerEnabled();
 
   uint64_t wall_start_ns = static_cast<uint64_t>(base::GetWallTimeNs().count());
-  uint64_t service_start_ns =
-      helper.service_thread()->GetThreadCPUTimeNsForTesting();
-  uint64_t producer_start_ns =
-      helper.producer_thread()->GetThreadCPUTimeNsForTesting();
+  uint64_t service_start_ns = helper.service_thread()->GetThreadCPUTimeNs();
+  uint64_t producer_start_ns = helper.producer_thread()->GetThreadCPUTimeNs();
   uint32_t iterations = 0;
   for (auto _ : state) {
     auto cname = "produced.and.committed." + std::to_string(iterations++);
@@ -83,11 +83,9 @@
     task_runner.RunUntilCheckpoint(cname, time_for_messages_ms);
   }
   uint64_t service_ns =
-      helper.service_thread()->GetThreadCPUTimeNsForTesting() -
-      service_start_ns;
+      helper.service_thread()->GetThreadCPUTimeNs() - service_start_ns;
   uint64_t producer_ns =
-      helper.producer_thread()->GetThreadCPUTimeNsForTesting() -
-      producer_start_ns;
+      helper.producer_thread()->GetThreadCPUTimeNs() - producer_start_ns;
   uint64_t wall_ns =
       static_cast<uint64_t>(base::GetWallTimeNs().count()) - wall_start_ns;
 
@@ -152,8 +150,8 @@
   helper.WaitForProducerEnabled();
 
   uint64_t wall_start_ns = static_cast<uint64_t>(base::GetWallTimeNs().count());
-  uint64_t service_start_ns = static_cast<uint64_t>(
-      helper.service_thread()->GetThreadCPUTimeNsForTesting());
+  uint64_t service_start_ns =
+      static_cast<uint64_t>(helper.service_thread()->GetThreadCPUTimeNs());
   uint64_t consumer_start_ns =
       static_cast<uint64_t>(base::GetThreadCPUTimeNs().count());
   uint64_t read_time_taken_ns = 0;
@@ -195,8 +193,7 @@
     }
   }
   uint64_t service_ns =
-      helper.service_thread()->GetThreadCPUTimeNsForTesting() -
-      service_start_ns;
+      helper.service_thread()->GetThreadCPUTimeNs() - service_start_ns;
   uint64_t consumer_ns =
       static_cast<uint64_t>(base::GetThreadCPUTimeNs().count()) -
       consumer_start_ns;
diff --git a/test/end_to_end_integrationtest.cc b/test/end_to_end_integrationtest.cc
index 52947df..209693d 100644
--- a/test/end_to_end_integrationtest.cc
+++ b/test/end_to_end_integrationtest.cc
@@ -35,6 +35,8 @@
 #include "src/traced/probes/ftrace/ftrace_controller.h"
 #include "src/traced/probes/ftrace/ftrace_procfs.h"
 #include "test/gtest_and_gmock.h"
+#include "test/task_runner_thread.h"
+#include "test/task_runner_thread_delegates.h"
 #include "test/test_helper.h"
 
 #include "protos/perfetto/config/power/android_power_config.pbzero.h"
@@ -360,8 +362,9 @@
   helper.StartServiceIfRequired();
 
 #if PERFETTO_BUILDFLAG(PERFETTO_START_DAEMONS)
-  ProbesProducerThread probes(TEST_PRODUCER_SOCK_NAME);
-  probes.Connect();
+  TaskRunnerThread producer_thread("perfetto.prd");
+  producer_thread.Start(std::unique_ptr<ProbesProducerDelegate>(
+      new ProbesProducerDelegate(TEST_PRODUCER_SOCK_NAME)));
 #endif
 
   helper.ConnectConsumer();
@@ -405,8 +408,9 @@
   helper.StartServiceIfRequired();
 
 #if PERFETTO_BUILDFLAG(PERFETTO_START_DAEMONS)
-  ProbesProducerThread probes(TEST_PRODUCER_SOCK_NAME);
-  probes.Connect();
+  TaskRunnerThread producer_thread("perfetto.prd");
+  producer_thread.Start(std::unique_ptr<ProbesProducerDelegate>(
+      new ProbesProducerDelegate(TEST_PRODUCER_SOCK_NAME)));
 #endif
 
   helper.ConnectConsumer();
@@ -463,8 +467,9 @@
   helper.StartServiceIfRequired();
 
 #if PERFETTO_BUILDFLAG(PERFETTO_START_DAEMONS)
-  ProbesProducerThread probes(TEST_PRODUCER_SOCK_NAME);
-  probes.Connect();
+  TaskRunnerThread producer_thread("perfetto.prd");
+  producer_thread.Start(std::unique_ptr<ProbesProducerDelegate>(
+      new ProbesProducerDelegate(TEST_PRODUCER_SOCK_NAME)));
 #else
   base::ignore_result(TEST_PRODUCER_SOCK_NAME);
 #endif
diff --git a/test/end_to_end_shared_memory_fuzzer.cc b/test/end_to_end_shared_memory_fuzzer.cc
index ce53acf..052f888 100644
--- a/test/end_to_end_shared_memory_fuzzer.cc
+++ b/test/end_to_end_shared_memory_fuzzer.cc
@@ -30,6 +30,8 @@
 #include "perfetto/tracing/core/data_source_descriptor.h"
 #include "protos/perfetto/trace/test_event.pbzero.h"
 #include "src/base/test/test_task_runner.h"
+#include "test/task_runner_thread.h"
+#include "test/task_runner_thread_delegates.h"
 #include "test/test_helper.h"
 
 #include "protos/perfetto/trace/trace_packet.pbzero.h"
@@ -107,33 +109,23 @@
   std::function<void()> on_produced_and_committed_;
 };
 
-class FuzzerFakeProducerThread {
+class FakeProducerDelegate : public ThreadDelegate {
  public:
-  FuzzerFakeProducerThread(const uint8_t* data,
-                           size_t size,
-                           std::function<void()> on_produced_and_committed)
+  FakeProducerDelegate(const uint8_t* data,
+                       size_t size,
+                       std::function<void()> on_produced_and_committed)
       : data_(data),
         size_(size),
         on_produced_and_committed_(on_produced_and_committed) {}
+  ~FakeProducerDelegate() override = default;
 
-  ~FuzzerFakeProducerThread() {
-    if (!runner_)
-      return;
-    runner_->PostTaskAndWaitForTesting([this]() { producer_.reset(); });
-  }
-
-  void Connect() {
-    runner_ = base::ThreadTaskRunner::CreateAndStart("perfetto.prd.fake");
-    runner_->PostTaskAndWaitForTesting([this]() {
-      producer_.reset(new FakeProducer("android.perfetto.FakeProducer", data_,
-                                       size_, on_produced_and_committed_));
-      producer_->Connect(TEST_PRODUCER_SOCK_NAME, runner_->get());
-    });
+  void Initialize(base::TaskRunner* task_runner) override {
+    producer_.reset(new FakeProducer("android.perfetto.FakeProducer", data_,
+                                     size_, on_produced_and_committed_));
+    producer_->Connect(TEST_PRODUCER_SOCK_NAME, task_runner);
   }
 
  private:
-  base::Optional<base::ThreadTaskRunner> runner_;  // Keep first.
-
   std::unique_ptr<FakeProducer> producer_;
   const uint8_t* data_;
   const size_t size_;
@@ -148,10 +140,11 @@
   TestHelper helper(&task_runner);
   helper.StartServiceIfRequired();
 
-  auto cp =
-      helper.WrapTask(task_runner.CreateCheckpoint("produced.and.committed"));
-  FuzzerFakeProducerThread producer_thread(data, size, cp);
-  producer_thread.Connect();
+  TaskRunnerThread producer_thread("perfetto.prd");
+  producer_thread.Start(std::unique_ptr<FakeProducerDelegate>(
+      new FakeProducerDelegate(data, size,
+                               helper.WrapTask(task_runner.CreateCheckpoint(
+                                   "produced.and.committed")))));
 
   helper.ConnectConsumer();
   helper.WaitForConsumerConnect();
diff --git a/test/fake_producer.cc b/test/fake_producer.cc
index 730ec00..17f80f0 100644
--- a/test/fake_producer.cc
+++ b/test/fake_producer.cc
@@ -16,6 +16,7 @@
 
 #include "test/fake_producer.h"
 
+#include <condition_variable>
 #include <mutex>
 
 #include "perfetto/base/logging.h"
diff --git a/test/task_runner_thread.cc b/test/task_runner_thread.cc
new file mode 100644
index 0000000..c625078
--- /dev/null
+++ b/test/task_runner_thread.cc
@@ -0,0 +1,116 @@
+/*
+ * Copyright (C) 2018 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <pthread.h>
+#include <stdlib.h>
+#include <sys/syscall.h>
+#include <sys/types.h>
+#include <unistd.h>
+
+#include <condition_variable>
+#include <thread>
+
+#include "perfetto/base/time.h"
+#include "perfetto/ext/base/file_utils.h"
+#include "perfetto/ext/base/string_splitter.h"
+#include "test/task_runner_thread.h"
+
+namespace perfetto {
+
+TaskRunnerThread::TaskRunnerThread(const char* name) : name_(name) {}
+TaskRunnerThread::~TaskRunnerThread() {
+  Stop();
+}
+
+void TaskRunnerThread::Start(std::unique_ptr<ThreadDelegate> delegate) {
+  // Begin holding the lock for the condition variable.
+  std::unique_lock<std::mutex> lock(mutex_);
+
+  // Start the thread.
+  PERFETTO_DCHECK(!runner_);
+  thread_ = std::thread(&TaskRunnerThread::Run, this, std::move(delegate));
+
+  // Wait for runner to be ready.
+  ready_.wait_for(lock, std::chrono::seconds(10),
+                  [this]() { return runner_ != nullptr; });
+}
+
+void TaskRunnerThread::Stop() {
+  {
+    std::unique_lock<std::mutex> lock(mutex_);
+    if (runner_)
+      runner_->Quit();
+  }
+
+  if (thread_.joinable())
+    thread_.join();
+}
+
+uint64_t TaskRunnerThread::GetThreadCPUTimeNs() {
+  std::condition_variable cv;
+  std::unique_lock<std::mutex> lock(mutex_);
+  uint64_t thread_time_ns = 0;
+
+  if (!runner_)
+    return 0;
+
+  runner_->PostTask([this, &thread_time_ns, &cv] {
+    std::unique_lock<std::mutex> inner_lock(mutex_);
+    thread_time_ns = static_cast<uint64_t>(base::GetThreadCPUTimeNs().count());
+    cv.notify_one();
+  });
+
+  cv.wait(lock, [&thread_time_ns] { return thread_time_ns != 0; });
+  return thread_time_ns;
+}
+
+void TaskRunnerThread::Run(std::unique_ptr<ThreadDelegate> delegate) {
+#if PERFETTO_BUILDFLAG(PERFETTO_OS_MACOSX)
+  pthread_setname_np(name_);
+#else
+  pthread_setname_np(pthread_self(), name_);
+#endif
+
+  // Create the task runner and execute the specicalised code.
+  base::UnixTaskRunner task_runner;
+  delegate->Initialize(&task_runner);
+
+  // Pass the runner back to the main thread.
+  {
+    std::unique_lock<std::mutex> lock(mutex_);
+    runner_ = &task_runner;
+  }
+
+  // Notify the main thread that the runner is ready.
+  ready_.notify_one();
+
+  // Spin the loop.
+  task_runner.Run();
+
+  // Ensure we clear out the delegate before runner goes out
+  // of scope.
+  delegate.reset();
+
+  // Cleanup the runner.
+  {
+    std::unique_lock<std::mutex> lock(mutex_);
+    runner_ = nullptr;
+  }
+}
+
+ThreadDelegate::~ThreadDelegate() = default;
+
+}  // namespace perfetto
diff --git a/test/task_runner_thread.h b/test/task_runner_thread.h
new file mode 100644
index 0000000..354c308
--- /dev/null
+++ b/test/task_runner_thread.h
@@ -0,0 +1,69 @@
+/*
+ * Copyright (C) 2018 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef TEST_TASK_RUNNER_THREAD_H_
+#define TEST_TASK_RUNNER_THREAD_H_
+
+#include <condition_variable>
+#include <mutex>
+#include <thread>
+
+#include "perfetto/base/task_runner.h"
+#include "src/base/test/test_task_runner.h"
+
+namespace perfetto {
+
+// Used to perform initialization work on a background TaskRunnerThread.
+class ThreadDelegate {
+ public:
+  virtual ~ThreadDelegate();
+
+  // Invoked on the target thread before the message loop is started.
+  virtual void Initialize(base::TaskRunner* task_runner) = 0;
+};
+
+// Background thread which spins a task runner until completed or the thread is
+// destroyed. If the thread is destroyed before the task runner completes, the
+// task runner is quit and the thread is joined.
+class TaskRunnerThread {
+ public:
+  explicit TaskRunnerThread(const char* name);
+  ~TaskRunnerThread();
+
+  // Blocks until the thread has been created and Initialize() has been
+  // called.
+  void Start(std::unique_ptr<ThreadDelegate> delegate);
+
+  // Blocks until the thread has been stopped and joined.
+  void Stop();
+
+  uint64_t GetThreadCPUTimeNs();
+
+ private:
+  void Run(std::unique_ptr<ThreadDelegate> delegate);
+
+  const char* const name_;
+  std::thread thread_;
+  std::condition_variable ready_;
+
+  // All variables below this point are protected by |mutex_|.
+  std::mutex mutex_;
+  base::UnixTaskRunner* runner_ = nullptr;
+};
+
+}  // namespace perfetto
+
+#endif  // TEST_TASK_RUNNER_THREAD_H_
diff --git a/test/task_runner_thread_delegates.cc b/test/task_runner_thread_delegates.cc
new file mode 100644
index 0000000..291482f
--- /dev/null
+++ b/test/task_runner_thread_delegates.cc
@@ -0,0 +1,25 @@
+/*
+ * Copyright (C) 2019 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "test/task_runner_thread_delegates.h"
+
+namespace perfetto {
+
+ServiceDelegate::~ServiceDelegate() = default;
+ProbesProducerDelegate::~ProbesProducerDelegate() = default;
+FakeProducerDelegate::~FakeProducerDelegate() = default;
+
+}  // namespace perfetto
diff --git a/test/task_runner_thread_delegates.h b/test/task_runner_thread_delegates.h
new file mode 100644
index 0000000..bea384a
--- /dev/null
+++ b/test/task_runner_thread_delegates.h
@@ -0,0 +1,91 @@
+/*
+ * Copyright (C) 2018 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef TEST_TASK_RUNNER_THREAD_DELEGATES_H_
+#define TEST_TASK_RUNNER_THREAD_DELEGATES_H_
+
+#include "perfetto/ext/tracing/ipc/service_ipc_host.h"
+#include "src/traced/probes/probes_producer.h"
+#include "test/fake_producer.h"
+#include "test/task_runner_thread.h"
+
+namespace perfetto {
+// This is used only in daemon starting integrations tests.
+class ServiceDelegate : public ThreadDelegate {
+ public:
+  ServiceDelegate(const std::string& producer_socket,
+                  const std::string& consumer_socket)
+      : producer_socket_(producer_socket), consumer_socket_(consumer_socket) {}
+  ~ServiceDelegate() override;
+
+  void Initialize(base::TaskRunner* task_runner) override {
+    svc_ = ServiceIPCHost::CreateInstance(task_runner);
+    unlink(producer_socket_.c_str());
+    unlink(consumer_socket_.c_str());
+    svc_->Start(producer_socket_.c_str(), consumer_socket_.c_str());
+  }
+
+ private:
+  std::string producer_socket_;
+  std::string consumer_socket_;
+  std::unique_ptr<ServiceIPCHost> svc_;
+};
+
+// This is used only in daemon starting integrations tests.
+class ProbesProducerDelegate : public ThreadDelegate {
+ public:
+  ProbesProducerDelegate(const std::string& producer_socket)
+      : producer_socket_(producer_socket) {}
+  ~ProbesProducerDelegate() override;
+
+  void Initialize(base::TaskRunner* task_runner) override {
+    producer_.reset(new ProbesProducer);
+    producer_->ConnectWithRetries(producer_socket_.c_str(), task_runner);
+  }
+
+ private:
+  std::string producer_socket_;
+  std::unique_ptr<ProbesProducer> producer_;
+};
+
+class FakeProducerDelegate : public ThreadDelegate {
+ public:
+  FakeProducerDelegate(const std::string& producer_socket,
+                       std::function<void()> setup_callback,
+                       std::function<void()> connect_callback)
+      : producer_socket_(producer_socket),
+        setup_callback_(std::move(setup_callback)),
+        connect_callback_(std::move(connect_callback)) {}
+  ~FakeProducerDelegate() override;
+
+  void Initialize(base::TaskRunner* task_runner) override {
+    producer_.reset(new FakeProducer("android.perfetto.FakeProducer"));
+    producer_->Connect(producer_socket_.c_str(), task_runner,
+                       std::move(setup_callback_),
+                       std::move(connect_callback_));
+  }
+
+  FakeProducer* producer() { return producer_.get(); }
+
+ private:
+  std::string producer_socket_;
+  std::unique_ptr<FakeProducer> producer_;
+  std::function<void()> setup_callback_;
+  std::function<void()> connect_callback_;
+};
+}  // namespace perfetto
+
+#endif  // TEST_TASK_RUNNER_THREAD_DELEGATES_H_
diff --git a/test/test_helper.cc b/test/test_helper.cc
index 51d9cb7..e716c07 100644
--- a/test/test_helper.cc
+++ b/test/test_helper.cc
@@ -18,6 +18,7 @@
 
 #include "perfetto/ext/traced/traced.h"
 #include "perfetto/ext/tracing/core/trace_packet.h"
+#include "test/task_runner_thread_delegates.h"
 #include "perfetto/ext/tracing/ipc/default_socket.h"
 
 #include "protos/perfetto/trace/trace_packet.pbzero.h"
@@ -40,10 +41,8 @@
 TestHelper::TestHelper(base::TestTaskRunner* task_runner)
     : instance_num_(next_instance_num_++),
       task_runner_(task_runner),
-      service_thread_(TEST_PRODUCER_SOCK_NAME, TEST_CONSUMER_SOCK_NAME),
-      fake_producer_thread_(TEST_PRODUCER_SOCK_NAME,
-                            WrapTask(CreateCheckpoint("producer.setup")),
-                            WrapTask(CreateCheckpoint("producer.enabled"))) {}
+      service_thread_("perfetto.svc"),
+      producer_thread_("perfetto.prd") {}
 
 void TestHelper::OnConnect() {
   std::move(on_connect_callback_)();
@@ -78,13 +77,19 @@
 
 void TestHelper::StartServiceIfRequired() {
 #if PERFETTO_BUILDFLAG(PERFETTO_START_DAEMONS)
-  service_thread_.Start();
+  service_thread_.Start(std::unique_ptr<ServiceDelegate>(
+      new ServiceDelegate(TEST_PRODUCER_SOCK_NAME, TEST_CONSUMER_SOCK_NAME)));
 #endif
 }
 
 FakeProducer* TestHelper::ConnectFakeProducer() {
-  fake_producer_thread_.Connect();
-  return fake_producer_thread_.producer();
+  std::unique_ptr<FakeProducerDelegate> producer_delegate(
+      new FakeProducerDelegate(TEST_PRODUCER_SOCK_NAME,
+                               WrapTask(CreateCheckpoint("producer.setup")),
+                               WrapTask(CreateCheckpoint("producer.enabled"))));
+  FakeProducerDelegate* producer_delegate_cached = producer_delegate.get();
+  producer_thread_.Start(std::move(producer_delegate));
+  return producer_delegate_cached->producer();
 }
 
 void TestHelper::ConnectConsumer() {
diff --git a/test/test_helper.h b/test/test_helper.h
index e18531e..11897e5 100644
--- a/test/test_helper.h
+++ b/test/test_helper.h
@@ -18,121 +18,18 @@
 #define TEST_TEST_HELPER_H_
 
 #include "perfetto/ext/base/scoped_file.h"
-#include "perfetto/ext/base/thread_task_runner.h"
 #include "perfetto/ext/tracing/core/consumer.h"
 #include "perfetto/ext/tracing/core/trace_packet.h"
 #include "perfetto/ext/tracing/ipc/consumer_ipc_client.h"
-#include "perfetto/ext/tracing/ipc/service_ipc_host.h"
 #include "perfetto/tracing/core/trace_config.h"
 #include "src/base/test/test_task_runner.h"
-#include "src/traced/probes/probes_producer.h"
 #include "test/fake_producer.h"
+#include "test/task_runner_thread.h"
 
 #include "protos/perfetto/trace/trace_packet.gen.h"
 
 namespace perfetto {
 
-// This is used only in daemon starting integrations tests.
-class ServiceThread {
- public:
-  ServiceThread(const std::string& producer_socket,
-                const std::string& consumer_socket)
-      : producer_socket_(producer_socket), consumer_socket_(consumer_socket) {}
-
-  ~ServiceThread() {
-    if (!runner_)
-      return;
-    runner_->PostTaskAndWaitForTesting([this]() { svc_.reset(); });
-  }
-
-  void Start() {
-    runner_ = base::ThreadTaskRunner::CreateAndStart("perfetto.svc");
-    runner_->PostTaskAndWaitForTesting([this]() {
-      svc_ = ServiceIPCHost::CreateInstance(runner_->get());
-      unlink(producer_socket_.c_str());
-      unlink(consumer_socket_.c_str());
-
-      bool res =
-          svc_->Start(producer_socket_.c_str(), consumer_socket_.c_str());
-      PERFETTO_CHECK(res);
-    });
-  }
-
-  base::ThreadTaskRunner* runner() { return runner_ ? &*runner_ : nullptr; }
-
- private:
-  base::Optional<base::ThreadTaskRunner> runner_;  // Keep first.
-
-  std::string producer_socket_;
-  std::string consumer_socket_;
-  std::unique_ptr<ServiceIPCHost> svc_;
-};
-
-// This is used only in daemon starting integrations tests.
-class ProbesProducerThread {
- public:
-  ProbesProducerThread(const std::string& producer_socket)
-      : producer_socket_(producer_socket) {}
-
-  ~ProbesProducerThread() {
-    if (!runner_)
-      return;
-    runner_->PostTaskAndWaitForTesting([this]() { producer_.reset(); });
-  }
-
-  void Connect() {
-    runner_ = base::ThreadTaskRunner::CreateAndStart("perfetto.prd.probes");
-    runner_->PostTaskAndWaitForTesting([this]() {
-      producer_.reset(new ProbesProducer());
-      producer_->ConnectWithRetries(producer_socket_.c_str(), runner_->get());
-    });
-  }
-
- private:
-  base::Optional<base::ThreadTaskRunner> runner_;  // Keep first.
-
-  std::string producer_socket_;
-  std::unique_ptr<ProbesProducer> producer_;
-};
-
-class FakeProducerThread {
- public:
-  FakeProducerThread(const std::string& producer_socket,
-                     std::function<void()> setup_callback,
-                     std::function<void()> connect_callback)
-      : producer_socket_(producer_socket),
-        setup_callback_(std::move(setup_callback)),
-        connect_callback_(std::move(connect_callback)) {}
-
-  ~FakeProducerThread() {
-    if (!runner_)
-      return;
-    runner_->PostTaskAndWaitForTesting([this]() { producer_.reset(); });
-  }
-
-  void Connect() {
-    runner_ = base::ThreadTaskRunner::CreateAndStart("perfetto.prd.fake");
-    runner_->PostTaskAndWaitForTesting([this]() {
-      producer_.reset(new FakeProducer("android.perfetto.FakeProducer"));
-      producer_->Connect(producer_socket_.c_str(), runner_->get(),
-                         std::move(setup_callback_),
-                         std::move(connect_callback_));
-    });
-  }
-
-  base::ThreadTaskRunner* runner() { return runner_ ? &*runner_ : nullptr; }
-
-  FakeProducer* producer() { return producer_.get(); }
-
- private:
-  base::Optional<base::ThreadTaskRunner> runner_;  // Keep first.
-
-  std::string producer_socket_;
-  std::unique_ptr<FakeProducer> producer_;
-  std::function<void()> setup_callback_;
-  std::function<void()> connect_callback_;
-};
-
 class TestHelper : public Consumer {
  public:
   static const char* GetConsumerSocketName();
@@ -182,10 +79,8 @@
 
   std::function<void()> WrapTask(const std::function<void()>& function);
 
-  base::ThreadTaskRunner* service_thread() { return service_thread_.runner(); }
-  base::ThreadTaskRunner* producer_thread() {
-    return fake_producer_thread_.runner();
-  }
+  TaskRunnerThread* service_thread() { return &service_thread_; }
+  TaskRunnerThread* producer_thread() { return &producer_thread_; }
   const std::vector<protos::gen::TracePacket>& trace() { return trace_; }
 
  private:
@@ -202,9 +97,8 @@
 
   std::vector<protos::gen::TracePacket> trace_;
 
-  ServiceThread service_thread_;
-  FakeProducerThread fake_producer_thread_;
-
+  TaskRunnerThread service_thread_;
+  TaskRunnerThread producer_thread_;
   std::unique_ptr<TracingService::ConsumerEndpoint> endpoint_;  // Keep last.
 };