From: Stefan Eissing Date: Mon, 15 Feb 2016 17:10:54 +0000 (+0000) Subject: first working h2 request engine implementation that does serial processing of proxy... X-Git-Tag: 2.5.0-alpha~2070 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=457462dfddf43b0975dddeeba7d5f60735587101;p=thirdparty%2Fapache%2Fhttpd.git first working h2 request engine implementation that does serial processing of proxy requests git-svn-id: https://svn.apache.org/repos/asf/httpd/httpd/trunk@1730572 13f79535-47bb-0310-9956-ffa450edef68 --- diff --git a/modules/http2/h2_from_h1.c b/modules/http2/h2_from_h1.c index 59ee655d86a..8e1f163a2aa 100644 --- a/modules/http2/h2_from_h1.c +++ b/modules/http2/h2_from_h1.c @@ -49,12 +49,6 @@ h2_from_h1 *h2_from_h1_create(int stream_id, apr_pool_t *pool) return from_h1; } -apr_status_t h2_from_h1_destroy(h2_from_h1 *from_h1) -{ - from_h1->bb = NULL; - return APR_SUCCESS; -} - static void set_state(h2_from_h1 *from_h1, h2_from_h1_state_t state) { if (from_h1->state != state) { diff --git a/modules/http2/h2_from_h1.h b/modules/http2/h2_from_h1.h index cdd38ca605f..af5dea24c15 100644 --- a/modules/http2/h2_from_h1.h +++ b/modules/http2/h2_from_h1.h @@ -60,8 +60,6 @@ struct h2_from_h1 { h2_from_h1 *h2_from_h1_create(int stream_id, apr_pool_t *pool); -apr_status_t h2_from_h1_destroy(h2_from_h1 *response); - apr_status_t h2_from_h1_read_response(h2_from_h1 *from_h1, ap_filter_t* f, apr_bucket_brigade* bb); diff --git a/modules/http2/h2_io.h b/modules/http2/h2_io.h index 5957feddc75..f0b085b0d2c 100644 --- a/modules/http2/h2_io.h +++ b/modules/http2/h2_io.h @@ -20,6 +20,7 @@ struct h2_response; struct apr_thread_cond_t; struct h2_mplx; struct h2_request; +struct h2_task; typedef apr_status_t h2_io_data_cb(void *ctx, const char *data, apr_off_t len); @@ -62,6 +63,9 @@ struct h2_io { apr_size_t input_consumed; /* how many bytes have been read */ int files_handles_owned; + + struct h2_task *task; /* parked task */ + request_rec *r; /* parked request */ }; /******************************************************************************* diff --git a/modules/http2/h2_mplx.c b/modules/http2/h2_mplx.c index 3c4d219197d..80b36c05ca0 100644 --- a/modules/http2/h2_mplx.c +++ b/modules/http2/h2_mplx.c @@ -17,7 +17,7 @@ #include #include -#include +#include #include #include #include @@ -27,9 +27,12 @@ #include #include +#include "mod_http2.h" + #include "h2_private.h" #include "h2_config.h" #include "h2_conn.h" +#include "h2_ctx.h" #include "h2_h2.h" #include "h2_io.h" #include "h2_io_set.h" @@ -390,6 +393,9 @@ apr_status_t h2_mplx_stream_done(h2_mplx *m, int stream_id, int rst_error) * for processing, e.g. when we received all HEADERs. But when * a stream is cancelled very early, it will not exist. */ if (io) { + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c, + "h2_mplx(%ld-%d): marking stream as done.", + m->id, stream_id); io_stream_done(m, io, rst_error); } @@ -415,25 +421,56 @@ static const h2_request *pop_request(h2_mplx *m) return req; } +static apr_status_t h2_mplx_engine_schedule(h2_mplx *m, request_rec *r) +{ + if (!m->engine_queue) { + apr_queue_create(&m->engine_queue, 200, m->pool); + } + return apr_queue_trypush(m->engine_queue, r); +} + void h2_mplx_request_done(h2_mplx **pm, int stream_id, const h2_request **preq) { h2_mplx *m = *pm; int acquired; if (enter_mutex(m, &acquired) == APR_SUCCESS) { - h2_io *io = h2_io_set_get(m->stream_ios, stream_id); - ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c, - "h2_mplx(%ld): request(%d) done", m->id, stream_id); - if (io) { - io->processing_done = 1; - if (io->orphaned) { - io_destroy(m, io, 0); - if (m->join_wait) { - apr_thread_cond_signal(m->join_wait); + if (stream_id) { + h2_io *io = h2_io_set_get(m->stream_ios, stream_id); + ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c, + "h2_mplx(%ld): request(%d) done", m->id, stream_id); + if (io) { + request_rec *r = io->r; + + if (io->orphaned) { + io->processing_done = 1; + } + else if (r) { + /* A parked request which is being transferred from + * one worker thread to another. This request_done call + * was from the initial thread and now it is safe to + * schedule it for further processing. */ + h2_task_thaw(io->task); + io->task = NULL; + io->r = NULL; + h2_mplx_engine_schedule(*pm, r); + } + else { + io->processing_done = 1; + } + + if (io->processing_done) { + h2_io_out_close(io, NULL); + if (io->orphaned) { + io_destroy(m, io, 0); + if (m->join_wait) { + apr_thread_cond_signal(m->join_wait); + } + } + else { + /* hang around until the stream deregisteres */ + } } - } - else { - /* hang around until the stream deregisteres */ } } @@ -800,7 +837,7 @@ apr_status_t h2_mplx_out_write(h2_mplx *m, int stream_id, h2_io *io = h2_io_set_get(m->stream_ios, stream_id); if (io && !io->orphaned) { status = out_write(m, io, f, bb, trailers, iowait); - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, m->c, + ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, m->c, "h2_mplx(%ld-%d): write with trailers=%s", m->id, io->id, trailers? "yes" : "no"); H2_MPLX_IO_OUT(APLOG_TRACE2, m, io, "h2_mplx_out_write"); @@ -1049,3 +1086,107 @@ const h2_request *h2_mplx_pop_request(h2_mplx *m, int *has_more) return req; } +apr_status_t h2_mplx_engine_push(h2_mplx *m, h2_task *task, + const char *engine_type, + request_rec *r, h2_mplx_engine_init *einit) +{ + apr_status_t status; + int acquired; + + AP_DEBUG_ASSERT(m); + if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) { + h2_io *io = h2_io_set_get(m->stream_ios, task->stream_id); + if (!io || io->orphaned) { + status = APR_ECONNABORTED; + } + else { + h2_req_engine *engine; + + apr_table_set(r->connection->notes, H2_TASK_ID_NOTE, task->id); + status = APR_EOF; + engine = m->engine; /* just a single one for now */ + if (task->ser_headers) { + /* Max compatibility, deny processing of this */ + } + else if (!engine && einit) { + engine = apr_pcalloc(r->connection->pool, sizeof(*engine)); + engine->id = 1; + engine->c = r->connection; + engine->pool = r->connection->pool; + engine->type = apr_pstrdup(engine->pool, engine_type); + + status = einit(engine, r); + if (status == APR_SUCCESS) { + m->engine = engine; + ap_log_rerror(APLOG_MARK, APLOG_TRACE1, status, r, + "h2_mplx(%ld): init engine %d (%s)", + m->c->id, engine->id, engine->type); + } + } + else if (engine && !strcmp(engine->type, engine_type)) { + if (status == APR_SUCCESS) { + /* this task will be processed in another thread, + * freeze any I/O for the time being. */ + h2_task_freeze(task, r); + io->task = task; + io->r = r; + } + ap_log_rerror(APLOG_MARK, APLOG_TRACE2, status, r, + "h2_mplx(%ld): push request %s", + m->c->id, r->the_request); + } + } + + leave_mutex(m, acquired); + } + return status; +} + +apr_status_t h2_mplx_engine_pull(h2_mplx *m, h2_task *task, + struct h2_req_engine *engine, + apr_time_t timeout, request_rec **pr) +{ + apr_status_t status; + int acquired; + + AP_DEBUG_ASSERT(m); + if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) { + status = APR_ECONNABORTED; + if (m->engine == engine && m->engine_queue) { + void *elem; + status = apr_queue_trypop(m->engine_queue, &elem); + if (status == APR_SUCCESS) { + *pr = elem; + ap_log_rerror(APLOG_MARK, APLOG_TRACE2, status, *pr, + "h2_mplx(%ld): request %s pulled by engine %d", + m->c->id, (*pr)->the_request, engine->id); + } + } + leave_mutex(m, acquired); + } + return status; +} + +void h2_mplx_engine_done(h2_mplx *m, struct h2_task *task, conn_rec *r_conn) +{ + int stream_id = task->stream_id; + h2_task_output_close(task->output); + h2_mplx_request_done(&m, stream_id, NULL); + apr_pool_destroy(r_conn->pool); +} + +void h2_mplx_engine_exit(h2_mplx *m, struct h2_task *task, + struct h2_req_engine *engine) +{ + int acquired; + + AP_DEBUG_ASSERT(m); + if (enter_mutex(m, &acquired) == APR_SUCCESS) { + /* TODO: shutdown of engine->c */ + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c, + "h2_mplx(%ld): exit engine %d (%s)", + m->c->id, engine->id, engine->type); + m->engine = NULL; + leave_mutex(m, acquired); + } +} diff --git a/modules/http2/h2_mplx.h b/modules/http2/h2_mplx.h index 024401dbe51..837025f996a 100644 --- a/modules/http2/h2_mplx.h +++ b/modules/http2/h2_mplx.h @@ -47,7 +47,9 @@ struct apr_thread_cond_t; struct h2_workers; struct h2_stream_set; struct h2_task_queue; +struct h2_req_engine; +#include #include "h2_io.h" typedef struct h2_mplx h2_mplx; @@ -87,6 +89,9 @@ struct h2_mplx { h2_mplx_consumed_cb *input_consumed; void *input_consumed_ctx; + + struct h2_req_engine *engine; + apr_queue_t *engine_queue; }; @@ -373,4 +378,24 @@ APR_RING_INSERT_TAIL((b), ap__b, h2_mplx, link); \ #define H2_MPLX_REMOVE(e) APR_RING_REMOVE((e), link) +/******************************************************************************* + * h2_mplx h2_req_engine handling. + ******************************************************************************/ + +typedef apr_status_t h2_mplx_engine_init(struct h2_req_engine *engine, + request_rec *r); + +apr_status_t h2_mplx_engine_push(h2_mplx *m, struct h2_task *task, + const char *engine_type, + request_rec *r, h2_mplx_engine_init *einit); + +apr_status_t h2_mplx_engine_pull(h2_mplx *m, struct h2_task *task, + struct h2_req_engine *engine, + apr_time_t timeout, request_rec **pr); + +void h2_mplx_engine_done(h2_mplx *m, struct h2_task *task, conn_rec *r_conn); + +void h2_mplx_engine_exit(h2_mplx *m, struct h2_task *task, + struct h2_req_engine *engine); + #endif /* defined(__mod_h2__h2_mplx__) */ diff --git a/modules/http2/h2_proxy_session.c b/modules/http2/h2_proxy_session.c index a40f825f3f3..8f7b5d0a988 100644 --- a/modules/http2/h2_proxy_session.c +++ b/modules/http2/h2_proxy_session.c @@ -18,6 +18,7 @@ #include #include +#include #include "h2.h" #include "h2_request.h" @@ -60,6 +61,45 @@ static apr_status_t proxy_session_shutdown(void *theconn) return APR_SUCCESS; } +static int proxy_pass_brigade(apr_bucket_alloc_t *bucket_alloc, + proxy_conn_rec *p_conn, + conn_rec *origin, apr_bucket_brigade *bb, + int flush) +{ + apr_status_t status; + apr_off_t transferred; + + if (flush) { + apr_bucket *e = apr_bucket_flush_create(bucket_alloc); + APR_BRIGADE_INSERT_TAIL(bb, e); + } + apr_brigade_length(bb, 0, &transferred); + if (transferred != -1) + p_conn->worker->s->transferred += transferred; + status = ap_pass_brigade(origin->output_filters, bb); + /* Cleanup the brigade now to avoid buckets lifetime + * issues in case of error returned below. */ + apr_brigade_cleanup(bb); + if (status != APR_SUCCESS) { + ap_log_cerror(APLOG_MARK, APLOG_ERR, status, origin, APLOGNO(01084) + "pass request body failed to %pI (%s)", + p_conn->addr, p_conn->hostname); + if (origin->aborted) { + const char *ssl_note; + + if (((ssl_note = apr_table_get(origin->notes, "SSL_connect_rv")) + != NULL) && (strcmp(ssl_note, "err") == 0)) { + return HTTP_INTERNAL_SERVER_ERROR; + } + return HTTP_GATEWAY_TIME_OUT; + } + else { + return HTTP_BAD_REQUEST; + } + } + return OK; +} + static ssize_t raw_send(nghttp2_session *ngh2, const uint8_t *data, size_t length, int flags, void *user_data) { @@ -75,9 +115,9 @@ static ssize_t raw_send(nghttp2_session *ngh2, const uint8_t *data, session->c->bucket_alloc); APR_BRIGADE_INSERT_TAIL(session->output, b); - status = ap_proxy_pass_brigade(session->c->bucket_alloc, session->r, - session->p_conn, session->c, - session->output, flush); + status = proxy_pass_brigade(session->c->bucket_alloc, + session->p_conn, session->c, + session->output, flush); if (status != APR_SUCCESS) { ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, session->c, "h2_proxy_sesssion(%ld): sending", session->c->id); @@ -203,6 +243,16 @@ static apr_status_t h2_proxy_stream_add_header_out(h2_proxy_stream *stream, return APR_SUCCESS; } +static int log_header(void *ctx, const char *key, const char *value) +{ + h2_proxy_stream *stream = ctx; + + ap_log_rerror(APLOG_MARK, APLOG_TRACE2, 0, stream->r, + "h2_proxy_stream(%ld-%d), header_out %s: %s", + stream->session->c->id, stream->id, key, value); + return 1; +} + static void h2_proxy_stream_end_headers_out(h2_proxy_stream *stream) { h2_proxy_session *session = stream->session; @@ -254,6 +304,13 @@ static void h2_proxy_stream_end_headers_out(h2_proxy_stream *stream) server_name, portstr) ); } + + if (APLOGrtrace2(stream->r)) { + ap_log_rerror(APLOG_MARK, APLOG_TRACE2, 0, stream->r, + "h2_proxy_stream(%ld-%d), header_out after merging", + stream->session->c->id, stream->id); + apr_table_do(log_header, stream, stream->r->headers_out, NULL); + } } static int on_data_chunk_recv(nghttp2_session *ngh2, uint8_t flags, @@ -278,7 +335,8 @@ static int on_data_chunk_recv(nghttp2_session *ngh2, uint8_t flags, stream->data_received = 1; } - b = apr_bucket_transient_create((const char*)data, len, session->c->bucket_alloc); + b = apr_bucket_transient_create((const char*)data, len, + stream->r->connection->bucket_alloc); APR_BRIGADE_INSERT_TAIL(stream->output, b); status = ap_pass_brigade(stream->r->output_filters, stream->output); if (status != APR_SUCCESS) { @@ -344,7 +402,6 @@ static ssize_t stream_data_read(nghttp2_session *ngh2, int32_t stream_id, uint32_t *data_flags, nghttp2_data_source *source, void *user_data) { - h2_proxy_session *session = user_data; h2_proxy_stream *stream; apr_status_t status = APR_SUCCESS; @@ -358,9 +415,9 @@ static ssize_t stream_data_read(nghttp2_session *ngh2, int32_t stream_id, status = ap_get_brigade(stream->r->input_filters, stream->input, AP_MODE_READBYTES, APR_BLOCK_READ, H2MIN(APR_BUCKET_BUFF_SIZE, length)); - ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, session->c, - "h2_proxy_stream(%ld-%d): request body read", - session->c->id, stream->id); + ap_log_rerror(APLOG_MARK, APLOG_TRACE2, status, stream->r, + "h2_proxy_stream(%d): request body read", + stream->id); } if (status == APR_SUCCESS) { @@ -396,9 +453,9 @@ static ssize_t stream_data_read(nghttp2_session *ngh2, int32_t stream_id, apr_bucket_delete(b); } - ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, session->c, - "h2_proxy_stream(%ld-%d): request body read %ld bytes, flags=%d", - session->c->id, stream->id, (long)readlen, (int)*data_flags); + ap_log_rerror(APLOG_MARK, APLOG_TRACE2, status, stream->r, + "h2_proxy_stream(%d): request body read %ld bytes, flags=%d", + stream->id, (long)readlen, (int)*data_flags); return readlen; } else if (APR_STATUS_IS_EAGAIN(status)) { @@ -424,7 +481,6 @@ h2_proxy_session *h2_proxy_session_setup(request_rec *r, proxy_conn_rec *p_conn, session->c = p_conn->connection; session->p_conn = p_conn; session->conf = conf; - session->r = r; session->pool = p_conn->scpool; session->window_bits_default = 30; session->window_bits_connection = 30; @@ -471,7 +527,7 @@ apr_status_t h2_proxy_session_open_stream(h2_proxy_session *session, const char h2_proxy_stream *stream; apr_uri_t puri; const char *authority, *scheme, *path; - + stream = apr_pcalloc(r->pool, sizeof(*stream)); stream->pool = r->pool; @@ -625,9 +681,14 @@ apr_status_t h2_proxy_stream_process(h2_proxy_stream *stream) rv = nghttp2_submit_request(session->ngh2, NULL, hd->nv, hd->nvlen, pp, stream); - ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c, - "h2_session(%ld): submit request -> %d", - session->c->id, rv); + if (APLOGcdebug(session->c)) { + const char *task_id = apr_table_get(stream->r->connection->notes, + H2_TASK_ID_NOTE); + ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, + "h2_session(%ld): submit %s%s -> %d (task %s)", + session->c->id, stream->req->authority, stream->req->path, + rv, task_id); + } if (rv > 0) { stream->id = rv; stream->state = H2_STREAM_ST_OPEN; diff --git a/modules/http2/h2_proxy_session.h b/modules/http2/h2_proxy_session.h index 598a2a037a6..38924afdee6 100644 --- a/modules/http2/h2_proxy_session.h +++ b/modules/http2/h2_proxy_session.h @@ -24,7 +24,6 @@ typedef struct h2_proxy_session { conn_rec *c; proxy_conn_rec *p_conn; proxy_server_conf *conf; - request_rec *r; apr_pool_t *pool; nghttp2_session *ngh2; /* the nghttp2 session itself */ @@ -66,4 +65,6 @@ apr_status_t h2_proxy_session_open_stream(h2_proxy_session *s, const char *url, request_rec *r, h2_proxy_stream **pstream); apr_status_t h2_proxy_stream_process(h2_proxy_stream *stream); +#define H2_PROXY_REQ_URL_NOTE "h2-proxy-req-url" + #endif /* h2_proxy_session_h */ diff --git a/modules/http2/h2_task.c b/modules/http2/h2_task.c index b1f6bf7f3fb..9fe003247d7 100644 --- a/modules/http2/h2_task.c +++ b/modules/http2/h2_task.c @@ -85,6 +85,28 @@ static apr_status_t h2_filter_read_response(ap_filter_t* f, return h2_from_h1_read_response(task->output->from_h1, f, bb); } +static apr_status_t h2_response_freeze_filter(ap_filter_t* f, + apr_bucket_brigade* bb) +{ + h2_task *task = f->ctx; + AP_DEBUG_ASSERT(task); + + if (task->frozen) { + ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, f->r, + "h2_response_freeze_filter, saving"); + APR_BRIGADE_CONCAT(task->frozen_out, bb); + return APR_SUCCESS; + } + + if (APR_BRIGADE_EMPTY(bb)) { + return APR_SUCCESS; + } + + ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, f->r, + "h2_response_freeze_filter, passing"); + return ap_pass_brigade(f->next, bb); +} + /******************************************************************************* * Register various hooks */ @@ -119,6 +141,8 @@ void h2_task_register_hooks(void) NULL, AP_FTYPE_PROTOCOL); ap_register_output_filter("H2_TRAILERS", h2_response_trailers_filter, NULL, AP_FTYPE_PROTOCOL); + ap_register_output_filter("H2_RESPONSE_FREEZE", h2_response_freeze_filter, + NULL, AP_FTYPE_RESOURCE); } /* post config init */ @@ -168,6 +192,7 @@ h2_task *h2_task_create(long session_id, const h2_request *req, task->id = apr_psprintf(pool, "%ld-%d", session_id, req->id); task->stream_id = req->id; + task->pool = pool; task->mplx = mplx; task->request = req; task->input_eos = !req->body; @@ -179,6 +204,8 @@ h2_task *h2_task_create(long session_id, const h2_request *req, apr_status_t h2_task_do(h2_task *task, conn_rec *c, apr_thread_cond_t *cond, apr_socket_t *socket) { + apr_status_t status; + AP_DEBUG_ASSERT(task); task->io = cond; task->input = h2_task_input_create(task, c); @@ -186,21 +213,27 @@ apr_status_t h2_task_do(h2_task *task, conn_rec *c, apr_thread_cond_t *cond, ap_process_connection(c, socket); - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c, - "h2_task(%s): processing done", task->id); - - h2_task_input_destroy(task->input); - h2_task_output_close(task->output); - h2_task_output_destroy(task->output); - task->io = NULL; + if (task->frozen) { + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c, + "h2_task(%s): process_conn returned frozen task", + task->id); + /* cleanup delayed */ + status = APR_EAGAIN; + } + else { + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c, + "h2_task(%s): processing done", task->id); + status = APR_SUCCESS; + } - return APR_SUCCESS; + return status; } -static apr_status_t h2_task_process_request(const h2_request *req, conn_rec *c) +static apr_status_t h2_task_process_request(h2_task *task, conn_rec *c) { - request_rec *r; + const h2_request *req = task->request; conn_state_t *cs = c->cs; + request_rec *r; r = h2_request_create_rec(req, c); if (r && (r->status == HTTP_OK)) { @@ -210,10 +243,15 @@ static apr_status_t h2_task_process_request(const h2_request *req, conn_rec *c) cs->state = CONN_STATE_HANDLER; } ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c, - "h2_task(%ld-%d): start process_request", c->id, req->id); + "h2_task(%s): start process_request", task->id); ap_process_request(r); + if (task->frozen) { + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c, + "h2_task(%s): process_request frozen", task->id); + } ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c, - "h2_task(%ld-%d): process_request done", c->id, req->id); + "h2_task(%s): process_request done", task->id); + /* After the call to ap_process_request, the * request pool will have been deleted. We set * r=NULL here to ensure that any dereference @@ -221,11 +259,10 @@ static apr_status_t h2_task_process_request(const h2_request *req, conn_rec *c) * will result in a segfault immediately instead * of nondeterministic failures later. */ - if (cs) + if (cs) cs->state = CONN_STATE_WRITE_COMPLETION; r = NULL; } - ap_update_child_status_from_conn(c->sbh, SERVER_BUSY_WRITE, c); c->sbh = NULL; return APR_SUCCESS; @@ -244,7 +281,7 @@ static int h2_task_process_conn(conn_rec* c) if (!ctx->task->ser_headers) { ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, c, "h2_h2, processing request directly"); - h2_task_process_request(ctx->task->request, c); + h2_task_process_request(ctx->task, c); return DONE; } ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, c, @@ -252,3 +289,24 @@ static int h2_task_process_conn(conn_rec* c) } return DECLINED; } + +apr_status_t h2_task_freeze(h2_task *task, request_rec *r) +{ + if (!task->frozen) { + conn_rec *c = task->output->c; + + task->frozen = 1; + task->frozen_out = apr_brigade_create(c->pool, c->bucket_alloc); + ap_add_output_filter("H2_RESPONSE_FREEZE", task, r, r->connection); + } + return APR_SUCCESS; +} + +apr_status_t h2_task_thaw(h2_task *task) +{ + if (task->frozen) { + task->frozen = 0; + } + return APR_SUCCESS; +} + diff --git a/modules/http2/h2_task.h b/modules/http2/h2_task.h index ce17f85167e..94f18315625 100644 --- a/modules/http2/h2_task.h +++ b/modules/http2/h2_task.h @@ -50,16 +50,20 @@ typedef struct h2_task h2_task; struct h2_task { const char *id; int stream_id; + apr_pool_t *pool; struct h2_mplx *mplx; const struct h2_request *request; unsigned int filters_set : 1; unsigned int input_eos : 1; unsigned int ser_headers : 1; + unsigned int frozen : 1; struct h2_task_input *input; struct h2_task_output *output; struct apr_thread_cond_t *io; /* used to wait for events on */ + + apr_bucket_brigade *frozen_out; }; h2_task *h2_task_create(long session_id, const struct h2_request *req, @@ -77,4 +81,7 @@ apr_status_t h2_task_init(apr_pool_t *pool, server_rec *s); extern APR_OPTIONAL_FN_TYPE(ap_logio_add_bytes_in) *h2_task_logio_add_bytes_in; extern APR_OPTIONAL_FN_TYPE(ap_logio_add_bytes_out) *h2_task_logio_add_bytes_out; +apr_status_t h2_task_freeze(h2_task *task, request_rec *r); +apr_status_t h2_task_thaw(h2_task *task); + #endif /* defined(__mod_h2__h2_task__) */ diff --git a/modules/http2/h2_task_input.c b/modules/http2/h2_task_input.c index 16ef3c8d147..953433d4593 100644 --- a/modules/http2/h2_task_input.c +++ b/modules/http2/h2_task_input.c @@ -75,11 +75,6 @@ h2_task_input *h2_task_input_create(h2_task *task, conn_rec *c) return input; } -void h2_task_input_destroy(h2_task_input *input) -{ - input->bb = NULL; -} - apr_status_t h2_task_input_read(h2_task_input *input, ap_filter_t* f, apr_bucket_brigade* bb, diff --git a/modules/http2/h2_task_input.h b/modules/http2/h2_task_input.h index bfc01f580f8..6085b78c8f8 100644 --- a/modules/http2/h2_task_input.h +++ b/modules/http2/h2_task_input.h @@ -34,8 +34,6 @@ struct h2_task_input { h2_task_input *h2_task_input_create(struct h2_task *task, conn_rec *c); -void h2_task_input_destroy(h2_task_input *input); - apr_status_t h2_task_input_read(h2_task_input *input, ap_filter_t* filter, apr_bucket_brigade* brigade, diff --git a/modules/http2/h2_task_output.c b/modules/http2/h2_task_output.c index 340b82d94d4..b717fc3d6ec 100644 --- a/modules/http2/h2_task_output.c +++ b/modules/http2/h2_task_output.c @@ -49,14 +49,6 @@ h2_task_output *h2_task_output_create(h2_task *task, conn_rec *c) return output; } -void h2_task_output_destroy(h2_task_output *output) -{ - if (output->from_h1) { - h2_from_h1_destroy(output->from_h1); - output->from_h1 = NULL; - } -} - static apr_table_t *get_trailers(h2_task_output *output) { if (!output->trailers_passed) { @@ -75,7 +67,7 @@ static apr_table_t *get_trailers(h2_task_output *output) } static apr_status_t open_if_needed(h2_task_output *output, ap_filter_t *f, - apr_bucket_brigade *bb) + apr_bucket_brigade *bb, const char *caller) { if (output->state == H2_TASK_OUT_INIT) { h2_response *response; @@ -86,9 +78,10 @@ static apr_status_t open_if_needed(h2_task_output *output, ap_filter_t *f, /* This happens currently when ap_die(status, r) is invoked * by a read request filter. */ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, f->c, APLOGNO(03204) - "h2_task_output(%s): write without response " + "h2_task_output(%s): write without response by %s " "for %s %s %s", - output->task->id, output->task->request->method, + output->task->id, caller, + output->task->request->method, output->task->request->authority, output->task->request->path); f->c->aborted = 1; @@ -108,6 +101,11 @@ static apr_status_t open_if_needed(h2_task_output *output, ap_filter_t *f, h2_task_logio_add_bytes_out(f->c, bytes_written); } get_trailers(output); + ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, f->c, APLOGNO(03204) + "h2_task_output(%s): open as needed %s %s %s", + output->task->id, output->task->request->method, + output->task->request->authority, + output->task->request->path); return h2_mplx_out_open(output->task->mplx, output->task->stream_id, response, f, bb, output->task->io); } @@ -116,19 +114,19 @@ static apr_status_t open_if_needed(h2_task_output *output, ap_filter_t *f, void h2_task_output_close(h2_task_output *output) { - open_if_needed(output, NULL, NULL); + open_if_needed(output, NULL, NULL, "close"); if (output->state != H2_TASK_OUT_DONE) { + if (output->task->frozen_out + && !APR_BRIGADE_EMPTY(output->task->frozen_out)) { + h2_mplx_out_write(output->task->mplx, output->task->stream_id, + NULL, output->task->frozen_out, NULL, NULL); + } h2_mplx_out_close(output->task->mplx, output->task->stream_id, get_trailers(output)); output->state = H2_TASK_OUT_DONE; } } -int h2_task_output_has_started(h2_task_output *output) -{ - return output->state >= H2_TASK_OUT_STARTED; -} - /* Bring the data from the brigade (which represents the result of the * request_rec out filter chain) into the h2_mplx for further sending * on the master connection. @@ -144,7 +142,14 @@ apr_status_t h2_task_output_write(h2_task_output *output, return APR_SUCCESS; } - status = open_if_needed(output, f, bb); + if (output->task->frozen) { + h2_util_bb_log(output->c, output->task->stream_id, APLOG_TRACE2, + "frozen task output write", bb); + APR_BRIGADE_CONCAT(output->task->frozen_out, bb); + return APR_SUCCESS; + } + + status = open_if_needed(output, f, bb, "write"); if (status != APR_EOF) { ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, f->c, "h2_task_output(%s): opened and passed brigade", diff --git a/modules/http2/h2_task_output.h b/modules/http2/h2_task_output.h index eed1f0123ce..aa719cdeea3 100644 --- a/modules/http2/h2_task_output.h +++ b/modules/http2/h2_task_output.h @@ -44,14 +44,13 @@ struct h2_task_output { h2_task_output *h2_task_output_create(struct h2_task *task, conn_rec *c); -void h2_task_output_destroy(h2_task_output *output); - apr_status_t h2_task_output_write(h2_task_output *output, ap_filter_t* filter, apr_bucket_brigade* brigade); void h2_task_output_close(h2_task_output *output); -int h2_task_output_has_started(h2_task_output *output); +apr_status_t h2_task_output_freeze(h2_task_output *output); +apr_status_t h2_task_output_thaw(h2_task_output *output); #endif /* defined(__mod_h2__h2_task_output__) */ diff --git a/modules/http2/h2_worker.c b/modules/http2/h2_worker.c index 404a6046862..56feec118dd 100644 --- a/modules/http2/h2_worker.c +++ b/modules/http2/h2_worker.c @@ -35,24 +35,9 @@ static void* APR_THREAD_FUNC execute(apr_thread_t *thread, void *wctx) { h2_worker *worker = (h2_worker *)wctx; apr_status_t status; - apr_allocator_t *task_allocator = NULL; - apr_pool_t *task_pool; + apr_pool_t *task_pool = NULL; (void)thread; - - /* We create a root pool with its own allocator to be used for - * processing a request. This is the only way to have the processing - * independant of the worker pool as the h2_mplx pool as well as - * not sensitive to which thread it is in. - * In that sense, memory allocation and lifetime is similar to a master - * connection. - * The main goal in this is that slave connections and requests will - * - one day - be suspended and resumed in different threads. - */ - apr_allocator_create(&task_allocator); - apr_pool_create_ex(&task_pool, NULL, NULL, task_allocator); - apr_allocator_owner_set(task_allocator, task_pool); - /* Other code might want to see a socket for this connection this * worker processes. Allocate one without further function... */ @@ -78,6 +63,22 @@ static void* APR_THREAD_FUNC execute(apr_thread_t *thread, void *wctx) conn_rec *c, *master = m->c; int stream_id = req->id; + if (!task_pool) { + apr_allocator_t *task_allocator = NULL; + /* We create a root pool with its own allocator to be used for + * processing a request. This is the only way to have the processing + * independant of the worker pool as the h2_mplx pool as well as + * not sensitive to which thread it is in. + * In that sense, memory allocation and lifetime is similar to a master + * connection. + * The main goal in this is that slave connections and requests will + * - one day - be suspended and resumed in different threads. + */ + apr_allocator_create(&task_allocator); + apr_pool_create_ex(&task_pool, NULL, NULL, task_allocator); + apr_allocator_owner_set(task_allocator, task_pool); + } + c = h2_slave_create(master, task_pool, worker->thread, worker->socket); if (!c) { @@ -92,8 +93,13 @@ static void* APR_THREAD_FUNC execute(apr_thread_t *thread, void *wctx) task = h2_task_create(m->id, req, task_pool, m); h2_ctx_create_for(c, task); h2_task_do(task, c, worker->io, worker->socket); - task = NULL; + if (task->frozen) { + /* this task was handed over to someone else for + * processing */ + task_pool = NULL; + } + task = NULL; apr_thread_cond_signal(worker->io); } @@ -103,7 +109,9 @@ static void* APR_THREAD_FUNC execute(apr_thread_t *thread, void *wctx) * long as it has requests to handle. Might no be fair to * other mplx's. Perhaps leave after n requests? */ req = NULL; - apr_pool_clear(task_pool); + if (task_pool) { + apr_pool_clear(task_pool); + } h2_mplx_request_done(&m, stream_id, worker->aborted? NULL : &req); } } diff --git a/modules/http2/mod_http2.c b/modules/http2/mod_http2.c index 8d26cc241a9..097a372096a 100644 --- a/modules/http2/mod_http2.c +++ b/modules/http2/mod_http2.c @@ -35,6 +35,7 @@ #include "h2_config.h" #include "h2_ctx.h" #include "h2_h2.h" +#include "h2_mplx.h" #include "h2_push.h" #include "h2_request.h" #include "h2_switch.h" @@ -121,6 +122,57 @@ static char *http2_var_lookup(apr_pool_t *, server_rec *, conn_rec *, request_rec *, char *name); static int http2_is_h2(conn_rec *); +static apr_status_t http2_req_engine_push(const char *engine_type, + request_rec *r, + h2_req_engine_init *einit) +{ + h2_ctx *ctx = h2_ctx_rget(r); + if (ctx) { + h2_task *task = h2_ctx_get_task(ctx); + if (task) { + return h2_mplx_engine_push(task->mplx, task, engine_type, r, einit); + } + } + return APR_EINVAL; +} + +static apr_status_t http2_req_engine_pull(h2_req_engine *engine, + apr_time_t timeout, request_rec **pr) +{ + h2_ctx *ctx = h2_ctx_get(engine->c, 0); + if (ctx) { + h2_task *task = h2_ctx_get_task(ctx); + if (task) { + return h2_mplx_engine_pull(task->mplx, task, engine, timeout, pr); + } + } + return APR_ECONNABORTED; +} + +static void http2_req_engine_done(h2_req_engine *engine, conn_rec *r_conn) +{ + h2_ctx *ctx = h2_ctx_get(r_conn, 0); + if (ctx) { + h2_task *task = h2_ctx_get_task(ctx); + if (task) { + h2_mplx_engine_done(task->mplx, task, r_conn); + /* task is destroyed */ + } + } +} + +static void http2_req_engine_exit(h2_req_engine *engine) +{ + h2_ctx *ctx = h2_ctx_get(engine->c, 0); + if (ctx) { + h2_task *task = h2_ctx_get_task(ctx); + if (task) { + h2_mplx_engine_exit(task->mplx, task, engine); + } + } +} + + /* Runs once per created child process. Perform any process * related initionalization here. */ @@ -143,6 +195,10 @@ static void h2_hooks(apr_pool_t *pool) APR_REGISTER_OPTIONAL_FN(http2_is_h2); APR_REGISTER_OPTIONAL_FN(http2_var_lookup); + APR_REGISTER_OPTIONAL_FN(http2_req_engine_push); + APR_REGISTER_OPTIONAL_FN(http2_req_engine_pull); + APR_REGISTER_OPTIONAL_FN(http2_req_engine_done); + APR_REGISTER_OPTIONAL_FN(http2_req_engine_exit); ap_log_perror(APLOG_MARK, APLOG_TRACE1, 0, pool, "installing hooks"); diff --git a/modules/http2/mod_http2.h b/modules/http2/mod_http2.h index 0be8170968d..8055320ef5a 100644 --- a/modules/http2/mod_http2.h +++ b/modules/http2/mod_http2.h @@ -18,13 +18,97 @@ /** The http2_var_lookup() optional function retrieves HTTP2 environment * variables. */ -APR_DECLARE_OPTIONAL_FN(char *, http2_var_lookup, - (apr_pool_t *, server_rec *, - conn_rec *, request_rec *, - char *)); +APR_DECLARE_OPTIONAL_FN(char *, + http2_var_lookup, (apr_pool_t *, server_rec *, + conn_rec *, request_rec *, char *)); /** An optional function which returns non-zero if the given connection * or its master connection is using HTTP/2. */ -APR_DECLARE_OPTIONAL_FN(int, http2_is_h2, (conn_rec *)); +APR_DECLARE_OPTIONAL_FN(int, + http2_is_h2, (conn_rec *)); + + +/******************************************************************************* + * HTTP/2 slave engines + ******************************************************************************/ + +typedef struct h2_req_engine h2_req_engine; + +/** + * Initialize a h2_req_engine. The structure will be passed in but + * only the name and master are set. The function should initialize + * all fields. + * @param engine the allocated, partially filled structure + * @param r the first request to process, or NULL + */ +typedef apr_status_t h2_req_engine_init(h2_req_engine *engine, request_rec *r); + +/** + * The public structure of a h2_req_engine. It gets allocated by the http2 + * infrastructure, assigned id, type, pool and connection and passed to the + * h2_req_engine_init() callback to complete initialization. + * This happens whenever a new request gets "push"ed for an engine type and + * no instance, or no free instance, for the type is available. + */ +struct h2_req_engine { + int id; /* identifier, unique for a master connection */ + const char *type; /* name of the engine type */ + apr_pool_t *pool; /* pool for engine specific allocations */ + conn_rec *c; /* connection this engine is assigned to */ + apr_size_t r_capacity; /* request capacity engine is willing to handle, + may change between invocations. If the engine + sets this to 0, it signals that it no longer + wants more requests. New requests, already + scheduled for this engine might still arrive for + a time. */ + apr_size_t r_count; /* number of request currently assigned, it is the + responsibility of the engine to update this. */ + void *data; /* engine specific data */ +}; + +/** + * Push a request to an engine with the specified name for further processing. + * If no such engine is available, einit is not NULL, einit is called + * with a new engine record and the caller is responsible for running the + * new engine instance. + * @param engine_type the type of the engine to add the request to + * @param r the request to push to an engine for processing + * @param einit an optional initialization callback for a new engine + * of the requested type, should no instance be available. + * By passing a non-NULL callback, the caller is willing + * to init and run a new engine itself. + * @return APR_SUCCESS iff slave was successfully added to an engine + */ +APR_DECLARE_OPTIONAL_FN(apr_status_t, + http2_req_engine_push, (const char *engine_type, + request_rec *r, + h2_req_engine_init *einit)); + +/** + * Get a new request for processing in this engine. + * @param engine the engine which is done processing the slave + * @param timeout wait a maximum amount of time for a new slave, 0 will not wait + * @param pslave the slave connection that needs processing or NULL + * @return APR_SUCCESS if new request was assigned + * APR_EAGAIN/APR_TIMEUP if no new request is available + * APR_ECONNABORTED if the engine needs to shut down + */ +APR_DECLARE_OPTIONAL_FN(apr_status_t, + http2_req_engine_pull, (h2_req_engine *engine, + apr_time_t timeout, + request_rec **pr)); +APR_DECLARE_OPTIONAL_FN(void, + http2_req_engine_done, (h2_req_engine *engine, + conn_rec *rconn)); +/** + * The given request engine is done processing and needs to be excluded + * from further handling. + * @param engine the engine to exit + */ +APR_DECLARE_OPTIONAL_FN(void, + http2_req_engine_exit, (h2_req_engine *engine)); + + +#define H2_TASK_ID_NOTE "http2-task-id" #endif diff --git a/modules/http2/mod_proxy_http2.c b/modules/http2/mod_proxy_http2.c index 0ab6dd07fc4..1130d357c59 100644 --- a/modules/http2/mod_proxy_http2.c +++ b/modules/http2/mod_proxy_http2.c @@ -17,6 +17,7 @@ #include #include +#include #include "mod_proxy_http2.h" @@ -37,6 +38,30 @@ AP_DECLARE_MODULE(proxy_http2) = { register_hook /* register hooks */ }; +/* Optional functions from mod_http2 */ +static int (*is_h2)(conn_rec *c); +static apr_status_t (*req_engine_push)(const char *name, request_rec *r, + h2_req_engine_init *einit); +static apr_status_t (*req_engine_pull)(h2_req_engine *engine, + apr_time_t timeout, request_rec **pr); +static void (*req_engine_done)(h2_req_engine *engine, conn_rec *r_conn); +static void (*req_engine_exit)(h2_req_engine *engine); + +typedef struct h2_proxy_ctx { + conn_rec *owner; + server_rec *server; + const char *proxy_func; + char server_portstr[32]; + proxy_conn_rec *p_conn; + proxy_worker *worker; + proxy_server_conf *conf; + + h2_req_engine *engine; + unsigned standalone : 1; + unsigned is_ssl : 1; + unsigned flushall : 1; +} h2_proxy_ctx; + static int h2_proxy_post_config(apr_pool_t *p, apr_pool_t *plog, apr_pool_t *ptemp, server_rec *s) { @@ -58,6 +83,21 @@ static int h2_proxy_post_config(apr_pool_t *p, apr_pool_t *plog, "mod_proxy_http2 (v%s, nghttp2 %s), initializing...", MOD_HTTP2_VERSION, ngh2? ngh2->version_str : "unknown"); + is_h2 = APR_RETRIEVE_OPTIONAL_FN(http2_is_h2); + req_engine_push = APR_RETRIEVE_OPTIONAL_FN(http2_req_engine_push); + req_engine_pull = APR_RETRIEVE_OPTIONAL_FN(http2_req_engine_pull); + req_engine_done = APR_RETRIEVE_OPTIONAL_FN(http2_req_engine_done); + req_engine_exit = APR_RETRIEVE_OPTIONAL_FN(http2_req_engine_exit); + + /* we need all of them */ + if (!req_engine_push || !req_engine_pull + || !req_engine_done || !req_engine_exit) { + req_engine_push = NULL; + req_engine_pull = NULL; + req_engine_done = NULL; + req_engine_exit = NULL; + } + return status; } @@ -147,59 +187,111 @@ static int proxy_http2_canon(request_rec *r, char *url) return OK; } -static apr_status_t proxy_http2_cleanup(const char *scheme, request_rec *r, - proxy_conn_rec *backend) +static apr_status_t proxy_engine_init(h2_req_engine *engine, request_rec *r) { - ap_log_rerror(APLOG_MARK, APLOG_TRACE1, 0, r, "cleanup, releasing connection"); - ap_proxy_release_connection(scheme, backend, r->server); - return OK; + h2_proxy_ctx *ctx = ap_get_module_config(engine->c->conn_config, + &proxy_http2_module); + if (ctx) { + ctx->engine = engine; + return APR_SUCCESS; + } + return APR_ENOTIMPL; } -static -int proxy_http2_process_stream(apr_pool_t *p, const char *url, request_rec *r, - proxy_conn_rec **pp_conn, proxy_worker *worker, - proxy_server_conf *conf, char *server_portstr, - int flushall) -{ - int rv = APR_ENOTIMPL; - proxy_conn_rec *p_conn = *pp_conn; +static int proxy_engine_run(h2_proxy_ctx *ctx, request_rec *r) { + int status = OK; h2_proxy_session *session; h2_proxy_stream *stream; - session = h2_proxy_session_setup(r, *pp_conn, conf); + /* Step Two: Make the Connection (or check that an already existing + * socket is still usable). On success, we have a socket connected to + * backend->hostname. */ + if (ap_proxy_connect_backend(ctx->proxy_func, ctx->p_conn, ctx->worker, + ctx->server)) { + ap_log_cerror(APLOG_MARK, APLOG_ERR, 0, ctx->owner, APLOGNO() + "H2: failed to make connection to backend: %s", + ctx->p_conn->hostname); + return HTTP_SERVICE_UNAVAILABLE; + } + + /* Step Three: Create conn_rec for the socket we have open now. */ + if (!ctx->p_conn->connection) { + ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, ctx->owner, APLOGNO() + "setup new connection: is_ssl=%d %s %s %s", + ctx->p_conn->is_ssl, ctx->p_conn->ssl_hostname, + r->hostname, ctx->p_conn->hostname); + if ((status = ap_proxy_connection_create(ctx->proxy_func, ctx->p_conn, + ctx->owner, + ctx->server)) != OK) { + return status; + } + + /* + * On SSL connections set a note on the connection what CN is + * requested, such that mod_ssl can check if it is requested to do + * so. + */ + if (ctx->p_conn->ssl_hostname) { + apr_table_setn(ctx->p_conn->connection->notes, + "proxy-request-hostname", ctx->p_conn->ssl_hostname); + } + + if (ctx->is_ssl) { + apr_table_setn(ctx->p_conn->connection->notes, + "proxy-request-alpn-protos", "h2"); + } + } + + /* Step Four: Send the Request in a new HTTP/2 stream and + * loop until we got the response or encounter errors. + */ + status = APR_ENOTIMPL; + session = h2_proxy_session_setup(r, ctx->p_conn, ctx->conf); if (!session) { - ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, p_conn->connection, + ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, ctx->p_conn->connection, "session unavailable"); return HTTP_SERVICE_UNAVAILABLE; } - /* TODO - * - enter http2 client processing loop: - * - send any input in datasource callback from r->input_filters - * - await response HEADERs - * - send any DATA to r->output_filters - * - on stream close, check for missing response - * - on certain errors, mark connection for close - */ - rv = h2_proxy_session_open_stream(session, url, r, &stream); - if (rv == OK) { - ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, - "process stream(%d): %s %s%s, original: %s", - stream->id, stream->req->method, - stream->req->authority, stream->req->path, - r->the_request); - rv = h2_proxy_stream_process(stream); + while (r) { + conn_rec *r_conn = r->connection; + const char *url; + + url = apr_table_get(r->notes, H2_PROXY_REQ_URL_NOTE); + status = h2_proxy_session_open_stream(session, url, r, &stream); + if (status == OK) { + ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, r_conn, + "process stream(%d): %s %s%s, original: %s", + stream->id, stream->req->method, + stream->req->authority, stream->req->path, + r->the_request); + status = h2_proxy_stream_process(stream); + } + r = NULL; + + if (status != OK) { + ap_log_cerror(APLOG_MARK, APLOG_ERR, status, r_conn, APLOGNO() + "pass request body failed to %pI (%s) from %s (%s)", + ctx->p_conn->addr, ctx->p_conn->hostname ? + ctx->p_conn->hostname: "", session->c->client_ip, + session->c->remote_host ? session->c->remote_host: ""); + } + + if (!ctx->standalone && req_engine_done && r_conn != ctx->owner) { + req_engine_done(ctx->engine, r_conn); + } + r_conn = NULL; + + if (!ctx->standalone && req_engine_pull) { + status = req_engine_pull(ctx->engine, ctx->server->timeout, &r); + if (status != APR_SUCCESS) { + status = APR_SUCCESS; + break; + } + } } - if (rv != OK) { - conn_rec *c = r->connection; - ap_log_rerror(APLOG_MARK, APLOG_ERR, rv, r, APLOGNO() - "pass request body failed to %pI (%s) from %s (%s)", - p_conn->addr, p_conn->hostname ? p_conn->hostname: "", - c->client_ip, c->remote_host ? c->remote_host: ""); - } - - return rv; + return status; } static int proxy_http2_handler(request_rec *r, @@ -209,18 +301,17 @@ static int proxy_http2_handler(request_rec *r, const char *proxyname, apr_port_t proxyport) { - const char *proxy_function; - proxy_conn_rec *backend; + const char *proxy_func; char *locurl = url, *u; apr_size_t slen; int is_ssl = 0; - int flushall = 0; - int status; - char server_portstr[32]; + apr_status_t status; conn_rec *c = r->connection; - apr_pool_t *p = r->pool; + server_rec *s = r->server; + apr_pool_t *p = c->pool; apr_uri_t *uri = apr_palloc(p, sizeof(*uri)); - conn_rec *backconn; + h2_proxy_ctx *ctx; + const char *engine_type, *hostname; /* find the scheme */ if ((url[0] != 'h' && url[0] != 'H') || url[1] != '2') { @@ -233,22 +324,30 @@ static int proxy_http2_handler(request_rec *r, slen = (u - url); switch(slen) { case 2: - proxy_function = "H2"; + proxy_func = "H2"; is_ssl = 1; break; case 3: if (url[2] != 'c' && url[2] != 'C') { return DECLINED; } - proxy_function = "H2C"; + proxy_func = "H2C"; break; default: return DECLINED; } - if (apr_table_get(r->subprocess_env, "proxy-flushall")) { - flushall = 1; - } + ctx = apr_pcalloc(p, sizeof(*ctx)); + ctx->owner = c; + ctx->server = s; + ctx->proxy_func = proxy_func; + ctx->is_ssl = is_ssl; + ctx->worker = worker; + ctx->conf = conf; + ctx->flushall = apr_table_get(r->subprocess_env, "proxy-flushall")? 1 : 0; + + ap_set_module_config(c->conn_config, &proxy_http2_module, ctx); + apr_table_setn(r->notes, H2_PROXY_REQ_URL_NOTE, url); /* scheme says, this is for us. */ ap_log_rerror(APLOG_MARK, APLOG_TRACE1, 0, r, "H2: serving URL %s", url); @@ -256,89 +355,82 @@ static int proxy_http2_handler(request_rec *r, /* Get a proxy_conn_rec from the worker, might be a new one, might * be one still open from another request, or it might fail if the * worker is stopped or in error. */ - if ((status = ap_proxy_acquire_connection(proxy_function, &backend, - worker, r->server)) != OK) { + if ((status = ap_proxy_acquire_connection(ctx->proxy_func, &ctx->p_conn, + ctx->worker, s)) != OK) { goto cleanup; } - backend->is_ssl = is_ssl; - if (is_ssl) { + ctx->p_conn->is_ssl = ctx->is_ssl; + if (ctx->is_ssl) { /* If there is still some data on an existing ssl connection, now * would be a good timne to get rid of it. */ - ap_proxy_ssl_connection_cleanup(backend, r); + ap_proxy_ssl_connection_cleanup(ctx->p_conn, r); } /* Step One: Determine the URL to connect to (might be a proxy), * initialize the backend accordingly and determine the server * port string we can expect in responses. */ - if ((status = ap_proxy_determine_connection(p, r, conf, worker, backend, + if ((status = ap_proxy_determine_connection(p, r, conf, worker, ctx->p_conn, uri, &locurl, proxyname, - proxyport, server_portstr, - sizeof(server_portstr))) != OK) { + proxyport, ctx->server_portstr, + sizeof(ctx->server_portstr))) != OK) { goto cleanup; } - /* Step Two: Make the Connection (or check that an already existing - * socket is still usable). On success, we have a socket connected to - * backend->hostname. */ - if (ap_proxy_connect_backend(proxy_function, backend, worker, r->server)) { - ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, r, APLOGNO() - "H2: failed to make connection to backend: %s", - backend->hostname); - status = HTTP_SERVICE_UNAVAILABLE; - goto cleanup; - } + hostname = (ctx->p_conn->ssl_hostname? + ctx->p_conn->ssl_hostname : ctx->p_conn->hostname); + engine_type = apr_psprintf(p, "proxy_http2 %s%s", hostname, ctx->server_portstr); - /* Step Three: Create conn_rec for the socket we have open now. */ - backconn = backend->connection; - if (!backconn) { - ap_log_rerror(APLOG_MARK, APLOG_DEBUG, status, r, APLOGNO() - "setup new connection: is_ssl=%d %s %s %s", - backend->is_ssl, - backend->ssl_hostname, r->hostname, backend->hostname); - if ((status = ap_proxy_connection_create(proxy_function, backend, - c, r->server)) != OK) { - goto cleanup; - } - backconn = backend->connection; - - /* - * On SSL connections set a note on the connection what CN is - * requested, such that mod_ssl can check if it is requested to do - * so. + if (c->master && req_engine_push && is_h2 && is_h2(ctx->owner)) { + /* If we are have req_engine capabilities, push the handling of this + * request (e.g. slave connection) to a proxy_http2 engine which uses + * the same backend. We may be called to create an engine ourself. */ - if (backend->ssl_hostname) { - apr_table_setn(backend->connection->notes, - "proxy-request-hostname", backend->ssl_hostname); - } - - if (backend->is_ssl) { - apr_table_setn(backend->connection->notes, - "proxy-request-alpn-protos", "h2"); + status = req_engine_push(engine_type, r, proxy_engine_init); + ap_log_rerror(APLOG_MARK, APLOG_TRACE1, status, r, + "H2: pushing request %s to engine type %s", + url, engine_type); + if (status == APR_SUCCESS && ctx->engine == NULL) { + /* Another engine instance has taken over processing of this + * request. */ + goto cleanup; } } - - /* Step Four: Send the Request in a new HTTP/2 stream and - * loop until we got the response or encounter errors. - */ - if ((status = proxy_http2_process_stream(p, url, r, &backend, worker, - conf, server_portstr, - flushall)) != OK) { - ap_log_rerror(APLOG_MARK, APLOG_DEBUG, status, r, APLOGNO() - "H2: failed to process request: %s", - r->the_request); + + if (!ctx->engine) { + /* No engine was available or has been initialized, handle this + * request just by ourself. */ + h2_req_engine *engine = apr_pcalloc(p, sizeof(*engine)); + engine->id = 0; + engine->type = engine_type; + engine->pool = p; + engine->c = c; + ctx->engine = engine; + ctx->standalone = 1; } + + status = proxy_engine_run(ctx, r); - /* clean up before return */ cleanup: - if (backend) { - if (status != OK) { - backend->close = 1; + if (ctx->engine && !ctx->standalone && req_engine_exit) { + req_engine_exit(ctx->engine); + } + ctx->engine = NULL; + + if (ctx) { + if (ctx->p_conn) { + if (status != OK) { + ctx->p_conn->close = 1; + } + proxy_run_detach_backend(r, ctx->p_conn); + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c, "cleanup, releasing connection"); + ap_proxy_release_connection(ctx->proxy_func, ctx->p_conn, ctx->server); } - proxy_run_detach_backend(r, backend); - proxy_http2_cleanup(proxy_function, r, backend); + ctx->worker = NULL; + ctx->conf = NULL; + ctx->p_conn = NULL; } - ap_log_rerror(APLOG_MARK, APLOG_TRACE1, status, r, "leaving handler"); + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, c, "leaving handler"); return status; }