From: Stefan Eissing Date: Thu, 25 Feb 2016 16:19:18 +0000 (+0000) Subject: mod_proxy_http2: stability improvements, timeout blocking read when waiting, new... X-Git-Tag: 2.5.0-alpha~2012 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=b47f895958415ae1b28757628417ce68c3b9c10b;p=thirdparty%2Fapache%2Fhttpd.git mod_proxy_http2: stability improvements, timeout blocking read when waiting, new entry list git-svn-id: https://svn.apache.org/repos/asf/httpd/httpd/trunk@1732328 13f79535-47bb-0310-9956-ffa450edef68 --- diff --git a/modules/http2/h2_mplx.c b/modules/http2/h2_mplx.c index 90765f58d79..f18b3437aa4 100644 --- a/modules/http2/h2_mplx.c +++ b/modules/http2/h2_mplx.c @@ -17,7 +17,6 @@ #include #include -#include #include #include #include @@ -368,11 +367,14 @@ apr_status_t h2_mplx_release_and_join(h2_mplx *m, apr_thread_cond_t *wait) /* iterate until all ios have been orphaned or destroyed */ } - /* Any remaining ios have handed out requests to workers that are - * not done yet. Any operation they do on their assigned stream ios will - * be errored ECONNRESET/ABORTED, so they should find out pretty soon. + /* If we still have busy workers, we cannot release our memory + * pool yet, as slave connections have child pools of their respective + * h2_io's. + * Any remaining ios are processed in these workers. Any operation + * they do on their input/outputs will be errored ECONNRESET/ABORTED, + * so processing them should fail and workers *should* return. */ - for (i = 0; h2_io_set_size(m->stream_ios) > 0; ++i) { + for (i = 0; m->workers_busy > 0; ++i) { m->join_wait = wait; ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c, "h2_mplx(%ld): release_join, waiting on %d worker to report back", @@ -388,8 +390,9 @@ apr_status_t h2_mplx_release_and_join(h2_mplx *m, apr_thread_cond_t *wait) */ ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, APLOGNO(03198) "h2_mplx(%ld): release, waiting for %d seconds now for " - "all h2_workers to return, have still %d requests outstanding", - m->id, i*wait_secs, (int)h2_io_set_size(m->stream_ios)); + "%d h2_workers to return, have still %d requests outstanding", + m->id, i*wait_secs, m->workers_busy, + (int)h2_io_set_size(m->stream_ios)); if (i == 1) { h2_io_set_iter(m->stream_ios, stream_print, m); } @@ -1130,10 +1133,6 @@ static void task_done(h2_mplx *m, h2_task *task) /* TODO: this will keep a worker attached to this h2_mplx as * long as it has requests to handle. Might no be fair to * other mplx's. Perhaps leave after n requests? */ - - if (task->c) { - apr_pool_destroy(task->c->pool); - } task = NULL; if (io) { io->processing_done = 1; @@ -1176,7 +1175,17 @@ void h2_mplx_task_done(h2_mplx *m, h2_task *task, h2_task **ptask) /******************************************************************************* * HTTP/2 request engines ******************************************************************************/ - + +typedef struct h2_req_entry h2_req_entry; +struct h2_req_entry { + APR_RING_ENTRY(h2_req_entry) link; + request_rec *r; +}; + +#define H2_REQ_ENTRY_NEXT(e) APR_RING_NEXT((e), link) +#define H2_REQ_ENTRY_PREV(e) APR_RING_PREV((e), link) +#define H2_REQ_ENTRY_REMOVE(e) APR_RING_REMOVE((e), link) + typedef struct h2_req_engine_i h2_req_engine_i; struct h2_req_engine_i { h2_req_engine pub; @@ -1184,20 +1193,37 @@ struct h2_req_engine_i { h2_mplx *m; unsigned int shutdown : 1; /* engine is being shut down */ apr_thread_cond_t *io; /* condition var for waiting on data */ - apr_queue_t *queue; /* queue of scheduled request_rec* */ + APR_RING_HEAD(h2_req_entries, h2_req_entry) entries; apr_size_t no_assigned; /* # of assigned requests */ apr_size_t no_live; /* # of live */ apr_size_t no_finished; /* # of finished */ }; +#define H2_REQ_ENTRIES_SENTINEL(b) APR_RING_SENTINEL((b), h2_req_entry, link) +#define H2_REQ_ENTRIES_EMPTY(b) APR_RING_EMPTY((b), h2_req_entry, link) +#define H2_REQ_ENTRIES_FIRST(b) APR_RING_FIRST(b) +#define H2_REQ_ENTRIES_LAST(b) APR_RING_LAST(b) + +#define H2_REQ_ENTRIES_INSERT_HEAD(b, e) do { \ +h2_req_entry *ap__b = (e); \ +APR_RING_INSERT_HEAD((b), ap__b, h2_req_entry, link); \ +} while (0) + +#define H2_REQ_ENTRIES_INSERT_TAIL(b, e) do { \ +h2_req_entry *ap__b = (e); \ +APR_RING_INSERT_TAIL((b), ap__b, h2_req_entry, link); \ +} while (0) + static apr_status_t h2_mplx_engine_schedule(h2_mplx *m, h2_req_engine_i *engine, request_rec *r) { - if (!engine->queue) { - apr_queue_create(&engine->queue, 100, engine->pub.pool); - } - return apr_queue_trypush(engine->queue, r); + h2_req_entry *entry = apr_pcalloc(r->pool, sizeof(*entry)); + + APR_RING_ELEM_INIT(entry, link); + entry->r = r; + H2_REQ_ENTRIES_INSERT_TAIL(&engine->entries, entry); + return APR_SUCCESS; } @@ -1263,6 +1289,7 @@ apr_status_t h2_mplx_engine_push(const char *engine_type, engine->pub.pool = task->c->pool; engine->pub.type = apr_pstrdup(task->c->pool, engine_type); engine->c = r->connection; + APR_RING_INIT(&engine->entries, h2_req_entry, link); engine->m = m; engine->io = task->io; engine->no_assigned = 1; @@ -1283,28 +1310,19 @@ apr_status_t h2_mplx_engine_push(const char *engine_type, return status; } -static request_rec *get_non_frozen(apr_queue_t *equeue) +static h2_req_entry *pop_non_frozen(h2_req_engine_i *engine) { - request_rec *r, *first = NULL; + h2_req_entry *entry; h2_task *task; - void *elem; - - if (equeue) { - /* FIFO queue, try to find a request_rec whose task is not frozen */ - while (apr_queue_trypop(equeue, &elem) == APR_SUCCESS) { - r = elem; - task = h2_ctx_rget_task(r); - AP_DEBUG_ASSERT(task); - if (!task->frozen) { - return r; - } - apr_queue_push(equeue, r); - if (!first) { - first = r; - } - else if (r == first) { - return NULL; /* walked the whole queue */ - } + + for (entry = H2_REQ_ENTRIES_FIRST(&engine->entries); + entry != H2_REQ_ENTRIES_SENTINEL(&engine->entries); + entry = H2_REQ_ENTRY_NEXT(entry)) { + task = h2_ctx_rget_task(entry->r); + AP_DEBUG_ASSERT(task); + if (!task->frozen) { + H2_REQ_ENTRY_REMOVE(entry); + return entry; } } return NULL; @@ -1313,7 +1331,7 @@ static request_rec *get_non_frozen(apr_queue_t *equeue) static apr_status_t engine_pull(h2_mplx *m, h2_req_engine_i *engine, apr_read_type_e block, request_rec **pr) { - request_rec *r; + h2_req_entry *entry; AP_DEBUG_ASSERT(m); AP_DEBUG_ASSERT(engine); @@ -1326,20 +1344,21 @@ static apr_status_t engine_pull(h2_mplx *m, h2_req_engine_i *engine, return APR_EOF; } - if (engine->queue && (r = get_non_frozen(engine->queue))) { - ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r, + if (!H2_REQ_ENTRIES_EMPTY(&engine->entries) + && (entry = pop_non_frozen(engine))) { + ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, entry->r, "h2_mplx(%ld): request %s pulled by engine %s", - m->c->id, r->the_request, engine->pub.id); + m->c->id, entry->r->the_request, engine->pub.id); engine->no_live++; - r->connection->current_thread = engine->c->current_thread; - *pr = r; + entry->r->connection->current_thread = engine->c->current_thread; + *pr = entry->r; return APR_SUCCESS; } else if (APR_NONBLOCK_READ == block) { *pr = NULL; return APR_EAGAIN; } - else if (!engine->queue || !apr_queue_size(engine->queue)) { + else if (H2_REQ_ENTRIES_EMPTY(&engine->entries)) { engine->shutdown = 1; ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c, "h2_mplx(%ld): emtpy queue, shutdown engine %s", @@ -1409,19 +1428,21 @@ void h2_mplx_engine_exit(h2_req_engine *pub_engine) int acquired; if (enter_mutex(m, &acquired) == APR_SUCCESS) { - if (engine->queue && apr_queue_size(engine->queue)) { - void *entry; + if (!m->aborted + && !H2_REQ_ENTRIES_EMPTY(&engine->entries)) { + h2_req_entry *entry; ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, "h2_mplx(%ld): exit engine %s (%s), " - "has still %d requests queued, shutdown=%d," + "has still requests queued, shutdown=%d," "assigned=%ld, live=%ld, finished=%ld", m->c->id, engine->pub.id, engine->pub.type, - (int)apr_queue_size(engine->queue), engine->shutdown, (long)engine->no_assigned, (long)engine->no_live, (long)engine->no_finished); - while (apr_queue_trypop(engine->queue, &entry) == APR_SUCCESS) { - request_rec *r = entry; + for (entry = H2_REQ_ENTRIES_FIRST(&engine->entries); + entry != H2_REQ_ENTRIES_SENTINEL(&engine->entries); + entry = H2_REQ_ENTRY_NEXT(entry)) { + request_rec *r = entry->r; h2_task *task = h2_ctx_rget_task(r); ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, "h2_mplx(%ld): engine %s has queued task %s, " @@ -1430,7 +1451,7 @@ void h2_mplx_engine_exit(h2_req_engine *pub_engine) engine_done(m, engine, task, 0, 1); } } - if (engine->no_assigned > 1 || engine->no_live > 1) { + if (!m->aborted && (engine->no_assigned > 1 || engine->no_live > 1)) { ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, "h2_mplx(%ld): exit engine %s (%s), " "assigned=%ld, live=%ld, finished=%ld", diff --git a/modules/http2/h2_proxy_session.c b/modules/http2/h2_proxy_session.c index 9d37cbade10..9a1f808927f 100644 --- a/modules/http2/h2_proxy_session.c +++ b/modules/http2/h2_proxy_session.c @@ -48,20 +48,6 @@ typedef struct h2_proxy_stream { } h2_proxy_stream; -static int ngstatus_from_apr_status(apr_status_t rv) -{ - if (rv == APR_SUCCESS) { - return NGHTTP2_NO_ERROR; - } - else if (APR_STATUS_IS_EAGAIN(rv)) { - return NGHTTP2_ERR_WOULDBLOCK; - } - else if (APR_STATUS_IS_EOF(rv)) { - return NGHTTP2_ERR_EOF; - } - return NGHTTP2_ERR_PROTO; -} - static void dispatch_event(h2_proxy_session *session, h2_proxys_event_t ev, int arg, const char *msg); @@ -75,7 +61,7 @@ static apr_status_t proxy_session_pre_close(void *theconn) ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c, "proxy_session(%s): pool cleanup, state=%d, streams=%d", session->id, session->state, - h2_iq_size(session->streams)); + (int)h2_ihash_count(session->streams)); dispatch_event(session, H2_PROXYS_EV_PRE_CLOSE, 0, NULL); nghttp2_session_del(session->ngh2); session->ngh2 = NULL; @@ -182,7 +168,7 @@ static int on_frame_recv(nghttp2_session *ngh2, const nghttp2_frame *frame, char buffer[256]; h2_util_frame_print(frame, buffer, sizeof(buffer)/sizeof(buffer[0])); - ap_log_cerror(APLOG_MARK, APLOG_INFO, 0, session->c, APLOGNO() + ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO() "h2_session(%s): recv FRAME[%s]", session->id, buffer); } @@ -380,7 +366,9 @@ static int on_data_chunk_recv(nghttp2_session *ngh2, uint8_t flags, ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, session->c, APLOGNO() "h2_session(%s-%d): passing output", session->id, stream->id); - return NGHTTP2_ERR_CALLBACK_FAILURE; + nghttp2_submit_rst_stream(ngh2, NGHTTP2_FLAG_NONE, + stream_id, NGHTTP2_STREAM_CLOSED); + return NGHTTP2_ERR_STREAM_CLOSING; } return 0; } @@ -488,7 +476,11 @@ static ssize_t stream_data_read(nghttp2_session *ngh2, int32_t stream_id, h2_iq_add(stream->session->suspended, stream->id, NULL, NULL); return NGHTTP2_ERR_DEFERRED; } - return ngstatus_from_apr_status(status); + else { + nghttp2_submit_rst_stream(ngh2, NGHTTP2_FLAG_NONE, + stream_id, NGHTTP2_STREAM_CLOSED); + return NGHTTP2_ERR_STREAM_CLOSING; + } } h2_proxy_session *h2_proxy_session_setup(const char *id, proxy_conn_rec *p_conn, @@ -513,7 +505,7 @@ h2_proxy_session *h2_proxy_session_setup(const char *id, proxy_conn_rec *p_conn, session->state = H2_PROXYS_ST_INIT; session->window_bits_default = 30; session->window_bits_connection = 30; - session->streams = h2_iq_create(pool, 25); + session->streams = h2_ihash_create(pool, offsetof(h2_proxy_stream, id)); session->suspended = h2_iq_create(pool, 5); session->done = done; @@ -689,7 +681,7 @@ static apr_status_t submit_stream(h2_proxy_session *session, h2_proxy_stream *st if (rv > 0) { stream->id = rv; stream->state = H2_STREAM_ST_OPEN; - h2_iq_add(session->streams, stream->id, NULL, NULL); + h2_ihash_add(session->streams, stream); dispatch_event(session, H2_PROXYS_EV_STREAM_SUBMITTED, rv, NULL); return APR_SUCCESS; @@ -697,13 +689,36 @@ static apr_status_t submit_stream(h2_proxy_session *session, h2_proxy_stream *st return APR_EGENERAL; } -static apr_status_t h2_proxy_session_read(h2_proxy_session *session, int block) +static apr_status_t h2_proxy_session_read(h2_proxy_session *session, int block, + apr_interval_time_t timeout) { apr_status_t status; + apr_socket_t *socket = NULL; + apr_time_t save_timeout = -1; + + if (block) { + socket = ap_get_conn_socket(session->c); + if (socket) { + apr_socket_timeout_get(socket, &save_timeout); + apr_socket_timeout_set(socket, timeout); + } + else { + /* cannot block on timeout */ + ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, session->c, + "h2_session(%s): unable to get conn socket", + session->id); + return APR_ENOTIMPL; + } + } + status = ap_get_brigade(session->c->input_filters, session->input, AP_MODE_READBYTES, block? APR_BLOCK_READ : APR_NONBLOCK_READ, 64 * 1024); + if (socket && save_timeout != -1) { + apr_socket_timeout_set(socket, save_timeout); + } + if (status == APR_SUCCESS) { if (APR_BRIGADE_EMPTY(session->input)) { status = APR_EAGAIN; @@ -712,6 +727,9 @@ static apr_status_t h2_proxy_session_read(h2_proxy_session *session, int block) feed_brigade(session, session->input); } } + else if (APR_STATUS_IS_TIMEUP(status)) { + /* nop */ + } else if (!APR_STATUS_IS_EAGAIN(status)) { dispatch_event(session, H2_PROXYS_EV_CONN_ERROR, 0, NULL); } @@ -841,7 +859,7 @@ static void ev_init(h2_proxy_session *session, int arg, const char *msg) { switch (session->state) { case H2_PROXYS_ST_INIT: - if (h2_iq_empty(session->streams)) { + if (h2_ihash_is_empty(session->streams)) { transit(session, "init", H2_PROXYS_ST_IDLE); } else { @@ -948,7 +966,7 @@ static void ev_no_io(h2_proxy_session *session, int arg, const char *msg) * CPU cycles. Ideally, we'd like to do a blocking read, but that * is not possible if we have scheduled tasks and wait * for them to produce something. */ - if (h2_iq_empty(session->streams)) { + if (h2_ihash_is_empty(session->streams)) { if (!is_accepting_streams(session)) { /* We are no longer accepting new streams and have * finished processing existing ones. Time to leave. */ @@ -966,6 +984,7 @@ static void ev_no_io(h2_proxy_session *session, int arg, const char *msg) * task processing in other threads. Do a busy wait with * backoff timer. */ transit(session, "no io", H2_PROXYS_ST_WAIT); + session->wait_timeout = 25; } break; default: @@ -1004,7 +1023,7 @@ static void ev_stream_done(h2_proxy_session *session, int stream_id, stream->data_received = 1; } stream->state = H2_STREAM_ST_CLOSED; - h2_iq_remove(session->streams, stream_id); + h2_ihash_remove(session->streams, stream_id); h2_iq_remove(session->suspended, stream_id); if (session->done) { session->done(session, stream->r); @@ -1154,7 +1173,8 @@ apr_status_t h2_proxy_session_process(h2_proxy_session *session) } if (nghttp2_session_want_read(session->ngh2)) { - if (h2_proxy_session_read(session, 0) == APR_SUCCESS) { + status = h2_proxy_session_read(session, 0, 0); + if (status == APR_SUCCESS) { have_read = 1; } } @@ -1169,9 +1189,14 @@ apr_status_t h2_proxy_session_process(h2_proxy_session *session) if (check_suspended(session) == APR_EAGAIN) { /* no stream has become resumed. Do a blocking read with * ever increasing timeouts... */ - if (h2_proxy_session_read(session, 0) == APR_SUCCESS) { + status = h2_proxy_session_read(session, 0, session->wait_timeout); + if (status == APR_SUCCESS) { dispatch_event(session, H2_PROXYS_EV_DATA_READ, 0, NULL); } + else if (APR_STATUS_IS_TIMEUP(status)) { + session->wait_timeout = H2MIN(apr_time_from_msec(100), + 2*session->wait_timeout); + } } break; @@ -1198,3 +1223,26 @@ apr_status_t h2_proxy_session_process(h2_proxy_session *session) return APR_EAGAIN; } +typedef struct { + h2_proxy_session *session; + h2_proxy_request_done *done; +} cleanup_iter_ctx; + +static int cleanup_iter(void *udata, void *val) +{ + cleanup_iter_ctx *ctx = udata; + h2_proxy_stream *stream = val; + ctx->done(ctx->session, stream->r); + return 1; +} + +void h2_proxy_session_cleanup(h2_proxy_session *session, + h2_proxy_request_done *done) +{ + cleanup_iter_ctx ctx; + ctx.session = session; + ctx.done = done; + h2_ihash_iter(session->streams, cleanup_iter, &ctx); + h2_ihash_clear(session->streams); +} + diff --git a/modules/http2/h2_proxy_session.h b/modules/http2/h2_proxy_session.h index 089fd107a59..d4f68b3a19c 100644 --- a/modules/http2/h2_proxy_session.h +++ b/modules/http2/h2_proxy_session.h @@ -21,6 +21,7 @@ #include struct h2_int_queue; +struct h2_ihash_t; typedef enum { H2_PROXYS_ST_INIT, /* send initial SETTINGS, etc. */ @@ -67,8 +68,9 @@ struct h2_proxy_session { int window_bits_connection; h2_proxys_state state; + apr_interval_time_t wait_timeout; - struct h2_int_queue *streams; + struct h2_ihash_t *streams; struct h2_int_queue *suspended; apr_size_t remote_max_concurrent; int max_stream_recv; @@ -86,6 +88,7 @@ apr_status_t h2_proxy_session_submit(h2_proxy_session *s, const char *url, apr_status_t h2_proxy_session_process(h2_proxy_session *s); +void h2_proxy_session_cleanup(h2_proxy_session *s, h2_proxy_request_done *done); #define H2_PROXY_REQ_URL_NOTE "h2-proxy-req-url" diff --git a/modules/http2/h2_util.c b/modules/http2/h2_util.c index 1cbe8d1e8e3..52c858e6095 100644 --- a/modules/http2/h2_util.c +++ b/modules/http2/h2_util.c @@ -297,6 +297,11 @@ void h2_ihash_remove(h2_ihash_t *ih, int id) apr_hash_set(ih->hash, &id, sizeof(id), NULL); } +void h2_ihash_clear(h2_ihash_t *ih) +{ + apr_hash_clear(ih->hash); +} + /******************************************************************************* * h2_util for apt_table_t ******************************************************************************/ diff --git a/modules/http2/h2_util.h b/modules/http2/h2_util.h index e13a0dc2f2a..cd2d8a12e36 100644 --- a/modules/http2/h2_util.h +++ b/modules/http2/h2_util.h @@ -61,6 +61,7 @@ void h2_ihash_iter(h2_ihash_t *ih, h2_ihash_iter_t *fn, void *ctx); void h2_ihash_add(h2_ihash_t *ih, void *val); void h2_ihash_remove(h2_ihash_t *ih, int id); +void h2_ihash_clear(h2_ihash_t *ih); /******************************************************************************* * common helpers diff --git a/modules/http2/mod_proxy_http2.c b/modules/http2/mod_proxy_http2.c index efd69465c29..a2d80d32e8b 100644 --- a/modules/http2/mod_proxy_http2.c +++ b/modules/http2/mod_proxy_http2.c @@ -234,11 +234,13 @@ static void request_done(h2_proxy_session *session, request_rec *r) } static request_rec *next_request(h2_proxy_ctx *ctx, h2_proxy_session *session, - request_rec *r) + request_rec *r, int before_leave) { if (!r && !ctx->standalone) { ctx->engine->capacity = session->remote_max_concurrent; - if (req_engine_pull(ctx->engine, APR_NONBLOCK_READ, &r) == APR_SUCCESS) { + if (req_engine_pull(ctx->engine, + before_leave? APR_BLOCK_READ: APR_NONBLOCK_READ, + &r) == APR_SUCCESS) { ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, "h2_proxy_session(%s): pulled request %s", session->id, r->the_request); @@ -312,7 +314,7 @@ run_session: session->user_data = ctx; status = h2_proxy_session_process(session); while (APR_STATUS_IS_EAGAIN(status)) { - r = next_request(ctx, session, r); + r = next_request(ctx, session, r, 0); if (r) { add_request(session, r); r = NULL; @@ -327,9 +329,15 @@ run_session: ctx->p_conn->close = 1; } - r = next_request(ctx, session, r); + r = next_request(ctx, session, r, 1); if (r) { if (ctx->p_conn->close) { + /* the connection is/willbe closed, the session is terminated. + * Any open stream of that session needs to + * a) be reopened on the new session iff safe to do so + * b) reported as done (failed) otherwise + */ + h2_proxy_session_cleanup(session, request_done); goto setup_backend; } add_request(session, r); @@ -337,11 +345,11 @@ run_session: goto run_session; } - if (session->streams && !h2_iq_empty(session->streams)) { + if (session->streams && !h2_ihash_is_empty(session->streams)) { ap_log_cerror(APLOG_MARK, APLOG_WARNING, status, ctx->p_conn->connection, "session run done with %d streams unfinished", - h2_iq_size(session->streams)); + (int)h2_ihash_count(session->streams)); } ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, ctx->p_conn->connection, "eng(%s): session run done",