]> git.ipfire.org Git - thirdparty/apache/httpd.git/commitdiff
using proxy http2 connection concurrently, if main connection also uses http/2, needs...
authorStefan Eissing <icing@apache.org>
Thu, 18 Feb 2016 17:02:02 +0000 (17:02 +0000)
committerStefan Eissing <icing@apache.org>
Thu, 18 Feb 2016 17:02:02 +0000 (17:02 +0000)
git-svn-id: https://svn.apache.org/repos/asf/httpd/httpd/trunk@1731096 13f79535-47bb-0310-9956-ffa450edef68

26 files changed:
CHANGES
CMakeLists.txt
modules/http2/NWGNUmod_http2
modules/http2/config2.m4
modules/http2/h2_ctx.c
modules/http2/h2_ctx.h
modules/http2/h2_int_queue.c [new file with mode: 0644]
modules/http2/h2_int_queue.h [new file with mode: 0644]
modules/http2/h2_io.c
modules/http2/h2_io.h
modules/http2/h2_mplx.c
modules/http2/h2_mplx.h
modules/http2/h2_proxy_session.c
modules/http2/h2_proxy_session.h
modules/http2/h2_session.c
modules/http2/h2_task.c
modules/http2/h2_task_input.c
modules/http2/h2_task_input.h
modules/http2/h2_util.c
modules/http2/h2_worker.c
modules/http2/h2_workers.c
modules/http2/h2_workers.h
modules/http2/mod_http2.c
modules/http2/mod_http2.dsp
modules/http2/mod_http2.h
modules/http2/mod_proxy_http2.c

diff --git a/CHANGES b/CHANGES
index c20b208b72c4238964be85c2dcef175a255e5d71..df83e57f0c78411176262034f97765e0a2765ce0 100644 (file)
--- a/CHANGES
+++ b/CHANGES
@@ -1,6 +1,10 @@
                                                          -*- coding: utf-8 -*-
 Changes with Apache 2.5.0
 
+  *) mod_proxy_http2: using single connection for several requests *if*
+     master connection uses HTTP/2 itself. Not yet hardened under load.
+     [Stefan Eissing]
+
   *) core: Added support for HTTP code 451. PR58985.
      [Yehuda Katz <yehuda ymkatz.net>, Jim Jagielski]
 
@@ -22,6 +26,7 @@ Changes with Apache 2.5.0
 
   *) mod_proxy_http2: new experimental http2 proxy module for h2: and h2c: proxy
      urls. Uses, so far, one connection per request, reuses connections.
+     [Stefan Eissing]
   
   *) event: use pre_connection hook to properly initialize connection state for
      slave connections. use protocol_switch hook to initialize server config
index 3ea412f02d593e192bfc971581eda5fe71b4bacc..d9fc914a38b8eba98600827a193a0ad70c98e5ca 100644 (file)
@@ -409,7 +409,7 @@ SET(mod_http2_extra_sources
   modules/http2/h2_session.c         modules/http2/h2_stream.c 
   modules/http2/h2_stream_set.c      modules/http2/h2_switch.c
   modules/http2/h2_task.c            modules/http2/h2_task_input.c
-  modules/http2/h2_task_output.c     modules/http2/h2_task_queue.c
+  modules/http2/h2_task_output.c     modules/http2/h2_int_queue.c
   modules/http2/h2_util.c            modules/http2/h2_worker.c
   modules/http2/h2_workers.c
 )
index dd4ac10d2093fb73012e0acbaac3abd5bced43b0..0e53b4ae1fb160685819612d6d2ea7077e6c08f8 100644 (file)
@@ -194,6 +194,7 @@ FILES_nlm_objs = \
        $(OBJDIR)/h2_filter.o \
        $(OBJDIR)/h2_from_h1.o \
        $(OBJDIR)/h2_h2.o \
+       $(OBJDIR)/h2_int_queue.o \
        $(OBJDIR)/h2_io.o \
        $(OBJDIR)/h2_io_set.o \
        $(OBJDIR)/h2_mplx.o \
@@ -207,7 +208,6 @@ FILES_nlm_objs = \
        $(OBJDIR)/h2_task.o \
        $(OBJDIR)/h2_task_input.o \
        $(OBJDIR)/h2_task_output.o \
-       $(OBJDIR)/h2_task_queue.o \
        $(OBJDIR)/h2_util.o \
        $(OBJDIR)/h2_worker.o \
        $(OBJDIR)/h2_workers.o \
index 0c9871552f947d795ce026c60222f0351ba281b6..85a635e6b3459425d629890f9823de4c7e8f1b08 100644 (file)
@@ -29,6 +29,7 @@ h2_ctx.lo dnl
 h2_filter.lo dnl
 h2_from_h1.lo dnl
 h2_h2.lo dnl
+h2_int_queue.lo dnl
 h2_io.lo dnl
 h2_io_set.lo dnl
 h2_mplx.lo dnl
@@ -42,7 +43,6 @@ h2_switch.lo dnl
 h2_task.lo dnl
 h2_task_input.lo dnl
 h2_task_output.lo dnl
-h2_task_queue.lo dnl
 h2_util.lo dnl
 h2_worker.lo dnl
 h2_workers.lo dnl
@@ -156,8 +156,10 @@ AC_DEFUN([APACHE_CHECK_NGHTTP2],[
         AC_MSG_WARN([nghttp2 library is unusable])
       fi
 dnl # nghttp2 >= 1.3.0: access to stream weights
-      AC_CHECK_FUNCS([nghttp2_stream_get_weight], 
-        [APR_ADDTO(MOD_CPPFLAGS, ["-DH2_NG2_STREAM_API"])], [])
+      AC_CHECK_FUNCS([nghttp2_stream_get_weight], [], [liberrors="yes"])
+      if test "x$liberrors" != "x"; then
+        AC_MSG_WARN([nghttp2 version >= 1.3.0 is required])
+      fi
 dnl # nghttp2 >= 1.5.0: changing stream priorities
       AC_CHECK_FUNCS([nghttp2_session_change_stream_priority], 
         [APR_ADDTO(MOD_CPPFLAGS, ["-DH2_NG2_CHANGE_PRIO"])], [])
@@ -206,6 +208,7 @@ APR_ADDTO(INCLUDES, [-I\$(top_srcdir)/$modpath_current])
 dnl #  list of module object files
 proxy_http2_objs="dnl
 mod_proxy_http2.lo dnl
+h2_int_queue.lo dnl
 h2_proxy_session.lo dnl
 h2_request.lo dnl
 h2_util.lo dnl
index b40037cb35b270d985bebfe0de4c430f8b67658d..e8294fcb2b3e6b37683b364bf18bcbc39d664aa4 100644 (file)
@@ -101,7 +101,17 @@ int h2_ctx_is_task(h2_ctx *ctx)
     return ctx && ctx->task;
 }
 
-struct h2_task *h2_ctx_get_task(h2_ctx *ctx)
+h2_task *h2_ctx_get_task(h2_ctx *ctx)
 {
     return ctx? ctx->task : NULL;
 }
+
+h2_task *h2_ctx_cget_task(conn_rec *c)
+{
+    return h2_ctx_get_task(h2_ctx_get(c, 0));
+}
+
+h2_task *h2_ctx_rget_task(request_rec *r)
+{
+    return h2_ctx_get_task(h2_ctx_rget(r));
+}
index 68dc7c84c350b43af7883c218f55452b35ec6469..3b2c842caef4e90ce9a8d7ab90979a06e61be397 100644 (file)
@@ -71,5 +71,7 @@ const char *h2_ctx_protocol_get(const conn_rec *c);
 int h2_ctx_is_task(h2_ctx *ctx);
 
 struct h2_task *h2_ctx_get_task(h2_ctx *ctx);
+struct h2_task *h2_ctx_cget_task(conn_rec *c);
+struct h2_task *h2_ctx_rget_task(request_rec *r);
 
 #endif /* defined(__mod_h2__h2_ctx__) */
diff --git a/modules/http2/h2_int_queue.c b/modules/http2/h2_int_queue.c
new file mode 100644 (file)
index 0000000..ba44afb
--- /dev/null
@@ -0,0 +1,182 @@
+/* Copyright 2015 greenbytes GmbH (https://www.greenbytes.de)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <assert.h>
+#include <stddef.h>
+#include <apr_pools.h>
+
+#include "h2_int_queue.h"
+
+
+static void tq_grow(h2_int_queue *q, int nlen);
+static void tq_swap(h2_int_queue *q, int i, int j);
+static int tq_bubble_up(h2_int_queue *q, int i, int top, 
+                        h2_iq_cmp *cmp, void *ctx);
+static int tq_bubble_down(h2_int_queue *q, int i, int bottom, 
+                          h2_iq_cmp *cmp, void *ctx);
+
+h2_int_queue *h2_iq_create(apr_pool_t *pool, int capacity)
+{
+    h2_int_queue *q = apr_pcalloc(pool, sizeof(h2_int_queue));
+    if (q) {
+        q->pool = pool;
+        tq_grow(q, capacity);
+        q->nelts = 0;
+    }
+    return q;
+}
+
+int h2_iq_empty(h2_int_queue *q)
+{
+    return q->nelts == 0;
+}
+
+int h2_iq_size(h2_int_queue *q)
+{
+    return q->nelts;
+}
+
+
+void h2_iq_add(h2_int_queue *q, int sid, h2_iq_cmp *cmp, void *ctx)
+{
+    int i;
+    
+    if (q->nelts >= q->nalloc) {
+        tq_grow(q, q->nalloc * 2);
+    }
+    
+    i = (q->head + q->nelts) % q->nalloc;
+    q->elts[i] = sid;
+    ++q->nelts;
+    
+    if (cmp) {
+        /* bubble it to the front of the queue */
+        tq_bubble_up(q, i, q->head, cmp, ctx);
+    }
+}
+
+int h2_iq_remove(h2_int_queue *q, int sid)
+{
+    int i;
+    for (i = 0; i < q->nelts; ++i) {
+        if (sid == q->elts[(q->head + i) % q->nalloc]) {
+            break;
+        }
+    }
+    
+    if (i < q->nelts) {
+        ++i;
+        for (; i < q->nelts; ++i) {
+            q->elts[(q->head+i-1)%q->nalloc] = q->elts[(q->head+i)%q->nalloc];
+        }
+        --q->nelts;
+        return 1;
+    }
+    return 0;
+}
+
+void h2_iq_sort(h2_int_queue *q, h2_iq_cmp *cmp, void *ctx)
+{
+    /* Assume that changes in ordering are minimal. This needs,
+     * best case, q->nelts - 1 comparisions to check that nothing
+     * changed.
+     */
+    if (q->nelts > 0) {
+        int i, ni, prev, last;
+        
+        /* Start at the end of the queue and create a tail of sorted
+         * entries. Make that tail one element longer in each iteration.
+         */
+        last = i = (q->head + q->nelts - 1) % q->nalloc;
+        while (i != q->head) {
+            prev = (q->nalloc + i - 1) % q->nalloc;
+            
+            ni = tq_bubble_up(q, i, prev, cmp, ctx);
+            if (ni == prev) {
+                /* i bubbled one up, bubble the new i down, which
+                 * keeps all tasks below i sorted. */
+                tq_bubble_down(q, i, last, cmp, ctx);
+            }
+            i = prev;
+        };
+    }
+}
+
+
+int h2_iq_shift(h2_int_queue *q)
+{
+    int sid;
+    
+    if (q->nelts <= 0) {
+        return 0;
+    }
+    
+    sid = q->elts[q->head];
+    q->head = (q->head + 1) % q->nalloc;
+    q->nelts--;
+    
+    return sid;
+}
+
+static void tq_grow(h2_int_queue *q, int nlen)
+{
+    if (nlen > q->nalloc) {
+        int *nq = apr_pcalloc(q->pool, sizeof(int) * nlen);
+        if (q->nelts > 0) {
+            int l = ((q->head + q->nelts) % q->nalloc) - q->head;
+            
+            memmove(nq, q->elts + q->head, sizeof(int) * l);
+            if (l < q->nelts) {
+                /* elts wrapped, append elts in [0, remain] to nq */
+                int remain = q->nelts - l;
+                memmove(nq + l, q->elts, sizeof(int) * remain);
+            }
+        }
+        q->elts = nq;
+        q->nalloc = nlen;
+        q->head = 0;
+    }
+}
+
+static void tq_swap(h2_int_queue *q, int i, int j)
+{
+    int x = q->elts[i];
+    q->elts[i] = q->elts[j];
+    q->elts[j] = x;
+}
+
+static int tq_bubble_up(h2_int_queue *q, int i, int top, 
+                        h2_iq_cmp *cmp, void *ctx) 
+{
+    int prev;
+    while (((prev = (q->nalloc + i - 1) % q->nalloc), i != top) 
+           && (*cmp)(q->elts[i], q->elts[prev], ctx) < 0) {
+        tq_swap(q, prev, i);
+        i = prev;
+    }
+    return i;
+}
+
+static int tq_bubble_down(h2_int_queue *q, int i, int bottom, 
+                          h2_iq_cmp *cmp, void *ctx)
+{
+    int next;
+    while (((next = (q->nalloc + i + 1) % q->nalloc), i != bottom) 
+           && (*cmp)(q->elts[i], q->elts[next], ctx) > 0) {
+        tq_swap(q, next, i);
+        i = next;
+    }
+    return i;
+}
diff --git a/modules/http2/h2_int_queue.h b/modules/http2/h2_int_queue.h
new file mode 100644 (file)
index 0000000..6cdd84c
--- /dev/null
@@ -0,0 +1,103 @@
+/* Copyright 2015 greenbytes GmbH (https://www.greenbytes.de)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __mod_h2__h2_int_queue__
+#define __mod_h2__h2_int_queue__
+
+/**
+ * h2_int_queue keeps a list of sorted h2_task* in ascending order.
+ */
+typedef struct h2_int_queue h2_int_queue;
+
+struct h2_int_queue {
+    int *elts;
+    int head;
+    int nelts;
+    int nalloc;
+    apr_pool_t *pool;
+};
+
+/**
+ * Comparator for two task to determine their order.
+ *
+ * @param s1 stream id to compare
+ * @param s2 stream id to compare
+ * @param ctx provided user data
+ * @return value is the same as for strcmp() and has the effect:
+ *    == 0: s1 and s2 are treated equal in ordering
+ *     < 0: s1 should be sorted before s2
+ *     > 0: s2 should be sorted before s1
+ */
+typedef int h2_iq_cmp(int s1, int s2, void *ctx);
+
+
+/**
+ * Allocate a new queue from the pool and initialize.
+ * @param id the identifier of the queue
+ * @param pool the memory pool
+ */
+h2_int_queue *h2_iq_create(apr_pool_t *pool, int capacity);
+
+/**
+ * Return != 0 iff there are no tasks in the queue.
+ * @param q the queue to check
+ */
+int h2_iq_empty(h2_int_queue *q);
+
+/**
+ * Return the number of int in the queue.
+ * @param q the queue to get size on
+ */
+int h2_iq_size(h2_int_queue *q);
+
+/**
+ * Add a stream idto the queue. 
+ *
+ * @param q the queue to append the task to
+ * @param sid the stream id to add
+ * @param cmp the comparator for sorting
+ * @param ctx user data for comparator 
+ */
+void h2_iq_add(h2_int_queue *q, int sid, h2_iq_cmp *cmp, void *ctx);
+
+/**
+ * Remove the stream id from the queue. Return != 0 iff task
+ * was found in queue.
+ * @param q the task queue
+ * @param sid the stream id to remove
+ * @return != 0 iff task was found in queue
+ */
+int h2_iq_remove(h2_int_queue *q, int sid);
+
+/**
+ * Sort the stream idqueue again. Call if the task ordering
+ * has changed.
+ *
+ * @param q the queue to sort
+ * @param cmp the comparator for sorting
+ * @param ctx user data for the comparator 
+ */
+void h2_iq_sort(h2_int_queue *q, h2_iq_cmp *cmp, void *ctx);
+
+/**
+ * Get the first stream id from the queue or NULL if the queue is empty. 
+ * The task will be removed.
+ *
+ * @param q the queue to get the first task from
+ * @return the first stream id of the queue, 0 if empty
+ */
+int h2_iq_shift(h2_int_queue *q);
+
+#endif /* defined(__mod_h2__h2_int_queue__) */
index 07953d1899624992bcd1beec81ef33fb60403201..6107c01ea5c052344eb4c77ab78e5651d2dfab57 100644 (file)
@@ -75,6 +75,11 @@ int h2_io_in_has_eos_for(h2_io *io)
     return io->eos_in || (io->bbin && h2_util_has_eos(io->bbin, -1));
 }
 
