Add support for canceling tasks in TaskRunner

This commit is contained in:
Attila Uygun 2023-06-05 23:05:59 +02:00
parent 557eb517cb
commit 1d1452d4aa
2 changed files with 26 additions and 2 deletions

View File

@ -35,7 +35,7 @@ TaskRunner* TaskRunner::GetThreadLocalTaskRunner() {
void TaskRunner::PostTask(const Location& from, Closure task) { void TaskRunner::PostTask(const Location& from, Closure task) {
DCHECK(task) << LOCATION(from); DCHECK(task) << LOCATION(from);
task_count_.fetch_add(1, std::memory_order_release); task_count_.fetch_add(1, std::memory_order_relaxed);
std::lock_guard<std::mutex> scoped_lock(lock_); std::lock_guard<std::mutex> scoped_lock(lock_);
queue_.emplace_back(from, std::move(task)); queue_.emplace_back(from, std::move(task));
} }
@ -71,6 +71,11 @@ void TaskRunner::MultiConsumerRun() {
task_cb(); task_cb();
task_count_.fetch_sub(1, std::memory_order_release); task_count_.fetch_sub(1, std::memory_order_release);
if (cancel_tasks_.load(std::memory_order_relaxed)) {
CancelTasksInternal();
break;
}
} }
} }
@ -93,12 +98,28 @@ void TaskRunner::SingleConsumerRun() {
task_cb(); task_cb();
task_count_.fetch_sub(1, std::memory_order_release); task_count_.fetch_sub(1, std::memory_order_release);
if (cancel_tasks_.load(std::memory_order_relaxed)) {
CancelTasksInternal();
break;
}
} }
} }
void TaskRunner::CancelTasks() {
cancel_tasks_.store(true, std::memory_order_relaxed);
}
void TaskRunner::WaitForCompletion() { void TaskRunner::WaitForCompletion() {
while (task_count_.load(std::memory_order_acquire) > 0) while (task_count_.load(std::memory_order_acquire) > 0)
std::this_thread::yield(); std::this_thread::yield();
} }
void TaskRunner::CancelTasksInternal() {
cancel_tasks_.store(false, std::memory_order_relaxed);
task_count_.store(0, std::memory_order_relaxed);
std::lock_guard<std::mutex> scoped_lock(lock_);
queue_.clear();
}
} // namespace base } // namespace base

View File

@ -60,9 +60,9 @@ class TaskRunner {
} }
void MultiConsumerRun(); void MultiConsumerRun();
void SingleConsumerRun(); void SingleConsumerRun();
void CancelTasks();
void WaitForCompletion(); void WaitForCompletion();
static void CreateThreadLocalTaskRunner(); static void CreateThreadLocalTaskRunner();
@ -74,9 +74,12 @@ class TaskRunner {
std::deque<Task> queue_; std::deque<Task> queue_;
mutable std::mutex lock_; mutable std::mutex lock_;
std::atomic<size_t> task_count_{0}; std::atomic<size_t> task_count_{0};
std::atomic<bool> cancel_tasks_{false};
static thread_local std::unique_ptr<TaskRunner> thread_local_task_runner; static thread_local std::unique_ptr<TaskRunner> thread_local_task_runner;
void CancelTasksInternal();
TaskRunner(TaskRunner const&) = delete; TaskRunner(TaskRunner const&) = delete;
TaskRunner& operator=(TaskRunner const&) = delete; TaskRunner& operator=(TaskRunner const&) = delete;
}; };