]> git.ipfire.org Git - thirdparty/apache/httpd.git/commitdiff
mod_proxy_http2: stability improvements, timeout blocking read when waiting, new...
authorStefan Eissing <icing@apache.org>
Thu, 25 Feb 2016 16:19:18 +0000 (16:19 +0000)
committerStefan Eissing <icing@apache.org>
Thu, 25 Feb 2016 16:19:18 +0000 (16:19 +0000)
git-svn-id: https://svn.apache.org/repos/asf/httpd/httpd/trunk@1732328 13f79535-47bb-0310-9956-ffa450edef68

modules/http2/h2_mplx.c
modules/http2/h2_proxy_session.c
modules/http2/h2_proxy_session.h
modules/http2/h2_util.c
modules/http2/h2_util.h
modules/http2/mod_proxy_http2.c

index 90765f58d79b8a240151d780756daf123aadd9da..f18b3437aa4b9982d3cde7ed05a4e4a5f93f8afb 100644 (file)
@@ -17,7 +17,6 @@
 #include <stddef.h>
 #include <stdlib.h>
 
-#include <apr_queue.h>
 #include <apr_thread_mutex.h>
 #include <apr_thread_cond.h>
 #include <apr_strings.h>
@@ -368,11 +367,14 @@ apr_status_t h2_mplx_release_and_join(h2_mplx *m, apr_thread_cond_t *wait)
             /* iterate until all ios have been orphaned or destroyed */
         }
     
-        /* Any remaining ios have handed out requests to workers that are
-         * not done yet. Any operation they do on their assigned stream ios will
-         * be errored ECONNRESET/ABORTED, so they should find out pretty soon.
+        /* If we still have busy workers, we cannot release our memory
+         * pool yet, as slave connections have child pools of their respective
+         * h2_io's.
+         * Any remaining ios are processed in these workers. Any operation 
+         * they do on their input/outputs will be errored ECONNRESET/ABORTED, 
+         * so processing them should fail and workers *should* return.
          */
-        for (i = 0; h2_io_set_size(m->stream_ios) > 0; ++i) {
+        for (i = 0; m->workers_busy > 0; ++i) {
             m->join_wait = wait;
             ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
                           "h2_mplx(%ld): release_join, waiting on %d worker to report back", 
@@ -388,8 +390,9 @@ apr_status_t h2_mplx_release_and_join(h2_mplx *m, apr_thread_cond_t *wait)
                      */
                     ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, APLOGNO(03198)
                                   "h2_mplx(%ld): release, waiting for %d seconds now for "
-                                  "all h2_workers to return, have still %d requests outstanding", 
-                                  m->id, i*wait_secs, (int)h2_io_set_size(m->stream_ios));
+                                  "%d h2_workers to return, have still %d requests outstanding", 
+                                  m->id, i*wait_secs, m->workers_busy,
+                                  (int)h2_io_set_size(m->stream_ios));
                     if (i == 1) {
                         h2_io_set_iter(m->stream_ios, stream_print, m);
                     }
@@ -1130,10 +1133,6 @@ static void task_done(h2_mplx *m, h2_task *task)
             /* TODO: this will keep a worker attached to this h2_mplx as
              * long as it has requests to handle. Might no be fair to
              * other mplx's. Perhaps leave after n requests? */
