if (task->output.beam) {
m->tx_handles_reserved +=
h2_beam_get_files_beamed(task->output.beam);
+ h2_beam_on_produced(task->output.beam, NULL, NULL);
}
slave = task->c;
return 1;
}
+static int report_stream_iter(void *ctx, void *val) {
+ h2_mplx *m = ctx;
+ h2_stream *stream = val;
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
+ "h2_mplx(%ld-%d): exists, started=%d, scheduled=%d, "
+ "submitted=%d, suspended=%d",
+ m->id, stream->id, stream->started, stream->scheduled,
+ stream->submitted, stream->suspended);
+ return 1;
+}
+
apr_status_t h2_mplx_release_and_join(h2_mplx *m, apr_thread_cond_t *wait)
{
apr_status_t status;
if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
int i, wait_secs = 5;
+
+ if (!h2_ihash_empty(m->streams) && APLOGctrace1(m->c)) {
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
+ "h2_mplx(%ld): release_join with %d streams open, "
+ "%d streams resume, %d streams ready, %d tasks",
+ m->id, (int)h2_ihash_count(m->streams),
+ (int)h2_ihash_count(m->sresume),
+ (int)h2_ihash_count(m->sready),
+ (int)h2_ihash_count(m->tasks));
+ h2_ihash_iter(m->streams, report_stream_iter, m);
+ }
/* disable WINDOW_UPDATE callbacks */
h2_mplx_set_consumed_cb(m, NULL, NULL);
purge_streams(m);
}
AP_DEBUG_ASSERT(h2_ihash_empty(m->spurge));
- AP_DEBUG_ASSERT(h2_ihash_empty(m->tasks));
if (!h2_ihash_empty(m->tasks)) {
- ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, APLOGNO(03056)
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c, APLOGNO(03056)
"h2_mplx(%ld): release_join -> destroy, "
"%d tasks still present",
m->id, (int)h2_ihash_count(m->tasks));
if (m->aborted) {
status = APR_ECONNABORTED;
}
- else if (stream->response) {
- /* already have a respone, schedule for submit */
- h2_ihash_add(m->sready, stream);
- }
else {
- h2_beam_create(&stream->input, stream->pool, stream->id,
- "input", 0);
h2_ihash_add(m->streams, stream);
-
- if (!m->need_registration) {
- m->need_registration = h2_iq_empty(m->q);
+ if (stream->response) {
+ /* already have a respone, schedule for submit */
+ h2_ihash_add(m->sready, stream);
}
- if (m->workers_busy < m->workers_max) {
- do_registration = m->need_registration;
+ else {
+ h2_beam_create(&stream->input, stream->pool, stream->id,
+ "input", 0);
+ if (!m->need_registration) {
+ m->need_registration = h2_iq_empty(m->q);
+ }
+ if (m->workers_busy < m->workers_max) {
+ do_registration = m->need_registration;
+ }
+ h2_iq_add(m->q, stream->id, cmp, ctx);
+
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, m->c,
+ "h2_mplx(%ld-%d): process, body=%d",
+ m->c->id, stream->id, stream->request->body);
}
- h2_iq_add(m->q, stream->id, cmp, ctx);
-
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, m->c,
- "h2_mplx(%ld-%d): process, body=%d",
- m->c->id, stream->id, stream->request->body);
}
leave_mutex(m, acquired);
}
task->done_at = apr_time_now();
if (task->output.beam) {
h2_beam_on_consumed(task->output.beam, NULL, NULL);
- h2_beam_on_produced(task->output.beam, NULL, NULL);
h2_beam_mutex_set(task->output.beam, NULL, NULL, NULL);
}
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
* mplx master events dispatching
******************************************************************************/
-typedef struct {
- h2_mplx *m;
- stream_ev_callback *on_resume;
- stream_ev_callback *on_response;
- void *on_ctx;
- apr_status_t status;
-} dispatch_ctx;
-
static int update_window(void *ctx, void *val)
{
input_consumed_signal(ctx, val);
return 1;
}
-static int stream_ready_iter(void *data, void *val)
-{
- dispatch_ctx *ctx = data;
- h2_stream *stream = val;
- h2_task *task = h2_ihash_get(ctx->m->tasks, stream->id);
-
- if (task) {
- task->submitted = 1;
- if (task->rst_error) {
- h2_stream_rst(stream, task->rst_error);
- }
- else {
- AP_DEBUG_ASSERT(task->response);
- h2_stream_set_response(stream, task->response, task->output.beam);
- }
- }
- else {
- /* We have the stream ready without a task. This happens
- * when we fail streams early. A response should already
- * be present. */
- AP_DEBUG_ASSERT(stream->response || stream->rst_error);
- }
-
- ctx->status = ctx->on_response(ctx->on_ctx, stream->id);
- return 1;
-}
-
-static int stream_resume_iter(void *data, void *val)
-{
- dispatch_ctx *ctx = data;
- h2_stream *stream = val;
-
- h2_stream_set_suspended(stream, 0);
- ctx->status = ctx->on_resume(ctx->on_ctx, stream->id);
- return 1;
-}
-
apr_status_t h2_mplx_dispatch_master_events(h2_mplx *m,
stream_ev_callback *on_resume,
stream_ev_callback *on_response,
{
apr_status_t status;
int acquired;
+ int streams[32];
+ h2_stream *stream;
+ h2_task *task;
+ size_t i, n;
AP_DEBUG_ASSERT(m);
if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
- dispatch_ctx ctx;
- ctx.m = m;
- ctx.on_resume = on_resume;
- ctx.on_response = on_response;
- ctx.on_ctx = on_ctx;
- ctx.status = APR_SUCCESS;
- ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, m->c,
"h2_mplx(%ld): dispatch events", m->id);
+
/* update input windows for streams */
h2_ihash_iter(m->streams, update_window, m);
- if (ctx.on_response) {
- h2_ihash_iter(m->sready, stream_ready_iter, &ctx);
- h2_ihash_clear(m->sready);
+ if (on_response && !h2_ihash_empty(m->sready)) {
+ n = h2_ihash_ishift(m->sready, streams, H2_ALEN(streams));
+ for (i = 0; i < n; ++i) {
+ stream = h2_ihash_get(m->streams, streams[i]);
+ if (!stream) {
+ continue;
+ }
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, m->c,
+ "h2_mplx(%ld-%d): on_response",
+ m->id, stream->id);
+ task = h2_ihash_get(m->tasks, stream->id);
+ if (task) {
+ task->submitted = 1;
+ if (task->rst_error) {
+ h2_stream_rst(stream, task->rst_error);
+ }
+ else {
+ AP_DEBUG_ASSERT(task->response);
+ h2_stream_set_response(stream, task->response, task->output.beam);
+ }
+ }
+ else {
+ /* We have the stream ready without a task. This happens
+ * when we fail streams early. A response should already
+ * be present. */
+ AP_DEBUG_ASSERT(stream->response || stream->rst_error);
+ }
+ status = on_response(on_ctx, stream->id);
+ }
}
- if (ctx.on_resume) {
- h2_ihash_iter(m->sresume, stream_resume_iter, &ctx);
- h2_ihash_clear(m->sresume);
+ if (on_resume && !h2_ihash_empty(m->sresume)) {
+ n = h2_ihash_ishift(m->sresume, streams, H2_ALEN(streams));
+ for (i = 0; i < n; ++i) {
+ stream = h2_ihash_get(m->streams, streams[i]);
+ if (!stream) {
+ continue;
+ }
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, m->c,
+ "h2_mplx(%ld-%d): on_resume",
+ m->id, stream->id);
+ h2_stream_set_suspended(stream, 0);
+ status = on_resume(on_ctx, stream->id);
+ }
}
leave_mutex(m, acquired);
- return ctx.status;
}
return status;
}
h2_mplx *m = ctx;
apr_status_t status;
h2_stream *stream;
- h2_task *task;
int acquired;
AP_DEBUG_ASSERT(m);
stream = h2_ihash_get(m->streams, beam->id);
if (stream && h2_stream_is_suspended(stream)) {
h2_ihash_add(m->sresume, stream);
- task = h2_ihash_get(m->tasks, stream->id);
- if (task && task->output.beam) {
- h2_beam_on_produced(task->output.beam, NULL, NULL);
- }
+ h2_beam_on_produced(beam, NULL, NULL);
have_out_data_for(m, beam->id);
}
leave_mutex(m, acquired);
AP_DEBUG_ASSERT(m);
if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
stream = h2_ihash_get(m->streams, stream_id);
- if (stream && !h2_stream_is_suspended(stream)) {
+ if (stream) {
h2_stream_set_suspended(stream, 1);
task = h2_ihash_get(m->tasks, stream->id);
- if (task && task->output.beam && h2_beam_empty(task->output.beam)) {
- /* register callback so that we can resume on new output */
- h2_beam_on_produced(task->output.beam, output_produced, m);
+ if (stream->started && (!task || task->worker_done)) {
+ h2_ihash_add(m->sresume, stream);
}
else {
- /* if the beam got data in the meantime, add this to the to-be
- * resumed streams right away. */
- h2_ihash_add(m->sresume, stream);
+ /* register callback so that we can resume on new output */
+ h2_beam_on_produced(task->output.beam, output_produced, m);
}
}
leave_mutex(m, acquired);