From: Stefan Eissing Date: Fri, 23 Oct 2015 13:34:20 +0000 (+0000) Subject: combined patches for alpha testing next release of http2 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=9d6558ad15fd0a4c210a0721079f655d17cec96e;p=thirdparty%2Fapache%2Fhttpd.git combined patches for alpha testing next release of http2 git-svn-id: https://svn.apache.org/repos/asf/httpd/httpd/branches/2.4-http2-alpha@1710210 13f79535-47bb-0310-9956-ffa450edef68 --- diff --git a/CHANGES b/CHANGES index c7e204f9b0a..c12e96e957c 100644 --- a/CHANGES +++ b/CHANGES @@ -1,7 +1,11 @@ -*- coding: utf-8 -*- Changes with Apache 2.4.18 - + *) mod_http2: reworked deallocation on connection shutdown and worker + abort. Separate parent pool for all workers. worker threads are joined + on planned worker shutdown. + [Yann Ylavic, Stefan Eissing] + Changes with Apache 2.4.17 diff --git a/modules/http2/h2_conn.c b/modules/http2/h2_conn.c index 8bffbc42693..1cb3234ca1f 100644 --- a/modules/http2/h2_conn.c +++ b/modules/http2/h2_conn.c @@ -255,7 +255,7 @@ apr_status_t h2_session_process(h2_session *session) have_written = 1; wait_micros = 0; } - else if (status == APR_EAGAIN) { + else if (APR_STATUS_IS_EAGAIN(status)) { /* nop */ } else if (status == APR_TIMEUP) { diff --git a/modules/http2/h2_io.c b/modules/http2/h2_io.c index 42734430fa4..9f699aa9b53 100644 --- a/modules/http2/h2_io.c +++ b/modules/http2/h2_io.c @@ -47,6 +47,12 @@ void h2_io_destroy(h2_io *io) h2_io_cleanup(io); } +void h2_io_rst(h2_io *io, int error) +{ + io->rst_error = error; + io->eos_in = 1; +} + int h2_io_in_has_eos_for(h2_io *io) { return io->eos_in || (io->bbin && h2_util_has_eos(io->bbin, 0)); @@ -124,16 +130,52 @@ apr_status_t h2_io_out_readx(h2_io *io, h2_io_data_cb *cb, void *ctx, apr_size_t *plen, int *peos) { + apr_status_t status; + + if (io->eos_out) { + *plen = 0; + *peos = 1; + return APR_SUCCESS; + } + if (cb == NULL) { /* just checking length available */ - return h2_util_bb_avail(io->bbout, plen, peos); + status = h2_util_bb_avail(io->bbout, plen, peos); } - return h2_util_bb_readx(io->bbout, cb, ctx, plen, peos); + else { + status = h2_util_bb_readx(io->bbout, cb, ctx, plen, peos); + if (status == APR_SUCCESS) { + io->eos_out = *peos; + } + } + + return status; } apr_status_t h2_io_out_write(h2_io *io, apr_bucket_brigade *bb, apr_size_t maxlen, int *pfile_handles_allowed) { + apr_status_t status; + int start_allowed; + + if (io->eos_out) { + apr_off_t len; + /* We have already delivered an EOS bucket to a reader, no + * sense in storing anything more here. + */ + status = apr_brigade_length(bb, 1, &len); + if (status == APR_SUCCESS) { + if (len > 0) { + /* someone tries to write real data after EOS, that + * does not look right. */ + status = APR_EOF; + } + /* cleanup, as if we had moved the data */ + apr_brigade_cleanup(bb); + } + return status; + } + /* Let's move the buckets from the request processing in here, so * that the main thread can read them when it has time/capacity. * @@ -144,8 +186,11 @@ apr_status_t h2_io_out_write(h2_io *io, apr_bucket_brigade *bb, * many open files already buffered. Otherwise we will run out of * file handles. */ - int start_allowed = *pfile_handles_allowed; - apr_status_t status; + start_allowed = *pfile_handles_allowed; + + if (io->rst_error) { + return APR_ECONNABORTED; + } status = h2_util_move(io->bbout, bb, maxlen, pfile_handles_allowed, "h2_io_out_write"); /* track # file buckets moved into our pool */ @@ -158,7 +203,9 @@ apr_status_t h2_io_out_write(h2_io *io, apr_bucket_brigade *bb, apr_status_t h2_io_out_close(h2_io *io) { - APR_BRIGADE_INSERT_TAIL(io->bbout, - apr_bucket_eos_create(io->bbout->bucket_alloc)); + if (!io->eos_out && !h2_util_has_eos(io->bbout, 0)) { + APR_BRIGADE_INSERT_TAIL(io->bbout, + apr_bucket_eos_create(io->bbout->bucket_alloc)); + } return APR_SUCCESS; } diff --git a/modules/http2/h2_io.h b/modules/http2/h2_io.h index 946ee44334e..23655d21dfa 100644 --- a/modules/http2/h2_io.h +++ b/modules/http2/h2_io.h @@ -33,11 +33,13 @@ struct h2_io { apr_bucket_brigade *bbin; /* input data for stream */ int eos_in; int task_done; + int rst_error; apr_size_t input_consumed; /* how many bytes have been read */ struct apr_thread_cond_t *input_arrived; /* block on reading */ apr_bucket_brigade *bbout; /* output data from stream */ + int eos_out; struct apr_thread_cond_t *output_drained; /* block on writing */ struct h2_response *response;/* submittable response created */ @@ -58,6 +60,11 @@ h2_io *h2_io_create(int id, apr_pool_t *pool, apr_bucket_alloc_t *bucket_alloc); */ void h2_io_destroy(h2_io *io); +/** + * Reset the stream with the given error code. + */ +void h2_io_rst(h2_io *io, int error); + /** * The input data is completely queued. Blocked reads will return immediately * and give either data or EOF. diff --git a/modules/http2/h2_io_set.c b/modules/http2/h2_io_set.c index 91afde8f1f5..74ab508fefe 100644 --- a/modules/http2/h2_io_set.c +++ b/modules/http2/h2_io_set.c @@ -78,19 +78,6 @@ h2_io *h2_io_set_get(h2_io_set *sp, int stream_id) return ps? *ps : NULL; } -h2_io *h2_io_set_get_highest_prio(h2_io_set *set) -{ - h2_io *highest = NULL; - int i; - for (i = 0; i < set->list->nelts; ++i) { - h2_io *io = h2_io_IDX(set->list, i); - if (!highest /*|| io-prio even higher */ ) { - highest = io; - } - } - return highest; -} - static void h2_io_set_sort(h2_io_set *sp) { qsort(sp->list->elts, sp->list->nelts, sp->list->elt_size, @@ -118,28 +105,46 @@ apr_status_t h2_io_set_add(h2_io_set *sp, h2_io *io) return APR_SUCCESS; } +static void remove_idx(h2_io_set *sp, int idx) +{ + int n; + --sp->list->nelts; + n = sp->list->nelts - idx; + if (n > 0) { + /* Close the hole in the array by moving the upper + * parts down one step. + */ + h2_io **selts = (h2_io**)sp->list->elts; + memmove(selts + idx, selts + idx + 1, n * sizeof(h2_io*)); + } +} + h2_io *h2_io_set_remove(h2_io_set *sp, h2_io *io) { int i; for (i = 0; i < sp->list->nelts; ++i) { h2_io *e = h2_io_IDX(sp->list, i); if (e == io) { - int n; - --sp->list->nelts; - n = sp->list->nelts - i; - if (n > 0) { - /* Close the hole in the array by moving the upper - * parts down one step. - */ - h2_io **selts = (h2_io**)sp->list->elts; - memmove(selts+i, selts+i+1, n * sizeof(h2_io*)); - } + remove_idx(sp, i); return e; } } return NULL; } +h2_io *h2_io_set_pop_highest_prio(h2_io_set *set) +{ + /* For now, this just removes the first element in the set. + * the name is misleading... + */ + if (set->list->nelts > 0) { + h2_io *io = h2_io_IDX(set->list, 0); + remove_idx(set, 0); + return io; + } + return NULL; +} + void h2_io_set_destroy_all(h2_io_set *sp) { int i; diff --git a/modules/http2/h2_io_set.h b/modules/http2/h2_io_set.h index a9c6546c701..5e7555af92e 100644 --- a/modules/http2/h2_io_set.h +++ b/modules/http2/h2_io_set.h @@ -30,7 +30,6 @@ void h2_io_set_destroy(h2_io_set *set); apr_status_t h2_io_set_add(h2_io_set *set, struct h2_io *io); h2_io *h2_io_set_get(h2_io_set *set, int stream_id); -h2_io *h2_io_set_get_highest_prio(h2_io_set *set); h2_io *h2_io_set_remove(h2_io_set *set, struct h2_io *io); void h2_io_set_remove_all(h2_io_set *set); @@ -44,4 +43,6 @@ typedef int h2_io_set_iter_fn(void *ctx, struct h2_io *io); void h2_io_set_iter(h2_io_set *set, h2_io_set_iter_fn *iter, void *ctx); +h2_io *h2_io_set_pop_highest_prio(h2_io_set *set); + #endif /* defined(__mod_h2__h2_io_set__) */ diff --git a/modules/http2/h2_mplx.c b/modules/http2/h2_mplx.c index 2d07b1eb6c3..b1513db10e3 100644 --- a/modules/http2/h2_mplx.c +++ b/modules/http2/h2_mplx.c @@ -41,6 +41,7 @@ #include "h2_task_output.h" #include "h2_task_queue.h" #include "h2_workers.h" +#include "h2_util.h" static int is_aborted(h2_mplx *m, apr_status_t *pstatus) { @@ -143,12 +144,18 @@ static void reference(h2_mplx *m) apr_atomic_inc32(&m->refs); } -static void release(h2_mplx *m) +static void release(h2_mplx *m, int lock) { if (!apr_atomic_dec32(&m->refs)) { + if (lock) { + apr_thread_mutex_lock(m->lock); + } if (m->join_wait) { apr_thread_cond_signal(m->join_wait); } + if (lock) { + apr_thread_mutex_unlock(m->lock); + } } } @@ -158,7 +165,7 @@ void h2_mplx_reference(h2_mplx *m) } void h2_mplx_release(h2_mplx *m) { - release(m); + release(m, 1); } static void workers_register(h2_mplx *m) { @@ -187,29 +194,17 @@ apr_status_t h2_mplx_release_and_join(h2_mplx *m, apr_thread_cond_t *wait) status = apr_thread_mutex_lock(m->lock); if (APR_SUCCESS == status) { - int attempts = 0; - - release(m); + release(m, 0); while (apr_atomic_read32(&m->refs) > 0) { m->join_wait = wait; - ap_log_cerror(APLOG_MARK, (attempts? APLOG_INFO : APLOG_DEBUG), - 0, m->c, + ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c, "h2_mplx(%ld): release_join, refs=%d, waiting...", m->id, m->refs); - apr_thread_cond_timedwait(wait, m->lock, apr_time_from_sec(10)); - if (++attempts >= 6) { - ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, - APLOGNO(02952) - "h2_mplx(%ld): join attempts exhausted, refs=%d", - m->id, m->refs); - break; - } - } - if (m->join_wait) { - ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c, - "h2_mplx(%ld): release_join -> destroy", m->id); + apr_thread_cond_wait(wait, m->lock); } m->join_wait = NULL; + ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c, + "h2_mplx(%ld): release_join -> destroy", m->id); apr_thread_mutex_unlock(m->lock); h2_mplx_destroy(m); } @@ -298,6 +293,13 @@ apr_status_t h2_mplx_cleanup_stream(h2_mplx *m, h2_stream *stream) stream_destroy(m, stream, io); } else { + if (stream->rst_error) { + /* Forward error code to fail any further attempt to + * write to io */ + h2_io_rst(io, stream->rst_error); + } + /* Remove io from ready set (if there), since we will never submit it */ + h2_io_set_remove(m->ready_ios, io); /* Add stream to closed set for cleanup when task is done */ h2_stream_set_add(m->closed, stream); } @@ -345,7 +347,7 @@ apr_status_t h2_mplx_in_read(h2_mplx *m, apr_read_type_e block, if (io) { io->input_arrived = iowait; status = h2_io_in_read(io, bb, 0); - while (status == APR_EAGAIN + while (APR_STATUS_IS_EAGAIN(status) && !is_aborted(m, &status) && block == APR_BLOCK_READ) { apr_thread_cond_wait(io->input_arrived, m->lock); @@ -454,6 +456,13 @@ apr_status_t h2_mplx_in_update_windows(h2_mplx *m, return status; } +#define H2_MPLX_IO_OUT(lvl,m,io,msg) \ + do { \ + if (APLOG_C_IS_LEVEL((m)->c,lvl)) \ + h2_util_bb_log((m)->c,(io)->id,lvl,msg,(io)->bbout); \ + } while(0) + + apr_status_t h2_mplx_out_readx(h2_mplx *m, int stream_id, h2_io_data_cb *cb, void *ctx, apr_size_t *plen, int *peos) @@ -467,8 +476,12 @@ apr_status_t h2_mplx_out_readx(h2_mplx *m, int stream_id, if (APR_SUCCESS == status) { h2_io *io = h2_io_set_get(m->stream_ios, stream_id); if (io) { + H2_MPLX_IO_OUT(APLOG_TRACE2, m, io, "h2_mplx_out_readx_pre"); + status = h2_io_out_readx(io, cb, ctx, plen, peos); - if (status == APR_SUCCESS && io->output_drained) { + + H2_MPLX_IO_OUT(APLOG_TRACE2, m, io, "h2_mplx_out_readx_post"); + if (status == APR_SUCCESS && cb && io->output_drained) { apr_thread_cond_signal(io->output_drained); } } @@ -490,24 +503,30 @@ h2_stream *h2_mplx_next_submit(h2_mplx *m, h2_stream_set *streams) } status = apr_thread_mutex_lock(m->lock); if (APR_SUCCESS == status) { - h2_io *io = h2_io_set_get_highest_prio(m->ready_ios); + h2_io *io = h2_io_set_pop_highest_prio(m->ready_ios); if (io) { - h2_response *response = io->response; - - AP_DEBUG_ASSERT(response); - h2_io_set_remove(m->ready_ios, io); - - stream = h2_stream_set_get(streams, response->stream_id); + stream = h2_stream_set_get(streams, io->id); if (stream) { - h2_stream_set_response(stream, response, io->bbout); + if (io->rst_error) { + h2_stream_rst(stream, io->rst_error); + } + else { + AP_DEBUG_ASSERT(io->response); + h2_stream_set_response(stream, io->response, io->bbout); + } + if (io->output_drained) { apr_thread_cond_signal(io->output_drained); } } else { - ap_log_cerror(APLOG_MARK, APLOG_WARNING, APR_NOTFOUND, m->c, - APLOGNO(02953) "h2_mplx(%ld): stream for response %d", - m->id, response->stream_id); + ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, APLOGNO(02953) + "h2_mplx(%ld): stream for response %d not found", + m->id, io->id); + /* We have the io ready, but the stream has gone away, maybe + * reset by the client. Should no longer happen since such + * streams should clear io's from the ready queue. + */ } } apr_thread_mutex_unlock(m->lock); @@ -531,7 +550,6 @@ static apr_status_t out_write(h2_mplx *m, h2_io *io, status = h2_io_out_write(io, bb, m->stream_max_mem, &m->file_handles_allowed); - /* Wait for data to drain until there is room again */ while (!APR_BRIGADE_EMPTY(bb) && iowait @@ -549,6 +567,7 @@ static apr_status_t out_write(h2_mplx *m, h2_io *io, } } apr_brigade_cleanup(bb); + return status; } @@ -592,6 +611,9 @@ apr_status_t h2_mplx_out_open(h2_mplx *m, int stream_id, h2_response *response, status = apr_thread_mutex_lock(m->lock); if (APR_SUCCESS == status) { status = out_open(m, stream_id, response, f, bb, iowait); + if (APLOGctrace1(m->c)) { + h2_util_bb_log(m->c, stream_id, APLOG_TRACE1, "h2_mplx_out_open", bb); + } if (m->aborted) { return APR_ECONNABORTED; } @@ -616,6 +638,8 @@ apr_status_t h2_mplx_out_write(h2_mplx *m, int stream_id, h2_io *io = h2_io_set_get(m->stream_ios, stream_id); if (io) { status = out_write(m, io, f, bb, iowait); + H2_MPLX_IO_OUT(APLOG_TRACE2, m, io, "h2_mplx_out_write"); + have_out_data_for(m, stream_id); if (m->aborted) { return APR_ECONNABORTED; @@ -645,7 +669,7 @@ apr_status_t h2_mplx_out_close(h2_mplx *m, int stream_id) if (!m->aborted) { h2_io *io = h2_io_set_get(m->stream_ios, stream_id); if (io) { - if (!io->response || !io->response->ngheader) { + if (!io->response && !io->rst_error) { /* In case a close comes before a response was created, * insert an error one so that our streams can properly * reset. @@ -653,8 +677,48 @@ apr_status_t h2_mplx_out_close(h2_mplx *m, int stream_id) h2_response *r = h2_response_create(stream_id, "500", NULL, m->pool); status = out_open(m, stream_id, r, NULL, NULL, NULL); + ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, m->c, + "h2_mplx(%ld-%d): close, no response, no rst", + m->id, io->id); } status = h2_io_out_close(io); + H2_MPLX_IO_OUT(APLOG_TRACE2, m, io, "h2_mplx_out_close"); + + have_out_data_for(m, stream_id); + if (m->aborted) { + /* if we were the last output, the whole session might + * have gone down in the meantime. + */ + return APR_SUCCESS; + } + } + else { + status = APR_ECONNABORTED; + } + } + apr_thread_mutex_unlock(m->lock); + } + return status; +} + +apr_status_t h2_mplx_out_rst(h2_mplx *m, int stream_id, int error) +{ + apr_status_t status; + AP_DEBUG_ASSERT(m); + if (m->aborted) { + return APR_ECONNABORTED; + } + status = apr_thread_mutex_lock(m->lock); + if (APR_SUCCESS == status) { + if (!m->aborted) { + h2_io *io = h2_io_set_get(m->stream_ios, stream_id); + if (io && !io->rst_error) { + h2_io_rst(io, error); + if (!io->response) { + h2_io_set_add(m->ready_ios, io); + } + H2_MPLX_IO_OUT(APLOG_TRACE2, m, io, "h2_mplx_out_rst"); + have_out_data_for(m, stream_id); if (m->aborted) { /* if we were the last output, the whole session might diff --git a/modules/http2/h2_mplx.h b/modules/http2/h2_mplx.h index 62977d6157c..5cb40af16fa 100644 --- a/modules/http2/h2_mplx.h +++ b/modules/http2/h2_mplx.h @@ -96,6 +96,7 @@ void h2_mplx_reference(h2_mplx *m); * Decreases the reference counter of this mplx. */ void h2_mplx_release(h2_mplx *m); + /** * Decreases the reference counter of this mplx and waits for it * to reached 0, destroy the mplx afterwards. @@ -247,6 +248,8 @@ apr_status_t h2_mplx_out_write(h2_mplx *mplx, int stream_id, */ apr_status_t h2_mplx_out_close(h2_mplx *m, int stream_id); +apr_status_t h2_mplx_out_rst(h2_mplx *m, int stream_id, int error); + /******************************************************************************* * h2_mplx list Manipulation. ******************************************************************************/ diff --git a/modules/http2/h2_session.c b/modules/http2/h2_session.c index c3456a0654d..d8a8735afac 100644 --- a/modules/http2/h2_session.c +++ b/modules/http2/h2_session.c @@ -41,17 +41,16 @@ static int frame_print(const nghttp2_frame *frame, char *buffer, size_t maxlen); static int h2_session_status_from_apr_status(apr_status_t rv) { - switch (rv) { - case APR_SUCCESS: - return NGHTTP2_NO_ERROR; - case APR_EAGAIN: - case APR_TIMEUP: - return NGHTTP2_ERR_WOULDBLOCK; - case APR_EOF: + if (rv == APR_SUCCESS) { + return NGHTTP2_NO_ERROR; + } + else if (APR_STATUS_IS_EAGAIN(rv)) { + return NGHTTP2_ERR_WOULDBLOCK; + } + else if (APR_STATUS_IS_EOF(rv)) { return NGHTTP2_ERR_EOF; - default: - return NGHTTP2_ERR_PROTO; } + return NGHTTP2_ERR_PROTO; } static int stream_open(h2_session *session, int stream_id) @@ -107,7 +106,7 @@ static ssize_t send_cb(nghttp2_session *ngh2, if (status == APR_SUCCESS) { return length; } - if (status == APR_EAGAIN || status == APR_TIMEUP) { + if (APR_STATUS_IS_EAGAIN(status)) { return NGHTTP2_ERR_WOULDBLOCK; } ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, session->c, @@ -243,6 +242,7 @@ static apr_status_t stream_destroy(h2_session *session, "h2_stream(%ld-%d): closing with err=%d %s", session->id, (int)stream->id, (int)error_code, nghttp2_strerror(error_code)); + h2_stream_rst(stream, error_code); } h2_stream_set_remove(session->streams, stream); @@ -508,12 +508,11 @@ static int on_send_data_cb(nghttp2_session *ngh2, if (status == APR_SUCCESS) { return 0; } - else if (status != APR_EOF) { - ap_log_cerror(APLOG_MARK, APLOG_ERR, status, session->c, + else { + ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, session->c, APLOGNO(02925) "h2_stream(%ld-%d): failed send_data_cb", session->id, (int)stream_id); - return NGHTTP2_ERR_CALLBACK_FAILURE; } return h2_session_status_from_apr_status(status); @@ -731,10 +730,12 @@ apr_status_t h2_session_abort(h2_session *session, apr_status_t reason, int rv) rv = 0; /* ...gracefully shut down */ break; case APR_EBADF: /* connection unusable, terminate silently */ - case APR_ECONNABORTED: - rv = NGHTTP2_ERR_EOF; - break; default: + if (APR_STATUS_IS_ECONNABORTED(reason) + || APR_STATUS_IS_ECONNRESET(reason) + || APR_STATUS_IS_EBADF(reason)) { + rv = NGHTTP2_ERR_EOF; + } break; } } @@ -910,7 +911,7 @@ apr_status_t h2_session_write(h2_session *session, apr_interval_time_t timeout) if (status == APR_SUCCESS) { flush_output = 1; } - else if (status != APR_EAGAIN) { + else if (!APR_STATUS_IS_EAGAIN(status)) { return status; } @@ -1062,6 +1063,10 @@ static ssize_t stream_data_cb(nghttp2_session *ng2s, case APR_SUCCESS: break; + case APR_ECONNRESET: + return nghttp2_submit_rst_stream(ng2s, NGHTTP2_FLAG_NONE, + stream->id, stream->rst_error); + case APR_EAGAIN: /* If there is no data available, our session will automatically * suspend this stream and not ask for more data until we resume @@ -1141,11 +1146,15 @@ apr_status_t h2_session_handle_response(h2_session *session, h2_stream *stream) int rv = 0; AP_DEBUG_ASSERT(session); AP_DEBUG_ASSERT(stream); - AP_DEBUG_ASSERT(stream->response); + AP_DEBUG_ASSERT(stream->response || stream->rst_error); - if (stream->response->ngheader) { + if (stream->response && stream->response->ngheader) { rv = submit_response(session, stream->response); } + else if (stream->rst_error) { + rv = nghttp2_submit_rst_stream(session->ngh2, NGHTTP2_FLAG_NONE, + stream->id, stream->rst_error); + } else { rv = nghttp2_submit_rst_stream(session->ngh2, NGHTTP2_FLAG_NONE, stream->id, NGHTTP2_PROTOCOL_ERROR); diff --git a/modules/http2/h2_stream.c b/modules/http2/h2_stream.c index 52781d8474f..d5a716f4490 100644 --- a/modules/http2/h2_stream.c +++ b/modules/http2/h2_stream.c @@ -86,11 +86,6 @@ apr_status_t h2_stream_destroy(h2_stream *stream) return APR_SUCCESS; } -void h2_stream_attach_pool(h2_stream *stream, apr_pool_t *pool) -{ - stream->pool = pool; -} - apr_pool_t *h2_stream_detach_pool(h2_stream *stream) { apr_pool_t *pool = stream->pool; @@ -98,25 +93,41 @@ apr_pool_t *h2_stream_detach_pool(h2_stream *stream) return pool; } -void h2_stream_abort(h2_stream *stream) +void h2_stream_rst(h2_stream *stream, int error_code) { - AP_DEBUG_ASSERT(stream); - stream->aborted = 1; + stream->rst_error = error_code; + ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, stream->m->c, + "h2_stream(%ld-%d): reset, error=%d", + stream->m->id, stream->id, error_code); } apr_status_t h2_stream_set_response(h2_stream *stream, h2_response *response, apr_bucket_brigade *bb) { + apr_status_t status = APR_SUCCESS; + stream->response = response; if (bb && !APR_BRIGADE_EMPTY(bb)) { if (!stream->bbout) { stream->bbout = apr_brigade_create(stream->pool, stream->m->c->bucket_alloc); } - return h2_util_move(stream->bbout, bb, 16 * 1024, NULL, - "h2_stream_set_response"); + status = h2_util_move(stream->bbout, bb, 16 * 1024, NULL, + "h2_stream_set_response"); } - return APR_SUCCESS; + if (APLOGctrace1(stream->m->c)) { + apr_size_t len = 0; + int eos = 0; + if (stream->bbout) { + h2_util_bb_avail(stream->bbout, &len, &eos); + } + ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, stream->m->c, + "h2_stream(%ld-%d): set_response(%s), brigade=%s, " + "len=%ld, eos=%d", + stream->m->id, stream->id, response->status, + (stream->bbout? "yes" : "no"), (long)len, (int)eos); + } + return status; } static int set_closed(h2_stream *stream) @@ -141,6 +152,9 @@ apr_status_t h2_stream_rwrite(h2_stream *stream, request_rec *r) { apr_status_t status; AP_DEBUG_ASSERT(stream); + if (stream->rst_error) { + return APR_ECONNRESET; + } set_state(stream, H2_STREAM_ST_OPEN); status = h2_request_rwrite(stream->request, r, stream->m); return status; @@ -151,6 +165,9 @@ apr_status_t h2_stream_write_eoh(h2_stream *stream, int eos) apr_status_t status; AP_DEBUG_ASSERT(stream); + if (stream->rst_error) { + return APR_ECONNRESET; + } /* Seeing the end-of-headers, we have everything we need to * start processing it. */ @@ -180,6 +197,9 @@ apr_status_t h2_stream_write_eos(h2_stream *stream) ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, stream->m->c, "h2_stream(%ld-%d): closing input", stream->m->id, stream->id); + if (stream->rst_error) { + return APR_ECONNRESET; + } if (set_closed(stream)) { return h2_request_close(stream->request); } @@ -191,6 +211,9 @@ apr_status_t h2_stream_write_header(h2_stream *stream, const char *value, size_t vlen) { AP_DEBUG_ASSERT(stream); + if (stream->rst_error) { + return APR_ECONNRESET; + } switch (stream->state) { case H2_STREAM_ST_IDLE: set_state(stream, H2_STREAM_ST_OPEN); @@ -208,7 +231,9 @@ apr_status_t h2_stream_write_data(h2_stream *stream, const char *data, size_t len) { AP_DEBUG_ASSERT(stream); - AP_DEBUG_ASSERT(stream); + if (stream->rst_error) { + return APR_ECONNRESET; + } switch (stream->state) { case H2_STREAM_ST_OPEN: break; @@ -224,6 +249,9 @@ apr_status_t h2_stream_prep_read(h2_stream *stream, apr_status_t status = APR_SUCCESS; const char *src; + if (stream->rst_error) { + return APR_ECONNRESET; + } if (stream->bbout && !APR_BRIGADE_EMPTY(stream->bbout)) { src = "stream"; status = h2_util_bb_avail(stream->bbout, plen, peos); @@ -251,6 +279,9 @@ apr_status_t h2_stream_readx(h2_stream *stream, h2_io_data_cb *cb, void *ctx, apr_size_t *plen, int *peos) { + if (stream->rst_error) { + return APR_ECONNRESET; + } if (stream->bbout && !APR_BRIGADE_EMPTY(stream->bbout)) { return h2_util_bb_readx(stream->bbout, cb, ctx, plen, peos); } diff --git a/modules/http2/h2_stream.h b/modules/http2/h2_stream.h index 0608f2f3400..c2bb9af7494 100644 --- a/modules/http2/h2_stream.h +++ b/modules/http2/h2_stream.h @@ -66,6 +66,7 @@ struct h2_stream { struct h2_task *task; /* task created for this stream */ struct h2_response *response; /* the response, once ready */ apr_bucket_brigade *bbout; /* output DATA */ + int rst_error; /* stream error for RST_STREAM */ }; @@ -73,10 +74,9 @@ h2_stream *h2_stream_create(int id, apr_pool_t *pool, struct h2_mplx *m); apr_status_t h2_stream_destroy(h2_stream *stream); -apr_pool_t *h2_stream_detach_pool(h2_stream *stream); -void h2_stream_attach_pool(h2_stream *stream, apr_pool_t *pool); +void h2_stream_rst(h2_stream *streamm, int error_code); -void h2_stream_abort(h2_stream *stream); +apr_pool_t *h2_stream_detach_pool(h2_stream *stream); apr_status_t h2_stream_rwrite(h2_stream *stream, request_rec *r); diff --git a/modules/http2/h2_task.c b/modules/http2/h2_task.c index bbea7b20f8a..58b39c53970 100644 --- a/modules/http2/h2_task.c +++ b/modules/http2/h2_task.c @@ -379,8 +379,6 @@ static request_rec *h2_task_create_request(h2_task_env *env) /* Time to populate r with the data we have. */ r->request_time = apr_time_now(); - r->the_request = apr_psprintf(r->pool, "%s %s HTTP/1.1", - env->method, env->path); r->method = env->method; /* Provide quick information about the request method as soon as known */ r->method_number = ap_method_number_of(r->method); @@ -391,6 +389,9 @@ static request_rec *h2_task_create_request(h2_task_env *env) ap_parse_uri(r, env->path); r->protocol = (char*)"HTTP/2"; r->proto_num = HTTP_VERSION(2, 0); + + r->the_request = apr_psprintf(r->pool, "%s %s %s", + r->method, env->path, r->protocol); /* update what we think the virtual host is based on the headers we've * now read. may update status. diff --git a/modules/http2/h2_util.c b/modules/http2/h2_util.c index 9d141be93bf..6f29b461d66 100644 --- a/modules/http2/h2_util.c +++ b/modules/http2/h2_util.c @@ -20,6 +20,7 @@ #include #include #include +#include #include @@ -647,3 +648,71 @@ apr_status_t h2_util_bb_readx(apr_bucket_brigade *bb, return status; } +void h2_util_bb_log(conn_rec *c, int stream_id, int level, + const char *tag, apr_bucket_brigade *bb) +{ + char buffer[16 * 1024]; + const char *line = "(null)"; + apr_size_t bmax = sizeof(buffer)/sizeof(buffer[0]); + int off = 0; + apr_bucket *b; + + if (bb) { + memset(buffer, 0, bmax--); + for (b = APR_BRIGADE_FIRST(bb); + bmax && (b != APR_BRIGADE_SENTINEL(bb)); + b = APR_BUCKET_NEXT(b)) { + + if (APR_BUCKET_IS_METADATA(b)) { + if (APR_BUCKET_IS_EOS(b)) { + off += apr_snprintf(buffer+off, bmax-off, "eos "); + } + else if (APR_BUCKET_IS_FLUSH(b)) { + off += apr_snprintf(buffer+off, bmax-off, "flush "); + } + else if (AP_BUCKET_IS_EOR(b)) { + off += apr_snprintf(buffer+off, bmax-off, "eor "); + } + else { + off += apr_snprintf(buffer+off, bmax-off, "meta(unknown) "); + } + } + else { + const char *btype = "data"; + if (APR_BUCKET_IS_FILE(b)) { + btype = "file"; + } + else if (APR_BUCKET_IS_PIPE(b)) { + btype = "pipe"; + } + else if (APR_BUCKET_IS_SOCKET(b)) { + btype = "socket"; + } + else if (APR_BUCKET_IS_HEAP(b)) { + btype = "heap"; + } + else if (APR_BUCKET_IS_TRANSIENT(b)) { + btype = "transient"; + } + else if (APR_BUCKET_IS_IMMORTAL(b)) { + btype = "immortal"; + } + else if (APR_BUCKET_IS_MMAP(b)) { + btype = "mmap"; + } + else if (APR_BUCKET_IS_POOL(b)) { + btype = "pool"; + } + + off += apr_snprintf(buffer+off, bmax-off, "%s[%ld] ", + btype, + (long)(b->length == ((apr_size_t)-1)? + -1 : b->length)); + } + } + line = *buffer? buffer : "(empty)"; + } + ap_log_cerror(APLOG_MARK, level, 0, c, "bb_dump(%ld-%d)-%s: %s", + c->id, stream_id, tag, line); + +} diff --git a/modules/http2/h2_util.h b/modules/http2/h2_util.h index 9a1b5c6d35b..e1a6b3c4d20 100644 --- a/modules/http2/h2_util.h +++ b/modules/http2/h2_util.h @@ -117,8 +117,32 @@ apr_status_t h2_util_bb_avail(apr_bucket_brigade *bb, typedef apr_status_t h2_util_pass_cb(void *ctx, const char *data, apr_size_t len); +/** + * Read at most *plen bytes from the brigade and pass them into the + * given callback. If cb is NULL, just return the amount of data that + * could have been read. + * If an EOS was/would be encountered, set *peos != 0. + * @param bb the brigade to read from + * @param cb the callback to invoke for the read data + * @param ctx optional data passed to callback + * @param plen inout, as input gives the maximum number of bytes to read, + * on return specifies the actual/would be number of bytes + * @param peos != 0 iff an EOS bucket was/would be encountered. + */ apr_status_t h2_util_bb_readx(apr_bucket_brigade *bb, h2_util_pass_cb *cb, void *ctx, apr_size_t *plen, int *peos); +/** + * Logs the bucket brigade (which bucket types with what length) + * to the log at the given level. + * @param c the connection to log for + * @param stream_id the stream identifier this brigade belongs to + * @param level the log level (as in APLOG_*) + * @param tag a short message text about the context + * @param bb the brigade to log + */ +void h2_util_bb_log(conn_rec *c, int stream_id, int level, + const char *tag, apr_bucket_brigade *bb); + #endif /* defined(__mod_h2__h2_util__) */ diff --git a/modules/http2/h2_version.h b/modules/http2/h2_version.h index 7a03865c87c..dc7c5772211 100644 --- a/modules/http2/h2_version.h +++ b/modules/http2/h2_version.h @@ -20,7 +20,7 @@ * @macro * Version number of the h2 module as c string */ -#define MOD_HTTP2_VERSION "1.0.0" +#define MOD_HTTP2_VERSION "1.0.1-DEV" /** * @macro @@ -28,7 +28,7 @@ * release. This is a 24 bit number with 8 bits for major number, 8 bits * for minor and 8 bits for patch. Version 1.2.3 becomes 0x010203. */ -#define MOD_HTTP2_VERSION_NUM 0x010000 +#define MOD_HTTP2_VERSION_NUM 0x010001 #endif /* mod_h2_h2_version_h */ diff --git a/modules/http2/h2_worker.c b/modules/http2/h2_worker.c index 8145b7aaa5f..297b4b21fe6 100644 --- a/modules/http2/h2_worker.c +++ b/modules/http2/h2_worker.c @@ -71,6 +71,19 @@ static void* APR_THREAD_FUNC execute(apr_thread_t *thread, void *wctx) return NULL; } +static apr_status_t cleanup_join_thread(void *ctx) +{ + h2_worker *w = ctx; + /* do the join only when the worker is aborted. Otherwise, + * we are probably in a process shutdown. + */ + if (w->thread && w->aborted) { + apr_status_t rv; + apr_thread_join(&rv, w->thread); + } + return APR_SUCCESS; +} + h2_worker *h2_worker_create(int id, apr_pool_t *parent_pool, apr_threadattr_t *attr, @@ -110,6 +123,7 @@ h2_worker *h2_worker_create(int id, return NULL; } + apr_pool_pre_cleanup_register(pool, w, cleanup_join_thread); apr_thread_create(&w->thread, attr, execute, w, pool); } return w; diff --git a/modules/http2/h2_workers.c b/modules/http2/h2_workers.c index cf3009585b7..18f39d136c9 100644 --- a/modules/http2/h2_workers.c +++ b/modules/http2/h2_workers.c @@ -42,6 +42,22 @@ static int in_list(h2_workers *workers, h2_mplx *m) return 0; } +static void cleanup_zombies(h2_workers *workers, int lock) { + if (lock) { + apr_thread_mutex_lock(workers->lock); + } + while (!H2_WORKER_LIST_EMPTY(&workers->zombies)) { + h2_worker *zombie = H2_WORKER_LIST_FIRST(&workers->zombies); + H2_WORKER_REMOVE(zombie); + ap_log_error(APLOG_MARK, APLOG_TRACE1, 0, workers->s, + "h2_workers: cleanup zombie %d", zombie->id); + h2_worker_destroy(zombie); + } + if (lock) { + apr_thread_mutex_unlock(workers->lock); + } +} + /** * Get the next task for the given worker. Will block until a task arrives @@ -123,23 +139,12 @@ static apr_status_t get_mplx_next(h2_worker *worker, h2_mplx **pm, if (!task) { /* Need to wait for either a new mplx to arrive. */ + cleanup_zombies(workers, 0); + if (workers->worker_count > workers->min_size) { apr_time_t now = apr_time_now(); if (now >= (start_wait + max_wait)) { /* waited long enough without getting a task. */ - status = APR_TIMEUP; - } - else { - ap_log_error(APLOG_MARK, APLOG_TRACE1, 0, workers->s, - "h2_worker(%d): waiting signal, " - "worker_count=%d", worker->id, - (int)workers->worker_count); - status = apr_thread_cond_timedwait(workers->mplx_added, - workers->lock, max_wait); - } - - if (status == APR_TIMEUP) { - /* waited long enough */ if (workers->worker_count > workers->min_size) { ap_log_error(APLOG_MARK, APLOG_TRACE1, 0, workers->s, @@ -148,6 +153,12 @@ static apr_status_t get_mplx_next(h2_worker *worker, h2_mplx **pm, break; } } + ap_log_error(APLOG_MARK, APLOG_TRACE1, 0, workers->s, + "h2_worker(%d): waiting signal, " + "worker_count=%d", worker->id, + (int)workers->worker_count); + apr_thread_cond_timedwait(workers->mplx_added, + workers->lock, max_wait); } else { ap_log_error(APLOG_MARK, APLOG_TRACE1, 0, workers->s, @@ -194,11 +205,11 @@ static void worker_done(h2_worker *worker, void *ctx) h2_workers *workers = (h2_workers *)ctx; apr_status_t status = apr_thread_mutex_lock(workers->lock); if (status == APR_SUCCESS) { - ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, workers->s, + ap_log_error(APLOG_MARK, APLOG_TRACE1, 0, workers->s, "h2_worker(%d): done", h2_worker_get_id(worker)); H2_WORKER_REMOVE(worker); --workers->worker_count; - h2_worker_destroy(worker); + H2_WORKER_LIST_INSERT_TAIL(&workers->zombies, worker); apr_thread_mutex_unlock(workers->lock); } @@ -213,7 +224,7 @@ static apr_status_t add_worker(h2_workers *workers) if (!w) { return APR_ENOMEM; } - ap_log_error(APLOG_MARK, APLOG_TRACE2, 0, workers->s, + ap_log_error(APLOG_MARK, APLOG_TRACE1, 0, workers->s, "h2_workers: adding worker(%d)", h2_worker_get_id(w)); ++workers->worker_count; H2_WORKER_LIST_INSERT_TAIL(&workers->workers, w); @@ -235,15 +246,22 @@ static apr_status_t h2_workers_start(h2_workers *workers) { return status; } -h2_workers *h2_workers_create(server_rec *s, apr_pool_t *pool, +h2_workers *h2_workers_create(server_rec *s, apr_pool_t *server_pool, int min_size, int max_size) { apr_status_t status; h2_workers *workers; + apr_pool_t *pool; + AP_DEBUG_ASSERT(s); - AP_DEBUG_ASSERT(pool); - status = APR_SUCCESS; + AP_DEBUG_ASSERT(server_pool); + /* let's have our own pool that will be parent to all h2_worker + * instances we create. This happens in various threads, but always + * guarded by our lock. Without this pool, all subpool creations would + * happen on the pool handed to us, which we do not guard. + */ + apr_pool_create(&pool, server_pool); workers = apr_pcalloc(pool, sizeof(h2_workers)); if (workers) { workers->s = s; @@ -255,6 +273,7 @@ h2_workers *h2_workers_create(server_rec *s, apr_pool_t *pool, apr_threadattr_create(&workers->thread_attr, workers->pool); APR_RING_INIT(&workers->workers, h2_worker, link); + APR_RING_INIT(&workers->zombies, h2_worker, link); APR_RING_INIT(&workers->mplxs, h2_mplx, link); status = apr_thread_mutex_create(&workers->lock, @@ -278,6 +297,9 @@ h2_workers *h2_workers_create(server_rec *s, apr_pool_t *pool, void h2_workers_destroy(h2_workers *workers) { + /* before we go, cleanup any zombie workers that may have accumulated */ + cleanup_zombies(workers, 1); + if (workers->mplx_added) { apr_thread_cond_destroy(workers->mplx_added); workers->mplx_added = NULL; @@ -294,6 +316,10 @@ void h2_workers_destroy(h2_workers *workers) h2_worker *w = H2_WORKER_LIST_FIRST(&workers->workers); H2_WORKER_REMOVE(w); } + if (workers->pool) { + apr_pool_destroy(workers->pool); + /* workers is gone */ + } } apr_status_t h2_workers_register(h2_workers *workers, struct h2_mplx *m) @@ -320,6 +346,9 @@ apr_status_t h2_workers_register(h2_workers *workers, struct h2_mplx *m) add_worker(workers); } + /* cleanup any zombie workers that may have accumulated */ + cleanup_zombies(workers, 0); + apr_thread_mutex_unlock(workers->lock); } return status; @@ -334,6 +363,9 @@ apr_status_t h2_workers_unregister(h2_workers *workers, struct h2_mplx *m) H2_MPLX_REMOVE(m); status = APR_SUCCESS; } + /* cleanup any zombie workers that may have accumulated */ + cleanup_zombies(workers, 0); + apr_thread_mutex_unlock(workers->lock); } return status; diff --git a/modules/http2/h2_workers.h b/modules/http2/h2_workers.h index 50fd6b8ad58..99aa1f4daf7 100644 --- a/modules/http2/h2_workers.h +++ b/modules/http2/h2_workers.h @@ -42,6 +42,7 @@ struct h2_workers { apr_threadattr_t *thread_attr; APR_RING_HEAD(h2_worker_list, h2_worker) workers; + APR_RING_HEAD(h2_worker_zombies, h2_worker) zombies; APR_RING_HEAD(h2_mplx_list, h2_mplx) mplxs; int worker_count;