diff --git a/src/tl/tl/tlThreadedWorkers.cc b/src/tl/tl/tlThreadedWorkers.cc index 48ddbabd0..01f31ce7d 100644 --- a/src/tl/tl/tlThreadedWorkers.cc +++ b/src/tl/tl/tlThreadedWorkers.cc @@ -159,6 +159,16 @@ TaskList::put_front (Task *task) } } +size_t +TaskList::size () const +{ + size_t n = 0; + for (Task *t = mp_first; t; t = t->mp_next) { + ++n; + } + return n; +} + // ----------------------------------------------------------------------------- // tl::JobBase implementation @@ -264,6 +274,11 @@ JobBase::start () mp_workers.back ()->start (this, int (mp_workers.size ()) - 1); } + while (m_nworkers < int (mp_workers.size ())) { + delete mp_workers.back (); + mp_workers.pop_back (); + } + for (int i = 0; i < int (mp_workers.size ()); ++i) { setup_worker (mp_workers [i]); mp_workers [i]->reset_stop_request (); @@ -278,39 +293,59 @@ JobBase::start () std::unique_ptr sync_worker (create_worker ()); setup_worker (sync_worker.get ()); - while (! m_task_list.is_empty ()) { - std::unique_ptr task (m_task_list.fetch ()); - try { - sync_worker->perform_task (task.get ()); - } catch (TaskTerminatedException) { - // Stop the thread. - break; - } catch (WorkerTerminatedException) { - // Stop the thread. - break; - } catch (tl::Exception &ex) { - log_error (ex.msg ()); - } catch (std::exception &ex) { - log_error (ex.what ()); - } catch (...) { - log_error (tl::to_string (tr ("Unspecific error"))); - } - } - - // clean up any remaining tasks - while (! m_task_list.is_empty ()) { - Task *task = m_task_list.fetch (); - if (task) { - delete task; + try { + + while (! m_task_list.is_empty ()) { + + std::unique_ptr task (m_task_list.fetch ()); + before_sync_task (task.get ()); + + try { + sync_worker->perform_task (task.get ()); + } catch (TaskTerminatedException) { + // Stop the thread. + break; + } catch (WorkerTerminatedException) { + // Stop the thread. + break; + } catch (tl::Exception &ex) { + log_error (ex.msg ()); + } catch (std::exception &ex) { + log_error (ex.what ()); + } catch (...) { + log_error (tl::to_string (tr ("Unspecific error"))); + } + + after_sync_task (task.get ()); + } + + } catch (...) { + // handle exceptions raised by before_sync_task or after_sync_task + cleanup (); + m_running = false; + throw; } + cleanup (); finished (); m_running = false; } } +void +JobBase::cleanup () +{ + // clean up any remaining tasks + while (! m_task_list.is_empty ()) { + Task *task = m_task_list.fetch (); + if (task) { + delete task; + } + } +} + bool JobBase::is_running () { diff --git a/src/tl/tl/tlThreadedWorkers.h b/src/tl/tl/tlThreadedWorkers.h index 48eab9dbc..172ea8b02 100644 --- a/src/tl/tl/tlThreadedWorkers.h +++ b/src/tl/tl/tlThreadedWorkers.h @@ -109,6 +109,11 @@ public: return mp_first; } + /** + * @brief Gets the number of tasks + */ + size_t size () const; + private: Task *mp_first, *mp_last; @@ -163,6 +168,14 @@ public: */ void schedule (Task *task); + /** + * @brief Gets the number of tasks in the queue + */ + size_t tasks () const + { + return m_task_list.size (); + } + /** * @brief Start the execution of the job */ @@ -227,6 +240,16 @@ protected: */ virtual void setup_worker (Worker * /*worker*/) { } + /** + * @brief This method is called before the given task is started in sync mode (workers == 0) + */ + virtual void before_sync_task (Task * /*task*/) { } + + /** + * @brief This method is called after the given task has finished in sync mode (workers == 0) + */ + virtual void after_sync_task (Task * /*task*/) { } + /** * @brief Indicates that the job has finished * @@ -276,6 +299,7 @@ private: Task *get_task (int for_worker); void log_error (const std::string &s); + void cleanup (); }; /**