*/
u_int active;
+ /**
+ * Currently running jobs
+ */
+ u_int running;
+
/**
* mutex to lock active counter
*/
* TRUE when the service is terminated
*/
bool terminated;
+
+ /**
+ * Reference counter
+ */
+ refcount_t ref;
};
+static void destroy_service(private_stream_service_t *this)
+{
+ if (ref_put(&this->ref))
+ {
+ close(this->fd);
+ this->mutex->destroy(this->mutex);
+ this->condvar->destroy(this->condvar);
+ free(this);
+ }
+}
+
/**
* Data to pass to async accept job
*/
}
this->condvar->signal(this->condvar);
this->mutex->unlock(this->mutex);
+ destroy_service(this);
if (data->fd != -1)
{
free(data);
}
+/**
+ * Reduce running counter
+ */
+CALLBACK(reduce_running, void,
+ async_data_t *data)
+{
+ private_stream_service_t *this = data->this;
+
+ this->mutex->lock(this->mutex);
+ this->running--;
+ this->condvar->signal(this->condvar);
+ this->mutex->unlock(this->mutex);
+}
+
/**
* Async processing of accepted connection
*/
static job_requeue_t accept_async(async_data_t *data)
{
+ private_stream_service_t *this = data->this;
stream_t *stream;
+ this->mutex->lock(this->mutex);
+ if (this->terminated)
+ {
+ this->mutex->unlock(this->mutex);
+ return JOB_REQUEUE_NONE;
+ }
+ this->running++;
+ this->mutex->unlock(this->mutex);
+
stream = stream_create_from_fd(data->fd);
if (stream)
{
/* FD is now owned by stream, don't close it during cleanup */
data->fd = -1;
+ thread_cleanup_push(reduce_running, data);
thread_cleanup_push((void*)stream->destroy, stream);
thread_cleanup_pop(!data->cb(data->data, stream));
+ thread_cleanup_pop(TRUE);
}
return JOB_REQUEUE_NONE;
}
keep = FALSE;
}
this->mutex->unlock(this->mutex);
+ ref_get(&this->ref);
lib->processor->queue_job(lib->processor,
(job_t*)callback_job_create_with_prio((void*)accept_async, data,
{
this->mutex->lock(this->mutex);
+ if (this->terminated)
+ {
+ this->mutex->unlock(this->mutex);
+ return;
+ }
+
/* wait for all callbacks to return */
while (this->active)
{
private_stream_service_t *this)
{
this->mutex->lock(this->mutex);
+ lib->watcher->remove(lib->watcher, this->fd);
this->terminated = TRUE;
+ while (this->running)
+ {
+ this->condvar->wait(this->condvar, this->mutex);
+ }
this->mutex->unlock(this->mutex);
- on_accept(this, NULL, NULL, this->prio, this->cncrncy);
- close(this->fd);
- this->mutex->destroy(this->mutex);
- this->condvar->destroy(this->condvar);
- free(this);
+ destroy_service(this);
}
/**
.prio = JOB_PRIO_MEDIUM,
.mutex = mutex_create(MUTEX_TYPE_RECURSIVE),
.condvar = condvar_create(CONDVAR_TYPE_DEFAULT),
+ .ref = 1,
);
return &this->public;