+int h2_io_in_has_data(h2_io *io)
+{
+    return io->bbin && h2_util_bb_has_data_or_eos(io->bbin);
+}
+
 int h2_io_out_has_data(h2_io *io)
 {
     return io->bbout && h2_util_bb_has_data_or_eos(io->bbout);
@@ -256,6 +261,18 @@ apr_status_t h2_io_in_read(h2_io *io, apr_bucket_brigade *bb,
         }
     }
     
+    if (status == APR_SUCCESS && (!io->bbin || APR_BRIGADE_EMPTY(io->bbin))) {
+        if (io->eos_in) {
+            if (!io->eos_in_written) {
+                status = append_eos(io, bb, trailers);
+                io->eos_in_written = 1;
+            }
+        }
+    }
+    
+    if (status == APR_SUCCESS && APR_BRIGADE_EMPTY(bb)) {
+        return APR_EAGAIN;
+    }
     return status;
 }
 
index f0b085b0d2cde38dc6bbc9809b6343de139d5927..9c1584a376abd438c0d83a32e26aeb4c71b5a997 100644 (file)
@@ -63,9 +63,6 @@ struct h2_io {
     apr_size_t input_consumed;       /* how many bytes have been read */
         
     int files_handles_owned;
-    
-    struct h2_task *task;            /* parked task */
-    request_rec *r;                  /* parked request */
 };
 
 /*******************************************************************************
@@ -101,6 +98,10 @@ int h2_io_in_has_eos_for(h2_io *io);
  * Output data is available.
  */
 int h2_io_out_has_data(h2_io *io);
+/**
+ * Input data is available.
+ */
+int h2_io_in_has_data(h2_io *io);
 
 void h2_io_signal(h2_io *io, h2_io_op op);
 void h2_io_signal_init(h2_io *io, h2_io_op op, apr_interval_time_t timeout, 
index 80b36c05ca0e908cd633b38d40acc4beff045d65..f5c4bb7acdf0a9a6a04f40fbea378a32fe343e10 100644 (file)
@@ -34,6 +34,7 @@
 #include "h2_conn.h"
 #include "h2_ctx.h"
 #include "h2_h2.h"
+#include "h2_int_queue.h"
 #include "h2_io.h"
 #include "h2_io_set.h"
 #include "h2_response.h"
@@ -44,7 +45,6 @@
 #include "h2_task.h"
 #include "h2_task_input.h"
 #include "h2_task_output.h"
-#include "h2_task_queue.h"
 #include "h2_worker.h"
 #include "h2_workers.h"
 #include "h2_util.h"
@@ -206,7 +206,13 @@ h2_mplx *h2_mplx_create(conn_rec *c, apr_pool_t *parent,
             return NULL;
         }
         
-        m->q = h2_tq_create(m->pool, h2_config_geti(conf, H2_CONF_MAX_STREAMS));
+        status = apr_thread_cond_create(&m->request_done, m->pool);
+        if (status != APR_SUCCESS) {
+            h2_mplx_destroy(m);
+            return NULL;
+        }
+
+        m->q = h2_iq_create(m->pool, h2_config_geti(conf, H2_CONF_MAX_STREAMS));
         m->stream_ios = h2_io_set_create(m->pool);
         m->ready_ios = h2_io_set_create(m->pool);
         m->stream_max_mem = h2_config_geti(conf, H2_CONF_STREAM_MAX_MEM);
@@ -296,7 +302,7 @@ static int io_stream_done(h2_mplx *m, h2_io *io, int rst_error)
     h2_io_set_remove(m->ready_ios, io);
     if (!io->processing_started || io->processing_done) {
         /* already finished or not even started yet */
-        h2_tq_remove(m->q, io->id);
+        h2_iq_remove(m->q, io->id);
         io_destroy(m, io, 1);
         return 0;
     }
@@ -324,6 +330,7 @@ apr_status_t h2_mplx_release_and_join(h2_mplx *m, apr_thread_cond_t *wait)
         /* disable WINDOW_UPDATE callbacks */
         h2_mplx_set_consumed_cb(m, NULL, NULL);
         
+        apr_thread_cond_broadcast(m->request_done);
         while (!h2_io_set_iter(m->stream_ios, stream_done_iter, m)) {
             /* iterate until all ios have been orphaned or destroyed */
         }
@@ -351,6 +358,8 @@ apr_status_t h2_mplx_release_and_join(h2_mplx *m, apr_thread_cond_t *wait)
                                   "all h2_workers to return, have still %d requests outstanding", 
                                   m->id, i*wait_secs, (int)h2_io_set_size(m->stream_ios));
                 }
+                m->aborted = 1;
+                apr_thread_cond_broadcast(m->request_done);
             }
         }
         ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c, APLOGNO(03056)
@@ -408,7 +417,7 @@ static const h2_request *pop_request(h2_mplx *m)
 {
     const h2_request *req = NULL;
     int sid;
-    while (!m->aborted && !req && (sid = h2_tq_shift(m->q)) > 0) {
+    while (!m->aborted && !req && (sid = h2_iq_shift(m->q)) > 0) {
         h2_io *io = h2_io_set_get(m->stream_ios, sid);
         if (io) {
             req = io->request;
@@ -421,17 +430,8 @@ static const h2_request *pop_request(h2_mplx *m)
     return req;
 }
 
-static apr_status_t h2_mplx_engine_schedule(h2_mplx *m, request_rec *r)
-{
-    if (!m->engine_queue) {
-        apr_queue_create(&m->engine_queue, 200, m->pool);
-    }
-    return apr_queue_trypush(m->engine_queue, r);
-}
-
-void h2_mplx_request_done(h2_mplx **pm, int stream_id, const h2_request **preq)
+void h2_mplx_request_done(h2_mplx *m, int stream_id, const h2_request **preq)
 {
-    h2_mplx *m = *pm;
     int acquired;
     
     if (enter_mutex(m, &acquired) == APR_SUCCESS) {
@@ -440,49 +440,30 @@ void h2_mplx_request_done(h2_mplx **pm, int stream_id, const h2_request **preq)
             ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
                           "h2_mplx(%ld): request(%d) done", m->id, stream_id);
             if (io) {
-                request_rec *r = io->r;
-                
+                io->processing_done = 1;
+                h2_mplx_out_close(m, stream_id, NULL);
                 if (io->orphaned) {
-                    io->processing_done = 1;
-                }
-                else if (r) {
-                    /* A parked request which is being transferred from
-                     * one worker thread to another. This request_done call
-                     * was from the initial thread and now it is safe to
-                     * schedule it for further processing. */
-                    h2_task_thaw(io->task);
-                    io->task = NULL;
-                    io->r = NULL;
-                    h2_mplx_engine_schedule(*pm, r);
+                    io_destroy(m, io, 0);
+                    if (m->join_wait) {
+                        apr_thread_cond_signal(m->join_wait);
+                    }
                 }
                 else {
-                    io->processing_done = 1;
-                }
-                
-                if (io->processing_done) {
-                    h2_io_out_close(io, NULL);
-                    if (io->orphaned) {
-                        io_destroy(m, io, 0);
-                        if (m->join_wait) {
-                            apr_thread_cond_signal(m->join_wait);
-                        }
-                    }
-                    else {
-                        /* hang around until the stream deregisteres */
-                    }
+                    /* hang around until the stream deregisteres */
                 }
             }
+            else {
+                ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c,
+                              "h2_mplx(%ld): request(%d) done, no io found", 
+                              m->id, stream_id);
+            }
+            apr_thread_cond_broadcast(m->request_done);
         }
         
         if (preq) {
             /* someone wants another request, if we have */
             *preq = pop_request(m);
         }
-        if (!preq || !*preq) {
-            /* No request to hand back to the worker, NULLify reference
-             * and decrement count */
-            *pm = NULL;
-        }
         leave_mutex(m, acquired);
     }
 }
@@ -935,6 +916,26 @@ int h2_mplx_in_has_eos_for(h2_mplx *m, int stream_id)
     return has_eos;
 }
 
+int h2_mplx_in_has_data_for(h2_mplx *m, int stream_id)
+{
+    apr_status_t status;
+    int has_data = 0;
+    int acquired;
+    
+    AP_DEBUG_ASSERT(m);
+    if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
+        h2_io *io = h2_io_set_get(m->stream_ios, stream_id);
+        if (io && !io->orphaned) {
+            has_data = h2_io_in_has_data(io);
+        }
+        else {
+            has_data = 0;
+        }
+        leave_mutex(m, acquired);
+    }
+    return has_data;
+}
+
 int h2_mplx_out_has_data_for(h2_mplx *m, int stream_id)
 {
     apr_status_t status;
@@ -1001,7 +1002,7 @@ apr_status_t h2_mplx_reprioritize(h2_mplx *m, h2_stream_pri_cmp *cmp, void *ctx)
             status = APR_ECONNABORTED;
         }
         else {
-            h2_tq_sort(m->q, cmp, ctx);
+            h2_iq_sort(m->q, cmp, ctx);
             
             ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
                           "h2_mplx(%ld): reprioritize tasks", m->id);
@@ -1050,8 +1051,8 @@ apr_status_t h2_mplx_process(h2_mplx *m, int stream_id, const h2_request *req,
                 status = h2_io_in_close(io);
             }
             
-            was_empty = h2_tq_empty(m->q);
-            h2_tq_add(m->q, io->id, cmp, ctx);
+            was_empty = h2_iq_empty(m->q);
+            h2_iq_add(m->q, io->id, cmp, ctx);
             
             ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, m->c,
                           "h2_mplx(%ld-%d): process", m->c->id, stream_id);
@@ -1079,61 +1080,116 @@ const h2_request *h2_mplx_pop_request(h2_mplx *m, int *has_more)
         }
         else {
             req = pop_request(m);
-            *has_more = !h2_tq_empty(m->q);
+            *has_more = !h2_iq_empty(m->q);
         }
         leave_mutex(m, acquired);
     }
     return req;
 }
 
