From: Stefan Eissing Date: Mon, 10 Apr 2017 13:21:43 +0000 (+0000) Subject: On the trunk: X-Git-Tag: 2.5.0-alpha~491 X-Git-Url: http://git.ipfire.org/gitweb.cgi?a=commitdiff_plain;h=8895bd2f8cf68c9e0aca681abd64b81930ad5663;p=thirdparty%2Fapache%2Fhttpd.git On the trunk: mod_proxy_http2: Reliability of reconnect handling improved. git-svn-id: https://svn.apache.org/repos/asf/httpd/httpd/trunk@1790826 13f79535-47bb-0310-9956-ffa450edef68 --- diff --git a/CHANGES b/CHANGES index 5f05476f643..508efcbc86b 100644 --- a/CHANGES +++ b/CHANGES @@ -5,7 +5,8 @@ Changes with Apache 2.5.0 [Stefan Eissing] *) mod_proxy_http2: Fixed bug in re-attempting proxy requests after - connection error. [Stefan Eissing] + connection error. Reliability of reconnect handling improved. + [Stefan Eissing] *) core: Disallow multiple Listen on the same IP:port when listener buckets are configured (ListenCoresBucketsRatio > 0), consistently with the single diff --git a/docs/log-message-tags/next-number b/docs/log-message-tags/next-number index 651b524a65d..ac4e051f0ae 100644 --- a/docs/log-message-tags/next-number +++ b/docs/log-message-tags/next-number @@ -1 +1 @@ -10022 +10024 diff --git a/modules/http2/h2_mplx.c b/modules/http2/h2_mplx.c index 73019faeebd..357bf5eaadf 100644 --- a/modules/http2/h2_mplx.c +++ b/modules/http2/h2_mplx.c @@ -787,7 +787,7 @@ static void task_done(h2_mplx *m, h2_task *task, h2_req_engine *ngn) if (task->engine) { if (!m->aborted && !task->c->aborted && !h2_req_engine_is_shutdown(task->engine)) { - ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, + ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c, APLOGNO(10022) "h2_mplx(%ld): task(%s) has not-shutdown " "engine(%s)", m->id, task->id, h2_req_engine_get_id(task->engine)); diff --git a/modules/http2/h2_proxy_util.c b/modules/http2/h2_proxy_util.c index b92a876f42b..206020fd873 100644 --- a/modules/http2/h2_proxy_util.c +++ b/modules/http2/h2_proxy_util.c @@ -16,6 +16,8 @@ #include #include #include +#include +#include #include #include @@ -1053,3 +1055,282 @@ const char *h2_proxy_link_reverse_map(request_rec *r, "link_reverse_map %s --> %s", s, ctx.s); return ctx.s; } + +/******************************************************************************* + * FIFO queue + ******************************************************************************/ + +struct h2_proxy_fifo { + void **elems; + int nelems; + int set; + int head; + int count; + int aborted; + apr_thread_mutex_t *lock; + apr_thread_cond_t *not_empty; + apr_thread_cond_t *not_full; +}; + +static int nth_index(h2_proxy_fifo *fifo, int n) +{ + return (fifo->head + n) % fifo->nelems; +} + +static apr_status_t fifo_destroy(void *data) +{ + h2_proxy_fifo *fifo = data; + + apr_thread_cond_destroy(fifo->not_empty); + apr_thread_cond_destroy(fifo->not_full); + apr_thread_mutex_destroy(fifo->lock); + + return APR_SUCCESS; +} + +static int index_of(h2_proxy_fifo *fifo, void *elem) +{ + int i; + + for (i = 0; i < fifo->count; ++i) { + if (elem == fifo->elems[nth_index(fifo, i)]) { + return i; + } + } + return -1; +} + +static apr_status_t create_int(h2_proxy_fifo **pfifo, apr_pool_t *pool, + int capacity, int as_set) +{ + apr_status_t rv; + h2_proxy_fifo *fifo; + + fifo = apr_pcalloc(pool, sizeof(*fifo)); + if (fifo == NULL) { + return APR_ENOMEM; + } + + rv = apr_thread_mutex_create(&fifo->lock, + APR_THREAD_MUTEX_UNNESTED, pool); + if (rv != APR_SUCCESS) { + return rv; + } + + rv = apr_thread_cond_create(&fifo->not_empty, pool); + if (rv != APR_SUCCESS) { + return rv; + } + + rv = apr_thread_cond_create(&fifo->not_full, pool); + if (rv != APR_SUCCESS) { + return rv; + } + + fifo->elems = apr_pcalloc(pool, capacity * sizeof(void*)); + if (fifo->elems == NULL) { + return APR_ENOMEM; + } + fifo->nelems = capacity; + fifo->set = as_set; + + *pfifo = fifo; + apr_pool_cleanup_register(pool, fifo, fifo_destroy, apr_pool_cleanup_null); + + return APR_SUCCESS; +} + +apr_status_t h2_proxy_fifo_create(h2_proxy_fifo **pfifo, apr_pool_t *pool, int capacity) +{ + return create_int(pfifo, pool, capacity, 0); +} + +apr_status_t h2_proxy_fifo_set_create(h2_proxy_fifo **pfifo, apr_pool_t *pool, int capacity) +{ + return create_int(pfifo, pool, capacity, 1); +} + +apr_status_t h2_proxy_fifo_term(h2_proxy_fifo *fifo) +{ + apr_status_t rv; + if ((rv = apr_thread_mutex_lock(fifo->lock)) == APR_SUCCESS) { + fifo->aborted = 1; + apr_thread_mutex_unlock(fifo->lock); + } + return rv; +} + +apr_status_t h2_proxy_fifo_interrupt(h2_proxy_fifo *fifo) +{ + apr_status_t rv; + if ((rv = apr_thread_mutex_lock(fifo->lock)) == APR_SUCCESS) { + apr_thread_cond_broadcast(fifo->not_empty); + apr_thread_cond_broadcast(fifo->not_full); + apr_thread_mutex_unlock(fifo->lock); + } + return rv; +} + +int h2_proxy_fifo_count(h2_proxy_fifo *fifo) +{ + return fifo->count; +} + +int h2_proxy_fifo_capacity(h2_proxy_fifo *fifo) +{ + return fifo->nelems; +} + +static apr_status_t check_not_empty(h2_proxy_fifo *fifo, int block) +{ + if (fifo->count == 0) { + if (!block) { + return APR_EAGAIN; + } + while (fifo->count == 0) { + if (fifo->aborted) { + return APR_EOF; + } + apr_thread_cond_wait(fifo->not_empty, fifo->lock); + } + } + return APR_SUCCESS; +} + +static apr_status_t fifo_push(h2_proxy_fifo *fifo, void *elem, int block) +{ + apr_status_t rv; + + if (fifo->aborted) { + return APR_EOF; + } + + if ((rv = apr_thread_mutex_lock(fifo->lock)) == APR_SUCCESS) { + if (fifo->set && index_of(fifo, elem) >= 0) { + /* set mode, elem already member */ + apr_thread_mutex_unlock(fifo->lock); + return APR_EEXIST; + } + else if (fifo->count == fifo->nelems) { + if (block) { + while (fifo->count == fifo->nelems) { + if (fifo->aborted) { + apr_thread_mutex_unlock(fifo->lock); + return APR_EOF; + } + apr_thread_cond_wait(fifo->not_full, fifo->lock); + } + } + else { + apr_thread_mutex_unlock(fifo->lock); + return APR_EAGAIN; + } + } + + ap_assert(fifo->count < fifo->nelems); + fifo->elems[nth_index(fifo, fifo->count)] = elem; + ++fifo->count; + if (fifo->count == 1) { + apr_thread_cond_broadcast(fifo->not_empty); + } + apr_thread_mutex_unlock(fifo->lock); + } + return rv; +} + +apr_status_t h2_proxy_fifo_push(h2_proxy_fifo *fifo, void *elem) +{ + return fifo_push(fifo, elem, 1); +} + +apr_status_t h2_proxy_fifo_try_push(h2_proxy_fifo *fifo, void *elem) +{ + return fifo_push(fifo, elem, 0); +} + +static void *pull_head(h2_proxy_fifo *fifo) +{ + void *elem; + + ap_assert(fifo->count > 0); + elem = fifo->elems[fifo->head]; + --fifo->count; + if (fifo->count > 0) { + fifo->head = nth_index(fifo, 1); + if (fifo->count+1 == fifo->nelems) { + apr_thread_cond_broadcast(fifo->not_full); + } + } + return elem; +} + +static apr_status_t fifo_pull(h2_proxy_fifo *fifo, void **pelem, int block) +{ + apr_status_t rv; + + if (fifo->aborted) { + return APR_EOF; + } + + if ((rv = apr_thread_mutex_lock(fifo->lock)) == APR_SUCCESS) { + if ((rv = check_not_empty(fifo, block)) != APR_SUCCESS) { + apr_thread_mutex_unlock(fifo->lock); + *pelem = NULL; + return rv; + } + + ap_assert(fifo->count > 0); + *pelem = pull_head(fifo); + + apr_thread_mutex_unlock(fifo->lock); + } + return rv; +} + +apr_status_t h2_proxy_fifo_pull(h2_proxy_fifo *fifo, void **pelem) +{ + return fifo_pull(fifo, pelem, 1); +} + +apr_status_t h2_proxy_fifo_try_pull(h2_proxy_fifo *fifo, void **pelem) +{ + return fifo_pull(fifo, pelem, 0); +} + +apr_status_t h2_proxy_fifo_remove(h2_proxy_fifo *fifo, void *elem) +{ + apr_status_t rv; + + if (fifo->aborted) { + return APR_EOF; + } + + if ((rv = apr_thread_mutex_lock(fifo->lock)) == APR_SUCCESS) { + int i, rc; + void *e; + + rc = 0; + for (i = 0; i < fifo->count; ++i) { + e = fifo->elems[nth_index(fifo, i)]; + if (e == elem) { + ++rc; + } + else if (rc) { + fifo->elems[nth_index(fifo, i-rc)] = e; + } + } + if (rc) { + fifo->count -= rc; + if (fifo->count + rc == fifo->nelems) { + apr_thread_cond_broadcast(fifo->not_full); + } + rv = APR_SUCCESS; + } + else { + rv = APR_EAGAIN; + } + + apr_thread_mutex_unlock(fifo->lock); + } + return rv; +} diff --git a/modules/http2/h2_proxy_util.h b/modules/http2/h2_proxy_util.h index f90d14951bc..ea441842567 100644 --- a/modules/http2/h2_proxy_util.h +++ b/modules/http2/h2_proxy_util.h @@ -201,4 +201,55 @@ const char *h2_proxy_link_reverse_map(request_rec *r, const char *proxy_server_uri, const char *s); +/******************************************************************************* + * FIFO queue + ******************************************************************************/ + +/** + * A thread-safe FIFO queue with some extra bells and whistles, if you + * do not need anything special, better use 'apr_queue'. + */ +typedef struct h2_proxy_fifo h2_proxy_fifo; + +/** + * Create a FIFO queue that can hold up to capacity elements. Elements can + * appear several times. + */ +apr_status_t h2_proxy_fifo_create(h2_proxy_fifo **pfifo, apr_pool_t *pool, int capacity); + +/** + * Create a FIFO set that can hold up to capacity elements. Elements only + * appear once. Pushing an element already present does not change the + * queue and is successful. + */ +apr_status_t h2_proxy_fifo_set_create(h2_proxy_fifo **pfifo, apr_pool_t *pool, int capacity); + +apr_status_t h2_proxy_fifo_term(h2_proxy_fifo *fifo); +apr_status_t h2_proxy_fifo_interrupt(h2_proxy_fifo *fifo); + +int h2_proxy_fifo_capacity(h2_proxy_fifo *fifo); +int h2_proxy_fifo_count(h2_proxy_fifo *fifo); + +/** + * Push en element into the queue. Blocks if there is no capacity left. + * + * @param fifo the FIFO queue + * @param elem the element to push + * @return APR_SUCCESS on push, APR_EAGAIN on try_push on a full queue, + * APR_EEXIST when in set mode and elem already there. + */ +apr_status_t h2_proxy_fifo_push(h2_proxy_fifo *fifo, void *elem); +apr_status_t h2_proxy_fifo_try_push(h2_proxy_fifo *fifo, void *elem); + +apr_status_t h2_proxy_fifo_pull(h2_proxy_fifo *fifo, void **pelem); +apr_status_t h2_proxy_fifo_try_pull(h2_proxy_fifo *fifo, void **pelem); + +/** + * Remove the elem from the queue, will remove multiple appearances. + * @param elem the element to remove + * @return APR_SUCCESS iff > 0 elems were removed, APR_EAGAIN otherwise. + */ +apr_status_t h2_proxy_fifo_remove(h2_proxy_fifo *fifo, void *elem); + + #endif /* defined(__mod_h2__h2_proxy_util__) */ diff --git a/modules/http2/mod_proxy_http2.c b/modules/http2/mod_proxy_http2.c index 6f9fbecf1b7..86a8c3dc5e1 100644 --- a/modules/http2/mod_proxy_http2.c +++ b/modules/http2/mod_proxy_http2.c @@ -26,6 +26,8 @@ #include "h2_version.h" #include "h2_proxy_session.h" +#define H2MIN(x,y) ((x) < (y) ? (x) : (y)) + static void register_hook(apr_pool_t *p); AP_DECLARE_MODULE(proxy_http2) = { @@ -65,7 +67,7 @@ typedef struct h2_proxy_ctx { const char *engine_type; apr_pool_t *engine_pool; apr_size_t req_buffer_size; - request_rec *next; + h2_proxy_fifo *requests; int capacity; unsigned standalone : 1; @@ -230,7 +232,7 @@ static apr_status_t proxy_engine_init(h2_req_engine *engine, ctx->engine_type = type; ctx->engine_pool = pool; ctx->req_buffer_size = req_buffer_size; - ctx->capacity = 100; + ctx->capacity = H2MIN(100, h2_proxy_fifo_capacity(ctx->requests)); *pconsumed = out_consumed; *pctx = ctx; @@ -257,10 +259,9 @@ static apr_status_t add_request(h2_proxy_session *session, request_rec *r) return status; } -static void request_done(h2_proxy_session *session, request_rec *r, +static void request_done(h2_proxy_ctx *ctx, request_rec *r, apr_status_t status, int touched) { - h2_proxy_ctx *ctx = session->user_data; const char *task_id = apr_table_get(r->connection->notes, H2_TASK_ID_NOTE); ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, r->connection, @@ -269,35 +270,26 @@ static void request_done(h2_proxy_session *session, request_rec *r, if (status != APR_SUCCESS) { if (!touched) { /* untouched request, need rescheduling */ - if (req_engine_push && is_h2 && is_h2(ctx->owner)) { - if (req_engine_push(ctx->engine_type, r, NULL) == APR_SUCCESS) { - /* push to engine */ - ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, r->connection, - APLOGNO(03369) - "h2_proxy_session(%s): rescheduled request %s", - ctx->engine_id, task_id); - return; - } - } - else if (!ctx->next) { - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, r->connection, - "h2_proxy_session(%s): retry untouched request", - ctx->engine_id); - ctx->next = r; - } + status = h2_proxy_fifo_push(ctx->requests, r); + ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, r->connection, + APLOGNO(03369) + "h2_proxy_session(%s): rescheduled request %s", + ctx->engine_id, task_id); + return; } else { const char *uri; uri = apr_uri_unparse(r->pool, &r->parsed_uri, 0); ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, r->connection, APLOGNO(03471) "h2_proxy_session(%s): request %s -> %s " - "not complete, was touched", + "not complete, cannot repeat", ctx->engine_id, task_id, uri); } } if (r == ctx->rbase) { - ctx->r_status = (status == APR_SUCCESS)? APR_SUCCESS : HTTP_SERVICE_UNAVAILABLE; + ctx->r_status = ((status == APR_SUCCESS)? APR_SUCCESS + : HTTP_SERVICE_UNAVAILABLE); } if (req_engine_done && ctx->engine) { @@ -309,21 +301,32 @@ static void request_done(h2_proxy_session *session, request_rec *r, } } +static void session_req_done(h2_proxy_session *session, request_rec *r, + apr_status_t status, int touched) +{ + request_done(session->user_data, r, status, touched); +} + static apr_status_t next_request(h2_proxy_ctx *ctx, int before_leave) { - if (ctx->next) { + if (h2_proxy_fifo_count(ctx->requests) > 0) { return APR_SUCCESS; } else if (req_engine_pull && ctx->engine) { apr_status_t status; + request_rec *r = NULL; + status = req_engine_pull(ctx->engine, before_leave? APR_BLOCK_READ: APR_NONBLOCK_READ, - ctx->capacity, &ctx->next); - ap_log_cerror(APLOG_MARK, APLOG_TRACE3, status, ctx->owner, - "h2_proxy_engine(%s): pulled request (%s) %s", - ctx->engine_id, - before_leave? "before leave" : "regular", - (ctx->next? ctx->next->the_request : "NULL")); + ctx->capacity, &r); + if (status == APR_SUCCESS && r) { + ap_log_cerror(APLOG_MARK, APLOG_TRACE3, status, ctx->owner, + "h2_proxy_engine(%s): pulled request (%s) %s", + ctx->engine_id, + before_leave? "before leave" : "regular", + r->the_request); + h2_proxy_fifo_push(ctx->requests, r); + } return APR_STATUS_IS_EAGAIN(status)? APR_SUCCESS : status; } return APR_EOF; @@ -332,6 +335,7 @@ static apr_status_t next_request(h2_proxy_ctx *ctx, int before_leave) static apr_status_t proxy_engine_run(h2_proxy_ctx *ctx) { apr_status_t status = OK; int h2_front; + request_rec *r; /* Step Four: Send the Request in a new HTTP/2 stream and * loop until we got the response or encounter errors. @@ -342,7 +346,7 @@ static apr_status_t proxy_engine_run(h2_proxy_ctx *ctx) { ctx->session = h2_proxy_session_setup(ctx->engine_id, ctx->p_conn, ctx->conf, h2_front, 30, h2_proxy_log2((int)ctx->req_buffer_size), - request_done); + session_req_done); if (!ctx->session) { ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, ctx->owner, APLOGNO(03372) "session unavailable"); @@ -353,10 +357,9 @@ static apr_status_t proxy_engine_run(h2_proxy_ctx *ctx) { "eng(%s): run session %s", ctx->engine_id, ctx->session->id); ctx->session->user_data = ctx; - while (1) { - if (ctx->next) { - add_request(ctx->session, ctx->next); - ctx->next = NULL; + while (!ctx->owner->aborted) { + if (APR_SUCCESS == h2_proxy_fifo_try_pull(ctx->requests, (void**)&r)) { + add_request(ctx->session, r); } status = h2_proxy_session_process(ctx->session); @@ -366,7 +369,8 @@ static apr_status_t proxy_engine_run(h2_proxy_ctx *ctx) { /* ongoing processing, call again */ if (ctx->session->remote_max_concurrent > 0 && ctx->session->remote_max_concurrent != ctx->capacity) { - ctx->capacity = (int)ctx->session->remote_max_concurrent; + ctx->capacity = H2MIN((int)ctx->session->remote_max_concurrent, + h2_proxy_fifo_capacity(ctx->requests)); } s2 = next_request(ctx, 0); if (s2 == APR_ECONNABORTED) { @@ -382,7 +386,8 @@ static apr_status_t proxy_engine_run(h2_proxy_ctx *ctx) { status = ctx->r_status = APR_SUCCESS; break; } - if (!ctx->next && h2_proxy_ihash_empty(ctx->session->streams)) { + if ((h2_proxy_fifo_count(ctx->requests) == 0) + && h2_proxy_ihash_empty(ctx->session->streams)) { break; } } @@ -396,7 +401,7 @@ static apr_status_t proxy_engine_run(h2_proxy_ctx *ctx) { * a) be reopened on the new session iff safe to do so * b) reported as done (failed) otherwise */ - h2_proxy_session_cleanup(ctx->session, request_done); + h2_proxy_session_cleanup(ctx->session, session_req_done); break; } } @@ -446,7 +451,8 @@ static apr_status_t push_request_somewhere(h2_proxy_ctx *ctx, request_rec *r) ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c, "H2: hosting engine %s", ctx->engine_id); } - return APR_SUCCESS; + + return h2_proxy_fifo_push(ctx->requests, r); } static int proxy_http2_handler(request_rec *r, @@ -463,7 +469,7 @@ static int proxy_http2_handler(request_rec *r, apr_status_t status; h2_proxy_ctx *ctx; apr_uri_t uri; - int reconnected = 0; + int reconnects = 0; /* find the scheme */ if ((url[0] != 'h' && url[0] != 'H') || url[1] != '2') { @@ -500,8 +506,8 @@ static int proxy_http2_handler(request_rec *r, ctx->conf = conf; ctx->flushall = apr_table_get(r->subprocess_env, "proxy-flushall")? 1 : 0; ctx->r_status = HTTP_SERVICE_UNAVAILABLE; - ctx->next = r; - r = NULL; + + h2_proxy_fifo_set_create(&ctx->requests, ctx->pool, 100); ap_set_module_config(ctx->owner->conn_config, &proxy_http2_module, ctx); @@ -534,11 +540,11 @@ run_connect: /* If we are not already hosting an engine, try to push the request * to an already existing engine or host a new engine here. */ - if (!ctx->engine) { - ctx->r_status = push_request_somewhere(ctx, ctx->next); + if (r && !ctx->engine) { + ctx->r_status = push_request_somewhere(ctx, r); + r = NULL; if (ctx->r_status == SUSPENDED) { /* request was pushed to another thread, leave processing here */ - ctx->next = NULL; goto cleanup; } } @@ -599,7 +605,7 @@ run_session: } reconnect: - if (!reconnected && next_request(ctx, 1) == APR_SUCCESS) { + if (next_request(ctx, 1) == APR_SUCCESS) { /* Still more to do, tear down old conn and start over */ if (ctx->p_conn) { ctx->p_conn->close = 1; @@ -607,8 +613,13 @@ reconnect: ap_proxy_release_connection(ctx->proxy_func, ctx->p_conn, ctx->server); ctx->p_conn = NULL; } - reconnected = 1; /* we do this only once, then fail */ - goto run_connect; + ++reconnects; + if (reconnects < 5 && !ctx->owner->aborted) { + goto run_connect; + } + ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, ctx->owner, APLOGNO(10023) + "giving up after %d reconnects, %d requests todo", + reconnects, h2_proxy_fifo_count(ctx->requests)); } cleanup: @@ -622,6 +633,11 @@ cleanup: ctx->p_conn = NULL; } + /* Any requests will still have need to fail */ + while (APR_SUCCESS == h2_proxy_fifo_try_pull(ctx->requests, (void**)&r)) { + request_done(ctx, r, HTTP_SERVICE_UNAVAILABLE, true); + } + ap_set_module_config(ctx->owner->conn_config, &proxy_http2_module, NULL); ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, ctx->owner, APLOGNO(03377) "leaving handler");