Internals: V3LockGuard: Add constructor for adopting already locked mutex. (#4476)

This commit is contained in:
Kamil Rakoczy 2023-09-13 19:52:59 +02:00 committed by GitHub
parent 823e0723fb
commit 9fe459c820
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 125 additions and 23 deletions

View File

@ -531,7 +531,7 @@ public:
// Global versions, so that if the class doesn't define an operator, we get the functions anyway.
void v3errorEnd(std::ostringstream& sstr) VL_RELEASE(V3Error::s().m_mutex);
void v3errorEndFatal(std::ostringstream& sstr) VL_RELEASE(V3Error::s().m_mutex);
void v3errorEndFatal(std::ostringstream& sstr) VL_RELEASE(V3Error::s().m_mutex) VL_ATTR_NORETURN;
// Theses allow errors using << operators: v3error("foo"<<"bar");
// Careful, you can't put () around msg, as you would in most macro definitions.

View File

@ -139,12 +139,17 @@ private:
T& m_mutexr;
public:
/// Construct and hold given mutex lock until destruction or unlock()
/// Lock given mutex and hold it for the object lifetime.
explicit V3LockGuardImp(T& mutexr) VL_ACQUIRE(mutexr) VL_MT_SAFE
: m_mutexr(mutexr) { // Need () or GCC 4.8 false warning
mutexr.lock();
}
/// Destruct and unlock the mutex
/// Take already locked mutex, and and hold the lock for the object lifetime.
explicit V3LockGuardImp(T& mutexr, std::adopt_lock_t) VL_REQUIRES(mutexr) VL_MT_SAFE
: m_mutexr(mutexr) { // Need () or GCC 4.8 false warning
}
/// Unlock the mutex
~V3LockGuardImp() VL_RELEASE() { m_mutexr.unlock(); }
};

View File

