#define H2_MPLX_ENTER_ALWAYS(m) \
apr_thread_mutex_lock(m->lock)
-#define H2_MPLX_ENTER_MAYBE(m, lock) \
- if (lock) apr_thread_mutex_lock(m->lock)
+#define H2_MPLX_ENTER_MAYBE(m, dolock) \
+ if (dolock) apr_thread_mutex_lock(m->lock)
-#define H2_MPLX_LEAVE_MAYBE(m, lock) \
- if (lock) apr_thread_mutex_unlock(m->lock)
+#define H2_MPLX_LEAVE_MAYBE(m, dolock) \
+ if (dolock) apr_thread_mutex_unlock(m->lock)
-static void check_data_for(h2_mplx *m, h2_stream *stream, int lock);
+static void check_data_for(h2_mplx *m, h2_stream *stream, int mplx_is_locked);
static void stream_output_consumed(void *ctx,
h2_bucket_beam *beam, apr_off_t length)
{
ap_assert(!h2_task_has_started(stream->task) || stream->task->worker_done);
+ h2_ififo_remove(m->readyq, stream->id);
h2_ihash_remove(m->shold, stream->id);
h2_ihash_add(m->spurge, stream);
}
h2_ihash_remove(m->streams, stream->id);
h2_iq_remove(m->q, stream->id);
- h2_ififo_remove(m->readyq, stream->id);
- h2_ihash_add(m->shold, stream);
if (!h2_task_has_started(stream->task) || stream->task->done_done) {
stream_joined(m, stream);
}
- else if (stream->task) {
- stream->task->c->aborted = 1;
+ else {
+ h2_ififo_remove(m->readyq, stream->id);
+ h2_ihash_add(m->shold, stream);
+ if (stream->task) {
+ stream->task->c->aborted = 1;
+ }
}
}
h2_stream *stream = ctx;
h2_mplx *m = stream->session->mplx;
- check_data_for(m, stream, 1);
+ check_data_for(m, stream, 0);
}
static apr_status_t out_open(h2_mplx *m, int stream_id, h2_bucket_beam *beam)
{
- apr_status_t status = APR_SUCCESS;
h2_stream *stream = h2_ihash_get(m->streams, stream_id);
if (!stream || !stream->task || m->aborted) {
h2_beam_log(beam, m->c, APLOG_TRACE2, "out_open");
}
else {
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, m->c,
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
"h2_mplx(%s): out open", stream->task->id);
}
/* we might see some file buckets in the output, see
* if we have enough handles reserved. */
- check_data_for(m, stream, 0);
- return status;
+ check_data_for(m, stream, 1);
+ return APR_SUCCESS;
}
apr_status_t h2_mplx_out_open(h2_mplx *m, int stream_id, h2_bucket_beam *beam)
status = h2_beam_close(task->output.beam);
h2_beam_log(task->output.beam, m->c, APLOG_TRACE2, "out_close");
output_consumed_signal(m, task);
- check_data_for(m, stream, 0);
+ check_data_for(m, stream, 1);
return status;
}
return status;
}
-static void check_data_for(h2_mplx *m, h2_stream *stream, int lock)
+static void check_data_for(h2_mplx *m, h2_stream *stream, int mplx_is_locked)
{
+ /* If m->lock is already held, we must release during h2_ififo_push()
+ * which can wait on its not_full condition, causing a deadlock because
+ * no one would then be able to acquire m->lock to empty the fifo.
+ */
+ H2_MPLX_LEAVE_MAYBE(m, mplx_is_locked);
if (h2_ififo_push(m->readyq, stream->id) == APR_SUCCESS) {
+ H2_MPLX_ENTER_ALWAYS(m);
apr_atomic_set32(&m->event_pending, 1);
- H2_MPLX_ENTER_MAYBE(m, lock);
if (m->added_output) {
apr_thread_cond_signal(m->added_output);
}
- H2_MPLX_LEAVE_MAYBE(m, lock);
+ H2_MPLX_LEAVE_MAYBE(m, !mplx_is_locked);
+ }
+ else {
+ H2_MPLX_ENTER_MAYBE(m, mplx_is_locked);
}
}
h2_ihash_add(m->streams, stream);
if (h2_stream_is_ready(stream)) {
/* already have a response */
- check_data_for(m, stream, 0);
+ check_data_for(m, stream, 1);
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
H2_STRM_MSG(stream, "process, add to readyq"));
}
}
/* more data will not arrive, resume the stream */
- check_data_for(m, stream, 0);
+ check_data_for(m, stream, 1);
}
}
else if ((stream = h2_ihash_get(m->shold, task->stream_id)) != NULL) {
h2_beam_is_closed(stream->output),
(long)h2_beam_get_buffered(stream->output));
h2_ihash_add(m->streams, stream);
- check_data_for(m, stream, 0);
+ check_data_for(m, stream, 1);
stream->out_checked = 1;
status = APR_EAGAIN;
}
apr_status_t h2_mplx_keep_active(h2_mplx *m, h2_stream *stream)
{
- check_data_for(m, stream, 1);
+ check_data_for(m, stream, 0);
return APR_SUCCESS;
}
apr_status_t rv;
if ((rv = apr_thread_mutex_lock(fifo->lock)) == APR_SUCCESS) {
fifo->aborted = 1;
- apr_thread_mutex_unlock(fifo->lock);
- }
- return rv;
-}
-
-apr_status_t h2_fifo_interrupt(h2_fifo *fifo)
-{
- apr_status_t rv;
- if ((rv = apr_thread_mutex_lock(fifo->lock)) == APR_SUCCESS) {
apr_thread_cond_broadcast(fifo->not_empty);
apr_thread_cond_broadcast(fifo->not_full);
apr_thread_mutex_unlock(fifo->lock);
{
apr_status_t rv;
- if (fifo->aborted) {
- return APR_EOF;
- }
-
if ((rv = apr_thread_mutex_lock(fifo->lock)) == APR_SUCCESS) {
rv = fifo_push_int(fifo, elem, block);
apr_thread_mutex_unlock(fifo->lock);
{
apr_status_t rv;
- if (fifo->aborted) {
- return APR_EOF;
- }
-
if ((rv = apr_thread_mutex_lock(fifo->lock)) == APR_SUCCESS) {
rv = pull_head(fifo, pelem, block);
apr_thread_mutex_unlock(fifo->lock);
apr_status_t rv;
if ((rv = apr_thread_mutex_lock(fifo->lock)) == APR_SUCCESS) {
fifo->aborted = 1;
- apr_thread_mutex_unlock(fifo->lock);
- }
- return rv;
-}
-
-apr_status_t h2_ififo_interrupt(h2_ififo *fifo)
-{
- apr_status_t rv;
- if ((rv = apr_thread_mutex_lock(fifo->lock)) == APR_SUCCESS) {
apr_thread_cond_broadcast(fifo->not_empty);
apr_thread_cond_broadcast(fifo->not_full);
apr_thread_mutex_unlock(fifo->lock);
{
apr_status_t rv;
- if (fifo->aborted) {
- return APR_EOF;
- }
-
if ((rv = apr_thread_mutex_lock(fifo->lock)) == APR_SUCCESS) {
rv = ififo_push_int(fifo, id, block);
apr_thread_mutex_unlock(fifo->lock);
{
apr_status_t rv;
- if (fifo->aborted) {
- return APR_EOF;
- }
-
if ((rv = apr_thread_mutex_lock(fifo->lock)) == APR_SUCCESS) {
rv = ipull_head(fifo, pi, block);
apr_thread_mutex_unlock(fifo->lock);
apr_status_t rv;
int id;
- if (fifo->aborted) {
- return APR_EOF;
- }
-
if (APR_SUCCESS == (rv = apr_thread_mutex_lock(fifo->lock))) {
if (APR_SUCCESS == (rv = ipull_head(fifo, &id, block))) {
switch (fn(id, ctx)) {
return ififo_peek(fifo, fn, ctx, 0);
}
-apr_status_t h2_ififo_remove(h2_ififo *fifo, int id)
+static apr_status_t ififo_remove(h2_ififo *fifo, int id)
{
- apr_status_t rv;
+ int rc, i;
if (fifo->aborted) {
return APR_EOF;
}
- if ((rv = apr_thread_mutex_lock(fifo->lock)) == APR_SUCCESS) {
- int i, rc;
- int e;
-
- rc = 0;
- for (i = 0; i < fifo->count; ++i) {
- e = fifo->elems[inth_index(fifo, i)];
- if (e == id) {
- ++rc;
- }
- else if (rc) {
- fifo->elems[inth_index(fifo, i-rc)] = e;
- }
- }
- if (rc) {
- fifo->count -= rc;
- if (fifo->count + rc == fifo->nelems) {
- apr_thread_cond_broadcast(fifo->not_full);
- }
- rv = APR_SUCCESS;
+ rc = 0;
+ for (i = 0; i < fifo->count; ++i) {
+ int e = fifo->elems[inth_index(fifo, i)];
+ if (e == id) {
+ ++rc;
}
- else {
- rv = APR_EAGAIN;
+ else if (rc) {
+ fifo->elems[inth_index(fifo, i-rc)] = e;
}
-
+ }
+ if (!rc) {
+ return APR_EAGAIN;
+ }
+ fifo->count -= rc;
+ if (fifo->count + rc == fifo->nelems) {
+ apr_thread_cond_broadcast(fifo->not_full);
+ }
+ return APR_SUCCESS;
+}
+
+apr_status_t h2_ififo_remove(h2_ififo *fifo, int id)
+{
+ apr_status_t rv;
+
+ if ((rv = apr_thread_mutex_lock(fifo->lock)) == APR_SUCCESS) {
+ rv = ififo_remove(fifo, id);
apr_thread_mutex_unlock(fifo->lock);
}
return rv;