]> git.ipfire.org Git - thirdparty/apache/httpd.git/commitdiff
mod_http2: rewrote TLS buffering on master connection
authorStefan Eissing <icing@apache.org>
Mon, 2 May 2016 16:39:42 +0000 (16:39 +0000)
committerStefan Eissing <icing@apache.org>
Mon, 2 May 2016 16:39:42 +0000 (16:39 +0000)
git-svn-id: https://svn.apache.org/repos/asf/httpd/httpd/trunk@1742005 13f79535-47bb-0310-9956-ffa450edef68

CHANGES
modules/http2/h2_conn_io.c
modules/http2/h2_conn_io.h
modules/http2/h2_mplx.c
modules/http2/h2_request.c
modules/http2/h2_request.h
modules/http2/h2_session.c
modules/http2/h2_stream.c
modules/http2/h2_stream.h

diff --git a/CHANGES b/CHANGES
index 5ec96fb5f12fa873d405643bf5c64c07c4c4aabf..e05d90d8904053b953b1bece083731a40e16d587 100644 (file)
--- a/CHANGES
+++ b/CHANGES
@@ -1,6 +1,12 @@
                                                          -*- coding: utf-8 -*-
 Changes with Apache 2.5.0
 
+  *) mod_http2: elimination of fixed master connectin buffer for TLS 
+     connections. New scratch bucket handling optimized for TLS write sizes. 
+     File bucket data read directly into scratch buffers, avoiding one
+     copy. Non-TLS connections continue to pass buckets unchanged to the core
+     filters to allow sendfile() usage. 
+  
   *) mod_http2/mod_proxy_http2: h2_request.c is no longer shared between these
      modules. This simplifies building on platforms such as Windows, as module
      reference used in logging is now clear.
index d21ae8b95943ef9e80566be1228adb112b601706..c1120740bfdf3085893a5668ebb28cfe249edd19 100644 (file)
@@ -127,23 +127,13 @@ static void h2_conn_io_bb_log(conn_rec *c, int stream_id, int level,
 }
 
 apr_status_t h2_conn_io_init(h2_conn_io *io, conn_rec *c, 
-                             const h2_config *cfg, 
-                             apr_pool_t *pool)
+                             const h2_config *cfg)
 {
     io->c             = c;
     io->output        = apr_brigade_create(c->pool, c->bucket_alloc);
-    io->buflen        = 0;
     io->is_tls        = h2_h2_is_tls(c);
     io->buffer_output = io->is_tls;
     
-    if (io->buffer_output) {
-        io->bufsize = WRITE_BUFFER_SIZE;
-        io->buffer = apr_pcalloc(pool, io->bufsize);
-    }
-    else {
-        io->bufsize = 0;
-    }
-    
     if (io->is_tls) {
         /* This is what we start with, 
          * see https://issues.apache.org/jira/browse/TS-2503 
@@ -151,12 +141,13 @@ apr_status_t h2_conn_io_init(h2_conn_io *io, conn_rec *c,
         io->warmup_size    = h2_config_geti64(cfg, H2_CONF_TLS_WARMUP_SIZE);
         io->cooldown_usecs = (h2_config_geti(cfg, H2_CONF_TLS_COOLDOWN_SECS) 
                               * APR_USEC_PER_SEC);
-        io->write_size     = WRITE_SIZE_INITIAL; 
+        io->write_size     = (io->cooldown_usecs > 0? 
+                              WRITE_SIZE_INITIAL : WRITE_SIZE_MAX); 
     }
     else {
         io->warmup_size    = 0;
         io->cooldown_usecs = 0;
-        io->write_size     = io->bufsize;
+        io->write_size     = 0;
     }
 
     if (APLOGctrace1(c)) {
@@ -170,9 +161,95 @@ apr_status_t h2_conn_io_init(h2_conn_io *io, conn_rec *c,
     return APR_SUCCESS;
 }
 
+#define LOG_SCRATCH 0
+
+static void append_scratch(h2_conn_io *io) 
+{
+    if (io->scratch && io->slen > 0) {
+        apr_bucket *b = apr_bucket_heap_create(io->scratch, io->slen,
+                                               apr_bucket_free,
+                                               io->c->bucket_alloc);
+        APR_BRIGADE_INSERT_TAIL(io->output, b);
+#if LOG_SCRATCH
+        ap_log_cerror(APLOG_MARK, APLOG_INFO, 0, io->c,
+                      "h2_conn_io(%ld): append_scratch(%ld)", 
+                      io->c->id, (long)io->slen);
+#endif
+        io->scratch = NULL;
+        io->slen = io->ssize = 0;
+    }
+}
+
+static apr_size_t assure_scratch_space(h2_conn_io *io) {
+    apr_size_t remain = io->ssize - io->slen; 
+    if (io->scratch && remain == 0) {
+        append_scratch(io);
+    }
+    if (!io->scratch) {
+        /* we control the size and it is larger than what buckets usually
+         * allocate. */
+        io->scratch = apr_bucket_alloc(io->write_size, io->c->bucket_alloc);
+        io->ssize = io->write_size;
+        io->slen = 0;
+        remain = io->ssize;
+    }
+    return remain;
+}
+    
+static apr_status_t read_to_scratch(h2_conn_io *io, apr_bucket *b)
+{
+    apr_status_t status;
+    const char *data;
+    apr_size_t len;
+    
+    if (!b->length) {
+        return APR_SUCCESS;
+    }
+    
+    AP_DEBUG_ASSERT(b->length <= (io->ssize - io->slen));
+    if (APR_BUCKET_IS_FILE(b)) {
+        apr_bucket_file *f = (apr_bucket_file *)b->data;
+        apr_file_t *fd = f->fd;
+        apr_off_t offset = b->start;
+        apr_size_t len = b->length;
+        
+        /* file buckets will either mmap (which we do not want) or
+         * read 8000 byte chunks and split themself. However, we do
+         * know *exactly* how many bytes we need where.
+         */
+        status = apr_file_seek(fd, APR_SET, &offset);
+        if (status != APR_SUCCESS) {
+            return status;
+        }
+        status = apr_file_read(fd, io->scratch + io->slen, &len);
+#if LOG_SCRATCH
+        ap_log_cerror(APLOG_MARK, APLOG_INFO, status, io->c,
+                      "h2_conn_io(%ld): FILE_to_scratch(%ld)", 
+                      io->c->id, (long)len); 
+#endif
+        if (status != APR_SUCCESS && status != APR_EOF) {
+            return status;
+        }
+        io->slen += len;
+    }
+    else {
+        status = apr_bucket_read(b, &data, &len, APR_BLOCK_READ);
+        if (status == APR_SUCCESS) {
+#if LOG_SCRATCH
+            ap_log_cerror(APLOG_MARK, APLOG_INFO, 0, io->c,
+                          "h2_conn_io(%ld): read_to_scratch(%ld)", 
+                          io->c->id, (long)b->length); 
+#endif
+            memcpy(io->scratch+io->slen, data, len);
+            io->slen += len;
+        }
+    }
+    return status;
+}
+
 int h2_conn_io_is_buffered(h2_conn_io *io)
 {
-    return io->bufsize > 0;
+    return io->buffer_output;
 }
 
 typedef struct {
@@ -208,16 +285,8 @@ static apr_status_t pass_out(apr_bucket_brigade *bb, void *ctx)
     return status;
 }
 
