]> git.ipfire.org Git - thirdparty/haproxy.git/commitdiff
MAJOR: connections: Detach connections from streams.
authorOlivier Houchard <ohouchard@haproxy.com>
Tue, 13 Nov 2018 15:48:36 +0000 (16:48 +0100)
committerWilly Tarreau <w@1wt.eu>
Sun, 18 Nov 2018 20:45:45 +0000 (21:45 +0100)
Do not destroy the connection when we're about to destroy a stream. This
prevents us from doing keepalive on server connections when the client is
using HTTP/2, as a new stream is created for each request.
Instead, the session is now responsible for destroying connections.
When reusing connections, the attach() mux method is now used to create a new
conn_stream.

include/proto/connection.h
include/proto/stream_interface.h
src/backend.c
src/mux_h2.c
src/mux_pt.c
src/proto_http.c
src/session.c
src/stream_interface.c

index ade69940114fbeea07af8a1059e39688e254dcf9..611e6ad2dce1febe5c1caddcd2098b364989cc12 100644 (file)
@@ -673,10 +673,12 @@ static inline void conn_free(struct connection *conn)
                LIST_INIT(&sess->conn_list);
        }
        conn_force_unsubscribe(conn);
+       LIST_DEL(&conn->list);
+       LIST_INIT(&conn->list);
        pool_free(pool_head_connection, conn);
 }
 
-/* Release a conn_stream, and kill the connection if it was the last one */
+/* Release a conn_stream */
 static inline void cs_destroy(struct conn_stream *cs)
 {
        if (cs->conn->mux)
index 90e395655c077ebdb67f3ce2aebe56c649d4229a..3d2a6db6ef7683cf2bd6a6a874416f8849c48fc7 100644 (file)
@@ -45,7 +45,6 @@ extern struct si_ops si_embedded_ops;
 extern struct si_ops si_conn_ops;
 extern struct si_ops si_applet_ops;
 extern struct data_cb si_conn_cb;
-extern struct data_cb si_idle_conn_cb;
 
 struct appctx *stream_int_register_handler(struct stream_interface *si, struct applet *app);
 void si_applet_wake_cb(struct stream_interface *si);
@@ -182,24 +181,6 @@ static inline void si_release_endpoint(struct stream_interface *si)
        si_detach_endpoint(si);
 }
 
-/* Turn an existing connection endpoint of stream interface <si> to idle mode,
- * which means that the connection will be polled for incoming events and might
- * be killed by the underlying I/O handler. If <pool> is not null, the
- * connection will also be added at the head of this list. This connection
- * remains assigned to the stream interface it is currently attached to.
- */
-static inline void si_idle_cs(struct stream_interface *si, struct list *pool)
-{
-       struct conn_stream *cs = __objt_cs(si->end);
-       struct connection *conn = cs->conn;
-
-       conn_force_unsubscribe(conn);
-       if (pool)
-               LIST_ADD(pool, &conn->list);
-
-       cs_attach(cs, si, &si_idle_conn_cb);
-}
-
 /* Attach conn_stream <cs> to the stream interface <si>. The stream interface
  * is configured to work with a connection and the connection it configured
  * with a stream interface data layer.
index d40202885d405fe5cad4af82afdb5f69a79cf026..0b45d8591897c46196810aba5f74d410f02b582e 100644 (file)
@@ -583,7 +583,7 @@ int assign_server(struct stream *s)
 
        srv = NULL;
        s->target = NULL;
-       conn = cs_conn(objt_cs(s->si[1].end));
+       conn = s->sess->srv_conn;
 
        if (conn &&
            (conn->flags & CO_FL_CONNECTED) &&
@@ -1056,28 +1056,22 @@ int connect_server(struct stream *s)
 {
        struct connection *cli_conn = NULL;
        struct connection *srv_conn;
+       struct connection *old_conn;
        struct conn_stream *srv_cs;
-       struct conn_stream *old_cs;
        struct server *srv;
        int reuse = 0;
        int err;
 
+
        srv = objt_server(s->target);
-       srv_cs = objt_cs(s->si[1].end);
-       srv_conn = cs_conn(srv_cs);
+       old_conn = srv_conn = s->sess->srv_conn;
        if (srv_conn)
-               reuse = s->target == srv_conn->target;
+               reuse = (s->target == srv_conn->target) &&
+                   (srv_conn->mux->avail_streams(srv_conn) > 0) &&
+                   conn_xprt_ready(srv_conn);
 
        if (srv && !reuse) {
-               old_cs = srv_cs;
-               if (old_cs) {
-                       srv_conn = NULL;
-                       srv_cs->data = NULL;
-                       si_detach_endpoint(&s->si[1]);
-                       /* note: if the connection was in a server's idle
-                        * queue, it doesn't get dequeued.
-                        */
-               }
+               srv_conn = NULL;
 
                /* Below we pick connections from the safe or idle lists based
                 * on the strategy, the fact that this is a first or second
@@ -1114,29 +1108,8 @@ int connect_server(struct stream *s)
                 * other owner's. That way it may remain alive for others to
                 * pick.
                 */
