From 69a05c00e936e0c2e851e741b07f78a4e09f76dd Mon Sep 17 00:00:00 2001 From: Attila Uygun Date: Sat, 24 Jun 2023 09:48:21 +0200 Subject: [PATCH] Support for adding tasks in front of the task queue --- src/base/task_runner.cc | 21 ++++++++++++++------- src/base/task_runner.h | 12 ++++++++---- src/base/thread_pool.cc | 11 +++++++---- src/base/thread_pool.h | 12 ++++++++---- src/engine/audio/audio_mixer.cc | 3 ++- 5 files changed, 39 insertions(+), 20 deletions(-) diff --git a/src/base/task_runner.cc b/src/base/task_runner.cc index 3a0396f..fb0a124 100644 --- a/src/base/task_runner.cc +++ b/src/base/task_runner.cc @@ -11,11 +11,12 @@ namespace { void PostTaskAndReplyRelay(Location from, Closure task_cb, Closure reply_cb, - std::shared_ptr destination) { + std::shared_ptr destination, + bool front) { task_cb(); if (reply_cb) - destination->PostTask(from, std::move(reply_cb)); + destination->PostTask(from, std::move(reply_cb), front); } } // namespace @@ -36,22 +37,28 @@ std::shared_ptr TaskRunner::GetThreadLocalTaskRunner() { return thread_local_task_runner; } -void TaskRunner::PostTask(Location from, Closure task) { +void TaskRunner::PostTask(Location from, Closure task, bool front) { DCHECK(task) << LOCATION(from); task_count_.fetch_add(1, std::memory_order_relaxed); std::lock_guard scoped_lock(lock_); - queue_.emplace_back(from, std::move(task)); + if (front) + queue_.emplace_front(from, std::move(task)); + else + queue_.emplace_back(from, std::move(task)); } -void TaskRunner::PostTaskAndReply(Location from, Closure task, Closure reply) { +void TaskRunner::PostTaskAndReply(Location from, + Closure task, + Closure reply, + bool front) { DCHECK(task) << LOCATION(from); DCHECK(reply) << LOCATION(from); DCHECK(thread_local_task_runner) << LOCATION(from); auto relay = std::bind(PostTaskAndReplyRelay, from, std::move(task), - std::move(reply), thread_local_task_runner); - PostTask(from, std::move(relay)); + std::move(reply), thread_local_task_runner, front); + PostTask(from, std::move(relay), front); } void TaskRunner::CancelTasks() { diff --git a/src/base/task_runner.h b/src/base/task_runner.h index 0c69d23..44ae501 100644 --- a/src/base/task_runner.h +++ b/src/base/task_runner.h @@ -45,21 +45,25 @@ class TaskRunner { static void CreateThreadLocalTaskRunner(); static std::shared_ptr GetThreadLocalTaskRunner(); - void PostTask(Location from, Closure task); + void PostTask(Location from, Closure task, bool front = false); - void PostTaskAndReply(Location from, Closure task, Closure reply); + void PostTaskAndReply(Location from, + Closure task, + Closure reply, + bool front = false); template void PostTaskAndReplyWithResult(Location from, std::function task, - std::function reply) { + std::function reply, + bool front = false) { auto* result = new ReturnType; return PostTaskAndReply( from, std::bind(internal::ReturnAsParamAdapter, std::move(task), result), std::bind(internal::ReplyAdapter, std::move(reply), - result)); + result), front); } void CancelTasks(); diff --git a/src/base/thread_pool.cc b/src/base/thread_pool.cc index 3aa69b4..03b3f99 100644 --- a/src/base/thread_pool.cc +++ b/src/base/thread_pool.cc @@ -40,17 +40,20 @@ void ThreadPool::Shutdown() { threads_.clear(); } -void ThreadPool::PostTask(Location from, Closure task) { +void ThreadPool::PostTask(Location from, Closure task, bool front) { DCHECK((!threads_.empty())); - task_runner_.PostTask(from, std::move(task)); + task_runner_.PostTask(from, std::move(task), front); semaphore_.release(); } -void ThreadPool::PostTaskAndReply(Location from, Closure task, Closure reply) { +void ThreadPool::PostTaskAndReply(Location from, + Closure task, + Closure reply, + bool front) { DCHECK((!threads_.empty())); - task_runner_.PostTaskAndReply(from, std::move(task), std::move(reply)); + task_runner_.PostTaskAndReply(from, std::move(task), std::move(reply), front); semaphore_.release(); } diff --git a/src/base/thread_pool.h b/src/base/thread_pool.h index 73979bb..1c39029 100644 --- a/src/base/thread_pool.h +++ b/src/base/thread_pool.h @@ -24,16 +24,20 @@ class ThreadPool { void Shutdown(); - void PostTask(Location from, Closure task); + void PostTask(Location from, Closure task, bool front = false); - void PostTaskAndReply(Location from, Closure task, Closure reply); + void PostTaskAndReply(Location from, + Closure task, + Closure reply, + bool front = false); template void PostTaskAndReplyWithResult(Location from, std::function task, - std::function reply) { + std::function reply, + bool front = false) { task_runner_.PostTaskAndReplyWithResult(from, std::move(task), - std::move(reply)); + std::move(reply), front); semaphore_.release(); } diff --git a/src/engine/audio/audio_mixer.cc b/src/engine/audio/audio_mixer.cc index 3487adb..d92c06a 100644 --- a/src/engine/audio/audio_mixer.cc +++ b/src/engine/audio/audio_mixer.cc @@ -254,7 +254,8 @@ void AudioMixer::RenderAudio(float* output_buffer, size_t num_frames) { ThreadPool::Get().PostTask( HERE, - std::bind(&AudioMixer::DoStream, this, *it, flags & kLoop)); + std::bind(&AudioMixer::DoStream, this, *it, flags & kLoop), + true); } else { DLOG << "Mixer buffer underrun!"; }