]> git.ipfire.org Git - thirdparty/apache/httpd.git/commitdiff
combined patches for alpha testing next release of http2
authorStefan Eissing <icing@apache.org>
Fri, 23 Oct 2015 13:34:20 +0000 (13:34 +0000)
committerStefan Eissing <icing@apache.org>
Fri, 23 Oct 2015 13:34:20 +0000 (13:34 +0000)
git-svn-id: https://svn.apache.org/repos/asf/httpd/httpd/branches/2.4-http2-alpha@1710210 13f79535-47bb-0310-9956-ffa450edef68

18 files changed:
CHANGES
modules/http2/h2_conn.c
modules/http2/h2_io.c
modules/http2/h2_io.h
modules/http2/h2_io_set.c
modules/http2/h2_io_set.h
modules/http2/h2_mplx.c
modules/http2/h2_mplx.h
modules/http2/h2_session.c
modules/http2/h2_stream.c
modules/http2/h2_stream.h
modules/http2/h2_task.c
modules/http2/h2_util.c
modules/http2/h2_util.h
modules/http2/h2_version.h
modules/http2/h2_worker.c
modules/http2/h2_workers.c
modules/http2/h2_workers.h

diff --git a/CHANGES b/CHANGES
index c7e204f9b0a47855ca58e61b80259cc87645d322..c12e96e957cc4d873ce8da20556978b0bd9e9571 100644 (file)
--- a/CHANGES
+++ b/CHANGES
@@ -1,7 +1,11 @@
                                                          -*- coding: utf-8 -*-
 
 Changes with Apache 2.4.18
-
+  *) mod_http2: reworked deallocation on connection shutdown and worker
+     abort. Separate parent pool for all workers. worker threads are joined
+     on planned worker shutdown.
+     [Yann Ylavic, Stefan Eissing]
+     
 
 Changes with Apache 2.4.17
 
index 8bffbc42693cad31e50ff0b3b86e15046c9922da..1cb3234ca1f9de2ebc7f4b326f0f4b28e776b865 100644 (file)
@@ -255,7 +255,7 @@ apr_status_t h2_session_process(h2_session *session)
             have_written = 1;
             wait_micros = 0;
         }
