/**
* Execute the callback for a registered FD
*/
-static bool notify(private_watcher_t *this, entry_t *entry,
- watcher_event_t event)
+static job_t* notify(private_watcher_t *this, entry_t *entry,
+ watcher_event_t event)
{
notify_data_t *data;
* processing did not handle the event yet */
entry->active = FALSE;
- lib->processor->queue_job(lib->processor,
- (job_t*)callback_job_create_with_prio((void*)notify_async, data,
+ return (job_t*)callback_job_create_with_prio((void*)notify_async, data,
(void*)notify_end, (callback_job_cancel_t)return_false,
- JOB_PRIO_CRITICAL));
- return TRUE;
+ JOB_PRIO_CRITICAL);
}
/**
while (TRUE)
{
char buf[1];
- bool old, notified = FALSE;
+ bool old;
+ job_t *job = NULL;
thread_cleanup_push((void*)activate_all, this);
old = thread_cancelability(TRUE);
{
if (FD_ISSET(entry->fd, &rd))
{
- notified = notify(this, entry, WATCHER_READ);
+ job = notify(this, entry, WATCHER_READ);
break;
}
if (FD_ISSET(entry->fd, &wr))
{
- notified = notify(this, entry, WATCHER_WRITE);
+ job = notify(this, entry, WATCHER_WRITE);
break;
}
if (FD_ISSET(entry->fd, &ex))
{
- notified = notify(this, entry, WATCHER_EXCEPT);
+ job = notify(this, entry, WATCHER_EXCEPT);
break;
}
}
enumerator->destroy(enumerator);
this->mutex->unlock(this->mutex);
- if (notified)
+ if (job)
{
+ if (lib->processor->get_threads(lib->processor))
+ {
+ lib->processor->queue_job(lib->processor, job);
+ }
+ else
+ {
+ job->execute(job);
+ job->destroy(job);
+ }
/* we temporarily disable a notified FD, rebuild FDSET */
return JOB_REQUEUE_DIRECT;
}