From: Martin Willi Date: Mon, 24 Jun 2013 12:58:01 +0000 (+0200) Subject: watcher: add a centralized an generic facility to monitor file descriptors X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=56a9275a81cbab8c803cb5ef85bd86e19f355b3d;p=thirdparty%2Fstrongswan.git watcher: add a centralized an generic facility to monitor file descriptors --- diff --git a/src/libstrongswan/Android.mk b/src/libstrongswan/Android.mk index 75b501fddc..b731a591da 100644 --- a/src/libstrongswan/Android.mk +++ b/src/libstrongswan/Android.mk @@ -28,7 +28,7 @@ networking/host.c networking/host_resolver.c networking/packet.c \ networking/tun_device.c \ pen/pen.c plugins/plugin_loader.c plugins/plugin_feature.c processing/jobs/job.c \ processing/jobs/callback_job.c processing/processor.c processing/scheduler.c \ -resolver/resolver_manager.c resolver/rr_set.c \ +processing/watcher.c resolver/resolver_manager.c resolver/rr_set.c \ selectors/traffic_selector.c threading/thread.c threading/thread_value.c \ threading/mutex.c threading/semaphore.c threading/rwlock.c threading/spinlock.c \ utils/utils.c utils/chunk.c utils/debug.c utils/enum.c utils/identification.c \ @@ -110,4 +110,3 @@ LOCAL_PRELINK_MODULE := false LOCAL_SHARED_LIBRARIES += libdl libvstr include $(BUILD_SHARED_LIBRARY) - diff --git a/src/libstrongswan/Makefile.am b/src/libstrongswan/Makefile.am index 567bdfe6f1..96e775f19c 100644 --- a/src/libstrongswan/Makefile.am +++ b/src/libstrongswan/Makefile.am @@ -26,7 +26,7 @@ networking/host.c networking/host_resolver.c networking/packet.c \ networking/tun_device.c \ pen/pen.c plugins/plugin_loader.c plugins/plugin_feature.c processing/jobs/job.c \ processing/jobs/callback_job.c processing/processor.c processing/scheduler.c \ -resolver/resolver_manager.c resolver/rr_set.c \ +processing/watcher.c resolver/resolver_manager.c resolver/rr_set.c \ selectors/traffic_selector.c threading/thread.c threading/thread_value.c \ threading/mutex.c threading/semaphore.c threading/rwlock.c threading/spinlock.c \ utils/utils.c utils/chunk.c utils/debug.c utils/enum.c utils/identification.c \ @@ -69,7 +69,7 @@ resolver/resolver.h resolver/resolver_response.h resolver/rr_set.h \ resolver/rr.h resolver/resolver_manager.h \ plugins/plugin_loader.h plugins/plugin.h plugins/plugin_feature.h \ processing/jobs/job.h processing/jobs/callback_job.h processing/processor.h \ -processing/scheduler.h selectors/traffic_selector.h \ +processing/scheduler.h processing/watcher.h selectors/traffic_selector.h \ threading/thread.h threading/thread_value.h \ threading/mutex.h threading/condvar.h threading/spinlock.h threading/semaphore.h \ threading/rwlock.h threading/rwlock_condvar.h threading/lock_profiler.h \ diff --git a/src/libstrongswan/library.c b/src/libstrongswan/library.c index 05d984b189..35d74200c3 100644 --- a/src/libstrongswan/library.c +++ b/src/libstrongswan/library.c @@ -80,6 +80,7 @@ void library_deinit() /* make sure the cache is clear before unloading plugins */ lib->credmgr->flush_cache(lib->credmgr, CERT_ANY); + this->public.watcher->destroy(this->public.watcher); this->public.scheduler->destroy(this->public.scheduler); this->public.processor->destroy(this->public.processor); this->public.plugins->destroy(this->public.plugins); @@ -266,6 +267,7 @@ bool library_init(char *settings) this->public.db = database_factory_create(); this->public.processor = processor_create(); this->public.scheduler = scheduler_create(); + this->public.watcher = watcher_create(); this->public.plugins = plugin_loader_create(); if (!check_memwipe()) diff --git a/src/libstrongswan/library.h b/src/libstrongswan/library.h index 1168da8fdb..d5497258aa 100644 --- a/src/libstrongswan/library.h +++ b/src/libstrongswan/library.h @@ -92,6 +92,7 @@ #include "networking/host_resolver.h" #include "processing/processor.h" #include "processing/scheduler.h" +#include "processing/watcher.h" #include "crypto/crypto_factory.h" #include "crypto/proposal/proposal_keywords.h" #include "fetcher/fetcher_manager.h" @@ -196,6 +197,11 @@ struct library_t { */ scheduler_t *scheduler; + /** + * File descriptor monitoring + */ + watcher_t *watcher; + /** * resolve hosts by DNS name */ diff --git a/src/libstrongswan/processing/watcher.c b/src/libstrongswan/processing/watcher.c new file mode 100644 index 0000000000..7ccac72bc1 --- /dev/null +++ b/src/libstrongswan/processing/watcher.c @@ -0,0 +1,396 @@ +/* + * Copyright (C) 2013 Martin Willi + * Copyright (C) 2013 revosec AG + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License as published by the + * Free Software Foundation; either version 2 of the License, or (at your + * option) any later version. See . + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY + * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * for more details. + */ + +#include "watcher.h" + +#include +#include +#include +#include +#include +#include + +#include +#include +#include + +typedef struct private_watcher_t private_watcher_t; + +/** + * Private data of an watcher_t object. + */ +struct private_watcher_t { + + /** + * Public watcher_t interface. + */ + watcher_t public; + + /** + * List of registered FDs, as entry_t + */ + linked_list_t *fds; + + /** + * Lock to access FD list + */ + mutex_t *mutex; + + /** + * Condvar to signal completion of callback + */ + condvar_t *condvar; + + /** + * Notification pipe to signal watcher thread + */ + int notify[2]; +}; + +/** + * Entry for a registered file descriptor + */ +typedef struct { + /** file descriptor */ + int fd; + /** events to watch */ + watcher_event_t events; + /** registered callback function */ + watcher_cb_t cb; + /** user data to pass to callback */ + void *data; + /** callback currently active? */ + bool active; +} entry_t; + +/** + * Data we pass on for an async notification + */ +typedef struct { + /** file descriptor */ + int fd; + /** event type */ + watcher_event_t event; + /** registered callback function */ + watcher_cb_t cb; + /** user data to pass to callback */ + void *data; + /** keep registered? */ + bool keep; + /** reference to watcher */ + private_watcher_t *this; +} notify_data_t; + +/** + * Notify watcher thread about changes + */ +static void update(private_watcher_t *this) +{ + char buf[1] = { 'u' }; + + if (this->notify[1] != -1) + { + ignore_result(write(this->notify[1], buf, sizeof(buf))); + } +} + + /** + * Execute callback of registered FD, asynchronous + */ +static job_requeue_t notify_async(notify_data_t *data) +{ + data->keep = data->cb(data->data, data->fd, data->event); + return JOB_REQUEUE_NONE; +} + +/** + * Clean up notification data, reactivate FD + */ +static void notify_end(notify_data_t *data) +{ + private_watcher_t *this = data->this; + enumerator_t *enumerator; + entry_t *entry; + + /* reactivate the disabled entry */ + this->mutex->lock(this->mutex); + enumerator = this->fds->create_enumerator(this->fds); + while (enumerator->enumerate(enumerator, &entry)) + { + if (entry->fd == data->fd) + { + if (!data->keep) + { + entry->events &= ~data->event; + if (!entry->events) + { + this->fds->remove_at(this->fds, enumerator); + free(entry); + break; + } + } + entry->active = TRUE; + break; + } + } + enumerator->destroy(enumerator); + + update(this); + this->condvar->broadcast(this->condvar); + this->mutex->unlock(this->mutex); + + free(data); +} + +/** + * Execute the callback for a registered FD + */ +static bool notify(private_watcher_t *this, entry_t *entry, + watcher_event_t event) +{ + notify_data_t *data; + + /* get a copy of entry for async job, but with specific event */ + INIT(data, + .fd = entry->fd, + .event = event, + .cb = entry->cb, + .data = entry->data, + .keep = TRUE, + .this = this, + ); + + /* deactivate entry, so we can select() other FDs even if the async + * processing did not handle the event yet */ + entry->active = FALSE; + + lib->processor->queue_job(lib->processor, + (job_t*)callback_job_create_with_prio((void*)notify_async, data, + (void*)notify_end, (callback_job_cancel_t)return_false, + JOB_PRIO_CRITICAL)); + return TRUE; +} + +/** + * Dispatching function + */ +static job_requeue_t watch(private_watcher_t *this) +{ + enumerator_t *enumerator; + entry_t *entry; + fd_set rd, wr, ex; + int maxfd = 0, res; + + FD_ZERO(&rd); + FD_ZERO(&wr); + FD_ZERO(&ex); + + this->mutex->lock(this->mutex); + if (this->fds->get_count(this->fds) == 0) + { + this->mutex->unlock(this->mutex); + return JOB_REQUEUE_NONE; + } + + if (this->notify[0] != -1) + { + FD_SET(this->notify[0], &rd); + maxfd = this->notify[0]; + } + + enumerator = this->fds->create_enumerator(this->fds); + while (enumerator->enumerate(enumerator, &entry)) + { + if (entry->active) + { + if (entry->events & WATCHER_READ) + { + FD_SET(entry->fd, &rd); + } + if (entry->events & WATCHER_WRITE) + { + FD_SET(entry->fd, &wr); + } + if (entry->events & WATCHER_EXCEPT) + { + FD_SET(entry->fd, &ex); + } + maxfd = max(maxfd, entry->fd); + } + } + enumerator->destroy(enumerator); + this->mutex->unlock(this->mutex); + + while (TRUE) + { + char buf[1]; + bool old, notified = FALSE; + + old = thread_cancelability(TRUE); + res = select(maxfd + 1, &rd, &wr, &ex, NULL); + thread_cancelability(old); + if (res > 0) + { + if (this->notify[0] != -1 && FD_ISSET(this->notify[0], &rd)) + { + ignore_result(read(this->notify[0], buf, sizeof(buf))); + return JOB_REQUEUE_DIRECT; + } + + this->mutex->lock(this->mutex); + enumerator = this->fds->create_enumerator(this->fds); + while (enumerator->enumerate(enumerator, &entry)) + { + if (FD_ISSET(entry->fd, &rd)) + { + notified = notify(this, entry, WATCHER_READ); + break; + } + if (FD_ISSET(entry->fd, &wr)) + { + notified = notify(this, entry, WATCHER_WRITE); + break; + } + if (FD_ISSET(entry->fd, &ex)) + { + notified = notify(this, entry, WATCHER_EXCEPT); + break; + } + } + enumerator->destroy(enumerator); + this->mutex->unlock(this->mutex); + + if (notified) + { + /* we temporarily disable a notified FD, rebuild FDSET */ + return JOB_REQUEUE_DIRECT; + } + } + } +} + +METHOD(watcher_t, add, void, + private_watcher_t *this, int fd, watcher_event_t events, + watcher_cb_t cb, void *data) +{ + entry_t *entry; + + INIT(entry, + .fd = fd, + .events = events, + .cb = cb, + .data = data, + .active = TRUE, + ); + + this->mutex->lock(this->mutex); + this->fds->insert_last(this->fds, entry); + if (this->fds->get_count(this->fds) == 1) + { + lib->processor->queue_job(lib->processor, + (job_t*)callback_job_create_with_prio((void*)watch, this, + NULL, (callback_job_cancel_t)return_false, JOB_PRIO_CRITICAL)); + } + else + { + update(this); + } + this->mutex->unlock(this->mutex); +} + +METHOD(watcher_t, remove_, void, + private_watcher_t *this, int fd) +{ + enumerator_t *enumerator; + entry_t *entry; + + this->mutex->lock(this->mutex); + while (TRUE) + { + bool is_in_callback = FALSE; + + enumerator = this->fds->create_enumerator(this->fds); + while (enumerator->enumerate(enumerator, &entry)) + { + if (entry->fd == fd) + { + if (entry->active) + { + this->fds->remove_at(this->fds, enumerator); + free(entry); + } + else + { + is_in_callback = TRUE; + break; + } + } + } + enumerator->destroy(enumerator); + if (!is_in_callback) + { + break; + } + this->condvar->wait(this->condvar, this->mutex); + } + + update(this); + this->mutex->unlock(this->mutex); +} + +METHOD(watcher_t, destroy, void, + private_watcher_t *this) +{ + this->mutex->destroy(this->mutex); + this->condvar->destroy(this->condvar); + this->fds->destroy(this->fds); + if (this->notify[0] != -1) + { + close(this->notify[0]); + } + if (this->notify[1] != -1) + { + close(this->notify[1]); + } + free(this); +} + +/** + * See header + */ +watcher_t *watcher_create() +{ + private_watcher_t *this; + + INIT(this, + .public = { + .add = _add, + .remove = _remove_, + .destroy = _destroy, + }, + .fds = linked_list_create(), + .mutex = mutex_create(MUTEX_TYPE_DEFAULT), + .condvar = condvar_create(CONDVAR_TYPE_DEFAULT), + .notify[0] = -1, + .notify[1] = -1, + ); + + if (pipe(this->notify) != 0) + { + DBG1(DBG_LIB, "creating watcher notify pipe failed: %s", + strerror(errno)); + } + return &this->public; +} diff --git a/src/libstrongswan/processing/watcher.h b/src/libstrongswan/processing/watcher.h new file mode 100644 index 0000000000..db7dd4fa87 --- /dev/null +++ b/src/libstrongswan/processing/watcher.h @@ -0,0 +1,97 @@ +/* + * Copyright (C) 2013 Martin Willi + * Copyright (C) 2013 revosec AG + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License as published by the + * Free Software Foundation; either version 2 of the License, or (at your + * option) any later version. See . + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY + * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * for more details. + */ + +/** + * @defgroup watcher watcher + * @{ @ingroup processor + */ + +#ifndef WATCHER_H_ +#define WATCHER_H_ + +typedef struct watcher_t watcher_t; +typedef enum watcher_event_t watcher_event_t; + +#include + +/** + * Callback function to register for file descriptor events. + * + * The callback is executed asynchronously using a thread from the pool. + * Monitoring of fd is temporarily suspended to avoid additional events while + * it is processed asynchronously. To allow concurrent events, one can quickly + * process it (using a read/write) and return from the callback. This will + * re-enable the event, while the data read can be processed in another + * asynchronous job. + * + * On Linux, even if select() marks an FD as "ready", a subsequent read/write + * can block. It is therefore highly recommended to use non-blocking I/O + * and handle EAGAIN/EWOULDBLOCK gracefully. + * + * @param data user data passed during registration + * @param fd file descriptor the event occured on + * @param event type of event + * @return TRUE to keep watching event, FALSE to unregister fd for event + */ +typedef bool (*watcher_cb_t)(void *data, int fd, watcher_event_t event); + +/** + * What events to watch for a file descriptor. + */ +enum watcher_event_t { + WATCHER_READ = (1<<0), + WATCHER_WRITE = (1<<1), + WATCHER_EXCEPT = (1<<2), +}; + +/** + * Watch multiple file descriptors using select(). + */ +struct watcher_t { + + /** + * Start watching a new file descriptor. + * + * @param fd file descriptor to start watching + * @param events ORed set of events to watch + * @param cb callback function to invoke on events + * @param data data to pass to cb() + */ + void (*add)(watcher_t *this, int fd, watcher_event_t events, + watcher_cb_t cb, void *data); + + /** + * Stop watching a previously registered file descriptor. + * + * This call blocks until any active callback for this FD returns. + * + * @param fd file descriptor to stop watching + */ + void (*remove)(watcher_t *this, int fd); + + /** + * Destroy a watcher_t. + */ + void (*destroy)(watcher_t *this); +}; + +/** + * Create a watcher instance. + * + * @return watcher + */ +watcher_t *watcher_create(); + +#endif /** WATCHER_H_ @}*/