Bfs::visitParallel chunk by thread

Signed-off-by: James Cherry <cherry@parallaxsw.com>
This commit is contained in:
James Cherry 2021-12-25 11:00:43 -07:00
parent 834109979b
commit 445f935c16
1 changed files with 27 additions and 10 deletions

View File

@ -164,27 +164,44 @@ int
BfsIterator::visitParallel(Level to_level,
VertexVisitor *visitor)
{
size_t thread_count = thread_count_;
int visit_count = 0;
if (!empty()) {
if (thread_count_ <= 1)
if (thread_count == 1)
visit_count = visit(to_level, visitor);
else {
std::vector<VertexVisitor*> visitors;
for (int i = 0; i < thread_count_; i++)
for (int k = 0; k < thread_count_; k++)
visitors.push_back(visitor->copy());
while (levelLessOrEqual(first_level_, last_level_)
&& levelLessOrEqual(first_level_, to_level)) {
VertexSeq &level_vertices = queue_[first_level_];
incrLevel(first_level_);
if (!level_vertices.empty()) {
for (auto vertex : level_vertices) {
if (vertex) {
vertex->setBfsInQueue(bfs_index_, false);
dispatch_queue_->dispatch( [vertex, &visitors](int i){ visitors[i]->visit(vertex); } );
visit_count++;
}
}
dispatch_queue_->finishTasks();
size_t vertex_count = level_vertices.size();
if (vertex_count < thread_count) {
for (Vertex *vertex : level_vertices) {
vertex->setBfsInQueue(bfs_index_, false);
visitor->visit(vertex);
}
}
else {
size_t from = 0;
size_t chunk_size = vertex_count / thread_count;
for (size_t k = 0; k < thread_count; k++) {
// Last thread gets the left overs.
size_t to = (k == thread_count - 1) ? vertex_count : from + chunk_size;
dispatch_queue_->dispatch( [=](int) {
for (size_t i = from; i < to; i++) {
Vertex *vertex = level_vertices[i];
vertex->setBfsInQueue(bfs_index_, false);
visitors[k]->visit(vertex);
}
});
from = to;
}
dispatch_queue_->finishTasks();
}
visitor->levelFinished();
level_vertices.clear();
}