} worker_thread_t;
-static void process_jobs(worker_thread_t *worker);
-
-/**
- * restart a terminated thread
- */
-static void restart(worker_thread_t *worker)
-{
- private_processor_t *this = worker->processor;
- job_t *job;
-
- DBG2(DBG_JOB, "terminated worker thread %.2u", thread_current_id());
-
- this->mutex->lock(this->mutex);
- /* cleanup worker thread */
- this->working_threads[worker->priority]--;
- worker->job->status = JOB_STATUS_CANCELED;
- job = worker->job;
- /* unset the job before releasing the mutex, otherwise cancel() might
- * interfere */
- worker->job = NULL;
- /* release mutex to avoid deadlocks if the same lock is required
- * during queue_job() and in the destructor called here */
- this->mutex->unlock(this->mutex);
- job->destroy(job);
- this->mutex->lock(this->mutex);
-
- /* respawn thread if required */
- if (this->desired_threads >= this->total_threads)
- {
- worker_thread_t *new_worker;
-
- INIT(new_worker,
- .processor = this,
- );
- new_worker->thread = thread_create((thread_main_t)process_jobs,
- new_worker);
- if (new_worker->thread)
- {
- this->threads->insert_last(this->threads, new_worker);
- this->mutex->unlock(this->mutex);
- return;
- }
- free(new_worker);
- }
- this->total_threads--;
- this->thread_terminated->signal(this->thread_terminated);
- this->mutex->unlock(this->mutex);
-}
+static void process_job(private_processor_t *this, worker_thread_t *worker);
/**
* Get number of idle threads, non-locking variant
return FALSE;
}
+CALLBACK(process_jobs, void*,
+ worker_thread_t *worker)
+{
+ private_processor_t *this = worker->processor;
+
+ /* worker threads are not cancelable by default */
+ thread_cancelability(FALSE);
+
+ DBG2(DBG_JOB, "started worker thread %.2u", thread_current_id());
+
+ this->mutex->lock(this->mutex);
+ while (this->desired_threads >= this->total_threads)
+ {
+ if (get_job(this, worker))
+ {
+ process_job(this, worker);
+ }
+ else
+ {
+ this->job_added->wait(this->job_added, this->mutex);
+ }
+ }
+ this->total_threads--;
+ this->thread_terminated->signal(this->thread_terminated);
+ this->mutex->unlock(this->mutex);
+ return NULL;
+}
+
+/**
+ * restart a terminated thread
+ */
+static void restart(worker_thread_t *worker)
+{
+ private_processor_t *this = worker->processor;
+ job_t *job;
+
+ DBG2(DBG_JOB, "terminated worker thread %.2u", thread_current_id());
+
+ this->mutex->lock(this->mutex);
+ /* cleanup worker thread */
+ this->working_threads[worker->priority]--;
+ worker->job->status = JOB_STATUS_CANCELED;
+ job = worker->job;
+ /* unset the job before releasing the mutex, otherwise cancel() might
+ * interfere */
+ worker->job = NULL;
+ /* release mutex to avoid deadlocks if the same lock is required
+ * during queue_job() and in the destructor called here */
+ this->mutex->unlock(this->mutex);
+ job->destroy(job);
+ this->mutex->lock(this->mutex);
+
+ /* respawn thread if required */
+ if (this->desired_threads >= this->total_threads)
+ {
+ worker_thread_t *new_worker;
+
+ INIT(new_worker,
+ .processor = this,
+ );
+ new_worker->thread = thread_create(process_jobs, new_worker);
+ if (new_worker->thread)
+ {
+ this->threads->insert_last(this->threads, new_worker);
+ this->mutex->unlock(this->mutex);
+ return;
+ }
+ free(new_worker);
+ }
+ this->total_threads--;
+ this->thread_terminated->signal(this->thread_terminated);
+ this->mutex->unlock(this->mutex);
+}
+
/**
* Process a single job (provided in worker->job, worker->priority is also
* expected to be set)
}
}
-/**
- * Process queued jobs, called by the worker threads
- */
-static void process_jobs(worker_thread_t *worker)
-{
- private_processor_t *this = worker->processor;
-
- /* worker threads are not cancelable by default */
- thread_cancelability(FALSE);
-
- DBG2(DBG_JOB, "started worker thread %.2u", thread_current_id());
-
- this->mutex->lock(this->mutex);
- while (this->desired_threads >= this->total_threads)
- {
- if (get_job(this, worker))
- {
- process_job(this, worker);
- }
- else
- {
- this->job_added->wait(this->job_added, this->mutex);
- }
- }
- this->total_threads--;
- this->thread_terminated->signal(this->thread_terminated);
- this->mutex->unlock(this->mutex);
-}
-
METHOD(processor_t, get_total_threads, u_int,
private_processor_t *this)
{
INIT(worker,
.processor = this,
);
- worker->thread = thread_create((thread_main_t)process_jobs, worker);
+ worker->thread = thread_create(process_jobs, worker);
if (worker->thread)
{
this->threads->insert_last(this->threads, worker);