perfetto: add minimal implementation of Streams
This CL adds a minimal implementation of streams in the Perfetto
codebase. This is mainly intended to be used by cloud trace processor.
This is largely analogous to aosp/2381492 which introduced futures.
Change-Id: I9809dbf67320ae7103373470206e2829023ab159
diff --git a/Android.bp b/Android.bp
index 7392a26..ce5b623 100644
--- a/Android.bp
+++ b/Android.bp
@@ -8182,6 +8182,7 @@
filegroup {
name: "perfetto_src_base_threading_threading",
srcs: [
+ "src/base/threading/stream_combinators.cc",
"src/base/threading/thread_pool.cc",
],
}
@@ -8192,6 +8193,7 @@
srcs: [
"src/base/threading/channel_unittest.cc",
"src/base/threading/future_unittest.cc",
+ "src/base/threading/stream_unittest.cc",
"src/base/threading/thread_pool_unittest.cc",
],
}
diff --git a/include/perfetto/ext/base/threading/BUILD.gn b/include/perfetto/ext/base/threading/BUILD.gn
index 5f3dfa9..151d10f 100644
--- a/include/perfetto/ext/base/threading/BUILD.gn
+++ b/include/perfetto/ext/base/threading/BUILD.gn
@@ -20,6 +20,8 @@
"future.h",
"future_combinators.h",
"poll.h",
+ "stream.h",
+ "stream_combinators.h",
"thread_pool.h",
]
deps = [
diff --git a/include/perfetto/ext/base/threading/future.h b/include/perfetto/ext/base/threading/future.h
index 6ea216e..c509e9b 100644
--- a/include/perfetto/ext/base/threading/future.h
+++ b/include/perfetto/ext/base/threading/future.h
@@ -80,15 +80,15 @@
Future(T item) : pollable_(new ImmediateImpl<T>(std::move(item))) {}
// Intentionally implicit to allow for egonomic definition of functions
- // returning Future<base::StatusOr<T>> by simply returning base::ErrStatus.
+ // returning Future<StatusOr<T>> by simply returning ErrStatus.
// The enable_if is necessary because this definition is the same as the above
// constructor in cases where T = base::Status.
template <typename U = T,
- typename = std::enable_if_t<!std::is_same_v<base::Status, U>>>
- Future(base::Status status) : Future(T(std::move(status))) {}
+ typename = std::enable_if_t<!std::is_same_v<Status, U>>>
+ Future(Status status) : Future(T(std::move(status))) {}
// Intentionally implicit to allow for egonomic definition of functions
- // returning Future<base::StatusOr<T>> by simply returning T.
+ // returning Future<StatusOr<T>> by simply returning T.
template <typename U = T, typename = typename U::value_type>
Future(typename U::value_type val) : Future(T(std::move(val))) {}
@@ -132,12 +132,12 @@
// Alias to shorten type defintions for Future<Status> which is common in
// the codebase.
-using StatusFuture = base::Future<base::Status>;
+using StatusFuture = Future<Status>;
// Alias to shorten type defintions for Future<StatusOr<T>> which is common
// in the codebase.
template <typename T>
-using StatusOrFuture = base::Future<base::StatusOr<T>>;
+using StatusOrFuture = Future<StatusOr<T>>;
} // namespace base
} // namespace perfetto
diff --git a/include/perfetto/ext/base/threading/poll.h b/include/perfetto/ext/base/threading/poll.h
index b162a32..5989361 100644
--- a/include/perfetto/ext/base/threading/poll.h
+++ b/include/perfetto/ext/base/threading/poll.h
@@ -96,6 +96,68 @@
virtual FuturePollResult<T> Poll(PollContext*) = 0;
};
+// Indicates that the Stream has been exhausted and no more values will be
+// returned.
+struct DonePollResult {};
+
+// Return value of Stream<T>::Poll.
+//
+// Essentially a wrapper around std::variant<T, PendingPollResult,
+// DonePollResult> but with higher level API.
+template <typename T>
+class StreamPollResult {
+ public:
+ using PollT = T;
+
+ // Intentionally implicit to allow idiomatic returns.
+ StreamPollResult(const PendingPollResult&) : inner_(PendingPollResult()) {}
+ StreamPollResult(const DonePollResult&) : inner_(DonePollResult()) {}
+ StreamPollResult(T item) : inner_(std::move(item)) {}
+
+ // Returns whether the Stream is still pending.
+ bool IsPending() const {
+ return std::holds_alternative<PendingPollResult>(inner_);
+ }
+
+ // Returns whether the Stream is done.
+ bool IsDone() const { return std::holds_alternative<DonePollResult>(inner_); }
+
+ // The real value inside this result: requires !IsPending() and !IsDone().
+ T& item() {
+ PERFETTO_DCHECK(!IsPending());
+ PERFETTO_DCHECK(!IsDone());
+ return std::get<T>(inner_);
+ }
+ const T& item() const {
+ PERFETTO_DCHECK(!IsPending());
+ PERFETTO_DCHECK(!IsDone());
+ return std::get<T>(inner_);
+ }
+
+ // The real value inside this result: requires !IsPending() and !IsDone().
+ T* operator->() { return &item(); }
+ const T* operator->() const { return &item(); }
+
+ private:
+ std::variant<PendingPollResult, DonePollResult, T> inner_;
+};
+
+// Interface for implementing the Stream<T>::Poll function.
+//
+// This is essentially analagous to FuturePollable<T> for streams: check the
+// documentation of that class for why this exists.
+template <typename T>
+class StreamPollable {
+ public:
+ using PollT = T;
+
+ virtual ~StreamPollable() = default;
+
+ // Implementation of the Poll function of a Stream: see Stream documentation
+ // for how this should be implemented.
+ virtual StreamPollResult<T> PollNext(PollContext*) = 0;
+};
+
// Context class passed to Pollable classes.
//
// Implementations of Pollable which simply wrap another Pollable will use
@@ -151,13 +213,31 @@
// ASSIGN_OR_RETURN_IF_PENDING_FUTURE(res, MyIntReturningFutureFn());
// return std::to_string(*foo);
// }
-//
#define ASSIGN_OR_RETURN_IF_PENDING_FUTURE(var, expr) \
auto assign_and_return_if_poll_##var = (expr); \
if (assign_and_return_if_poll_##var.IsPending()) \
return base::PendingPollResult(); \
auto var = std::move(assign_and_return_if_poll_##var.item())
+// Evaluates |expr|, which should return a PollResult. If IsPending is
+// true, returns base::PendingPollResult().
+//
+// Example usage:
+//
+// Strean<int> MyIntReturningStreamFn();
+//
+// StreamPollResult<std::string> Poll(PollContext* ctx) {
+// ASSIGN_OR_RETURN_IF_PENDING_STREAM(res, MyIntReturningStreamFn());
+// if (res.IsDone()) {
+// return DonePollResult();
+// }
+// return std::to_string(*foo);
+// }
+#define ASSIGN_OR_RETURN_IF_PENDING_STREAM(var, expr) \
+ auto var = (expr); \
+ if (var.IsPending()) \
+ return base::PendingPollResult()
+
} // namespace base
} // namespace perfetto
diff --git a/include/perfetto/ext/base/threading/stream.h b/include/perfetto/ext/base/threading/stream.h
new file mode 100644
index 0000000..4d67eb4
--- /dev/null
+++ b/include/perfetto/ext/base/threading/stream.h
@@ -0,0 +1,185 @@
+/*
+ * Copyright (C) 2023 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef INCLUDE_PERFETTO_EXT_BASE_THREADING_STREAM_H_
+#define INCLUDE_PERFETTO_EXT_BASE_THREADING_STREAM_H_
+
+#include <functional>
+#include <memory>
+#include <vector>
+
+#include "perfetto/base/status.h"
+#include "perfetto/ext/base/status_or.h"
+#include "perfetto/ext/base/threading/future.h"
+#include "perfetto/ext/base/threading/stream_combinators.h"
+
+namespace perfetto {
+namespace base {
+
+// Creates a Stream<T> from P, a subclass of StreamPollable<T>.
+//
+// This function follows the same pattern of std::make_unique, std::make_shared
+// etc.
+template <typename P, typename... Args>
+Stream<typename P::PollT> MakeStream(Args... args) {
+ return Stream<typename P::PollT>(
+ std::unique_ptr<StreamPollable<typename P::PollT>>(
+ new P(std::forward<Args>(args)...)));
+}
+
+// An asynchronous iterator for values of type T.
+//
+// If Future<T> is an asynchronous version of T, Stream<T> is an asynchronous
+// version of Iterator<T>. Long-running compute/IO operations which return
+// multiple values can be represented with a Stream<T>.
+//
+// Refer to the class documentation for Future<T> as most of the features and
+// implementation of Future<T> also apply to Stream<T>.
+template <typename T>
+class Stream {
+ public:
+ using PollableItem = T;
+
+ // Creates a Stream from a |StreamPollable<T>|. Prefer using |MakeStream|
+ // instead of this function.
+ explicit Stream(std::unique_ptr<StreamPollable<T>> pollable)
+ : pollable_(std::move(pollable)) {}
+
+ // Converts a Stream<T> to Stream<U>. This works by applying |map_fn| to each
+ // element in T and then polling the returned Future<U> to completion.
+ template <typename Function /* = Future<U>(T) */>
+ Stream<FutureReturn<Function, T>> MapFuture(Function map_fn) && {
+ return MakeStream<MapFutureStreamImpl<Function, T>>(std::move(*this),
+ std::move(map_fn));
+ }
+
+ // Creates a stream which fully polls |this| and then polls |concat| to
+ // completion.
+ Stream<T> Concat(Stream<T> concat) && {
+ return MakeStream<ConcatStreamImpl<T>>(std::move(*this), std::move(concat));
+ }
+
+ // Converts a Stream<T> to Future<U> by collecting elements using |collector|.
+ // See documentation on |Collector| for how to implement one.
+ template <typename U>
+ Future<U> Collect(std::unique_ptr<Collector<T, U>> collector) && {
+ return MakeFuture<CollectImpl<T, U>>(std::move(*this),
+ std::move(collector));
+ }
+
+ // Checks if the computation backing this Stream<T> has finished.
+ //
+ // Returns a StreamPollResult<T> which is a essentially a
+ // variant<PendingPollResult, DonePollResult T>. If PendingPollResult is
+ // returned, |ctx| will be used to register interest in the various fds which
+ // are "blocking" this future from finishing. If DonePollResult is returned,
+ // Poll *must not* be called again.
+ StreamPollResult<T> PollNext(PollContext* ctx) {
+ return pollable_->PollNext(ctx);
+ }
+
+ private:
+ std::unique_ptr<StreamPollable<T>> pollable_;
+};
+
+// Alias to shorten type defintions for Stream<Status> which is common in
+// the codebase.
+using StatusStream = Stream<Status>;
+
+// Alias to shorten type defintions for Stream<StatusOr<T>> which is common
+// in the codebase.
+template <typename T>
+using StatusOrStream = Stream<StatusOr<T>>;
+
+// Creates a Stream<T> which returns the next value inside |vector| every time
+// Stream<T>::Poll is called.
+template <typename T>
+Stream<T> StreamFrom(std::vector<T> vector) {
+ return MakeStream<ImmediateStreamImpl<T>>(std::move(vector));
+}
+
+// Creates a Stream<T> which immediately returns DonePollResult when polled.
+template <typename T>
+Stream<T> EmptyStream() {
+ return StreamFrom(std::vector<T>());
+}
+
+// Creates a Stream<T> which returns |first| and each of |rest| in sequence when
+// polled.
+template <typename T, typename... Ts>
+Stream<T> StreamOf(T first, Ts... rest) {
+ std::vector<T> values;
+ AddAllToVector(values, std::forward<T>(first), std::forward<Ts>(rest)...);
+ return StreamFrom(std::move(values));
+}
+
+// Creates a Stream<T> which returns the value of |future| before completing.
+template <typename T>
+Stream<T> StreamFromFuture(Future<T> future) {
+ return StreamOf(std::move(future)).MapFuture([](Future<T> value) { return value; });
+}
+
+// Creates a stream which returns no elements but calls |fn| in the destructor
+// of the returned stream.
+//
+// This function can be used to do resource management for a stream by making
+// the passed |fn| own the resources used by any "upstream" sources and then
+// Concat-ing this stream with the upstream.
+template <typename T, typename Function>
+Stream<T> OnDestroyStream(Function fn) {
+ return MakeStream<OnDestroyStreamImpl<T, Function>>(std::move(fn));
+}
+
+// Creates a Stream<T> returning values generated by each stream in |streams| as
+// soon as they are produced without preserving ordering.
+//
+// The returned Stream<T> keeps the amount of Poll calls to the inner |streams|,
+// to a minimum only calling Poll for the Streams which are marked are ready
+// in the PollContext.
+template <typename T>
+Stream<T> FlattenStreams(std::vector<Stream<T>> streams) {
+ return MakeStream<FlattenImpl<T>>(std::move(streams));
+}
+
+// Collector for Stream<Status>::Collect() which immediately resolves the
+// returned Future when an error status is detected. Resolves with
+// OkStatus once the entire stream finishes after returning all OkStatus().
+inline std::unique_ptr<Collector<Status, Status>> AllOkCollector() {
+ return std::make_unique<AllOkCollectorImpl>();
+}
+
+// Collector for Stream<T>::Collect() which ensures the stream returns *exactly*
+// one T before completing. Crashes if either a) no values are produced by
+// the Stream, b) more than one value is produced by the Stream.
+template <typename T>
+inline std::unique_ptr<Collector<T, T>> ToFutureCheckedCollector() {
+ return std::make_unique<FutureCheckedCollectorImpl<T>>();
+}
+
+// Collector for Stream<StatusOr<T>>::Collect() which returns a vector
+// containing all the successful results from the stream. If any element is an
+// error, short-circuits the stream with the error.
+template <typename T>
+inline std::unique_ptr<
+ Collector<StatusOr<T>, StatusOr<std::vector<T>>>>
+StatusOrVectorCollector() {
+ return std::make_unique<StatusOrVectorCollectorImpl<T>>();
+}
+
+} // namespace base
+} // namespace perfetto
+
+#endif // INCLUDE_PERFETTO_EXT_BASE_THREADING_STREAM_H_
diff --git a/include/perfetto/ext/base/threading/stream_combinators.h b/include/perfetto/ext/base/threading/stream_combinators.h
new file mode 100644
index 0000000..5485139
--- /dev/null
+++ b/include/perfetto/ext/base/threading/stream_combinators.h
@@ -0,0 +1,315 @@
+/*
+ * Copyright (C) 2023 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef INCLUDE_PERFETTO_EXT_BASE_THREADING_STREAM_COMBINATORS_H_
+#define INCLUDE_PERFETTO_EXT_BASE_THREADING_STREAM_COMBINATORS_H_
+
+#include <memory>
+#include <utility>
+#include <vector>
+
+#include "perfetto/base/status.h"
+#include "perfetto/ext/base/optional.h"
+#include "perfetto/ext/base/status_or.h"
+#include "perfetto/ext/base/threading/future_combinators.h"
+#include "perfetto/ext/base/threading/poll.h"
+
+namespace perfetto {
+namespace base {
+
+template <typename T>
+class Stream;
+
+// Helper function for adding all the elements in parameter pack to a vector.
+template <typename T, typename... Elements>
+void AddAllToVector(std::vector<T>&) {}
+
+template <typename T, typename... Elements>
+void AddAllToVector(std::vector<T>& vec, T first, Elements... rest) {
+ vec.emplace_back(std::forward<T>(first));
+ AddAllToVector(vec, std::forward<Elements>(rest)...);
+}
+
+// For a Function which returns Stream<U>, returns the U.
+template <typename Function, typename T>
+using StreamReturn =
+ typename std::invoke_result<Function, T>::type::PollableItem;
+
+// Implementation of StreamPollable for creating a Stream<T> from a
+// std::vector<T>.
+template <typename T>
+class ImmediateStreamImpl : public StreamPollable<T> {
+ public:
+ explicit ImmediateStreamImpl(std::vector<T> values)
+ : values_(std::move(values)) {}
+
+ StreamPollResult<T> PollNext(PollContext*) override {
+ if (index_ >= values_.size()) {
+ return DonePollResult();
+ }
+ return StreamPollResult<T>(std::move(values_[index_++]));
+ }
+
+ private:
+ std::vector<T> values_;
+ uint32_t index_ = 0;
+};
+
+// Implementation of a StreamPollable for creating a Stream<U> from a Stream<T>
+// and a functor with prototype Future<U>(T).
+template <typename Function, typename T>
+class MapFutureStreamImpl : public StreamPollable<FutureReturn<Function, T>> {
+ public:
+ using U = FutureReturn<Function, T>;
+
+ MapFutureStreamImpl(Stream<T> stream, Function map_fn)
+ : stream_(std::move(stream)), map_fn_(std::move(map_fn)) {}
+
+ StreamPollResult<U> PollNext(PollContext* context) override {
+ if (!future_) {
+ ASSIGN_OR_RETURN_IF_PENDING_STREAM(res, stream_.PollNext(context));
+ if (res.IsDone()) {
+ return DonePollResult();
+ }
+ future_ = map_fn_(std::move(res.item()));
+ }
+ ASSIGN_OR_RETURN_IF_PENDING_FUTURE(res, future_->Poll(context));
+ future_ = nullopt;
+ return res;
+ }
+
+ private:
+ Stream<T> stream_;
+ Function map_fn_;
+ Optional<Future<U>> future_;
+};
+
+// Implementation of a StreamPollable for creating a concatenating two streams
+// together.
+template <typename T>
+class ConcatStreamImpl : public StreamPollable<T> {
+ public:
+ explicit ConcatStreamImpl(Stream<T> first, Stream<T> second)
+ : first_(std::move(first)), second_(std::move(second)) {}
+
+ StreamPollResult<T> PollNext(PollContext* context) override {
+ if (first_) {
+ ASSIGN_OR_RETURN_IF_PENDING_STREAM(res, first_->PollNext(context));
+ if (!res.IsDone()) {
+ return res.item();
+ }
+ first_ = base::nullopt;
+ }
+ return second_.PollNext(context);
+ }
+
+ private:
+ base::Optional<Stream<T>> first_;
+ Stream<T> second_;
+};
+
+// Implementation of a StreamPollable for creating a Stream<T> from a
+// std::vector<Stream<T>>. Values are returned from the inner streams as soon as
+// they are available.
+template <typename T>
+class FlattenImpl : public StreamPollable<T> {
+ public:
+ explicit FlattenImpl(std::vector<Stream<T>> streams)
+ : registered_handles_(static_cast<uint32_t>(streams.size())) {
+ for (auto& stream : streams) {
+ streams_.emplace_back(std::move(stream));
+ }
+ }
+
+ StreamPollResult<T> PollNext(PollContext* upstream) override {
+ for (uint32_t i = 0; i < streams_.size(); ++i) {
+ auto& stream = streams_[i];
+ if (!stream) {
+ continue;
+ }
+ Optional<PollContext> ctx = PollContextForStream(upstream, i);
+ if (!ctx) {
+ continue;
+ }
+ StreamPollResult<T> res = stream->PollNext(&*ctx);
+ if (res.IsPending()) {
+ PERFETTO_CHECK(!registered_handles_[i].empty());
+ continue;
+ }
+ if (!res.IsDone()) {
+ return res;
+ }
+ // StreamPollable has returned EOF. Clear it and the registered handles
+ // out.
+ stream = nullopt;
+ ++eof_streams_;
+ registered_handles_[i].clear();
+ }
+
+ // Every child stream being EOF means we have reached EOF as well.
+ if (eof_streams_ == streams_.size()) {
+ return DonePollResult();
+ }
+ // Every remaining stream must be pending so we can make no further
+ // progress. Register all the child handles with the context and return.
+ for (const FlatSet<PlatformHandle>& handles : registered_handles_) {
+ upstream->RegisterAllInterested(handles);
+ }
+ return PendingPollResult();
+ }
+
+ private:
+ Optional<PollContext> PollContextForStream(PollContext* upstream,
+ uint32_t stream_idx) {
+ FlatSet<PlatformHandle>& state = registered_handles_[stream_idx];
+ if (state.empty()) {
+ return PollContext(&state, &upstream->ready_handles());
+ }
+ for (PlatformHandle handle : upstream->ready_handles()) {
+ if (state.count(handle)) {
+ state.clear();
+ return PollContext(&state, &upstream->ready_handles());
+ }
+ }
+ return base::nullopt;
+ }
+
+ std::vector<Optional<Stream<T>>> streams_;
+ std::vector<FlatSet<PlatformHandle>> registered_handles_;
+ uint32_t eof_streams_ = 0;
+};
+
+// Implementation of a Stream<T> which immediately completes and calls a
+// function in the destructor.
+template <typename T, typename Function>
+class OnDestroyStreamImpl : public StreamPollable<T> {
+ public:
+ explicit OnDestroyStreamImpl(Function fn) : fn_(std::move(fn)) {}
+ ~OnDestroyStreamImpl() override { fn_(); }
+
+ StreamPollResult<T> PollNext(PollContext*) override {
+ return DonePollResult();
+ }
+
+ private:
+ Function fn_;
+};
+
+// Interface for converting a Stream<T> into a Future<U>.
+//
+// The goal of this interface is to allow a Stream to be converted to a Future,
+// allowing short-circuiting (i.e. allowing the Future to complete before
+// the stream finishes).
+//
+// The flexibility of this interface allows both supporting the traditional
+// notion of collecting i.e. converting a Stream<T> to a Future<vector<T>> but
+// also more advanced functionality like completing a Future<Status> early
+// when errors are detected, racing Future<T> against each other and returning
+// the first value produced etc.
+template <typename T, typename U>
+class Collector {
+ public:
+ virtual ~Collector() = default;
+
+ // Receives the next item from a Stream<T>. If the wrapping Future<U> can be
+ // completed, returns the a value U which completes that future. Otherwise,
+ // returns base::nullopt.
+ virtual Optional<U> OnNext(T value) = 0;
+
+ // Called when the stream has completed and returns the |U| which will be
+ // used to complete the future. This method will only be called if OnNext
+ // returned nullopt for every element in the stream.
+ virtual U OnDone() = 0;
+};
+
+// Implementation of a StreamPollable which converts a Stream<T> to a Future<U>
+// using an implementation of Collector<T, U>.
+template <typename T, typename U>
+class CollectImpl : public FuturePollable<U> {
+ public:
+ explicit CollectImpl(Stream<T> stream,
+ std::unique_ptr<Collector<T, U>> collector)
+ : stream_(std::move(stream)), collector_(std::move(collector)) {}
+
+ FuturePollResult<U> Poll(PollContext* context) override {
+ for (;;) {
+ ASSIGN_OR_RETURN_IF_PENDING_STREAM(res, stream_.PollNext(context));
+ if (res.IsDone()) {
+ return collector_->OnDone();
+ }
+ Optional<U> collected = collector_->OnNext(std::move(res.item()));
+ if (collected.has_value()) {
+ return std::move(collected.value());
+ }
+ }
+ }
+
+ private:
+ Stream<T> stream_;
+ std::unique_ptr<Collector<T, U>> collector_;
+};
+
+// Implementation for |AllOkCollector|.
+class AllOkCollectorImpl : public Collector<Status, Status> {
+ public:
+ ~AllOkCollectorImpl() override;
+
+ Optional<Status> OnNext(Status status) override {
+ return status.ok() ? nullopt : make_optional(std::move(status));
+ }
+ Status OnDone() override { return OkStatus(); }
+};
+
+// Implementation for |ToFutureCheckedCollector|.
+template <typename T>
+class FutureCheckedCollectorImpl : public Collector<T, T> {
+ public:
+ Optional<T> OnNext(T value) override {
+ PERFETTO_CHECK(!prev_value_);
+ prev_value_ = value;
+ return nullopt;
+ }
+ T OnDone() override { return *prev_value_; }
+
+ private:
+ Optional<T> prev_value_;
+};
+
+// Implementation for |StatusOrVectorCollector|.
+template <typename T>
+class StatusOrVectorCollectorImpl
+ : public Collector<base::StatusOr<T>, base::StatusOr<std::vector<T>>> {
+ public:
+ Optional<base::StatusOr<std::vector<T>>> OnNext(
+ base::StatusOr<T> val_or) override {
+ if (!val_or.ok()) {
+ return make_optional(val_or.status());
+ }
+ values_.emplace_back(std::move(val_or.value()));
+ return nullopt;
+ }
+ base::StatusOr<std::vector<T>> OnDone() override {
+ return std::move(values_);
+ }
+
+ private:
+ std::vector<T> values_;
+};
+
+} // namespace base
+} // namespace perfetto
+
+#endif // INCLUDE_PERFETTO_EXT_BASE_THREADING_STREAM_COMBINATORS_H_
diff --git a/src/base/threading/BUILD.gn b/src/base/threading/BUILD.gn
index 0e4f150..cdf2363 100644
--- a/src/base/threading/BUILD.gn
+++ b/src/base/threading/BUILD.gn
@@ -17,7 +17,10 @@
source_set("threading") {
deps = [ "../../../gn:default_deps" ]
public_deps = [ "../../../include/perfetto/ext/base/threading" ]
- sources = [ "thread_pool.cc" ]
+ sources = [
+ "stream_combinators.cc",
+ "thread_pool.cc",
+ ]
}
perfetto_unittest_source_set("unittests") {
@@ -31,6 +34,7 @@
sources = [
"channel_unittest.cc",
"future_unittest.cc",
+ "stream_unittest.cc",
"thread_pool_unittest.cc",
]
}
diff --git a/src/base/threading/stream_combinators.cc b/src/base/threading/stream_combinators.cc
new file mode 100644
index 0000000..9c21c9b
--- /dev/null
+++ b/src/base/threading/stream_combinators.cc
@@ -0,0 +1,25 @@
+/*
+ * Copyright (C) 2023 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "perfetto/ext/base/threading/stream.h"
+
+namespace perfetto {
+namespace base {
+
+AllOkCollectorImpl::~AllOkCollectorImpl() = default;
+
+} // namespace base
+} // namespace perfetto
diff --git a/src/base/threading/stream_unittest.cc b/src/base/threading/stream_unittest.cc
new file mode 100644
index 0000000..3036bd9
--- /dev/null
+++ b/src/base/threading/stream_unittest.cc
@@ -0,0 +1,329 @@
+/*
+ * Copyright (C) 2023 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "perfetto/ext/base/threading/stream.h"
+
+#include <vector>
+
+#include "perfetto/base/status.h"
+#include "perfetto/ext/base/threading/future_combinators.h"
+#include "perfetto/ext/base/threading/poll.h"
+#include "test/gtest_and_gmock.h"
+
+namespace perfetto {
+namespace base {
+namespace {
+
+using testing::_;
+using testing::ElementsAre;
+using testing::Return;
+
+template <typename T>
+class MockPollable : public FuturePollable<T> {
+ public:
+ MOCK_METHOD1(Poll, FuturePollResult<T>(PollContext*));
+};
+
+template <typename T>
+class MockStreamPollable : public StreamPollable<T> {
+ public:
+ MOCK_METHOD1(PollNext, StreamPollResult<T>(PollContext*));
+};
+
+class StreamUnittest : public ::testing::Test {
+ protected:
+ base::FlatSet<base::PlatformHandle> interested_;
+ base::FlatSet<base::PlatformHandle> ready_;
+ PollContext ctx_{&interested_, &ready_};
+};
+
+TEST_F(StreamUnittest, PollableImmediateResult) {
+ std::unique_ptr<MockStreamPollable<int>> int_pollable(
+ new MockStreamPollable<int>());
+ EXPECT_CALL(*int_pollable, PollNext(_))
+ .WillOnce(Return(StreamPollResult<int>(0)));
+
+ base::Stream<int> stream(std::move(int_pollable));
+ auto res = stream.PollNext(&ctx_);
+ ASSERT_FALSE(res.IsPending());
+ ASSERT_EQ(res.item(), 0);
+}
+
+TEST_F(StreamUnittest, PollablePendingThenResult) {
+ std::unique_ptr<MockStreamPollable<int>> int_pollable(
+ new MockStreamPollable<int>());
+ EXPECT_CALL(*int_pollable, PollNext(_))
+ .WillOnce(Return(PendingPollResult()))
+ .WillOnce(Return(StreamPollResult<int>(1)))
+ .WillOnce(Return(DonePollResult()));
+
+ base::Stream<int> stream(std::move(int_pollable));
+ ASSERT_TRUE(stream.PollNext(&ctx_).IsPending());
+ ASSERT_EQ(stream.PollNext(&ctx_).item(), 1);
+ ASSERT_TRUE(stream.PollNext(&ctx_).IsDone());
+}
+
+TEST_F(StreamUnittest, Map) {
+ std::unique_ptr<MockStreamPollable<int>> int_pollable(
+ new MockStreamPollable<int>());
+ EXPECT_CALL(*int_pollable, PollNext(_))
+ .WillOnce(Return(PendingPollResult()))
+ .WillOnce(Return(StreamPollResult<int>(1)))
+ .WillOnce(Return(PendingPollResult()))
+ .WillOnce(Return(StreamPollResult<int>(2)))
+ .WillOnce(Return(DonePollResult()));
+
+ auto stream = base::Stream<int>(std::move(int_pollable))
+ .MapFuture([](int res) -> base::Future<std::string> {
+ return std::to_string(res);
+ });
+ ASSERT_TRUE(stream.PollNext(&ctx_).IsPending());
+ ASSERT_EQ(stream.PollNext(&ctx_).item(), "1");
+ ASSERT_TRUE(stream.PollNext(&ctx_).IsPending());
+ ASSERT_EQ(stream.PollNext(&ctx_).item(), "2");
+ ASSERT_TRUE(stream.PollNext(&ctx_).IsDone());
+}
+
+TEST_F(StreamUnittest, Concat) {
+ std::unique_ptr<MockStreamPollable<int>> int_pollable(
+ new MockStreamPollable<int>());
+ EXPECT_CALL(*int_pollable, PollNext(_))
+ .WillOnce(Return(PendingPollResult()))
+ .WillOnce(Return(StreamPollResult<int>(1)))
+ .WillOnce(Return(StreamPollResult<int>(2)))
+ .WillOnce(Return(DonePollResult()));
+
+ std::unique_ptr<MockStreamPollable<int>> concat_pollable(
+ new MockStreamPollable<int>());
+ EXPECT_CALL(*concat_pollable, PollNext(_))
+ .WillOnce(Return(PendingPollResult()))
+ .WillOnce(Return(StreamPollResult<int>(3)))
+ .WillOnce(Return(PendingPollResult()))
+ .WillOnce(Return(StreamPollResult<int>(4)))
+ .WillOnce(Return(DonePollResult()));
+
+ auto stream = base::Stream<int>(std::move(int_pollable))
+ .Concat(base::Stream<int>(std::move(concat_pollable)));
+ ASSERT_TRUE(stream.PollNext(&ctx_).IsPending());
+ ASSERT_EQ(stream.PollNext(&ctx_).item(), 1);
+ ASSERT_EQ(stream.PollNext(&ctx_).item(), 2);
+ ASSERT_TRUE(stream.PollNext(&ctx_).IsPending());
+ ASSERT_EQ(stream.PollNext(&ctx_).item(), 3);
+ ASSERT_TRUE(stream.PollNext(&ctx_).IsPending());
+ ASSERT_EQ(stream.PollNext(&ctx_).item(), 4);
+ ASSERT_TRUE(stream.PollNext(&ctx_).IsDone());
+}
+
+TEST_F(StreamUnittest, AllOkCollectorEarly) {
+ std::unique_ptr<MockStreamPollable<base::Status>> pollable(
+ new MockStreamPollable<base::Status>());
+ EXPECT_CALL(*pollable, PollNext(_))
+ .WillOnce(Return(PendingPollResult()))
+ .WillOnce(Return(StreamPollResult<base::Status>(base::OkStatus())))
+ .WillOnce(Return(PendingPollResult()))
+ .WillOnce(Return(StreamPollResult<base::Status>(base::ErrStatus("Bad"))));
+
+ auto future = base::Stream<base::Status>(std::move(pollable))
+ .Collect(base::AllOkCollector());
+ ASSERT_TRUE(future.Poll(&ctx_).IsPending());
+ ASSERT_TRUE(future.Poll(&ctx_).IsPending());
+ ASSERT_FALSE(future.Poll(&ctx_).item().ok());
+}
+
+TEST_F(StreamUnittest, AllOkCollectorComplete) {
+ std::unique_ptr<MockStreamPollable<base::Status>> pollable(
+ new MockStreamPollable<base::Status>());
+ EXPECT_CALL(*pollable, PollNext(_))
+ .WillOnce(Return(PendingPollResult()))
+ .WillOnce(Return(StreamPollResult<base::Status>(base::OkStatus())))
+ .WillOnce(Return(PendingPollResult()))
+ .WillOnce(Return(StreamPollResult<base::Status>(base::OkStatus())))
+ .WillOnce(Return(StreamPollResult<base::Status>(base::OkStatus())))
+ .WillOnce(Return(DonePollResult()));
+
+ auto future = base::Stream<base::Status>(std::move(pollable))
+ .Collect(base::AllOkCollector());
+ ASSERT_TRUE(future.Poll(&ctx_).IsPending());
+ ASSERT_TRUE(future.Poll(&ctx_).IsPending());
+ ASSERT_TRUE(future.Poll(&ctx_).item().ok());
+}
+
+TEST_F(StreamUnittest, ToFutureCheckedCollector) {
+ std::unique_ptr<MockStreamPollable<base::Status>> pollable(
+ new MockStreamPollable<base::Status>());
+ EXPECT_CALL(*pollable, PollNext(_))
+ .WillOnce(Return(PendingPollResult()))
+ .WillOnce(Return(StreamPollResult<base::Status>(base::OkStatus())))
+ .WillOnce(Return(DonePollResult()));
+
+ auto future = base::Stream<base::Status>(std::move(pollable))
+ .Collect(base::ToFutureCheckedCollector<base::Status>());
+ ASSERT_TRUE(future.Poll(&ctx_).IsPending());
+ ASSERT_TRUE(future.Poll(&ctx_).item().ok());
+}
+
+TEST_F(StreamUnittest, StatusOrCollectorEarly) {
+ std::unique_ptr<MockStreamPollable<base::StatusOr<int>>> pollable(
+ new MockStreamPollable<base::StatusOr<int>>());
+ EXPECT_CALL(*pollable, PollNext(_))
+ .WillOnce(Return(PendingPollResult()))
+ .WillOnce(Return(StreamPollResult<base::StatusOr<int>>(1024)))
+ .WillOnce(Return(PendingPollResult()))
+ .WillOnce(Return(
+ StreamPollResult<base::StatusOr<int>>(base::ErrStatus("Bad"))));
+
+ auto future = base::Stream<base::StatusOr<int>>(std::move(pollable))
+ .Collect(base::StatusOrVectorCollector<int>());
+ ASSERT_TRUE(future.Poll(&ctx_).IsPending());
+ ASSERT_TRUE(future.Poll(&ctx_).IsPending());
+ ASSERT_FALSE(future.Poll(&ctx_).item().ok());
+}
+
+TEST_F(StreamUnittest, StatusOrCollectorComplete) {
+ std::unique_ptr<MockStreamPollable<base::StatusOr<int>>> pollable(
+ new MockStreamPollable<base::StatusOr<int>>());
+ EXPECT_CALL(*pollable, PollNext(_))
+ .WillOnce(Return(PendingPollResult()))
+ .WillOnce(Return(StreamPollResult<base::StatusOr<int>>(1024)))
+ .WillOnce(Return(PendingPollResult()))
+ .WillOnce(Return(StreamPollResult<base::StatusOr<int>>(2048)))
+ .WillOnce(Return(DonePollResult()));
+
+ auto future = base::Stream<base::StatusOr<int>>(std::move(pollable))
+ .Collect(base::StatusOrVectorCollector<int>());
+ ASSERT_TRUE(future.Poll(&ctx_).IsPending());
+ ASSERT_TRUE(future.Poll(&ctx_).IsPending());
+ ASSERT_THAT(future.Poll(&ctx_).item().value(), ElementsAre(1024, 2048));
+}
+
+TEST_F(StreamUnittest, StreamFrom) {
+ auto stream = base::StreamFrom(std::vector<int>({1, 2, 4}));
+
+ ASSERT_EQ(stream.PollNext(&ctx_).item(), 1);
+ ASSERT_EQ(stream.PollNext(&ctx_).item(), 2);
+ ASSERT_EQ(stream.PollNext(&ctx_).item(), 4);
+ ASSERT_TRUE(stream.PollNext(&ctx_).IsDone());
+}
+
+TEST_F(StreamUnittest, EmptyStream) {
+ auto stream = base::EmptyStream<int>();
+ ASSERT_TRUE(stream.PollNext(&ctx_).IsDone());
+}
+
+TEST_F(StreamUnittest, StreamOf) {
+ auto stream = base::StreamOf(1, 2);
+
+ ASSERT_EQ(stream.PollNext(&ctx_).item(), 1);
+ ASSERT_EQ(stream.PollNext(&ctx_).item(), 2);
+ ASSERT_TRUE(stream.PollNext(&ctx_).IsDone());
+}
+
+TEST_F(StreamUnittest, StreamFromFuture) {
+ std::unique_ptr<MockPollable<int>> int_pollable(new MockPollable<int>());
+ EXPECT_CALL(*int_pollable, Poll(_))
+ .WillOnce(Return(PendingPollResult()))
+ .WillOnce(Return(FuturePollResult<int>(1)));
+
+ auto stream =
+ base::StreamFromFuture(base::Future<int>(std::move(int_pollable)));
+
+ ASSERT_TRUE(stream.PollNext(&ctx_).IsPending());
+ ASSERT_EQ(stream.PollNext(&ctx_).item(), 1);
+ ASSERT_TRUE(stream.PollNext(&ctx_).IsDone());
+}
+
+TEST_F(StreamUnittest, OnDestroyStream) {
+ bool destroyed = false;
+ {
+ auto stream =
+ base::OnDestroyStream<int>([&destroyed]() { destroyed = true; });
+ ASSERT_FALSE(destroyed);
+ ASSERT_TRUE(stream.PollNext(&ctx_).IsDone());
+ ASSERT_FALSE(destroyed);
+ }
+ ASSERT_TRUE(destroyed);
+}
+
+TEST_F(StreamUnittest, FlattenStreams) {
+ std::unique_ptr<MockStreamPollable<int>> a(new MockStreamPollable<int>());
+ EXPECT_CALL(*a, PollNext(_))
+ .WillOnce([](PollContext* ctx) {
+ ctx->RegisterInterested(1);
+ return PendingPollResult();
+ })
+ .WillOnce(Return(StreamPollResult<int>(1)))
+ .WillOnce(Return(DonePollResult()));
+
+ std::unique_ptr<MockStreamPollable<int>> b(new MockStreamPollable<int>());
+ EXPECT_CALL(*b, PollNext(_))
+ .WillOnce([](PollContext* ctx) {
+ ctx->RegisterInterested(2);
+ return PendingPollResult();
+ })
+ .WillOnce([](PollContext* ctx) {
+ ctx->RegisterInterested(2);
+ return PendingPollResult();
+ })
+ .WillOnce(Return(StreamPollResult<int>(2)))
+ .WillOnce(Return(DonePollResult()));
+
+ std::unique_ptr<MockStreamPollable<int>> c(new MockStreamPollable<int>());
+ EXPECT_CALL(*c, PollNext(_))
+ .WillOnce(Return(StreamPollResult<int>(3)))
+ .WillOnce([](PollContext* ctx) {
+ ctx->RegisterInterested(3);
+ ctx->RegisterInterested(4);
+ return PendingPollResult();
+ })
+ .WillOnce(Return(DonePollResult()));
+
+ std::vector<Stream<int>> streams;
+ streams.emplace_back(std::move(a));
+ streams.emplace_back(std::move(b));
+ streams.emplace_back(std::move(c));
+
+ auto stream = base::FlattenStreams(std::move(streams));
+ ASSERT_EQ(stream.PollNext(&ctx_).item(), 3);
+ ASSERT_THAT(interested_, ElementsAre());
+
+ ASSERT_TRUE(stream.PollNext(&ctx_).IsPending());
+ ASSERT_THAT(interested_, ElementsAre(1, 2, 3, 4));
+
+ interested_.clear();
+ ASSERT_TRUE(stream.PollNext(&ctx_).IsPending());
+ ASSERT_THAT(interested_, ElementsAre(1, 2, 3, 4));
+
+ interested_.clear();
+ ready_ = {1};
+ ASSERT_EQ(stream.PollNext(&ctx_).item(), 1);
+ ASSERT_TRUE(stream.PollNext(&ctx_).IsPending());
+ ASSERT_THAT(interested_, ElementsAre(2, 3, 4));
+
+ interested_.clear();
+ ready_ = {};
+ ASSERT_TRUE(stream.PollNext(&ctx_).IsPending());
+ ASSERT_THAT(interested_, ElementsAre(2, 3, 4));
+
+ interested_.clear();
+ ready_ = {1, 2, 3};
+ ASSERT_TRUE(stream.PollNext(&ctx_).IsPending());
+ ASSERT_EQ(stream.PollNext(&ctx_).item(), 2);
+ ASSERT_TRUE(stream.PollNext(&ctx_).IsDone());
+}
+
+} // namespace
+} // namespace base
+} // namespace perfetto