From: Stefan Eissing
Date: Wed, 20 Apr 2016 15:17:38 +0000 (+0000)
Subject: mod_http2: elimination of h2_io intermediate stream instances
X-Git-Tag: 2.5.0-alpha~1722
X-Git-Url: http://git.ipfire.org/?a=commitdiff_plain;h=afd1183937a5ceb7d3b6420a6621f1f3356a4594;p=thirdparty%2Fapache%2Fhttpd.git

mod_http2: elimination of h2_io intermediate stream instances

git-svn-id: https://svn.apache.org/repos/asf/httpd/httpd/trunk@1740155 13f79535-47bb-0310-9956-ffa450edef68
---
diff --git a/CHANGES b/CHANGES
index 2794db8f670..908ed22ad4d 100644
--- a/CHANGES
+++ b/CHANGES
@@ -1,8 +1,11 @@
                                                          -*- coding: utf-8 -*-
 Changes with Apache 2.5.0
 
+  *) mod_http2: eliminating h2_io instances for streams, reducing memory
+     pools and footprint. [Stefan Eissing]
+
   *) core: explicitly exclude 'h2' from protocols announced via an Upgrade:
-     header as commanded by http-wg.
+     header as commanded by http-wg. [Stefan Eissing]
 
   *) mod_http2: new "bucket beam" technology to transport buckets across
      threads without buffer copy. Delaying response start until flush or
diff --git a/CMakeLists.txt b/CMakeLists.txt
index b33f11caaba..df19df4c97b 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -403,7 +403,7 @@ SET(mod_http2_extra_sources
   modules/http2/h2_conn.c modules/http2/h2_conn_io.c
   modules/http2/h2_ctx.c modules/http2/h2_filter.c
   modules/http2/h2_from_h1.c modules/http2/h2_h2.c
-  modules/http2/h2_io.c modules/http2/h2_bucket_beam.c
+  modules/http2/h2_bucket_beam.c
   modules/http2/h2_mplx.c modules/http2/h2_push.c
   modules/http2/h2_request.c modules/http2/h2_response.c
   modules/http2/h2_session.c modules/http2/h2_stream.c
diff --git a/modules/http2/NWGNUmod_http2 b/modules/http2/NWGNUmod_http2
index dd855d172af..c5ab259b69d 100644
--- a/modules/http2/NWGNUmod_http2
+++ b/modules/http2/NWGNUmod_http2
@@ -195,7 +195,6 @@ FILES_nlm_objs = \
 	$(OBJDIR)/h2_filter.o \
 	$(OBJDIR)/h2_from_h1.o \
 	$(OBJDIR)/h2_h2.o \
-	$(OBJDIR)/h2_io.o \
 	$(OBJDIR)/h2_mplx.o \
 	$(OBJDIR)/h2_ngn_shed.o \
 	$(OBJDIR)/h2_push.o \
diff --git a/modules/http2/config2.m4 b/modules/http2/config2.m4
index b4ba6da98ac..ac01e2a20ff 100644
--- a/modules/http2/config2.m4
+++ b/modules/http2/config2.m4
@@ -30,7 +30,6 @@ h2_ctx.lo dnl
 h2_filter.lo dnl
 h2_from_h1.lo dnl
 h2_h2.lo dnl
-h2_io.lo dnl
 h2_mplx.lo dnl
 h2_ngn_shed.lo dnl
 h2_push.lo dnl
diff --git a/modules/http2/h2.h b/modules/http2/h2.h
index acb79cd2e23..9075b00a793 100644
--- a/modules/http2/h2.h
+++ b/modules/http2/h2.h
@@ -149,6 +149,9 @@ struct h2_response {
     const char *sos_filter;
 };
 
+typedef apr_status_t h2_io_data_cb(void *ctx, const char *data, apr_off_t len);
+
+typedef int h2_stream_pri_cmp(int stream_id1, int stream_id2, void *ctx);
 
 /* Note key to attach connection task id to conn_rec/request_rec instances */
 
diff --git a/modules/http2/h2_bucket_eoc.c b/modules/http2/h2_bucket_eoc.c
index 3ddb54d68a9..33144ef50b7 100644
--- a/modules/http2/h2_bucket_eoc.c
+++ b/modules/http2/h2_bucket_eoc.c
@@ -23,6 +23,7 @@
 #include 
 
 #include "h2_private.h"
+#include "h2.h"
 #include "h2_mplx.h"
 #include "h2_session.h"
 #include "h2_bucket_eoc.h"
diff --git a/modules/http2/h2_bucket_eos.c b/modules/http2/h2_bucket_eos.c
index 3a5b1a570a6..9953ca15f94 100644
--- a/modules/http2/h2_bucket_eos.c
+++ b/modules/http2/h2_bucket_eos.c
@@ -23,6 +23,7 @@
 #include 
 
 #include "h2_private.h"
+#include "h2.h"
 #include "h2_mplx.h"
 #include "h2_stream.h"
 #include "h2_bucket_eos.h"
diff --git a/modules/http2/h2_conn.c b/modules/http2/h2_conn.c
index 3b28c1f925a..4ddf1b7029e 100644
--- a/modules/http2/h2_conn.c
+++ b/modules/http2/h2_conn.c
@@ -26,6 +26,7 @@ #include #include "h2_private.h" +#include "h2.h" #include "h2_config.h" #include "h2_ctx.h" #include "h2_filter.h" diff --git a/modules/http2/h2_filter.c b/modules/http2/h2_filter.c index 30ccb2c43ed..33189de0164 100644 --- a/modules/http2/h2_filter.c +++ b/modules/http2/h2_filter.c @@ -22,6 +22,7 @@ #include #include "h2_private.h" +#include "h2.h" #include "h2_conn_io.h" #include "h2_ctx.h" #include "h2_mplx.h" diff --git a/modules/http2/h2_io.c b/modules/http2/h2_io.c deleted file mode 100644 index 395c34e5021..00000000000 --- a/modules/http2/h2_io.c +++ /dev/null @@ -1,108 +0,0 @@ -/* Copyright 2015 greenbytes GmbH (https://www.greenbytes.de) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include - -#include -#include -#include - -#include -#include -#include -#include -#include - -#include "h2_private.h" -#include "h2_bucket_beam.h" -#include "h2_h2.h" -#include "h2_io.h" -#include "h2_mplx.h" -#include "h2_response.h" -#include "h2_request.h" -#include "h2_task.h" -#include "h2_util.h" - -h2_io *h2_io_create(int id, apr_pool_t *pool, const h2_request *request) -{ - h2_io *io = apr_pcalloc(pool, sizeof(*io)); - if (io) { - io->id = id; - io->pool = pool; - io->request = request; - if (request->body) { - h2_beam_create(&io->beam_in, pool, id, "input", 0); - } - } - return io; -} - -void h2_io_redo(h2_io *io) -{ - io->worker_started = 0; - io->response = NULL; - io->rst_error = 0; - io->started_at = io->done_at = 0; -} - -int h2_io_can_redo(h2_io *io) { - if (io->submitted - || (io->beam_in && h2_beam_was_received(io->beam_in)) - || !io->request) { - /* cannot repeat that. */ - return 0; - } - return (!strcmp("GET", io->request->method) - || !strcmp("HEAD", io->request->method) - || !strcmp("OPTIONS", io->request->method)); -} - -void h2_io_set_response(h2_io *io, h2_response *response, - h2_bucket_beam *output) -{ - AP_DEBUG_ASSERT(response); - AP_DEBUG_ASSERT(!io->response); - /* we used to clone the response into out own pool. But - * we have much tighter control over the EOR bucket nowadays, - * so just use the instance given */ - io->response = response; - if (output) { - io->beam_out = output; - } - if (response->rst_error) { - h2_io_rst(io, response->rst_error); - } -} - -void h2_io_rst(h2_io *io, int error) -{ - io->rst_error = error; - if (io->beam_in) { - h2_beam_abort(io->beam_in); - } - if (io->beam_out) { - h2_beam_abort(io->beam_out); - } -} - -void h2_io_shutdown(h2_io *io) -{ - if (io->beam_in) { - h2_beam_shutdown(io->beam_in); - } - if (io->beam_out) { - h2_beam_shutdown(io->beam_out); - } -} diff --git a/modules/http2/h2_io.h b/modules/http2/h2_io.h deleted file mode 100644 index bfd130eabab..00000000000 --- a/modules/http2/h2_io.h +++ /dev/null @@ -1,88 +0,0 @@ -/* Copyright 2015 greenbytes GmbH (https://www.greenbytes.de) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. 
- * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#ifndef __mod_h2__h2_io__ -#define __mod_h2__h2_io__ - -struct h2_bucket_beam; -struct h2_response; -struct apr_thread_cond_t; -struct h2_mplx; -struct h2_request; -struct h2_task; - - -typedef apr_status_t h2_io_data_cb(void *ctx, const char *data, apr_off_t len); - -typedef int h2_stream_pri_cmp(int stream_id1, int stream_id2, void *ctx); - -typedef enum { - H2_IO_READ, - H2_IO_WRITE, - H2_IO_ANY, -} h2_io_op; - -typedef struct h2_io h2_io; - -struct h2_io { - int id; /* stream identifier */ - apr_pool_t *pool; /* io pool */ - - const struct h2_request *request;/* request to process */ - struct h2_response *response; /* response to submit */ - - struct h2_bucket_beam *beam_in; /* request body buckets */ - struct h2_bucket_beam *beam_out; /* response body buckets */ - - struct h2_task *task; /* the task once started */ - apr_time_t started_at; /* when processing started */ - apr_time_t done_at; /* when processing was done */ - - int rst_error; /* h2 related stream abort error */ - unsigned int orphaned : 1; /* h2_stream is gone for this io */ - unsigned int submitted : 1; /* response has been submitted to client */ - unsigned int worker_started : 1; /* h2_worker started processing for this io */ - unsigned int worker_done : 1; /* h2_worker finished for this io */ -}; - -/******************************************************************************* - * Object lifecycle and information. - ******************************************************************************/ - -/** - * Creates a new h2_io for the given stream id. - */ -h2_io *h2_io_create(int id, apr_pool_t *pool, const struct h2_request *request); - -/** - * Set the response of this stream. - */ -void h2_io_set_response(h2_io *io, struct h2_response *response, - struct h2_bucket_beam *output); - -/** - * Reset the stream with the given error code. - */ -void h2_io_rst(h2_io *io, int error); - -int h2_io_can_redo(h2_io *io); -void h2_io_redo(h2_io *io); - -/** - * Shuts all input/output down. Clears any buckets buffered and closes. 
- */ -void h2_io_shutdown(h2_io *io); - -#endif /* defined(__mod_h2__h2_io__) */ diff --git a/modules/http2/h2_mplx.c b/modules/http2/h2_mplx.c index d75188a78e0..3f0398a051d 100644 --- a/modules/http2/h2_mplx.c +++ b/modules/http2/h2_mplx.c @@ -34,7 +34,6 @@ #include "h2_conn.h" #include "h2_ctx.h" #include "h2_h2.h" -#include "h2_io.h" #include "h2_response.h" #include "h2_mplx.h" #include "h2_ngn_shed.h" @@ -64,6 +63,13 @@ static void h2_beam_log(h2_bucket_beam *beam, int id, const char *msg, } } +/* utility for iterating over ihash task sets */ +typedef struct { + h2_mplx *m; + h2_task *task; + apr_time_t now; +} task_iter_ctx; + /* NULL or the mutex hold by this thread, used for recursive calls */ static apr_threadkey_t *thread_lock; @@ -123,9 +129,9 @@ static void io_mutex_leave(void *ctx, apr_thread_mutex_t *lock, int acquired) static void stream_output_consumed(void *ctx, h2_bucket_beam *beam, apr_off_t length) { - h2_io *io = ctx; - if (length > 0 && io->task && io->task->assigned) { - h2_req_engine_out_consumed(io->task->assigned, io->task->c, length); + h2_task *task = ctx; + if (length > 0 && task && task->assigned) { + h2_req_engine_out_consumed(task->assigned, task->c, length); } } @@ -160,7 +166,7 @@ static void check_tx_reservation(h2_mplx *m) { if (m->tx_handles_reserved <= 0) { m->tx_handles_reserved += h2_workers_tx_reserve(m->workers, - H2MIN(m->tx_chunk_size, h2_ilist_count(m->stream_ios))); + H2MIN(m->tx_chunk_size, h2_ihash_count(m->tasks))); } } @@ -171,8 +177,7 @@ static void check_tx_free(h2_mplx *m) m->tx_handles_reserved = m->tx_chunk_size; h2_workers_tx_free(m->workers, count); } - else if (m->tx_handles_reserved - && (!m->stream_ios || h2_ilist_empty(m->stream_ios))) { + else if (m->tx_handles_reserved && h2_ihash_empty(m->tasks)) { h2_workers_tx_free(m->workers, m->tx_handles_reserved); m->tx_handles_reserved = 0; } @@ -182,8 +187,8 @@ static void h2_mplx_destroy(h2_mplx *m) { AP_DEBUG_ASSERT(m); ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c, - "h2_mplx(%ld): destroy, ios=%d", - m->id, (int)h2_ilist_count(m->stream_ios)); + "h2_mplx(%ld): destroy, tasks=%d", + m->id, (int)h2_ihash_count(m->tasks)); check_tx_free(m); if (m->pool) { apr_pool_destroy(m->pool); @@ -244,9 +249,12 @@ h2_mplx *h2_mplx_create(conn_rec *c, apr_pool_t *parent, m->bucket_alloc = apr_bucket_alloc_create(m->pool); m->max_streams = h2_config_geti(conf, H2_CONF_MAX_STREAMS); m->stream_max_mem = h2_config_geti(conf, H2_CONF_STREAM_MAX_MEM); + + m->streams = h2_ihash_create(m->pool, offsetof(h2_stream,id)); m->q = h2_iq_create(m->pool, m->max_streams); - m->stream_ios = h2_ilist_create(m->pool); - m->ready_ios = h2_ilist_create(m->pool); + m->tasks = h2_ihash_create(m->pool, offsetof(h2_task,stream_id)); + m->ready_tasks = h2_ihash_create(m->pool, offsetof(h2_task,stream_id)); + m->stream_timeout = stream_timeout; m->workers = workers; m->workers_max = workers->max_workers; @@ -280,65 +288,56 @@ apr_uint32_t h2_mplx_shutdown(h2_mplx *m) return max_stream_started; } -static void io_in_consumed_signal(h2_mplx *m, h2_io *io) +static void input_consumed_signal(h2_mplx *m, h2_task *task) { - if (io->beam_in && io->worker_started) { - h2_beam_send(io->beam_in, NULL, 0); /* trigger updates */ + if (task->input.beam && task->worker_started) { + h2_beam_send(task->input.beam, NULL, 0); /* trigger updates */ } } -static int io_out_consumed_signal(h2_mplx *m, h2_io *io) +static int output_consumed_signal(h2_mplx *m, h2_task *task) { - if (io->beam_out && io->worker_started && io->task && 
io->task->assigned) { - h2_beam_send(io->beam_out, NULL, 0); /* trigger updates */ + if (task->output.beam && task->worker_started && task->assigned) { + h2_beam_send(task->output.beam, NULL, 0); /* trigger updates */ } return 0; } -static void io_destroy(h2_mplx *m, h2_io *io, int events) +static void task_destroy(h2_mplx *m, h2_task *task, int events) { conn_rec *slave = NULL; - int reuse_slave; + int reuse_slave = 0; /* cleanup any buffered input */ - h2_io_shutdown(io); + h2_task_shutdown(task); if (events) { /* Process outstanding events before destruction */ - io_in_consumed_signal(m, io); + input_consumed_signal(m, task); } /* The pool is cleared/destroyed which also closes all * allocated file handles. Give this count back to our * file handle pool. */ - if (io->beam_in) { - m->tx_handles_reserved += h2_beam_get_files_beamed(io->beam_in); + if (task->input.beam) { + m->tx_handles_reserved += + h2_beam_get_files_beamed(task->input.beam); } - if (io->beam_out) { - m->tx_handles_reserved += h2_beam_get_files_beamed(io->beam_out); + if (task->output.beam) { + m->tx_handles_reserved += + h2_beam_get_files_beamed(task->output.beam); } - - h2_ilist_remove(m->stream_ios, io->id); - h2_ilist_remove(m->ready_ios, io->id); - if (m->redo_ios) { - h2_ilist_remove(m->redo_ios, io->id); - } - + + slave = task->c; reuse_slave = ((m->spare_slaves->nelts < m->spare_slaves->nalloc) - && !io->rst_error); - if (io->task) { - slave = io->task->c; - h2_task_destroy(io->task); - io->task = NULL; - } - - if (io->pool) { - if (m->spare_io_pool) { - apr_pool_destroy(m->spare_io_pool); - } - apr_pool_clear(io->pool); - m->spare_io_pool = io->pool; + && !task->rst_error); + + h2_ihash_remove(m->tasks, task->stream_id); + h2_ihash_remove(m->ready_tasks, task->stream_id); + if (m->redo_tasks) { + h2_ihash_remove(m->redo_tasks, task->stream_id); } + h2_task_destroy(task); if (slave) { if (reuse_slave && slave->keepalive == AP_CONN_KEEPALIVE) { @@ -353,21 +352,26 @@ static void io_destroy(h2_mplx *m, h2_io *io, int events) check_tx_free(m); } -static int io_stream_done(h2_mplx *m, h2_io *io, int rst_error) +static int task_stream_done(h2_mplx *m, h2_task *task, int rst_error) { /* Remove io from ready set, we will never submit it */ - h2_ilist_remove(m->ready_ios, io->id); - if (!io->worker_started || io->worker_done) { + h2_ihash_remove(m->ready_tasks, task->stream_id); + if (task->worker_done) { /* already finished or not even started yet */ - h2_iq_remove(m->q, io->id); - io_destroy(m, io, 1); + h2_iq_remove(m->q, task->stream_id); + task_destroy(m, task, 1); return 0; } else { /* cleanup once task is done */ - io->orphaned = 1; + task->orphaned = 1; + if (task->input.beam) { + /* TODO: this is currently allocated by the stream and will disappear */ + h2_beam_shutdown(task->input.beam); + task->input.beam = NULL; + } if (rst_error) { - h2_io_rst(io, rst_error); + h2_task_rst(task, rst_error); } return 1; } @@ -375,31 +379,27 @@ static int io_stream_done(h2_mplx *m, h2_io *io, int rst_error) static int stream_done_iter(void *ctx, void *val) { - return io_stream_done((h2_mplx*)ctx, val, 0); + return task_stream_done((h2_mplx*)ctx, val, 0); } -static int stream_print(void *ctx, void *val) +static int task_print(void *ctx, void *val) { h2_mplx *m = ctx; - h2_io *io = val; - if (io && io->request) { + h2_task *task = val; + if (task->request) { ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, /* NO APLOGNO */ - "->03198: h2_stream(%ld-%d): %s %s %s -> %s %d" + "->03198: h2_stream(%s): %s %s %s -> %s %d" 
"[orph=%d/started=%d/done=%d]", - m->id, io->id, - io->request->method, io->request->authority, io->request->path, - io->response? "http" : (io->rst_error? "reset" : "?"), - io->response? io->response->http_status : io->rst_error, - io->orphaned, io->worker_started, io->worker_done); + task->id, task->request->method, + task->request->authority, task->request->path, + task->response? "http" : (task->rst_error? "reset" : "?"), + task->response? task->response->http_status : task->rst_error, + task->orphaned, task->worker_started, + task->worker_done); } - else if (io) { + else if (task) { ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, /* NO APLOGNO */ - "->03198: h2_stream(%ld-%d): NULL -> %s %d" - "[orph=%d/started=%d/done=%d]", - m->id, io->id, - io->response? "http" : (io->rst_error? "reset" : "?"), - io->response? io->response->http_status : io->rst_error, - io->orphaned, io->worker_started, io->worker_done); + "->03198: h2_stream(%ld-%d): NULL", m->id, task->stream_id); } else { ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, /* NO APLOGNO */ @@ -423,7 +423,7 @@ apr_status_t h2_mplx_release_and_join(h2_mplx *m, apr_thread_cond_t *wait) h2_iq_clear(m->q); apr_thread_cond_broadcast(m->task_thawed); - while (!h2_ilist_iter(m->stream_ios, stream_done_iter, m)) { + while (!h2_ihash_iter(m->tasks, stream_done_iter, m)) { /* iterate until all ios have been orphaned or destroyed */ } @@ -437,12 +437,12 @@ apr_status_t h2_mplx_release_and_join(h2_mplx *m, apr_thread_cond_t *wait) for (i = 0; m->workers_busy > 0; ++i) { m->join_wait = wait; ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c, - "h2_mplx(%ld): release_join, waiting on %d worker to report back", - m->id, (int)h2_ilist_count(m->stream_ios)); + "h2_mplx(%ld): release_join, waiting on %d tasks to report back", + m->id, (int)h2_ihash_count(m->tasks)); status = apr_thread_cond_timedwait(wait, m->lock, apr_time_from_sec(wait_secs)); - while (!h2_ilist_iter(m->stream_ios, stream_done_iter, m)) { + while (!h2_ihash_iter(m->tasks, stream_done_iter, m)) { /* iterate until all ios have been orphaned or destroyed */ } if (APR_STATUS_IS_TIMEUP(status)) { @@ -454,11 +454,11 @@ apr_status_t h2_mplx_release_and_join(h2_mplx *m, apr_thread_cond_t *wait) */ ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, APLOGNO(03198) "h2_mplx(%ld): release, waiting for %d seconds now for " - "%d h2_workers to return, have still %d requests outstanding", + "%d h2_workers to return, have still %d tasks outstanding", m->id, i*wait_secs, m->workers_busy, - (int)h2_ilist_count(m->stream_ios)); + (int)h2_ihash_count(m->tasks)); if (i == 1) { - h2_ilist_iter(m->stream_ios, stream_print, m); + h2_ihash_iter(m->tasks, task_print, m); } } h2_mplx_abort(m); @@ -466,10 +466,10 @@ apr_status_t h2_mplx_release_and_join(h2_mplx *m, apr_thread_cond_t *wait) } } - if (!h2_ilist_empty(m->stream_ios)) { + if (!h2_ihash_empty(m->tasks)) { ap_log_cerror(APLOG_MARK, APLOG_INFO, 0, m->c, - "h2_mplx(%ld): release_join, %d streams still open", - m->id, (int)h2_ilist_count(m->stream_ios)); + "h2_mplx(%ld): release_join, %d tasks still open", + m->id, (int)h2_ihash_count(m->tasks)); } ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c, APLOGNO(03056) "h2_mplx(%ld): release_join -> destroy", m->id); @@ -503,16 +503,17 @@ apr_status_t h2_mplx_stream_done(h2_mplx *m, int stream_id, int rst_error) */ AP_DEBUG_ASSERT(m); if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) { - h2_io *io = h2_ilist_get(m->stream_ios, stream_id); + h2_task *task = h2_ihash_get(m->tasks, stream_id); + 
h2_ihash_remove(m->streams, stream_id); /* there should be an h2_io, once the stream has been scheduled * for processing, e.g. when we received all HEADERs. But when * a stream is cancelled very early, it will not exist. */ - if (io) { + if (task) { ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c, - "h2_mplx(%ld-%d): marking stream as done.", + "h2_mplx(%ld-%d): marking stream task as done.", m->id, stream_id); - io_stream_done(m, io, rst_error); + task_stream_done(m, task, rst_error); } leave_mutex(m, acquired); } @@ -528,7 +529,7 @@ void h2_mplx_set_consumed_cb(h2_mplx *m, h2_mplx_consumed_cb *cb, void *ctx) static int update_window(void *ctx, void *val) { h2_mplx *m = ctx; - io_in_consumed_signal(m, val); + input_consumed_signal(m, val); return 1; } @@ -542,7 +543,7 @@ apr_status_t h2_mplx_in_update_windows(h2_mplx *m) return APR_ECONNABORTED; } if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) { - h2_ilist_iter(m->stream_ios, update_window, m); + h2_ihash_iter(m->tasks, update_window, m); ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c, "h2_session(%ld): windows updated", m->id); @@ -552,6 +553,14 @@ apr_status_t h2_mplx_in_update_windows(h2_mplx *m) return status; } +static int task_iter_first(void *ctx, void *val) +{ + task_iter_ctx *tctx = ctx; + h2_task *task = val; + tctx->task = task; + return 0; +} + h2_stream *h2_mplx_next_submit(h2_mplx *m, h2_ihash_t *streams) { apr_status_t status; @@ -560,38 +569,46 @@ h2_stream *h2_mplx_next_submit(h2_mplx *m, h2_ihash_t *streams) AP_DEBUG_ASSERT(m); if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) { - h2_io *io = h2_ilist_shift(m->ready_ios); - if (io && !m->aborted) { - stream = h2_ihash_get(streams, io->id); - if (stream) { - io->submitted = 1; - if (io->rst_error) { - h2_stream_rst(stream, io->rst_error); + task_iter_ctx ctx; + ctx.m = m; + ctx.task = NULL; + h2_ihash_iter(m->ready_tasks, task_iter_first, &ctx); + + if (ctx.task && !m->aborted) { + h2_task *task = ctx.task; + + h2_ihash_remove(m->ready_tasks, task->stream_id); + stream = h2_ihash_get(streams, task->stream_id); + if (stream && task) { + task->submitted = 1; + if (task->rst_error) { + h2_stream_rst(stream, task->rst_error); } else { - AP_DEBUG_ASSERT(io->response); - h2_stream_set_response(stream, io->response, io->beam_out); + AP_DEBUG_ASSERT(task->response); + h2_stream_set_response(stream, task->response, + task->output.beam); } } - else { + else if (task) { /* We have the io ready, but the stream has gone away, maybe * reset by the client. Should no longer happen since such * streams should clear io's from the ready queue. */ ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, APLOGNO(03347) - "h2_mplx(%ld): stream for response %d closed, " + "h2_mplx(%s): stream for response closed, " "resetting io to close request processing", - m->id, io->id); - io->orphaned = 1; - h2_io_rst(io, H2_ERR_STREAM_CLOSED); - if (!io->worker_started || io->worker_done) { - io_destroy(m, io, 1); + task->id); + task->orphaned = 1; + h2_task_rst(task, H2_ERR_STREAM_CLOSED); + if (!task->worker_started || task->worker_done) { + task_destroy(m, task, 1); } else { /* hang around until the h2_task is done, but * shutdown input/output and send out any events asap. 
*/ - h2_io_shutdown(io); - io_in_consumed_signal(m, io); + h2_task_shutdown(task); + input_consumed_signal(m, task); } } } @@ -600,33 +617,32 @@ h2_stream *h2_mplx_next_submit(h2_mplx *m, h2_ihash_t *streams) return stream; } -static apr_status_t out_open(h2_mplx *m, int stream_id, h2_response *response, - h2_bucket_beam *output) +static apr_status_t out_open(h2_mplx *m, int stream_id, h2_response *response) { apr_status_t status = APR_SUCCESS; + h2_task *task = h2_ihash_get(m->tasks, stream_id); - h2_io *io = h2_ilist_get(m->stream_ios, stream_id); - if (!io || io->orphaned) { + if (!task || task->orphaned) { return APR_ECONNABORTED; } ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c, - "h2_mplx(%ld-%d): open response: %d, rst=%d", - m->id, stream_id, response->http_status, - response->rst_error); + "h2_mplx(%s): open response: %d, rst=%d", + task->id, response->http_status, response->rst_error); + + h2_task_set_response(task, response); - if (output) { - h2_beam_buffer_size_set(output, m->stream_max_mem); - h2_beam_timeout_set(output, m->stream_timeout); - h2_beam_on_consumed(output, stream_output_consumed, io); - m->tx_handles_reserved -= h2_beam_get_files_beamed(output); - h2_beam_on_file_beam(output, can_beam_file, m); - h2_beam_mutex_set(output, io_mutex_enter, io_mutex_leave, - io->task->cond, m); + if (task->output.beam) { + h2_beam_buffer_size_set(task->output.beam, m->stream_max_mem); + h2_beam_timeout_set(task->output.beam, m->stream_timeout); + h2_beam_on_consumed(task->output.beam, stream_output_consumed, task); + m->tx_handles_reserved -= h2_beam_get_files_beamed(task->output.beam); + h2_beam_on_file_beam(task->output.beam, can_beam_file, m); + h2_beam_mutex_set(task->output.beam, io_mutex_enter, io_mutex_leave, + task->cond, m); } - h2_io_set_response(io, response, output); - h2_ilist_add(m->ready_ios, io); + h2_ihash_add(m->ready_tasks, task); if (response && response->http_status < 300) { /* we might see some file buckets in the output, see * if we have enough handles reserved. */ @@ -636,8 +652,7 @@ static apr_status_t out_open(h2_mplx *m, int stream_id, h2_response *response, return status; } -apr_status_t h2_mplx_out_open(h2_mplx *m, int stream_id, h2_response *response, - h2_bucket_beam *output) +apr_status_t h2_mplx_out_open(h2_mplx *m, int stream_id, h2_response *response) { apr_status_t status; int acquired; @@ -648,7 +663,7 @@ apr_status_t h2_mplx_out_open(h2_mplx *m, int stream_id, h2_response *response, status = APR_ECONNABORTED; } else { - status = out_open(m, stream_id, response, output); + status = out_open(m, stream_id, response); } leave_mutex(m, acquired); } @@ -662,28 +677,28 @@ apr_status_t h2_mplx_out_close(h2_mplx *m, int stream_id) AP_DEBUG_ASSERT(m); if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) { - h2_io *io = h2_ilist_get(m->stream_ios, stream_id); - if (io && !io->orphaned) { - if (!io->response && !io->rst_error) { + h2_task *task = h2_ihash_get(m->tasks, stream_id); + if (task && !task->orphaned) { + if (!task->response && !task->rst_error) { /* In case a close comes before a response was created, * insert an error one so that our streams can properly * reset. 
*/ h2_response *r = h2_response_die(stream_id, APR_EGENERAL, - io->request, m->pool); - status = out_open(m, stream_id, r, NULL); + task->request, m->pool); + status = out_open(m, stream_id, r); ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, m->c, "h2_mplx(%ld-%d): close, no response, no rst", - m->id, io->id); + m->id, stream_id); } ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, m->c, - "h2_mplx(%ld-%d): close", m->id, io->id); - if (io->beam_out) { - status = h2_beam_close(io->beam_out); - h2_beam_log(io->beam_out, stream_id, "out_close", m->c, + "h2_mplx(%ld-%d): close", m->id, stream_id); + if (task->output.beam) { + status = h2_beam_close(task->output.beam); + h2_beam_log(task->output.beam, stream_id, "out_close", m->c, APLOG_TRACE2); } - io_out_consumed_signal(m, io); + output_consumed_signal(m, task); have_out_data_for(m, stream_id); } else { @@ -762,8 +777,9 @@ apr_status_t h2_mplx_process(h2_mplx *m, struct h2_stream *stream, status = APR_ECONNABORTED; } else { - apr_pool_t *io_pool; - h2_io *io; + h2_beam_create(&stream->input, stream->pool, stream->id, + "input", 0); + h2_ihash_add(m->streams, stream); if (!m->need_registration) { m->need_registration = h2_iq_empty(m->q); @@ -771,23 +787,11 @@ apr_status_t h2_mplx_process(h2_mplx *m, struct h2_stream *stream, if (m->workers_busy < m->workers_max) { do_registration = m->need_registration; } + h2_iq_add(m->q, stream->id, cmp, ctx); - io_pool = m->spare_io_pool; - if (io_pool) { - m->spare_io_pool = NULL; - } - else { - apr_pool_create(&io_pool, m->pool); - apr_pool_tag(io_pool, "h2_io"); - } - io = h2_io_create(stream->id, io_pool, stream->request); - h2_ilist_add(m->stream_ios, io); - h2_iq_add(m->q, io->id, cmp, ctx); - - stream->input = io->beam_in; ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, m->c, "h2_mplx(%ld-%d): process, body=%d", - m->c->id, stream->id, io->request->body); + m->c->id, stream->id, stream->request->body); } leave_mutex(m, acquired); } @@ -801,24 +805,15 @@ apr_status_t h2_mplx_process(h2_mplx *m, struct h2_stream *stream, static h2_task *pop_task(h2_mplx *m) { h2_task *task = NULL; - h2_io *io; + h2_stream *stream; int sid; while (!m->aborted && !task && (m->workers_busy < m->workers_limit) && (sid = h2_iq_shift(m->q)) > 0) { - io = h2_ilist_get(m->stream_ios, sid); - if (io) { + stream = h2_ihash_get(m->streams, sid); + if (stream) { conn_rec *slave, **pslave; - - if (io->orphaned) { - /* TODO: add to purge list */ - io_destroy(m, io, 0); - if (m->join_wait) { - apr_thread_cond_signal(m->join_wait); - } - continue; - } - + pslave = (conn_rec **)apr_array_pop(m->spare_slaves); if (pslave) { slave = *pslave; @@ -829,19 +824,20 @@ static h2_task *pop_task(h2_mplx *m) } slave->sbh = m->c->sbh; - io->task = task = h2_task_create(slave, io->request, - io->beam_in, m); + task = h2_task_create(slave, stream->request, stream->input, m); + h2_ihash_add(m->tasks, task); + m->c->keepalives++; apr_table_setn(slave->notes, H2_TASK_ID_NOTE, task->id); - io->worker_started = 1; - io->started_at = apr_time_now(); + task->worker_started = 1; + task->started_at = apr_time_now(); - if (io->beam_in) { - h2_beam_timeout_set(io->beam_in, m->stream_timeout); - h2_beam_on_consumed(io->beam_in, stream_input_consumed, m); - h2_beam_on_file_beam(io->beam_in, can_beam_file, m); - h2_beam_mutex_set(io->beam_in, io_mutex_enter, + if (task->input.beam) { + h2_beam_timeout_set(task->input.beam, m->stream_timeout); + h2_beam_on_consumed(task->input.beam, stream_input_consumed, m); + h2_beam_on_file_beam(task->input.beam, can_beam_file, 
m); + h2_beam_mutex_set(task->input.beam, io_mutex_enter, io_mutex_leave, task->cond, m); } if (sid > m->max_stream_started) { @@ -880,8 +876,6 @@ h2_task *h2_mplx_pop_task(h2_mplx *m, int *has_more) static void task_done(h2_mplx *m, h2_task *task, h2_req_engine *ngn) { if (task) { - h2_io *io = h2_ilist_get(m->stream_ios, task->stream_id); - if (task->frozen) { /* this task was handed over to an engine for processing * and the original worker has finished. That means the @@ -894,6 +888,8 @@ static void task_done(h2_mplx *m, h2_task *task, h2_req_engine *ngn) apr_thread_cond_broadcast(m->task_thawed); } else { + apr_time_t now = apr_time_now(); + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c, "h2_mplx(%ld): task(%s) done", m->id, task->id); /* clean our references and report request as done. Signal @@ -903,11 +899,11 @@ static void task_done(h2_mplx *m, h2_task *task, h2_req_engine *ngn) * other mplx's. Perhaps leave after n requests? */ h2_mplx_out_close(m, task->stream_id); - if (ngn && io) { + if (ngn) { apr_off_t bytes = 0; - if (io->beam_out) { - h2_beam_send(io->beam_out, NULL, APR_NONBLOCK_READ); - bytes += h2_beam_get_buffered(io->beam_out); + if (task->output.beam) { + h2_beam_send(task->output.beam, NULL, APR_NONBLOCK_READ); + bytes += h2_beam_get_buffered(task->output.beam); } if (bytes > 0) { /* we need to report consumed and current buffered output @@ -928,55 +924,47 @@ static void task_done(h2_mplx *m, h2_task *task, h2_req_engine *ngn) h2_ngn_shed_done_ngn(m->ngn_shed, task->engine); } - if (io) { - apr_time_t now = apr_time_now(); - if (!io->orphaned && m->redo_ios - && h2_ilist_get(m->redo_ios, io->id)) { - /* reset and schedule again */ - h2_io_redo(io); - h2_ilist_remove(m->redo_ios, io->id); - h2_iq_add(m->q, io->id, NULL, NULL); - } - else { - io->worker_done = 1; - io->done_at = now; - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c, - "h2_mplx(%ld): request(%d) done, %f ms" - " elapsed", m->id, io->id, - (io->done_at - io->started_at) / 1000.0); - if (io->started_at > m->last_idle_block) { - /* this task finished without causing an 'idle block', e.g. - * a block by flow control. - */ - if (now - m->last_limit_change >= m->limit_change_interval - && m->workers_limit < m->workers_max) { - /* Well behaving stream, allow it more workers */ - m->workers_limit = H2MIN(m->workers_limit * 2, - m->workers_max); - m->last_limit_change = now; - m->need_registration = 1; - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c, - "h2_mplx(%ld): increase worker limit to %d", - m->id, m->workers_limit); - } - } - } - - if (io->orphaned) { - /* TODO: add to purge list */ - io_destroy(m, io, 0); - if (m->join_wait) { - apr_thread_cond_signal(m->join_wait); + if (!task->orphaned && m->redo_tasks + && h2_ihash_get(m->redo_tasks, task->stream_id)) { + /* reset and schedule again */ + h2_task_redo(task); + h2_ihash_remove(m->redo_tasks, task->stream_id); + h2_iq_add(m->q, task->stream_id, NULL, NULL); + } + else { + task->worker_done = 1; + task->done_at = now; + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c, + "h2_mplx(%s): request done, %f ms" + " elapsed", task->id, + (task->done_at - task->started_at) / 1000.0); + if (task->started_at > m->last_idle_block) { + /* this task finished without causing an 'idle block', e.g. + * a block by flow control. 
+ */ + if (now - m->last_limit_change >= m->limit_change_interval + && m->workers_limit < m->workers_max) { + /* Well behaving stream, allow it more workers */ + m->workers_limit = H2MIN(m->workers_limit * 2, + m->workers_max); + m->last_limit_change = now; + m->need_registration = 1; + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c, + "h2_mplx(%ld): increase worker limit to %d", + m->id, m->workers_limit); } } - else { - /* hang around until the stream deregisters */ + } + + if (task->orphaned) { + /* TODO: add to purge list */ + task_destroy(m, task, 0); + if (m->join_wait) { + apr_thread_cond_signal(m->join_wait); } } else { - ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, - "h2_mplx(%ld): task %s without corresp. h2_io", - m->id, task->id); + /* hang around until the stream deregisters */ } } } @@ -1001,81 +989,76 @@ void h2_mplx_task_done(h2_mplx *m, h2_task *task, h2_task **ptask) * h2_mplx DoS protection ******************************************************************************/ -typedef struct { - h2_mplx *m; - h2_io *io; - apr_time_t now; -} io_iter_ctx; - -static int latest_repeatable_busy_unsubmitted_iter(void *data, void *val) +static int latest_repeatable_unsubmitted_iter(void *data, void *val) { - io_iter_ctx *ctx = data; - h2_io *io = val; - if (io->worker_started && !io->worker_done - && h2_io_can_redo(io) && !h2_ilist_get(ctx->m->redo_ios, io->id)) { - /* this io occupies a worker, the response has not been submitted yet, + task_iter_ctx *ctx = data; + h2_task *task = val; + if (!task->worker_done && h2_task_can_redo(task) + && !h2_ihash_get(ctx->m->redo_tasks, task->stream_id)) { + /* this task occupies a worker, the response has not been submitted yet, * not been cancelled and it is a repeatable request * -> it can be re-scheduled later */ - if (!ctx->io || ctx->io->started_at < io->started_at) { + if (!ctx->task || ctx->task->started_at < task->started_at) { /* we did not have one or this one was started later */ - ctx->io = io; + ctx->task = task; } } return 1; } -static h2_io *get_latest_repeatable_busy_unsubmitted_io(h2_mplx *m) +static h2_task *get_latest_repeatable_unsubmitted_task(h2_mplx *m) { - io_iter_ctx ctx; + task_iter_ctx ctx; ctx.m = m; - ctx.io = NULL; - h2_ilist_iter(m->stream_ios, latest_repeatable_busy_unsubmitted_iter, &ctx); - return ctx.io; + ctx.task = NULL; + h2_ihash_iter(m->tasks, latest_repeatable_unsubmitted_iter, &ctx); + return ctx.task; } static int timed_out_busy_iter(void *data, void *val) { - io_iter_ctx *ctx = data; - h2_io *io = val; - if (io->worker_started && !io->worker_done - && (ctx->now - io->started_at) > ctx->m->stream_timeout) { + task_iter_ctx *ctx = data; + h2_task *task = val; + if (!task->worker_done + && (ctx->now - task->started_at) > ctx->m->stream_timeout) { /* timed out stream occupying a worker, found */ - ctx->io = io; + ctx->task = task; return 0; } return 1; } -static h2_io *get_timed_out_busy_stream(h2_mplx *m) + +static h2_task *get_timed_out_busy_task(h2_mplx *m) { - io_iter_ctx ctx; + task_iter_ctx ctx; ctx.m = m; - ctx.io = NULL; + ctx.task = NULL; ctx.now = apr_time_now(); - h2_ilist_iter(m->stream_ios, timed_out_busy_iter, &ctx); - return ctx.io; + h2_ihash_iter(m->tasks, timed_out_busy_iter, &ctx); + return ctx.task; } -static apr_status_t unschedule_slow_ios(h2_mplx *m) +static apr_status_t unschedule_slow_tasks(h2_mplx *m) { - h2_io *io; + h2_task *task; int n; - if (!m->redo_ios) { - m->redo_ios = h2_ilist_create(m->pool); + if (!m->redo_tasks) { + m->redo_tasks = h2_ihash_create(m->pool, 
offsetof(h2_task, stream_id)); } /* Try to get rid of streams that occupy workers. Look for safe requests * that are repeatable. If none found, fail the connection. */ - n = (m->workers_busy - m->workers_limit - h2_ilist_count(m->redo_ios)); - while (n > 0 && (io = get_latest_repeatable_busy_unsubmitted_io(m))) { - h2_ilist_add(m->redo_ios, io); - h2_io_rst(io, H2_ERR_CANCEL); + n = (m->workers_busy - m->workers_limit - h2_ihash_count(m->redo_tasks)); + while (n > 0 && (task = get_latest_repeatable_unsubmitted_task(m))) { + h2_task_rst(task, H2_ERR_CANCEL); + h2_ihash_add(m->redo_tasks, task); --n; } - if ((m->workers_busy - h2_ilist_count(m->redo_ios)) > m->workers_limit) { - io = get_timed_out_busy_stream(m); - if (io) { + if ((m->workers_busy - h2_ihash_count(m->redo_tasks)) > m->workers_limit) { + task = get_timed_out_busy_task(m); + if (task) { /* Too many busy workers, unable to cancel enough streams * and with a busy, timed out stream, we tell the client * to go away... */ @@ -1092,7 +1075,7 @@ apr_status_t h2_mplx_idle(h2_mplx *m) int acquired; if (enter_mutex(m, &acquired) == APR_SUCCESS) { - apr_size_t scount = h2_ilist_count(m->stream_ios); + apr_size_t scount = h2_ihash_count(m->streams); if (scount > 0 && m->workers_busy) { /* If we have streams in connection state 'IDLE', meaning * all streams are ready to sent data out, but lack @@ -1129,7 +1112,7 @@ apr_status_t h2_mplx_idle(h2_mplx *m) } if (m->workers_busy > m->workers_limit) { - status = unschedule_slow_ios(m); + status = unschedule_slow_tasks(m); } } leave_mutex(m, acquired); @@ -1150,9 +1133,9 @@ typedef struct { static int ngn_update_window(void *ctx, void *val) { ngn_update_ctx *uctx = ctx; - h2_io *io = val; - if (io && io->task && io->task->assigned == uctx->ngn - && io_out_consumed_signal(uctx->m, io)) { + h2_task *task = val; + if (task && task->assigned == uctx->ngn + && output_consumed_signal(uctx->m, task)) { ++uctx->streams_updated; } return 1; @@ -1165,7 +1148,7 @@ static apr_status_t ngn_out_update_windows(h2_mplx *m, h2_req_engine *ngn) ctx.m = m; ctx.ngn = ngn; ctx.streams_updated = 0; - h2_ilist_iter(m->stream_ios, ngn_update_window, &ctx); + h2_ihash_iter(m->tasks, ngn_update_window, &ctx); return ctx.streams_updated? 
APR_SUCCESS : APR_EAGAIN; } @@ -1187,8 +1170,7 @@ apr_status_t h2_mplx_req_engine_push(const char *ngn_type, task->r = r; if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) { - h2_io *io = h2_ilist_get(m->stream_ios, task->stream_id); - if (!io || io->orphaned) { + if (task->orphaned) { status = APR_ECONNABORTED; } else { diff --git a/modules/http2/h2_mplx.h b/modules/http2/h2_mplx.h index 7e6f5bceadf..f6a83d62f10 100644 --- a/modules/http2/h2_mplx.h +++ b/modules/http2/h2_mplx.h @@ -52,7 +52,6 @@ struct h2_ngn_shed; struct h2_req_engine; #include -#include "h2_io.h" typedef struct h2_mplx h2_mplx; @@ -73,10 +72,12 @@ struct h2_mplx { unsigned int aborted : 1; unsigned int need_registration : 1; - struct h2_iqueue *q; - struct h2_ilist_t *stream_ios; - struct h2_ilist_t *ready_ios; - struct h2_ilist_t *redo_ios; + struct h2_ihash_t *streams; /* all streams currently processing */ + struct h2_iqueue *q; /* all stream ids that need to be started */ + + struct h2_ihash_t *tasks; /* all tasks started and not destroyed */ + struct h2_ihash_t *ready_tasks; /* all tasks ready for submit */ + struct h2_ihash_t *redo_tasks; /* all tasks that need to be redone */ apr_uint32_t max_streams; /* max # of concurrent streams */ apr_uint32_t max_stream_started; /* highest stream id that started processing */ @@ -241,8 +242,7 @@ struct h2_stream *h2_mplx_next_submit(h2_mplx *m, * Opens the output for the given stream with the specified response. */ apr_status_t h2_mplx_out_open(h2_mplx *mplx, int stream_id, - struct h2_response *response, - struct h2_bucket_beam *output); + struct h2_response *response); /** * Closes the output for stream stream_id. diff --git a/modules/http2/h2_ngn_shed.c b/modules/http2/h2_ngn_shed.c index 48ac09fae63..18a32a1809c 100644 --- a/modules/http2/h2_ngn_shed.c +++ b/modules/http2/h2_ngn_shed.c @@ -29,6 +29,7 @@ #include "mod_http2.h" #include "h2_private.h" +#include "h2.h" #include "h2_config.h" #include "h2_conn.h" #include "h2_ctx.h" diff --git a/modules/http2/h2_proxy_session.c b/modules/http2/h2_proxy_session.c index 51b68f2f4fa..d627e9a8f19 100644 --- a/modules/http2/h2_proxy_session.c +++ b/modules/http2/h2_proxy_session.c @@ -878,7 +878,7 @@ static void ev_init(h2_proxy_session *session, int arg, const char *msg) { switch (session->state) { case H2_PROXYS_ST_INIT: - if (h2_ihash_is_empty(session->streams)) { + if (h2_ihash_empty(session->streams)) { transit(session, "init", H2_PROXYS_ST_IDLE); } else { @@ -985,7 +985,7 @@ static void ev_no_io(h2_proxy_session *session, int arg, const char *msg) * CPU cycles. Ideally, we'd like to do a blocking read, but that * is not possible if we have scheduled tasks and wait * for them to produce something. */ - if (h2_ihash_is_empty(session->streams)) { + if (h2_ihash_empty(session->streams)) { if (!is_accepting_streams(session)) { /* We are no longer accepting new streams and have * finished processing existing ones. Time to leave. 
*/ @@ -1289,7 +1289,7 @@ static int done_iter(void *udata, void *val) void h2_proxy_session_cleanup(h2_proxy_session *session, h2_proxy_request_done *done) { - if (session->streams && !h2_ihash_is_empty(session->streams)) { + if (session->streams && !h2_ihash_empty(session->streams)) { cleanup_iter_ctx ctx; ctx.session = session; ctx.done = done; @@ -1328,7 +1328,7 @@ static int win_update_iter(void *udata, void *val) void h2_proxy_session_update_window(h2_proxy_session *session, conn_rec *c, apr_off_t bytes) { - if (session->streams && !h2_ihash_is_empty(session->streams)) { + if (session->streams && !h2_ihash_empty(session->streams)) { win_update_ctx ctx; ctx.session = session; ctx.c = c; diff --git a/modules/http2/h2_session.c b/modules/http2/h2_session.c index d64c3fce96f..76c9c26da43 100644 --- a/modules/http2/h2_session.c +++ b/modules/http2/h2_session.c @@ -28,6 +28,7 @@ #include #include "h2_private.h" +#include "h2.h" #include "h2_bucket_eoc.h" #include "h2_bucket_eos.h" #include "h2_config.h" @@ -1136,7 +1137,7 @@ static int resume_on_data(void *ctx, void *val) static int h2_session_resume_streams_with_data(h2_session *session) { AP_DEBUG_ASSERT(session); - if (!h2_ihash_is_empty(session->streams) + if (!h2_ihash_empty(session->streams) && session->mplx && !session->mplx->aborted) { resume_ctx ctx; @@ -1519,11 +1520,7 @@ apr_status_t h2_session_stream_done(h2_session *session, h2_stream *stream) } h2_stream_cleanup(stream); - /* this may be called while the session has already freed - * some internal structures or even when the mplx is locked. */ - if (session->mplx) { - h2_mplx_stream_done(session->mplx, stream_id, rst_error); - } + h2_mplx_stream_done(session->mplx, stream_id, rst_error); h2_stream_destroy(stream); if (pool) { @@ -1884,7 +1881,7 @@ static void h2_session_ev_no_io(h2_session *session, int arg, const char *msg) if (h2_conn_io_flush(&session->io) != APR_SUCCESS) { dispatch_event(session, H2_SESSION_EV_CONN_ERROR, 0, NULL); } - if (h2_ihash_is_empty(session->streams)) { + if (h2_ihash_empty(session->streams)) { if (!is_accepting_streams(session)) { /* We are no longer accepting new streams and have * finished processing existing ones. Time to leave. */ @@ -2107,7 +2104,7 @@ apr_status_t h2_session_process(h2_session *session, int async) break; case H2_SESSION_ST_IDLE: - no_streams = h2_ihash_is_empty(session->streams); + no_streams = h2_ihash_empty(session->streams); update_child_status(session, (no_streams? 
SERVER_BUSY_KEEPALIVE : SERVER_BUSY_READ), "idle"); /* make certain, the client receives everything before we idle */ @@ -2210,7 +2207,7 @@ apr_status_t h2_session_process(h2_session *session, int async) } } - if (!h2_ihash_is_empty(session->streams)) { + if (!h2_ihash_empty(session->streams)) { /* resume any streams for which data is available again */ h2_session_resume_streams_with_data(session); /* Submit any responses/push_promises that are ready */ diff --git a/modules/http2/h2_stream.c b/modules/http2/h2_stream.c index ae980194569..c8635aeec3f 100644 --- a/modules/http2/h2_stream.c +++ b/modules/http2/h2_stream.c @@ -24,6 +24,7 @@ #include #include "h2_private.h" +#include "h2.h" #include "h2_bucket_beam.h" #include "h2_conn.h" #include "h2_config.h" @@ -174,6 +175,10 @@ h2_stream *h2_stream_open(int id, apr_pool_t *pool, h2_session *session, void h2_stream_cleanup(h2_stream *stream) { AP_DEBUG_ASSERT(stream); + if (stream->input) { + h2_beam_destroy(stream->input); + stream->input = NULL; + } if (stream->buffer) { apr_brigade_cleanup(stream->buffer); } diff --git a/modules/http2/h2_stream.h b/modules/http2/h2_stream.h index c851f7e64b1..8ae600c78a4 100644 --- a/modules/http2/h2_stream.h +++ b/modules/http2/h2_stream.h @@ -30,7 +30,6 @@ * The h2_response gives the HEADER frames to sent to the client, followed * by DATA frames read from the h2_stream until EOS is reached. */ -#include "h2_io.h" struct h2_mplx; struct h2_priority; diff --git a/modules/http2/h2_task.c b/modules/http2/h2_task.c index 3eb25e8f7aa..f9b21326d44 100644 --- a/modules/http2/h2_task.c +++ b/modules/http2/h2_task.c @@ -33,6 +33,7 @@ #include #include "h2_private.h" +#include "h2.h" #include "h2_bucket_beam.h" #include "h2_conn.h" #include "h2_config.h" @@ -272,8 +273,7 @@ static apr_status_t open_response(h2_task *task) task->id, task->request->method, task->request->authority, task->request->path); - return h2_mplx_out_open(task->mplx, task->stream_id, - response, task->output.beam); + return h2_mplx_out_open(task->mplx, task->stream_id, response); } static apr_status_t send_out(h2_task *task, apr_bucket_brigade* bb) @@ -440,6 +440,63 @@ static apr_status_t h2_filter_read_response(ap_filter_t* f, return h2_from_h1_read_response(task->output.from_h1, f, bb); } +/******************************************************************************* + * task things + ******************************************************************************/ + +void h2_task_set_response(h2_task *task, h2_response *response) +{ + AP_DEBUG_ASSERT(response); + AP_DEBUG_ASSERT(!task->response); + /* we used to clone the response into out own pool. But + * we have much tighter control over the EOR bucket nowadays, + * so just use the instance given */ + task->response = response; + if (response->rst_error) { + h2_task_rst(task, response->rst_error); + } +} + + +int h2_task_can_redo(h2_task *task) { + if (task->submitted + || (task->input.beam && h2_beam_was_received(task->input.beam)) + || !task->request) { + /* cannot repeat that. 
*/ + return 0; + } + return (!strcmp("GET", task->request->method) + || !strcmp("HEAD", task->request->method) + || !strcmp("OPTIONS", task->request->method)); +} + +void h2_task_redo(h2_task *task) +{ + task->response = NULL; + task->rst_error = 0; +} + +void h2_task_rst(h2_task *task, int error) +{ + task->rst_error = error; + if (task->input.beam) { + h2_beam_abort(task->input.beam); + } + if (task->output.beam) { + h2_beam_abort(task->output.beam); + } +} + +void h2_task_shutdown(h2_task *task) +{ + if (task->input.beam) { + h2_beam_shutdown(task->input.beam); + } + if (task->output.beam) { + h2_beam_shutdown(task->output.beam); + } +} + /******************************************************************************* * Register various hooks */ @@ -517,6 +574,10 @@ void h2_task_destroy(h2_task *task) { ap_remove_input_filter_byhandle(task->c->input_filters, "H2_TO_H1"); ap_remove_output_filter_byhandle(task->c->output_filters, "H1_TO_H2"); + if (task->output.beam) { + h2_beam_destroy(task->output.beam); + task->output.beam = NULL; + } if (task->eor) { apr_bucket_destroy(task->eor); } diff --git a/modules/http2/h2_task.h b/modules/http2/h2_task.h index 450763ee40c..b9d531b5617 100644 --- a/modules/http2/h2_task.h +++ b/modules/http2/h2_task.h @@ -44,7 +44,7 @@ struct h2_mplx; struct h2_task; struct h2_req_engine; struct h2_request; -struct h2_resp_head; +struct h2_response; struct h2_worker; typedef struct h2_task h2_task; @@ -53,17 +53,10 @@ struct h2_task { const char *id; int stream_id; conn_rec *c; - struct h2_mplx *mplx; apr_pool_t *pool; - const struct h2_request *request; - apr_bucket *eor; - struct apr_thread_cond_t *cond; - unsigned int filters_set : 1; - unsigned int ser_headers : 1; - unsigned int frozen : 1; - unsigned int blocking : 1; - unsigned int detached : 1; + const struct h2_request *request; + struct h2_response *response; struct { struct h2_bucket_beam *beam; @@ -81,6 +74,24 @@ struct h2_task { apr_bucket_brigade *bb; } output; + struct h2_mplx *mplx; + struct apr_thread_cond_t *cond; + + int rst_error; /* h2 related stream abort error */ + unsigned int filters_set : 1; + unsigned int ser_headers : 1; + unsigned int frozen : 1; + unsigned int blocking : 1; + unsigned int detached : 1; + unsigned int orphaned : 1; /* h2_stream is gone for this task */ + unsigned int submitted : 1; /* response has been submitted to client */ + unsigned int worker_started : 1; /* h2_worker started processing for this io */ + unsigned int worker_done : 1; /* h2_worker finished for this io */ + + apr_time_t started_at; /* when processing started */ + apr_time_t done_at; /* when processing was done */ + apr_bucket *eor; + struct h2_req_engine *engine; /* engine hosted by this task */ struct h2_req_engine *assigned; /* engine that task has been assigned to */ request_rec *r; /* request being processed in this task */ @@ -93,6 +104,21 @@ void h2_task_destroy(h2_task *task); apr_status_t h2_task_do(h2_task *task); +void h2_task_set_response(h2_task *task, struct h2_response *response); + +void h2_task_redo(h2_task *task); +int h2_task_can_redo(h2_task *task); + +/** + * Reset the task with the given error code, resets all input/output. + */ +void h2_task_rst(h2_task *task, int error); + +/** + * Shuts all input/output down. Clears any buckets buffered and closes. + */ +void h2_task_shutdown(h2_task *task); + void h2_task_register_hooks(void); /* * One time, post config intialization. 
diff --git a/modules/http2/h2_util.c b/modules/http2/h2_util.c index 6bfd3d4791a..648305247a4 100644 --- a/modules/http2/h2_util.c +++ b/modules/http2/h2_util.c @@ -286,7 +286,7 @@ size_t h2_ihash_count(h2_ihash_t *ih) return apr_hash_count(ih->hash); } -int h2_ihash_is_empty(h2_ihash_t *ih) +int h2_ihash_empty(h2_ihash_t *ih) { return apr_hash_count(ih->hash) == 0; } diff --git a/modules/http2/h2_util.h b/modules/http2/h2_util.h index 23098f4abb0..8e7e2795f1f 100644 --- a/modules/http2/h2_util.h +++ b/modules/http2/h2_util.h @@ -49,7 +49,7 @@ typedef int h2_ihash_iter_t(void *ctx, void *val); h2_ihash_t *h2_ihash_create(apr_pool_t *pool, size_t offset_of_int); size_t h2_ihash_count(h2_ihash_t *ih); -int h2_ihash_is_empty(h2_ihash_t *ih); +int h2_ihash_empty(h2_ihash_t *ih); void *h2_ihash_get(h2_ihash_t *ih, int id); /** diff --git a/modules/http2/mod_http2.dsp b/modules/http2/mod_http2.dsp index cc8f6663871..949414872c9 100644 --- a/modules/http2/mod_http2.dsp +++ b/modules/http2/mod_http2.dsp @@ -145,10 +145,6 @@ SOURCE=./h2_h2.c # End Source File # Begin Source File -SOURCE=./h2_io.c -# End Source File -# Begin Source File - SOURCE=./h2_mplx.c # End Source File # Begin Source File diff --git a/modules/http2/mod_proxy_http2.c b/modules/http2/mod_proxy_http2.c index f6432654974..05d98aa458d 100644 --- a/modules/http2/mod_proxy_http2.c +++ b/modules/http2/mod_proxy_http2.c @@ -377,7 +377,7 @@ static apr_status_t proxy_engine_run(h2_proxy_ctx *ctx) { status = s2; break; } - if (!ctx->next && h2_ihash_is_empty(ctx->session->streams)) { + if (!ctx->next && h2_ihash_empty(ctx->session->streams)) { break; } }
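
The commit above replaces the per-stream h2_io objects (kept in h2_ilist_t lists) with h2_stream/h2_task objects kept in h2_ihash_t sets, each created with the offset of the int key inside the stored struct, e.g. h2_ihash_create(m->pool, offsetof(h2_task, stream_id)). The stand-alone sketch below shows that idea built directly on APR's apr_hash_t; it is an illustrative analogue under that assumption, not mod_http2 code, and the names demo_task, task_set and their functions are hypothetical.

/* Sketch: an integer-keyed set over apr_hash_t where the key is an int
 * member living inside the stored value, as h2_ihash does for h2_task.
 * All names here (demo_task, task_set, ...) are illustrative only. */
#include <stdio.h>
#include <stddef.h>
#include <apr_general.h>
#include <apr_pools.h>
#include <apr_hash.h>

typedef struct {
    int stream_id;              /* the key lives inside the value */
    const char *method;
} demo_task;

typedef struct {
    apr_hash_t *hash;
    size_t ioff;                /* offset of the int key in the value */
} task_set;

static task_set *task_set_create(apr_pool_t *pool, size_t offset_of_int)
{
    task_set *ts = apr_pcalloc(pool, sizeof(*ts));
    ts->hash = apr_hash_make(pool);
    ts->ioff = offset_of_int;
    return ts;
}

static void task_set_add(task_set *ts, void *val)
{
    /* key by the embedded int; apr_hash stores the key pointer, no copy */
    apr_hash_set(ts->hash, (char *)val + ts->ioff, sizeof(int), val);
}

static void *task_set_get(task_set *ts, int id)
{
    return apr_hash_get(ts->hash, &id, sizeof(int));
}

static void task_set_remove(task_set *ts, int id)
{
    apr_hash_set(ts->hash, &id, sizeof(int), NULL);  /* NULL value removes */
}

int main(void)
{
    apr_pool_t *pool;
    task_set *tasks;
    demo_task t1 = { 1, "GET" }, t3 = { 3, "POST" };
    demo_task *found;

    apr_initialize();
    apr_pool_create(&pool, NULL);

    tasks = task_set_create(pool, offsetof(demo_task, stream_id));
    task_set_add(tasks, &t1);
    task_set_add(tasks, &t3);

    found = task_set_get(tasks, 3);
    printf("stream 3 -> %s, count=%u\n",
           found ? found->method : "(none)", apr_hash_count(tasks->hash));

    task_set_remove(tasks, 1);
    printf("after remove: count=%u\n", apr_hash_count(tasks->hash));

    apr_pool_destroy(pool);
    apr_terminate();
    return 0;
}

Keying on the embedded member avoids a separate key allocation and keeps add/get/remove cheap; since apr_hash only keeps a pointer to the key, the stored struct must outlive its entry, a constraint the diff appears to respect by removing tasks from m->tasks and m->ready_tasks before h2_task_destroy().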
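The reworked h2_mplx also leans on a small callback-plus-context iteration pattern (task_iter_ctx with task_iter_first, timed_out_busy_iter, latest_repeatable_unsubmitted_iter), where a callback returning 0 stops the walk early. The following minimal, self-contained sketch shows that pattern over a plain apr_hash_t; hash_iter, pick_ctx and pick_first_over_limit are hypothetical names, and the return-0-to-stop convention is only assumed to match how h2_ihash_iter is used in the diff.

/* Sketch: context struct + early-stopping iterator callback over apr_hash_t.
 * Names are illustrative only; not mod_http2 code. */
#include <stdio.h>
#include <apr_general.h>
#include <apr_pools.h>
#include <apr_hash.h>

typedef struct {
    int id;
    apr_uint32_t cost;
} demo_item;

typedef struct {
    apr_uint32_t limit;
    demo_item *found;           /* result slot filled by the callback */
} pick_ctx;

/* callback convention assumed from the diff: return 0 to stop iterating */
typedef int iter_cb(void *ctx, void *val);

/* returns 1 if every entry was visited, 0 if a callback stopped early */
static int hash_iter(apr_pool_t *pool, apr_hash_t *h, iter_cb *cb, void *ctx)
{
    apr_hash_index_t *hi;
    for (hi = apr_hash_first(pool, h); hi; hi = apr_hash_next(hi)) {
        void *val;
        apr_hash_this(hi, NULL, NULL, &val);
        if (!cb(ctx, val)) {
            return 0;
        }
    }
    return 1;
}

static int pick_first_over_limit(void *ctx, void *val)
{
    pick_ctx *pctx = ctx;
    demo_item *item = val;
    if (item->cost > pctx->limit) {
        pctx->found = item;     /* remember the hit and stop the walk */
        return 0;
    }
    return 1;
}

int main(void)
{
    apr_pool_t *pool;
    apr_hash_t *items;
    demo_item a = { 1, 10 }, b = { 3, 500 };
    pick_ctx ctx = { 100, NULL };

    apr_initialize();
    apr_pool_create(&pool, NULL);
    items = apr_hash_make(pool);
    apr_hash_set(items, &a.id, sizeof(int), &a);
    apr_hash_set(items, &b.id, sizeof(int), &b);

    hash_iter(pool, items, pick_first_over_limit, &ctx);
    printf("over limit: %d\n", ctx.found ? ctx.found->id : -1);

    apr_pool_destroy(pool);
    apr_terminate();
    return 0;
}

Because hash iteration order is unspecified, callbacks that need "the latest" or "a timed-out" entry compare fields inside the callback, as latest_repeatable_unsubmitted_iter and timed_out_busy_iter do with started_at, rather than relying on position in the walk.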