From: Stefan Eissing Date: Fri, 9 Oct 2015 14:24:46 +0000 (+0000) Subject: avoid double eos buckets on stream output brigades, add stream rst infrstructure... X-Git-Tag: 2.5.0-alpha~2732 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=25b96f15f82fcc31f64c76ead5240e76694b4e4b;p=thirdparty%2Fapache%2Fhttpd.git avoid double eos buckets on stream output brigades, add stream rst infrstructure, bump to 1.0.1-DEV git-svn-id: https://svn.apache.org/repos/asf/httpd/httpd/trunk@1707735 13f79535-47bb-0310-9956-ffa450edef68 --- diff --git a/modules/http2/h2_io.c b/modules/http2/h2_io.c index 42734430fa4..9f699aa9b53 100644 --- a/modules/http2/h2_io.c +++ b/modules/http2/h2_io.c @@ -47,6 +47,12 @@ void h2_io_destroy(h2_io *io) h2_io_cleanup(io); } +void h2_io_rst(h2_io *io, int error) +{ + io->rst_error = error; + io->eos_in = 1; +} + int h2_io_in_has_eos_for(h2_io *io) { return io->eos_in || (io->bbin && h2_util_has_eos(io->bbin, 0)); @@ -124,16 +130,52 @@ apr_status_t h2_io_out_readx(h2_io *io, h2_io_data_cb *cb, void *ctx, apr_size_t *plen, int *peos) { + apr_status_t status; + + if (io->eos_out) { + *plen = 0; + *peos = 1; + return APR_SUCCESS; + } + if (cb == NULL) { /* just checking length available */ - return h2_util_bb_avail(io->bbout, plen, peos); + status = h2_util_bb_avail(io->bbout, plen, peos); } - return h2_util_bb_readx(io->bbout, cb, ctx, plen, peos); + else { + status = h2_util_bb_readx(io->bbout, cb, ctx, plen, peos); + if (status == APR_SUCCESS) { + io->eos_out = *peos; + } + } + + return status; } apr_status_t h2_io_out_write(h2_io *io, apr_bucket_brigade *bb, apr_size_t maxlen, int *pfile_handles_allowed) { + apr_status_t status; + int start_allowed; + + if (io->eos_out) { + apr_off_t len; + /* We have already delivered an EOS bucket to a reader, no + * sense in storing anything more here. + */ + status = apr_brigade_length(bb, 1, &len); + if (status == APR_SUCCESS) { + if (len > 0) { + /* someone tries to write real data after EOS, that + * does not look right. */ + status = APR_EOF; + } + /* cleanup, as if we had moved the data */ + apr_brigade_cleanup(bb); + } + return status; + } + /* Let's move the buckets from the request processing in here, so * that the main thread can read them when it has time/capacity. * @@ -144,8 +186,11 @@ apr_status_t h2_io_out_write(h2_io *io, apr_bucket_brigade *bb, * many open files already buffered. Otherwise we will run out of * file handles. */ - int start_allowed = *pfile_handles_allowed; - apr_status_t status; + start_allowed = *pfile_handles_allowed; + + if (io->rst_error) { + return APR_ECONNABORTED; + } status = h2_util_move(io->bbout, bb, maxlen, pfile_handles_allowed, "h2_io_out_write"); /* track # file buckets moved into our pool */ @@ -158,7 +203,9 @@ apr_status_t h2_io_out_write(h2_io *io, apr_bucket_brigade *bb, apr_status_t h2_io_out_close(h2_io *io) { - APR_BRIGADE_INSERT_TAIL(io->bbout, - apr_bucket_eos_create(io->bbout->bucket_alloc)); + if (!io->eos_out && !h2_util_has_eos(io->bbout, 0)) { + APR_BRIGADE_INSERT_TAIL(io->bbout, + apr_bucket_eos_create(io->bbout->bucket_alloc)); + } return APR_SUCCESS; } diff --git a/modules/http2/h2_io.h b/modules/http2/h2_io.h index 946ee44334e..23655d21dfa 100644 --- a/modules/http2/h2_io.h +++ b/modules/http2/h2_io.h @@ -33,11 +33,13 @@ struct h2_io { apr_bucket_brigade *bbin; /* input data for stream */ int eos_in; int task_done; + int rst_error; apr_size_t input_consumed; /* how many bytes have been read */ struct apr_thread_cond_t *input_arrived; /* block on reading */ apr_bucket_brigade *bbout; /* output data from stream */ + int eos_out; struct apr_thread_cond_t *output_drained; /* block on writing */ struct h2_response *response;/* submittable response created */ @@ -58,6 +60,11 @@ h2_io *h2_io_create(int id, apr_pool_t *pool, apr_bucket_alloc_t *bucket_alloc); */ void h2_io_destroy(h2_io *io); +/** + * Reset the stream with the given error code. + */ +void h2_io_rst(h2_io *io, int error); + /** * The input data is completely queued. Blocked reads will return immediately * and give either data or EOF. diff --git a/modules/http2/h2_mplx.c b/modules/http2/h2_mplx.c index 2d07b1eb6c3..3b5207677c7 100644 --- a/modules/http2/h2_mplx.c +++ b/modules/http2/h2_mplx.c @@ -41,6 +41,7 @@ #include "h2_task_output.h" #include "h2_task_queue.h" #include "h2_workers.h" +#include "h2_util.h" static int is_aborted(h2_mplx *m, apr_status_t *pstatus) { @@ -454,6 +455,13 @@ apr_status_t h2_mplx_in_update_windows(h2_mplx *m, return status; } +#define H2_MPLX_IO_OUT(lvl,m,io,msg) \ + do { \ + if (APLOG_C_IS_LEVEL((m)->c,lvl)) \ + h2_util_bb_log((m)->c,(io)->id,lvl,msg,(io)->bbout); \ + } while(0) + + apr_status_t h2_mplx_out_readx(h2_mplx *m, int stream_id, h2_io_data_cb *cb, void *ctx, apr_size_t *plen, int *peos) @@ -467,8 +475,12 @@ apr_status_t h2_mplx_out_readx(h2_mplx *m, int stream_id, if (APR_SUCCESS == status) { h2_io *io = h2_io_set_get(m->stream_ios, stream_id); if (io) { + H2_MPLX_IO_OUT(APLOG_TRACE2, m, io, "h2_mplx_out_readx_pre"); + status = h2_io_out_readx(io, cb, ctx, plen, peos); - if (status == APR_SUCCESS && io->output_drained) { + + H2_MPLX_IO_OUT(APLOG_TRACE2, m, io, "h2_mplx_out_readx_post"); + if (status == APR_SUCCESS && cb && io->output_drained) { apr_thread_cond_signal(io->output_drained); } } @@ -492,14 +504,18 @@ h2_stream *h2_mplx_next_submit(h2_mplx *m, h2_stream_set *streams) if (APR_SUCCESS == status) { h2_io *io = h2_io_set_get_highest_prio(m->ready_ios); if (io) { - h2_response *response = io->response; - - AP_DEBUG_ASSERT(response); - h2_io_set_remove(m->ready_ios, io); - stream = h2_stream_set_get(streams, response->stream_id); + stream = h2_stream_set_get(streams, io->id); if (stream) { - h2_stream_set_response(stream, response, io->bbout); + if (io->rst_error) { + h2_stream_rst(stream, io->rst_error); + } + else { + AP_DEBUG_ASSERT(io->response); + h2_stream_set_response(stream, io->response, io->bbout); + } + + h2_io_set_remove(m->ready_ios, io); if (io->output_drained) { apr_thread_cond_signal(io->output_drained); } @@ -507,7 +523,7 @@ h2_stream *h2_mplx_next_submit(h2_mplx *m, h2_stream_set *streams) else { ap_log_cerror(APLOG_MARK, APLOG_WARNING, APR_NOTFOUND, m->c, APLOGNO(02953) "h2_mplx(%ld): stream for response %d", - m->id, response->stream_id); + m->id, io->id); } } apr_thread_mutex_unlock(m->lock); @@ -531,7 +547,6 @@ static apr_status_t out_write(h2_mplx *m, h2_io *io, status = h2_io_out_write(io, bb, m->stream_max_mem, &m->file_handles_allowed); - /* Wait for data to drain until there is room again */ while (!APR_BRIGADE_EMPTY(bb) && iowait @@ -549,6 +564,7 @@ static apr_status_t out_write(h2_mplx *m, h2_io *io, } } apr_brigade_cleanup(bb); + return status; } @@ -592,6 +608,9 @@ apr_status_t h2_mplx_out_open(h2_mplx *m, int stream_id, h2_response *response, status = apr_thread_mutex_lock(m->lock); if (APR_SUCCESS == status) { status = out_open(m, stream_id, response, f, bb, iowait); + if (APLOGctrace1(m->c)) { + h2_util_bb_log(m->c, stream_id, APLOG_TRACE1, "h2_mplx_out_open", bb); + } if (m->aborted) { return APR_ECONNABORTED; } @@ -616,6 +635,8 @@ apr_status_t h2_mplx_out_write(h2_mplx *m, int stream_id, h2_io *io = h2_io_set_get(m->stream_ios, stream_id); if (io) { status = out_write(m, io, f, bb, iowait); + H2_MPLX_IO_OUT(APLOG_TRACE2, m, io, "h2_mplx_out_write"); + have_out_data_for(m, stream_id); if (m->aborted) { return APR_ECONNABORTED; @@ -645,7 +666,7 @@ apr_status_t h2_mplx_out_close(h2_mplx *m, int stream_id) if (!m->aborted) { h2_io *io = h2_io_set_get(m->stream_ios, stream_id); if (io) { - if (!io->response || !io->response->ngheader) { + if (!io->response && !io->rst_error) { /* In case a close comes before a response was created, * insert an error one so that our streams can properly * reset. @@ -653,8 +674,48 @@ apr_status_t h2_mplx_out_close(h2_mplx *m, int stream_id) h2_response *r = h2_response_create(stream_id, "500", NULL, m->pool); status = out_open(m, stream_id, r, NULL, NULL, NULL); + ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, m->c, + "h2_mplx(%ld-%d): close, no response, no rst", + m->id, io->id); } status = h2_io_out_close(io); + H2_MPLX_IO_OUT(APLOG_TRACE2, m, io, "h2_mplx_out_close"); + + have_out_data_for(m, stream_id); + if (m->aborted) { + /* if we were the last output, the whole session might + * have gone down in the meantime. + */ + return APR_SUCCESS; + } + } + else { + status = APR_ECONNABORTED; + } + } + apr_thread_mutex_unlock(m->lock); + } + return status; +} + +apr_status_t h2_mplx_out_rst(h2_mplx *m, int stream_id, int error) +{ + apr_status_t status; + AP_DEBUG_ASSERT(m); + if (m->aborted) { + return APR_ECONNABORTED; + } + status = apr_thread_mutex_lock(m->lock); + if (APR_SUCCESS == status) { + if (!m->aborted) { + h2_io *io = h2_io_set_get(m->stream_ios, stream_id); + if (io && !io->rst_error) { + h2_io_rst(io, error); + if (!io->response) { + h2_io_set_add(m->ready_ios, io); + } + H2_MPLX_IO_OUT(APLOG_TRACE2, m, io, "h2_mplx_out_rst"); + have_out_data_for(m, stream_id); if (m->aborted) { /* if we were the last output, the whole session might diff --git a/modules/http2/h2_mplx.h b/modules/http2/h2_mplx.h index 62977d6157c..5cb40af16fa 100644 --- a/modules/http2/h2_mplx.h +++ b/modules/http2/h2_mplx.h @@ -96,6 +96,7 @@ void h2_mplx_reference(h2_mplx *m); * Decreases the reference counter of this mplx. */ void h2_mplx_release(h2_mplx *m); + /** * Decreases the reference counter of this mplx and waits for it * to reached 0, destroy the mplx afterwards. @@ -247,6 +248,8 @@ apr_status_t h2_mplx_out_write(h2_mplx *mplx, int stream_id, */ apr_status_t h2_mplx_out_close(h2_mplx *m, int stream_id); +apr_status_t h2_mplx_out_rst(h2_mplx *m, int stream_id, int error); + /******************************************************************************* * h2_mplx list Manipulation. ******************************************************************************/ diff --git a/modules/http2/h2_session.c b/modules/http2/h2_session.c index c3456a0654d..80336081d4d 100644 --- a/modules/http2/h2_session.c +++ b/modules/http2/h2_session.c @@ -1062,6 +1062,10 @@ static ssize_t stream_data_cb(nghttp2_session *ng2s, case APR_SUCCESS: break; + case APR_ECONNRESET: + return nghttp2_submit_rst_stream(ng2s, NGHTTP2_FLAG_NONE, + stream->id, stream->rst_error); + case APR_EAGAIN: /* If there is no data available, our session will automatically * suspend this stream and not ask for more data until we resume @@ -1141,11 +1145,15 @@ apr_status_t h2_session_handle_response(h2_session *session, h2_stream *stream) int rv = 0; AP_DEBUG_ASSERT(session); AP_DEBUG_ASSERT(stream); - AP_DEBUG_ASSERT(stream->response); + AP_DEBUG_ASSERT(stream->response || stream->rst_error); - if (stream->response->ngheader) { + if (stream->response && stream->response->ngheader) { rv = submit_response(session, stream->response); } + else if (stream->rst_error) { + rv = nghttp2_submit_rst_stream(session->ngh2, NGHTTP2_FLAG_NONE, + stream->id, stream->rst_error); + } else { rv = nghttp2_submit_rst_stream(session->ngh2, NGHTTP2_FLAG_NONE, stream->id, NGHTTP2_PROTOCOL_ERROR); diff --git a/modules/http2/h2_stream.c b/modules/http2/h2_stream.c index 52781d8474f..d5a716f4490 100644 --- a/modules/http2/h2_stream.c +++ b/modules/http2/h2_stream.c @@ -86,11 +86,6 @@ apr_status_t h2_stream_destroy(h2_stream *stream) return APR_SUCCESS; } -void h2_stream_attach_pool(h2_stream *stream, apr_pool_t *pool) -{ - stream->pool = pool; -} - apr_pool_t *h2_stream_detach_pool(h2_stream *stream) { apr_pool_t *pool = stream->pool; @@ -98,25 +93,41 @@ apr_pool_t *h2_stream_detach_pool(h2_stream *stream) return pool; } -void h2_stream_abort(h2_stream *stream) +void h2_stream_rst(h2_stream *stream, int error_code) { - AP_DEBUG_ASSERT(stream); - stream->aborted = 1; + stream->rst_error = error_code; + ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, stream->m->c, + "h2_stream(%ld-%d): reset, error=%d", + stream->m->id, stream->id, error_code); } apr_status_t h2_stream_set_response(h2_stream *stream, h2_response *response, apr_bucket_brigade *bb) { + apr_status_t status = APR_SUCCESS; + stream->response = response; if (bb && !APR_BRIGADE_EMPTY(bb)) { if (!stream->bbout) { stream->bbout = apr_brigade_create(stream->pool, stream->m->c->bucket_alloc); } - return h2_util_move(stream->bbout, bb, 16 * 1024, NULL, - "h2_stream_set_response"); + status = h2_util_move(stream->bbout, bb, 16 * 1024, NULL, + "h2_stream_set_response"); } - return APR_SUCCESS; + if (APLOGctrace1(stream->m->c)) { + apr_size_t len = 0; + int eos = 0; + if (stream->bbout) { + h2_util_bb_avail(stream->bbout, &len, &eos); + } + ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, stream->m->c, + "h2_stream(%ld-%d): set_response(%s), brigade=%s, " + "len=%ld, eos=%d", + stream->m->id, stream->id, response->status, + (stream->bbout? "yes" : "no"), (long)len, (int)eos); + } + return status; } static int set_closed(h2_stream *stream) @@ -141,6 +152,9 @@ apr_status_t h2_stream_rwrite(h2_stream *stream, request_rec *r) { apr_status_t status; AP_DEBUG_ASSERT(stream); + if (stream->rst_error) { + return APR_ECONNRESET; + } set_state(stream, H2_STREAM_ST_OPEN); status = h2_request_rwrite(stream->request, r, stream->m); return status; @@ -151,6 +165,9 @@ apr_status_t h2_stream_write_eoh(h2_stream *stream, int eos) apr_status_t status; AP_DEBUG_ASSERT(stream); + if (stream->rst_error) { + return APR_ECONNRESET; + } /* Seeing the end-of-headers, we have everything we need to * start processing it. */ @@ -180,6 +197,9 @@ apr_status_t h2_stream_write_eos(h2_stream *stream) ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, stream->m->c, "h2_stream(%ld-%d): closing input", stream->m->id, stream->id); + if (stream->rst_error) { + return APR_ECONNRESET; + } if (set_closed(stream)) { return h2_request_close(stream->request); } @@ -191,6 +211,9 @@ apr_status_t h2_stream_write_header(h2_stream *stream, const char *value, size_t vlen) { AP_DEBUG_ASSERT(stream); + if (stream->rst_error) { + return APR_ECONNRESET; + } switch (stream->state) { case H2_STREAM_ST_IDLE: set_state(stream, H2_STREAM_ST_OPEN); @@ -208,7 +231,9 @@ apr_status_t h2_stream_write_data(h2_stream *stream, const char *data, size_t len) { AP_DEBUG_ASSERT(stream); - AP_DEBUG_ASSERT(stream); + if (stream->rst_error) { + return APR_ECONNRESET; + } switch (stream->state) { case H2_STREAM_ST_OPEN: break; @@ -224,6 +249,9 @@ apr_status_t h2_stream_prep_read(h2_stream *stream, apr_status_t status = APR_SUCCESS; const char *src; + if (stream->rst_error) { + return APR_ECONNRESET; + } if (stream->bbout && !APR_BRIGADE_EMPTY(stream->bbout)) { src = "stream"; status = h2_util_bb_avail(stream->bbout, plen, peos); @@ -251,6 +279,9 @@ apr_status_t h2_stream_readx(h2_stream *stream, h2_io_data_cb *cb, void *ctx, apr_size_t *plen, int *peos) { + if (stream->rst_error) { + return APR_ECONNRESET; + } if (stream->bbout && !APR_BRIGADE_EMPTY(stream->bbout)) { return h2_util_bb_readx(stream->bbout, cb, ctx, plen, peos); } diff --git a/modules/http2/h2_stream.h b/modules/http2/h2_stream.h index 0608f2f3400..c2bb9af7494 100644 --- a/modules/http2/h2_stream.h +++ b/modules/http2/h2_stream.h @@ -66,6 +66,7 @@ struct h2_stream { struct h2_task *task; /* task created for this stream */ struct h2_response *response; /* the response, once ready */ apr_bucket_brigade *bbout; /* output DATA */ + int rst_error; /* stream error for RST_STREAM */ }; @@ -73,10 +74,9 @@ h2_stream *h2_stream_create(int id, apr_pool_t *pool, struct h2_mplx *m); apr_status_t h2_stream_destroy(h2_stream *stream); -apr_pool_t *h2_stream_detach_pool(h2_stream *stream); -void h2_stream_attach_pool(h2_stream *stream, apr_pool_t *pool); +void h2_stream_rst(h2_stream *streamm, int error_code); -void h2_stream_abort(h2_stream *stream); +apr_pool_t *h2_stream_detach_pool(h2_stream *stream); apr_status_t h2_stream_rwrite(h2_stream *stream, request_rec *r); diff --git a/modules/http2/h2_util.c b/modules/http2/h2_util.c index 9d141be93bf..6f29b461d66 100644 --- a/modules/http2/h2_util.c +++ b/modules/http2/h2_util.c @@ -20,6 +20,7 @@ #include #include #include +#include #include @@ -647,3 +648,71 @@ apr_status_t h2_util_bb_readx(apr_bucket_brigade *bb, return status; } +void h2_util_bb_log(conn_rec *c, int stream_id, int level, + const char *tag, apr_bucket_brigade *bb) +{ + char buffer[16 * 1024]; + const char *line = "(null)"; + apr_size_t bmax = sizeof(buffer)/sizeof(buffer[0]); + int off = 0; + apr_bucket *b; + + if (bb) { + memset(buffer, 0, bmax--); + for (b = APR_BRIGADE_FIRST(bb); + bmax && (b != APR_BRIGADE_SENTINEL(bb)); + b = APR_BUCKET_NEXT(b)) { + + if (APR_BUCKET_IS_METADATA(b)) { + if (APR_BUCKET_IS_EOS(b)) { + off += apr_snprintf(buffer+off, bmax-off, "eos "); + } + else if (APR_BUCKET_IS_FLUSH(b)) { + off += apr_snprintf(buffer+off, bmax-off, "flush "); + } + else if (AP_BUCKET_IS_EOR(b)) { + off += apr_snprintf(buffer+off, bmax-off, "eor "); + } + else { + off += apr_snprintf(buffer+off, bmax-off, "meta(unknown) "); + } + } + else { + const char *btype = "data"; + if (APR_BUCKET_IS_FILE(b)) { + btype = "file"; + } + else if (APR_BUCKET_IS_PIPE(b)) { + btype = "pipe"; + } + else if (APR_BUCKET_IS_SOCKET(b)) { + btype = "socket"; + } + else if (APR_BUCKET_IS_HEAP(b)) { + btype = "heap"; + } + else if (APR_BUCKET_IS_TRANSIENT(b)) { + btype = "transient"; + } + else if (APR_BUCKET_IS_IMMORTAL(b)) { + btype = "immortal"; + } + else if (APR_BUCKET_IS_MMAP(b)) { + btype = "mmap"; + } + else if (APR_BUCKET_IS_POOL(b)) { + btype = "pool"; + } + + off += apr_snprintf(buffer+off, bmax-off, "%s[%ld] ", + btype, + (long)(b->length == ((apr_size_t)-1)? + -1 : b->length)); + } + } + line = *buffer? buffer : "(empty)"; + } + ap_log_cerror(APLOG_MARK, level, 0, c, "bb_dump(%ld-%d)-%s: %s", + c->id, stream_id, tag, line); + +} diff --git a/modules/http2/h2_util.h b/modules/http2/h2_util.h index 9a1b5c6d35b..e1a6b3c4d20 100644 --- a/modules/http2/h2_util.h +++ b/modules/http2/h2_util.h @@ -117,8 +117,32 @@ apr_status_t h2_util_bb_avail(apr_bucket_brigade *bb, typedef apr_status_t h2_util_pass_cb(void *ctx, const char *data, apr_size_t len); +/** + * Read at most *plen bytes from the brigade and pass them into the + * given callback. If cb is NULL, just return the amount of data that + * could have been read. + * If an EOS was/would be encountered, set *peos != 0. + * @param bb the brigade to read from + * @param cb the callback to invoke for the read data + * @param ctx optional data passed to callback + * @param plen inout, as input gives the maximum number of bytes to read, + * on return specifies the actual/would be number of bytes + * @param peos != 0 iff an EOS bucket was/would be encountered. + */ apr_status_t h2_util_bb_readx(apr_bucket_brigade *bb, h2_util_pass_cb *cb, void *ctx, apr_size_t *plen, int *peos); +/** + * Logs the bucket brigade (which bucket types with what length) + * to the log at the given level. + * @param c the connection to log for + * @param stream_id the stream identifier this brigade belongs to + * @param level the log level (as in APLOG_*) + * @param tag a short message text about the context + * @param bb the brigade to log + */ +void h2_util_bb_log(conn_rec *c, int stream_id, int level, + const char *tag, apr_bucket_brigade *bb); + #endif /* defined(__mod_h2__h2_util__) */ diff --git a/modules/http2/h2_version.h b/modules/http2/h2_version.h index 7a03865c87c..dc7c5772211 100644 --- a/modules/http2/h2_version.h +++ b/modules/http2/h2_version.h @@ -20,7 +20,7 @@ * @macro * Version number of the h2 module as c string */ -#define MOD_HTTP2_VERSION "1.0.0" +#define MOD_HTTP2_VERSION "1.0.1-DEV" /** * @macro @@ -28,7 +28,7 @@ * release. This is a 24 bit number with 8 bits for major number, 8 bits * for minor and 8 bits for patch. Version 1.2.3 becomes 0x010203. */ -#define MOD_HTTP2_VERSION_NUM 0x010000 +#define MOD_HTTP2_VERSION_NUM 0x010001 #endif /* mod_h2_h2_version_h */