From: Stefan Eissing Date: Thu, 10 Mar 2016 13:37:55 +0000 (+0000) Subject: backported mod_http2 1.4.0 from trunk X-Git-Tag: 2.4.19~69 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=624548f96ad916fe486f9b0e629be79ad7699bf0;p=thirdparty%2Fapache%2Fhttpd.git backported mod_http2 1.4.0 from trunk git-svn-id: https://svn.apache.org/repos/asf/httpd/httpd/branches/2.4.x@1734413 13f79535-47bb-0310-9956-ffa450edef68 --- diff --git a/CHANGES b/CHANGES index 6a6fe5953b0..2e568a91cb7 100644 --- a/CHANGES +++ b/CHANGES @@ -2,6 +2,10 @@ Changes with Apache 2.4.19 + *) mod_http2: give control to async mpm for keepalive timeouts only when + no streams are open and even if only after 1 sec delay. Under load, event + mpm discards connections otherwise too quickly. [Stefan Eissing] + *) mod_ssl: Don't lose track of the SSL context if the ssl_run_pre_handshake() hook returns an error. [Graham Leggett] diff --git a/CMakeLists.txt b/CMakeLists.txt index 898139afd81..8b67d7a0e79 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -388,7 +388,7 @@ SET(mod_http2_extra_sources modules/http2/h2_mplx.c modules/http2/h2_push.c modules/http2/h2_request.c modules/http2/h2_response.c modules/http2/h2_session.c modules/http2/h2_stream.c - modules/http2/h2_switch.c + modules/http2/h2_switch.c modules/http2/h2_ngn_shed.c modules/http2/h2_task.c modules/http2/h2_task_input.c modules/http2/h2_task_output.c modules/http2/h2_int_queue.c modules/http2/h2_util.c modules/http2/h2_worker.c diff --git a/modules/http2/NWGNUmakefile b/modules/http2/NWGNUmakefile index e97efcaa266..ebfb52767e2 100644 --- a/modules/http2/NWGNUmakefile +++ b/modules/http2/NWGNUmakefile @@ -152,7 +152,6 @@ XDCDATA = # If there is an NLM target, put it here # TARGET_nlm = \ - $(OBJDIR)/mod_http2.nlm \ $(OBJDIR)/mod_http2.nlm \ $(EOLIST) diff --git a/modules/http2/NWGNUmod_http2 b/modules/http2/NWGNUmod_http2 index 12811bd2703..2a8a0fbb494 100644 --- a/modules/http2/NWGNUmod_http2 +++ b/modules/http2/NWGNUmod_http2 @@ -198,6 +198,7 @@ FILES_nlm_objs = \ $(OBJDIR)/h2_io.o \ $(OBJDIR)/h2_io_set.o \ $(OBJDIR)/h2_mplx.o \ + $(OBJDIR)/h2_ngn_shed.o \ $(OBJDIR)/h2_push.o \ $(OBJDIR)/h2_request.o \ $(OBJDIR)/h2_response.o \ @@ -259,7 +260,7 @@ FILES_nlm_Ximports = \ # Any symbols exported to here # FILES_nlm_exports = \ - http2_module \ + @$(OBJDIR)/mod_http2.imp \ $(EOLIST) # @@ -273,7 +274,7 @@ FILES_lib_objs := $(sort $(patsubst $(NGH2SRC)/lib/%.c,$(OBJDIR)/%.o,$(wildcard libs :: $(OBJDIR) $(NGH2SRC)/lib/config.h $(TARGET_lib) -nlms :: libs $(TARGET_nlm) +nlms :: libs $(OBJDIR)/mod_http2.imp $(TARGET_nlm) # # Updated this target to create necessary directories and copy files to the @@ -289,7 +290,8 @@ clean :: # vpath %.c $(NGH2SRC)/lib -$(NGH2SRC)/lib/config.h : NWGNUmakefile +$(NGH2SRC)/lib/config.h : NWGNUmod_http2 + @-$(RM) $@ @echo $(DL)GEN $@$(DL) @echo $(DL)/* For NetWare target.$(DL) > $@ @echo $(DL)** Do not edit - created by Make!$(DL) >> $@ @@ -351,6 +353,60 @@ $(NGH2SRC)/lib/config.h : NWGNUmakefile @echo $(DL)#endif /* NGH2_CONFIG_H */$(DL) >> $@ # +# Exports from mod_http2 for mod_proxy_http2 +$(OBJDIR)/mod_http2.imp : NWGNUmod_http2 + @-$(RM) $@ + @echo $(DL)GEN $@$(DL) + @echo $(DL) (HTTP2)$(DL) > $@ + @echo $(DL) http2_module,$(DL) >> $@ + @echo $(DL) h2_ihash_add,$(DL) >> $@ + @echo $(DL) h2_ihash_clear,$(DL) >> $@ + @echo $(DL) h2_ihash_count,$(DL) >> $@ + @echo $(DL) h2_ihash_create,$(DL) >> $@ + @echo $(DL) h2_ihash_is_empty,$(DL) >> $@ + @echo $(DL) h2_ihash_iter,$(DL) >> $@ + @echo $(DL) h2_ihash_remove,$(DL) >> $@ + @echo $(DL) h2_iq_add,$(DL) >> $@ + @echo $(DL) h2_iq_create,$(DL) >> $@ + @echo $(DL) h2_iq_remove,$(DL) >> $@ + @echo $(DL) h2_log2,$(DL) >> $@ + @echo $(DL) h2_proxy_res_ignore_header,$(DL) >> $@ + @echo $(DL) h2_request_create,$(DL) >> $@ + @echo $(DL) h2_request_make,$(DL) >> $@ + @echo $(DL) h2_util_camel_case_header,$(DL) >> $@ + @echo $(DL) h2_util_frame_print,$(DL) >> $@ + @echo $(DL) h2_util_ngheader_make_req,$(DL) >> $@ + @echo $(DL) nghttp2_is_fatal,$(DL) >> $@ + @echo $(DL) nghttp2_option_del,$(DL) >> $@ + @echo $(DL) nghttp2_option_new,$(DL) >> $@ + @echo $(DL) nghttp2_option_set_no_auto_window_update,$(DL) >> $@ + @echo $(DL) nghttp2_option_set_peer_max_concurrent_streams,$(DL) >> $@ + @echo $(DL) nghttp2_session_callbacks_del,$(DL) >> $@ + @echo $(DL) nghttp2_session_callbacks_new,$(DL) >> $@ + @echo $(DL) nghttp2_session_callbacks_set_before_frame_send_callback,$(DL) >> $@ + @echo $(DL) nghttp2_session_callbacks_set_on_data_chunk_recv_callback,$(DL) >> $@ + @echo $(DL) nghttp2_session_callbacks_set_on_frame_recv_callback,$(DL) >> $@ + @echo $(DL) nghttp2_session_callbacks_set_on_header_callback,$(DL) >> $@ + @echo $(DL) nghttp2_session_callbacks_set_on_stream_close_callback,$(DL) >> $@ + @echo $(DL) nghttp2_session_callbacks_set_send_callback,$(DL) >> $@ + @echo $(DL) nghttp2_session_client_new2,$(DL) >> $@ + @echo $(DL) nghttp2_session_consume,$(DL) >> $@ + @echo $(DL) nghttp2_session_del,$(DL) >> $@ + @echo $(DL) nghttp2_session_get_remote_settings,$(DL) >> $@ + @echo $(DL) nghttp2_session_get_stream_user_data,$(DL) >> $@ + @echo $(DL) nghttp2_session_mem_recv,$(DL) >> $@ + @echo $(DL) nghttp2_session_resume_data,$(DL) >> $@ + @echo $(DL) nghttp2_session_send,$(DL) >> $@ + @echo $(DL) nghttp2_session_want_read,$(DL) >> $@ + @echo $(DL) nghttp2_session_want_write,$(DL) >> $@ + @echo $(DL) nghttp2_strerror,$(DL) >> $@ + @echo $(DL) nghttp2_submit_goaway,$(DL) >> $@ + @echo $(DL) nghttp2_submit_request,$(DL) >> $@ + @echo $(DL) nghttp2_submit_rst_stream,$(DL) >> $@ + @echo $(DL) nghttp2_submit_settings,$(DL) >> $@ + @echo $(DL) nghttp2_submit_window_update,$(DL) >> $@ + @echo $(DL) nghttp2_version$(DL) >> $@ + # Include the 'tail' makefile that has targets that depend on variables defined # in this makefile # diff --git a/modules/http2/config2.m4 b/modules/http2/config2.m4 index 2ecb5dda885..e94da22264b 100644 --- a/modules/http2/config2.m4 +++ b/modules/http2/config2.m4 @@ -33,6 +33,7 @@ h2_int_queue.lo dnl h2_io.lo dnl h2_io_set.lo dnl h2_mplx.lo dnl +h2_ngn_shed.lo dnl h2_push.lo dnl h2_request.lo dnl h2_response.lo dnl @@ -200,7 +201,10 @@ is usually linked shared and requires loading. ], $http2_objs, , most, [ ]) # Ensure that other modules can pick up mod_http2.h -# APR_ADDTO(INCLUDES, [-I\$(top_srcdir)/$modpath_current]) +# icing: hold back for now until it is more stable +#APR_ADDTO(INCLUDES, [-I\$(top_srcdir)/$modpath_current]) + + dnl # end of module specific part APACHE_MODPATH_FINISH diff --git a/modules/http2/h2.h b/modules/http2/h2.h index 54294444624..89d174e2376 100644 --- a/modules/http2/h2.h +++ b/modules/http2/h2.h @@ -139,4 +139,9 @@ struct h2_response { }; +/* Note key to attach connection task id to conn_rec/request_rec instances */ + +#define H2_TASK_ID_NOTE "http2-task-id" + + #endif /* defined(__mod_h2__h2__) */ diff --git a/modules/http2/h2_conn.c b/modules/http2/h2_conn.c index b12ee677738..a0cd54e6ac7 100644 --- a/modules/http2/h2_conn.c +++ b/modules/http2/h2_conn.c @@ -239,58 +239,6 @@ apr_status_t h2_conn_pre_close(struct h2_ctx *ctx, conn_rec *c) return status; } -/* This is an internal mpm event.c struct which is disguised - * as a conn_state_t so that mpm_event can have special connection - * state information without changing the struct seen on the outside. - * - * For our task connections we need to create a new beast of this type - * and fill it with enough meaningful things that mpm_event reads and - * starts processing out task request. - */ -typedef struct event_conn_state_t event_conn_state_t; -struct event_conn_state_t { - /** APR_RING of expiration timeouts */ - APR_RING_ENTRY(event_conn_state_t) timeout_list; - /** the expiration time of the next keepalive timeout */ - apr_time_t expiration_time; - /** connection record this struct refers to */ - conn_rec *c; - /** request record (if any) this struct refers to */ - request_rec *r; - /** is the current conn_rec suspended? (disassociated with - * a particular MPM thread; for suspend_/resume_connection - * hooks) - */ - int suspended; - /** memory pool to allocate from */ - apr_pool_t *p; - /** bucket allocator */ - apr_bucket_alloc_t *bucket_alloc; - /** poll file descriptor information */ - apr_pollfd_t pfd; - /** public parts of the connection state */ - conn_state_t pub; -}; -APR_RING_HEAD(timeout_head_t, event_conn_state_t); - -static void fix_event_conn(conn_rec *c, conn_rec *master) -{ - event_conn_state_t *master_cs = ap_get_module_config(master->conn_config, - h2_conn_mpm_module()); - event_conn_state_t *cs = apr_pcalloc(c->pool, sizeof(event_conn_state_t)); - cs->bucket_alloc = apr_bucket_alloc_create(c->pool); - - ap_set_module_config(c->conn_config, h2_conn_mpm_module(), cs); - - cs->c = c; - cs->r = NULL; - cs->p = master_cs->p; - cs->pfd = master_cs->pfd; - cs->pub = master_cs->pub; - cs->pub.state = CONN_STATE_READ_REQUEST_LINE; - - c->cs = &(cs->pub); -} conn_rec *h2_slave_create(conn_rec *master, apr_pool_t *parent, apr_allocator_t *allocator) @@ -356,20 +304,15 @@ conn_rec *h2_slave_create(conn_rec *master, apr_pool_t *parent, ap_set_module_config(c->conn_config, h2_conn_mpm_module(), cfg); } - switch (h2_conn_mpm_type()) { - case H2_MPM_EVENT: - fix_event_conn(c, master); - break; - default: - break; - } - return c; } void h2_slave_destroy(conn_rec *slave, apr_allocator_t **pallocator) { apr_allocator_t *allocator = apr_pool_allocator_get(slave->pool); + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, slave, + "h2_slave_conn(%ld): destroy (task=%s)", slave->id, + apr_table_get(slave->notes, H2_TASK_ID_NOTE)); apr_pool_destroy(slave->pool); if (pallocator) { *pallocator = allocator; diff --git a/modules/http2/h2_conn_io.c b/modules/http2/h2_conn_io.c index 4a8375b9401..56d01e6732c 100644 --- a/modules/http2/h2_conn_io.c +++ b/modules/http2/h2_conn_io.c @@ -213,6 +213,16 @@ apr_status_t h2_conn_io_pass(h2_conn_io *io, int flush) return h2_conn_io_flush_int(io, flush, 0); } +apr_status_t h2_conn_io_flush(h2_conn_io *io) +{ + /* make sure we always write a flush, even if our buffers are empty. + * We want to flush not only our buffers, but alse ones further down + * the connection filters. */ + apr_bucket *b = apr_bucket_flush_create(io->connection->bucket_alloc); + APR_BRIGADE_INSERT_TAIL(io->output, b); + return h2_conn_io_flush_int(io, 0, 0); +} + apr_status_t h2_conn_io_consider_pass(h2_conn_io *io) { apr_off_t len = 0; diff --git a/modules/http2/h2_conn_io.h b/modules/http2/h2_conn_io.h index f0243927d60..8d71fffcd7b 100644 --- a/modules/http2/h2_conn_io.h +++ b/modules/http2/h2_conn_io.h @@ -78,6 +78,7 @@ apr_status_t h2_conn_io_write_eoc(h2_conn_io *io, struct h2_session *session); * @param flush if a flush bucket should be appended to any output */ apr_status_t h2_conn_io_pass(h2_conn_io *io, int flush); +apr_status_t h2_conn_io_flush(h2_conn_io *io); /** * Check the amount of buffered output and pass it on if enough has accumulated. diff --git a/modules/http2/h2_io.c b/modules/http2/h2_io.c index a54e8763b76..3f82c60f102 100644 --- a/modules/http2/h2_io.c +++ b/modules/http2/h2_io.c @@ -357,6 +357,7 @@ apr_status_t h2_io_out_readx(h2_io *io, status = h2_util_bb_readx(io->bbout, cb, ctx, plen, peos); if (status == APR_SUCCESS) { io->eos_out_read = *peos; + io->output_consumed += *plen; } } @@ -366,6 +367,8 @@ apr_status_t h2_io_out_readx(h2_io *io, apr_status_t h2_io_out_read_to(h2_io *io, apr_bucket_brigade *bb, apr_off_t *plen, int *peos) { + apr_status_t status; + if (io->rst_error) { return APR_ECONNABORTED; } @@ -382,7 +385,9 @@ apr_status_t h2_io_out_read_to(h2_io *io, apr_bucket_brigade *bb, } io->eos_out_read = *peos = h2_util_has_eos(io->bbout, *plen); - return h2_util_move(bb, io->bbout, *plen, NULL, "h2_io_read_to"); + status = h2_util_move(bb, io->bbout, *plen, NULL, "h2_io_read_to"); + io->output_consumed += *plen; + return status; } static void process_trailers(h2_io *io, apr_table_t *trailers) diff --git a/modules/http2/h2_io.h b/modules/http2/h2_io.h index bfe42a96b47..d92b7eb0d42 100644 --- a/modules/http2/h2_io.h +++ b/modules/http2/h2_io.h @@ -65,6 +65,7 @@ struct h2_io { apr_time_t started_at; /* when processing started */ apr_time_t done_at; /* when processing was done */ apr_size_t input_consumed; /* how many bytes have been read */ + apr_size_t output_consumed; /* how many bytes have been written out */ int files_handles_owned; }; diff --git a/modules/http2/h2_mplx.c b/modules/http2/h2_mplx.c index 86451ecb567..4d7f63bb52e 100644 --- a/modules/http2/h2_mplx.c +++ b/modules/http2/h2_mplx.c @@ -38,6 +38,7 @@ #include "h2_io_set.h" #include "h2_response.h" #include "h2_mplx.h" +#include "h2_ngn_shed.h" #include "h2_request.h" #include "h2_stream.h" #include "h2_task.h" @@ -143,10 +144,7 @@ static void h2_mplx_destroy(h2_mplx *m) ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c, "h2_mplx(%ld): destroy, ios=%d", m->id, (int)h2_io_set_size(m->stream_ios)); - m->aborted = 1; - check_tx_free(m); - if (m->pool) { apr_pool_destroy(m->pool); } @@ -197,16 +195,17 @@ h2_mplx *h2_mplx_create(conn_rec *c, apr_pool_t *parent, return NULL; } - status = apr_thread_cond_create(&m->task_done, m->pool); + status = apr_thread_cond_create(&m->req_added, m->pool); if (status != APR_SUCCESS) { h2_mplx_destroy(m); return NULL; } - m->q = h2_iq_create(m->pool, h2_config_geti(conf, H2_CONF_MAX_STREAMS)); + m->max_streams = h2_config_geti(conf, H2_CONF_MAX_STREAMS); + m->stream_max_mem = h2_config_geti(conf, H2_CONF_STREAM_MAX_MEM); + m->q = h2_iq_create(m->pool, m->max_streams); m->stream_ios = h2_io_set_create(m->pool); m->ready_ios = h2_io_set_create(m->pool); - m->stream_max_mem = h2_config_geti(conf, H2_CONF_STREAM_MAX_MEM); m->stream_timeout = stream_timeout; m->workers = workers; m->workers_max = workers->max_workers; @@ -217,6 +216,10 @@ h2_mplx *h2_mplx_create(conn_rec *c, apr_pool_t *parent, m->tx_handles_reserved = 0; m->tx_chunk_size = 4; + + m->ngn_shed = h2_ngn_shed_create(m->pool, m->c, m->max_streams, + m->stream_max_mem); + h2_ngn_shed_set_ctx(m->ngn_shed , m); } return m; } @@ -362,7 +365,7 @@ apr_status_t h2_mplx_release_and_join(h2_mplx *m, apr_thread_cond_t *wait) h2_mplx_set_consumed_cb(m, NULL, NULL); h2_iq_clear(m->q); - apr_thread_cond_broadcast(m->task_done); + apr_thread_cond_broadcast(m->req_added); while (!h2_io_set_iter(m->stream_ios, stream_done_iter, m)) { /* iterate until all ios have been orphaned or destroyed */ } @@ -397,10 +400,16 @@ apr_status_t h2_mplx_release_and_join(h2_mplx *m, apr_thread_cond_t *wait) h2_io_set_iter(m->stream_ios, stream_print, m); } } - m->aborted = 1; - apr_thread_cond_broadcast(m->task_done); + h2_mplx_abort(m); + apr_thread_cond_broadcast(m->req_added); } } + + if (!h2_io_set_is_empty(m->stream_ios)) { + ap_log_cerror(APLOG_MARK, APLOG_INFO, 0, m->c, + "h2_mplx(%ld): release_join, %d streams still open", + m->id, (int)h2_io_set_size(m->stream_ios)); + } ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c, APLOGNO(03056) "h2_mplx(%ld): release_join -> destroy", m->id); leave_mutex(m, acquired); @@ -412,15 +421,13 @@ apr_status_t h2_mplx_release_and_join(h2_mplx *m, apr_thread_cond_t *wait) void h2_mplx_abort(h2_mplx *m) { - apr_status_t status; int acquired; AP_DEBUG_ASSERT(m); - if (!m->aborted) { - if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) { - m->aborted = 1; - leave_mutex(m, acquired); - } + if (!m->aborted && enter_mutex(m, &acquired) == APR_SUCCESS) { + m->aborted = 1; + h2_ngn_shed_abort(m->ngn_shed); + leave_mutex(m, acquired); } } @@ -695,7 +702,8 @@ h2_stream *h2_mplx_next_submit(h2_mplx *m, h2_ihash_t *streams) } static apr_status_t out_write(h2_mplx *m, h2_io *io, - ap_filter_t* f, apr_bucket_brigade *bb, + ap_filter_t* f, int blocking, + apr_bucket_brigade *bb, apr_table_t *trailers, struct apr_thread_cond_t *iowait) { @@ -719,6 +727,9 @@ static apr_status_t out_write(h2_mplx *m, h2_io *io, && iowait && (m->stream_max_mem <= h2_io_out_length(io)) && !is_aborted(m, &status)) { + if (!blocking) { + return APR_INCOMPLETE; + } trailers = NULL; if (f) { ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, f->c, @@ -757,7 +768,12 @@ static apr_status_t out_open(h2_mplx *m, int stream_id, h2_response *response, check_tx_reservation(m); } if (bb) { - status = out_write(m, io, f, bb, response->trailers, iowait); + status = out_write(m, io, f, 0, bb, response->trailers, iowait); + if (status == APR_INCOMPLETE) { + /* write will have transferred as much data as possible. + caller has to deal with non-empty brigade */ + status = APR_SUCCESS; + } } have_out_data_for(m, stream_id); } @@ -791,7 +807,8 @@ apr_status_t h2_mplx_out_open(h2_mplx *m, int stream_id, h2_response *response, } apr_status_t h2_mplx_out_write(h2_mplx *m, int stream_id, - ap_filter_t* f, apr_bucket_brigade *bb, + ap_filter_t* f, int blocking, + apr_bucket_brigade *bb, apr_table_t *trailers, struct apr_thread_cond_t *iowait) { @@ -802,7 +819,7 @@ apr_status_t h2_mplx_out_write(h2_mplx *m, int stream_id, if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) { h2_io *io = h2_io_set_get(m->stream_ios, stream_id); if (io && !io->orphaned) { - status = out_write(m, io, f, bb, trailers, iowait); + status = out_write(m, io, f, blocking, bb, trailers, iowait); ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, m->c, "h2_mplx(%ld-%d): write with trailers=%s", m->id, io->id, trailers? "yes" : "no"); @@ -835,7 +852,7 @@ apr_status_t h2_mplx_out_close(h2_mplx *m, int stream_id, apr_table_t *trailers) h2_response *r = h2_response_die(stream_id, APR_EGENERAL, io->request, m->pool); status = out_open(m, stream_id, r, NULL, NULL, NULL); - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, m->c, + ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, m->c, "h2_mplx(%ld-%d): close, no response, no rst", m->id, io->id); } @@ -1070,6 +1087,7 @@ static h2_task *pop_task(h2_mplx *m) conn_rec *slave = h2_slave_create(m->c, m->pool, m->spare_allocator); m->spare_allocator = NULL; task = h2_task_create(m->id, io->request, slave, m); + apr_table_setn(slave->notes, H2_TASK_ID_NOTE, task->id); io->worker_started = 1; io->started_at = apr_time_now(); if (sid > m->max_stream_started) { @@ -1109,14 +1127,20 @@ static void task_done(h2_mplx *m, h2_task *task) { if (task) { if (task->frozen) { - /* this task was handed over to an engine for processing */ + /* this task was handed over to an engine for processing + * and the original worker has finished. That means the + * engine may start processing now. */ h2_task_thaw(task); - /* TODO: can we signal an engine that it can now start on this? */ + /* we do not want the task to block on writing response + * bodies into the mplx. */ + /* FIXME: this implementation is incomplete. */ + h2_task_set_io_blocking(task, 0); + apr_thread_cond_broadcast(m->req_added); } else { h2_io *io = h2_io_set_get(m->stream_ios, task->stream_id); - ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c, + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c, "h2_mplx(%ld): task(%s) done", m->id, task->id); /* clean our references and report request as done. Signal * that we want another unless we have been aborted */ @@ -1124,12 +1148,25 @@ static void task_done(h2_mplx *m, h2_task *task) * long as it has requests to handle. Might no be fair to * other mplx's. Perhaps leave after n requests? */ h2_mplx_out_close(m, task->stream_id, NULL); + + if (task->engine) { + if (!h2_req_engine_is_shutdown(task->engine)) { + ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, + "h2_mplx(%ld): task(%s) has not-shutdown " + "engine(%s)", m->id, task->id, + h2_req_engine_get_id(task->engine)); + } + h2_ngn_shed_done_ngn(m->ngn_shed, task->engine); + } + if (m->spare_allocator) { apr_allocator_destroy(m->spare_allocator); m->spare_allocator = NULL; } + h2_slave_destroy(task->c, &m->spare_allocator); task = NULL; + if (io) { apr_time_t now = apr_time_now(); if (!io->orphaned && m->redo_ios @@ -1174,7 +1211,6 @@ static void task_done(h2_mplx *m, h2_task *task) /* hang around until the stream deregisteres */ } } - apr_thread_cond_broadcast(m->task_done); } } } @@ -1337,59 +1373,8 @@ apr_status_t h2_mplx_idle(h2_mplx *m) * HTTP/2 request engines ******************************************************************************/ -typedef struct h2_req_entry h2_req_entry; -struct h2_req_entry { - APR_RING_ENTRY(h2_req_entry) link; - request_rec *r; -}; - -#define H2_REQ_ENTRY_NEXT(e) APR_RING_NEXT((e), link) -#define H2_REQ_ENTRY_PREV(e) APR_RING_PREV((e), link) -#define H2_REQ_ENTRY_REMOVE(e) APR_RING_REMOVE((e), link) - -typedef struct h2_req_engine_i h2_req_engine_i; -struct h2_req_engine_i { - h2_req_engine pub; - conn_rec *c; /* connection this engine is assigned to */ - h2_mplx *m; - unsigned int shutdown : 1; /* engine is being shut down */ - apr_thread_cond_t *io; /* condition var for waiting on data */ - APR_RING_HEAD(h2_req_entries, h2_req_entry) entries; - apr_size_t no_assigned; /* # of assigned requests */ - apr_size_t no_live; /* # of live */ - apr_size_t no_finished; /* # of finished */ -}; - -#define H2_REQ_ENTRIES_SENTINEL(b) APR_RING_SENTINEL((b), h2_req_entry, link) -#define H2_REQ_ENTRIES_EMPTY(b) APR_RING_EMPTY((b), h2_req_entry, link) -#define H2_REQ_ENTRIES_FIRST(b) APR_RING_FIRST(b) -#define H2_REQ_ENTRIES_LAST(b) APR_RING_LAST(b) - -#define H2_REQ_ENTRIES_INSERT_HEAD(b, e) do { \ -h2_req_entry *ap__b = (e); \ -APR_RING_INSERT_HEAD((b), ap__b, h2_req_entry, link); \ -} while (0) - -#define H2_REQ_ENTRIES_INSERT_TAIL(b, e) do { \ -h2_req_entry *ap__b = (e); \ -APR_RING_INSERT_TAIL((b), ap__b, h2_req_entry, link); \ -} while (0) - -static apr_status_t h2_mplx_engine_schedule(h2_mplx *m, - h2_req_engine_i *engine, - request_rec *r) -{ - h2_req_entry *entry = apr_pcalloc(r->pool, sizeof(*entry)); - - APR_RING_ELEM_INIT(entry, link); - entry->r = r; - H2_REQ_ENTRIES_INSERT_TAIL(&engine->entries, entry); - return APR_SUCCESS; -} - - -apr_status_t h2_mplx_engine_push(const char *engine_type, - request_rec *r, h2_mplx_engine_init *einit) +apr_status_t h2_mplx_req_engine_push(const char *ngn_type, + request_rec *r, h2_req_engine_init *einit) { apr_status_t status; h2_mplx *m; @@ -1401,7 +1386,6 @@ apr_status_t h2_mplx_engine_push(const char *engine_type, return APR_ECONNABORTED; } m = task->mplx; - AP_DEBUG_ASSERT(m); if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) { h2_io *io = h2_io_set_get(m->stream_ios, task->stream_id); @@ -1409,227 +1393,68 @@ apr_status_t h2_mplx_engine_push(const char *engine_type, status = APR_ECONNABORTED; } else { - h2_req_engine_i *engine = (h2_req_engine_i*)m->engine; - - apr_table_set(r->connection->notes, H2_TASK_ID_NOTE, task->id); - status = APR_EOF; - - if (task->ser_headers) { - /* Max compatibility, deny processing of this */ - } - else if (engine && !strcmp(engine->pub.type, engine_type)) { - if (engine->shutdown - || engine->no_assigned >= H2MIN(engine->pub.capacity, 100)) { - ap_log_rerror(APLOG_MARK, APLOG_TRACE1, status, r, - "h2_mplx(%ld): engine shutdown or over %s", - m->c->id, engine->pub.id); - engine = NULL; - } - else if (h2_mplx_engine_schedule(m, engine, r) == APR_SUCCESS) { - /* this task will be processed in another thread, - * freeze any I/O for the time being. */ - h2_task_freeze(task, r); - engine->no_assigned++; - status = APR_SUCCESS; - ap_log_rerror(APLOG_MARK, APLOG_DEBUG, status, r, - "h2_mplx(%ld): push request %s", - m->c->id, r->the_request); - } - else { - ap_log_rerror(APLOG_MARK, APLOG_TRACE1, status, r, - "h2_mplx(%ld): engine error adding req %s", - m->c->id, engine->pub.id); - engine = NULL; - } - } - - if (!engine && einit) { - engine = apr_pcalloc(task->c->pool, sizeof(*engine)); - engine->pub.id = apr_psprintf(task->c->pool, "eng-%ld-%d", - m->id, m->next_eng_id++); - engine->pub.pool = task->c->pool; - engine->pub.type = apr_pstrdup(task->c->pool, engine_type); - engine->pub.window_bits = 30; - engine->pub.req_window_bits = h2_log2(m->stream_max_mem); - engine->c = r->connection; - APR_RING_INIT(&engine->entries, h2_req_entry, link); - engine->m = m; - engine->io = task->io; - engine->no_assigned = 1; - engine->no_live = 1; - - status = einit(&engine->pub, r); - ap_log_rerror(APLOG_MARK, APLOG_TRACE1, status, r, - "h2_mplx(%ld): init engine %s (%s)", - m->c->id, engine->pub.id, engine->pub.type); - if (status == APR_SUCCESS) { - m->engine = &engine->pub; - } - } + status = h2_ngn_shed_push_req(m->ngn_shed, ngn_type, + task, r, einit); } - leave_mutex(m, acquired); } return status; } -static h2_req_entry *pop_non_frozen(h2_req_engine_i *engine) -{ - h2_req_entry *entry; - h2_task *task; - - for (entry = H2_REQ_ENTRIES_FIRST(&engine->entries); - entry != H2_REQ_ENTRIES_SENTINEL(&engine->entries); - entry = H2_REQ_ENTRY_NEXT(entry)) { - task = h2_ctx_rget_task(entry->r); - AP_DEBUG_ASSERT(task); - if (!task->frozen) { - H2_REQ_ENTRY_REMOVE(entry); - return entry; - } - } - return NULL; -} - -static apr_status_t engine_pull(h2_mplx *m, h2_req_engine_i *engine, - apr_read_type_e block, request_rec **pr) +apr_status_t h2_mplx_req_engine_pull(h2_req_engine *ngn, + apr_read_type_e block, + apr_uint32_t capacity, + request_rec **pr) { - h2_req_entry *entry; - - AP_DEBUG_ASSERT(m); - AP_DEBUG_ASSERT(engine); - while (1) { - if (m->aborted) { - ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c, - "h2_mplx(%ld): mplx abort while pulling requests %s", - m->id, engine->pub.id); - *pr = NULL; - return APR_EOF; - } - - if (!H2_REQ_ENTRIES_EMPTY(&engine->entries) - && (entry = pop_non_frozen(engine))) { - ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, entry->r, - "h2_mplx(%ld): request %s pulled by engine %s", - m->c->id, entry->r->the_request, engine->pub.id); - engine->no_live++; - entry->r->connection->current_thread = engine->c->current_thread; - *pr = entry->r; - return APR_SUCCESS; - } - else if (APR_NONBLOCK_READ == block) { - *pr = NULL; - return APR_EAGAIN; - } - else if (H2_REQ_ENTRIES_EMPTY(&engine->entries)) { - engine->shutdown = 1; - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c, - "h2_mplx(%ld): emtpy queue, shutdown engine %s", - m->id, engine->pub.id); - *pr = NULL; - return APR_EOF; - } - apr_thread_cond_timedwait(m->task_done, m->lock, - apr_time_from_msec(100)); - } -} - -apr_status_t h2_mplx_engine_pull(h2_req_engine *pub_engine, - apr_read_type_e block, request_rec **pr) -{ - h2_req_engine_i *engine = (h2_req_engine_i*)pub_engine; - h2_mplx *m = engine->m; + h2_ngn_shed *shed = h2_ngn_shed_get_shed(ngn); + h2_mplx *m = h2_ngn_shed_get_ctx(shed); apr_status_t status; int acquired; *pr = NULL; if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) { - status = engine_pull(m, engine, block, pr); + int want_shutdown = (block == APR_BLOCK_READ); + if (want_shutdown && !h2_iq_empty(m->q)) { + /* For a blocking read, check first if requests are to be + * had and, if not, wait a short while before doing the + * blocking, and if unsuccessful, terminating read. + */ + status = h2_ngn_shed_pull_req(shed, ngn, capacity, 1, pr); + if (APR_STATUS_IS_EAGAIN(status)) { + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c, + "h2_mplx(%ld): start block engine pull", m->id); + apr_thread_cond_timedwait(m->req_added, m->lock, + apr_time_from_msec(20)); + status = h2_ngn_shed_pull_req(shed, ngn, capacity, 1, pr); + } + } + else { + status = h2_ngn_shed_pull_req(shed, ngn, capacity, want_shutdown, pr); + } leave_mutex(m, acquired); } return status; } -static void engine_done(h2_mplx *m, h2_req_engine_i *engine, h2_task *task, - int waslive, int aborted) +void h2_mplx_req_engine_done(h2_req_engine *ngn, conn_rec *r_conn) { - int acquired; - ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c, - "h2_mplx(%ld): task %s %s by %s", - m->id, task->id, aborted? "aborted":"done", - engine->pub.id); - h2_task_output_close(task->output); - engine->no_finished++; - if (waslive) engine->no_live--; - engine->no_assigned--; - if (task->c != engine->c) { /* do not release what the engine runs on */ + h2_task *task = h2_ctx_cget_task(r_conn); + + if (task) { + h2_mplx *m = task->mplx; + int acquired; + if (enter_mutex(m, &acquired) == APR_SUCCESS) { - task_done(m, task); + h2_ngn_shed_done_task(m->ngn_shed, ngn, task); + if (task->engine) { + /* cannot report that as done until engine returns */ + } + else { + h2_task_output_close(task->output); + task_done(m, task); + } leave_mutex(m, acquired); } } } -void h2_mplx_engine_done(h2_req_engine *pub_engine, conn_rec *r_conn) -{ - h2_req_engine_i *engine = (h2_req_engine_i*)pub_engine; - h2_mplx *m = engine->m; - h2_task *task; - int acquired; - - task = h2_ctx_cget_task(r_conn); - if (task && (enter_mutex(m, &acquired) == APR_SUCCESS)) { - engine_done(m, engine, task, 1, 0); - leave_mutex(m, acquired); - } -} - -void h2_mplx_engine_exit(h2_req_engine *pub_engine) -{ - h2_req_engine_i *engine = (h2_req_engine_i*)pub_engine; - h2_mplx *m = engine->m; - int acquired; - - if (enter_mutex(m, &acquired) == APR_SUCCESS) { - if (!m->aborted - && !H2_REQ_ENTRIES_EMPTY(&engine->entries)) { - h2_req_entry *entry; - ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, - "h2_mplx(%ld): exit engine %s (%s), " - "has still requests queued, shutdown=%d," - "assigned=%ld, live=%ld, finished=%ld", - m->c->id, engine->pub.id, engine->pub.type, - engine->shutdown, - (long)engine->no_assigned, (long)engine->no_live, - (long)engine->no_finished); - for (entry = H2_REQ_ENTRIES_FIRST(&engine->entries); - entry != H2_REQ_ENTRIES_SENTINEL(&engine->entries); - entry = H2_REQ_ENTRY_NEXT(entry)) { - request_rec *r = entry->r; - h2_task *task = h2_ctx_rget_task(r); - ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, - "h2_mplx(%ld): engine %s has queued task %s, " - "frozen=%d, aborting", - m->c->id, engine->pub.id, task->id, task->frozen); - engine_done(m, engine, task, 0, 1); - } - } - if (!m->aborted && (engine->no_assigned > 1 || engine->no_live > 1)) { - ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, - "h2_mplx(%ld): exit engine %s (%s), " - "assigned=%ld, live=%ld, finished=%ld", - m->c->id, engine->pub.id, engine->pub.type, - (long)engine->no_assigned, (long)engine->no_live, - (long)engine->no_finished); - } - else { - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c, - "h2_mplx(%ld): exit engine %s (%s)", - m->c->id, engine->pub.id, engine->pub.type); - } - if (m->engine == &engine->pub) { - m->engine = NULL; /* TODO */ - } - leave_mutex(m, acquired); - } -} diff --git a/modules/http2/h2_mplx.h b/modules/http2/h2_mplx.h index 4d6ce7c0d5a..a61a63891ae 100644 --- a/modules/http2/h2_mplx.h +++ b/modules/http2/h2_mplx.h @@ -47,6 +47,7 @@ struct h2_io_set; struct apr_thread_cond_t; struct h2_workers; struct h2_int_queue; +struct h2_ngn_shed; struct h2_req_engine; #include @@ -75,19 +76,20 @@ struct h2_mplx { struct h2_io_set *ready_ios; struct h2_io_set *redo_ios; - int max_stream_started; /* highest stream id that started processing */ - int workers_busy; /* # of workers processing on this mplx */ - int workers_limit; /* current # of workers limit, dynamic */ - int workers_def_limit; /* default # of workers limit */ - int workers_max; /* max, hard limit # of workers in a process */ - apr_time_t last_idle_block; /* last time, this mplx entered IDLE while - * streams were ready */ - apr_time_t last_limit_change;/* last time, worker limit changed */ + apr_uint32_t max_streams; /* max # of concurrent streams */ + apr_uint32_t max_stream_started; /* highest stream id that started processing */ + apr_uint32_t workers_busy; /* # of workers processing on this mplx */ + apr_uint32_t workers_limit; /* current # of workers limit, dynamic */ + apr_uint32_t workers_def_limit; /* default # of workers limit */ + apr_uint32_t workers_max; /* max, hard limit # of workers in a process */ + apr_time_t last_idle_block; /* last time, this mplx entered IDLE while + * streams were ready */ + apr_time_t last_limit_change; /* last time, worker limit changed */ apr_interval_time_t limit_change_interval; apr_thread_mutex_t *lock; struct apr_thread_cond_t *added_output; - struct apr_thread_cond_t *task_done; + struct apr_thread_cond_t *req_added; struct apr_thread_cond_t *join_wait; apr_size_t stream_max_mem; @@ -102,11 +104,8 @@ struct h2_mplx { h2_mplx_consumed_cb *input_consumed; void *input_consumed_ctx; - - struct h2_req_engine *engine; - /* TODO: signal for waiting tasks*/ - apr_queue_t *engine_queue; - int next_eng_id; + + struct h2_ngn_shed *ngn_shed; }; @@ -308,12 +307,16 @@ apr_status_t h2_mplx_out_open(h2_mplx *mplx, int stream_id, * of bytes buffered reaches configured max. * @param stream_id the stream identifier * @param filter the apache filter context of the data + * @param blocking == 0 iff call should return with APR_INCOMPLETE if + * the full brigade cannot be written at once * @param bb the bucket brigade to append * @param trailers optional trailers for response, maybe NULL * @param iowait a conditional used for block/signalling in h2_mplx */ apr_status_t h2_mplx_out_write(h2_mplx *mplx, int stream_id, - ap_filter_t* filter, apr_bucket_brigade *bb, + ap_filter_t* filter, + int blocking, + apr_bucket_brigade *bb, apr_table_t *trailers, struct apr_thread_cond_t *iowait); @@ -408,20 +411,23 @@ APR_RING_INSERT_TAIL((b), ap__b, h2_mplx, link); \ apr_status_t h2_mplx_idle(h2_mplx *m); /******************************************************************************* - * h2_mplx h2_req_engine handling. + * h2_req_engine handling ******************************************************************************/ - -typedef apr_status_t h2_mplx_engine_init(struct h2_req_engine *engine, - request_rec *r); - -apr_status_t h2_mplx_engine_push(const char *engine_type, - request_rec *r, h2_mplx_engine_init *einit); - -apr_status_t h2_mplx_engine_pull(struct h2_req_engine *engine, - apr_read_type_e block, request_rec **pr); - -void h2_mplx_engine_done(struct h2_req_engine *engine, conn_rec *r_conn); - -void h2_mplx_engine_exit(struct h2_req_engine *engine); + +typedef apr_status_t h2_mplx_req_engine_init(struct h2_req_engine *engine, + const char *id, + const char *type, + apr_pool_t *pool, + apr_uint32_t req_buffer_size, + request_rec *r); + +apr_status_t h2_mplx_req_engine_push(const char *ngn_type, + request_rec *r, + h2_mplx_req_engine_init *einit); +apr_status_t h2_mplx_req_engine_pull(struct h2_req_engine *ngn, + apr_read_type_e block, + apr_uint32_t capacity, + request_rec **pr); +void h2_mplx_req_engine_done(struct h2_req_engine *ngn, conn_rec *r_conn); #endif /* defined(__mod_h2__h2_mplx__) */ diff --git a/modules/http2/h2_ngn_shed.c b/modules/http2/h2_ngn_shed.c new file mode 100644 index 00000000000..5b97cf914d2 --- /dev/null +++ b/modules/http2/h2_ngn_shed.c @@ -0,0 +1,330 @@ +/* Copyright 2015 greenbytes GmbH (https://www.greenbytes.de) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include + +#include +#include +#include +#include + +#include +#include +#include + +#include "mod_http2.h" + +#include "h2_private.h" +#include "h2_config.h" +#include "h2_conn.h" +#include "h2_ctx.h" +#include "h2_h2.h" +#include "h2_int_queue.h" +#include "h2_response.h" +#include "h2_request.h" +#include "h2_task.h" +#include "h2_task_output.h" +#include "h2_util.h" +#include "h2_ngn_shed.h" + + +typedef struct h2_ngn_entry h2_ngn_entry; +struct h2_ngn_entry { + APR_RING_ENTRY(h2_ngn_entry) link; + h2_task *task; + request_rec *r; +}; + +#define H2_NGN_ENTRY_NEXT(e) APR_RING_NEXT((e), link) +#define H2_NGN_ENTRY_PREV(e) APR_RING_PREV((e), link) +#define H2_NGN_ENTRY_REMOVE(e) APR_RING_REMOVE((e), link) + +#define H2_REQ_ENTRIES_SENTINEL(b) APR_RING_SENTINEL((b), h2_ngn_entry, link) +#define H2_REQ_ENTRIES_EMPTY(b) APR_RING_EMPTY((b), h2_ngn_entry, link) +#define H2_REQ_ENTRIES_FIRST(b) APR_RING_FIRST(b) +#define H2_REQ_ENTRIES_LAST(b) APR_RING_LAST(b) + +#define H2_REQ_ENTRIES_INSERT_HEAD(b, e) do { \ +h2_ngn_entry *ap__b = (e); \ +APR_RING_INSERT_HEAD((b), ap__b, h2_ngn_entry, link); \ +} while (0) + +#define H2_REQ_ENTRIES_INSERT_TAIL(b, e) do { \ +h2_ngn_entry *ap__b = (e); \ +APR_RING_INSERT_TAIL((b), ap__b, h2_ngn_entry, link); \ +} while (0) + +struct h2_req_engine { + const char *id; /* identifier */ + const char *type; /* name of the engine type */ + apr_pool_t *pool; /* pool for engine specific allocations */ + conn_rec *c; /* connection this engine is assigned to */ + h2_task *task; /* the task this engine is base on, running in */ + h2_ngn_shed *shed; + + unsigned int shutdown : 1; /* engine is being shut down */ + unsigned int done : 1; /* engine has finished */ + + APR_RING_HEAD(h2_req_entries, h2_ngn_entry) entries; + apr_uint32_t capacity; /* maximum concurrent requests */ + apr_uint32_t no_assigned; /* # of assigned requests */ + apr_uint32_t no_live; /* # of live */ + apr_uint32_t no_finished; /* # of finished */ +}; + +const char *h2_req_engine_get_id(h2_req_engine *engine) +{ + return engine->id; +} + +int h2_req_engine_is_shutdown(h2_req_engine *engine) +{ + return engine->shutdown; +} + +h2_ngn_shed *h2_ngn_shed_create(apr_pool_t *pool, conn_rec *c, + apr_uint32_t default_capacity, + apr_uint32_t req_buffer_size) +{ + h2_ngn_shed *shed; + + shed = apr_pcalloc(pool, sizeof(*shed)); + shed->c = c; + shed->pool = pool; + shed->default_capacity = default_capacity; + shed->req_buffer_size = req_buffer_size; + shed->ngns = apr_hash_make(pool); + + return shed; +} + +void h2_ngn_shed_set_ctx(h2_ngn_shed *shed, void *user_ctx) +{ + shed->user_ctx = user_ctx; +} + +void *h2_ngn_shed_get_ctx(h2_ngn_shed *shed) +{ + return shed->user_ctx; +} + +h2_ngn_shed *h2_ngn_shed_get_shed(h2_req_engine *ngn) +{ + return ngn->shed; +} + +void h2_ngn_shed_abort(h2_ngn_shed *shed) +{ + shed->aborted = 1; +} + +static void ngn_add_req(h2_req_engine *ngn, h2_task *task, request_rec *r) +{ + h2_ngn_entry *entry = apr_pcalloc(task->c->pool, sizeof(*entry)); + APR_RING_ELEM_INIT(entry, link); + entry->task = task; + entry->r = r; + H2_REQ_ENTRIES_INSERT_TAIL(&ngn->entries, entry); +} + + +apr_status_t h2_ngn_shed_push_req(h2_ngn_shed *shed, const char *ngn_type, + h2_task *task, request_rec *r, + h2_req_engine_init *einit){ + h2_req_engine *ngn; + + AP_DEBUG_ASSERT(shed); + + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, shed->c, + "h2_ngn_shed(%ld): PUSHing request (task=%s)", shed->c->id, + apr_table_get(r->connection->notes, H2_TASK_ID_NOTE)); + if (task->ser_headers) { + /* Max compatibility, deny processing of this */ + return APR_EOF; + } + + ngn = apr_hash_get(shed->ngns, ngn_type, APR_HASH_KEY_STRING); + if (ngn && !ngn->shutdown) { + /* this task will be processed in another thread, + * freeze any I/O for the time being. */ + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, task->c, + "h2_ngn_shed(%ld): pushing request %s to %s", + shed->c->id, task->id, ngn->id); + if (!h2_task_is_detached(task)) { + h2_task_freeze(task, r); + } + /* FIXME: sometimes ngn is garbage, probly alread freed */ + ngn_add_req(ngn, task, r); + ngn->no_assigned++; + return APR_SUCCESS; + } + + /* no existing engine or being shut down, start a new one */ + if (einit) { + apr_status_t status; + apr_pool_t *pool = task->c->pool; + h2_req_engine *newngn; + + newngn = apr_pcalloc(pool, sizeof(*ngn)); + newngn->pool = pool; + newngn->id = apr_psprintf(pool, "ngn-%s", task->id); + newngn->type = apr_pstrdup(pool, ngn_type); + newngn->c = task->c; + newngn->shed = shed; + newngn->capacity = shed->default_capacity; + newngn->no_assigned = 1; + newngn->no_live = 1; + APR_RING_INIT(&newngn->entries, h2_ngn_entry, link); + + status = einit(newngn, newngn->id, newngn->type, newngn->pool, + shed->req_buffer_size, r); + ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, task->c, + "h2_ngn_shed(%ld): create engine %s (%s)", + shed->c->id, newngn->id, newngn->type); + if (status == APR_SUCCESS) { + AP_DEBUG_ASSERT(task->engine == NULL); + newngn->task = task; + task->engine = newngn; + apr_hash_set(shed->ngns, newngn->type, APR_HASH_KEY_STRING, newngn); + } + return status; + } + return APR_EOF; +} + +static h2_ngn_entry *pop_non_frozen(h2_req_engine *ngn) +{ + h2_ngn_entry *entry; + for (entry = H2_REQ_ENTRIES_FIRST(&ngn->entries); + entry != H2_REQ_ENTRIES_SENTINEL(&ngn->entries); + entry = H2_NGN_ENTRY_NEXT(entry)) { + if (!entry->task->frozen) { + H2_NGN_ENTRY_REMOVE(entry); + return entry; + } + } + return NULL; +} + +apr_status_t h2_ngn_shed_pull_req(h2_ngn_shed *shed, + h2_req_engine *ngn, + apr_uint32_t capacity, + int want_shutdown, + request_rec **pr) +{ + h2_ngn_entry *entry; + + AP_DEBUG_ASSERT(ngn); + *pr = NULL; + if (shed->aborted) { + ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, shed->c, + "h2_ngn_shed(%ld): abort while pulling requests %s", + shed->c->id, ngn->id); + ngn->shutdown = 1; + return APR_ECONNABORTED; + } + + ngn->capacity = capacity; + if (H2_REQ_ENTRIES_EMPTY(&ngn->entries)) { + if (want_shutdown) { + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, shed->c, + "h2_ngn_shed(%ld): emtpy queue, shutdown engine %s", + shed->c->id, ngn->id); + ngn->shutdown = 1; + } + return ngn->shutdown? APR_EOF : APR_EAGAIN; + } + + if ((entry = pop_non_frozen(ngn))) { + ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, entry->task->c, + "h2_ngn_shed(%ld): pulled request %s for engine %s", + shed->c->id, entry->task->id, ngn->id); + ngn->no_live++; + *pr = entry->r; + return APR_SUCCESS; + } + return APR_EAGAIN; +} + +static apr_status_t ngn_done_task(h2_ngn_shed *shed, h2_req_engine *ngn, + h2_task *task, int waslive, int aborted, + int close) +{ + ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, shed->c, + "h2_ngn_shed(%ld): task %s %s by %s", + shed->c->id, task->id, aborted? "aborted":"done", ngn->id); + ngn->no_finished++; + if (waslive) ngn->no_live--; + ngn->no_assigned--; + + if (close) { + h2_task_output_close(task->output); + } + return APR_SUCCESS; +} + +apr_status_t h2_ngn_shed_done_task(h2_ngn_shed *shed, + struct h2_req_engine *ngn, h2_task *task) +{ + return ngn_done_task(shed, ngn, task, 1, 0, 0); +} + +void h2_ngn_shed_done_ngn(h2_ngn_shed *shed, struct h2_req_engine *ngn) +{ + if (ngn->done) { + return; + } + + if (!shed->aborted && !H2_REQ_ENTRIES_EMPTY(&ngn->entries)) { + h2_ngn_entry *entry; + ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, shed->c, + "h2_ngn_shed(%ld): exit engine %s (%s), " + "has still requests queued, shutdown=%d," + "assigned=%ld, live=%ld, finished=%ld", + shed->c->id, ngn->id, ngn->type, + ngn->shutdown, + (long)ngn->no_assigned, (long)ngn->no_live, + (long)ngn->no_finished); + for (entry = H2_REQ_ENTRIES_FIRST(&ngn->entries); + entry != H2_REQ_ENTRIES_SENTINEL(&ngn->entries); + entry = H2_NGN_ENTRY_NEXT(entry)) { + request_rec *r = entry->r; + h2_task *task = h2_ctx_rget_task(r); + ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, shed->c, + "h2_ngn_shed(%ld): engine %s has queued task %s, " + "frozen=%d, aborting", + shed->c->id, ngn->id, task->id, task->frozen); + ngn_done_task(shed, ngn, task, 0, 1, 1); + } + } + if (!shed->aborted && (ngn->no_assigned > 1 || ngn->no_live > 1)) { + ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, shed->c, + "h2_ngn_shed(%ld): exit engine %s (%s), " + "assigned=%ld, live=%ld, finished=%ld", + shed->c->id, ngn->id, ngn->type, + (long)ngn->no_assigned, (long)ngn->no_live, + (long)ngn->no_finished); + } + else { + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, shed->c, + "h2_ngn_shed(%ld): exit engine %s", + shed->c->id, ngn->id); + } + + apr_hash_set(shed->ngns, ngn->type, APR_HASH_KEY_STRING, NULL); + ngn->done = 1; +} diff --git a/modules/http2/h2_ngn_shed.h b/modules/http2/h2_ngn_shed.h new file mode 100644 index 00000000000..3dc9e375ef1 --- /dev/null +++ b/modules/http2/h2_ngn_shed.h @@ -0,0 +1,71 @@ +/* Copyright 2015 greenbytes GmbH (https://www.greenbytes.de) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef h2_req_shed_h +#define h2_req_shed_h + +struct h2_req_engine; +struct h2_task; + +typedef struct h2_ngn_shed h2_ngn_shed; +struct h2_ngn_shed { + conn_rec *c; + apr_pool_t *pool; + apr_hash_t *ngns; + void *user_ctx; + + unsigned int aborted : 1; + + apr_uint32_t default_capacity; + apr_uint32_t req_buffer_size; /* preferred buffer size for responses */ +}; + +const char *h2_req_engine_get_id(h2_req_engine *engine); +int h2_req_engine_is_shutdown(h2_req_engine *engine); + +typedef apr_status_t h2_shed_ngn_init(h2_req_engine *engine, + const char *id, + const char *type, + apr_pool_t *pool, + apr_uint32_t req_buffer_size, + request_rec *r); + +h2_ngn_shed *h2_ngn_shed_create(apr_pool_t *pool, conn_rec *c, + apr_uint32_t default_capactiy, + apr_uint32_t req_buffer_size); + +void h2_ngn_shed_set_ctx(h2_ngn_shed *shed, void *user_ctx); +void *h2_ngn_shed_get_ctx(h2_ngn_shed *shed); + +h2_ngn_shed *h2_ngn_shed_get_shed(struct h2_req_engine *ngn); + +void h2_ngn_shed_abort(h2_ngn_shed *shed); + +apr_status_t h2_ngn_shed_push_req(h2_ngn_shed *shed, const char *ngn_type, + struct h2_task *task, request_rec *r, + h2_shed_ngn_init *init_cb); + +apr_status_t h2_ngn_shed_pull_req(h2_ngn_shed *shed, h2_req_engine *pub_ngn, + apr_uint32_t capacity, + int want_shutdown, request_rec **pr); + +apr_status_t h2_ngn_shed_done_task(h2_ngn_shed *shed, + struct h2_req_engine *ngn, + struct h2_task *task); + +void h2_ngn_shed_done_ngn(h2_ngn_shed *shed, struct h2_req_engine *ngn); + + +#endif /* h2_req_shed_h */ diff --git a/modules/http2/h2_request.c b/modules/http2/h2_request.c index 2767ef538a4..18509dfc125 100644 --- a/modules/http2/h2_request.c +++ b/modules/http2/h2_request.c @@ -451,6 +451,9 @@ request_rec *h2_request_create_rec(const h2_request *req, conn_rec *conn) /* Request check post hooks failed. An example of this would be a * request for a vhost where h2 is disabled --> 421. */ + ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, conn, APLOGNO() + "h2_request(%d): access_status=%d, request_create failed", + req->id, access_status); ap_die(access_status, r); ap_update_child_status(conn->sbh, SERVER_BUSY_LOG, r); ap_run_log_transaction(r); diff --git a/modules/http2/h2_session.c b/modules/http2/h2_session.c index 855e8b9c855..d99573850d5 100644 --- a/modules/http2/h2_session.c +++ b/modules/http2/h2_session.c @@ -597,15 +597,6 @@ static int on_frame_send_cb(nghttp2_session *ngh2, (long)session->frames_sent); } ++session->frames_sent; - switch (frame->hd.type) { - case NGHTTP2_HEADERS: - case NGHTTP2_DATA: - /* no explicit flushing necessary */ - break; - default: - session->flush = 1; - break; - } return 0; } @@ -1570,7 +1561,7 @@ static apr_status_t h2_session_read(h2_session *session, int block) } else { /* uncommon status, log on INFO so that we see this */ - ap_log_cerror( APLOG_MARK, APLOG_INFO, status, c, + ap_log_cerror( APLOG_MARK, APLOG_DEBUG, status, c, APLOGNO(02950) "h2_session(%ld): error reading, terminating", session->id); @@ -1754,7 +1745,7 @@ static void h2_session_ev_conn_error(h2_session *session, int arg, const char *m break; default: - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c, + ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, "h2_session(%ld): conn error -> shutdown", session->id); h2_session_shutdown(session, arg, msg, 0); break; @@ -1771,7 +1762,7 @@ static void h2_session_ev_proto_error(h2_session *session, int arg, const char * break; default: - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c, + ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, "h2_session(%ld): proto error -> shutdown", session->id); h2_session_shutdown(session, arg, msg, 0); break; @@ -1810,12 +1801,14 @@ static void h2_session_ev_no_io(h2_session *session, int arg, const char *msg) transit(session, "no io", H2_SESSION_ST_DONE); } else { + apr_time_t now = apr_time_now(); /* When we have no streams, no task event are possible, * switch to blocking reads */ transit(session, "no io", H2_SESSION_ST_IDLE); session->idle_until = (session->requests_received? session->s->keep_alive_timeout : - session->s->timeout) + apr_time_now(); + session->s->timeout) + now; + session->keep_sync_until = now + apr_time_from_sec(1); } } else if (!has_unsubmitted_streams(session) @@ -1826,6 +1819,7 @@ static void h2_session_ev_no_io(h2_session *session, int arg, const char *msg) * window updates. */ transit(session, "no io", H2_SESSION_ST_IDLE); session->idle_until = apr_time_now() + session->s->timeout; + session->keep_sync_until = session->idle_until; } else { /* Unable to do blocking reads, as we wait on events from @@ -2021,7 +2015,10 @@ apr_status_t h2_session_process(h2_session *session, int async) no_streams = h2_ihash_is_empty(session->streams); update_child_status(session, (no_streams? SERVER_BUSY_KEEPALIVE : SERVER_BUSY_READ), "idle"); - if (async && no_streams && !session->r && session->requests_received) { + /* make certain, the client receives everything before we idle */ + h2_conn_io_flush(&session->io); + if (!session->keep_sync_until + && async && no_streams && !session->r && session->requests_received) { ap_log_cerror( APLOG_MARK, APLOG_TRACE1, status, c, "h2_session(%ld): async idle, nonblock read", session->id); /* We do not return to the async mpm immediately, since under @@ -2074,7 +2071,13 @@ apr_status_t h2_session_process(h2_session *session, int async) /* nothing to read */ } else if (APR_STATUS_IS_TIMEUP(status)) { - if (apr_time_now() > session->idle_until) { + apr_time_t now = apr_time_now(); + if (now > session->keep_sync_until) { + /* if we are on an async mpm, now is the time that + * we may dare to pass control to it. */ + session->keep_sync_until = 0; + } + if (now > session->idle_until) { dispatch_event(session, H2_SESSION_EV_CONN_TIMEOUT, 0, "timeout"); } /* continue reading handling */ @@ -2164,7 +2167,8 @@ apr_status_t h2_session_process(h2_session *session, int async) /* waited long enough */ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, APR_TIMEUP, c, "h2_session: wait for data"); - dispatch_event(session, H2_SESSION_EV_CONN_TIMEOUT, 0, NULL); + dispatch_event(session, H2_SESSION_EV_CONN_TIMEOUT, 0, "timeout"); + break; } else { /* repeating, increase timer for graceful backoff */ @@ -2176,6 +2180,8 @@ apr_status_t h2_session_process(h2_session *session, int async) "h2_session: wait for data, %ld micros", (long)session->wait_us); } + /* make certain, the client receives everything before we idle */ + h2_conn_io_flush(&session->io); status = h2_mplx_out_trywait(session->mplx, session->wait_us, session->iowait); if (status == APR_SUCCESS) { @@ -2187,7 +2193,11 @@ apr_status_t h2_session_process(h2_session *session, int async) transit(session, "wait cycle", H2_SESSION_ST_BUSY); } else { - h2_session_shutdown(session, H2_ERR_INTERNAL_ERROR, "cond wait error", 0); + ap_log_cerror(APLOG_MARK, APLOG_WARNING, status, c, + "h2_session(%ld): waiting on conditional", + session->id); + h2_session_shutdown(session, H2_ERR_INTERNAL_ERROR, + "cond wait error", 0); } break; @@ -2212,8 +2222,7 @@ apr_status_t h2_session_process(h2_session *session, int async) } out: - h2_conn_io_pass(&session->io, session->flush); - session->flush = 0; + h2_conn_io_flush(&session->io); ap_log_cerror( APLOG_MARK, APLOG_TRACE1, status, c, "h2_session(%ld): [%s] process returns", diff --git a/modules/http2/h2_session.h b/modules/http2/h2_session.h index d07ae4cc081..fa98bf91869 100644 --- a/modules/http2/h2_session.h +++ b/modules/http2/h2_session.h @@ -108,6 +108,7 @@ typedef struct h2_session { apr_time_t start_wait; /* Time we started waiting for sth. to happen */ apr_time_t idle_until; /* Time we shut down due to sheer boredom */ + apr_time_t keep_sync_until; /* Time we sync wait until passing to async mpm */ apr_pool_t *pool; /* pool to use in session handling */ apr_bucket_brigade *bbtmp; /* brigade for keeping temporary data */ diff --git a/modules/http2/h2_task.c b/modules/http2/h2_task.c index 06543d670c8..7b1aa8df67e 100644 --- a/modules/http2/h2_task.c +++ b/modules/http2/h2_task.c @@ -93,16 +93,16 @@ static apr_status_t h2_response_freeze_filter(ap_filter_t* f, AP_DEBUG_ASSERT(task); if (task->frozen) { - ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, f->r, + ap_log_rerror(APLOG_MARK, APLOG_TRACE2, 0, f->r, "h2_response_freeze_filter, saving"); - return ap_save_brigade(f, &task->frozen_out, &bb, task->c->pool); + return ap_save_brigade(f, &task->output->frozen_bb, &bb, task->c->pool); } if (APR_BRIGADE_EMPTY(bb)) { return APR_SUCCESS; } - ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, f->r, + ap_log_rerror(APLOG_MARK, APLOG_TRACE2, 0, f->r, "h2_response_freeze_filter, passing"); return ap_pass_brigade(f->next, bb); } @@ -197,12 +197,18 @@ h2_task *h2_task_create(long session_id, const h2_request *req, task->request = req; task->input_eos = !req->body; task->ser_headers = req->serialize; + task->blocking = 1; h2_ctx_create_for(c, task); return task; } +void h2_task_set_io_blocking(h2_task *task, int blocking) +{ + task->blocking = blocking; +} + apr_status_t h2_task_do(h2_task *task, apr_thread_cond_t *cond) { apr_status_t status; @@ -212,6 +218,8 @@ apr_status_t h2_task_do(h2_task *task, apr_thread_cond_t *cond) task->input = h2_task_input_create(task, task->c); task->output = h2_task_output_create(task, task->c); + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, task->c, + "h2_task(%s): process connection", task->id); ap_process_connection(task->c, ap_get_conn_socket(task->c)); if (task->frozen) { @@ -236,6 +244,8 @@ static apr_status_t h2_task_process_request(h2_task *task, conn_rec *c) conn_state_t *cs = c->cs; request_rec *r; + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c, + "h2_task(%s): create request_rec", task->id); r = h2_request_create_rec(req, c); if (r && (r->status == HTTP_OK)) { ap_update_child_status(c->sbh, SERVER_BUSY_READ, r); @@ -264,6 +274,15 @@ static apr_status_t h2_task_process_request(h2_task *task, conn_rec *c) cs->state = CONN_STATE_WRITE_COMPLETION; r = NULL; } + else if (!r) { + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c, + "h2_task(%s): create request_rec failed, r=NULL", task->id); + } + else { + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c, + "h2_task(%s): create request_rec failed, r->status=%d", + task->id, r->status); + } c->sbh = NULL; return APR_SUCCESS; @@ -280,14 +299,18 @@ static int h2_task_process_conn(conn_rec* c) ctx = h2_ctx_get(c, 0); if (h2_ctx_is_task(ctx)) { if (!ctx->task->ser_headers) { - ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, c, + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c, "h2_h2, processing request directly"); h2_task_process_request(ctx->task, c); return DONE; } - ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, c, + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c, "h2_task(%s), serialized handling", ctx->task->id); } + else { + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c, + "slave_conn(%ld): has no task", c->id); + } return DECLINED; } @@ -297,7 +320,7 @@ apr_status_t h2_task_freeze(h2_task *task, request_rec *r) conn_rec *c = task->c; task->frozen = 1; - task->frozen_out = apr_brigade_create(c->pool, c->bucket_alloc); + task->output->frozen_bb = apr_brigade_create(c->pool, c->bucket_alloc); ap_add_output_filter("H2_RESPONSE_FREEZE", task, r, r->connection); ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, c, "h2_task(%s), frozen", task->id); @@ -312,6 +335,11 @@ apr_status_t h2_task_thaw(h2_task *task) ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, task->c, "h2_task(%s), thawed", task->id); } + task->detached = 1; return APR_SUCCESS; } +int h2_task_is_detached(h2_task *task) +{ + return task->detached; +} diff --git a/modules/http2/h2_task.h b/modules/http2/h2_task.h index 24bde946f30..c4c1c13d1dc 100644 --- a/modules/http2/h2_task.h +++ b/modules/http2/h2_task.h @@ -41,6 +41,7 @@ struct apr_thread_cond_t; struct h2_conn; struct h2_mplx; struct h2_task; +struct h2_req_engine; struct h2_request; struct h2_resp_head; struct h2_worker; @@ -58,12 +59,14 @@ struct h2_task { unsigned int input_eos : 1; unsigned int ser_headers : 1; unsigned int frozen : 1; + unsigned int blocking : 1; + unsigned int detached : 1; struct h2_task_input *input; struct h2_task_output *output; struct apr_thread_cond_t *io; /* used to wait for events on */ - - apr_bucket_brigade *frozen_out; + + struct h2_req_engine *engine; }; h2_task *h2_task_create(long session_id, const struct h2_request *req, @@ -82,5 +85,8 @@ extern APR_OPTIONAL_FN_TYPE(ap_logio_add_bytes_out) *h2_task_logio_add_bytes_out apr_status_t h2_task_freeze(h2_task *task, request_rec *r); apr_status_t h2_task_thaw(h2_task *task); +int h2_task_is_detached(h2_task *task); + +void h2_task_set_io_blocking(h2_task *task, int blocking); #endif /* defined(__mod_h2__h2_task__) */ diff --git a/modules/http2/h2_task_output.c b/modules/http2/h2_task_output.c index 1a2bd863315..025c1398732 100644 --- a/modules/http2/h2_task_output.c +++ b/modules/http2/h2_task_output.c @@ -77,14 +77,14 @@ static apr_status_t open_if_needed(h2_task_output *output, ap_filter_t *f, if (f) { /* This happens currently when ap_die(status, r) is invoked * by a read request filter. */ - ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, f->c, APLOGNO(03204) + ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, output->c, APLOGNO(03204) "h2_task_output(%s): write without response by %s " "for %s %s %s", output->task->id, caller, output->task->request->method, output->task->request->authority, output->task->request->path); - f->c->aborted = 1; + output->c->aborted = 1; } if (output->task->io) { apr_thread_cond_broadcast(output->task->io); @@ -94,37 +94,48 @@ static apr_status_t open_if_needed(h2_task_output *output, ap_filter_t *f, if (h2_task_logio_add_bytes_out) { /* counter headers as if we'd do a HTTP/1.1 serialization */ - /* TODO: counter a virtual status line? */ - apr_off_t bytes_written; - apr_brigade_length(bb, 0, &bytes_written); - bytes_written += h2_util_table_bytes(response->headers, 3)+1; - h2_task_logio_add_bytes_out(f->c, bytes_written); + output->written = h2_util_table_bytes(response->headers, 3)+1; + h2_task_logio_add_bytes_out(output->c, output->written); } get_trailers(output); - ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, f->c, APLOGNO(03348) - "h2_task_output(%s): open as needed %s %s %s", + ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, output->c, APLOGNO(03348) + "h2_task(%s): open response to %s %s %s", output->task->id, output->task->request->method, output->task->request->authority, output->task->request->path); return h2_mplx_out_open(output->task->mplx, output->task->stream_id, response, f, bb, output->task->io); } - return APR_EOF; + return APR_SUCCESS; } -void h2_task_output_close(h2_task_output *output) +static apr_status_t write_brigade_raw(h2_task_output *output, + ap_filter_t* f, apr_bucket_brigade* bb) { - open_if_needed(output, NULL, NULL, "close"); - if (output->state != H2_TASK_OUT_DONE) { - if (output->task->frozen_out - && !APR_BRIGADE_EMPTY(output->task->frozen_out)) { - h2_mplx_out_write(output->task->mplx, output->task->stream_id, - NULL, output->task->frozen_out, NULL, NULL); + apr_off_t written, left; + apr_status_t status; + + apr_brigade_length(bb, 0, &written); + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, output->c, + "h2_task(%s): write response body (%ld bytes)", + output->task->id, (long)written); + + status = h2_mplx_out_write(output->task->mplx, output->task->stream_id, + f, output->task->blocking, bb, + get_trailers(output), output->task->io); + if (status == APR_INCOMPLETE) { + apr_brigade_length(bb, 0, &left); + written -= left; + status = APR_SUCCESS; + } + + if (status == APR_SUCCESS) { + output->written += written; + if (h2_task_logio_add_bytes_out) { + h2_task_logio_add_bytes_out(output->c, written); } - h2_mplx_out_close(output->task->mplx, output->task->stream_id, - get_trailers(output)); - output->state = H2_TASK_OUT_DONE; } + return status; } /* Bring the data from the brigade (which represents the result of the @@ -137,34 +148,60 @@ apr_status_t h2_task_output_write(h2_task_output *output, apr_status_t status; if (APR_BRIGADE_EMPTY(bb)) { - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, f->c, - "h2_task_output(%s): empty write", output->task->id); + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, output->c, + "h2_task(%s): empty write", output->task->id); return APR_SUCCESS; } if (output->task->frozen) { h2_util_bb_log(output->c, output->task->stream_id, APLOG_TRACE2, "frozen task output write", bb); - return ap_save_brigade(f, &output->task->frozen_out, &bb, - output->c->pool); + return ap_save_brigade(f, &output->frozen_bb, &bb, output->c->pool); } status = open_if_needed(output, f, bb, "write"); - if (status != APR_EOF) { - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, f->c, - "h2_task_output(%s): opened and passed brigade", + + /* Attempt to write saved brigade first */ + if (status == APR_SUCCESS && output->bb + && !APR_BRIGADE_EMPTY(output->bb)) { + status = write_brigade_raw(output, f, output->bb); + } + + /* If there is nothing saved (anymore), try to write the brigade passed */ + if (status == APR_SUCCESS + && (!output->bb || APR_BRIGADE_EMPTY(output->bb)) + && !APR_BRIGADE_EMPTY(bb)) { + status = write_brigade_raw(output, f, bb); + } + + /* If the passed brigade is not empty, save it before return */ + if (status == APR_SUCCESS && !APR_BRIGADE_EMPTY(bb)) { + ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, output->c, + "h2_task(%s): could not write all, saving brigade", output->task->id); - return status; + if (!output->bb) { + output->bb = apr_brigade_create(output->c->pool, output->c->bucket_alloc); + } + return ap_save_brigade(f, &output->bb, &bb, output->c->pool); } - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, f->c, - "h2_task_output(%s): write brigade", output->task->id); - if (h2_task_logio_add_bytes_out) { - apr_off_t bytes_written; - apr_brigade_length(bb, 0, &bytes_written); - h2_task_logio_add_bytes_out(f->c, bytes_written); + return status; +} + +void h2_task_output_close(h2_task_output *output) +{ + if (output->task->frozen) { + return; + } + open_if_needed(output, NULL, NULL, "close"); + if (output->state != H2_TASK_OUT_DONE) { + if (output->frozen_bb && !APR_BRIGADE_EMPTY(output->frozen_bb)) { + h2_mplx_out_write(output->task->mplx, output->task->stream_id, + NULL, 1, output->frozen_bb, NULL, NULL); + } + output->state = H2_TASK_OUT_DONE; + h2_mplx_out_close(output->task->mplx, output->task->stream_id, + get_trailers(output)); } - return h2_mplx_out_write(output->task->mplx, output->task->stream_id, - f, bb, get_trailers(output), output->task->io); } diff --git a/modules/http2/h2_task_output.h b/modules/http2/h2_task_output.h index aa719cdeea3..26326f0908b 100644 --- a/modules/http2/h2_task_output.h +++ b/modules/http2/h2_task_output.h @@ -30,16 +30,21 @@ typedef enum { H2_TASK_OUT_INIT, H2_TASK_OUT_STARTED, H2_TASK_OUT_DONE, -} h2_task_output_state_t; +} h2_task_out_state_t; typedef struct h2_task_output h2_task_output; struct h2_task_output { conn_rec *c; struct h2_task *task; - h2_task_output_state_t state; + h2_task_out_state_t state; struct h2_from_h1 *from_h1; + unsigned int trailers_passed : 1; + + apr_off_t written; + apr_bucket_brigade *bb; + apr_bucket_brigade *frozen_bb; }; h2_task_output *h2_task_output_create(struct h2_task *task, conn_rec *c); diff --git a/modules/http2/h2_util.c b/modules/http2/h2_util.c index 54e6a2ab0be..71a3ff90a69 100644 --- a/modules/http2/h2_util.c +++ b/modules/http2/h2_util.c @@ -309,12 +309,12 @@ static int ihash_iter(void *ctx, const void *key, apr_ssize_t klen, return ictx->iter(ictx->ctx, (void*)val); /* why is this passed const?*/ } -void h2_ihash_iter(h2_ihash_t *ih, h2_ihash_iter_t *fn, void *ctx) +int h2_ihash_iter(h2_ihash_t *ih, h2_ihash_iter_t *fn, void *ctx) { iter_ctx ictx; ictx.iter = fn; ictx.ctx = ctx; - apr_hash_do(ihash_iter, &ictx, ih->hash); + return apr_hash_do(ihash_iter, &ictx, ih->hash); } void h2_ihash_add(h2_ihash_t *ih, void *val) @@ -1212,8 +1212,9 @@ int h2_util_frame_print(const nghttp2_frame *frame, char *buffer, size_t maxlen) frame->goaway.opaque_data_len : s_len-1; memcpy(scratch, frame->goaway.opaque_data, len); scratch[len] = '\0'; - return apr_snprintf(buffer, maxlen, "GOAWAY[error=%d, reason='%s']", - frame->goaway.error_code, scratch); + return apr_snprintf(buffer, maxlen, "GOAWAY[error=%d, reason='%s', " + "last_stream=%d]", frame->goaway.error_code, + scratch, frame->goaway.last_stream_id); } case NGHTTP2_WINDOW_UPDATE: { return apr_snprintf(buffer, maxlen, diff --git a/modules/http2/h2_util.h b/modules/http2/h2_util.h index 97417f72616..4fffabb9596 100644 --- a/modules/http2/h2_util.h +++ b/modules/http2/h2_util.h @@ -56,8 +56,9 @@ void *h2_ihash_get(h2_ihash_t *ih, int id); * @param ih the hash to iterate over * @param fn the function to invoke on each member * @param ctx user supplied data passed into each iteration call + * @param 0 if one iteration returned 0, otherwise != 0 */ -void h2_ihash_iter(h2_ihash_t *ih, h2_ihash_iter_t *fn, void *ctx); +int h2_ihash_iter(h2_ihash_t *ih, h2_ihash_iter_t *fn, void *ctx); void h2_ihash_add(h2_ihash_t *ih, void *val); void h2_ihash_remove(h2_ihash_t *ih, int id); diff --git a/modules/http2/h2_version.h b/modules/http2/h2_version.h index 0c0f0314e98..9677bc1a750 100644 --- a/modules/http2/h2_version.h +++ b/modules/http2/h2_version.h @@ -26,7 +26,7 @@ * @macro * Version number of the http2 module as c string */ -#define MOD_HTTP2_VERSION "1.3.3" +#define MOD_HTTP2_VERSION "1.4.0" /** * @macro @@ -34,7 +34,7 @@ * release. This is a 24 bit number with 8 bits for major number, 8 bits * for minor and 8 bits for patch. Version 1.2.3 becomes 0x010203. */ -#define MOD_HTTP2_VERSION_NUM 0x010303 +#define MOD_HTTP2_VERSION_NUM 0x010400 #endif /* mod_h2_h2_version_h */ diff --git a/modules/http2/mod_http2.c b/modules/http2/mod_http2.c index 87cac2f678d..c3b01733a95 100644 --- a/modules/http2/mod_http2.c +++ b/modules/http2/mod_http2.c @@ -128,31 +128,26 @@ static char *http2_var_lookup(apr_pool_t *, server_rec *, conn_rec *, request_rec *, char *name); static int http2_is_h2(conn_rec *); -static apr_status_t http2_req_engine_push(const char *engine_type, +static apr_status_t http2_req_engine_push(const char *ngn_type, request_rec *r, h2_req_engine_init *einit) { - return h2_mplx_engine_push(engine_type, r, einit); + return h2_mplx_req_engine_push(ngn_type, r, einit); } -static apr_status_t http2_req_engine_pull(h2_req_engine *engine, +static apr_status_t http2_req_engine_pull(h2_req_engine *ngn, apr_read_type_e block, + apr_uint32_t capacity, request_rec **pr) { - return h2_mplx_engine_pull(engine, block, pr); + return h2_mplx_req_engine_pull(ngn, block, capacity, pr); } -static void http2_req_engine_done(h2_req_engine *engine, conn_rec *r_conn) +static void http2_req_engine_done(h2_req_engine *ngn, conn_rec *r_conn) { - h2_mplx_engine_done(engine, r_conn); + h2_mplx_req_engine_done(ngn, r_conn); } -static void http2_req_engine_exit(h2_req_engine *engine) -{ - h2_mplx_engine_exit(engine); -} - - /* Runs once per created child process. Perform any process * related initionalization here. */ @@ -178,7 +173,6 @@ static void h2_hooks(apr_pool_t *pool) APR_REGISTER_OPTIONAL_FN(http2_req_engine_push); APR_REGISTER_OPTIONAL_FN(http2_req_engine_pull); APR_REGISTER_OPTIONAL_FN(http2_req_engine_done); - APR_REGISTER_OPTIONAL_FN(http2_req_engine_exit); ap_log_perror(APLOG_MARK, APLOG_TRACE1, 0, pool, "installing hooks"); diff --git a/modules/http2/mod_http2.h b/modules/http2/mod_http2.h index ae135293101..c5cfe704e3a 100644 --- a/modules/http2/mod_http2.h +++ b/modules/http2/mod_http2.h @@ -43,27 +43,12 @@ typedef struct h2_req_engine h2_req_engine; * @param engine the allocated, partially filled structure * @param r the first request to process, or NULL */ -typedef apr_status_t h2_req_engine_init(h2_req_engine *engine, request_rec *r); - -/** - * The public structure of a h2_req_engine. It gets allocated by the http2 - * infrastructure, assigned id, type, pool, io and connection and passed to the - * h2_req_engine_init() callback to complete initialization. - * This happens whenever a new request gets "push"ed for an engine type and - * no instance, or no free instance, for the type is available. - */ -struct h2_req_engine { - const char *id; /* identifier */ - apr_pool_t *pool; /* pool for engine specific allocations */ - const char *type; /* name of the engine type */ - unsigned char window_bits;/* preferred size of overall response data - * mod_http2 is willing to buffer as log2 */ - unsigned char req_window_bits;/* preferred size of response body data - * mod_http2 is willing to buffer per request, - * as log2 */ - apr_size_t capacity; /* maximum concurrent requests */ - void *user_data; /* user specific data */ -}; +typedef apr_status_t h2_req_engine_init(h2_req_engine *engine, + const char *id, + const char *type, + apr_pool_t *pool, + apr_uint32_t req_buffer_size, + request_rec *r); /** * Push a request to an engine with the specified name for further processing. @@ -89,25 +74,16 @@ APR_DECLARE_OPTIONAL_FN(apr_status_t, * @param timeout wait a maximum amount of time for a new slave, 0 will not wait * @param pslave the slave connection that needs processing or NULL * @return APR_SUCCESS if new request was assigned - * APR_EAGAIN/APR_TIMEUP if no new request is available - * APR_ECONNABORTED if the engine needs to shut down + * APR_EAGAIN if no new request is available + * APR_EOF if engine may shut down, as no more request will be scheduled + * APR_ECONNABORTED if the engine needs to shut down immediately */ APR_DECLARE_OPTIONAL_FN(apr_status_t, http2_req_engine_pull, (h2_req_engine *engine, apr_read_type_e block, + apr_uint32_t capacity, request_rec **pr)); APR_DECLARE_OPTIONAL_FN(void, http2_req_engine_done, (h2_req_engine *engine, conn_rec *rconn)); -/** - * The given request engine is done processing and needs to be excluded - * from further handling. - * @param engine the engine to exit - */ -APR_DECLARE_OPTIONAL_FN(void, - http2_req_engine_exit, (h2_req_engine *engine)); - - -#define H2_TASK_ID_NOTE "http2-task-id" - #endif