Compare commits

..

10 Commits

Author SHA1 Message Date
Attila Uygun 8fd24a3c1a Update TaskRunner
Remove SingleConsumerRun
Rename MultiConsumerRun to RunTasks
Refactor CancelTasks
2023-06-13 23:27:10 +02:00
Attila Uygun 305b23738c Make thread-local-task-runner a shared_ptr 2023-06-13 23:27:10 +02:00
Attila Uygun 83400a0b52 Use string for callback location info 2023-06-13 23:27:10 +02:00
Attila Uygun fdbbb2a6fd Fix for ReplyAdapter in TaskRunner 2023-06-12 00:35:57 +02:00
Attila Uygun ab4c0c7e57 Cleanup 2023-06-12 00:34:29 +02:00
Attila Uygun 325b07d844 Do not refresh image when texture is not in use 2023-06-12 00:34:29 +02:00
Attila Uygun 05b86a38c1 Add comments 2023-06-11 22:42:34 +02:00
Attila Uygun 0876203d82 Cancel tasks on shutdown 2023-06-11 22:42:34 +02:00
Attila Uygun f54835e5f3 Consume input events on ContextLost 2023-06-11 22:42:34 +02:00
Attila Uygun ba246eca7f Revert "Consume input events when switching between renderers"
This reverts commit 5c6e414a15.
2023-06-06 23:00:55 +02:00
11 changed files with 79 additions and 116 deletions

View File

@ -12,12 +12,11 @@
// Helper for logging location info, e.g. LOG << LOCATION(from) // Helper for logging location info, e.g. LOG << LOCATION(from)
#define LOCATION(from) \ #define LOCATION(from) \
std::get<0>(from) << "() [" << [](const char* path) -> std::string { \ std::get<0>(from) << "() [" << [](std::string path) -> std::string { \
std::string file_name(path); \ size_t last_slash_pos = path.find_last_of("\\/"); \
size_t last_slash_pos = file_name.find_last_of("\\/"); \
if (last_slash_pos != std::string::npos) \ if (last_slash_pos != std::string::npos) \
file_name = file_name.substr(last_slash_pos + 1); \ path = path.substr(last_slash_pos + 1); \
return file_name; \ return path; \
}(std::get<1>(from)) << ":" \ }(std::get<1>(from)) << ":" \
<< std::get<2>(from) << "]" << std::get<2>(from) << "]"
@ -36,7 +35,7 @@ using Closure = std::function<void()>;
// Provides location info (function name, file name and line number) where of a // Provides location info (function name, file name and line number) where of a
// Closure was constructed. // Closure was constructed.
using Location = std::tuple<const char*, const char*, int>; using Location = std::tuple<std::string, std::string, int>;
#else #else

View File

