From: Amaury Denoyelle Date: Wed, 22 Nov 2023 17:02:37 +0000 (+0100) Subject: MEDIUM: rhttp: support multi-thread active connect X-Git-Tag: v2.9-dev11~6 X-Git-Url: http://git.ipfire.org/?a=commitdiff_plain;h=3d0c7f2e2add717b4310ffe4585984612221a971;p=thirdparty%2Fhaproxy.git MEDIUM: rhttp: support multi-thread active connect Implement support for active HTTP reverse task migration on listener threads. This operation is done each time a new reversable connection will be instantiated. Instead of directly allocate the connection, a lookup is done among all the listener threads. A comparison is done to select the thread with the smallest number of current reverse connection. If the thread found is different from the current one, the connection allocation is delayed and the task rescheduled on the chosen thread. The connection will then be created and pinned on the new thread. This mechanisms allows to balance reverse HTTP connections accross different threads. Note that rhttp_set_affinity is still defined to disable thread migration on accept. This is necessary as it's unsafe to move an existing connection to another thread. However, active reverse task migration should be sufficient to distribute connections accross several threads. Better than that, this design allows to differentiate standard frontend and reversable connections. The latest are designed to be long-lived so it's useful to have their repartition solely based on others reversed connections. --- diff --git a/src/proto_rhttp.c b/src/proto_rhttp.c index 52d45441fe..9acc411438 100644 --- a/src/proto_rhttp.c +++ b/src/proto_rhttp.c @@ -4,6 +4,7 @@ #include #include #include +#include #include #include #include @@ -129,9 +130,6 @@ static struct connection *new_reverse_conn(struct listener *l, struct server *sr */ void rhttp_notify_preconn_err(struct listener *l) { - /* For the moment reverse connection are bound only on first thread. */ - BUG_ON(tid != 0); - /* Receiver must reference a reverse connection as pending. */ BUG_ON(!l->rx.rhttp.pend_conn); @@ -150,6 +148,52 @@ void rhttp_notify_preconn_err(struct listener *l) task_queue(l->rx.rhttp.task); } +/* Lookup over listener threads for their current count of active reverse + * HTTP connections. Returns the less loaded thread ID. + */ +static unsigned int select_thread(struct listener *l) +{ + unsigned long mask = l->rx.bind_thread & _HA_ATOMIC_LOAD(&tg->threads_enabled); + unsigned int load_min = HA_ATOMIC_LOAD(&th_ctx->nb_rhttp_conns); + unsigned int load_thr; + unsigned int ret = tid; + int i; + + /* Returns current tid if listener runs on one thread only. */ + if (!atleast2(mask)) + goto end; + + /* Loop over all threads and return the less loaded one. This needs to + * be just an approximation so it's not important if the selected + * thread load has varied since its selection. + */ + + for (i = tg->base; mask; mask >>= 1, i++) { + if (!(mask & 0x1)) + continue; + + load_thr = HA_ATOMIC_LOAD(&ha_thread_ctx[i].nb_rhttp_conns); + if (load_min > load_thr) { + ret = i; + load_min = load_thr; + } + } + + end: + return ret; +} + +/* Detach from its thread and assign it to thread. The task is + * queued to be woken up on the new thread. + */ +static void task_migrate(struct task *task, uint new_tid) +{ + task_unlink_wq(task); + task->expire = TICK_ETERNITY; + task_set_thread(task, new_tid); + task_wakeup(task, TASK_WOKEN_MSG); +} + struct task *rhttp_process(struct task *task, void *ctx, unsigned int state) { struct listener *l = ctx; @@ -179,6 +223,8 @@ struct task *rhttp_process(struct task *task, void *ctx, unsigned int state) /* conn_free() must report preconnect failure using rhttp_notify_preconn_err(). */ BUG_ON(l->rx.rhttp.pend_conn); + + l->rx.rhttp.task->expire = TICKS_TO_MS(now_ms); } else { /* Spurious receiver task woken up despite pend_conn not ready/on error. */ @@ -192,6 +238,14 @@ struct task *rhttp_process(struct task *task, void *ctx, unsigned int state) else { struct server *srv = l->rx.rhttp.srv; + if ((state & TASK_WOKEN_ANY) != TASK_WOKEN_MSG) { + unsigned int new_tid = select_thread(l); + if (new_tid != tid) { + task_migrate(l->rx.rhttp.task, new_tid); + return task; + } + } + /* No pending reverse connection, prepare a new one. Store it in the * listener and return NULL. Connection will be returned later after * reversal is completed. @@ -222,8 +276,16 @@ int rhttp_bind_listener(struct listener *listener, char *errmsg, int errlen) struct ist be_name, sv_name; char *name = NULL; - /* TODO for the moment reverse conn creation is pinned to the first thread only. */ - if (!(task = task_new_here())) { + unsigned long mask; + uint task_tid; + + if (listener->state != LI_ASSIGNED) + return ERR_NONE; /* already bound */ + + /* Retrieve the first thread usable for this listener. */ + mask = listener->rx.bind_thread & _HA_ATOMIC_LOAD(&tg->threads_enabled); + task_tid = my_ffsl(mask) + ha_tgroup_info[listener->rx.bind_tgroup].base; + if (!(task = task_new_on(task_tid))) { snprintf(errmsg, errlen, "Out of memory."); goto err; } @@ -336,6 +398,15 @@ struct connection *rhttp_accept_conn(struct listener *l, int *status) /* Instantiate a new conn if maxconn not yet exceeded. */ if (l->nbconn <= l->bind_conf->maxconn) { + /* Try first if a new thread should be used for the new connection. */ + unsigned int new_tid = select_thread(l); + if (new_tid != tid) { + task_migrate(l->rx.rhttp.task, new_tid); + *status = CO_AC_DONE; + return NULL; + } + + /* No need to use a new thread, use the opportunity to alloc the connection right now. */ l->rx.rhttp.pend_conn = new_reverse_conn(l, l->rx.rhttp.srv); if (!l->rx.rhttp.pend_conn) { *status = CO_AC_PAUSE; @@ -366,8 +437,10 @@ void rhttp_unbind_receiver(struct listener *l) int rhttp_set_affinity(struct connection *conn, int new_tid) { - /* TODO reversal conn rebinding after is disabled for the moment as we - * did not test possible race conditions. + /* Explicitely disable connection thread migration on accept. Indeed, + * it's unsafe to move a connection with its FD to another thread. Note + * that active reverse task thread migration should be sufficient to + * ensure repartition of reversed connections accross listener threads. */ return -1; }