-apr_status_t h2_mplx_engine_push(h2_mplx *m, h2_task *task,
-                                 const char *engine_type, 
+
+/*******************************************************************************
+ * HTTP/2 request engines
+ ******************************************************************************/
+typedef struct h2_req_engine_i h2_req_engine_i;
+struct h2_req_engine_i {
+    h2_req_engine pub;
+    conn_rec *c;               /* connection this engine is assigned to */
+    h2_mplx *m;
+    unsigned int shutdown : 1; /* engine is being shut down */
+    apr_thread_cond_t *io;     /* condition var for waiting on data */
+    apr_queue_t *queue;        /* queue of scheduled request_rec* */
+    apr_size_t no_assigned;    /* # of assigned requests */
+    apr_size_t no_live;        /* # of live */
+    apr_size_t no_finished;    /* # of finished */
+};
+
+static apr_status_t h2_mplx_engine_schedule(h2_mplx *m, 
+                                            h2_req_engine_i *engine, 
+                                            request_rec *r)
+{
+    if (!engine->queue) {
+        apr_queue_create(&engine->queue, 100, engine->pub.pool);
+    }
+    return apr_queue_trypush(engine->queue, r);
+}
+
+
+apr_status_t h2_mplx_engine_push(const char *engine_type, 
                                  request_rec *r, h2_mplx_engine_init *einit)
 {
     apr_status_t status;
+    h2_mplx *m;
+    h2_task *task;
     int acquired;
     
+    task = h2_ctx_rget_task(r);
+    if (!task) {
+        return APR_ECONNABORTED;
+    }
+    m = task->mplx;
     AP_DEBUG_ASSERT(m);
+    
     if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
         h2_io *io = h2_io_set_get(m->stream_ios, task->stream_id);
         if (!io || io->orphaned) {
             status = APR_ECONNABORTED;
         }
         else {
-            h2_req_engine *engine;
+            h2_req_engine_i *engine = (h2_req_engine_i*)m->engine;
             
             apr_table_set(r->connection->notes, H2_TASK_ID_NOTE, task->id);
             status = APR_EOF;
-            engine = m->engine; /* just a single one for now */
+            
             if (task->ser_headers) {
                 /* Max compatibility, deny processing of this */
             }
-            else if (!engine && einit) {
-                engine = apr_pcalloc(r->connection->pool, sizeof(*engine));
-                engine->id = 1;
-                engine->c = r->connection;
-                engine->pool = r->connection->pool;
-                engine->type = apr_pstrdup(engine->pool, engine_type);
-                
-                status = einit(engine, r);
-                if (status == APR_SUCCESS) {
-                    m->engine = engine;
+            else if (engine && !strcmp(engine->pub.type, engine_type)) {
+                if (engine->shutdown 
+                    || engine->no_assigned >= H2MIN(engine->pub.capacity, 100)) {
                     ap_log_rerror(APLOG_MARK, APLOG_TRACE1, status, r,
-                                  "h2_mplx(%ld): init engine %d (%s)", 
-                                  m->c->id, engine->id, engine->type);
+                                  "h2_mplx(%ld): engine shutdown or over %s", 
+                                  m->c->id, engine->pub.id);
+                    engine = NULL;
                 }
-            }
-            else if (engine && !strcmp(engine->type, engine_type)) {
-                if (status == APR_SUCCESS) {
+                else if (h2_mplx_engine_schedule(m, engine, r) == APR_SUCCESS) {
                     /* this task will be processed in another thread,
                      * freeze any I/O for the time being. */
                     h2_task_freeze(task, r);
-                    io->task = task;
-                    io->r = r;
+                    engine->no_assigned++;
+                    status = APR_SUCCESS;
+                    ap_log_rerror(APLOG_MARK, APLOG_DEBUG, status, r,
+                                  "h2_mplx(%ld): push request %s", 
+                                  m->c->id, r->the_request);
+                }
+                else {
+                    ap_log_rerror(APLOG_MARK, APLOG_TRACE1, status, r,
+                                  "h2_mplx(%ld): engine error adding req %s", 
+                                  m->c->id, engine->pub.id);
+                    engine = NULL;
+                }
+            }
+            
+            if (!engine && einit) {
+                engine = apr_pcalloc(task->pool, sizeof(*engine));
+                engine->pub.id = apr_psprintf(task->pool, "eng-%ld-%d", 
+                                               m->id, m->next_eng_id++);
+                engine->pub.pool = task->pool;
+                engine->pub.type = apr_pstrdup(task->pool, engine_type);
+                engine->c = r->connection;
+                engine->m = m;
+                engine->io = task->io;
+                engine->no_assigned = 1;
+                engine->no_live = 1;
+                
+                status = einit(&engine->pub, r);
+                ap_log_rerror(APLOG_MARK, APLOG_TRACE1, status, r,
+                              "h2_mplx(%ld): init engine %s (%s)", 
+                              m->c->id, engine->pub.id, engine->pub.type);
+                if (status == APR_SUCCESS) {
+                    m->engine = &engine->pub;
                 }
-                ap_log_rerror(APLOG_MARK, APLOG_TRACE2, status, r,
-                              "h2_mplx(%ld): push request %s", 
-                              m->c->id, r->the_request);
             }
         }
         
@@ -1141,52 +1197,163 @@ apr_status_t h2_mplx_engine_push(h2_mplx *m, h2_task *task,
     }
     return status;
 }
+
+static request_rec *get_non_frozen(apr_queue_t *equeue)
+{
+    request_rec *r, *first = NULL;
+    h2_task *task;
+    void *elem;
+
+    if (equeue) {
+        /* FIFO queue, try to find a  request_rec whose task is not frozen */
+        while (apr_queue_trypop(equeue, &elem) == APR_SUCCESS) {
+            r = elem;
+            task = h2_ctx_rget_task(r);
+            AP_DEBUG_ASSERT(task);
+            if (!task->frozen) {
+                return r;
+            }
+            apr_queue_push(equeue, r);  
+            if (!first) {
+                first = r;
+            }
+            else if (r == first) {
+                return NULL; /* walked the whole queue */
+            }
+        }
+    }
+    return NULL;
+}
+
+static apr_status_t engine_pull(h2_mplx *m, h2_req_engine_i *engine, 
+                                apr_read_type_e block, request_rec **pr)
+{   
+    request_rec *r;
+    
+    AP_DEBUG_ASSERT(m);
+    AP_DEBUG_ASSERT(engine);
+    while (1) {
+        if (m->aborted) {
+            ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
+                          "h2_mplx(%ld): mplx abort while pulling requests %s", 
+                          m->id, engine->pub.id);
+            *pr = NULL;
+            return APR_EOF;
+        }
+        
+        if (engine->queue && (r = get_non_frozen(engine->queue))) {
+            ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r,
+                          "h2_mplx(%ld): request %s pulled by engine %s", 
+                          m->c->id, r->the_request, engine->pub.id);
+            engine->no_live++;
+            *pr = r;
+            return APR_SUCCESS;
+        }
+        else if (APR_NONBLOCK_READ == block) {
+            *pr = NULL;
+            return APR_EAGAIN;
+        }
+        else if (!engine->queue || !apr_queue_size(engine->queue)) {
+            engine->shutdown = 1;
+            ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
+                          "h2_mplx(%ld): emtpy queue, shutdown engine %s", 
+                          m->id, engine->pub.id);
+            *pr = NULL;
+            return APR_EOF;
+        }
+        apr_thread_cond_timedwait(m->request_done, m->lock, 
+                                  apr_time_from_msec(100));
+    }
+}
                                  
-apr_status_t h2_mplx_engine_pull(h2_mplx *m, h2_task *task,
-                                 struct h2_req_engine *engine, 
-                                 apr_time_t timeout, request_rec **pr)
+apr_status_t h2_mplx_engine_pull(h2_req_engine *pub_engine, 
+                                 apr_read_type_e block, request_rec **pr)
 {   
+    h2_req_engine_i *engine = (h2_req_engine_i*)pub_engine;
+    h2_mplx *m = engine->m;
     apr_status_t status;
     int acquired;
     
-    AP_DEBUG_ASSERT(m);
+    *pr = NULL;
     if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
-        status = APR_ECONNABORTED;
-        if (m->engine == engine && m->engine_queue) {
-            void *elem;
-            status = apr_queue_trypop(m->engine_queue, &elem);
-            if (status == APR_SUCCESS) {
-                *pr = elem;
-                ap_log_rerror(APLOG_MARK, APLOG_TRACE2, status, *pr,
-                              "h2_mplx(%ld): request %s pulled by engine %d", 
-                              m->c->id, (*pr)->the_request, engine->id);
-            }
-        }
+        status = engine_pull(m, engine, block, pr);
         leave_mutex(m, acquired);
     }
     return status;
 }
  
-void h2_mplx_engine_done(h2_mplx *m, struct h2_task *task, conn_rec *r_conn)
+static void engine_done(h2_mplx *m, h2_req_engine_i *engine, h2_task *task, 
+                        int waslive, int aborted)
 {
-    int stream_id = task->stream_id;
+    ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c,
+                  "h2_mplx(%ld): task %s %s by %s", 
+                  m->id, task->id, aborted? "aborted":"done", 
+                  engine->pub.id);
     h2_task_output_close(task->output);
-    h2_mplx_request_done(&m, stream_id, NULL);
-    apr_pool_destroy(r_conn->pool);
+    h2_mplx_request_done(m, task->stream_id, NULL);
+    apr_pool_destroy(task->pool);
+    engine->no_finished++;
+    if (waslive) engine->no_live--;
+    engine->no_assigned--;
+}
+                                
+void h2_mplx_engine_done(h2_req_engine *pub_engine, conn_rec *r_conn)
+{
+    h2_req_engine_i *engine = (h2_req_engine_i*)pub_engine;
+    h2_mplx *m = engine->m;
+    h2_task *task;
+    int acquired;
+
+    task = h2_ctx_cget_task(r_conn);
+    if (task && (enter_mutex(m, &acquired) == APR_SUCCESS)) {
+        engine_done(m, engine, task, 1, 0);
+        leave_mutex(m, acquired);
+    }
 }
                                 
-void h2_mplx_engine_exit(h2_mplx *m, struct h2_task *task, 
-                         struct h2_req_engine *engine)
+void h2_mplx_engine_exit(h2_req_engine *pub_engine)
 {
+    h2_req_engine_i *engine = (h2_req_engine_i*)pub_engine;
+    h2_mplx *m = engine->m;
     int acquired;
     
-    AP_DEBUG_ASSERT(m);
     if (enter_mutex(m, &acquired) == APR_SUCCESS) {
-        /* TODO: shutdown of engine->c */
-        ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
-                      "h2_mplx(%ld): exit engine %d (%s)", 
-                      m->c->id, engine->id, engine->type);
-        m->engine = NULL;
+        if (engine->queue && apr_queue_size(engine->queue)) {
+            void *entry;
+            ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c,
+                          "h2_mplx(%ld): exit engine %s (%s), "
+                          "has still %d requests queued, "
+                          "assigned=%ld, live=%ld, finished=%ld", 
+                          m->c->id, engine->pub.id, engine->pub.type,
+                          (int)apr_queue_size(engine->queue),
+                          (long)engine->no_assigned, (long)engine->no_live,
+                          (long)engine->no_finished);
+            while (apr_queue_trypop(engine->queue, &entry) == APR_SUCCESS) {
+                request_rec *r = entry;
+                h2_task *task = h2_ctx_rget_task(r);
+                ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c,
+                              "h2_mplx(%ld): engine %s has queued task %s, "
+                              "frozen=%d, aborting",
+                              m->c->id, engine->pub.id, task->id, task->frozen);
+                engine_done(m, engine, task, 0, 1);
+            }
+        }
+        if (engine->no_assigned > 1 || engine->no_live > 1) {
+            ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c,
+                          "h2_mplx(%ld): exit engine %s (%s), "
+                          "assigned=%ld, live=%ld, finished=%ld", 
+                          m->c->id, engine->pub.id, engine->pub.type,
+                          (long)engine->no_assigned, (long)engine->no_live,
+                          (long)engine->no_finished);
+        }
+        else {
+            ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
+                          "h2_mplx(%ld): exit engine %s (%s)", 
+                          m->c->id, engine->pub.id, engine->pub.type);
+        }
+        if (m->engine == &engine->pub) {
+            m->engine = NULL; /* TODO */
+        }
         leave_mutex(m, acquired);
     }
 }
index 837025f996a13285d43715add81026ab0a93250e..724cfe1c2e970d437d463c4fbfadb4154cfdf129 100644 (file)
@@ -46,7 +46,7 @@ struct h2_io_set;
 struct apr_thread_cond_t;
 struct h2_workers;
 struct h2_stream_set;
-struct h2_task_queue;
+struct h2_int_queue;
 struct h2_req_engine;
 
 #include <apr_queue.h>
