From: Stefan Eissing Date: Fri, 19 Feb 2016 15:52:27 +0000 (+0000) Subject: task pools have their h2_io pools as ancestors, some code cleanup X-Git-Tag: 2.5.0-alpha~2038 X-Git-Url: http://git.ipfire.org/gitweb.cgi?a=commitdiff_plain;h=3a6f891bddf1159993ae7047327768845a96cc96;p=thirdparty%2Fapache%2Fhttpd.git task pools have their h2_io pools as ancestors, some code cleanup git-svn-id: https://svn.apache.org/repos/asf/httpd/httpd/trunk@1731259 13f79535-47bb-0310-9956-ffa450edef68 --- diff --git a/modules/http2/h2_conn.c b/modules/http2/h2_conn.c index 34bc14c15e0..ac0379b3292 100644 --- a/modules/http2/h2_conn.c +++ b/modules/http2/h2_conn.c @@ -261,7 +261,7 @@ conn_rec *h2_slave_create(conn_rec *master, apr_pool_t *p, /* Replace these */ c->master = master; - c->pool = p; + c->pool = p; c->current_thread = thread; c->conn_config = ap_create_conn_config(p); c->notes = apr_table_make(p, 5); diff --git a/modules/http2/h2_io.c b/modules/http2/h2_io.c index 6107c01ea5c..97ba08510bd 100644 --- a/modules/http2/h2_io.c +++ b/modules/http2/h2_io.c @@ -44,14 +44,6 @@ h2_io *h2_io_create(int id, apr_pool_t *pool) return io; } -void h2_io_destroy(h2_io *io) -{ - if (io->pool) { - apr_pool_destroy(io->pool); - /* gone */ - } -} - void h2_io_set_response(h2_io *io, h2_response *response) { AP_DEBUG_ASSERT(io->pool); diff --git a/modules/http2/h2_io.h b/modules/http2/h2_io.h index 9c1584a376a..6b126d70aa2 100644 --- a/modules/http2/h2_io.h +++ b/modules/http2/h2_io.h @@ -74,11 +74,6 @@ struct h2_io { */ h2_io *h2_io_create(int id, apr_pool_t *pool); -/** - * Frees any resources hold by the h2_io instance. - */ -void h2_io_destroy(h2_io *io); - /** * Set the response of this stream. */ diff --git a/modules/http2/h2_io_set.c b/modules/http2/h2_io_set.c index 2bb6e694691..e09497954f8 100644 --- a/modules/http2/h2_io_set.c +++ b/modules/http2/h2_io_set.c @@ -45,16 +45,6 @@ h2_io_set *h2_io_set_create(apr_pool_t *pool) return sp; } -void h2_io_set_destroy(h2_io_set *sp) -{ - int i; - for (i = 0; i < sp->list->nelts; ++i) { - h2_io *io = h2_io_IDX(sp->list, i); - h2_io_destroy(io); - } - sp->list->nelts = 0; -} - static int h2_stream_id_cmp(const void *s1, const void *s2) { h2_io **pio1 = (h2_io **)s1; @@ -91,7 +81,7 @@ apr_status_t h2_io_set_add(h2_io_set *sp, h2_io *io) int last; APR_ARRAY_PUSH(sp->list, h2_io*) = io; /* Normally, streams get added in ascending order if id. We - * keep the array sorted, so we just need to check of the newly + * keep the array sorted, so we just need to check if the newly * appended stream has a lower id than the last one. if not, * sorting is not necessary. */ @@ -111,9 +101,7 @@ static void remove_idx(h2_io_set *sp, int idx) --sp->list->nelts; n = sp->list->nelts - idx; if (n > 0) { - /* Close the hole in the array by moving the upper - * parts down one step. - */ + /* There are n h2_io* behind idx. Move the rest down */ h2_io **selts = (h2_io**)sp->list->elts; memmove(selts + idx, selts + idx + 1, n * sizeof(h2_io*)); } @@ -124,7 +112,7 @@ h2_io *h2_io_set_remove(h2_io_set *sp, h2_io *io) int i; for (i = 0; i < sp->list->nelts; ++i) { h2_io *e = h2_io_IDX(sp->list, i); - if (e == io) { + if (e->id == io->id) { remove_idx(sp, i); return e; } @@ -132,7 +120,7 @@ h2_io *h2_io_set_remove(h2_io_set *sp, h2_io *io) return NULL; } -h2_io *h2_io_set_pop_highest_prio(h2_io_set *set) +h2_io *h2_io_set_shift(h2_io_set *set) { /* For now, this just removes the first element in the set. * the name is misleading... diff --git a/modules/http2/h2_io_set.h b/modules/http2/h2_io_set.h index 04ff8702ed2..936e7252222 100644 --- a/modules/http2/h2_io_set.h +++ b/modules/http2/h2_io_set.h @@ -26,8 +26,6 @@ typedef struct h2_io_set h2_io_set; h2_io_set *h2_io_set_create(apr_pool_t *pool); -void h2_io_set_destroy(h2_io_set *set); - apr_status_t h2_io_set_add(h2_io_set *set, struct h2_io *io); h2_io *h2_io_set_get(h2_io_set *set, int stream_id); h2_io *h2_io_set_remove(h2_io_set *set, struct h2_io *io); @@ -48,9 +46,8 @@ typedef int h2_io_set_iter_fn(void *ctx, struct h2_io *io); * @param ctx user data for the callback * @return 1 iff iteration completed for all members */ -int h2_io_set_iter(h2_io_set *set, - h2_io_set_iter_fn *iter, void *ctx); +int h2_io_set_iter(h2_io_set *set, h2_io_set_iter_fn *iter, void *ctx); -h2_io *h2_io_set_pop_highest_prio(h2_io_set *set); +h2_io *h2_io_set_shift(h2_io_set *set); #endif /* defined(__mod_h2__h2_io_set__) */ diff --git a/modules/http2/h2_mplx.c b/modules/http2/h2_mplx.c index f5c4bb7acdf..1561fbffef5 100644 --- a/modules/http2/h2_mplx.c +++ b/modules/http2/h2_mplx.c @@ -146,14 +146,6 @@ static void h2_mplx_destroy(h2_mplx *m) "h2_mplx(%ld): destroy, ios=%d", m->id, (int)h2_io_set_size(m->stream_ios)); m->aborted = 1; - if (m->ready_ios) { - h2_io_set_destroy(m->ready_ios); - m->ready_ios = NULL; - } - if (m->stream_ios) { - h2_io_set_destroy(m->stream_ios); - m->stream_ios = NULL; - } check_tx_free(m); @@ -197,6 +189,7 @@ h2_mplx *h2_mplx_create(conn_rec *c, apr_pool_t *parent, if (!m->pool) { return NULL; } + apr_pool_tag(m->pool, "h2_mplx"); apr_allocator_owner_set(allocator, m->pool); status = apr_thread_mutex_create(&m->lock, APR_THREAD_MUTEX_DEFAULT, @@ -211,6 +204,13 @@ h2_mplx *h2_mplx_create(conn_rec *c, apr_pool_t *parent, h2_mplx_destroy(m); return NULL; } + + status = apr_socket_create(&m->dummy_socket, APR_INET, SOCK_STREAM, + APR_PROTO_TCP, m->pool); + if (status != APR_SUCCESS) { + h2_mplx_destroy(m); + return NULL; + } m->q = h2_iq_create(m->pool, h2_config_geti(conf, H2_CONF_MAX_STREAMS)); m->stream_ios = h2_io_set_create(m->pool); @@ -283,7 +283,6 @@ static void io_destroy(h2_mplx *m, h2_io *io, int events) h2_io_set_remove(m->stream_ios, io); h2_io_set_remove(m->ready_ios, io); - h2_io_destroy(io); if (pool) { apr_pool_clear(pool); @@ -318,6 +317,37 @@ static int stream_done_iter(void *ctx, h2_io *io) return io_stream_done((h2_mplx*)ctx, io, 0); } +static int stream_print(void *ctx, h2_io *io) +{ + h2_mplx *m = ctx; + if (io && io->request) { + ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, /* NO APLOGNO */ + "->03198: h2_stream(%ld-%d): %s %s %s -> %s %d" + "[orph=%d/started=%d/done=%d/eos_in=%d/eos_out=%d]", + m->id, io->id, + io->request->method, io->request->authority, io->request->path, + io->response? "http" : (io->rst_error? "reset" : "?"), + io->response? io->response->http_status : io->rst_error, + io->orphaned, io->processing_started, io->processing_done, + io->eos_in, io->eos_out); + } + else if (io) { + ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, /* NO APLOGNO */ + "->03198: h2_stream(%ld-%d): NULL -> %s %d" + "[orph=%d/started=%d/done=%d/eos_in=%d/eos_out=%d]", + m->id, io->id, + io->response? "http" : (io->rst_error? "reset" : "?"), + io->response? io->response->http_status : io->rst_error, + io->orphaned, io->processing_started, io->processing_done, + io->eos_in, io->eos_out); + } + else { + ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, /* NO APLOGNO */ + "->03198: h2_stream(%ld-NULL): NULL", m->id); + } + return 1; +} + apr_status_t h2_mplx_release_and_join(h2_mplx *m, apr_thread_cond_t *wait) { apr_status_t status; @@ -337,7 +367,7 @@ apr_status_t h2_mplx_release_and_join(h2_mplx *m, apr_thread_cond_t *wait) /* Any remaining ios have handed out requests to workers that are * not done yet. Any operation they do on their assigned stream ios will - * be errored ECONNRESET/ABORTED, so that should find out pretty soon. + * be errored ECONNRESET/ABORTED, so they should find out pretty soon. */ for (i = 0; h2_io_set_size(m->stream_ios) > 0; ++i) { m->join_wait = wait; @@ -357,6 +387,9 @@ apr_status_t h2_mplx_release_and_join(h2_mplx *m, apr_thread_cond_t *wait) "h2_mplx(%ld): release, waiting for %d seconds now for " "all h2_workers to return, have still %d requests outstanding", m->id, i*wait_secs, (int)h2_io_set_size(m->stream_ios)); + if (i == 1) { + h2_io_set_iter(m->stream_ios, stream_print, m); + } } m->aborted = 1; apr_thread_cond_broadcast(m->request_done); @@ -413,61 +446,6 @@ apr_status_t h2_mplx_stream_done(h2_mplx *m, int stream_id, int rst_error) return status; } -static const h2_request *pop_request(h2_mplx *m) -{ - const h2_request *req = NULL; - int sid; - while (!m->aborted && !req && (sid = h2_iq_shift(m->q)) > 0) { - h2_io *io = h2_io_set_get(m->stream_ios, sid); - if (io) { - req = io->request; - io->processing_started = 1; - if (sid > m->max_stream_started) { - m->max_stream_started = sid; - } - } - } - return req; -} - -void h2_mplx_request_done(h2_mplx *m, int stream_id, const h2_request **preq) -{ - int acquired; - - if (enter_mutex(m, &acquired) == APR_SUCCESS) { - if (stream_id) { - h2_io *io = h2_io_set_get(m->stream_ios, stream_id); - ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c, - "h2_mplx(%ld): request(%d) done", m->id, stream_id); - if (io) { - io->processing_done = 1; - h2_mplx_out_close(m, stream_id, NULL); - if (io->orphaned) { - io_destroy(m, io, 0); - if (m->join_wait) { - apr_thread_cond_signal(m->join_wait); - } - } - else { - /* hang around until the stream deregisteres */ - } - } - else { - ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, - "h2_mplx(%ld): request(%d) done, no io found", - m->id, stream_id); - } - apr_thread_cond_broadcast(m->request_done); - } - - if (preq) { - /* someone wants another request, if we have */ - *preq = pop_request(m); - } - leave_mutex(m, acquired); - } -} - apr_status_t h2_mplx_in_read(h2_mplx *m, apr_read_type_e block, int stream_id, apr_bucket_brigade *bb, apr_table_t *trailers, @@ -666,7 +644,7 @@ h2_stream *h2_mplx_next_submit(h2_mplx *m, h2_stream_set *streams) AP_DEBUG_ASSERT(m); if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) { - h2_io *io = h2_io_set_pop_highest_prio(m->ready_ios); + h2_io *io = h2_io_set_shift(m->ready_ios); if (io && !m->aborted) { stream = h2_stream_set_get(streams, io->id); if (stream) { @@ -685,7 +663,7 @@ h2_stream *h2_mplx_next_submit(h2_mplx *m, h2_stream_set *streams) * reset by the client. Should no longer happen since such * streams should clear io's from the ready queue. */ - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c, + ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, APLOGNO() "h2_mplx(%ld): stream for response %d closed, " "resetting io to close request processing", m->id, io->id); @@ -1019,6 +997,7 @@ static h2_io *open_io(h2_mplx *m, int stream_id) if (!io_pool) { apr_pool_create(&io_pool, m->pool); + apr_pool_tag(io_pool, "h2_io"); } else { m->spare_pool = NULL; @@ -1066,25 +1045,104 @@ apr_status_t h2_mplx_process(h2_mplx *m, int stream_id, const h2_request *req, return status; } -const h2_request *h2_mplx_pop_request(h2_mplx *m, int *has_more) +static h2_task *pop_task(h2_mplx *m) { - const h2_request *req = NULL; + h2_task *task = NULL; + int sid; + while (!m->aborted && !task && (sid = h2_iq_shift(m->q)) > 0) { + h2_io *io = h2_io_set_get(m->stream_ios, sid); + if (io) { + conn_rec *c; + apr_pool_t *task_pool; + apr_allocator_t *task_allocator = NULL; + + /* We create a pool with its own allocator to be used for + * processing a request. This is the only way to have the processing + * independant of the worker pool as the h2_mplx pool as well as + * not sensitive to which thread it is in. + * In that sense, memory allocation and lifetime is similar to a master + * connection. + * The main goal in this is that slave connections and requests will + * - one day - be suspended and resumed in different threads. + */ + apr_allocator_create(&task_allocator); + apr_pool_create_ex(&task_pool, io->pool, NULL, task_allocator); + apr_pool_tag(task_pool, "h2_task"); + apr_allocator_owner_set(task_allocator, task_pool); + + c = h2_slave_create(m->c, task_pool, m->c->current_thread, m->dummy_socket); + task = h2_task_create(m->id, io->request, c, m); + + io->processing_started = 1; + if (sid > m->max_stream_started) { + m->max_stream_started = sid; + } + } + } + return task; +} + +h2_task *h2_mplx_pop_task(h2_mplx *m, int *has_more) +{ + h2_task *task = NULL; apr_status_t status; int acquired; AP_DEBUG_ASSERT(m); if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) { if (m->aborted) { - req = NULL; *has_more = 0; } else { - req = pop_request(m); + task = pop_task(m); *has_more = !h2_iq_empty(m->q); } leave_mutex(m, acquired); } - return req; + return task; +} + +void h2_mplx_task_done(h2_mplx *m, h2_task *task, h2_task **ptask) +{ + int acquired; + + if (enter_mutex(m, &acquired) == APR_SUCCESS) { + if (task) { + h2_io *io = h2_io_set_get(m->stream_ios, task->stream_id); + ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c, + "h2_mplx(%ld): task(%s) done", m->id, task->id); + /* clean our references and report request as done. Signal + * that we want another unless we have been aborted */ + /* TODO: this will keep a worker attached to this h2_mplx as + * long as it has requests to handle. Might no be fair to + * other mplx's. Perhaps leave after n requests? */ + + if (task->c) { + apr_pool_destroy(task->c->pool); + } + task = NULL; + if (io) { + io->processing_done = 1; + h2_mplx_out_close(m, io->id, NULL); + if (io->orphaned) { + io_destroy(m, io, 0); + if (m->join_wait) { + apr_thread_cond_signal(m->join_wait); + } + } + else { + /* hang around until the stream deregisteres */ + } + } + apr_thread_cond_broadcast(m->request_done); + } + + if (ptask) { + /* caller wants another task */ + *ptask = pop_task(m); + } + leave_mutex(m, acquired); + } } @@ -1172,11 +1230,11 @@ apr_status_t h2_mplx_engine_push(const char *engine_type, } if (!engine && einit) { - engine = apr_pcalloc(task->pool, sizeof(*engine)); - engine->pub.id = apr_psprintf(task->pool, "eng-%ld-%d", + engine = apr_pcalloc(task->c->pool, sizeof(*engine)); + engine->pub.id = apr_psprintf(task->c->pool, "eng-%ld-%d", m->id, m->next_eng_id++); - engine->pub.pool = task->pool; - engine->pub.type = apr_pstrdup(task->pool, engine_type); + engine->pub.pool = task->c->pool; + engine->pub.type = apr_pstrdup(task->c->pool, engine_type); engine->c = r->connection; engine->m = m; engine->io = task->io; @@ -1246,6 +1304,7 @@ static apr_status_t engine_pull(h2_mplx *m, h2_req_engine_i *engine, "h2_mplx(%ld): request %s pulled by engine %s", m->c->id, r->the_request, engine->pub.id); engine->no_live++; + r->connection->current_thread = engine->c->current_thread; *pr = r; return APR_SUCCESS; } @@ -1290,11 +1349,12 @@ static void engine_done(h2_mplx *m, h2_req_engine_i *engine, h2_task *task, m->id, task->id, aborted? "aborted":"done", engine->pub.id); h2_task_output_close(task->output); - h2_mplx_request_done(m, task->stream_id, NULL); - apr_pool_destroy(task->pool); engine->no_finished++; if (waslive) engine->no_live--; engine->no_assigned--; + if (task->c != engine->c) { /* do not release what the engine runs on */ + h2_mplx_task_done(m, task, NULL); + } } void h2_mplx_engine_done(h2_req_engine *pub_engine, conn_rec *r_conn) @@ -1322,10 +1382,11 @@ void h2_mplx_engine_exit(h2_req_engine *pub_engine) void *entry; ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, "h2_mplx(%ld): exit engine %s (%s), " - "has still %d requests queued, " + "has still %d requests queued, shutdown=%d," "assigned=%ld, live=%ld, finished=%ld", m->c->id, engine->pub.id, engine->pub.type, (int)apr_queue_size(engine->queue), + engine->shutdown, (long)engine->no_assigned, (long)engine->no_live, (long)engine->no_finished); while (apr_queue_trypop(engine->queue, &entry) == APR_SUCCESS) { @@ -1338,7 +1399,7 @@ void h2_mplx_engine_exit(h2_req_engine *pub_engine) engine_done(m, engine, task, 0, 1); } } - if (engine->no_assigned > 1 || engine->no_live > 1) { + if (engine->no_assigned > 0 || engine->no_live > 0) { ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, "h2_mplx(%ld): exit engine %s (%s), " "assigned=%ld, live=%ld, finished=%ld", diff --git a/modules/http2/h2_mplx.h b/modules/http2/h2_mplx.h index 724cfe1c2e9..7b8b30251d1 100644 --- a/modules/http2/h2_mplx.h +++ b/modules/http2/h2_mplx.h @@ -79,6 +79,7 @@ struct h2_mplx { struct apr_thread_cond_t *added_output; struct apr_thread_cond_t *request_done; struct apr_thread_cond_t *join_wait; + apr_socket_t *dummy_socket; apr_size_t stream_max_mem; apr_interval_time_t stream_timeout; @@ -130,7 +131,9 @@ apr_status_t h2_mplx_release_and_join(h2_mplx *m, struct apr_thread_cond_t *wait */ void h2_mplx_abort(h2_mplx *mplx); -void h2_mplx_request_done(h2_mplx *m, int stream_id, const struct h2_request **preq); +struct h2_task *h2_mplx_pop_task(h2_mplx *mplx, int *has_more); + +void h2_mplx_task_done(h2_mplx *m, struct h2_task *task, struct h2_task **ptask); /** * Get the highest stream identifier that has been passed on to processing. @@ -194,8 +197,6 @@ apr_status_t h2_mplx_process(h2_mplx *m, int stream_id, const struct h2_request */ apr_status_t h2_mplx_reprioritize(h2_mplx *m, h2_stream_pri_cmp *cmp, void *ctx); -const struct h2_request *h2_mplx_pop_request(h2_mplx *mplx, int *has_more); - /** * Register a callback for the amount of input data consumed per stream. The * will only ever be invoked from the thread creating this h2_mplx, e.g. when diff --git a/modules/http2/h2_private.h b/modules/http2/h2_private.h index 383adb1f0eb..b68613692da 100644 --- a/modules/http2/h2_private.h +++ b/modules/http2/h2_private.h @@ -24,5 +24,4 @@ extern module AP_MODULE_DECLARE_DATA http2_module; APLOG_USE_MODULE(http2); - #endif diff --git a/modules/http2/h2_proxy_session.c b/modules/http2/h2_proxy_session.c index a55d7ec7395..83755da7117 100644 --- a/modules/http2/h2_proxy_session.c +++ b/modules/http2/h2_proxy_session.c @@ -72,8 +72,8 @@ static apr_status_t proxy_session_pre_close(void *theconn) h2_proxy_session *session = p_conn->data; if (session && session->ngh2) { - ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c, - "proxy_session(%s): shutdown, state=%d, streams=%d", + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c, + "proxy_session(%s): pool cleanup, state=%d, streams=%d", session->id, session->state, h2_iq_size(session->streams)); dispatch_event(session, H2_PROXYS_EV_PRE_CLOSE, 0, NULL); @@ -200,27 +200,13 @@ static int before_frame_send(nghttp2_session *ngh2, const nghttp2_frame *frame, void *user_data) { h2_proxy_session *session = user_data; - switch (frame->hd.type) { - case NGHTTP2_GOAWAY: - if (APLOGcinfo(session->c)) { - char buffer[256]; - - h2_util_frame_print(frame, buffer, sizeof(buffer)/sizeof(buffer[0])); - ap_log_cerror(APLOG_MARK, APLOG_INFO, 0, session->c, APLOGNO() - "h2_session(%s): sent FRAME[%s]", - session->id, buffer); - } - break; - default: - if (APLOGcdebug(session->c)) { - char buffer[256]; - - h2_util_frame_print(frame, buffer, sizeof(buffer)/sizeof(buffer[0])); - ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO() - "h2_session(%s): sent FRAME[%s]", - session->id, buffer); - } - break; + if (APLOGcdebug(session->c)) { + char buffer[256]; + + h2_util_frame_print(frame, buffer, sizeof(buffer)/sizeof(buffer[0])); + ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO() + "h2_session(%s): sent FRAME[%s]", + session->id, buffer); } return 0; } @@ -546,7 +532,7 @@ h2_proxy_session *h2_proxy_session_setup(const char *id, proxy_conn_rec *p_conn, nghttp2_session_callbacks_set_send_callback(cbs, raw_send); nghttp2_option_new(&option); - nghttp2_option_set_peer_max_concurrent_streams(option, 20); + nghttp2_option_set_peer_max_concurrent_streams(option, 100); nghttp2_session_client_new2(&session->ngh2, cbs, session, option); @@ -720,7 +706,7 @@ static apr_status_t h2_proxy_session_read(h2_proxy_session *session, int block) status = ap_get_brigade(session->c->input_filters, session->input, AP_MODE_READBYTES, block? APR_BLOCK_READ : APR_NONBLOCK_READ, - APR_BUCKET_BUFF_SIZE); + 64 * 1024); if (status == APR_SUCCESS) { if (APR_BRIGADE_EMPTY(session->input)) { status = APR_EAGAIN; @@ -778,7 +764,7 @@ static apr_status_t check_suspended(h2_proxy_session *session) return APR_SUCCESS; } else if (status != APR_SUCCESS && !APR_STATUS_IS_EAGAIN(status)) { - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, session->c, + ap_log_cerror(APLOG_MARK, APLOG_WARNING, status, session->c, "h2_proxy_stream(%s-%d): check input", session->id, stream_id); h2_iq_remove(session->suspended, stream_id); diff --git a/modules/http2/h2_request.c b/modules/http2/h2_request.c index a2c899144d4..9aa8d49e5ea 100644 --- a/modules/http2/h2_request.c +++ b/modules/http2/h2_request.c @@ -366,7 +366,7 @@ request_rec *h2_request_create_rec(const h2_request *req, conn_rec *conn) r->allowed_methods = ap_make_method_list(p, 2); - r->headers_in = apr_table_copy(r->pool, req->headers); + r->headers_in = apr_table_clone(r->pool, req->headers); r->trailers_in = apr_table_make(r->pool, 5); r->subprocess_env = apr_table_make(r->pool, 25); r->headers_out = apr_table_make(r->pool, 12); diff --git a/modules/http2/h2_session.c b/modules/http2/h2_session.c index cd49843a0e7..9b24e621c4d 100644 --- a/modules/http2/h2_session.c +++ b/modules/http2/h2_session.c @@ -89,6 +89,7 @@ h2_stream *h2_session_open_stream(h2_session *session, int stream_id) } else { apr_pool_create(&stream_pool, session->pool); + apr_pool_tag(stream_pool, "h2_stream"); } stream = h2_stream_open(stream_id, stream_pool, session); @@ -795,6 +796,7 @@ static h2_session *h2_session_create_int(conn_rec *c, if (status != APR_SUCCESS) { return NULL; } + apr_pool_tag(pool, "h2_session"); session = apr_pcalloc(pool, sizeof(h2_session)); if (session) { diff --git a/modules/http2/h2_task.c b/modules/http2/h2_task.c index 8ccb2794194..7299b299cf6 100644 --- a/modules/http2/h2_task.c +++ b/modules/http2/h2_task.c @@ -35,6 +35,7 @@ #include "h2_private.h" #include "h2_conn.h" #include "h2_config.h" +#include "h2_ctx.h" #include "h2_from_h1.h" #include "h2_h2.h" #include "h2_mplx.h" @@ -179,49 +180,51 @@ static int h2_task_pre_conn(conn_rec* c, void *arg) } h2_task *h2_task_create(long session_id, const h2_request *req, - apr_pool_t *pool, h2_mplx *mplx) + conn_rec *c, h2_mplx *mplx) { - h2_task *task = apr_pcalloc(pool, sizeof(h2_task)); + h2_task *task = apr_pcalloc(c->pool, sizeof(h2_task)); if (task == NULL) { - ap_log_perror(APLOG_MARK, APLOG_ERR, APR_ENOMEM, pool, + ap_log_cerror(APLOG_MARK, APLOG_ERR, APR_ENOMEM, c, APLOGNO(02941) "h2_task(%ld-%d): create stream task", session_id, req->id); h2_mplx_out_close(mplx, req->id, NULL); return NULL; } - task->id = apr_psprintf(pool, "%ld-%d", session_id, req->id); + task->id = apr_psprintf(c->pool, "%ld-%d", session_id, req->id); task->stream_id = req->id; - task->pool = pool; + task->c = c; task->mplx = mplx; task->request = req; task->input_eos = !req->body; task->ser_headers = req->serialize; + h2_ctx_create_for(c, task); + return task; } -apr_status_t h2_task_do(h2_task *task, conn_rec *c, apr_thread_cond_t *cond, +apr_status_t h2_task_do(h2_task *task, apr_thread_cond_t *cond, apr_socket_t *socket) { apr_status_t status; AP_DEBUG_ASSERT(task); task->io = cond; - task->input = h2_task_input_create(task, c); - task->output = h2_task_output_create(task, c); + task->input = h2_task_input_create(task, task->c); + task->output = h2_task_output_create(task, task->c); - ap_process_connection(c, socket); + ap_process_connection(task->c, socket); if (task->frozen) { - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c, + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, task->c, "h2_task(%s): process_conn returned frozen task", task->id); /* cleanup delayed */ status = APR_EAGAIN; } else { - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c, + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, task->c, "h2_task(%s): processing done", task->id); status = APR_SUCCESS; } @@ -293,12 +296,12 @@ static int h2_task_process_conn(conn_rec* c) apr_status_t h2_task_freeze(h2_task *task, request_rec *r) { if (!task->frozen) { - conn_rec *c = task->output->c; + conn_rec *c = task->c; task->frozen = 1; task->frozen_out = apr_brigade_create(c->pool, c->bucket_alloc); ap_add_output_filter("H2_RESPONSE_FREEZE", task, r, r->connection); - ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, task->output->c, + ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, c, "h2_task(%s), frozen", task->id); } return APR_SUCCESS; @@ -308,7 +311,7 @@ apr_status_t h2_task_thaw(h2_task *task) { if (task->frozen) { task->frozen = 0; - ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, task->output->c, + ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, task->c, "h2_task(%s), thawed", task->id); } return APR_SUCCESS; diff --git a/modules/http2/h2_task.h b/modules/http2/h2_task.h index 94f18315625..f2cc6dfd8ac 100644 --- a/modules/http2/h2_task.h +++ b/modules/http2/h2_task.h @@ -50,7 +50,7 @@ typedef struct h2_task h2_task; struct h2_task { const char *id; int stream_id; - apr_pool_t *pool; + conn_rec *c; struct h2_mplx *mplx; const struct h2_request *request; @@ -67,10 +67,9 @@ struct h2_task { }; h2_task *h2_task_create(long session_id, const struct h2_request *req, - apr_pool_t *pool, struct h2_mplx *mplx); + conn_rec *c, struct h2_mplx *mplx); -apr_status_t h2_task_do(h2_task *task, conn_rec *c, - struct apr_thread_cond_t *cond, apr_socket_t *socket); +apr_status_t h2_task_do(h2_task *task, struct apr_thread_cond_t *cond, apr_socket_t *socket); void h2_task_register_hooks(void); /* diff --git a/modules/http2/h2_worker.c b/modules/http2/h2_worker.c index fd0e9bae065..e28e1218f5b 100644 --- a/modules/http2/h2_worker.c +++ b/modules/http2/h2_worker.c @@ -35,87 +35,27 @@ static void* APR_THREAD_FUNC execute(apr_thread_t *thread, void *wctx) { h2_worker *worker = (h2_worker *)wctx; apr_status_t status; - apr_pool_t *task_pool = NULL; - - (void)thread; - /* Other code might want to see a socket for this connection this - * worker processes. Allocate one without further function... - */ - status = apr_socket_create(&worker->socket, - APR_INET, SOCK_STREAM, - APR_PROTO_TCP, worker->pool); - if (status != APR_SUCCESS) { - ap_log_perror(APLOG_MARK, APLOG_ERR, status, worker->pool, - APLOGNO(02948) "h2_worker(%d): alloc socket", - worker->id); - worker->worker_done(worker, worker->ctx); - return NULL; - } while (!worker->aborted) { h2_mplx *m; - const h2_request *req; + h2_task *task; /* Get a h2_mplx + h2_request from the main workers queue. */ - status = worker->get_next(worker, &m, &req, worker->ctx); + status = worker->get_next(worker, &m, &task, worker->ctx); - while (req) { - conn_rec *c, *master = m->c; - h2_task *task; - int stream_id = req->id; - - if (!task_pool) { - apr_allocator_t *task_allocator = NULL; - /* We create a root pool with its own allocator to be used for - * processing a request. This is the only way to have the processing - * independant of the worker pool as the h2_mplx pool as well as - * not sensitive to which thread it is in. - * In that sense, memory allocation and lifetime is similar to a master - * connection. - * The main goal in this is that slave connections and requests will - * - one day - be suspended and resumed in different threads. - */ - apr_allocator_create(&task_allocator); - apr_pool_create_ex(&task_pool, NULL, NULL, task_allocator); - apr_allocator_owner_set(task_allocator, task_pool); - } - - c = h2_slave_create(master, task_pool, - worker->thread, worker->socket); - - task = h2_task_create(m->id, req, task_pool, m); - h2_ctx_create_for(c, task); - - h2_task_do(task, c, worker->io, worker->socket); + while (task) { + h2_task_do(task, worker->io, m->dummy_socket); if (task->frozen) { /* this task was handed over to someone else for processing */ h2_task_thaw(task); - task_pool = NULL; - req = NULL; - h2_mplx_request_done(m, 0, worker->aborted? NULL : &req); - } - else { - /* clean our references and report request as done. Signal - * that we want another unless we have been aborted */ - /* TODO: this will keep a worker attached to this h2_mplx as - * long as it has requests to handle. Might no be fair to - * other mplx's. Perhaps leave after n requests? */ - apr_thread_cond_signal(worker->io); - if (task_pool) { - apr_pool_clear(task_pool); - } - req = NULL; - h2_mplx_request_done(m, stream_id, worker->aborted? NULL : &req); + task = NULL; } + apr_thread_cond_signal(worker->io); + h2_mplx_task_done(m, task, worker->aborted? NULL : &task); } } - if (worker->socket) { - apr_socket_close(worker->socket); - worker->socket = NULL; - } - worker->worker_done(worker, worker->ctx); return NULL; } @@ -135,6 +75,7 @@ h2_worker *h2_worker_create(int id, apr_allocator_create(&allocator); apr_allocator_max_free_set(allocator, ap_max_mem_free); apr_pool_create_ex(&pool, parent_pool, NULL, allocator); + apr_pool_tag(pool, "h2_worker"); apr_allocator_owner_set(allocator, pool); w = apr_pcalloc(pool, sizeof(h2_worker)); diff --git a/modules/http2/h2_worker.h b/modules/http2/h2_worker.h index 2b73610c54b..9c92f9a8369 100644 --- a/modules/http2/h2_worker.h +++ b/modules/http2/h2_worker.h @@ -31,7 +31,7 @@ typedef struct h2_worker h2_worker; * gets aborted (idle timeout, for example). */ typedef apr_status_t h2_worker_mplx_next_fn(h2_worker *worker, struct h2_mplx **pm, - const struct h2_request **preq, + struct h2_task **ptask, void *ctx); /* Invoked just before the worker thread exits. */ @@ -46,7 +46,6 @@ struct h2_worker { apr_thread_t *thread; apr_pool_t *pool; struct apr_thread_cond_t *io; - apr_socket_t *socket; h2_worker_mplx_next_fn *get_next; h2_worker_done_fn *worker_done; diff --git a/modules/http2/h2_workers.c b/modules/http2/h2_workers.c index 9d648e9948c..e600e75dc47 100644 --- a/modules/http2/h2_workers.c +++ b/modules/http2/h2_workers.c @@ -68,7 +68,7 @@ static void cleanup_zombies(h2_workers *workers, int lock) * the h2_workers lock. */ static apr_status_t get_mplx_next(h2_worker *worker, h2_mplx **pm, - const h2_request **preq, void *ctx) + struct h2_task **ptask, void *ctx) { apr_status_t status; apr_time_t max_wait, start_wait; @@ -79,7 +79,7 @@ static apr_status_t get_mplx_next(h2_worker *worker, h2_mplx **pm, status = apr_thread_mutex_lock(workers->lock); if (status == APR_SUCCESS) { - const h2_request *req = NULL; + struct h2_task *task = NULL; h2_mplx *m = NULL; int has_more = 0; @@ -87,7 +87,7 @@ static apr_status_t get_mplx_next(h2_worker *worker, h2_mplx **pm, ap_log_error(APLOG_MARK, APLOG_TRACE3, 0, workers->s, "h2_worker(%d): looking for work", h2_worker_get_id(worker)); - while (!req && !h2_worker_is_aborted(worker) && !workers->aborted) { + while (!task && !h2_worker_is_aborted(worker) && !workers->aborted) { /* Get the next h2_mplx to process that has a task to hand out. * If it does, place it at the end of the queu and return the @@ -99,12 +99,12 @@ static apr_status_t get_mplx_next(h2_worker *worker, h2_mplx **pm, * we do a timed wait or block indefinitely. */ m = NULL; - while (!req && !H2_MPLX_LIST_EMPTY(&workers->mplxs)) { + while (!task && !H2_MPLX_LIST_EMPTY(&workers->mplxs)) { m = H2_MPLX_LIST_FIRST(&workers->mplxs); H2_MPLX_REMOVE(m); - req = h2_mplx_pop_request(m, &has_more); - if (req) { + task = h2_mplx_pop_task(m, &has_more); + if (task) { if (has_more) { H2_MPLX_LIST_INSERT_TAIL(&workers->mplxs, m); } @@ -115,7 +115,7 @@ static apr_status_t get_mplx_next(h2_worker *worker, h2_mplx **pm, } } - if (!req) { + if (!task) { /* Need to wait for a new mplx to arrive. */ cleanup_zombies(workers, 0); @@ -152,13 +152,9 @@ static apr_status_t get_mplx_next(h2_worker *worker, h2_mplx **pm, /* Here, we either have gotten task and mplx for the worker or * needed to give up with more than enough workers. */ - if (req) { - ap_log_error(APLOG_MARK, APLOG_TRACE3, 0, workers->s, - "h2_worker(%d): start request(%ld-%d)", - h2_worker_get_id(worker), m->id, req->id); + if (task) { *pm = m; - *preq = req; - + *ptask = task; if (has_more && workers->idle_worker_count > 1) { apr_thread_cond_signal(workers->mplx_added); } @@ -238,6 +234,7 @@ h2_workers *h2_workers_create(server_rec *s, apr_pool_t *server_pool, * happen on the pool handed to us, which we do not guard. */ apr_pool_create(&pool, server_pool); + apr_pool_tag(pool, "h2_workers"); workers = apr_pcalloc(pool, sizeof(h2_workers)); if (workers) { workers->s = s; diff --git a/modules/http2/mod_proxy_http2.c b/modules/http2/mod_proxy_http2.c index dbb041cb4b6..3cdf44ae74c 100644 --- a/modules/http2/mod_proxy_http2.c +++ b/modules/http2/mod_proxy_http2.c @@ -194,7 +194,7 @@ static apr_status_t proxy_engine_init(h2_req_engine *engine, request_rec *r) h2_proxy_ctx *ctx = ap_get_module_config(r->connection->conn_config, &proxy_http2_module); if (ctx) { - engine->capacity = 20; /* conservative guess until we know */ + engine->capacity = 100; /* guess until we know */ ctx->engine = engine; return APR_SUCCESS; } @@ -222,7 +222,7 @@ static apr_status_t add_request(h2_proxy_session *session, request_rec *r) } static void request_done(h2_proxy_session *session, request_rec *r) -{ +{ h2_proxy_ctx *ctx = session->user_data; if (req_engine_done && r != ctx->rbase) { @@ -237,6 +237,7 @@ static apr_status_t proxy_engine_run(h2_proxy_ctx *ctx, request_rec *r) { apr_status_t status = OK; h2_proxy_session *session; +setup_session: /* Step Two: Make the Connection (or check that an already existing * socket is still usable). On success, we have a socket connected to * backend->hostname. */ @@ -287,9 +288,9 @@ static apr_status_t proxy_engine_run(h2_proxy_ctx *ctx, request_rec *r) { return HTTP_SERVICE_UNAVAILABLE; } +run_session: session->user_data = ctx; add_request(session, r); - status = APR_EAGAIN; while (APR_STATUS_IS_EAGAIN(status)) { status = h2_proxy_session_process(session); @@ -305,10 +306,24 @@ static apr_status_t proxy_engine_run(h2_proxy_ctx *ctx, request_rec *r) { } } - if (session->state == H2_PROXYS_ST_DONE) { + if (session->state == H2_PROXYS_ST_DONE || status != APR_SUCCESS) { ctx->p_conn->close = 1; } + if (!ctx->standalone) { + ctx->engine->capacity = session->remote_max_concurrent; + if (req_engine_pull(ctx->engine, APR_NONBLOCK_READ, &r) == APR_SUCCESS) { + ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, + "h2_proxy_session(%s): idle, pulled request %s", + session->id, r->the_request); + add_request(session, r); + if (ctx->p_conn->close) { + goto setup_session; + } + goto run_session; + } + } + if (session->streams && !h2_iq_empty(session->streams)) { ap_log_cerror(APLOG_MARK, APLOG_WARNING, status, ctx->p_conn->connection, @@ -445,15 +460,6 @@ static int proxy_http2_handler(request_rec *r, } status = proxy_engine_run(ctx, r); - if (!ctx->standalone && status == APR_SUCCESS) { - apr_status_t s2; - do { - s2 = req_engine_pull(ctx->engine, APR_BLOCK_READ, &r); - if (s2 == APR_SUCCESS) { - s2 = proxy_engine_run(ctx, r); - } - } while (s2 != APR_EOF); - } cleanup: if (!ctx->standalone && ctx->engine && req_engine_exit) {