Support for adding tasks in front of the task queue

This commit is contained in:
Attila Uygun 2023-06-24 09:48:21 +02:00
parent d7e444fa81
commit 69a05c00e9
5 changed files with 39 additions and 20 deletions

View File

@ -11,11 +11,12 @@ namespace {
void PostTaskAndReplyRelay(Location from, void PostTaskAndReplyRelay(Location from,
Closure task_cb, Closure task_cb,
Closure reply_cb, Closure reply_cb,
std::shared_ptr<TaskRunner> destination) { std::shared_ptr<TaskRunner> destination,
bool front) {
task_cb(); task_cb();
if (reply_cb) if (reply_cb)
destination->PostTask(from, std::move(reply_cb)); destination->PostTask(from, std::move(reply_cb), front);
} }
} // namespace } // namespace
@ -36,22 +37,28 @@ std::shared_ptr<TaskRunner> TaskRunner::GetThreadLocalTaskRunner() {
return thread_local_task_runner; return thread_local_task_runner;
} }
void TaskRunner::PostTask(Location from, Closure task) { void TaskRunner::PostTask(Location from, Closure task, bool front) {
DCHECK(task) << LOCATION(from); DCHECK(task) << LOCATION(from);
task_count_.fetch_add(1, std::memory_order_relaxed); task_count_.fetch_add(1, std::memory_order_relaxed);
std::lock_guard<std::mutex> scoped_lock(lock_); std::lock_guard<std::mutex> scoped_lock(lock_);
if (front)
queue_.emplace_front(from, std::move(task));
else
queue_.emplace_back(from, std::move(task)); queue_.emplace_back(from, std::move(task));
} }
void TaskRunner::PostTaskAndReply(Location from, Closure task, Closure reply) { void TaskRunner::PostTaskAndReply(Location from,
Closure task,
Closure reply,
bool front) {
DCHECK(task) << LOCATION(from); DCHECK(task) << LOCATION(from);
DCHECK(reply) << LOCATION(from); DCHECK(reply) << LOCATION(from);
DCHECK(thread_local_task_runner) << LOCATION(from); DCHECK(thread_local_task_runner) << LOCATION(from);
auto relay = std::bind(PostTaskAndReplyRelay, from, std::move(task), auto relay = std::bind(PostTaskAndReplyRelay, from, std::move(task),
std::move(reply), thread_local_task_runner); std::move(reply), thread_local_task_runner, front);
PostTask(from, std::move(relay)); PostTask(from, std::move(relay), front);
} }
void TaskRunner::CancelTasks() { void TaskRunner::CancelTasks() {

View File

@ -45,21 +45,25 @@ class TaskRunner {
static void CreateThreadLocalTaskRunner(); static void CreateThreadLocalTaskRunner();
static std::shared_ptr<TaskRunner> GetThreadLocalTaskRunner(); static std::shared_ptr<TaskRunner> GetThreadLocalTaskRunner();
void PostTask(Location from, Closure task); void PostTask(Location from, Closure task, bool front = false);
void PostTaskAndReply(Location from, Closure task, Closure reply); void PostTaskAndReply(Location from,
Closure task,
Closure reply,
bool front = false);
template <typename ReturnType> template <typename ReturnType>
void PostTaskAndReplyWithResult(Location from, void PostTaskAndReplyWithResult(Location from,
std::function<ReturnType()> task, std::function<ReturnType()> task,
std::function<void(ReturnType)> reply) { std::function<void(ReturnType)> reply,
bool front = false) {
auto* result = new ReturnType; auto* result = new ReturnType;
return PostTaskAndReply( return PostTaskAndReply(
from, from,
std::bind(internal::ReturnAsParamAdapter<ReturnType>, std::move(task), std::bind(internal::ReturnAsParamAdapter<ReturnType>, std::move(task),
result), result),
std::bind(internal::ReplyAdapter<ReturnType>, std::move(reply), std::bind(internal::ReplyAdapter<ReturnType>, std::move(reply),
result)); result), front);
} }
void CancelTasks(); void CancelTasks();

View File

@ -40,17 +40,20 @@ void ThreadPool::Shutdown() {
threads_.clear(); threads_.clear();
} }
void ThreadPool::PostTask(Location from, Closure task) { void ThreadPool::PostTask(Location from, Closure task, bool front) {
DCHECK((!threads_.empty())); DCHECK((!threads_.empty()));
task_runner_.PostTask(from, std::move(task)); task_runner_.PostTask(from, std::move(task), front);
semaphore_.release(); semaphore_.release();
} }
void ThreadPool::PostTaskAndReply(Location from, Closure task, Closure reply) { void ThreadPool::PostTaskAndReply(Location from,
Closure task,
Closure reply,
bool front) {
DCHECK((!threads_.empty())); DCHECK((!threads_.empty()));
task_runner_.PostTaskAndReply(from, std::move(task), std::move(reply)); task_runner_.PostTaskAndReply(from, std::move(task), std::move(reply), front);
semaphore_.release(); semaphore_.release();
} }

View File

@ -24,16 +24,20 @@ class ThreadPool {
void Shutdown(); void Shutdown();
void PostTask(Location from, Closure task); void PostTask(Location from, Closure task, bool front = false);
void PostTaskAndReply(Location from, Closure task, Closure reply); void PostTaskAndReply(Location from,
Closure task,
Closure reply,
bool front = false);
template <typename ReturnType> template <typename ReturnType>
void PostTaskAndReplyWithResult(Location from, void PostTaskAndReplyWithResult(Location from,
std::function<ReturnType()> task, std::function<ReturnType()> task,
std::function<void(ReturnType)> reply) { std::function<void(ReturnType)> reply,
bool front = false) {
task_runner_.PostTaskAndReplyWithResult(from, std::move(task), task_runner_.PostTaskAndReplyWithResult(from, std::move(task),
std::move(reply)); std::move(reply), front);
semaphore_.release(); semaphore_.release();
} }

View File

@ -254,7 +254,8 @@ void AudioMixer::RenderAudio(float* output_buffer, size_t num_frames) {
ThreadPool::Get().PostTask( ThreadPool::Get().PostTask(
HERE, HERE,
std::bind(&AudioMixer::DoStream, this, *it, flags & kLoop)); std::bind(&AudioMixer::DoStream, this, *it, flags & kLoop),
true);
} else { } else {
DLOG << "Mixer buffer underrun!"; DLOG << "Mixer buffer underrun!";
} }