]> git.ipfire.org Git - thirdparty/haproxy.git/commitdiff
MEDIUM: rhttp: support multi-thread active connect
authorAmaury Denoyelle <adenoyelle@haproxy.com>
Wed, 22 Nov 2023 17:02:37 +0000 (18:02 +0100)
committerAmaury Denoyelle <adenoyelle@haproxy.com>
Thu, 23 Nov 2023 16:45:56 +0000 (17:45 +0100)
Implement support for active HTTP reverse task migration on listener
threads. This operation is done each time a new reversable connection
will be instantiated. Instead of directly allocate the connection, a
lookup is done among all the listener threads.

A comparison is done to select the thread with the smallest number of
current reverse connection. If the thread found is different from the
current one, the connection allocation is delayed and the task
rescheduled on the chosen thread. The connection will then be created
and pinned on the new thread. This mechanisms allows to balance reverse
HTTP connections accross different threads.

Note that rhttp_set_affinity is still defined to disable thread
migration on accept. This is necessary as it's unsafe to move an
existing connection to another thread. However, active reverse task
migration should be sufficient to distribute connections accross several
threads. Better than that, this design allows to differentiate standard
frontend and reversable connections. The latest are designed to be
long-lived so it's useful to have their repartition solely based on
others reversed connections.

src/proto_rhttp.c

index 52d45441fee7f2cf70331dfe3d11229d99203b68..9acc41143805fbe638c352168876d7be3e503d50 100644 (file)
@@ -4,6 +4,7 @@
 #include <haproxy/api.h>
 #include <haproxy/connection.h>
 #include <haproxy/errors.h>
+#include <haproxy/intops.h>
 #include <haproxy/list.h>
 #include <haproxy/listener.h>
 #include <haproxy/log.h>
@@ -129,9 +130,6 @@ static struct connection *new_reverse_conn(struct listener *l, struct server *sr
  */
 void rhttp_notify_preconn_err(struct listener *l)
 {
-       /* For the moment reverse connection are bound only on first thread. */
-       BUG_ON(tid != 0);
-
        /* Receiver must reference a reverse connection as pending. */
        BUG_ON(!l->rx.rhttp.pend_conn);
 
@@ -150,6 +148,52 @@ void rhttp_notify_preconn_err(struct listener *l)
        task_queue(l->rx.rhttp.task);
 }
 
+/* Lookup over listener <l> threads for their current count of active reverse
+ * HTTP connections. Returns the less loaded thread ID.
+ */
+static unsigned int select_thread(struct listener *l)
+{
+       unsigned long mask = l->rx.bind_thread & _HA_ATOMIC_LOAD(&tg->threads_enabled);
+       unsigned int load_min = HA_ATOMIC_LOAD(&th_ctx->nb_rhttp_conns);
+       unsigned int load_thr;
+       unsigned int ret = tid;
+       int i;
+
+       /* Returns current tid if listener runs on one thread only. */
+       if (!atleast2(mask))
+               goto end;
+
+       /* Loop over all threads and return the less loaded one. This needs to
+        * be just an approximation so it's not important if the selected
+        * thread load has varied since its selection.
+        */
+
+       for (i = tg->base; mask; mask >>= 1, i++) {
+               if (!(mask & 0x1))
+                       continue;
+
+               load_thr = HA_ATOMIC_LOAD(&ha_thread_ctx[i].nb_rhttp_conns);
+               if (load_min > load_thr) {
+                       ret = i;
+                       load_min = load_thr;
+               }
+       }
+
+ end:
+       return ret;
+}
+
+/* Detach <task> from its thread and assign it to <new_tid> thread. The task is
+ * queued to be woken up on the new thread.
+ */
+static void task_migrate(struct task *task, uint new_tid)
+{
+       task_unlink_wq(task);
+       task->expire = TICK_ETERNITY;
+       task_set_thread(task, new_tid);
+       task_wakeup(task, TASK_WOKEN_MSG);
+}
+
 struct task *rhttp_process(struct task *task, void *ctx, unsigned int state)
 {
        struct listener *l = ctx;
@@ -179,6 +223,8 @@ struct task *rhttp_process(struct task *task, void *ctx, unsigned int state)
 
                        /* conn_free() must report preconnect failure using rhttp_notify_preconn_err(). */
                        BUG_ON(l->rx.rhttp.pend_conn);
+
+                       l->rx.rhttp.task->expire = TICKS_TO_MS(now_ms);
                }
                else {
                        /* Spurious receiver task woken up despite pend_conn not ready/on error. */
@@ -192,6 +238,14 @@ struct task *rhttp_process(struct task *task, void *ctx, unsigned int state)
        else {
                struct server *srv = l->rx.rhttp.srv;
 
+               if ((state & TASK_WOKEN_ANY) != TASK_WOKEN_MSG) {
+                       unsigned int new_tid = select_thread(l);
+                       if (new_tid != tid) {
+                               task_migrate(l->rx.rhttp.task, new_tid);
+                               return task;
+                       }
+               }
+
                /* No pending reverse connection, prepare a new one. Store it in the
                 * listener and return NULL. Connection will be returned later after
                 * reversal is completed.
@@ -222,8 +276,16 @@ int rhttp_bind_listener(struct listener *listener, char *errmsg, int errlen)
        struct ist be_name, sv_name;
        char *name = NULL;
 
-       /* TODO for the moment reverse conn creation is pinned to the first thread only. */
-       if (!(task = task_new_here())) {
+       unsigned long mask;
+       uint task_tid;
+
+       if (listener->state != LI_ASSIGNED)
+               return ERR_NONE; /* already bound */
+
+       /* Retrieve the first thread usable for this listener. */
+       mask = listener->rx.bind_thread & _HA_ATOMIC_LOAD(&tg->threads_enabled);
+       task_tid = my_ffsl(mask) + ha_tgroup_info[listener->rx.bind_tgroup].base;
+       if (!(task = task_new_on(task_tid))) {
                snprintf(errmsg, errlen, "Out of memory.");
                goto err;
        }
@@ -336,6 +398,15 @@ struct connection *rhttp_accept_conn(struct listener *l, int *status)
 
                /* Instantiate a new conn if maxconn not yet exceeded. */
                if (l->nbconn <= l->bind_conf->maxconn) {
+                       /* Try first if a new thread should be used for the new connection. */
+                       unsigned int new_tid = select_thread(l);
+                       if (new_tid != tid) {
+                               task_migrate(l->rx.rhttp.task, new_tid);
+                               *status = CO_AC_DONE;
+                               return NULL;
+                       }
+
+                       /* No need to use a new thread, use the opportunity to alloc the connection right now. */
                        l->rx.rhttp.pend_conn = new_reverse_conn(l, l->rx.rhttp.srv);
                        if (!l->rx.rhttp.pend_conn) {
                                *status = CO_AC_PAUSE;
@@ -366,8 +437,10 @@ void rhttp_unbind_receiver(struct listener *l)
 
 int rhttp_set_affinity(struct connection *conn, int new_tid)
 {
-       /* TODO reversal conn rebinding after is disabled for the moment as we
-        * did not test possible race conditions.
+       /* Explicitely disable connection thread migration on accept. Indeed,
+        * it's unsafe to move a connection with its FD to another thread. Note
+        * that active reverse task thread migration should be sufficient to
+        * ensure repartition of reversed connections accross listener threads.
         */
        return -1;
 }