diff --git a/src/tl/tl/tlThreads.cc b/src/tl/tl/tlThreads.cc index dc85cce1b..c2e5c6e15 100644 --- a/src/tl/tl/tlThreads.cc +++ b/src/tl/tl/tlThreads.cc @@ -36,27 +36,124 @@ namespace tl // ------------------------------------------------------------------------------- // WaitCondition implementation + +class WaitConditionPrivate +{ +public: + WaitConditionPrivate () + : m_initialized (false) + { + if (pthread_mutex_init (&m_mutex, NULL) != 0) { + tl::error << tr ("Unable to create pthread Mutex for WaitCondition"); + } else if (pthread_cond_init(&m_cond, NULL) != 0) { + tl::error << tr ("Unable to create pthread Condition for WaitCondition"); + } else { + m_initialized = true; + } + } + + ~WaitConditionPrivate () + { + if (m_initialized) { + pthread_cond_destroy (&m_cond); + pthread_mutex_destroy (&m_mutex); + } + } + + bool wait (Mutex *mutex, unsigned long time) + { + if (! m_initialized) { + return false; + } + + // transfer the lock from our own implementation to the pthread mutex + pthread_mutex_lock (&m_mutex); + mutex->unlock (); + + // this code is executed concurrently ... + + bool woken = true; + + if (time < std::numeric_limits::max ()) { + + struct timespec end_time; + clock_gettime (CLOCK_REALTIME, &end_time); + + end_time.tv_sec += (time / 1000); + end_time.tv_nsec += (time % 1000) * 1000000; + if (end_time.tv_nsec > 1000000000) { + end_time.tv_nsec -= 1000000000; + end_time.tv_sec += 1; + } + + int res = pthread_cond_timedwait (&m_cond, &m_mutex, &end_time); + if (res == ETIMEDOUT) { + woken = false; + } else if (res) { + tl::error << tr ("Error waiting for pthread Condition (timed)"); + } + + } else { + + if (pthread_cond_wait (&m_cond, &m_mutex) != 0) { + tl::error << tr ("Error waiting for pthread Condition (timed)"); + } + + } + + // transfers the lock back + mutex->lock (); + pthread_mutex_unlock (&m_mutex); + + return woken; + } + + void wake_all () + { + if (pthread_mutex_lock (&m_mutex) == 0) { + pthread_cond_broadcast (&m_cond); + pthread_mutex_unlock (&m_mutex); + } + } + + void wake_one () + { + if (pthread_mutex_lock (&m_mutex) == 0) { + pthread_cond_signal (&m_cond); + pthread_mutex_unlock (&m_mutex); + } + } + +private: + pthread_mutex_t m_mutex; + pthread_cond_t m_cond; + bool m_initialized; +}; + WaitCondition::WaitCondition () { - // @@@ + mp_data = new WaitConditionPrivate (); } -bool WaitCondition::wait (Mutex *mutex, unsigned long /*time*/) +WaitCondition::~WaitCondition () { - mutex->unlock(); - // @@@ - mutex->lock(); - return true; + delete mp_data; + mp_data = 0; +} + +bool WaitCondition::wait (Mutex *mutex, unsigned long time) +{ + return mp_data->wait (mutex, time); } void WaitCondition::wakeAll () { - // @@@ + mp_data->wake_all (); } void WaitCondition::wakeOne () { - // @@@ + mp_data->wake_one (); } // ------------------------------------------------------------------------------- diff --git a/src/tl/tl/tlThreads.h b/src/tl/tl/tlThreads.h index 2d778172f..f8d88263e 100644 --- a/src/tl/tl/tlThreads.h +++ b/src/tl/tl/tlThreads.h @@ -86,14 +86,20 @@ public: #else +class WaitConditionPrivate; + // The non-Qt version is a dummy implementation as threading is not supported (yet) class TL_PUBLIC WaitCondition { public: WaitCondition (); + ~WaitCondition (); bool wait (Mutex * /*mutex*/, unsigned long /*time*/ = std::numeric_limits::max ()); void wakeAll (); void wakeOne (); + +private: + WaitConditionPrivate *mp_data; }; #endif diff --git a/src/tl/unit_tests/tlThreads.cc b/src/tl/unit_tests/tlThreads.cc index 05553bef1..5e41bb48d 100644 --- a/src/tl/unit_tests/tlThreads.cc +++ b/src/tl/unit_tests/tlThreads.cc @@ -258,6 +258,7 @@ public: while (m_value < 10000000) { m_value += s_mythread2_increment; if (m_value == m_nstop) { + tl::MutexLocker locker (&s_wait_mutex); m_stopped = true; s_condition.wait (&s_wait_mutex); m_stopped = false; @@ -280,7 +281,15 @@ TEST(4_wakeAll) thr1.start (); thr2.start (); - while (! thr1.stopped () || ! thr2.stopped ()) { + while (true) { + bool res; + { + tl::MutexLocker locker (&s_wait_mutex); + res = thr1.stopped () && thr2.stopped (); + } + if (res) { + break; + } EXPECT_EQ (thr1.isRunning (), true); EXPECT_EQ (thr2.isRunning (), true); tl_assert (thr1.isRunning () && thr2.isRunning ()); @@ -305,7 +314,15 @@ TEST(4_wakeOne) thr1.start (); thr2.start (); - while (! thr1.stopped () || ! thr2.stopped ()) { + while (true) { + bool res; + { + tl::MutexLocker locker (&s_wait_mutex); + res = thr1.stopped () && thr2.stopped (); + } + if (res) { + break; + } EXPECT_EQ (thr1.isRunning (), true); EXPECT_EQ (thr2.isRunning (), true); tl_assert (thr1.isRunning () && thr2.isRunning ());