* bucket beam that can transport buckets across threads
******************************************************************************/
-static apr_status_t enter_yellow(h2_bucket_beam *beam,
- apr_thread_mutex_t **plock, int *pacquired)
+static apr_status_t enter_yellow(h2_bucket_beam *beam, h2_beam_lock *pbl)
{
if (beam->m_enter) {
- return beam->m_enter(beam->m_ctx, plock, pacquired);
+ return beam->m_enter(beam->m_ctx, pbl);
}
- *plock = NULL;
- *pacquired = 0;
+ pbl->mutex = NULL;
+ pbl->leave = NULL;
return APR_SUCCESS;
}
-static void leave_yellow(h2_bucket_beam *beam,
- apr_thread_mutex_t *lock, int acquired)
+static void leave_yellow(h2_bucket_beam *beam, h2_beam_lock *pbl)
{
- if (acquired && beam->m_leave) {
- beam->m_leave(beam->m_ctx, lock, acquired);
+ if (pbl->leave) {
+ pbl->leave(pbl->leave_ctx, pbl->mutex);
}
}
static apr_status_t r_wait_space(h2_bucket_beam *beam, apr_read_type_e block,
- apr_thread_mutex_t *lock, apr_off_t *premain)
+ h2_beam_lock *pbl, apr_off_t *premain)
{
*premain = calc_space_left(beam);
while (!beam->aborted && *premain <= 0
- && (block == APR_BLOCK_READ) && lock) {
- apr_status_t status = wait_cond(beam, lock);
+ && (block == APR_BLOCK_READ) && pbl->mutex) {
+ apr_status_t status = wait_cond(beam, pbl->mutex);
if (APR_STATUS_IS_TIMEUP(status)) {
return status;
}
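
Note: wait_cond itself lies outside this hunk. A minimal sketch of what it presumably does, assuming it wraps the APR condition-variable calls and honors beam->timeout (both assumptions, since the helper is not part of this diff):

static apr_status_t wait_cond(h2_bucket_beam *beam, apr_thread_mutex_t *lock)
{
    /* sketch: block on the beam's condition variable, with a timed wait
     * when the beam has a timeout configured */
    if (beam->timeout > 0) {
        return apr_thread_cond_timedwait(beam->m_cond, lock, beam->timeout);
    }
    return apr_thread_cond_wait(beam->m_cond, lock);
}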
static void h2_beam_emitted(h2_bucket_beam *beam, h2_beam_proxy *proxy)
{
- apr_thread_mutex_t *lock;
- int acquired;
+ h2_beam_lock bl;
- if (enter_yellow(beam, &lock, &acquired) == APR_SUCCESS) {
+ if (enter_yellow(beam, &bl) == APR_SUCCESS) {
/* even when beam buckets are split, only the one where
* refcount drops to 0 will call us */
H2_BPROXY_REMOVE(proxy);
proxy->bred = NULL;
}
/* notify anyone waiting on space to become available */
- if (!lock) {
+ if (!bl.mutex) {
r_purge_reds(beam);
}
else if (beam->m_cond) {
apr_thread_cond_broadcast(beam->m_cond);
}
- leave_yellow(beam, lock, acquired);
+ leave_yellow(beam, &bl);
}
}
apr_status_t h2_beam_destroy(h2_bucket_beam *beam)
{
- apr_pool_cleanup_kill(beam->life_pool, beam, beam_cleanup);
+ apr_pool_cleanup_kill(beam->red_pool, beam, beam_cleanup);
return beam_cleanup(beam);
}
-apr_status_t h2_beam_create(h2_bucket_beam **pbeam, apr_pool_t *life_pool,
+apr_status_t h2_beam_create(h2_bucket_beam **pbeam, apr_pool_t *red_pool,
int id, const char *tag,
apr_size_t max_buf_size)
{
h2_bucket_beam *beam;
apr_status_t status = APR_SUCCESS;
- beam = apr_pcalloc(life_pool, sizeof(*beam));
+ beam = apr_pcalloc(red_pool, sizeof(*beam));
if (!beam) {
return APR_ENOMEM;
}
H2_BLIST_INIT(&beam->hold);
H2_BLIST_INIT(&beam->purge);
H2_BPROXY_LIST_INIT(&beam->proxies);
- beam->life_pool = life_pool;
+ beam->red_pool = red_pool;
beam->max_buf_size = max_buf_size;
- apr_pool_pre_cleanup_register(life_pool, beam, beam_cleanup);
+ apr_pool_pre_cleanup_register(red_pool, beam, beam_cleanup);
*pbeam = beam;
return status;
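
For orientation, a hypothetical call site for the renamed constructor; red_pool, stream_id and the tag value are illustrative, not taken from this patch:

h2_bucket_beam *beam;
apr_status_t status;

/* the beam lives in (and dies with) the red side's pool; 64 KB buffer cap */
status = h2_beam_create(&beam, red_pool, stream_id, "input", 64 * 1024);
if (status != APR_SUCCESS) {
    /* creation failed, e.g. APR_ENOMEM */
}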
void h2_beam_buffer_size_set(h2_bucket_beam *beam, apr_size_t buffer_size)
{
- apr_thread_mutex_t *lock;
- int acquired;
+ h2_beam_lock bl;
- if (enter_yellow(beam, &lock, &acquired) == APR_SUCCESS) {
+ if (enter_yellow(beam, &bl) == APR_SUCCESS) {
beam->max_buf_size = buffer_size;
- leave_yellow(beam, lock, acquired);
+ leave_yellow(beam, &bl);
}
}
apr_size_t h2_beam_buffer_size_get(h2_bucket_beam *beam)
{
- apr_thread_mutex_t *lock;
- int acquired;
+ h2_beam_lock bl;
apr_size_t buffer_size = 0;
- if (enter_yellow(beam, &lock, &acquired) == APR_SUCCESS) {
+ if (enter_yellow(beam, &bl) == APR_SUCCESS) {
buffer_size = beam->max_buf_size;
- leave_yellow(beam, lock, acquired);
+ leave_yellow(beam, &bl);
}
return buffer_size;
}
void h2_beam_mutex_set(h2_bucket_beam *beam,
h2_beam_mutex_enter m_enter,
- h2_beam_mutex_leave m_leave,
apr_thread_cond_t *cond,
void *m_ctx)
{
- apr_thread_mutex_t *lock;
- h2_beam_mutex_leave *prev_leave;
- void *prev_ctx;
- int acquired;
+ h2_beam_lock bl;
- if (enter_yellow(beam, &lock, &acquired) == APR_SUCCESS) {
- prev_ctx = beam->m_ctx;
- prev_leave = beam->m_leave;
+ if (enter_yellow(beam, &bl) == APR_SUCCESS) {
beam->m_enter = m_enter;
- beam->m_leave = m_leave;
beam->m_ctx = m_ctx;
beam->m_cond = cond;
- if (acquired && prev_leave) {
- /* special tactics when NULLing a lock */
- prev_leave(prev_ctx, lock, acquired);
- }
+ leave_yellow(beam, &bl);
}
}
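
With the leave callback carried inside h2_beam_lock, the removed "special tactics" are no longer needed: clearing the mutex is just a call with NULLs, and leave_yellow() releases whatever lock enter_yellow() recorded beforehand. This patch uses exactly that in the task-done path below:

h2_beam_mutex_set(task->output.beam, NULL, NULL, NULL);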
void h2_beam_timeout_set(h2_bucket_beam *beam, apr_interval_time_t timeout)
{
- apr_thread_mutex_t *lock;
- int acquired;
+ h2_beam_lock bl;
- if (enter_yellow(beam, &lock, &acquired) == APR_SUCCESS) {
+ if (enter_yellow(beam, &bl) == APR_SUCCESS) {
beam->timeout = timeout;
- leave_yellow(beam, lock, acquired);
+ leave_yellow(beam, &bl);
}
}
apr_interval_time_t h2_beam_timeout_get(h2_bucket_beam *beam)
{
- apr_thread_mutex_t *lock;
- int acquired;
+ h2_beam_lock bl;
apr_interval_time_t timeout = 0;
- if (enter_yellow(beam, &lock, &acquired) == APR_SUCCESS) {
+ if (enter_yellow(beam, &bl) == APR_SUCCESS) {
timeout = beam->timeout;
- leave_yellow(beam, lock, acquired);
+ leave_yellow(beam, &bl);
}
return timeout;
}
void h2_beam_abort(h2_bucket_beam *beam)
{
- apr_thread_mutex_t *lock;
- int acquired;
+ h2_beam_lock bl;
- if (enter_yellow(beam, &lock, &acquired) == APR_SUCCESS) {
+ if (enter_yellow(beam, &bl) == APR_SUCCESS) {
r_purge_reds(beam);
h2_blist_cleanup(&beam->red);
beam->aborted = 1;
if (beam->m_cond) {
apr_thread_cond_broadcast(beam->m_cond);
}
- leave_yellow(beam, lock, acquired);
+ leave_yellow(beam, &bl);
}
}
apr_status_t h2_beam_close(h2_bucket_beam *beam)
{
- apr_thread_mutex_t *lock;
- int acquired;
+ h2_beam_lock bl;
- if (enter_yellow(beam, &lock, &acquired) == APR_SUCCESS) {
+ if (enter_yellow(beam, &bl) == APR_SUCCESS) {
r_purge_reds(beam);
beam_close(beam);
report_consumption(beam);
- leave_yellow(beam, lock, acquired);
+ leave_yellow(beam, &bl);
}
return beam->aborted? APR_ECONNABORTED : APR_SUCCESS;
}
void h2_beam_shutdown(h2_bucket_beam *beam)
{
- apr_thread_mutex_t *lock;
- int acquired;
+ h2_beam_lock bl;
- if (enter_yellow(beam, &lock, &acquired) == APR_SUCCESS) {
+ if (enter_yellow(beam, &bl) == APR_SUCCESS) {
beam_shutdown(beam, 1);
- leave_yellow(beam, lock, acquired);
+ leave_yellow(beam, &bl);
}
}
apr_bucket *bred,
apr_read_type_e block,
apr_pool_t *pool,
- apr_thread_mutex_t *lock)
+ h2_beam_lock *pbl)
{
const char *data;
apr_size_t len;
}
if (space_left < bred->length) {
- status = r_wait_space(beam, block, lock, &space_left);
+ status = r_wait_space(beam, block, pbl, &space_left);
if (status != APR_SUCCESS) {
return status;
}
* affected by this. */
status = apr_bucket_setaside(bred, pool);
}
- else if (APR_BUCKET_IS_HEAP(bred) || APR_BUCKET_IS_POOL(bred)) {
- /* For heap/pool buckets read from a green thread is fine. The
+ else if (APR_BUCKET_IS_HEAP(bred)) {
+ /* For heap buckets read from a green thread is fine. The
* data will be there and live until the bucket itself is
* destroyed. */
status = APR_SUCCESS;
}
+ else if (APR_BUCKET_IS_POOL(bred)) {
+ /* pool buckets are bastards that register at pool cleanup
+ * to morph themselves into heap buckets. That may happen anytime,
+ * even after the bucket data pointer has been read. So at
+ * any time inside the green thread, the pool bucket memory
+ * may disappear. yikes. */
+ status = apr_bucket_read(bred, &data, &len, APR_BLOCK_READ);
+ if (status == APR_SUCCESS) {
+ apr_bucket_heap_make(bred, data, len, NULL);
+ }
+ }
else if (APR_BUCKET_IS_FILE(bred)) {
/* For file buckets the problem is their internal readpool that
* is used on the first read to allocate buffer/mmap.
* Since setting aside a file bucket will de-register the
* file cleanup function from the previous pool, we need to
- * call that from a red thread. Do it now and make our
- * yellow pool the owner.
+ * call that from a red thread.
* Additionally, we allow callbacks to prevent beaming file
* handles across. The use case for this is to limit the number
* of open file handles and rather use a less efficient beam
     * transport. */
apr_file_t *fd = ((apr_bucket_file *)bred->data)->fd;
int can_beam = 1;
if (beam->last_beamed != fd && beam->can_beam_fn) {
beam->last_beamed = fd;
status = apr_bucket_setaside(bred, pool);
}
+ /* else: enter ENOTIMPL case below */
}
if (status == APR_ENOTIMPL) {
/* we have no knowledge about the internals of this bucket,
- * but on read, it needs to make the data available somehow.
- * So we do this while still in a red thread. The data will
- * live at least os long as the red bucket itself. */
+     * but hope that after a read its data stays immutable for the
+     * lifetime of the bucket (see the pool bucket handling above for
+     * a counterexample).
+ * We do the read while in a red thread, so that the bucket may
+ * use pools/allocators safely. */
if (space_left < APR_BUCKET_BUFF_SIZE) {
space_left = APR_BUCKET_BUFF_SIZE;
}
apr_bucket_brigade *red_brigade,
apr_read_type_e block)
{
- apr_thread_mutex_t *lock;
apr_bucket *bred;
apr_status_t status = APR_SUCCESS;
- int acquired;
+ h2_beam_lock bl;
/* Called from the red thread to add buckets to the beam */
- if (enter_yellow(beam, &lock, &acquired) == APR_SUCCESS) {
+ if (enter_yellow(beam, &bl) == APR_SUCCESS) {
r_purge_reds(beam);
if (beam->aborted) {
while (!APR_BRIGADE_EMPTY(red_brigade)
&& status == APR_SUCCESS) {
bred = APR_BRIGADE_FIRST(red_brigade);
- status = append_bucket(beam, bred, block, red_brigade->p, lock);
+ status = append_bucket(beam, bred, block, beam->red_pool, &bl);
}
if (beam->m_cond) {
apr_thread_cond_broadcast(beam->m_cond);
}
}
report_consumption(beam);
- leave_yellow(beam, lock, acquired);
+ leave_yellow(beam, &bl);
}
return status;
}
apr_read_type_e block,
apr_off_t readbytes)
{
- apr_thread_mutex_t *lock;
+ h2_beam_lock bl;
apr_bucket *bred, *bgreen, *ng;
- int acquired, transferred = 0;
+ int transferred = 0;
apr_status_t status = APR_SUCCESS;
apr_off_t remain = readbytes;
/* Called from the green thread to take buckets from the beam */
- if (enter_yellow(beam, &lock, &acquired) == APR_SUCCESS) {
+ if (enter_yellow(beam, &bl) == APR_SUCCESS) {
transfer:
if (beam->aborted) {
status = APR_ECONNABORTED;
status = APR_EOF;
}
}
- else if (block == APR_BLOCK_READ && lock && beam->m_cond) {
- status = wait_cond(beam, lock);
+ else if (block == APR_BLOCK_READ && bl.mutex && beam->m_cond) {
+ status = wait_cond(beam, bl.mutex);
if (status != APR_SUCCESS) {
goto leave;
}
status = APR_EAGAIN;
}
leave:
- leave_yellow(beam, lock, acquired);
+ leave_yellow(beam, &bl);
}
return status;
}
void h2_beam_on_consumed(h2_bucket_beam *beam,
h2_beam_consumed_callback *cb, void *ctx)
{
- apr_thread_mutex_t *lock;
- int acquired;
+ h2_beam_lock bl;
- if (enter_yellow(beam, &lock, &acquired) == APR_SUCCESS) {
+ if (enter_yellow(beam, &bl) == APR_SUCCESS) {
beam->consumed_fn = cb;
beam->consumed_ctx = ctx;
- leave_yellow(beam, lock, acquired);
+ leave_yellow(beam, &bl);
}
}
void h2_beam_on_file_beam(h2_bucket_beam *beam,
h2_beam_can_beam_callback *cb, void *ctx)
{
- apr_thread_mutex_t *lock;
- int acquired;
+ h2_beam_lock bl;
- if (enter_yellow(beam, &lock, &acquired) == APR_SUCCESS) {
+ if (enter_yellow(beam, &bl) == APR_SUCCESS) {
beam->can_beam_fn = cb;
beam->can_beam_ctx = ctx;
- leave_yellow(beam, lock, acquired);
+ leave_yellow(beam, &bl);
}
}
apr_off_t h2_beam_get_buffered(h2_bucket_beam *beam)
{
- apr_thread_mutex_t *lock;
apr_bucket *b;
apr_off_t l = 0;
- int acquired;
+ h2_beam_lock bl;
- if (enter_yellow(beam, &lock, &acquired) == APR_SUCCESS) {
+ if (enter_yellow(beam, &bl) == APR_SUCCESS) {
for (b = H2_BLIST_FIRST(&beam->red);
b != H2_BLIST_SENTINEL(&beam->red);
b = APR_BUCKET_NEXT(b)) {
/* should all have determinate length */
l += b->length;
}
- leave_yellow(beam, lock, acquired);
+ leave_yellow(beam, &bl);
}
return l;
}
apr_off_t h2_beam_get_mem_used(h2_bucket_beam *beam)
{
- apr_thread_mutex_t *lock;
apr_bucket *b;
apr_off_t l = 0;
- int acquired;
+ h2_beam_lock bl;
- if (enter_yellow(beam, &lock, &acquired) == APR_SUCCESS) {
+ if (enter_yellow(beam, &bl) == APR_SUCCESS) {
for (b = H2_BLIST_FIRST(&beam->red);
b != H2_BLIST_SENTINEL(&beam->red);
b = APR_BUCKET_NEXT(b)) {
l += b->length;
}
- leave_yellow(beam, lock, acquired);
+ leave_yellow(beam, &bl);
}
return l;
}
int h2_beam_empty(h2_bucket_beam *beam)
{
- apr_thread_mutex_t *lock;
int empty = 1;
- int acquired;
+ h2_beam_lock bl;
- if (enter_yellow(beam, &lock, &acquired) == APR_SUCCESS) {
+ if (enter_yellow(beam, &bl) == APR_SUCCESS) {
empty = (H2_BLIST_EMPTY(&beam->red)
&& (!beam->green || APR_BRIGADE_EMPTY(beam->green)));
- leave_yellow(beam, lock, acquired);
+ leave_yellow(beam, &bl);
}
return empty;
}
int h2_beam_was_received(h2_bucket_beam *beam)
{
- apr_thread_mutex_t *lock;
int happend = 0;
- int acquired;
+ h2_beam_lock bl;
- if (enter_yellow(beam, &lock, &acquired) == APR_SUCCESS) {
+ if (enter_yellow(beam, &bl) == APR_SUCCESS) {
happend = (beam->received_bytes > 0);
- leave_yellow(beam, lock, acquired);
+ leave_yellow(beam, &bl);
}
return happend;
}
apr_size_t h2_beam_get_files_beamed(h2_bucket_beam *beam)
{
- apr_thread_mutex_t *lock;
apr_size_t n = 0;
- int acquired;
+ h2_beam_lock bl;
- if (enter_yellow(beam, &lock, &acquired) == APR_SUCCESS) {
+ if (enter_yellow(beam, &bl) == APR_SUCCESS) {
n = beam->files_beamed;
- leave_yellow(beam, lock, acquired);
+ leave_yellow(beam, &bl);
}
return n;
}
* technology where humans are kept inside the transporter's memory
* buffers until the transmission is complete. Star gates use a similar trick.
*/
+
+typedef void h2_beam_mutex_leave(void *ctx, struct apr_thread_mutex_t *lock);
+
+typedef struct {
+ apr_thread_mutex_t *mutex;
+ h2_beam_mutex_leave *leave;
+ void *leave_ctx;
+} h2_beam_lock;
+
typedef struct h2_bucket_beam h2_bucket_beam;
-typedef apr_status_t h2_beam_mutex_enter(void *ctx,
- struct apr_thread_mutex_t **plock,
- int *acquired);
-typedef void h2_beam_mutex_leave(void *ctx,
- struct apr_thread_mutex_t *lock,
- int acquired);
+typedef apr_status_t h2_beam_mutex_enter(void *ctx, h2_beam_lock *pbl);
+
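+/* A sketch of how an enter callback fills the descriptor, assuming a
+ * context that owns a plain APR mutex; my_ctx_t, my_enter and my_leave
+ * are illustrative names (compare the real beam_enter/beam_leave in
+ * h2_mplx.c further down):
+ *
+ *   static void my_leave(void *ctx, apr_thread_mutex_t *lock)
+ *   {
+ *       apr_thread_mutex_unlock(lock);    // undo what my_enter did
+ *   }
+ *
+ *   static apr_status_t my_enter(void *ctx, h2_beam_lock *pbl)
+ *   {
+ *       my_ctx_t *c = ctx;                // hypothetical context type
+ *       apr_status_t status = apr_thread_mutex_lock(c->lock);
+ *       if (status == APR_SUCCESS) {
+ *           pbl->mutex = c->lock;         // beam waits on this mutex
+ *           pbl->leave = my_leave;        // invoked by leave_yellow()
+ *           pbl->leave_ctx = c;
+ *       }
+ *       return status;
+ *   }
+ */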
typedef void h2_beam_consumed_callback(void *ctx, h2_bucket_beam *beam,
apr_off_t bytes);
h2_blist purge;
apr_bucket_brigade *green;
h2_bproxy_list proxies;
- apr_pool_t *life_pool;
+ apr_pool_t *red_pool;
apr_size_t max_buf_size;
apr_size_t files_beamed; /* how many file handles have been set aside */
void *m_ctx;
h2_beam_mutex_enter *m_enter;
- h2_beam_mutex_leave *m_leave;
struct apr_thread_cond_t *m_cond;
apr_interval_time_t timeout;
* that is only used inside that same mutex.
*
* @param pbeam will hold the created beam on return
- * @param life_pool pool for allocating initial structure and cleanups
+ * @param red_pool pool usable on the red side; the beam's lifetime is tied to it
* @param buffer_size maximum memory footprint of buckets buffered in beam, or
* 0 for no limitation
+ *
+ * Call from the red side only.
*/
apr_status_t h2_beam_create(h2_bucket_beam **pbeam,
- apr_pool_t *life_pool,
+ apr_pool_t *red_pool,
int id, const char *tag,
apr_size_t buffer_size);
+/**
+ * Destroys the beam immediately, without waiting for its pool to be destroyed.
+ *
+ * Call from the red side only.
+ */
apr_status_t h2_beam_destroy(h2_bucket_beam *beam);
/**
* internally as long as they have not been processed by the receiving side.
* All accepted buckets are removed from the given brigade. Will return with
* APR_EAGAIN on non-blocking sends when not all buckets could be accepted.
+ *
+ * Call from the red side only.
*/
apr_status_t h2_beam_send(h2_bucket_beam *beam,
apr_bucket_brigade *red_buckets,
* when reading past an EOS bucket. Reads can be blocking until data is
* available or the beam has been closed. Non-blocking calls return APR_EAGAIN
* if no data is available.
+ *
+ * Call from the green side only.
*/
apr_status_t h2_beam_receive(h2_bucket_beam *beam,
apr_bucket_brigade *green_buckets,
apr_read_type_e block,
apr_off_t readbytes);
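
Taken together, a hedged end-to-end sketch; beam, red_bb, green_bb and readbytes are placeholders (the real call sites live in h2_mplx/h2_task):

/* red side: hand buckets over, blocking while the beam buffer is full */
status = h2_beam_send(beam, red_bb, APR_BLOCK_READ);

/* green side: pull up to readbytes bytes, blocking until data or close */
status = h2_beam_receive(beam, green_bb, APR_BLOCK_READ, readbytes);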
+/**
+ * Determine if beam is closed. May still contain buffered data.
+ *
+ * Call from red or green side.
+ */
+int h2_beam_closed(h2_bucket_beam *beam);
+
+/**
+ * Determine if beam is empty.
+ *
+ * Call from red or green side.
+ */
+int h2_beam_empty(h2_bucket_beam *beam);
+
+/**
+ * Abort the beam. Will clean up any buffered buckets and answer all
+ * sends and receives with APR_ECONNABORTED.
+ *
+ * Call from the red side only.
+ */
void h2_beam_abort(h2_bucket_beam *beam);
/**
- * Close the beam. Does not need to be invoked if certain that an EOS bucket
- * has been sent.
+ * Close the beam. Sending an EOS bucket serves the same purpose.
+ *
+ * Call from the red side only.
*/
apr_status_t h2_beam_close(h2_bucket_beam *beam);
/**
* Empty the buffer and close.
+ *
+ * Call from the red side only.
*/
void h2_beam_shutdown(h2_bucket_beam *beam);
void h2_beam_mutex_set(h2_bucket_beam *beam,
h2_beam_mutex_enter m_enter,
- h2_beam_mutex_leave m_leave,
struct apr_thread_cond_t *cond,
void *m_ctx);
* @param beam the beam to set the callback on
* @param cb the callback or NULL
* @param ctx the context to use in callback invocation
+ *
+ * Call from the red side; callbacks are invoked on the red side.
*/
void h2_beam_on_consumed(h2_bucket_beam *beam,
h2_beam_consumed_callback *cb, void *ctx);
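
A callback has to match h2_beam_consumed_callback; a sketch of a receiver-side accounting hook (the flow-control detail is illustrative, not prescribed by the API):

static void on_consumed(void *ctx, h2_bucket_beam *beam, apr_off_t bytes)
{
    /* e.g. widen a flow-control window by `bytes`; ctx is whatever
     * was handed to h2_beam_on_consumed() */
}

h2_beam_on_consumed(beam, on_consumed, my_ctx);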
*/
apr_off_t h2_beam_get_mem_used(h2_bucket_beam *beam);
-int h2_beam_closed(h2_bucket_beam *beam);
-int h2_beam_empty(h2_bucket_beam *beam);
-
/**
* Return != 0 iff (some) data from the beam has been received.
*/
}
}
-static apr_status_t io_mutex_enter(void *ctx,
- apr_thread_mutex_t **plock, int *acquired)
+static void beam_leave(void *ctx, apr_thread_mutex_t *lock)
{
- h2_mplx *m = ctx;
- *plock = m->lock;
- return enter_mutex(m, acquired);
+ leave_mutex(ctx, 1);
}
-static void io_mutex_leave(void *ctx, apr_thread_mutex_t *lock, int acquired)
+static apr_status_t beam_enter(void *ctx, h2_beam_lock *pbl)
{
h2_mplx *m = ctx;
- leave_mutex(m, acquired);
+ int acquired;
+ apr_status_t status;
+
+ status = enter_mutex(m, &acquired);
+ if (status == APR_SUCCESS) {
+ pbl->mutex = m->lock;
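+        /* leave is only set when the mutex was acquired here; on a
+         * nested entry leave_yellow() must not unlock, so it stays NULL */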
+ pbl->leave = acquired? beam_leave : NULL;
+ pbl->leave_ctx = m;
+ }
+ return status;
}
static void stream_output_consumed(void *ctx,
/* cleanup once task is done */
task->orphaned = 1;
if (task->input.beam) {
- /* TODO: this is currently allocated by the stream and will disappear */
h2_beam_shutdown(task->input.beam);
task->input.beam = NULL;
}
h2_beam_on_consumed(task->output.beam, stream_output_consumed, task);
m->tx_handles_reserved -= h2_beam_get_files_beamed(task->output.beam);
h2_beam_on_file_beam(task->output.beam, can_beam_file, m);
- h2_beam_mutex_set(task->output.beam, io_mutex_enter, io_mutex_leave,
- task->cond, m);
+ h2_beam_mutex_set(task->output.beam, beam_enter, task->cond, m);
}
h2_ihash_add(m->ready_tasks, task);
return status;
}
-apr_status_t h2_mplx_out_close(h2_mplx *m, int stream_id)
+static apr_status_t out_close(h2_mplx *m, h2_task *task)
{
- apr_status_t status;
- int acquired;
+ apr_status_t status = APR_SUCCESS;
- AP_DEBUG_ASSERT(m);
- if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
- h2_task *task = h2_ihash_get(m->tasks, stream_id);
- if (task && !task->orphaned) {
- if (!task->response && !task->rst_error) {
- /* In case a close comes before a response was created,
- * insert an error one so that our streams can properly
- * reset.
- */
- h2_response *r = h2_response_die(stream_id, APR_EGENERAL,
- task->request, m->pool);
- status = out_open(m, stream_id, r);
- ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, m->c,
- "h2_mplx(%ld-%d): close, no response, no rst",
- m->id, stream_id);
- }
- ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, m->c,
- "h2_mplx(%ld-%d): close", m->id, stream_id);
- if (task->output.beam) {
- status = h2_beam_close(task->output.beam);
- h2_beam_log(task->output.beam, stream_id, "out_close", m->c,
- APLOG_TRACE2);
- }
- output_consumed_signal(m, task);
- have_out_data_for(m, stream_id);
- }
- else {
- status = APR_ECONNABORTED;
- }
- leave_mutex(m, acquired);
+ if (!task || task->orphaned) {
+ return APR_ECONNABORTED;
+ }
+
+ if (!task->response && !task->rst_error) {
+ /* In case a close comes before a response was created,
+ * insert an error one so that our streams can properly
+ * reset.
+ */
+ h2_response *r = h2_response_die(task->stream_id, APR_EGENERAL,
+ task->request, m->pool);
+ status = out_open(m, task->stream_id, r);
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, m->c,
+ "h2_mplx(%s): close, no response, no rst", task->id);
+ }
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, m->c,
+ "h2_mplx(%s): close", task->id);
+ if (task->output.beam) {
+ status = h2_beam_close(task->output.beam);
+ h2_beam_log(task->output.beam, task->stream_id, "out_close", m->c,
+ APLOG_TRACE2);
}
+ output_consumed_signal(m, task);
+ have_out_data_for(m, task->stream_id);
return status;
}
task->worker_started = 1;
task->started_at = apr_time_now();
-
- if (task->input.beam) {
- h2_beam_timeout_set(task->input.beam, m->stream_timeout);
- h2_beam_on_consumed(task->input.beam, stream_input_consumed, m);
- h2_beam_on_file_beam(task->input.beam, can_beam_file, m);
- h2_beam_mutex_set(task->input.beam, io_mutex_enter,
- io_mutex_leave, task->cond, m);
- }
if (sid > m->max_stream_started) {
m->max_stream_started = sid;
}
+
+ if (stream->input) {
+ h2_beam_timeout_set(stream->input, m->stream_timeout);
+ h2_beam_on_consumed(stream->input, stream_input_consumed, m);
+ h2_beam_on_file_beam(stream->input, can_beam_file, m);
+ h2_beam_mutex_set(stream->input, beam_enter, task->cond, m);
+ }
+
++m->workers_busy;
}
}
/* FIXME: this implementation is incomplete. */
h2_task_set_io_blocking(task, 0);
apr_thread_cond_broadcast(m->task_thawed);
+ return;
}
else {
- apr_time_t now = apr_time_now();
-
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
"h2_mplx(%ld): task(%s) done", m->id, task->id);
- /* clean our references and report request as done. Signal
- * that we want another unless we have been aborted */
- /* TODO: this will keep a worker attached to this h2_mplx as
- * long as it has requests to handle. Might no be fair to
- * other mplx's. Perhaps leave after n requests? */
- h2_mplx_out_close(m, task->stream_id);
+ out_close(m, task);
if (ngn) {
apr_off_t bytes = 0;
h2_task_redo(task);
h2_ihash_remove(m->redo_tasks, task->stream_id);
h2_iq_add(m->q, task->stream_id, NULL, NULL);
+ return;
}
- else {
- task->worker_done = 1;
- task->done_at = now;
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
- "h2_mplx(%s): request done, %f ms"
- " elapsed", task->id,
- (task->done_at - task->started_at) / 1000.0);
- if (task->started_at > m->last_idle_block) {
- /* this task finished without causing an 'idle block', e.g.
- * a block by flow control.
- */
- if (now - m->last_limit_change >= m->limit_change_interval
- && m->workers_limit < m->workers_max) {
- /* Well behaving stream, allow it more workers */
- m->workers_limit = H2MIN(m->workers_limit * 2,
- m->workers_max);
- m->last_limit_change = now;
- m->need_registration = 1;
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
- "h2_mplx(%ld): increase worker limit to %d",
- m->id, m->workers_limit);
- }
+
+ task->worker_done = 1;
+ task->done_at = apr_time_now();
+ if (task->output.beam) {
+ h2_beam_on_consumed(task->output.beam, NULL, NULL);
+ h2_beam_mutex_set(task->output.beam, NULL, NULL, NULL);
+ }
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
+ "h2_mplx(%s): request done, %f ms"
+ " elapsed", task->id,
+ (task->done_at - task->started_at) / 1000.0);
+ if (task->started_at > m->last_idle_block) {
+ /* this task finished without causing an 'idle block', e.g.
+ * a block by flow control.
+ */
+        if (task->done_at - m->last_limit_change >= m->limit_change_interval
+ && m->workers_limit < m->workers_max) {
+ /* Well behaving stream, allow it more workers */
+ m->workers_limit = H2MIN(m->workers_limit * 2,
+ m->workers_max);
+ m->last_limit_change = task->done_at;
+ m->need_registration = 1;
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
+ "h2_mplx(%ld): increase worker limit to %d",
+ m->id, m->workers_limit);
}
}