]> git.ipfire.org Git - thirdparty/apache/httpd.git/commitdiff
mod_http2: delaying response start until flush or data accumulation
authorStefan Eissing <icing@apache.org>
Fri, 15 Apr 2016 14:56:11 +0000 (14:56 +0000)
committerStefan Eissing <icing@apache.org>
Fri, 15 Apr 2016 14:56:11 +0000 (14:56 +0000)
git-svn-id: https://svn.apache.org/repos/asf/httpd/httpd/trunk@1739312 13f79535-47bb-0310-9956-ffa450edef68

CHANGES
modules/http2/h2_bucket_beam.c
modules/http2/h2_bucket_beam.h
modules/http2/h2_proxy_session.c
modules/http2/h2_task.c

diff --git a/CHANGES b/CHANGES
index 66d8811832cf9008a9f98d4cdf5cefdc580587c7..6e92461c5ccc3ce7c546cc7aea9bf1773b4d104b 100644 (file)
--- a/CHANGES
+++ b/CHANGES
@@ -2,7 +2,8 @@
 Changes with Apache 2.5.0
 
   *) mod_http2: new "bucket beam" technology to transport buckets across
-     threads without buffer copy.
+     threads without buffer copy. Delaying response start until flush or
+     enough body data has been accumulated. [Stefan Eissing]
      
   *) http: Respond with "408 Request Timeout" when a timeout occurs while
      reading the request body.  [Yann Ylavic]
index 5406176fadc4b432f5e1a78eab90255a5c097603..fee6c7bc87d82919d3f86344928096705f9a7cdd 100644 (file)
@@ -803,6 +803,30 @@ apr_off_t h2_beam_get_buffered(h2_bucket_beam *beam)
     return l;
 }
 
