#include <haproxy/api.h>
#include <haproxy/connection.h>
#include <haproxy/errors.h>
+#include <haproxy/intops.h>
#include <haproxy/list.h>
#include <haproxy/listener.h>
#include <haproxy/log.h>
*/
void rhttp_notify_preconn_err(struct listener *l)
{
- /* For the moment reverse connection are bound only on first thread. */
- BUG_ON(tid != 0);
-
/* Receiver must reference a reverse connection as pending. */
BUG_ON(!l->rx.rhttp.pend_conn);
task_queue(l->rx.rhttp.task);
}
+/* Lookup over listener <l> threads for their current count of active reverse
+ * HTTP connections. Returns the less loaded thread ID.
+ */
+static unsigned int select_thread(struct listener *l)
+{
+ unsigned long mask = l->rx.bind_thread & _HA_ATOMIC_LOAD(&tg->threads_enabled);
+ unsigned int load_min = HA_ATOMIC_LOAD(&th_ctx->nb_rhttp_conns);
+ unsigned int load_thr;
+ unsigned int ret = tid;
+ int i;
+
+ /* Returns current tid if listener runs on one thread only. */
+ if (!atleast2(mask))
+ goto end;
+
+ /* Loop over all threads and return the less loaded one. This needs to
+ * be just an approximation so it's not important if the selected
+ * thread load has varied since its selection.
+ */
+
+ for (i = tg->base; mask; mask >>= 1, i++) {
+ if (!(mask & 0x1))
+ continue;
+
+ load_thr = HA_ATOMIC_LOAD(&ha_thread_ctx[i].nb_rhttp_conns);
+ if (load_min > load_thr) {
+ ret = i;
+ load_min = load_thr;
+ }
+ }
+
+ end:
+ return ret;
+}
+
+/* Detach <task> from its thread and assign it to <new_tid> thread. The task is
+ * queued to be woken up on the new thread.
+ */
+static void task_migrate(struct task *task, uint new_tid)
+{
+ task_unlink_wq(task);
+ task->expire = TICK_ETERNITY;
+ task_set_thread(task, new_tid);
+ task_wakeup(task, TASK_WOKEN_MSG);
+}
+
struct task *rhttp_process(struct task *task, void *ctx, unsigned int state)
{
struct listener *l = ctx;
/* conn_free() must report preconnect failure using rhttp_notify_preconn_err(). */
BUG_ON(l->rx.rhttp.pend_conn);
+
+ l->rx.rhttp.task->expire = TICKS_TO_MS(now_ms);
}
else {
/* Spurious receiver task woken up despite pend_conn not ready/on error. */
else {
struct server *srv = l->rx.rhttp.srv;
+ if ((state & TASK_WOKEN_ANY) != TASK_WOKEN_MSG) {
+ unsigned int new_tid = select_thread(l);
+ if (new_tid != tid) {
+ task_migrate(l->rx.rhttp.task, new_tid);
+ return task;
+ }
+ }
+
/* No pending reverse connection, prepare a new one. Store it in the
* listener and return NULL. Connection will be returned later after
* reversal is completed.
struct ist be_name, sv_name;
char *name = NULL;
- /* TODO for the moment reverse conn creation is pinned to the first thread only. */
- if (!(task = task_new_here())) {
+ unsigned long mask;
+ uint task_tid;
+
+ if (listener->state != LI_ASSIGNED)
+ return ERR_NONE; /* already bound */
+
+ /* Retrieve the first thread usable for this listener. */
+ mask = listener->rx.bind_thread & _HA_ATOMIC_LOAD(&tg->threads_enabled);
+ task_tid = my_ffsl(mask) + ha_tgroup_info[listener->rx.bind_tgroup].base;
+ if (!(task = task_new_on(task_tid))) {
snprintf(errmsg, errlen, "Out of memory.");
goto err;
}
/* Instantiate a new conn if maxconn not yet exceeded. */
if (l->nbconn <= l->bind_conf->maxconn) {
+ /* Try first if a new thread should be used for the new connection. */
+ unsigned int new_tid = select_thread(l);
+ if (new_tid != tid) {
+ task_migrate(l->rx.rhttp.task, new_tid);
+ *status = CO_AC_DONE;
+ return NULL;
+ }
+
+ /* No need to use a new thread, use the opportunity to alloc the connection right now. */
l->rx.rhttp.pend_conn = new_reverse_conn(l, l->rx.rhttp.srv);
if (!l->rx.rhttp.pend_conn) {
*status = CO_AC_PAUSE;
int rhttp_set_affinity(struct connection *conn, int new_tid)
{
- /* TODO reversal conn rebinding after is disabled for the moment as we
- * did not test possible race conditions.
+ /* Explicitely disable connection thread migration on accept. Indeed,
+ * it's unsafe to move a connection with its FD to another thread. Note
+ * that active reverse task thread migration should be sufficient to
+ * ensure repartition of reversed connections accross listener threads.
*/
return -1;
}