Use std::counting_semaphore

This commit is contained in:
Attila Uygun 2023-05-03 20:25:23 +02:00
parent 771f8a3d78
commit 653c283758
11 changed files with 26 additions and 73 deletions

View File

@ -1,41 +0,0 @@
#ifndef BASE_SEMAPHORE_H
#define BASE_SEMAPHORE_H
#include <condition_variable>
#include <mutex>
#include "base/log.h"
namespace base {
class Semaphore {
public:
Semaphore(int count = 0) : count_(count) {}
void Acquire() {
std::unique_lock<std::mutex> scoped_lock(mutex_);
cv_.wait(scoped_lock, [&]() { return count_ > 0; });
--count_;
DCHECK(count_ >= 0);
}
void Release() {
{
std::lock_guard<std::mutex> scoped_lock(mutex_);
++count_;
}
cv_.notify_one();
}
private:
std::condition_variable cv_;
std::mutex mutex_;
int count_ = 0;
Semaphore(Semaphore const&) = delete;
Semaphore& operator=(Semaphore const&) = delete;
};
} // namespace base
#endif // BASE_SEMAPHORE_H

View File

@ -1,5 +1,7 @@
#include "base/task_runner.h" #include "base/task_runner.h"
#include <thread>
#include "base/log.h" #include "base/log.h"
namespace { namespace {
@ -33,7 +35,7 @@ TaskRunner* TaskRunner::GetThreadLocalTaskRunner() {
void TaskRunner::PostTask(const Location& from, Closure task) { void TaskRunner::PostTask(const Location& from, Closure task) {
DCHECK(task) << LOCATION(from); DCHECK(task) << LOCATION(from);
task_count_.fetch_add(1, std::memory_order_relaxed); task_count_.fetch_add(1, std::memory_order_release);
std::lock_guard<std::mutex> scoped_lock(lock_); std::lock_guard<std::mutex> scoped_lock(lock_);
queue_.emplace_back(from, std::move(task)); queue_.emplace_back(from, std::move(task));
} }
@ -68,7 +70,7 @@ void TaskRunner::MultiConsumerRun() {
#endif #endif
task_cb(); task_cb();
task_count_.fetch_sub(1, std::memory_order_relaxed); task_count_.fetch_sub(1, std::memory_order_release);
} }
} }
@ -90,16 +92,13 @@ void TaskRunner::SingleConsumerRun() {
#endif #endif
task_cb(); task_cb();
task_count_.fetch_sub(1, std::memory_order_relaxed); task_count_.fetch_sub(1, std::memory_order_release);
} }
cv_.notify_one();
} }
void TaskRunner::WaitForCompletion() { void TaskRunner::WaitForCompletion() {
std::unique_lock<std::mutex> scoped_lock(lock_); while (task_count_.load(std::memory_order_acquire) > 0)
cv_.wait(scoped_lock, [&]() -> bool { std::this_thread::yield();
return task_count_.load(std::memory_order_relaxed) == 0;
});
} }
} // namespace base } // namespace base

View File