@@ -69,7 +69,7 @@ struct h2_mplx {
 
     unsigned int aborted : 1;
 
-    struct h2_task_queue *q;
+    struct h2_int_queue *q;
     struct h2_io_set *stream_ios;
     struct h2_io_set *ready_ios;
     
@@ -77,6 +77,7 @@ struct h2_mplx {
 
     apr_thread_mutex_t *lock;
     struct apr_thread_cond_t *added_output;
+    struct apr_thread_cond_t *request_done;
     struct apr_thread_cond_t *join_wait;
     
     apr_size_t stream_max_mem;
@@ -91,7 +92,9 @@ struct h2_mplx {
     void *input_consumed_ctx;
     
     struct h2_req_engine *engine;
+    /* TODO: signal for waiting tasks*/
     apr_queue_t *engine_queue;
+    int next_eng_id;
 };
 
 
@@ -127,7 +130,7 @@ apr_status_t h2_mplx_release_and_join(h2_mplx *m, struct apr_thread_cond_t *wait
  */
 void h2_mplx_abort(h2_mplx *mplx);
 
-void h2_mplx_request_done(h2_mplx **pm, int stream_id, const struct h2_request **preq);
+void h2_mplx_request_done(h2_mplx *m, int stream_id, const struct h2_request **preq);
 
 /**
  * Get the highest stream identifier that has been passed on to processing.
@@ -151,10 +154,14 @@ int h2_mplx_get_max_stream_started(h2_mplx *m);
  */
 apr_status_t h2_mplx_stream_done(h2_mplx *m, int stream_id, int rst_error);
 
-/* Return != 0 iff the multiplexer has data for the given stream. 
+/* Return != 0 iff the multiplexer has output data for the given stream. 
  */
 int h2_mplx_out_has_data_for(h2_mplx *m, int stream_id);
 
+/* Return != 0 iff the multiplexer has input data for the given stream. 
+ */
+int h2_mplx_in_has_data_for(h2_mplx *m, int stream_id);
+
 /**
  * Waits on output data from any stream in this session to become available. 
  * Returns APR_TIMEUP if no data arrived in the given time.
@@ -385,17 +392,14 @@ APR_RING_INSERT_TAIL((b), ap__b, h2_mplx, link);  \
 typedef apr_status_t h2_mplx_engine_init(struct h2_req_engine *engine, 
                                          request_rec *r);
 
-apr_status_t h2_mplx_engine_push(h2_mplx *m, struct h2_task *task, 
-                                 const char *engine_type, 
+apr_status_t h2_mplx_engine_push(const char *engine_type, 
                                  request_rec *r, h2_mplx_engine_init *einit);
                                  
-apr_status_t h2_mplx_engine_pull(h2_mplx *m, struct h2_task *task, 
-                                 struct h2_req_engine *engine, 
-                                 apr_time_t timeout, request_rec **pr);
+apr_status_t h2_mplx_engine_pull(struct h2_req_engine *engine, 
+                                 apr_read_type_e block, request_rec **pr);
 
-void h2_mplx_engine_done(h2_mplx *m, struct h2_task *task, conn_rec *r_conn);
+void h2_mplx_engine_done(struct h2_req_engine *engine, conn_rec *r_conn);
                                  
-void h2_mplx_engine_exit(h2_mplx *m, struct h2_task *task, 
-                         struct h2_req_engine *engine);
+void h2_mplx_engine_exit(struct h2_req_engine *engine);
 
 #endif /* defined(__mod_h2__h2_mplx__) */
index 8f7b5d0a98852c2689ee2b9225a9056c028b46d1..a55d7ec7395dda17b1c6129ebc86c37c74c90d81 100644 (file)
 #include <mod_http2.h>
 
 #include "h2.h"
+#include "h2_int_queue.h"
 #include "h2_request.h"
 #include "h2_util.h"
 #include "h2_proxy_session.h"
 
 APLOG_USE_MODULE(proxy_http2);
 
+typedef struct h2_proxy_stream {
+    int id;
+    apr_pool_t *pool;
+    h2_proxy_session *session;
+
+    const char *url;
+    request_rec *r;
+    h2_request *req;
+
+    h2_stream_state_t state;
+    unsigned int suspended : 1;
+    unsigned int data_received : 1;
+
+    apr_bucket_brigade *input;
+    apr_bucket_brigade *output;
+    
+    apr_table_t *saves;
+} h2_proxy_stream;
+
+
 static int ngstatus_from_apr_status(apr_status_t rv)
 {
     if (rv == APR_SUCCESS) {
@@ -41,19 +62,21 @@ static int ngstatus_from_apr_status(apr_status_t rv)
     return NGHTTP2_ERR_PROTO;
 }
 
+static void dispatch_event(h2_proxy_session *session, h2_proxys_event_t ev, 
+                           int arg, const char *msg);
 
-static apr_status_t proxy_session_shutdown(void *theconn)
+
+static apr_status_t proxy_session_pre_close(void *theconn)
 {
     proxy_conn_rec *p_conn = (proxy_conn_rec *)theconn;
     h2_proxy_session *session = p_conn->data;
 
     if (session && session->ngh2) {
-        if (session->c && !session->c->aborted && !session->goaway_sent) {
-            nghttp2_submit_goaway(session->ngh2, NGHTTP2_FLAG_NONE, 
-                                  session->max_stream_recv, 0, NULL, 0);
-            nghttp2_session_send(session->ngh2);
-        }
-
+        ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c, 
+                      "proxy_session(%s): shutdown, state=%d, streams=%d",
+                      session->id, session->state, 
+                      h2_iq_size(session->streams));
+        dispatch_event(session, H2_PROXYS_EV_PRE_CLOSE, 0, NULL);
         nghttp2_session_del(session->ngh2);
         session->ngh2 = NULL;
         p_conn->data = NULL;
@@ -109,8 +132,8 @@ static ssize_t raw_send(nghttp2_session *ngh2, const uint8_t *data,
     int flush = 1;
 
     ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c, 
-                  "h2_proxy_sesssion(%ld): raw_send %d bytes, flush=%d", 
-                  session->c->id, (int)length, flush);
+                  "h2_proxy_sesssion(%s): raw_send %d bytes, flush=%d", 
+                  session->id, (int)length, flush);
     b = apr_bucket_transient_create((const char*)data, length, 
                                     session->c->bucket_alloc);
     APR_BRIGADE_INSERT_TAIL(session->output, b);
@@ -120,7 +143,7 @@ static ssize_t raw_send(nghttp2_session *ngh2, const uint8_t *data,
                                 session->output, flush);
     if (status != APR_SUCCESS) {
         ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, session->c, 
-                      "h2_proxy_sesssion(%ld): sending", session->c->id);
+                      "h2_proxy_sesssion(%s): sending", session->id);
         return NGHTTP2_ERR_CALLBACK_FAILURE;
     }
     return length;
@@ -138,8 +161,8 @@ static int on_frame_recv(nghttp2_session *ngh2, const nghttp2_frame *frame,
         
         h2_util_frame_print(frame, buffer, sizeof(buffer)/sizeof(buffer[0]));
         ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO()
-                      "h2_session(%ld): recv FRAME[%s]",
-                      session->c->id, buffer);
+                      "h2_session(%s): recv FRAME[%s]",
+                      session->id, buffer);
     }
 
     switch (frame->hd.type) {
@@ -150,9 +173,22 @@ static int on_frame_recv(nghttp2_session *ngh2, const nghttp2_frame *frame,
             break;
         case NGHTTP2_PUSH_PROMISE:
             break;
+        case NGHTTP2_SETTINGS:
+            if (frame->settings.niv > 0) {
+                session->remote_max_concurrent = nghttp2_session_get_remote_settings(ngh2, NGHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS);
+            }
+            break;
         case NGHTTP2_GOAWAY:
-            session->goaway_recvd = 1;
+            dispatch_event(session, H2_PROXYS_EV_REMOTE_GOAWAY, 0, NULL);
             /* TODO: close handling */
+            if (APLOGcinfo(session->c)) {
+                char buffer[256];
+                
+                h2_util_frame_print(frame, buffer, sizeof(buffer)/sizeof(buffer[0]));
+                ap_log_cerror(APLOG_MARK, APLOG_INFO, 0, session->c, APLOGNO()
+                              "h2_session(%s): recv FRAME[%s]",
+                              session->id, buffer);
+            }
             break;
         default:
             break;
@@ -164,13 +200,27 @@ static int before_frame_send(nghttp2_session *ngh2,
                              const nghttp2_frame *frame, void *user_data)
 {
     h2_proxy_session *session = user_data;
-    if (APLOGcdebug(session->c)) {
-        char buffer[256];
-        
-        h2_util_frame_print(frame, buffer, sizeof(buffer)/sizeof(buffer[0]));
-        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03068)
-                      "h2_session(%ld): sent FRAME[%s]",
-                      session->c->id, buffer);
+    switch (frame->hd.type) {
+        case NGHTTP2_GOAWAY:
+            if (APLOGcinfo(session->c)) {
+                char buffer[256];
+                
+                h2_util_frame_print(frame, buffer, sizeof(buffer)/sizeof(buffer[0]));
+                ap_log_cerror(APLOG_MARK, APLOG_INFO, 0, session->c, APLOGNO()
+                              "h2_session(%s): sent FRAME[%s]",
+                              session->id, buffer);
+            }
+            break;
+        default:
+            if (APLOGcdebug(session->c)) {
+                char buffer[256];
+                
+                h2_util_frame_print(frame, buffer, sizeof(buffer)/sizeof(buffer[0]));
+                ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO()
+                              "h2_session(%s): sent FRAME[%s]",
+                              session->id, buffer);
+            }
+            break;
     }
     return 0;
 }
@@ -217,8 +267,8 @@ static apr_status_t h2_proxy_stream_add_header_out(h2_proxy_stream *stream,
             char *s = apr_pstrndup(stream->pool, v, vlen);
             
             ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, stream->session->c, 
-                          "h2_proxy_stream(%ld-%d): got status %s", 
-                          stream->session->c->id, stream->id, s);
+                          "h2_proxy_stream(%s-%d): got status %s", 
+                          stream->session->id, stream->id, s);
             stream->r->status = (int)apr_atoi64(s);
             if (stream->r->status <= 0) {
                 stream->r->status = 500;
@@ -236,8 +286,8 @@ static apr_status_t h2_proxy_stream_add_header_out(h2_proxy_stream *stream,
         hvalue = apr_pstrndup(stream->pool, v, vlen);
         
         ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, stream->session->c, 
-                      "h2_proxy_stream(%ld-%d): got header %s: %s", 
-                      stream->session->c->id, stream->id, hname, hvalue);
+                      "h2_proxy_stream(%s-%d): got header %s: %s", 
+                      stream->session->id, stream->id, hname, hvalue);
         process_proxy_header(stream->r, hname, hvalue);
     }
     return APR_SUCCESS;
@@ -248,8 +298,8 @@ static int log_header(void *ctx, const char *key, const char *value)
     h2_proxy_stream *stream = ctx;
     
     ap_log_rerror(APLOG_MARK, APLOG_TRACE2, 0, stream->r, 
-                  "h2_proxy_stream(%ld-%d), header_out %s: %s", 
-                  stream->session->c->id, stream->id, key, value);
+                  "h2_proxy_stream(%s-%d), header_out %s: %s", 
+                  stream->session->id, stream->id, key, value);
     return 1;
 }
 
@@ -307,8 +357,8 @@ static void h2_proxy_stream_end_headers_out(h2_proxy_stream *stream)
     
     if (APLOGrtrace2(stream->r)) {
         ap_log_rerror(APLOG_MARK, APLOG_TRACE2, 0, stream->r, 
-                      "h2_proxy_stream(%ld-%d), header_out after merging", 
-                      stream->session->c->id, stream->id);
+                      "h2_proxy_stream(%s-%d), header_out after merging", 
+                      stream->session->id, stream->id);
         apr_table_do(log_header, stream, stream->r->headers_out, NULL);
     }
 }
@@ -338,11 +388,15 @@ static int on_data_chunk_recv(nghttp2_session *ngh2, uint8_t flags,
     b = apr_bucket_transient_create((const char*)data, len, 
                                     stream->r->connection->bucket_alloc);
     APR_BRIGADE_INSERT_TAIL(stream->output, b);
+    if (flags & NGHTTP2_DATA_FLAG_EOF) {
+        b = apr_bucket_flush_create(stream->r->connection->bucket_alloc);
+        APR_BRIGADE_INSERT_TAIL(stream->output, b);
+    }
     status = ap_pass_brigade(stream->r->output_filters, stream->output);
     if (status != APR_SUCCESS) {
         ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, session->c, APLOGNO()
-                      "h2_session(%ld-%d): passing output", 
-                      session->c->id, stream->id);
+                      "h2_session(%s-%d): passing output", 
+                      session->id, stream->id);
         return NGHTTP2_ERR_CALLBACK_FAILURE;
     }
     return 0;
@@ -352,23 +406,7 @@ static int on_stream_close(nghttp2_session *ngh2, int32_t stream_id,
                            uint32_t error_code, void *user_data) 
 {
     h2_proxy_session *session = user_data;
-    h2_proxy_stream *stream;
-    
-    stream = nghttp2_session_get_stream_user_data(ngh2, stream_id);
-    if (!stream) {
-        return 0;
-    }
-    
-    ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c, 
-                  "h2_proxy_sesssion(%ld): closing stream(%d)", 
-                  session->c->id, stream_id);
-
-    if (!stream->data_received) {
-        /* last chance to manipulate response headers.
-         * after this, only trailers */
-        stream->data_received = 1;
-    }
-    stream->state = H2_STREAM_ST_CLOSED;
+    dispatch_event(session, H2_PROXYS_EV_STREAM_DONE, stream_id, NULL);
     return 0;
 }
 
@@ -413,11 +451,11 @@ static ssize_t stream_data_read(nghttp2_session *ngh2, int32_t stream_id,
     
     if (APR_BRIGADE_EMPTY(stream->input)) {
         status = ap_get_brigade(stream->r->input_filters, stream->input,
-                                AP_MODE_READBYTES, APR_BLOCK_READ,
-                                H2MIN(APR_BUCKET_BUFF_SIZE, length));
+                                AP_MODE_READBYTES, APR_NONBLOCK_READ,
+                                H2MAX(APR_BUCKET_BUFF_SIZE, length));
         ap_log_rerror(APLOG_MARK, APLOG_TRACE2, status, stream->r, 
-                      "h2_proxy_stream(%d): request body read", 
-                      stream->id);
+                      "h2_proxy_stream(%s-%d): request body read", 
+                      stream->session->id, stream->id);
     }
 
     if (status == APR_SUCCESS) {
@@ -459,31 +497,42 @@ static ssize_t stream_data_read(nghttp2_session *ngh2, int32_t stream_id,
         return readlen;
     }
     else if (APR_STATUS_IS_EAGAIN(status)) {
+        /* suspended stream, needs to be re-awakened */
+        ap_log_rerror(APLOG_MARK, APLOG_TRACE2, status, stream->r, 
+                      "h2_proxy_stream(%s-%d): suspending", 
+                      stream->session->id, stream_id);
+        stream->suspended = 1;
+        h2_iq_add(stream->session->suspended, stream->id, NULL, NULL);
         return NGHTTP2_ERR_DEFERRED;
     }
     return ngstatus_from_apr_status(status);
 }
 
-h2_proxy_session *h2_proxy_session_setup(request_rec *r, proxy_conn_rec *p_conn,
-                                         proxy_server_conf *conf)
+h2_proxy_session *h2_proxy_session_setup(const char *id, proxy_conn_rec *p_conn,
+                                         proxy_server_conf *conf,
+                                         h2_proxy_request_done *done)
 {
     if (!p_conn->data) {
+        apr_pool_t *pool = p_conn->scpool;
         h2_proxy_session *session;
-        nghttp2_settings_entry settings[2];
         nghttp2_session_callbacks *cbs;
-        int add_conn_window;
-        int rv;
+        nghttp2_option *option;
         
-        session = apr_pcalloc(p_conn->scpool, sizeof(*session));
-        apr_pool_pre_cleanup_register(p_conn->scpool, p_conn, proxy_session_shutdown);
+        session = apr_pcalloc(pool, sizeof(*session));
+        apr_pool_pre_cleanup_register(pool, p_conn, proxy_session_pre_close);
         p_conn->data = session;
         
+        session->id = apr_pstrdup(p_conn->scpool, id);
         session->c = p_conn->connection;
         session->p_conn = p_conn;
         session->conf = conf;
         session->pool = p_conn->scpool;
+        session->state = H2_PROXYS_ST_INIT;
         session->window_bits_default    = 30;
         session->window_bits_connection = 30;
+        session->streams = h2_iq_create(pool, 25);
+        session->suspended = h2_iq_create(pool, 5);
+        session->done = done;
     
         session->input = apr_brigade_create(session->pool, session->c->bucket_alloc);
         session->output = apr_brigade_create(session->pool, session->c->bucket_alloc);
@@ -496,70 +545,41 @@ h2_proxy_session *h2_proxy_session_setup(request_rec *r, proxy_conn_rec *p_conn,
         nghttp2_session_callbacks_set_before_frame_send_callback(cbs, before_frame_send);
         nghttp2_session_callbacks_set_send_callback(cbs, raw_send);
         
-        nghttp2_session_client_new(&session->ngh2, cbs, session);
+        nghttp2_option_new(&option);
+        nghttp2_option_set_peer_max_concurrent_streams(option, 20);
+        
+        nghttp2_session_client_new2(&session->ngh2, cbs, session, option);
+        
+        nghttp2_option_del(option);
         nghttp2_session_callbacks_del(cbs);
 
         ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, 
                       "setup session for %s", p_conn->hostname);
         
-        settings[0].settings_id = NGHTTP2_SETTINGS_ENABLE_PUSH;
-        settings[0].value = 0;
-        settings[1].settings_id = NGHTTP2_SETTINGS_INITIAL_WINDOW_SIZE;
-        settings[1].value = (1 << session->window_bits_default) - 1;
-        
-        rv = nghttp2_submit_settings(session->ngh2, NGHTTP2_FLAG_NONE, settings, 
-                                     H2_ALEN(settings));
-
-        /* If the connection window is larger than our default, trigger a WINDOW_UPDATE */
-        add_conn_window = ((1 << session->window_bits_connection) - 1 -
-                           NGHTTP2_INITIAL_CONNECTION_WINDOW_SIZE);
-        if (!rv && add_conn_window != 0) {
-            rv = nghttp2_submit_window_update(session->ngh2, NGHTTP2_FLAG_NONE, 0, add_conn_window);
-        }
     }
     return p_conn->data;
 }
 
