From: Martin Willi Date: Fri, 28 Jun 2013 09:50:59 +0000 (+0200) Subject: stream: add a concurrency option to services, limiting parallel callbacks X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=939a3e527b82a4187aa26a2c34fbf0162dbe7259;p=thirdparty%2Fstrongswan.git stream: add a concurrency option to services, limiting parallel callbacks --- diff --git a/src/libstrongswan/networking/streams/stream_manager.c b/src/libstrongswan/networking/streams/stream_manager.c index c7e5fd19f3..e70df316f8 100644 --- a/src/libstrongswan/networking/streams/stream_manager.c +++ b/src/libstrongswan/networking/streams/stream_manager.c @@ -108,7 +108,7 @@ METHOD(stream_manager_t, connect_, stream_t*, METHOD(stream_manager_t, start_service, bool, private_stream_manager_t *this, char *uri, int backlog, - stream_service_cb_t cb, void *data, job_priority_t prio) + stream_service_cb_t cb, void *data, job_priority_t prio, u_int cncrncy) { running_entry_t *running; enumerator_t *enumerator; @@ -140,7 +140,7 @@ METHOD(stream_manager_t, start_service, bool, .uri = strdup(uri), .service = service, ); - service->on_accept(service, cb, data, prio); + service->on_accept(service, cb, data, prio, cncrncy); this->lock->write_lock(this->lock); this->running->insert_last(this->running, running); diff --git a/src/libstrongswan/networking/streams/stream_manager.h b/src/libstrongswan/networking/streams/stream_manager.h index 8639893f11..4e798fa060 100644 --- a/src/libstrongswan/networking/streams/stream_manager.h +++ b/src/libstrongswan/networking/streams/stream_manager.h @@ -47,11 +47,12 @@ struct stream_manager_t { * @param cb callback function invoked for each client connection * @param data user data to pass to callback * @param prio job priority to invoke callback with + * @param cncrncy maximum number of parallel callback invocations * @return TRUE if service started, FALSE on failure */ bool (*start_service)(stream_manager_t *this, char *uri, int backlog, stream_service_cb_t cb, void *data, - job_priority_t prio); + job_priority_t prio, u_int cncrncy); /** * Stop a service previously create with start_service(). diff --git a/src/libstrongswan/networking/streams/stream_service.c b/src/libstrongswan/networking/streams/stream_service.c index 5f29051461..34d45a0674 100644 --- a/src/libstrongswan/networking/streams/stream_service.c +++ b/src/libstrongswan/networking/streams/stream_service.c @@ -15,6 +15,8 @@ #include #include +#include +#include #include #include @@ -54,6 +56,26 @@ struct private_stream_service_t { * Job priority to invoke callback with */ job_priority_t prio; + + /** + * Maximum number of parallel callback invocations + */ + u_int cncrncy; + + /** + * Currently active jobs + */ + u_int active; + + /** + * mutex to lock active counter + */ + mutex_t *mutex; + + /** + * Condvar to wait for callback termination + */ + condvar_t *condvar; }; /** @@ -66,6 +88,8 @@ typedef struct { void *data; /** accepted connection */ int fd; + /** reference to stream service */ + private_stream_service_t *this; } async_data_t; /** @@ -73,6 +97,18 @@ typedef struct { */ static void destroy_async_data(async_data_t *data) { + private_stream_service_t *this = data->this; + + this->mutex->lock(this->mutex); + if (this->active-- == this->cncrncy) + { + /* leaving concurrency limit, restart accept()ing. */ + this->public.on_accept(&this->public, this->cb, this->data, + this->prio, this->cncrncy); + } + this->condvar->signal(this->condvar); + this->mutex->unlock(this->mutex); + close(data->fd); free(data); } @@ -100,15 +136,25 @@ static job_requeue_t accept_async(async_data_t *data) static bool watch(private_stream_service_t *this, int fd, watcher_event_t event) { async_data_t *data; + bool keep = TRUE; INIT(data, .cb = this->cb, .data = this->data, .fd = accept(fd, NULL, NULL), + .this = this, ); if (data->fd != -1) { + this->mutex->lock(this->mutex); + if (++this->active == this->cncrncy) + { + /* concurrency limit reached, stop accept()ing new connections */ + keep = FALSE; + } + this->mutex->unlock(this->mutex); + lib->processor->queue_job(lib->processor, (job_t*)callback_job_create_with_prio((void*)accept_async, data, (void*)destroy_async_data, NULL, this->prio)); @@ -117,13 +163,21 @@ static bool watch(private_stream_service_t *this, int fd, watcher_event_t event) { free(data); } - return TRUE; + return keep; } METHOD(stream_service_t, on_accept, void, private_stream_service_t *this, stream_service_cb_t cb, void *data, - job_priority_t prio) + job_priority_t prio, u_int cncrncy) { + this->mutex->lock(this->mutex); + + /* wait for all callbacks to return */ + while (this->active) + { + this->condvar->wait(this->condvar, this->mutex); + } + if (this->cb) { lib->watcher->remove(lib->watcher, this->fd); @@ -135,19 +189,24 @@ METHOD(stream_service_t, on_accept, void, { this->prio = prio; } + this->cncrncy = cncrncy; if (this->cb) { lib->watcher->add(lib->watcher, this->fd, WATCHER_READ, (watcher_cb_t)watch, this); } + + this->mutex->unlock(this->mutex); } METHOD(stream_service_t, destroy, void, private_stream_service_t *this) { - on_accept(this, NULL, NULL, this->prio); + on_accept(this, NULL, NULL, this->prio, this->cncrncy); close(this->fd); + this->mutex->destroy(this->mutex); + this->condvar->destroy(this->condvar); free(this); } @@ -165,6 +224,8 @@ stream_service_t *stream_service_create_from_fd(int fd) }, .fd = fd, .prio = JOB_PRIO_MEDIUM, + .mutex = mutex_create(MUTEX_TYPE_RECURSIVE), + .condvar = condvar_create(CONDVAR_TYPE_DEFAULT), ); return &this->public; diff --git a/src/libstrongswan/networking/streams/stream_service.h b/src/libstrongswan/networking/streams/stream_service.h index 91a7a1722a..27ef79148e 100644 --- a/src/libstrongswan/networking/streams/stream_service.h +++ b/src/libstrongswan/networking/streams/stream_service.h @@ -59,9 +59,11 @@ struct stream_service_t { * @param cb callback function to call for accepted client streams * @param data data to pass to callback function * @param prio job priority to run callback with + * @param cncrncy maximum number of parallel callback invocations */ void (*on_accept)(stream_service_t *this, - stream_service_cb_t cb, void *data, job_priority_t prio); + stream_service_cb_t cb, void *data, + job_priority_t prio, u_int cncrncy); /** * Destroy a stream_service_t.