-            
-            if (task->c) {
-                apr_pool_destroy(task->c->pool);
-            }
             task = NULL;
             if (io) {
                 io->processing_done = 1;
@@ -1176,7 +1175,17 @@ void h2_mplx_task_done(h2_mplx *m, h2_task *task, h2_task **ptask)
 /*******************************************************************************
  * HTTP/2 request engines
  ******************************************************************************/
+
+typedef struct h2_req_entry h2_req_entry;
+struct h2_req_entry {
+    APR_RING_ENTRY(h2_req_entry) link;
+    request_rec *r;
+};
+
+#define H2_REQ_ENTRY_NEXT(e)   APR_RING_NEXT((e), link)
+#define H2_REQ_ENTRY_PREV(e)   APR_RING_PREV((e), link)
+#define H2_REQ_ENTRY_REMOVE(e) APR_RING_REMOVE((e), link)
+
 typedef struct h2_req_engine_i h2_req_engine_i;
 struct h2_req_engine_i {
     h2_req_engine pub;
@@ -1184,20 +1193,37 @@ struct h2_req_engine_i {
     h2_mplx *m;
     unsigned int shutdown : 1; /* engine is being shut down */
     apr_thread_cond_t *io;     /* condition var for waiting on data */
-    apr_queue_t *queue;        /* queue of scheduled request_rec* */
+    APR_RING_HEAD(h2_req_entries, h2_req_entry) entries;
     apr_size_t no_assigned;    /* # of assigned requests */
     apr_size_t no_live;        /* # of live */
     apr_size_t no_finished;    /* # of finished */
 };
 
+#define H2_REQ_ENTRIES_SENTINEL(b)     APR_RING_SENTINEL((b), h2_req_entry, link)
+#define H2_REQ_ENTRIES_EMPTY(b)        APR_RING_EMPTY((b), h2_req_entry, link)
+#define H2_REQ_ENTRIES_FIRST(b)        APR_RING_FIRST(b)
+#define H2_REQ_ENTRIES_LAST(b) APR_RING_LAST(b)
+
+#define H2_REQ_ENTRIES_INSERT_HEAD(b, e) do {                          \
+h2_req_entry *ap__b = (e);                                        \
+APR_RING_INSERT_HEAD((b), ap__b, h2_req_entry, link);  \
+} while (0)
+
+#define H2_REQ_ENTRIES_INSERT_TAIL(b, e) do {                          \
+h2_req_entry *ap__b = (e);                                     \
+APR_RING_INSERT_TAIL((b), ap__b, h2_req_entry, link);  \
+} while (0)
+
 static apr_status_t h2_mplx_engine_schedule(h2_mplx *m, 
                                             h2_req_engine_i *engine, 
                                             request_rec *r)
 {
-    if (!engine->queue) {
-        apr_queue_create(&engine->queue, 100, engine->pub.pool);
-    }
-    return apr_queue_trypush(engine->queue, r);
+    h2_req_entry *entry = apr_pcalloc(r->pool, sizeof(*entry));
+
+    APR_RING_ELEM_INIT(entry, link);
+    entry->r = r;
+    H2_REQ_ENTRIES_INSERT_TAIL(&engine->entries, entry);
+    return APR_SUCCESS;
 }
 
 
