*/
u_int idle_threads;
+ /**
+ * All threads managed in the pool (including threads that have been
+ * cancelled, this allows to join them during destruction)
+ */
+ linked_list_t *threads;
+
/**
* The jobs are stored in a linked list
*/
{
thread_t *thread;
+ DBG2(DBG_JOB, "terminated worker thread, ID: %u", thread_current_id());
+
/* respawn thread if required */
this->mutex->lock(this->mutex);
- if (this->desired_threads == 0 ||
+ if (this->desired_threads < this->total_threads ||
(thread = thread_create((thread_main_t)process_jobs, this)) == NULL)
{
this->total_threads--;
- this->thread_terminated->broadcast(this->thread_terminated);
+ this->thread_terminated->signal(this->thread_terminated);
}
else
{
- thread->detach(thread);
+ this->threads->insert_last(this->threads, thread);
}
this->mutex->unlock(this->mutex);
}
*/
static void process_jobs(private_processor_t *this)
{
- bool oldstate;
+ /* worker threads are not cancellable by default */
+ thread_cancelability(FALSE);
- oldstate = thread_cancelability(FALSE);
-
- DBG2(DBG_JOB, "started worker thread, thread_ID: %u", thread_current_id());
+ DBG2(DBG_JOB, "started worker thread, ID: %u", thread_current_id());
this->mutex->lock(this->mutex);
while (this->desired_threads >= this->total_threads)
thread_cleanup_pop(FALSE);
this->mutex->lock(this->mutex);
}
- this->total_threads--;
- this->thread_terminated->signal(this->thread_terminated);
this->mutex->unlock(this->mutex);
+ restart(this);
}
/**
current = thread_create((thread_main_t)process_jobs, this);
if (current)
{
- current->detach(current);
+ this->threads->insert_last(this->threads, current);
this->total_threads++;
}
}
*/
static void destroy(private_processor_t *this)
{
+ thread_t *current;
set_threads(this, 0);
this->mutex->lock(this->mutex);
while (this->total_threads > 0)
this->job_added->broadcast(this->job_added);
this->thread_terminated->wait(this->thread_terminated, this->mutex);
}
+ while (this->threads->remove_first(this->threads,
+ (void**)¤t) == SUCCESS)
+ {
+ current->join(current);
+ }
this->mutex->unlock(this->mutex);
this->thread_terminated->destroy(this->thread_terminated);
this->job_added->destroy(this->job_added);
this->mutex->destroy(this->mutex);
this->list->destroy_offset(this->list, offsetof(job_t, destroy));
+ this->threads->destroy(this->threads);
free(this);
}
this->public.destroy = (void(*)(processor_t*))destroy;
this->list = linked_list_create();
+ this->threads = linked_list_create();
this->mutex = mutex_create(MUTEX_TYPE_DEFAULT);
this->job_added = condvar_create(CONDVAR_TYPE_DEFAULT);
this->thread_terminated = condvar_create(CONDVAR_TYPE_DEFAULT);