blob: 35de484163c38368e64a962db7fe19efa8912a61 [file] [log] [blame] [edit]
/*
* Copyright (C) 2017 The Android Open Source Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <random>
#include <thread>
#include "perfetto/base/build_config.h"
#include "perfetto/ext/base/event_fd.h"
#include "perfetto/ext/base/file_utils.h"
#include "perfetto/ext/base/pipe.h"
#include "perfetto/ext/base/scoped_file.h"
#include "perfetto/ext/base/unix_task_runner.h"
#include "perfetto/ext/base/utils.h"
#include "perfetto/ext/base/waitable_event.h"
#include "test/gtest_and_gmock.h"
namespace perfetto {
namespace base {
namespace {
// Typed-test fixture: each test body gets a default-constructed instance of
// the task runner implementation under test, reachable as |task_runner|.
template <typename RunnerImpl>
class TaskRunnerTest : public ::testing::Test {
 public:
  RunnerImpl task_runner;
};
struct TaskRunnerTestNames {
template <typename T>
static std::string GetName(int) {
if (std::is_same<T, UnixTaskRunner>::value)
return "UnixTaskRunner";
return testing::internal::GetTypeName<T>();
}
};
// The task runner implementations every TYPED_TEST below is instantiated
// for; currently only UnixTaskRunner.
using TaskRunnerTypes = ::testing::Types<UnixTaskRunner>;
// Registers the typed suite; TaskRunnerTestNames supplies the per-type name.
TYPED_TEST_SUITE(TaskRunnerTest, TaskRunnerTypes, TaskRunnerTestNames);
// Queues a single task that calls Quit() and then enters Run(); the test
// finishes only if Run() returns.
TYPED_TEST(TaskRunnerTest, QuitImmediately) {
  auto& runner = this->task_runner;
  runner.PostTask([&runner] { runner.Quit(); });
  runner.Run();
}
// A second thread waits until the runner has executed its first task and only
// then posts the task that quits the runner.
TYPED_TEST(TaskRunnerTest, OneTaskFromAnotherThread) {
  auto& runner = this->task_runner;
  WaitableEvent runner_is_running;
  std::thread poster([&runner, &runner_is_running] {
    runner_is_running.Wait();
    runner.PostTask([&runner] { runner.Quit(); });
  });
  // Signalled from inside the run loop, i.e. once Run() is processing tasks.
  runner.PostTask([&runner_is_running] { runner_is_running.Notify(); });
  runner.Run();
  poster.join();
}
// Tasks posted up-front ("a".."c") and tasks posted from within a task
// ("d".."f") are expected to run in posting order.
TYPED_TEST(TaskRunnerTest, PostTaskSimple) {
  auto& runner = this->task_runner;
  std::string order;
  // Builds a task that appends |letter| to |order| when it runs.
  auto appender = [&order](const char* letter) {
    return [&order, letter] { order.append(letter); };
  };
  runner.PostTask(appender("a"));
  runner.PostTask(appender("b"));
  runner.PostTask(appender("c"));
  runner.PostTask([&runner, appender] {
    runner.PostTask(appender("d"));
    runner.PostTask(appender("e"));
    runner.PostTask(appender("f"));
    runner.PostTask([&runner] { runner.Quit(); });
  });
  runner.Run();
  EXPECT_EQ(order, "abcdef");
}
// Posts kNumTasks tasks before entering Run() and checks that they execute in
// posting order; the last task quits the runner.
TYPED_TEST(TaskRunnerTest, ManyTasksPostedBeforeRun) {
  constexpr size_t kNumTasks = 10000;
  size_t last_task_id = 0;
  auto task = [&](size_t n) {
    // EXPECT rather than ASSERT: a failing ASSERT returns from the lambda
    // before |last_task_id| is updated, so every later task would fail as
    // well and the final one would never reach Quit(), hanging the test
    // instead of reporting the failure.
    EXPECT_EQ(last_task_id, n - 1);
    last_task_id = n;
    if (n == kNumTasks)
      this->task_runner.Quit();
  };
  for (size_t i = 1; i <= kNumTasks; i++) {
    this->task_runner.PostTask([task, i] { task(i); });
  }
  this->task_runner.Run();
  EXPECT_EQ(last_task_id, kNumTasks);
}
// Four tasks each shift their digit (1..4) into |packed|; the final value
// 0x1234 shows that they ran exactly once each, in posting order.
TYPED_TEST(TaskRunnerTest, PostImmediateTask) {
  int packed = 0;
  for (int digit = 1; digit <= 4; ++digit) {
    this->task_runner.PostTask(
        [&packed, digit] { packed = (packed << 4) | digit; });
  }
  this->task_runner.PostTask([this] { this->task_runner.Quit(); });
  this->task_runner.Run();
  EXPECT_EQ(0x1234, packed);
}
// Delayed tasks are posted out of order and are expected to run sorted by
// their delay once the test clock is advanced past each deadline.
TYPED_TEST(TaskRunnerTest, PostDelayedTask) {
  auto& runner = this->task_runner;
  std::vector<int> run_order;
  // Builds a task that records |id| when it runs.
  auto record = [&run_order](int id) {
    return [&run_order, id] { run_order.push_back(id); };
  };
  // Builds a task that moves the runner's test clock forward by |ms|.
  auto advance_by = [&runner](uint32_t ms) {
    return [&runner, ms] { runner.AdvanceTimeForTesting(ms); };
  };

  runner.PostDelayedTask(
      [&run_order, &runner] {
        run_order.push_back(5);
        runner.Quit();
      },
      100);
  runner.PostDelayedTask(record(2), 20);
  runner.PostDelayedTask(record(3), 20);
  runner.PostDelayedTask(record(4), 80);
  runner.PostDelayedTask(record(1), 10);

  runner.PostTask(advance_by(10));  // Executes task 1.
  runner.PostTask(advance_by(10));  // Executes tasks 2 and 3.
  runner.PostTask(advance_by(60));  // Executes task 4.
  runner.PostTask(advance_by(20));  // Executes task 5.

  runner.Run();
  EXPECT_THAT(run_order, ::testing::ElementsAre(1, 2, 3, 4, 5));
}
// A running task posts a further immediate task, which quits the runner.
TYPED_TEST(TaskRunnerTest, PostImmediateTaskFromTask) {
  auto quit = [this] { this->task_runner.Quit(); };
  this->task_runner.PostTask(
      [this, quit] { this->task_runner.PostTask(quit); });
  this->task_runner.Run();
}
// A running task posts a 10 ms delayed task, which quits the runner.
TYPED_TEST(TaskRunnerTest, PostDelayedTaskFromTask) {
  auto quit = [this] { this->task_runner.Quit(); };
  this->task_runner.PostTask(
      [this, quit] { this->task_runner.PostDelayedTask(quit, 10); });
  this->task_runner.Run();
}
// Tasks posted from a second thread are expected to execute on the thread
// that created |checker| (the one calling Run()) and in posting order.
TYPED_TEST(TaskRunnerTest, PostImmediateTaskFromOtherThread) {
  auto& runner = this->task_runner;
  ThreadChecker checker;
  int packed = 0;
  std::thread poster([&runner, &packed, &checker] {
    runner.PostTask(
        [&checker] { EXPECT_TRUE(checker.CalledOnValidThread()); });
    for (int digit = 1; digit <= 4; ++digit) {
      runner.PostTask([&packed, digit] { packed = (packed << 4) | digit; });
    }
    runner.PostTask([&runner] { runner.Quit(); });
  });
  runner.Run();
  poster.join();
  EXPECT_EQ(0x1234, packed);
}
// A second thread posts the 10 ms delayed task that quits the runner.
TYPED_TEST(TaskRunnerTest, PostDelayedTaskFromOtherThread) {
  auto quit = [this] { this->task_runner.Quit(); };
  std::thread poster(
      [this, quit] { this->task_runner.PostDelayedTask(quit, 10); });
  this->task_runner.Run();
  poster.join();
}
// A watch registered on an EventFd is expected to fire after Notify(); its
// callback quits the runner.
TYPED_TEST(TaskRunnerTest, AddFileDescriptorWatch) {
  EventFd efd;
  auto quit = [this] { this->task_runner.Quit(); };
  this->task_runner.AddFileDescriptorWatch(efd.fd(), quit);
  efd.Notify();
  this->task_runner.Run();
}
// A watch that is removed before Run() must not fire, even though its
// EventFd was already notified.
TYPED_TEST(TaskRunnerTest, RemoveFileDescriptorWatch) {
  auto& runner = this->task_runner;
  bool callback_fired = false;
  EventFd efd;
  efd.Notify();

  runner.AddFileDescriptorWatch(efd.fd(),
                                [&callback_fired] { callback_fired = true; });
  runner.RemoveFileDescriptorWatch(efd.fd());
  // Let the loop run for 10 ms before quitting.
  runner.PostDelayedTask([&runner] { runner.Quit(); }, 10);
  runner.Run();

  EXPECT_FALSE(callback_fired);
}
// The task that removes the watch is posted before the watch is added; the
// watch callback is expected never to run.
TYPED_TEST(TaskRunnerTest, RemoveFileDescriptorWatchFromTask) {
  auto& runner = this->task_runner;
  bool callback_fired = false;
  EventFd efd;
  efd.Notify();

  runner.PostTask([&runner, &efd] { runner.RemoveFileDescriptorWatch(efd.fd()); });
  runner.AddFileDescriptorWatch(efd.fd(),
                                [&callback_fired] { callback_fired = true; });
  // Let the loop run for 10 ms before quitting.
  runner.PostDelayedTask([&runner] { runner.Quit(); }, 10);
  runner.Run();

  EXPECT_FALSE(callback_fired);
}
// The callback of the first watch registers a second watch on an already
// notified EventFd; that second watch quits the runner.
TYPED_TEST(TaskRunnerTest, AddFileDescriptorWatchFromAnotherWatch) {
  auto& runner = this->task_runner;
  EventFd first;
  EventFd second;
  first.Notify();
  second.Notify();

  auto quit = [&runner] { runner.Quit(); };
  runner.AddFileDescriptorWatch(first.fd(), [&runner, &first, &second, quit] {
    first.Clear();
    runner.AddFileDescriptorWatch(second.fd(), quit);
  });
  runner.Run();
}
// The first watch notifies the second EventFd and immediately removes the
// watch on it; the second callback is expected never to run.
TYPED_TEST(TaskRunnerTest, RemoveFileDescriptorWatchFromAnotherWatch) {
  auto& runner = this->task_runner;
  bool second_fired = false;
  EventFd first;
  EventFd second;
  first.Notify();

  runner.AddFileDescriptorWatch(first.fd(), [&runner, &first, &second] {
    first.Clear();
    second.Notify();
    runner.RemoveFileDescriptorWatch(second.fd());
  });
  runner.AddFileDescriptorWatch(second.fd(),
                                [&second_fired] { second_fired = true; });
  // Let the loop run for 10 ms before quitting.
  runner.PostDelayedTask([&runner] { runner.Quit(); }, 10);
  runner.Run();

  EXPECT_FALSE(second_fired);
}
// The first watch replaces the callback registered on the second EventFd:
// only the replacement (which quits the runner) is expected to run, never the
// callback that was registered originally.
TYPED_TEST(TaskRunnerTest, ReplaceFileDescriptorWatchFromAnotherWatch) {
  auto& runner = this->task_runner;
  bool original_fired = false;
  EventFd first;
  EventFd second;
  first.Notify();

  auto quit = [&runner] { runner.Quit(); };
  runner.AddFileDescriptorWatch(first.fd(), [&runner, &first, &second, quit] {
    first.Clear();
    second.Notify();
    runner.RemoveFileDescriptorWatch(second.fd());
    runner.AddFileDescriptorWatch(second.fd(), quit);
  });
  runner.AddFileDescriptorWatch(second.fd(),
                                [&original_fired] { original_fired = true; });
  runner.Run();

  EXPECT_FALSE(original_fired);
}
// The watch is registered from a second thread; its callback quits the
// runner that is running on the test thread.
TYPED_TEST(TaskRunnerTest, AddFileDescriptorWatchFromAnotherThread) {
  EventFd efd;
  efd.Notify();
  std::thread registrar([this, &efd] {
    this->task_runner.AddFileDescriptorWatch(
        efd.fd(), [this] { this->task_runner.Quit(); });
  });
  this->task_runner.Run();
  registrar.join();
}
// The same watch is triggered three times: after each of the first two
// callbacks the EventFd is cleared and re-notified from a posted task; the
// third callback quits the runner.
TYPED_TEST(TaskRunnerTest, FileDescriptorWatchWithMultipleEvents) {
  auto& runner = this->task_runner;
  EventFd efd;
  efd.Notify();
  int events_seen = 0;
  runner.AddFileDescriptorWatch(efd.fd(), [&runner, &efd, &events_seen] {
    ASSERT_LT(events_seen, 3);
    ++events_seen;
    if (events_seen < 3) {
      efd.Clear();
      runner.PostTask([&efd] { efd.Notify(); });
      return;
    }
    runner.Quit();
  });
  runner.Run();
}
// Verifies that posting keeps working when a large number (0x1000) of
// wake-ups have been scheduled.
TYPED_TEST(TaskRunnerTest, PostManyDelayedTasks) {
  auto& runner = this->task_runner;
  for (int remaining = 0x1000; remaining > 0; --remaining) {
    runner.PostDelayedTask([] {}, 0);
  }
  runner.PostDelayedTask([&runner] { runner.Quit(); }, 10);
  runner.Run();
}
// Run() can be entered a second time after a previous Run() was quit.
TYPED_TEST(TaskRunnerTest, RunAgain) {
  auto& runner = this->task_runner;
  int runs = 0;
  auto count_and_quit = [&runner, &runs] {
    runs++;
    runner.Quit();
  };

  runner.PostTask(count_and_quit);
  runner.Run();

  runner.PostTask(count_and_quit);
  runner.Run();

  EXPECT_EQ(2, runs);
}
// A task that re-posts itself to |task_runner| every time it runs, so the
// runner always has an immediate task pending.
template <typename T>
void RepeatingTask(T* task_runner) {
  task_runner->PostTask([task_runner] { RepeatingTask<T>(task_runner); });
}
// With a self-reposting task keeping the immediate queue non-empty, the FD
// watch must still get to run; its callback is the only thing that quits.
TYPED_TEST(TaskRunnerTest, FileDescriptorWatchesNotStarved) {
  auto& runner = this->task_runner;
  EventFd efd;
  efd.Notify();
  runner.PostTask([&runner] { RepeatingTask<TypeParam>(&runner); });
  runner.AddFileDescriptorWatch(efd.fd(), [&runner] { runner.Quit(); });
  runner.Run();
}
// Decrements |*counter|; when it reaches zero the runner is quit, otherwise
// the task re-posts itself with a 1 ms delay.
template <typename T>
void CountdownTask(T* task_runner, int* counter) {
  --(*counter);
  if (*counter != 0) {
    task_runner->PostDelayedTask(
        [task_runner, counter] { CountdownTask<T>(task_runner, counter); }, 1);
    return;
  }
  task_runner->Quit();
}
// The watch callback clears the EventFd the first time it runs; it must not
// be invoked a second time while CountdownTask keeps the loop alive for ten
// 1 ms-delayed iterations.
TYPED_TEST(TaskRunnerTest, NoDuplicateFileDescriptorWatchCallbacks) {
  auto& task_runner = this->task_runner;
  EventFd evt;
  evt.Notify();
  bool watch_called = false;
  int counter = 10;
  task_runner.AddFileDescriptorWatch(evt.fd(), [&evt, &watch_called] {
    // A second invocation would find |watch_called| already set.
    ASSERT_FALSE(watch_called);
    evt.Clear();
    watch_called = true;
  });
  // CountdownTask quits the runner once |counter| reaches zero.
  task_runner.PostTask(
      std::bind(&CountdownTask<TypeParam>, &task_runner, &counter));
  task_runner.Run();
}
// A second thread removes and re-adds the watch while the runner is running.
TYPED_TEST(TaskRunnerTest, ReplaceFileDescriptorWatchFromOtherThread) {
  auto& runner = this->task_runner;
  EventFd efd;
  efd.Notify();
  auto quit = [&runner] { runner.Quit(); };

  // The watch registered here races with the one registered by the thread
  // below. It does not matter which of them wins, as long as one runs.
  runner.AddFileDescriptorWatch(efd.fd(), quit);
  std::thread replacer([&runner, &efd, quit] {
    runner.RemoveFileDescriptorWatch(efd.fd());
    runner.AddFileDescriptorWatch(efd.fd(), quit);
  });
  runner.Run();
  replacer.join();
}
// IsIdleForTesting() is expected to be false while another task is queued and
// true from within the last queued task.
TYPED_TEST(TaskRunnerTest, IsIdleForTesting) {
  auto& runner = this->task_runner;
  // Both tasks are queued before Run(), so while the first one executes the
  // second one is still pending: the runner must not report idle.
  runner.PostTask([&runner] { EXPECT_FALSE(runner.IsIdleForTesting()); });
  // Nothing is queued behind the last task, so the runner must report idle.
  runner.PostTask([&runner] {
    EXPECT_TRUE(runner.IsIdleForTesting());
    runner.Quit();
  });
  runner.Run();
}
// Covers a corner case that TestTaskRunner::RunUntilIdle relies on:
// IsIdleForTesting() is supposed to check for all types of upcoming tasks,
// including FD watches. This is to check that the TaskRunner implementation
// doesn't have off-by-one behaviours where the FD watch is only observed on
// the next task.
// It's debatable whether we need to preserve this behaviour in production
// code, if we assume FDs are unpredictable events and we shouldn't expect
// timing correlations with current tasks.
// Same idea as IsIdleForTesting, but the pending work is a notified FD watch
// instead of a queued task.
TYPED_TEST(TaskRunnerTest, IsIdleForTesting_WithFd) {
  auto& runner = this->task_runner;
  EventFd efd;
  bool watch_fired = false;

  // The eventfd is notified before Run(), so by the time this task executes
  // the runner is expected to report that it is not idle.
  runner.PostTask([&runner] { EXPECT_FALSE(runner.IsIdleForTesting()); });

  runner.AddFileDescriptorWatch(efd.fd(), [&runner, &efd, &watch_fired] {
    efd.Clear();
    watch_fired = true;
    // Posted from the watch callback: the runner is expected to be idle when
    // this task runs.
    runner.PostTask([&runner] {
      EXPECT_TRUE(runner.IsIdleForTesting());
      runner.Quit();
    });
  });
  efd.Notify();
  runner.Run();

  EXPECT_TRUE(watch_fired);
}
// RunsTasksOnCurrentThread() is checked from the test thread for the fixture's
// runner, and from a task on a second thread that owns its own runner.
TYPED_TEST(TaskRunnerTest, RunsTasksOnCurrentThread) {
  auto& primary = this->task_runner;
  EXPECT_TRUE(primary.RunsTasksOnCurrentThread());

  std::thread worker([&primary] {
    TypeParam secondary;
    secondary.PostTask([&primary, &secondary] {
      EXPECT_FALSE(primary.RunsTasksOnCurrentThread());
      EXPECT_TRUE(secondary.RunsTasksOnCurrentThread());
      secondary.Quit();
    });
    secondary.Run();
  });

  primary.PostTask([&primary] { primary.Quit(); });
  primary.Run();
  worker.join();
}
// Five EventFds stay notified the whole time; the test counts how often each
// watch runs until the first one reaches kNumTasksPerHandle and quits.
TYPED_TEST(TaskRunnerTest, FileDescriptorWatchFairness) {
  static constexpr int kNumTasksPerHandle = 100;
  auto& runner = this->task_runner;
  EventFd events[5];
  std::map<PlatformHandle, int /*num_tasks*/> runs_per_handle;

  for (auto& event : events) {
    event.Notify();
    runner.AddFileDescriptorWatch(
        event.fd(), [&runner, &runs_per_handle, &event] {
          int& runs = runs_per_handle[event.fd()];
          ++runs;
          if (runs != kNumTasksPerHandle)
            return;
          event.Clear();
          runner.Quit();
        });
  }
  runner.Run();

  // The sequence of watches (one per entry of |events|) should be repeated N
  // times. On the Nth time the task runner quits. All tasks should have been
  // running at least N-1 times (we can't predict which one of the tasks will
  // quit).
  for (auto& event : events) {
    const int runs = runs_per_handle[event.fd()];
    ASSERT_GE(runs, kNumTasksPerHandle - 1);
    ASSERT_LE(runs, kNumTasksPerHandle);
  }
}
#if !PERFETTO_BUILDFLAG(PERFETTO_OS_WIN)
// UNIX-specific: a watch on the read end of a pipe whose write end has been
// closed is expected to fire.
TYPED_TEST(TaskRunnerTest, FileDescriptorClosedEvent) {
  auto& runner = this->task_runner;
  Pipe closed_pipe = Pipe::Create();
  // Close the write end before the read end is watched.
  closed_pipe.wr.reset();
  runner.AddFileDescriptorWatch(closed_pipe.rd.get(),
                                [&runner] { runner.Quit(); });
  runner.Run();
}
#endif
// Several producer threads post numbered tasks in random-sized bursts. Tasks
// coming from one producer must be executed in the order that producer posted
// them; the thread that posts the overall last task also posts the Quit().
TYPED_TEST(TaskRunnerTest, MultiThreadedStress) {
  constexpr size_t kNumThreads = 4;
  constexpr size_t kNumTasksPerThread = 1000;
  constexpr size_t kTotalTasks = kNumThreads * kNumTasksPerThread;

  auto& runner = this->task_runner;
  std::atomic<size_t> total_posted{};
  // Per producer: sequence number of the next task expected to run.
  std::array<size_t, kNumThreads> next_expected{};

  auto on_task = [&next_expected](size_t producer, size_t seq) {
    ASSERT_EQ(next_expected[producer], seq);
    ++next_expected[producer];
  };

  auto producer_main = [&](size_t producer) {
    std::minstd_rand0 rng{};
    size_t seq = 0;
    for (;;) {
      // Post a burst of 1..32 tasks, then give other threads a chance to run.
      int burst = std::uniform_int_distribution<int>(1, 32)(rng);
      for (int i = 0; i < burst; ++i) {
        runner.PostTask([on_task, producer, seq] { on_task(producer, seq); });
        const size_t posted_before =
            total_posted.fetch_add(1, std::memory_order_relaxed);
        if (posted_before == kTotalTasks - 1) {
          runner.PostTask([&runner] { runner.Quit(); });
        }
        if (++seq >= kNumTasksPerThread) {
          return;
        }
      }
      std::this_thread::yield();
    }
  };

  std::array<std::thread, kNumThreads> producers{};
  for (size_t i = 0; i < kNumThreads; ++i) {
    producers[i] = std::thread(producer_main, i);
  }
  runner.Run();
  for (auto& producer : producers) {
    producer.join();
  }
  EXPECT_EQ(total_posted.load(), kTotalTasks);
}
} // namespace
} // namespace base
} // namespace perfetto