-
-apr_status_t h2_proxy_session_open_stream(h2_proxy_session *session, const char *url,
-                                          request_rec *r, h2_proxy_stream **pstream)
+static apr_status_t session_start(h2_proxy_session *session) 
 {
-    h2_proxy_stream *stream;
-    apr_uri_t puri;
-    const char *authority, *scheme, *path;
-
-    stream = apr_pcalloc(r->pool, sizeof(*stream));
-
-    stream->pool = r->pool;
-    stream->url = url;
-    stream->r = r;
-    stream->session = session;
-    stream->state = H2_STREAM_ST_IDLE;
+    nghttp2_settings_entry settings[2];
+    int rv, add_conn_window;
     
-    stream->input = apr_brigade_create(stream->pool, session->c->bucket_alloc);
-    stream->output = apr_brigade_create(stream->pool, session->c->bucket_alloc);
+    settings[0].settings_id = NGHTTP2_SETTINGS_ENABLE_PUSH;
+    settings[0].value = 0;
+    settings[1].settings_id = NGHTTP2_SETTINGS_INITIAL_WINDOW_SIZE;
+    settings[1].value = (1 << session->window_bits_default) - 1;
     
-    stream->req = h2_request_create(1, stream->pool, 0);
-
-    apr_uri_parse(stream->pool, url, &puri);
-    scheme = (strcmp(puri.scheme, "h2")? "http" : "https");
-    authority = puri.hostname;
-    if (!ap_strchr_c(authority, ':') && puri.port
-        && apr_uri_port_of_scheme(scheme) != puri.port) {
-        /* port info missing and port is not default for scheme: append */
-        authority = apr_psprintf(stream->pool, "%s:%d", authority, puri.port);
-    }
-    path = apr_uri_unparse(stream->pool, &puri, APR_URI_UNP_OMITSITEPART);
-    h2_request_make(stream->req, stream->pool, r->method, scheme,
-                    authority, path, r->headers_in);
-
-    /* Tuck away all already existing cookies */
-    stream->saves = apr_table_make(r->pool, 2);
-    apr_table_do(add_header, stream->saves, r->headers_out,"Set-Cookie", NULL);
-
-    *pstream = stream;
+    rv = nghttp2_submit_settings(session->ngh2, NGHTTP2_FLAG_NONE, settings, 
+                                 H2_ALEN(settings));
     
-    return APR_SUCCESS;
+    /* If the connection window is larger than our default, trigger a WINDOW_UPDATE */
+    add_conn_window = ((1 << session->window_bits_connection) - 1 -
+                       NGHTTP2_INITIAL_CONNECTION_WINDOW_SIZE);
+    if (!rv && add_conn_window != 0) {
+        rv = nghttp2_submit_window_update(session->ngh2, NGHTTP2_FLAG_NONE, 0, add_conn_window);
+    }
+    return rv? APR_EGENERAL : APR_SUCCESS;
 }
 
 static apr_status_t feed_brigade(h2_proxy_session *session, apr_bucket_brigade *bb)
@@ -595,69 +615,57 @@ static apr_status_t feed_brigade(h2_proxy_session *session, apr_bucket_brigade *
     }
     
     ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, session->c, 
-                  "h2_session(%ld): fed %ld bytes of input", session->c->id, (long)readlen);
+                  "h2_session(%s): fed %ld bytes of input to session", 
+                  session->id, (long)readlen);
     if (readlen == 0 && status == APR_SUCCESS) {
         return APR_EAGAIN;
     }
     return status;
 }
 
-
-static apr_status_t stream_loop(h2_proxy_stream *stream) 
+static apr_status_t open_stream(h2_proxy_session *session, const char *url,
+                                request_rec *r, h2_proxy_stream **pstream)
 {
-    h2_proxy_session *session = stream->session;
-    apr_status_t status = APR_SUCCESS;
-    int want_read, want_write;
+    h2_proxy_stream *stream;
+    apr_uri_t puri;
+    const char *authority, *scheme, *path;
+
+    stream = apr_pcalloc(r->pool, sizeof(*stream));
+
+    stream->pool = r->pool;
+    stream->url = url;
+    stream->r = r;
+    stream->session = session;
+    stream->state = H2_STREAM_ST_IDLE;
     
-    ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, session->c, 
-                  "h2_session(%ld): start loop for stream %d", 
-                  session->c->id, stream->id);
-    while ((status == APR_SUCCESS || APR_STATUS_IS_EAGAIN(status))
-           && stream->state != H2_STREAM_ST_CLOSED) {
-           
-        want_read = nghttp2_session_want_read(session->ngh2);
-        want_write = nghttp2_session_want_write(session->ngh2);
-               
-        if (want_write) {
-            int rv = nghttp2_session_send(session->ngh2);
-            if (rv < 0 && nghttp2_is_fatal(rv)) {
-                status = APR_EGENERAL;
-                ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, session->c, 
-                              "h2_session(%ld): write, rv=%d", session->c->id, rv);
-                break;
-            }
-        }
+    stream->input = apr_brigade_create(stream->pool, session->c->bucket_alloc);
+    stream->output = apr_brigade_create(stream->pool, session->c->bucket_alloc);
+    
+    stream->req = h2_request_create(1, stream->pool, 0);
 
-        if (want_read) {
-            status = ap_get_brigade(session->c->input_filters, session->input, 
-                                    AP_MODE_READBYTES, 
-                                    (want_write? APR_NONBLOCK_READ : APR_BLOCK_READ), 
-                                    APR_BUCKET_BUFF_SIZE);
-            if (status == APR_SUCCESS) {
-                status = feed_brigade(session, session->input);
-            }
-            else if (!APR_STATUS_IS_EAGAIN(status)) {
-                ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, session->c, 
-                              "h2_session(%ld): read", session->c->id);
-                break;
-            }
-        }
-        
-        if (!want_read && !want_write) {
-            status = APR_EGENERAL;
-            break;
-        }
+    apr_uri_parse(stream->pool, url, &puri);
+    scheme = (strcmp(puri.scheme, "h2")? "http" : "https");
+    authority = puri.hostname;
+    if (!ap_strchr_c(authority, ':') && puri.port
+        && apr_uri_port_of_scheme(scheme) != puri.port) {
+        /* port info missing and port is not default for scheme: append */
+        authority = apr_psprintf(stream->pool, "%s:%d", authority, puri.port);
     }
+    path = apr_uri_unparse(stream->pool, &puri, APR_URI_UNP_OMITSITEPART);
+    h2_request_make(stream->req, stream->pool, r->method, scheme,
+                    authority, path, r->headers_in);
+
+    /* Tuck away all already existing cookies */
+    stream->saves = apr_table_make(r->pool, 2);
+    apr_table_do(add_header, stream->saves, r->headers_out,"Set-Cookie", NULL);
+
+    *pstream = stream;
     
-    ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, session->c, 
-                  "h2_session(%ld): end loop for stream %d", 
-                  session->c->id, stream->id);
-    return status;
+    return APR_SUCCESS;
 }
 
-apr_status_t h2_proxy_stream_process(h2_proxy_stream *stream)
+static apr_status_t submit_stream(h2_proxy_session *session, h2_proxy_stream *stream)
 {
-    h2_proxy_session *session = stream->session;
     h2_ngheader *hd;
     nghttp2_data_provider *pp = NULL;
     nghttp2_data_provider provider;
@@ -685,16 +693,525 @@ apr_status_t h2_proxy_stream_process(h2_proxy_stream *stream)
         const char *task_id = apr_table_get(stream->r->connection->notes, 
                                             H2_TASK_ID_NOTE);
         ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, 
-                      "h2_session(%ld): submit %s%s -> %d (task %s)", 
-                      session->c->id, stream->req->authority, stream->req->path,
+                      "h2_session(%s): submit %s%s -> %d (task %s)", 
+                      session->id, stream->req->authority, stream->req->path,
                       rv, task_id);
     }
+    else {
+        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, 
+                      "h2_session(%s-%d): submit %s%s", 
+                      session->id, rv, stream->req->authority, stream->req->path);
+    }
+    
     if (rv > 0) {
         stream->id = rv;
         stream->state = H2_STREAM_ST_OPEN;
+        h2_iq_add(session->streams, stream->id, NULL, NULL);
+        dispatch_event(session, H2_PROXYS_EV_STREAM_SUBMITTED, rv, NULL);
         
-        return stream_loop(stream);
+        return APR_SUCCESS;
     }
     return APR_EGENERAL;
 }
 
