Internals: Use `std::packaged_task` as a job wrapper; add and use `V3ThreadPool::ScopedExclusiveAccess` (#4310).

* Add VL_ASSERT_CAPABILITY; add assumeLocked and pretendUnlock to V3Mutex.

* Pass jobs as template-arguments and use std::packaged_task.

* Add and use V3ThreadPool::ScopedExclusiveAccess.
This commit is contained in:
Mariusz Glebocki 2023-06-24 00:25:12 +02:00 committed by GitHub
parent 7005a65d32
commit 85b7f828b3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 137 additions and 97 deletions

View File

@ -119,6 +119,10 @@
// Allowed on: function, method. (-fthread-safety)
#define VL_RETURN_CAPABILITY(x) \
VL_CLANG_ATTR(lock_returned(x))
// Assert that capability is already held.
// Allowed on: function, method. (-fthread-safety)
#define VL_ASSERT_CAPABILITY(x) \
VL_CLANG_ATTR(assert_capability(x))
// Defaults for unsupported compiler features
#ifndef VL_ATTR_ALWINLINE

View File

@ -932,11 +932,11 @@ void V3EmitC::emitcImp() {
const AstNodeModule* const modp = VN_AS(nodep, NodeModule);
cfiles.emplace_back();
auto& slowCfilesr = cfiles.back();
futures.push_back(V3ThreadPool::s().enqueue<void>(
futures.push_back(V3ThreadPool::s().enqueue(
[modp, &slowCfilesr]() { EmitCImp::main(modp, /* slow: */ true, slowCfilesr); }));
cfiles.emplace_back();
auto& fastCfilesr = cfiles.back();
futures.push_back(V3ThreadPool::s().enqueue<void>(
futures.push_back(V3ThreadPool::s().enqueue(
[modp, &fastCfilesr]() { EmitCImp::main(modp, /* slow: */ false, fastCfilesr); }));
}
@ -944,12 +944,12 @@ void V3EmitC::emitcImp() {
if (v3Global.opt.trace() && !v3Global.opt.lintOnly()) {
cfiles.emplace_back();
auto& slowCfilesr = cfiles.back();
futures.push_back(V3ThreadPool::s().enqueue<void>([&slowCfilesr]() {
futures.push_back(V3ThreadPool::s().enqueue([&slowCfilesr]() {
EmitCTrace::main(v3Global.rootp()->topModulep(), /* slow: */ true, slowCfilesr);
}));
cfiles.emplace_back();
auto& fastCfilesr = cfiles.back();
futures.push_back(V3ThreadPool::s().enqueue<void>([&fastCfilesr]() {
futures.push_back(V3ThreadPool::s().enqueue([&fastCfilesr]() {
EmitCTrace::main(v3Global.rootp()->topModulep(), /* slow: */ false, fastCfilesr);
}));
}

View File

@ -222,20 +222,18 @@ void V3ErrorGuarded::v3errorEnd(std::ostringstream& sstr, const string& extra)
#ifndef V3ERROR_NO_GLOBAL_
if (dumpTreeLevel() || debug()) {
V3Broken::allowMidvisitorCheck(true);
V3ThreadPool::s().requestExclusiveAccess([&]() VL_REQUIRES(m_mutex) {
if (dumpTreeLevel()) {
v3Global.rootp()->dumpTreeFile(
v3Global.debugFilename("final.tree", 990));
}
if (debug()) {
execErrorExitCb();
V3Stats::statsFinalAll(v3Global.rootp());
V3Stats::statsReport();
}
// Abort in exclusive access to make sure other threads
// don't change error code
vlAbortOrExit();
});
const V3ThreadPool::ScopedExclusiveAccess exclusiveAccess;
if (dumpTreeLevel()) {
v3Global.rootp()->dumpTreeFile(v3Global.debugFilename("final.tree", 990));
}
if (debug()) {
execErrorExitCb();
V3Stats::statsFinalAll(v3Global.rootp());
V3Stats::statsReport();
}
// Abort in exclusive access to make sure other threads
// don't change error code
vlAbortOrExit();
}
#endif
}

View File

@ -109,6 +109,10 @@ public:
bool try_lock() VL_TRY_ACQUIRE(true) VL_MT_SAFE {
return V3MutexConfig::s().enable() ? m_mutex.try_lock() : true;
}
/// Assume that the mutex is already held. Purely for Clang thread safety analyzer.
void assumeLocked() VL_ASSERT_CAPABILITY(this) VL_MT_SAFE {}
/// Pretend that the mutex is being unlocked. Purely for Clang thread safety analyzer.
void pretendUnlock() VL_RELEASE() VL_MT_SAFE {}
/// Acquire/lock mutex and check for stop request
/// It tries to lock the mutex and if it fails, it check if stop request was send.
/// It returns after locking mutex.

View File

@ -61,7 +61,7 @@ void V3ThreadPool::workerJobLoop(int id) VL_MT_SAFE {
while (true) {
// Wait for a notification
waitIfStopRequested();
job_t job;
VAnyPackagedTask job;
{
V3LockGuard lock(m_mutex);
m_cv.wait(m_mutex, [&]() VL_REQUIRES(m_mutex) {
@ -72,7 +72,7 @@ void V3ThreadPool::workerJobLoop(int id) VL_MT_SAFE {
// Get the job
UASSERT(!m_queue.empty(), "Job should be available");
job = m_queue.front();
job = std::move(m_queue.front());
m_queue.pop();
}
@ -81,40 +81,6 @@ void V3ThreadPool::workerJobLoop(int id) VL_MT_SAFE {
}
}
template <>
void V3ThreadPool::pushJob<void>(std::shared_ptr<std::promise<void>>& prom,
std::function<void()>&& f) VL_MT_SAFE {
if (willExecuteSynchronously()) {
f();
prom->set_value();
} else {
const V3LockGuard lock{m_mutex};
m_queue.push([prom, f] {
f();
prom->set_value();
});
}
}
void V3ThreadPool::requestExclusiveAccess(const V3ThreadPool::job_t&& exclusiveAccessJob)
VL_MT_SAFE {
if (willExecuteSynchronously()) {
exclusiveAccessJob();
} else {
V3LockGuard stoppedJobLock{m_stoppedJobsMutex};
// if some other job already requested exclusive access
// wait until it stops
if (stopRequested()) { waitStopRequested(); }
m_stopRequested = true;
waitOtherThreads();
m_exclusiveAccess = true;
exclusiveAccessJob();
m_exclusiveAccess = false;
m_stopRequested = false;
m_stoppedJobsCV.notify_all();
}
}
bool V3ThreadPool::waitIfStopRequested() VL_MT_SAFE VL_EXCLUDES(m_stoppedJobsMutex) {
if (!stopRequested()) return false;
V3LockGuard stoppedJobLock(m_stoppedJobsMutex);
@ -150,11 +116,10 @@ void V3ThreadPool::selfTest() {
auto firstJob = [&](int sleep) -> void {
std::this_thread::sleep_for(std::chrono::milliseconds{sleep});
s().requestExclusiveAccess([&]() {
commonValue = 10;
std::this_thread::sleep_for(std::chrono::milliseconds{sleep + 10});
UASSERT(commonValue == 10, "unexpected commonValue = " << commonValue);
});
const V3ThreadPool::ScopedExclusiveAccess exclusiveAccess;
commonValue = 10;
std::this_thread::sleep_for(std::chrono::milliseconds{sleep + 10});
UASSERT(commonValue == 10, "unexpected commonValue = " << commonValue);
};
auto secondJob = [&](int sleep) -> void {
commonMutex.lock();
@ -175,19 +140,19 @@ void V3ThreadPool::selfTest() {
};
std::list<std::future<void>> futures;
futures.push_back(s().enqueue<void>(std::bind(firstJob, 100)));
futures.push_back(s().enqueue<void>(std::bind(secondJob, 100)));
futures.push_back(s().enqueue<void>(std::bind(firstJob, 100)));
futures.push_back(s().enqueue<void>(std::bind(secondJob, 100)));
futures.push_back(s().enqueue<void>(std::bind(secondJob, 200)));
futures.push_back(s().enqueue<void>(std::bind(firstJob, 200)));
futures.push_back(s().enqueue<void>(std::bind(firstJob, 300)));
futures.push_back(s().enqueue(std::bind(firstJob, 100)));
futures.push_back(s().enqueue(std::bind(secondJob, 100)));
futures.push_back(s().enqueue(std::bind(firstJob, 100)));
futures.push_back(s().enqueue(std::bind(secondJob, 100)));
futures.push_back(s().enqueue(std::bind(secondJob, 200)));
futures.push_back(s().enqueue(std::bind(firstJob, 200)));
futures.push_back(s().enqueue(std::bind(firstJob, 300)));
while (!futures.empty()) {
s().waitForFuture(futures.front());
futures.pop_front();
}
futures.push_back(s().enqueue<void>(std::bind(thirdJob, 100)));
futures.push_back(s().enqueue<void>(std::bind(thirdJob, 100)));
futures.push_back(s().enqueue(std::bind(thirdJob, 100)));
futures.push_back(s().enqueue(std::bind(thirdJob, 100)));
V3ThreadPool::waitForFutures(futures);
s().waitIfStopRequested();
@ -195,7 +160,7 @@ void V3ThreadPool::selfTest() {
auto forthJob = [&]() -> int { return 1234; };
std::list<std::future<int>> futuresInt;
futuresInt.push_back(s().enqueue<int>(forthJob));
futuresInt.push_back(s().enqueue(forthJob));
auto result = V3ThreadPool::waitForFutures(futuresInt);
UASSERT(result.back() == 1234, "unexpected future result = " << result.back());
}

View File

@ -29,13 +29,53 @@
//============================================================================
// Callable, type-erased wrapper for std::packaged_task<Signature> with any Signature.
class VAnyPackagedTask final {
// TYPES
struct PTWrapperBase {
virtual ~PTWrapperBase() {}
virtual void operator()() = 0;
};
template <typename Signature>
struct PTWrapper final : PTWrapperBase {
std::packaged_task<Signature> m_pt;
PTWrapper(std::packaged_task<Signature>&& pt)
: m_pt(std::move(pt)) {}
void operator()() final override { m_pt(); }
};
// MEMBERS
std::unique_ptr<PTWrapperBase> m_ptWrapperp = nullptr; // Wrapper to call
public:
// CONSTRUCTORS
template <typename Signature>
VAnyPackagedTask(std::packaged_task<Signature>&& pt)
: m_ptWrapperp{std::make_unique<PTWrapper<Signature>>(std::move(pt))} {}
VAnyPackagedTask() = default;
~VAnyPackagedTask() = default;
VAnyPackagedTask(const VAnyPackagedTask&) = delete;
VAnyPackagedTask& operator=(const VAnyPackagedTask&) = delete;
VAnyPackagedTask(VAnyPackagedTask&&) = default;
VAnyPackagedTask& operator=(VAnyPackagedTask&&) = default;
// METHODS
// Call the wrapped function
void operator()() { (*m_ptWrapperp)(); }
};
class V3ThreadPool final {
// MEMBERS
static constexpr unsigned int FUTUREWAITFOR_MS = 100;
using job_t = std::function<void()>;
mutable V3Mutex m_mutex; // Mutex for use by m_queue
std::queue<job_t> m_queue VL_GUARDED_BY(m_mutex); // Queue of jobs
V3Mutex m_mutex; // Mutex for use by m_queue
std::queue<VAnyPackagedTask> m_queue VL_GUARDED_BY(m_mutex); // Queue of jobs
// We don't need to guard this condition_variable as
// both `notify_one` and `notify_all` functions are atomic,
// `wait` function is not atomic, but we are guarding `m_queue` that is
@ -62,6 +102,9 @@ class V3ThreadPool final {
}
public:
// Request exclusive access to processing for the object lifetime.
class ScopedExclusiveAccess;
// METHODS
// Singleton
static V3ThreadPool& s() VL_MT_SAFE {
@ -79,8 +122,8 @@ public:
// will call it. `VL_MT_START` here indicates that
// every function call inside this `std::function` requires
// annotations.
template <typename T>
std::future<T> enqueue(std::function<T()>&& f) VL_MT_START;
template <typename Callable>
auto enqueue(Callable&& f) VL_MT_START;
// Request exclusive access to processing.
// It sends request to stop all other threads and waits for them to stop.
@ -88,7 +131,9 @@ public:
// they can be stopped.
// When all other threads are stopped, this function executes the job
// and resumes execution of other jobs.
void requestExclusiveAccess(const job_t&& exclusiveAccessJob) VL_MT_SAFE;
template <typename Callable>
void requestExclusiveAccess(Callable&& exclusiveAccessJob) VL_MT_SAFE
VL_EXCLUDES(m_stoppedJobsMutex);
// Check if other thread requested exclusive access to processing,
// if so, it waits for it to complete. Afterwards it is resumed.
@ -151,14 +196,39 @@ private:
// Waits until all other jobs are stopped
void waitOtherThreads() VL_MT_SAFE_EXCLUDES(m_mutex) VL_REQUIRES(m_stoppedJobsMutex);
template <typename T>
void pushJob(std::shared_ptr<std::promise<T>>& prom, std::function<T()>&& f) VL_MT_SAFE;
void workerJobLoop(int id) VL_MT_SAFE;
static void startWorker(V3ThreadPool* selfThreadp, int id) VL_MT_SAFE;
};
class VL_SCOPED_CAPABILITY V3ThreadPool::ScopedExclusiveAccess final {
public:
ScopedExclusiveAccess() VL_ACQUIRE(V3ThreadPool::s().m_stoppedJobsMutex) VL_MT_SAFE {
if (!V3ThreadPool::s().willExecuteSynchronously()) {
V3ThreadPool::s().m_stoppedJobsMutex.lock();
if (V3ThreadPool::s().stopRequested()) { V3ThreadPool::s().waitStopRequested(); }
V3ThreadPool::s().m_stopRequested = true;
V3ThreadPool::s().waitOtherThreads();
V3ThreadPool::s().m_exclusiveAccess = true;
} else {
V3ThreadPool::s().m_stoppedJobsMutex.assumeLocked();
}
}
~ScopedExclusiveAccess() VL_RELEASE(V3ThreadPool::s().m_stoppedJobsMutex) VL_MT_SAFE {
// Can't use `willExecuteSynchronously`, we're still in exclusive execution state.
if (V3ThreadPool::s().m_exclusiveAccess) {
V3ThreadPool::s().m_exclusiveAccess = false;
V3ThreadPool::s().m_stopRequested = false;
V3ThreadPool::s().m_stoppedJobsCV.notify_all();
V3ThreadPool::s().m_stoppedJobsMutex.unlock();
} else {
V3ThreadPool::s().m_stoppedJobsMutex.pretendUnlock();
}
}
};
template <typename T>
T V3ThreadPool::waitForFuture(std::future<T>& future) VL_MT_SAFE_EXCLUDES(m_mutex) {
while (true) {
@ -175,29 +245,28 @@ T V3ThreadPool::waitForFuture(std::future<T>& future) VL_MT_SAFE_EXCLUDES(m_mute
}
}
template <typename T>
std::future<T> V3ThreadPool::enqueue(std::function<T()>&& f) VL_MT_START {
std::shared_ptr<std::promise<T>> prom = std::make_shared<std::promise<T>>();
std::future<T> result = prom->get_future();
pushJob(prom, std::move(f));
const V3LockGuard guard{m_mutex};
m_cv.notify_one();
return result;
}
template <typename T>
void V3ThreadPool::pushJob(std::shared_ptr<std::promise<T>>& prom,
std::function<T()>&& f) VL_MT_SAFE {
template <typename Callable>
auto V3ThreadPool::enqueue(Callable&& f) VL_MT_START {
using result_t = decltype(f());
auto&& job = std::packaged_task<result_t()>{std::forward<Callable>(f)};
auto future = job.get_future();
if (willExecuteSynchronously()) {
prom->set_value(f());
job();
} else {
const V3LockGuard guard{m_mutex};
m_queue.push([prom, f] { prom->set_value(f()); });
{
const V3LockGuard guard{m_mutex};
m_queue.push(std::move(job));
}
m_cv.notify_one();
}
return future;
}
template <>
void V3ThreadPool::pushJob<void>(std::shared_ptr<std::promise<void>>& prom,
std::function<void()>&& f) VL_MT_SAFE;
template <typename Callable>
void V3ThreadPool::requestExclusiveAccess(Callable&& exclusiveAccessJob) VL_MT_SAFE
VL_EXCLUDES(m_stoppedJobsMutex) {
ScopedExclusiveAccess exclusive_access;
exclusiveAccessJob();
}
#endif // Guard