/*
* Copyright (C) 2025 The Android Open Source Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "perfetto/ext/base/lock_free_task_runner.h"
#include "perfetto/base/build_config.h"
#if PERFETTO_BUILDFLAG(PERFETTO_OS_WIN)
#include <windows.h>
// Keep the \n before to prevent clang-format reordering.
#include <synchapi.h>
#else
#include <poll.h>
#include <unistd.h>
#endif
#include <thread>
#include "perfetto/base/logging.h"
#include "perfetto/base/time.h"
#include "perfetto/ext/base/bits.h"
#include "perfetto/ext/base/platform.h"
#include "perfetto/ext/base/utils.h"
#include "perfetto/ext/base/watchdog.h"
namespace perfetto {
namespace base {
namespace task_runner_internal {
Slab::Slab() = default;
Slab::~Slab() {
PERFETTO_DCHECK(!prev); // We should never delete linked slabs.
}
} // namespace task_runner_internal
namespace {
static constexpr auto kSlabSize = task_runner_internal::kSlabSize;
}  // namespace
LockFreeTaskRunner::LockFreeTaskRunner()
: run_task_thread_id_(std::this_thread::get_id()) {
static_assert((kSlabSize & (kSlabSize - 1)) == 0, "kSlabSize must be a pow2");
static_assert(kSlabSize >= Slab::kBitsPerWord);
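// Together these imply that kSlabSize is an exact multiple of kBitsPerWord,
// so the slot -> (word, bit) decomposition in PostTask() never straddles a
// partial bitmap word.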
// Populate both the tail and the freelist, to minimize the chance of
// allocations at runtime.
tail_.store(AllocNewSlab());
free_slab_.store(AllocNewSlab());
AddFileDescriptorWatch(wakeup_event_.fd(), [] {
// Not reached -- see PostFileDescriptorWatches().
PERFETTO_DFATAL("Should be unreachable.");
});
}
LockFreeTaskRunner::~LockFreeTaskRunner() {
PERFETTO_DCHECK(RunsTasksOnCurrentThread());
for (Slab* slab = tail_.exchange(nullptr); slab;) {
Slab* prev = slab->prev;
slab->prev = nullptr;
delete slab;
slab = prev;
}
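// Also release the spare slab cached for reuse, if any.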
delete free_slab_.exchange(nullptr);
}
void LockFreeTaskRunner::PostTask(std::function<void()> closure) {
// We use nullity of std::function in PopTaskRecursive() to determine exit
// criteria. Posting a null task would break that logic. Also a null task
// would cause an abort when trying to run it later on.
PERFETTO_CHECK(PERFETTO_LIKELY(closure));
using BitWord = Slab::BitWord;
for (;;) {
Slab* slab = tail_.load();
PERFETTO_DCHECK(slab); // The tail_ must be always valid.
ScopedRefcount scoped_refcount(this, slab);
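// The refcount advertises to the main thread that we may be holding a
// pointer to |slab|, so that PopTaskRecursive() won't delete it from under
// us. See the deletion comment in PopTaskRecursive().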
// Now that we have a slab, try appending a task to it (if there is space).
// We have 3 cases:
// 1. slot < kSlabSize: the nominal case. Append the task and return.
// 2. slot == kSlabSize: the common overflow case: the slab was full and we
//    tried to allocate the (N+1)-th element. We have to allocate a new Slab.
// 3. slot > kSlabSize: like 2, but two (or more) threads raced on it.
//    One thread will win the race and alloc a new slab, the other ones will
//    repeat.
size_t slot = slab->next_task_slot.fetch_add(1, std::memory_order_relaxed);
if (slot >= kSlabSize) { // Cases 2,3
Slab* new_slab = AllocNewSlab();
new_slab->prev = slab;
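// Reserve slot 0 of the new slab for our task; other threads will append
// from slot 1 onwards.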
new_slab->next_task_slot.store(1, std::memory_order_relaxed);
slot = 0;
if (PERFETTO_UNLIKELY(!tail_.compare_exchange_strong(slab, new_slab))) {
// If the cmpxchg fails, another thread tried to allocate a new tail
// slab and won the race. Do another round; we'll observe the new slab.
// We have to clear the prev pointer first: we just found out that
// another thread still owns that slab, and DeleteSlab() requires an
// unlinked slab.
new_slab->prev = nullptr;
DeleteSlab(new_slab);
continue;
}
slab = new_slab;
scoped_refcount = ScopedRefcount(this, new_slab);
}
// Nominal case: publish the task and return.
PERFETTO_DCHECK(!slab->tasks[slot]);
slab->tasks[slot] = std::move(closure);
size_t s_word = slot / Slab::kBitsPerWord;
size_t s_bit = slot % Slab::kBitsPerWord;
BitWord s_mask = BitWord(1) << s_bit;
PERFETTO_DCHECK(
(slab->tasks_written[s_word].load(std::memory_order_relaxed) &
s_mask) == 0);
slab->tasks_written[s_word].fetch_or(s_mask, std::memory_order_release);
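// The release above pairs with the acquire-load of tasks_written[] in
// PopTaskRecursive(), guaranteeing that the main thread sees the
// std::function write before it sees the published bit.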
if (!RunsTasksOnCurrentThread()) {
// We don't need any clever logic to avoid spurious wake ups from other
// threads. Most PostTask()s in our codebase are done by the main thread.
// In the rare cases of a PostTask() coming from another thread, the odds
// of the main thread being woken up at the same time are tiny.
WakeUp();
}
return;
}
}
void LockFreeTaskRunner::Run() {
PERFETTO_CHECK(run_task_thread_id_ == std::this_thread::get_id());
quit_ = false;
while (!quit_) {
// Pop one immediate and one delayed task, for fairness.
std::function<void()> immediate_task = PopNextImmediateTask();
std::function<void()> delayed_task = PopNextExpiredDelayedTask();
bool has_task = immediate_task || delayed_task;
int poll_timeout_ms = has_task ? 0 : GetDelayMsToNextTask();
// Run the poll(). We need it for two different reasons:
// 1. It blocks until the next event on the horizon:
//    - If we pulled a task, poll in non-blocking mode (0 timeout). There
//      might be other tasks ready and we want to run them soon after.
//    - If there is a delayed task, wait for the time remaining until it
//      expires.
//    - Otherwise poll indefinitely, waiting for a PostTask() or a Quit()
//      call from another thread.
// 2. Regardless of timing, we need to read the FD watches.
//    We want to do this even if we already know that we have an immediate
//    task (when poll_timeout_ms == 0) to ensure fairness.
// TODO(primiano): we could optimize this and avoid a syscall for each
// task when we have bursts of tasks (with some saturation, e.g. still do
// a poll() every 10 tasks). Unfortunately, however, some users of
// TestTaskRunner.RunUntilIdle() rely on the fact that FD watches are
// polled before the current task is run, so that the task can tell
// whether there are more tasks upcoming or it has reached quiescence.
// TL;DR: RunUntilIdle() was a mistake, as it has ill-defined semantics,
// but by now tests rely on those subtle semantics.
// Recompute the list of FDs to watch.
UpdateWatchTasks();
uint64_t windows_wait_res = 0;
#if PERFETTO_BUILDFLAG(PERFETTO_OS_WIN)
// Unlike poll(2), WaitForMultipleObjects() returns only *one* handle in the
// set, even when more than one is signalled. In order to avoid starvation,
// PostFileDescriptorWatches() will WaitForSingleObject() on each of the
// other handles to ensure fairness. |windows_wait_res| is passed just to
// avoid an extra WaitForSingleObject() for the one handle that
// WaitForMultipleObjects() returned.
DWORD timeout =
poll_timeout_ms >= 0 ? static_cast<DWORD>(poll_timeout_ms) : INFINITE;
windows_wait_res =
WaitForMultipleObjects(static_cast<DWORD>(poll_fds_.size()),
&poll_fds_[0], /*bWaitAll=*/false, timeout);
#else
platform::BeforeMaybeBlockingSyscall();
int ret = PERFETTO_EINTR(poll(
&poll_fds_[0], static_cast<nfds_t>(poll_fds_.size()), poll_timeout_ms));
platform::AfterMaybeBlockingSyscall();
PERFETTO_CHECK(ret >= 0);
#endif
PostFileDescriptorWatches(windows_wait_res);
if (immediate_task) {
errno = 0;
RunTaskWithWatchdogGuard(std::move(immediate_task));
}
if (delayed_task) {
errno = 0;
RunTaskWithWatchdogGuard(std::move(delayed_task));
}
}
// Wait for all other threads to have finished the Quit's PostTask().
// This is to prevent the following race in tests:
// - Thread1 (!= main thread) invokes Quit, which in turn becomes a PostTask.
// - This function sees quit_=true and returns from Run().
// - The owner of the LFTR at that point is entitled to destroy the LFTR.
// - Thread1 is still executing the epilogue of the PostTask, decrementing the
// refcount, and ends up operating on invalid memory.
while (
std::any_of(refcounts_.begin(), refcounts_.end(),
[](std::atomic<int32_t>& bucket) { return bucket.load(); })) {
std::this_thread::yield();
}
}
std::function<void()> LockFreeTaskRunner::PopNextImmediateTask() {
return PopTaskRecursive(tail_.load(), nullptr);
}
std::function<void()> LockFreeTaskRunner::PopTaskRecursive(Slab* slab,
Slab* next_slab) {
PERFETTO_DCHECK(RunsTasksOnCurrentThread());
Slab* prev = slab->prev;
if (PERFETTO_UNLIKELY(prev)) {
// In practice it's extremely unlikely that a slab has >1 predecessor.
// In nominal conditions it is going to have 0 predecessors most of the
// time and 1 predecessor once every kSlabSize times.
auto task = PopTaskRecursive(prev, slab);
if (task)
return task;
}
size_t words_fully_consumed = 0;
for (size_t w = 0; w < Slab::kNumWords; ++w) {
using BitWord = Slab::BitWord;
BitWord wr_word = slab->tasks_written[w].load(std::memory_order_acquire);
BitWord rd_word = slab->tasks_read[w];
words_fully_consumed += base::AllBitsSet(rd_word) ? 1 : 0;
BitWord unread_word = wr_word & ~rd_word;
if (unread_word == 0)
continue;
// Find the first unread task in the word.
uint32_t bit = base::CountTrailZeros(unread_word);
BitWord bit_mask = BitWord(1) << bit;
size_t slot = w * Slab::kBitsPerWord + bit;
std::function<void()> task = std::move(slab->tasks[slot]);
slab->tasks[slot] = nullptr;
slab->tasks_read[w] |= bit_mask;
return task;
} // for(word in tasks_written)
// There are no unconsumed tasks in this Slab. Having reached this point,
// this invocation will return null. However, before doing so, if the slab
// is fully written (no slots left) and fully consumed, delete it.
// We never delete the latest slab (the one pointed to by tail_) because
// we want the tail_ to always point to a valid slab.
// In principle, we only delete non-tail slabs, and PostTask() threads only
// access the tail, never walk back on the list, which sounds safe. However
// there is a potential race we have to guard against:
// A PostTask() thread could observe Slab 1 (e.g., the only slab), then
// another PostTask thread() could append (and replace tail_ with) a new
// Slab 2 (because Slab 1 was full).
// If Run() comes along and sees that Slab 1 is full, it will delete it,
// without realizing that the first thread is still accessing it, thinking
// it was the tail.
// Essentially, it is safe to delete non-tail slabs as long as we can
// guarantee that no other thread observed that slab through the `tail_` at
// some point in the recent past. In order to do so, we use a refcount
// mechanism. If the refcount is 0 we know that no other PostTask thread
// has loaded the tail, and hence we can safely delete any non-tail slab.
// If a PostTask thread comes immediately after our check they will observe
// the new tail (we never delete the tail slab, only the predecessors).
bool slab_fully_consumed = words_fully_consumed == Slab::kNumWords;
const uint32_t bucket = HashSlabPtr(slab);
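// refcounts_ is a small bucket array indexed by a hash of the slab pointer.
// A hash collision can keep a bucket non-zero and defer the deletion to a
// later Run() iteration, which is safe (just conservative).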
if (slab_fully_consumed && next_slab && refcounts_[bucket].load() == 0) {
// NOTE: only the main thread follows the `prev` linked list, writers never
// look at `prev`. The only contention entrypoint is the `tail_` pointer,
// which can be modified both by us and by writers.
PERFETTO_DCHECK(next_slab->prev == slab);
next_slab->prev = slab->prev;
slab->prev = nullptr;
DeleteSlab(slab);
}
return nullptr;
}
void LockFreeTaskRunner::Quit() {
// We have to post the write of quit_ if we are not on the main thread.
// The alternative would be writing quit_ + WakeUp(), but that is racy:
// - We write quit_=true from another thread
// - The main thread sees it immediately (it just happens to be iterating)
// and destroys itself.
// - We try to call WakeUp() while the main thread is being destroyed.
if (!RunsTasksOnCurrentThread()) {
PostTask([this] { this->Quit(); });
return;
}
quit_ = true;
}
bool LockFreeTaskRunner::IsIdleForTesting() {
PERFETTO_DCHECK(RunsTasksOnCurrentThread());
for (Slab* slab = tail_; slab; slab = slab->prev) {
for (size_t i = 0; i < Slab::kNumWords; ++i) {
if (slab->tasks_written[i] & ~slab->tasks_read[i]) {
return false;
}
}
}
return true;
}
std::function<void()> LockFreeTaskRunner::PopNextExpiredDelayedTask() {
PERFETTO_DCHECK(RunsTasksOnCurrentThread());
TimeMillis now =
GetWallTimeMs() +
TimeMillis(advanced_time_for_testing_.load(std::memory_order_relaxed));
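// |delayed_tasks_| is ordered so that back() holds the next task to expire
// (see also GetDelayMsToNextTask()).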
if (!delayed_tasks_.empty() && delayed_tasks_.back().time <= now) {
auto task = std::move(delayed_tasks_.back().task);
delayed_tasks_.pop_back();
return task;
}
return nullptr;
}
int LockFreeTaskRunner::GetDelayMsToNextTask() const {
PERFETTO_DCHECK(RunsTasksOnCurrentThread());
if (delayed_tasks_.empty()) {
return -1; // Forever.
}
TimeMillis now =
GetWallTimeMs() +
TimeMillis(advanced_time_for_testing_.load(std::memory_order_relaxed));
TimeMillis deadline = delayed_tasks_.back().time;
if (deadline <= now) {
return 0;
}
return static_cast<int>((deadline - now).count());
}
LockFreeTaskRunner::Slab* LockFreeTaskRunner::AllocNewSlab() {
Slab* free_slab = free_slab_.exchange(nullptr);
if (free_slab) {
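// Recycle the cached slab: destroy and re-construct it in place so that
// its slots, bitmaps and counters all start from a pristine state.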
free_slab->~Slab();
new (free_slab) Slab();
return free_slab;
}
slabs_allocated_.fetch_add(1, std::memory_order_relaxed);
return new Slab();
}
void LockFreeTaskRunner::DeleteSlab(Slab* slab) {
PERFETTO_DCHECK(!slab->prev); // We should never delete a linked slab.
Slab* null_slab = nullptr;
if (!free_slab_.compare_exchange_strong(null_slab, slab)) {
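// The single-entry freelist is already occupied: give the slab back to
// the heap.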
slabs_freed_.fetch_add(1, std::memory_order_relaxed);
delete slab;
}
}
void LockFreeTaskRunner::PostDelayedTask(std::function<void()> task,
uint32_t delay_ms) {
if (!RunsTasksOnCurrentThread()) {
PostTask([this, task = std::move(task), delay_ms] {
this->PostDelayedTask(std::move(task), delay_ms);
});
return;
}
TimeMillis runtime =
GetWallTimeMs() + TimeMillis(delay_ms) +
TimeMillis(advanced_time_for_testing_.load(std::memory_order_relaxed));
delayed_tasks_.insert(
DelayedTask{runtime, next_delayed_task_seq_++, std::move(task)});
}
void LockFreeTaskRunner::AdvanceTimeForTesting(uint32_t ms) {
advanced_time_for_testing_.fetch_add(ms);
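// Wake up the Run() loop so it re-evaluates the deadlines of delayed
// tasks against the advanced clock.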
WakeUp();
}
void LockFreeTaskRunner::PostFileDescriptorWatches(
uint64_t windows_wait_result) {
PERFETTO_DCHECK(RunsTasksOnCurrentThread());
for (size_t i = 0; i < poll_fds_.size(); i++) {
#if PERFETTO_BUILDFLAG(PERFETTO_OS_WIN)
const PlatformHandle handle = poll_fds_[i];
// |windows_wait_result| is the result of the WaitForMultipleObjects() call.
// If one of the objects was signalled, it will have a value in the range
// [0, poll_fds_.size()).
if (i != windows_wait_result &&
WaitForSingleObject(handle, 0) != WAIT_OBJECT_0) {
continue;
}
#else
base::ignore_result(windows_wait_result);
const PlatformHandle handle = poll_fds_[i].fd;
if (!(poll_fds_[i].revents & (POLLIN | POLLHUP)))
continue;
poll_fds_[i].revents = 0;
#endif
// The wake-up event is handled inline to avoid an infinite recursion of
// posted tasks.
if (handle == wakeup_event_.fd()) {
wakeup_event_.Clear();
continue;
}
// Binding to |this| is safe since we are the only object executing the
// task.
PostTask([this, handle] { this->RunFileDescriptorWatch(handle); });
// Flag the task as pending.
#if PERFETTO_BUILDFLAG(PERFETTO_OS_WIN)
// On Windows this is done by marking the WatchTask entry as pending. This
// is more expensive than on Linux, as it requires rebuilding the
// |poll_fds_| vector on each call. There doesn't seem to be a good
// alternative though.
auto it = watch_tasks_.find(handle);
PERFETTO_CHECK(it != watch_tasks_.end());
PERFETTO_DCHECK(!it->second.pending);
it->second.pending = true;
#else
// On UNIX systems instead, we just make the fd negative while its task is
// pending. This makes poll(2) ignore the fd.
PERFETTO_DCHECK(poll_fds_[i].fd >= 0);
poll_fds_[i].fd = -poll_fds_[i].fd;
#endif
}
}
void LockFreeTaskRunner::RunFileDescriptorWatch(PlatformHandle fd) {
PERFETTO_DCHECK(RunsTasksOnCurrentThread());
std::function<void()> task;
auto it = watch_tasks_.find(fd);
if (it == watch_tasks_.end())
return;
WatchTask& watch_task = it->second;
// Make poll(2) pay attention to the fd again. Since another thread may have
// updated this watch we need to refresh the set first.
UpdateWatchTasks();
#if PERFETTO_BUILDFLAG(PERFETTO_OS_WIN)
// On Windows we manually track the presence of outstanding tasks for the
// watch. The UpdateWatchTasks() call in the Run() loop will re-add the
// handle to the |poll_fds_| vector.
PERFETTO_DCHECK(watch_task.pending);
watch_task.pending = false;
#else
size_t fd_index = watch_task.poll_fd_index;
PERFETTO_DCHECK(fd_index < poll_fds_.size());
PERFETTO_DCHECK(::abs(poll_fds_[fd_index].fd) == fd);
poll_fds_[fd_index].fd = fd;
#endif
task = watch_task.callback;
errno = 0;
RunTaskWithWatchdogGuard(task);
}
void LockFreeTaskRunner::UpdateWatchTasks() {
PERFETTO_DCHECK(RunsTasksOnCurrentThread());
#if !PERFETTO_BUILDFLAG(PERFETTO_OS_WIN)
if (!watch_tasks_changed_)
return;
watch_tasks_changed_ = false;
#endif
poll_fds_.clear();
for (auto& it : watch_tasks_) {
PlatformHandle handle = it.first;
WatchTask& watch_task = it.second;
#if PERFETTO_BUILDFLAG(PERFETTO_OS_WIN)
if (!watch_task.pending)
poll_fds_.push_back(handle);
#else
watch_task.poll_fd_index = poll_fds_.size();
poll_fds_.push_back({handle, POLLIN | POLLHUP, 0});
#endif
}
}
void LockFreeTaskRunner::AddFileDescriptorWatch(PlatformHandle fd,
std::function<void()> task) {
PERFETTO_DCHECK(PlatformHandleChecker::IsValid(fd));
if (!RunsTasksOnCurrentThread()) {
PostTask([this, task = std::move(task), fd] {
this->AddFileDescriptorWatch(fd, std::move(task));
});
return;
}
PERFETTO_DCHECK(!watch_tasks_.count(fd));
WatchTask& watch_task = watch_tasks_[fd];
watch_task.callback = std::move(task);
#if PERFETTO_BUILDFLAG(PERFETTO_OS_WIN)
watch_task.pending = false;
#else
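// The real index is assigned by UpdateWatchTasks() on the next Run()
// iteration; until then SIZE_MAX marks the entry as not yet polled.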
watch_task.poll_fd_index = SIZE_MAX;
#endif
watch_tasks_changed_ = true;
}
void LockFreeTaskRunner::RemoveFileDescriptorWatch(PlatformHandle fd) {
if (!RunsTasksOnCurrentThread()) {
PostTask([this, fd] { this->RemoveFileDescriptorWatch(fd); });
return;
}
PERFETTO_DCHECK(watch_tasks_.count(fd));
watch_tasks_.erase(fd);
watch_tasks_changed_ = true;
}
bool LockFreeTaskRunner::RunsTasksOnCurrentThread() const {
return run_task_thread_id_ == std::this_thread::get_id();
}
} // namespace base
} // namespace perfetto