git.ipfire.org Git - thirdparty/apache/httpd.git/commitdiff
mod_http2: removing beam mutex when task worker done
author: Stefan Eissing <icing@apache.org>
Wed, 27 Apr 2016 14:01:12 +0000 (14:01 +0000)
committer: Stefan Eissing <icing@apache.org>
Wed, 27 Apr 2016 14:01:12 +0000 (14:01 +0000)
git-svn-id: https://svn.apache.org/repos/asf/httpd/httpd/trunk@1741268 13f79535-47bb-0310-9956-ffa450edef68

CHANGES
modules/http2/h2_bucket_beam.c
modules/http2/h2_bucket_beam.h
modules/http2/h2_h2.c
modules/http2/h2_mplx.c
modules/http2/h2_mplx.h
modules/http2/h2_version.h

diff --git a/CHANGES b/CHANGES
index df02a284c8dd62df90d7be749787bb28c334fc0b..0e320b2fd511fc56e42477be61cfa30843637169 100644 (file)
--- a/CHANGES
+++ b/CHANGES
@@ -1,6 +1,10 @@
                                                          -*- coding: utf-8 -*-
 Changes with Apache 2.5.0
 
+  *) mod_http2: bucket beams now have safe mutex remove. Used for streams where
+     the task worker has finished and all processing happens in the same
+     thread again. [Stefan Eissing]
+     
   *) mod_proxy, mod_ssl: Handle SSLProxy* directives in <Proxy> sections,
      allowing per backend TLS configuration.  [Yann Ylavic]
 
