]> git.ipfire.org Git - thirdparty/apache/httpd.git/commitdiff
mod_proxy: Improve tunneling loop.
author Yann Ylavic <ylavic@apache.org>
Tue, 5 Nov 2019 16:41:14 +0000 (16:41 +0000)
committer Yann Ylavic <ylavic@apache.org>
Tue, 5 Nov 2019 16:41:14 +0000 (16:41 +0000)
Support half closed connections and pending data draining (for protocols like
rsync). PR 61616.

When reading on one side goes faster than writing on the other side, the output
filters chain may start buffering data and finally block, which will break
bidirectional tunneling for some protocols.

To avoid this, proxy_tunnel_run() now stops polling/reading until pending data
are drained, and recovers appropriately.

git-svn-id: https://svn.apache.org/repos/asf/httpd/httpd/trunk@1869420 13f79535-47bb-0310-9956-ffa450edef68

CHANGES
docs/log-message-tags/next-number
modules/proxy/mod_proxy.h
modules/proxy/mod_proxy_connect.c
modules/proxy/mod_proxy_wstunnel.c
modules/proxy/proxy_util.c

diff --git a/CHANGES b/CHANGES
index c311e82ba8c108e11662fb3beb3361071a21db75..f8a54e9f0e8d43dec3ac35a2ec5af6a58ae6e93a 100644 (file)
--- a/CHANGES
+++ b/CHANGES
@@ -1,6 +1,9 @@
                                                          -*- coding: utf-8 -*-
 Changes with Apache 2.5.1
    
+  *) mod_proxy: Improve tunneling loop to support half closed connections and
+     pending data draining (for protocols like rsync). PR 61616. [Yann Ylavic]
+
   *) mod_proxy: Add proxy check_trans hook for proxy modules to possibly
      decline request handling at early stage.  [Yann Ylavic]
 
index d1037b174def16270830e9fda850881eaf0b56b2..7705262be1eb7752a0e8a6eb5c91ab625dfe04b6 100644 (file)
@@ -1 +1 @@
-10224
+10226
index c75992a60b3c902c139986a3f3cd57b89ce4fcce..335e0a3defee5ed011360436f9b7cba27b07e177 100644 (file)
@@ -1215,14 +1215,15 @@ PROXY_DECLARE(int) ap_proxy_pass_brigade(apr_bucket_alloc_t *bucket_alloc,
                                          conn_rec *origin, apr_bucket_brigade *bb,
                                          int flush);
 
+struct proxy_tunnel_conn; /* opaque */
 typedef struct {
     request_rec *r;
-    conn_rec *origin;
+    const char *scheme; /* caller proxy scheme, used in tunnel log messages */
     apr_pollset_t *pollset;
     apr_array_header_t *pfds;
     apr_interval_time_t timeout;
-    apr_bucket_brigade *bb_i;
-    apr_bucket_brigade *bb_o;
+    struct proxy_tunnel_conn *client, /* client side of the tunnel */
+                             *origin; /* origin side of the tunnel */
     int replied;
 } proxy_tunnel_rec;
 
@@ -1230,22 +1231,20 @@ typedef struct {
  * Create a tunnel, to be activated by ap_proxy_tunnel_run().
  * @param tunnel   tunnel created
  * @param r        client request
- * @param origin   backend connection
+ * @param c_o      connection to origin
+ * @param scheme   caller proxy scheme (connect, ws(s), http(s), ...)
  * @return         APR_SUCCESS or error status
  */
 PROXY_DECLARE(apr_status_t) ap_proxy_tunnel_create(proxy_tunnel_rec **tunnel,
-                                                   request_rec *r,
-                                                   conn_rec *origin);
+                                                   request_rec *r, conn_rec *c_o,
+                                                   const char *scheme);
 
 /**
  * Forward anything from either side of the tunnel to the other,
  * until one end aborts or a polling timeout/error occurs.
- * @param tunnel  tunnel created
- * @return        OK: closed/aborted on one side,
- *                HTTP_GATEWAY_TIME_OUT: polling timeout,
- *                HTTP_INTERNAL_SERVER_ERROR: polling error,
- *                HTTP_BAD_GATEWAY: no response from backend, ever,
- *                                  so client may expect one still.
+ * @param tunnel  tunnel to run
+ * @return        OK if both directions completed, HTTP_GATEWAY_TIME_OUT on
+ *                polling timeout, or another HTTP_ error otherwise.
  */
 PROXY_DECLARE(int) ap_proxy_tunnel_run(proxy_tunnel_rec *tunnel);
 
