return l;
}
+apr_off_t h2_beam_get_mem_used(h2_bucket_beam *beam)
+{
+ apr_thread_mutex_t *lock;
+ apr_bucket *b;
+ apr_off_t l = 0;
+ int acquired;
+
+ if (enter_yellow(beam, &lock, &acquired) == APR_SUCCESS) {
+ for (b = H2_BLIST_FIRST(&beam->red);
+ b != H2_BLIST_SENTINEL(&beam->red);
+ b = APR_BUCKET_NEXT(b)) {
+ if (APR_BUCKET_IS_FILE(b)) {
+ /* do not count */
+ }
+ else {
+ /* should all have determinate length */
+ l += b->length;
+ }
+ }
+ leave_yellow(beam, lock, acquired);
+ }
+ return l;
+}
+
int h2_beam_empty(h2_bucket_beam *beam)
{
apr_thread_mutex_t *lock;
b = apr_bucket_transient_create((const char*)data, len,
stream->r->connection->bucket_alloc);
APR_BRIGADE_INSERT_TAIL(stream->output, b);
- if (flags & NGHTTP2_DATA_FLAG_EOF) {
- b = apr_bucket_flush_create(stream->r->connection->bucket_alloc);
- APR_BRIGADE_INSERT_TAIL(stream->output, b);
- }
+ /* always flush after a DATA frame, as we have no other indication
+ * of buffer use */
+ b = apr_bucket_flush_create(stream->r->connection->bucket_alloc);
+ APR_BRIGADE_INSERT_TAIL(stream->output, b);
ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, stream->r, APLOGNO(03359)
"h2_proxy_session(%s): pass response data for "
}
/*******************************************************************************
- * task input handling
+ * task output handling
******************************************************************************/
static apr_status_t open_response(h2_task *task)
{
apr_bucket *b;
apr_status_t status = APR_SUCCESS;
+ int flush = 0;
if (APR_BRIGADE_EMPTY(bb)) {
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, task->c,
}
/* If there is nothing saved (anymore), try to write the brigade passed */
- if ((!task->output.bb || APR_BRIGADE_EMPTY(task->output.bb)) && !APR_BRIGADE_EMPTY(bb)) {
+ if ((!task->output.bb || APR_BRIGADE_EMPTY(task->output.bb))
+ && !APR_BRIGADE_EMPTY(bb)) {
+ /* check if we have a flush before the end-of-request */
+ if (!task->output.response_open) {
+ for (b = APR_BRIGADE_FIRST(bb);
+ b != APR_BRIGADE_SENTINEL(bb);
+ b = APR_BUCKET_NEXT(b)) {
+ if (AP_BUCKET_IS_EOR(b)) {
+ break;
+ }
+ else if (APR_BUCKET_IS_FLUSH(b)) {
+ flush = 1;
+ }
+ }
+ }
+
status = send_out(task, bb);
if (status != APR_SUCCESS) {
return status;
return ap_save_brigade(f, &task->output.bb, &bb, task->pool);
}
- if (!task->output.response_open) {
- /* data is in the output beam, if we have not opened the response,
- * do so now. */
+ if (!task->output.response_open
+ && (flush || h2_beam_get_mem_used(task->output.beam) > (32*1024))) {
+ /* if we have enough buffered or we got a flush bucket, open
+ * the response now. */
status = open_response(task);
task->output.response_open = 1;
}
return status;
}
+static apr_status_t output_finish(h2_task *task)
+{
+ apr_status_t status = APR_SUCCESS;
+
+ if (!task->output.response_open) {
+ status = open_response(task);
+ task->output.response_open = 1;
+ }
+ return status;
+}
+
/*******************************************************************************
* task slave connection filters
******************************************************************************/
ap_log_cerror(APLOG_MARK, APLOG_ERR, APR_ENOMEM, c,
APLOGNO(02941) "h2_task(%ld-%d): create stream task",
c->id, req->id);
- h2_mplx_out_close(mplx, req->id);
return NULL;
}
apr_status_t h2_task_do(h2_task *task)
{
- apr_status_t status;
-
AP_DEBUG_ASSERT(task);
task->input.block = APR_BLOCK_READ;
"h2_task(%s): process_conn returned frozen task",
task->id);
/* cleanup delayed */
- status = APR_EAGAIN;
+ return APR_EAGAIN;
}
else {
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, task->c,
"h2_task(%s): processing done", task->id);
- h2_mplx_out_close(task->mplx, task->stream_id);
- status = APR_SUCCESS;
+ return output_finish(task);
}
-
- return status;
}
static apr_status_t h2_task_process_request(h2_task *task, conn_rec *c)