-*- coding: utf-8 -*-
Changes with Apache 2.5.0
+ *) mod_http2: reworked deallocation on connection shutdown and worker
+ abort. Separate parent pool for all workers. worker threads are joined
+ on planned worker shutdown.
+ [Yann Ylavic, Stefan Eissing]
+
*) mod_ssl: when receiving requests for other virtual hosts than the handshake
server, the SSL parameters are checked for equality. With equal
configuration, requests are passed for processing. Any change will trigger
have_written = 1;
wait_micros = 0;
}
- else if (status == APR_EAGAIN) {
+ else if (APR_STATUS_IS_EAGAIN(status)) {
/* nop */
}
else if (status == APR_TIMEUP) {
apr_status_t status;
apr_off_t bblen;
+ if (APR_BRIGADE_EMPTY(bb)) {
+ return APR_SUCCESS;
+ }
+
ap_update_child_status(io->connection->sbh, SERVER_BUSY_WRITE, NULL);
status = apr_brigade_length(bb, 1, &bblen);
if (status == APR_SUCCESS) {
return status;
}
+/* Bring the current buffer content into the output brigade, appropriately
+ * chunked.
+ */
static apr_status_t bucketeer_buffer(h2_conn_io *io) {
const char *data = io->buffer;
apr_size_t remaining = io->buflen;
if (io->unflushed) {
apr_status_t status;
if (io->buflen > 0) {
+ /* something in the buffer, put it in the output brigade */
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, io->connection,
"h2_conn_io: flush, flushing %ld bytes", (long)io->buflen);
bucketeer_buffer(io);
io->buflen = 0;
}
- /* Append flush.
- */
- APR_BRIGADE_INSERT_TAIL(io->output,
- apr_bucket_flush_create(io->output->bucket_alloc));
-
- /* Send it out through installed filters (TLS) to the client */
+ /* Send it out */
status = flush_out(io->output, io);
- if (status == APR_SUCCESS) {
- /* These are all fine and no reason for concern. Everything else
- * is interesting. */
- io->unflushed = 0;
- }
- else {
+ if (status != APR_SUCCESS) {
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, io->connection,
- "h2_conn_io: flush error");
+ "h2_conn_io: flush");
+ return status;
}
-
- return status;
+
+ io->unflushed = 0;
}
return APR_SUCCESS;
}
apr_atomic_inc32(&m->refs);
}
-static void release(h2_mplx *m)
+static void release(h2_mplx *m, int lock)
{
if (!apr_atomic_dec32(&m->refs)) {
+ if (lock) {
+ apr_thread_mutex_lock(m->lock);
+ }
if (m->join_wait) {
apr_thread_cond_signal(m->join_wait);
}
+ if (lock) {
+ apr_thread_mutex_unlock(m->lock);
+ }
}
}
}
void h2_mplx_release(h2_mplx *m)
{
- release(m);
+ release(m, 1);
}
static void workers_register(h2_mplx *m) {
status = apr_thread_mutex_lock(m->lock);
if (APR_SUCCESS == status) {
- int attempts = 0;
-
- release(m);
+ release(m, 0);
while (apr_atomic_read32(&m->refs) > 0) {
m->join_wait = wait;
- ap_log_cerror(APLOG_MARK, (attempts? APLOG_INFO : APLOG_DEBUG),
- 0, m->c,
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c,
"h2_mplx(%ld): release_join, refs=%d, waiting...",
m->id, m->refs);
- apr_thread_cond_timedwait(wait, m->lock, apr_time_from_sec(10));
- if (++attempts >= 6) {
- ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c,
- APLOGNO(02952)
- "h2_mplx(%ld): join attempts exhausted, refs=%d",
- m->id, m->refs);
- break;
- }
- }
- if (m->join_wait) {
- ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c,
- "h2_mplx(%ld): release_join -> destroy", m->id);
+ apr_thread_cond_wait(wait, m->lock);
}
m->join_wait = NULL;
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c,
+ "h2_mplx(%ld): release_join -> destroy", m->id);
apr_thread_mutex_unlock(m->lock);
h2_mplx_destroy(m);
}
if (io) {
io->input_arrived = iowait;
status = h2_io_in_read(io, bb, 0);
- while (status == APR_EAGAIN
+ while (APR_STATUS_IS_EAGAIN(status)
&& !is_aborted(m, &status)
&& block == APR_BLOCK_READ) {
apr_thread_cond_wait(io->input_arrived, m->lock);
static int h2_session_status_from_apr_status(apr_status_t rv)
{
- switch (rv) {
- case APR_SUCCESS:
- return NGHTTP2_NO_ERROR;
- case APR_EAGAIN:
- case APR_TIMEUP:
- return NGHTTP2_ERR_WOULDBLOCK;
- case APR_EOF:
+ if (rv == APR_SUCCESS) {
+ return NGHTTP2_NO_ERROR;
+ }
+ else if (APR_STATUS_IS_EAGAIN(rv)) {
+ return NGHTTP2_ERR_WOULDBLOCK;
+ }
+ else if (APR_STATUS_IS_EOF(rv)) {
return NGHTTP2_ERR_EOF;
- default:
- return NGHTTP2_ERR_PROTO;
}
+ return NGHTTP2_ERR_PROTO;
}
static int stream_open(h2_session *session, int stream_id)
if (status == APR_SUCCESS) {
return length;
}
- if (status == APR_EAGAIN || status == APR_TIMEUP) {
+ if (APR_STATUS_IS_EAGAIN(status)) {
return NGHTTP2_ERR_WOULDBLOCK;
}
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, session->c,
if (status == APR_SUCCESS) {
return 0;
}
- else if (status != APR_EOF) {
- ap_log_cerror(APLOG_MARK, APLOG_ERR, status, session->c,
+ else {
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, session->c,
APLOGNO(02925)
"h2_stream(%ld-%d): failed send_data_cb",
session->id, (int)stream_id);
- return NGHTTP2_ERR_CALLBACK_FAILURE;
}
return h2_session_status_from_apr_status(status);
rv = 0; /* ...gracefully shut down */
break;
case APR_EBADF: /* connection unusable, terminate silently */
- case APR_ECONNABORTED:
- rv = NGHTTP2_ERR_EOF;
- break;
default:
+ if (APR_STATUS_IS_ECONNABORTED(reason)
+ || APR_STATUS_IS_ECONNRESET(reason)
+ || APR_STATUS_IS_EBADF(reason)) {
+ rv = NGHTTP2_ERR_EOF;
+ }
break;
}
}
if (status == APR_SUCCESS) {
flush_output = 1;
}
- else if (status != APR_EAGAIN) {
+ else if (!APR_STATUS_IS_EAGAIN(status)) {
return status;
}
return NULL;
}
+static apr_status_t cleanup_join_thread(void *ctx)
+{
+ h2_worker *w = ctx;
+ /* do the join only when the worker is aborted. Otherwise,
+ * we are probably in a process shutdown.
+ */
+ if (w->thread && w->aborted) {
+ apr_status_t rv;
+ apr_thread_join(&rv, w->thread);
+ }
+ return APR_SUCCESS;
+}
+
h2_worker *h2_worker_create(int id,
apr_pool_t *parent_pool,
apr_threadattr_t *attr,
return NULL;
}
+ apr_pool_pre_cleanup_register(pool, w, cleanup_join_thread);
apr_thread_create(&w->thread, attr, execute, w, pool);
}
return w;
return 0;
}
+static void cleanup_zombies(h2_workers *workers, int lock) {
+ if (lock) {
+ apr_thread_mutex_lock(workers->lock);
+ }
+ while (!H2_WORKER_LIST_EMPTY(&workers->zombies)) {
+ h2_worker *zombie = H2_WORKER_LIST_FIRST(&workers->zombies);
+ H2_WORKER_REMOVE(zombie);
+ ap_log_error(APLOG_MARK, APLOG_TRACE1, 0, workers->s,
+ "h2_workers: cleanup zombie %d", zombie->id);
+ h2_worker_destroy(zombie);
+ }
+ if (lock) {
+ apr_thread_mutex_unlock(workers->lock);
+ }
+}
+
/**
* Get the next task for the given worker. Will block until a task arrives
if (!task) {
/* Need to wait for either a new mplx to arrive.
*/
+ cleanup_zombies(workers, 0);
+
if (workers->worker_count > workers->min_size) {
apr_time_t now = apr_time_now();
if (now >= (start_wait + max_wait)) {
/* waited long enough without getting a task. */
- status = APR_TIMEUP;
- }
- else {
- ap_log_error(APLOG_MARK, APLOG_TRACE1, 0, workers->s,
- "h2_worker(%d): waiting signal, "
- "worker_count=%d", worker->id,
- (int)workers->worker_count);
- status = apr_thread_cond_timedwait(workers->mplx_added,
- workers->lock, max_wait);
- }
-
- if (status == APR_TIMEUP) {
- /* waited long enough */
if (workers->worker_count > workers->min_size) {
ap_log_error(APLOG_MARK, APLOG_TRACE1, 0,
workers->s,
break;
}
}
+ ap_log_error(APLOG_MARK, APLOG_TRACE1, 0, workers->s,
+ "h2_worker(%d): waiting signal, "
+ "worker_count=%d", worker->id,
+ (int)workers->worker_count);
+ apr_thread_cond_timedwait(workers->mplx_added,
+ workers->lock, max_wait);
}
else {
ap_log_error(APLOG_MARK, APLOG_TRACE1, 0, workers->s,
h2_workers *workers = (h2_workers *)ctx;
apr_status_t status = apr_thread_mutex_lock(workers->lock);
if (status == APR_SUCCESS) {
- ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, workers->s,
+ ap_log_error(APLOG_MARK, APLOG_TRACE1, 0, workers->s,
"h2_worker(%d): done", h2_worker_get_id(worker));
H2_WORKER_REMOVE(worker);
--workers->worker_count;
- h2_worker_destroy(worker);
+ H2_WORKER_LIST_INSERT_TAIL(&workers->zombies, worker);
apr_thread_mutex_unlock(workers->lock);
}
if (!w) {
return APR_ENOMEM;
}
- ap_log_error(APLOG_MARK, APLOG_TRACE2, 0, workers->s,
+ ap_log_error(APLOG_MARK, APLOG_TRACE1, 0, workers->s,
"h2_workers: adding worker(%d)", h2_worker_get_id(w));
++workers->worker_count;
H2_WORKER_LIST_INSERT_TAIL(&workers->workers, w);
return status;
}
-h2_workers *h2_workers_create(server_rec *s, apr_pool_t *pool,
+h2_workers *h2_workers_create(server_rec *s, apr_pool_t *server_pool,
int min_size, int max_size)
{
apr_status_t status;
h2_workers *workers;
+ apr_pool_t *pool;
+
AP_DEBUG_ASSERT(s);
- AP_DEBUG_ASSERT(pool);
- status = APR_SUCCESS;
+ AP_DEBUG_ASSERT(server_pool);
+ /* let's have our own pool that will be parent to all h2_worker
+ * instances we create. This happens in various threads, but always
+ * guarded by our lock. Without this pool, all subpool creations would
+ * happen on the pool handed to us, which we do not guard.
+ */
+ apr_pool_create(&pool, server_pool);
workers = apr_pcalloc(pool, sizeof(h2_workers));
if (workers) {
workers->s = s;
apr_threadattr_create(&workers->thread_attr, workers->pool);
APR_RING_INIT(&workers->workers, h2_worker, link);
+ APR_RING_INIT(&workers->zombies, h2_worker, link);
APR_RING_INIT(&workers->mplxs, h2_mplx, link);
status = apr_thread_mutex_create(&workers->lock,
void h2_workers_destroy(h2_workers *workers)
{
+ /* before we go, cleanup any zombie workers that may have accumulated */
+ cleanup_zombies(workers, 1);
+
if (workers->mplx_added) {
apr_thread_cond_destroy(workers->mplx_added);
workers->mplx_added = NULL;
h2_worker *w = H2_WORKER_LIST_FIRST(&workers->workers);
H2_WORKER_REMOVE(w);
}
+ if (workers->pool) {
+ apr_pool_destroy(workers->pool);
+ /* workers is gone */
+ }
}
apr_status_t h2_workers_register(h2_workers *workers, struct h2_mplx *m)
add_worker(workers);
}
+ /* cleanup any zombie workers that may have accumulated */
+ cleanup_zombies(workers, 0);
+
apr_thread_mutex_unlock(workers->lock);
}
return status;
H2_MPLX_REMOVE(m);
status = APR_SUCCESS;
}
+ /* cleanup any zombie workers that may have accumulated */
+ cleanup_zombies(workers, 0);
+
apr_thread_mutex_unlock(workers->lock);
}
return status;
apr_threadattr_t *thread_attr;
APR_RING_HEAD(h2_worker_list, h2_worker) workers;
+ APR_RING_HEAD(h2_worker_zombies, h2_worker) zombies;
APR_RING_HEAD(h2_mplx_list, h2_mplx) mplxs;
int worker_count;