+static apr_status_t h2_proxy_session_read(h2_proxy_session *session, int block)
+{
+    apr_status_t status;
+    status = ap_get_brigade(session->c->input_filters, session->input, 
+                            AP_MODE_READBYTES, 
+                            block? APR_BLOCK_READ : APR_NONBLOCK_READ, 
+                            APR_BUCKET_BUFF_SIZE);
+    if (status == APR_SUCCESS) {
+        if (APR_BRIGADE_EMPTY(session->input)) {
+            status = APR_EAGAIN;
+        }
+        else {
+            feed_brigade(session, session->input);
+        }
+    }
+    else if (!APR_STATUS_IS_EAGAIN(status)) {
+        dispatch_event(session, H2_PROXYS_EV_CONN_ERROR, 0, NULL);
+    }
+    return status;
+}
+
+apr_status_t h2_proxy_session_submit(h2_proxy_session *session, 
+                                     const char *url, request_rec *r)
+{
+    h2_proxy_stream *stream;
+    apr_status_t status;
+    
+    status = open_stream(session, url, r, &stream);
+    if (status == OK) {
+        ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r, 
+                      "process stream(%d): %s %s%s, original: %s", 
+                      stream->id, stream->req->method, 
+                      stream->req->authority, stream->req->path, 
+                      r->the_request);
+        status = submit_stream(session, stream);
+    }
+    return status;
+}
+
+static apr_status_t check_suspended(h2_proxy_session *session)
+{
+    h2_proxy_stream *stream;
+    int i, stream_id;
+    apr_status_t status;
+    
+    for (i = 0; i < session->suspended->nelts; ++i) {
+        stream_id = session->suspended->elts[i];
+        stream = nghttp2_session_get_stream_user_data(session->ngh2, stream_id);
+        if (stream) {
+            status = ap_get_brigade(stream->r->input_filters, stream->input,
+                                    AP_MODE_READBYTES, APR_NONBLOCK_READ,
+                                    APR_BUCKET_BUFF_SIZE);
+            if (status == APR_SUCCESS && !APR_BRIGADE_EMPTY(stream->input)) {
+                ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, session->c, 
+                              "h2_proxy_stream(%s-%d): resuming", 
+                              session->id, stream_id);
+                stream->suspended = 0;
+                h2_iq_remove(session->suspended, stream_id);
+                nghttp2_session_resume_data(session->ngh2, stream_id);
+                dispatch_event(session, H2_PROXYS_EV_STREAM_RESUMED, 0, NULL);
+                check_suspended(session);
+                return APR_SUCCESS;
+            }
+            else if (status != APR_SUCCESS && !APR_STATUS_IS_EAGAIN(status)) {
+                ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, session->c, 
+                              "h2_proxy_stream(%s-%d): check input", 
+                              session->id, stream_id);
+                h2_iq_remove(session->suspended, stream_id);
+                dispatch_event(session, H2_PROXYS_EV_STREAM_RESUMED, 0, NULL);
+                check_suspended(session);
+                return APR_SUCCESS;
+            }
+        }
+        else {
+            /* gone? */
+            h2_iq_remove(session->suspended, stream_id);
+            check_suspended(session);
+            return APR_SUCCESS;
+        }
+    }
+    return APR_EAGAIN;
+}
+
+static apr_status_t session_shutdown(h2_proxy_session *session, int reason, 
+                                     const char *msg)
+{
+    apr_status_t status = APR_SUCCESS;
+    const char *err = msg;
+    
+    AP_DEBUG_ASSERT(session);
+    if (!err && reason) {
+        err = nghttp2_strerror(reason);
+    }
+    nghttp2_submit_goaway(session->ngh2, NGHTTP2_FLAG_NONE, 0, 
+                          reason, (uint8_t*)err, err? strlen(err):0);
+    status = nghttp2_session_send(session->ngh2);
+    dispatch_event(session, H2_PROXYS_EV_LOCAL_GOAWAY, reason, err);
+    return status;
+}
+
+
+static const char *StateNames[] = {
+    "INIT",      /* H2_PROXYS_ST_INIT */
+    "DONE",      /* H2_PROXYS_ST_DONE */
+    "IDLE",      /* H2_PROXYS_ST_IDLE */
+    "BUSY",      /* H2_PROXYS_ST_BUSY */
+    "WAIT",      /* H2_PROXYS_ST_WAIT */
+    "LSHUTDOWN", /* H2_PROXYS_ST_LOCAL_SHUTDOWN */
+    "RSHUTDOWN", /* H2_PROXYS_ST_REMOTE_SHUTDOWN */
+};
+
+static const char *state_name(h2_proxys_state state)
+{
+    if (state >= (sizeof(StateNames)/sizeof(StateNames[0]))) {
+        return "unknown";
+    }
+    return StateNames[state];
+}
+
+static int is_accepting_streams(h2_proxy_session *session)
+{
+    switch (session->state) {
+        case H2_PROXYS_ST_IDLE:
+        case H2_PROXYS_ST_BUSY:
+        case H2_PROXYS_ST_WAIT:
+            return 1;
+        default:
+            return 0;
+    }
+}
+
+static void transit(h2_proxy_session *session, const char *action, 
+                    h2_proxys_state nstate)
+{
+    ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO()
+                  "h2_proxy_session(%s): transit [%s] -- %s --> [%s]", session->id,
+                  state_name(session->state), action, state_name(nstate));
+    session->state = nstate;
+}
+
+static void ev_init(h2_proxy_session *session, int arg, const char *msg)
+{
+    switch (session->state) {
+        case H2_PROXYS_ST_INIT:
+            if (h2_iq_empty(session->streams)) {
+                transit(session, "init", H2_PROXYS_ST_IDLE);
+            }
+            else {
+                transit(session, "init", H2_PROXYS_ST_BUSY);
+            }
+            break;
+
+        default:
+            /* nop */
+            break;
+    }
+}
+
+static void ev_local_goaway(h2_proxy_session *session, int arg, const char *msg)
+{
+    switch (session->state) {
+        case H2_PROXYS_ST_LOCAL_SHUTDOWN:
+            /* already did that? */
+            break;
+        case H2_PROXYS_ST_IDLE:
+        case H2_PROXYS_ST_REMOTE_SHUTDOWN:
+            /* all done */
+            transit(session, "local goaway", H2_PROXYS_ST_DONE);
+            break;
+        default:
+            transit(session, "local goaway", H2_PROXYS_ST_LOCAL_SHUTDOWN);
+            break;
+    }
+}
+
+static void ev_remote_goaway(h2_proxy_session *session, int arg, const char *msg)
+{
+    switch (session->state) {
+        case H2_PROXYS_ST_REMOTE_SHUTDOWN:
+            /* already received that? */
+            break;
+        case H2_PROXYS_ST_IDLE:
+        case H2_PROXYS_ST_LOCAL_SHUTDOWN:
+            /* all done */
+            transit(session, "remote goaway", H2_PROXYS_ST_DONE);
+            break;
+        default:
+            transit(session, "remote goaway", H2_PROXYS_ST_REMOTE_SHUTDOWN);
+            break;
+    }
+}
+
+static void ev_conn_error(h2_proxy_session *session, int arg, const char *msg)
+{
+    switch (session->state) {
+        case H2_PROXYS_ST_INIT:
+        case H2_PROXYS_ST_DONE:
+        case H2_PROXYS_ST_LOCAL_SHUTDOWN:
+            /* just leave */
+            transit(session, "conn error", H2_PROXYS_ST_DONE);
+            break;
+        
+        default:
+            ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c,
+                          "h2_session(%s): conn error -> shutdown", session->id);
+            session_shutdown(session, arg, msg);
+            break;
+    }
+}
+
+static void ev_proto_error(h2_proxy_session *session, int arg, const char *msg)
+{
+    switch (session->state) {
+        case H2_PROXYS_ST_DONE:
+        case H2_PROXYS_ST_LOCAL_SHUTDOWN:
+            /* just leave */
+            transit(session, "proto error", H2_PROXYS_ST_DONE);
+            break;
+        
+        default:
+            ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c,
+                          "h2_session(%s): proto error -> shutdown", session->id);
+            session_shutdown(session, arg, msg);
+            break;
+    }
+}
+
+static void ev_conn_timeout(h2_proxy_session *session, int arg, const char *msg)
+{
+    switch (session->state) {
+        case H2_PROXYS_ST_LOCAL_SHUTDOWN:
+            transit(session, "conn timeout", H2_PROXYS_ST_DONE);
+            break;
+        default:
+            session_shutdown(session, arg, msg);
+            transit(session, "conn timeout", H2_PROXYS_ST_DONE);
+            break;
+    }
+}
+
+static void ev_no_io(h2_proxy_session *session, int arg, const char *msg)
+{
+    switch (session->state) {
+        case H2_PROXYS_ST_BUSY:
+        case H2_PROXYS_ST_LOCAL_SHUTDOWN:
+        case H2_PROXYS_ST_REMOTE_SHUTDOWN:
+            /* nothing for input and output to do. If we remain
+             * in this state, we go into a tight loop and suck up
+             * CPU cycles. Ideally, we'd like to do a blocking read, but that
+             * is not possible if we have scheduled tasks and wait
+             * for them to produce something. */
+            if (h2_iq_empty(session->streams)) {
+                if (!is_accepting_streams(session)) {
+                    /* We are no longer accepting new streams and have
+                     * finished processing existing ones. Time to leave. */
+                    session_shutdown(session, arg, msg);
+                    transit(session, "no io", H2_PROXYS_ST_DONE);
+                }
+                else {
+                    /* When we have no streams, no task event are possible,
+                     * switch to blocking reads */
+                    transit(session, "no io", H2_PROXYS_ST_IDLE);
+                }
+            }
+            else {
+                /* Unable to do blocking reads, as we wait on events from
+                 * task processing in other threads. Do a busy wait with
+                 * backoff timer. */
+                transit(session, "no io", H2_PROXYS_ST_WAIT);
+            }
+            break;
+        default:
+            /* nop */
+            break;
+    }
+}
+
+static void ev_stream_submitted(h2_proxy_session *session, int stream_id, 
+                                const char *msg)
+{
+    switch (session->state) {
+        case H2_PROXYS_ST_IDLE:
+        case H2_PROXYS_ST_WAIT:
+            transit(session, "stream submitted", H2_PROXYS_ST_BUSY);
+            break;
+        default:
+            /* nop */
+            break;
+    }
+}
+
+static void ev_stream_done(h2_proxy_session *session, int stream_id, 
+                           const char *msg)
+{
+    h2_proxy_stream *stream;
+    
+    stream = nghttp2_session_get_stream_user_data(session->ngh2, stream_id);
+    if (stream) {
+        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, 
+                      "h2_proxy_sesssion(%s): stream(%d) closed", 
+                      session->id, stream_id);
+        if (!stream->data_received) {
+            /* last chance to manipulate response headers.
+             * after this, only trailers */
+            stream->data_received = 1;
+        }
+        stream->state = H2_STREAM_ST_CLOSED;
+        h2_iq_remove(session->streams, stream_id);
+        h2_iq_remove(session->suspended, stream_id);
+        if (session->done) {
+            session->done(session, stream->r);
+        }
+    }
+    
+    switch (session->state) {
+        default:
+            /* nop */
+            break;
+    }
+}
+
+static void ev_stream_resumed(h2_proxy_session *session, int arg, const char *msg)
+{
+    switch (session->state) {
+        case H2_PROXYS_ST_WAIT:
+            transit(session, "stream resumed", H2_PROXYS_ST_BUSY);
+            break;
+        default:
+            /* nop */
+            break;
+    }
+}
+
+static void ev_data_read(h2_proxy_session *session, int arg, const char *msg)
+{
+    switch (session->state) {
+        case H2_PROXYS_ST_IDLE:
+        case H2_PROXYS_ST_WAIT:
+            transit(session, "data read", H2_PROXYS_ST_BUSY);
+            break;
+            /* fall through */
+        default:
+            /* nop */
+            break;
+    }
+}
+
+static void ev_ngh2_done(h2_proxy_session *session, int arg, const char *msg)
+{
+    switch (session->state) {
+        case H2_PROXYS_ST_DONE:
+            /* nop */
+            break;
+        default:
+            transit(session, "nghttp2 done", H2_PROXYS_ST_DONE);
+            break;
+    }
+}
+
+static void ev_pre_close(h2_proxy_session *session, int arg, const char *msg)
+{
+    switch (session->state) {
+        case H2_PROXYS_ST_DONE:
+        case H2_PROXYS_ST_LOCAL_SHUTDOWN:
+            /* nop */
+            break;
+        default:
+            session_shutdown(session, arg, msg);
+            break;
+    }
+}
+
+static void dispatch_event(h2_proxy_session *session, h2_proxys_event_t ev, 
+                           int arg, const char *msg)
+{
+    switch (ev) {
+        case H2_PROXYS_EV_INIT:
+            ev_init(session, arg, msg);
+            break;            
+        case H2_PROXYS_EV_LOCAL_GOAWAY:
+            ev_local_goaway(session, arg, msg);
+            break;
+        case H2_PROXYS_EV_REMOTE_GOAWAY:
+            ev_remote_goaway(session, arg, msg);
+            break;
+        case H2_PROXYS_EV_CONN_ERROR:
+            ev_conn_error(session, arg, msg);
+            break;
+        case H2_PROXYS_EV_PROTO_ERROR:
+            ev_proto_error(session, arg, msg);
+            break;
+        case H2_PROXYS_EV_CONN_TIMEOUT:
+            ev_conn_timeout(session, arg, msg);
+            break;
+        case H2_PROXYS_EV_NO_IO:
+            ev_no_io(session, arg, msg);
+            break;
+        case H2_PROXYS_EV_STREAM_SUBMITTED:
+            ev_stream_submitted(session, arg, msg);
+            break;
+        case H2_PROXYS_EV_STREAM_DONE:
+            ev_stream_done(session, arg, msg);
+            break;
+        case H2_PROXYS_EV_STREAM_RESUMED:
+            ev_stream_resumed(session, arg, msg);
+            break;
+        case H2_PROXYS_EV_DATA_READ:
+            ev_data_read(session, arg, msg);
+            break;
+        case H2_PROXYS_EV_NGH2_DONE:
+            ev_ngh2_done(session, arg, msg);
+            break;
+        case H2_PROXYS_EV_PRE_CLOSE:
+            ev_pre_close(session, arg, msg);
+            break;
+        default:
+            ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c,
+                          "h2_session(%s): unknown event %d", 
+                          session->id, ev);
+            break;
+    }
+}
+
+apr_status_t h2_proxy_session_process(h2_proxy_session *session)
+{
+    apr_status_t status;
+    int have_written = 0, have_read = 0;
+
+    ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c, 
+                  "h2_session(%s): process", session->id);
+           
+    switch (session->state) {
+        case H2_PROXYS_ST_INIT:
+            status = session_start(session);
+            if (status == APR_SUCCESS) {
+                dispatch_event(session, H2_PROXYS_EV_INIT, 0, NULL);
+            }
+            else {
+                dispatch_event(session, H2_PROXYS_EV_CONN_ERROR, 0, NULL);
+            }
+            break;
+            
+        case H2_PROXYS_ST_BUSY:
+        case H2_PROXYS_ST_LOCAL_SHUTDOWN:
+        case H2_PROXYS_ST_REMOTE_SHUTDOWN:
+            while (nghttp2_session_want_write(session->ngh2)) {
+                int rv = nghttp2_session_send(session->ngh2);
+                if (rv < 0 && nghttp2_is_fatal(rv)) {
+                    ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c, 
+                                  "h2_session(%s): write, rv=%d", session->id, rv);
+                    dispatch_event(session, H2_PROXYS_EV_CONN_ERROR, 0, NULL);
+                    break;
+                }
+                have_written = 1;
+            }
+            
+            if (nghttp2_session_want_read(session->ngh2)) {
+                if (h2_proxy_session_read(session, 0) == APR_SUCCESS) {
+                    have_read = 1;
+                }
+            }
+            
+            if (!have_written && !have_read 
+                && !nghttp2_session_want_write(session->ngh2)) {
+                dispatch_event(session, H2_PROXYS_EV_NO_IO, 0, NULL);
+            }
+            break;
+            
+        case H2_PROXYS_ST_WAIT:
+            if (check_suspended(session) == APR_EAGAIN) {
+                /* no stream has become resumed. Do a blocking read with
+                 * ever increasing timeouts... */
+                if (h2_proxy_session_read(session, 0) == APR_SUCCESS) {
+                    dispatch_event(session, H2_PROXYS_EV_DATA_READ, 0, NULL);
+                }
+            }
+            break;
+            
+        case H2_PROXYS_ST_IDLE:
+            return APR_SUCCESS;
+
+        case H2_PROXYS_ST_DONE:
+            return APR_SUCCESS;
+            
+        default:
+            ap_log_cerror(APLOG_MARK, APLOG_ERR, APR_EGENERAL, session->c,
+                          APLOGNO()"h2_session(%s): unknown state %d", 
+                          session->id, session->state);
+            dispatch_event(session, H2_PROXYS_EV_PROTO_ERROR, 0, NULL);
+            break;
+    }
+
+
+    if (!nghttp2_session_want_read(session->ngh2)
+        && !nghttp2_session_want_write(session->ngh2)) {
+        dispatch_event(session, H2_PROXYS_EV_NGH2_DONE, 0, NULL);
+    }
+    
+    return APR_EAGAIN;
+}
+
index 38924afdee6c1a0635752098ef3cf9bc3e299369..089fd107a594885b647e7c2878741bcd881fd7ad 100644 (file)
 
 #include <nghttp2/nghttp2.h>
 