-               if (srv_conn) {
-                       LIST_DEL(&srv_conn->list);
-                       LIST_INIT(&srv_conn->list);
-
-                       /* XXX cognet: this assumes only 1 conn_stream per
-                        * connection, has to be revisited later
-                        */
-                       srv_cs = srv_conn->mux_ctx;
-
-                       if (srv_cs->data) {
-                               si_detach_endpoint(srv_cs->data);
-                               if (old_cs && !(old_cs->conn->flags & CO_FL_PRIVATE)) {
-                                       si_attach_cs(srv_cs->data, old_cs);
-                                       si_idle_cs(srv_cs->data, NULL);
-                               }
-                       }
-                       si_attach_cs(&s->si[1], srv_cs);
+               if (srv_conn)
                        reuse = 1;
-               }
-
-               /* we may have to release our connection if we couldn't swap it */
-               if (old_cs && !old_cs->data)
-                       cs_destroy(old_cs);
        }
 
        if (reuse) {
@@ -1155,13 +1128,74 @@ int connect_server(struct stream *s)
                }
        }
 
+       /* We're about to use another connection, let the mux know we're
+        * done with this one
+        */
+       if (old_conn != srv_conn) {
+               int did_switch = 0;
+
+               if (srv_conn && reuse) {
+                       struct session *sess;
+                       int count = 0;
+
+                       /*
+                        * If we're attempting to reuse a connection, and
+                        * the new connection has only one user, and there
+                        * are no more streams available, attempt to give
+                        * it our old connection
+                        */
+                       list_for_each_entry(sess, &srv_conn->session_list,
+                           conn_list) {
+                               count++;
+                               if (count > 1)
+                                       break;
+                       }
+                       if (count == 1) {
+                               sess = LIST_ELEM(srv_conn->session_list.n,
+                                   struct session *, conn_list);
+                               LIST_DEL(&sess->conn_list);
+                               if (old_conn &&
+                                   !(old_conn->flags & CO_FL_PRIVATE) &&
+                                   (old_conn->mux->avail_streams(old_conn) > 0) &&
+                                   (srv_conn->mux->avail_streams(srv_conn) == 1)) {
+                                       LIST_ADDQ(&old_conn->session_list, &sess->conn_list);
+                                       sess->srv_conn = old_conn;
+                               } else {
+                                       LIST_INIT(&sess->conn_list);
+                                       sess->srv_conn = NULL;
+                               }
+                               did_switch = 1;
+                       }
+
+               }
+               /*
+                * We didn't manage to give our old connection, destroy it
+                */
+               if (old_conn && !did_switch) {
+                       old_conn->owner = NULL;
+                       old_conn->mux->destroy(old_conn);
+                       old_conn = NULL;
+               }
+       }
+
        if (!reuse) {
                srv_cs = si_alloc_cs(&s->si[1], NULL);
                srv_conn = cs_conn(srv_cs);
        } else {
-               /* reusing our connection, take it out of the idle list */
-               LIST_DEL(&srv_conn->list);
-               LIST_INIT(&srv_conn->list);
+               if (srv_conn->mux->avail_streams(srv_conn) == 1) {
+                       /* No more streams available, remove it from the list */
+                       LIST_DEL(&srv_conn->list);
+                       LIST_INIT(&srv_conn->list);
+               }
+               srv_cs = srv_conn->mux->attach(srv_conn);
+               if (srv_cs)
+                       si_attach_cs(&s->si[1], srv_cs);
+       }
+       if (srv_conn && old_conn != srv_conn) {
+               srv_conn->owner = s->sess;
+               s->sess->srv_conn = srv_conn;
+               LIST_DEL(&s->sess->conn_list);
+               LIST_ADDQ(&srv_conn->session_list, &s->sess->conn_list);
        }
 
        if (!srv_cs)
@@ -1203,15 +1237,10 @@ int connect_server(struct stream *s)
                                conn_get_to_addr(cli_conn);
                }
 
