]> git.ipfire.org Git - thirdparty/haproxy.git/commitdiff
MAJOR: sessions: Store multiple outgoing connections in the session.
authorOlivier Houchard <ohouchard@haproxy.com>
Fri, 30 Nov 2018 16:24:55 +0000 (17:24 +0100)
committerWilly Tarreau <w@1wt.eu>
Sat, 1 Dec 2018 09:47:18 +0000 (10:47 +0100)
Instead of just storing the last connection in the session, store all of
the connections, for at most MAX_SRV_LIST (currently 5) targets.
That way we can do keepalive on more than 1 outgoing connection when the
client uses HTTP/2.

include/proto/connection.h
include/proto/session.h
include/types/connection.h
include/types/session.h
src/backend.c
src/session.c

index a1b979e295bcf1198f77a38e7d56c96e689490fd..3f48b154f5b7b50eb949938fc0441dde5ebbd409 100644 (file)
@@ -662,13 +662,8 @@ static inline void conn_force_unsubscribe(struct connection *conn)
 /* Releases a connection previously allocated by conn_new() */
 static inline void conn_free(struct connection *conn)
 {
-       struct session *sess, *sess_back;
-
-       list_for_each_entry_safe(sess, sess_back, &conn->session_list, conn_list) {
-               sess->srv_conn = NULL;
-               LIST_DEL(&sess->conn_list);
-               LIST_INIT(&sess->conn_list);
-       }
+       /* Remove ourself from the session's connections list, if any. */
+       LIST_DEL(&conn->session_list);
        /* If we temporarily stored the connection as the stream_interface's
         * end point, remove it.
         */
index 68f5c0d0d92645c3d042d3d77b0ad0c82fc81f00..bc5498a44cbf81f68045dd8a3f59f0fd37a6fefc 100644 (file)
@@ -71,6 +71,46 @@ static inline void session_store_counters(struct session *sess)
        }
 }
 
+static inline void session_add_conn(struct session *sess, struct connection *conn, void *target)
+{
+       int avail = -1;
+       int i;
+
+       for (i = 0; i < MAX_SRV_LIST; i++) {
+               if (sess->srv_list[i].target == target) {
+                       avail = i;
+                       break;
+               }
+               if (LIST_ISEMPTY(&sess->srv_list[i].list) && avail == -1)
+                       avail = i;
+       }
+       if (avail == -1) {
+               struct connection *conn, *conn_back;
+               int count = 0;
+               /* We have no slot free, let's free the one with the fewer connections */
+               for (i = 0; i < MAX_SRV_LIST; i++) {
+                       int count_list = 0;
+                       list_for_each_entry(conn, &sess->srv_list[i].list, session_list)
+                           count_list++;
+                       if (count == 0 || count_list < count) {
+                               count = count_list;
+                               avail = i;
+                       }
+               }
+               /* Now unown all the connections */
+               list_for_each_entry_safe(conn, conn_back, &sess->srv_list[avail].list, session_list) {
+                       conn->owner = NULL;
+                       LIST_DEL(&conn->session_list);
+                       LIST_INIT(&conn->session_list);
+                       if (conn->mux)
+                               conn->mux->destroy(conn);
+               }
+
+       }
+       sess->srv_list[avail].target = target;
+       LIST_ADDQ(&sess->srv_list[avail].list, &conn->session_list);
+}
+
 
 #endif /* _PROTO_SESSION_H */
 
