From: Stefan Eissing Date: Wed, 27 Apr 2016 14:01:12 +0000 (+0000) Subject: mod_http2: removing beam mutex when task worker done X-Git-Tag: 2.5.0-alpha~1695 X-Git-Url: http://git.ipfire.org/?a=commitdiff_plain;h=ed7fa29baa2ded3572374f88c98c98c531b72660;p=thirdparty%2Fapache%2Fhttpd.git mod_http2: removing beam mutex when task worker done git-svn-id: https://svn.apache.org/repos/asf/httpd/httpd/trunk@1741268 13f79535-47bb-0310-9956-ffa450edef68 --- diff --git a/CHANGES b/CHANGES index df02a284c8d..0e320b2fd51 100644 --- a/CHANGES +++ b/CHANGES @@ -1,6 +1,10 @@ -*- coding: utf-8 -*- Changes with Apache 2.5.0 + *) mod_http2: bucket beams now have safe mutex remove. Used for streams where + the task worker has finished and all processing happens in the same + thread again. [Stefan Eissing] + *) mod_proxy, mod_ssl: Handle SSLProxy* directives in sections, allowing per backend TLS configuration. [Yann Ylavic] diff --git a/modules/http2/h2_bucket_beam.c b/modules/http2/h2_bucket_beam.c index 6ca39e1de3a..e630f84ecb3 100644 --- a/modules/http2/h2_bucket_beam.c +++ b/modules/http2/h2_bucket_beam.c @@ -199,22 +199,20 @@ apr_size_t h2_util_bl_print(char *buffer, apr_size_t bmax, * bucket beam that can transport buckets across threads ******************************************************************************/ -static apr_status_t enter_yellow(h2_bucket_beam *beam, - apr_thread_mutex_t **plock, int *pacquired) +static apr_status_t enter_yellow(h2_bucket_beam *beam, h2_beam_lock *pbl) { if (beam->m_enter) { - return beam->m_enter(beam->m_ctx, plock, pacquired); + return beam->m_enter(beam->m_ctx, pbl); } - *plock = NULL; - *pacquired = 0; + pbl->mutex = NULL; + pbl->leave = NULL; return APR_SUCCESS; } -static void leave_yellow(h2_bucket_beam *beam, - apr_thread_mutex_t *lock, int acquired) +static void leave_yellow(h2_bucket_beam *beam, h2_beam_lock *pbl) { - if (acquired && beam->m_leave) { - beam->m_leave(beam->m_ctx, lock, acquired); + if (pbl->leave) { + pbl->leave(pbl->leave_ctx, pbl->mutex); } } @@ -269,12 +267,12 @@ static apr_status_t wait_cond(h2_bucket_beam *beam, apr_thread_mutex_t *lock) } static apr_status_t r_wait_space(h2_bucket_beam *beam, apr_read_type_e block, - apr_thread_mutex_t *lock, apr_off_t *premain) + h2_beam_lock *pbl, apr_off_t *premain) { *premain = calc_space_left(beam); while (!beam->aborted && *premain <= 0 - && (block == APR_BLOCK_READ) && lock) { - apr_status_t status = wait_cond(beam, lock); + && (block == APR_BLOCK_READ) && pbl->mutex) { + apr_status_t status = wait_cond(beam, pbl->mutex); if (APR_STATUS_IS_TIMEUP(status)) { return status; } @@ -292,10 +290,9 @@ static void h2_beam_prep_purge(h2_bucket_beam *beam, apr_bucket *bred) static void h2_beam_emitted(h2_bucket_beam *beam, h2_beam_proxy *proxy) { - apr_thread_mutex_t *lock; - int acquired; + h2_beam_lock bl; - if (enter_yellow(beam, &lock, &acquired) == APR_SUCCESS) { + if (enter_yellow(beam, &bl) == APR_SUCCESS) { /* even when beam buckets are split, only the one where * refcount drops to 0 will call us */ H2_BPROXY_REMOVE(proxy); @@ -307,13 +304,13 @@ static void h2_beam_emitted(h2_bucket_beam *beam, h2_beam_proxy *proxy) proxy->bred = NULL; } /* notify anyone waiting on space to become available */ - if (!lock) { + if (!bl.mutex) { r_purge_reds(beam); } else if (beam->m_cond) { apr_thread_cond_broadcast(beam->m_cond); } - leave_yellow(beam, lock, acquired); + leave_yellow(beam, &bl); } } @@ -391,18 +388,18 @@ static apr_status_t beam_cleanup(void *data) apr_status_t h2_beam_destroy(h2_bucket_beam *beam) { - apr_pool_cleanup_kill(beam->life_pool, beam, beam_cleanup); + apr_pool_cleanup_kill(beam->red_pool, beam, beam_cleanup); return beam_cleanup(beam); } -apr_status_t h2_beam_create(h2_bucket_beam **pbeam, apr_pool_t *life_pool, +apr_status_t h2_beam_create(h2_bucket_beam **pbeam, apr_pool_t *red_pool, int id, const char *tag, apr_size_t max_buf_size) { h2_bucket_beam *beam; apr_status_t status = APR_SUCCESS; - beam = apr_pcalloc(life_pool, sizeof(*beam)); + beam = apr_pcalloc(red_pool, sizeof(*beam)); if (!beam) { return APR_ENOMEM; } @@ -413,10 +410,10 @@ apr_status_t h2_beam_create(h2_bucket_beam **pbeam, apr_pool_t *life_pool, H2_BLIST_INIT(&beam->hold); H2_BLIST_INIT(&beam->purge); H2_BPROXY_LIST_INIT(&beam->proxies); - beam->life_pool = life_pool; + beam->red_pool = red_pool; beam->max_buf_size = max_buf_size; - apr_pool_pre_cleanup_register(life_pool, beam, beam_cleanup); + apr_pool_pre_cleanup_register(red_pool, beam, beam_cleanup); *pbeam = beam; return status; @@ -424,83 +421,68 @@ apr_status_t h2_beam_create(h2_bucket_beam **pbeam, apr_pool_t *life_pool, void h2_beam_buffer_size_set(h2_bucket_beam *beam, apr_size_t buffer_size) { - apr_thread_mutex_t *lock; - int acquired; + h2_beam_lock bl; - if (enter_yellow(beam, &lock, &acquired) == APR_SUCCESS) { + if (enter_yellow(beam, &bl) == APR_SUCCESS) { beam->max_buf_size = buffer_size; - leave_yellow(beam, lock, acquired); + leave_yellow(beam, &bl); } } apr_size_t h2_beam_buffer_size_get(h2_bucket_beam *beam) { - apr_thread_mutex_t *lock; - int acquired; + h2_beam_lock bl; apr_size_t buffer_size = 0; - if (enter_yellow(beam, &lock, &acquired) == APR_SUCCESS) { + if (enter_yellow(beam, &bl) == APR_SUCCESS) { buffer_size = beam->max_buf_size; - leave_yellow(beam, lock, acquired); + leave_yellow(beam, &bl); } return buffer_size; } void h2_beam_mutex_set(h2_bucket_beam *beam, h2_beam_mutex_enter m_enter, - h2_beam_mutex_leave m_leave, apr_thread_cond_t *cond, void *m_ctx) { - apr_thread_mutex_t *lock; - h2_beam_mutex_leave *prev_leave; - void *prev_ctx; - int acquired; + h2_beam_lock bl; - if (enter_yellow(beam, &lock, &acquired) == APR_SUCCESS) { - prev_ctx = beam->m_ctx; - prev_leave = beam->m_leave; + if (enter_yellow(beam, &bl) == APR_SUCCESS) { beam->m_enter = m_enter; - beam->m_leave = m_leave; beam->m_ctx = m_ctx; beam->m_cond = cond; - if (acquired && prev_leave) { - /* special tactics when NULLing a lock */ - prev_leave(prev_ctx, lock, acquired); - } + leave_yellow(beam, &bl); } } void h2_beam_timeout_set(h2_bucket_beam *beam, apr_interval_time_t timeout) { - apr_thread_mutex_t *lock; - int acquired; + h2_beam_lock bl; - if (enter_yellow(beam, &lock, &acquired) == APR_SUCCESS) { + if (enter_yellow(beam, &bl) == APR_SUCCESS) { beam->timeout = timeout; - leave_yellow(beam, lock, acquired); + leave_yellow(beam, &bl); } } apr_interval_time_t h2_beam_timeout_get(h2_bucket_beam *beam) { - apr_thread_mutex_t *lock; - int acquired; + h2_beam_lock bl; apr_interval_time_t timeout = 0; - if (enter_yellow(beam, &lock, &acquired) == APR_SUCCESS) { + if (enter_yellow(beam, &bl) == APR_SUCCESS) { timeout = beam->timeout; - leave_yellow(beam, lock, acquired); + leave_yellow(beam, &bl); } return timeout; } void h2_beam_abort(h2_bucket_beam *beam) { - apr_thread_mutex_t *lock; - int acquired; + h2_beam_lock bl; - if (enter_yellow(beam, &lock, &acquired) == APR_SUCCESS) { + if (enter_yellow(beam, &bl) == APR_SUCCESS) { r_purge_reds(beam); h2_blist_cleanup(&beam->red); beam->aborted = 1; @@ -508,32 +490,30 @@ void h2_beam_abort(h2_bucket_beam *beam) if (beam->m_cond) { apr_thread_cond_broadcast(beam->m_cond); } - leave_yellow(beam, lock, acquired); + leave_yellow(beam, &bl); } } apr_status_t h2_beam_close(h2_bucket_beam *beam) { - apr_thread_mutex_t *lock; - int acquired; + h2_beam_lock bl; - if (enter_yellow(beam, &lock, &acquired) == APR_SUCCESS) { + if (enter_yellow(beam, &bl) == APR_SUCCESS) { r_purge_reds(beam); beam_close(beam); report_consumption(beam); - leave_yellow(beam, lock, acquired); + leave_yellow(beam, &bl); } return beam->aborted? APR_ECONNABORTED : APR_SUCCESS; } void h2_beam_shutdown(h2_bucket_beam *beam) { - apr_thread_mutex_t *lock; - int acquired; + h2_beam_lock bl; - if (enter_yellow(beam, &lock, &acquired) == APR_SUCCESS) { + if (enter_yellow(beam, &bl) == APR_SUCCESS) { beam_shutdown(beam, 1); - leave_yellow(beam, lock, acquired); + leave_yellow(beam, &bl); } } @@ -541,7 +521,7 @@ static apr_status_t append_bucket(h2_bucket_beam *beam, apr_bucket *bred, apr_read_type_e block, apr_pool_t *pool, - apr_thread_mutex_t *lock) + h2_beam_lock *pbl) { const char *data; apr_size_t len; @@ -570,7 +550,7 @@ static apr_status_t append_bucket(h2_bucket_beam *beam, } if (space_left < bred->length) { - status = r_wait_space(beam, block, lock, &space_left); + status = r_wait_space(beam, block, pbl, &space_left); if (status != APR_SUCCESS) { return status; } @@ -596,23 +576,33 @@ static apr_status_t append_bucket(h2_bucket_beam *beam, * affected by this. */ status = apr_bucket_setaside(bred, pool); } - else if (APR_BUCKET_IS_HEAP(bred) || APR_BUCKET_IS_POOL(bred)) { - /* For heap/pool buckets read from a green thread is fine. The + else if (APR_BUCKET_IS_HEAP(bred)) { + /* For heap buckets read from a green thread is fine. The * data will be there and live until the bucket itself is * destroyed. */ status = APR_SUCCESS; } + else if (APR_BUCKET_IS_POOL(bred)) { + /* pool buckets are bastards that register at pool cleanup + * to morph themselves into heap buckets. That may happen anytime, + * even after the bucket data pointer has been read. So at + * any time inside the green thread, the pool bucket memory + * may disappear. yikes. */ + status = apr_bucket_read(bred, &data, &len, APR_BLOCK_READ); + if (status == APR_SUCCESS) { + apr_bucket_heap_make(bred, data, len, NULL); + } + } else if (APR_BUCKET_IS_FILE(bred)) { /* For file buckets the problem is their internal readpool that * is used on the first read to allocate buffer/mmap. * Since setting aside a file bucket will de-register the * file cleanup function from the previous pool, we need to - * call that from a red thread. Do it now and make our - * yellow pool the owner. + * call that from a red thread. * Additionally, we allow callbacks to prevent beaming file * handles across. The use case for this is to limit the number * of open file handles and rather use a less efficient beam - * transport. */ + * transport. */ apr_file_t *fd = ((apr_bucket_file *)bred->data)->fd; int can_beam = 1; if (beam->last_beamed != fd && beam->can_beam_fn) { @@ -622,13 +612,16 @@ static apr_status_t append_bucket(h2_bucket_beam *beam, beam->last_beamed = fd; status = apr_bucket_setaside(bred, pool); } + /* else: enter ENOTIMPL case below */ } if (status == APR_ENOTIMPL) { /* we have no knowledge about the internals of this bucket, - * but on read, it needs to make the data available somehow. - * So we do this while still in a red thread. The data will - * live at least os long as the red bucket itself. */ + * but hope that after read, its data stays immutable for the + * lifetime of the bucket. (see pool bucket handling above for + * a counter example). + * We do the read while in a red thread, so that the bucket may + * use pools/allocators safely. */ if (space_left < APR_BUCKET_BUFF_SIZE) { space_left = APR_BUCKET_BUFF_SIZE; } @@ -656,13 +649,12 @@ apr_status_t h2_beam_send(h2_bucket_beam *beam, apr_bucket_brigade *red_brigade, apr_read_type_e block) { - apr_thread_mutex_t *lock; apr_bucket *bred; apr_status_t status = APR_SUCCESS; - int acquired; + h2_beam_lock bl; /* Called from the red thread to add buckets to the beam */ - if (enter_yellow(beam, &lock, &acquired) == APR_SUCCESS) { + if (enter_yellow(beam, &bl) == APR_SUCCESS) { r_purge_reds(beam); if (beam->aborted) { @@ -672,14 +664,14 @@ apr_status_t h2_beam_send(h2_bucket_beam *beam, while (!APR_BRIGADE_EMPTY(red_brigade) && status == APR_SUCCESS) { bred = APR_BRIGADE_FIRST(red_brigade); - status = append_bucket(beam, bred, block, red_brigade->p, lock); + status = append_bucket(beam, bred, block, beam->red_pool, &bl); } if (beam->m_cond) { apr_thread_cond_broadcast(beam->m_cond); } } report_consumption(beam); - leave_yellow(beam, lock, acquired); + leave_yellow(beam, &bl); } return status; } @@ -689,14 +681,14 @@ apr_status_t h2_beam_receive(h2_bucket_beam *beam, apr_read_type_e block, apr_off_t readbytes) { - apr_thread_mutex_t *lock; + h2_beam_lock bl; apr_bucket *bred, *bgreen, *ng; - int acquired, transferred = 0; + int transferred = 0; apr_status_t status = APR_SUCCESS; apr_off_t remain = readbytes; /* Called from the green thread to take buckets from the beam */ - if (enter_yellow(beam, &lock, &acquired) == APR_SUCCESS) { + if (enter_yellow(beam, &bl) == APR_SUCCESS) { transfer: if (beam->aborted) { status = APR_ECONNABORTED; @@ -817,8 +809,8 @@ transfer: status = APR_EOF; } } - else if (block == APR_BLOCK_READ && lock && beam->m_cond) { - status = wait_cond(beam, lock); + else if (block == APR_BLOCK_READ && bl.mutex && beam->m_cond) { + status = wait_cond(beam, bl.mutex); if (status != APR_SUCCESS) { goto leave; } @@ -828,7 +820,7 @@ transfer: status = APR_EAGAIN; } leave: - leave_yellow(beam, lock, acquired); + leave_yellow(beam, &bl); } return status; } @@ -836,57 +828,53 @@ leave: void h2_beam_on_consumed(h2_bucket_beam *beam, h2_beam_consumed_callback *cb, void *ctx) { - apr_thread_mutex_t *lock; - int acquired; + h2_beam_lock bl; - if (enter_yellow(beam, &lock, &acquired) == APR_SUCCESS) { + if (enter_yellow(beam, &bl) == APR_SUCCESS) { beam->consumed_fn = cb; beam->consumed_ctx = ctx; - leave_yellow(beam, lock, acquired); + leave_yellow(beam, &bl); } } void h2_beam_on_file_beam(h2_bucket_beam *beam, h2_beam_can_beam_callback *cb, void *ctx) { - apr_thread_mutex_t *lock; - int acquired; + h2_beam_lock bl; - if (enter_yellow(beam, &lock, &acquired) == APR_SUCCESS) { + if (enter_yellow(beam, &bl) == APR_SUCCESS) { beam->can_beam_fn = cb; beam->can_beam_ctx = ctx; - leave_yellow(beam, lock, acquired); + leave_yellow(beam, &bl); } } apr_off_t h2_beam_get_buffered(h2_bucket_beam *beam) { - apr_thread_mutex_t *lock; apr_bucket *b; apr_off_t l = 0; - int acquired; + h2_beam_lock bl; - if (enter_yellow(beam, &lock, &acquired) == APR_SUCCESS) { + if (enter_yellow(beam, &bl) == APR_SUCCESS) { for (b = H2_BLIST_FIRST(&beam->red); b != H2_BLIST_SENTINEL(&beam->red); b = APR_BUCKET_NEXT(b)) { /* should all have determinate length */ l += b->length; } - leave_yellow(beam, lock, acquired); + leave_yellow(beam, &bl); } return l; } apr_off_t h2_beam_get_mem_used(h2_bucket_beam *beam) { - apr_thread_mutex_t *lock; apr_bucket *b; apr_off_t l = 0; - int acquired; + h2_beam_lock bl; - if (enter_yellow(beam, &lock, &acquired) == APR_SUCCESS) { + if (enter_yellow(beam, &bl) == APR_SUCCESS) { for (b = H2_BLIST_FIRST(&beam->red); b != H2_BLIST_SENTINEL(&beam->red); b = APR_BUCKET_NEXT(b)) { @@ -898,21 +886,20 @@ apr_off_t h2_beam_get_mem_used(h2_bucket_beam *beam) l += b->length; } } - leave_yellow(beam, lock, acquired); + leave_yellow(beam, &bl); } return l; } int h2_beam_empty(h2_bucket_beam *beam) { - apr_thread_mutex_t *lock; int empty = 1; - int acquired; + h2_beam_lock bl; - if (enter_yellow(beam, &lock, &acquired) == APR_SUCCESS) { + if (enter_yellow(beam, &bl) == APR_SUCCESS) { empty = (H2_BLIST_EMPTY(&beam->red) && (!beam->green || APR_BRIGADE_EMPTY(beam->green))); - leave_yellow(beam, lock, acquired); + leave_yellow(beam, &bl); } return empty; } @@ -924,26 +911,24 @@ int h2_beam_closed(h2_bucket_beam *beam) int h2_beam_was_received(h2_bucket_beam *beam) { - apr_thread_mutex_t *lock; int happend = 0; - int acquired; + h2_beam_lock bl; - if (enter_yellow(beam, &lock, &acquired) == APR_SUCCESS) { + if (enter_yellow(beam, &bl) == APR_SUCCESS) { happend = (beam->received_bytes > 0); - leave_yellow(beam, lock, acquired); + leave_yellow(beam, &bl); } return happend; } apr_size_t h2_beam_get_files_beamed(h2_bucket_beam *beam) { - apr_thread_mutex_t *lock; apr_size_t n = 0; - int acquired; + h2_beam_lock bl; - if (enter_yellow(beam, &lock, &acquired) == APR_SUCCESS) { + if (enter_yellow(beam, &bl) == APR_SUCCESS) { n = beam->files_beamed; - leave_yellow(beam, lock, acquired); + leave_yellow(beam, &bl); } return n; } diff --git a/modules/http2/h2_bucket_beam.h b/modules/http2/h2_bucket_beam.h index f94b1b9ecac..a8abc908a1e 100644 --- a/modules/http2/h2_bucket_beam.h +++ b/modules/http2/h2_bucket_beam.h @@ -139,14 +139,19 @@ apr_size_t h2_util_bl_print(char *buffer, apr_size_t bmax, * technology where humans are kept inside the transporter's memory * buffers until the transmission is complete. Star gates use a similar trick. */ + +typedef void h2_beam_mutex_leave(void *ctx, struct apr_thread_mutex_t *lock); + +typedef struct { + apr_thread_mutex_t *mutex; + h2_beam_mutex_leave *leave; + void *leave_ctx; +} h2_beam_lock; + typedef struct h2_bucket_beam h2_bucket_beam; -typedef apr_status_t h2_beam_mutex_enter(void *ctx, - struct apr_thread_mutex_t **plock, - int *acquired); -typedef void h2_beam_mutex_leave(void *ctx, - struct apr_thread_mutex_t *lock, - int acquired); +typedef apr_status_t h2_beam_mutex_enter(void *ctx, h2_beam_lock *pbl); + typedef void h2_beam_consumed_callback(void *ctx, h2_bucket_beam *beam, apr_off_t bytes); @@ -166,7 +171,7 @@ struct h2_bucket_beam { h2_blist purge; apr_bucket_brigade *green; h2_bproxy_list proxies; - apr_pool_t *life_pool; + apr_pool_t *red_pool; apr_size_t max_buf_size; apr_size_t files_beamed; /* how many file handles have been set aside */ @@ -181,7 +186,6 @@ struct h2_bucket_beam { void *m_ctx; h2_beam_mutex_enter *m_enter; - h2_beam_mutex_leave *m_leave; struct apr_thread_cond_t *m_cond; apr_interval_time_t timeout; @@ -199,15 +203,22 @@ struct h2_bucket_beam { * that is only used inside that same mutex. * * @param pbeam will hold the created beam on return - * @param life_pool pool for allocating initial structure and cleanups + * @param red_pool pool usable on red side, beam lifeline * @param buffer_size maximum memory footprint of buckets buffered in beam, or * 0 for no limitation + * + * Call from the red side only. */ apr_status_t h2_beam_create(h2_bucket_beam **pbeam, - apr_pool_t *life_pool, + apr_pool_t *red_pool, int id, const char *tag, apr_size_t buffer_size); +/** + * Destroys the beam immediately without cleanup. + * + * Call from the red side only. + */ apr_status_t h2_beam_destroy(h2_bucket_beam *beam); /** @@ -215,6 +226,8 @@ apr_status_t h2_beam_destroy(h2_bucket_beam *beam); * internally as long as they have not been processed by the receiving side. * All accepted buckets are removed from the given brigade. Will return with * APR_EAGAIN on non-blocking sends when not all buckets could be accepted. + * + * Call from the red side only. */ apr_status_t h2_beam_send(h2_bucket_beam *beam, apr_bucket_brigade *red_buckets, @@ -225,28 +238,52 @@ apr_status_t h2_beam_send(h2_bucket_beam *beam, * when reading past an EOS bucket. Reads can be blocking until data is * available or the beam has been closed. Non-blocking calls return APR_EAGAIN * if no data is available. + * + * Call from the green side only. */ apr_status_t h2_beam_receive(h2_bucket_beam *beam, apr_bucket_brigade *green_buckets, apr_read_type_e block, apr_off_t readbytes); +/** + * Determine if beam is closed. May still contain buffered data. + * + * Call from red or green side. + */ +int h2_beam_closed(h2_bucket_beam *beam); + +/** + * Determine if beam is empty. + * + * Call from red or green side. + */ +int h2_beam_empty(h2_bucket_beam *beam); + +/** + * Abort the beam. Will cleanup any buffered buckets and answer all send + * and receives with APR_ECONNABORTED. + * + * Call from the red side only. + */ void h2_beam_abort(h2_bucket_beam *beam); /** - * Close the beam. Does not need to be invoked if certain that an EOS bucket - * has been sent. + * Close the beam. Sending an EOS bucket serves the same purpose. + * + * Call from the red side only. */ apr_status_t h2_beam_close(h2_bucket_beam *beam); /** * Empty the buffer and close. + * + * Call from the red side only. */ void h2_beam_shutdown(h2_bucket_beam *beam); void h2_beam_mutex_set(h2_bucket_beam *beam, h2_beam_mutex_enter m_enter, - h2_beam_mutex_leave m_leave, struct apr_thread_cond_t *cond, void *m_ctx); @@ -272,6 +309,8 @@ apr_size_t h2_beam_buffer_size_get(h2_bucket_beam *beam); * @param beam the beam to set the callback on * @param cb the callback or NULL * @param ctx the context to use in callback invocation + * + * Call from the red side, callbacks invoked on red side. */ void h2_beam_on_consumed(h2_bucket_beam *beam, h2_beam_consumed_callback *cb, void *ctx); @@ -289,9 +328,6 @@ apr_off_t h2_beam_get_buffered(h2_bucket_beam *beam); */ apr_off_t h2_beam_get_mem_used(h2_bucket_beam *beam); -int h2_beam_closed(h2_bucket_beam *beam); -int h2_beam_empty(h2_bucket_beam *beam); - /** * Return != 0 iff (some) data from the beam has been received. */ diff --git a/modules/http2/h2_h2.c b/modules/http2/h2_h2.c index 7c8bf3dd100..bc9e261b526 100644 --- a/modules/http2/h2_h2.c +++ b/modules/http2/h2_h2.c @@ -678,7 +678,6 @@ static int h2_h2_post_read_req(request_rec *r) struct h2_task *task = h2_ctx_get_task(ctx); /* This hook will get called twice on internal redirects. Take care * that we manipulate filters only once. */ - /* our slave connection? */ if (task && !task->filters_set) { ap_filter_t *f; diff --git a/modules/http2/h2_mplx.c b/modules/http2/h2_mplx.c index 3f0398a051d..6a886ac59dd 100644 --- a/modules/http2/h2_mplx.c +++ b/modules/http2/h2_mplx.c @@ -112,18 +112,24 @@ static void leave_mutex(h2_mplx *m, int acquired) } } -static apr_status_t io_mutex_enter(void *ctx, - apr_thread_mutex_t **plock, int *acquired) +static void beam_leave(void *ctx, apr_thread_mutex_t *lock) { - h2_mplx *m = ctx; - *plock = m->lock; - return enter_mutex(m, acquired); + leave_mutex(ctx, 1); } -static void io_mutex_leave(void *ctx, apr_thread_mutex_t *lock, int acquired) +static apr_status_t beam_enter(void *ctx, h2_beam_lock *pbl) { h2_mplx *m = ctx; - leave_mutex(m, acquired); + int acquired; + apr_status_t status; + + status = enter_mutex(m, &acquired); + if (status == APR_SUCCESS) { + pbl->mutex = m->lock; + pbl->leave = acquired? beam_leave : NULL; + pbl->leave_ctx = m; + } + return status; } static void stream_output_consumed(void *ctx, @@ -366,7 +372,6 @@ static int task_stream_done(h2_mplx *m, h2_task *task, int rst_error) /* cleanup once task is done */ task->orphaned = 1; if (task->input.beam) { - /* TODO: this is currently allocated by the stream and will disappear */ h2_beam_shutdown(task->input.beam); task->input.beam = NULL; } @@ -638,8 +643,7 @@ static apr_status_t out_open(h2_mplx *m, int stream_id, h2_response *response) h2_beam_on_consumed(task->output.beam, stream_output_consumed, task); m->tx_handles_reserved -= h2_beam_get_files_beamed(task->output.beam); h2_beam_on_file_beam(task->output.beam, can_beam_file, m); - h2_beam_mutex_set(task->output.beam, io_mutex_enter, io_mutex_leave, - task->cond, m); + h2_beam_mutex_set(task->output.beam, beam_enter, task->cond, m); } h2_ihash_add(m->ready_tasks, task); @@ -670,42 +674,34 @@ apr_status_t h2_mplx_out_open(h2_mplx *m, int stream_id, h2_response *response) return status; } -apr_status_t h2_mplx_out_close(h2_mplx *m, int stream_id) +static apr_status_t out_close(h2_mplx *m, h2_task *task) { - apr_status_t status; - int acquired; + apr_status_t status = APR_SUCCESS; - AP_DEBUG_ASSERT(m); - if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) { - h2_task *task = h2_ihash_get(m->tasks, stream_id); - if (task && !task->orphaned) { - if (!task->response && !task->rst_error) { - /* In case a close comes before a response was created, - * insert an error one so that our streams can properly - * reset. - */ - h2_response *r = h2_response_die(stream_id, APR_EGENERAL, - task->request, m->pool); - status = out_open(m, stream_id, r); - ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, m->c, - "h2_mplx(%ld-%d): close, no response, no rst", - m->id, stream_id); - } - ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, m->c, - "h2_mplx(%ld-%d): close", m->id, stream_id); - if (task->output.beam) { - status = h2_beam_close(task->output.beam); - h2_beam_log(task->output.beam, stream_id, "out_close", m->c, - APLOG_TRACE2); - } - output_consumed_signal(m, task); - have_out_data_for(m, stream_id); - } - else { - status = APR_ECONNABORTED; - } - leave_mutex(m, acquired); + if (!task || task->orphaned) { + return APR_ECONNABORTED; + } + + if (!task->response && !task->rst_error) { + /* In case a close comes before a response was created, + * insert an error one so that our streams can properly + * reset. + */ + h2_response *r = h2_response_die(task->stream_id, APR_EGENERAL, + task->request, m->pool); + status = out_open(m, task->stream_id, r); + ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, m->c, + "h2_mplx(%s): close, no response, no rst", task->id); + } + ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, m->c, + "h2_mplx(%s): close", task->id); + if (task->output.beam) { + status = h2_beam_close(task->output.beam); + h2_beam_log(task->output.beam, task->stream_id, "out_close", m->c, + APLOG_TRACE2); } + output_consumed_signal(m, task); + have_out_data_for(m, task->stream_id); return status; } @@ -832,17 +828,17 @@ static h2_task *pop_task(h2_mplx *m) task->worker_started = 1; task->started_at = apr_time_now(); - - if (task->input.beam) { - h2_beam_timeout_set(task->input.beam, m->stream_timeout); - h2_beam_on_consumed(task->input.beam, stream_input_consumed, m); - h2_beam_on_file_beam(task->input.beam, can_beam_file, m); - h2_beam_mutex_set(task->input.beam, io_mutex_enter, - io_mutex_leave, task->cond, m); - } if (sid > m->max_stream_started) { m->max_stream_started = sid; } + + if (stream->input) { + h2_beam_timeout_set(stream->input, m->stream_timeout); + h2_beam_on_consumed(stream->input, stream_input_consumed, m); + h2_beam_on_file_beam(stream->input, can_beam_file, m); + h2_beam_mutex_set(stream->input, beam_enter, task->cond, m); + } + ++m->workers_busy; } } @@ -886,18 +882,12 @@ static void task_done(h2_mplx *m, h2_task *task, h2_req_engine *ngn) /* FIXME: this implementation is incomplete. */ h2_task_set_io_blocking(task, 0); apr_thread_cond_broadcast(m->task_thawed); + return; } else { - apr_time_t now = apr_time_now(); - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c, "h2_mplx(%ld): task(%s) done", m->id, task->id); - /* clean our references and report request as done. Signal - * that we want another unless we have been aborted */ - /* TODO: this will keep a worker attached to this h2_mplx as - * long as it has requests to handle. Might no be fair to - * other mplx's. Perhaps leave after n requests? */ - h2_mplx_out_close(m, task->stream_id); + out_close(m, task); if (ngn) { apr_off_t bytes = 0; @@ -930,29 +920,33 @@ static void task_done(h2_mplx *m, h2_task *task, h2_req_engine *ngn) h2_task_redo(task); h2_ihash_remove(m->redo_tasks, task->stream_id); h2_iq_add(m->q, task->stream_id, NULL, NULL); + return; } - else { - task->worker_done = 1; - task->done_at = now; - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c, - "h2_mplx(%s): request done, %f ms" - " elapsed", task->id, - (task->done_at - task->started_at) / 1000.0); - if (task->started_at > m->last_idle_block) { - /* this task finished without causing an 'idle block', e.g. - * a block by flow control. - */ - if (now - m->last_limit_change >= m->limit_change_interval - && m->workers_limit < m->workers_max) { - /* Well behaving stream, allow it more workers */ - m->workers_limit = H2MIN(m->workers_limit * 2, - m->workers_max); - m->last_limit_change = now; - m->need_registration = 1; - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c, - "h2_mplx(%ld): increase worker limit to %d", - m->id, m->workers_limit); - } + + task->worker_done = 1; + task->done_at = apr_time_now(); + if (task->output.beam) { + h2_beam_on_consumed(task->output.beam, NULL, NULL); + h2_beam_mutex_set(task->output.beam, NULL, NULL, NULL); + } + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c, + "h2_mplx(%s): request done, %f ms" + " elapsed", task->id, + (task->done_at - task->started_at) / 1000.0); + if (task->started_at > m->last_idle_block) { + /* this task finished without causing an 'idle block', e.g. + * a block by flow control. + */ + if (task->done_at- m->last_limit_change >= m->limit_change_interval + && m->workers_limit < m->workers_max) { + /* Well behaving stream, allow it more workers */ + m->workers_limit = H2MIN(m->workers_limit * 2, + m->workers_max); + m->last_limit_change = task->done_at; + m->need_registration = 1; + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c, + "h2_mplx(%ld): increase worker limit to %d", + m->id, m->workers_limit); } } diff --git a/modules/http2/h2_mplx.h b/modules/http2/h2_mplx.h index f6a83d62f10..a6fe12a3efc 100644 --- a/modules/http2/h2_mplx.h +++ b/modules/http2/h2_mplx.h @@ -244,11 +244,6 @@ struct h2_stream *h2_mplx_next_submit(h2_mplx *m, apr_status_t h2_mplx_out_open(h2_mplx *mplx, int stream_id, struct h2_response *response); -/** - * Closes the output for stream stream_id. - */ -apr_status_t h2_mplx_out_close(h2_mplx *m, int stream_id); - /******************************************************************************* * h2_mplx list Manipulation. ******************************************************************************/ diff --git a/modules/http2/h2_version.h b/modules/http2/h2_version.h index ed74198a045..8fe813a3752 100644 --- a/modules/http2/h2_version.h +++ b/modules/http2/h2_version.h @@ -26,7 +26,7 @@ * @macro * Version number of the http2 module as c string */ -#define MOD_HTTP2_VERSION "1.5.1-DEV" +#define MOD_HTTP2_VERSION "1.5.2-DEV" /** * @macro @@ -34,7 +34,7 @@ * release. This is a 24 bit number with 8 bits for major number, 8 bits * for minor and 8 bits for patch. Version 1.2.3 becomes 0x010203. */ -#define MOD_HTTP2_VERSION_NUM 0x010501 +#define MOD_HTTP2_VERSION_NUM 0x010502 #endif /* mod_h2_h2_version_h */