-*- coding: utf-8 -*-
Changes with Apache 2.4.18
-
+ *) mod_http2: reworked deallocation on connection shutdown and worker
+ abort. Separate parent pool for all workers. worker threads are joined
+ on planned worker shutdown.
+ [Yann Ylavic, Stefan Eissing]
+
Changes with Apache 2.4.17
have_written = 1;
wait_micros = 0;
}
- else if (status == APR_EAGAIN) {
+ else if (APR_STATUS_IS_EAGAIN(status)) {
/* nop */
}
else if (status == APR_TIMEUP) {
h2_io_cleanup(io);
}
+void h2_io_rst(h2_io *io, int error)
+{
+ io->rst_error = error;
+ io->eos_in = 1;
+}
+
int h2_io_in_has_eos_for(h2_io *io)
{
return io->eos_in || (io->bbin && h2_util_has_eos(io->bbin, 0));
h2_io_data_cb *cb, void *ctx,
apr_size_t *plen, int *peos)
{
+ apr_status_t status;
+
+ if (io->eos_out) {
+ *plen = 0;
+ *peos = 1;
+ return APR_SUCCESS;
+ }
+
if (cb == NULL) {
/* just checking length available */
- return h2_util_bb_avail(io->bbout, plen, peos);
+ status = h2_util_bb_avail(io->bbout, plen, peos);
}
- return h2_util_bb_readx(io->bbout, cb, ctx, plen, peos);
+ else {
+ status = h2_util_bb_readx(io->bbout, cb, ctx, plen, peos);
+ if (status == APR_SUCCESS) {
+ io->eos_out = *peos;
+ }
+ }
+
+ return status;
}
apr_status_t h2_io_out_write(h2_io *io, apr_bucket_brigade *bb,
apr_size_t maxlen, int *pfile_handles_allowed)
{
+ apr_status_t status;
+ int start_allowed;
+
+ if (io->eos_out) {
+ apr_off_t len;
+ /* We have already delivered an EOS bucket to a reader, no
+ * sense in storing anything more here.
+ */
+ status = apr_brigade_length(bb, 1, &len);
+ if (status == APR_SUCCESS) {
+ if (len > 0) {
+ /* someone tries to write real data after EOS, that
+ * does not look right. */
+ status = APR_EOF;
+ }
+ /* cleanup, as if we had moved the data */
+ apr_brigade_cleanup(bb);
+ }
+ return status;
+ }
+
/* Let's move the buckets from the request processing in here, so
* that the main thread can read them when it has time/capacity.
*
* many open files already buffered. Otherwise we will run out of
* file handles.
*/
- int start_allowed = *pfile_handles_allowed;
- apr_status_t status;
+ start_allowed = *pfile_handles_allowed;
+
+ if (io->rst_error) {
+ return APR_ECONNABORTED;
+ }
status = h2_util_move(io->bbout, bb, maxlen, pfile_handles_allowed,
"h2_io_out_write");
/* track # file buckets moved into our pool */
apr_status_t h2_io_out_close(h2_io *io)
{
- APR_BRIGADE_INSERT_TAIL(io->bbout,
- apr_bucket_eos_create(io->bbout->bucket_alloc));
+ if (!io->eos_out && !h2_util_has_eos(io->bbout, 0)) {
+ APR_BRIGADE_INSERT_TAIL(io->bbout,
+ apr_bucket_eos_create(io->bbout->bucket_alloc));
+ }
return APR_SUCCESS;
}
apr_bucket_brigade *bbin; /* input data for stream */
int eos_in;
int task_done;
+ int rst_error;
apr_size_t input_consumed; /* how many bytes have been read */
struct apr_thread_cond_t *input_arrived; /* block on reading */
apr_bucket_brigade *bbout; /* output data from stream */
+ int eos_out;
struct apr_thread_cond_t *output_drained; /* block on writing */
struct h2_response *response;/* submittable response created */
*/
void h2_io_destroy(h2_io *io);
+/**
+ * Reset the stream with the given error code.
+ */
+void h2_io_rst(h2_io *io, int error);
+
/**
* The input data is completely queued. Blocked reads will return immediately
* and give either data or EOF.
return ps? *ps : NULL;
}
-h2_io *h2_io_set_get_highest_prio(h2_io_set *set)
-{
- h2_io *highest = NULL;
- int i;
- for (i = 0; i < set->list->nelts; ++i) {
- h2_io *io = h2_io_IDX(set->list, i);
- if (!highest /*|| io-prio even higher */ ) {
- highest = io;
- }
- }
- return highest;
-}
-
static void h2_io_set_sort(h2_io_set *sp)
{
qsort(sp->list->elts, sp->list->nelts, sp->list->elt_size,
return APR_SUCCESS;
}
+static void remove_idx(h2_io_set *sp, int idx)
+{
+ int n;
+ --sp->list->nelts;
+ n = sp->list->nelts - idx;
+ if (n > 0) {
+ /* Close the hole in the array by moving the upper
+ * parts down one step.
+ */
+ h2_io **selts = (h2_io**)sp->list->elts;
+ memmove(selts + idx, selts + idx + 1, n * sizeof(h2_io*));
+ }
+}
+
h2_io *h2_io_set_remove(h2_io_set *sp, h2_io *io)
{
int i;
for (i = 0; i < sp->list->nelts; ++i) {
h2_io *e = h2_io_IDX(sp->list, i);
if (e == io) {
- int n;
- --sp->list->nelts;
- n = sp->list->nelts - i;
- if (n > 0) {
- /* Close the hole in the array by moving the upper
- * parts down one step.
- */
- h2_io **selts = (h2_io**)sp->list->elts;
- memmove(selts+i, selts+i+1, n * sizeof(h2_io*));
- }
+ remove_idx(sp, i);
return e;
}
}
return NULL;
}
+h2_io *h2_io_set_pop_highest_prio(h2_io_set *set)
+{
+ /* For now, this just removes the first element in the set.
+ * the name is misleading...
+ */
+ if (set->list->nelts > 0) {
+ h2_io *io = h2_io_IDX(set->list, 0);
+ remove_idx(set, 0);
+ return io;
+ }
+ return NULL;
+}
+
void h2_io_set_destroy_all(h2_io_set *sp)
{
int i;
apr_status_t h2_io_set_add(h2_io_set *set, struct h2_io *io);
h2_io *h2_io_set_get(h2_io_set *set, int stream_id);
-h2_io *h2_io_set_get_highest_prio(h2_io_set *set);
h2_io *h2_io_set_remove(h2_io_set *set, struct h2_io *io);
void h2_io_set_remove_all(h2_io_set *set);
void h2_io_set_iter(h2_io_set *set,
h2_io_set_iter_fn *iter, void *ctx);
+h2_io *h2_io_set_pop_highest_prio(h2_io_set *set);
+
#endif /* defined(__mod_h2__h2_io_set__) */
#include "h2_task_output.h"
#include "h2_task_queue.h"
#include "h2_workers.h"
+#include "h2_util.h"
static int is_aborted(h2_mplx *m, apr_status_t *pstatus) {
apr_atomic_inc32(&m->refs);
}
-static void release(h2_mplx *m)
+static void release(h2_mplx *m, int lock)
{
if (!apr_atomic_dec32(&m->refs)) {
+ if (lock) {
+ apr_thread_mutex_lock(m->lock);
+ }
if (m->join_wait) {
apr_thread_cond_signal(m->join_wait);
}
+ if (lock) {
+ apr_thread_mutex_unlock(m->lock);
+ }
}
}
}
void h2_mplx_release(h2_mplx *m)
{
- release(m);
+ release(m, 1);
}
static void workers_register(h2_mplx *m) {
status = apr_thread_mutex_lock(m->lock);
if (APR_SUCCESS == status) {
- int attempts = 0;
-
- release(m);
+ release(m, 0);
while (apr_atomic_read32(&m->refs) > 0) {
m->join_wait = wait;
- ap_log_cerror(APLOG_MARK, (attempts? APLOG_INFO : APLOG_DEBUG),
- 0, m->c,
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c,
"h2_mplx(%ld): release_join, refs=%d, waiting...",
m->id, m->refs);
- apr_thread_cond_timedwait(wait, m->lock, apr_time_from_sec(10));
- if (++attempts >= 6) {
- ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c,
- APLOGNO(02952)
- "h2_mplx(%ld): join attempts exhausted, refs=%d",
- m->id, m->refs);
- break;
- }
- }
- if (m->join_wait) {
- ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c,
- "h2_mplx(%ld): release_join -> destroy", m->id);
+ apr_thread_cond_wait(wait, m->lock);
}
m->join_wait = NULL;
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c,
+ "h2_mplx(%ld): release_join -> destroy", m->id);
apr_thread_mutex_unlock(m->lock);
h2_mplx_destroy(m);
}
stream_destroy(m, stream, io);
}
else {
+ if (stream->rst_error) {
+ /* Forward error code to fail any further attempt to
+ * write to io */
+ h2_io_rst(io, stream->rst_error);
+ }
+ /* Remove io from ready set (if there), since we will never submit it */
+ h2_io_set_remove(m->ready_ios, io);
/* Add stream to closed set for cleanup when task is done */
h2_stream_set_add(m->closed, stream);
}
if (io) {
io->input_arrived = iowait;
status = h2_io_in_read(io, bb, 0);
- while (status == APR_EAGAIN
+ while (APR_STATUS_IS_EAGAIN(status)
&& !is_aborted(m, &status)
&& block == APR_BLOCK_READ) {
apr_thread_cond_wait(io->input_arrived, m->lock);
return status;
}
+#define H2_MPLX_IO_OUT(lvl,m,io,msg) \
+ do { \
+ if (APLOG_C_IS_LEVEL((m)->c,lvl)) \
+ h2_util_bb_log((m)->c,(io)->id,lvl,msg,(io)->bbout); \
+ } while(0)
+
+
apr_status_t h2_mplx_out_readx(h2_mplx *m, int stream_id,
h2_io_data_cb *cb, void *ctx,
apr_size_t *plen, int *peos)
if (APR_SUCCESS == status) {
h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
if (io) {
+ H2_MPLX_IO_OUT(APLOG_TRACE2, m, io, "h2_mplx_out_readx_pre");
+
status = h2_io_out_readx(io, cb, ctx, plen, peos);
- if (status == APR_SUCCESS && io->output_drained) {
+
+ H2_MPLX_IO_OUT(APLOG_TRACE2, m, io, "h2_mplx_out_readx_post");
+ if (status == APR_SUCCESS && cb && io->output_drained) {
apr_thread_cond_signal(io->output_drained);
}
}
}
status = apr_thread_mutex_lock(m->lock);
if (APR_SUCCESS == status) {
- h2_io *io = h2_io_set_get_highest_prio(m->ready_ios);
+ h2_io *io = h2_io_set_pop_highest_prio(m->ready_ios);
if (io) {
- h2_response *response = io->response;
-
- AP_DEBUG_ASSERT(response);
- h2_io_set_remove(m->ready_ios, io);
-
- stream = h2_stream_set_get(streams, response->stream_id);
+ stream = h2_stream_set_get(streams, io->id);
if (stream) {
- h2_stream_set_response(stream, response, io->bbout);
+ if (io->rst_error) {
+ h2_stream_rst(stream, io->rst_error);
+ }
+ else {
+ AP_DEBUG_ASSERT(io->response);
+ h2_stream_set_response(stream, io->response, io->bbout);
+ }
+
if (io->output_drained) {
apr_thread_cond_signal(io->output_drained);
}
}
else {
- ap_log_cerror(APLOG_MARK, APLOG_WARNING, APR_NOTFOUND, m->c,
- APLOGNO(02953) "h2_mplx(%ld): stream for response %d",
- m->id, response->stream_id);
+ ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, APLOGNO(02953)
+ "h2_mplx(%ld): stream for response %d not found",
+ m->id, io->id);
+ /* We have the io ready, but the stream has gone away, maybe
+ * reset by the client. Should no longer happen since such
+ * streams should clear io's from the ready queue.
+ */
}
}
apr_thread_mutex_unlock(m->lock);
status = h2_io_out_write(io, bb, m->stream_max_mem,
&m->file_handles_allowed);
-
/* Wait for data to drain until there is room again */
while (!APR_BRIGADE_EMPTY(bb)
&& iowait
}
}
apr_brigade_cleanup(bb);
+
return status;
}
status = apr_thread_mutex_lock(m->lock);
if (APR_SUCCESS == status) {
status = out_open(m, stream_id, response, f, bb, iowait);
+ if (APLOGctrace1(m->c)) {
+ h2_util_bb_log(m->c, stream_id, APLOG_TRACE1, "h2_mplx_out_open", bb);
+ }
if (m->aborted) {
return APR_ECONNABORTED;
}
h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
if (io) {
status = out_write(m, io, f, bb, iowait);
+ H2_MPLX_IO_OUT(APLOG_TRACE2, m, io, "h2_mplx_out_write");
+
have_out_data_for(m, stream_id);
if (m->aborted) {
return APR_ECONNABORTED;
if (!m->aborted) {
h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
if (io) {
- if (!io->response || !io->response->ngheader) {
+ if (!io->response && !io->rst_error) {
/* In case a close comes before a response was created,
* insert an error one so that our streams can properly
* reset.
h2_response *r = h2_response_create(stream_id,
"500", NULL, m->pool);
status = out_open(m, stream_id, r, NULL, NULL, NULL);
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, m->c,
+ "h2_mplx(%ld-%d): close, no response, no rst",
+ m->id, io->id);
}
status = h2_io_out_close(io);
+ H2_MPLX_IO_OUT(APLOG_TRACE2, m, io, "h2_mplx_out_close");
+
+ have_out_data_for(m, stream_id);
+ if (m->aborted) {
+ /* if we were the last output, the whole session might
+ * have gone down in the meantime.
+ */
+ return APR_SUCCESS;
+ }
+ }
+ else {
+ status = APR_ECONNABORTED;
+ }
+ }
+ apr_thread_mutex_unlock(m->lock);
+ }
+ return status;
+}
+
+apr_status_t h2_mplx_out_rst(h2_mplx *m, int stream_id, int error)
+{
+ apr_status_t status;
+ AP_DEBUG_ASSERT(m);
+ if (m->aborted) {
+ return APR_ECONNABORTED;
+ }
+ status = apr_thread_mutex_lock(m->lock);
+ if (APR_SUCCESS == status) {
+ if (!m->aborted) {
+ h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
+ if (io && !io->rst_error) {
+ h2_io_rst(io, error);
+ if (!io->response) {
+ h2_io_set_add(m->ready_ios, io);
+ }
+ H2_MPLX_IO_OUT(APLOG_TRACE2, m, io, "h2_mplx_out_rst");
+
have_out_data_for(m, stream_id);
if (m->aborted) {
/* if we were the last output, the whole session might
* Decreases the reference counter of this mplx.
*/
void h2_mplx_release(h2_mplx *m);
+
/**
* Decreases the reference counter of this mplx and waits for it
* to reached 0, destroy the mplx afterwards.
*/
apr_status_t h2_mplx_out_close(h2_mplx *m, int stream_id);
+apr_status_t h2_mplx_out_rst(h2_mplx *m, int stream_id, int error);
+
/*******************************************************************************
* h2_mplx list Manipulation.
******************************************************************************/
static int h2_session_status_from_apr_status(apr_status_t rv)
{
- switch (rv) {
- case APR_SUCCESS:
- return NGHTTP2_NO_ERROR;
- case APR_EAGAIN:
- case APR_TIMEUP:
- return NGHTTP2_ERR_WOULDBLOCK;
- case APR_EOF:
+ if (rv == APR_SUCCESS) {
+ return NGHTTP2_NO_ERROR;
+ }
+ else if (APR_STATUS_IS_EAGAIN(rv)) {
+ return NGHTTP2_ERR_WOULDBLOCK;
+ }
+ else if (APR_STATUS_IS_EOF(rv)) {
return NGHTTP2_ERR_EOF;
- default:
- return NGHTTP2_ERR_PROTO;
}
+ return NGHTTP2_ERR_PROTO;
}
static int stream_open(h2_session *session, int stream_id)
if (status == APR_SUCCESS) {
return length;
}
- if (status == APR_EAGAIN || status == APR_TIMEUP) {
+ if (APR_STATUS_IS_EAGAIN(status)) {
return NGHTTP2_ERR_WOULDBLOCK;
}
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, session->c,
"h2_stream(%ld-%d): closing with err=%d %s",
session->id, (int)stream->id, (int)error_code,
nghttp2_strerror(error_code));
+ h2_stream_rst(stream, error_code);
}
h2_stream_set_remove(session->streams, stream);
if (status == APR_SUCCESS) {
return 0;
}
- else if (status != APR_EOF) {
- ap_log_cerror(APLOG_MARK, APLOG_ERR, status, session->c,
+ else {
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, session->c,
APLOGNO(02925)
"h2_stream(%ld-%d): failed send_data_cb",
session->id, (int)stream_id);
- return NGHTTP2_ERR_CALLBACK_FAILURE;
}
return h2_session_status_from_apr_status(status);
rv = 0; /* ...gracefully shut down */
break;
case APR_EBADF: /* connection unusable, terminate silently */
- case APR_ECONNABORTED:
- rv = NGHTTP2_ERR_EOF;
- break;
default:
+ if (APR_STATUS_IS_ECONNABORTED(reason)
+ || APR_STATUS_IS_ECONNRESET(reason)
+ || APR_STATUS_IS_EBADF(reason)) {
+ rv = NGHTTP2_ERR_EOF;
+ }
break;
}
}
if (status == APR_SUCCESS) {
flush_output = 1;
}
- else if (status != APR_EAGAIN) {
+ else if (!APR_STATUS_IS_EAGAIN(status)) {
return status;
}
case APR_SUCCESS:
break;
+ case APR_ECONNRESET:
+ return nghttp2_submit_rst_stream(ng2s, NGHTTP2_FLAG_NONE,
+ stream->id, stream->rst_error);
+
case APR_EAGAIN:
/* If there is no data available, our session will automatically
* suspend this stream and not ask for more data until we resume
int rv = 0;
AP_DEBUG_ASSERT(session);
AP_DEBUG_ASSERT(stream);
- AP_DEBUG_ASSERT(stream->response);
+ AP_DEBUG_ASSERT(stream->response || stream->rst_error);
- if (stream->response->ngheader) {
+ if (stream->response && stream->response->ngheader) {
rv = submit_response(session, stream->response);
}
+ else if (stream->rst_error) {
+ rv = nghttp2_submit_rst_stream(session->ngh2, NGHTTP2_FLAG_NONE,
+ stream->id, stream->rst_error);
+ }
else {
rv = nghttp2_submit_rst_stream(session->ngh2, NGHTTP2_FLAG_NONE,
stream->id, NGHTTP2_PROTOCOL_ERROR);
return APR_SUCCESS;
}
-void h2_stream_attach_pool(h2_stream *stream, apr_pool_t *pool)
-{
- stream->pool = pool;
-}
-
apr_pool_t *h2_stream_detach_pool(h2_stream *stream)
{
apr_pool_t *pool = stream->pool;
return pool;
}
-void h2_stream_abort(h2_stream *stream)
+void h2_stream_rst(h2_stream *stream, int error_code)
{
- AP_DEBUG_ASSERT(stream);
- stream->aborted = 1;
+ stream->rst_error = error_code;
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, stream->m->c,
+ "h2_stream(%ld-%d): reset, error=%d",
+ stream->m->id, stream->id, error_code);
}
apr_status_t h2_stream_set_response(h2_stream *stream, h2_response *response,
apr_bucket_brigade *bb)
{
+ apr_status_t status = APR_SUCCESS;
+
stream->response = response;
if (bb && !APR_BRIGADE_EMPTY(bb)) {
if (!stream->bbout) {
stream->bbout = apr_brigade_create(stream->pool,
stream->m->c->bucket_alloc);
}
- return h2_util_move(stream->bbout, bb, 16 * 1024, NULL,
- "h2_stream_set_response");
+ status = h2_util_move(stream->bbout, bb, 16 * 1024, NULL,
+ "h2_stream_set_response");
}
- return APR_SUCCESS;
+ if (APLOGctrace1(stream->m->c)) {
+ apr_size_t len = 0;
+ int eos = 0;
+ if (stream->bbout) {
+ h2_util_bb_avail(stream->bbout, &len, &eos);
+ }
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, stream->m->c,
+ "h2_stream(%ld-%d): set_response(%s), brigade=%s, "
+ "len=%ld, eos=%d",
+ stream->m->id, stream->id, response->status,
+ (stream->bbout? "yes" : "no"), (long)len, (int)eos);
+ }
+ return status;
}
static int set_closed(h2_stream *stream)
{
apr_status_t status;
AP_DEBUG_ASSERT(stream);
+ if (stream->rst_error) {
+ return APR_ECONNRESET;
+ }
set_state(stream, H2_STREAM_ST_OPEN);
status = h2_request_rwrite(stream->request, r, stream->m);
return status;
apr_status_t status;
AP_DEBUG_ASSERT(stream);
+ if (stream->rst_error) {
+ return APR_ECONNRESET;
+ }
/* Seeing the end-of-headers, we have everything we need to
* start processing it.
*/
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, stream->m->c,
"h2_stream(%ld-%d): closing input",
stream->m->id, stream->id);
+ if (stream->rst_error) {
+ return APR_ECONNRESET;
+ }
if (set_closed(stream)) {
return h2_request_close(stream->request);
}
const char *value, size_t vlen)
{
AP_DEBUG_ASSERT(stream);
+ if (stream->rst_error) {
+ return APR_ECONNRESET;
+ }
switch (stream->state) {
case H2_STREAM_ST_IDLE:
set_state(stream, H2_STREAM_ST_OPEN);
const char *data, size_t len)
{
AP_DEBUG_ASSERT(stream);
- AP_DEBUG_ASSERT(stream);
+ if (stream->rst_error) {
+ return APR_ECONNRESET;
+ }
switch (stream->state) {
case H2_STREAM_ST_OPEN:
break;
apr_status_t status = APR_SUCCESS;
const char *src;
+ if (stream->rst_error) {
+ return APR_ECONNRESET;
+ }
if (stream->bbout && !APR_BRIGADE_EMPTY(stream->bbout)) {
src = "stream";
status = h2_util_bb_avail(stream->bbout, plen, peos);
h2_io_data_cb *cb, void *ctx,
apr_size_t *plen, int *peos)
{
+ if (stream->rst_error) {
+ return APR_ECONNRESET;
+ }
if (stream->bbout && !APR_BRIGADE_EMPTY(stream->bbout)) {
return h2_util_bb_readx(stream->bbout, cb, ctx, plen, peos);
}
struct h2_task *task; /* task created for this stream */
struct h2_response *response; /* the response, once ready */
apr_bucket_brigade *bbout; /* output DATA */
+ int rst_error; /* stream error for RST_STREAM */
};
apr_status_t h2_stream_destroy(h2_stream *stream);
-apr_pool_t *h2_stream_detach_pool(h2_stream *stream);
-void h2_stream_attach_pool(h2_stream *stream, apr_pool_t *pool);
+void h2_stream_rst(h2_stream *streamm, int error_code);
-void h2_stream_abort(h2_stream *stream);
+apr_pool_t *h2_stream_detach_pool(h2_stream *stream);
apr_status_t h2_stream_rwrite(h2_stream *stream, request_rec *r);
/* Time to populate r with the data we have. */
r->request_time = apr_time_now();
- r->the_request = apr_psprintf(r->pool, "%s %s HTTP/1.1",
- env->method, env->path);
r->method = env->method;
/* Provide quick information about the request method as soon as known */
r->method_number = ap_method_number_of(r->method);
ap_parse_uri(r, env->path);
r->protocol = (char*)"HTTP/2";
r->proto_num = HTTP_VERSION(2, 0);
+
+ r->the_request = apr_psprintf(r->pool, "%s %s %s",
+ r->method, env->path, r->protocol);
/* update what we think the virtual host is based on the headers we've
* now read. may update status.
#include <httpd.h>
#include <http_core.h>
#include <http_log.h>
+#include <http_request.h>
#include <nghttp2/nghttp2.h>
return status;
}
+void h2_util_bb_log(conn_rec *c, int stream_id, int level,
+ const char *tag, apr_bucket_brigade *bb)
+{
+ char buffer[16 * 1024];
+ const char *line = "(null)";
+ apr_size_t bmax = sizeof(buffer)/sizeof(buffer[0]);
+ int off = 0;
+ apr_bucket *b;
+
+ if (bb) {
+ memset(buffer, 0, bmax--);
+ for (b = APR_BRIGADE_FIRST(bb);
+ bmax && (b != APR_BRIGADE_SENTINEL(bb));
+ b = APR_BUCKET_NEXT(b)) {
+
+ if (APR_BUCKET_IS_METADATA(b)) {
+ if (APR_BUCKET_IS_EOS(b)) {
+ off += apr_snprintf(buffer+off, bmax-off, "eos ");
+ }
+ else if (APR_BUCKET_IS_FLUSH(b)) {
+ off += apr_snprintf(buffer+off, bmax-off, "flush ");
+ }
+ else if (AP_BUCKET_IS_EOR(b)) {
+ off += apr_snprintf(buffer+off, bmax-off, "eor ");
+ }
+ else {
+ off += apr_snprintf(buffer+off, bmax-off, "meta(unknown) ");
+ }
+ }
+ else {
+ const char *btype = "data";
+ if (APR_BUCKET_IS_FILE(b)) {
+ btype = "file";
+ }
+ else if (APR_BUCKET_IS_PIPE(b)) {
+ btype = "pipe";
+ }
+ else if (APR_BUCKET_IS_SOCKET(b)) {
+ btype = "socket";
+ }
+ else if (APR_BUCKET_IS_HEAP(b)) {
+ btype = "heap";
+ }
+ else if (APR_BUCKET_IS_TRANSIENT(b)) {
+ btype = "transient";
+ }
+ else if (APR_BUCKET_IS_IMMORTAL(b)) {
+ btype = "immortal";
+ }
+ else if (APR_BUCKET_IS_MMAP(b)) {
+ btype = "mmap";
+ }
+ else if (APR_BUCKET_IS_POOL(b)) {
+ btype = "pool";
+ }
+
+ off += apr_snprintf(buffer+off, bmax-off, "%s[%ld] ",
+ btype,
+ (long)(b->length == ((apr_size_t)-1)?
+ -1 : b->length));
+ }
+ }
+ line = *buffer? buffer : "(empty)";
+ }
+ ap_log_cerror(APLOG_MARK, level, 0, c, "bb_dump(%ld-%d)-%s: %s",
+ c->id, stream_id, tag, line);
+
+}
typedef apr_status_t h2_util_pass_cb(void *ctx,
const char *data, apr_size_t len);
+/**
+ * Read at most *plen bytes from the brigade and pass them into the
+ * given callback. If cb is NULL, just return the amount of data that
+ * could have been read.
+ * If an EOS was/would be encountered, set *peos != 0.
+ * @param bb the brigade to read from
+ * @param cb the callback to invoke for the read data
+ * @param ctx optional data passed to callback
+ * @param plen inout, as input gives the maximum number of bytes to read,
+ * on return specifies the actual/would be number of bytes
+ * @param peos != 0 iff an EOS bucket was/would be encountered.
+ */
apr_status_t h2_util_bb_readx(apr_bucket_brigade *bb,
h2_util_pass_cb *cb, void *ctx,
apr_size_t *plen, int *peos);
+/**
+ * Logs the bucket brigade (which bucket types with what length)
+ * to the log at the given level.
+ * @param c the connection to log for
+ * @param stream_id the stream identifier this brigade belongs to
+ * @param level the log level (as in APLOG_*)
+ * @param tag a short message text about the context
+ * @param bb the brigade to log
+ */
+void h2_util_bb_log(conn_rec *c, int stream_id, int level,
+ const char *tag, apr_bucket_brigade *bb);
+
#endif /* defined(__mod_h2__h2_util__) */
* @macro
* Version number of the h2 module as c string
*/
-#define MOD_HTTP2_VERSION "1.0.0"
+#define MOD_HTTP2_VERSION "1.0.1-DEV"
/**
* @macro
* release. This is a 24 bit number with 8 bits for major number, 8 bits
* for minor and 8 bits for patch. Version 1.2.3 becomes 0x010203.
*/
-#define MOD_HTTP2_VERSION_NUM 0x010000
+#define MOD_HTTP2_VERSION_NUM 0x010001
#endif /* mod_h2_h2_version_h */
return NULL;
}
+static apr_status_t cleanup_join_thread(void *ctx)
+{
+ h2_worker *w = ctx;
+ /* do the join only when the worker is aborted. Otherwise,
+ * we are probably in a process shutdown.
+ */
+ if (w->thread && w->aborted) {
+ apr_status_t rv;
+ apr_thread_join(&rv, w->thread);
+ }
+ return APR_SUCCESS;
+}
+
h2_worker *h2_worker_create(int id,
apr_pool_t *parent_pool,
apr_threadattr_t *attr,
return NULL;
}
+ apr_pool_pre_cleanup_register(pool, w, cleanup_join_thread);
apr_thread_create(&w->thread, attr, execute, w, pool);
}
return w;
return 0;
}
+static void cleanup_zombies(h2_workers *workers, int lock) {
+ if (lock) {
+ apr_thread_mutex_lock(workers->lock);
+ }
+ while (!H2_WORKER_LIST_EMPTY(&workers->zombies)) {
+ h2_worker *zombie = H2_WORKER_LIST_FIRST(&workers->zombies);
+ H2_WORKER_REMOVE(zombie);
+ ap_log_error(APLOG_MARK, APLOG_TRACE1, 0, workers->s,
+ "h2_workers: cleanup zombie %d", zombie->id);
+ h2_worker_destroy(zombie);
+ }
+ if (lock) {
+ apr_thread_mutex_unlock(workers->lock);
+ }
+}
+
/**
* Get the next task for the given worker. Will block until a task arrives
if (!task) {
/* Need to wait for either a new mplx to arrive.
*/
+ cleanup_zombies(workers, 0);
+
if (workers->worker_count > workers->min_size) {
apr_time_t now = apr_time_now();
if (now >= (start_wait + max_wait)) {
/* waited long enough without getting a task. */
- status = APR_TIMEUP;
- }
- else {
- ap_log_error(APLOG_MARK, APLOG_TRACE1, 0, workers->s,
- "h2_worker(%d): waiting signal, "
- "worker_count=%d", worker->id,
- (int)workers->worker_count);
- status = apr_thread_cond_timedwait(workers->mplx_added,
- workers->lock, max_wait);
- }
-
- if (status == APR_TIMEUP) {
- /* waited long enough */
if (workers->worker_count > workers->min_size) {
ap_log_error(APLOG_MARK, APLOG_TRACE1, 0,
workers->s,
break;
}
}
+ ap_log_error(APLOG_MARK, APLOG_TRACE1, 0, workers->s,
+ "h2_worker(%d): waiting signal, "
+ "worker_count=%d", worker->id,
+ (int)workers->worker_count);
+ apr_thread_cond_timedwait(workers->mplx_added,
+ workers->lock, max_wait);
}
else {
ap_log_error(APLOG_MARK, APLOG_TRACE1, 0, workers->s,
h2_workers *workers = (h2_workers *)ctx;
apr_status_t status = apr_thread_mutex_lock(workers->lock);
if (status == APR_SUCCESS) {
- ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, workers->s,
+ ap_log_error(APLOG_MARK, APLOG_TRACE1, 0, workers->s,
"h2_worker(%d): done", h2_worker_get_id(worker));
H2_WORKER_REMOVE(worker);
--workers->worker_count;
- h2_worker_destroy(worker);
+ H2_WORKER_LIST_INSERT_TAIL(&workers->zombies, worker);
apr_thread_mutex_unlock(workers->lock);
}
if (!w) {
return APR_ENOMEM;
}
- ap_log_error(APLOG_MARK, APLOG_TRACE2, 0, workers->s,
+ ap_log_error(APLOG_MARK, APLOG_TRACE1, 0, workers->s,
"h2_workers: adding worker(%d)", h2_worker_get_id(w));
++workers->worker_count;
H2_WORKER_LIST_INSERT_TAIL(&workers->workers, w);
return status;
}
-h2_workers *h2_workers_create(server_rec *s, apr_pool_t *pool,
+h2_workers *h2_workers_create(server_rec *s, apr_pool_t *server_pool,
int min_size, int max_size)
{
apr_status_t status;
h2_workers *workers;
+ apr_pool_t *pool;
+
AP_DEBUG_ASSERT(s);
- AP_DEBUG_ASSERT(pool);
- status = APR_SUCCESS;
+ AP_DEBUG_ASSERT(server_pool);
+ /* let's have our own pool that will be parent to all h2_worker
+ * instances we create. This happens in various threads, but always
+ * guarded by our lock. Without this pool, all subpool creations would
+ * happen on the pool handed to us, which we do not guard.
+ */
+ apr_pool_create(&pool, server_pool);
workers = apr_pcalloc(pool, sizeof(h2_workers));
if (workers) {
workers->s = s;
apr_threadattr_create(&workers->thread_attr, workers->pool);
APR_RING_INIT(&workers->workers, h2_worker, link);
+ APR_RING_INIT(&workers->zombies, h2_worker, link);
APR_RING_INIT(&workers->mplxs, h2_mplx, link);
status = apr_thread_mutex_create(&workers->lock,
void h2_workers_destroy(h2_workers *workers)
{
+ /* before we go, cleanup any zombie workers that may have accumulated */
+ cleanup_zombies(workers, 1);
+
if (workers->mplx_added) {
apr_thread_cond_destroy(workers->mplx_added);
workers->mplx_added = NULL;
h2_worker *w = H2_WORKER_LIST_FIRST(&workers->workers);
H2_WORKER_REMOVE(w);
}
+ if (workers->pool) {
+ apr_pool_destroy(workers->pool);
+ /* workers is gone */
+ }
}
apr_status_t h2_workers_register(h2_workers *workers, struct h2_mplx *m)
add_worker(workers);
}
+ /* cleanup any zombie workers that may have accumulated */
+ cleanup_zombies(workers, 0);
+
apr_thread_mutex_unlock(workers->lock);
}
return status;
H2_MPLX_REMOVE(m);
status = APR_SUCCESS;
}
+ /* cleanup any zombie workers that may have accumulated */
+ cleanup_zombies(workers, 0);
+
apr_thread_mutex_unlock(workers->lock);
}
return status;
apr_threadattr_t *thread_attr;
APR_RING_HEAD(h2_worker_list, h2_worker) workers;
+ APR_RING_HEAD(h2_worker_zombies, h2_worker) zombies;
APR_RING_HEAD(h2_mplx_list, h2_mplx) mplxs;
int worker_count;