dispatch queue for thread support

James Cherry 2019-11-11 08:48:27 -07:00
parent 184d044b02
commit e647ed391d
11 changed files with 201 additions and 193 deletions

View File

@ -199,6 +199,7 @@ set(STA_SOURCE
search/WritePathSpice.cc
util/Debug.cc
util/DispatchQueue.cc
util/Error.cc
util/Fuzzy.cc
util/Hash.cc
@ -361,6 +362,7 @@ set(STA_HEADERS
search/WritePathSpice.hh
util/Debug.hh
util/DispatchQueue.hh
util/DisallowCopyAssign.hh
util/EnumNameMap.hh
util/Error.hh
@ -385,7 +387,6 @@ set(STA_HEADERS
util/StringSeq.hh
util/StringSet.hh
util/StringUtil.hh
util/ThreadForEach.hh
util/TokenParser.hh
util/UnorderedMap.hh
util/UnorderedSet.hh

View File

@ -2472,8 +2472,9 @@ Sdc::cycleAccting(const ClockEdge *src,
{
if (src == nullptr)
src = tgt;
CycleAccting *acct;
CycleAccting probe(src, tgt);
CycleAccting *acct = cycle_acctings_.findKey(&probe);
acct = cycle_acctings_.findKey(&probe);
if (acct == nullptr) {
UniqueLock lock(cycle_acctings_lock_);
// Recheck with lock.
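
For reference, the surrounding function keeps a double-checked lookup: probe cycle_acctings_ without the lock, and only on a miss take cycle_acctings_lock_ and probe again before creating the entry. A sketch of that shape, assuming the members visible in this hunk; the creation and insert after the recheck fall outside the hunk and are indicative only:

CycleAccting probe(src, tgt);
// Unlocked probe: the common hit path takes no lock at all.
CycleAccting *acct = cycle_acctings_.findKey(&probe);
if (acct == nullptr) {
  UniqueLock lock(cycle_acctings_lock_);
  // Recheck with the lock held; another worker may have made the entry.
  acct = cycle_acctings_.findKey(&probe);
  if (acct == nullptr) {
    acct = new CycleAccting(src, tgt);   // indicative only, not shown in the hunk
    cycle_acctings_.insert(acct);        // indicative only, not shown in the hunk
  }
}
return acct;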

View File

@ -19,7 +19,7 @@
#include "Report.hh"
#include "Debug.hh"
#include "Mutex.hh"
#include "ThreadForEach.hh"
#include "DispatchQueue.hh"
#include "Network.hh"
#include "Graph.hh"
#include "Levelize.hh"
@ -159,51 +159,6 @@ BfsIterator::visit(Level to_level,
return visit_count;
}
// VertexSeq::Iterator that filters null objects,
// and pops objects so the vector does not need to be cleared.
class QueueIterator : Iterator<Vertex*>
{
public:
QueueIterator(VertexSeq &vertices,
BfsIndex bfs_index);
virtual bool hasNext();
virtual Vertex *next();
unsigned count() { return count_; }
private:
VertexSeq &vertices_;
BfsIndex bfs_index_;
unsigned count_;
};
QueueIterator::QueueIterator(VertexSeq &vertices,
BfsIndex bfs_index) :
vertices_(vertices),
bfs_index_(bfs_index),
count_(0)
{
}
bool
QueueIterator::hasNext()
{
Vertex *next = nullptr;
while (!vertices_.empty()
&& (next = vertices_.back()) == nullptr)
vertices_.pop_back();
return next != nullptr;
}
Vertex *
QueueIterator::next()
{
Vertex *next = vertices_.back();
next->setBfsInQueue(bfs_index_, false);
vertices_.pop_back();
count_++;
return next;
}
int
BfsIterator::visitParallel(Level to_level,
VertexVisitor *visitor)
@ -213,35 +168,24 @@ BfsIterator::visitParallel(Level to_level,
if (thread_count_ <= 1)
visit_count = visit(to_level, visitor);
else {
std::mutex lock;
Level level = first_level_;
while (levelLessOrEqual(level, last_level_)
&& levelLessOrEqual(level, to_level)) {
VertexSeq &level_vertices = queue_[level];
std::vector<VertexVisitor*> visitors;
for (int i = 0; i < thread_count_; i++)
visitors.push_back(visitor->copy());
while (levelLessOrEqual(first_level_, last_level_)
&& levelLessOrEqual(first_level_, to_level)) {
VertexSeq &level_vertices = queue_[first_level_];
incrLevel(first_level_);
if (!level_vertices.empty()) {
incrLevel(first_level_);
QueueIterator iter(level_vertices, bfs_index_);
std::vector<std::thread> threads;
for (int i = 0; i < thread_count_; i++) {
ForEachArg<QueueIterator, VertexVisitor> arg(&iter, lock,
visitor->copy());
// Missing check for null vertex.
threads.push_back(std::thread(forEachBegin<QueueIterator,
VertexVisitor, Vertex*>, arg));
for (auto vertex : level_vertices) {
if (vertex) {
vertex->setBfsInQueue(bfs_index_, false);
dispatch_queue_->dispatch( [vertex, &visitors](int i){ visitors[i]->visit(vertex); } );
visit_count++;
}
}
// Wait for all threads working on this level before moving on.
for (auto &thread : threads)
thread.join();
visit_count += iter.count();
dispatch_queue_->finishTasks();
visitor->levelFinished();
level = first_level_;
}
else {
incrLevel(first_level_);
level = first_level_;
level_vertices.clear();
}
}
}
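
The rewritten visitParallel() swaps the per-level thread spawn/join for the shared dispatch queue: one VertexVisitor copy per worker, one dispatched task per queued vertex, and finishTasks() as the per-level barrier before levelFinished(). A self-contained sketch of that pattern, assuming only the DispatchQueue API added by this commit; Counter and the levels vector are hypothetical stand-ins for VertexVisitor and queue_, not names from the code:

#include <vector>
#include "DispatchQueue.hh"

struct Counter                  // stand-in for one VertexVisitor copy
{
  int visits = 0;
  void visit(int /* vertex */) { visits++; }
};

void
visitLevels(const std::vector<std::vector<int>> &levels,  // stand-in for queue_
            int thread_count)
{
  sta::DispatchQueue queue(thread_count);
  std::vector<Counter> visitors(thread_count);  // one per worker, so no locking is needed
  for (const auto &level : levels) {
    for (int vertex : level)
      // The lambda receives the index of the worker that runs it and
      // touches only that worker's visitor.
      queue.dispatch([vertex, &visitors](int i) { visitors[i].visit(vertex); });
    queue.finishTasks();        // barrier: finish this level before starting the next
  }
}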

View File

@ -32,7 +32,7 @@
#include "Search.hh"
#include "VisitPathEnds.hh"
#include "PathEnum.hh"
#include "ThreadForEach.hh"
#include "DispatchQueue.hh"
#include "PathGroup.hh"
namespace sta {
@ -899,11 +899,15 @@ PathGroups::makeGroupPathEnds(VertexSet *endpoints,
const MinMaxAll *min_max,
PathEndVisitor *visitor)
{
VertexSet::Iterator end_iter(endpoints);
MakeEndpointPathEnds make_path_ends(visitor, corner, min_max, this);
forEach<VertexSet::Iterator,VertexVisitor,Vertex*>(&end_iter,
&make_path_ends,
thread_count_);
Vector<MakeEndpointPathEnds*> visitors;
for (int i = 0; i < thread_count_; i++)
visitors.push_back(new MakeEndpointPathEnds(visitor, corner, min_max, this));
for (auto endpoint : *endpoints) {
dispatch_queue_->dispatch( [endpoint, &visitors](int i)
{ visitors[i]->visit(endpoint); } );
}
dispatch_queue_->finishTasks();
visitors.deleteContents();
}
} // namespace
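
makeGroupPathEnds() now follows the same shape: allocate thread_count_ MakeEndpointPathEnds visitors, dispatch one task per endpoint, wait on finishTasks(), then deleteContents() frees the per-worker copies. A self-contained sketch of that dispatch / barrier / merge-or-cleanup lifecycle using plain std types; the names below are illustrative, not from the commit:

#include <vector>
#include "DispatchQueue.hh"

long
sumParallel(const std::vector<int> &values,
            int thread_count)
{
  sta::DispatchQueue queue(thread_count);
  std::vector<long> partial(thread_count, 0);  // one accumulator per worker
  for (int v : values)
    queue.dispatch([v, &partial](int i) { partial[i] += v; });
  queue.finishTasks();           // all tasks done; partial[] is stable from here on
  long total = 0;
  for (long p : partial)         // serial merge (the commit deletes its per-worker visitors instead)
    total += p;
  return total;
}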

View File

@ -19,7 +19,6 @@
#include "Machine.hh"
#include "DisallowCopyAssign.hh"
#include "Mutex.hh"
#include "ThreadForEach.hh"
#include "Report.hh"
#include "Debug.hh"
#include "Error.hh"

View File

@ -17,6 +17,7 @@
#include <limits>
#include "Machine.hh"
#include "DisallowCopyAssign.hh"
#include "DispatchQueue.hh"
#include "ReportTcl.hh"
#include "Debug.hh"
#include "Stats.hh"
@ -319,6 +320,9 @@ void
Sta::setThreadCount(int thread_count)
{
thread_count_ = thread_count;
// dispatch_queue_->setThreadCount(thread_count);
delete dispatch_queue_;
dispatch_queue_ = new DispatchQueue(thread_count);
updateComponentsState();
}
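
Sta::setThreadCount() replaces the queue outright (delete, then new) rather than calling the queue's own setThreadCount(), which is left commented out; the DispatchQueue destructor joins the old workers before the new queue spins up fresh ones. A minimal sketch of that ownership pattern, with Scheduler as a hypothetical stand-in for Sta:

#include "DispatchQueue.hh"

class Scheduler                    // hypothetical owner, mirroring Sta's role
{
public:
  explicit Scheduler(int thread_count) :
    thread_count_(thread_count),
    dispatch_queue_(new sta::DispatchQueue(thread_count))
  {
  }
  ~Scheduler() { delete dispatch_queue_; }  // joins the workers
  void setThreadCount(int thread_count)
  {
    thread_count_ = thread_count;
    delete dispatch_queue_;                 // old workers are joined here
    dispatch_queue_ = new sta::DispatchQueue(thread_count);
  }

private:
  int thread_count_;
  sta::DispatchQueue *dispatch_queue_;
};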

View File

@ -17,6 +17,7 @@
#include <limits>
#include "Machine.hh"
#include "Network.hh"
#include "DispatchQueue.hh"
#include "StaState.hh"
namespace sta {
@ -37,6 +38,7 @@ StaState::StaState() :
search_(nullptr),
latches_(nullptr),
thread_count_(1),
dispatch_queue_(new DispatchQueue(thread_count_)),
pocv_enabled_(false),
sigma_factor_(1.0)
{
@ -60,6 +62,7 @@ StaState::StaState(const StaState *sta) :
search_(sta->search_),
latches_(sta->latches_),
thread_count_(sta->thread_count_),
dispatch_queue_(sta->dispatch_queue_),
pocv_enabled_(sta->pocv_enabled_),
sigma_factor_(sta->sigma_factor_)
{
@ -85,6 +88,7 @@ StaState::copyState(const StaState *sta)
search_ = sta->search_;
latches_ = sta->latches_;
thread_count_ = sta->thread_count_;
dispatch_queue_ = sta->dispatch_queue_;
pocv_enabled_ = sta->pocv_enabled_;
sigma_factor_ = sta->sigma_factor_;
}

View File

@ -37,6 +37,7 @@ class Parasitics;
class ArcDelayCalc;
class GraphDelayCalc;
class Latches;
class DispatchQueue;
// Most STA components use functionality in other components.
// This class simplifies the process of copying pointers to the
@ -114,6 +115,7 @@ protected:
Search *search_;
Latches *latches_;
int thread_count_;
DispatchQueue *dispatch_queue_;
bool pocv_enabled_;
float sigma_factor_;

util/DispatchQueue.cc Normal file (108 lines added)
View File

@ -0,0 +1,108 @@
// Author Phillip Johnston
// Licensed under CC0 1.0 Universal
// https://github.com/embeddedartistry/embedded-resources/blob/master/examples/cpp/dispatch.cpp
// https://embeddedartistry.com/blog/2017/2/1/dispatch-queues?rq=dispatch
#include "DispatchQueue.hh"
namespace sta {
DispatchQueue::DispatchQueue(size_t thread_count) :
threads_(thread_count),
pending_task_count_(0)
{
for(size_t i = 0; i < thread_count; i++)
threads_[i] = std::thread(&DispatchQueue::dispatch_thread_handler, this, i);
}
DispatchQueue::~DispatchQueue()
{
terminateThreads();
}
void
DispatchQueue::terminateThreads()
{
// Signal to dispatch threads that it's time to wrap up
std::unique_lock<std::mutex> lock(lock_);
quit_ = true;
lock.unlock();
cv_.notify_all();
// Wait for threads to finish before we exit
for(size_t i = 0; i < threads_.size(); i++) {
if (threads_[i].joinable()) {
threads_[i].join();
}
}
}
void
DispatchQueue::setThreadCount(size_t thread_count)
{
terminateThreads();
threads_.resize(thread_count);
for(size_t i = 0; i < thread_count; i++) {
threads_[i] = std::thread(&DispatchQueue::dispatch_thread_handler, this, i);
}
}
void
DispatchQueue::finishTasks()
{
while (pending_task_count_.load(std::memory_order_acquire) != 0)
std::this_thread::yield();
}
void
DispatchQueue::dispatch(const fp_t& op)
{
std::unique_lock<std::mutex> lock(lock_);
q_.push(op);
pending_task_count_++;
// Manual unlocking is done before notifying, to avoid waking up
// the waiting thread only to block again (see notify_one for details)
lock.unlock();
cv_.notify_all();
}
void
DispatchQueue::dispatch(fp_t&& op)
{
std::unique_lock<std::mutex> lock(lock_);
q_.push(std::move(op));
pending_task_count_++;
// Manual unlocking is done before notifying, to avoid waking up
// the waiting thread only to block again (see notify_one for details)
lock.unlock();
cv_.notify_all();
}
void
DispatchQueue::dispatch_thread_handler(size_t i)
{
std::unique_lock<std::mutex> lock(lock_);
do {
// Wait until we have data or a quit signal
cv_.wait(lock, [this] { return (q_.size() || quit_); } );
//after wait, we own the lock
if(!quit_ && q_.size()) {
auto op = std::move(q_.front());
q_.pop();
lock.unlock();
op(i);
pending_task_count_--;
lock.lock();
}
} while (!quit_);
}
} // namespace
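
Taken together, the queue's public surface is the constructor, the two dispatch() overloads, and finishTasks(); the destructor signals quit_ and joins the workers. A minimal end-to-end usage sketch (the counting payload is illustrative):

#include <atomic>
#include <cstdio>
#include "DispatchQueue.hh"

int
main()
{
  sta::DispatchQueue queue(4);          // spawns 4 worker threads
  std::atomic<int> done{0};
  for (int task = 0; task < 16; task++)
    // Each task receives the index of the worker that runs it.
    queue.dispatch([task, &done](int worker) {
      std::printf("task %d ran on worker %d\n", task, worker);
      done++;
    });
  queue.finishTasks();                  // spins until pending_task_count_ drops to 0
  std::printf("%d tasks finished\n", done.load());
  return 0;                             // ~DispatchQueue joins the workers
}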

util/DispatchQueue.hh Normal file (53 lines added)
View File

@ -0,0 +1,53 @@
// Author Phillip Johnston
// Licensed under CC0 1.0 Universal
// https://github.com/embeddedartistry/embedded-resources/blob/master/examples/cpp/dispatch.cpp
// https://embeddedartistry.com/blog/2017/2/1/dispatch-queues?rq=dispatch
#ifndef STA_DISPATCH_QUEUE_H
#define STA_DISPATCH_QUEUE_H
#include <thread>
#include <functional>
#include <vector>
#include <cstdint>
#include <queue>
#include <mutex>
#include <condition_variable>
#include <atomic>
namespace sta {
class DispatchQueue
{
typedef std::function<void(int thread)> fp_t;
public:
DispatchQueue(size_t thread_cnt);
~DispatchQueue();
void setThreadCount(size_t thread_count);
// Dispatch and copy.
void dispatch(const fp_t& op);
// Dispatch and move.
void dispatch(fp_t&& op);
void finishTasks();
// Deleted operations
DispatchQueue(const DispatchQueue& rhs) = delete;
DispatchQueue& operator=(const DispatchQueue& rhs) = delete;
DispatchQueue(DispatchQueue&& rhs) = delete;
DispatchQueue& operator=(DispatchQueue&& rhs) = delete;
private:
void dispatch_thread_handler(size_t i);
void terminateThreads();
std::mutex lock_;
std::vector<std::thread> threads_;
std::queue<fp_t> q_;
std::condition_variable cv_;
std::atomic<size_t> pending_task_count_;
bool quit_ = false;
};
} // namespace
#endif

View File

@ -1,112 +0,0 @@
// OpenSTA, Static Timing Analyzer
// Copyright (c) 2019, Parallax Software, Inc.
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
#ifndef STA_THREAD_FOR_EACH_H
#define STA_THREAD_FOR_EACH_H
#include <mutex>
#include <thread>
#include <vector>
#include "Iterator.hh"
namespace sta {
template<class Iterator, class Func>
class ForEachArg {
public:
ForEachArg(Iterator *iter,
std::mutex &lock,
Func *func) :
iter_(iter),
lock_(lock),
func_(func)
{}
~ForEachArg()
{
delete func_;
}
// Copy constructor.
ForEachArg(const ForEachArg &arg) :
iter_(arg.iter_),
lock_(arg.lock_),
func_(arg.func_->copy())
{
}
// Move constructor.
ForEachArg(ForEachArg &&arg) :
iter_(arg.iter_),
lock_(arg.lock_),
func_(arg.func_)
{
arg.func_ = nullptr;
}
Iterator *iter_;
std::mutex &lock_;
Func *func_;
};
template<class Iterator, class Func, class FuncArg>
void
forEachBegin(ForEachArg<Iterator, Func> arg1)
{
Iterator *iter = arg1.iter_;
std::mutex &lock = arg1.lock_;
Func *func = arg1.func_;
while (true) {
lock.lock();
if (iter->hasNext()) {
FuncArg arg = iter->next();
lock.unlock();
(*func)(arg);
}
else {
lock.unlock();
break;
}
}
}
// Parallel version of STL for_each.
// Each thread has its own functor.
// Func::copy() must be defined.
template<class Iterator, class Func, class FuncArg>
void
forEach(Iterator *iter,
Func *func,
int thread_count)
{
if (thread_count <= 1) {
while (iter->hasNext())
(*func)(iter->next());
}
else {
std::vector<std::thread> threads;
std::mutex lock;
for (int i = 0; i < thread_count; i++) {
ForEachArg<Iterator,Func> arg(iter, lock, func->copy());
threads.push_back(std::thread(forEachBegin<Iterator,Func,FuncArg>, arg));
}
for (auto &thread : threads)
thread.join();
}
}
} // namespace
#endif