index 6ca39e1de3ab4e43bb94b87a29a9414bb40d16a6..e630f84ecb3dd2fa6b36ca4a4747b5b43fdf54a7 100644 (file)
@@ -199,22 +199,20 @@ apr_size_t h2_util_bl_print(char *buffer, apr_size_t bmax,
  * bucket beam that can transport buckets across threads
  ******************************************************************************/
 
-static apr_status_t enter_yellow(h2_bucket_beam *beam, 
-                                 apr_thread_mutex_t **plock, int *pacquired)
+static apr_status_t enter_yellow(h2_bucket_beam *beam, h2_beam_lock *pbl)
 {
     if (beam->m_enter) {
-        return beam->m_enter(beam->m_ctx, plock, pacquired);
+        return beam->m_enter(beam->m_ctx, pbl);
     }
-    *plock = NULL;
-    *pacquired = 0;
+    pbl->mutex = NULL;
+    pbl->leave = NULL;
     return APR_SUCCESS;
 }
 
-static void leave_yellow(h2_bucket_beam *beam, 
-                         apr_thread_mutex_t *lock, int acquired)
+static void leave_yellow(h2_bucket_beam *beam, h2_beam_lock *pbl)
 {
-    if (acquired && beam->m_leave) {
-        beam->m_leave(beam->m_ctx, lock, acquired);
+    if (pbl->leave) {
+        pbl->leave(pbl->leave_ctx, pbl->mutex);
     }
 }
 
@@ -269,12 +267,12 @@ static apr_status_t wait_cond(h2_bucket_beam *beam, apr_thread_mutex_t *lock)
 }
 
 static apr_status_t r_wait_space(h2_bucket_beam *beam, apr_read_type_e block,
-                                 apr_thread_mutex_t *lock, apr_off_t *premain) 
+                                 h2_beam_lock *pbl, apr_off_t *premain) 
 {
     *premain = calc_space_left(beam);
     while (!beam->aborted && *premain <= 0 
-           && (block == APR_BLOCK_READ) && lock) {
-        apr_status_t status = wait_cond(beam, lock);
+           && (block == APR_BLOCK_READ) && pbl->mutex) {
+        apr_status_t status = wait_cond(beam, pbl->mutex);
         if (APR_STATUS_IS_TIMEUP(status)) {
             return status;
         }
@@ -292,10 +290,9 @@ static void h2_beam_prep_purge(h2_bucket_beam *beam, apr_bucket *bred)
 
 static void h2_beam_emitted(h2_bucket_beam *beam, h2_beam_proxy *proxy)
 {
-    apr_thread_mutex_t *lock;
-    int acquired;
+    h2_beam_lock bl;
 
-    if (enter_yellow(beam, &lock, &acquired) == APR_SUCCESS) {
+    if (enter_yellow(beam, &bl) == APR_SUCCESS) {
         /* even when beam buckets are split, only the one where
          * refcount drops to 0 will call us */
         H2_BPROXY_REMOVE(proxy);
@@ -307,13 +304,13 @@ static void h2_beam_emitted(h2_bucket_beam *beam, h2_beam_proxy *proxy)
             proxy->bred = NULL;
         }
         /* notify anyone waiting on space to become available */
-        if (!lock) {
+        if (!bl.mutex) {
             r_purge_reds(beam);
         }
         else if (beam->m_cond) {
             apr_thread_cond_broadcast(beam->m_cond);
         }
-        leave_yellow(beam, lock, acquired);
+        leave_yellow(beam, &bl);
     }
 }
 
@@ -391,18 +388,18 @@ static apr_status_t beam_cleanup(void *data)
 
 apr_status_t h2_beam_destroy(h2_bucket_beam *beam)
 {
-    apr_pool_cleanup_kill(beam->life_pool, beam, beam_cleanup);
+    apr_pool_cleanup_kill(beam->red_pool, beam, beam_cleanup);
     return beam_cleanup(beam);
 }
 
-apr_status_t h2_beam_create(h2_bucket_beam **pbeam, apr_pool_t *life_pool, 
+apr_status_t h2_beam_create(h2_bucket_beam **pbeam, apr_pool_t *red_pool, 
                             int id, const char *tag, 
                             apr_size_t max_buf_size)
 {
     h2_bucket_beam *beam;
     apr_status_t status = APR_SUCCESS;
     
-    beam = apr_pcalloc(life_pool, sizeof(*beam));
+    beam = apr_pcalloc(red_pool, sizeof(*beam));
     if (!beam) {
         return APR_ENOMEM;
     }
@@ -413,10 +410,10 @@ apr_status_t h2_beam_create(h2_bucket_beam **pbeam, apr_pool_t *life_pool,
     H2_BLIST_INIT(&beam->hold);
     H2_BLIST_INIT(&beam->purge);
     H2_BPROXY_LIST_INIT(&beam->proxies);
-    beam->life_pool = life_pool;
+    beam->red_pool = red_pool;
     beam->max_buf_size = max_buf_size;
 
-    apr_pool_pre_cleanup_register(life_pool, beam, beam_cleanup);
+    apr_pool_pre_cleanup_register(red_pool, beam, beam_cleanup);
     *pbeam = beam;
     
     return status;
@@ -424,83 +421,68 @@ apr_status_t h2_beam_create(h2_bucket_beam **pbeam, apr_pool_t *life_pool,
 
 void h2_beam_buffer_size_set(h2_bucket_beam *beam, apr_size_t buffer_size)
 {
-    apr_thread_mutex_t *lock;
-    int acquired;
+    h2_beam_lock bl;
     
-    if (enter_yellow(beam, &lock, &acquired) == APR_SUCCESS) {
+    if (enter_yellow(beam, &bl) == APR_SUCCESS) {
         beam->max_buf_size = buffer_size;
-        leave_yellow(beam, lock, acquired);
+        leave_yellow(beam, &bl);
     }
 }
 
 apr_size_t h2_beam_buffer_size_get(h2_bucket_beam *beam)
 {
-    apr_thread_mutex_t *lock;
-    int acquired;
+    h2_beam_lock bl;
     apr_size_t buffer_size = 0;
     
-    if (enter_yellow(beam, &lock, &acquired) == APR_SUCCESS) {
+    if (enter_yellow(beam, &bl) == APR_SUCCESS) {
         buffer_size = beam->max_buf_size;
-        leave_yellow(beam, lock, acquired);
+        leave_yellow(beam, &bl);
     }
     return buffer_size;
 }
 
 void h2_beam_mutex_set(h2_bucket_beam *beam, 
                        h2_beam_mutex_enter m_enter,
-                       h2_beam_mutex_leave m_leave,
                        apr_thread_cond_t *cond,
                        void *m_ctx)
 {
-    apr_thread_mutex_t *lock;
-    h2_beam_mutex_leave *prev_leave;
-    void *prev_ctx;
-    int acquired;
+    h2_beam_lock bl;
     
-    if (enter_yellow(beam, &lock, &acquired) == APR_SUCCESS) {
-        prev_ctx = beam->m_ctx;
-        prev_leave = beam->m_leave;
+    if (enter_yellow(beam, &bl) == APR_SUCCESS) {
         beam->m_enter = m_enter;
-        beam->m_leave = m_leave;
         beam->m_ctx   = m_ctx;
         beam->m_cond  = cond;
-        if (acquired && prev_leave) {
-            /* special tactics when NULLing a lock */
-            prev_leave(prev_ctx, lock, acquired);
-        }
+        leave_yellow(beam, &bl);
     }
 }
 
 void h2_beam_timeout_set(h2_bucket_beam *beam, apr_interval_time_t timeout)
 {
-    apr_thread_mutex_t *lock;
-    int acquired;
+    h2_beam_lock bl;
     
-    if (enter_yellow(beam, &lock, &acquired) == APR_SUCCESS) {
+    if (enter_yellow(beam, &bl) == APR_SUCCESS) {
         beam->timeout = timeout;
-        leave_yellow(beam, lock, acquired);
+        leave_yellow(beam, &bl);
     }
 }
 
 apr_interval_time_t h2_beam_timeout_get(h2_bucket_beam *beam)
 {
-    apr_thread_mutex_t *lock;
-    int acquired;
+    h2_beam_lock bl;
     apr_interval_time_t timeout = 0;
     
-    if (enter_yellow(beam, &lock, &acquired) == APR_SUCCESS) {
+    if (enter_yellow(beam, &bl) == APR_SUCCESS) {
         timeout = beam->timeout;
-        leave_yellow(beam, lock, acquired);
+        leave_yellow(beam, &bl);
     }
     return timeout;
 }
 
 void h2_beam_abort(h2_bucket_beam *beam)
 {
-    apr_thread_mutex_t *lock;
-    int acquired;
+    h2_beam_lock bl;
     
-    if (enter_yellow(beam, &lock, &acquired) == APR_SUCCESS) {
+    if (enter_yellow(beam, &bl) == APR_SUCCESS) {
         r_purge_reds(beam);
         h2_blist_cleanup(&beam->red);
         beam->aborted = 1;
@@ -508,32 +490,30 @@ void h2_beam_abort(h2_bucket_beam *beam)
         if (beam->m_cond) {
             apr_thread_cond_broadcast(beam->m_cond);
         }
-        leave_yellow(beam, lock, acquired);
+        leave_yellow(beam, &bl);
     }
 }
 
 apr_status_t h2_beam_close(h2_bucket_beam *beam)
 {
-    apr_thread_mutex_t *lock;
-    int acquired;
+    h2_beam_lock bl;
     
-    if (enter_yellow(beam, &lock, &acquired) == APR_SUCCESS) {
+    if (enter_yellow(beam, &bl) == APR_SUCCESS) {
         r_purge_reds(beam);
         beam_close(beam);
         report_consumption(beam);
-        leave_yellow(beam, lock, acquired);
+        leave_yellow(beam, &bl);
     }
     return beam->aborted? APR_ECONNABORTED : APR_SUCCESS;
 }
 
 void h2_beam_shutdown(h2_bucket_beam *beam)
 {
-    apr_thread_mutex_t *lock;
-    int acquired;
+    h2_beam_lock bl;
     
-    if (enter_yellow(beam, &lock, &acquired) == APR_SUCCESS) {
+    if (enter_yellow(beam, &bl) == APR_SUCCESS) {
         beam_shutdown(beam, 1);
-        leave_yellow(beam, lock, acquired);
+        leave_yellow(beam, &bl);
     }
 }
 
@@ -541,7 +521,7 @@ static apr_status_t append_bucket(h2_bucket_beam *beam,
                                   apr_bucket *bred,
                                   apr_read_type_e block,
                                   apr_pool_t *pool,
-                                  apr_thread_mutex_t *lock)
+                                  h2_beam_lock *pbl)
 {
     const char *data;
     apr_size_t len;
@@ -570,7 +550,7 @@ static apr_status_t append_bucket(h2_bucket_beam *beam,
         }
         
         if (space_left < bred->length) {
-            status = r_wait_space(beam, block, lock, &space_left);
+            status = r_wait_space(beam, block, pbl, &space_left);
             if (status != APR_SUCCESS) {
                 return status;
             }
@@ -596,23 +576,33 @@ static apr_status_t append_bucket(h2_bucket_beam *beam,
          * affected by this. */
         status = apr_bucket_setaside(bred, pool);
     }
-    else if (APR_BUCKET_IS_HEAP(bred) || APR_BUCKET_IS_POOL(bred)) {
-        /* For heap/pool buckets read from a green thread is fine. The
+    else if (APR_BUCKET_IS_HEAP(bred)) {
+        /* For heap buckets read from a green thread is fine. The
          * data will be there and live until the bucket itself is
          * destroyed. */
         status = APR_SUCCESS;
     }
+    else if (APR_BUCKET_IS_POOL(bred)) {
+        /* pool buckets are bastards that register at pool cleanup
+         * to morph themselves into heap buckets. That may happen anytime,
+         * even after the bucket data pointer has been read. So at
+         * any time inside the green thread, the pool bucket memory
+         * may disappear. yikes. */
+        status = apr_bucket_read(bred, &data, &len, APR_BLOCK_READ);
+        if (status == APR_SUCCESS) {
+            apr_bucket_heap_make(bred, data, len, NULL);
+        }
+    }
     else if (APR_BUCKET_IS_FILE(bred)) {
         /* For file buckets the problem is their internal readpool that
          * is used on the first read to allocate buffer/mmap.
          * Since setting aside a file bucket will de-register the
          * file cleanup function from the previous pool, we need to
-         * call that from a red thread. Do it now and make our
-         * yellow pool the owner. 
+         * call that from a red thread. 
          * Additionally, we allow callbacks to prevent beaming file
          * handles across. The use case for this is to limit the number 
          * of open file handles and rather use a less efficient beam
-         * transport. */ 
+         * transport. */
         apr_file_t *fd = ((apr_bucket_file *)bred->data)->fd;
         int can_beam = 1;
         if (beam->last_beamed != fd && beam->can_beam_fn) {
@@ -622,13 +612,16 @@ static apr_status_t append_bucket(h2_bucket_beam *beam,
             beam->last_beamed = fd;
             status = apr_bucket_setaside(bred, pool);
         }
+        /* else: enter ENOTIMPL case below */
     }
     
     if (status == APR_ENOTIMPL) {
         /* we have no knowledge about the internals of this bucket,
-         * but on read, it needs to make the data available somehow.
-         * So we do this while still in a red thread. The data will
-         * live at least os long as the red bucket itself. */
+         * but hope that after read, its data stays immutable for the
+         * lifetime of the bucket. (see pool bucket handling above for
+         * a counter example).
+         * We do the read while in a red thread, so that the bucket may
+         * use pools/allocators safely. */
         if (space_left < APR_BUCKET_BUFF_SIZE) {
             space_left = APR_BUCKET_BUFF_SIZE;
         }
@@ -656,13 +649,12 @@ apr_status_t h2_beam_send(h2_bucket_beam *beam,
                           apr_bucket_brigade *red_brigade, 
                           apr_read_type_e block)
 {
-    apr_thread_mutex_t *lock;
     apr_bucket *bred;
     apr_status_t status = APR_SUCCESS;
-    int acquired;
+    h2_beam_lock bl;
 
     /* Called from the red thread to add buckets to the beam */
-    if (enter_yellow(beam, &lock, &acquired) == APR_SUCCESS) {
+    if (enter_yellow(beam, &bl) == APR_SUCCESS) {
         r_purge_reds(beam);
         
         if (beam->aborted) {
@@ -672,14 +664,14 @@ apr_status_t h2_beam_send(h2_bucket_beam *beam,
             while (!APR_BRIGADE_EMPTY(red_brigade)
                    && status == APR_SUCCESS) {
                 bred = APR_BRIGADE_FIRST(red_brigade);
-                status = append_bucket(beam, bred, block, red_brigade->p, lock);
+                status = append_bucket(beam, bred, block, beam->red_pool, &bl);
             }
             if (beam->m_cond) {
                 apr_thread_cond_broadcast(beam->m_cond);
             }
         }
         report_consumption(beam);
-        leave_yellow(beam, lock, acquired);
+        leave_yellow(beam, &bl);
     }
     return status;
 }
@@ -689,14 +681,14 @@ apr_status_t h2_beam_receive(h2_bucket_beam *beam,
                              apr_read_type_e block,
                              apr_off_t readbytes)
 {
-    apr_thread_mutex_t *lock;
+    h2_beam_lock bl;
     apr_bucket *bred, *bgreen, *ng;
-    int acquired, transferred = 0;
+    int transferred = 0;
     apr_status_t status = APR_SUCCESS;
     apr_off_t remain = readbytes;
     
     /* Called from the green thread to take buckets from the beam */
-    if (enter_yellow(beam, &lock, &acquired) == APR_SUCCESS) {
+    if (enter_yellow(beam, &bl) == APR_SUCCESS) {
 transfer:
         if (beam->aborted) {
             status = APR_ECONNABORTED;
@@ -817,8 +809,8 @@ transfer:
                 status = APR_EOF;
             }
         }
-        else if (block == APR_BLOCK_READ && lock && beam->m_cond) {
-            status = wait_cond(beam, lock);
+        else if (block == APR_BLOCK_READ && bl.mutex && beam->m_cond) {
+            status = wait_cond(beam, bl.mutex);
             if (status != APR_SUCCESS) {
                 goto leave;
             }
@@ -828,7 +820,7 @@ transfer:
             status = APR_EAGAIN;
         }
 leave:        
-        leave_yellow(beam, lock, acquired);
+        leave_yellow(beam, &bl);
     }
     return status;
 }
@@ -836,57 +828,53 @@ leave:
 void h2_beam_on_consumed(h2_bucket_beam *beam, 
                          h2_beam_consumed_callback *cb, void *ctx)
 {
-    apr_thread_mutex_t *lock;
-    int acquired;
+    h2_beam_lock bl;
     
-    if (enter_yellow(beam, &lock, &acquired) == APR_SUCCESS) {
+    if (enter_yellow(beam, &bl) == APR_SUCCESS) {
         beam->consumed_fn = cb;
         beam->consumed_ctx = ctx;
-        leave_yellow(beam, lock, acquired);
+        leave_yellow(beam, &bl);
     }
 }
 
 void h2_beam_on_file_beam(h2_bucket_beam *beam, 
                           h2_beam_can_beam_callback *cb, void *ctx)
 {
-    apr_thread_mutex_t *lock;
-    int acquired;
+    h2_beam_lock bl;
     
-    if (enter_yellow(beam, &lock, &acquired) == APR_SUCCESS) {
+    if (enter_yellow(beam, &bl) == APR_SUCCESS) {
         beam->can_beam_fn = cb;
         beam->can_beam_ctx = ctx;
-        leave_yellow(beam, lock, acquired);
+        leave_yellow(beam, &bl);
     }
 }
 
 
 apr_off_t h2_beam_get_buffered(h2_bucket_beam *beam)
 {
-    apr_thread_mutex_t *lock;
     apr_bucket *b;
     apr_off_t l = 0;
-    int acquired;
+    h2_beam_lock bl;
     
-    if (enter_yellow(beam, &lock, &acquired) == APR_SUCCESS) {
+    if (enter_yellow(beam, &bl) == APR_SUCCESS) {
         for (b = H2_BLIST_FIRST(&beam->red); 
             b != H2_BLIST_SENTINEL(&beam->red);
             b = APR_BUCKET_NEXT(b)) {
             /* should all have determinate length */
             l += b->length;
         }
-        leave_yellow(beam, lock, acquired);
+        leave_yellow(beam, &bl);
     }
     return l;
 }
 
 apr_off_t h2_beam_get_mem_used(h2_bucket_beam *beam)
 {
-    apr_thread_mutex_t *lock;
     apr_bucket *b;
     apr_off_t l = 0;
-    int acquired;
+    h2_beam_lock bl;
     
-    if (enter_yellow(beam, &lock, &acquired) == APR_SUCCESS) {
+    if (enter_yellow(beam, &bl) == APR_SUCCESS) {
         for (b = H2_BLIST_FIRST(&beam->red); 
             b != H2_BLIST_SENTINEL(&beam->red);
             b = APR_BUCKET_NEXT(b)) {
@@ -898,21 +886,20 @@ apr_off_t h2_beam_get_mem_used(h2_bucket_beam *beam)
                 l += b->length;
             }
         }
-        leave_yellow(beam, lock, acquired);
+        leave_yellow(beam, &bl);
     }
     return l;
 }
 
 int h2_beam_empty(h2_bucket_beam *beam)
 {
-    apr_thread_mutex_t *lock;
     int empty = 1;
-    int acquired;
+    h2_beam_lock bl;
     
-    if (enter_yellow(beam, &lock, &acquired) == APR_SUCCESS) {
+    if (enter_yellow(beam, &bl) == APR_SUCCESS) {
         empty = (H2_BLIST_EMPTY(&beam->red) 
                  && (!beam->green || APR_BRIGADE_EMPTY(beam->green)));
-        leave_yellow(beam, lock, acquired);
+        leave_yellow(beam, &bl);
     }
     return empty;
 }
@@ -924,26 +911,24 @@ int h2_beam_closed(h2_bucket_beam *beam)
 
 int h2_beam_was_received(h2_bucket_beam *beam)
 {
-    apr_thread_mutex_t *lock;
     int happend = 0;
-    int acquired;
+    h2_beam_lock bl;
     
-    if (enter_yellow(beam, &lock, &acquired) == APR_SUCCESS) {
+    if (enter_yellow(beam, &bl) == APR_SUCCESS) {
         happend = (beam->received_bytes > 0);
-        leave_yellow(beam, lock, acquired);
+        leave_yellow(beam, &bl);
     }
     return happend;
 }
 
 apr_size_t h2_beam_get_files_beamed(h2_bucket_beam *beam)
 {
-    apr_thread_mutex_t *lock;
     apr_size_t n = 0;
-    int acquired;
+    h2_beam_lock bl;
     
-    if (enter_yellow(beam, &lock, &acquired) == APR_SUCCESS) {
+    if (enter_yellow(beam, &bl) == APR_SUCCESS) {
         n = beam->files_beamed;
-        leave_yellow(beam, lock, acquired);
+        leave_yellow(beam, &bl);
     }
     return n;
 }
index f94b1b9ecac45ff2f51c74acd05ffbc7e7e1133a..a8abc908a1e37d8746a9eec82f75c8222431b155 100644 (file)
@@ -139,14 +139,19 @@ apr_size_t h2_util_bl_print(char *buffer, apr_size_t bmax,
  * technology where humans are kept inside the transporter's memory
  * buffers until the transmission is complete. Star gates use a similar trick.
  */
+
+typedef void h2_beam_mutex_leave(void *ctx,  struct apr_thread_mutex_t *lock);
+
+typedef struct {
+    apr_thread_mutex_t *mutex;
+    h2_beam_mutex_leave *leave;
+    void *leave_ctx;
+} h2_beam_lock;
+
 typedef struct h2_bucket_beam h2_bucket_beam;
 
-typedef apr_status_t h2_beam_mutex_enter(void *ctx, 
-                                         struct apr_thread_mutex_t **plock, 
-                                         int *acquired);
-typedef void h2_beam_mutex_leave(void *ctx, 
-                                 struct apr_thread_mutex_t *lock, 
-                                 int acquired);
+typedef apr_status_t h2_beam_mutex_enter(void *ctx, h2_beam_lock *pbl);
+
 typedef void h2_beam_consumed_callback(void *ctx, h2_bucket_beam *beam,
                                        apr_off_t bytes);
 
@@ -166,7 +171,7 @@ struct h2_bucket_beam {
     h2_blist purge;
     apr_bucket_brigade *green;
     h2_bproxy_list proxies;
-    apr_pool_t *life_pool;
+    apr_pool_t *red_pool;
     
     apr_size_t max_buf_size;
     apr_size_t files_beamed;  /* how many file handles have been set aside */
@@ -181,7 +186,6 @@ struct h2_bucket_beam {
 
     void *m_ctx;
     h2_beam_mutex_enter *m_enter;
-    h2_beam_mutex_leave *m_leave;
     struct apr_thread_cond_t *m_cond;
     apr_interval_time_t timeout;
     
@@ -199,15 +203,22 @@ struct h2_bucket_beam {
  * that is only used inside that same mutex.
  *
  * @param pbeam will hold the created beam on return
- * @param life_pool     pool for allocating initial structure and cleanups
+ * @param red_pool      pool usable on red side, beam lifeline
  * @param buffer_size   maximum memory footprint of buckets buffered in beam, or
  *                      0 for no limitation
+ *
+ * Call from the red side only.
  */
 apr_status_t h2_beam_create(h2_bucket_beam **pbeam,
-                            apr_pool_t *life_pool, 
+                            apr_pool_t *red_pool, 
                             int id, const char *tag, 
                             apr_size_t buffer_size);
 
+/**
+ * Destroys the beam immediately without cleanup.
+ *
+ * Call from the red side only.
+ */ 
 apr_status_t h2_beam_destroy(h2_bucket_beam *beam);
 
 /**
@@ -215,6 +226,8 @@ apr_status_t h2_beam_destroy(h2_bucket_beam *beam);
  * internally as long as they have not been processed by the receiving side.
  * All accepted buckets are removed from the given brigade. Will return with
  * APR_EAGAIN on non-blocking sends when not all buckets could be accepted.
+ * 
+ * Call from the red side only.
  */
 apr_status_t h2_beam_send(h2_bucket_beam *beam,  
                           apr_bucket_brigade *red_buckets, 
@@ -225,28 +238,52 @@ apr_status_t h2_beam_send(h2_bucket_beam *beam,
  * when reading past an EOS bucket. Reads can be blocking until data is 
  * available or the beam has been closed. Non-blocking calls return APR_EAGAIN
  * if no data is available.
+ *
+ * Call from the green side only.
  */
 apr_status_t h2_beam_receive(h2_bucket_beam *beam, 
                              apr_bucket_brigade *green_buckets, 
                              apr_read_type_e block,
                              apr_off_t readbytes);
 
+/**
+ * Determine if beam is closed. May still contain buffered data. 
+ * 
+ * Call from red or green side.
+ */
+int h2_beam_closed(h2_bucket_beam *beam);
+
+/**
+ * Determine if beam is empty. 
+ * 
+ * Call from red or green side.
+ */
+int h2_beam_empty(h2_bucket_beam *beam);
+
+/**
+ * Abort the beam. Will cleanup any buffered buckets and answer all send
+ * and receives with APR_ECONNABORTED.
+ * 
+ * Call from the red side only.
+ */
 void h2_beam_abort(h2_bucket_beam *beam);
 
 /**
- * Close the beam. Does not need to be invoked if certain that an EOS bucket
- * has been sent. 
+ * Close the beam. Sending an EOS bucket serves the same purpose. 
+ * 
+ * Call from the red side only.
  */
 apr_status_t h2_beam_close(h2_bucket_beam *beam);
 
 /**
  * Empty the buffer and close.
+ * 
+ * Call from the red side only.
  */
 void h2_beam_shutdown(h2_bucket_beam *beam);
 
 void h2_beam_mutex_set(h2_bucket_beam *beam, 
                        h2_beam_mutex_enter m_enter,
-                       h2_beam_mutex_leave m_leave,
                        struct apr_thread_cond_t *cond,
                        void *m_ctx);
 
@@ -272,6 +309,8 @@ apr_size_t h2_beam_buffer_size_get(h2_bucket_beam *beam);
  * @param beam the beam to set the callback on
  * @param cb   the callback or NULL
  * @param ctx  the context to use in callback invocation
+ * 
+ * Call from the red side, callbacks invoked on red side.
  */
 void h2_beam_on_consumed(h2_bucket_beam *beam, 
                          h2_beam_consumed_callback *cb, void *ctx);
@@ -289,9 +328,6 @@ apr_off_t h2_beam_get_buffered(h2_bucket_beam *beam);
  */
 apr_off_t h2_beam_get_mem_used(h2_bucket_beam *beam);
 
-int h2_beam_closed(h2_bucket_beam *beam);
-int h2_beam_empty(h2_bucket_beam *beam);
-
 /**
  * Return != 0 iff (some) data from the beam has been received.
  */
index 7c8bf3dd100dbf9575089fa2cf714b2b9ce8b4c3..bc9e261b5260f25657a46ca63e45c19ecba4485c 100644 (file)
@@ -678,7 +678,6 @@ static int h2_h2_post_read_req(request_rec *r)
         struct h2_task *task = h2_ctx_get_task(ctx);
         /* This hook will get called twice on internal redirects. Take care
          * that we manipulate filters only once. */
-        /* our slave connection? */
         if (task && !task->filters_set) {
             ap_filter_t *f;
             
index 3f0398a051dadcbf5d56fc9c49de4a310ce6d189..6a886ac59dd98ca00647ee57281933130163087c 100644 (file)
@@ -112,18 +112,24 @@ static void leave_mutex(h2_mplx *m, int acquired)
     }
 }
 
-static apr_status_t io_mutex_enter(void *ctx, 
-                                   apr_thread_mutex_t **plock, int *acquired)
+static void beam_leave(void *ctx, apr_thread_mutex_t *lock)
 {
-    h2_mplx *m = ctx;
-    *plock = m->lock;
-    return enter_mutex(m, acquired);
+    leave_mutex(ctx, 1);
 }
 
-static void io_mutex_leave(void *ctx, apr_thread_mutex_t *lock, int acquired)
+static apr_status_t beam_enter(void *ctx, h2_beam_lock *pbl)
 {
     h2_mplx *m = ctx;
-    leave_mutex(m, acquired);
+    int acquired;
+    apr_status_t status;
+    
+    status = enter_mutex(m, &acquired);
+    if (status == APR_SUCCESS) {
+        pbl->mutex = m->lock;
+        pbl->leave = acquired? beam_leave : NULL;
+        pbl->leave_ctx = m;
+    }
+    return status;
 }
 
 static void stream_output_consumed(void *ctx, 
@@ -366,7 +372,6 @@ static int task_stream_done(h2_mplx *m, h2_task *task, int rst_error)
         /* cleanup once task is done */
         task->orphaned = 1;
         if (task->input.beam) {
-            /* TODO: this is currently allocated by the stream and will disappear */
             h2_beam_shutdown(task->input.beam);
             task->input.beam = NULL;
         }
@@ -638,8 +643,7 @@ static apr_status_t out_open(h2_mplx *m, int stream_id, h2_response *response)
         h2_beam_on_consumed(task->output.beam, stream_output_consumed, task);
         m->tx_handles_reserved -= h2_beam_get_files_beamed(task->output.beam);
         h2_beam_on_file_beam(task->output.beam, can_beam_file, m);
-        h2_beam_mutex_set(task->output.beam, io_mutex_enter, io_mutex_leave, 
-                          task->cond, m);
+        h2_beam_mutex_set(task->output.beam, beam_enter, task->cond, m);
     }
     
     h2_ihash_add(m->ready_tasks, task);
@@ -670,42 +674,34 @@ apr_status_t h2_mplx_out_open(h2_mplx *m, int stream_id, h2_response *response)
     return status;
 }
 
-apr_status_t h2_mplx_out_close(h2_mplx *m, int stream_id)
+static apr_status_t out_close(h2_mplx *m, h2_task *task)
 {
-    apr_status_t status;
-    int acquired;
+    apr_status_t status = APR_SUCCESS;
     
-    AP_DEBUG_ASSERT(m);
-    if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
-        h2_task *task = h2_ihash_get(m->tasks, stream_id);
-        if (task && !task->orphaned) {
-            if (!task->response && !task->rst_error) {
-                /* In case a close comes before a response was created,
-                 * insert an error one so that our streams can properly
-                 * reset.
-                 */
-                h2_response *r = h2_response_die(stream_id, APR_EGENERAL, 
-                                                 task->request, m->pool);
-                status = out_open(m, stream_id, r);
-                ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, m->c,
-                              "h2_mplx(%ld-%d): close, no response, no rst", 
-                              m->id, stream_id);
-            }
-            ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, m->c,
-                          "h2_mplx(%ld-%d): close", m->id, stream_id);
-            if (task->output.beam) {
-                status = h2_beam_close(task->output.beam);
-                h2_beam_log(task->output.beam, stream_id, "out_close", m->c, 
-                            APLOG_TRACE2);
-            }
-            output_consumed_signal(m, task);
-            have_out_data_for(m, stream_id);
-        }
-        else {
-            status = APR_ECONNABORTED;
-        }
-        leave_mutex(m, acquired);
+    if (!task || task->orphaned) {
+        return APR_ECONNABORTED;
+    }
+    
+    if (!task->response && !task->rst_error) {
+        /* In case a close comes before a response was created,
+         * insert an error one so that our streams can properly
+         * reset.
+         */
+        h2_response *r = h2_response_die(task->stream_id, APR_EGENERAL, 
+                                         task->request, m->pool);
+        status = out_open(m, task->stream_id, r);
+        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, m->c,
+                      "h2_mplx(%s): close, no response, no rst", task->id);
+    }
+    ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, m->c,
+                  "h2_mplx(%s): close", task->id);
+    if (task->output.beam) {
+        status = h2_beam_close(task->output.beam);
+        h2_beam_log(task->output.beam, task->stream_id, "out_close", m->c, 
+                    APLOG_TRACE2);
     }
+    output_consumed_signal(m, task);
+    have_out_data_for(m, task->stream_id);
     return status;
 }
 
@@ -832,17 +828,17 @@ static h2_task *pop_task(h2_mplx *m)
             
             task->worker_started = 1;
             task->started_at = apr_time_now();
-            
-            if (task->input.beam) {
-                h2_beam_timeout_set(task->input.beam, m->stream_timeout);
-                h2_beam_on_consumed(task->input.beam, stream_input_consumed, m);
-                h2_beam_on_file_beam(task->input.beam, can_beam_file, m);
-                h2_beam_mutex_set(task->input.beam, io_mutex_enter, 
-                                  io_mutex_leave, task->cond, m);
-            }
             if (sid > m->max_stream_started) {
                 m->max_stream_started = sid;
             }
+
+            if (stream->input) {
+                h2_beam_timeout_set(stream->input, m->stream_timeout);
+                h2_beam_on_consumed(stream->input, stream_input_consumed, m);
+                h2_beam_on_file_beam(stream->input, can_beam_file, m);
+                h2_beam_mutex_set(stream->input, beam_enter, task->cond, m);
+            }
+
             ++m->workers_busy;
         }
     }
@@ -886,18 +882,12 @@ static void task_done(h2_mplx *m, h2_task *task, h2_req_engine *ngn)
             /* FIXME: this implementation is incomplete. */
             h2_task_set_io_blocking(task, 0);
             apr_thread_cond_broadcast(m->task_thawed);
+            return;
         }
         else {
-            apr_time_t now = apr_time_now();
-            
             ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
                           "h2_mplx(%ld): task(%s) done", m->id, task->id);
-            /* clean our references and report request as done. Signal
-             * that we want another unless we have been aborted */
-            /* TODO: this will keep a worker attached to this h2_mplx as
-             * long as it has requests to handle. Might no be fair to
-             * other mplx's. Perhaps leave after n requests? */
-            h2_mplx_out_close(m, task->stream_id);
+            out_close(m, task);
             
             if (ngn) {
                 apr_off_t bytes = 0;
@@ -930,29 +920,33 @@ static void task_done(h2_mplx *m, h2_task *task, h2_req_engine *ngn)
                 h2_task_redo(task);
                 h2_ihash_remove(m->redo_tasks, task->stream_id);
                 h2_iq_add(m->q, task->stream_id, NULL, NULL);
+                return;
             }
-            else {
-                task->worker_done = 1;
-                task->done_at = now;
-                ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
-                              "h2_mplx(%s): request done, %f ms"
-                              " elapsed", task->id, 
-                              (task->done_at - task->started_at) / 1000.0);
-                if (task->started_at > m->last_idle_block) {
-                    /* this task finished without causing an 'idle block', e.g.
-                     * a block by flow control.
-                     */
-                    if (now - m->last_limit_change >= m->limit_change_interval
-                        && m->workers_limit < m->workers_max) {
-                        /* Well behaving stream, allow it more workers */
-                        m->workers_limit = H2MIN(m->workers_limit * 2, 
-                                                 m->workers_max);
-                        m->last_limit_change = now;
-                        m->need_registration = 1;
-                        ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
-                                      "h2_mplx(%ld): increase worker limit to %d",
-                                      m->id, m->workers_limit);
-                    }
+            
+            task->worker_done = 1;
+            task->done_at = apr_time_now();
+            if (task->output.beam) {
+                h2_beam_on_consumed(task->output.beam, NULL, NULL);
+                h2_beam_mutex_set(task->output.beam, NULL, NULL, NULL);
+            }
+            ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
+                          "h2_mplx(%s): request done, %f ms"
+                          " elapsed", task->id, 
+                          (task->done_at - task->started_at) / 1000.0);
+            if (task->started_at > m->last_idle_block) {
+                /* this task finished without causing an 'idle block', e.g.
+                 * a block by flow control.
+                 */
+                if (task->done_at - m->last_limit_change >= m->limit_change_interval
+                    && m->workers_limit < m->workers_max) {
+                    /* Well behaving stream, allow it more workers */
+                    m->workers_limit = H2MIN(m->workers_limit * 2, 
+                                             m->workers_max);
+                    m->last_limit_change = task->done_at;
+                    m->need_registration = 1;
+                    ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
+                                  "h2_mplx(%ld): increase worker limit to %d",
+                                  m->id, m->workers_limit);
                 }
             }
             
index f6a83d62f10b29e5bfc0550be7b6602bd0876110..a6fe12a3efce6fd4b5d0731a77dbc709a1d763bc 100644 (file)
@@ -244,11 +244,6 @@ struct h2_stream *h2_mplx_next_submit(h2_mplx *m,
 apr_status_t h2_mplx_out_open(h2_mplx *mplx, int stream_id,
                               struct h2_response *response);
 
-/**
- * Closes the output for stream stream_id. 
- */
-apr_status_t h2_mplx_out_close(h2_mplx *m, int stream_id);
-
 /*******************************************************************************
  * h2_mplx list Manipulation.
  ******************************************************************************/
index ed74198a045811c863f14d125ed7dc711fc5e17e..8fe813a37528a5661796fa2aed38f8ceaa434a61 100644 (file)
@@ -26,7 +26,7 @@
  * @macro
  * Version number of the http2 module as c string
  */
-#define MOD_HTTP2_VERSION "1.5.1-DEV"
+#define MOD_HTTP2_VERSION "1.5.2-DEV"
 
 /**
  * @macro
@@ -34,7 +34,7 @@
  * release. This is a 24 bit number with 8 bits for major number, 8 bits
  * for minor and 8 bits for patch. Version 1.2.3 becomes 0x010203.
  */
-#define MOD_HTTP2_VERSION_NUM 0x010501
+#define MOD_HTTP2_VERSION_NUM 0x010502
 
 
 #endif /* mod_h2_h2_version_h */