From: Stefan Eissing Date: Wed, 29 Mar 2017 17:36:35 +0000 (+0000) Subject: On the trunk: X-Git-Tag: 2.5.0-alpha~512 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=0dc4a16fa38611ede5c436ad85acc14f79952be8;p=thirdparty%2Fapache%2Fhttpd.git On the trunk: mod_http2: better performance, eliminated need for nested locks and thread privates. git-svn-id: https://svn.apache.org/repos/asf/httpd/httpd/trunk@1789395 13f79535-47bb-0310-9956-ffa450edef68 --- diff --git a/CHANGES b/CHANGES index f99ca450bf9..e219f7ad10f 100644 --- a/CHANGES +++ b/CHANGES @@ -1,6 +1,9 @@ -*- coding: utf-8 -*- Changes with Apache 2.5.0 + *) mod_http2: better performance, eliminated need for nested locks and + thread privates. [Stefan Eissing] + *) core: Disallow multiple Listen on the same IP:port when listener buckets are configured (ListenCoresBucketsRatio > 0), consistently with the single bucket case (default), thus avoiding the leak of the corresponding socket diff --git a/modules/http2/h2_mplx.c b/modules/http2/h2_mplx.c index 2251dcb510f..c1b0b049c1b 100644 --- a/modules/http2/h2_mplx.c +++ b/modules/http2/h2_mplx.c @@ -57,35 +57,40 @@ typedef struct { /* NULL or the mutex hold by this thread, used for recursive calls */ +static const int nested_lock = 0; + static apr_threadkey_t *thread_lock; apr_status_t h2_mplx_child_init(apr_pool_t *pool, server_rec *s) { - return apr_threadkey_private_create(&thread_lock, NULL, pool); + if (nested_lock) { + return apr_threadkey_private_create(&thread_lock, NULL, pool); + } + return APR_SUCCESS; } static apr_status_t enter_mutex(h2_mplx *m, int *pacquired) { apr_status_t status; - void *mutex = NULL; - /* Enter the mutex if this thread already holds the lock or - * if we can acquire it. Only on the later case do we unlock - * onleaving the mutex. - * This allow recursive entering of the mutex from the saem thread, - * which is what we need in certain situations involving callbacks - */ - ap_assert(m); - apr_threadkey_private_get(&mutex, thread_lock); - if (mutex == m->lock) { - *pacquired = 0; - return APR_SUCCESS; + if (nested_lock) { + void *mutex = NULL; + /* Enter the mutex if this thread already holds the lock or + * if we can acquire it. Only on the later case do we unlock + * onleaving the mutex. + * This allow recursive entering of the mutex from the saem thread, + * which is what we need in certain situations involving callbacks + */ + apr_threadkey_private_get(&mutex, thread_lock); + if (mutex == m->lock) { + *pacquired = 0; + ap_assert(NULL); /* nested, why? */ + return APR_SUCCESS; + } } - - ap_assert(m->lock); status = apr_thread_mutex_lock(m->lock); *pacquired = (status == APR_SUCCESS); - if (*pacquired) { + if (nested_lock && *pacquired) { apr_threadkey_private_set(m->lock, thread_lock); } return status; @@ -94,7 +99,9 @@ static apr_status_t enter_mutex(h2_mplx *m, int *pacquired) static void leave_mutex(h2_mplx *m, int acquired) { if (acquired) { - apr_threadkey_private_set(NULL, thread_lock); + if (nested_lock) { + apr_threadkey_private_set(NULL, thread_lock); + } apr_thread_mutex_unlock(m->lock); } } @@ -105,38 +112,23 @@ static void stream_output_consumed(void *ctx, h2_bucket_beam *beam, apr_off_t length) { h2_stream *stream = ctx; - h2_mplx *m = stream->session->mplx; h2_task *task = stream->task; - int acquired; if (length > 0 && task && task->assigned) { - if (enter_mutex(m, &acquired) == APR_SUCCESS) { - h2_req_engine_out_consumed(task->assigned, task->c, length); - leave_mutex(m, acquired); - } + h2_req_engine_out_consumed(task->assigned, task->c, length); } } static void stream_input_ev(void *ctx, h2_bucket_beam *beam) { - h2_mplx *m = ctx; + h2_stream *stream = ctx; + h2_mplx *m = stream->session->mplx; apr_atomic_set32(&m->event_pending, 1); } -static void stream_input_consumed(void *ctx, - h2_bucket_beam *beam, apr_off_t length) +static void stream_input_consumed(void *ctx, h2_bucket_beam *beam, apr_off_t length) { - if (length > 0) { - h2_mplx *m = ctx; - int acquired; - - if (enter_mutex(m, &acquired) == APR_SUCCESS) { - if (m->input_consumed) { - m->input_consumed(m->input_consumed_ctx, beam->id, length); - } - leave_mutex(m, acquired); - } - } + h2_stream_in_consumed(ctx, length); } static int can_always_beam_file(void *ctx, h2_bucket_beam *beam, apr_file_t *file) @@ -289,6 +281,12 @@ static int input_consumed_signal(h2_mplx *m, h2_stream *stream) return 0; } +static int report_consumption_iter(void *ctx, void *val) +{ + input_consumed_signal(ctx, val); + return 1; +} + static int output_consumed_signal(h2_mplx *m, h2_task *task) { if (task->output.beam) { @@ -426,6 +424,10 @@ static int stream_cancel_iter(void *ctx, void *val) { h2_mplx *m = ctx; h2_stream *stream = val; + /* disabled input consumed reporting */ + if (stream->input) { + h2_beam_on_consumed(stream->input, NULL, NULL, NULL); + } /* take over event monitoring */ h2_stream_set_monitor(stream, NULL); /* Reset, should transit to CLOSED state */ @@ -527,12 +529,6 @@ h2_stream *h2_mplx_stream_get(h2_mplx *m, int id) return s; } -void h2_mplx_set_consumed_cb(h2_mplx *m, h2_mplx_consumed_cb *cb, void *ctx) -{ - m->input_consumed = cb; - m->input_consumed_ctx = ctx; -} - static void output_produced(void *ctx, h2_bucket_beam *beam, apr_off_t bytes) { h2_mplx *m = ctx; @@ -618,18 +614,6 @@ static apr_status_t out_close(h2_mplx *m, h2_task *task) return status; } -static int report_input_consumption(void *ctx, void *val) -{ - h2_stream *stream = val; - - (void)ctx; - if (stream->input) { - h2_beam_report_consumption(stream->input); - } - return 1; -} - - apr_status_t h2_mplx_out_trywait(h2_mplx *m, apr_interval_time_t timeout, apr_thread_cond_t *iowait) { @@ -645,7 +629,7 @@ apr_status_t h2_mplx_out_trywait(h2_mplx *m, apr_interval_time_t timeout, } else { purge_streams(m); - h2_ihash_iter(m->streams, report_input_consumption, m); + h2_ihash_iter(m->streams, report_consumption_iter, m); m->added_output = iowait; status = apr_thread_cond_timedwait(m->added_output, m->lock, timeout); if (APLOGctrace2(m->c)) { @@ -757,7 +741,7 @@ static h2_task *next_stream_task(h2_mplx *m) if (stream->input) { h2_beam_on_consumed(stream->input, stream_input_ev, - stream_input_consumed, m); + stream_input_consumed, stream); h2_beam_on_file_beam(stream->input, can_always_beam_file, m); h2_beam_mutex_enable(stream->input); } @@ -1207,12 +1191,6 @@ int h2_mplx_has_master_events(h2_mplx *m) return apr_atomic_read32(&m->event_pending) > 0; } -static int report_consumption_iter(void *ctx, void *val) -{ - input_consumed_signal(ctx, val); - return 1; -} - apr_status_t h2_mplx_dispatch_master_events(h2_mplx *m, stream_ev_callback *on_resume, void *on_ctx) @@ -1227,16 +1205,19 @@ apr_status_t h2_mplx_dispatch_master_events(h2_mplx *m, ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c, "h2_mplx(%ld): dispatch events", m->id); apr_atomic_set32(&m->event_pending, 0); + purge_streams(m); + /* update input windows for streams */ h2_ihash_iter(m->streams, report_consumption_iter, m); - purge_streams(m); if (!h2_iq_empty(m->readyq)) { n = h2_iq_mshift(m->readyq, ids, H2_ALEN(ids)); for (i = 0; i < n; ++i) { stream = h2_ihash_get(m->streams, ids[i]); if (stream) { + leave_mutex(m, acquired); on_resume(on_ctx, stream); + enter_mutex(m, &acquired); } } } diff --git a/modules/http2/h2_mplx.h b/modules/http2/h2_mplx.h index e4437d846e4..23162e8eb8b 100644 --- a/modules/http2/h2_mplx.h +++ b/modules/http2/h2_mplx.h @@ -53,12 +53,6 @@ struct h2_req_engine; typedef struct h2_mplx h2_mplx; -/** - * Callback invoked for every stream that had input data read since - * the last invocation. - */ -typedef void h2_mplx_consumed_cb(void *ctx, int stream_id, apr_off_t consumed); - struct h2_mplx { long id; conn_rec *c; @@ -100,9 +94,6 @@ struct h2_mplx { struct h2_workers *workers; - h2_mplx_consumed_cb *input_consumed; - void *input_consumed_ctx; - struct h2_ngn_shed *ngn_shed; }; @@ -194,18 +185,6 @@ apr_status_t h2_mplx_process(h2_mplx *m, struct h2_stream *stream, */ apr_status_t h2_mplx_reprioritize(h2_mplx *m, h2_stream_pri_cmp *cmp, void *ctx); -/** - * Register a callback for the amount of input data consumed per stream. The - * will only ever be invoked from the thread creating this h2_mplx, e.g. when - * calls from that thread into this h2_mplx are made. - * - * @param m the multiplexer to register the callback at - * @param cb the function to invoke - * @param ctx user supplied argument to invocation. - */ -void h2_mplx_set_consumed_cb(h2_mplx *m, h2_mplx_consumed_cb *cb, void *ctx); - - typedef apr_status_t stream_ev_callback(void *ctx, struct h2_stream *stream); /** diff --git a/modules/http2/h2_session.c b/modules/http2/h2_session.c index 78180b4c036..c7126beb95b 100644 --- a/modules/http2/h2_session.c +++ b/modules/http2/h2_session.c @@ -77,68 +77,6 @@ static h2_stream *get_stream(h2_session *session, int stream_id) return nghttp2_session_get_stream_user_data(session->ngh2, stream_id); } -static void update_window(void *ctx, int stream_id, apr_off_t bytes_read) -{ - h2_session *session = ctx; - h2_stream *stream; - - if (bytes_read > 0) { - apr_off_t consumed = bytes_read; - - while (consumed > 0) { - int len = (bytes_read > INT_MAX)? INT_MAX : bytes_read; - nghttp2_session_consume(session->ngh2, stream_id, (int)bytes_read); - consumed -= len; - } - - (void)stream; -#ifdef H2_NG2_LOCAL_WIN_SIZE - if ((stream = get_stream(session, stream_id))) { - int cur_size = nghttp2_session_get_stream_local_window_size( - session->ngh2, stream->id); - int win = stream->in_window_size; - int thigh = win * 8/10; - int tlow = win * 2/10; - const int win_max = 2*1024*1024; - const int win_min = 32*1024; - - /* Work in progress, probably shoud add directives for these - * values once this stabilizes somewhat. The general idea is - * to adapt stream window sizes if the input window changes - * a) very quickly (< good RTT) from full to empty - * b) only a little bit (> bad RTT) - * where in a) it grows and in b) it shrinks again. - */ - if (cur_size > thigh && bytes_read > thigh && win < win_max) { - /* almost empty again with one reported consumption, how - * long did this take? */ - long ms = apr_time_msec(apr_time_now() - stream->in_last_write); - if (ms < 40) { - win = H2MIN(win_max, win + (64*1024)); - } - } - else if (cur_size < tlow && bytes_read < tlow && win > win_min) { - /* staying full, for how long already? */ - long ms = apr_time_msec(apr_time_now() - stream->in_last_write); - if (ms > 700) { - win = H2MAX(win_min, win - (32*1024)); - } - } - - if (win != stream->in_window_size) { - stream->in_window_size = win; - nghttp2_session_set_local_window_size(session->ngh2, - NGHTTP2_FLAG_NONE, stream_id, win); - } - ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c, - "h2_stream(%ld-%d): consumed %ld bytes, window now %d/%d", - session->id, stream_id, (long)bytes_read, - cur_size, stream->in_window_size); - } -#endif - } -} - static void dispatch_event(h2_session *session, h2_session_event_t ev, int err, const char *msg); @@ -755,7 +693,6 @@ static apr_status_t session_cleanup(h2_session *session, const char *trigger) } transit(session, trigger, H2_SESSION_ST_CLEANUP); - h2_mplx_set_consumed_cb(session->mplx, NULL, NULL); h2_mplx_release_and_join(session->mplx, session->iowait); session->mplx = NULL; @@ -880,8 +817,6 @@ static apr_status_t h2_session_create_int(h2_session **psession, session->mplx = h2_mplx_create(c, session->pool, session->config, workers); - h2_mplx_set_consumed_cb(session->mplx, update_window, session); - /* connection input filter that feeds the session */ session->cin = h2_filter_cin_create(session); ap_add_input_filter("H2_IN", session->cin, r, c); diff --git a/modules/http2/h2_stream.c b/modules/http2/h2_stream.c index 9af2f29edc1..0b66af84e79 100644 --- a/modules/http2/h2_stream.c +++ b/modules/http2/h2_stream.c @@ -969,4 +969,64 @@ int h2_stream_was_closed(const h2_stream *stream) } } +apr_status_t h2_stream_in_consumed(h2_stream *stream, apr_off_t amount) +{ + h2_session *session = stream->session; + + if (amount > 0) { + apr_off_t consumed = amount; + + while (consumed > 0) { + int len = (consumed > INT_MAX)? INT_MAX : consumed; + nghttp2_session_consume(session->ngh2, stream->id, len); + consumed -= len; + } + +#ifdef H2_NG2_LOCAL_WIN_SIZE + if (1) { + int cur_size = nghttp2_session_get_stream_local_window_size( + session->ngh2, stream->id); + int win = stream->in_window_size; + int thigh = win * 8/10; + int tlow = win * 2/10; + const int win_max = 2*1024*1024; + const int win_min = 32*1024; + + /* Work in progress, probably should add directives for these + * values once this stabilizes somewhat. The general idea is + * to adapt stream window sizes if the input window changes + * a) very quickly (< good RTT) from full to empty + * b) only a little bit (> bad RTT) + * where in a) it grows and in b) it shrinks again. + */ + if (cur_size > thigh && amount > thigh && win < win_max) { + /* almost empty again with one reported consumption, how + * long did this take? */ + long ms = apr_time_msec(apr_time_now() - stream->in_last_write); + if (ms < 40) { + win = H2MIN(win_max, win + (64*1024)); + } + } + else if (cur_size < tlow && amount < tlow && win > win_min) { + /* staying full, for how long already? */ + long ms = apr_time_msec(apr_time_now() - stream->in_last_write); + if (ms > 700) { + win = H2MAX(win_min, win - (32*1024)); + } + } + + if (win != stream->in_window_size) { + stream->in_window_size = win; + nghttp2_session_set_local_window_size(session->ngh2, + NGHTTP2_FLAG_NONE, stream->id, win); + } + ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c, + "h2_stream(%ld-%d): consumed %ld bytes, window now %d/%d", + session->id, stream->id, (long)amount, + cur_size, stream->in_window_size); + } +#endif + } + return APR_SUCCESS; +} diff --git a/modules/http2/h2_stream.h b/modules/http2/h2_stream.h index 15d43995402..f328714951b 100644 --- a/modules/http2/h2_stream.h +++ b/modules/http2/h2_stream.h @@ -164,6 +164,12 @@ void h2_stream_cleanup(h2_stream *stream); */ apr_pool_t *h2_stream_detach_pool(h2_stream *stream); +/** + * Notify the stream that amount bytes have been consumed of its input + * since the last invocation of this method (delta amount). + */ +apr_status_t h2_stream_in_consumed(h2_stream *stream, apr_off_t amount); + /** * Set complete stream headers from given h2_request. *