diff --git a/src/tl/tl/tlThreads.cc b/src/tl/tl/tlThreads.cc index 0c9b782d9..dc85cce1b 100644 --- a/src/tl/tl/tlThreads.cc +++ b/src/tl/tl/tlThreads.cc @@ -28,6 +28,7 @@ #include #include +#include namespace tl { @@ -90,6 +91,8 @@ Thread::Thread () Thread::~Thread () { + terminate (); + wait (); delete mp_data; mp_data = 0; } @@ -97,7 +100,6 @@ Thread::~Thread () void Thread::do_run () { try { - mp_data->running = true; run (); mp_data->running = false; } catch (tl::Exception &ex) { @@ -143,11 +145,10 @@ void Thread::start () return; } - if (! mp_data->initialized) { - mp_data->initialized = true; - if (pthread_create (&mp_data->pthread, NULL, &start_thread, (void *) this) != 0) { - tl::error << tr ("Failed to create thread"); - } + mp_data->initialized = true; + mp_data->running = true; + if (pthread_create (&mp_data->pthread, NULL, &start_thread, (void *) this) != 0) { + tl::error << tr ("Failed to create thread"); } } @@ -158,17 +159,42 @@ void Thread::terminate () } } -bool Thread::wait (unsigned long /*time*/) +bool Thread::wait (unsigned long time) { - if (! mp_data->initialized) { + if (! isRunning ()) { return true; } - // @@@ TODO: timed join - if (pthread_join (mp_data->pthread, &mp_data->return_code) != 0) { - tl::error << tr ("Could not join threads"); + if (time < std::numeric_limits::max ()) { + + struct timespec end_time; + clock_gettime (CLOCK_REALTIME, &end_time); + + end_time.tv_sec += (time / 1000); + end_time.tv_nsec += (time % 1000) * 1000000; + if (end_time.tv_nsec > 1000000000) { + end_time.tv_nsec -= 1000000000; + end_time.tv_sec += 1; + } + + int res = pthread_timedjoin_np (mp_data->pthread, &mp_data->return_code, &end_time); + if (res == ETIMEDOUT) { + return false; + } else if (res) { + tl::error << tr ("Could not join threads"); + } + + return true; + + } else { + + if (pthread_join (mp_data->pthread, &mp_data->return_code) != 0) { + tl::error << tr ("Could not join threads"); + } + + return true; + } - return true; } // ------------------------------------------------------------------------------- diff --git a/src/tl/unit_tests/tlThreads.cc b/src/tl/unit_tests/tlThreads.cc index bef5b19d7..05553bef1 100644 --- a/src/tl/unit_tests/tlThreads.cc +++ b/src/tl/unit_tests/tlThreads.cc @@ -46,6 +46,12 @@ public: return m_value; } + void reset () + { + m_value = 0; + m_stop = false; + } + void run () { for (int i = 0; i < 10 && !m_stop; ++i) { @@ -69,15 +75,250 @@ private: }; // basic: concurrency, ability to stop async, wait -TEST(1) +TEST(1_basic) { MyThread my_thread; + + EXPECT_EQ (my_thread.isRunning (), false); + EXPECT_EQ (my_thread.isFinished (), false); + my_thread.start (); + + EXPECT_EQ (my_thread.isRunning (), true); + EXPECT_EQ (my_thread.isFinished (), false); + while (my_thread.value () < 5) ; + my_thread.stop (); my_thread.wait (); + + EXPECT_EQ (my_thread.isRunning (), false); + EXPECT_EQ (my_thread.isFinished (), true); + + my_thread.reset (); + my_thread.start (); + + EXPECT_EQ (my_thread.isRunning (), true); + EXPECT_EQ (my_thread.isFinished (), false); + + while (my_thread.value () < 5) + ; + + my_thread.stop (); + my_thread.wait (); + + EXPECT_EQ (my_thread.isRunning (), false); + EXPECT_EQ (my_thread.isFinished (), true); + // stopped before 10 and after 5 EXPECT_EQ (my_thread.value () >= 5 && my_thread.value () < 10, true); } +// basic: thread dtor while running +TEST(1_brute_shutdown) +{ + MyThread my_thread; + my_thread.start (); + EXPECT_EQ (true, true); // makes the compiler happy +} + +// basic: concurrency, ability to stop async, wait +TEST(1_timed_wait) +{ + MyThread my_thread; + my_thread.start (); + + EXPECT_EQ (my_thread.wait (1), false); + while (my_thread.value () < 5) { + EXPECT_EQ (my_thread.wait (1), false); + } + + EXPECT_EQ (my_thread.wait (100000 /*"enough"*/), true); +} + +int s_mythread2_increment = 1; + +class MyThread2 : public tl::Thread +{ +public: + MyThread2 (bool locked) : m_value (0), m_locked (locked) { } + + int value () + { + return m_value; + } + + void run () + { + if (m_locked) { + for (int i = 0; i < 10000000; ++i) { + tl::MutexLocker locker (&m_lock); + // Do it more elaborate than ++m_value to prevent compiler optimization + m_value += s_mythread2_increment; + } + } else { + for (int i = 0; i < 10000000; ++i) { + m_value += s_mythread2_increment; + } + } + } + +private: + int m_value; + tl::Mutex m_lock; + bool m_locked; +}; + +// Heavily loaded mutex +TEST(2_locked) +{ + MyThread2 my_thread (true); + my_thread.start (); + // two times - once in the background and once in the main thread + my_thread.run (); + my_thread.wait (); + EXPECT_EQ (my_thread.value (), 20000000); +} + +// Cross-check: unlocked +TEST(2_nonlocked) +{ + MyThread2 my_thread (false); + my_thread.start (); + // two times - once in the background and once in the main thread + my_thread.run (); + my_thread.wait (); + EXPECT_EQ (my_thread.value () < 20000000, true); +} + +static tl::ThreadStorage s_tls; + +class MyThread3 : public tl::Thread +{ +public: + MyThread3 () : m_value (0) { } + + int value () + { + return m_value; + } + + void run () + { + m_value = do_run (10000000); + } + + int do_run (int n) + { + s_tls.setLocalData (0); + for (int i = 0; i < n; ++i) { + s_tls.localData () += s_mythread2_increment; + } + return s_tls.localData (); + } + +private: + int m_value; +}; + +// Thread-local storage +TEST(3) +{ + MyThread3 my_thread; + my_thread.start (); + // While we start the loop inside the thread we run it outside. Since + // the counter is TLS, both loops will to the same but with different data. + // A mutex is not involved. + EXPECT_EQ (my_thread.do_run (9999999), 9999999); + my_thread.wait (); + EXPECT_EQ (my_thread.value (), 10000000); +} + +static tl::WaitCondition s_condition; +static tl::Mutex s_wait_mutex; + +class MyThread4 : public tl::Thread +{ +public: + MyThread4 (int nstop) : m_value (0), m_nstop (nstop), m_stopped (false) { } + + int value () + { + return m_value; + } + + bool stopped () + { + return m_stopped; + } + + void run () + { + while (m_value < 10000000) { + m_value += s_mythread2_increment; + if (m_value == m_nstop) { + m_stopped = true; + s_condition.wait (&s_wait_mutex); + m_stopped = false; + } + } + } + +private: + int m_value; + int m_nstop; + bool m_stopped; +}; + + +// WaitCondition +TEST(4_wakeAll) +{ + MyThread4 thr1 (3000000), thr2 (7000000); + + thr1.start (); + thr2.start (); + + while (! thr1.stopped () || ! thr2.stopped ()) { + EXPECT_EQ (thr1.isRunning (), true); + EXPECT_EQ (thr2.isRunning (), true); + tl_assert (thr1.isRunning () && thr2.isRunning ()); + } + + EXPECT_EQ (thr1.value (), 3000000); + EXPECT_EQ (thr2.value (), 7000000); + + s_condition.wakeAll (); + thr1.wait (); + thr2.wait (); + + EXPECT_EQ (thr1.value (), 10000000); + EXPECT_EQ (thr2.value (), 10000000); +} + +// WaitCondition with two wakeOne +TEST(4_wakeOne) +{ + MyThread4 thr1 (3000000), thr2 (7000000); + + thr1.start (); + thr2.start (); + + while (! thr1.stopped () || ! thr2.stopped ()) { + EXPECT_EQ (thr1.isRunning (), true); + EXPECT_EQ (thr2.isRunning (), true); + tl_assert (thr1.isRunning () && thr2.isRunning ()); + } + + EXPECT_EQ (thr1.value (), 3000000); + EXPECT_EQ (thr2.value (), 7000000); + + s_condition.wakeOne (); + s_condition.wakeOne (); + thr1.wait (); + thr2.wait (); + + EXPECT_EQ (thr1.value (), 10000000); + EXPECT_EQ (thr2.value (), 10000000); +}