From: Stefan Eissing Date: Fri, 29 Apr 2016 15:21:21 +0000 (+0000) Subject: mod_http2: some more cleanup on stream/task/session takedowns X-Git-Tag: 2.5.0-alpha~1674 X-Git-Url: http://git.ipfire.org/?a=commitdiff_plain;h=79804c035b8654e1d195cac87e08cce0ead91de9;p=thirdparty%2Fapache%2Fhttpd.git mod_http2: some more cleanup on stream/task/session takedowns git-svn-id: https://svn.apache.org/repos/asf/httpd/httpd/trunk@1741648 13f79535-47bb-0310-9956-ffa450edef68 --- diff --git a/modules/http2/h2_bucket_beam.c b/modules/http2/h2_bucket_beam.c index 2e9f4b1f390..2e1bfec5537 100644 --- a/modules/http2/h2_bucket_beam.c +++ b/modules/http2/h2_bucket_beam.c @@ -535,6 +535,9 @@ apr_status_t h2_beam_shutdown(h2_bucket_beam *beam, apr_read_type_e block) status = APR_EAGAIN; break; } + if (beam->m_cond) { + apr_thread_cond_broadcast(beam->m_cond); + } status = wait_cond(beam, bl.mutex); } leave_yellow(beam, &bl); @@ -716,6 +719,9 @@ apr_status_t h2_beam_receive(h2_bucket_beam *beam, if (enter_yellow(beam, &bl) == APR_SUCCESS) { transfer: if (beam->aborted) { + if (!!APR_BRIGADE_EMPTY(beam->green)) { + apr_brigade_cleanup(beam->green); + } status = APR_ECONNABORTED; goto leave; } diff --git a/modules/http2/h2_conn_io.c b/modules/http2/h2_conn_io.c index 2e0a5f2d690..d21ae8b9594 100644 --- a/modules/http2/h2_conn_io.c +++ b/modules/http2/h2_conn_io.c @@ -131,7 +131,7 @@ apr_status_t h2_conn_io_init(h2_conn_io *io, conn_rec *c, apr_pool_t *pool) { io->c = c; - io->output = apr_brigade_create(pool, c->bucket_alloc); + io->output = apr_brigade_create(c->pool, c->bucket_alloc); io->buflen = 0; io->is_tls = h2_h2_is_tls(c); io->buffer_output = io->is_tls; diff --git a/modules/http2/h2_mplx.c b/modules/http2/h2_mplx.c index 5494551bdf4..9298592b567 100644 --- a/modules/http2/h2_mplx.c +++ b/modules/http2/h2_mplx.c @@ -189,6 +189,25 @@ static void check_tx_free(h2_mplx *m) } } +static int purge_stream(void *ctx, void *val) +{ + h2_mplx *m = ctx; + h2_stream *stream = val; + h2_ihash_remove(m->spurge, stream->id); + h2_stream_destroy(stream); + return 0; +} + +static void purge_streams(h2_mplx *m) +{ + if (!h2_ihash_empty(m->spurge)) { + while(!h2_ihash_iter(m->spurge, purge_stream, m)) { + /* repeat until empty */ + } + h2_ihash_clear(m->spurge); + } +} + static void h2_mplx_destroy(h2_mplx *m) { AP_DEBUG_ASSERT(m); @@ -257,6 +276,8 @@ h2_mplx *h2_mplx_create(conn_rec *c, apr_pool_t *parent, m->stream_max_mem = h2_config_geti(conf, H2_CONF_STREAM_MAX_MEM); m->streams = h2_ihash_create(m->pool, offsetof(h2_stream,id)); + m->shold = h2_ihash_create(m->pool, offsetof(h2_stream,id)); + m->spurge = h2_ihash_create(m->pool, offsetof(h2_stream,id)); m->q = h2_iq_create(m->pool, m->max_streams); m->tasks = h2_ihash_create(m->pool, offsetof(h2_task,stream_id)); m->ready_tasks = h2_ihash_create(m->pool, offsetof(h2_task,stream_id)); @@ -294,10 +315,10 @@ apr_uint32_t h2_mplx_shutdown(h2_mplx *m) return max_stream_started; } -static void input_consumed_signal(h2_mplx *m, h2_task *task) +static void input_consumed_signal(h2_mplx *m, h2_stream *stream) { - if (task->input.beam && task->worker_started) { - h2_beam_send(task->input.beam, NULL, 0); /* trigger updates */ + if (stream->input) { + h2_beam_send(stream->input, NULL, 0); /* trigger updates */ } } @@ -310,7 +331,7 @@ static int output_consumed_signal(h2_mplx *m, h2_task *task) } -static void task_destroy(h2_mplx *m, h2_task *task, int events) +static void task_destroy(h2_mplx *m, h2_task *task, int called_from_master) { conn_rec *slave = NULL; int reuse_slave = 0; @@ -323,18 +344,17 @@ static void task_destroy(h2_mplx *m, h2_task *task, int events) "h2_task(%s): shutdown", task->id); } - if (events) { + if (called_from_master) { /* Process outstanding events before destruction */ - input_consumed_signal(m, task); + h2_stream *stream = h2_ihash_get(m->streams, task->stream_id); + if (stream) { + input_consumed_signal(m, stream); + } } /* The pool is cleared/destroyed which also closes all * allocated file handles. Give this count back to our * file handle pool. */ - if (task->input.beam) { - m->tx_handles_reserved += - h2_beam_get_files_beamed(task->input.beam); - } if (task->output.beam) { m->tx_handles_reserved += h2_beam_get_files_beamed(task->output.beam); @@ -366,49 +386,69 @@ static void task_destroy(h2_mplx *m, h2_task *task, int events) static void stream_done(h2_mplx *m, h2_stream *stream, int rst_error) { - h2_task *task; + h2_task *task = h2_ihash_get(m->tasks, stream->id); + /* Situation: we are, on the master connection, done with processing + * the stream. Either we have handled it successfully, or the stream + * was reset by the client or the connection is gone and we are + * shutting down the whole session. + * + * We possibly have created a task for this stream to be processed + * on a slave connection. The processing might actually be ongoing + * right now or has already finished. A finished task waits for its + * stream to be done. This is the common case. + * + * If the stream had input (e.g. the request had a body), a task + * may have read, or is still reading buckets from the input beam. + * This means that the task is referencing memory from the stream's + * pool (or the master connection bucket alloc). Before we can free + * the stream pool, we need to make sure that those references are + * gone. This is what h2_beam_shutdown() on the input waits for. + * + * With the input handled, we can tear down that beam and care + * about the output beam. The stream might still have buffered some + * buckets read from the output, so we need to get rid of those. That + * is done by h2_stream_cleanup(). + * + * Now it is save to destroy the task (if it exists and is finished). + * + * FIXME: we currently destroy the stream, even if the task is still + * ongoing. This is not ok, since task->request is coming from stream + * memory. We should either copy it on task creation or wait with the + * stream destruction until the task is done. + */ h2_ihash_remove(m->streams, stream->id); if (stream->input) { - apr_status_t status; - status = h2_beam_shutdown(stream->input, APR_NONBLOCK_READ); - if (status == APR_EAGAIN) { - ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c, - "h2_stream(%ld-%d): wait on input shutdown", - m->id, stream->id); - status = h2_beam_shutdown(stream->input, APR_BLOCK_READ); - ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, m->c, - "h2_stream(%ld-%d): input shutdown returned", - m->id, stream->id); - } + m->tx_handles_reserved += h2_beam_get_files_beamed(stream->input); } + h2_stream_cleanup(stream); - task = h2_ihash_get(m->tasks, stream->id); if (task) { /* Remove task from ready set, we will never submit it */ h2_ihash_remove(m->ready_tasks, stream->id); + task->input.beam = NULL; - if (task->worker_done) { - /* already finished or not even started yet */ - h2_iq_remove(m->q, task->stream_id); - task_destroy(m, task, 0); - } - else { + if (!task->worker_done) { /* task still running, cleanup once it is done */ - task->orphaned = 1; - task->input.beam = NULL; if (rst_error) { h2_task_rst(task, rst_error); } + /* FIXME: this should work, but does not + h2_ihash_add(m->shold, stream); + return;*/ + } + else { + /* already finished */ + h2_iq_remove(m->q, task->stream_id); + task_destroy(m, task, 0); } } + h2_stream_destroy(stream); } static int stream_done_iter(void *ctx, void *val) { - h2_stream *stream = val; stream_done((h2_mplx*)ctx, val, 0); - h2_stream_destroy(stream); return 0; } @@ -416,6 +456,7 @@ static int task_print(void *ctx, void *val) { h2_mplx *m = ctx; h2_task *task = val; + h2_stream *stream = h2_ihash_get(m->streams, task->stream_id); if (task->request) { ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, /* NO APLOGNO */ "->03198: h2_stream(%s): %s %s %s -> %s %d" @@ -424,7 +465,7 @@ static int task_print(void *ctx, void *val) task->request->authority, task->request->path, task->response? "http" : (task->rst_error? "reset" : "?"), task->response? task->response->http_status : task->rst_error, - task->orphaned, task->worker_started, + (stream? 0 : 1), task->worker_started, task->worker_done); } else if (task) { @@ -493,6 +534,9 @@ apr_status_t h2_mplx_release_and_join(h2_mplx *m, apr_thread_cond_t *wait) apr_thread_cond_broadcast(m->task_thawed); } } + AP_DEBUG_ASSERT(h2_ihash_empty(m->tasks)); + AP_DEBUG_ASSERT(h2_ihash_empty(m->shold)); + purge_streams(m); ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c, APLOGNO(03056) "h2_mplx(%ld): release_join (%d tasks left) -> destroy", @@ -516,24 +560,17 @@ void h2_mplx_abort(h2_mplx *m) } } -apr_status_t h2_mplx_stream_done(h2_mplx *m, int stream_id, int rst_error) +apr_status_t h2_mplx_stream_done(h2_mplx *m, h2_stream *stream) { apr_status_t status = APR_SUCCESS; int acquired; - /* This maybe called from inside callbacks that already hold the lock. - * E.g. when we are streaming out DATA and the EOF triggers the stream - * release. - */ AP_DEBUG_ASSERT(m); if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) { - h2_stream *stream = h2_ihash_get(m->streams, stream_id); - if (stream) { - ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c, - "h2_mplx(%ld-%d): marking stream as done.", - m->id, stream_id); - stream_done(m, stream, rst_error); - } + ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c, + "h2_mplx(%ld-%d): marking stream as done.", + m->id, stream->id); + stream_done(m, stream, stream->rst_error); leave_mutex(m, acquired); } return status; @@ -547,8 +584,7 @@ void h2_mplx_set_consumed_cb(h2_mplx *m, h2_mplx_consumed_cb *cb, void *ctx) static int update_window(void *ctx, void *val) { - h2_mplx *m = ctx; - input_consumed_signal(m, val); + input_consumed_signal(ctx, val); return 1; } @@ -562,7 +598,7 @@ apr_status_t h2_mplx_in_update_windows(h2_mplx *m) return APR_ECONNABORTED; } if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) { - h2_ihash_iter(m->tasks, update_window, m); + h2_ihash_iter(m->streams, update_window, m); ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c, "h2_session(%ld): windows updated", m->id); @@ -580,7 +616,7 @@ static int task_iter_first(void *ctx, void *val) return 0; } -h2_stream *h2_mplx_next_submit(h2_mplx *m, h2_ihash_t *streams) +h2_stream *h2_mplx_next_submit(h2_mplx *m) { apr_status_t status; h2_stream *stream = NULL; @@ -597,7 +633,7 @@ h2_stream *h2_mplx_next_submit(h2_mplx *m, h2_ihash_t *streams) h2_task *task = ctx.task; h2_ihash_remove(m->ready_tasks, task->stream_id); - stream = h2_ihash_get(streams, task->stream_id); + stream = h2_ihash_get(m->streams, task->stream_id); if (stream && task) { task->submitted = 1; if (task->rst_error) { @@ -618,16 +654,14 @@ h2_stream *h2_mplx_next_submit(h2_mplx *m, h2_ihash_t *streams) "h2_mplx(%s): stream for response closed, " "resetting io to close request processing", task->id); - task->orphaned = 1; h2_task_rst(task, H2_ERR_STREAM_CLOSED); if (!task->worker_started || task->worker_done) { task_destroy(m, task, 1); } else { /* hang around until the h2_task is done, but - * shutdown input/output and send out any events asap. */ + * shutdown output */ h2_task_shutdown(task, 0); - input_consumed_signal(m, task); } } } @@ -640,8 +674,9 @@ static apr_status_t out_open(h2_mplx *m, int stream_id, h2_response *response) { apr_status_t status = APR_SUCCESS; h2_task *task = h2_ihash_get(m->tasks, stream_id); + h2_stream *stream = h2_ihash_get(m->streams, stream_id); - if (!task || task->orphaned) { + if (!task || !stream) { return APR_ECONNABORTED; } @@ -691,8 +726,9 @@ apr_status_t h2_mplx_out_open(h2_mplx *m, int stream_id, h2_response *response) static apr_status_t out_close(h2_mplx *m, h2_task *task) { apr_status_t status = APR_SUCCESS; + h2_stream *stream = h2_ihash_get(m->streams, task->stream_id); - if (!task || task->orphaned) { + if (!task || !stream) { return APR_ECONNABORTED; } @@ -885,93 +921,103 @@ h2_task *h2_mplx_pop_task(h2_mplx *m, int *has_more) static void task_done(h2_mplx *m, h2_task *task, h2_req_engine *ngn) { - if (task) { - if (task->frozen) { - /* this task was handed over to an engine for processing - * and the original worker has finished. That means the - * engine may start processing now. */ - h2_task_thaw(task); - /* we do not want the task to block on writing response - * bodies into the mplx. */ - /* FIXME: this implementation is incomplete. */ - h2_task_set_io_blocking(task, 0); - apr_thread_cond_broadcast(m->task_thawed); - return; - } - else { - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c, - "h2_mplx(%ld): task(%s) done", m->id, task->id); - out_close(m, task); - - if (ngn) { - apr_off_t bytes = 0; - if (task->output.beam) { - h2_beam_send(task->output.beam, NULL, APR_NONBLOCK_READ); - bytes += h2_beam_get_buffered(task->output.beam); - } - if (bytes > 0) { - /* we need to report consumed and current buffered output - * to the engine. The request will be streamed out or cancelled, - * no more data is coming from it and the engine should update - * its calculations before we destroy this information. */ - h2_req_engine_out_consumed(ngn, task->c, bytes); - } + if (task->frozen) { + /* this task was handed over to an engine for processing + * and the original worker has finished. That means the + * engine may start processing now. */ + h2_task_thaw(task); + /* we do not want the task to block on writing response + * bodies into the mplx. */ + /* FIXME: this implementation is incomplete. */ + h2_task_set_io_blocking(task, 0); + apr_thread_cond_broadcast(m->task_thawed); + return; + } + else { + h2_stream *stream = h2_ihash_get(m->streams, task->stream_id); + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c, + "h2_mplx(%ld): task(%s) done", m->id, task->id); + out_close(m, task); + + if (ngn) { + apr_off_t bytes = 0; + if (task->output.beam) { + h2_beam_send(task->output.beam, NULL, APR_NONBLOCK_READ); + bytes += h2_beam_get_buffered(task->output.beam); } - - if (task->engine) { - if (!h2_req_engine_is_shutdown(task->engine)) { - ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, - "h2_mplx(%ld): task(%s) has not-shutdown " - "engine(%s)", m->id, task->id, - h2_req_engine_get_id(task->engine)); - } - h2_ngn_shed_done_ngn(m->ngn_shed, task->engine); + if (bytes > 0) { + /* we need to report consumed and current buffered output + * to the engine. The request will be streamed out or cancelled, + * no more data is coming from it and the engine should update + * its calculations before we destroy this information. */ + h2_req_engine_out_consumed(ngn, task->c, bytes); } - - if (!m->aborted && !task->orphaned && m->redo_tasks - && h2_ihash_get(m->redo_tasks, task->stream_id)) { - /* reset and schedule again */ - h2_task_redo(task); - h2_ihash_remove(m->redo_tasks, task->stream_id); - h2_iq_add(m->q, task->stream_id, NULL, NULL); - return; + } + + if (task->engine) { + if (!h2_req_engine_is_shutdown(task->engine)) { + ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, + "h2_mplx(%ld): task(%s) has not-shutdown " + "engine(%s)", m->id, task->id, + h2_req_engine_get_id(task->engine)); } - - task->worker_done = 1; - task->done_at = apr_time_now(); - if (task->output.beam) { - h2_beam_on_consumed(task->output.beam, NULL, NULL); - h2_beam_mutex_set(task->output.beam, NULL, NULL, NULL); + h2_ngn_shed_done_ngn(m->ngn_shed, task->engine); + } + + if (!m->aborted && stream && m->redo_tasks + && h2_ihash_get(m->redo_tasks, task->stream_id)) { + /* reset and schedule again */ + h2_task_redo(task); + h2_ihash_remove(m->redo_tasks, task->stream_id); + h2_iq_add(m->q, task->stream_id, NULL, NULL); + return; + } + + task->worker_done = 1; + task->done_at = apr_time_now(); + if (task->output.beam) { + h2_beam_on_consumed(task->output.beam, NULL, NULL); + h2_beam_mutex_set(task->output.beam, NULL, NULL, NULL); + } + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c, + "h2_mplx(%s): request done, %f ms" + " elapsed", task->id, + (task->done_at - task->started_at) / 1000.0); + if (task->started_at > m->last_idle_block) { + /* this task finished without causing an 'idle block', e.g. + * a block by flow control. + */ + if (task->done_at- m->last_limit_change >= m->limit_change_interval + && m->workers_limit < m->workers_max) { + /* Well behaving stream, allow it more workers */ + m->workers_limit = H2MIN(m->workers_limit * 2, + m->workers_max); + m->last_limit_change = task->done_at; + m->need_registration = 1; + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c, + "h2_mplx(%ld): increase worker limit to %d", + m->id, m->workers_limit); } - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c, - "h2_mplx(%s): request done, %f ms" - " elapsed", task->id, - (task->done_at - task->started_at) / 1000.0); - if (task->started_at > m->last_idle_block) { - /* this task finished without causing an 'idle block', e.g. - * a block by flow control. - */ - if (task->done_at- m->last_limit_change >= m->limit_change_interval - && m->workers_limit < m->workers_max) { - /* Well behaving stream, allow it more workers */ - m->workers_limit = H2MIN(m->workers_limit * 2, - m->workers_max); - m->last_limit_change = task->done_at; - m->need_registration = 1; - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c, - "h2_mplx(%ld): increase worker limit to %d", - m->id, m->workers_limit); - } + } + + if (stream) { + /* hang around until the stream deregisters */ + } + else { + stream = h2_ihash_get(m->shold, task->stream_id); + task_destroy(m, task, 0); + if (stream) { + stream->response = NULL; /* ref from task memory */ + /* We cannot destroy the stream here since this is + * called from a worker thread and freeing memory pools + * is only safe in the only thread using it (and its + * parent pool / allocator) */ + h2_ihash_remove(m->shold, stream->id); + h2_ihash_add(m->spurge, stream); } - if (task->orphaned) { - task_destroy(m, task, 0); - if (m->join_wait) { - apr_thread_cond_signal(m->join_wait); - } - } - else { - /* hang around until the stream deregisters */ + if (m->join_wait) { + apr_thread_cond_signal(m->join_wait); } } } @@ -1177,11 +1223,13 @@ apr_status_t h2_mplx_req_engine_push(const char *ngn_type, task->r = r; if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) { - if (task->orphaned) { - status = APR_ECONNABORTED; + h2_stream *stream = h2_ihash_get(m->streams, task->stream_id); + + if (stream) { + status = h2_ngn_shed_push_task(m->ngn_shed, ngn_type, task, einit); } else { - status = h2_ngn_shed_push_task(m->ngn_shed, ngn_type, task, einit); + status = APR_ECONNABORTED; } leave_mutex(m, acquired); } diff --git a/modules/http2/h2_mplx.h b/modules/http2/h2_mplx.h index a6fe12a3efc..9b316b0b3f8 100644 --- a/modules/http2/h2_mplx.h +++ b/modules/http2/h2_mplx.h @@ -73,6 +73,8 @@ struct h2_mplx { unsigned int need_registration : 1; struct h2_ihash_t *streams; /* all streams currently processing */ + struct h2_ihash_t *shold; /* all streams done with task ongoing */ + struct h2_ihash_t *spurge; /* all streams done, ready for destroy */ struct h2_iqueue *q; /* all stream ids that need to be started */ struct h2_ihash_t *tasks; /* all tasks started and not destroyed */ @@ -167,7 +169,7 @@ apr_uint32_t h2_mplx_shutdown(h2_mplx *m); * @param rst_error if != 0, the stream was reset with the error given * */ -apr_status_t h2_mplx_stream_done(h2_mplx *m, int stream_id, int rst_error); +apr_status_t h2_mplx_stream_done(h2_mplx *m, struct h2_stream *stream); /** * Waits on output data from any stream in this session to become available. @@ -235,8 +237,7 @@ apr_status_t h2_mplx_in_update_windows(h2_mplx *m); * @param m the mplxer to get a response from * @param bb the brigade to place any existing repsonse body data into */ -struct h2_stream *h2_mplx_next_submit(h2_mplx *m, - struct h2_ihash_t *streams); +struct h2_stream *h2_mplx_next_submit(h2_mplx *m); /** * Opens the output for the given stream with the specified response. diff --git a/modules/http2/h2_session.c b/modules/http2/h2_session.c index 1e023133d50..0f8accab928 100644 --- a/modules/http2/h2_session.c +++ b/modules/http2/h2_session.c @@ -128,19 +128,16 @@ h2_stream *h2_session_open_stream(h2_session *session, int stream_id, h2_stream * stream; apr_pool_t *stream_pool; - if (session->spare) { - stream_pool = session->spare; - session->spare = NULL; - } - else { - apr_pool_create(&stream_pool, session->pool); - apr_pool_tag(stream_pool, "h2_stream"); - } + apr_pool_create(&stream_pool, session->pool); + apr_pool_tag(stream_pool, "h2_stream"); stream = h2_stream_open(stream_id, stream_pool, session, initiated_on, req); - + ++session->open_streams; + ++session->unanswered_streams; + nghttp2_session_set_stream_user_data(session->ngh2, stream_id, stream); h2_ihash_add(session->streams, stream); + if (H2_STREAM_CLIENT_INITIATED(stream_id)) { if (stream_id > session->remote.emitted_max) { ++session->remote.emitted_count; @@ -262,6 +259,11 @@ static int on_invalid_frame_recv_cb(nghttp2_session *ngh2, return 0; } +static h2_stream *get_stream(h2_session *session, int stream_id) +{ + return nghttp2_session_get_stream_user_data(session->ngh2, stream_id); +} + static int on_data_chunk_recv_cb(nghttp2_session *ngh2, uint8_t flags, int32_t stream_id, const uint8_t *data, size_t len, void *userp) @@ -277,7 +279,7 @@ static int on_data_chunk_recv_cb(nghttp2_session *ngh2, uint8_t flags, return 0; } - stream = h2_session_get_stream(session, stream_id); + stream = get_stream(session, stream_id); if (!stream) { ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03064) "h2_stream(%ld-%d): on_data_chunk for unknown stream", @@ -342,7 +344,7 @@ static int on_stream_close_cb(nghttp2_session *ngh2, int32_t stream_id, h2_stream *stream; (void)ngh2; - stream = h2_session_get_stream(session, stream_id); + stream = get_stream(session, stream_id); if (stream) { stream_release(session, stream, error_code); } @@ -358,7 +360,7 @@ static int on_begin_headers_cb(nghttp2_session *ngh2, /* We may see HEADERs at the start of a stream or after all DATA * streams to carry trailers. */ (void)ngh2; - s = h2_session_get_stream(session, frame->hd.stream_id); + s = get_stream(session, frame->hd.stream_id); if (s) { /* nop */ } @@ -385,7 +387,7 @@ static int on_header_cb(nghttp2_session *ngh2, const nghttp2_frame *frame, return 0; } - stream = h2_session_get_stream(session, frame->hd.stream_id); + stream = get_stream(session, frame->hd.stream_id); if (!stream) { ap_log_cerror(APLOG_MARK, APLOG_ERR, 0, session->c, APLOGNO(02920) @@ -432,7 +434,7 @@ static int on_frame_recv_cb(nghttp2_session *ng2s, /* This can be HEADERS for a new stream, defining the request, * or HEADER may come after DATA at the end of a stream as in * trailers */ - stream = h2_session_get_stream(session, frame->hd.stream_id); + stream = get_stream(session, frame->hd.stream_id); if (stream) { int eos = (frame->hd.flags & NGHTTP2_FLAG_END_STREAM); @@ -456,7 +458,7 @@ static int on_frame_recv_cb(nghttp2_session *ng2s, } break; case NGHTTP2_DATA: - stream = h2_session_get_stream(session, frame->hd.stream_id); + stream = get_stream(session, frame->hd.stream_id); if (stream) { int eos = (frame->hd.flags & NGHTTP2_FLAG_END_STREAM); ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c, @@ -493,7 +495,7 @@ static int on_frame_recv_cb(nghttp2_session *ng2s, "h2_session(%ld-%d): RST_STREAM by client, errror=%d", session->id, (int)frame->hd.stream_id, (int)frame->rst_stream.error_code); - stream = h2_session_get_stream(session, frame->hd.stream_id); + stream = get_stream(session, frame->hd.stream_id); if (stream && stream->request && stream->request->initiated_on) { ++session->pushes_reset; } @@ -567,7 +569,7 @@ static int on_send_data_cb(nghttp2_session *ngh2, } padlen = (unsigned char)frame->data.padlen; - stream = h2_session_get_stream(session, stream_id); + stream = get_stream(session, stream_id); if (!stream) { ap_log_cerror(APLOG_MARK, APLOG_ERR, APR_NOTFOUND, session->c, APLOGNO(02924) @@ -699,10 +701,6 @@ static void h2_session_cleanup(h2_session *session) nghttp2_session_del(session->ngh2); session->ngh2 = NULL; } - if (session->spare) { - apr_pool_destroy(session->spare); - session->spare = NULL; - } } static void h2_session_destroy(h2_session *session) @@ -710,8 +708,12 @@ static void h2_session_destroy(h2_session *session) AP_DEBUG_ASSERT(session); h2_session_cleanup(session); + AP_DEBUG_ASSERT(session->open_streams == h2_ihash_count(session->streams)); h2_ihash_clear(session->streams); + session->open_streams = 0; + ap_remove_input_filter_byhandle((session->r? session->r->input_filters : + session->c->input_filters), "H2_IN"); if (APLOGctrace1(session->c)) { ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c, "h2_session(%ld): destroy", session->id); @@ -1138,10 +1140,8 @@ static int resume_on_data(void *ctx, void *val) static int h2_session_resume_streams_with_data(h2_session *session) { AP_DEBUG_ASSERT(session); - if (!h2_ihash_empty(session->streams) - && session->mplx && !session->mplx->aborted) { + if (session->open_streams && !session->mplx->aborted) { resume_ctx ctx; - ctx.session = session; ctx.resume_count = 0; @@ -1153,11 +1153,6 @@ static int h2_session_resume_streams_with_data(h2_session *session) return 0; } -h2_stream *h2_session_get_stream(h2_session *session, int stream_id) -{ - return h2_ihash_get(session->streams, stream_id); -} - static ssize_t stream_data_cb(nghttp2_session *ng2s, int32_t stream_id, uint8_t *buf, @@ -1183,7 +1178,7 @@ static ssize_t stream_data_cb(nghttp2_session *ng2s, (void)ng2s; (void)buf; (void)source; - stream = h2_session_get_stream(session, stream_id); + stream = get_stream(session, stream_id); if (!stream) { ap_log_cerror(APLOG_MARK, APLOG_ERR, 0, session->c, APLOGNO(02937) @@ -1334,7 +1329,7 @@ static apr_status_t submit_response(h2_session *session, h2_stream *stream) stream->id, err); } - stream->submitted = 1; + --session->unanswered_streams; if (stream->request && stream->request->initiated_on) { ++session->pushes_submitted; } @@ -1384,7 +1379,6 @@ struct h2_stream *h2_session_push(h2_session *session, h2_stream *is, ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, session->c, "h2_stream(%ld-%d): scheduling push stream", session->id, stream->id); - h2_stream_cleanup(stream); stream = NULL; } ++session->unsent_promises; @@ -1509,29 +1503,14 @@ apr_status_t h2_session_set_prio(h2_session *session, h2_stream *stream, apr_status_t h2_session_stream_done(h2_session *session, h2_stream *stream) { - apr_pool_t *pool = h2_stream_detach_pool(stream); - int stream_id = stream->id; - int rst_error = stream->rst_error; - ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c, "h2_stream(%ld-%d): cleanup by EOS bucket destroy", - session->id, stream_id); - if (session->streams) { - h2_ihash_remove(session->streams, stream_id); - } + session->id, stream->id); + h2_ihash_remove(session->streams, stream->id); + --session->open_streams; + --session->unanswered_streams; + h2_mplx_stream_done(session->mplx, stream); - h2_stream_cleanup(stream); - h2_mplx_stream_done(session->mplx, stream_id, rst_error); - h2_stream_destroy(stream); - - if (pool) { - apr_pool_clear(pool); - if (session->spare) { - apr_pool_destroy(session->spare); - } - session->spare = pool; - } - return APR_SUCCESS; } @@ -1708,7 +1687,7 @@ static apr_status_t h2_session_submit(h2_session *session) if (has_unsubmitted_streams(session)) { /* If we have responses ready, submit them now. */ - while ((stream = h2_mplx_next_submit(session->mplx, session->streams))) { + while ((stream = h2_mplx_next_submit(session->mplx))) { status = submit_response(session, stream); ++session->unsent_submits; @@ -1770,7 +1749,7 @@ static void update_child_status(h2_session *session, int status, const char *msg apr_snprintf(session->status, sizeof(session->status), "%s, streams: %d/%d/%d/%d/%d (open/recv/resp/push/rst)", msg? msg : "-", - (int)h2_ihash_count(session->streams), + (int)session->open_streams, (int)session->remote.emitted_count, (int)session->responses_submitted, (int)session->pushes_submitted, @@ -1788,7 +1767,7 @@ static void transit(h2_session *session, const char *action, h2_session_state ns session->state = nstate; switch (session->state) { case H2_SESSION_ST_IDLE: - update_child_status(session, (h2_ihash_empty(session->streams)? + update_child_status(session, (session->open_streams == 0? SERVER_BUSY_KEEPALIVE : SERVER_BUSY_READ), "idle"); break; @@ -1920,7 +1899,7 @@ static void h2_session_ev_no_io(h2_session *session, int arg, const char *msg) if (h2_conn_io_flush(&session->io) != APR_SUCCESS) { dispatch_event(session, H2_SESSION_EV_CONN_ERROR, 0, NULL); } - if (h2_ihash_empty(session->streams)) { + if (!session->open_streams) { if (!is_accepting_streams(session)) { /* We are no longer accepting new streams and have * finished processing existing ones. Time to leave. */ @@ -2125,9 +2104,8 @@ apr_status_t h2_session_process(h2_session *session, int async) break; case H2_SESSION_ST_IDLE: - /* make certain, the client receives everything before we idle */ - if (!session->keep_sync_until - && async && h2_ihash_empty(session->streams) + /* make certain, we send everything before we idle */ + if (!session->keep_sync_until && async && !session->open_streams && !session->r && session->remote.emitted_count) { ap_log_cerror( APLOG_MARK, APLOG_TRACE1, status, c, "h2_session(%ld): async idle, nonblock read", session->id); @@ -2225,8 +2203,8 @@ apr_status_t h2_session_process(h2_session *session, int async) } } - if (!h2_ihash_empty(session->streams)) { - /* resume any streams for which data is available again */ + if (session->open_streams) { + /* resume any streams with output data */ h2_session_resume_streams_with_data(session); /* Submit any responses/push_promises that are ready */ status = h2_session_submit(session); diff --git a/modules/http2/h2_session.h b/modules/http2/h2_session.h index bf4ded338a8..32202dc3030 100644 --- a/modules/http2/h2_session.h +++ b/modules/http2/h2_session.h @@ -100,6 +100,8 @@ typedef struct h2_session { struct h2_push_diary *push_diary; /* remember pushes, avoid duplicates */ + int open_streams; /* number of streams open */ + int unanswered_streams; /* number of streams waiting for response */ int unsent_submits; /* number of submitted, but not yet written responses. */ int unsent_promises; /* number of submitted, but not yet written push promised */ @@ -122,8 +124,6 @@ typedef struct h2_session { apr_bucket_brigade *bbtmp; /* brigade for keeping temporary data */ struct apr_thread_cond_t *iowait; /* our cond when trywaiting for data */ - apr_pool_t *spare; /* spare stream pool */ - char status[64]; /* status message for scoreboard */ int last_status_code; /* the one already reported */ const char *last_status_msg; /* the one already reported */ @@ -190,9 +190,6 @@ void h2_session_close(h2_session *session); apr_status_t h2_session_handle_response(h2_session *session, struct h2_stream *stream); -/* Get the h2_stream for the given stream idenrtifier. */ -struct h2_stream *h2_session_get_stream(h2_session *session, int stream_id); - /** * Create and register a new stream under the given id. * diff --git a/modules/http2/h2_stream.c b/modules/http2/h2_stream.c index 8853e6cad44..20d1d350425 100644 --- a/modules/http2/h2_stream.c +++ b/modules/http2/h2_stream.c @@ -182,19 +182,31 @@ h2_stream *h2_stream_open(int id, apr_pool_t *pool, h2_session *session, void h2_stream_cleanup(h2_stream *stream) { AP_DEBUG_ASSERT(stream); - if (stream->input) { - h2_beam_destroy(stream->input); - stream->input = NULL; - } if (stream->buffer) { apr_brigade_cleanup(stream->buffer); } + if (stream->input) { + apr_status_t status; + status = h2_beam_shutdown(stream->input, APR_NONBLOCK_READ); + if (status == APR_EAGAIN) { + ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, stream->session->c, + "h2_stream(%ld-%d): wait on input shutdown", + stream->session->id, stream->id); + status = h2_beam_shutdown(stream->input, APR_BLOCK_READ); + ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, stream->session->c, + "h2_stream(%ld-%d): input shutdown returned", + stream->session->id, stream->id); + } + } } void h2_stream_destroy(h2_stream *stream) { AP_DEBUG_ASSERT(stream); - h2_stream_cleanup(stream); + if (stream->input) { + h2_beam_destroy(stream->input); + stream->input = NULL; + } if (stream->pool) { apr_pool_destroy(stream->pool); } diff --git a/modules/http2/h2_task.h b/modules/http2/h2_task.h index 58b64b0a1c6..454bc376fe1 100644 --- a/modules/http2/h2_task.h +++ b/modules/http2/h2_task.h @@ -83,7 +83,6 @@ struct h2_task { unsigned int frozen : 1; unsigned int blocking : 1; unsigned int detached : 1; - unsigned int orphaned : 1; /* h2_stream is gone for this task */ unsigned int submitted : 1; /* response has been submitted to client */ unsigned int worker_started : 1; /* h2_worker started processing for this io */ unsigned int worker_done : 1; /* h2_worker finished for this io */