@@ -1324,6 +1323,14 @@ PROXY_DECLARE(apr_status_t) ap_proxy_buckets_lifetime_transform(request_rec *r,
                                                       apr_bucket_brigade *from,
                                                       apr_bucket_brigade *to);
 
+/*
+ * Flags for ap_proxy_transfer_between_connections(). For compatibility,
+ * FLUSH_EACH (0) and FLUSH_AFTER (1) match the former boolean "after" arg.
+ */
+#define AP_PROXY_TRANSFER_FLUSH_EACH    (0x0)
+#define AP_PROXY_TRANSFER_FLUSH_AFTER   (0x1)
+#define AP_PROXY_TRANSFER_SHOULD_YIELD  (0x2)
+
 /*
  * Sends all data that can be read non blocking from the input filter chain of
  * c_i and send it down the output filter chain of c_o. For reading it uses
@@ -1341,10 +1348,12 @@ PROXY_DECLARE(apr_status_t) ap_proxy_buckets_lifetime_transform(request_rec *r,
  * @param name  string for logging from where data was pulled
  * @param sent  if not NULL will be set to 1 if data was sent through c_o
  * @param bsize maximum amount of data pulled in one iteration from c_i
- * @param after if set flush data on c_o only once after the loop
+ * @param flags AP_PROXY_TRANSFER_* bitmask
  * @return      apr_status_t of the operation. Could be any error returned from
  *              either the input filter chain of c_i or the output filter chain
- *              of c_o. APR_EPIPE if the outgoing connection was aborted.
+ *              of c_o, APR_EPIPE if the outgoing connection was aborted, or
+ *              APR_INCOMPLETE if AP_PROXY_TRANSFER_SHOULD_YIELD was set and
+ *              the output stack gets full before the input stack is exhausted.
  */
 PROXY_DECLARE(apr_status_t) ap_proxy_transfer_between_connections(
                                                        request_rec *r,
@@ -1355,7 +1364,7 @@ PROXY_DECLARE(apr_status_t) ap_proxy_transfer_between_connections(
                                                        const char *name,
                                                        int *sent,
                                                        apr_off_t bsize,
-                                                       int after);
+                                                       int flags);
 
 extern module PROXY_DECLARE_DATA proxy_module;
 
