perfetto: add minimal implementation of Streams

This CL adds a minimal implementation of streams in the Perfetto
codebase. This is mainly intended to be used by cloud trace processor.

This is largely analogous to aosp/2381492 which introduced futures.

Change-Id: I9809dbf67320ae7103373470206e2829023ab159
diff --git a/Android.bp b/Android.bp
index 7392a26..ce5b623 100644
--- a/Android.bp
+++ b/Android.bp
@@ -8182,6 +8182,7 @@
 filegroup {
     name: "perfetto_src_base_threading_threading",
     srcs: [
+        "src/base/threading/stream_combinators.cc",
         "src/base/threading/thread_pool.cc",
     ],
 }
@@ -8192,6 +8193,7 @@
     srcs: [
         "src/base/threading/channel_unittest.cc",
         "src/base/threading/future_unittest.cc",
+        "src/base/threading/stream_unittest.cc",
         "src/base/threading/thread_pool_unittest.cc",
     ],
 }
diff --git a/include/perfetto/ext/base/threading/BUILD.gn b/include/perfetto/ext/base/threading/BUILD.gn
index 5f3dfa9..151d10f 100644
--- a/include/perfetto/ext/base/threading/BUILD.gn
+++ b/include/perfetto/ext/base/threading/BUILD.gn
@@ -20,6 +20,8 @@
     "future.h",
     "future_combinators.h",
     "poll.h",
+    "stream.h",
+    "stream_combinators.h",
     "thread_pool.h",
   ]
   deps = [
diff --git a/include/perfetto/ext/base/threading/future.h b/include/perfetto/ext/base/threading/future.h
index 6ea216e..c509e9b 100644
--- a/include/perfetto/ext/base/threading/future.h
+++ b/include/perfetto/ext/base/threading/future.h
@@ -80,15 +80,15 @@
   Future(T item) : pollable_(new ImmediateImpl<T>(std::move(item))) {}
 
   // Intentionally implicit to allow for egonomic definition of functions
-  // returning Future<base::StatusOr<T>> by simply returning base::ErrStatus.
+  // returning Future<StatusOr<T>> by simply returning ErrStatus.
   // The enable_if is necessary because this definition is the same as the above
   // constructor in cases where T = base::Status.
   template <typename U = T,
-            typename = std::enable_if_t<!std::is_same_v<base::Status, U>>>
-  Future(base::Status status) : Future(T(std::move(status))) {}
+            typename = std::enable_if_t<!std::is_same_v<Status, U>>>
+  Future(Status status) : Future(T(std::move(status))) {}
 
   // Intentionally implicit to allow for egonomic definition of functions
-  // returning Future<base::StatusOr<T>> by simply returning T.
+  // returning Future<StatusOr<T>> by simply returning T.
   template <typename U = T, typename = typename U::value_type>
   Future(typename U::value_type val) : Future(T(std::move(val))) {}
 
@@ -132,12 +132,12 @@
 
 // Alias to shorten type defintions for Future<Status> which is common in
 // the codebase.
-using StatusFuture = base::Future<base::Status>;
+using StatusFuture = Future<Status>;
 
 // Alias to shorten type defintions for Future<StatusOr<T>> which is common
 // in the codebase.
 template <typename T>
-using StatusOrFuture = base::Future<base::StatusOr<T>>;
+using StatusOrFuture = Future<StatusOr<T>>;
 
 }  // namespace base
 }  // namespace perfetto
diff --git a/include/perfetto/ext/base/threading/poll.h b/include/perfetto/ext/base/threading/poll.h
index b162a32..5989361 100644
--- a/include/perfetto/ext/base/threading/poll.h
+++ b/include/perfetto/ext/base/threading/poll.h
@@ -96,6 +96,68 @@
   virtual FuturePollResult<T> Poll(PollContext*) = 0;
 };
 
+// Indicates that the Stream has been exhausted and no more values will be
+// returned.
+struct DonePollResult {};
+
+// Return value of Stream<T>::Poll.
+//
+// Essentially a wrapper around std::variant<T, PendingPollResult,
+// DonePollResult> but with higher level API.
+template <typename T>
+class StreamPollResult {
+ public:
+  using PollT = T;
+
+  // Intentionally implicit to allow idiomatic returns.
+  StreamPollResult(const PendingPollResult&) : inner_(PendingPollResult()) {}
+  StreamPollResult(const DonePollResult&) : inner_(DonePollResult()) {}
+  StreamPollResult(T item) : inner_(std::move(item)) {}
+
+  // Returns whether the Stream is still pending.
+  bool IsPending() const {
+    return std::holds_alternative<PendingPollResult>(inner_);
+  }
+
+  // Returns whether the Stream is done.
+  bool IsDone() const { return std::holds_alternative<DonePollResult>(inner_); }
+
+  // The real value inside this result: requires !IsPending() and !IsDone().
+  T& item() {
+    PERFETTO_DCHECK(!IsPending());
+    PERFETTO_DCHECK(!IsDone());
+    return std::get<T>(inner_);
+  }
+  const T& item() const {
+    PERFETTO_DCHECK(!IsPending());
+    PERFETTO_DCHECK(!IsDone());
+    return std::get<T>(inner_);
+  }
+
+  // The real value inside this result: requires !IsPending() and !IsDone().
+  T* operator->() { return &item(); }
+  const T* operator->() const { return &item(); }
+
+ private:
+  std::variant<PendingPollResult, DonePollResult, T> inner_;
+};
+
+// Interface for implementing the Stream<T>::Poll function.
+//
+// This is essentially analagous to FuturePollable<T> for streams: check the
+// documentation of that class for why this exists.
+template <typename T>
+class StreamPollable {
+ public:
+  using PollT = T;
+
+  virtual ~StreamPollable() = default;
+
+  // Implementation of the Poll function of a Stream: see Stream documentation
+  // for how this should be implemented.
+  virtual StreamPollResult<T> PollNext(PollContext*) = 0;
+};
+
 // Context class passed to Pollable classes.
 //
 // Implementations of Pollable which simply wrap another Pollable will use