-               si_attach_cs(&s->si[1], srv_cs);
-
                assign_tproxy_address(s);
        }
-       else {
-               /* the connection is being reused, just re-attach it */
-               si_attach_cs(&s->si[1], srv_cs);
+       else
                s->flags |= SF_SRV_REUSED;
-       }
 
        /* flag for logging source ip/port */
        if (strm_fe(s)->options2 & PR_O2_SRC_ADDR)
index 87b5cb1dcf3c5f2f169eb23917f6cec7d984eca2..9e4f80178f19e38edc9631509616883cfa6ada11 100644 (file)
@@ -62,7 +62,6 @@ static struct pool_head *pool_head_h2s;
 #define H2_CF_WAIT_FOR_HS       0x00004000  // We did check that at least a stream was waiting for handshake
 #define H2_CF_IS_BACK           0x00008000  // this is an outgoing connection
 
-
 /* H2 connection state, in h2c->st0 */
 enum h2_cs {
        H2_CS_PREFACE,   // init done, waiting for connection preface
@@ -2264,7 +2263,7 @@ static int h2_recv(struct h2c *h2c)
 
        if (!b_data(buf)) {
                h2_release_buf(h2c, &h2c->dbuf);
-               return 0;
+               return conn_xprt_read0_pending(conn);
        }
 
        if (b_data(buf) == buf->size)
@@ -2282,7 +2281,7 @@ static int h2_send(struct h2c *h2c)
        int sent = 0;
 
        if (conn->flags & CO_FL_ERROR)
-               return 0;
+               return 1;
 
 
        if (conn->flags & (CO_FL_HANDSHAKE|CO_FL_WAIT_L4_CONN|CO_FL_WAIT_L6_CONN)) {
index a0f039775824c36e7507bec85c6a865c9cf1c199..a974ec30202f51d20897146fe3eb736dd506e49c 100644 (file)
@@ -30,6 +30,9 @@ static void mux_pt_destroy(struct mux_pt_ctx *ctx)
        LIST_DEL(&conn->list);
        conn_stop_tracking(conn);
        conn_full_close(conn);
+       tasklet_free(ctx->wait_event.task);
+       conn->mux = NULL;
+       conn->mux_ctx = NULL;
        if (conn->destroy_cb)
                conn->destroy_cb(conn);
        /* We don't bother unsubscribing here, as we're about to destroy
@@ -45,7 +48,7 @@ static struct task *mux_pt_io_cb(struct task *t, void *tctx, unsigned short stat
        struct mux_pt_ctx *ctx = tctx;
 
        conn_sock_drain(ctx->conn);
-       if (ctx->conn->flags & (CO_FL_ERROR | CO_FL_SOCK_RD_SH))
+       if (ctx->conn->flags & (CO_FL_ERROR | CO_FL_SOCK_RD_SH | CO_FL_SOCK_WR_SH))
                mux_pt_destroy(ctx);
        else
                ctx->conn->xprt->subscribe(ctx->conn, SUB_CAN_RECV,
@@ -135,6 +138,16 @@ static int mux_pt_wake(struct connection *conn)
  */
 static struct conn_stream *mux_pt_attach(struct connection *conn)
 {
+       struct conn_stream *cs;
+       struct mux_pt_ctx *ctx = conn->mux_ctx;
+
+       cs = cs_new(conn);
+       if (!cs)
+               goto fail;
+
+       ctx->cs = cs;
+       return (cs);
+fail:
        return NULL;
 }
 
@@ -149,10 +162,13 @@ static const struct conn_stream *mux_pt_get_first_cs(const struct connection *co
        return cs;
 }
 
-/* Destroy the mux and the associated connection */
+/* Destroy the mux and the associated connection, if no longer used */
 static void mux_pt_destroy_meth(struct connection *conn)
 {
-       mux_pt_destroy(conn->mux_ctx);
+       struct mux_pt_ctx *ctx = conn->mux_ctx;
+
+       if (!(ctx->cs))
+               mux_pt_destroy(ctx);
 }
 
 /*
@@ -164,9 +180,13 @@ static void mux_pt_detach(struct conn_stream *cs)
        struct mux_pt_ctx *ctx = cs->conn->mux_ctx;
 
        /* Subscribe, to know if we got disconnected */
-       conn->xprt->subscribe(conn, SUB_CAN_RECV, &ctx->wait_event);
-       ctx->cs = NULL;
-       mux_pt_destroy(ctx);
+       if (conn->owner != NULL &&
+           !(conn->flags & (CO_FL_ERROR | CO_FL_SOCK_RD_SH | CO_FL_SOCK_WR_SH))) {
+               ctx->cs = NULL;
+               conn->xprt->subscribe(conn, SUB_CAN_RECV, &ctx->wait_event);
+       } else
+               /* There's no session attached to that connection, destroy it */
+               mux_pt_destroy(ctx);
 }
 
 static int mux_pt_avail_streams(struct connection *conn)
@@ -184,6 +204,11 @@ static void mux_pt_shutr(struct conn_stream *cs, enum cs_shr_mode mode)
                cs->conn->xprt->shutr(cs->conn, (mode == CS_SHR_DRAIN));
        if (cs->flags & CS_FL_SHW)
                conn_full_close(cs->conn);
+       /* Maybe we've been put in the list of available idle connections,
+        * get ouf of here
+        */
+       LIST_DEL(&cs->conn->list);
+       LIST_INIT(&cs->conn->list);
 }
 
 static void mux_pt_shutw(struct conn_stream *cs, enum cs_shw_mode mode)
