Enhancements for sync mode in threaded workers: allows providing progress too.

This commit is contained in:
Matthias Koefferlein 2021-03-13 10:11:27 +01:00
parent 9e474e4cc2
commit f323c830d7
2 changed files with 83 additions and 24 deletions

View File

@ -159,6 +159,16 @@ TaskList::put_front (Task *task)
}
}
size_t
TaskList::size () const
{
size_t n = 0;
for (Task *t = mp_first; t; t = t->mp_next) {
++n;
}
return n;
}
// -----------------------------------------------------------------------------
// tl::JobBase implementation
@ -264,6 +274,11 @@ JobBase::start ()
mp_workers.back ()->start (this, int (mp_workers.size ()) - 1);
}
while (m_nworkers < int (mp_workers.size ())) {
delete mp_workers.back ();
mp_workers.pop_back ();
}
for (int i = 0; i < int (mp_workers.size ()); ++i) {
setup_worker (mp_workers [i]);
mp_workers [i]->reset_stop_request ();
@ -278,39 +293,59 @@ JobBase::start ()
std::unique_ptr <Worker> sync_worker (create_worker ());
setup_worker (sync_worker.get ());
while (! m_task_list.is_empty ()) {
std::unique_ptr<Task> task (m_task_list.fetch ());
try {
sync_worker->perform_task (task.get ());
} catch (TaskTerminatedException) {
// Stop the thread.
break;
} catch (WorkerTerminatedException) {
// Stop the thread.
break;
} catch (tl::Exception &ex) {
log_error (ex.msg ());
} catch (std::exception &ex) {
log_error (ex.what ());
} catch (...) {
log_error (tl::to_string (tr ("Unspecific error")));
}
}
// clean up any remaining tasks
while (! m_task_list.is_empty ()) {
Task *task = m_task_list.fetch ();
if (task) {
delete task;
try {
while (! m_task_list.is_empty ()) {
std::unique_ptr<Task> task (m_task_list.fetch ());
before_sync_task (task.get ());
try {
sync_worker->perform_task (task.get ());
} catch (TaskTerminatedException) {
// Stop the thread.
break;
} catch (WorkerTerminatedException) {
// Stop the thread.
break;
} catch (tl::Exception &ex) {
log_error (ex.msg ());
} catch (std::exception &ex) {
log_error (ex.what ());
} catch (...) {
log_error (tl::to_string (tr ("Unspecific error")));
}
after_sync_task (task.get ());
}
} catch (...) {
// handle exceptions raised by before_sync_task or after_sync_task
cleanup ();
m_running = false;
throw;
}
cleanup ();
finished ();
m_running = false;
}
}
void
JobBase::cleanup ()
{
// clean up any remaining tasks
while (! m_task_list.is_empty ()) {
Task *task = m_task_list.fetch ();
if (task) {
delete task;
}
}
}
bool
JobBase::is_running ()
{

View File

@ -109,6 +109,11 @@ public:
return mp_first;
}
/**
* @brief Gets the number of tasks
*/
size_t size () const;
private:
Task *mp_first, *mp_last;
@ -163,6 +168,14 @@ public:
*/
void schedule (Task *task);
/**
* @brief Gets the number of tasks in the queue
*/
size_t tasks () const
{
return m_task_list.size ();
}
/**
* @brief Start the execution of the job
*/
@ -227,6 +240,16 @@ protected:
*/
virtual void setup_worker (Worker * /*worker*/) { }
/**
* @brief This method is called before the given task is started in sync mode (workers == 0)
*/
virtual void before_sync_task (Task * /*task*/) { }
/**
* @brief This method is called after the given task has finished in sync mode (workers == 0)
*/
virtual void after_sync_task (Task * /*task*/) { }
/**
* @brief Indicates that the job has finished
*
@ -276,6 +299,7 @@ private:
Task *get_task (int for_worker);
void log_error (const std::string &s);
void cleanup ();
};
/**