OpenSTA/include/sta/DispatchQueue.hh

99 lines
2.4 KiB
C++
Raw Normal View History

// Original Author: Phillip Johnston
2019-11-11 16:48:27 +01:00
// Licensed under CC0 1.0 Universal
// Original source: https://github.com/embeddedartistry/embedded-resources/blob/master/examples/cpp/dispatch.cpp
// Original article: https://embeddedartistry.com/blog/2017/2/1/dispatch-queues?rq=dispatch
//
// Modified for OpenSTA to use C++20 non-spinning DynamicLatch for synchronization.
2019-11-11 16:48:27 +01:00
2020-02-16 01:13:16 +01:00
#pragma once
2019-11-11 16:48:27 +01:00
#include <atomic>
#include <cstddef>
#include <condition_variable>
2019-11-11 16:48:27 +01:00
#include <cstdint>
#include <functional>
2019-11-11 16:48:27 +01:00
#include <mutex>
#include <queue>
#include <thread>
#include <vector>
2019-11-11 16:48:27 +01:00
namespace sta {
class DynamicLatch
{
public:
explicit DynamicLatch(std::ptrdiff_t initial_count = 0) :
count_(initial_count)
{
}
// Delete copy/move constructors to prevent accidental slicing/copying of atomics
DynamicLatch(const DynamicLatch&) = delete;
DynamicLatch& operator=(const DynamicLatch&) = delete;
// Increases the latch count (used when a new task is dispatched)
void
countUp()
{
count_.fetch_add(1, std::memory_order_release);
}
// Decreases the latch count and wakes waiting threads if it hits zero
void
countDown(std::ptrdiff_t n = 1)
{
if (count_.fetch_sub(n, std::memory_order_release) == n) {
count_.notify_all();
}
}
// Blocks until the count reaches zero
void
wait() const
{
std::ptrdiff_t current = count_.load(std::memory_order_acquire);
while (current != 0) {
count_.wait(current, std::memory_order_acquire);
current = count_.load(std::memory_order_acquire);
}
}
private:
mutable std::atomic<std::ptrdiff_t> count_{0};
};
2019-11-11 16:48:27 +01:00
class DispatchQueue
{
2026-04-13 23:58:16 +02:00
using fp_t = std::function<void(int thread)>;
2019-11-11 16:48:27 +01:00
public:
2026-04-13 23:58:16 +02:00
DispatchQueue(size_t thread_count);
2019-11-11 16:48:27 +01:00
~DispatchQueue();
void setThreadCount(size_t thread_count);
size_t getThreadCount() const;
2019-11-11 16:48:27 +01:00
// Dispatch and copy.
void dispatch(const fp_t& op);
// Dispatch and move.
void dispatch(fp_t&& op);
void finishTasks();
// Deleted operations
DispatchQueue(const DispatchQueue& rhs) = delete;
DispatchQueue& operator=(const DispatchQueue& rhs) = delete;
DispatchQueue(DispatchQueue&& rhs) = delete;
DispatchQueue& operator=(DispatchQueue&& rhs) = delete;
private:
void dispatch_thread_handler(size_t i);
void terminateThreads();
std::mutex lock_;
std::vector<std::thread> threads_;
std::queue<fp_t> q_;
std::condition_variable cv_;
DynamicLatch pending_task_count_latch_;
2019-11-11 16:48:27 +01:00
bool quit_ = false;
};
2026-04-13 23:58:16 +02:00
} // namespace sta