threading: add Future<T>/Stream<T> helpers for Channel<T> and ThreadPool

A very common use of Futures is for representing which is happening
across a thread boundary. A Channel is a perfect way to pass data
across thread boundaries. base::ThreadPool allows running computation
on another thread without worrying about

These helper functions:
a) utilise Future/Streams and Channel to elegantly represent message
passing between threads
b) allow running computations on a ThreadPool and returning the result
as a Future or Stream when the computation completes

Change-Id: Ie6c9d12208c6fe5b4b1b6f54290a39da0d514b0c
diff --git a/Android.bp b/Android.bp
index 1e6f23b..0a016e8 100644
--- a/Android.bp
+++ b/Android.bp
@@ -8208,6 +8208,7 @@
         "src/base/threading/future_unittest.cc",
         "src/base/threading/stream_unittest.cc",
         "src/base/threading/thread_pool_unittest.cc",
+        "src/base/threading/util_unittest.cc",
     ],
 }
 
diff --git a/include/perfetto/ext/base/threading/BUILD.gn b/include/perfetto/ext/base/threading/BUILD.gn
index 151d10f..9e557ec 100644
--- a/include/perfetto/ext/base/threading/BUILD.gn
+++ b/include/perfetto/ext/base/threading/BUILD.gn
@@ -23,6 +23,7 @@
     "stream.h",
     "stream_combinators.h",
     "thread_pool.h",
