From: Martin Willi Date: Wed, 17 Jul 2013 14:07:47 +0000 (+0200) Subject: watcher: properly support multiple watch callback types for the same FD X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=564efd5beac431018c4073086cd67996faf5934b;p=thirdparty%2Fstrongswan.git watcher: properly support multiple watch callback types for the same FD --- diff --git a/src/libstrongswan/processing/watcher.c b/src/libstrongswan/processing/watcher.c index 85639ad084..7eaa044bbd 100644 --- a/src/libstrongswan/processing/watcher.c +++ b/src/libstrongswan/processing/watcher.c @@ -58,6 +58,11 @@ struct private_watcher_t { * Notification pipe to signal watcher thread */ int notify[2]; + + /** + * List of callback jobs to process by watcher thread, as job_t + */ + linked_list_t *jobs; }; /** @@ -72,8 +77,8 @@ typedef struct { watcher_cb_t cb; /** user data to pass to callback */ void *data; - /** callback currently active? */ - bool active; + /** callback(s) currently active? */ + int in_callback; } entry_t; /** @@ -155,7 +160,7 @@ static void notify_end(notify_data_t *data) break; } } - entry->active = TRUE; + entry->in_callback--; break; } } @@ -171,8 +176,8 @@ static void notify_end(notify_data_t *data) /** * Execute the callback for a registered FD */ -static job_t* notify(private_watcher_t *this, entry_t *entry, - watcher_event_t event) +static void notify(private_watcher_t *this, entry_t *entry, + watcher_event_t event) { notify_data_t *data; @@ -188,11 +193,12 @@ static job_t* notify(private_watcher_t *this, entry_t *entry, /* deactivate entry, so we can select() other FDs even if the async * processing did not handle the event yet */ - entry->active = FALSE; + entry->in_callback++; - return (job_t*)callback_job_create_with_prio((void*)notify_async, data, + this->jobs->insert_last(this->jobs, + callback_job_create_with_prio((void*)notify_async, data, (void*)notify_end, (callback_job_cancel_t)return_false, - JOB_PRIO_CRITICAL); + JOB_PRIO_CRITICAL)); } /** @@ -210,7 +216,7 @@ static void activate_all(private_watcher_t *this) enumerator = this->fds->create_enumerator(this->fds); while (enumerator->enumerate(enumerator, &entry)) { - entry->active = TRUE; + entry->in_callback = 0; } enumerator->destroy(enumerator); this->condvar->broadcast(this->condvar); @@ -247,7 +253,7 @@ static job_requeue_t watch(private_watcher_t *this) enumerator = this->fds->create_enumerator(this->fds); while (enumerator->enumerate(enumerator, &entry)) { - if (entry->active) + if (!entry->in_callback) { if (entry->events & WATCHER_READ) { @@ -274,7 +280,7 @@ static job_requeue_t watch(private_watcher_t *this) { char buf[1]; bool old; - job_t *job = NULL; + job_t *job; DBG2(DBG_JOB, "watcher going to select()"); thread_cleanup_push((void*)activate_all, this); @@ -295,38 +301,39 @@ static job_requeue_t watch(private_watcher_t *this) enumerator = this->fds->create_enumerator(this->fds); while (enumerator->enumerate(enumerator, &entry)) { - if (FD_ISSET(entry->fd, &rd)) + if (FD_ISSET(entry->fd, &rd) && (entry->events & WATCHER_READ)) { DBG2(DBG_JOB, "watched FD %d ready to read", entry->fd); - job = notify(this, entry, WATCHER_READ); - break; + notify(this, entry, WATCHER_READ); } - if (FD_ISSET(entry->fd, &wr)) + if (FD_ISSET(entry->fd, &wr) && (entry->events & WATCHER_WRITE)) { DBG2(DBG_JOB, "watched FD %d ready to write", entry->fd); - job = notify(this, entry, WATCHER_WRITE); - break; + notify(this, entry, WATCHER_WRITE); } - if (FD_ISSET(entry->fd, &ex)) + if (FD_ISSET(entry->fd, &ex) && (entry->events & WATCHER_EXCEPT)) { DBG2(DBG_JOB, "watched FD %d has exception", entry->fd); - job = notify(this, entry, WATCHER_EXCEPT); - break; + notify(this, entry, WATCHER_EXCEPT); } } enumerator->destroy(enumerator); this->mutex->unlock(this->mutex); - if (job) + if (this->jobs->get_count(this->jobs)) { - if (lib->processor->get_threads(lib->processor)) + while (this->jobs->remove_first(this->jobs, + (void**)&job) == SUCCESS) { - lib->processor->queue_job(lib->processor, job); - } - else - { - job->execute(job); - job->destroy(job); + if (lib->processor->get_threads(lib->processor)) + { + lib->processor->queue_job(lib->processor, job); + } + else + { + job->execute(job); + job->destroy(job); + } } /* we temporarily disable a notified FD, rebuild FDSET */ return JOB_REQUEUE_DIRECT; @@ -350,7 +357,6 @@ METHOD(watcher_t, add, void, .events = events, .cb = cb, .data = data, - .active = TRUE, ); this->mutex->lock(this->mutex); @@ -384,16 +390,13 @@ METHOD(watcher_t, remove_, void, { if (entry->fd == fd) { - if (entry->active) - { - this->fds->remove_at(this->fds, enumerator); - free(entry); - } - else + if (entry->in_callback) { is_in_callback = TRUE; break; } + this->fds->remove_at(this->fds, enumerator); + free(entry); } } enumerator->destroy(enumerator); @@ -422,6 +425,7 @@ METHOD(watcher_t, destroy, void, { close(this->notify[1]); } + this->jobs->destroy(this->jobs); free(this); } @@ -443,6 +447,7 @@ watcher_t *watcher_create() .fds = linked_list_create(), .mutex = mutex_create(MUTEX_TYPE_DEFAULT), .condvar = condvar_create(CONDVAR_TYPE_DEFAULT), + .jobs = linked_list_create(), .notify[0] = -1, .notify[1] = -1, ); diff --git a/src/libstrongswan/processing/watcher.h b/src/libstrongswan/processing/watcher.h index db7dd4fa87..02d9188f09 100644 --- a/src/libstrongswan/processing/watcher.h +++ b/src/libstrongswan/processing/watcher.h @@ -64,6 +64,9 @@ struct watcher_t { /** * Start watching a new file descriptor. * + * Multiple callbacks can be registered for the same file descriptor, and + * all of them get notified. Such callbacks are executed concurrently. + * * @param fd file descriptor to start watching * @param events ORed set of events to watch * @param cb callback function to invoke on events @@ -75,7 +78,8 @@ struct watcher_t { /** * Stop watching a previously registered file descriptor. * - * This call blocks until any active callback for this FD returns. + * This call blocks until any active callback for this FD returns. All + * callbacks registered for that FD get unregistered. * * @param fd file descriptor to stop watching */