m->q = h2_iq_create(m->pool, m->max_streams);
m->workers = workers;
- m->processing_max = workers->max_workers;
+ m->processing_max = m->max_streams;
m->processing_limit = 6; /* the original h1 max parallel connections */
m->last_mood_change = apr_time_now();
m->mood_update_interval = apr_time_from_msec(100);
if (stream->c2) {
conn_rec *c2 = stream->c2;
h2_conn_ctx_t *c2_ctx = h2_conn_ctx_get(c2);
+ h2_c2_transit *transit;
stream->c2 = NULL;
ap_assert(c2_ctx);
- h2_c2_destroy(c2);
- if (c2_ctx->transit) {
- c2_transit_recycle(m, c2_ctx->transit);
- c2_ctx->transit = NULL;
+ transit = c2_ctx->transit;
+ h2_c2_destroy(c2); /* c2_ctx is gone as well */
+ if (transit) {
+ c2_transit_recycle(m, transit);
}
}
h2_stream_destroy(stream);
}
}
+static void add_stream_poll_event(h2_mplx *m, int stream_id, h2_iqueue *q)
+{
+ apr_thread_mutex_lock(m->poll_lock);
+ if (h2_iq_append(q, stream_id) && h2_iq_count(q) == 1) {
+ /* newly added first */
+ apr_pollset_wakeup(m->pollset);
+ }
+ apr_thread_mutex_unlock(m->poll_lock);
+}
+
static void c2_beam_input_read_notify(void *ctx, h2_bucket_beam *beam)
{
conn_rec *c = ctx;
h2_conn_ctx_t *conn_ctx = h2_conn_ctx_get(c);
if (conn_ctx && conn_ctx->stream_id) {
- apr_thread_mutex_lock(conn_ctx->mplx->poll_lock);
- h2_iq_append(conn_ctx->mplx->streams_input_read, conn_ctx->stream_id);
- apr_pollset_wakeup(conn_ctx->mplx->pollset);
- apr_thread_mutex_unlock(conn_ctx->mplx->poll_lock);
+ add_stream_poll_event(conn_ctx->mplx, conn_ctx->stream_id,
+ conn_ctx->mplx->streams_input_read);
}
}
h2_conn_ctx_t *conn_ctx = h2_conn_ctx_get(c);
if (conn_ctx && conn_ctx->stream_id) {
- apr_thread_mutex_lock(conn_ctx->mplx->poll_lock);
- h2_iq_append(conn_ctx->mplx->streams_output_written, conn_ctx->stream_id);
- apr_pollset_wakeup(conn_ctx->mplx->pollset);
- apr_thread_mutex_unlock(conn_ctx->mplx->poll_lock);
+ add_stream_poll_event(conn_ctx->mplx, conn_ctx->stream_id,
+ conn_ctx->mplx->streams_output_written);
}
}
static apr_status_t h2_session_send(h2_session *session)
{
- int ngrv;
+ int ngrv, pending = 0;
apr_status_t rv = APR_SUCCESS;
while (nghttp2_session_want_write(session->ngh2)) {
ngrv = nghttp2_session_send(session->ngh2);
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c1,
"nghttp2_session_send: %d", (int)ngrv);
-
+ pending = 1;
if (ngrv != 0 && ngrv != NGHTTP2_ERR_WOULDBLOCK) {
if (nghttp2_is_fatal(ngrv)) {
h2_session_dispatch_event(session, H2_SESSION_EV_PROTO_ERROR,
goto cleanup;
}
}
+ if (h2_c1_io_needs_flush(&session->io)) {
+ rv = h2_c1_io_assure_flushed(&session->io);
+ pending = 0;
+ }
+ }
+ if (pending) {
rv = h2_c1_io_pass(&session->io);
}
cleanup:
if (!h2_session_want_send(session)) {
if (session->local.accepting) {
/* We wait for new frames on c1 only. */
- transit(session, "c1 keepalive", H2_SESSION_ST_IDLE);
+ transit(session, "all streams done", H2_SESSION_ST_IDLE);
}
else {
/* We are no longer accepting new streams.
/* Give any new incoming request a short grace period to
* arrive while we are still hot and return to the mpm
* connection handling when nothing really happened. */
- h2_mplx_c1_poll(session->mplx, apr_time_from_msec(100),
- on_stream_input, on_stream_output, session);
+ h2_c1_read(session);
if (H2_SESSION_ST_IDLE == session->state) {
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, c,
H2_SSSN_LOG(APLOGNO(10306), session,