diff --git a/include/verilatedos.h b/include/verilatedos.h index fdcbe64cd..113d8cd2d 100644 --- a/include/verilatedos.h +++ b/include/verilatedos.h @@ -119,6 +119,10 @@ // Allowed on: function, method. (-fthread-safety) #define VL_RETURN_CAPABILITY(x) \ VL_CLANG_ATTR(lock_returned(x)) +// Assert that capability is already held. +// Allowed on: function, method. (-fthread-safety) +#define VL_ASSERT_CAPABILITY(x) \ + VL_CLANG_ATTR(assert_capability(x)) // Defaults for unsupported compiler features #ifndef VL_ATTR_ALWINLINE diff --git a/src/V3EmitCImp.cpp b/src/V3EmitCImp.cpp index f47b5bcf9..571388b44 100644 --- a/src/V3EmitCImp.cpp +++ b/src/V3EmitCImp.cpp @@ -932,11 +932,11 @@ void V3EmitC::emitcImp() { const AstNodeModule* const modp = VN_AS(nodep, NodeModule); cfiles.emplace_back(); auto& slowCfilesr = cfiles.back(); - futures.push_back(V3ThreadPool::s().enqueue( + futures.push_back(V3ThreadPool::s().enqueue( [modp, &slowCfilesr]() { EmitCImp::main(modp, /* slow: */ true, slowCfilesr); })); cfiles.emplace_back(); auto& fastCfilesr = cfiles.back(); - futures.push_back(V3ThreadPool::s().enqueue( + futures.push_back(V3ThreadPool::s().enqueue( [modp, &fastCfilesr]() { EmitCImp::main(modp, /* slow: */ false, fastCfilesr); })); } @@ -944,12 +944,12 @@ void V3EmitC::emitcImp() { if (v3Global.opt.trace() && !v3Global.opt.lintOnly()) { cfiles.emplace_back(); auto& slowCfilesr = cfiles.back(); - futures.push_back(V3ThreadPool::s().enqueue([&slowCfilesr]() { + futures.push_back(V3ThreadPool::s().enqueue([&slowCfilesr]() { EmitCTrace::main(v3Global.rootp()->topModulep(), /* slow: */ true, slowCfilesr); })); cfiles.emplace_back(); auto& fastCfilesr = cfiles.back(); - futures.push_back(V3ThreadPool::s().enqueue([&fastCfilesr]() { + futures.push_back(V3ThreadPool::s().enqueue([&fastCfilesr]() { EmitCTrace::main(v3Global.rootp()->topModulep(), /* slow: */ false, fastCfilesr); })); } diff --git a/src/V3Error.cpp b/src/V3Error.cpp index a0fa7a3d3..57ec24e91 100644 --- a/src/V3Error.cpp +++ b/src/V3Error.cpp @@ -222,20 +222,18 @@ void V3ErrorGuarded::v3errorEnd(std::ostringstream& sstr, const string& extra) #ifndef V3ERROR_NO_GLOBAL_ if (dumpTreeLevel() || debug()) { V3Broken::allowMidvisitorCheck(true); - V3ThreadPool::s().requestExclusiveAccess([&]() VL_REQUIRES(m_mutex) { - if (dumpTreeLevel()) { - v3Global.rootp()->dumpTreeFile( - v3Global.debugFilename("final.tree", 990)); - } - if (debug()) { - execErrorExitCb(); - V3Stats::statsFinalAll(v3Global.rootp()); - V3Stats::statsReport(); - } - // Abort in exclusive access to make sure other threads - // don't change error code - vlAbortOrExit(); - }); + const V3ThreadPool::ScopedExclusiveAccess exclusiveAccess; + if (dumpTreeLevel()) { + v3Global.rootp()->dumpTreeFile(v3Global.debugFilename("final.tree", 990)); + } + if (debug()) { + execErrorExitCb(); + V3Stats::statsFinalAll(v3Global.rootp()); + V3Stats::statsReport(); + } + // Abort in exclusive access to make sure other threads + // don't change error code + vlAbortOrExit(); } #endif } diff --git a/src/V3Mutex.h b/src/V3Mutex.h index c5f3883a7..7f4bc20cf 100644 --- a/src/V3Mutex.h +++ b/src/V3Mutex.h @@ -109,6 +109,10 @@ public: bool try_lock() VL_TRY_ACQUIRE(true) VL_MT_SAFE { return V3MutexConfig::s().enable() ? m_mutex.try_lock() : true; } + /// Assume that the mutex is already held. Purely for Clang thread safety analyzer. + void assumeLocked() VL_ASSERT_CAPABILITY(this) VL_MT_SAFE {} + /// Pretend that the mutex is being unlocked. Purely for Clang thread safety analyzer. + void pretendUnlock() VL_RELEASE() VL_MT_SAFE {} /// Acquire/lock mutex and check for stop request /// It tries to lock the mutex and if it fails, it check if stop request was send. /// It returns after locking mutex. diff --git a/src/V3ThreadPool.cpp b/src/V3ThreadPool.cpp index 63469681a..8eadc7b61 100644 --- a/src/V3ThreadPool.cpp +++ b/src/V3ThreadPool.cpp @@ -61,7 +61,7 @@ void V3ThreadPool::workerJobLoop(int id) VL_MT_SAFE { while (true) { // Wait for a notification waitIfStopRequested(); - job_t job; + VAnyPackagedTask job; { V3LockGuard lock(m_mutex); m_cv.wait(m_mutex, [&]() VL_REQUIRES(m_mutex) { @@ -72,7 +72,7 @@ void V3ThreadPool::workerJobLoop(int id) VL_MT_SAFE { // Get the job UASSERT(!m_queue.empty(), "Job should be available"); - job = m_queue.front(); + job = std::move(m_queue.front()); m_queue.pop(); } @@ -81,40 +81,6 @@ void V3ThreadPool::workerJobLoop(int id) VL_MT_SAFE { } } -template <> -void V3ThreadPool::pushJob(std::shared_ptr>& prom, - std::function&& f) VL_MT_SAFE { - if (willExecuteSynchronously()) { - f(); - prom->set_value(); - } else { - const V3LockGuard lock{m_mutex}; - m_queue.push([prom, f] { - f(); - prom->set_value(); - }); - } -} - -void V3ThreadPool::requestExclusiveAccess(const V3ThreadPool::job_t&& exclusiveAccessJob) - VL_MT_SAFE { - if (willExecuteSynchronously()) { - exclusiveAccessJob(); - } else { - V3LockGuard stoppedJobLock{m_stoppedJobsMutex}; - // if some other job already requested exclusive access - // wait until it stops - if (stopRequested()) { waitStopRequested(); } - m_stopRequested = true; - waitOtherThreads(); - m_exclusiveAccess = true; - exclusiveAccessJob(); - m_exclusiveAccess = false; - m_stopRequested = false; - m_stoppedJobsCV.notify_all(); - } -} - bool V3ThreadPool::waitIfStopRequested() VL_MT_SAFE VL_EXCLUDES(m_stoppedJobsMutex) { if (!stopRequested()) return false; V3LockGuard stoppedJobLock(m_stoppedJobsMutex); @@ -150,11 +116,10 @@ void V3ThreadPool::selfTest() { auto firstJob = [&](int sleep) -> void { std::this_thread::sleep_for(std::chrono::milliseconds{sleep}); - s().requestExclusiveAccess([&]() { - commonValue = 10; - std::this_thread::sleep_for(std::chrono::milliseconds{sleep + 10}); - UASSERT(commonValue == 10, "unexpected commonValue = " << commonValue); - }); + const V3ThreadPool::ScopedExclusiveAccess exclusiveAccess; + commonValue = 10; + std::this_thread::sleep_for(std::chrono::milliseconds{sleep + 10}); + UASSERT(commonValue == 10, "unexpected commonValue = " << commonValue); }; auto secondJob = [&](int sleep) -> void { commonMutex.lock(); @@ -175,19 +140,19 @@ void V3ThreadPool::selfTest() { }; std::list> futures; - futures.push_back(s().enqueue(std::bind(firstJob, 100))); - futures.push_back(s().enqueue(std::bind(secondJob, 100))); - futures.push_back(s().enqueue(std::bind(firstJob, 100))); - futures.push_back(s().enqueue(std::bind(secondJob, 100))); - futures.push_back(s().enqueue(std::bind(secondJob, 200))); - futures.push_back(s().enqueue(std::bind(firstJob, 200))); - futures.push_back(s().enqueue(std::bind(firstJob, 300))); + futures.push_back(s().enqueue(std::bind(firstJob, 100))); + futures.push_back(s().enqueue(std::bind(secondJob, 100))); + futures.push_back(s().enqueue(std::bind(firstJob, 100))); + futures.push_back(s().enqueue(std::bind(secondJob, 100))); + futures.push_back(s().enqueue(std::bind(secondJob, 200))); + futures.push_back(s().enqueue(std::bind(firstJob, 200))); + futures.push_back(s().enqueue(std::bind(firstJob, 300))); while (!futures.empty()) { s().waitForFuture(futures.front()); futures.pop_front(); } - futures.push_back(s().enqueue(std::bind(thirdJob, 100))); - futures.push_back(s().enqueue(std::bind(thirdJob, 100))); + futures.push_back(s().enqueue(std::bind(thirdJob, 100))); + futures.push_back(s().enqueue(std::bind(thirdJob, 100))); V3ThreadPool::waitForFutures(futures); s().waitIfStopRequested(); @@ -195,7 +160,7 @@ void V3ThreadPool::selfTest() { auto forthJob = [&]() -> int { return 1234; }; std::list> futuresInt; - futuresInt.push_back(s().enqueue(forthJob)); + futuresInt.push_back(s().enqueue(forthJob)); auto result = V3ThreadPool::waitForFutures(futuresInt); UASSERT(result.back() == 1234, "unexpected future result = " << result.back()); } diff --git a/src/V3ThreadPool.h b/src/V3ThreadPool.h index d87d5be17..b7c0658f4 100644 --- a/src/V3ThreadPool.h +++ b/src/V3ThreadPool.h @@ -29,13 +29,53 @@ //============================================================================ +// Callable, type-erased wrapper for std::packaged_task with any Signature. +class VAnyPackagedTask final { + // TYPES + struct PTWrapperBase { + virtual ~PTWrapperBase() {} + virtual void operator()() = 0; + }; + + template + struct PTWrapper final : PTWrapperBase { + std::packaged_task m_pt; + + PTWrapper(std::packaged_task&& pt) + : m_pt(std::move(pt)) {} + + void operator()() final override { m_pt(); } + }; + + // MEMBERS + std::unique_ptr m_ptWrapperp = nullptr; // Wrapper to call + +public: + // CONSTRUCTORS + template + VAnyPackagedTask(std::packaged_task&& pt) + : m_ptWrapperp{std::make_unique>(std::move(pt))} {} + + VAnyPackagedTask() = default; + ~VAnyPackagedTask() = default; + + VAnyPackagedTask(const VAnyPackagedTask&) = delete; + VAnyPackagedTask& operator=(const VAnyPackagedTask&) = delete; + + VAnyPackagedTask(VAnyPackagedTask&&) = default; + VAnyPackagedTask& operator=(VAnyPackagedTask&&) = default; + + // METHODS + // Call the wrapped function + void operator()() { (*m_ptWrapperp)(); } +}; + class V3ThreadPool final { // MEMBERS static constexpr unsigned int FUTUREWAITFOR_MS = 100; - using job_t = std::function; - mutable V3Mutex m_mutex; // Mutex for use by m_queue - std::queue m_queue VL_GUARDED_BY(m_mutex); // Queue of jobs + V3Mutex m_mutex; // Mutex for use by m_queue + std::queue m_queue VL_GUARDED_BY(m_mutex); // Queue of jobs // We don't need to guard this condition_variable as // both `notify_one` and `notify_all` functions are atomic, // `wait` function is not atomic, but we are guarding `m_queue` that is @@ -62,6 +102,9 @@ class V3ThreadPool final { } public: + // Request exclusive access to processing for the object lifetime. + class ScopedExclusiveAccess; + // METHODS // Singleton static V3ThreadPool& s() VL_MT_SAFE { @@ -79,8 +122,8 @@ public: // will call it. `VL_MT_START` here indicates that // every function call inside this `std::function` requires // annotations. - template - std::future enqueue(std::function&& f) VL_MT_START; + template + auto enqueue(Callable&& f) VL_MT_START; // Request exclusive access to processing. // It sends request to stop all other threads and waits for them to stop. @@ -88,7 +131,9 @@ public: // they can be stopped. // When all other threads are stopped, this function executes the job // and resumes execution of other jobs. - void requestExclusiveAccess(const job_t&& exclusiveAccessJob) VL_MT_SAFE; + template + void requestExclusiveAccess(Callable&& exclusiveAccessJob) VL_MT_SAFE + VL_EXCLUDES(m_stoppedJobsMutex); // Check if other thread requested exclusive access to processing, // if so, it waits for it to complete. Afterwards it is resumed. @@ -151,14 +196,39 @@ private: // Waits until all other jobs are stopped void waitOtherThreads() VL_MT_SAFE_EXCLUDES(m_mutex) VL_REQUIRES(m_stoppedJobsMutex); - template - void pushJob(std::shared_ptr>& prom, std::function&& f) VL_MT_SAFE; - void workerJobLoop(int id) VL_MT_SAFE; static void startWorker(V3ThreadPool* selfThreadp, int id) VL_MT_SAFE; }; +class VL_SCOPED_CAPABILITY V3ThreadPool::ScopedExclusiveAccess final { +public: + ScopedExclusiveAccess() VL_ACQUIRE(V3ThreadPool::s().m_stoppedJobsMutex) VL_MT_SAFE { + if (!V3ThreadPool::s().willExecuteSynchronously()) { + V3ThreadPool::s().m_stoppedJobsMutex.lock(); + + if (V3ThreadPool::s().stopRequested()) { V3ThreadPool::s().waitStopRequested(); } + V3ThreadPool::s().m_stopRequested = true; + V3ThreadPool::s().waitOtherThreads(); + V3ThreadPool::s().m_exclusiveAccess = true; + } else { + V3ThreadPool::s().m_stoppedJobsMutex.assumeLocked(); + } + } + ~ScopedExclusiveAccess() VL_RELEASE(V3ThreadPool::s().m_stoppedJobsMutex) VL_MT_SAFE { + // Can't use `willExecuteSynchronously`, we're still in exclusive execution state. + if (V3ThreadPool::s().m_exclusiveAccess) { + V3ThreadPool::s().m_exclusiveAccess = false; + V3ThreadPool::s().m_stopRequested = false; + V3ThreadPool::s().m_stoppedJobsCV.notify_all(); + + V3ThreadPool::s().m_stoppedJobsMutex.unlock(); + } else { + V3ThreadPool::s().m_stoppedJobsMutex.pretendUnlock(); + } + } +}; + template T V3ThreadPool::waitForFuture(std::future& future) VL_MT_SAFE_EXCLUDES(m_mutex) { while (true) { @@ -175,29 +245,28 @@ T V3ThreadPool::waitForFuture(std::future& future) VL_MT_SAFE_EXCLUDES(m_mute } } -template -std::future V3ThreadPool::enqueue(std::function&& f) VL_MT_START { - std::shared_ptr> prom = std::make_shared>(); - std::future result = prom->get_future(); - pushJob(prom, std::move(f)); - const V3LockGuard guard{m_mutex}; - m_cv.notify_one(); - return result; -} - -template -void V3ThreadPool::pushJob(std::shared_ptr>& prom, - std::function&& f) VL_MT_SAFE { +template +auto V3ThreadPool::enqueue(Callable&& f) VL_MT_START { + using result_t = decltype(f()); + auto&& job = std::packaged_task{std::forward(f)}; + auto future = job.get_future(); if (willExecuteSynchronously()) { - prom->set_value(f()); + job(); } else { - const V3LockGuard guard{m_mutex}; - m_queue.push([prom, f] { prom->set_value(f()); }); + { + const V3LockGuard guard{m_mutex}; + m_queue.push(std::move(job)); + } + m_cv.notify_one(); } + return future; } -template <> -void V3ThreadPool::pushJob(std::shared_ptr>& prom, - std::function&& f) VL_MT_SAFE; +template +void V3ThreadPool::requestExclusiveAccess(Callable&& exclusiveAccessJob) VL_MT_SAFE + VL_EXCLUDES(m_stoppedJobsMutex) { + ScopedExclusiveAccess exclusive_access; + exclusiveAccessJob(); +} #endif // Guard