@ -4,12 +4,14 @@
#include "base/log.h" #include "base/log.h"
namespace base {
namespace { namespace {
void PostTaskAndReplyRelay(base::Location from, void PostTaskAndReplyRelay(Location from,
base::Closure task_cb, Closure task_cb,
base::Closure reply_cb, Closure reply_cb,
base::TaskRunner* destination) { std::shared_ptr<TaskRunner> destination) {
task_cb(); task_cb();
if (reply_cb) if (reply_cb)
@ -18,21 +20,23 @@ void PostTaskAndReplyRelay(base::Location from,
} // namespace } // namespace
namespace base { // The task runner that belongs to the thread it's created in. Tasks to be run
// on a specific thread can be posted to this task runner.
thread_local std::unique_ptr<TaskRunner> TaskRunner::thread_local_task_runner; // TaskRunner::GetThreadLocalTaskRunner()->RunTasks() is expected to be
// periodically called.
thread_local std::shared_ptr<TaskRunner> TaskRunner::thread_local_task_runner;
void TaskRunner::CreateThreadLocalTaskRunner() { void TaskRunner::CreateThreadLocalTaskRunner() {
DCHECK(!thread_local_task_runner); DCHECK(!thread_local_task_runner);
thread_local_task_runner = std::make_unique<TaskRunner>(); thread_local_task_runner = std::make_shared<TaskRunner>();
} }
TaskRunner* TaskRunner::GetThreadLocalTaskRunner() { std::shared_ptr<TaskRunner> TaskRunner::GetThreadLocalTaskRunner() {
return thread_local_task_runner.get(); return thread_local_task_runner;
} }
void TaskRunner::PostTask(const Location& from, Closure task) { void TaskRunner::PostTask(Location from, Closure task) {
DCHECK(task) << LOCATION(from); DCHECK(task) << LOCATION(from);
task_count_.fetch_add(1, std::memory_order_relaxed); task_count_.fetch_add(1, std::memory_order_relaxed);
@ -40,19 +44,28 @@ void TaskRunner::PostTask(const Location& from, Closure task) {
queue_.emplace_back(from, std::move(task)); queue_.emplace_back(from, std::move(task));
} }
void TaskRunner::PostTaskAndReply(const Location& from, void TaskRunner::PostTaskAndReply(Location from, Closure task, Closure reply) {
Closure task,
Closure reply) {
DCHECK(task) << LOCATION(from); DCHECK(task) << LOCATION(from);
DCHECK(reply) << LOCATION(from); DCHECK(reply) << LOCATION(from);
DCHECK(thread_local_task_runner) << LOCATION(from); DCHECK(thread_local_task_runner) << LOCATION(from);
auto relay = std::bind(::PostTaskAndReplyRelay, from, std::move(task), auto relay = std::bind(PostTaskAndReplyRelay, from, std::move(task),
std::move(reply), thread_local_task_runner.get()); std::move(reply), thread_local_task_runner);
PostTask(from, std::move(relay)); PostTask(from, std::move(relay));
} }
void TaskRunner::MultiConsumerRun() { void TaskRunner::CancelTasks() {
std::lock_guard<std::mutex> scoped_lock(lock_);
task_count_.fetch_sub(queue_.size(), std::memory_order_release);
queue_.clear();
}
void TaskRunner::WaitForCompletion() {
while (task_count_.load(std::memory_order_acquire) > 0)
std::this_thread::yield();
}
void TaskRunner::RunTasks() {
for (;;) { for (;;) {
Task task; Task task;
{ {
@ -71,55 +84,7 @@ void TaskRunner::MultiConsumerRun() {
task_cb(); task_cb();
task_count_.fetch_sub(1, std::memory_order_release); task_count_.fetch_sub(1, std::memory_order_release);
if (cancel_tasks_.load(std::memory_order_relaxed)) {
CancelTasksInternal();
break;
}
} }
} }
void TaskRunner::SingleConsumerRun() {
std::deque<Task> queue;
{
std::lock_guard<std::mutex> scoped_lock(lock_);
if (queue_.empty())
return;
queue.swap(queue_);
}
while (!queue.empty()) {
auto [from, task_cb] = queue.front();
queue.pop_front();
#if 0
LOG << __func__ << " from: " << LOCATION(from);
#endif
task_cb();
task_count_.fetch_sub(1, std::memory_order_release);
if (cancel_tasks_.load(std::memory_order_relaxed)) {
CancelTasksInternal();
break;
}
}
}
void TaskRunner::CancelTasks() {
cancel_tasks_.store(true, std::memory_order_relaxed);
}
void TaskRunner::WaitForCompletion() {
while (task_count_.load(std::memory_order_acquire) > 0)
std::this_thread::yield();
}
void TaskRunner::CancelTasksInternal() {
cancel_tasks_.store(false, std::memory_order_relaxed);
task_count_.store(0, std::memory_order_relaxed);
std::lock_guard<std::mutex> scoped_lock(lock_);
queue_.clear();
}
} // namespace base } // namespace base

View File

@ -26,7 +26,7 @@ void ReturnAsParamAdapter(std::function<ReturnType()> func,
template <typename ReturnType> template <typename ReturnType>
void ReplyAdapter(std::function<void(ReturnType)> callback, void ReplyAdapter(std::function<void(ReturnType)> callback,
ReturnType* result) { ReturnType* result) {
callback(*result); callback(std::move(*result));
delete result; delete result;
} }
@ -34,20 +34,23 @@ void ReplyAdapter(std::function<void(ReturnType)> callback,
// Runs queued tasks (in the form of Closure objects). All methods are // Runs queued tasks (in the form of Closure objects). All methods are
// thread-safe and can be called on any thread. // thread-safe and can be called on any thread.
// Tasks run in FIFO order. When consumed concurrently by multiple threads, it // Tasks run in FIFO order when consumed by a single thread. When consumed
// doesn't guarantee whether tasks overlap, or whether they run on a particular // concurrently by multiple threads, it doesn't guarantee whether tasks overlap,
// thread. // or whether they run on a particular thread.
class TaskRunner { class TaskRunner {
public: public:
TaskRunner() = default; TaskRunner() = default;
~TaskRunner() = default; ~TaskRunner() = default;
void PostTask(const Location& from, Closure task); static void CreateThreadLocalTaskRunner();
static std::shared_ptr<TaskRunner> GetThreadLocalTaskRunner();
void PostTaskAndReply(const Location& from, Closure task, Closure reply); void PostTask(Location from, Closure task);
void PostTaskAndReply(Location from, Closure task, Closure reply);
template <typename ReturnType> template <typename ReturnType>
void PostTaskAndReplyWithResult(const Location& from, void PostTaskAndReplyWithResult(Location from,
std::function<ReturnType()> task, std::function<ReturnType()> task,
std::function<void(ReturnType)> reply) { std::function<void(ReturnType)> reply) {
auto* result = new ReturnType; auto* result = new ReturnType;
@ -59,14 +62,11 @@ class TaskRunner {
result)); result));
} }
void MultiConsumerRun();
void SingleConsumerRun();
void CancelTasks(); void CancelTasks();
void WaitForCompletion(); void WaitForCompletion();
static void CreateThreadLocalTaskRunner(); void RunTasks();
static TaskRunner* GetThreadLocalTaskRunner();
private: private:
using Task = std::tuple<Location, Closure>; using Task = std::tuple<Location, Closure>;
@ -74,11 +74,8 @@ class TaskRunner {
std::deque<Task> queue_; std::deque<Task> queue_;
mutable std::mutex lock_; mutable std::mutex lock_;
std::atomic<size_t> task_count_{0}; std::atomic<size_t> task_count_{0};
std::atomic<bool> cancel_tasks_{false};
static thread_local std::unique_ptr<TaskRunner> thread_local_task_runner; static thread_local std::shared_ptr<TaskRunner> thread_local_task_runner;
void CancelTasksInternal();
TaskRunner(TaskRunner const&) = delete; TaskRunner(TaskRunner const&) = delete;
TaskRunner& operator=(TaskRunner const&) = delete; TaskRunner& operator=(TaskRunner const&) = delete;

View File

@ -40,30 +40,31 @@ void ThreadPool::Shutdown() {
threads_.clear(); threads_.clear();
} }
void ThreadPool::PostTask(const Location& from, Closure task) { void ThreadPool::PostTask(Location from, Closure task) {
DCHECK((!threads_.empty())); DCHECK((!threads_.empty()));
task_runner_.PostTask(from, std::move(task)); task_runner_.PostTask(from, std::move(task));
semaphore_.release(); semaphore_.release();
} }
void ThreadPool::PostTaskAndReply(const Location& from, void ThreadPool::PostTaskAndReply(Location from, Closure task, Closure reply) {
Closure task,
Closure reply) {
DCHECK((!threads_.empty())); DCHECK((!threads_.empty()));
task_runner_.PostTaskAndReply(from, std::move(task), std::move(reply)); task_runner_.PostTaskAndReply(from, std::move(task), std::move(reply));
semaphore_.release(); semaphore_.release();
} }
void ThreadPool::CancelTasks() {
task_runner_.CancelTasks();
}
void ThreadPool::WorkerMain() { void ThreadPool::WorkerMain() {
for (;;) { for (;;) {
semaphore_.acquire(); semaphore_.acquire();
if (quit_.load(std::memory_order_relaxed)) if (quit_.load(std::memory_order_relaxed))
return; return;
task_runner_.MultiConsumerRun(); task_runner_.RunTasks();
} }
} }

View File

@ -11,8 +11,6 @@
namespace base { namespace base {
class TaskRunner;
// Feed the ThreadPool tasks (in the form of Closure objects) and they will be // Feed the ThreadPool tasks (in the form of Closure objects) and they will be
// called on any thread from the pool. // called on any thread from the pool.
class ThreadPool { class ThreadPool {
@ -26,12 +24,12 @@ class ThreadPool {
void Shutdown(); void Shutdown();
void PostTask(const Location& from, Closure task); void PostTask(Location from, Closure task);
void PostTaskAndReply(const Location& from, Closure task, Closure reply); void PostTaskAndReply(Location from, Closure task, Closure reply);
template <typename ReturnType> template <typename ReturnType>
void PostTaskAndReplyWithResult(const Location& from, void PostTaskAndReplyWithResult(Location from,
std::function<ReturnType()> task, std::function<ReturnType()> task,
std::function<void(ReturnType)> reply) { std::function<void(ReturnType)> reply) {
task_runner_.PostTaskAndReplyWithResult(from, std::move(task), task_runner_.PostTaskAndReplyWithResult(from, std::move(task),
@ -39,6 +37,8 @@ class ThreadPool {
semaphore_.release(); semaphore_.release();
} }
void CancelTasks();
private: private:
std::vector<std::thread> threads_; std::vector<std::thread> threads_;

View File

@ -201,7 +201,6 @@ bool Menu::Initialize() {
: RendererType::kOpenGL); : RendererType::kOpenGL);
renderer_type_.SetEnabled( renderer_type_.SetEnabled(
(Engine::Get().GetRendererType() == RendererType::kVulkan)); (Engine::Get().GetRendererType() == RendererType::kVulkan));
Engine::Get().ConsumeInputEvents();
}, },
true, Engine::Get().GetRendererType() == RendererType::kVulkan, true, Engine::Get().GetRendererType() == RendererType::kVulkan,
kColorFadeOut, {Vector4f{1, 1, 1, 1}, Vector4f{1, 1, 1, 1}}); kColorFadeOut, {Vector4f{1, 1, 1, 1}, Vector4f{1, 1, 1, 1}});

View File

@ -16,7 +16,6 @@ class TaskRunner;
namespace eng { namespace eng {
class AudioSink;
class AudioBus; class AudioBus;
// Mix and render audio with low overhead. A platform specific AudioSink // Mix and render audio with low overhead. A platform specific AudioSink
@ -85,13 +84,13 @@ class AudioMixer : public AudioSink::Delegate {
std::list<std::shared_ptr<Resource>> end_list_; std::list<std::shared_ptr<Resource>> end_list_;
base::TaskRunner* main_thread_task_runner_; std::shared_ptr<base::TaskRunner> main_thread_task_runner_;
std::unique_ptr<AudioSink> audio_sink_; std::unique_ptr<AudioSink> audio_sink_;
bool audio_enabled_ = true; bool audio_enabled_ = true;
// AudioSink::Delegate implementation // AudioSink::Delegate interface
int GetChannelCount() final { return kChannelCount; } int GetChannelCount() final { return kChannelCount; }
void RenderAudio(float* output_buffer, size_t num_frames) final; void RenderAudio(float* output_buffer, size_t num_frames) final;

View File

@ -50,6 +50,9 @@ Engine::Engine(Platform* platform)
Engine::~Engine() { Engine::~Engine() {
LOG << "Shutting down engine."; LOG << "Shutting down engine.";
thread_pool_.CancelTasks();
thread_pool_.Shutdown();
game_.reset(); game_.reset();
stats_.reset(); stats_.reset();
textures_.clear(); textures_.clear();
@ -73,7 +76,7 @@ void Engine::Run() {
float frame_frac = 0.0f; float frame_frac = 0.0f;
for (;;) { for (;;) {
TaskRunner::GetThreadLocalTaskRunner()->SingleConsumerRun(); TaskRunner::GetThreadLocalTaskRunner()->RunTasks();
platform_->Update(); platform_->Update();
if (platform_->should_exit()) if (platform_->should_exit())
@ -290,11 +293,13 @@ void Engine::RefreshImage(const std::string& asset_name) {
return; return;
} }
auto image = it->second.create_image(); if (it->second.persistent || it->second.use_count > 0) {
if (image) auto image = it->second.create_image();
it->second.texture->Update(std::move(image)); if (image)
else it->second.texture->Update(std::move(image));
it->second.texture->Destroy(); else
it->second.texture->Destroy();
}
} }
Texture* Engine::AcquireTexture(const std::string& asset_name) { Texture* Engine::AcquireTexture(const std::string& asset_name) {
@ -304,9 +309,9 @@ Texture* Engine::AcquireTexture(const std::string& asset_name) {
return nullptr; return nullptr;
} }
it->second.use_count++;
if (!it->second.texture->IsValid()) if (!it->second.texture->IsValid())
RefreshImage(it->first); RefreshImage(it->first);
it->second.use_count++;
return it->second.texture.get(); return it->second.texture.get();
} }
@ -396,10 +401,6 @@ std::unique_ptr<InputEvent> Engine::GetNextInputEvent() {
return event; return event;
} }
void Engine::ConsumeInputEvents() {
input_queue_.clear();
}
void Engine::StartRecording(const Json::Value& payload) { void Engine::StartRecording(const Json::Value& payload) {
if (!replaying_ && !recording_) { if (!replaying_ && !recording_) {
recording_ = true; recording_ = true;
@ -635,6 +636,8 @@ void Engine::ContextLost() {
if (game_) if (game_)
game_->ContextLost(); game_->ContextLost();
input_queue_.clear();
} }
void Engine::SetStatsVisible(bool visible) { void Engine::SetStatsVisible(bool visible) {

View File

@ -84,7 +84,6 @@ class Engine : public PlatformObserver {
void RemoveCustomShader(const std::string& asset_name); void RemoveCustomShader(const std::string& asset_name);
std::unique_ptr<InputEvent> GetNextInputEvent(); std::unique_ptr<InputEvent> GetNextInputEvent();
void ConsumeInputEvents();
void StartRecording(const Json::Value& payload); void StartRecording(const Json::Value& payload);
void EndRecording(const std::string file_name); void EndRecording(const std::string file_name);

View File

@ -144,7 +144,7 @@ class RendererOpenGL final : public Renderer {
std::counting_semaphore<> draw_complete_semaphore_{0}; std::counting_semaphore<> draw_complete_semaphore_{0};
base::TaskRunner* main_thread_task_runner_; std::shared_ptr<base::TaskRunner> main_thread_task_runner_;
#endif // THREADED_RENDERING #endif // THREADED_RENDERING
// Stats. // Stats.

View File

@ -987,6 +987,7 @@ void RendererVulkan::Shutdown() {
return; return;
LOG << "Shutting down renderer."; LOG << "Shutting down renderer.";
task_runner_.CancelTasks();
quit_.store(true, std::memory_order_relaxed); quit_.store(true, std::memory_order_relaxed);
semaphore_.release(); semaphore_.release();
setup_thread_.join(); setup_thread_.join();
@ -2005,7 +2006,7 @@ void RendererVulkan::SetupThreadMain(int preallocate) {
if (quit_.load(std::memory_order_relaxed)) if (quit_.load(std::memory_order_relaxed))
break; break;
task_runner_.SingleConsumerRun(); task_runner_.RunTasks();
} }
for (size_t i = 0; i < staging_buffers_.size(); i++) { for (size_t i = 0; i < staging_buffers_.size(); i++) {