From 1d1452d4aa28206776e6d35525fad2c27fd3ce03 Mon Sep 17 00:00:00 2001 From: Attila Uygun Date: Mon, 5 Jun 2023 23:05:59 +0200 Subject: [PATCH] Add support for canceling tasks in TaskRunner --- src/base/task_runner.cc | 23 ++++++++++++++++++++++- src/base/task_runner.h | 5 ++++- 2 files changed, 26 insertions(+), 2 deletions(-) diff --git a/src/base/task_runner.cc b/src/base/task_runner.cc index 3e07c31..fa8f670 100644 --- a/src/base/task_runner.cc +++ b/src/base/task_runner.cc @@ -35,7 +35,7 @@ TaskRunner* TaskRunner::GetThreadLocalTaskRunner() { void TaskRunner::PostTask(const Location& from, Closure task) { DCHECK(task) << LOCATION(from); - task_count_.fetch_add(1, std::memory_order_release); + task_count_.fetch_add(1, std::memory_order_relaxed); std::lock_guard scoped_lock(lock_); queue_.emplace_back(from, std::move(task)); } @@ -71,6 +71,11 @@ void TaskRunner::MultiConsumerRun() { task_cb(); task_count_.fetch_sub(1, std::memory_order_release); + + if (cancel_tasks_.load(std::memory_order_relaxed)) { + CancelTasksInternal(); + break; + } } } @@ -93,12 +98,28 @@ void TaskRunner::SingleConsumerRun() { task_cb(); task_count_.fetch_sub(1, std::memory_order_release); + + if (cancel_tasks_.load(std::memory_order_relaxed)) { + CancelTasksInternal(); + break; + } } } +void TaskRunner::CancelTasks() { + cancel_tasks_.store(true, std::memory_order_relaxed); +} + void TaskRunner::WaitForCompletion() { while (task_count_.load(std::memory_order_acquire) > 0) std::this_thread::yield(); } +void TaskRunner::CancelTasksInternal() { + cancel_tasks_.store(false, std::memory_order_relaxed); + task_count_.store(0, std::memory_order_relaxed); + std::lock_guard scoped_lock(lock_); + queue_.clear(); +} + } // namespace base diff --git a/src/base/task_runner.h b/src/base/task_runner.h index a46cf37..c3ef81f 100644 --- a/src/base/task_runner.h +++ b/src/base/task_runner.h @@ -60,9 +60,9 @@ class TaskRunner { } void MultiConsumerRun(); - void SingleConsumerRun(); + void CancelTasks(); void WaitForCompletion(); static void CreateThreadLocalTaskRunner(); @@ -74,9 +74,12 @@ class TaskRunner { std::deque queue_; mutable std::mutex lock_; std::atomic task_count_{0}; + std::atomic cancel_tasks_{false}; static thread_local std::unique_ptr thread_local_task_runner; + void CancelTasksInternal(); + TaskRunner(TaskRunner const&) = delete; TaskRunner& operator=(TaskRunner const&) = delete; };