@ -24,13 +24,16 @@
constexpr unsigned int V3ThreadPool::FUTUREWAITFOR_MS;
void V3ThreadPool::resize(unsigned n) VL_MT_UNSAFE VL_EXCLUDES(m_mutex)
VL_EXCLUDES(m_stoppedJobsMutex) {
VL_EXCLUDES(m_stoppedJobsMutex) VL_EXCLUDES(V3MtDisabledLock::instance()) {
// At least one thread (main)
n = std::max(1u, n);
if (n == (m_workers.size() + 1)) { return; }
// This function is not thread-safe and can result in race between threads
UASSERT(V3MutexConfig::s().lockConfig(),
"Mutex config needs to be locked before starting ThreadPool");
{
V3LockGuard lock{m_mutex};
V3LockGuard stoppedJobsLock{m_stoppedJobsMutex};
V3LockGuard lock{m_mutex};
UASSERT(m_queue.empty(), "Resizing busy thread pool");
// Shut down old threads
@ -38,6 +41,7 @@ void V3ThreadPool::resize(unsigned n) VL_MT_UNSAFE VL_EXCLUDES(m_mutex)
m_stoppedJobs = 0;
m_cv.notify_all();
m_stoppedJobsCV.notify_all();
m_exclusiveAccessThreadCV.notify_all();
}
while (!m_workers.empty()) {
m_workers.front().join();
@ -53,6 +57,37 @@ void V3ThreadPool::resize(unsigned n) VL_MT_UNSAFE VL_EXCLUDES(m_mutex)
}
}
void V3ThreadPool::suspendMultithreading() VL_MT_SAFE VL_EXCLUDES(m_mutex)
VL_EXCLUDES(m_stoppedJobsMutex) {
V3LockGuard stoppedJobsLock{m_stoppedJobsMutex};
if (!m_workers.empty()) { stopOtherThreads(); }
if (!m_mutex.try_lock()) {
v3fatal("Tried to suspend thread pool when other thread uses it.");
}
V3LockGuard lock{m_mutex, std::adopt_lock_t{}};
UASSERT(m_queue.empty(), "Thread pool has pending jobs");
UASSERT(m_jobsInProgress == 0, "Thread pool has jobs in progress");
m_exclusiveAccess = true;
m_multithreadingSuspended = true;
}
void V3ThreadPool::resumeMultithreading() VL_MT_SAFE VL_EXCLUDES(m_mutex)
VL_EXCLUDES(m_stoppedJobsMutex) {
if (!m_mutex.try_lock()) { v3fatal("Tried to resume thread pool when other thread uses it."); }
{
V3LockGuard lock{m_mutex, std::adopt_lock_t{}};
UASSERT(m_multithreadingSuspended, "Multithreading is not suspended");
m_multithreadingSuspended = false;
m_exclusiveAccess = false;
}
if (!m_workers.empty()) {
V3LockGuard stoppedJobsLock{m_stoppedJobsMutex};
resumeOtherThreads();
}
}
void V3ThreadPool::startWorker(V3ThreadPool* selfThreadp, int id) VL_MT_SAFE {
selfThreadp->workerJobLoop(id);
}
@ -74,10 +109,14 @@ void V3ThreadPool::workerJobLoop(int id) VL_MT_SAFE {
job = std::move(m_queue.front());
m_queue.pop();
++m_jobsInProgress;
}
// Execute the job
job();
// Note that a context switch can happen here. This means `m_jobsInProgress` could still
// contain old value even after the job promise has been fulfilled.
--m_jobsInProgress;
}
}
@ -90,25 +129,22 @@ bool V3ThreadPool::waitIfStopRequested() VL_MT_SAFE VL_EXCLUDES(m_stoppedJobsMut
void V3ThreadPool::waitForResumeRequest() VL_REQUIRES(m_stoppedJobsMutex) {
++m_stoppedJobs;
m_stoppedJobsCV.notify_all();
m_stoppedJobsCV.wait(m_stoppedJobsMutex, [&]() VL_REQUIRES(m_stoppedJobsMutex) {
return !m_stopRequested.load();
});
m_exclusiveAccessThreadCV.notify_one();
m_stoppedJobsCV.wait(m_stoppedJobsMutex,
[&]() VL_REQUIRES(m_stoppedJobsMutex) { return !m_stopRequested; });
--m_stoppedJobs;
m_stoppedJobsCV.notify_all();
}
void V3ThreadPool::stopOtherThreads() VL_MT_SAFE_EXCLUDES(m_mutex)
VL_REQUIRES(m_stoppedJobsMutex) {
m_stopRequested = true;
++m_stoppedJobs;
m_stoppedJobsCV.notify_all();
m_cv.notify_all();
m_stoppedJobsCV.wait(m_stoppedJobsMutex, [&]() VL_REQUIRES(m_stoppedJobsMutex) {
// count also the main thread
return m_stoppedJobs == (m_workers.size() + 1);
{
V3LockGuard lock{m_mutex};
m_cv.notify_all();
}
m_exclusiveAccessThreadCV.wait(m_stoppedJobsMutex, [&]() VL_REQUIRES(m_stoppedJobsMutex) {
return m_stoppedJobs == m_workers.size();
});
--m_stoppedJobs;
}
void V3ThreadPool::selfTestMtDisabled() {
@ -171,7 +207,25 @@ void V3ThreadPool::selfTest() {
{
const V3MtDisabledLockGuard mtDisabler{v3MtDisabledLock()};
selfTestMtDisabled();
{
V3LockGuard lock{V3ThreadPool::s().m_mutex};
UASSERT(V3ThreadPool::s().m_multithreadingSuspended,
"Multithreading should be suspended at this point");
}
}
{
V3LockGuard lock{V3ThreadPool::s().m_mutex};
UASSERT(!V3ThreadPool::s().m_multithreadingSuspended,
"Multithreading should not be suspended at this point");
}
}
V3MtDisabledLock V3MtDisabledLock::s_mtDisabledLock;
void V3MtDisabledLock::lock() VL_ACQUIRE() VL_MT_SAFE {
V3ThreadPool::s().suspendMultithreading();
}
void V3MtDisabledLock::unlock() VL_RELEASE() VL_MT_SAFE {
V3ThreadPool::s().resumeMultithreading();
}

View File

@ -81,7 +81,11 @@ class V3ThreadPool final {
// MEMBERS
static constexpr unsigned int FUTUREWAITFOR_MS = 100;
// some functions locks both of this mutexes, be careful of lock inversion problems
// 'm_stoppedJobsMutex' mutex should always be locked before 'm_mutex' mutex
// check usage of both of them when you use either of them
V3Mutex m_mutex; // Mutex for use by m_queue
V3Mutex m_stoppedJobsMutex; // Used to signal stopped jobs
std::queue<VAnyPackagedTask> m_queue VL_GUARDED_BY(m_mutex); // Queue of jobs
// We don't need to guard this condition_variable as
// both `notify_one` and `notify_all` functions are atomic,
@ -89,20 +93,36 @@ class V3ThreadPool final {
// used by this condition_variable, so clang checks that we have mutex locked
std::condition_variable_any m_cv; // Conditions to wake up workers
std::list<std::thread> m_workers; // Worker threads
V3Mutex m_stoppedJobsMutex; // Used to signal stopped jobs
// Number of started and not yet finished jobs.
// Reading is valid only after call to `stopOtherThreads()` or when no worker threads exist.
std::atomic_uint m_jobsInProgress{0};
// Conditions to wake up stopped jobs
std::condition_variable_any m_stoppedJobsCV VL_GUARDED_BY(m_stoppedJobsMutex);
// Conditions to wake up exclusive access thread
std::condition_variable_any m_exclusiveAccessThreadCV VL_GUARDED_BY(m_stoppedJobsMutex);
std::atomic_uint m_stoppedJobs{0}; // Currently stopped jobs waiting for wake up
std::atomic_bool m_stopRequested{false}; // Signals to resume stopped jobs
std::atomic_bool m_exclusiveAccess{false}; // Signals that all other threads are stopped
std::atomic_bool m_shutdown{false}; // Termination pending
// Indicates whether multithreading has been suspended.
// Used for error detection in resumeMultithreading only. You probably should use
// m_exclusiveAccess for information whether something should be run in current thread.
bool m_multithreadingSuspended VL_GUARDED_BY(m_mutex) = false;
// CONSTRUCTORS
V3ThreadPool() = default;
~V3ThreadPool() {
{
V3LockGuard lock{m_mutex};
if (!m_mutex.try_lock()) {
if (m_jobsInProgress != 0) {
// ThreadPool shouldn't be destroyed when jobs are running and mutex is locked,
// something is wrong. Most likely Verilator is exitting as a result of failed
// assert in critical section. Do nothing, let it exit.
return;
}
} else {
V3LockGuard lock{m_mutex, std::adopt_lock_t{}};
m_queue = {}; // make sure queue is empty
}
resize(0);
@ -120,7 +140,8 @@ public:
}
// Resize thread pool to n workers (queue must be empty)
void resize(unsigned n) VL_MT_UNSAFE VL_EXCLUDES(m_mutex) VL_EXCLUDES(m_stoppedJobsMutex);
void resize(unsigned n) VL_MT_UNSAFE VL_EXCLUDES(m_mutex) VL_EXCLUDES(m_stoppedJobsMutex)
VL_EXCLUDES(V3MtDisabledLock::instance());
// Enqueue a job for asynchronous execution
// Due to missing support for lambda annotations in c++11,
@ -171,6 +192,24 @@ public:
static void selfTestMtDisabled() VL_MT_DISABLED;
private:
// For access to suspendMultithreading() and resumeMultithreading()
friend class V3MtDisabledLock;
// Temporarily suspends multithreading.
//
// Existing worker threads are not terminated. All jobs enqueued when multithreading is
// suspended are executed synchronously.
// Must be called from the main thread. Jobs queue must be empty. Existing worker threads must
// be idle.
//
// Only V3MtDisabledLock class is supposed to use this function.
void suspendMultithreading() VL_MT_SAFE VL_EXCLUDES(m_mutex) VL_EXCLUDES(m_stoppedJobsMutex);
// Resumes multithreading suspended previously by call tosuspendMultithreading().
//
// Only V3MtDisabledLock class is supposed to use this function.
void resumeMultithreading() VL_MT_SAFE VL_EXCLUDES(m_mutex) VL_EXCLUDES(m_stoppedJobsMutex);
template <typename T>
static std::list<T> waitForFuturesImp(std::list<std::future<T>>& futures) {
std::list<T> results;
@ -235,6 +274,8 @@ public:
V3ThreadPool::s().resumeOtherThreads();
V3ThreadPool::s().m_stoppedJobsMutex.unlock();
// wait for all threads to resume
while (V3ThreadPool::s().m_stoppedJobs != 0) {}
} else {
V3ThreadPool::s().m_stoppedJobsMutex.pretendUnlock();
}

View File

@ -36,8 +36,10 @@ class VL_CAPABILITY("lock") V3MtDisabledLock final {
VL_UNMOVABLE(V3MtDisabledLock);
public:
constexpr void lock() VL_ACQUIRE() VL_MT_SAFE {}
constexpr void unlock() VL_RELEASE() VL_MT_SAFE {}
// lock() will disable multithreading while in MT Disabled regions
void lock() VL_ACQUIRE() VL_MT_SAFE;
// unlock() will reenable multithreading while in MT Disabled regions
void unlock() VL_RELEASE() VL_MT_SAFE;
static constexpr V3MtDisabledLock& instance()
VL_RETURN_CAPABILITY(V3MtDisabledLock::s_mtDisabledLock) {