-typedef struct h2_proxy_session {
+struct h2_int_queue;
+
+typedef enum {
+    H2_PROXYS_ST_INIT,             /* send initial SETTINGS, etc. */
+    H2_PROXYS_ST_DONE,             /* finished, connection close */
+    H2_PROXYS_ST_IDLE,             /* no streams to process */
+    H2_PROXYS_ST_BUSY,             /* read/write without stop */
+    H2_PROXYS_ST_WAIT,             /* waiting for tasks reporting back */
+    H2_PROXYS_ST_LOCAL_SHUTDOWN,   /* we announced GOAWAY */
+    H2_PROXYS_ST_REMOTE_SHUTDOWN,  /* client announced GOAWAY */
+} h2_proxys_state;
+
+typedef enum {
+    H2_PROXYS_EV_INIT,             /* session was initialized */
+    H2_PROXYS_EV_LOCAL_GOAWAY,     /* we send a GOAWAY */
+    H2_PROXYS_EV_REMOTE_GOAWAY,    /* remote send us a GOAWAY */
+    H2_PROXYS_EV_CONN_ERROR,       /* connection error */
+    H2_PROXYS_EV_PROTO_ERROR,      /* protocol error */
+    H2_PROXYS_EV_CONN_TIMEOUT,     /* connection timeout */
+    H2_PROXYS_EV_NO_IO,            /* nothing has been read or written */
+    H2_PROXYS_EV_STREAM_SUBMITTED, /* stream has been submitted */
+    H2_PROXYS_EV_STREAM_DONE,      /* stream has been finished */
+    H2_PROXYS_EV_STREAM_RESUMED,   /* stream signalled availability of headers/data */
+    H2_PROXYS_EV_DATA_READ,        /* connection data has been read */
+    H2_PROXYS_EV_NGH2_DONE,        /* nghttp2 wants neither read nor write anything */
+    H2_PROXYS_EV_PRE_CLOSE,        /* connection will close after this */
+} h2_proxys_event_t;
+
+
+typedef struct h2_proxy_session h2_proxy_session;
+typedef void h2_proxy_request_done(h2_proxy_session *s, request_rec *r);
+
+struct h2_proxy_session {
+    const char *id;
     conn_rec *c;
     proxy_conn_rec *p_conn;
     proxy_server_conf *conf;
     apr_pool_t *pool;
     nghttp2_session *ngh2;   /* the nghttp2 session itself */
     
+    h2_proxy_request_done *done;
+    void *user_data;
+    
     int window_bits_default;
     int window_bits_connection;
 
-    unsigned int goaway_recvd : 1;
-    unsigned int goaway_sent : 1;
-    
+    h2_proxys_state state;
+
+    struct h2_int_queue *streams;
+    struct h2_int_queue *suspended;
+    apr_size_t remote_max_concurrent;
     int max_stream_recv;
     
     apr_bucket_brigade *input;
     apr_bucket_brigade *output;
-} h2_proxy_session;
-
-typedef struct h2_proxy_stream {
-    int id;
-    apr_pool_t *pool;
-    h2_proxy_session *session;
-
-    const char *url;
-    request_rec *r;
-    h2_request *req;
-
-    h2_stream_state_t state;
-    unsigned int data_received : 1;
-
-    apr_bucket_brigade *input;
-    apr_bucket_brigade *output;
-    
-    apr_table_t *saves;
-} h2_proxy_stream;
+};
 
+h2_proxy_session *h2_proxy_session_setup(const char *id, proxy_conn_rec *p_conn,
+                                         proxy_server_conf *conf,
+                                         h2_proxy_request_done *done);
 
-h2_proxy_session *h2_proxy_session_setup(request_rec *r, proxy_conn_rec *p_connm,
-                                         proxy_server_conf *conf);
+apr_status_t h2_proxy_session_submit(h2_proxy_session *s, const char *url,
+                                     request_rec *r);
+                                     
+apr_status_t h2_proxy_session_process(h2_proxy_session *s);
 
-apr_status_t h2_proxy_session_open_stream(h2_proxy_session *s, const char *url,
-                                          request_rec *r, h2_proxy_stream **pstream);
-apr_status_t h2_proxy_stream_process(h2_proxy_stream *stream);
 
 #define H2_PROXY_REQ_URL_NOTE   "h2-proxy-req-url"
 
index 0eb68a54bee0e41bf9ffa0eab82aa45c3c132183..cd49843a0e735c3c7caf3b1cebddc584d9f8ff53 100644 (file)
@@ -103,8 +103,6 @@ h2_stream *h2_session_open_stream(h2_session *session, int stream_id)
     return stream;
 }
 
-#ifdef H2_NG2_STREAM_API
-
 /**
  * Determine the importance of streams when scheduling tasks.
  * - if both stream depend on the same one, compare weights
@@ -158,20 +156,6 @@ static int stream_pri_cmp(int sid1, int sid2, void *ctx)
     return spri_cmp(sid1, s1, sid2, s2, session);
 }
 
-#else /* ifdef H2_NG2_STREAM_API */
-
-/* In absence of nghttp2_stream API, which gives information about
- * priorities since nghttp2 1.3.x, we just sort the streams by
- * their identifier, aka. order of arrival.
- */
-static int stream_pri_cmp(int sid1, int sid2, void *ctx)
-{
-    (void)ctx;
-    return sid1 - sid2;
-}
-
-#endif /* (ifdef else) H2_NG2_STREAM_API */
-
 static apr_status_t stream_schedule(h2_session *session,
                                     h2_stream *stream, int eos)
 {
index 9fe003247d72dc18e1ffde699d89a79a478a43e0..8ccb2794194805d5b066d5a6ac98e98291f60608 100644 (file)
@@ -298,6 +298,8 @@ apr_status_t h2_task_freeze(h2_task *task, request_rec *r)
         task->frozen = 1;
         task->frozen_out = apr_brigade_create(c->pool, c->bucket_alloc);
         ap_add_output_filter("H2_RESPONSE_FREEZE", task, r, r->connection);
+        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, task->output->c, 
+                      "h2_task(%s), frozen", task->id);
     }
     return APR_SUCCESS;
 }
@@ -306,6 +308,8 @@ apr_status_t h2_task_thaw(h2_task *task)
 {
     if (task->frozen) {
         task->frozen = 0;
+        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, task->output->c, 
+                      "h2_task(%s), thawed", task->id);
     }
     return APR_SUCCESS;
 }
index 953433d45936e2a85aa1309c09a76030474f46d2..7409c363db6a83fba923a7cd519349bdba4a0d97 100644 (file)
@@ -50,6 +50,7 @@ h2_task_input *h2_task_input_create(h2_task *task, conn_rec *c)
         input->c = c;
         input->task = task;
         input->bb = NULL;
+        input->block = APR_BLOCK_READ;
         
         if (task->ser_headers) {
             ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c,
@@ -75,6 +76,11 @@ h2_task_input *h2_task_input_create(h2_task *task, conn_rec *c)
     return input;
 }
 