-        else if (status == APR_EAGAIN) {
+        else if (APR_STATUS_IS_EAGAIN(status)) {
             /* nop */
         }
         else if (status == APR_TIMEUP) {
index 42734430fa48f517a7aafa699918ee300efdaa9c..9f699aa9b530cc9b23b80d0c9081cb29983700a5 100644 (file)
@@ -47,6 +47,12 @@ void h2_io_destroy(h2_io *io)
     h2_io_cleanup(io);
 }
 
+void h2_io_rst(h2_io *io, int error)
+{
+    io->rst_error = error;
+    io->eos_in = 1;
+}
+
 int h2_io_in_has_eos_for(h2_io *io)
 {
     return io->eos_in || (io->bbin && h2_util_has_eos(io->bbin, 0));
@@ -124,16 +130,52 @@ apr_status_t h2_io_out_readx(h2_io *io,
                              h2_io_data_cb *cb, void *ctx, 
                              apr_size_t *plen, int *peos)
 {
+    apr_status_t status;
+    
+    if (io->eos_out) {
+        *plen = 0;
+        *peos = 1;
+        return APR_SUCCESS;
+    }
+    
     if (cb == NULL) {
         /* just checking length available */
-        return h2_util_bb_avail(io->bbout, plen, peos);
+        status = h2_util_bb_avail(io->bbout, plen, peos);
     }
-    return h2_util_bb_readx(io->bbout, cb, ctx, plen, peos);
+    else {
+        status = h2_util_bb_readx(io->bbout, cb, ctx, plen, peos);
+        if (status == APR_SUCCESS) {
+            io->eos_out = *peos;
+        }
+    }
+    
+    return status;
 }
 
 apr_status_t h2_io_out_write(h2_io *io, apr_bucket_brigade *bb, 
                              apr_size_t maxlen, int *pfile_handles_allowed)
 {
+    apr_status_t status;
+    int start_allowed;
+    
+    if (io->eos_out) {
+        apr_off_t len;
+        /* We have already delivered an EOS bucket to a reader, no
+         * sense in storing anything more here.
+         */
+        status = apr_brigade_length(bb, 1, &len);
+        if (status == APR_SUCCESS) {
+            if (len > 0) {
+                /* someone tries to write real data after EOS, that
+                 * does not look right. */
+                status = APR_EOF;
+            }
+            /* cleanup, as if we had moved the data */
+            apr_brigade_cleanup(bb);
+        }
+        return status;
+    }
+    
     /* Let's move the buckets from the request processing in here, so
      * that the main thread can read them when it has time/capacity.
      *
@@ -144,8 +186,11 @@ apr_status_t h2_io_out_write(h2_io *io, apr_bucket_brigade *bb,
      * many open files already buffered. Otherwise we will run out of
      * file handles.
      */
-    int start_allowed = *pfile_handles_allowed;
-    apr_status_t status;
+    start_allowed = *pfile_handles_allowed;
+
+    if (io->rst_error) {
+        return APR_ECONNABORTED;
+    }
     status = h2_util_move(io->bbout, bb, maxlen, pfile_handles_allowed, 
                           "h2_io_out_write");
     /* track # file buckets moved into our pool */
@@ -158,7 +203,9 @@ apr_status_t h2_io_out_write(h2_io *io, apr_bucket_brigade *bb,
 
 apr_status_t h2_io_out_close(h2_io *io)
 {
-    APR_BRIGADE_INSERT_TAIL(io->bbout, 
-                            apr_bucket_eos_create(io->bbout->bucket_alloc));
+    if (!io->eos_out && !h2_util_has_eos(io->bbout, 0)) {
+        APR_BRIGADE_INSERT_TAIL(io->bbout, 
+                                apr_bucket_eos_create(io->bbout->bucket_alloc));
+    }
     return APR_SUCCESS;
 }
index 946ee44334e8d995a3a0e8ab05ad4b7f86549c34..23655d21dfa60d70f075868dc28442325c0c5e11 100644 (file)
@@ -33,11 +33,13 @@ struct h2_io {
     apr_bucket_brigade *bbin;    /* input data for stream */
     int eos_in;
     int task_done;
+    int rst_error;
     
     apr_size_t input_consumed;   /* how many bytes have been read */
     struct apr_thread_cond_t *input_arrived; /* block on reading */
     
     apr_bucket_brigade *bbout;   /* output data from stream */
+    int eos_out;
     struct apr_thread_cond_t *output_drained; /* block on writing */
     
     struct h2_response *response;/* submittable response created */
@@ -58,6 +60,11 @@ h2_io *h2_io_create(int id, apr_pool_t *pool, apr_bucket_alloc_t *bucket_alloc);
  */
 void h2_io_destroy(h2_io *io);
 
+/**
+ * Reset the stream with the given error code.
+ */
+void h2_io_rst(h2_io *io, int error);
+
 /**
  * The input data is completely queued. Blocked reads will return immediately
  * and give either data or EOF.
index 91afde8f1f5e2c55f6b12a5fbecaa7797a2951d1..74ab508fefe87591de4c3f05f6457d88ce022061 100644 (file)
@@ -78,19 +78,6 @@ h2_io *h2_io_set_get(h2_io_set *sp, int stream_id)
     return ps? *ps : NULL;
 }
 
-h2_io *h2_io_set_get_highest_prio(h2_io_set *set)
-{
-    h2_io *highest = NULL;
-    int i;
-    for (i = 0; i < set->list->nelts; ++i) {
-        h2_io *io = h2_io_IDX(set->list, i);
-        if (!highest /*|| io-prio even higher */ ) {
-            highest = io;
-        }
-    }
-    return highest;
-}
-
 static void h2_io_set_sort(h2_io_set *sp)
 {
     qsort(sp->list->elts, sp->list->nelts, sp->list->elt_size, 
@@ -118,28 +105,46 @@ apr_status_t h2_io_set_add(h2_io_set *sp, h2_io *io)
     return APR_SUCCESS;
 }
 
+static void remove_idx(h2_io_set *sp, int idx)
+{
+    int n;
+    --sp->list->nelts;
+    n = sp->list->nelts - idx;
+    if (n > 0) {
+        /* Close the hole in the array by moving the upper
+         * parts down one step.
+         */
+        h2_io **selts = (h2_io**)sp->list->elts;
+        memmove(selts + idx, selts + idx + 1, n * sizeof(h2_io*));
+    }
+}
+
 h2_io *h2_io_set_remove(h2_io_set *sp, h2_io *io)
 {
     int i;
     for (i = 0; i < sp->list->nelts; ++i) {
         h2_io *e = h2_io_IDX(sp->list, i);
         if (e == io) {
-            int n;
-            --sp->list->nelts;
-            n = sp->list->nelts - i;
-            if (n > 0) {
-                /* Close the hole in the array by moving the upper
-                 * parts down one step.
-                 */
-                h2_io **selts = (h2_io**)sp->list->elts;
-                memmove(selts+i, selts+i+1, n * sizeof(h2_io*));
-            }
+            remove_idx(sp, i);
             return e;
         }
     }
     return NULL;
 }
 
+h2_io *h2_io_set_pop_highest_prio(h2_io_set *set)
+{
+    /* For now, this just removes the first element in the set.
+     * the name is misleading...
+     */
+    if (set->list->nelts > 0) {
+        h2_io *io = h2_io_IDX(set->list, 0);
+        remove_idx(set, 0);
+        return io;
+    }
+    return NULL;
+}
+
 void h2_io_set_destroy_all(h2_io_set *sp)
 {
     int i;
index a9c6546c7015a9ada43298a33345cb4abcca4e86..5e7555af92e35bacef141c5a7f86c023ce28ba9c 100644 (file)
@@ -30,7 +30,6 @@ void h2_io_set_destroy(h2_io_set *set);
 
 apr_status_t h2_io_set_add(h2_io_set *set, struct h2_io *io);
 h2_io *h2_io_set_get(h2_io_set *set, int stream_id);
-h2_io *h2_io_set_get_highest_prio(h2_io_set *set);
 h2_io *h2_io_set_remove(h2_io_set *set, struct h2_io *io);
 
 void h2_io_set_remove_all(h2_io_set *set);
@@ -44,4 +43,6 @@ typedef int h2_io_set_iter_fn(void *ctx, struct h2_io *io);
 void h2_io_set_iter(h2_io_set *set,
                            h2_io_set_iter_fn *iter, void *ctx);
 
+h2_io *h2_io_set_pop_highest_prio(h2_io_set *set);
+
 #endif /* defined(__mod_h2__h2_io_set__) */
index 2d07b1eb6c344c64ccbb6b87c3781adbe3076204..b1513db10e3d7b5b48d378066bacfa6399dfa506 100644 (file)
@@ -41,6 +41,7 @@
 #include "h2_task_output.h"
 #include "h2_task_queue.h"
 #include "h2_workers.h"
+#include "h2_util.h"
 
 
 static int is_aborted(h2_mplx *m, apr_status_t *pstatus) {
@@ -143,12 +144,18 @@ static void reference(h2_mplx *m)
     apr_atomic_inc32(&m->refs);
 }
 
-static void release(h2_mplx *m)
+static void release(h2_mplx *m, int lock)
 {
     if (!apr_atomic_dec32(&m->refs)) {
+        if (lock) {
+            apr_thread_mutex_lock(m->lock);
+        }
         if (m->join_wait) {
             apr_thread_cond_signal(m->join_wait);
         }
+        if (lock) {
+            apr_thread_mutex_unlock(m->lock);
+        }
     }
 }
 
@@ -158,7 +165,7 @@ void h2_mplx_reference(h2_mplx *m)
 }
 void h2_mplx_release(h2_mplx *m)
 {
-    release(m);
+    release(m, 1);
 }
 
 static void workers_register(h2_mplx *m) {
@@ -187,29 +194,17 @@ apr_status_t h2_mplx_release_and_join(h2_mplx *m, apr_thread_cond_t *wait)
 
     status = apr_thread_mutex_lock(m->lock);
     if (APR_SUCCESS == status) {
-        int attempts = 0;
-        
-        release(m);
+        release(m, 0);
         while (apr_atomic_read32(&m->refs) > 0) {
             m->join_wait = wait;
-            ap_log_cerror(APLOG_MARK, (attempts? APLOG_INFO : APLOG_DEBUG), 
-                          0, m->c,
+            ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c,
                           "h2_mplx(%ld): release_join, refs=%d, waiting...", 
                           m->id, m->refs);
-            apr_thread_cond_timedwait(wait, m->lock, apr_time_from_sec(10));
-            if (++attempts >= 6) {
-                ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c,
-                              APLOGNO(02952) 
-                              "h2_mplx(%ld): join attempts exhausted, refs=%d", 
-                              m->id, m->refs);
-                break;
-            }
-        }
-        if (m->join_wait) {
-            ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c,
-                          "h2_mplx(%ld): release_join -> destroy", m->id);
+            apr_thread_cond_wait(wait, m->lock);
         }
         m->join_wait = NULL;
+        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c,
+                      "h2_mplx(%ld): release_join -> destroy", m->id);
         apr_thread_mutex_unlock(m->lock);
         h2_mplx_destroy(m);
     }
@@ -298,6 +293,13 @@ apr_status_t h2_mplx_cleanup_stream(h2_mplx *m, h2_stream *stream)
             stream_destroy(m, stream, io);
         }
         else {
+            if (stream->rst_error) {
+                /* Forward error code to fail any further attempt to
+                 * write to io */
+                h2_io_rst(io, stream->rst_error);
+            }
+            /* Remove io from ready set (if there), since we will never submit it */
+            h2_io_set_remove(m->ready_ios, io);
             /* Add stream to closed set for cleanup when task is done */
             h2_stream_set_add(m->closed, stream);
         }