-/* Bring the current buffer content into the output brigade, appropriately
- * chunked.
- */
-static apr_status_t bucketeer_buffer(h2_conn_io *io)
+static void check_write_size(h2_conn_io *io) 
 {
-    const char *data = io->buffer;
-    apr_size_t remaining = io->buflen;
-    apr_bucket *b;
-    int bcount, i;
-
     if (io->write_size > WRITE_SIZE_INITIAL 
         && (io->cooldown_usecs > 0)
         && (apr_time_now() - io->last_write) >= io->cooldown_usecs) {
@@ -236,32 +305,6 @@ static apr_status_t bucketeer_buffer(h2_conn_io *io)
                       "h2_conn_io(%ld): threshold reached, write size now %ld", 
                       (long)io->c->id, (long)io->write_size);
     }
-    
-    bcount = (int)(remaining / io->write_size);
-    for (i = 0; i < bcount; ++i) {
-        b = apr_bucket_transient_create(data, io->write_size, 
-                                        io->output->bucket_alloc);
-        APR_BRIGADE_INSERT_TAIL(io->output, b);
-        data += io->write_size;
-        remaining -= io->write_size;
-    }
-    
-    if (remaining > 0) {
-        b = apr_bucket_transient_create(data, remaining, 
-                                        io->output->bucket_alloc);
-        APR_BRIGADE_INSERT_TAIL(io->output, b);
-    }
-    return APR_SUCCESS;
-}
-
-apr_status_t h2_conn_io_writeb(h2_conn_io *io, apr_bucket *b, int flush)
-{
-    APR_BRIGADE_INSERT_TAIL(io->output, b);
-    if (flush) {
-        b = apr_bucket_flush_create(io->c->bucket_alloc);
-        APR_BRIGADE_INSERT_TAIL(io->output, b);
-    }
-    return APR_SUCCESS;
 }
 
 static apr_status_t h2_conn_io_flush_int(h2_conn_io *io, int flush, int eoc)
@@ -269,17 +312,10 @@ static apr_status_t h2_conn_io_flush_int(h2_conn_io *io, int flush, int eoc)
     pass_out_ctx ctx;
     apr_bucket *b;
     
-    if (io->buflen == 0 && APR_BRIGADE_EMPTY(io->output)) {
+    append_scratch(io);
+    if (APR_BRIGADE_EMPTY(io->output)) {
         return APR_SUCCESS;
     }
-        
-    if (io->buflen > 0) {
-        /* something in the buffer, put it in the output brigade */
-        ap_log_cerror(APLOG_MARK, APLOG_TRACE4, 0, io->c,
-                      "h2_conn_io: flush, flushing %ld bytes", 
-                      (long)io->buflen);
-        bucketeer_buffer(io);
-    }
     
     if (flush) {
         b = apr_bucket_flush_create(io->c->bucket_alloc);
@@ -287,7 +323,6 @@ static apr_status_t h2_conn_io_flush_int(h2_conn_io *io, int flush, int eoc)
     }
     
     ap_log_cerror(APLOG_MARK, APLOG_TRACE4, 0, io->c, "h2_conn_io: flush");
-    io->buflen = 0;
     ctx.c = io->c;
     ctx.io = eoc? NULL : io;
     
@@ -307,10 +342,9 @@ apr_status_t h2_conn_io_consider_pass(h2_conn_io *io)
     
     if (!APR_BRIGADE_EMPTY(io->output)) {
         len = h2_brigade_mem_size(io->output);
-    }
-    len += io->buflen;
-    if (len >= WRITE_BUFFER_SIZE) {
-        return h2_conn_io_flush_int(io, 1, 0);
+        if (len >= WRITE_BUFFER_SIZE) {
+            return h2_conn_io_flush_int(io, 1, 0);
+        }
     }
     return APR_SUCCESS;
 }