@@ -196,6 +221,11 @@ static void mux_pt_shutw(struct conn_stream *cs, enum cs_shw_mode mode)
                conn_sock_shutw(cs->conn, (mode == CS_SHW_NORMAL));
        else
                conn_full_close(cs->conn);
+       /* Maybe we've been put in the list of available idle connections,
+        * get ouf of here
+        */
+       LIST_DEL(&cs->conn->list);
+       LIST_INIT(&cs->conn->list);
 }
 
 /*
index 0ce03b3c15504d1c665eabb9be7aa300404ca09d..c93afa9edef262104959d423bb4ad48ca3fde19d 100644 (file)
@@ -3716,20 +3716,18 @@ void http_end_txn_clean_session(struct stream *s)
         * flags. We also need a more accurate method for computing per-request
         * data.
         */
-       /*
-        * XXX cognet: This is probably wrong, this is killing a whole
-        * connection, in the new world order, we probably want to just kill
-        * the stream, this is to be revisited the day we handle multiple
-        * streams in one server connection.
-        */
        cs = objt_cs(s->si[1].end);
        srv_conn = cs_conn(cs);
 
        /* unless we're doing keep-alive, we want to quickly close the connection
         * to the server.
+        * XXX cognet: If the connection doesn't have a owner then it may not
+        * be referenced  anywhere, just kill it now, even if it could be reused.
+        * To be revisited later when revisited later when we handle connection
+        * pools properly.
         */
        if (((s->txn->flags & TX_CON_WANT_MSK) != TX_CON_WANT_KAL) ||
-           !si_conn_ready(&s->si[1])) {
+           !si_conn_ready(&s->si[1]) || !srv_conn->owner) {
                s->si[1].flags |= SI_FL_NOLINGER | SI_FL_NOHALF;
                si_shutr(&s->si[1]);
                si_shutw(&s->si[1]);
@@ -3805,14 +3803,15 @@ void http_end_txn_clean_session(struct stream *s)
 
        s->target = NULL;
 
-       /* only release our endpoint if we don't intend to reuse the
-        * connection.
+
+       /* If we're doing keepalive, first call the mux detach() method
+        * to let it know we want to detach without freing the connection.
+        * We then can call si_release_endpoint() to destroy the conn_stream
         */
        if (((s->txn->flags & TX_CON_WANT_MSK) != TX_CON_WANT_KAL) ||
-           !si_conn_ready(&s->si[1])) {
-               si_release_endpoint(&s->si[1]);
+           !si_conn_ready(&s->si[1]) || !srv_conn->owner)
                srv_conn = NULL;
-       }
+       si_release_endpoint(&s->si[1]);
 
        s->si[1].state     = s->si[1].prev_state = SI_ST_INI;
        s->si[1].err_type  = SI_ET_NONE;