index 0b8e56a4cebc731964f2b7c5a3bed75a6fa9fd72..a383a64eedb72893c308b3d096c901ac0cde85cd 100644 (file)
@@ -335,7 +335,7 @@ static int proxy_connect_handler(request_rec *r, proxy_worker *worker,
 
     /* r->sent_bodyct = 1; */
 
-    rv = ap_proxy_tunnel_create(&tunnel, r, backconn);
+    rv = ap_proxy_tunnel_create(&tunnel, r, backconn, "CONNECT");
     if (rv != APR_SUCCESS) {
         ap_log_rerror(APLOG_MARK, APLOG_ERR, rv, r, APLOGNO(10208)
                       "can't create tunnel for %pI (%s)",
@@ -345,6 +345,11 @@ static int proxy_connect_handler(request_rec *r, proxy_worker *worker,
 
     rc = ap_proxy_tunnel_run(tunnel);
     if (ap_is_HTTP_ERROR(rc)) {
+        if (rc == HTTP_GATEWAY_TIME_OUT) {
+            /* ap_proxy_tunnel_run() didn't log this */
+            ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, r, APLOGNO(10224)
+                          "tunnel timed out");
+        }
         /* Don't send an error page if we sent data already */
         if (proxyport && !tunnel->replied) {
             return rc;
index 794397b30d70b1a3e7aefe7722c24b9d32b90106..6640c62ecd250e5a4568ccd108e8b435708568b1 100644 (file)
@@ -37,10 +37,17 @@ static void proxy_wstunnel_callback(void *b);
 static int proxy_wstunnel_pump(ws_baton_t *baton, int async)
 {
     int status = ap_proxy_tunnel_run(baton->tunnel);
-    if (async && status == HTTP_GATEWAY_TIME_OUT) {
-        ap_log_rerror(APLOG_MARK, APLOG_TRACE1, 0, baton->r,
-                      APLOGNO(02542) "Attempting to go async");
-        status = SUSPENDED;
+    if (status == HTTP_GATEWAY_TIME_OUT) {
+        if (!async) {
+            /* ap_proxy_tunnel_run() logs its timeout at trace level only */
+            ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, baton->r, APLOGNO(10225)
+                          "Tunnel timed out");
+        }
+        else {
+            ap_log_rerror(APLOG_MARK, APLOG_TRACE1, 0, baton->r, APLOGNO(02542)
+                          "Attempting to go async");
+            status = SUSPENDED; /* async: report SUSPENDED, not the timeout */
+        }
     }
     return status;
 }
@@ -234,7 +241,7 @@ static int proxy_wstunnel_request(apr_pool_t *p, request_rec *r,
 
     ap_remove_input_filter_byhandle(c->input_filters, "reqtimeout");
 
-    rv = ap_proxy_tunnel_create(&tunnel, r, conn->connection);
+    rv = ap_proxy_tunnel_create(&tunnel, r, conn->connection, scheme);
     if (rv != APR_SUCCESS) {
         ap_log_rerror(APLOG_MARK, APLOG_ERR, rv, r, APLOGNO(02543)
                       "error creating websocket tunnel");
index 5a2d072b03a8464a6fb8663dd14d315cfc40e4ae..e7474c549c5dd66e73ae6063c091a851187075e2 100644 (file)
@@ -4051,13 +4051,25 @@ PROXY_DECLARE(apr_status_t) ap_proxy_transfer_between_connections(
                                                        const char *name,
                                                        int *sent,
                                                        apr_off_t bsize,
-                                                       int after)
+                                                       int flags)
 {
     apr_status_t rv;
+    int flush_each = 0;
 #ifdef DEBUGGING
     apr_off_t len;
 #endif
 
+    /*
+     * Compat: since FLUSH_EACH is the default (and zero) for legacy reasons,
+     * it applies only when neither FLUSH_AFTER nor SHOULD_YIELD is set; the
+     * latter because flushing would defeat the purpose of checking for
+     * pending data (i.e. whether the output chain/stack is full, to stop).
+     */
+    if (!(flags & (AP_PROXY_TRANSFER_FLUSH_AFTER |
+                   AP_PROXY_TRANSFER_SHOULD_YIELD))) {
+        flush_each = 1;
+    }
+
     do {
         apr_brigade_cleanup(bb_i);
         rv = ap_get_brigade(c_i->input_filters, bb_i, AP_MODE_READBYTES,
@@ -4065,7 +4077,9 @@ PROXY_DECLARE(apr_status_t) ap_proxy_transfer_between_connections(
         if (rv == APR_SUCCESS) {
             if (c_o->aborted) {
                 apr_brigade_cleanup(bb_i);
-                return APR_EPIPE;
+                flags &= ~AP_PROXY_TRANSFER_FLUSH_AFTER;
+                rv = APR_EPIPE;
+                break;
             }
             if (APR_BRIGADE_EMPTY(bb_i)) {
                 break;
@@ -4082,14 +4096,14 @@ PROXY_DECLARE(apr_status_t) ap_proxy_transfer_between_connections(
                 *sent = 1;
             }
             ap_proxy_buckets_lifetime_transform(r, bb_i, bb_o);
-            if (!after) {
+            if (flush_each) {
                 apr_bucket *b;
 
                 /*
                  * Do not use ap_fflush here since this would cause the flush
                  * bucket to be sent in a separate brigade afterwards which
                  * causes some filters to set aside the buckets from the first
-                 * brigade and process them when the flush arrives in the second
+                 * brigade and process them when FLUSH arrives in the second
                  * brigade. As set asides of our transformed buckets involve
                  * memory copying we try to avoid this. If we have the flush
                  * bucket in the first brigade they directly process the
@@ -4104,6 +4118,15 @@ PROXY_DECLARE(apr_status_t) ap_proxy_transfer_between_connections(
                               "ap_proxy_transfer_between_connections: "
                               "error on %s - ap_pass_brigade",
                               name);
+                flags &= ~AP_PROXY_TRANSFER_FLUSH_AFTER;
+            }
+            else if ((flags & AP_PROXY_TRANSFER_SHOULD_YIELD) &&
+                     ap_filter_should_yield(c_o->output_filters)) {
+                ap_log_rerror(APLOG_MARK, APLOG_TRACE2, 0, r,
+                              "ap_proxy_transfer_between_connections: "
+                              "output filters full");
+                flags &= ~AP_PROXY_TRANSFER_FLUSH_AFTER;
+                rv = APR_INCOMPLETE;
             }
             apr_brigade_cleanup(bb_o);
         }
@@ -4115,7 +4138,7 @@ PROXY_DECLARE(apr_status_t) ap_proxy_transfer_between_connections(
         }
     } while (rv == APR_SUCCESS);
 
-    if (after) {
+    if (flags & AP_PROXY_TRANSFER_FLUSH_AFTER) {
         ap_fflush(c_o->output_filters, bb_o);
         apr_brigade_cleanup(bb_o);
     }
@@ -4131,191 +4154,328 @@ PROXY_DECLARE(apr_status_t) ap_proxy_transfer_between_connections(
     return rv;
 }
 
+struct proxy_tunnel_conn { /* state for one side (client/origin) of a tunnel */
+    conn_rec *c;                     /* connection of this side */
+    const char *name;                /* "client" or "origin", used in logs */
+    apr_pollfd_t *pfd;               /* this side's entry in the tunnel's pfds */
+    apr_bucket_brigade *bb;          /* created on c's pool and bucket_alloc */
+    struct proxy_tunnel_conn *other; /* the opposite side of the tunnel */
+    unsigned int readable:1, /* set at creation, cleared once reading hits EOF */
+                 drain:1;    /* set while the other side's output is full */
+};
+
 PROXY_DECLARE(apr_status_t) ap_proxy_tunnel_create(proxy_tunnel_rec **ptunnel,
-                                                   request_rec *r,
-                                                   conn_rec *origin)
+                                                   request_rec *r, conn_rec *c_o,
+                                                   const char *scheme)
 {
     apr_status_t rv;
-    apr_pollfd_t *pfds;
-    conn_rec *c = r->connection;
+    conn_rec *c_i = r->connection;
     proxy_tunnel_rec *tunnel;
 
     *ptunnel = NULL;
 
     tunnel = apr_pcalloc(r->pool, sizeof(*tunnel));
 
-    tunnel->r = r;
-    tunnel->origin = origin;
-    tunnel->bb_i = apr_brigade_create(r->pool,
-                                      c->bucket_alloc);
-    tunnel->bb_o = apr_brigade_create(origin->pool,
-                                      origin->bucket_alloc);
-    
-    tunnel->timeout = -1;
-    rv = apr_pollset_create(&tunnel->pollset, 2, r->pool,
-                            APR_POLLSET_NOCOPY);
+    rv = apr_pollset_create(&tunnel->pollset, 2, r->pool, APR_POLLSET_NOCOPY);
     if (rv != APR_SUCCESS) {
         return rv;
     }
 
+    tunnel->r = r;
+    tunnel->scheme = apr_pstrdup(r->pool, scheme);
+    tunnel->client = apr_pcalloc(r->pool, sizeof(struct proxy_tunnel_conn));
+    tunnel->origin = apr_pcalloc(r->pool, sizeof(struct proxy_tunnel_conn));
     tunnel->pfds = apr_array_make(r->pool, 2, sizeof(apr_pollfd_t));
-    apr_array_push(tunnel->pfds); /* pfds[0] */
-    apr_array_push(tunnel->pfds); /* pfds[1] */
+    tunnel->timeout = -1;
 
-    pfds = &APR_ARRAY_IDX(tunnel->pfds, 0, apr_pollfd_t);
-    pfds[0].desc.s = ap_get_conn_socket(c);
-    pfds[1].desc.s = ap_get_conn_socket(origin);
-    pfds[0].desc_type = pfds[1].desc_type = APR_POLL_SOCKET;
-    pfds[0].reqevents = pfds[1].reqevents = APR_POLLIN | APR_POLLHUP;
-    pfds[0].p = pfds[1].p = r->pool;
+    tunnel->client->c = c_i;
+    tunnel->client->name = "client";
+    tunnel->client->bb = apr_brigade_create(c_i->pool, c_i->bucket_alloc);
+    tunnel->client->pfd = &APR_ARRAY_PUSH(tunnel->pfds, apr_pollfd_t);
+    memset(tunnel->client->pfd, 0, sizeof(*tunnel->client->pfd));
+    tunnel->client->pfd->p = r->pool;
+    tunnel->client->pfd->desc_type = APR_POLL_SOCKET;
+    tunnel->client->pfd->desc.s = ap_get_conn_socket(c_i);
+    tunnel->client->pfd->client_data = tunnel->client;
+    tunnel->client->other = tunnel->origin;
+    tunnel->client->readable = 1;
+
+    tunnel->origin->c = c_o;
+    tunnel->origin->name = "origin";
+    tunnel->origin->bb = apr_brigade_create(c_o->pool, c_o->bucket_alloc);
+    tunnel->origin->pfd = &APR_ARRAY_PUSH(tunnel->pfds, apr_pollfd_t);
+    memset(tunnel->origin->pfd, 0, sizeof(*tunnel->origin->pfd));
+    tunnel->origin->pfd->p = r->pool;
+    tunnel->origin->pfd->desc_type = APR_POLL_SOCKET;
+    tunnel->origin->pfd->desc.s = ap_get_conn_socket(c_o);
+    tunnel->origin->pfd->client_data = tunnel->origin;
+    tunnel->origin->other = tunnel->client;
+    tunnel->origin->readable = 1;
+
+#if 0
+    apr_socket_opt_set(tunnel->client->pfd->desc.s, APR_SO_NONBLOCK, 1);
+    apr_socket_opt_set(tunnel->client->pfd->desc.s, APR_SO_NONBLOCK, 1);
+    apr_socket_opt_set(tunnel->origin->pfd->desc.s, APR_SO_KEEPALIVE, 1);
+    apr_socket_opt_set(tunnel->origin->pfd->desc.s, APR_SO_KEEPALIVE, 1);
+#endif
+
+    /* No coalescing filters */
+    ap_remove_output_filter_byhandle(c_i->output_filters,
+                                     "SSL/TLS Coalescing Filter");
+    ap_remove_output_filter_byhandle(c_o->output_filters,
+                                     "SSL/TLS Coalescing Filter");
 
     /* The input/output filter stacks should contain connection filters only */
-    r->output_filters = c->output_filters;
-    r->proto_output_filters = c->output_filters;
-    r->input_filters = c->input_filters;
-    r->proto_input_filters = c->input_filters;
+    r->input_filters = r->proto_input_filters = c_i->input_filters;
+    r->output_filters = r->proto_output_filters = c_i->output_filters;
 
-    c->keepalive = AP_CONN_CLOSE;
-    origin->keepalive = AP_CONN_CLOSE;
+    c_i->keepalive = AP_CONN_CLOSE;
+    c_o->keepalive = AP_CONN_CLOSE;
 
     *ptunnel = tunnel;
     return APR_SUCCESS;
 }
 
-PROXY_DECLARE(apr_status_t) ap_proxy_tunnel_run(proxy_tunnel_rec *tunnel)
+static void add_pollset(apr_pollset_t *pollset, apr_pollfd_t *pfd,
+                        apr_int16_t events)
 {
     apr_status_t rv;
-    request_rec *r = tunnel->r;
-    conn_rec *c_i = r->connection;
-    conn_rec *c_o = tunnel->origin;
-    apr_socket_t *sock_i = ap_get_conn_socket(c_i);
-    apr_socket_t *sock_o = ap_get_conn_socket(c_o);
-    apr_interval_time_t timeout = tunnel->timeout >= 0 ? tunnel->timeout : -1;
-    apr_pollfd_t *pfds = &APR_ARRAY_IDX(tunnel->pfds, 0, apr_pollfd_t);
-    apr_pollset_t *pollset = tunnel->pollset;
-    const apr_pollfd_t *signalled;
-    apr_int32_t pollcnt, pi;
-    int done = 0;
 
-    AP_DEBUG_ASSERT(tunnel->pfds->nelts == 2);
-    AP_DEBUG_ASSERT(pfds[0].desc.s == sock_i);
-    AP_DEBUG_ASSERT(pfds[1].desc.s == sock_o);
+    if (events & APR_POLLIN) { /* POLLIN is always paired with POLLHUP here */
+        events |= APR_POLLHUP;
+    }
 
-    ap_log_rerror(APLOG_MARK, APLOG_TRACE1, 0, r, APLOGNO(10212)
-                  "proxy: tunnel: running (timeout %" APR_TIME_T_FMT "."
-                                                  "%" APR_TIME_T_FMT ")",
-                  timeout > 0 ? apr_time_sec(timeout) : timeout,
-                  timeout > 0 ? timeout % APR_USEC_PER_SEC : 0);
+    if ((pfd->reqevents & events) == events) { /* all requested already */
+        return;
+    }
 
-#if 0
-    apr_socket_opt_set(sock_i, APR_SO_NONBLOCK, 1);
-    apr_socket_opt_set(sock_i, APR_SO_NONBLOCK, 1);
-    apr_socket_opt_set(sock_o, APR_SO_KEEPALIVE, 1);
-    apr_socket_opt_set(sock_o, APR_SO_KEEPALIVE, 1);
-#endif
+    if (pfd->reqevents) { /* registered: remove, then re-add with new mask */
+        rv = apr_pollset_remove(pollset, pfd);
+        if (rv != APR_SUCCESS) {
+            AP_DEBUG_ASSERT(0); /* unexpected: trap in debug builds */
+        }
+    }
+
+    pfd->reqevents |= events;
+    rv = apr_pollset_add(pollset, pfd);
+    if (rv != APR_SUCCESS) {
+        AP_DEBUG_ASSERT(0); /* unexpected: trap in debug builds */
+    }
+}
+
+static void del_pollset(apr_pollset_t *pollset, apr_pollfd_t *pfd,
+                        apr_int16_t events)
+{ /* Stop polling pfd for events; other requested events stay registered */
+    apr_status_t rv;
+
+    if (events & APR_POLLIN) { /* POLLIN is always paired with POLLHUP here */
+        events |= APR_POLLHUP;
+    }
+
+    if ((pfd->reqevents & events) == 0) { /* none of them is requested */
+        return;
+    }
 
-    apr_pollset_add(pollset, &pfds[0]);
-    apr_pollset_add(pollset, &pfds[1]);
+    rv = apr_pollset_remove(pollset, pfd);
+    if (rv != APR_SUCCESS) {
+        AP_DEBUG_ASSERT(0); /* unexpected: trap in debug builds */
+    }
 
-    do { /* Loop until done (one side closes the connection, or an error) */
-        rv = apr_pollset_poll(tunnel->pollset, timeout, &pollcnt, &signalled);
+    pfd->reqevents &= ~events;
+    if (pfd->reqevents) { /* re-add with the remaining events, if any */
+        rv = apr_pollset_add(pollset, pfd);
         if (rv != APR_SUCCESS) {
-            if (APR_STATUS_IS_EINTR(rv)) {
-                continue;
-            }
+            AP_DEBUG_ASSERT(0); /* unexpected: trap in debug builds */
+        }
+    }
+}
+
+PROXY_DECLARE(int) ap_proxy_tunnel_run(proxy_tunnel_rec *tunnel)
+{
+    int rc = OK;
+    request_rec *r = tunnel->r;
+    apr_pollset_t *pollset = tunnel->pollset;
+    apr_interval_time_t timeout = tunnel->timeout >= 0 ? tunnel->timeout : -1;
+    struct proxy_tunnel_conn *client = tunnel->client, *origin = tunnel->origin;
+    apr_size_t read_buf_size = ap_get_read_buf_size(r);
+    const char *scheme = tunnel->scheme;
+    apr_status_t rv;
+
+    ap_log_rerror(APLOG_MARK, APLOG_TRACE1, 0, r, APLOGNO(10212)
+                  "proxy: %s: tunnel running (timeout %" APR_TIME_T_FMT "."
+                                                     "%" APR_TIME_T_FMT ")",
+                  scheme, timeout > 0 ? apr_time_sec(timeout) : timeout,
+                          timeout > 0 ? timeout % APR_USEC_PER_SEC : 0);
+
+    client->pfd->reqevents = 0; /* every run restarts with POLLIN on both sides */
+    origin->pfd->reqevents = 0;
+    add_pollset(pollset, client->pfd, APR_POLLIN);
+    add_pollset(pollset, origin->pfd, APR_POLLIN);
+
+    /* Loop until both directions of the connection are closed,
+     * or a failure occurs.
+     */
+    do {
+        struct proxy_tunnel_conn *in, *out;
+        const apr_pollfd_t *results;
+        apr_int32_t nresults, i;
 
-            apr_pollset_remove(pollset, &pfds[1]);
-            apr_pollset_remove(pollset, &pfds[0]);
+        ap_log_rerror(APLOG_MARK, APLOG_TRACE8, 0, r,
+                      "proxy: %s: polling client=%hx, origin=%hx",
+                      scheme, client->pfd->reqevents, origin->pfd->reqevents);
+        do {
+            rv = apr_pollset_poll(pollset, timeout, &nresults, &results);
+        } while (APR_STATUS_IS_EINTR(rv));
 
+        if (rv != APR_SUCCESS) {
             if (APR_STATUS_IS_TIMEUP(rv)) {
                 ap_log_rerror(APLOG_MARK, APLOG_TRACE2, 0, r, APLOGNO(10213)
-                              "proxy: tunnel: woken up, i=%d", (int)pollcnt);
-
-                return HTTP_GATEWAY_TIME_OUT;
+                              "proxy: %s: polling timeout", scheme);
+                rc = HTTP_GATEWAY_TIME_OUT;
             }
-
-            ap_log_rerror(APLOG_MARK, APLOG_ERR, rv, r, APLOGNO(10214)
-                          "proxy: tunnel: polling failed");
-            return HTTP_INTERNAL_SERVER_ERROR;
+            else {
+                ap_log_rerror(APLOG_MARK, APLOG_ERR, rv, r, APLOGNO(10214)
+                              "proxy: %s: polling failed", scheme);
+                rc = HTTP_INTERNAL_SERVER_ERROR;
+            }
+            goto cleanup;
         }
 
-        ap_log_rerror(APLOG_MARK, APLOG_TRACE2, 0, r, APLOGNO(10215)
-                      "proxy: tunnel: woken up, i=%d", (int)pollcnt);
-
-        for (pi = 0; pi < pollcnt; pi++) {
-            const apr_pollfd_t *cur = &signalled[pi];
-            apr_int16_t pollevent = cur->rtnevents;
-
-            if (cur->desc.s == sock_o) {
-                if (pollevent & (APR_POLLIN | APR_POLLHUP)) {
-                    ap_log_rerror(APLOG_MARK, APLOG_TRACE2, 0, r, APLOGNO(10216)
-                                  "proxy: tunnel: backend was readable");
-                    rv = ap_proxy_transfer_between_connections(r, c_o, c_i,
-                                                               tunnel->bb_o,
-                                                               tunnel->bb_i,
-                                                               "backend",
-                                                               &tunnel->replied,
-                                                               AP_IOBUFSIZE,
-                                                               0);
-                    done |= (rv != APR_SUCCESS);
+        ap_log_rerror(APLOG_MARK, APLOG_TRACE8, 0, r, APLOGNO(10215)
+                      "proxy: %s: woken up, %i result(s)", scheme, nresults);
+
+        for (i = 0; i < nresults; i++) {
+            const apr_pollfd_t *cur = &results[i];
+            int revents = cur->rtnevents;
+
+            /* sanity check */
+            if (cur->desc.s != client->pfd->desc.s
+                    && cur->desc.s != origin->pfd->desc.s) {
+                ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, r, APLOGNO(10222)
+                              "proxy: %s: unknown socket in pollset", scheme);
+                rc = HTTP_INTERNAL_SERVER_ERROR;
+                goto cleanup;
+            }
+
+            in = cur->client_data; /* side whose socket was signalled */
+            if (revents & APR_POLLOUT) {
+                in = in->other; /* the writable socket is the output side */
+            }
+            else if (!(revents & (APR_POLLIN | APR_POLLHUP))) {
+                /* this catches POLLERR/POLLNVAL etc.. */
+                ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, r, APLOGNO(10220)
+                              "proxy: %s: polling events error (%x)",
+                              scheme, revents);
+                rc = HTTP_INTERNAL_SERVER_ERROR;
+                goto cleanup;
+            }
+            out = in->other;
+
+            ap_log_rerror(APLOG_MARK, APLOG_TRACE8, 0, r,
+                          "proxy: %s: #%i: %s/%hx => %s/%hx: %x",
+                          scheme, i, in->name, in->pfd->reqevents,
+                          out->name, out->pfd->reqevents, revents);
+
+            if (in->readable && (in->drain || !(revents & APR_POLLOUT))) {
+                int sent = 0;
+
+                ap_log_rerror(APLOG_MARK, APLOG_TRACE8, 0, r,
+                              "proxy: %s: %s is %s", scheme, in->name,
+                              (revents & APR_POLLOUT) ? "draining"
+                                                      : "readable");
+
+                rv = ap_proxy_transfer_between_connections(r,
+                                               in->c, out->c,
+                                               in->bb, out->bb,
+                                               in->name, &sent,
+                                               read_buf_size,
+                                               AP_PROXY_TRANSFER_SHOULD_YIELD);
+                if (sent && out == client) {
+                    tunnel->replied = 1;
+                }
+                if (rv != APR_SUCCESS) {
+                    if (APR_STATUS_IS_INCOMPLETE(rv)) {
+                        /* Pause POLLIN while waiting for POLLOUT on the other
+                         * side, hence avoid filling the output filters even
+                         * more and hence blocking there.
+                         */
+                        ap_log_rerror(APLOG_MARK, APLOG_TRACE8, 0, r,
+                                      "proxy: %s: %s wait writable",
+                                      scheme, out->name);
+                        revents &= ~APR_POLLOUT;
+                        in->drain = 1;
+                    }
+                    else if (APR_STATUS_IS_EOF(rv)) {
+                        /* Stop POLLIN and wait for POLLOUT (flush) on the
+                         * other side to shut it down.
+                         */
+                        ap_log_rerror(APLOG_MARK, APLOG_TRACE8, 0, r,
+                                      "proxy: %s: %s read shutdown",
+                                      scheme, in->name);
+                        in->readable = in->drain = 0;
+                    }
+                    else {
+                        /* Real failure, bail out */
+                        rc = HTTP_INTERNAL_SERVER_ERROR;
+                        goto cleanup;
+                    }
+                    del_pollset(pollset, in->pfd, APR_POLLIN);
+                    sent = 1; /* force the POLLOUT registration below */
                 }
-                else if (pollevent & APR_POLLERR) {
-                    ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, r, APLOGNO(10217)
-                            "proxy: tunnel: error on backend connection");
-                    c_o->aborted = 1;
-                    done = 1;
+                else {
+                    in->drain = 0;
                 }
-                else { 
-                    ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, r, APLOGNO(10218)
-                            "proxy: tunnel: unknown event %d on backend connection",
-                            (int)pollevent);
-                    done = 1;
+
+                if (sent) {
+                    add_pollset(pollset, out->pfd, APR_POLLOUT);
                 }
             }
-            else if (cur->desc.s == sock_i) {
-                if (pollevent & (APR_POLLIN | APR_POLLHUP)) {
-                    ap_log_rerror(APLOG_MARK, APLOG_TRACE2, 0, r, APLOGNO(10219)
-                                  "proxy: tunnel: client was readable");
-                    rv = ap_proxy_transfer_between_connections(r, c_i, c_o,
-                                                               tunnel->bb_i,
-                                                               tunnel->bb_o,
-                                                               "client", NULL,
-                                                               AP_IOBUFSIZE,
-                                                               0);
-                    done |= (rv != APR_SUCCESS);
-                }
-                else if (pollevent & APR_POLLERR) {
-                    ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, r, APLOGNO(10220)
-                                  "proxy: tunnel: error on client connection");
-                    c_i->aborted = 1;
-                    done = 1;
+
+            if (revents & APR_POLLOUT) {
+                ap_log_rerror(APLOG_MARK, APLOG_TRACE8, 0, r,
+                              "proxy: %s: %s is writable",
+                              scheme, out->name);
+
+                rv = ap_filter_output_pending(out->c);
+                if (rv == DECLINED) {
+                    /* No more pending data. If the 'in' side is not readable
+                     * anymore it's time to shutdown for write (this direction
+                     * is over). Otherwise draining (if any) is done, back to
+                     * normal business.
+                     */
+                    if (!in->readable) {
+                        ap_log_rerror(APLOG_MARK, APLOG_TRACE8, 0, r,
+                                      "proxy: %s: %s write shutdown",
+                                      scheme, out->name);
+                        del_pollset(pollset, out->pfd, APR_POLLOUT);
+                        apr_socket_shutdown(out->pfd->desc.s, APR_SHUTDOWN_WRITE);
+                    }
+                    else {
+                        add_pollset(pollset, in->pfd, APR_POLLIN);
+                        if (!in->drain) {
+                            del_pollset(pollset, out->pfd, APR_POLLOUT);
+                        }
+                    }
                 }
-                else { 
+                else if (rv != OK) {
+                    /* Real failure, bail out */
                     ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, r, APLOGNO(10221)
-                            "proxy: tunnel: unknown event %d on client connection",
-                            (int)pollevent);
-                    done = 1;
+                                  "proxy: %s: %s flushing failed (%i)",
+                                  scheme, out->name, rv);
+                    rc = HTTP_INTERNAL_SERVER_ERROR;
+                    goto cleanup;
                 }
             }
-            else {
-                ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, r, APLOGNO(10222)
-                              "proxy: tunnel: unknown socket in pollset");
-                done = 1;
-            }
         }
-    } while (!done);
+    } while (client->pfd->reqevents || origin->pfd->reqevents);
 
     ap_log_rerror(APLOG_MARK, APLOG_TRACE1, 0, r, APLOGNO(10223)
-                  "proxy: tunnel: finished");
-
-    apr_pollset_remove(pollset, &pfds[1]);
-    apr_pollset_remove(pollset, &pfds[0]);
+                  "proxy: %s: tunnel finished", scheme);
 
-    if (!tunnel->replied) {
-        return HTTP_BAD_GATEWAY;
-    }
-
-    return OK;
+cleanup:
+    del_pollset(pollset, client->pfd, ~0);
+    del_pollset(pollset, origin->pfd, ~0);
+    return rc;
 }
 
 PROXY_DECLARE (const char *) ap_proxy_show_hcmethod(hcmethod_t method)