@@ -322,48 +356,98 @@ apr_status_t h2_conn_io_write_eoc(h2_conn_io *io, h2_session *session)
     return h2_conn_io_flush_int(io, 1, 1);
 }
 
-apr_status_t h2_conn_io_write(h2_conn_io *io, 
-                              const char *buf, size_t length)
+apr_status_t h2_conn_io_write(h2_conn_io *io, const char *data, size_t length)
 {
     apr_status_t status = APR_SUCCESS;
-    pass_out_ctx ctx;
+    apr_size_t remain;
     
-    ctx.c = io->c;
-    ctx.io = io;
-    if (io->bufsize > 0) {
-        ap_log_cerror(APLOG_MARK, APLOG_TRACE4, 0, io->c,
-                      "h2_conn_io: buffering %ld bytes", (long)length);
-                      
-        if (!APR_BRIGADE_EMPTY(io->output)) {
-            status = h2_conn_io_flush_int(io, 0, 0);
+    if (io->buffer_output) {
+        while (length > 0) {
+            remain = assure_scratch_space(io);
+            if (remain >= length) {
+#if LOG_SCRATCH
+                ap_log_cerror(APLOG_MARK, APLOG_INFO, 0, io->c,
+                              "h2_conn_io(%ld): write_to_scratch(%ld)", 
+                              io->c->id, (long)length); 
+#endif
+                memcpy(io->scratch + io->slen, data, length);
+                io->slen += length;
+                length = 0;
+            }
+            else {
+#if LOG_SCRATCH
+                ap_log_cerror(APLOG_MARK, APLOG_INFO, 0, io->c,
+                              "h2_conn_io(%ld): write_to_scratch(%ld)", 
+                              io->c->id, (long)remain); 
+#endif
+                memcpy(io->scratch + io->slen, data, remain);
+                io->slen += remain;
+                data += remain;
+                length -= remain;
+            }
         }
+    }
+    else {
+        status = apr_brigade_write(io->output, NULL, NULL, data, length);
+    }
+    return status;
+}
+
+apr_status_t h2_conn_io_pass(h2_conn_io *io, apr_bucket_brigade *bb)
+{
+    apr_bucket *b;
+    apr_status_t status = APR_SUCCESS;
+    
+    check_write_size(io);
+    while (!APR_BRIGADE_EMPTY(bb) && status == APR_SUCCESS) {
+        b = APR_BRIGADE_FIRST(bb);
         
-        while (length > 0 && (status == APR_SUCCESS)) {
-            apr_size_t avail = io->bufsize - io->buflen;
-            if (avail <= 0) {
+        if (APR_BUCKET_IS_METADATA(b)) {
+            /* need to finish any open scratch bucket, as meta data 
+             * needs to be forward "in order". */
+            append_scratch(io);
+            APR_BUCKET_REMOVE(b);
+            APR_BRIGADE_INSERT_TAIL(io->output, b);
+            
+            if (APR_BUCKET_IS_FLUSH(b)) {
                 status = h2_conn_io_flush_int(io, 0, 0);
             }
-            else if (length > avail) {
-                memcpy(io->buffer + io->buflen, buf, avail);
-                io->buflen += avail;
-                length -= avail;
-                buf += avail;
+        }
+        else if (io->buffer_output) {
+            apr_size_t remain = assure_scratch_space(io);
+            if (b->length > remain) {
+                apr_bucket_split(b, remain);
+                if (io->slen == 0) {
+                    /* complete write_size bucket, append unchanged */
+                    APR_BUCKET_REMOVE(b);
+                    APR_BRIGADE_INSERT_TAIL(io->output, b);
+#if LOG_SCRATCH
+                    ap_log_cerror(APLOG_MARK, APLOG_INFO, 0, io->c,
+                                  "h2_conn_io(%ld): pass bucket(%ld)", 
+                                  io->c->id, (long)b->length);
+#endif
+                    continue;
+                }
             }
             else {
-                memcpy(io->buffer + io->buflen, buf, length);
-                io->buflen += length;
-                length = 0;
-                break;
+                /* bucket fits in remain, copy to scratch */
+                read_to_scratch(io, b);
+                apr_bucket_delete(b);
+                continue;
             }
         }
-        
+        else {
+            /* no buffering, forward buckets setaside on flush */
+            if (APR_BUCKET_IS_TRANSIENT(b)) {
+                apr_bucket_setaside(b, io->c->pool);
+            }
+            APR_BUCKET_REMOVE(b);
+            APR_BRIGADE_INSERT_TAIL(io->output, b);
+        }
     }
-    else {
-        ap_log_cerror(APLOG_MARK, APLOG_TRACE4, status, io->c,
-                      "h2_conn_io: writing %ld bytes to brigade", (long)length);
-        status = apr_brigade_write(io->output, pass_out, &ctx, buf, length);
+    if (status == APR_SUCCESS) {
+        return h2_conn_io_consider_pass(io);
     }
-    
     return status;
 }
 
index c397e9f608ea87277f33965a8867bb6a472a3f70..f1d877a3f634df5f079418ff7d8cf434e1a802e2 100644 (file)
@@ -39,14 +39,13 @@ typedef struct {
     apr_int64_t bytes_written;
     
     int buffer_output;
-    char *buffer;
-    apr_size_t buflen;
-    apr_size_t bufsize;
+    char *scratch;
+    apr_size_t ssize;
+    apr_size_t slen;
 } h2_conn_io;
 
 apr_status_t h2_conn_io_init(h2_conn_io *io, conn_rec *c, 
-                             const struct h2_config *cfg, 
-                             apr_pool_t *pool);
+                             const struct h2_config *cfg);
 
 int h2_conn_io_is_buffered(h2_conn_io *io);
 
@@ -59,12 +58,7 @@ apr_status_t h2_conn_io_write(h2_conn_io *io,
                          const char *buf,
                          size_t length);
 
-/**
- * Append a bucket to the buffered output.
- * @param io the connection io
- * @param b the bucket to append
- */
-apr_status_t h2_conn_io_writeb(h2_conn_io *io, apr_bucket *b, int flush);
+apr_status_t h2_conn_io_pass(h2_conn_io *io, apr_bucket_brigade *bb);
 
 /**
  * Append an End-Of-Connection bucket to the output that, once destroyed,
index 9298592b567dcf55e3c212b04b7aa1597c6558a5..9c8498e62e615a0042aab0e1a80d10fc1f5ab1b0 100644 (file)
@@ -167,6 +167,7 @@ static int can_beam_file(void *ctx, h2_bucket_beam *beam,  apr_file_t *file)
 }
 
 static void have_out_data_for(h2_mplx *m, int stream_id);
+static void task_destroy(h2_mplx *m, h2_task *task, int called_from_master);
 
 static void check_tx_reservation(h2_mplx *m) 
 {
@@ -193,8 +194,12 @@ static int purge_stream(void *ctx, void *val)
 {
     h2_mplx *m = ctx;
     h2_stream *stream = val;
+    h2_task *task = h2_ihash_get(m->tasks, stream->id);
     h2_ihash_remove(m->spurge, stream->id);
     h2_stream_destroy(stream);
+    if (task) {
+        task_destroy(m, task, 1);
+    }
     return 0;
 }
 
@@ -386,7 +391,7 @@ static void task_destroy(h2_mplx *m, h2_task *task, int called_from_master)
 
 static void stream_done(h2_mplx *m, h2_stream *stream, int rst_error) 
 {
-    h2_task *task = h2_ihash_get(m->tasks, stream->id);
+    h2_task *task;
     
     /* Situation: we are, on the master connection, done with processing
      * the stream. Either we have handled it successfully, or the stream
@@ -417,29 +422,28 @@ static void stream_done(h2_mplx *m, h2_stream *stream, int rst_error)
      * memory. We should either copy it on task creation or wait with the
      * stream destruction until the task is done. 
      */
+    h2_iq_remove(m->q, stream->id);
+    h2_ihash_remove(m->ready_tasks, stream->id);
     h2_ihash_remove(m->streams, stream->id);
     if (stream->input) {
         m->tx_handles_reserved += h2_beam_get_files_beamed(stream->input);
     }
     h2_stream_cleanup(stream);
 
+    task = h2_ihash_get(m->tasks, stream->id);
     if (task) {
-        /* Remove task from ready set, we will never submit it */
-        h2_ihash_remove(m->ready_tasks, stream->id);
-        task->input.beam = NULL;
-        
         if (!task->worker_done) {
             /* task still running, cleanup once it is done */
             if (rst_error) {
                 h2_task_rst(task, rst_error);
             }
-            /* FIXME: this should work, but does not
+            /* FIXME: this should work, but does not 
             h2_ihash_add(m->shold, stream);
             return;*/
+            task->input.beam = NULL;
         }
         else {
             /* already finished */
-            h2_iq_remove(m->q, task->stream_id);
             task_destroy(m, task, 0);
         }
     }
@@ -492,6 +496,17 @@ apr_status_t h2_mplx_release_and_join(h2_mplx *m, apr_thread_cond_t *wait)
         /* disable WINDOW_UPDATE callbacks */
         h2_mplx_set_consumed_cb(m, NULL, NULL);
         
+        if (!h2_ihash_empty(m->shold)) {
+            ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
+                          "h2_mplx(%ld): start release_join with %d streams in hold", 
+                          m->id, (int)h2_ihash_count(m->shold));
+        }
+        if (!h2_ihash_empty(m->spurge)) {
+            ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
+                          "h2_mplx(%ld): start release_join with %d streams to purge", 
+                          m->id, (int)h2_ihash_count(m->spurge));
+        }
+        
         h2_iq_clear(m->q);
         apr_thread_cond_broadcast(m->task_thawed);
         while (!h2_ihash_iter(m->streams, stream_done_iter, m)) {
@@ -499,19 +514,25 @@ apr_status_t h2_mplx_release_and_join(h2_mplx *m, apr_thread_cond_t *wait)
         }
         AP_DEBUG_ASSERT(h2_ihash_empty(m->streams));
     
+        if (!h2_ihash_empty(m->shold)) {
+            ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
+                          "h2_mplx(%ld): 2. release_join with %d streams in hold", 
+                          m->id, (int)h2_ihash_count(m->shold));
+        }
+        if (!h2_ihash_empty(m->spurge)) {
+            ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
+                          "h2_mplx(%ld): 2. release_join with %d streams to purge", 
+                          m->id, (int)h2_ihash_count(m->spurge));
+        }
+        
         /* If we still have busy workers, we cannot release our memory
-         * pool yet, as slave connections have child pools of their respective
-         * h2_io's.
-         * Any remaining ios are processed in these workers. Any operation 
-         * they do on their input/outputs will be errored ECONNRESET/ABORTED, 
-         * so processing them should fail and workers *should* return.
+         * pool yet, as tasks have references to us.
+         * Any operation on the task slave connection will from now on
+         * be errored ECONNRESET/ABORTED, so processing them should fail 
+         * and workers *should* return in a timely fashion.
          */
         for (i = 0; m->workers_busy > 0; ++i) {
             m->join_wait = wait;
-            ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
-                          "h2_mplx(%ld): release_join, waiting on %d tasks to report back", 
-                          m->id, (int)h2_ihash_count(m->tasks));
-                          
             status = apr_thread_cond_timedwait(wait, m->lock, apr_time_from_sec(wait_secs));
             
             if (APR_STATUS_IS_TIMEUP(status)) {
@@ -534,13 +555,23 @@ apr_status_t h2_mplx_release_and_join(h2_mplx *m, apr_thread_cond_t *wait)
                 apr_thread_cond_broadcast(m->task_thawed);
             }
         }