@@ -1263,6 +1289,7 @@ apr_status_t h2_mplx_engine_push(const char *engine_type,
                 engine->pub.pool = task->c->pool;
                 engine->pub.type = apr_pstrdup(task->c->pool, engine_type);
                 engine->c = r->connection;
+                APR_RING_INIT(&engine->entries, h2_req_entry, link);
                 engine->m = m;
                 engine->io = task->io;
                 engine->no_assigned = 1;
@@ -1283,28 +1310,19 @@ apr_status_t h2_mplx_engine_push(const char *engine_type,
     return status;
 }
 
-static request_rec *get_non_frozen(apr_queue_t *equeue)
+static h2_req_entry *pop_non_frozen(h2_req_engine_i *engine)
 {
-    request_rec *r, *first = NULL;
+    h2_req_entry *entry;
     h2_task *task;
-    void *elem;
-
-    if (equeue) {
-        /* FIFO queue, try to find a  request_rec whose task is not frozen */
-        while (apr_queue_trypop(equeue, &elem) == APR_SUCCESS) {
-            r = elem;
-            task = h2_ctx_rget_task(r);
-            AP_DEBUG_ASSERT(task);
-            if (!task->frozen) {
-                return r;
-            }
-            apr_queue_push(equeue, r);  
-            if (!first) {
-                first = r;
-            }
-            else if (r == first) {
-                return NULL; /* walked the whole queue */
-            }
+
+    for (entry = H2_REQ_ENTRIES_FIRST(&engine->entries);
+         entry != H2_REQ_ENTRIES_SENTINEL(&engine->entries);
+         entry = H2_REQ_ENTRY_NEXT(entry)) {
+        task = h2_ctx_rget_task(entry->r);
+        AP_DEBUG_ASSERT(task);
+        if (!task->frozen) {
+            H2_REQ_ENTRY_REMOVE(entry);
+            return entry;
         }
     }
     return NULL;
@@ -1313,7 +1331,7 @@ static request_rec *get_non_frozen(apr_queue_t *equeue)
 static apr_status_t engine_pull(h2_mplx *m, h2_req_engine_i *engine, 
                                 apr_read_type_e block, request_rec **pr)
 {   
-    request_rec *r;
+    h2_req_entry *entry;
     
     AP_DEBUG_ASSERT(m);
     AP_DEBUG_ASSERT(engine);
@@ -1326,20 +1344,21 @@ static apr_status_t engine_pull(h2_mplx *m, h2_req_engine_i *engine,
             return APR_EOF;
         }
         
-        if (engine->queue && (r = get_non_frozen(engine->queue))) {
-            ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r,
+        if (!H2_REQ_ENTRIES_EMPTY(&engine->entries) 
+            && (entry = pop_non_frozen(engine))) {
+            ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, entry->r,
                           "h2_mplx(%ld): request %s pulled by engine %s", 
-                          m->c->id, r->the_request, engine->pub.id);
+                          m->c->id, entry->r->the_request, engine->pub.id);
             engine->no_live++;
-            r->connection->current_thread = engine->c->current_thread;
-            *pr = r;
+            entry->r->connection->current_thread = engine->c->current_thread;
+            *pr = entry->r;
             return APR_SUCCESS;
         }
         else if (APR_NONBLOCK_READ == block) {
             *pr = NULL;
             return APR_EAGAIN;
         }
-        else if (!engine->queue || !apr_queue_size(engine->queue)) {
+        else if (H2_REQ_ENTRIES_EMPTY(&engine->entries)) {
             engine->shutdown = 1;
             ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
                           "h2_mplx(%ld): emtpy queue, shutdown engine %s", 
@@ -1409,19 +1428,21 @@ void h2_mplx_engine_exit(h2_req_engine *pub_engine)
     int acquired;
     
     if (enter_mutex(m, &acquired) == APR_SUCCESS) {
-        if (engine->queue && apr_queue_size(engine->queue)) {
-            void *entry;
+        if (!m->aborted 
+            && !H2_REQ_ENTRIES_EMPTY(&engine->entries)) {
+            h2_req_entry *entry;
             ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c,
                           "h2_mplx(%ld): exit engine %s (%s), "
-                          "has still %d requests queued, shutdown=%d,"
+                          "has still requests queued, shutdown=%d,"
                           "assigned=%ld, live=%ld, finished=%ld", 
                           m->c->id, engine->pub.id, engine->pub.type,
-                          (int)apr_queue_size(engine->queue),
                           engine->shutdown, 
                           (long)engine->no_assigned, (long)engine->no_live,
                           (long)engine->no_finished);
-            while (apr_queue_trypop(engine->queue, &entry) == APR_SUCCESS) {
-                request_rec *r = entry;
+            for (entry = H2_REQ_ENTRIES_FIRST(&engine->entries);
+                 entry != H2_REQ_ENTRIES_SENTINEL(&engine->entries);
+                 entry = H2_REQ_ENTRY_NEXT(entry)) {
+                request_rec *r = entry->r;
                 h2_task *task = h2_ctx_rget_task(r);
                 ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c,
                               "h2_mplx(%ld): engine %s has queued task %s, "
@@ -1430,7 +1451,7 @@ void h2_mplx_engine_exit(h2_req_engine *pub_engine)
                 engine_done(m, engine, task, 0, 1);
             }
         }
-        if (engine->no_assigned > 1 || engine->no_live > 1) {
+        if (!m->aborted && (engine->no_assigned > 1 || engine->no_live > 1)) {
             ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c,
                           "h2_mplx(%ld): exit engine %s (%s), "
                           "assigned=%ld, live=%ld, finished=%ld", 
index 9d37cbade1039e664d7c60e81ef135fe7aa5e3f1..9a1f808927f667d3b22fa48d3a7483c43f874130 100644 (file)
@@ -48,20 +48,6 @@ typedef struct h2_proxy_stream {
 } h2_proxy_stream;
 
 
-static int ngstatus_from_apr_status(apr_status_t rv)
-{
-    if (rv == APR_SUCCESS) {
-        return NGHTTP2_NO_ERROR;
-    }
-    else if (APR_STATUS_IS_EAGAIN(rv)) {
-        return NGHTTP2_ERR_WOULDBLOCK;
-    }
-    else if (APR_STATUS_IS_EOF(rv)) {
-            return NGHTTP2_ERR_EOF;
-    }
-    return NGHTTP2_ERR_PROTO;
-}
-
 static void dispatch_event(h2_proxy_session *session, h2_proxys_event_t ev, 
                            int arg, const char *msg);
 
@@ -75,7 +61,7 @@ static apr_status_t proxy_session_pre_close(void *theconn)
         ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c, 
                       "proxy_session(%s): pool cleanup, state=%d, streams=%d",
                       session->id, session->state, 
-                      h2_iq_size(session->streams));
+                      (int)h2_ihash_count(session->streams));
         dispatch_event(session, H2_PROXYS_EV_PRE_CLOSE, 0, NULL);
         nghttp2_session_del(session->ngh2);
         session->ngh2 = NULL;
@@ -182,7 +168,7 @@ static int on_frame_recv(nghttp2_session *ngh2, const nghttp2_frame *frame,
                 char buffer[256];
                 
                 h2_util_frame_print(frame, buffer, sizeof(buffer)/sizeof(buffer[0]));
-                ap_log_cerror(APLOG_MARK, APLOG_INFO, 0, session->c, APLOGNO()
+                ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO()
                               "h2_session(%s): recv FRAME[%s]",
                               session->id, buffer);
             }
@@ -380,7 +366,9 @@ static int on_data_chunk_recv(nghttp2_session *ngh2, uint8_t flags,
         ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, session->c, APLOGNO()
                       "h2_session(%s-%d): passing output", 
                       session->id, stream->id);
-        return NGHTTP2_ERR_CALLBACK_FAILURE;
+        nghttp2_submit_rst_stream(ngh2, NGHTTP2_FLAG_NONE,
+                                  stream_id, NGHTTP2_STREAM_CLOSED);
+        return NGHTTP2_ERR_STREAM_CLOSING;
     }
     return 0;
 }
@@ -488,7 +476,11 @@ static ssize_t stream_data_read(nghttp2_session *ngh2, int32_t stream_id,
         h2_iq_add(stream->session->suspended, stream->id, NULL, NULL);
         return NGHTTP2_ERR_DEFERRED;
     }
-    return ngstatus_from_apr_status(status);
+    else {
+        nghttp2_submit_rst_stream(ngh2, NGHTTP2_FLAG_NONE, 
+                                  stream_id, NGHTTP2_STREAM_CLOSED);
+        return NGHTTP2_ERR_STREAM_CLOSING;
+    }
 }
 
 h2_proxy_session *h2_proxy_session_setup(const char *id, proxy_conn_rec *p_conn,
@@ -513,7 +505,7 @@ h2_proxy_session *h2_proxy_session_setup(const char *id, proxy_conn_rec *p_conn,
         session->state = H2_PROXYS_ST_INIT;
         session->window_bits_default    = 30;
         session->window_bits_connection = 30;
-        session->streams = h2_iq_create(pool, 25);
+        session->streams = h2_ihash_create(pool, offsetof(h2_proxy_stream, id));
         session->suspended = h2_iq_create(pool, 5);
         session->done = done;
     
@@ -689,7 +681,7 @@ static apr_status_t submit_stream(h2_proxy_session *session, h2_proxy_stream *st
     if (rv > 0) {
         stream->id = rv;
         stream->state = H2_STREAM_ST_OPEN;
-        h2_iq_add(session->streams, stream->id, NULL, NULL);
+        h2_ihash_add(session->streams, stream);
         dispatch_event(session, H2_PROXYS_EV_STREAM_SUBMITTED, rv, NULL);
         
         return APR_SUCCESS;
@@ -697,13 +689,36 @@ static apr_status_t submit_stream(h2_proxy_session *session, h2_proxy_stream *st
     return APR_EGENERAL;
 }
 
-static apr_status_t h2_proxy_session_read(h2_proxy_session *session, int block)
+static apr_status_t h2_proxy_session_read(h2_proxy_session *session, int block, 
+                                          apr_interval_time_t timeout)
 {
     apr_status_t status;
+    apr_socket_t *socket = NULL;
+    apr_time_t save_timeout = -1;
+    
+    if (block) {
+        socket = ap_get_conn_socket(session->c);
+        if (socket) {
+            apr_socket_timeout_get(socket, &save_timeout);
+            apr_socket_timeout_set(socket, timeout);
+        }
+        else {
+            /* cannot block on timeout */
+            ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, session->c, 
+                          "h2_session(%s): unable to get conn socket", 
+                          session->id);
+            return APR_ENOTIMPL;
+        }
+    }
+    
     status = ap_get_brigade(session->c->input_filters, session->input, 
                             AP_MODE_READBYTES, 
                             block? APR_BLOCK_READ : APR_NONBLOCK_READ, 
                             64 * 1024);
+    if (socket && save_timeout != -1) {
+            apr_socket_timeout_set(socket, save_timeout);
+    }
+    
     if (status == APR_SUCCESS) {
         if (APR_BRIGADE_EMPTY(session->input)) {
             status = APR_EAGAIN;
@@ -712,6 +727,9 @@ static apr_status_t h2_proxy_session_read(h2_proxy_session *session, int block)
             feed_brigade(session, session->input);
         }
     }
+    else if (APR_STATUS_IS_TIMEUP(status)) {
+        /* nop */
+    }
     else if (!APR_STATUS_IS_EAGAIN(status)) {
         dispatch_event(session, H2_PROXYS_EV_CONN_ERROR, 0, NULL);
     }
@@ -841,7 +859,7 @@ static void ev_init(h2_proxy_session *session, int arg, const char *msg)
 {
     switch (session->state) {
         case H2_PROXYS_ST_INIT:
-            if (h2_iq_empty(session->streams)) {
+            if (h2_ihash_is_empty(session->streams)) {
                 transit(session, "init", H2_PROXYS_ST_IDLE);
             }
             else {
@@ -948,7 +966,7 @@ static void ev_no_io(h2_proxy_session *session, int arg, const char *msg)
              * CPU cycles. Ideally, we'd like to do a blocking read, but that
              * is not possible if we have scheduled tasks and wait
              * for them to produce something. */
-            if (h2_iq_empty(session->streams)) {
+            if (h2_ihash_is_empty(session->streams)) {
                 if (!is_accepting_streams(session)) {
                     /* We are no longer accepting new streams and have
                      * finished processing existing ones. Time to leave. */
@@ -966,6 +984,7 @@ static void ev_no_io(h2_proxy_session *session, int arg, const char *msg)
                  * task processing in other threads. Do a busy wait with
                  * backoff timer. */
                 transit(session, "no io", H2_PROXYS_ST_WAIT);
+                session->wait_timeout = 25;
             }
             break;
         default:
@@ -1004,7 +1023,7 @@ static void ev_stream_done(h2_proxy_session *session, int stream_id,
             stream->data_received = 1;
         }
         stream->state = H2_STREAM_ST_CLOSED;
-        h2_iq_remove(session->streams, stream_id);
+        h2_ihash_remove(session->streams, stream_id);
         h2_iq_remove(session->suspended, stream_id);
         if (session->done) {
             session->done(session, stream->r);
@@ -1154,7 +1173,8 @@ apr_status_t h2_proxy_session_process(h2_proxy_session *session)
             }
             
             if (nghttp2_session_want_read(session->ngh2)) {
-                if (h2_proxy_session_read(session, 0) == APR_SUCCESS) {
+                status = h2_proxy_session_read(session, 0, 0);
+                if (status == APR_SUCCESS) {
                     have_read = 1;
                 }
             }
@@ -1169,9 +1189,14 @@ apr_status_t h2_proxy_session_process(h2_proxy_session *session)
             if (check_suspended(session) == APR_EAGAIN) {
                 /* no stream has become resumed. Do a blocking read with
                  * ever increasing timeouts... */
-                if (h2_proxy_session_read(session, 0) == APR_SUCCESS) {
+                status = h2_proxy_session_read(session, 0, session->wait_timeout);
+                if (status == APR_SUCCESS) {
                     dispatch_event(session, H2_PROXYS_EV_DATA_READ, 0, NULL);
                 }
+                else if (APR_STATUS_IS_TIMEUP(status)) {
+                    session->wait_timeout = H2MIN(apr_time_from_msec(100), 
+                                                  2*session->wait_timeout);
+                }
             }
             break;
             
@@ -1198,3 +1223,26 @@ apr_status_t h2_proxy_session_process(h2_proxy_session *session)
     return APR_EAGAIN;
 }
 
+typedef struct {
+    h2_proxy_session *session;
+    h2_proxy_request_done *done;
+} cleanup_iter_ctx;
+
+static int cleanup_iter(void *udata, void *val)
+{
+    cleanup_iter_ctx *ctx = udata;
+    h2_proxy_stream *stream = val;
+    ctx->done(ctx->session, stream->r);
+    return 1;
+}
+
+void h2_proxy_session_cleanup(h2_proxy_session *session, 
+                              h2_proxy_request_done *done)
+{
+    cleanup_iter_ctx ctx;
+    ctx.session = session;
+    ctx.done = done;
+    h2_ihash_iter(session->streams, cleanup_iter, &ctx);
+    h2_ihash_clear(session->streams);
+}
+
index 089fd107a594885b647e7c2878741bcd881fd7ad..d4f68b3a19c6797afd8d53f7c111632197d561de 100644 (file)
@@ -21,6 +21,7 @@
 #include <nghttp2/nghttp2.h>
 
 struct h2_int_queue;
+struct h2_ihash_t;
 
 typedef enum {
     H2_PROXYS_ST_INIT,             /* send initial SETTINGS, etc. */
@@ -67,8 +68,9 @@ struct h2_proxy_session {
     int window_bits_connection;
 
     h2_proxys_state state;
+    apr_interval_time_t wait_timeout;
 
-    struct h2_int_queue *streams;
+    struct h2_ihash_t *streams;
     struct h2_int_queue *suspended;
     apr_size_t remote_max_concurrent;
     int max_stream_recv;
@@ -86,6 +88,7 @@ apr_status_t h2_proxy_session_submit(h2_proxy_session *s, const char *url,
                                      
 apr_status_t h2_proxy_session_process(h2_proxy_session *s);
 
+void h2_proxy_session_cleanup(h2_proxy_session *s, h2_proxy_request_done *done);
 
 #define H2_PROXY_REQ_URL_NOTE   "h2-proxy-req-url"
 
index 1cbe8d1e8e32c8a4dcfb4f2b3cd161b3811b4227..52c858e609583d8d37f682b0422305b06328e762 100644 (file)
@@ -297,6 +297,11 @@ void h2_ihash_remove(h2_ihash_t *ih, int id)
     apr_hash_set(ih->hash, &id, sizeof(id), NULL);
 }
 
+void h2_ihash_clear(h2_ihash_t *ih)
+{
+    apr_hash_clear(ih->hash);
+}
+
 /*******************************************************************************
  * h2_util for apt_table_t
  ******************************************************************************/
index e13a0dc2f2a6ba7604e92ac94e66a6b75513b01d..cd2d8a12e363e2873c01f40690968a7acc400a34 100644 (file)
@@ -61,6 +61,7 @@ void h2_ihash_iter(h2_ihash_t *ih, h2_ihash_iter_t *fn, void *ctx);
 
 void h2_ihash_add(h2_ihash_t *ih, void *val);
 void h2_ihash_remove(h2_ihash_t *ih, int id);
+void h2_ihash_clear(h2_ihash_t *ih);
  
 /*******************************************************************************
  * common helpers
index efd69465c29fd568decbe214e07a97328328c2d6..a2d80d32e8bdd9b2012f7b455c402e85fccc9eb0 100644 (file)
@@ -234,11 +234,13 @@ static void request_done(h2_proxy_session *session, request_rec *r)
 }
 
 static request_rec *next_request(h2_proxy_ctx *ctx, h2_proxy_session *session, 
-                                 request_rec *r)
+                                 request_rec *r, int before_leave)
 {
     if (!r && !ctx->standalone) {
         ctx->engine->capacity = session->remote_max_concurrent;
-        if (req_engine_pull(ctx->engine, APR_NONBLOCK_READ, &r) == APR_SUCCESS) {
+        if (req_engine_pull(ctx->engine, 
+                            before_leave? APR_BLOCK_READ: APR_NONBLOCK_READ, 
+                            &r) == APR_SUCCESS) {
             ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, 
                           "h2_proxy_session(%s): pulled request %s", 
                           session->id, r->the_request);
@@ -312,7 +314,7 @@ run_session:
     session->user_data = ctx;
     status = h2_proxy_session_process(session);
     while (APR_STATUS_IS_EAGAIN(status)) {
-        r = next_request(ctx, session, r);
+        r = next_request(ctx, session, r, 0);
         if (r) {
             add_request(session, r);
             r = NULL;
@@ -327,9 +329,15 @@ run_session:
         ctx->p_conn->close = 1;
     }
     
-    r = next_request(ctx, session, r);
+    r = next_request(ctx, session, r, 1);
     if (r) { 
         if (ctx->p_conn->close) {
+            /* the connection is/willbe closed, the session is terminated.
+             * Any open stream of that session needs to
+             * a) be reopened on the new session iff safe to do so
+             * b) reported as done (failed) otherwise
+             */
+            h2_proxy_session_cleanup(session, request_done);
             goto setup_backend;
         }
         add_request(session, r);
@@ -337,11 +345,11 @@ run_session:
         goto run_session;
     }
 
-    if (session->streams && !h2_iq_empty(session->streams)) {
+    if (session->streams && !h2_ihash_is_empty(session->streams)) {
         ap_log_cerror(APLOG_MARK, APLOG_WARNING, status, 
                       ctx->p_conn->connection, 
                       "session run done with %d streams unfinished",
-                      h2_iq_size(session->streams));
+                      (int)h2_ihash_count(session->streams));
     }
     ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, 
                   ctx->p_conn->connection, "eng(%s): session run done",