@@ -345,7 +347,7 @@ apr_status_t h2_mplx_in_read(h2_mplx *m, apr_read_type_e block,
         if (io) {
             io->input_arrived = iowait;
             status = h2_io_in_read(io, bb, 0);
-            while (status == APR_EAGAIN 
+            while (APR_STATUS_IS_EAGAIN(status) 
                    && !is_aborted(m, &status)
                    && block == APR_BLOCK_READ) {
                 apr_thread_cond_wait(io->input_arrived, m->lock);
@@ -454,6 +456,13 @@ apr_status_t h2_mplx_in_update_windows(h2_mplx *m,
     return status;
 }
 
+#define H2_MPLX_IO_OUT(lvl,m,io,msg) \
+    do { \
+        if (APLOG_C_IS_LEVEL((m)->c,lvl)) \
+        h2_util_bb_log((m)->c,(io)->id,lvl,msg,(io)->bbout); \
+    } while(0)
+
+
 apr_status_t h2_mplx_out_readx(h2_mplx *m, int stream_id, 
                                h2_io_data_cb *cb, void *ctx, 
                                apr_size_t *plen, int *peos)
@@ -467,8 +476,12 @@ apr_status_t h2_mplx_out_readx(h2_mplx *m, int stream_id,
     if (APR_SUCCESS == status) {
         h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
         if (io) {
+            H2_MPLX_IO_OUT(APLOG_TRACE2, m, io, "h2_mplx_out_readx_pre");
+            
             status = h2_io_out_readx(io, cb, ctx, plen, peos);
-            if (status == APR_SUCCESS && io->output_drained) {
+            
+            H2_MPLX_IO_OUT(APLOG_TRACE2, m, io, "h2_mplx_out_readx_post");
+            if (status == APR_SUCCESS && cb && io->output_drained) {
                 apr_thread_cond_signal(io->output_drained);
             }
         }
@@ -490,24 +503,30 @@ h2_stream *h2_mplx_next_submit(h2_mplx *m, h2_stream_set *streams)
     }
     status = apr_thread_mutex_lock(m->lock);
     if (APR_SUCCESS == status) {
-        h2_io *io = h2_io_set_get_highest_prio(m->ready_ios);
+        h2_io *io = h2_io_set_pop_highest_prio(m->ready_ios);
         if (io) {
-            h2_response *response = io->response;
-            
-            AP_DEBUG_ASSERT(response);
-            h2_io_set_remove(m->ready_ios, io);
-            
-            stream = h2_stream_set_get(streams, response->stream_id);
+            stream = h2_stream_set_get(streams, io->id);
             if (stream) {
-                h2_stream_set_response(stream, response, io->bbout);
+                if (io->rst_error) {
+                    h2_stream_rst(stream, io->rst_error);
+                }
+                else {
+                    AP_DEBUG_ASSERT(io->response);
+                    h2_stream_set_response(stream, io->response, io->bbout);
+                }
+                
                 if (io->output_drained) {
                     apr_thread_cond_signal(io->output_drained);
                 }
             }
             else {
-                ap_log_cerror(APLOG_MARK, APLOG_WARNING, APR_NOTFOUND, m->c,
-                              APLOGNO(02953) "h2_mplx(%ld): stream for response %d",
-                              m->id, response->stream_id);
+                ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, APLOGNO(02953) 
+                              "h2_mplx(%ld): stream for response %d not found",
+                              m->id, io->id);
+                /* We have the io ready, but the stream has gone away, maybe
+                 * reset by the client. Should no longer happen since such
+                 * streams should clear io's from the ready queue.
+                 */
             }
         }
         apr_thread_mutex_unlock(m->lock);
@@ -531,7 +550,6 @@ static apr_status_t out_write(h2_mplx *m, h2_io *io,
         
         status = h2_io_out_write(io, bb, m->stream_max_mem, 
                                  &m->file_handles_allowed);
-        
         /* Wait for data to drain until there is room again */
         while (!APR_BRIGADE_EMPTY(bb) 
                && iowait
@@ -549,6 +567,7 @@ static apr_status_t out_write(h2_mplx *m, h2_io *io,
         }
     }
     apr_brigade_cleanup(bb);
+    
     return status;
 }
 
@@ -592,6 +611,9 @@ apr_status_t h2_mplx_out_open(h2_mplx *m, int stream_id, h2_response *response,
     status = apr_thread_mutex_lock(m->lock);
     if (APR_SUCCESS == status) {
         status = out_open(m, stream_id, response, f, bb, iowait);
+        if (APLOGctrace1(m->c)) {
+            h2_util_bb_log(m->c, stream_id, APLOG_TRACE1, "h2_mplx_out_open", bb);
+        }
         if (m->aborted) {
             return APR_ECONNABORTED;
         }
@@ -616,6 +638,8 @@ apr_status_t h2_mplx_out_write(h2_mplx *m, int stream_id,
             h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
             if (io) {
                 status = out_write(m, io, f, bb, iowait);
+                H2_MPLX_IO_OUT(APLOG_TRACE2, m, io, "h2_mplx_out_write");
+                
                 have_out_data_for(m, stream_id);
                 if (m->aborted) {
                     return APR_ECONNABORTED;
@@ -645,7 +669,7 @@ apr_status_t h2_mplx_out_close(h2_mplx *m, int stream_id)
         if (!m->aborted) {
             h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
             if (io) {
-                if (!io->response || !io->response->ngheader) {
+                if (!io->response && !io->rst_error) {
                     /* In case a close comes before a response was created,
                      * insert an error one so that our streams can properly
                      * reset.
@@ -653,8 +677,48 @@ apr_status_t h2_mplx_out_close(h2_mplx *m, int stream_id)
                     h2_response *r = h2_response_create(stream_id, 
                                                         "500", NULL, m->pool);
                     status = out_open(m, stream_id, r, NULL, NULL, NULL);
+                    ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, m->c,
+                                  "h2_mplx(%ld-%d): close, no response, no rst", 
+                                  m->id, io->id);
                 }
                 status = h2_io_out_close(io);
+                H2_MPLX_IO_OUT(APLOG_TRACE2, m, io, "h2_mplx_out_close");
+                
+                have_out_data_for(m, stream_id);
+                if (m->aborted) {
+                    /* if we were the last output, the whole session might
+                     * have gone down in the meantime.
+                     */
+                    return APR_SUCCESS;
+                }
+            }
+            else {
+                status = APR_ECONNABORTED;
+            }
+        }
+        apr_thread_mutex_unlock(m->lock);
+    }
+    return status;
+}
+
+apr_status_t h2_mplx_out_rst(h2_mplx *m, int stream_id, int error)
+{
+    apr_status_t status;
+    AP_DEBUG_ASSERT(m);
+    if (m->aborted) {
+        return APR_ECONNABORTED;
+    }
+    status = apr_thread_mutex_lock(m->lock);
+    if (APR_SUCCESS == status) {
+        if (!m->aborted) {
+            h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
+            if (io && !io->rst_error) {
+                h2_io_rst(io, error);
+                if (!io->response) {
+                        h2_io_set_add(m->ready_ios, io);
+                }
+                H2_MPLX_IO_OUT(APLOG_TRACE2, m, io, "h2_mplx_out_rst");
+                
                 have_out_data_for(m, stream_id);
                 if (m->aborted) {
                     /* if we were the last output, the whole session might
index 62977d6157cecd675eb3fce9dcfcd8c706bfe191..5cb40af16fa393040e2a7357439644e98867f56a 100644 (file)
@@ -96,6 +96,7 @@ void h2_mplx_reference(h2_mplx *m);
  * Decreases the reference counter of this mplx.
  */
 void h2_mplx_release(h2_mplx *m);
+
 /**
  * Decreases the reference counter of this mplx and waits for it
  * to reached 0, destroy the mplx afterwards.
@@ -247,6 +248,8 @@ apr_status_t h2_mplx_out_write(h2_mplx *mplx, int stream_id,
  */
 apr_status_t h2_mplx_out_close(h2_mplx *m, int stream_id);
 
+apr_status_t h2_mplx_out_rst(h2_mplx *m, int stream_id, int error);
+
 /*******************************************************************************
  * h2_mplx list Manipulation.
  ******************************************************************************/
index c3456a0654da6254ae50e3e3e9787d20238f4c1b..d8a8735afac45c583905a20b6af5637259334a41 100644 (file)
@@ -41,17 +41,16 @@ static int frame_print(const nghttp2_frame *frame, char *buffer, size_t maxlen);
 
 static int h2_session_status_from_apr_status(apr_status_t rv)
 {
-    switch (rv) {
-        case APR_SUCCESS:
-            return NGHTTP2_NO_ERROR;
-        case APR_EAGAIN:
-        case APR_TIMEUP:
-            return NGHTTP2_ERR_WOULDBLOCK;
-        case APR_EOF:
+    if (rv == APR_SUCCESS) {
+        return NGHTTP2_NO_ERROR;
+    }
+    else if (APR_STATUS_IS_EAGAIN(rv)) {
+        return NGHTTP2_ERR_WOULDBLOCK;
+    }
+    else if (APR_STATUS_IS_EOF(rv)) {
             return NGHTTP2_ERR_EOF;
-        default:
-            return NGHTTP2_ERR_PROTO;
     }
+    return NGHTTP2_ERR_PROTO;
 }
 
 static int stream_open(h2_session *session, int stream_id)
@@ -107,7 +106,7 @@ static ssize_t send_cb(nghttp2_session *ngh2,
     if (status == APR_SUCCESS) {
         return length;
     }
-    if (status == APR_EAGAIN || status == APR_TIMEUP) {
+    if (APR_STATUS_IS_EAGAIN(status)) {
         return NGHTTP2_ERR_WOULDBLOCK;
     }
     ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, session->c,
@@ -243,6 +242,7 @@ static apr_status_t stream_destroy(h2_session *session,
                       "h2_stream(%ld-%d): closing with err=%d %s", 
                       session->id, (int)stream->id, (int)error_code,
                       nghttp2_strerror(error_code));
+        h2_stream_rst(stream, error_code);
     }
     
     h2_stream_set_remove(session->streams, stream);
@@ -508,12 +508,11 @@ static int on_send_data_cb(nghttp2_session *ngh2,
     if (status == APR_SUCCESS) {
         return 0;
     }
-    else if (status != APR_EOF) {
-        ap_log_cerror(APLOG_MARK, APLOG_ERR, status, session->c,
+    else {
+        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, session->c,
                       APLOGNO(02925) 
                       "h2_stream(%ld-%d): failed send_data_cb",
                       session->id, (int)stream_id);
-        return NGHTTP2_ERR_CALLBACK_FAILURE;
     }
     
     return h2_session_status_from_apr_status(status);
@@ -731,10 +730,12 @@ apr_status_t h2_session_abort(h2_session *session, apr_status_t reason, int rv)
                 rv = 0;                            /* ...gracefully shut down */
                 break;
             case APR_EBADF:        /* connection unusable, terminate silently */
-            case APR_ECONNABORTED:
-                rv = NGHTTP2_ERR_EOF;
-                break;
             default:
+                if (APR_STATUS_IS_ECONNABORTED(reason)
+                    || APR_STATUS_IS_ECONNRESET(reason)
+                    || APR_STATUS_IS_EBADF(reason)) {
+                    rv = NGHTTP2_ERR_EOF;
+                }
                 break;
         }
     }
@@ -910,7 +911,7 @@ apr_status_t h2_session_write(h2_session *session, apr_interval_time_t timeout)
     if (status == APR_SUCCESS) {
         flush_output = 1;
     }
-    else if (status != APR_EAGAIN) {
+    else if (!APR_STATUS_IS_EAGAIN(status)) {
         return status;
     }
     
@@ -1062,6 +1063,10 @@ static ssize_t stream_data_cb(nghttp2_session *ng2s,
         case APR_SUCCESS:
             break;
             
+        case APR_ECONNRESET:
+            return nghttp2_submit_rst_stream(ng2s, NGHTTP2_FLAG_NONE,
+                stream->id, stream->rst_error);
+            
         case APR_EAGAIN:
             /* If there is no data available, our session will automatically
              * suspend this stream and not ask for more data until we resume
@@ -1141,11 +1146,15 @@ apr_status_t h2_session_handle_response(h2_session *session, h2_stream *stream)
     int rv = 0;
     AP_DEBUG_ASSERT(session);
     AP_DEBUG_ASSERT(stream);
-    AP_DEBUG_ASSERT(stream->response);
+    AP_DEBUG_ASSERT(stream->response || stream->rst_error);
     
-    if (stream->response->ngheader) {
+    if (stream->response && stream->response->ngheader) {
         rv = submit_response(session, stream->response);
     }
+    else if (stream->rst_error) {
+        rv = nghttp2_submit_rst_stream(session->ngh2, NGHTTP2_FLAG_NONE,
+                                       stream->id, stream->rst_error);
+    }
     else {
         rv = nghttp2_submit_rst_stream(session->ngh2, NGHTTP2_FLAG_NONE,
                                        stream->id, NGHTTP2_PROTOCOL_ERROR);
index 52781d8474fec50392e95dbf3417dc5eaf722cf1..d5a716f4490b381125a856d3563e6ab13aab4fa5 100644 (file)
@@ -86,11 +86,6 @@ apr_status_t h2_stream_destroy(h2_stream *stream)
     return APR_SUCCESS;
 }
 
-void h2_stream_attach_pool(h2_stream *stream, apr_pool_t *pool)
-{
-    stream->pool = pool;
-}
-
 apr_pool_t *h2_stream_detach_pool(h2_stream *stream)
 {
     apr_pool_t *pool = stream->pool;
@@ -98,25 +93,41 @@ apr_pool_t *h2_stream_detach_pool(h2_stream *stream)
     return pool;
 }
 
-void h2_stream_abort(h2_stream *stream)
+void h2_stream_rst(h2_stream *stream, int error_code)
 {
-    AP_DEBUG_ASSERT(stream);
-    stream->aborted = 1;
+    stream->rst_error = error_code;
+    ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, stream->m->c,
+                  "h2_stream(%ld-%d): reset, error=%d", 
+                  stream->m->id, stream->id, error_code);
 }
 
 apr_status_t h2_stream_set_response(h2_stream *stream, h2_response *response,
                                     apr_bucket_brigade *bb)
 {
+    apr_status_t status = APR_SUCCESS;
+    
     stream->response = response;
     if (bb && !APR_BRIGADE_EMPTY(bb)) {
         if (!stream->bbout) {
             stream->bbout = apr_brigade_create(stream->pool, 
                                                stream->m->c->bucket_alloc);
         }
-        return h2_util_move(stream->bbout, bb, 16 * 1024, NULL,  
-                            "h2_stream_set_response");
+        status = h2_util_move(stream->bbout, bb, 16 * 1024, NULL,  
+                              "h2_stream_set_response");
     }
-    return APR_SUCCESS;
+    if (APLOGctrace1(stream->m->c)) {
+        apr_size_t len = 0;
+        int eos = 0;
+        if (stream->bbout) {
+            h2_util_bb_avail(stream->bbout, &len, &eos);
+        }
+        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, stream->m->c,
+                      "h2_stream(%ld-%d): set_response(%s), brigade=%s, "
+                      "len=%ld, eos=%d", 
+                      stream->m->id, stream->id, response->status,
+                      (stream->bbout? "yes" : "no"), (long)len, (int)eos);
+    }
+    return status;
 }
 
 static int set_closed(h2_stream *stream) 
@@ -141,6 +152,9 @@ apr_status_t h2_stream_rwrite(h2_stream *stream, request_rec *r)
 {
     apr_status_t status;
     AP_DEBUG_ASSERT(stream);
+    if (stream->rst_error) {
+        return APR_ECONNRESET;
+    }
     set_state(stream, H2_STREAM_ST_OPEN);
     status = h2_request_rwrite(stream->request, r, stream->m);
     return status;
@@ -151,6 +165,9 @@ apr_status_t h2_stream_write_eoh(h2_stream *stream, int eos)
     apr_status_t status;
     AP_DEBUG_ASSERT(stream);
     
+    if (stream->rst_error) {
+        return APR_ECONNRESET;
+    }
     /* Seeing the end-of-headers, we have everything we need to 
      * start processing it.
      */
@@ -180,6 +197,9 @@ apr_status_t h2_stream_write_eos(h2_stream *stream)
     ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, stream->m->c,
                   "h2_stream(%ld-%d): closing input",
                   stream->m->id, stream->id);
+    if (stream->rst_error) {
+        return APR_ECONNRESET;
+    }
     if (set_closed(stream)) {
         return h2_request_close(stream->request);
     }
@@ -191,6 +211,9 @@ apr_status_t h2_stream_write_header(h2_stream *stream,
                                     const char *value, size_t vlen)
 {
     AP_DEBUG_ASSERT(stream);
+    if (stream->rst_error) {
+        return APR_ECONNRESET;
+    }
     switch (stream->state) {
         case H2_STREAM_ST_IDLE:
             set_state(stream, H2_STREAM_ST_OPEN);
@@ -208,7 +231,9 @@ apr_status_t h2_stream_write_data(h2_stream *stream,
                                   const char *data, size_t len)
 {
     AP_DEBUG_ASSERT(stream);
-    AP_DEBUG_ASSERT(stream);
+    if (stream->rst_error) {
+        return APR_ECONNRESET;
+    }
     switch (stream->state) {
         case H2_STREAM_ST_OPEN:
             break;
@@ -224,6 +249,9 @@ apr_status_t h2_stream_prep_read(h2_stream *stream,
     apr_status_t status = APR_SUCCESS;
     const char *src;
     
+    if (stream->rst_error) {
+        return APR_ECONNRESET;
+    }
     if (stream->bbout && !APR_BRIGADE_EMPTY(stream->bbout)) {
         src = "stream";
         status = h2_util_bb_avail(stream->bbout, plen, peos);
@@ -251,6 +279,9 @@ apr_status_t h2_stream_readx(h2_stream *stream,
                              h2_io_data_cb *cb, void *ctx,
                              apr_size_t *plen, int *peos)
 {
+    if (stream->rst_error) {
+        return APR_ECONNRESET;
+    }
     if (stream->bbout && !APR_BRIGADE_EMPTY(stream->bbout)) {
         return h2_util_bb_readx(stream->bbout, cb, ctx, plen, peos);
     }
index 0608f2f34006fcfeb28f2624637512b28ce523b4..c2bb9af74949dc78efe43e39db7143e36539be0c 100644 (file)
@@ -66,6 +66,7 @@ struct h2_stream {
     struct h2_task *task;       /* task created for this stream */
     struct h2_response *response; /* the response, once ready */
     apr_bucket_brigade *bbout;  /* output DATA */
+    int rst_error;              /* stream error for RST_STREAM */
 };
 
 
@@ -73,10 +74,9 @@ h2_stream *h2_stream_create(int id, apr_pool_t *pool, struct h2_mplx *m);
 
 apr_status_t h2_stream_destroy(h2_stream *stream);
 
-apr_pool_t *h2_stream_detach_pool(h2_stream *stream);
-void h2_stream_attach_pool(h2_stream *stream, apr_pool_t *pool);
+void h2_stream_rst(h2_stream *streamm, int error_code);
 
-void h2_stream_abort(h2_stream *stream);
+apr_pool_t *h2_stream_detach_pool(h2_stream *stream);
 
 apr_status_t h2_stream_rwrite(h2_stream *stream, request_rec *r);
 
index bbea7b20f8aa1ae48309e38cd9a04368b5d703ea..58b39c53970cccbdd388e242e291667d406959eb 100644 (file)
@@ -379,8 +379,6 @@ static request_rec *h2_task_create_request(h2_task_env *env)
     
     /* Time to populate r with the data we have. */
     r->request_time = apr_time_now();
-    r->the_request = apr_psprintf(r->pool, "%s %s HTTP/1.1", 
-                                  env->method, env->path);
     r->method = env->method;
     /* Provide quick information about the request method as soon as known */
     r->method_number = ap_method_number_of(r->method);
@@ -391,6 +389,9 @@ static request_rec *h2_task_create_request(h2_task_env *env)
     ap_parse_uri(r, env->path);
     r->protocol = (char*)"HTTP/2";
     r->proto_num = HTTP_VERSION(2, 0);
+
+    r->the_request = apr_psprintf(r->pool, "%s %s %s", 
+                                  r->method, env->path, r->protocol);
     
     /* update what we think the virtual host is based on the headers we've
      * now read. may update status.
index 9d141be93bf160d5d83d8fca278dd69ba990cc10..6f29b461d66a1b1ef531711188f584b82a293990 100644 (file)
@@ -20,6 +20,7 @@
 #include <httpd.h>
 #include <http_core.h>
 #include <http_log.h>
+#include <http_request.h>
 
 #include <nghttp2/nghttp2.h>
 
@@ -647,3 +648,71 @@ apr_status_t h2_util_bb_readx(apr_bucket_brigade *bb,
     return status;
 }
 
+void h2_util_bb_log(conn_rec *c, int stream_id, int level, 
+                    const char *tag, apr_bucket_brigade *bb)
+{
+    char buffer[16 * 1024];
+    const char *line = "(null)";
+    apr_size_t bmax = sizeof(buffer)/sizeof(buffer[0]);
+    int off = 0;
+    apr_bucket *b;
+    
+    if (bb) {
+        memset(buffer, 0, bmax--);
+        for (b = APR_BRIGADE_FIRST(bb); 
+             bmax && (b != APR_BRIGADE_SENTINEL(bb));
+             b = APR_BUCKET_NEXT(b)) {
+            
+            if (APR_BUCKET_IS_METADATA(b)) {
+                if (APR_BUCKET_IS_EOS(b)) {
+                    off += apr_snprintf(buffer+off, bmax-off, "eos ");
+                }
+                else if (APR_BUCKET_IS_FLUSH(b)) {
+                    off += apr_snprintf(buffer+off, bmax-off, "flush ");
+                }
+                else if (AP_BUCKET_IS_EOR(b)) {
+                    off += apr_snprintf(buffer+off, bmax-off, "eor ");
+                }
+                else {
+                    off += apr_snprintf(buffer+off, bmax-off, "meta(unknown) ");
+                }
+            }
+            else {
+                const char *btype = "data";
+                if (APR_BUCKET_IS_FILE(b)) {
+                    btype = "file";
+                }
+                else if (APR_BUCKET_IS_PIPE(b)) {
+                    btype = "pipe";
+                }
+                else if (APR_BUCKET_IS_SOCKET(b)) {
+                    btype = "socket";
+                }
+                else if (APR_BUCKET_IS_HEAP(b)) {
+                    btype = "heap";
+                }
+                else if (APR_BUCKET_IS_TRANSIENT(b)) {
+                    btype = "transient";
+                }
+                else if (APR_BUCKET_IS_IMMORTAL(b)) {
+                    btype = "immortal";
+                }
+                else if (APR_BUCKET_IS_MMAP(b)) {
+                    btype = "mmap";
+                }
+                else if (APR_BUCKET_IS_POOL(b)) {
+                    btype = "pool";
+                }
+                
+                off += apr_snprintf(buffer+off, bmax-off, "%s[%ld] ", 
+                                    btype, 
+                                    (long)(b->length == ((apr_size_t)-1)? 
+                                           -1 : b->length));
+            }
+        }
+        line = *buffer? buffer : "(empty)";
+    }
+    ap_log_cerror(APLOG_MARK, level, 0, c, "bb_dump(%ld-%d)-%s: %s", 
+                  c->id, stream_id, tag, line);
+
+}
index 9a1b5c6d35bf9ae36b271b5c3235d592f77e0837..e1a6b3c4d2039e33217826fe9eec346f82b01d08 100644 (file)
@@ -117,8 +117,32 @@ apr_status_t h2_util_bb_avail(apr_bucket_brigade *bb,
 typedef apr_status_t h2_util_pass_cb(void *ctx, 
                                        const char *data, apr_size_t len);
 
+/**
+ * Read at most *plen bytes from the brigade and pass them into the
+ * given callback. If cb is NULL, just return the amount of data that
+ * could have been read.
+ * If an EOS was/would be encountered, set *peos != 0.
+ * @param bb the brigade to read from
+ * @param cb the callback to invoke for the read data
+ * @param ctx optional data passed to callback
+ * @param plen inout, as input gives the maximum number of bytes to read,
+ *    on return specifies the actual/would be number of bytes
+ * @param peos != 0 iff an EOS bucket was/would be encountered.
+ */
 apr_status_t h2_util_bb_readx(apr_bucket_brigade *bb, 
                               h2_util_pass_cb *cb, void *ctx, 
                               apr_size_t *plen, int *peos);
 
+/**
+ * Logs the bucket brigade (which bucket types with what length)
+ * to the log at the given level.
+ * @param c the connection to log for
+ * @param stream_id the stream identifier this brigade belongs to
+ * @param level the log level (as in APLOG_*)
+ * @param tag a short message text about the context
+ * @param bb the brigade to log
+ */
+void h2_util_bb_log(conn_rec *c, int stream_id, int level, 
+                    const char *tag, apr_bucket_brigade *bb);
+
 #endif /* defined(__mod_h2__h2_util__) */
index 7a03865c87c2ed20a558d0f231738964ca4a8662..dc7c57722113cce848c77f16ad94d09e1ad8cbb4 100644 (file)
@@ -20,7 +20,7 @@
  * @macro
  * Version number of the h2 module as c string
  */
-#define MOD_HTTP2_VERSION "1.0.0"
+#define MOD_HTTP2_VERSION "1.0.1-DEV"
 
 /**
  * @macro
@@ -28,7 +28,7 @@
  * release. This is a 24 bit number with 8 bits for major number, 8 bits
  * for minor and 8 bits for patch. Version 1.2.3 becomes 0x010203.
  */
-#define MOD_HTTP2_VERSION_NUM 0x010000
+#define MOD_HTTP2_VERSION_NUM 0x010001
 
 
 #endif /* mod_h2_h2_version_h */
index 8145b7aaa5f72bdb5d5683651696970768f9f114..297b4b21fe6be1198ecbf2a8a153eb85c9d3bfd7 100644 (file)
@@ -71,6 +71,19 @@ static void* APR_THREAD_FUNC execute(apr_thread_t *thread, void *wctx)
     return NULL;
 }
 
+static apr_status_t cleanup_join_thread(void *ctx)
+{
+    h2_worker *w = ctx;
+    /* do the join only when the worker is aborted. Otherwise,
+     * we are probably in a process shutdown.
+     */
+    if (w->thread && w->aborted) {
+        apr_status_t rv;
+        apr_thread_join(&rv, w->thread);
+    }
+    return APR_SUCCESS;
+}
+
 h2_worker *h2_worker_create(int id,
                             apr_pool_t *parent_pool,
                             apr_threadattr_t *attr,
@@ -110,6 +123,7 @@ h2_worker *h2_worker_create(int id,
             return NULL;
         }
         
+        apr_pool_pre_cleanup_register(pool, w, cleanup_join_thread);
         apr_thread_create(&w->thread, attr, execute, w, pool);
     }
     return w;
index cf3009585b7088c549aacc31ddcaeeb78818855d..18f39d136c9b64dd5f76bd7bf58cc3080175ab3e 100644 (file)
@@ -42,6 +42,22 @@ static int in_list(h2_workers *workers, h2_mplx *m)
     return 0;
 }
 
+static void cleanup_zombies(h2_workers *workers, int lock) {
+    if (lock) {
+        apr_thread_mutex_lock(workers->lock);
+    }
+    while (!H2_WORKER_LIST_EMPTY(&workers->zombies)) {
+        h2_worker *zombie = H2_WORKER_LIST_FIRST(&workers->zombies);
+        H2_WORKER_REMOVE(zombie);
+        ap_log_error(APLOG_MARK, APLOG_TRACE1, 0, workers->s,
+                      "h2_workers: cleanup zombie %d", zombie->id);
+        h2_worker_destroy(zombie);
+    }
+    if (lock) {
+        apr_thread_mutex_unlock(workers->lock);
+    }
+}
+
 
 /**
  * Get the next task for the given worker. Will block until a task arrives
@@ -123,23 +139,12 @@ static apr_status_t get_mplx_next(h2_worker *worker, h2_mplx **pm,
             if (!task) {
                 /* Need to wait for either a new mplx to arrive.
                  */
+                cleanup_zombies(workers, 0);
+                
                 if (workers->worker_count > workers->min_size) {
                     apr_time_t now = apr_time_now();
                     if (now >= (start_wait + max_wait)) {
                         /* waited long enough without getting a task. */
-                        status = APR_TIMEUP;
-                    }
-                    else {
-                        ap_log_error(APLOG_MARK, APLOG_TRACE1, 0, workers->s,
-                                     "h2_worker(%d): waiting signal, "
-                                     "worker_count=%d", worker->id, 
-                                     (int)workers->worker_count);
-                        status = apr_thread_cond_timedwait(workers->mplx_added,
-                                                           workers->lock, max_wait);
-                    }
-                    
-                    if (status == APR_TIMEUP) {
-                        /* waited long enough */
                         if (workers->worker_count > workers->min_size) {
                             ap_log_error(APLOG_MARK, APLOG_TRACE1, 0, 
                                          workers->s,
@@ -148,6 +153,12 @@ static apr_status_t get_mplx_next(h2_worker *worker, h2_mplx **pm,
                             break;
                         }
                     }
+                    ap_log_error(APLOG_MARK, APLOG_TRACE1, 0, workers->s,
+                                 "h2_worker(%d): waiting signal, "
+                                 "worker_count=%d", worker->id, 
+                                 (int)workers->worker_count);
+                    apr_thread_cond_timedwait(workers->mplx_added,
+                                              workers->lock, max_wait);
                 }
                 else {
                     ap_log_error(APLOG_MARK, APLOG_TRACE1, 0, workers->s,
@@ -194,11 +205,11 @@ static void worker_done(h2_worker *worker, void *ctx)
     h2_workers *workers = (h2_workers *)ctx;
     apr_status_t status = apr_thread_mutex_lock(workers->lock);
     if (status == APR_SUCCESS) {
-        ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, workers->s,
+        ap_log_error(APLOG_MARK, APLOG_TRACE1, 0, workers->s,
                      "h2_worker(%d): done", h2_worker_get_id(worker));
         H2_WORKER_REMOVE(worker);
         --workers->worker_count;
-        h2_worker_destroy(worker);
+        H2_WORKER_LIST_INSERT_TAIL(&workers->zombies, worker);
         
         apr_thread_mutex_unlock(workers->lock);
     }
@@ -213,7 +224,7 @@ static apr_status_t add_worker(h2_workers *workers)
     if (!w) {
         return APR_ENOMEM;
     }
-    ap_log_error(APLOG_MARK, APLOG_TRACE2, 0, workers->s,
+    ap_log_error(APLOG_MARK, APLOG_TRACE1, 0, workers->s,
                  "h2_workers: adding worker(%d)", h2_worker_get_id(w));
     ++workers->worker_count;
     H2_WORKER_LIST_INSERT_TAIL(&workers->workers, w);
@@ -235,15 +246,22 @@ static apr_status_t h2_workers_start(h2_workers *workers) {
     return status;
 }
 
-h2_workers *h2_workers_create(server_rec *s, apr_pool_t *pool,
+h2_workers *h2_workers_create(server_rec *s, apr_pool_t *server_pool,
                               int min_size, int max_size)
 {
     apr_status_t status;
     h2_workers *workers;
+    apr_pool_t *pool;
+
     AP_DEBUG_ASSERT(s);
-    AP_DEBUG_ASSERT(pool);
-    status = APR_SUCCESS;
+    AP_DEBUG_ASSERT(server_pool);
 
+    /* let's have our own pool that will be parent to all h2_worker
+     * instances we create. This happens in various threads, but always
+     * guarded by our lock. Without this pool, all subpool creations would
+     * happen on the pool handed to us, which we do not guard.
+     */
+    apr_pool_create(&pool, server_pool);
     workers = apr_pcalloc(pool, sizeof(h2_workers));
     if (workers) {
         workers->s = s;
@@ -255,6 +273,7 @@ h2_workers *h2_workers_create(server_rec *s, apr_pool_t *pool,
         apr_threadattr_create(&workers->thread_attr, workers->pool);
         
         APR_RING_INIT(&workers->workers, h2_worker, link);
+        APR_RING_INIT(&workers->zombies, h2_worker, link);
         APR_RING_INIT(&workers->mplxs, h2_mplx, link);
         
         status = apr_thread_mutex_create(&workers->lock,
@@ -278,6 +297,9 @@ h2_workers *h2_workers_create(server_rec *s, apr_pool_t *pool,
 
 void h2_workers_destroy(h2_workers *workers)
 {
+    /* before we go, cleanup any zombie workers that may have accumulated */
+    cleanup_zombies(workers, 1);
+    
     if (workers->mplx_added) {
         apr_thread_cond_destroy(workers->mplx_added);
         workers->mplx_added = NULL;
@@ -294,6 +316,10 @@ void h2_workers_destroy(h2_workers *workers)
         h2_worker *w = H2_WORKER_LIST_FIRST(&workers->workers);
         H2_WORKER_REMOVE(w);
     }
+    if (workers->pool) {
+        apr_pool_destroy(workers->pool);
+        /* workers is gone */
+    }
 }
 
 apr_status_t h2_workers_register(h2_workers *workers, struct h2_mplx *m)
@@ -320,6 +346,9 @@ apr_status_t h2_workers_register(h2_workers *workers, struct h2_mplx *m)
             add_worker(workers);
         }
         
+        /* cleanup any zombie workers that may have accumulated */
+        cleanup_zombies(workers, 0);
+        
         apr_thread_mutex_unlock(workers->lock);
     }
     return status;
@@ -334,6 +363,9 @@ apr_status_t h2_workers_unregister(h2_workers *workers, struct h2_mplx *m)
             H2_MPLX_REMOVE(m);
             status = APR_SUCCESS;
         }
+        /* cleanup any zombie workers that may have accumulated */
+        cleanup_zombies(workers, 0);
+        
         apr_thread_mutex_unlock(workers->lock);
     }
     return status;
index 50fd6b8ad58db58771cf21f950a48890aaecfabc..99aa1f4daf73a7c837cfcc5b90be914151b13291 100644 (file)
@@ -42,6 +42,7 @@ struct h2_workers {
     apr_threadattr_t *thread_attr;
     
     APR_RING_HEAD(h2_worker_list, h2_worker) workers;
+    APR_RING_HEAD(h2_worker_zombies, h2_worker) zombies;
     APR_RING_HEAD(h2_mplx_list, h2_mplx) mplxs;
     
     int worker_count;