-        AP_DEBUG_ASSERT(h2_ihash_empty(m->tasks));
+        
         AP_DEBUG_ASSERT(h2_ihash_empty(m->shold));
-        purge_streams(m);
+        if (!h2_ihash_empty(m->spurge)) {
+            ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
+                          "h2_mplx(%ld): release_join %d streams to purge", 
+                          m->id, (int)h2_ihash_count(m->spurge));
+            purge_streams(m);
+        }
+        AP_DEBUG_ASSERT(h2_ihash_empty(m->spurge));
+        AP_DEBUG_ASSERT(h2_ihash_empty(m->tasks));
         
-        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c, APLOGNO(03056)
-                      "h2_mplx(%ld): release_join (%d tasks left) -> destroy", 
-                      m->id, (int)h2_ihash_count(m->tasks));
+        if (!h2_ihash_empty(m->tasks)) {
+            ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, APLOGNO(03056)
+                          "h2_mplx(%ld): release_join -> destroy, "
+                          "%d tasks still present", 
+                          m->id, (int)h2_ihash_count(m->tasks));
+        }
         leave_mutex(m, acquired);
         h2_mplx_destroy(m);
         /* all gone */
@@ -928,16 +959,17 @@ static void task_done(h2_mplx *m, h2_task *task, h2_req_engine *ngn)
         h2_task_thaw(task);
         /* we do not want the task to block on writing response
          * bodies into the mplx. */