+apr_off_t h2_beam_get_mem_used(h2_bucket_beam *beam)
+{
+    apr_thread_mutex_t *lock;
+    apr_bucket *b;
+    apr_off_t l = 0;
+    int acquired;
+    
+    if (enter_yellow(beam, &lock, &acquired) == APR_SUCCESS) {
+        for (b = H2_BLIST_FIRST(&beam->red); 
+            b != H2_BLIST_SENTINEL(&beam->red);
+            b = APR_BUCKET_NEXT(b)) {
+            if (APR_BUCKET_IS_FILE(b)) {
+                /* do not count */
+            }
+            else {
+                /* should all have determinate length */
+                l += b->length;
+            }
+        }
+        leave_yellow(beam, lock, acquired);
+    }
+    return l;
+}
+
 int h2_beam_empty(h2_bucket_beam *beam)
 {
     apr_thread_mutex_t *lock;
index 45d98b29cc25eb1b1207e739805e528a1b1ed865..0b405990229ca7155ef8e1a559409f50c3a5b680 100644 (file)
@@ -284,6 +284,11 @@ void h2_beam_on_file_beam(h2_bucket_beam *beam,
  */
 apr_off_t h2_beam_get_buffered(h2_bucket_beam *beam);
 
+/**
+ * Get the memory used by the buffered buckets, approximately.
+ */
+apr_off_t h2_beam_get_mem_used(h2_bucket_beam *beam);
+
 int h2_beam_closed(h2_bucket_beam *beam);
 int h2_beam_empty(h2_bucket_beam *beam);
 
index 19aff5bf6f896b2473a706e3f8823ddae92fa4db..51b68f2f4fa9f08b7f3be06cbd4f4433d62ac9cf 100644 (file)
@@ -354,10 +354,10 @@ static int on_data_chunk_recv(nghttp2_session *ngh2, uint8_t flags,
     b = apr_bucket_transient_create((const char*)data, len, 
                                     stream->r->connection->bucket_alloc);
     APR_BRIGADE_INSERT_TAIL(stream->output, b);
-    if (flags & NGHTTP2_DATA_FLAG_EOF) {
-        b = apr_bucket_flush_create(stream->r->connection->bucket_alloc);
-        APR_BRIGADE_INSERT_TAIL(stream->output, b);
-    }
+    /* always flush after a DATA frame, as we have no other indication
+     * of buffer use */
+    b = apr_bucket_flush_create(stream->r->connection->bucket_alloc);
+    APR_BRIGADE_INSERT_TAIL(stream->output, b);
     
     ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, stream->r, APLOGNO(03359)
                   "h2_proxy_session(%s): pass response data for "
index ba67b4198f375a18f4dea279a583773b272a5028..3eb25e8f7aa04cb8c55526120fb96b61b0da4217 100644 (file)
@@ -242,7 +242,7 @@ static apr_status_t input_read(h2_task *task, ap_filter_t* f,
 }
 
 /*******************************************************************************
- * task input handling
+ * task output handling
  ******************************************************************************/
 
 static apr_status_t open_response(h2_task *task)
@@ -312,6 +312,7 @@ static apr_status_t output_write(h2_task *task, ap_filter_t* f,
 {
     apr_bucket *b;
     apr_status_t status = APR_SUCCESS;
+    int flush = 0;
     
     if (APR_BRIGADE_EMPTY(bb)) {
         ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, task->c,
@@ -349,7 +350,22 @@ static apr_status_t output_write(h2_task *task, ap_filter_t* f,
     }
     
     /* If there is nothing saved (anymore), try to write the brigade passed */
-    if ((!task->output.bb || APR_BRIGADE_EMPTY(task->output.bb)) && !APR_BRIGADE_EMPTY(bb)) {
+    if ((!task->output.bb || APR_BRIGADE_EMPTY(task->output.bb)) 
+        && !APR_BRIGADE_EMPTY(bb)) {
+        /* check if we have a flush before the end-of-request */
+        if (!task->output.response_open) {
+            for (b = APR_BRIGADE_FIRST(bb);
+                 b != APR_BRIGADE_SENTINEL(bb);
+                 b = APR_BUCKET_NEXT(b)) {
+                if (AP_BUCKET_IS_EOR(b)) {
+                    break;
+                }
+                else if (APR_BUCKET_IS_FLUSH(b)) {
+                    flush = 1;
+                }
+            }
+        }
+
         status = send_out(task, bb); 
         if (status != APR_SUCCESS) {
             return status;
@@ -368,9 +384,10 @@ static apr_status_t output_write(h2_task *task, ap_filter_t* f,
         return ap_save_brigade(f, &task->output.bb, &bb, task->pool);
     }
     
-    if (!task->output.response_open) {
-        /* data is in the output beam, if we have not opened the response,
-         * do so now. */
+    if (!task->output.response_open 
+        && (flush || h2_beam_get_mem_used(task->output.beam) > (32*1024))) {
+        /* if we have enough buffered or we got a flush bucket, open
+        * the response now. */
         status = open_response(task);
         task->output.response_open = 1;
     }
@@ -378,6 +395,17 @@ static apr_status_t output_write(h2_task *task, ap_filter_t* f,
     return status;
 }
 
+static apr_status_t output_finish(h2_task *task)
+{
+    apr_status_t status = APR_SUCCESS;
+    
+    if (!task->output.response_open) {
+        status = open_response(task);
+        task->output.response_open = 1;
+    }
+    return status;
+}
+
 /*******************************************************************************
  * task slave connection filters
  ******************************************************************************/
@@ -461,7 +489,6 @@ h2_task *h2_task_create(conn_rec *c, const h2_request *req,
         ap_log_cerror(APLOG_MARK, APLOG_ERR, APR_ENOMEM, c,
                       APLOGNO(02941) "h2_task(%ld-%d): create stream task", 
                       c->id, req->id);
-        h2_mplx_out_close(mplx, req->id);
         return NULL;
     }
     
@@ -505,8 +532,6 @@ void h2_task_set_io_blocking(h2_task *task, int blocking)
 
 apr_status_t h2_task_do(h2_task *task)
 {
-    apr_status_t status;
-    
     AP_DEBUG_ASSERT(task);
     
     task->input.block = APR_BLOCK_READ;
@@ -546,16 +571,13 @@ apr_status_t h2_task_do(h2_task *task)
                       "h2_task(%s): process_conn returned frozen task", 
                       task->id);
         /* cleanup delayed */
-        status = APR_EAGAIN;
+        return APR_EAGAIN;
     }
     else {
         ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, task->c,
                       "h2_task(%s): processing done", task->id);
-        h2_mplx_out_close(task->mplx, task->stream_id);
-        status = APR_SUCCESS;
+        return output_finish(task);
     }
-    
-    return status;
 }
 
 static apr_status_t h2_task_process_request(h2_task *task, conn_rec *c)