+void h2_task_input_block_set(h2_task_input *input, apr_read_type_e block)
+{
+    input->block = block;
+}
+
 apr_status_t h2_task_input_read(h2_task_input *input,
                                 ap_filter_t* f,
                                 apr_bucket_brigade* bb,
@@ -115,7 +121,7 @@ apr_status_t h2_task_input_read(h2_task_input *input,
         return APR_EOF;
     }
     
-    while ((bblen == 0) || (mode == AP_MODE_READBYTES && bblen < readbytes)) {
+    while (bblen == 0) {
         /* Get more data for our stream from mplx.
          */
         ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, f->c,
@@ -124,27 +130,31 @@ apr_status_t h2_task_input_read(h2_task_input *input,
                       input->task->id, block, 
                       (long)readbytes, (long)bblen);
         
-        /* Although we sometimes get called with APR_NONBLOCK_READs, 
-         we need to fill our buffer blocking. Otherwise we get EAGAIN,
-         return that to our caller and everyone throws up their hands,
-         never calling us again. */
-        status = h2_mplx_in_read(input->task->mplx, APR_BLOCK_READ,
+        /* Override the block mode we get called with depending on the input's
+         * setting. 
+         */
+        status = h2_mplx_in_read(input->task->mplx, block,
                                  input->task->stream_id, input->bb, 
                                  f->r? f->r->trailers_in : NULL, 
                                  input->task->io);
         ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, f->c,
                       "h2_task_input(%s): mplx in read returned",
                       input->task->id);
-        if (status != APR_SUCCESS) {
+        if (APR_STATUS_IS_EAGAIN(status) 
+            && (mode == AP_MODE_GETLINE || block == APR_BLOCK_READ)) {
+            /* chunked input handling does not seem to like it if we
+             * return with APR_EAGAIN from a GETLINE read... 
+             * upload 100k test on test-ser.example.org hangs */
+            status = APR_SUCCESS;
+        }
+        else if (status != APR_SUCCESS) {
             return status;
         }
+        
         status = apr_brigade_length(input->bb, 1, &bblen);
         if (status != APR_SUCCESS) {
             return status;
         }
-        if ((bblen == 0) && (block == APR_NONBLOCK_READ)) {
-            return h2_util_has_eos(input->bb, -1)? APR_EOF : APR_EAGAIN;
-        }
         
         ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, f->c,
                       "h2_task_input(%s): mplx in read, %ld bytes in brigade",
index 6085b78c8f8e3d00996c07467ff05f9409475231..1488dcab49e90aba1d27274ab242e292e040ed2b 100644 (file)
@@ -29,6 +29,7 @@ struct h2_task_input {
     conn_rec *c;
     struct h2_task *task;
     apr_bucket_brigade *bb;
+    apr_read_type_e block;
 };
 
 
@@ -41,4 +42,6 @@ apr_status_t h2_task_input_read(h2_task_input *input,
                                   apr_read_type_e block,
                                   apr_off_t readbytes);
 
+void h2_task_input_block_set(h2_task_input *input, apr_read_type_e block);
+
 #endif /* defined(__mod_h2__h2_task_input__) */
index db717d9dc5093e2bf6afc649227951f79c904d0c..b64683ab17c0e06aaac53ad3370f5694a83a9659 100644 (file)
@@ -1110,7 +1110,7 @@ int h2_util_frame_print(const nghttp2_frame *frame, char *buffer, size_t maxlen)
             size_t len = (frame->goaway.opaque_data_len < s_len)?
             frame->goaway.opaque_data_len : s_len-1;
             memcpy(scratch, frame->goaway.opaque_data, len);
-            scratch[len+1] = '\0';
+            scratch[len] = '\0';
             return apr_snprintf(buffer, maxlen, "GOAWAY[error=%d, reason='%s']",
                                 frame->goaway.error_code, scratch);
         }
index 56feec118ddbe4dfe1fa0c41e371f82d52f4764b..fd0e9bae065e0cdcfe0cdc57b529958b152191dc 100644 (file)
@@ -61,6 +61,7 @@ static void* APR_THREAD_FUNC execute(apr_thread_t *thread, void *wctx)
         
         while (req) {
             conn_rec *c, *master = m->c;
+            h2_task *task;
             int stream_id = req->id;
             
             if (!task_pool) {
@@ -81,38 +82,32 @@ static void* APR_THREAD_FUNC execute(apr_thread_t *thread, void *wctx)
             
             c = h2_slave_create(master, task_pool, 
                                 worker->thread, worker->socket);
-            if (!c) {
-                ap_log_cerror(APLOG_MARK, APLOG_WARNING, status, c,
-                              APLOGNO(02957) "h2_request(%ld-%d): error setting up slave connection", 
-                              m->id, stream_id);
-                h2_mplx_out_rst(m, stream_id, H2_ERR_INTERNAL_ERROR);
+                
+            task = h2_task_create(m->id, req, task_pool, m);
+            h2_ctx_create_for(c, task);
+            
+            h2_task_do(task, c, worker->io, worker->socket);
+            
+            if (task->frozen) {
+                /* this task was handed over to someone else for processing */
+                h2_task_thaw(task);
+                task_pool = NULL;
+                req = NULL;
+                h2_mplx_request_done(m, 0, worker->aborted? NULL : &req);
             }
             else {
-                h2_task *task;
-                
-                task = h2_task_create(m->id, req, task_pool, m);
-                h2_ctx_create_for(c, task);
-                h2_task_do(task, c, worker->io, worker->socket);
-                
-                if (task->frozen) {
-                    /* this task was handed over to someone else for
-                     * processing */
-                    task_pool = NULL;
-                }
-                task = NULL;
+                /* clean our references and report request as done. Signal
+                 * that we want another unless we have been aborted */
+                /* TODO: this will keep a worker attached to this h2_mplx as
+                 * long as it has requests to handle. Might no be fair to
+                 * other mplx's. Perhaps leave after n requests? */
                 apr_thread_cond_signal(worker->io);
+                if (task_pool) {
+                    apr_pool_clear(task_pool);
+                }
+                req = NULL;
+                h2_mplx_request_done(m, stream_id, worker->aborted? NULL : &req);
             }
-            
-            /* clean our references and report request as done. Signal
-             * that we want another unless we have been aborted */
-            /* TODO: this will keep a worker attached to this h2_mplx as
-             * long as it has requests to handle. Might no be fair to
-             * other mplx's. Perhaps leave after n requests? */
-            req = NULL;
-            if (task_pool) {
-                apr_pool_clear(task_pool);
-            }
-            h2_mplx_request_done(&m, stream_id, worker->aborted? NULL : &req);
         }
     }
 
index f7606dc4ed2355001907933f5d121fc6adae5321..9d648e9948cab02a150e7b8f7d9031b9890dfb51 100644 (file)
@@ -26,7 +26,6 @@
 #include "h2_private.h"
 #include "h2_mplx.h"
 #include "h2_request.h"
-#include "h2_task_queue.h"
 #include "h2_worker.h"
 #include "h2_workers.h"
 
index 7ec38813101ce9abec18ff5d84934f0750d41961..c6a24cab1496450080c1791fd9482b14b9da3a3f 100644 (file)
@@ -27,7 +27,6 @@ struct apr_thread_cond_t;
 struct h2_mplx;
 struct h2_request;
 struct h2_task;
-struct h2_task_queue;
 
 typedef struct h2_workers h2_workers;
 
index 49de804bb6f7b1bb42af7cff3b2cf813e3b44e4e..87cac2f678d63ccf024a333a36abf601badd759e 100644 (file)
@@ -132,50 +132,24 @@ static apr_status_t http2_req_engine_push(const char *engine_type,
                                           request_rec *r, 
                                           h2_req_engine_init *einit)
 {
-    h2_ctx *ctx = h2_ctx_rget(r);
-    if (ctx) {
-        h2_task *task = h2_ctx_get_task(ctx);
-        if (task) {
-            return h2_mplx_engine_push(task->mplx, task, engine_type, r, einit);
-        }
-    }
-    return APR_EINVAL;
+    return h2_mplx_engine_push(engine_type, r, einit);
 }
 
 static apr_status_t http2_req_engine_pull(h2_req_engine *engine, 
-                                          apr_time_t timeout, request_rec **pr)
+                                          apr_read_type_e block, 
+                                          request_rec **pr)
 {
-    h2_ctx *ctx = h2_ctx_get(engine->c, 0);
-    if (ctx) {
-        h2_task *task = h2_ctx_get_task(ctx);
-        if (task) {
-            return h2_mplx_engine_pull(task->mplx, task, engine, timeout, pr);
-        }
-    }
-    return APR_ECONNABORTED;
+    return h2_mplx_engine_pull(engine, block, pr);
 }
 
 static void http2_req_engine_done(h2_req_engine *engine, conn_rec *r_conn)
 {
-    h2_ctx *ctx = h2_ctx_get(r_conn, 0);
-    if (ctx) {
-        h2_task *task = h2_ctx_get_task(ctx);
-        if (task) {
-            h2_mplx_engine_done(task->mplx, task, r_conn);
-            /* task is destroyed */
-        }
-    }
+    h2_mplx_engine_done(engine, r_conn);
 }
 
 static void http2_req_engine_exit(h2_req_engine *engine)
 {
-    h2_ctx *ctx = h2_ctx_get(engine->c, 0);
-    if (ctx) {
-        h2_task *task = h2_ctx_get_task(ctx);
-        if (task) {
-            h2_mplx_engine_exit(task->mplx, task, engine);
-        }
-    }
+    h2_mplx_engine_exit(engine);
 }
 
 
index a1dfec4700e88e3d0148105206bf2663fe14d4cb..0431548a961a3e987e411392b2955c77146b6740 100644 (file)
@@ -141,6 +141,10 @@ SOURCE=./h2_h2.c
 # End Source File
 # Begin Source File
 
+SOURCE=./h2_int_queue.c
+# End Source File
+# Begin Source File
+
 SOURCE=./h2_io.c
 # End Source File
 # Begin Source File
@@ -193,10 +197,6 @@ SOURCE=./h2_task_output.c
 # End Source File
 # Begin Source File
 
-SOURCE=./h2_task_queue.c
-# End Source File
-# Begin Source File
-
 SOURCE=./h2_util.c
 # End Source File
 # Begin Source File
index 8055320ef5a5c864cbfa95c15fb8edf9dfc9005d..edacd0f134963f0454e29f8dc4990456153964bb 100644 (file)
@@ -29,9 +29,11 @@ APR_DECLARE_OPTIONAL_FN(int,
 
 
 /*******************************************************************************
- * HTTP/2 slave engines
+ * HTTP/2 request engines
  ******************************************************************************/
  
+struct apr_thread_cond_t;
+
 typedef struct h2_req_engine h2_req_engine;
 
 /**
@@ -45,25 +47,17 @@ typedef apr_status_t h2_req_engine_init(h2_req_engine *engine, request_rec *r);
 
 /**
  * The public structure of a h2_req_engine. It gets allocated by the http2
- * infrastructure, assigned id, type, pool and connection and passed to the
+ * infrastructure, assigned id, type, pool, io and connection and passed to the
  * h2_req_engine_init() callback to complete initialization.
  * This happens whenever a new request gets "push"ed for an engine type and
  * no instance, or no free instance, for the type is available.
  */
 struct h2_req_engine {
-    int id;                /* identifier, unique for a master connection */
-    const char *type;      /* name of the engine type */
+    const char *id;        /* identifier */
     apr_pool_t *pool;      /* pool for engine specific allocations */
-    conn_rec *c;           /* connection this engine is assigned to */
-    apr_size_t r_capacity; /* request capacity engine is willing to handle,
-                              may change between invocations. If the engine
-                              sets this to 0, it signals that it no longer
-                              wants more requests. New requests, already 
-                              scheduled for this engine might still arrive for
-                              a time. */
-    apr_size_t r_count;    /* number of request currently assigned, it is the
-                              responsibility of the engine to update this. */
-    void *data;            /* engine specific data */
+    const char *type;      /* name of the engine type */
+    apr_size_t capacity;   /* number of max assigned requests */
+    void *user_data;       /* user specific data */
 };
 
 /**
@@ -95,7 +89,7 @@ APR_DECLARE_OPTIONAL_FN(apr_status_t,
  */
 APR_DECLARE_OPTIONAL_FN(apr_status_t, 
                         http2_req_engine_pull, (h2_req_engine *engine, 
-                                                apr_time_t timeout,
+                                                apr_read_type_e block,
                                                 request_rec **pr));
 APR_DECLARE_OPTIONAL_FN(void, 
                         http2_req_engine_done, (h2_req_engine *engine, 
index 1130d357c59fb56bdfb022f44a7614674d6a7ebd..dbb041cb4b60d385131ac6b713de9c6d75d0a84e 100644 (file)
@@ -21,6 +21,7 @@
 
 
 #include "mod_proxy_http2.h"
+#include "h2_int_queue.h"
 #include "h2_request.h"
 #include "h2_util.h"
 #include "h2_version.h"
@@ -43,12 +44,13 @@ static int (*is_h2)(conn_rec *c);
 static apr_status_t (*req_engine_push)(const char *name, request_rec *r, 
                                        h2_req_engine_init *einit);
 static apr_status_t (*req_engine_pull)(h2_req_engine *engine, 
-                                       apr_time_t timeout, request_rec **pr);
+                                       apr_read_type_e block, request_rec **pr);
 static void (*req_engine_done)(h2_req_engine *engine, conn_rec *r_conn);
 static void (*req_engine_exit)(h2_req_engine *engine);
                                        
 typedef struct h2_proxy_ctx {
     conn_rec *owner;
+    request_rec *rbase;
     server_rec *server;
     const char *proxy_func;
     char server_portstr[32];
@@ -189,19 +191,51 @@ static int proxy_http2_canon(request_rec *r, char *url)
 
 static apr_status_t proxy_engine_init(h2_req_engine *engine, request_rec *r)
 {
-    h2_proxy_ctx *ctx = ap_get_module_config(engine->c->conn_config, 
+    h2_proxy_ctx *ctx = ap_get_module_config(r->connection->conn_config, 
                                              &proxy_http2_module);
     if (ctx) {
+        engine->capacity = 20; /* conservative guess until we know */
         ctx->engine = engine;
         return APR_SUCCESS;
     }
+    ap_log_rerror(APLOG_MARK, APLOG_WARNING, 0, r, 
+                  "h2_proxy_session, engine init, no ctx found");
     return APR_ENOTIMPL;
 }
 
-static int proxy_engine_run(h2_proxy_ctx *ctx, request_rec *r) {
-    int status = OK;
+static apr_status_t add_request(h2_proxy_session *session, request_rec *r)
+{
+    h2_proxy_ctx *ctx = session->user_data;
+    const char *url;
+    apr_status_t status;
+
+    url = apr_table_get(r->notes, H2_PROXY_REQ_URL_NOTE);
+    status = h2_proxy_session_submit(session, url, r);
+    if (status != OK) {
+        ap_log_cerror(APLOG_MARK, APLOG_ERR, status, r->connection, APLOGNO()
+                      "pass request body failed to %pI (%s) from %s (%s)",
+                      ctx->p_conn->addr, ctx->p_conn->hostname ? 
+                      ctx->p_conn->hostname: "", session->c->client_ip, 
+                      session->c->remote_host ? session->c->remote_host: "");
+    }
+    return status;
+}
+
+static void request_done(h2_proxy_session *session, request_rec *r)
+{
+    h2_proxy_ctx *ctx = session->user_data;
+    
+    if (req_engine_done && r != ctx->rbase) {
+        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, r->connection, 
+                      "h2_proxy_session(%s): request %s",
+                      ctx->engine->id, r->the_request);
+        req_engine_done(ctx->engine, r->connection);
+    }
+}
+
+static apr_status_t proxy_engine_run(h2_proxy_ctx *ctx, request_rec *r) {
+    apr_status_t status = OK;
     h2_proxy_session *session;
-    h2_proxy_stream *stream;
     
     /* Step Two: Make the Connection (or check that an already existing
      * socket is still usable). On success, we have a socket connected to
@@ -245,52 +279,45 @@ static int proxy_engine_run(h2_proxy_ctx *ctx, request_rec *r) {
     /* Step Four: Send the Request in a new HTTP/2 stream and
      * loop until we got the response or encounter errors.
      */
-    status = APR_ENOTIMPL;
-    session = h2_proxy_session_setup(r, ctx->p_conn, ctx->conf);
+    session = h2_proxy_session_setup(ctx->engine->id, ctx->p_conn, 
+                                     ctx->conf, request_done);
     if (!session) {
         ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, ctx->p_conn->connection, 
                       "session unavailable");
         return HTTP_SERVICE_UNAVAILABLE;
     }
     
-    while (r) {
-        conn_rec *r_conn = r->connection;
-        const char *url;
-        
-        url = apr_table_get(r->notes, H2_PROXY_REQ_URL_NOTE);
-        status = h2_proxy_session_open_stream(session, url, r, &stream);
-        if (status == OK) {
-            ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, r_conn, 
-                          "process stream(%d): %s %s%s, original: %s", 
-                          stream->id, stream->req->method, 
-                          stream->req->authority, stream->req->path, 
-                          r->the_request);
-            status = h2_proxy_stream_process(stream);
-        }
-        r = NULL;
-        
-        if (status != OK) {
-            ap_log_cerror(APLOG_MARK, APLOG_ERR, status, r_conn, APLOGNO()
-                          "pass request body failed to %pI (%s) from %s (%s)",
-                          ctx->p_conn->addr, ctx->p_conn->hostname ? 
-                          ctx->p_conn->hostname: "", session->c->client_ip, 
-                          session->c->remote_host ? session->c->remote_host: "");
-        }
-        
-        if (!ctx->standalone && req_engine_done && r_conn != ctx->owner) {
-            req_engine_done(ctx->engine, r_conn);
-        }
-        r_conn = NULL;
+    session->user_data = ctx;
+    add_request(session, r);
+    
+    status = APR_EAGAIN;
+    while (APR_STATUS_IS_EAGAIN(status)) {
+        status = h2_proxy_session_process(session);
         
-        if (!ctx->standalone && req_engine_pull) {
-            status = req_engine_pull(ctx->engine, ctx->server->timeout, &r);
-            if (status != APR_SUCCESS) {
-                status = APR_SUCCESS;
-                break;
+        if (APR_STATUS_IS_EAGAIN(status) && !ctx->standalone) {
+            ctx->engine->capacity = session->remote_max_concurrent;
+            if (req_engine_pull(ctx->engine, APR_NONBLOCK_READ, &r) == APR_SUCCESS) {
+                ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, 
+                              "h2_proxy_session(%s): pulled request %s", 
+                              session->id, r->the_request);
+                add_request(session, r);
             }
         }
     }
     
+    if (session->state == H2_PROXYS_ST_DONE) {
+        ctx->p_conn->close = 1;
+    }
+    
+    if (session->streams && !h2_iq_empty(session->streams)) {
+        ap_log_cerror(APLOG_MARK, APLOG_WARNING, status, 
+                      ctx->p_conn->connection, 
+                      "session run done with %d streams unfinished",
+                      h2_iq_size(session->streams));
+    }
+    ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, 
+                  ctx->p_conn->connection, "session run done");
+    session->user_data = NULL;
     return status;
 }
 
@@ -339,6 +366,7 @@ static int proxy_http2_handler(request_rec *r,
 
     ctx = apr_pcalloc(p, sizeof(*ctx));
     ctx->owner      = c;
+    ctx->rbase      = r;
     ctx->server     = s;
     ctx->proxy_func = proxy_func;
     ctx->is_ssl     = is_ssl;
@@ -387,12 +415,12 @@ static int proxy_http2_handler(request_rec *r,
          * the same backend. We may be called to create an engine ourself.
          */
         status = req_engine_push(engine_type, r, proxy_engine_init);
-        ap_log_rerror(APLOG_MARK, APLOG_TRACE1, status, r, 
-                      "H2: pushing request %s to engine type %s", 
-                      url, engine_type);
         if (status == APR_SUCCESS && ctx->engine == NULL) {
             /* Another engine instance has taken over processing of this
              * request. */
+            ap_log_rerror(APLOG_MARK, APLOG_TRACE1, status, r, 
+                          "H2: pushed request %s to engine type %s", 
+                          url, engine_type);
             goto cleanup;
         }
     }
@@ -401,25 +429,41 @@ static int proxy_http2_handler(request_rec *r,
         /* No engine was available or has been initialized, handle this
         * request just by ourself. */
         h2_req_engine *engine = apr_pcalloc(p, sizeof(*engine));
-        engine->id = 0;
+        engine->id = apr_psprintf(p, "eng-proxy-%ld", c->id);
         engine->type = engine_type;
         engine->pool = p;
-        engine->c = c;
+        engine->capacity = 1;
         ctx->engine = engine;
         ctx->standalone = 1;
+        ap_log_rerror(APLOG_MARK, APLOG_TRACE1, status, r, 
+                      "h2_proxy_http2(%ld): setup standalone engine for type %s", 
+                      c->id, engine_type);
+    }
+    else {
+        ap_log_rerror(APLOG_MARK, APLOG_TRACE1, status, r, 
+                      "H2: hosting engine %s for request %s", ctx->engine->id, url);
     }
     
-    status = proxy_engine_run(ctx, r);    
+    status = proxy_engine_run(ctx, r);
+    if (!ctx->standalone && status == APR_SUCCESS) { 
+        apr_status_t s2;
+        do {
+            s2 = req_engine_pull(ctx->engine, APR_BLOCK_READ, &r);
+            if (s2 == APR_SUCCESS) {
+                s2 = proxy_engine_run(ctx, r);
+            }
+        } while (s2 != APR_EOF);
+    }
 
 cleanup:
-    if (ctx->engine && !ctx->standalone && req_engine_exit) {
+    if (!ctx->standalone && ctx->engine && req_engine_exit) {
         req_engine_exit(ctx->engine);
     }
     ctx->engine = NULL;
     
     if (ctx) {
         if (ctx->p_conn) {
-            if (status != OK) {
+            if (status != APR_SUCCESS) {
                 ctx->p_conn->close = 1;
             }
             proxy_run_detach_backend(r, ctx->p_conn);