diff --git a/CMakeLists.txt b/CMakeLists.txt index d8870760..5b244963 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -199,6 +199,7 @@ set(STA_SOURCE search/WritePathSpice.cc util/Debug.cc + util/DispatchQueue.cc util/Error.cc util/Fuzzy.cc util/Hash.cc @@ -361,6 +362,7 @@ set(STA_HEADERS search/WritePathSpice.hh util/Debug.hh + util/DispatchQueue.hh util/DisallowCopyAssign.hh util/EnumNameMap.hh util/Error.hh @@ -385,7 +387,6 @@ set(STA_HEADERS util/StringSeq.hh util/StringSet.hh util/StringUtil.hh - util/ThreadForEach.hh util/TokenParser.hh util/UnorderedMap.hh util/UnorderedSet.hh diff --git a/sdc/Sdc.cc b/sdc/Sdc.cc index 31467063..79ad9755 100644 --- a/sdc/Sdc.cc +++ b/sdc/Sdc.cc @@ -2472,8 +2472,9 @@ Sdc::cycleAccting(const ClockEdge *src, { if (src == nullptr) src = tgt; + CycleAccting *acct; CycleAccting probe(src, tgt); - CycleAccting *acct = cycle_acctings_.findKey(&probe); + acct = cycle_acctings_.findKey(&probe); if (acct == nullptr) { UniqueLock lock(cycle_acctings_lock_); // Recheck with lock. diff --git a/search/Bfs.cc b/search/Bfs.cc index 0ffdb333..b7e1c53e 100644 --- a/search/Bfs.cc +++ b/search/Bfs.cc @@ -19,7 +19,7 @@ #include "Report.hh" #include "Debug.hh" #include "Mutex.hh" -#include "ThreadForEach.hh" +#include "DispatchQueue.hh" #include "Network.hh" #include "Graph.hh" #include "Levelize.hh" @@ -159,51 +159,6 @@ BfsIterator::visit(Level to_level, return visit_count; } -// VertexSeq::Iterator that filters null objects, -// and pops objects so the vector does not need to be cleared. -class QueueIterator : Iterator -{ -public: - QueueIterator(VertexSeq &vertices, - BfsIndex bfs_index); - virtual bool hasNext(); - virtual Vertex *next(); - unsigned count() { return count_; } - -private: - VertexSeq &vertices_; - BfsIndex bfs_index_; - unsigned count_; -}; - -QueueIterator::QueueIterator(VertexSeq &vertices, - BfsIndex bfs_index) : - vertices_(vertices), - bfs_index_(bfs_index), - count_(0) -{ -} - -bool -QueueIterator::hasNext() -{ - Vertex *next = nullptr; - while (!vertices_.empty() - && (next = vertices_.back()) == nullptr) - vertices_.pop_back(); - return next != nullptr; -} - -Vertex * -QueueIterator::next() -{ - Vertex *next = vertices_.back(); - next->setBfsInQueue(bfs_index_, false); - vertices_.pop_back(); - count_++; - return next; -} - int BfsIterator::visitParallel(Level to_level, VertexVisitor *visitor) @@ -213,35 +168,24 @@ BfsIterator::visitParallel(Level to_level, if (thread_count_ <= 1) visit_count = visit(to_level, visitor); else { - std::mutex lock; - Level level = first_level_; - while (levelLessOrEqual(level, last_level_) - && levelLessOrEqual(level, to_level)) { - VertexSeq &level_vertices = queue_[level]; + std::vector visitors; + for (int i = 0; i < thread_count_; i++) + visitors.push_back(visitor->copy()); + while (levelLessOrEqual(first_level_, last_level_) + && levelLessOrEqual(first_level_, to_level)) { + VertexSeq &level_vertices = queue_[first_level_]; + incrLevel(first_level_); if (!level_vertices.empty()) { - incrLevel(first_level_); - QueueIterator iter(level_vertices, bfs_index_); - std::vector threads; - - for (int i = 0; i < thread_count_; i++) { - ForEachArg arg(&iter, lock, - visitor->copy()); - // Missing check for null vertex. - threads.push_back(std::thread(forEachBegin, arg)); + for (auto vertex : level_vertices) { + if (vertex) { + vertex->setBfsInQueue(bfs_index_, false); + dispatch_queue_->dispatch( [vertex, &visitors](int i){ visitors[i]->visit(vertex); } ); + visit_count++; + } } - - // Wait for all threads working on this level before moving on. - for (auto &thread : threads) - thread.join(); - - visit_count += iter.count(); + dispatch_queue_->finishTasks(); visitor->levelFinished(); - level = first_level_; - } - else { - incrLevel(first_level_); - level = first_level_; + level_vertices.clear(); } } } diff --git a/search/PathGroup.cc b/search/PathGroup.cc index 109e465a..fc0c22fd 100644 --- a/search/PathGroup.cc +++ b/search/PathGroup.cc @@ -32,7 +32,7 @@ #include "Search.hh" #include "VisitPathEnds.hh" #include "PathEnum.hh" -#include "ThreadForEach.hh" +#include "DispatchQueue.hh" #include "PathGroup.hh" namespace sta { @@ -899,11 +899,15 @@ PathGroups::makeGroupPathEnds(VertexSet *endpoints, const MinMaxAll *min_max, PathEndVisitor *visitor) { - VertexSet::Iterator end_iter(endpoints); - MakeEndpointPathEnds make_path_ends(visitor, corner, min_max, this); - forEach(&end_iter, - &make_path_ends, - thread_count_); + Vector visitors; + for (int i = 0; i < thread_count_; i++) + visitors.push_back(new MakeEndpointPathEnds(visitor, corner, min_max, this)); + for (auto endpoint : *endpoints) { + dispatch_queue_->dispatch( [endpoint, &visitors](int i) + { visitors[i]->visit(endpoint); } ); + } + dispatch_queue_->finishTasks(); + visitors.deleteContents(); } } // namespace diff --git a/search/Search.cc b/search/Search.cc index 7dccb681..a75a1cc8 100644 --- a/search/Search.cc +++ b/search/Search.cc @@ -19,7 +19,6 @@ #include "Machine.hh" #include "DisallowCopyAssign.hh" #include "Mutex.hh" -#include "ThreadForEach.hh" #include "Report.hh" #include "Debug.hh" #include "Error.hh" diff --git a/search/Sta.cc b/search/Sta.cc index f21a9fc4..12f7d458 100644 --- a/search/Sta.cc +++ b/search/Sta.cc @@ -17,6 +17,7 @@ #include #include "Machine.hh" #include "DisallowCopyAssign.hh" +#include "DispatchQueue.hh" #include "ReportTcl.hh" #include "Debug.hh" #include "Stats.hh" @@ -319,6 +320,9 @@ void Sta::setThreadCount(int thread_count) { thread_count_ = thread_count; + // dispatch_queue_->setThreadCount(thread_count); + delete dispatch_queue_; + dispatch_queue_ = new DispatchQueue(thread_count); updateComponentsState(); } diff --git a/search/StaState.cc b/search/StaState.cc index 80de7fa7..8b6e42de 100644 --- a/search/StaState.cc +++ b/search/StaState.cc @@ -17,6 +17,7 @@ #include #include "Machine.hh" #include "Network.hh" +#include "DispatchQueue.hh" #include "StaState.hh" namespace sta { @@ -37,6 +38,7 @@ StaState::StaState() : search_(nullptr), latches_(nullptr), thread_count_(1), + dispatch_queue_(new DispatchQueue(thread_count_)), pocv_enabled_(false), sigma_factor_(1.0) { @@ -60,6 +62,7 @@ StaState::StaState(const StaState *sta) : search_(sta->search_), latches_(sta->latches_), thread_count_(sta->thread_count_), + dispatch_queue_(sta->dispatch_queue_), pocv_enabled_(sta->pocv_enabled_), sigma_factor_(sta->sigma_factor_) { @@ -85,6 +88,7 @@ StaState::copyState(const StaState *sta) search_ = sta->search_; latches_ = sta->latches_; thread_count_ = sta->thread_count_; + dispatch_queue_ = sta->dispatch_queue_; pocv_enabled_ = sta->pocv_enabled_; sigma_factor_ = sta->sigma_factor_; } diff --git a/search/StaState.hh b/search/StaState.hh index 75c02bb1..50fdef99 100644 --- a/search/StaState.hh +++ b/search/StaState.hh @@ -37,6 +37,7 @@ class Parasitics; class ArcDelayCalc; class GraphDelayCalc; class Latches; +class DispatchQueue; // Most STA components use functionality in other components. // This class simplifies the process of copying pointers to the @@ -114,6 +115,7 @@ protected: Search *search_; Latches *latches_; int thread_count_; + DispatchQueue *dispatch_queue_; bool pocv_enabled_; float sigma_factor_; diff --git a/util/DispatchQueue.cc b/util/DispatchQueue.cc new file mode 100644 index 00000000..d59fdf18 --- /dev/null +++ b/util/DispatchQueue.cc @@ -0,0 +1,108 @@ +// Author Phillip Johnston +// Licensed under CC0 1.0 Universal +// https://github.com/embeddedartistry/embedded-resources/blob/master/examples/cpp/dispatch.cpp +// https://embeddedartistry.com/blog/2017/2/1/dispatch-queues?rq=dispatch + +#include "DispatchQueue.hh" + +namespace sta { + +DispatchQueue::DispatchQueue(size_t thread_count) : + threads_(thread_count), + pending_task_count_(0) +{ + for(size_t i = 0; i < thread_count; i++) + threads_[i] = std::thread(&DispatchQueue::dispatch_thread_handler, this, i); +} + +DispatchQueue::~DispatchQueue() +{ + terminateThreads(); +} + +void +DispatchQueue::terminateThreads() +{ + // Signal to dispatch threads that it's time to wrap up + std::unique_lock lock(lock_); + quit_ = true; + lock.unlock(); + cv_.notify_all(); + + // Wait for threads to finish before we exit + for(size_t i = 0; i < threads_.size(); i++) { + if (threads_[i].joinable()) { + threads_[i].join(); + } + } +} + +void +DispatchQueue::setThreadCount(size_t thread_count) +{ + terminateThreads(); + + threads_.resize(thread_count); + for(size_t i = 0; i < thread_count; i++) { + threads_[i] = std::thread(&DispatchQueue::dispatch_thread_handler, this, i); + } +} + +void +DispatchQueue::finishTasks() +{ + while (pending_task_count_.load(std::memory_order_acquire) != 0) + std::this_thread::yield(); +} + +void +DispatchQueue::dispatch(const fp_t& op) +{ + std::unique_lock lock(lock_); + q_.push(op); + pending_task_count_++; + + // Manual unlocking is done before notifying, to avoid waking up + // the waiting thread only to block again (see notify_one for details) + lock.unlock(); + cv_.notify_all(); +} + +void +DispatchQueue::dispatch(fp_t&& op) +{ + std::unique_lock lock(lock_); + q_.push(std::move(op)); + pending_task_count_++; + + // Manual unlocking is done before notifying, to avoid waking up + // the waiting thread only to block again (see notify_one for details) + lock.unlock(); + cv_.notify_all(); +} + +void +DispatchQueue::dispatch_thread_handler(size_t i) +{ + std::unique_lock lock(lock_); + + do { + // Wait until we have data or a quit signal + cv_.wait(lock, [this] { return (q_.size() || quit_); } ); + + //after wait, we own the lock + if(!quit_ && q_.size()) { + auto op = std::move(q_.front()); + q_.pop(); + + lock.unlock(); + + op(i); + + pending_task_count_--; + lock.lock(); + } + } while (!quit_); +} + +} // namespace diff --git a/util/DispatchQueue.hh b/util/DispatchQueue.hh new file mode 100644 index 00000000..5d2dc32d --- /dev/null +++ b/util/DispatchQueue.hh @@ -0,0 +1,53 @@ +// Author Phillip Johnston +// Licensed under CC0 1.0 Universal +// https://github.com/embeddedartistry/embedded-resources/blob/master/examples/cpp/dispatch.cpp +// https://embeddedartistry.com/blog/2017/2/1/dispatch-queues?rq=dispatch + +#ifndef STA_DISPATCH_QUEUE_H +#define STA_DISPATCH_QUEUE_H + +#include +#include +#include +#include +#include +#include +#include +#include + +namespace sta { + +class DispatchQueue +{ + typedef std::function fp_t; + +public: + DispatchQueue(size_t thread_cnt); + ~DispatchQueue(); + void setThreadCount(size_t thread_count); + // Dispatch and copy. + void dispatch(const fp_t& op); + // Dispatch and move. + void dispatch(fp_t&& op); + void finishTasks(); + + // Deleted operations + DispatchQueue(const DispatchQueue& rhs) = delete; + DispatchQueue& operator=(const DispatchQueue& rhs) = delete; + DispatchQueue(DispatchQueue&& rhs) = delete; + DispatchQueue& operator=(DispatchQueue&& rhs) = delete; + +private: + void dispatch_thread_handler(size_t i); + void terminateThreads(); + + std::mutex lock_; + std::vector threads_; + std::queue q_; + std::condition_variable cv_; + std::atomic pending_task_count_; + bool quit_ = false; +}; + +} // namespace +#endif diff --git a/util/ThreadForEach.hh b/util/ThreadForEach.hh deleted file mode 100644 index e1fc2fce..00000000 --- a/util/ThreadForEach.hh +++ /dev/null @@ -1,112 +0,0 @@ -// OpenSTA, Static Timing Analyzer -// Copyright (c) 2019, Parallax Software, Inc. -// -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. -// -// You should have received a copy of the GNU General Public License -// along with this program. If not, see . - -#ifndef STA_THREAD_FOR_EACH_H -#define STA_THREAD_FOR_EACH_H - -#include -#include -#include -#include "Iterator.hh" - -namespace sta { - -template -class ForEachArg { -public: - ForEachArg(Iterator *iter, - std::mutex &lock, - Func *func) : - iter_(iter), - lock_(lock), - func_(func) - {} - - ~ForEachArg() - { - delete func_; - } - - // Copy constructor. - ForEachArg(const ForEachArg &arg) : - iter_(arg.iter_), - lock_(arg.lock_), - func_(arg.func_->copy()) - { - } - // Move constructor. - ForEachArg(ForEachArg &&arg) : - iter_(arg.iter_), - lock_(arg.lock_), - func_(arg.func_) - { - arg.func_ = nullptr; - } - - Iterator *iter_; - std::mutex &lock_; - Func *func_; -}; - -template -void -forEachBegin(ForEachArg arg1) -{ - Iterator *iter = arg1.iter_; - std::mutex &lock = arg1.lock_; - Func *func = arg1.func_; - while (true) { - lock.lock(); - if (iter->hasNext()) { - FuncArg arg = iter->next(); - lock.unlock(); - (*func)(arg); - } - else { - lock.unlock(); - break; - } - } -} - -// Parallel version of STL for_each. -// Each thread has its own functor. -// Func::copy() must be defined. -template -void -forEach(Iterator *iter, - Func *func, - int thread_count) -{ - if (thread_count <= 1) { - while (iter->hasNext()) - (*func)(iter->next()); - } - else { - std::vector threads; - std::mutex lock; - for (int i = 0; i < thread_count; i++) { - ForEachArg arg(iter, lock, func->copy()); - threads.push_back(std::thread(forEachBegin, arg)); - } - - for (auto &thread : threads) - thread.join(); - } -} - -} // namespace -#endif