| // Copyright 2013 The Flutter Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| #include "flutter/fml/concurrent_message_loop.h" |
| |
| #include <algorithm> |
| |
| #include "flutter/fml/thread.h" |
| #include "flutter/fml/trace_event.h" |
| |
| namespace fml { |
| |
| std::shared_ptr<ConcurrentMessageLoop> ConcurrentMessageLoop::Create( |
| size_t worker_count) { |
| return std::shared_ptr<ConcurrentMessageLoop>{ |
| new ConcurrentMessageLoop(worker_count)}; |
| } |
| |
| ConcurrentMessageLoop::ConcurrentMessageLoop(size_t worker_count) |
| : worker_count_(std::max<size_t>(worker_count, 1ul)) { |
| for (size_t i = 0; i < worker_count_; ++i) { |
| workers_.emplace_back([i, this]() { |
| fml::Thread::SetCurrentThreadName( |
| std::string{"io.worker." + std::to_string(i + 1)}); |
| WorkerMain(); |
| }); |
| } |
| |
| for (const auto& worker : workers_) { |
| worker_thread_ids_.emplace_back(worker.get_id()); |
| } |
| } |
| |
| ConcurrentMessageLoop::~ConcurrentMessageLoop() { |
| Terminate(); |
| for (auto& worker : workers_) { |
| worker.join(); |
| } |
| } |
| |
| size_t ConcurrentMessageLoop::GetWorkerCount() const { |
| return worker_count_; |
| } |
| |
| std::shared_ptr<ConcurrentTaskRunner> ConcurrentMessageLoop::GetTaskRunner() { |
| return std::make_shared<ConcurrentTaskRunner>(weak_from_this()); |
| } |
| |
| void ConcurrentMessageLoop::PostTask(const fml::closure& task) { |
| if (!task) { |
| return; |
| } |
| |
| std::unique_lock lock(tasks_mutex_); |
| |
| // Don't just drop tasks on the floor in case of shutdown. |
| if (shutdown_) { |
| FML_DLOG(WARNING) |
| << "Tried to post a task to shutdown concurrent message " |
| "loop. The task will be executed on the callers thread."; |
| lock.unlock(); |
| task(); |
| return; |
| } |
| |
| tasks_.push(task); |
| |
| // Unlock the mutex before notifying the condition variable because that mutex |
| // has to be acquired on the other thread anyway. Waiting in this scope till |
| // it is acquired there is a pessimization. |
| lock.unlock(); |
| |
| tasks_condition_.notify_one(); |
| } |
| |
| void ConcurrentMessageLoop::WorkerMain() { |
| while (true) { |
| std::unique_lock lock(tasks_mutex_); |
| tasks_condition_.wait(lock, [&]() { |
| return tasks_.size() > 0 || shutdown_ || HasThreadTasksLocked(); |
| }); |
| |
| // Shutdown cannot be read with the task mutex unlocked. |
| bool shutdown_now = shutdown_; |
| fml::closure task; |
| std::vector<fml::closure> thread_tasks; |
| |
| if (tasks_.size() != 0) { |
| task = tasks_.front(); |
| tasks_.pop(); |
| } |
| |
| if (HasThreadTasksLocked()) { |
| thread_tasks = GetThreadTasksLocked(); |
| FML_DCHECK(!HasThreadTasksLocked()); |
| } |
| |
| // Don't hold onto the mutex while tasks are being executed as they could |
| // themselves try to post more tasks to the message loop. |
| lock.unlock(); |
| |
| TRACE_EVENT0("flutter", "ConcurrentWorkerWake"); |
| // Execute the primary task we woke up for. |
| if (task) { |
| task(); |
| } |
| |
| // Execute any thread tasks. |
| for (const auto& thread_task : thread_tasks) { |
| thread_task(); |
| } |
| |
| if (shutdown_now) { |
| break; |
| } |
| } |
| } |
| |
| void ConcurrentMessageLoop::Terminate() { |
| std::scoped_lock lock(tasks_mutex_); |
| shutdown_ = true; |
| tasks_condition_.notify_all(); |
| } |
| |
| void ConcurrentMessageLoop::PostTaskToAllWorkers(fml::closure task) { |
| if (!task) { |
| return; |
| } |
| |
| std::scoped_lock lock(tasks_mutex_); |
| for (const auto& worker_thread_id : worker_thread_ids_) { |
| thread_tasks_[worker_thread_id].emplace_back(task); |
| } |
| tasks_condition_.notify_all(); |
| } |
| |
| bool ConcurrentMessageLoop::HasThreadTasksLocked() const { |
| return thread_tasks_.count(std::this_thread::get_id()) > 0; |
| } |
| |
| std::vector<fml::closure> ConcurrentMessageLoop::GetThreadTasksLocked() { |
| auto found = thread_tasks_.find(std::this_thread::get_id()); |
| FML_DCHECK(found != thread_tasks_.end()); |
| std::vector<fml::closure> pending_tasks; |
| std::swap(pending_tasks, found->second); |
| thread_tasks_.erase(found); |
| return pending_tasks; |
| } |
| |
| ConcurrentTaskRunner::ConcurrentTaskRunner( |
| std::weak_ptr<ConcurrentMessageLoop> weak_loop) |
| : weak_loop_(std::move(weak_loop)) {} |
| |
| ConcurrentTaskRunner::~ConcurrentTaskRunner() = default; |
| |
| void ConcurrentTaskRunner::PostTask(const fml::closure& task) { |
| if (!task) { |
| return; |
| } |
| |
| if (auto loop = weak_loop_.lock()) { |
| loop->PostTask(task); |
| return; |
| } |
| |
| FML_DLOG(WARNING) |
| << "Tried to post to a concurrent message loop that has already died. " |
| "Executing the task on the callers thread."; |
| task(); |
| } |
| |
| } // namespace fml |