@@ -151,13 +213,31 @@
 //   ASSIGN_OR_RETURN_IF_PENDING_FUTURE(res, MyIntReturningFutureFn());
 //   return std::to_string(*foo);
 // }
-//
 #define ASSIGN_OR_RETURN_IF_PENDING_FUTURE(var, expr) \
   auto assign_and_return_if_poll_##var = (expr);      \
   if (assign_and_return_if_poll_##var.IsPending())    \
     return base::PendingPollResult();                 \
   auto var = std::move(assign_and_return_if_poll_##var.item())
 
+// Evaluates |expr|, which should return a PollResult. If IsPending is
+// true, returns base::PendingPollResult().
+//
+// Example usage:
+//
+// Strean<int> MyIntReturningStreamFn();
+//
+// StreamPollResult<std::string> Poll(PollContext* ctx) {
+//   ASSIGN_OR_RETURN_IF_PENDING_STREAM(res, MyIntReturningStreamFn());
+//   if (res.IsDone()) {
+//     return DonePollResult();
+//   }
+//   return std::to_string(*foo);
+// }
+#define ASSIGN_OR_RETURN_IF_PENDING_STREAM(var, expr) \
+  auto var = (expr);                                  \
+  if (var.IsPending())                                \
+  return base::PendingPollResult()
+
 }  // namespace base
 }  // namespace perfetto
 