-        /* FIXME: this implementation is incomplete. */
         h2_task_set_io_blocking(task, 0);
         apr_thread_cond_broadcast(m->task_thawed);
         return;
     }
     else {
-        h2_stream *stream = h2_ihash_get(m->streams, task->stream_id);
+        h2_stream *stream;
+        
         ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
                       "h2_mplx(%ld): task(%s) done", m->id, task->id);
         out_close(m, task);
+        stream = h2_ihash_get(m->streams, task->stream_id);
         
         if (ngn) {
             apr_off_t bytes = 0;
@@ -979,9 +1011,8 @@ static void task_done(h2_mplx *m, h2_task *task, h2_req_engine *ngn)
             h2_beam_on_consumed(task->output.beam, NULL, NULL);
             h2_beam_mutex_set(task->output.beam, NULL, NULL, NULL);
         }
-        ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
-                      "h2_mplx(%s): request done, %f ms"
-                      " elapsed", task->id, 
+        ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
+                      "h2_mplx(%s): request done, %f ms elapsed", task->id, 
                       (task->done_at - task->started_at) / 1000.0);
         if (task->started_at > m->last_idle_block) {
             /* this task finished without causing an 'idle block', e.g.
@@ -1002,11 +1033,17 @@ static void task_done(h2_mplx *m, h2_task *task, h2_req_engine *ngn)
         
         if (stream) {
             /* hang around until the stream deregisters */
+            ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
+                          "h2_mplx(%s): task_done, stream still open", 
+                          task->id);
         }
         else {
+            /* stream done, was it placed in hold? */
             stream = h2_ihash_get(m->shold, task->stream_id);
-            task_destroy(m, task, 0);
             if (stream) {
+                ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
+                              "h2_mplx(%s): task_done, stream in hold", 
+                              task->id);
                 stream->response = NULL; /* ref from task memory */
                 /* We cannot destroy the stream here since this is 
                  * called from a worker thread and freeing memory pools
@@ -1015,6 +1052,12 @@ static void task_done(h2_mplx *m, h2_task *task, h2_req_engine *ngn)
                 h2_ihash_remove(m->shold, stream->id);
                 h2_ihash_add(m->spurge, stream);
             }
+            else {
+                ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
+                              "h2_mplx(%s): task_done, stream not found", 
+                              task->id);
+                task_destroy(m, task, 0);
+            }
             
             if (m->join_wait) {
                 apr_thread_cond_signal(m->join_wait);
index ca8a9bf7b90afc78553a78784c116f7f2ce4c9b3..2f232080622d287ceb9dd177f64ae4bfa9239336 100644 (file)
@@ -42,26 +42,27 @@ static apr_status_t inspect_clen(h2_request *req, const char *s)
     return (s == end)? APR_EINVAL : APR_SUCCESS;
 }
 
-apr_status_t h2_request_rwrite(h2_request *req, request_rec *r)
+apr_status_t h2_request_rwrite(h2_request *req, apr_pool_t *pool, 
+                               request_rec *r)
 {
     apr_status_t status;
     const char *scheme, *authority;
     
-    scheme = (r->parsed_uri.scheme? r->parsed_uri.scheme
+    scheme = apr_pstrdup(pool, r->parsed_uri.scheme? r->parsed_uri.scheme
               : ap_http_scheme(r));
-    authority = r->hostname;
+    authority = apr_pstrdup(pool, r->hostname);
     if (!ap_strchr_c(authority, ':') && r->server && r->server->port) {
         apr_port_t defport = apr_uri_port_of_scheme(scheme);
         if (defport != r->server->port) {
             /* port info missing and port is not default for scheme: append */
-            authority = apr_psprintf(r->pool, "%s:%d", authority,
+            authority = apr_psprintf(pool, "%s:%d", authority,
                                      (int)r->server->port);
         }
     }
     
