*pacquired = 0;
return APR_SUCCESS;
}
-
+
+ AP_DEBUG_ASSERT(m->lock);
status = apr_thread_mutex_lock(m->lock);
*pacquired = (status == APR_SUCCESS);
if (*pacquired) {
m->stream_max_mem = h2_config_geti(conf, H2_CONF_STREAM_MAX_MEM);
m->streams = h2_ihash_create(m->pool, offsetof(h2_stream,id));
- m->sready = h2_ihash_create(m->pool, offsetof(h2_stream,id));
m->shold = h2_ihash_create(m->pool, offsetof(h2_stream,id));
m->spurge = h2_ihash_create(m->pool, offsetof(h2_stream,id));
m->q = h2_iq_create(m->pool, m->max_streams);
+ m->sready = h2_ihash_create(m->pool, offsetof(h2_stream,id));
+ m->sresume = h2_ihash_create(m->pool, offsetof(h2_stream,id));
m->tasks = h2_ihash_create(m->pool, offsetof(h2_task,stream_id));
m->stream_timeout = stream_timeout;
static void input_consumed_signal(h2_mplx *m, h2_stream *stream)
{
- if (stream->input) {
+ if (stream->input && stream->started) {
h2_beam_send(stream->input, NULL, 0); /* trigger updates */
}
}
static int output_consumed_signal(h2_mplx *m, h2_task *task)
{
if (task->output.beam && task->worker_started && task->assigned) {
- h2_beam_send(task->output.beam, NULL, 0); /* trigger updates */
+ /* trigger updates */
+ h2_beam_send(task->output.beam, NULL, APR_NONBLOCK_READ);
}
return 0;
}
*/
h2_iq_remove(m->q, stream->id);
h2_ihash_remove(m->sready, stream->id);
+ h2_ihash_remove(m->sresume, stream->id);
h2_ihash_remove(m->streams, stream->id);
if (stream->input) {
m->tx_handles_reserved += h2_beam_get_files_beamed(stream->input);
m->input_consumed_ctx = ctx;
}
-static int update_window(void *ctx, void *val)
-{
- input_consumed_signal(ctx, val);
- return 1;
-}
-
-apr_status_t h2_mplx_in_update_windows(h2_mplx *m)
-{
- apr_status_t status;
- int acquired;
-
- AP_DEBUG_ASSERT(m);
- if (m->aborted) {
- return APR_ECONNABORTED;
- }
- if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
- h2_ihash_iter(m->streams, update_window, m);
-
- ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
- "h2_session(%ld): windows updated", m->id);
- status = APR_SUCCESS;
- leave_mutex(m, acquired);
- }
- return status;
-}
-
-static int stream_iter_first(void *ctx, void *val)
-{
- h2_stream **pstream = ctx;
- *pstream = val;
- return 0;
-}
-
-h2_stream *h2_mplx_next_submit(h2_mplx *m)
-{
- apr_status_t status;
- h2_stream *stream = NULL;
- int acquired;
-
- AP_DEBUG_ASSERT(m);
- if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
- h2_ihash_iter(m->sready, stream_iter_first, &stream);
- if (stream) {
- h2_task *task = h2_ihash_get(m->tasks, stream->id);
- h2_ihash_remove(m->sready, stream->id);
- if (task) {
- task->submitted = 1;
- if (task->rst_error) {
- h2_stream_rst(stream, task->rst_error);
- }
- else {
- AP_DEBUG_ASSERT(task->response);
- h2_stream_set_response(stream, task->response,
- task->output.beam);
- }
- }
- else {
- /* We have the stream ready without a task. This happens
- * when we fail streams early. A response should already
- * be present. */
- AP_DEBUG_ASSERT(stream->response || stream->rst_error);
- }
- }
- leave_mutex(m, acquired);
- }
- return stream;
-}
-
static apr_status_t out_open(h2_mplx *m, int stream_id, h2_response *response)
{
apr_status_t status = APR_SUCCESS;
if (m->aborted) {
status = APR_ECONNABORTED;
}
+ else if (!h2_ihash_empty(m->sready) || !h2_ihash_empty(m->sresume)) {
+ status = APR_SUCCESS;
+ }
else {
m->added_output = iowait;
status = apr_thread_cond_timedwait(m->added_output, m->lock, timeout);
if (new_conn) {
h2_slave_run_pre_connection(slave, ap_get_conn_socket(slave));
}
+ stream->started = 1;
task->worker_started = 1;
task->started_at = apr_time_now();
if (sid > m->max_stream_started) {
task->done_at = apr_time_now();
if (task->output.beam) {
h2_beam_on_consumed(task->output.beam, NULL, NULL);
+ h2_beam_on_produced(task->output.beam, NULL, NULL);
h2_beam_mutex_set(task->output.beam, NULL, NULL, NULL);
}
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
"h2_mplx(%s): task_done, stream still open",
task->id);
+ if (h2_stream_is_suspended(stream)) {
+ /* more data will not arrive, resume the stream */
+ h2_ihash_add(m->sresume, stream);
+ have_out_data_for(m, stream->id);
+ }
}
else {
/* stream done, was it placed in hold? */
}
}
}
-
+
+/*******************************************************************************
+ * mplx master events dispatching
+ ******************************************************************************/
+
+typedef struct {
+ h2_mplx *m;
+ stream_ev_callback *on_resume;
+ stream_ev_callback *on_response;
+ void *on_ctx;
+ apr_status_t status;
+} dispatch_ctx;
+
+static int update_window(void *ctx, void *val)
+{
+ input_consumed_signal(ctx, val);
+ return 1;
+}
+
+static int stream_ready_iter(void *data, void *val)
+{
+ dispatch_ctx *ctx = data;
+ h2_stream *stream = val;
+ h2_task *task = h2_ihash_get(ctx->m->tasks, stream->id);
+
+ if (task) {
+ task->submitted = 1;
+ if (task->rst_error) {
+ h2_stream_rst(stream, task->rst_error);
+ }
+ else {
+ AP_DEBUG_ASSERT(task->response);
+ h2_stream_set_response(stream, task->response, task->output.beam);
+ }
+ }
+ else {
+ /* We have the stream ready without a task. This happens
+ * when we fail streams early. A response should already
+ * be present. */
+ AP_DEBUG_ASSERT(stream->response || stream->rst_error);
+ }
+
+ ctx->status = ctx->on_response(ctx->on_ctx, stream->id);
+ return 1;
+}
+
+static int stream_resume_iter(void *data, void *val)
+{
+ dispatch_ctx *ctx = data;
+ h2_stream *stream = val;
+
+ h2_stream_set_suspended(stream, 0);
+ ctx->status = ctx->on_resume(ctx->on_ctx, stream->id);
+ return 1;
+}
+
+apr_status_t h2_mplx_dispatch_master_events(h2_mplx *m,
+ stream_ev_callback *on_resume,
+ stream_ev_callback *on_response,
+ void *on_ctx)
+{
+ apr_status_t status;
+ int acquired;
+
+ AP_DEBUG_ASSERT(m);
+ if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
+ dispatch_ctx ctx;
+ ctx.m = m;
+ ctx.on_resume = on_resume;
+ ctx.on_response = on_response;
+ ctx.on_ctx = on_ctx;
+ ctx.status = APR_SUCCESS;
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
+ "h2_mplx(%ld): dispatch events", m->id);
+ /* update input windows for streams */
+ h2_ihash_iter(m->streams, update_window, m);
+
+ if (ctx.on_response) {
+ h2_ihash_iter(m->sready, stream_ready_iter, &ctx);
+ h2_ihash_clear(m->sready);
+ }
+
+ if (ctx.on_resume) {
+ h2_ihash_iter(m->sresume, stream_resume_iter, &ctx);
+ h2_ihash_clear(m->sresume);
+ }
+
+ leave_mutex(m, acquired);
+ return ctx.status;
+ }
+ return status;
+}
+
+static void output_produced(void *ctx, h2_bucket_beam *beam, apr_off_t bytes)
+{
+ h2_mplx *m = ctx;
+ apr_status_t status;
+ h2_stream *stream;
+ h2_task *task;
+ int acquired;
+
+ AP_DEBUG_ASSERT(m);
+ if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
+ stream = h2_ihash_get(m->streams, beam->id);
+ if (stream && h2_stream_is_suspended(stream)) {
+ h2_ihash_add(m->sresume, stream);
+ task = h2_ihash_get(m->tasks, stream->id);
+ if (task && task->output.beam) {
+ h2_beam_on_produced(task->output.beam, NULL, NULL);
+ }
+ have_out_data_for(m, beam->id);
+ }
+ leave_mutex(m, acquired);
+ }
+}
+
+apr_status_t h2_mplx_suspend_stream(h2_mplx *m, int stream_id)
+{
+ apr_status_t status;
+ h2_stream *stream;
+ h2_task *task;
+ int acquired;
+
+ AP_DEBUG_ASSERT(m);
+ if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
+ stream = h2_ihash_get(m->streams, stream_id);
+ if (stream && !h2_stream_is_suspended(stream)) {
+ h2_stream_set_suspended(stream, 1);
+ task = h2_ihash_get(m->tasks, stream->id);
+ if (task && task->output.beam && h2_beam_empty(task->output.beam)) {
+ /* register callback so that we can resume on new output */
+ h2_beam_on_produced(task->output.beam, output_produced, m);
+ }
+ else {
+ /* if the beam got data in the meantime, add this to the to-be
+ * resumed streams right away. */
+ h2_ihash_add(m->sresume, stream);
+ }
+ }
+ leave_mutex(m, acquired);
+ }
+ return status;
+}
static void dispatch_event(h2_session *session, h2_session_event_t ev,
int err, const char *msg);
+apr_status_t h2_session_stream_done(h2_session *session, h2_stream *stream)
+{
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c,
+ "h2_stream(%ld-%d): EOS bucket cleanup -> done",
+ session->id, stream->id);
+ h2_ihash_remove(session->streams, stream->id);
+ h2_mplx_stream_done(session->mplx, stream);
+
+ dispatch_event(session, H2_SESSION_EV_STREAM_DONE, 0, NULL);
+ return APR_SUCCESS;
+}
+
typedef struct stream_sel_ctx {
h2_session *session;
h2_stream *candidate;
stream = h2_stream_open(stream_id, stream_pool, session,
initiated_on, req);
- ++session->unanswered_streams;
nghttp2_session_set_stream_user_data(session->ngh2, stream_id, stream);
h2_ihash_add(session->streams, stream);
return status;
}
-typedef struct {
- h2_session *session;
- int resume_count;
-} resume_ctx;
-
-static int resume_on_data(void *ctx, void *val)
-{
- h2_stream *stream = val;
- resume_ctx *rctx = (resume_ctx*)ctx;
- h2_session *session = rctx->session;
- AP_DEBUG_ASSERT(session);
- AP_DEBUG_ASSERT(stream);
-
- if (h2_stream_is_suspended(stream)) {
- apr_status_t status;
- apr_off_t len = -1;
- int eos;
-
- status = h2_stream_out_prepare(stream, &len, &eos);
- if (status == APR_SUCCESS) {
- int rv;
- h2_stream_set_suspended(stream, 0);
- ++rctx->resume_count;
-
- rv = nghttp2_session_resume_data(session->ngh2, stream->id);
- ap_log_cerror(APLOG_MARK, nghttp2_is_fatal(rv)?
- APLOG_ERR : APLOG_DEBUG, 0, session->c,
- APLOGNO(02936)
- "h2_stream(%ld-%d): resuming %s, len=%ld, eos=%d",
- session->id, stream->id,
- rv? nghttp2_strerror(rv) : "", (long)len, eos);
- }
- }
- return 1;
-}
-
-static int h2_session_resume_streams_with_data(h2_session *session)
-{
- AP_DEBUG_ASSERT(session);
- if (session->open_streams && !session->mplx->aborted) {
- resume_ctx ctx;
- ctx.session = session;
- ctx.resume_count = 0;
-
- /* Resume all streams where we have data in the out queue and
- * which had been suspended before. */
- h2_ihash_iter(session->streams, resume_on_data, &ctx);
- return ctx.resume_count;
- }
- return 0;
-}
-
static ssize_t stream_data_cb(nghttp2_session *ng2s,
int32_t stream_id,
uint8_t *buf,
* it. Remember at our h2_stream that we need to do this.
*/
nread = 0;
- h2_stream_set_suspended(stream, 1);
+ h2_mplx_suspend_stream(session->mplx, stream->id);
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03071)
"h2_stream(%ld-%d): suspending",
session->id, (int)stream_id);
size_t offset;
} nvctx_t;
-/**
- * Start submitting the response to a stream request. This is possible
- * once we have all the response headers. The response body will be
- * read by the session using the callback we supply.
- */
-static apr_status_t submit_response(h2_session *session, h2_stream *stream)
-{
- apr_status_t status = APR_SUCCESS;
- h2_response *response = h2_stream_get_response(stream);
- int rv = 0;
- AP_DEBUG_ASSERT(session);
- AP_DEBUG_ASSERT(stream);
- AP_DEBUG_ASSERT(response || stream->rst_error);
-
- if (stream->submitted) {
- rv = NGHTTP2_PROTOCOL_ERROR;
- }
- else if (response && response->headers) {
- nghttp2_data_provider provider, *pprovider = NULL;
- h2_ngheader *ngh;
- const h2_priority *prio;
-
- ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03073)
- "h2_stream(%ld-%d): submit response %d, REMOTE_WINDOW_SIZE=%u",
- session->id, stream->id, response->http_status,
- (unsigned int)nghttp2_session_get_stream_remote_window_size(session->ngh2, stream->id));
-
- if (response->content_length != 0) {
- memset(&provider, 0, sizeof(provider));
- provider.source.fd = stream->id;
- provider.read_callback = stream_data_cb;
- pprovider = &provider;
- }
-
- /* If this stream is not a pushed one itself,
- * and HTTP/2 server push is enabled here,
- * and the response is in the range 200-299 *),
- * and the remote side has pushing enabled,
- * -> find and perform any pushes on this stream
- * *before* we submit the stream response itself.
- * This helps clients avoid opening new streams on Link
- * headers that get pushed right afterwards.
- *
- * *) the response code is relevant, as we do not want to
- * make pushes on 401 or 403 codes, neiterh on 301/302
- * and friends. And if we see a 304, we do not push either
- * as the client, having this resource in its cache, might
- * also have the pushed ones as well.
- */
- if (stream->request && !stream->request->initiated_on
- && H2_HTTP_2XX(response->http_status)
- && h2_session_push_enabled(session)) {
-
- h2_stream_submit_pushes(stream);
- }
-
- prio = h2_stream_get_priority(stream);
- if (prio) {
- h2_session_set_prio(session, stream, prio);
- /* no showstopper if that fails for some reason */
- }
-
- ngh = h2_util_ngheader_make_res(stream->pool, response->http_status,
- response->headers);
- rv = nghttp2_submit_response(session->ngh2, response->stream_id,
- ngh->nv, ngh->nvlen, pprovider);
- }
- else {
- int err = H2_STREAM_RST(stream, H2_ERR_PROTOCOL_ERROR);
-
- ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03074)
- "h2_stream(%ld-%d): RST_STREAM, err=%d",
- session->id, stream->id, err);
-
- rv = nghttp2_submit_rst_stream(session->ngh2, NGHTTP2_FLAG_NONE,
- stream->id, err);
- }
-
- stream->submitted = 1;
- --session->unanswered_streams;
- if (stream->request && stream->request->initiated_on) {
- ++session->pushes_submitted;
- }
- else {
- ++session->responses_submitted;
- }
-
- if (nghttp2_is_fatal(rv)) {
- status = APR_EGENERAL;
- dispatch_event(session, H2_SESSION_EV_PROTO_ERROR, rv, nghttp2_strerror(rv));
- ap_log_cerror(APLOG_MARK, APLOG_ERR, status, session->c,
- APLOGNO(02940) "submit_response: %s",
- nghttp2_strerror(rv));
- }
-
- return status;
-}
-
struct h2_stream *h2_session_push(h2_session *session, h2_stream *is,
h2_push *push)
{
return status;
}
-apr_status_t h2_session_stream_done(h2_session *session, h2_stream *stream)
-{
- ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c,
- "h2_stream(%ld-%d): EOS bucket cleanup -> done",
- session->id, stream->id);
- h2_ihash_remove(session->streams, stream->id);
- --session->unanswered_streams;
- h2_mplx_stream_done(session->mplx, stream);
-
- dispatch_event(session, H2_SESSION_EV_STREAM_DONE, 0, NULL);
- return APR_SUCCESS;
-}
-
int h2_session_push_enabled(h2_session *session)
{
/* iff we can and they can and want */
if (socket) {
apr_socket_timeout_set(socket, saved_timeout);
}
+ session->have_written = 1;
if (rv != 0) {
if (nghttp2_is_fatal(rv)) {
dispatch_event(session, H2_SESSION_EV_PROTO_ERROR, rv, nghttp2_strerror(rv));
return APR_SUCCESS;
}
+/**
+ * A stream was resumed as new output data arrived.
+ */
+static apr_status_t on_stream_resume(void *ctx, int stream_id)
+{
+ h2_session *session = ctx;
+ h2_stream *stream = get_stream(session, stream_id);
+ apr_status_t status = APR_SUCCESS;
+
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c,
+ "h2_stream(%ld-%d): on_resume", session->id, stream_id);
+ if (stream) {
+ int rv = nghttp2_session_resume_data(session->ngh2, stream_id);
+ session->have_written = 1;
+ ap_log_cerror(APLOG_MARK, nghttp2_is_fatal(rv)?
+ APLOG_ERR : APLOG_DEBUG, 0, session->c,
+ APLOGNO(02936)
+ "h2_stream(%ld-%d): resuming %s",
+ session->id, stream->id, rv? nghttp2_strerror(rv) : "");
+ }
+ return status;
+}
+
+/**
+ * A response for the stream is ready.
+ */
+static apr_status_t on_stream_response(void *ctx, int stream_id)
+{
+ h2_session *session = ctx;
+ h2_stream *stream = get_stream(session, stream_id);
+ apr_status_t status = APR_SUCCESS;
+ h2_response *response;
+ int rv = 0;
+
+ AP_DEBUG_ASSERT(session);
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c,
+ "h2_stream(%ld-%d): on_response", session->id, stream_id);
+ if (!stream) {
+ return APR_NOTFOUND;
+ }
+
+ response = h2_stream_get_response(stream);
+ AP_DEBUG_ASSERT(response || stream->rst_error);
+
+ if (stream->submitted) {
+ rv = NGHTTP2_PROTOCOL_ERROR;
+ }
+ else if (response && response->headers) {
+ nghttp2_data_provider provider, *pprovider = NULL;
+ h2_ngheader *ngh;
+ const h2_priority *prio;
+
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03073)
+ "h2_stream(%ld-%d): submit response %d, REMOTE_WINDOW_SIZE=%u",
+ session->id, stream->id, response->http_status,
+ (unsigned int)nghttp2_session_get_stream_remote_window_size(session->ngh2, stream->id));
+
+ if (response->content_length != 0) {
+ memset(&provider, 0, sizeof(provider));
+ provider.source.fd = stream->id;
+ provider.read_callback = stream_data_cb;
+ pprovider = &provider;
+ }
+
+ /* If this stream is not a pushed one itself,
+ * and HTTP/2 server push is enabled here,
+ * and the response is in the range 200-299 *),
+ * and the remote side has pushing enabled,
+ * -> find and perform any pushes on this stream
+ * *before* we submit the stream response itself.
+ * This helps clients avoid opening new streams on Link
+ * headers that get pushed right afterwards.
+ *
+ * *) the response code is relevant, as we do not want to
+ * make pushes on 401 or 403 codes, neiterh on 301/302
+ * and friends. And if we see a 304, we do not push either
+ * as the client, having this resource in its cache, might
+ * also have the pushed ones as well.
+ */
+ if (stream->request && !stream->request->initiated_on
+ && H2_HTTP_2XX(response->http_status)
+ && h2_session_push_enabled(session)) {
+
+ h2_stream_submit_pushes(stream);
+ }
+
+ prio = h2_stream_get_priority(stream);
+ if (prio) {
+ h2_session_set_prio(session, stream, prio);
+ /* no showstopper if that fails for some reason */
+ }
+
+ ngh = h2_util_ngheader_make_res(stream->pool, response->http_status,
+ response->headers);
+ rv = nghttp2_submit_response(session->ngh2, response->stream_id,
+ ngh->nv, ngh->nvlen, pprovider);
+ }
+ else {
+ int err = H2_STREAM_RST(stream, H2_ERR_PROTOCOL_ERROR);
+
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03074)
+ "h2_stream(%ld-%d): RST_STREAM, err=%d",
+ session->id, stream->id, err);
+
+ rv = nghttp2_submit_rst_stream(session->ngh2, NGHTTP2_FLAG_NONE,
+ stream->id, err);
+ }
+
+ stream->submitted = 1;
+ session->have_written = 1;
+
+ if (stream->request && stream->request->initiated_on) {
+ ++session->pushes_submitted;
+ }
+ else {
+ ++session->responses_submitted;
+ }
+
+ if (nghttp2_is_fatal(rv)) {
+ status = APR_EGENERAL;
+ dispatch_event(session, H2_SESSION_EV_PROTO_ERROR, rv, nghttp2_strerror(rv));
+ ap_log_cerror(APLOG_MARK, APLOG_ERR, status, session->c,
+ APLOGNO(02940) "submit_response: %s",
+ nghttp2_strerror(rv));
+ }
+
+ ++session->unsent_submits;
+
+ /* Unsent push promises are written immediately, as nghttp2
+ * 1.5.0 realizes internal stream data structures only on
+ * send and we might need them for other submits.
+ * Also, to conserve memory, we send at least every 10 submits
+ * so that nghttp2 does not buffer all outbound items too
+ * long.
+ */
+ if (status == APR_SUCCESS
+ && (session->unsent_promises || session->unsent_submits > 10)) {
+ status = h2_session_send(session);
+ }
+ return status;
+}
+
static apr_status_t h2_session_receive(void *ctx, const char *data,
apr_size_t len, apr_size_t *readlen)
{
return has_suspended;
}
-static apr_status_t h2_session_submit(h2_session *session)
-{
- apr_status_t status = APR_EAGAIN;
- h2_stream *stream;
-
- if (has_unsubmitted_streams(session)) {
- /* If we have responses ready, submit them now. */
- while ((stream = h2_mplx_next_submit(session->mplx))) {
- status = submit_response(session, stream);
- ++session->unsent_submits;
-
- /* Unsent push promises are written immediately, as nghttp2
- * 1.5.0 realizes internal stream data structures only on
- * send and we might need them for other submits.
- * Also, to conserve memory, we send at least every 10 submits
- * so that nghttp2 does not buffer all outbound items too
- * long.
- */
- if (status == APR_SUCCESS
- && (session->unsent_promises || session->unsent_submits > 10)) {
- status = h2_session_send(session);
- if (status != APR_SUCCESS) {
- break;
- }
- }
- }
- }
- return status;
-}
-
static const char *StateNames[] = {
"INIT", /* H2_SESSION_ST_INIT */
"DONE", /* H2_SESSION_ST_DONE */
case H2_SESSION_ST_BUSY:
case H2_SESSION_ST_LOCAL_SHUTDOWN:
case H2_SESSION_ST_REMOTE_SHUTDOWN:
- /* nothing for input and output to do. If we remain
- * in this state, we go into a tight loop and suck up
- * CPU cycles. Ideally, we'd like to do a blocking read, but that
- * is not possible if we have scheduled tasks and wait
- * for them to produce something. */
+ /* Nothing to READ, nothing to WRITE on the master connection.
+ * Possible causes:
+ * - we wait for the client to send us sth
+ * - we wait for started tasks to produce output
+ * - we have finished all streams and the client has sent GO_AWAY
+ */
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c,
"h2_session(%ld): NO_IO event, %d streams open",
session->id, session->open_streams);
- if (!session->open_streams) {
- if (!is_accepting_streams(session)) {
- /* We are no longer accepting new streams and have
- * finished processing existing ones. Time to leave. */
- h2_session_shutdown(session, arg, msg, 0);
- transit(session, "no io", H2_SESSION_ST_DONE);
+ if (session->open_streams > 0) {
+ if (has_unsubmitted_streams(session)
+ || has_suspended_streams(session)) {
+ /* waiting for at least one stream to produce data */
+ transit(session, "no io", H2_SESSION_ST_WAIT);
}
else {
- apr_time_t now = apr_time_now();
- /* When we have no streams, no task event are possible,
- * switch to blocking reads */
- transit(session, "no io (keepalive)", H2_SESSION_ST_IDLE);
- session->idle_until = (session->remote.emitted_count?
- session->s->keep_alive_timeout :
- session->s->timeout) + now;
- session->keep_sync_until = now + apr_time_from_sec(1);
+ /* we have streams open, and all are submitted and none
+ * is suspended. The only thing keeping us from WRITEing
+ * more must be the flow control.
+ * This means we only wait for WINDOW_UPDATE from the
+ * client and can block on READ. */
+ transit(session, "no io (flow wait)", H2_SESSION_ST_IDLE);
+ session->idle_until = apr_time_now() + session->s->timeout;
+ session->keep_sync_until = session->idle_until;
+ /* Make sure we have flushed all previously written output
+ * so that the client will react. */
+ if (h2_conn_io_flush(&session->io) != APR_SUCCESS) {
+ dispatch_event(session, H2_SESSION_EV_CONN_ERROR, 0, NULL);
+ return;
+ }
}
}
- else if (!has_unsubmitted_streams(session)
- && !has_suspended_streams(session)) {
- transit(session, "no io (flow wait)", H2_SESSION_ST_IDLE);
- session->idle_until = apr_time_now() + session->s->timeout;
- session->keep_sync_until = session->idle_until;
- /* none of our streams is waiting for a response or
- * new output data from task processing,
- * switch to blocking reads. We are probably waiting on
- * window updates. */
- if (h2_conn_io_flush(&session->io) != APR_SUCCESS) {
- dispatch_event(session, H2_SESSION_EV_CONN_ERROR, 0, NULL);
- return;
- }
+ else if (is_accepting_streams(session)) {
+ /* When we have no streams, but accept new, switch to idle */
+ apr_time_t now = apr_time_now();
+ transit(session, "no io (keepalive)", H2_SESSION_ST_IDLE);
+ session->idle_until = (session->remote.emitted_count?
+ session->s->keep_alive_timeout :
+ session->s->timeout) + now;
+ session->keep_sync_until = now + apr_time_from_sec(1);
}
else {
- /* Unable to do blocking reads, as we wait on events from
- * task processing in other threads. Do a busy wait with
- * backoff timer. */
- transit(session, "no io", H2_SESSION_ST_WAIT);
+ /* We are no longer accepting new streams and there are
+ * none left. Time to leave. */
+ h2_session_shutdown(session, arg, msg, 0);
+ transit(session, "no io", H2_SESSION_ST_DONE);
}
break;
default:
static void h2_session_ev_stream_done(h2_session *session, int arg, const char *msg)
{
--session->open_streams;
- if (session->open_streams <= 0) {
- }
switch (session->state) {
case H2_SESSION_ST_IDLE:
if (session->open_streams == 0) {
{
apr_status_t status = APR_SUCCESS;
conn_rec *c = session->c;
- int rv, have_written, have_read, mpm_state;
+ int rv, mpm_state, trace = APLOGctrace3(c);
- ap_log_cerror( APLOG_MARK, APLOG_TRACE1, status, c,
- "h2_session(%ld): process start, async=%d", session->id, async);
+ if (trace) {
+ ap_log_cerror( APLOG_MARK, APLOG_TRACE3, status, c,
+ "h2_session(%ld): process start, async=%d",
+ session->id, async);
+ }
if (c->cs) {
c->cs->state = CONN_STATE_WRITE_COMPLETION;
}
while (1) {
- have_read = have_written = 0;
+ trace = APLOGctrace3(c);
+ session->have_read = session->have_written = 0;
if (!ap_mpm_query(AP_MPMQ_MPM_STATE, &mpm_state)) {
if (mpm_state == AP_MPMQ_STOPPING) {
/* make certain, we send everything before we idle */
if (!session->keep_sync_until && async && !session->open_streams
&& !session->r && session->remote.emitted_count) {
- ap_log_cerror( APLOG_MARK, APLOG_TRACE3, status, c,
- "h2_session(%ld): async idle, nonblock read, "
- "%d streams open", session->id,
- session->open_streams);
+ if (trace) {
+ ap_log_cerror( APLOG_MARK, APLOG_TRACE3, status, c,
+ "h2_session(%ld): async idle, nonblock read, "
+ "%d streams open", session->id,
+ session->open_streams);
+ }
/* We do not return to the async mpm immediately, since under
* load, mpms show the tendency to throw keep_alive connections
* away very rapidly.
status = h2_session_read(session, 0);
if (status == APR_SUCCESS) {
- have_read = 1;
+ session->have_read = 1;
dispatch_event(session, H2_SESSION_EV_DATA_READ, 0, NULL);
}
else if (APR_STATUS_IS_EAGAIN(status) || APR_STATUS_IS_TIMEUP(status)) {
}
}
else {
- ap_log_cerror( APLOG_MARK, APLOG_TRACE3, status, c,
- "h2_session(%ld): sync idle, stutter 1-sec, "
- "%d streams open", session->id,
- session->open_streams);
+ if (trace) {
+ ap_log_cerror( APLOG_MARK, APLOG_TRACE3, status, c,
+ "h2_session(%ld): sync idle, stutter 1-sec, "
+ "%d streams open", session->id,
+ session->open_streams);
+ }
/* We wait in smaller increments, using a 1 second timeout.
* That gives us the chance to check for MPMQ_STOPPING often.
*/
h2_filter_cin_timeout_set(session->cin, apr_time_from_sec(1));
status = h2_session_read(session, 1);
if (status == APR_SUCCESS) {
- have_read = 1;
+ session->have_read = 1;
dispatch_event(session, H2_SESSION_EV_DATA_READ, 0, NULL);
}
else if (status == APR_EAGAIN) {
session->keep_sync_until = 0;
}
if (now > session->idle_until) {
- ap_log_cerror( APLOG_MARK, APLOG_TRACE3, status, c,
- "h2_session(%ld): keepalive timeout",
- session->id);
+ if (trace) {
+ ap_log_cerror( APLOG_MARK, APLOG_TRACE3, status, c,
+ "h2_session(%ld): keepalive timeout",
+ session->id);
+ }
dispatch_event(session, H2_SESSION_EV_CONN_TIMEOUT, 0, "timeout");
}
- else {
+ else if (trace) {
ap_log_cerror( APLOG_MARK, APLOG_TRACE3, status, c,
"h2_session(%ld): keepalive, %f sec left",
session->id, (session->idle_until - now) / 1000000.0f);
/* continue reading handling */
}
else {
- ap_log_cerror( APLOG_MARK, APLOG_TRACE1, status, c,
- "h2_session(%ld): idle(1 sec timeout) "
- "read failed", session->id);
+ if (trace) {
+ ap_log_cerror( APLOG_MARK, APLOG_TRACE3, status, c,
+ "h2_session(%ld): idle(1 sec timeout) "
+ "read failed", session->id);
+ }
dispatch_event(session, H2_SESSION_EV_CONN_ERROR, 0, "error");
}
}
h2_filter_cin_timeout_set(session->cin, session->s->timeout);
status = h2_session_read(session, 0);
if (status == APR_SUCCESS) {
- have_read = 1;
+ session->have_read = 1;
dispatch_event(session, H2_SESSION_EV_DATA_READ, 0, NULL);
}
else if (status == APR_EAGAIN) {
}
}
- if (session->open_streams) {
- /* resume any streams with output data */
- h2_session_resume_streams_with_data(session);
- /* Submit any responses/push_promises that are ready */
- status = h2_session_submit(session);
- if (status == APR_SUCCESS) {
- have_written = 1;
- }
- else if (status != APR_EAGAIN) {
- dispatch_event(session, H2_SESSION_EV_CONN_ERROR,
- H2_ERR_INTERNAL_ERROR, "submit error");
- break;
- }
- /* send out window updates for our inputs */
- status = h2_mplx_in_update_windows(session->mplx);
- if (status != APR_SUCCESS) {
- dispatch_event(session, H2_SESSION_EV_CONN_ERROR,
- H2_ERR_INTERNAL_ERROR,
- "window update error");
- break;
- }
+ /* trigger window updates, stream resumes and submits */
+ status = h2_mplx_dispatch_master_events(session->mplx,
+ on_stream_resume,
+ on_stream_response,
+ session);
+ if (status != APR_SUCCESS) {
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE3, status, c,
+ "h2_session(%ld): dispatch error",
+ session->id);
+ dispatch_event(session, H2_SESSION_EV_CONN_ERROR,
+ H2_ERR_INTERNAL_ERROR,
+ "dispatch error");
+ break;
}
if (nghttp2_session_want_write(session->ngh2)) {
ap_update_child_status(session->c->sbh, SERVER_BUSY_WRITE, NULL);
status = h2_session_send(session);
- if (status == APR_SUCCESS) {
- have_written = 1;
- }
- else {
+ if (status != APR_SUCCESS) {
dispatch_event(session, H2_SESSION_EV_CONN_ERROR,
- H2_ERR_INTERNAL_ERROR, "writing");
+ H2_ERR_INTERNAL_ERROR, "writing");
break;
}
}
- if (have_read || have_written) {
+ if (session->have_read || session->have_written) {
if (session->wait_us) {
session->wait_us = 0;
}
}
else if ((apr_time_now() - session->start_wait) >= session->s->timeout) {
/* waited long enough */
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, APR_TIMEUP, c,
- "h2_session: wait for data");
+ if (trace) {
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE3, APR_TIMEUP, c,
+ "h2_session: wait for data");
+ }
dispatch_event(session, H2_SESSION_EV_CONN_TIMEOUT, 0, "timeout");
break;
}
session->wait_us = H2MIN(session->wait_us*2, MAX_WAIT_MICROS);
}
- if (APLOGctrace1(c)) {
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c,
+ if (trace) {
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, c,
"h2_session: wait for data, %ld micros",
(long)session->wait_us);
}
}
out:
- ap_log_cerror( APLOG_MARK, APLOG_TRACE1, status, c,
- "h2_session(%ld): [%s] process returns",
- session->id, state_name(session->state));
+ if (trace) {
+ ap_log_cerror( APLOG_MARK, APLOG_TRACE3, status, c,
+ "h2_session(%ld): [%s] process returns",
+ session->id, state_name(session->state));
+ }
if ((session->state != H2_SESSION_ST_DONE)
&& (APR_STATUS_IS_EOF(status)
return 1;
}
-static apr_status_t input_append_eos(h2_task *task, request_rec *r)
+static void make_chunk(h2_task *task, apr_bucket_brigade *bb,
+ apr_bucket *first, apr_uint64_t chunk_len,
+ apr_bucket *tail)
+{
+ /* Surround the buckets [first, tail[ with new buckets carrying the
+ * HTTP/1.1 chunked encoding format. If tail is NULL, the chunk extends
+ * to the end of the brigade. */
+ char buffer[128];
+ apr_bucket *c;
+ int len;
+
+ len = apr_snprintf(buffer, H2_ALEN(buffer),
+ "%"APR_UINT64_T_HEX_FMT"\r\n", chunk_len);
+ c = apr_bucket_heap_create(buffer, len, NULL, bb->bucket_alloc);
+ APR_BUCKET_INSERT_BEFORE(first, c);
+ c = apr_bucket_heap_create("\r\n", 2, NULL, bb->bucket_alloc);
+ if (tail) {
+ APR_BUCKET_INSERT_BEFORE(tail, c);
+ }
+ else {
+ APR_BRIGADE_INSERT_TAIL(bb, c);
+ }
+}
+
+static apr_status_t input_handle_eos(h2_task *task, request_rec *r,
+ apr_bucket *b)
{
apr_status_t status = APR_SUCCESS;
apr_bucket_brigade *bb = task->input.bb;
apr_table_t *t = task->request->trailers;
if (task->input.chunked) {
+ task->input.tmp = apr_brigade_split_ex(bb, b, task->input.tmp);
if (t && !apr_is_empty_table(t)) {
status = apr_brigade_puts(bb, NULL, NULL, "0\r\n");
apr_table_do(input_ser_header, task, t, NULL);
else {
status = apr_brigade_puts(bb, NULL, NULL, "0\r\n\r\n");
}
+ APR_BRIGADE_CONCAT(bb, task->input.tmp);
}
else if (r && t && !apr_is_empty_table(t)){
/* trailers passed in directly. */
apr_table_overlap(r->trailers_in, t, APR_OVERLAP_TABLES_SET);
}
task->input.eos_written = 1;
+ return status;
+}
+
+static apr_status_t input_append_eos(h2_task *task, request_rec *r)
+{
+ apr_status_t status = APR_SUCCESS;
+ apr_bucket_brigade *bb = task->input.bb;
+ apr_table_t *t = task->request->trailers;
+
+ if (task->input.chunked) {
+ if (t && !apr_is_empty_table(t)) {
+ status = apr_brigade_puts(bb, NULL, NULL, "0\r\n");
+ apr_table_do(input_ser_header, task, t, NULL);
+ status = apr_brigade_puts(bb, NULL, NULL, "\r\n");
+ }
+ else {
+ status = apr_brigade_puts(bb, NULL, NULL, "0\r\n\r\n");
+ }
+ }
+ else if (r && t && !apr_is_empty_table(t)){
+ /* trailers passed in directly. */
+ apr_table_overlap(r->trailers_in, t, APR_OVERLAP_TABLES_SET);
+ }
APR_BRIGADE_INSERT_TAIL(bb, apr_bucket_eos_create(bb->bucket_alloc));
+ task->input.eos_written = 1;
return status;
}
apr_read_type_e block, apr_off_t readbytes)
{
apr_status_t status = APR_SUCCESS;
- apr_bucket *b, *next;
+ apr_bucket *b, *next, *first_data;
apr_off_t bblen = 0;
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, f->c,
return APR_ECONNABORTED;
}
- if (task->input.bb) {
- /* Cleanup brigades from those nasty 0 length non-meta buckets
- * that apr_brigade_split_line() sometimes produces. */
- for (b = APR_BRIGADE_FIRST(task->input.bb);
- b != APR_BRIGADE_SENTINEL(task->input.bb); b = next) {
- next = APR_BUCKET_NEXT(b);
- if (b->length == 0 && !APR_BUCKET_IS_METADATA(b)) {
- apr_bucket_delete(b);
- }
+ if (!task->input.bb) {
+ if (!task->input.eos_written) {
+ input_append_eos(task, f->r);
}
- apr_brigade_length(task->input.bb, 0, &bblen);
+ return APR_EOF;
}
- if (bblen == 0) {
- if (task->input.eos_written) {
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, APR_EOF, f->c,
- "h2_task(%s): read no data", task->id);
- return APR_EOF;
- }
- else if (task->input.eos) {
- input_append_eos(task, f->r);
- }
+ /* Cleanup brigades from those nasty 0 length non-meta buckets
+ * that apr_brigade_split_line() sometimes produces. */
+ for (b = APR_BRIGADE_FIRST(task->input.bb);
+ b != APR_BRIGADE_SENTINEL(task->input.bb); b = next) {
+ next = APR_BUCKET_NEXT(b);
+ if (b->length == 0 && !APR_BUCKET_IS_METADATA(b)) {
+ apr_bucket_delete(b);
+ }
}
while (APR_BRIGADE_EMPTY(task->input.bb)) {
+ if (task->input.eos_written) {
+ return APR_EOF;
+ }
+
/* Get more input data for our request. */
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, f->c,
"h2_task(%s): get more data from mplx, block=%d, "
return status;
}
- apr_brigade_length(task->input.bb, 0, &bblen);
- if (bblen > 0 && task->input.chunked) {
- /* need to add chunks since request processing expects it */
- char buffer[128];
- apr_bucket *b;
- int len;
-
- len = apr_snprintf(buffer, H2_ALEN(buffer), "%lx\r\n",
- (unsigned long)bblen);
- b = apr_bucket_heap_create(buffer, len, NULL,
- task->input.bb->bucket_alloc);
- APR_BRIGADE_INSERT_HEAD(task->input.bb, b);
- status = apr_brigade_puts(task->input.bb, NULL, NULL, "\r\n");
- }
-
- if (h2_util_has_eos(task->input.bb, -1)) {
- task->input.eos = 1;
- }
-
- if (task->input.eos && !task->input.eos_written) {
- input_append_eos(task, f->r);
+ /* Inspect the buckets received, detect EOS and apply
+ * chunked encoding if necessary */
+ h2_util_bb_log(f->c, task->stream_id, APLOG_TRACE2,
+ "input.beam recv raw", task->input.bb);
+ first_data = NULL;
+ bblen = 0;
+ for (b = APR_BRIGADE_FIRST(task->input.bb);
+ b != APR_BRIGADE_SENTINEL(task->input.bb); b = next) {
+ next = APR_BUCKET_NEXT(b);
+ if (APR_BUCKET_IS_METADATA(b)) {
+ if (first_data && task->input.chunked) {
+ make_chunk(task, task->input.bb, first_data, bblen, b);
+ first_data = NULL;
+ bblen = 0;
+ }
+ if (APR_BUCKET_IS_EOS(b)) {
+ task->input.eos = 1;
+ input_handle_eos(task, f->r, b);
+ h2_util_bb_log(f->c, task->stream_id, APLOG_TRACE2,
+ "input.bb after handle eos",
+ task->input.bb);
+ }
+ }
+ else if (b->length == 0) {
+ apr_bucket_delete(b);
+ }
+ else {
+ if (!first_data) {
+ first_data = b;
+ }
+ bblen += b->length;
+ }
}
+ if (first_data && task->input.chunked) {
+ make_chunk(task, task->input.bb, first_data, bblen, NULL);
+ }
if (h2_task_logio_add_bytes_in) {
h2_task_logio_add_bytes_in(f->c, bblen);
}
}
+ if (!task->input.eos_written && task->input.eos) {
+ input_append_eos(task, f->r);
+ }
+
h2_util_bb_log(f->c, task->stream_id, APLOG_TRACE2,
"task_input.bb", task->input.bb);