diff --git a/include/perfetto/ext/base/threading/stream.h b/include/perfetto/ext/base/threading/stream.h
new file mode 100644
index 0000000..4d67eb4
--- /dev/null
+++ b/include/perfetto/ext/base/threading/stream.h
@@ -0,0 +1,185 @@
+/*
+ * Copyright (C) 2023 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef INCLUDE_PERFETTO_EXT_BASE_THREADING_STREAM_H_
+#define INCLUDE_PERFETTO_EXT_BASE_THREADING_STREAM_H_
+
+#include <functional>
+#include <memory>
+#include <vector>
+
+#include "perfetto/base/status.h"
+#include "perfetto/ext/base/status_or.h"
+#include "perfetto/ext/base/threading/future.h"
+#include "perfetto/ext/base/threading/stream_combinators.h"
+
+namespace perfetto {
+namespace base {
+
+// Creates a Stream<T> from P, a subclass of StreamPollable<T>.
+//
+// This function follows the same pattern of std::make_unique, std::make_shared
+// etc.
+template <typename P, typename... Args>
+Stream<typename P::PollT> MakeStream(Args... args) {
+  return Stream<typename P::PollT>(
+      std::unique_ptr<StreamPollable<typename P::PollT>>(
+          new P(std::forward<Args>(args)...)));
+}
+
+// An asynchronous iterator for values of type T.
+//
+// If Future<T> is an asynchronous version of T, Stream<T> is an asynchronous
+// version of Iterator<T>. Long-running compute/IO operations which return
+// multiple values can be represented with a Stream<T>.
+//
+// Refer to the class documentation for Future<T> as most of the features and
+// implementation of Future<T> also apply to Stream<T>.
+template <typename T>
+class Stream {
+ public:
+  using PollableItem = T;
+
+  // Creates a Stream from a |StreamPollable<T>|. Prefer using |MakeStream|
+  // instead of this function.
+  explicit Stream(std::unique_ptr<StreamPollable<T>> pollable)
+      : pollable_(std::move(pollable)) {}
+
+  // Converts a Stream<T> to Stream<U>. This works by applying |map_fn| to each
+  // element in T and then polling the returned Future<U> to completion.
+  template <typename Function /* = Future<U>(T) */>
+  Stream<FutureReturn<Function, T>> MapFuture(Function map_fn) && {
+    return MakeStream<MapFutureStreamImpl<Function, T>>(std::move(*this),
+                                                        std::move(map_fn));
+  }
+
+  // Creates a stream which fully polls |this| and then polls |concat| to
+  // completion.
+  Stream<T> Concat(Stream<T> concat) && {
+    return MakeStream<ConcatStreamImpl<T>>(std::move(*this), std::move(concat));
+  }
+
+  // Converts a Stream<T> to Future<U> by collecting elements using |collector|.
+  // See documentation on |Collector| for how to implement one.
+  template <typename U>
+  Future<U> Collect(std::unique_ptr<Collector<T, U>> collector) && {
+    return MakeFuture<CollectImpl<T, U>>(std::move(*this),
+                                         std::move(collector));
+  }
+
+  // Checks if the computation backing this Stream<T> has finished.
+  //
+  // Returns a StreamPollResult<T> which is a essentially a
+  // variant<PendingPollResult, DonePollResult T>. If PendingPollResult is
+  // returned, |ctx| will be used to register interest in the various fds which
+  // are "blocking" this future from finishing. If DonePollResult is returned,
+  // Poll *must not* be called again.
+  StreamPollResult<T> PollNext(PollContext* ctx) {
+    return pollable_->PollNext(ctx);
+  }
+
+ private:
+  std::unique_ptr<StreamPollable<T>> pollable_;
+};
+
+// Alias to shorten type defintions for Stream<Status> which is common in
+// the codebase.
+using StatusStream = Stream<Status>;
+
+// Alias to shorten type defintions for Stream<StatusOr<T>> which is common
+// in the codebase.
+template <typename T>
+using StatusOrStream = Stream<StatusOr<T>>;
+
+// Creates a Stream<T> which returns the next value inside |vector| every time
+// Stream<T>::Poll is called.
+template <typename T>
+Stream<T> StreamFrom(std::vector<T> vector) {
+  return MakeStream<ImmediateStreamImpl<T>>(std::move(vector));
+}
+
+// Creates a Stream<T> which immediately returns DonePollResult when polled.
+template <typename T>
+Stream<T> EmptyStream() {
+  return StreamFrom(std::vector<T>());
+}
+
+// Creates a Stream<T> which returns |first| and each of |rest| in sequence when
+// polled.
+template <typename T, typename... Ts>
+Stream<T> StreamOf(T first, Ts... rest) {
+  std::vector<T> values;
+  AddAllToVector(values, std::forward<T>(first), std::forward<Ts>(rest)...);
+  return StreamFrom(std::move(values));
+}
+
+// Creates a Stream<T> which returns the value of |future| before completing.
+template <typename T>
+Stream<T> StreamFromFuture(Future<T> future) {
+  return StreamOf(std::move(future)).MapFuture([](Future<T> value) { return value; });
+}
+
+// Creates a stream which returns no elements but calls |fn| in the destructor
+// of the returned stream.
+//
+// This function can be used to do resource management for a stream by making
+// the passed |fn| own the resources used by any "upstream" sources and then
+// Concat-ing this stream with the upstream.
+template <typename T, typename Function>
+Stream<T> OnDestroyStream(Function fn) {
+  return MakeStream<OnDestroyStreamImpl<T, Function>>(std::move(fn));
+}
+
+// Creates a Stream<T> returning values generated by each stream in |streams| as
+// soon as they are produced without preserving ordering.
+//
+// The returned Stream<T> keeps the amount of Poll calls to the inner |streams|,
+// to a minimum only calling Poll for the Streams which are marked are ready
+// in the PollContext.
+template <typename T>
+Stream<T> FlattenStreams(std::vector<Stream<T>> streams) {
+  return MakeStream<FlattenImpl<T>>(std::move(streams));
+}
+
+// Collector for Stream<Status>::Collect() which immediately resolves the
+// returned Future when an error status is detected. Resolves with
+// OkStatus once the entire stream finishes after returning all OkStatus().
+inline std::unique_ptr<Collector<Status, Status>> AllOkCollector() {
+  return std::make_unique<AllOkCollectorImpl>();
+}
+
+// Collector for Stream<T>::Collect() which ensures the stream returns *exactly*
+// one T before completing. Crashes if either a) no values are produced by
+// the Stream, b) more than one value is produced by the Stream.
+template <typename T>
+inline std::unique_ptr<Collector<T, T>> ToFutureCheckedCollector() {
+  return std::make_unique<FutureCheckedCollectorImpl<T>>();
+}
+
+// Collector for Stream<StatusOr<T>>::Collect() which returns a vector
+// containing all the successful results from the stream. If any element is an
+// error, short-circuits the stream with the error.
+template <typename T>
+inline std::unique_ptr<
+    Collector<StatusOr<T>, StatusOr<std::vector<T>>>>
+StatusOrVectorCollector() {
+  return std::make_unique<StatusOrVectorCollectorImpl<T>>();
+}
+
+}  // namespace base
+}  // namespace perfetto
+
+#endif  // INCLUDE_PERFETTO_EXT_BASE_THREADING_STREAM_H_
diff --git a/include/perfetto/ext/base/threading/stream_combinators.h b/include/perfetto/ext/base/threading/stream_combinators.h
new file mode 100644
index 0000000..5485139
--- /dev/null
+++ b/include/perfetto/ext/base/threading/stream_combinators.h
@@ -0,0 +1,315 @@
+/*
+ * Copyright (C) 2023 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef INCLUDE_PERFETTO_EXT_BASE_THREADING_STREAM_COMBINATORS_H_
+#define INCLUDE_PERFETTO_EXT_BASE_THREADING_STREAM_COMBINATORS_H_
+
+#include <memory>
+#include <utility>
+#include <vector>
+
+#include "perfetto/base/status.h"
+#include "perfetto/ext/base/optional.h"
+#include "perfetto/ext/base/status_or.h"
+#include "perfetto/ext/base/threading/future_combinators.h"
+#include "perfetto/ext/base/threading/poll.h"
+
+namespace perfetto {
+namespace base {
+
+template <typename T>
+class Stream;
+
+// Helper function for adding all the elements in parameter pack to a vector.
+template <typename T, typename... Elements>
+void AddAllToVector(std::vector<T>&) {}
+
+template <typename T, typename... Elements>
+void AddAllToVector(std::vector<T>& vec, T first, Elements... rest) {
+  vec.emplace_back(std::forward<T>(first));
+  AddAllToVector(vec, std::forward<Elements>(rest)...);
+}
+
+// For a Function which returns Stream<U>, returns the U.
+template <typename Function, typename T>
+using StreamReturn =
+    typename std::invoke_result<Function, T>::type::PollableItem;
+
+// Implementation of StreamPollable for creating a Stream<T> from a
+// std::vector<T>.
+template <typename T>
+class ImmediateStreamImpl : public StreamPollable<T> {
+ public:
+  explicit ImmediateStreamImpl(std::vector<T> values)
+      : values_(std::move(values)) {}
+
+  StreamPollResult<T> PollNext(PollContext*) override {
+    if (index_ >= values_.size()) {
+      return DonePollResult();
+    }
+    return StreamPollResult<T>(std::move(values_[index_++]));
+  }
+
+ private:
+  std::vector<T> values_;
+  uint32_t index_ = 0;
+};
+
+// Implementation of a StreamPollable for creating a Stream<U> from a Stream<T>
+// and a functor with prototype Future<U>(T).
+template <typename Function, typename T>
+class MapFutureStreamImpl : public StreamPollable<FutureReturn<Function, T>> {
+ public:
+  using U = FutureReturn<Function, T>;
+
+  MapFutureStreamImpl(Stream<T> stream, Function map_fn)
+      : stream_(std::move(stream)), map_fn_(std::move(map_fn)) {}
+
+  StreamPollResult<U> PollNext(PollContext* context) override {
+    if (!future_) {
+      ASSIGN_OR_RETURN_IF_PENDING_STREAM(res, stream_.PollNext(context));
+      if (res.IsDone()) {
+        return DonePollResult();
+      }
+      future_ = map_fn_(std::move(res.item()));
+    }
+    ASSIGN_OR_RETURN_IF_PENDING_FUTURE(res, future_->Poll(context));
+    future_ = nullopt;
+    return res;
+  }
+
+ private:
+  Stream<T> stream_;
+  Function map_fn_;
+  Optional<Future<U>> future_;
+};
+
+// Implementation of a StreamPollable for creating a concatenating two streams
+// together.
+template <typename T>
+class ConcatStreamImpl : public StreamPollable<T> {
+ public:
+  explicit ConcatStreamImpl(Stream<T> first, Stream<T> second)
+      : first_(std::move(first)), second_(std::move(second)) {}
+
+  StreamPollResult<T> PollNext(PollContext* context) override {
+    if (first_) {
+      ASSIGN_OR_RETURN_IF_PENDING_STREAM(res, first_->PollNext(context));
+      if (!res.IsDone()) {
+        return res.item();
+      }
+      first_ = base::nullopt;
+    }
+    return second_.PollNext(context);
+  }
+
+ private:
+  base::Optional<Stream<T>> first_;
+  Stream<T> second_;
+};
+
+// Implementation of a StreamPollable for creating a Stream<T> from a
+// std::vector<Stream<T>>. Values are returned from the inner streams as soon as
+// they are available.
+template <typename T>
+class FlattenImpl : public StreamPollable<T> {
+ public:
+  explicit FlattenImpl(std::vector<Stream<T>> streams)
+      : registered_handles_(static_cast<uint32_t>(streams.size())) {
+    for (auto& stream : streams) {
+      streams_.emplace_back(std::move(stream));
+    }
+  }
+
+  StreamPollResult<T> PollNext(PollContext* upstream) override {
+    for (uint32_t i = 0; i < streams_.size(); ++i) {
+      auto& stream = streams_[i];
+      if (!stream) {
+        continue;
+      }
+      Optional<PollContext> ctx = PollContextForStream(upstream, i);
+      if (!ctx) {
+        continue;
+      }
+      StreamPollResult<T> res = stream->PollNext(&*ctx);
+      if (res.IsPending()) {
+        PERFETTO_CHECK(!registered_handles_[i].empty());
+        continue;
+      }
+      if (!res.IsDone()) {
+        return res;
+      }
+      // StreamPollable has returned EOF. Clear it and the registered handles
+      // out.
+      stream = nullopt;
+      ++eof_streams_;
+      registered_handles_[i].clear();
+    }
+
+    // Every child stream being EOF means we have reached EOF as well.
+    if (eof_streams_ == streams_.size()) {
+      return DonePollResult();
+    }
+    // Every remaining stream must be pending so we can make no further
+    // progress. Register all the child handles with the context and return.
+    for (const FlatSet<PlatformHandle>& handles : registered_handles_) {
+      upstream->RegisterAllInterested(handles);
+    }
+    return PendingPollResult();
+  }
+
+ private:
+  Optional<PollContext> PollContextForStream(PollContext* upstream,
+                                             uint32_t stream_idx) {
+    FlatSet<PlatformHandle>& state = registered_handles_[stream_idx];
+    if (state.empty()) {
+      return PollContext(&state, &upstream->ready_handles());
+    }
+    for (PlatformHandle handle : upstream->ready_handles()) {
+      if (state.count(handle)) {
+        state.clear();
+        return PollContext(&state, &upstream->ready_handles());
+      }
+    }
+    return base::nullopt;
+  }
+
+  std::vector<Optional<Stream<T>>> streams_;
+  std::vector<FlatSet<PlatformHandle>> registered_handles_;
+  uint32_t eof_streams_ = 0;
+};
+
+// Implementation of a Stream<T> which immediately completes and calls a
+// function in the destructor.
+template <typename T, typename Function>
+class OnDestroyStreamImpl : public StreamPollable<T> {
+ public:
+  explicit OnDestroyStreamImpl(Function fn) : fn_(std::move(fn)) {}
+  ~OnDestroyStreamImpl() override { fn_(); }
+
+  StreamPollResult<T> PollNext(PollContext*) override {
+    return DonePollResult();
+  }
+
+ private:
+  Function fn_;
+};
+
+// Interface for converting a Stream<T> into a Future<U>.
+//
+// The goal of this interface is to allow a Stream to be converted to a Future,
+// allowing short-circuiting (i.e. allowing the Future to complete before
+// the stream finishes).
+//
+// The flexibility of this interface allows both supporting the traditional
+// notion of collecting i.e. converting a Stream<T> to a Future<vector<T>> but
+// also more advanced functionality like completing a Future<Status> early
+// when errors are detected, racing Future<T> against each other and returning
+// the first value produced etc.
+template <typename T, typename U>
+class Collector {
+ public:
+  virtual ~Collector() = default;
+
+  // Receives the next item from a Stream<T>. If the wrapping Future<U> can be
+  // completed, returns the a value U which completes that future. Otherwise,
+  // returns base::nullopt.
+  virtual Optional<U> OnNext(T value) = 0;
+
+  // Called when the stream has completed and returns the |U| which will be
+  // used to complete the future. This method will only be called if OnNext
+  // returned nullopt for every element in the stream.
+  virtual U OnDone() = 0;
+};
+
+// Implementation of a StreamPollable which converts a Stream<T> to a Future<U>
+// using an implementation of Collector<T, U>.
+template <typename T, typename U>
+class CollectImpl : public FuturePollable<U> {
+ public:
+  explicit CollectImpl(Stream<T> stream,
+                       std::unique_ptr<Collector<T, U>> collector)
+      : stream_(std::move(stream)), collector_(std::move(collector)) {}
+
+  FuturePollResult<U> Poll(PollContext* context) override {
+    for (;;) {
+      ASSIGN_OR_RETURN_IF_PENDING_STREAM(res, stream_.PollNext(context));
+      if (res.IsDone()) {
+        return collector_->OnDone();
+      }
+      Optional<U> collected = collector_->OnNext(std::move(res.item()));
+      if (collected.has_value()) {
+        return std::move(collected.value());
+      }
+    }
+  }
+
+ private:
+  Stream<T> stream_;
+  std::unique_ptr<Collector<T, U>> collector_;
+};
+
+// Implementation for |AllOkCollector|.
+class AllOkCollectorImpl : public Collector<Status, Status> {
+ public:
+  ~AllOkCollectorImpl() override;
+
+  Optional<Status> OnNext(Status status) override {
+    return status.ok() ? nullopt : make_optional(std::move(status));
+  }
+  Status OnDone() override { return OkStatus(); }
+};
+
+// Implementation for |ToFutureCheckedCollector|.
+template <typename T>
+class FutureCheckedCollectorImpl : public Collector<T, T> {
+ public:
+  Optional<T> OnNext(T value) override {
+    PERFETTO_CHECK(!prev_value_);
+    prev_value_ = value;
+    return nullopt;
+  }
+  T OnDone() override { return *prev_value_; }
+
+ private:
+  Optional<T> prev_value_;
+};
+
+// Implementation for |StatusOrVectorCollector|.
+template <typename T>
+class StatusOrVectorCollectorImpl
+    : public Collector<base::StatusOr<T>, base::StatusOr<std::vector<T>>> {
+ public:
+  Optional<base::StatusOr<std::vector<T>>> OnNext(
+      base::StatusOr<T> val_or) override {
+    if (!val_or.ok()) {
+      return make_optional(val_or.status());
+    }
+    values_.emplace_back(std::move(val_or.value()));
+    return nullopt;
+  }
+  base::StatusOr<std::vector<T>> OnDone() override {
+    return std::move(values_);
+  }
+
+ private:
+  std::vector<T> values_;
+};
+
+}  // namespace base
+}  // namespace perfetto
+
+#endif  // INCLUDE_PERFETTO_EXT_BASE_THREADING_STREAM_COMBINATORS_H_
diff --git a/src/base/threading/BUILD.gn b/src/base/threading/BUILD.gn
index 0e4f150..cdf2363 100644
--- a/src/base/threading/BUILD.gn
+++ b/src/base/threading/BUILD.gn
@@ -17,7 +17,10 @@
 source_set("threading") {
   deps = [ "../../../gn:default_deps" ]
   public_deps = [ "../../../include/perfetto/ext/base/threading" ]
-  sources = [ "thread_pool.cc" ]
+  sources = [
+    "stream_combinators.cc",
+    "thread_pool.cc",
+  ]
 }
 
 perfetto_unittest_source_set("unittests") {
@@ -31,6 +34,7 @@
   sources = [
     "channel_unittest.cc",
     "future_unittest.cc",
+    "stream_unittest.cc",
     "thread_pool_unittest.cc",
   ]
 }
diff --git a/src/base/threading/stream_combinators.cc b/src/base/threading/stream_combinators.cc
new file mode 100644
index 0000000..9c21c9b
--- /dev/null
+++ b/src/base/threading/stream_combinators.cc
@@ -0,0 +1,25 @@
+/*
+ * Copyright (C) 2023 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "perfetto/ext/base/threading/stream.h"
+
+namespace perfetto {
+namespace base {
+
+AllOkCollectorImpl::~AllOkCollectorImpl() = default;
+
+}  // namespace base
+}  // namespace perfetto
diff --git a/src/base/threading/stream_unittest.cc b/src/base/threading/stream_unittest.cc
new file mode 100644
index 0000000..3036bd9
--- /dev/null
+++ b/src/base/threading/stream_unittest.cc
@@ -0,0 +1,329 @@
+/*
+ * Copyright (C) 2023 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "perfetto/ext/base/threading/stream.h"
+
+#include <vector>
+
+#include "perfetto/base/status.h"
+#include "perfetto/ext/base/threading/future_combinators.h"
+#include "perfetto/ext/base/threading/poll.h"
+#include "test/gtest_and_gmock.h"
+
+namespace perfetto {
+namespace base {
+namespace {
+
+using testing::_;
+using testing::ElementsAre;
+using testing::Return;
+
+template <typename T>
+class MockPollable : public FuturePollable<T> {
+ public:
+  MOCK_METHOD1(Poll, FuturePollResult<T>(PollContext*));
+};
+
+template <typename T>
+class MockStreamPollable : public StreamPollable<T> {
+ public:
+  MOCK_METHOD1(PollNext, StreamPollResult<T>(PollContext*));
+};
+
+class StreamUnittest : public ::testing::Test {
+ protected:
+  base::FlatSet<base::PlatformHandle> interested_;
+  base::FlatSet<base::PlatformHandle> ready_;
+  PollContext ctx_{&interested_, &ready_};
+};
+
+TEST_F(StreamUnittest, PollableImmediateResult) {
+  std::unique_ptr<MockStreamPollable<int>> int_pollable(
+      new MockStreamPollable<int>());
+  EXPECT_CALL(*int_pollable, PollNext(_))
+      .WillOnce(Return(StreamPollResult<int>(0)));
+
+  base::Stream<int> stream(std::move(int_pollable));
+  auto res = stream.PollNext(&ctx_);
+  ASSERT_FALSE(res.IsPending());
+  ASSERT_EQ(res.item(), 0);
+}
+
+TEST_F(StreamUnittest, PollablePendingThenResult) {
+  std::unique_ptr<MockStreamPollable<int>> int_pollable(
+      new MockStreamPollable<int>());
+  EXPECT_CALL(*int_pollable, PollNext(_))
+      .WillOnce(Return(PendingPollResult()))
+      .WillOnce(Return(StreamPollResult<int>(1)))
+      .WillOnce(Return(DonePollResult()));
+
+  base::Stream<int> stream(std::move(int_pollable));
+  ASSERT_TRUE(stream.PollNext(&ctx_).IsPending());
+  ASSERT_EQ(stream.PollNext(&ctx_).item(), 1);
+  ASSERT_TRUE(stream.PollNext(&ctx_).IsDone());
+}
+
+TEST_F(StreamUnittest, Map) {
+  std::unique_ptr<MockStreamPollable<int>> int_pollable(
+      new MockStreamPollable<int>());
+  EXPECT_CALL(*int_pollable, PollNext(_))
+      .WillOnce(Return(PendingPollResult()))
+      .WillOnce(Return(StreamPollResult<int>(1)))
+      .WillOnce(Return(PendingPollResult()))
+      .WillOnce(Return(StreamPollResult<int>(2)))
+      .WillOnce(Return(DonePollResult()));
+
+  auto stream = base::Stream<int>(std::move(int_pollable))
+                    .MapFuture([](int res) -> base::Future<std::string> {
+                      return std::to_string(res);
+                    });
+  ASSERT_TRUE(stream.PollNext(&ctx_).IsPending());
+  ASSERT_EQ(stream.PollNext(&ctx_).item(), "1");
+  ASSERT_TRUE(stream.PollNext(&ctx_).IsPending());
+  ASSERT_EQ(stream.PollNext(&ctx_).item(), "2");
+  ASSERT_TRUE(stream.PollNext(&ctx_).IsDone());
+}
+
+TEST_F(StreamUnittest, Concat) {
+  std::unique_ptr<MockStreamPollable<int>> int_pollable(
+      new MockStreamPollable<int>());
+  EXPECT_CALL(*int_pollable, PollNext(_))
+      .WillOnce(Return(PendingPollResult()))
+      .WillOnce(Return(StreamPollResult<int>(1)))
+      .WillOnce(Return(StreamPollResult<int>(2)))
+      .WillOnce(Return(DonePollResult()));
+
+  std::unique_ptr<MockStreamPollable<int>> concat_pollable(
+      new MockStreamPollable<int>());
+  EXPECT_CALL(*concat_pollable, PollNext(_))
+      .WillOnce(Return(PendingPollResult()))
+      .WillOnce(Return(StreamPollResult<int>(3)))
+      .WillOnce(Return(PendingPollResult()))
+      .WillOnce(Return(StreamPollResult<int>(4)))
+      .WillOnce(Return(DonePollResult()));
+
+  auto stream = base::Stream<int>(std::move(int_pollable))
+                    .Concat(base::Stream<int>(std::move(concat_pollable)));
+  ASSERT_TRUE(stream.PollNext(&ctx_).IsPending());
+  ASSERT_EQ(stream.PollNext(&ctx_).item(), 1);
+  ASSERT_EQ(stream.PollNext(&ctx_).item(), 2);
+  ASSERT_TRUE(stream.PollNext(&ctx_).IsPending());
+  ASSERT_EQ(stream.PollNext(&ctx_).item(), 3);
+  ASSERT_TRUE(stream.PollNext(&ctx_).IsPending());
+  ASSERT_EQ(stream.PollNext(&ctx_).item(), 4);
+  ASSERT_TRUE(stream.PollNext(&ctx_).IsDone());
+}
+
+TEST_F(StreamUnittest, AllOkCollectorEarly) {
+  std::unique_ptr<MockStreamPollable<base::Status>> pollable(
+      new MockStreamPollable<base::Status>());
+  EXPECT_CALL(*pollable, PollNext(_))
+      .WillOnce(Return(PendingPollResult()))
+      .WillOnce(Return(StreamPollResult<base::Status>(base::OkStatus())))
+      .WillOnce(Return(PendingPollResult()))
+      .WillOnce(Return(StreamPollResult<base::Status>(base::ErrStatus("Bad"))));
+
+  auto future = base::Stream<base::Status>(std::move(pollable))
+                    .Collect(base::AllOkCollector());
+  ASSERT_TRUE(future.Poll(&ctx_).IsPending());
+  ASSERT_TRUE(future.Poll(&ctx_).IsPending());
+  ASSERT_FALSE(future.Poll(&ctx_).item().ok());
+}
+
+TEST_F(StreamUnittest, AllOkCollectorComplete) {
+  std::unique_ptr<MockStreamPollable<base::Status>> pollable(
+      new MockStreamPollable<base::Status>());
+  EXPECT_CALL(*pollable, PollNext(_))
+      .WillOnce(Return(PendingPollResult()))
+      .WillOnce(Return(StreamPollResult<base::Status>(base::OkStatus())))
+      .WillOnce(Return(PendingPollResult()))
+      .WillOnce(Return(StreamPollResult<base::Status>(base::OkStatus())))
+      .WillOnce(Return(StreamPollResult<base::Status>(base::OkStatus())))
+      .WillOnce(Return(DonePollResult()));
+
+  auto future = base::Stream<base::Status>(std::move(pollable))
+                    .Collect(base::AllOkCollector());
+  ASSERT_TRUE(future.Poll(&ctx_).IsPending());
+  ASSERT_TRUE(future.Poll(&ctx_).IsPending());
+  ASSERT_TRUE(future.Poll(&ctx_).item().ok());
+}
+
+TEST_F(StreamUnittest, ToFutureCheckedCollector) {
+  std::unique_ptr<MockStreamPollable<base::Status>> pollable(
+      new MockStreamPollable<base::Status>());
+  EXPECT_CALL(*pollable, PollNext(_))
+      .WillOnce(Return(PendingPollResult()))
+      .WillOnce(Return(StreamPollResult<base::Status>(base::OkStatus())))
+      .WillOnce(Return(DonePollResult()));
+
+  auto future = base::Stream<base::Status>(std::move(pollable))
+                    .Collect(base::ToFutureCheckedCollector<base::Status>());
+  ASSERT_TRUE(future.Poll(&ctx_).IsPending());
+  ASSERT_TRUE(future.Poll(&ctx_).item().ok());
+}
+
+TEST_F(StreamUnittest, StatusOrCollectorEarly) {
+  std::unique_ptr<MockStreamPollable<base::StatusOr<int>>> pollable(
+      new MockStreamPollable<base::StatusOr<int>>());
+  EXPECT_CALL(*pollable, PollNext(_))
+      .WillOnce(Return(PendingPollResult()))
+      .WillOnce(Return(StreamPollResult<base::StatusOr<int>>(1024)))
+      .WillOnce(Return(PendingPollResult()))
+      .WillOnce(Return(
+          StreamPollResult<base::StatusOr<int>>(base::ErrStatus("Bad"))));
+
+  auto future = base::Stream<base::StatusOr<int>>(std::move(pollable))
+                    .Collect(base::StatusOrVectorCollector<int>());
+  ASSERT_TRUE(future.Poll(&ctx_).IsPending());
+  ASSERT_TRUE(future.Poll(&ctx_).IsPending());
+  ASSERT_FALSE(future.Poll(&ctx_).item().ok());
+}
+
+TEST_F(StreamUnittest, StatusOrCollectorComplete) {
+  std::unique_ptr<MockStreamPollable<base::StatusOr<int>>> pollable(
+      new MockStreamPollable<base::StatusOr<int>>());
+  EXPECT_CALL(*pollable, PollNext(_))
+      .WillOnce(Return(PendingPollResult()))
+      .WillOnce(Return(StreamPollResult<base::StatusOr<int>>(1024)))
+      .WillOnce(Return(PendingPollResult()))
+      .WillOnce(Return(StreamPollResult<base::StatusOr<int>>(2048)))
+      .WillOnce(Return(DonePollResult()));
+
+  auto future = base::Stream<base::StatusOr<int>>(std::move(pollable))
+                    .Collect(base::StatusOrVectorCollector<int>());
+  ASSERT_TRUE(future.Poll(&ctx_).IsPending());
+  ASSERT_TRUE(future.Poll(&ctx_).IsPending());
+  ASSERT_THAT(future.Poll(&ctx_).item().value(), ElementsAre(1024, 2048));
+}
+
+TEST_F(StreamUnittest, StreamFrom) {
+  auto stream = base::StreamFrom(std::vector<int>({1, 2, 4}));
+
+  ASSERT_EQ(stream.PollNext(&ctx_).item(), 1);
+  ASSERT_EQ(stream.PollNext(&ctx_).item(), 2);
+  ASSERT_EQ(stream.PollNext(&ctx_).item(), 4);
+  ASSERT_TRUE(stream.PollNext(&ctx_).IsDone());
+}
+
+TEST_F(StreamUnittest, EmptyStream) {
+  auto stream = base::EmptyStream<int>();
+  ASSERT_TRUE(stream.PollNext(&ctx_).IsDone());
+}
+
+TEST_F(StreamUnittest, StreamOf) {
+  auto stream = base::StreamOf(1, 2);
+
+  ASSERT_EQ(stream.PollNext(&ctx_).item(), 1);
+  ASSERT_EQ(stream.PollNext(&ctx_).item(), 2);
+  ASSERT_TRUE(stream.PollNext(&ctx_).IsDone());
+}
+
+TEST_F(StreamUnittest, StreamFromFuture) {
+  std::unique_ptr<MockPollable<int>> int_pollable(new MockPollable<int>());
+  EXPECT_CALL(*int_pollable, Poll(_))
+      .WillOnce(Return(PendingPollResult()))
+      .WillOnce(Return(FuturePollResult<int>(1)));
+
+  auto stream =
+      base::StreamFromFuture(base::Future<int>(std::move(int_pollable)));
+
+  ASSERT_TRUE(stream.PollNext(&ctx_).IsPending());
+  ASSERT_EQ(stream.PollNext(&ctx_).item(), 1);
+  ASSERT_TRUE(stream.PollNext(&ctx_).IsDone());
+}
+
+TEST_F(StreamUnittest, OnDestroyStream) {
+  bool destroyed = false;
+  {
+    auto stream =
+        base::OnDestroyStream<int>([&destroyed]() { destroyed = true; });
+    ASSERT_FALSE(destroyed);
+    ASSERT_TRUE(stream.PollNext(&ctx_).IsDone());
+    ASSERT_FALSE(destroyed);
+  }
+  ASSERT_TRUE(destroyed);
+}
+
+TEST_F(StreamUnittest, FlattenStreams) {
+  std::unique_ptr<MockStreamPollable<int>> a(new MockStreamPollable<int>());
+  EXPECT_CALL(*a, PollNext(_))
+      .WillOnce([](PollContext* ctx) {
+        ctx->RegisterInterested(1);
+        return PendingPollResult();
+      })
+      .WillOnce(Return(StreamPollResult<int>(1)))
+      .WillOnce(Return(DonePollResult()));
+
+  std::unique_ptr<MockStreamPollable<int>> b(new MockStreamPollable<int>());
+  EXPECT_CALL(*b, PollNext(_))
+      .WillOnce([](PollContext* ctx) {
+        ctx->RegisterInterested(2);
+        return PendingPollResult();
+      })
+      .WillOnce([](PollContext* ctx) {
+        ctx->RegisterInterested(2);
+        return PendingPollResult();
+      })
+      .WillOnce(Return(StreamPollResult<int>(2)))
+      .WillOnce(Return(DonePollResult()));
+
+  std::unique_ptr<MockStreamPollable<int>> c(new MockStreamPollable<int>());
+  EXPECT_CALL(*c, PollNext(_))
+      .WillOnce(Return(StreamPollResult<int>(3)))
+      .WillOnce([](PollContext* ctx) {
+        ctx->RegisterInterested(3);
+        ctx->RegisterInterested(4);
+        return PendingPollResult();
+      })
+      .WillOnce(Return(DonePollResult()));
+
+  std::vector<Stream<int>> streams;
+  streams.emplace_back(std::move(a));
+  streams.emplace_back(std::move(b));
+  streams.emplace_back(std::move(c));
+
+  auto stream = base::FlattenStreams(std::move(streams));
+  ASSERT_EQ(stream.PollNext(&ctx_).item(), 3);
+  ASSERT_THAT(interested_, ElementsAre());
+
+  ASSERT_TRUE(stream.PollNext(&ctx_).IsPending());
+  ASSERT_THAT(interested_, ElementsAre(1, 2, 3, 4));
+
+  interested_.clear();
+  ASSERT_TRUE(stream.PollNext(&ctx_).IsPending());
+  ASSERT_THAT(interested_, ElementsAre(1, 2, 3, 4));
+
+  interested_.clear();
+  ready_ = {1};
+  ASSERT_EQ(stream.PollNext(&ctx_).item(), 1);
+  ASSERT_TRUE(stream.PollNext(&ctx_).IsPending());
+  ASSERT_THAT(interested_, ElementsAre(2, 3, 4));
+
+  interested_.clear();
+  ready_ = {};
+  ASSERT_TRUE(stream.PollNext(&ctx_).IsPending());
+  ASSERT_THAT(interested_, ElementsAre(2, 3, 4));
+
+  interested_.clear();
+  ready_ = {1, 2, 3};
+  ASSERT_TRUE(stream.PollNext(&ctx_).IsPending());
+  ASSERT_EQ(stream.PollNext(&ctx_).item(), 2);
+  ASSERT_TRUE(stream.PollNext(&ctx_).IsDone());
+}
+
+}  // namespace
+}  // namespace base
+}  // namespace perfetto