+    "util.h",
   ]
   deps = [
     "..:base",
diff --git a/include/perfetto/ext/base/threading/util.h b/include/perfetto/ext/base/threading/util.h
new file mode 100644
index 0000000..34dccdb
--- /dev/null
+++ b/include/perfetto/ext/base/threading/util.h
@@ -0,0 +1,182 @@
+/*
+ * Copyright (C) 2023 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef INCLUDE_PERFETTO_EXT_BASE_THREADING_UTIL_H_
+#define INCLUDE_PERFETTO_EXT_BASE_THREADING_UTIL_H_
+
+#include <functional>
+#include <memory>
+#include <type_traits>
+#include <utility>
+#include <vector>
+
+#include "perfetto/base/status.h"
+#include "perfetto/base/task_runner.h"
+#include "perfetto/ext/base/optional.h"
+#include "perfetto/ext/base/threading/channel.h"
+#include "perfetto/ext/base/threading/future.h"
+#include "perfetto/ext/base/threading/poll.h"
+#include "perfetto/ext/base/threading/stream.h"
+#include "perfetto/ext/base/threading/thread_pool.h"
+#include "perfetto/ext/base/unix_task_runner.h"
+
+namespace perfetto {
+namespace base {
+
+// Blocks the calling thread until |fd| is considered "readable". In Linux,
+// this corresponds to |POLLOUT| or |POLLHUP| being returned if |fd| is polled.
+inline void BlockUntilReadableFd(base::PlatformHandle fd) {
+  base::UnixTaskRunner runner;
+  runner.AddFileDescriptorWatch(fd, [&runner]() { runner.Quit(); });
+  runner.Run();
+}
+
+// Creates a Stream<T> which returns all the data from |channel| and completes
+// when |channel| is closed.
+//
+// Note: the caller retains ownership of the passed channel and must ensure that
+// the channel outlives the lifetime of the returned stream.
+template <typename T>
+Stream<T> ReadChannelStream(Channel<T>* channel) {
+  class ReadImpl : public StreamPollable<T> {
+   public:
+    explicit ReadImpl(Channel<T>* reader) : reader_(reader) {}
+
+    StreamPollResult<T> PollNext(PollContext* ctx) override {
+      auto result = reader_->ReadNonBlocking();
+      if (!result.item.has_value()) {
+        if (result.is_closed) {
+          return DonePollResult();
+        }
+        ctx->RegisterInterested(reader_->read_fd());
+        return PendingPollResult();
+      }
+      return std::move(*result.item);
+    }
+
+   private:
+    Channel<T>* reader_ = nullptr;
+  };
+  return MakeStream<ReadImpl>(channel);
+}
+
+// Creates a Future<FVoid> which handles writing |item| into |channel|. The
+// Future is completed when the item is succesfully written.
+//
+// Note: the caller retains ownership of the passed channel and must ensure that
+// the channel outlives the lifetime of the returned future.
+template <typename T>
+Future<FVoid> WriteChannelFuture(Channel<T>* channel, T item) {
+  class WriteImpl : public FuturePollable<FVoid> {
+   public:
+    WriteImpl(Channel<T>* writer, T to_write)
+        : writer_(writer), to_write_(std::move(to_write)) {}
+
+    FuturePollResult<FVoid> Poll(PollContext* ctx) override {
+      auto res = writer_->WriteNonBlocking(std::move(to_write_));
+      PERFETTO_CHECK(!res.is_closed);
+      if (!res.success) {
+        ctx->RegisterInterested(writer_->write_fd());
+        return PendingPollResult();
+      }
+      return FVoid();
+    }
+
+   private:
+    Channel<T>* writer_ = nullptr;
+    T to_write_;
+  };
+  return MakeFuture<WriteImpl>(channel, std::move(item));
+}
+
+// Creates a Stream<T> which yields the result of executing |fn| on |pool|
+// repeatedly. The returned stream only completes when |fn| returns
+// base::nullopt.
+//
+// The intended usage of this function is to schedule CPU intensive work on a
+// background thread pool and receive regular "updates" on the progress by:
+// a) breaking the work into chunks
+// b) returning some indication of progress/partial results through |T|.
+template <typename T>
+Stream<T> RunOnThreadPool(ThreadPool* pool,
+                          std::function<base::Optional<T>()> fn) {
+  class RunOnPoolImpl : public StreamPollable<T> {
+   public:
+    explicit RunOnPoolImpl(ThreadPool* pool,
+                           std::function<base::Optional<T>()> fn)
+        : pool_(pool),
+          fn_(std::make_shared<std::function<base::Optional<T>()>>(
+              std::move(fn))),
+          channel_(new Channel<T>(1)),
+          channel_stream_(ReadChannelStream(channel_.get())) {
+      RunFn();
+    }
+
+    StreamPollResult<T> PollNext(PollContext* ctx) override {
+      ASSIGN_OR_RETURN_IF_PENDING_STREAM(res, channel_stream_.PollNext(ctx));
+      if (res.IsDone()) {
+        return DonePollResult();
+      }
+      RunFn();
+      return res;
+    }
+
+   private:
+    void RunFn() {
+      pool_->PostTask([channel = channel_, fn = fn_]() {
+        auto opt_value = (*fn)();
+        if (!opt_value) {
+          channel->Close();
+          return;
+        }
+        auto write_res =
+            channel->WriteNonBlocking(std::move(opt_value.value()));
+        PERFETTO_CHECK(write_res.success);
+        PERFETTO_CHECK(!write_res.is_closed);
+      });
+    }
+
+    ThreadPool* pool_ = nullptr;
+    std::shared_ptr<std::function<base::Optional<T>()>> fn_;
+    std::shared_ptr<Channel<T>> channel_;
+    base::Stream<T> channel_stream_;
+  };
+  return MakeStream<RunOnPoolImpl>(pool, std::move(fn));
+}
+
+// Creates a Future<T> which yields the result of executing |fn| on |pool|. The
+// returned completes with the return value of |fn|.
+//
+// The intended usage of this function is to schedule CPU intensive work on a
+// background thread pool and have the result returned when available.
+template <typename T>
+Future<T> RunOnceOnThreadPool(ThreadPool* pool, std::function<T()> fn) {
+  return RunOnThreadPool<T>(
+             pool,
+             [done = false, fn = std::move(fn)]() mutable -> base::Optional<T> {
+               if (done) {
+                 return base::nullopt;
+               }
+               done = true;
+               return fn();
+             })
+      .Collect(base::ToFutureCheckedCollector<T>());
+}
+
+}  // namespace base
+}  // namespace perfetto
+
+#endif  // INCLUDE_PERFETTO_EXT_BASE_THREADING_UTIL_H_
diff --git a/src/base/threading/BUILD.gn b/src/base/threading/BUILD.gn
index cdf2363..3a31723 100644
--- a/src/base/threading/BUILD.gn
+++ b/src/base/threading/BUILD.gn
@@ -36,5 +36,6 @@
     "future_unittest.cc",
     "stream_unittest.cc",
     "thread_pool_unittest.cc",
+    "util_unittest.cc",
   ]
 }
diff --git a/src/base/threading/util_unittest.cc b/src/base/threading/util_unittest.cc
new file mode 100644
index 0000000..887802c
--- /dev/null
+++ b/src/base/threading/util_unittest.cc
@@ -0,0 +1,150 @@
+/*
+ * Copyright (C) 2023 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "perfetto/ext/base/threading/util.h"
+
+#include "perfetto/base/flat_set.h"
+#include "perfetto/base/platform_handle.h"
+#include "perfetto/base/time.h"
+#include "perfetto/ext/base/event_fd.h"
+#include "perfetto/ext/base/optional.h"
+#include "perfetto/ext/base/threading/channel.h"
+#include "perfetto/ext/base/threading/poll.h"
+#include "perfetto/ext/base/threading/stream.h"
+#include "perfetto/ext/base/threading/thread_pool.h"
+#include "perfetto/ext/base/unix_task_runner.h"
+#include "perfetto/ext/base/waitable_event.h"
+#include "test/gtest_and_gmock.h"
+
+namespace perfetto {
+namespace base {
+namespace {
+
+int WaitForFutureReady(base::Future<int>& stream,
+                       base::FlatSet<base::PlatformHandle>& interested,
+                       PollContext& ctx) {
+  auto res = stream.Poll(&ctx);
+  for (; res.IsPending(); res = stream.Poll(&ctx)) {
+    PERFETTO_CHECK(interested.size() == 1);
+    base::BlockUntilReadableFd(*interested.begin());
+    interested = {};
+  }
+  return res.item();
+}
+
+base::Optional<int> WaitForStreamReady(
+    base::Stream<int>& stream,
+    base::FlatSet<base::PlatformHandle>& interested,
+    PollContext& ctx) {
+  auto res = stream.PollNext(&ctx);
+  for (; res.IsPending(); res = stream.PollNext(&ctx)) {
+    PERFETTO_CHECK(interested.size() == 1);
+    base::BlockUntilReadableFd(*interested.begin());
+    interested = {};
+  }
+  return res.IsDone() ? base::nullopt : base::make_optional(res.item());
+}
+
+TEST(UtilUnittest, BlockUntilReadableFd) {
+  base::WaitableEvent evt;
+  base::EventFd main_to_background;
+  base::EventFd background_to_main;
+  std::thread thread([&main_to_background, &background_to_main] {
+    base::BlockUntilReadableFd(main_to_background.fd());
+    background_to_main.Notify();
+  });
+  main_to_background.Notify();
+  base::BlockUntilReadableFd(background_to_main.fd());
+  thread.join();
+}
+
+TEST(UtilUnittest, ReadChannelStream) {
+  base::Channel<int> channel(1);
+  auto stream = base::ReadChannelStream(&channel);
+
+  base::FlatSet<base::PlatformHandle> interested;
+  base::FlatSet<base::PlatformHandle> ready;
+  PollContext ctx(&interested, &ready);
+
+  ASSERT_TRUE(stream.PollNext(&ctx).IsPending());
+  ASSERT_EQ(interested.count(channel.read_fd()), 1u);
+  interested = {};
+
+  ASSERT_TRUE(channel.WriteNonBlocking(1).success);
+  ASSERT_EQ(stream.PollNext(&ctx).item(), 1);
+
+  ASSERT_TRUE(stream.PollNext(&ctx).IsPending());
+  ASSERT_EQ(interested.count(channel.read_fd()), 1u);
+  interested = {};
+
+  ASSERT_TRUE(channel.WriteNonBlocking(2).success);
+  channel.Close();
+
+  ASSERT_EQ(stream.PollNext(&ctx).item(), 2);
+  ASSERT_TRUE(stream.PollNext(&ctx).IsDone());
+}
+
+TEST(UtilUnittest, WriteChannelFuture) {
+  base::Channel<int> channel(1);
+
+  base::FlatSet<base::PlatformHandle> interested;
+  base::FlatSet<base::PlatformHandle> ready;
+  PollContext ctx(&interested, &ready);
+
+  ASSERT_TRUE(channel.WriteNonBlocking(1).success);
+  ASSERT_FALSE(channel.WriteNonBlocking(2).success);
+
+  auto future = base::WriteChannelFuture(&channel, 3);
+  ASSERT_TRUE(future.Poll(&ctx).IsPending());
+  ASSERT_EQ(interested.count(channel.write_fd()), 1u);
+  interested = {};
+
+  ASSERT_EQ(channel.ReadNonBlocking().item, 1);
+  ASSERT_EQ(channel.ReadNonBlocking().item, base::nullopt);
+
+  ASSERT_FALSE(future.Poll(&ctx).IsPending());
+  ASSERT_EQ(channel.ReadNonBlocking().item, 3);
+}
+
+TEST(UtilUnittest, RunOnThreadPool) {
+  base::FlatSet<base::PlatformHandle> interested;
+  base::FlatSet<base::PlatformHandle> ready;
+  PollContext ctx(&interested, &ready);
+
+  base::ThreadPool pool(1);
+  base::Stream<int> stream =
+      base::RunOnThreadPool<int>(&pool, [counter = 0]() mutable {
+        return counter == 2 ? base::nullopt : base::make_optional(counter++);
+      });
+  ASSERT_EQ(WaitForStreamReady(stream, interested, ctx), 0);
+  ASSERT_EQ(WaitForStreamReady(stream, interested, ctx), 1);
+  ASSERT_EQ(WaitForStreamReady(stream, interested, ctx), base::nullopt);
+}
+
+TEST(UtilUnittest, RunOnceOnThreadPool) {
+  base::FlatSet<base::PlatformHandle> interested;
+  base::FlatSet<base::PlatformHandle> ready;
+  PollContext ctx(&interested, &ready);
+
+  base::ThreadPool pool(1);
+  base::Future<int> fut =
+      base::RunOnceOnThreadPool<int>(&pool, []() mutable { return 1; });
+  ASSERT_EQ(WaitForFutureReady(fut, interested, ctx), 1);
+}
+
+}  // namespace
+}  // namespace base
+}  // namespace perfetto