index dbf985be9c63314011ac7885ed90ff13541a4b82..85afca0306ce080bb6c1524408ed8616d25ca63e 100644 (file)
@@ -414,7 +414,7 @@ struct connection {
        struct wait_event *send_wait; /* Task to wake when we're ready to send */
        struct wait_event *recv_wait; /* Task to wake when we're ready to recv */
        struct list list;             /* attach point to various connection lists (idle, ...) */
-       struct list session_list;     /* List of all sessions attached to this connection */
+       struct list session_list;     /* List of attached connections to a session */
        int xprt_st;                  /* transport layer state, initialized to zero */
        int tmp_early_data;           /* 1st byte of early data, if any */
        int sent_early_data;          /* Amount of early data we sent so far */
index e0e1455640baa978c8646859a050c704fe91b704..334a071578ee9e31ce5b77b1665649ad94a846eb 100644 (file)
 #include <types/task.h>
 #include <types/vars.h>
 
+struct sess_srv_list {
+       void *target;
+       struct list list;
+};
+
+#define MAX_SRV_LIST   5
+
 struct session {
        struct proxy *fe;               /* the proxy this session depends on for the client side */
        struct listener *listener;      /* the listener by which the request arrived */
@@ -47,8 +54,7 @@ struct session {
        struct vars vars;               /* list of variables for the session scope. */
        struct task *task;              /* handshake timeout processing */
        long t_handshake;               /* handshake duration, -1 = not completed */
-       struct connection *srv_conn;    /* Server connection we last used */
-       struct list conn_list;          /* List element for the session list in each connection */
+       struct sess_srv_list srv_list[MAX_SRV_LIST]; /* List of servers and the connections the session is currently responsible for */
 };
 
 #endif /* _TYPES_SESSION_H */
index fa01a7d240f73be8382577ccca981e968f4fabac..cde3c2f40c8c30fdc4a11f7890a27f483751671d 100644 (file)
@@ -52,6 +52,7 @@
 #include <proto/queue.h>
 #include <proto/sample.h>
 #include <proto/server.h>
+#include <proto/session.h>
 #include <proto/stream.h>
 #include <proto/stream_interface.h>
 #include <proto/task.h>
@@ -559,8 +560,9 @@ int assign_server(struct stream *s)
 {
        struct connection *conn;
        struct server *conn_slot;
-       struct server *srv, *prev_srv;
+       struct server *srv = NULL, *prev_srv;
        int err;
+       int i;
 
        DPRINTF(stderr,"assign_server : s=%p\n",s);
 
@@ -584,27 +586,27 @@ int assign_server(struct stream *s)
 
        srv = NULL;
        s->target = NULL;
-       conn = s->sess->srv_conn;
-
-       if (conn &&
-           (conn->flags & CO_FL_CONNECTED) &&
-           objt_server(conn->target) && __objt_server(conn->target)->proxy == s->be &&
-           (s->be->lbprm.algo & BE_LB_KIND) != BE_LB_KIND_HI &&
-           ((s->txn && s->txn->flags & TX_PREFER_LAST) ||
-            ((s->be->options & PR_O_PREF_LAST) &&
-             (!s->be->max_ka_queue ||
-              server_has_room(__objt_server(conn->target)) ||
-              (__objt_server(conn->target)->nbpend + 1) < s->be->max_ka_queue))) &&
-           srv_currently_usable(__objt_server(conn->target))) {
-               /* This stream was relying on a server in a previous request
-                * and the proxy has "option prefer-last-server" set
-                * and balance algorithm dont tell us to do otherwise, so
-                * let's try to reuse the same server.
-                */
-               srv = __objt_server(conn->target);
-               s->target = &srv->obj_type;
+
+       for (i = 0; i < MAX_SRV_LIST; i++) {
+               list_for_each_entry(conn, &s->sess->srv_list[i].list, session_list) {
+                       if (conn->flags & CO_FL_CONNECTED &&
+                           objt_server(conn->target) &&
+                           __objt_server(conn->target)->proxy == s->be &&
+                           (s->be->lbprm.algo & BE_LB_KIND) != BE_LB_KIND_HI &&
+                           ((s->txn && s->txn->flags & TX_PREFER_LAST) ||
+                            ((s->be->options & PR_O_PREF_LAST) &&
+                             (!s->be->max_ka_queue ||
+                              server_has_room(__objt_server(conn->target)) ||
+                              (__objt_server(conn->target)->nbpend + 1) < s->be->max_ka_queue))) &&
+                           srv_currently_usable(__objt_server(conn->target))) {
+                               srv = __objt_server(conn->target);
+                               s->target = &srv->obj_type;
+                               goto out_ok;
+
+                       }
+               }
        }
-       else if (s->be->lbprm.algo & BE_LB_KIND) {
+       if (s->be->lbprm.algo & BE_LB_KIND) {
 
                /* we must check if we have at least one server available */
                if (!s->be->lbprm.tot_weight) {
@@ -748,6 +750,7 @@ int assign_server(struct stream *s)
                goto out;
        }
 
+out_ok:
        s->flags |= SF_ASSIGNED;
        err = SRV_STATUS_OK;
  out:
@@ -1100,20 +1103,39 @@ fail:
 int connect_server(struct stream *s)
 {
        struct connection *cli_conn = NULL;
-       struct connection *srv_conn;
-       struct connection *old_conn;
+       struct connection *srv_conn = NULL;
+       struct connection *old_conn = NULL;
        struct conn_stream *srv_cs;
        struct server *srv;
        int reuse = 0;
        int err;
+       int i;
 
 
+       for (i = 0; i < MAX_SRV_LIST; i++) {
+               if (s->sess->srv_list[i].target == s->target) {
+                       list_for_each_entry(srv_conn, &s->sess->srv_list[i].list,
+                           session_list) {
+                               if (conn_xprt_ready(srv_conn) &&
+                                   srv_conn->mux && (srv_conn->mux->avail_streams(srv_conn) > 0)) {
+                                       reuse = 1;
+                                       break;
+                               }
+                       }
+               }
+       }
+       if (!srv_conn) {
+               for (i = 0; i < MAX_SRV_LIST; i++) {
+                       if (!LIST_ISEMPTY(&s->sess->srv_list[i].list)) {
+                               srv_conn = LIST_ELEM(&s->sess->srv_list[i].list,
+                                   struct connection *, session_list);
+                               break;
+                       }
+               }
+       }
+       old_conn = srv_conn;
+
        srv = objt_server(s->target);
-       old_conn = srv_conn = s->sess->srv_conn;
-       if (srv_conn)
-               reuse = (s->target == srv_conn->target) &&
-                   conn_xprt_ready(srv_conn) && srv_conn->mux &&
-                   (srv_conn->mux->avail_streams(srv_conn) > 0);
 
        if (srv && !reuse) {
                srv_conn = NULL;
@@ -1176,55 +1198,25 @@ int connect_server(struct stream *s)
        /* We're about to use another connection, let the mux know we're
         * done with this one
         */
-       if (old_conn != srv_conn) {
-               int did_switch = 0;
+       if (old_conn != srv_conn || !reuse) {
 
                if (srv_conn && reuse) {
-                       struct session *sess;
-                       int count = 0;
-
-                       /*
-                        * If we're attempting to reuse a connection, and
-                        * the new connection has only one user, and there
-                        * are no more streams available, attempt to give
-                        * it our old connection
-                        */
-                       list_for_each_entry(sess, &srv_conn->session_list,
-                           conn_list) {
-                               count++;
-                               if (count > 1)
-                                       break;
-                       }
-                       if (count == 1) {
-                               sess = LIST_ELEM(srv_conn->session_list.n,
-                                   struct session *, conn_list);
-                               LIST_DEL(&sess->conn_list);
+                       struct session *sess = srv_conn->owner;
+
+                       if (sess) {
                                if (old_conn &&
                                    !(old_conn->flags & CO_FL_PRIVATE) &&
                                    old_conn->mux != NULL &&
                                    (old_conn->mux->avail_streams(old_conn) > 0) &&
                                    (srv_conn->mux->avail_streams(srv_conn) == 1)) {
-                                       LIST_ADDQ(&old_conn->session_list, &sess->conn_list);
-                                       sess->srv_conn = old_conn;
-                                       did_switch = 1;
-                               } else {
-                                       LIST_INIT(&sess->conn_list);
-                                       sess->srv_conn = NULL;
+                                       LIST_DEL(&old_conn->session_list);
+                                       LIST_INIT(&old_conn->session_list);
+                                       session_add_conn(sess, old_conn, s->target);
+                                       old_conn->owner = sess;
                                }
                        }
 
                }
-               /*
-                * We didn't manage to give our old connection, destroy it
-                */
-               if (old_conn && !did_switch) {
-                       old_conn->owner = NULL;
-                       LIST_DEL(&old_conn->list);
-                       LIST_INIT(&old_conn->list);
-                       if (old_conn->mux)
-                               old_conn->mux->destroy(old_conn);
-                       old_conn = NULL;
-               }
        }
 
        if (!reuse) {
@@ -1242,9 +1234,8 @@ int connect_server(struct stream *s)
        }
        if (srv_conn && old_conn != srv_conn) {
                srv_conn->owner = s->sess;
-               s->sess->srv_conn = srv_conn;
-               LIST_DEL(&s->sess->conn_list);
-               LIST_ADDQ(&srv_conn->session_list, &s->sess->conn_list);
+               LIST_DEL(&srv_conn->session_list);
+               session_add_conn(s->sess, srv_conn, s->target);
        }
 
        if (!srv_conn)
index c23d35c06e8b5bcbba17079962cf1c3d1620107c..8080c94deec6c5148056a368955120a78dff1e44 100644 (file)
@@ -41,6 +41,7 @@ static struct task *session_expire_embryonic(struct task *t, void *context, unsi
 struct session *session_new(struct proxy *fe, struct listener *li, enum obj_type *origin)
 {
        struct session *sess;
+       int i;
 
        sess = pool_alloc(pool_head_session);
        if (sess) {
@@ -53,21 +54,24 @@ struct session *session_new(struct proxy *fe, struct listener *li, enum obj_type
                vars_init(&sess->vars, SCOPE_SESS);
                sess->task = NULL;
                sess->t_handshake = -1; /* handshake not done yet */
-               LIST_INIT(&sess->conn_list);
-               sess->srv_conn = NULL;
                HA_ATOMIC_UPDATE_MAX(&fe->fe_counters.conn_max,
                                     HA_ATOMIC_ADD(&fe->feconn, 1));
                if (li)
                        proxy_inc_fe_conn_ctr(li, fe);
                HA_ATOMIC_ADD(&totalconn, 1);
                HA_ATOMIC_ADD(&jobs, 1);
+               for (i = 0; i < MAX_SRV_LIST; i++) {
+                       sess->srv_list[i].target = NULL;
+                       LIST_INIT(&sess->srv_list[i].list);
+               }
        }
        return sess;
 }
 
 void session_free(struct session *sess)
 {
-       struct connection *conn;
+       struct connection *conn, *conn_back;
+       int i;
 
        HA_ATOMIC_SUB(&sess->fe->feconn, 1);
        if (sess->listener)
@@ -77,21 +81,25 @@ void session_free(struct session *sess)
        conn = objt_conn(sess->origin);
        if (conn != NULL && conn->mux)
                conn->mux->destroy(conn);
-       conn = sess->srv_conn;
-       if (conn != NULL && conn->mux) {
-               LIST_DEL(&conn->list);
-               LIST_INIT(&conn->list);
-               conn->owner = NULL;
-               conn->mux->destroy(conn);
-       } else if (conn) {
-               /* We have a connection, but not yet an associated mux.
-                * So destroy it now.
-                */
-               conn_stop_tracking(conn);
-               conn_full_close(conn);
-               conn_free(conn);
+       for (i = 0; i < MAX_SRV_LIST; i++) {
+               int count = 0;
+               list_for_each_entry_safe(conn, conn_back, &sess->srv_list[i].list, session_list) {
+                       count++;
+                       if (conn->mux) {
+                               LIST_DEL(&conn->session_list);
+                               LIST_INIT(&conn->session_list);
+                               conn->owner = NULL;
+                               conn->mux->destroy(conn);
+                       } else {
+                               /* We have a connection, but not yet an associated mux.
+                                * So destroy it now.
+                                */
+                               conn_stop_tracking(conn);
+                               conn_full_close(conn);
+                               conn_free(conn);
+                       }
+               }
        }
-       LIST_DEL(&sess->conn_list);
        pool_free(pool_head_session, sess);
        HA_ATOMIC_SUB(&jobs, 1);
 }