static apr_status_t m_be_annoyed(h2_mplx *m);
static apr_status_t mplx_pollset_create(h2_mplx *m);
-static apr_status_t mplx_pollset_add(h2_mplx *m, h2_conn_ctx_t *conn_ctx);
-static apr_status_t mplx_pollset_remove(h2_mplx *m, h2_conn_ctx_t *conn_ctx);
static apr_status_t mplx_pollset_poll(h2_mplx *m, apr_interval_time_t timeout,
stream_ev_callback *on_stream_input,
stream_ev_callback *on_stream_output,
"nghttp2: could not create pollset");
goto failure;
}
- m->streams_to_poll = apr_array_make(m->pool, 10, sizeof(h2_stream*));
m->streams_ev_in = apr_array_make(m->pool, 10, sizeof(h2_stream*));
m->streams_ev_out = apr_array_make(m->pool, 10, sizeof(h2_stream*));
m->streams_input_read = h2_iq_create(m->pool, 10);
+ m->streams_output_written = h2_iq_create(m->pool, 10);
status = apr_thread_mutex_create(&m->poll_lock, APR_THREAD_MUTEX_DEFAULT,
m->pool);
if (APR_SUCCESS != status) goto failure;
-#if !H2_POLL_STREAMS
- m->streams_output_written = h2_iq_create(m->pool, 10);
-#endif
conn_ctx = h2_conn_ctx_get(m->c1);
- mplx_pollset_add(m, conn_ctx);
+ if (conn_ctx->pfd.reqevents) {
+ apr_pollset_add(m->pollset, &conn_ctx->pfd);
+ }
m->scratch_r = apr_pcalloc(m->pool, sizeof(*m->scratch_r));
m->max_spare_transits = 3;
if (stream->c2) {
conn_rec *c2 = stream->c2;
h2_conn_ctx_t *c2_ctx = h2_conn_ctx_get(c2);
- apr_status_t rv;
stream->c2 = NULL;
ap_assert(c2_ctx);
- rv = mplx_pollset_remove(m, c2_ctx);
- if (APR_SUCCESS != rv) {
- ap_log_cerror(APLOG_MARK, APLOG_INFO, rv, m->c1,
- "h2_mplx(%ld-%d): pollset_remove %d on purge",
- m->id, stream->id, c2_ctx->stream_id);
- }
-
h2_c2_destroy(c2);
if (c2_ctx->transit) {
c2_transit_recycle(m, c2_ctx->transit);
h2_conn_ctx_t *conn_ctx = h2_conn_ctx_get(c);
(void)beam;
- if (conn_ctx && conn_ctx->stream_id && conn_ctx->pipe_in_prod[H2_PIPE_IN]) {
- apr_file_putc(1, conn_ctx->pipe_in_prod[H2_PIPE_IN]);
+ if (conn_ctx && conn_ctx->stream_id && conn_ctx->pipe_in[H2_PIPE_IN]) {
+ apr_file_putc(1, conn_ctx->pipe_in[H2_PIPE_IN]);
}
}
h2_conn_ctx_t *conn_ctx = h2_conn_ctx_get(c);
if (conn_ctx && conn_ctx->stream_id) {
- if (conn_ctx->pipe_out_prod[H2_PIPE_IN]) {
- apr_file_putc(1, conn_ctx->pipe_out_prod[H2_PIPE_IN]);
- }
-#if !H2_POLL_STREAMS
- else {
- apr_thread_mutex_lock(conn_ctx->mplx->poll_lock);
- h2_iq_append(conn_ctx->mplx->streams_output_written, conn_ctx->stream_id);
- apr_pollset_wakeup(conn_ctx->mplx->pollset);
- apr_thread_mutex_unlock(conn_ctx->mplx->poll_lock);
- }
-#endif
+ apr_thread_mutex_lock(conn_ctx->mplx->poll_lock);
+ h2_iq_append(conn_ctx->mplx->streams_output_written, conn_ctx->stream_id);
+ apr_pollset_wakeup(conn_ctx->mplx->pollset);
+ apr_thread_mutex_unlock(conn_ctx->mplx->poll_lock);
}
}
h2_beam_on_consumed(stream->input, c1_input_consumed, stream);
}
-#if H2_POLL_STREAMS
- if (!conn_ctx->pipe_out_prod[H2_PIPE_OUT]) {
- action = "create output pipe";
- rv = apr_file_pipe_create_pools(&conn_ctx->pipe_out_prod[H2_PIPE_OUT],
- &conn_ctx->pipe_out_prod[H2_PIPE_IN],
- APR_FULL_NONBLOCK,
- c2->pool, c2->pool);
- if (APR_SUCCESS != rv) goto cleanup;
- }
- conn_ctx->pfd_out_prod.desc_type = APR_POLL_FILE;
- conn_ctx->pfd_out_prod.desc.f = conn_ctx->pipe_out_prod[H2_PIPE_OUT];
- conn_ctx->pfd_out_prod.reqevents = APR_POLLIN | APR_POLLERR | APR_POLLHUP;
- conn_ctx->pfd_out_prod.client_data = conn_ctx;
-
+#if H2_USE_PIPES
if (stream->input) {
- if (!conn_ctx->pipe_in_prod[H2_PIPE_OUT]) {
+ if (!conn_ctx->pipe_in[H2_PIPE_OUT]) {
action = "create input write pipe";
- rv = apr_file_pipe_create_pools(&conn_ctx->pipe_in_prod[H2_PIPE_OUT],
- &conn_ctx->pipe_in_prod[H2_PIPE_IN],
+ rv = apr_file_pipe_create_pools(&conn_ctx->pipe_in[H2_PIPE_OUT],
+ &conn_ctx->pipe_in[H2_PIPE_IN],
APR_READ_BLOCK,
c2->pool, c2->pool);
if (APR_SUCCESS != rv) goto cleanup;
}
}
#else
- memset(&conn_ctx->pfd_out_prod, 0, sizeof(conn_ctx->pfd_out_prod));
- memset(&conn_ctx->pipe_in_prod, 0, sizeof(conn_ctx->pipe_in_prod));
+ memset(&conn_ctx->pipe_in, 0, sizeof(conn_ctx->pipe_in));
#endif
cleanup:
stream->c2 = c2;
++m->processing_count;
- APR_ARRAY_PUSH(m->streams_to_poll, h2_stream *) = stream;
- apr_pollset_wakeup(m->pollset);
cleanup:
if (APR_SUCCESS != rv && c2) {
static apr_status_t mplx_pollset_create(h2_mplx *m)
{
- int max_pdfs;
-
- /* stream0 output, pdf_out+pfd_in_consume per active streams */
- max_pdfs = 1 + 2 * H2MIN(m->processing_max, m->max_streams);
- return apr_pollset_create(&m->pollset, max_pdfs, m->pool,
+ /* stream0 output only */
+ return apr_pollset_create(&m->pollset, 1, m->pool,
APR_POLLSET_WAKEABLE);
}
-static apr_status_t mplx_pollset_add(h2_mplx *m, h2_conn_ctx_t *conn_ctx)
-{
- if (conn_ctx->pfd_out_prod.reqevents) {
- return apr_pollset_add(m->pollset, &conn_ctx->pfd_out_prod);
- }
- return APR_SUCCESS;
-}
-
-static apr_status_t mplx_pollset_remove(h2_mplx *m, h2_conn_ctx_t *conn_ctx)
-{
- apr_status_t rv = APR_SUCCESS;
-
- if (conn_ctx->pfd_out_prod.reqevents) {
- rv = apr_pollset_remove(m->pollset, &conn_ctx->pfd_out_prod);
- conn_ctx->pfd_out_prod.reqevents = 0;
- }
- return rv;
-}
-
static apr_status_t mplx_pollset_poll(h2_mplx *m, apr_interval_time_t timeout,
stream_ev_callback *on_stream_input,
stream_ev_callback *on_stream_output,
do {
/* add streams we started processing in the meantime */
- if (m->streams_to_poll->nelts) {
- for (i = 0; i < m->streams_to_poll->nelts; ++i) {
- stream = APR_ARRAY_IDX(m->streams_to_poll, i, h2_stream*);
- if (stream && stream->c2 && (conn_ctx = h2_conn_ctx_get(stream->c2))) {
- mplx_pollset_add(m, conn_ctx);
- }
- }
- apr_array_clear(m->streams_to_poll);
- }
-
apr_thread_mutex_lock(m->poll_lock);
- if (!h2_iq_empty(m->streams_input_read)) {
+ if (!h2_iq_empty(m->streams_input_read)
+ || !h2_iq_empty(m->streams_output_written)) {
while ((i = h2_iq_shift(m->streams_input_read))) {
stream = h2_ihash_get(m->streams, i);
if (stream) {
APR_ARRAY_PUSH(m->streams_ev_in, h2_stream*) = stream;
}
}
- nresults = 0;
- rv = APR_SUCCESS;
- apr_thread_mutex_unlock(m->poll_lock);
- break;
- }
-#if !H2_POLL_STREAMS
- if (!h2_iq_empty(m->streams_output_written)) {
while ((i = h2_iq_shift(m->streams_output_written))) {
stream = h2_ihash_get(m->streams, i);
if (stream) {
apr_thread_mutex_unlock(m->poll_lock);
break;
}
-#endif
apr_thread_mutex_unlock(m->poll_lock);
H2_MPLX_LEAVE(m);
pfd = &results[i];
conn_ctx = pfd->client_data;
- ap_assert(conn_ctx);
+ AP_DEBUG_ASSERT(conn_ctx);
if (conn_ctx->stream_id == 0) {
if (on_stream_input) {
APR_ARRAY_PUSH(m->streams_ev_in, h2_stream*) = m->stream0;
}
continue;
}
-
- h2_util_drain_pipe(pfd->desc.f);
- stream = h2_ihash_get(m->streams, conn_ctx->stream_id);
- if (!stream) {
- stream = h2_ihash_get(m->shold, conn_ctx->stream_id);
- if (stream) {
- /* This is normal and means that stream processing on c1 has
- * already finished to CLEANUP and c2 is not done yet */
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, rv, m->c1,
- "h2_mplx(%ld-%d): stream already in hold for poll event %hx",
- m->id, conn_ctx->stream_id, pfd->rtnevents);
- }
- else {
- h2_stream *sp = NULL;
- int j;
-
- for (j = 0; j < m->spurge->nelts; ++j) {
- sp = APR_ARRAY_IDX(m->spurge, j, h2_stream*);
- if (sp->id == conn_ctx->stream_id) {
- stream = sp;
- break;
- }
- }
-
- if (stream) {
- /* This is normal and means that stream processing on c1 has
- * already finished to CLEANUP and c2 is not done yet */
- ap_log_cerror(APLOG_MARK, APLOG_DEBUG, rv, m->c1, APLOGNO(10311)
- "h2_mplx(%ld-%d): stream already in purge for poll event %hx",
- m->id, conn_ctx->stream_id, pfd->rtnevents);
- }
- else {
- /* This should not happen. When a stream has been purged,
- * it MUST no longer appear in the pollset. Puring is done
- * outside the poll result processing. */
- ap_log_cerror(APLOG_MARK, APLOG_WARNING, rv, m->c1, APLOGNO(10312)
- "h2_mplx(%ld-%d): stream no longer known for poll event %hx"
- ", m->streams=%d, conn_ctx=%lx, fd=%lx",
- m->id, conn_ctx->stream_id, pfd->rtnevents,
- (int)h2_ihash_count(m->streams),
- (long)conn_ctx, (long)pfd->desc.f);
- h2_ihash_iter(m->streams, m_report_stream_iter, m);
- }
- }
- continue;
- }
-
- if (conn_ctx->pfd_out_prod.desc.f == pfd->desc.f) {
- /* output is available */
- ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c1,
- "[%s-%d] poll output event %hx",
- conn_ctx->id, conn_ctx->stream_id,
- pfd->rtnevents);
- APR_ARRAY_PUSH(m->streams_ev_out, h2_stream*) = stream;
- }
}
if (on_stream_input && m->streams_ev_in->nelts) {