blob: 34dccdb609da51f72c2af3c7e518a232522599d0 [file] [log] [blame]
/*
* Copyright (C) 2023 The Android Open Source Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef INCLUDE_PERFETTO_EXT_BASE_THREADING_UTIL_H_
#define INCLUDE_PERFETTO_EXT_BASE_THREADING_UTIL_H_
#include <functional>
#include <memory>
#include <type_traits>
#include <utility>
#include <vector>
#include "perfetto/base/status.h"
#include "perfetto/base/task_runner.h"
#include "perfetto/ext/base/optional.h"
#include "perfetto/ext/base/threading/channel.h"
#include "perfetto/ext/base/threading/future.h"
#include "perfetto/ext/base/threading/poll.h"
#include "perfetto/ext/base/threading/stream.h"
#include "perfetto/ext/base/threading/thread_pool.h"
#include "perfetto/ext/base/unix_task_runner.h"
namespace perfetto {
namespace base {
// Blocks the calling thread until |fd| is considered "readable". In Linux,
// this corresponds to |POLLOUT| or |POLLHUP| being returned if |fd| is polled.
inline void BlockUntilReadableFd(base::PlatformHandle fd) {
base::UnixTaskRunner runner;
runner.AddFileDescriptorWatch(fd, [&runner]() { runner.Quit(); });
runner.Run();
}
// Creates a Stream<T> which returns all the data from |channel| and completes
// when |channel| is closed.
//
// Note: the caller retains ownership of the passed channel and must ensure that
// the channel outlives the lifetime of the returned stream.
template <typename T>
Stream<T> ReadChannelStream(Channel<T>* channel) {
class ReadImpl : public StreamPollable<T> {
public:
explicit ReadImpl(Channel<T>* reader) : reader_(reader) {}
StreamPollResult<T> PollNext(PollContext* ctx) override {
auto result = reader_->ReadNonBlocking();
if (!result.item.has_value()) {
if (result.is_closed) {
return DonePollResult();
}
ctx->RegisterInterested(reader_->read_fd());
return PendingPollResult();
}
return std::move(*result.item);
}
private:
Channel<T>* reader_ = nullptr;
};
return MakeStream<ReadImpl>(channel);
}
// Creates a Future<FVoid> which handles writing |item| into |channel|. The
// Future is completed when the item is succesfully written.
//
// Note: the caller retains ownership of the passed channel and must ensure that
// the channel outlives the lifetime of the returned future.
template <typename T>
Future<FVoid> WriteChannelFuture(Channel<T>* channel, T item) {
class WriteImpl : public FuturePollable<FVoid> {
public:
WriteImpl(Channel<T>* writer, T to_write)
: writer_(writer), to_write_(std::move(to_write)) {}
FuturePollResult<FVoid> Poll(PollContext* ctx) override {
auto res = writer_->WriteNonBlocking(std::move(to_write_));
PERFETTO_CHECK(!res.is_closed);
if (!res.success) {
ctx->RegisterInterested(writer_->write_fd());
return PendingPollResult();
}
return FVoid();
}
private:
Channel<T>* writer_ = nullptr;
T to_write_;
};
return MakeFuture<WriteImpl>(channel, std::move(item));
}
// Creates a Stream<T> which yields the result of executing |fn| on |pool|
// repeatedly. The returned stream only completes when |fn| returns
// base::nullopt.
//
// The intended usage of this function is to schedule CPU intensive work on a
// background thread pool and receive regular "updates" on the progress by:
// a) breaking the work into chunks
// b) returning some indication of progress/partial results through |T|.
template <typename T>
Stream<T> RunOnThreadPool(ThreadPool* pool,
std::function<base::Optional<T>()> fn) {
class RunOnPoolImpl : public StreamPollable<T> {
public:
explicit RunOnPoolImpl(ThreadPool* pool,
std::function<base::Optional<T>()> fn)
: pool_(pool),
fn_(std::make_shared<std::function<base::Optional<T>()>>(
std::move(fn))),
channel_(new Channel<T>(1)),
channel_stream_(ReadChannelStream(channel_.get())) {
RunFn();
}
StreamPollResult<T> PollNext(PollContext* ctx) override {
ASSIGN_OR_RETURN_IF_PENDING_STREAM(res, channel_stream_.PollNext(ctx));
if (res.IsDone()) {
return DonePollResult();
}
RunFn();
return res;
}
private:
void RunFn() {
pool_->PostTask([channel = channel_, fn = fn_]() {
auto opt_value = (*fn)();
if (!opt_value) {
channel->Close();
return;
}
auto write_res =
channel->WriteNonBlocking(std::move(opt_value.value()));
PERFETTO_CHECK(write_res.success);
PERFETTO_CHECK(!write_res.is_closed);
});
}
ThreadPool* pool_ = nullptr;
std::shared_ptr<std::function<base::Optional<T>()>> fn_;
std::shared_ptr<Channel<T>> channel_;
base::Stream<T> channel_stream_;
};
return MakeStream<RunOnPoolImpl>(pool, std::move(fn));
}
// Creates a Future<T> which yields the result of executing |fn| on |pool|. The
// returned completes with the return value of |fn|.
//
// The intended usage of this function is to schedule CPU intensive work on a
// background thread pool and have the result returned when available.
template <typename T>
Future<T> RunOnceOnThreadPool(ThreadPool* pool, std::function<T()> fn) {
return RunOnThreadPool<T>(
pool,
[done = false, fn = std::move(fn)]() mutable -> base::Optional<T> {
if (done) {
return base::nullopt;
}
done = true;
return fn();
})
.Collect(base::ToFutureCheckedCollector<T>());
}
} // namespace base
} // namespace perfetto
#endif // INCLUDE_PERFETTO_EXT_BASE_THREADING_UTIL_H_