From 8fd24a3c1a93a676a3710fcfd9226eeec268a7c2 Mon Sep 17 00:00:00 2001 From: Attila Uygun Date: Mon, 12 Jun 2023 19:56:27 +0200 Subject: [PATCH] Update TaskRunner Remove SingleConsumerRun Rename MultiConsumerRun to RunTasks Refactor CancelTasks --- src/base/task_runner.cc | 63 ++++--------------- src/base/task_runner.h | 13 ++-- src/base/thread_pool.cc | 3 +- src/engine/engine.cc | 2 +- src/engine/renderer/vulkan/renderer_vulkan.cc | 2 +- 5 files changed, 21 insertions(+), 62 deletions(-) diff --git a/src/base/task_runner.cc b/src/base/task_runner.cc index c2da56b..3a0396f 100644 --- a/src/base/task_runner.cc +++ b/src/base/task_runner.cc @@ -22,7 +22,7 @@ void PostTaskAndReplyRelay(Location from, // The task runner that belongs to the thread it's created in. Tasks to be run // on a specific thread can be posted to this task runner. -// TaskRunner::GetThreadLocalTaskRunner()->SingleConsumerRun() is expected to be +// TaskRunner::GetThreadLocalTaskRunner()->RunTasks() is expected to be // periodically called. thread_local std::shared_ptr TaskRunner::thread_local_task_runner; @@ -54,7 +54,18 @@ void TaskRunner::PostTaskAndReply(Location from, Closure task, Closure reply) { PostTask(from, std::move(relay)); } -void TaskRunner::MultiConsumerRun() { +void TaskRunner::CancelTasks() { + std::lock_guard scoped_lock(lock_); + task_count_.fetch_sub(queue_.size(), std::memory_order_release); + queue_.clear(); +} + +void TaskRunner::WaitForCompletion() { + while (task_count_.load(std::memory_order_acquire) > 0) + std::this_thread::yield(); +} + +void TaskRunner::RunTasks() { for (;;) { Task task; { @@ -73,55 +84,7 @@ void TaskRunner::MultiConsumerRun() { task_cb(); task_count_.fetch_sub(1, std::memory_order_release); - - if (cancel_tasks_.load(std::memory_order_relaxed)) { - CancelTasksInternal(); - break; - } } } -void TaskRunner::SingleConsumerRun() { - std::deque queue; - { - std::lock_guard scoped_lock(lock_); - if (queue_.empty()) - return; - queue.swap(queue_); - } - - while (!queue.empty()) { - auto [from, task_cb] = queue.front(); - queue.pop_front(); - -#if 0 - LOG << __func__ << " from: " << LOCATION(from); -#endif - - task_cb(); - task_count_.fetch_sub(1, std::memory_order_release); - - if (cancel_tasks_.load(std::memory_order_relaxed)) { - CancelTasksInternal(); - break; - } - } -} - -void TaskRunner::CancelTasks() { - cancel_tasks_.store(true, std::memory_order_relaxed); -} - -void TaskRunner::WaitForCompletion() { - while (task_count_.load(std::memory_order_acquire) > 0) - std::this_thread::yield(); -} - -void TaskRunner::CancelTasksInternal() { - cancel_tasks_.store(false, std::memory_order_relaxed); - task_count_.store(0, std::memory_order_relaxed); - std::lock_guard scoped_lock(lock_); - queue_.clear(); -} - } // namespace base diff --git a/src/base/task_runner.h b/src/base/task_runner.h index e5fdbf8..0c69d23 100644 --- a/src/base/task_runner.h +++ b/src/base/task_runner.h @@ -42,6 +42,9 @@ class TaskRunner { TaskRunner() = default; ~TaskRunner() = default; + static void CreateThreadLocalTaskRunner(); + static std::shared_ptr GetThreadLocalTaskRunner(); + void PostTask(Location from, Closure task); void PostTaskAndReply(Location from, Closure task, Closure reply); @@ -59,14 +62,11 @@ class TaskRunner { result)); } - void MultiConsumerRun(); - void SingleConsumerRun(); - void CancelTasks(); + void WaitForCompletion(); - static void CreateThreadLocalTaskRunner(); - static std::shared_ptr GetThreadLocalTaskRunner(); + void RunTasks(); private: using Task = std::tuple; @@ -74,12 +74,9 @@ class TaskRunner { std::deque queue_; mutable std::mutex lock_; std::atomic task_count_{0}; - std::atomic cancel_tasks_{false}; static thread_local std::shared_ptr thread_local_task_runner; - void CancelTasksInternal(); - TaskRunner(TaskRunner const&) = delete; TaskRunner& operator=(TaskRunner const&) = delete; }; diff --git a/src/base/thread_pool.cc b/src/base/thread_pool.cc index 312e13f..3aa69b4 100644 --- a/src/base/thread_pool.cc +++ b/src/base/thread_pool.cc @@ -61,11 +61,10 @@ void ThreadPool::CancelTasks() { void ThreadPool::WorkerMain() { for (;;) { semaphore_.acquire(); - if (quit_.load(std::memory_order_relaxed)) return; - task_runner_.MultiConsumerRun(); + task_runner_.RunTasks(); } } diff --git a/src/engine/engine.cc b/src/engine/engine.cc index c4c72e4..7183b81 100644 --- a/src/engine/engine.cc +++ b/src/engine/engine.cc @@ -76,7 +76,7 @@ void Engine::Run() { float frame_frac = 0.0f; for (;;) { - TaskRunner::GetThreadLocalTaskRunner()->SingleConsumerRun(); + TaskRunner::GetThreadLocalTaskRunner()->RunTasks(); platform_->Update(); if (platform_->should_exit()) diff --git a/src/engine/renderer/vulkan/renderer_vulkan.cc b/src/engine/renderer/vulkan/renderer_vulkan.cc index a31c0c7..f0a1d9a 100644 --- a/src/engine/renderer/vulkan/renderer_vulkan.cc +++ b/src/engine/renderer/vulkan/renderer_vulkan.cc @@ -2006,7 +2006,7 @@ void RendererVulkan::SetupThreadMain(int preallocate) { if (quit_.load(std::memory_order_relaxed)) break; - task_runner_.SingleConsumerRun(); + task_runner_.RunTasks(); } for (size_t i = 0; i < staging_buffers_.size(); i++) {