kaliber/src/base/task_runner.cc

106 lines
2.5 KiB
C++
Raw Normal View History

#include "base/task_runner.h"
2020-04-13 11:24:53 +00:00
#include "base/log.h"
namespace {
void PostTaskAndReplyRelay(base::Location from,
base::Closure task_cb,
base::Closure reply_cb,
base::TaskRunner* destination) {
task_cb();
if (reply_cb)
destination->PostTask(from, std::move(reply_cb));
}
} // namespace
2020-04-13 11:24:53 +00:00
namespace base {
thread_local std::unique_ptr<TaskRunner> TaskRunner::thread_local_task_runner;
void TaskRunner::CreateThreadLocalTaskRunner() {
DCHECK(!thread_local_task_runner);
thread_local_task_runner = std::make_unique<TaskRunner>();
}
TaskRunner* TaskRunner::GetThreadLocalTaskRunner() {
return thread_local_task_runner.get();
}
void TaskRunner::PostTask(const Location& from, Closure task) {
DCHECK(task) << LOCATION(from);
task_count_.fetch_add(1, std::memory_order_relaxed);
std::lock_guard<std::mutex> scoped_lock(lock_);
queue_.emplace_back(from, std::move(task));
2020-04-13 11:24:53 +00:00
}
void TaskRunner::PostTaskAndReply(const Location& from,
Closure task,
Closure reply) {
DCHECK(task) << LOCATION(from);
DCHECK(reply) << LOCATION(from);
DCHECK(thread_local_task_runner) << LOCATION(from);
auto relay = std::bind(::PostTaskAndReplyRelay, from, std::move(task),
std::move(reply), thread_local_task_runner.get());
PostTask(from, std::move(relay));
}
void TaskRunner::MultiConsumerRun() {
2020-04-13 11:24:53 +00:00
for (;;) {
Task task;
2020-04-13 11:24:53 +00:00
{
std::lock_guard<std::mutex> scoped_lock(lock_);
if (queue_.empty())
return;
task.swap(queue_.front());
queue_.pop_front();
2020-04-13 11:24:53 +00:00
}
auto [from, task_cb] = task;
#if 0
LOG << __func__ << " from: " << LOCATION(from);
#endif
task_cb();
task_count_.fetch_sub(1, std::memory_order_relaxed);
2020-04-13 11:24:53 +00:00
}
}
void TaskRunner::SingleConsumerRun() {
std::deque<Task> queue;
{
std::lock_guard<std::mutex> scoped_lock(lock_);
if (queue_.empty())
return;
queue.swap(queue_);
}
while (!queue.empty()) {
auto [from, task_cb] = queue.front();
queue.pop_front();
#if 0
LOG << __func__ << " from: " << LOCATION(from);
#endif
task_cb();
task_count_.fetch_sub(1, std::memory_order_relaxed);
}
cv_.notify_one();
}
void TaskRunner::WaitForCompletion() {
std::unique_lock<std::mutex> scoped_lock(lock_);
cv_.wait(scoped_lock, [&]() -> bool {
return task_count_.load(std::memory_order_relaxed) == 0;
});
2020-04-13 11:24:53 +00:00
}
} // namespace base