* Notification pipe to signal watcher thread
*/
int notify[2];
+
+ /**
+ * List of callback jobs to process by watcher thread, as job_t
+ */
+ linked_list_t *jobs;
};
/**
watcher_cb_t cb;
/** user data to pass to callback */
void *data;
- /** callback currently active? */
- bool active;
+ /** callback(s) currently active? */
+ int in_callback;
} entry_t;
/**
break;
}
}
- entry->active = TRUE;
+ entry->in_callback--;
break;
}
}
/**
* Execute the callback for a registered FD
*/
-static job_t* notify(private_watcher_t *this, entry_t *entry,
- watcher_event_t event)
+static void notify(private_watcher_t *this, entry_t *entry,
+ watcher_event_t event)
{
notify_data_t *data;
/* deactivate entry, so we can select() other FDs even if the async
* processing did not handle the event yet */
- entry->active = FALSE;
+ entry->in_callback++;
- return (job_t*)callback_job_create_with_prio((void*)notify_async, data,
+ this->jobs->insert_last(this->jobs,
+ callback_job_create_with_prio((void*)notify_async, data,
(void*)notify_end, (callback_job_cancel_t)return_false,
- JOB_PRIO_CRITICAL);
+ JOB_PRIO_CRITICAL));
}
/**
enumerator = this->fds->create_enumerator(this->fds);
while (enumerator->enumerate(enumerator, &entry))
{
- entry->active = TRUE;
+ entry->in_callback = 0;
}
enumerator->destroy(enumerator);
this->condvar->broadcast(this->condvar);
enumerator = this->fds->create_enumerator(this->fds);
while (enumerator->enumerate(enumerator, &entry))
{
- if (entry->active)
+ if (!entry->in_callback)
{
if (entry->events & WATCHER_READ)
{
{
char buf[1];
bool old;
- job_t *job = NULL;
+ job_t *job;
DBG2(DBG_JOB, "watcher going to select()");
thread_cleanup_push((void*)activate_all, this);
enumerator = this->fds->create_enumerator(this->fds);
while (enumerator->enumerate(enumerator, &entry))
{
- if (FD_ISSET(entry->fd, &rd))
+ if (FD_ISSET(entry->fd, &rd) && (entry->events & WATCHER_READ))
{
DBG2(DBG_JOB, "watched FD %d ready to read", entry->fd);
- job = notify(this, entry, WATCHER_READ);
- break;
+ notify(this, entry, WATCHER_READ);
}
- if (FD_ISSET(entry->fd, &wr))
+ if (FD_ISSET(entry->fd, &wr) && (entry->events & WATCHER_WRITE))
{
DBG2(DBG_JOB, "watched FD %d ready to write", entry->fd);
- job = notify(this, entry, WATCHER_WRITE);
- break;
+ notify(this, entry, WATCHER_WRITE);
}
- if (FD_ISSET(entry->fd, &ex))
+ if (FD_ISSET(entry->fd, &ex) && (entry->events & WATCHER_EXCEPT))
{
DBG2(DBG_JOB, "watched FD %d has exception", entry->fd);
- job = notify(this, entry, WATCHER_EXCEPT);
- break;
+ notify(this, entry, WATCHER_EXCEPT);
}
}
enumerator->destroy(enumerator);
this->mutex->unlock(this->mutex);
- if (job)
+ if (this->jobs->get_count(this->jobs))
{
- if (lib->processor->get_threads(lib->processor))
+ while (this->jobs->remove_first(this->jobs,
+ (void**)&job) == SUCCESS)
{
- lib->processor->queue_job(lib->processor, job);
- }
- else
- {
- job->execute(job);
- job->destroy(job);
+ if (lib->processor->get_threads(lib->processor))
+ {
+ lib->processor->queue_job(lib->processor, job);
+ }
+ else
+ {
+ job->execute(job);
+ job->destroy(job);
+ }
}
/* we temporarily disable a notified FD, rebuild FDSET */
return JOB_REQUEUE_DIRECT;
.events = events,
.cb = cb,
.data = data,
- .active = TRUE,
);
this->mutex->lock(this->mutex);
{
if (entry->fd == fd)
{
- if (entry->active)
- {
- this->fds->remove_at(this->fds, enumerator);
- free(entry);
- }
- else
+ if (entry->in_callback)
{
is_in_callback = TRUE;
break;
}
+ this->fds->remove_at(this->fds, enumerator);
+ free(entry);
}
}
enumerator->destroy(enumerator);
{
close(this->notify[1]);
}
+ this->jobs->destroy(this->jobs);
free(this);
}
.fds = linked_list_create(),
.mutex = mutex_create(MUTEX_TYPE_DEFAULT),
.condvar = condvar_create(CONDVAR_TYPE_DEFAULT),
+ .jobs = linked_list_create(),
.notify[0] = -1,
.notify[1] = -1,
);