@@ -3867,18 +3866,18 @@ void http_end_txn_clean_session(struct stream *s)
        /* we're in keep-alive with an idle connection, monitor it if not already done */
        if (srv_conn && LIST_ISEMPTY(&srv_conn->list)) {
                srv = objt_server(srv_conn->target);
-               if (!srv)
-                       si_idle_cs(&s->si[1], NULL);
-               else if (srv_conn->flags & CO_FL_PRIVATE)
-                       si_idle_cs(&s->si[1], (srv->priv_conns ? &srv->priv_conns[tid] : NULL));
-               else if (prev_flags & TX_NOT_FIRST)
-                       /* note: we check the request, not the connection, but
-                        * this is valid for strategies SAFE and AGGR, and in
-                        * case of ALWS, we don't care anyway.
-                        */
-                       si_idle_cs(&s->si[1], (srv->safe_conns ? &srv->safe_conns[tid] : NULL));
-               else
-                       si_idle_cs(&s->si[1], (srv->idle_conns ? &srv->idle_conns[tid] : NULL));
+               if (srv) {
+                       if (srv_conn->flags & CO_FL_PRIVATE)
+                               LIST_ADD(&srv->priv_conns[tid], &srv_conn->list);
+                       else if (prev_flags & TX_NOT_FIRST)
+                               /* note: we check the request, not the connection, but
+                                * this is valid for strategies SAFE and AGGR, and in
+                                * case of ALWS, we don't care anyway.
+                                */
+                               LIST_ADD(&srv->safe_conns[tid], &srv_conn->list);
+                       else
+                               LIST_ADD(&srv->idle_conns[tid], &srv_conn->list);
+               }
        }
        s->req.analysers = strm_li(s) ? strm_li(s)->analysers : 0;
        s->res.analysers = 0;
index d8c8d36cf086f54decf58b35b7e601f577d2b6ad..7d21a6a6c035d8abb9cf33958713b55ea3b5c726 100644 (file)
@@ -67,11 +67,19 @@ struct session *session_new(struct proxy *fe, struct listener *li, enum obj_type
 
 void session_free(struct session *sess)
 {
+       struct connection *conn;
+
        HA_ATOMIC_SUB(&sess->fe->feconn, 1);
        if (sess->listener)
                listener_release(sess->listener);
        session_store_counters(sess);
        vars_prune_per_sess(&sess->vars);
+       conn = objt_conn(sess->origin);
+       if (conn != NULL && conn->mux)
+               conn->mux->destroy(conn);
+       conn = sess->srv_conn;
+       if (conn != NULL && conn->mux)
+               conn->mux->destroy(conn);
        pool_free(pool_head_session, sess);
        HA_ATOMIC_SUB(&jobs, 1);
 }
@@ -377,6 +385,7 @@ static void session_kill_embryonic(struct session *sess, unsigned short state)
        conn_stop_tracking(conn);
        conn_full_close(conn);
        conn_free(conn);
+       sess->origin = NULL;
 
        task_delete(task);
        task_free(task);
index f88b4323b30a98cc20fb3e92e3925bc0585405ad..22c329fc15ea98bd26ca670811b225bab0582236 100644 (file)
@@ -53,7 +53,6 @@ static void stream_int_chk_rcv_applet(struct stream_interface *si);
 static void stream_int_chk_snd_applet(struct stream_interface *si);
 int si_cs_recv(struct conn_stream *cs);
 static int si_cs_process(struct conn_stream *cs);
-static int si_idle_conn_wake_cb(struct conn_stream *cs);
 int si_cs_send(struct conn_stream *cs);
 
 /* stream-interface operations for embedded tasks */
@@ -85,11 +84,6 @@ struct data_cb si_conn_cb = {
        .name    = "STRM",
 };
 
-struct data_cb si_idle_conn_cb = {
-       .wake    = si_idle_conn_wake_cb,
-       .name    = "IDLE",
-};
-
 /*
  * This function only has to be called once after a wakeup event in case of
  * suspected timeout. It controls the stream interface timeouts and sets
@@ -411,29 +405,6 @@ int conn_si_send_proxy(struct connection *conn, unsigned int flag)
 }
 
 
-/* Callback to be used by connection I/O handlers when some activity is detected
- * on an idle server connection. Its main purpose is to kill the connection once
- * a close was detected on it. It returns 0 if it did nothing serious, or -1 if
- * it killed the connection.
- */
-static int si_idle_conn_wake_cb(struct conn_stream *cs)
-{
-       struct connection *conn = cs->conn;
-       struct stream_interface *si = cs->data;
-
-       if (!conn_ctrl_ready(conn))
-               return 0;
-
-       conn_sock_drain(conn);
-
-       if (conn->flags & (CO_FL_ERROR | CO_FL_SOCK_RD_SH) || cs->flags & CS_FL_ERROR) {
-               /* warning, we can't do anything on <conn> after this call ! */
-               si_release_endpoint(si);
-               return -1;
-       }
-       return 0;
-}
-
 /* This function is the equivalent to stream_int_update() except that it's
  * designed to be called from outside the stream handlers, typically the lower
  * layers (applets, connections) after I/O completion. After updating the stream