-    status = h2_req_make(req, r->pool,  r->method, scheme, authority,
-                         apr_uri_unparse(r->pool, &r->parsed_uri, 
-                                         APR_URI_UNP_OMITSITEPART),
+    status = h2_req_make(req, pool, apr_pstrdup(pool, r->method), scheme, 
+                         authority, apr_uri_unparse(pool, &r->parsed_uri, 
+                                                    APR_URI_UNP_OMITSITEPART),
                          r->headers_in);
     return status;
 }
index 168d3796a9c5900ff379d34eb8a1ea0f177a95af..ba48f4a15276474ddeac3b0f0301a83b6c7e3d96 100644 (file)
@@ -18,7 +18,8 @@
 
 #include "h2.h"
 
-apr_status_t h2_request_rwrite(h2_request *req, request_rec *r);
+apr_status_t h2_request_rwrite(h2_request *req, apr_pool_t *pool, 
+                               request_rec *r);
 
 apr_status_t h2_request_add_header(h2_request *req, apr_pool_t *pool,
                                    const char *name, size_t nlen,
index 0f8accab928d6d8d5e9e4d8bfbeb06aa5829e109..aa6260749699277a48e0941ab61a45526034568e 100644 (file)
@@ -315,6 +315,9 @@ static apr_status_t stream_release(h2_session *session,
                                    uint32_t error_code) 
 {
     conn_rec *c = session->c;
+    apr_bucket *b;
+    apr_status_t status;
+    
     if (!error_code) {
         ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c,
                       "h2_stream(%ld-%d): handled, closing", 
@@ -333,8 +336,11 @@ static apr_status_t stream_release(h2_session *session,
         h2_stream_rst(stream, error_code);
     }
     
-    return h2_conn_io_writeb(&session->io,
-                             h2_bucket_eos_create(c->bucket_alloc, stream), 0);
+    b = h2_bucket_eos_create(c->bucket_alloc, stream);
+    APR_BRIGADE_INSERT_TAIL(session->bbtmp, b);
+    status = h2_conn_io_pass(&session->io, session->bbtmp);
+    apr_brigade_cleanup(session->bbtmp);
+    return status;
 }
 
 static int on_stream_close_cb(nghttp2_session *ngh2, int32_t stream_id,
@@ -538,13 +544,6 @@ static int on_frame_recv_cb(nghttp2_session *ng2s,
     return 0;
 }
 
-static apr_status_t pass_data(void *ctx, 
-                              const char *data, apr_off_t length)
-{
-    return h2_conn_io_write(&((h2_session*)ctx)->io, data, length);
-}
-
-
 static char immortal_zeros[H2_MAX_PADLEN];
 
 static int on_send_data_cb(nghttp2_session *ngh2, 
@@ -582,49 +581,30 @@ static int on_send_data_cb(nghttp2_session *ngh2,
                   "h2_stream(%ld-%d): send_data_cb for %ld bytes",
                   session->id, (int)stream_id, (long)length);
                   
-    if (h2_conn_io_is_buffered(&session->io)) {
-        status = h2_conn_io_write(&session->io, (const char *)framehd, 9);
-        if (status == APR_SUCCESS) {
-            if (padlen) {
-                status = h2_conn_io_write(&session->io, (const char *)&padlen, 1);
-            }
-            
-            if (status == APR_SUCCESS) {
-                apr_off_t len = length;
-                status = h2_stream_readx(stream, pass_data, session, &len, &eos);
-                if (status == APR_SUCCESS && len != length) {
-                    status = APR_EINVAL;
-                }
-            }
-            
-            if (status == APR_SUCCESS && padlen) {
-                if (padlen) {
-                    status = h2_conn_io_write(&session->io, immortal_zeros, padlen);
-                }
-            }
-        }
+    status = h2_conn_io_write(&session->io, (const char *)framehd, 9);
+    if (padlen && status == APR_SUCCESS) {
+        status = h2_conn_io_write(&session->io, (const char *)&padlen, 1);
     }
-    else {
-        status = h2_conn_io_write(&session->io, (const char *)framehd, 9);
-        if (padlen && status == APR_SUCCESS) {
-            status = h2_conn_io_write(&session->io, (const char *)&padlen, 1);
-        }
-        if (status == APR_SUCCESS) {
-            apr_off_t len = length;
-            status = h2_stream_read_to(stream, session->io.output, &len, &eos);
-            if (status == APR_SUCCESS && len != length) {
-                status = APR_EINVAL;
-            }
-        }
-            
-        if (status == APR_SUCCESS && padlen) {
-            b = apr_bucket_immortal_create(immortal_zeros, padlen, 
-                                           session->c->bucket_alloc);
-            status = h2_conn_io_writeb(&session->io, b, 0);
+    
+    if (status == APR_SUCCESS) {
+        apr_off_t len = length;
+        status = h2_stream_read_to(stream, session->bbtmp, &len, &eos);
+        if (status == APR_SUCCESS && len != length) {
+            status = APR_EINVAL;
         }
     }
     
+    if (status == APR_SUCCESS && padlen) {
+        b = apr_bucket_immortal_create(immortal_zeros, padlen, 
+                                       session->c->bucket_alloc);
+        APR_BRIGADE_INSERT_TAIL(session->bbtmp, b);
+    }
     
+    if (status == APR_SUCCESS) {
+        status = h2_conn_io_pass(&session->io, session->bbtmp);
+    }
+        
+    apr_brigade_cleanup(session->bbtmp);
     if (status == APR_SUCCESS) {
         stream->data_frames_sent++;
         h2_conn_io_consider_pass(&session->io);
@@ -684,45 +664,31 @@ static apr_status_t init_callbacks(conn_rec *c, nghttp2_session_callbacks **pcb)
     return APR_SUCCESS;
 }
 
-static void h2_session_cleanup(h2_session *session)
+static void h2_session_destroy(h2_session *session)
 {
-    AP_DEBUG_ASSERT(session);
-    /* This is an early cleanup of the session that may
-     * discard what is no longer necessary for *new* streams
-     * and general HTTP/2 processing.
-     * At this point, all frames are in transit or somehwere in
-     * our buffers or passed down output filters.
-     * h2 streams might still being written out.
-     */
-    if (session->c) {
-        h2_ctx_clear(session->c);
+    AP_DEBUG_ASSERT(session);    
+
+    h2_ihash_clear(session->streams);
+    if (session->mplx) {
+        h2_mplx_set_consumed_cb(session->mplx, NULL, NULL);
+        h2_mplx_release_and_join(session->mplx, session->iowait);
+        session->mplx = NULL;
     }
+
+    ap_remove_input_filter_byhandle((session->r? session->r->input_filters :
+                                     session->c->input_filters), "H2_IN");
     if (session->ngh2) {
         nghttp2_session_del(session->ngh2);
         session->ngh2 = NULL;
     }
-}
+    if (session->c) {
+        h2_ctx_clear(session->c);
+    }
 
-static void h2_session_destroy(h2_session *session)
-{
-    AP_DEBUG_ASSERT(session);
-    
-    h2_session_cleanup(session);
-    AP_DEBUG_ASSERT(session->open_streams == h2_ihash_count(session->streams));
-    h2_ihash_clear(session->streams);
-    session->open_streams = 0;
-    
-    ap_remove_input_filter_byhandle((session->r? session->r->input_filters :
-                                     session->c->input_filters), "H2_IN");
     if (APLOGctrace1(session->c)) {
         ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c,
                       "h2_session(%ld): destroy", session->id);
     }
-    if (session->mplx) {
-        h2_mplx_set_consumed_cb(session->mplx, NULL, NULL);
-        h2_mplx_release_and_join(session->mplx, session->iowait);
-        session->mplx = NULL;
-    }
     if (session->pool) {
         apr_pool_destroy(session->pool);
     }
@@ -903,7 +869,7 @@ static h2_session *h2_session_create_int(conn_rec *c,
                                             h2_session_receive, session);
         ap_add_input_filter("H2_IN", session->cin, r, c);
 
-        h2_conn_io_init(&session->io, c, session->config, session->pool);
+        h2_conn_io_init(&session->io, c, session->config);
         session->bbtmp = apr_brigade_create(session->pool, c->bucket_alloc);
         
         status = init_callbacks(c, &callbacks);
@@ -1504,7 +1470,7 @@ apr_status_t h2_session_set_prio(h2_session *session, h2_stream *stream,
 apr_status_t h2_session_stream_done(h2_session *session, h2_stream *stream)
 {
     ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c,
-                  "h2_stream(%ld-%d): cleanup by EOS bucket destroy", 
+                  "h2_stream(%ld-%d): EOS bucket cleanup -> done", 
                   session->id, stream->id);
     h2_ihash_remove(session->streams, stream->id);
     --session->open_streams;
index 20d1d350425b4ec2a4156dc4e1805d06e1e9e287..b445da768b3eab148d6f5ec37f1490d77cd5326d 100644 (file)
@@ -203,6 +203,9 @@ void h2_stream_cleanup(h2_stream *stream)
 void h2_stream_destroy(h2_stream *stream)
 {
     AP_DEBUG_ASSERT(stream);
+    ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, stream->session->c, 
+                  "h2_stream(%ld-%d): destroy", 
+                  stream->session->id, stream->id);
     if (stream->input) {
         h2_beam_destroy(stream->input);
         stream->input = NULL;
@@ -248,7 +251,7 @@ apr_status_t h2_stream_set_request(h2_stream *stream, request_rec *r)
         return APR_ECONNRESET;
     }
     set_state(stream, H2_STREAM_ST_OPEN);
-    status = h2_request_rwrite(stream->request, r);
+    status = h2_request_rwrite(stream->request, stream->pool, r);
     stream->request->serialize = h2_config_geti(h2_config_rget(r), 
                                                 H2_CONF_SER_HEADERS);
     ap_log_rerror(APLOG_MARK, APLOG_DEBUG, status, r, APLOGNO(03058)
@@ -453,12 +456,14 @@ apr_status_t h2_stream_set_response(h2_stream *stream, h2_response *response,
     return status;
 }
 
+static const apr_size_t DATA_CHUNK_SIZE = ((16*1024) - 100 - 9); 
+
 apr_status_t h2_stream_out_prepare(h2_stream *stream,
                                    apr_off_t *plen, int *peos)
 {
     conn_rec *c = stream->session->c;
     apr_status_t status = APR_SUCCESS;
-    apr_off_t requested = (*plen > 0)? *plen : 32*1024;
+    apr_off_t requested;
 
     if (stream->rst_error) {
         *plen = 0;
@@ -466,11 +471,19 @@ apr_status_t h2_stream_out_prepare(h2_stream *stream,
         return APR_ECONNRESET;
     }
 
+    if (*plen > 0) {
+        requested = H2MIN(*plen, DATA_CHUNK_SIZE);
+    }
+    else {
+        requested = DATA_CHUNK_SIZE;
+    }
+    *plen = requested;
+    
     H2_STREAM_OUT_LOG(APLOG_TRACE2, stream, "h2_stream_out_prepare_pre");
     h2_util_bb_avail(stream->buffer, plen, peos);
-    if (!*peos && !*plen) {
+    if (!*peos && *plen < requested) {
         /* try to get more data */
-        status = fill_buffer(stream, H2MIN(requested, 32*1024));
+        status = fill_buffer(stream, (requested - *plen) + DATA_CHUNK_SIZE);
         if (APR_STATUS_IS_EOF(status)) {
             apr_bucket *eos = apr_bucket_eos_create(c->bucket_alloc);
             APR_BRIGADE_INSERT_TAIL(stream->buffer, eos);
@@ -491,27 +504,6 @@ apr_status_t h2_stream_out_prepare(h2_stream *stream,
 }
 
 
-apr_status_t h2_stream_readx(h2_stream *stream, 
-                             h2_io_data_cb *cb, void *ctx,
-                             apr_off_t *plen, int *peos)
-{
-    conn_rec *c = stream->session->c;
-    apr_status_t status = APR_SUCCESS;
-
-    if (stream->rst_error) {
-        return APR_ECONNRESET;
-    }
-    status = h2_util_bb_readx(stream->buffer, cb, ctx, plen, peos);
-    if (status == APR_SUCCESS && !*peos && !*plen) {
-        status = APR_EAGAIN;
-    }
-    ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, c,
-                  "h2_stream(%ld-%d): readx, len=%ld eos=%d",
-                  c->id, stream->id, (long)*plen, *peos);
-    return status;
-}
-
-
 apr_status_t h2_stream_read_to(h2_stream *stream, apr_bucket_brigade *bb, 
                                apr_off_t *plen, int *peos)
 {
index 8ae600c78a49a3f5cd2a2605bda14f89523854d9..66dca0dbb482359a4c88331d459894f1ccada716 100644 (file)
@@ -204,23 +204,6 @@ apr_status_t h2_stream_set_response(h2_stream *stream,
 apr_status_t h2_stream_out_prepare(h2_stream *stream, 
                                    apr_off_t *plen, int *peos);
 
-/**
- * Read data from the stream output.
- * 
- * @param stream the stream to read from
- * @param cb callback to invoke for byte chunks read. Might be invoked
- *        multiple times (with different values) for one read operation.
- * @param ctx context data for callback
- * @param plen (in-/out) max. number of bytes to read and on return actual
- *        number of bytes read
- * @param peos (out) != 0 iff end of stream has been reached while reading
- * @return APR_SUCCESS if out information was computed successfully.
- *         APR_EAGAIN if not data is available and end of stream has not been
- *         reached yet.
- */
-apr_status_t h2_stream_readx(h2_stream *stream, h2_io_data_cb *cb, 
-                             void *ctx, apr_off_t *plen, int *peos);
-
 /**
  * Read a maximum number of bytes into the bucket brigade.
  *