From: Stefan Eissing Date: Mon, 9 Nov 2015 13:05:53 +0000 (+0000) Subject: refactored connection io handling to go for blocking reads more often and use h2_mplx... X-Git-Tag: 2.5.0-alpha~2653 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=f8886e8da4f2a29125f218612a7936faaa21c7d1;p=thirdparty%2Fapache%2Fhttpd.git refactored connection io handling to go for blocking reads more often and use h2_mplx access less frequently git-svn-id: https://svn.apache.org/repos/asf/httpd/httpd/trunk@1713427 13f79535-47bb-0310-9956-ffa450edef68 --- diff --git a/modules/http2/h2_conn.c b/modules/http2/h2_conn.c index 877447e2ffe..f1fcbffb808 100644 --- a/modules/http2/h2_conn.c +++ b/modules/http2/h2_conn.c @@ -40,8 +40,6 @@ static struct h2_workers *workers; -static apr_status_t h2_conn_loop(h2_session *session); - static h2_mpm_type_t mpm_type = H2_MPM_UNKNOWN; static module *mpm_module; static int checked; @@ -133,42 +131,28 @@ static module *h2_conn_mpm_module(void) { return mpm_module; } -apr_status_t h2_conn_rprocess(request_rec *r) +apr_status_t h2_conn_process(conn_rec *c, request_rec *r) { - h2_config *config = h2_config_rget(r); + apr_status_t status; h2_session *session; + h2_config *config; + int rv; - ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r, "h2_conn_process start"); if (!workers) { - ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, r, APLOGNO(02911) + ap_log_cerror(APLOG_MARK, APLOG_ERR, 0, c, APLOGNO(02911) "workers not initialized"); return APR_EGENERAL; } - session = h2_session_rcreate(r, config, workers); - if (!session) { - return APR_EGENERAL; - } - - return h2_conn_loop(session); -} - -apr_status_t h2_conn_main(conn_rec *c) -{ - h2_config *config = h2_config_get(c); - h2_session *session; - apr_status_t status; + ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, c, "h2_conn_process start"); - ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, c, "h2_conn_main start"); - if (!workers) { - ap_log_cerror(APLOG_MARK, APLOG_ERR, 0, c, APLOGNO(02912) - "workers not initialized"); - return APR_EGENERAL; + if (r) { + config = h2_config_rget(r); + session = h2_session_rcreate(r, config, workers); } - - session = h2_session_create(c, config, workers); - if (!session) { - return APR_EGENERAL; + else { + config = h2_config_get(c); + session = h2_session_create(c, config, workers); } if (!h2_is_acceptable_connection(c, 1)) { @@ -176,62 +160,7 @@ apr_status_t h2_conn_main(conn_rec *c) NGHTTP2_INADEQUATE_SECURITY, NULL, 0); } - status = h2_conn_loop(session); - - /* Make sure this connection gets closed properly. */ - c->keepalive = AP_CONN_CLOSE; - if (c->cs) { - c->cs->state = CONN_STATE_WRITE_COMPLETION; - } - - return status; -} - -static apr_status_t h2_conn_loop(h2_session *session) -{ - apr_status_t status = APR_SUCCESS; - int rv = 0; - apr_interval_time_t wait_micros = 0; - static const int MAX_WAIT_MICROS = 200 * 1000; - - /* Start talking to the client. Apart from protocol meta data, - * we mainly will see new http/2 streams opened by the client, which - * basically are http requests we need to dispatch. - * - * There will be bursts of new streams, to be served concurrently, - * followed by long pauses of no activity. - * - * Since the purpose of http/2 is to allow siumultaneous streams, we - * need to dispatch the handling of each stream into a separate worker - * thread, keeping this thread open for sending responses back as - * soon as they arrive. - * At the same time, we need to continue reading new frames from - * our client, which may be meta (WINDOWS_UPDATEs, PING, SETTINGS) or - * new streams. - * - * As long as we have streams open in this session, we cannot really rest - * since there are two conditions to wait on: 1. new data from the client, - * 2. new data from the open streams to send back. - * - * Only when we have no more streams open, can we do a blocking read - * on our connection. - * - * TODO: implement graceful GO_AWAY after configurable idle time - */ - - ap_update_child_status_from_conn(session->c->sbh, SERVER_BUSY_READ, - session->c); - - if (APLOGctrace2(session->c)) { - ap_filter_t *filter = session->c->input_filters; - while (filter) { - ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c, - "h2_conn(%ld), has connection filter %s", - session->id, filter->frec->name); - filter = filter->next; - } - } - + ap_update_child_status_from_conn(c->sbh, SERVER_BUSY_READ, c); status = h2_session_start(session, &rv); ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, session->c, @@ -244,109 +173,20 @@ static apr_status_t h2_conn_loop(h2_session *session) return status; } - while (!h2_session_is_done(session)) { - int have_written = 0; - int have_read = 0; - int got_streams; - - status = h2_session_write(session, wait_micros); - if (status == APR_SUCCESS) { - have_written = 1; - wait_micros = 0; - } - else if (APR_STATUS_IS_EAGAIN(status)) { - /* nop */ - } - else if (status == APR_TIMEUP) { - wait_micros *= 2; - if (wait_micros > MAX_WAIT_MICROS) { - wait_micros = MAX_WAIT_MICROS; - } - } - else { - ap_log_cerror( APLOG_MARK, APLOG_DEBUG, status, session->c, - "h2_session(%ld): writing, terminating", - session->id); - h2_session_abort(session, status, 0); - break; - } - - /* We would like to do blocking reads as often as possible as they - * are more efficient in regard to server resources. - * We can do them under the following circumstances: - * - we have no open streams and therefore have nothing to write - * - we have just started the session and are waiting for the first - * two frames to come in. There will always be at least 2 frames as - * * h2 will send SETTINGS and SETTINGS-ACK - * * h2c will count the header settings as one frame and we - * submit our settings and need the ACK. - */ - got_streams = !h2_stream_set_is_empty(session->streams); - if (!got_streams || session->frames_received <= 1) { - if (session->c->cs) { - session->c->cs->state = CONN_STATE_WRITE_COMPLETION; - } - status = h2_session_read(session, APR_BLOCK_READ); - } - else { - if (session->c->cs) { - session->c->cs->state = CONN_STATE_HANDLER; - } - status = h2_session_read(session, APR_NONBLOCK_READ); - } - - switch (status) { - case APR_SUCCESS: /* successful read, reset our idle timers */ - have_read = 1; - wait_micros = 0; - break; - case APR_EAGAIN: /* non-blocking read, nothing there */ - break; - default: - if (APR_STATUS_IS_ETIMEDOUT(status) - || APR_STATUS_IS_ECONNABORTED(status) - || APR_STATUS_IS_ECONNRESET(status) - || APR_STATUS_IS_EOF(status) - || APR_STATUS_IS_EBADF(status)) { - /* common status for a client that has left */ - ap_log_cerror( APLOG_MARK, APLOG_DEBUG, status, session->c, - "h2_session(%ld): terminating", - session->id); - /* Stolen from mod_reqtimeout to speed up lingering when - * a read timeout happened. - */ - apr_table_setn(session->c->notes, "short-lingering-close", "1"); - } - else { - /* uncommon status, log on INFO so that we see this */ - ap_log_cerror( APLOG_MARK, APLOG_INFO, status, session->c, - APLOGNO(02950) - "h2_session(%ld): error reading, terminating", - session->id); - } - h2_session_abort(session, status, 0); - break; - } - - if (!have_read && !have_written - && !h2_stream_set_is_empty(session->streams)) { - /* Nothing to read or write, we have streams, but - * the have no data yet ready to be delivered. Slowly - * back off to give others a chance to do their work. - */ - if (wait_micros == 0) { - wait_micros = 10; - } - } - } - + status = h2_session_process(session); + h2_session_close(session); + ap_log_cerror( APLOG_MARK, APLOG_DEBUG, status, session->c, "h2_session(%ld): done", session->id); - h2_session_close(session); - ap_update_child_status_from_conn(session->c->sbh, SERVER_CLOSING, - session->c); - return DONE; + /* Make sure this connection gets closed properly. */ + ap_update_child_status_from_conn(c->sbh, SERVER_CLOSING, c); + c->keepalive = AP_CONN_CLOSE; + if (c->cs) { + c->cs->state = CONN_STATE_WRITE_COMPLETION; + } + + return status; } @@ -460,24 +300,6 @@ apr_status_t h2_conn_setup(h2_task *task, struct h2_worker *worker) return APR_SUCCESS; } -apr_status_t h2_conn_post(conn_rec *c, h2_worker *worker) -{ - (void)worker; - - /* be sure no one messes with this any more */ - memset(c, 0, sizeof(*c)); - return APR_SUCCESS; -} - -apr_status_t h2_conn_process(conn_rec *c, apr_socket_t *socket) -{ - AP_DEBUG_ASSERT(c); - - ap_process_connection(c, socket); - - return APR_SUCCESS; -} - /* This is an internal mpm event.c struct which is disguised * as a conn_state_t so that mpm_event can have special connection * state information without changing the struct seen on the outside. diff --git a/modules/http2/h2_conn.h b/modules/http2/h2_conn.h index 752f28a74cb..88caf1710d5 100644 --- a/modules/http2/h2_conn.h +++ b/modules/http2/h2_conn.h @@ -19,17 +19,15 @@ struct h2_task; struct h2_worker; -/* Process the connection that is now starting the HTTP/2 - * conversation. Return when the HTTP/2 session is done - * and the connection will close. - */ -apr_status_t h2_conn_main(conn_rec *c); - -/* Process the request that has been upgraded to a HTTP/2 +/** + * Process the connection that is now starting the HTTP/2 * conversation. Return when the HTTP/2 session is done * and the connection will close. + * + * @param c the connection HTTP/2 is starting on + * @param r the upgrad requestion that still awaits an answer, optional */ -apr_status_t h2_conn_rprocess(request_rec *r); +apr_status_t h2_conn_process(conn_rec *c, request_rec *r); /* Initialize this child process for h2 connection work, * to be called once during child init before multi processing @@ -52,8 +50,5 @@ h2_mpm_type_t h2_conn_mpm_type(void); conn_rec *h2_conn_create(conn_rec *master, apr_pool_t *stream_pool); apr_status_t h2_conn_setup(struct h2_task *task, struct h2_worker *worker); -apr_status_t h2_conn_post(conn_rec *c, struct h2_worker *worker); - -apr_status_t h2_conn_process(conn_rec *c, apr_socket_t *socket); #endif /* defined(__mod_h2__h2_conn__) */ diff --git a/modules/http2/h2_conn_io.c b/modules/http2/h2_conn_io.c index 15ddebda44e..1fc7d37e9f4 100644 --- a/modules/http2/h2_conn_io.c +++ b/modules/http2/h2_conn_io.c @@ -341,7 +341,6 @@ apr_status_t h2_conn_io_consider_flush(h2_conn_io *io) return h2_conn_io_flush(io); } } - return status; } diff --git a/modules/http2/h2_h2.c b/modules/http2/h2_h2.c index 69061167d8b..5c66dbb88d3 100644 --- a/modules/http2/h2_h2.c +++ b/modules/http2/h2_h2.c @@ -651,7 +651,7 @@ int h2_h2_process_conn(conn_rec* c) ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c, "h2_h2, connection, h2 active"); - return h2_conn_main(c); + return h2_conn_process(c, NULL); } ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c, "h2_h2, declined"); diff --git a/modules/http2/h2_session.c b/modules/http2/h2_session.c index e54b9444381..710960c62d7 100644 --- a/modules/http2/h2_session.c +++ b/modules/http2/h2_session.c @@ -611,7 +611,6 @@ static int on_send_data_cb(nghttp2_session *ngh2, if (status == APR_SUCCESS) { apr_off_t len = length; status = h2_stream_read_to(stream, session->io.output, &len, &eos); - session->io.unflushed = 1; if (status == APR_SUCCESS && len != length) { status = APR_EINVAL; } @@ -953,11 +952,6 @@ apr_status_t h2_session_start(h2_session *session, int *rv) return status; } -static int h2_session_want_write(h2_session *session) -{ - return nghttp2_session_want_write(session->ngh2); -} - typedef struct { h2_session *session; int resume_count; @@ -1009,88 +1003,6 @@ static void update_window(void *ctx, int stream_id, apr_off_t bytes_read) nghttp2_session_consume(session->ngh2, stream_id, bytes_read); } -static apr_status_t h2_session_update_windows(h2_session *session) -{ - /* TODO: only do this, when we have streams with open input */ - return h2_mplx_in_update_windows(session->mplx, update_window, session); -} - -apr_status_t h2_session_write(h2_session *session, apr_interval_time_t timeout) -{ - apr_status_t status = APR_EAGAIN; - h2_stream *stream = NULL; - - AP_DEBUG_ASSERT(session); - - if (session->reprioritize) { - h2_mplx_reprioritize(session->mplx, stream_pri_cmp, session); - session->reprioritize = 0; - } - - /* Check that any pending window updates are sent. */ - status = h2_session_update_windows(session); - if (status != APR_SUCCESS && !APR_STATUS_IS_EAGAIN(status)) { - return status; - } - - if (h2_session_want_write(session)) { - int rv; - status = APR_SUCCESS; - rv = nghttp2_session_send(session->ngh2); - if (rv != 0) { - ap_log_cerror( APLOG_MARK, APLOG_DEBUG, 0, session->c, - "h2_session: send: %s", nghttp2_strerror(rv)); - if (nghttp2_is_fatal(rv)) { - h2_session_abort_int(session, rv); - status = APR_ECONNABORTED; - } - } - } - - /* If we have responses ready, submit them now. */ - /* TODO: only call this when we have unsubmitted streams */ - while (!session->aborted - && (stream = h2_mplx_next_submit(session->mplx, session->streams)) != NULL) { - status = h2_session_handle_response(session, stream); - } - - if (!session->aborted && h2_session_resume_streams_with_data(session) > 0) { - } - - if (!session->aborted && !session->flush && timeout > 0 - && !h2_session_want_write(session)) { - h2_session_flush(session); - status = h2_mplx_out_trywait(session->mplx, timeout, session->iowait); - - if (status != APR_TIMEUP - && h2_session_resume_streams_with_data(session) > 0) { - } - else { - /* nothing happened to ongoing streams, do some house-keeping */ - } - } - - if (h2_session_want_write(session)) { - int rv; - status = APR_SUCCESS; - rv = nghttp2_session_send(session->ngh2); - if (rv != 0) { - ap_log_cerror( APLOG_MARK, APLOG_DEBUG, 0, session->c, - "h2_session: send2: %s", nghttp2_strerror(rv)); - if (nghttp2_is_fatal(rv)) { - h2_session_abort_int(session, rv); - status = APR_ECONNABORTED; - } - } - } - - if (session->flush) { - h2_session_flush(session); - } - - return status; -} - h2_stream *h2_session_get_stream(h2_session *session, int stream_id) { AP_DEBUG_ASSERT(session); @@ -1127,17 +1039,6 @@ static apr_status_t session_receive(const char *data, apr_size_t len, return APR_SUCCESS; } -apr_status_t h2_session_read(h2_session *session, apr_read_type_e block) -{ - AP_DEBUG_ASSERT(session); - if (block == APR_BLOCK_READ) { - /* before we do a blocking read, make sure that all our output - * is send out. Otherwise we might deadlock. */ - h2_session_flush(session); - } - return h2_conn_io_read(&session->io, block, session_receive, session); -} - apr_status_t h2_session_close(h2_session *session) { AP_DEBUG_ASSERT(session); @@ -1241,42 +1142,12 @@ typedef struct { size_t offset; } nvctx_t; -static int submit_response(h2_session *session, h2_response *response) -{ - nghttp2_data_provider provider; - int rv; - - memset(&provider, 0, sizeof(provider)); - provider.source.fd = response->stream_id; - provider.read_callback = stream_data_cb; - - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c, - "h2_stream(%ld-%d): submitting response %s", - session->id, response->stream_id, response->status); - - rv = nghttp2_submit_response(session->ngh2, response->stream_id, - response->ngheader->nv, - response->ngheader->nvlen, &provider); - - if (rv != 0) { - ap_log_cerror(APLOG_MARK, APLOG_ERR, 0, session->c, - APLOGNO(02939) "h2_stream(%ld-%d): submit_response: %s", - session->id, response->stream_id, nghttp2_strerror(rv)); - } - else { - ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, - "h2_stream(%ld-%d): submitted response %s, rv=%d", - session->id, response->stream_id, - response->status, rv); - } - return rv; -} - -/* Start submitting the response to a stream request. This is possible +/** + * Start submitting the response to a stream request. This is possible * once we have all the response headers. The response body will be * read by the session using the callback we supply. */ -apr_status_t h2_session_handle_response(h2_session *session, h2_stream *stream) +static apr_status_t submit_response(h2_session *session, h2_stream *stream) { apr_status_t status = APR_SUCCESS; int rv = 0; @@ -1284,8 +1155,30 @@ apr_status_t h2_session_handle_response(h2_session *session, h2_stream *stream) AP_DEBUG_ASSERT(stream); AP_DEBUG_ASSERT(stream->response || stream->rst_error); - if (stream->response && stream->response->ngheader) { - rv = submit_response(session, stream->response); + if (stream->submitted) { + rv = NGHTTP2_PROTOCOL_ERROR; + } + else if (stream->response && stream->response->ngheader) { + nghttp2_data_provider provider; + h2_response *response = stream->response; + + memset(&provider, 0, sizeof(provider)); + provider.source.fd = stream->id; + provider.read_callback = stream_data_cb; + + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c, + "h2_stream(%ld-%d): submitting response %s", + session->id, stream->id, response->status); + + rv = nghttp2_submit_response(session->ngh2, response->stream_id, + response->ngheader->nv, + response->ngheader->nvlen, &provider); + + if (rv != 0) { + ap_log_cerror(APLOG_MARK, APLOG_ERR, 0, session->c, + APLOGNO(02939) "h2_stream(%ld-%d): submit_response: %s", + session->id, response->stream_id, nghttp2_strerror(rv)); + } } else { rv = nghttp2_submit_rst_stream(session->ngh2, NGHTTP2_FLAG_NONE, @@ -1293,6 +1186,8 @@ apr_status_t h2_session_handle_response(h2_session *session, h2_stream *stream) H2_STREAM_RST(stream, H2_ERR_PROTOCOL_ERROR)); } + stream->submitted = 1; + if (nghttp2_is_fatal(rv)) { status = APR_EGENERAL; h2_session_abort_int(session, rv); @@ -1300,6 +1195,7 @@ apr_status_t h2_session_handle_response(h2_session *session, h2_stream *stream) APLOGNO(02940) "submit_response: %s", nghttp2_strerror(rv)); } + return status; } @@ -1321,15 +1217,6 @@ apr_status_t h2_session_stream_destroy(h2_session *session, h2_stream *stream) return APR_SUCCESS; } -int h2_session_is_done(h2_session *session) -{ - AP_DEBUG_ASSERT(session); - return (session->aborted - || !session->ngh2 - || (!nghttp2_session_want_read(session->ngh2) - && !nghttp2_session_want_write(session->ngh2))); -} - static int frame_print(const nghttp2_frame *frame, char *buffer, size_t maxlen) { char scratch[128]; @@ -1407,3 +1294,171 @@ static int frame_print(const nghttp2_frame *frame, char *buffer, size_t maxlen) } } +apr_status_t h2_session_process(h2_session *session) +{ + apr_status_t status = APR_SUCCESS; + apr_interval_time_t wait_micros = 0; + static const int MAX_WAIT_MICROS = 200 * 1000; + int got_streams = 0; + + while (!session->aborted && (nghttp2_session_want_read(session->ngh2) + || nghttp2_session_want_write(session->ngh2))) { + int have_written = 0; + int have_read = 0; + + /* Send data as long as we have it and window sizes allow. We are + * a server after all. + */ + if (nghttp2_session_want_write(session->ngh2)) { + int rv; + + rv = nghttp2_session_send(session->ngh2); + if (rv != 0) { + ap_log_cerror( APLOG_MARK, APLOG_DEBUG, 0, session->c, + "h2_session: send: %s", nghttp2_strerror(rv)); + if (nghttp2_is_fatal(rv)) { + h2_session_abort(session, status, rv); + goto end_process; + } + } + else { + have_written = 1; + wait_micros = 0; + } + } + + if (have_written) { + h2_session_flush(session); + } + + if (wait_micros > 0) { + ap_log_cerror( APLOG_MARK, APLOG_TRACE3, 0, session->c, + "h2_session: wait for data, %ld micros", (long)(wait_micros)); + status = h2_mplx_out_trywait(session->mplx, wait_micros, session->iowait); + + if (status == APR_TIMEUP) { + if (wait_micros < MAX_WAIT_MICROS) { + wait_micros *= 2; + } + } + } + + if (nghttp2_session_want_read(session->ngh2)) + { + /* When we + * - and have no streams at all + * - or have streams, but none is suspended or needs submit and + * have nothing written on the last try + * + * or, the other way around + * - have only streams where data can be sent, but could + * not send anything + * + * then we are waiting on frames from the client (for + * example WINDOW_UPDATE or HEADER) and without new frames + * from the client, we cannot make any progress, + * + * and *then* we can safely do a blocking read. + */ + int may_block = (session->frames_received <= 1); + if (!may_block) { + if (got_streams) { + may_block = (!have_written + && !h2_stream_set_has_unsubmitted(session->streams) + && !h2_stream_set_has_suspended(session->streams)); + } + else { + may_block = 1; + } + } + + if (may_block) { + if (session->c->cs) { + session->c->cs->state = CONN_STATE_WRITE_COMPLETION; + } + status = h2_conn_io_read(&session->io, APR_BLOCK_READ, + session_receive, session); + } + else { + if (session->c->cs) { + session->c->cs->state = CONN_STATE_HANDLER; + } + status = h2_conn_io_read(&session->io, APR_NONBLOCK_READ, + session_receive, session); + } + + switch (status) { + case APR_SUCCESS: /* successful read, reset our idle timers */ + have_read = 1; + wait_micros = 0; + break; + case APR_EAGAIN: /* non-blocking read, nothing there */ + break; + default: + if (APR_STATUS_IS_ETIMEDOUT(status) + || APR_STATUS_IS_ECONNABORTED(status) + || APR_STATUS_IS_ECONNRESET(status) + || APR_STATUS_IS_EOF(status) + || APR_STATUS_IS_EBADF(status)) { + /* common status for a client that has left */ + ap_log_cerror( APLOG_MARK, APLOG_DEBUG, status, session->c, + "h2_session(%ld): terminating", + session->id); + /* Stolen from mod_reqtimeout to speed up lingering when + * a read timeout happened. + */ + apr_table_setn(session->c->notes, "short-lingering-close", "1"); + } + else { + /* uncommon status, log on INFO so that we see this */ + ap_log_cerror( APLOG_MARK, APLOG_INFO, status, session->c, + APLOGNO(02950) + "h2_session(%ld): error reading, terminating", + session->id); + } + h2_session_abort(session, status, 0); + goto end_process; + } + } + + got_streams = !h2_stream_set_is_empty(session->streams); + if (got_streams) { + h2_stream *stream; + + if (session->reprioritize) { + h2_mplx_reprioritize(session->mplx, stream_pri_cmp, session); + session->reprioritize = 0; + } + + if (!have_read && !have_written) { + /* Nothing read or written. That means no data yet ready to + * be send out. Slowly back off... + */ + if (wait_micros == 0) { + wait_micros = 10; + } + } + + if (h2_stream_set_has_open_input(session->streams)) { + /* Check that any pending window updates are sent. */ + status = h2_mplx_in_update_windows(session->mplx, update_window, session); + if (APR_STATUS_IS_EAGAIN(status)) { + status = APR_SUCCESS; + } + } + + h2_session_resume_streams_with_data(session); + + if (h2_stream_set_has_unsubmitted(session->streams)) { + /* If we have responses ready, submit them now. */ + while ((stream = h2_mplx_next_submit(session->mplx, session->streams))) { + status = submit_response(session, stream); + } + } + } + + } + +end_process: + return status; +} diff --git a/modules/http2/h2_session.h b/modules/http2/h2_session.h index a1d718a9ca6..b6a9beae5d3 100644 --- a/modules/http2/h2_session.h +++ b/modules/http2/h2_session.h @@ -106,6 +106,14 @@ h2_session *h2_session_create(conn_rec *c, struct h2_config *cfg, h2_session *h2_session_rcreate(request_rec *r, struct h2_config *cfg, struct h2_workers *workers); +/** + * Process the given HTTP/2 session until it is ended or a fatal + * error occured. + * + * @param session the sessionm to process + */ +apr_status_t h2_session_process(h2_session *session); + /** * Destroy the session and all objects it still contains. This will not * destroy h2_task instances that have not finished yet. @@ -129,12 +137,6 @@ void h2_session_cleanup(h2_session *session); */ apr_status_t h2_session_start(h2_session *session, int *rv); -/** - * Determine if session is finished. - * @return != 0 iff session is finished and connection can be closed. - */ -int h2_session_is_done(h2_session *session); - /** * Called when an error occured and the session needs to shut down. * @param session the session to shut down @@ -149,18 +151,6 @@ apr_status_t h2_session_abort(h2_session *session, apr_status_t reason, int rv); */ apr_status_t h2_session_close(h2_session *session); -/* Read more data from the client connection. Used normally with blocking - * APR_NONBLOCK_READ, which will return APR_EAGAIN when no data is available. - * Use with APR_BLOCK_READ only when certain that no data needs to be written - * while waiting. */ -apr_status_t h2_session_read(h2_session *session, apr_read_type_e block); - -/* Write data out to the client, if there is any. Otherwise, wait for - * a maximum of timeout micro-seconds and return to the caller. If timeout - * occurred, APR_TIMEUP will be returned. - */ -apr_status_t h2_session_write(h2_session *session, apr_interval_time_t timeout); - /* Start submitting the response to a stream request. This is possible * once we have all the response headers. */ apr_status_t h2_session_handle_response(h2_session *session, diff --git a/modules/http2/h2_stream.c b/modules/http2/h2_stream.c index 8a55962feaa..266a9e24fb8 100644 --- a/modules/http2/h2_stream.c +++ b/modules/http2/h2_stream.c @@ -354,3 +354,27 @@ int h2_stream_is_suspended(h2_stream *stream) return stream->suspended; } +int h2_stream_input_is_open(h2_stream *stream) +{ + switch (stream->state) { + case H2_STREAM_ST_OPEN: + case H2_STREAM_ST_CLOSED_OUTPUT: + return 1; + default: + return 0; + } +} + +int h2_stream_needs_submit(h2_stream *stream) +{ + switch (stream->state) { + case H2_STREAM_ST_OPEN: + case H2_STREAM_ST_CLOSED_INPUT: + case H2_STREAM_ST_CLOSED_OUTPUT: + case H2_STREAM_ST_CLOSED: + return !stream->submitted; + default: + return 0; + } +} + diff --git a/modules/http2/h2_stream.h b/modules/http2/h2_stream.h index d8847344ff1..a36733fa189 100644 --- a/modules/http2/h2_stream.h +++ b/modules/http2/h2_stream.h @@ -61,6 +61,7 @@ struct h2_stream { int aborted; /* was aborted */ int suspended; /* DATA sending has been suspended */ int rst_error; /* stream error for RST_STREAM */ + int submitted; /* response HEADER has been sent */ apr_pool_t *pool; /* the memory pool for this stream */ struct h2_request *request; /* the request made in this stream */ @@ -114,5 +115,7 @@ apr_status_t h2_stream_read_to(h2_stream *stream, apr_bucket_brigade *bb, void h2_stream_set_suspended(h2_stream *stream, int suspended); int h2_stream_is_suspended(h2_stream *stream); +int h2_stream_input_is_open(h2_stream *stream); +int h2_stream_needs_submit(h2_stream *stream); #endif /* defined(__mod_h2__h2_stream__) */ diff --git a/modules/http2/h2_stream_set.c b/modules/http2/h2_stream_set.c index b93e45bb687..17c75f08222 100644 --- a/modules/http2/h2_stream_set.c +++ b/modules/http2/h2_stream_set.c @@ -97,4 +97,51 @@ void h2_stream_set_iter(h2_stream_set *sp, apr_hash_do(hash_iter, &ictx, sp->hash); } +static int unsubmitted_iter(void *ctx, h2_stream *stream) +{ + if (h2_stream_needs_submit(stream)) { + *((int *)ctx) = 1; + return 0; + } + return 1; +} + +int h2_stream_set_has_unsubmitted(h2_stream_set *sp) +{ + int has_unsubmitted = 0; + h2_stream_set_iter(sp, unsubmitted_iter, &has_unsubmitted); + return has_unsubmitted; +} + +static int input_open_iter(void *ctx, h2_stream *stream) +{ + if (h2_stream_input_is_open(stream)) { + *((int *)ctx) = 1; + return 0; + } + return 1; +} + +int h2_stream_set_has_open_input(h2_stream_set *sp) +{ + int has_input_open = 0; + h2_stream_set_iter(sp, input_open_iter, &has_input_open); + return has_input_open; +} + +static int suspended_iter(void *ctx, h2_stream *stream) +{ + if (h2_stream_is_suspended(stream)) { + *((int *)ctx) = 1; + return 0; + } + return 1; +} + +int h2_stream_set_has_suspended(h2_stream_set *sp) +{ + int has_suspended = 0; + h2_stream_set_iter(sp, suspended_iter, &has_suspended); + return has_suspended; +} diff --git a/modules/http2/h2_stream_set.h b/modules/http2/h2_stream_set.h index 44fe94ef3be..d0041c48432 100644 --- a/modules/http2/h2_stream_set.h +++ b/modules/http2/h2_stream_set.h @@ -37,11 +37,15 @@ h2_stream *h2_stream_set_get(h2_stream_set *sp, int stream_id); void h2_stream_set_remove(h2_stream_set *sp, int stream_id); +void h2_stream_set_iter(h2_stream_set *sp, + h2_stream_set_iter_fn *iter, void *ctx); + int h2_stream_set_is_empty(h2_stream_set *sp); apr_size_t h2_stream_set_size(h2_stream_set *sp); -void h2_stream_set_iter(h2_stream_set *sp, - h2_stream_set_iter_fn *iter, void *ctx); +int h2_stream_set_has_unsubmitted(h2_stream_set *sp); +int h2_stream_set_has_open_input(h2_stream_set *sp); +int h2_stream_set_has_suspended(h2_stream_set *sp); #endif /* defined(__mod_h2__h2_stream_set__) */ diff --git a/modules/http2/h2_switch.c b/modules/http2/h2_switch.c index 97f07f891d7..c107db8e73a 100644 --- a/modules/http2/h2_switch.c +++ b/modules/http2/h2_switch.c @@ -154,7 +154,7 @@ static int h2_protocol_switch(conn_rec *c, request_rec *r, server_rec *s, ap_remove_input_filter_byhandle(r->input_filters, "reqtimeout"); /* Ok, start an h2_conn on this one. */ - status = h2_conn_rprocess(r); + status = h2_conn_process(r->connection, r); if (status != DONE) { /* Nothing really to do about this. */ ap_log_rerror(APLOG_MARK, APLOG_DEBUG, status, r, diff --git a/modules/http2/h2_task.c b/modules/http2/h2_task.c index ee78f4347e1..c44d3827d47 100644 --- a/modules/http2/h2_task.c +++ b/modules/http2/h2_task.c @@ -227,8 +227,8 @@ apr_status_t h2_task_do(h2_task *task, h2_worker *worker) task->c->bucket_alloc); task->output = h2_task_output_create(task, task->pool, task->c->bucket_alloc); - status = h2_conn_process(task->c, h2_worker_get_socket(worker)); - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, task->c, + ap_process_connection(task->c, h2_worker_get_socket(worker)); + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, task->c, "h2_task(%s): processing done", task->id); } else { @@ -257,10 +257,6 @@ apr_status_t h2_task_do(h2_task *task, h2_worker *worker) task->pool = NULL; } - if (task->c->id) { - h2_conn_post(task->c, worker); - } - h2_mplx_task_done(task->mplx, task->stream_id); return status;