Internals: V3ThreadPool: add function waiting for list of futures (#4112)

This commit is contained in:
Kamil Rakoczy 2023-04-12 14:49:48 +02:00 committed by GitHub
parent e38b359d75
commit e1683afb31
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 53 additions and 11 deletions

View File

@ -182,18 +182,12 @@ void V3ThreadPool::selfTest() {
}
futures.push_back(s().enqueue<void>(std::bind(thirdJob, 100)));
futures.push_back(s().enqueue<void>(std::bind(thirdJob, 100)));
while (!futures.empty()) {
s().waitForFuture(futures.front());
futures.pop_front();
}
V3ThreadPool::waitForFutures(futures);
s().waitIfStopRequested();
s().requestExclusiveAccess(std::bind(firstJob, 100));
auto forthJob = [&]() -> int { return 1234; };
std::list<std::future<int>> futuresInt;
futuresInt.push_back(s().enqueue<int>(forthJob));
while (!futuresInt.empty()) {
int result = s().waitForFuture(futuresInt.front());
UASSERT(result == 1234, "unexpected future result = " << commonValue);
futuresInt.pop_front();
}
auto result = V3ThreadPool::waitForFutures(futuresInt);
UASSERT(result.back() == 1234, "unexpected future result = " << result.back());
}

View File

@ -27,6 +27,19 @@
#include <queue>
#include <thread>
namespace future_type {
template <typename T>
struct return_type {
typedef std::list<T> type;
};
template <>
struct return_type<void> {
typedef void type;
};
} // namespace future_type
//============================================================================
class V3ThreadPool final {
@ -88,12 +101,47 @@ public:
// Returns true if request was send and we waited, otherwise false
bool waitIfStopRequested() VL_MT_SAFE;
// Waits for future.
// This function can be interupted by exclusive access request.
// When other thread requested exclusive access to processing,
// current thread is stopped and waits until it is resumed.
// Returns future result
template <typename T>
T waitForFuture(std::future<T>& future) VL_MT_SAFE_EXCLUDES(m_mutex);
static T waitForFuture(std::future<T>& future) VL_MT_SAFE_EXCLUDES(m_mutex);
// Waits for list of futures
// This function can be interupted by exclusive access request.
// When other thread requested exclusive access to processing,
// current thread is stopped and waits until it is resumed.
// This function uses function overload instead of template
// specialization as C++11 requires them to be inside namespace scope
// Returns list of future result or void
template <typename T>
static typename future_type::return_type<T>::type
waitForFutures(std::list<std::future<T>>& futures) {
return waitForFuturesImp(futures);
}
static void selfTest();
private:
template <typename T>
static typename future_type::return_type<T>::type
waitForFuturesImp(std::list<std::future<T>>& futures) {
typename future_type::return_type<T>::type results;
while (!futures.empty()) {
results.push_back(V3ThreadPool::waitForFuture(futures.front()));
futures.pop_front();
}
return results;
}
static void waitForFuturesImp(std::list<std::future<void>>& futures) {
while (!futures.empty()) {
V3ThreadPool::waitForFuture(futures.front());
futures.pop_front();
}
}
bool willExecuteSynchronously() const VL_MT_SAFE {
return m_workers.empty() || m_exclusiveAccess;
}
@ -128,7 +176,7 @@ private:
template <typename T>
T V3ThreadPool::waitForFuture(std::future<T>& future) VL_MT_SAFE_EXCLUDES(m_mutex) {
while (true) {
waitIfStopRequested();
V3ThreadPool::s().waitIfStopRequested();
{
std::future_status status
= future.wait_for(std::chrono::milliseconds(V3ThreadPool::FUTUREWAITFOR_MS));