blob: ace1985c24a156a6b4c824a69d9c119af1041164 [file] [log] [blame]
// Copyright 2013 The Flutter Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "flutter/fml/concurrent_message_loop.h"
#include <algorithm>
#include "flutter/fml/thread.h"
#include "flutter/fml/trace_event.h"
namespace fml {
ConcurrentMessageLoop::ConcurrentMessageLoop(size_t worker_count)
: worker_count_(std::max<size_t>(worker_count, 1ul)) {
for (size_t i = 0; i < worker_count_; ++i) {
workers_.emplace_back([i, this]() {
fml::Thread::SetCurrentThreadName(fml::Thread::ThreadConfig(
std::string{"io.worker." + std::to_string(i + 1)}));
WorkerMain();
});
}
for (const auto& worker : workers_) {
worker_thread_ids_.emplace_back(worker.get_id());
}
}
ConcurrentMessageLoop::~ConcurrentMessageLoop() {
Terminate();
for (auto& worker : workers_) {
FML_DCHECK(worker.joinable());
worker.join();
}
}
size_t ConcurrentMessageLoop::GetWorkerCount() const {
return worker_count_;
}
std::shared_ptr<ConcurrentTaskRunner> ConcurrentMessageLoop::GetTaskRunner() {
return std::make_shared<ConcurrentTaskRunner>(weak_from_this());
}
void ConcurrentMessageLoop::PostTask(const fml::closure& task) {
if (!task) {
return;
}
std::unique_lock lock(tasks_mutex_);
// Don't just drop tasks on the floor in case of shutdown.
if (shutdown_) {
FML_DLOG(WARNING)
<< "Tried to post a task to shutdown concurrent message "
"loop. The task will be executed on the callers thread.";
lock.unlock();
ExecuteTask(task);
return;
}
tasks_.push(task);
// Unlock the mutex before notifying the condition variable because that mutex
// has to be acquired on the other thread anyway. Waiting in this scope till
// it is acquired there is a pessimization.
lock.unlock();
tasks_condition_.notify_one();
}
void ConcurrentMessageLoop::WorkerMain() {
while (true) {
std::unique_lock lock(tasks_mutex_);
tasks_condition_.wait(lock, [&]() {
return !tasks_.empty() || shutdown_ || HasThreadTasksLocked();
});
// Shutdown cannot be read with the task mutex unlocked.
bool shutdown_now = shutdown_;
fml::closure task;
std::vector<fml::closure> thread_tasks;
if (!tasks_.empty()) {
task = tasks_.front();
tasks_.pop();
}
if (HasThreadTasksLocked()) {
thread_tasks = GetThreadTasksLocked();
FML_DCHECK(!HasThreadTasksLocked());
}
// Don't hold onto the mutex while tasks are being executed as they could
// themselves try to post more tasks to the message loop.
lock.unlock();
TRACE_EVENT0("flutter", "ConcurrentWorkerWake");
// Execute the primary task we woke up for.
if (task) {
ExecuteTask(task);
}
// Execute any thread tasks.
for (const auto& thread_task : thread_tasks) {
ExecuteTask(thread_task);
}
if (shutdown_now) {
break;
}
}
}
void ConcurrentMessageLoop::ExecuteTask(const fml::closure& task) {
task();
}
void ConcurrentMessageLoop::Terminate() {
std::scoped_lock lock(tasks_mutex_);
shutdown_ = true;
tasks_condition_.notify_all();
}
void ConcurrentMessageLoop::PostTaskToAllWorkers(const fml::closure& task) {
if (!task) {
return;
}
std::scoped_lock lock(tasks_mutex_);
for (const auto& worker_thread_id : worker_thread_ids_) {
thread_tasks_[worker_thread_id].emplace_back(task);
}
tasks_condition_.notify_all();
}
bool ConcurrentMessageLoop::HasThreadTasksLocked() const {
return thread_tasks_.count(std::this_thread::get_id()) > 0;
}
std::vector<fml::closure> ConcurrentMessageLoop::GetThreadTasksLocked() {
auto found = thread_tasks_.find(std::this_thread::get_id());
FML_DCHECK(found != thread_tasks_.end());
std::vector<fml::closure> pending_tasks;
std::swap(pending_tasks, found->second);
thread_tasks_.erase(found);
return pending_tasks;
}
ConcurrentTaskRunner::ConcurrentTaskRunner(
std::weak_ptr<ConcurrentMessageLoop> weak_loop)
: weak_loop_(std::move(weak_loop)) {}
ConcurrentTaskRunner::~ConcurrentTaskRunner() = default;
void ConcurrentTaskRunner::PostTask(const fml::closure& task) {
if (!task) {
return;
}
if (auto loop = weak_loop_.lock()) {
loop->PostTask(task);
return;
}
FML_DLOG(WARNING)
<< "Tried to post to a concurrent message loop that has already died. "
"Executing the task on the callers thread.";
task();
}
bool ConcurrentMessageLoop::RunsTasksOnCurrentThread() {
std::scoped_lock lock(tasks_mutex_);
for (const auto& worker_thread_id : worker_thread_ids_) {
if (worker_thread_id == std::this_thread::get_id()) {
return true;
}
}
return false;
}
} // namespace fml