Update TaskRunner

Remove SingleConsumerRun
Rename MultiConsumerRun to RunTasks
Refactor CancelTasks
This commit is contained in:
Attila Uygun 2023-06-12 19:56:27 +02:00
parent 305b23738c
commit 8fd24a3c1a
5 changed files with 21 additions and 62 deletions

View File

@ -22,7 +22,7 @@ void PostTaskAndReplyRelay(Location from,
// The task runner that belongs to the thread it's created in. Tasks to be run
// on a specific thread can be posted to this task runner.
// TaskRunner::GetThreadLocalTaskRunner()->SingleConsumerRun() is expected to be
// TaskRunner::GetThreadLocalTaskRunner()->RunTasks() is expected to be
// periodically called.
thread_local std::shared_ptr<TaskRunner> TaskRunner::thread_local_task_runner;
@ -54,7 +54,18 @@ void TaskRunner::PostTaskAndReply(Location from, Closure task, Closure reply) {
PostTask(from, std::move(relay));
}
void TaskRunner::MultiConsumerRun() {
void TaskRunner::CancelTasks() {
std::lock_guard<std::mutex> scoped_lock(lock_);
task_count_.fetch_sub(queue_.size(), std::memory_order_release);
queue_.clear();
}
void TaskRunner::WaitForCompletion() {
while (task_count_.load(std::memory_order_acquire) > 0)
std::this_thread::yield();
}
void TaskRunner::RunTasks() {
for (;;) {
Task task;
{
@ -73,55 +84,7 @@ void TaskRunner::MultiConsumerRun() {
task_cb();
task_count_.fetch_sub(1, std::memory_order_release);
if (cancel_tasks_.load(std::memory_order_relaxed)) {
CancelTasksInternal();
break;
}
}
}
void TaskRunner::SingleConsumerRun() {
std::deque<Task> queue;
{
std::lock_guard<std::mutex> scoped_lock(lock_);
if (queue_.empty())
return;
queue.swap(queue_);
}
while (!queue.empty()) {
auto [from, task_cb] = queue.front();
queue.pop_front();
#if 0
LOG << __func__ << " from: " << LOCATION(from);
#endif
task_cb();
task_count_.fetch_sub(1, std::memory_order_release);
if (cancel_tasks_.load(std::memory_order_relaxed)) {
CancelTasksInternal();
break;
}
}
}
void TaskRunner::CancelTasks() {
cancel_tasks_.store(true, std::memory_order_relaxed);
}
void TaskRunner::WaitForCompletion() {
while (task_count_.load(std::memory_order_acquire) > 0)
std::this_thread::yield();
}
void TaskRunner::CancelTasksInternal() {
cancel_tasks_.store(false, std::memory_order_relaxed);
task_count_.store(0, std::memory_order_relaxed);
std::lock_guard<std::mutex> scoped_lock(lock_);
queue_.clear();
}
} // namespace base

View File

@ -42,6 +42,9 @@ class TaskRunner {
TaskRunner() = default;
~TaskRunner() = default;
static void CreateThreadLocalTaskRunner();
static std::shared_ptr<TaskRunner> GetThreadLocalTaskRunner();
void PostTask(Location from, Closure task);
void PostTaskAndReply(Location from, Closure task, Closure reply);
@ -59,14 +62,11 @@ class TaskRunner {
result));
}
void MultiConsumerRun();
void SingleConsumerRun();
void CancelTasks();
void WaitForCompletion();
static void CreateThreadLocalTaskRunner();
static std::shared_ptr<TaskRunner> GetThreadLocalTaskRunner();
void RunTasks();
private:
using Task = std::tuple<Location, Closure>;
@ -74,12 +74,9 @@ class TaskRunner {
std::deque<Task> queue_;
mutable std::mutex lock_;
std::atomic<size_t> task_count_{0};
std::atomic<bool> cancel_tasks_{false};
static thread_local std::shared_ptr<TaskRunner> thread_local_task_runner;
void CancelTasksInternal();
TaskRunner(TaskRunner const&) = delete;
TaskRunner& operator=(TaskRunner const&) = delete;
};

View File

@ -61,11 +61,10 @@ void ThreadPool::CancelTasks() {
void ThreadPool::WorkerMain() {
for (;;) {
semaphore_.acquire();
if (quit_.load(std::memory_order_relaxed))
return;
task_runner_.MultiConsumerRun();
task_runner_.RunTasks();
}
}

View File

@ -76,7 +76,7 @@ void Engine::Run() {
float frame_frac = 0.0f;
for (;;) {
TaskRunner::GetThreadLocalTaskRunner()->SingleConsumerRun();
TaskRunner::GetThreadLocalTaskRunner()->RunTasks();
platform_->Update();
if (platform_->should_exit())

View File

@ -2006,7 +2006,7 @@ void RendererVulkan::SetupThreadMain(int preallocate) {
if (quit_.load(std::memory_order_relaxed))
break;
task_runner_.SingleConsumerRun();
task_runner_.RunTasks();
}
for (size_t i = 0; i < staging_buffers_.size(); i++) {