@ -2,7 +2,6 @@
#define BASE_TASK_RUNNER_H #define BASE_TASK_RUNNER_H
#include <atomic> #include <atomic>
#include <condition_variable>
#include <deque> #include <deque>
#include <memory> #include <memory>
#include <mutex> #include <mutex>
@ -74,7 +73,6 @@ class TaskRunner {
std::deque<Task> queue_; std::deque<Task> queue_;
mutable std::mutex lock_; mutable std::mutex lock_;
std::condition_variable cv_;
std::atomic<size_t> task_count_{0}; std::atomic<size_t> task_count_{0};
static thread_local std::unique_ptr<TaskRunner> thread_local_task_runner; static thread_local std::unique_ptr<TaskRunner> thread_local_task_runner;

View File

@ -33,7 +33,7 @@ void ThreadPool::Shutdown() {
return; return;
quit_.store(true, std::memory_order_relaxed); quit_.store(true, std::memory_order_relaxed);
semaphore_.Release(); semaphore_.release(threads_.size());
for (auto& thread : threads_) for (auto& thread : threads_)
thread.join(); thread.join();
@ -44,7 +44,7 @@ void ThreadPool::PostTask(const Location& from, Closure task) {
DCHECK((!threads_.empty())); DCHECK((!threads_.empty()));
task_runner_.PostTask(from, std::move(task)); task_runner_.PostTask(from, std::move(task));
semaphore_.Release(); semaphore_.release();
} }
void ThreadPool::PostTaskAndReply(const Location& from, void ThreadPool::PostTaskAndReply(const Location& from,
@ -53,17 +53,15 @@ void ThreadPool::PostTaskAndReply(const Location& from,
DCHECK((!threads_.empty())); DCHECK((!threads_.empty()));
task_runner_.PostTaskAndReply(from, std::move(task), std::move(reply)); task_runner_.PostTaskAndReply(from, std::move(task), std::move(reply));
semaphore_.Release(); semaphore_.release();
} }
void ThreadPool::WorkerMain() { void ThreadPool::WorkerMain() {
for (;;) { for (;;) {
semaphore_.Acquire(); semaphore_.acquire();
if (quit_.load(std::memory_order_relaxed)) { if (quit_.load(std::memory_order_relaxed))
semaphore_.Release();
return; return;
}
task_runner_.MultiConsumerRun(); task_runner_.MultiConsumerRun();
} }

View File

@ -2,11 +2,11 @@
#define BASE_THREAD_POOL_H #define BASE_THREAD_POOL_H
#include <atomic> #include <atomic>
#include <semaphore>
#include <thread> #include <thread>
#include <vector> #include <vector>
#include "base/closure.h" #include "base/closure.h"
#include "base/semaphore.h"
#include "base/task_runner.h" #include "base/task_runner.h"
namespace base { namespace base {
@ -36,13 +36,13 @@ class ThreadPool {
std::function<void(ReturnType)> reply) { std::function<void(ReturnType)> reply) {
task_runner_.PostTaskAndReplyWithResult(from, std::move(task), task_runner_.PostTaskAndReplyWithResult(from, std::move(task),
std::move(reply)); std::move(reply));
semaphore_.Release(); semaphore_.release();
} }
private: private:
std::vector<std::thread> threads_; std::vector<std::thread> threads_;
Semaphore semaphore_; std::counting_semaphore<> semaphore_{0};
std::atomic<bool> quit_{false}; std::atomic<bool> quit_{false};
base::TaskRunner task_runner_; base::TaskRunner task_runner_;

View File

@ -184,7 +184,7 @@ void RendererOpenGL::SetUniform(uint64_t resource_id,
void RendererOpenGL::Present() { void RendererOpenGL::Present() {
EnqueueCommand(std::make_unique<CmdPresent>()); EnqueueCommand(std::make_unique<CmdPresent>());
#ifdef THREADED_RENDERING #ifdef THREADED_RENDERING
draw_complete_semaphore_.Acquire(); draw_complete_semaphore_.acquire();
#endif // THREADED_RENDERING #endif // THREADED_RENDERING
fps_++; fps_++;
} }

View File

@ -13,9 +13,8 @@
#include <deque> #include <deque>
#include <future> #include <future>
#include <mutex> #include <mutex>
#include <semaphore>
#include <thread> #include <thread>
#include "base/semaphore.h"
#endif // THREADED_RENDERING #endif // THREADED_RENDERING
#include "engine/renderer/opengl/opengl.h" #include "engine/renderer/opengl/opengl.h"
@ -147,7 +146,7 @@ class RendererOpenGL : public Renderer {
std::thread render_thread_; std::thread render_thread_;
bool terminate_render_thread_ = false; bool terminate_render_thread_ = false;
base::Semaphore draw_complete_semaphore_; std::counting_semaphore<> draw_complete_semaphore_{0};
base::TaskRunner* main_thread_task_runner_; base::TaskRunner* main_thread_task_runner_;
#endif // THREADED_RENDERING #endif // THREADED_RENDERING

View File

@ -58,7 +58,7 @@ void RendererOpenGL::HandleCmdPresent(RenderCommand* cmd) {
return; return;
} }
#ifdef THREADED_RENDERING #ifdef THREADED_RENDERING
draw_complete_semaphore_.Release(); draw_complete_semaphore_.release();
#endif // THREADED_RENDERING #endif // THREADED_RENDERING
glClear(GL_COLOR_BUFFER_BIT | GL_DEPTH_BUFFER_BIT); glClear(GL_COLOR_BUFFER_BIT | GL_DEPTH_BUFFER_BIT);
active_shader_id_ = 0; active_shader_id_ = 0;

View File

@ -55,7 +55,7 @@ void RendererOpenGL::HandleCmdPresent(RenderCommand* cmd) {
if (display_) { if (display_) {
glXSwapBuffers(display_, window_); glXSwapBuffers(display_, window_);
#ifdef THREADED_RENDERING #ifdef THREADED_RENDERING
draw_complete_semaphore_.Release(); draw_complete_semaphore_.release();
#endif // THREADED_RENDERING #endif // THREADED_RENDERING
glClear(GL_COLOR_BUFFER_BIT | GL_DEPTH_BUFFER_BIT); glClear(GL_COLOR_BUFFER_BIT | GL_DEPTH_BUFFER_BIT);
active_shader_id_ = 0; active_shader_id_ = 0;

View File

@ -295,7 +295,7 @@ uint64_t RendererVulkan::CreateGeometry(std::unique_ptr<Mesh> mesh) {
// Transfer mesh ownership to the background thread. // Transfer mesh ownership to the background thread.
std::unique_ptr<Mesh> own(mesh); std::unique_ptr<Mesh> own(mesh);
}); });
semaphore_.Release(); semaphore_.release();
return last_resource_id_; return last_resource_id_;
} }
@ -387,7 +387,7 @@ void RendererVulkan::UpdateTexture(uint64_t resource_id,
// Transfer image ownership to the background thread. // Transfer image ownership to the background thread.
std::unique_ptr<Image> own(image); std::unique_ptr<Image> own(image);
}); });
semaphore_.Release(); semaphore_.release();
} }
void RendererVulkan::DestroyTexture(uint64_t resource_id) { void RendererVulkan::DestroyTexture(uint64_t resource_id) {
@ -867,7 +867,7 @@ void RendererVulkan::Shutdown() {
DestroyAllResources(); DestroyAllResources();
quit_.store(true, std::memory_order_relaxed); quit_.store(true, std::memory_order_relaxed);
semaphore_.Release(); semaphore_.release();
setup_thread_.join(); setup_thread_.join();
vkDeviceWaitIdle(device_); vkDeviceWaitIdle(device_);
@ -1877,7 +1877,7 @@ void RendererVulkan::SetupThreadMain(int preallocate) {
} }
for (;;) { for (;;) {
semaphore_.Acquire(); semaphore_.acquire();
if (quit_.load(std::memory_order_relaxed)) if (quit_.load(std::memory_order_relaxed))
break; break;

View File

@ -3,6 +3,7 @@
#include <atomic> #include <atomic>
#include <memory> #include <memory>
#include <semaphore>
#include <string> #include <string>
#include <thread> #include <thread>
#include <tuple> #include <tuple>
@ -11,7 +12,6 @@
#include "engine/renderer/vulkan/vulkan_context.h" #include "engine/renderer/vulkan/vulkan_context.h"
#include "base/semaphore.h"
#include "base/task_runner.h" #include "base/task_runner.h"
#include "engine/renderer/renderer.h" #include "engine/renderer/renderer.h"
#include "third_party/vma/vk_mem_alloc.h" #include "third_party/vma/vk_mem_alloc.h"
@ -181,7 +181,7 @@ class RendererVulkan : public Renderer {
std::thread setup_thread_; std::thread setup_thread_;
base::TaskRunner task_runner_; base::TaskRunner task_runner_;
base::Semaphore semaphore_; std::counting_semaphore<> semaphore_{0};
std::atomic<bool> quit_{false}; std::atomic<bool> quit_{false};
bool InitializeInternal(); bool InitializeInternal();