]> git.ipfire.org Git - thirdparty/haproxy.git/commitdiff
MEDIUM: stick-tables: Use a per-shard expiration task
authorOlivier Houchard <ohouchard@haproxy.com>
Mon, 29 Sep 2025 13:37:11 +0000 (15:37 +0200)
committerOlivier Houchard <cognet@ci0.org>
Mon, 20 Oct 2025 13:04:47 +0000 (15:04 +0200)
Instead of having per-table expiration tasks, just use one per shard.
The task will now go through all the tables to expire entries. When a
table gets an expiration earlier than the one previously known, it will
be put in a mt-list, and the task will be responsible to put it into an
eb32, ordered based on the next expiration.
Each per-shard task will run on a different thread, so it should lead to
a better load distribution than the per-table tasks.

include/haproxy/stick_table-t.h
src/stick_table.c

index 19dc600885dea5c08f72aa2220ec9ba3ecf3f684..b8a9f5ab704d8bdceef283647faa1f5afd8f8f65 100644 (file)
@@ -175,7 +175,6 @@ struct stktable {
                                   */
        struct ceb_node  id_node; /* Stick-table are lookup by name here, indexes <id> above. */
        struct pool_head *pool;   /* pool used to allocate sticky sessions */
-       struct task *exp_task;    /* expiration task */
        struct task *sync_task;   /* sync task */
 
        uint64_t hash_seed;      /* hash seed used by shards */
@@ -212,7 +211,11 @@ struct stktable {
        struct {
                struct eb_root keys;      /* head of sticky session tree */
                struct eb_root exps;      /* head of sticky session expiration tree */
+               struct eb32_node in_bucket; /* Each bucket maintains a tree, ordered by expiration date, this does not require sh_lock as only one task will ever modify it */
+               struct mt_list in_bucket_toadd; /* To add to the bucket tree */
+
                __decl_thread(HA_RWLOCK_T sh_lock); /* for the trees above */
+               int next_exp;    /* Next expiration for this table */
        } shards[CONFIG_HAP_TBL_BUCKETS];
 
        unsigned int refcnt;     /* number of local peer over all peers sections
@@ -241,6 +244,13 @@ struct stktable {
        } conf;
 };
 
+struct stk_per_bucket {
+       struct eb_root tables;
+       struct mt_list toadd_tables;
+       __decl_thread(HA_SPINLOCK_T lock); /* Should not have any contention, only there in case a table gets destroyed, which should happen very rarely */
+       struct task *exp_task; /* Expiration task */
+};
+
 extern struct stktable_data_type stktable_data_types[STKTABLE_DATA_TYPES];
 
 /* stick table key */
index 08e08a03c8094972c32cd672c847b3f778292654..dc788e5d38b3fdffdb836548c6b6bc45b017418a 100644 (file)
@@ -66,6 +66,8 @@ struct pool_head *pool_head_stk_ctr __read_mostly = NULL;
 struct stktable *stktables_list;
 struct ceb_root *stktable_by_name = NULL;
 
+static struct stk_per_bucket per_bucket[CONFIG_HAP_TBL_BUCKETS];
+
 #define round_ptr_size(i) (((i) + (sizeof(void *) - 1)) &~ (sizeof(void *) - 1))
 
 /* This function inserts stktable <t> into the tree of known stick-table.
@@ -706,24 +708,33 @@ void stktable_requeue_exp(struct stktable *t, const struct stksess *ts)
 {
        int old_exp, new_exp;
        int expire = ts->expire;
+       int bucket;
+       int len;
 
        if (!t->expire)
                return;
+       if (t->type == SMP_T_STR)
+               len = strlen((const char *)ts->key.key);
+       else
+               len = t->key_size;
+
+       bucket = stktable_calc_shard_num(t, ts->key.key, len);
 
        /* set the task's expire to the newest expiration date. */
-       old_exp = HA_ATOMIC_LOAD(&t->exp_task->expire);
+       old_exp = HA_ATOMIC_LOAD(&t->shards[bucket].next_exp);
        new_exp = tick_first(expire, old_exp);
 
        /* let's not go further if we're already up to date. We have
         * to make sure the compared date doesn't change under us.
         */
        if (new_exp == old_exp &&
-           HA_ATOMIC_CAS(&t->exp_task->expire, &old_exp, new_exp))
+           HA_ATOMIC_CAS(&t->shards[bucket].next_exp, &old_exp, new_exp)) {
                return;
+       }
 
        HA_RWLOCK_WRLOCK(STK_TABLE_LOCK, &t->lock);
 
-       while (!HA_ATOMIC_CAS(&t->exp_task->expire, &old_exp, new_exp)) {
+       while (!HA_ATOMIC_CAS(&t->shards[bucket].next_exp, &old_exp, new_exp)) {
                if (new_exp == old_exp)
                        break;
                __ha_cpu_relax();
@@ -732,9 +743,20 @@ void stktable_requeue_exp(struct stktable *t, const struct stksess *ts)
 
        HA_RWLOCK_WRUNLOCK(STK_TABLE_LOCK, &t->lock);
 
+       if (t->type == SMP_T_STR)
+               len = strlen((const char *)ts->key.key);
+       else
+               len = t->key_size;
+
+
        /* the timer was advanced, only the task can update it */
-       if (!tick_isset(old_exp) || tick_is_lt(new_exp, old_exp))
-               task_wakeup(t->exp_task, TASK_WOKEN_OTHER);
+       if (!tick_isset(old_exp) || tick_is_lt(new_exp, old_exp)) {
+               int ret;
+
+               ret = MT_LIST_TRY_APPEND(&per_bucket[bucket].toadd_tables, &t->shards[bucket].in_bucket_toadd);
+               if (ret)
+                       task_wakeup(per_bucket[bucket].exp_task, TASK_WOKEN_OTHER);
+       }
 }
 
 /* Returns a valid or initialized stksess for the specified stktable_key in the
@@ -922,38 +944,57 @@ struct stksess *stktable_set_entry(struct stktable *table, struct stksess *nts)
  * Task processing function to trash expired sticky sessions. A pointer to the
  * task itself is returned since it never dies.
  */
-struct task *process_table_expire(struct task *task, void *context, unsigned int state)
+struct task *process_tables_expire(struct task *task, void *context, unsigned int state)
 {
-       struct stktable *t = context;
+       struct stk_per_bucket *ps = context;
+       struct stktable *t;
        struct stksess *ts;
-       struct eb32_node *eb;
+       struct eb32_node *table_eb, *eb;
        int updt_locked;
-       int to_visit = STKTABLE_MAX_UPDATES_AT_ONCE;
-       int looped;
-       int exp_next;
+       int to_visit;
        int task_exp;
-       int shard, init_shard;
-       int failed_once = 0;
-       int purged = 0;
+       int shard;
 
        task_exp = TICK_ETERNITY;
 
-       /* start from a random shard number to avoid starvation in the last ones */
-       shard = init_shard = statistical_prng_range(CONFIG_HAP_TBL_BUCKETS - 1);
-       do {
+       shard = (ps - &per_bucket[0]);
+
+       to_visit = STKTABLE_MAX_UPDATES_AT_ONCE;
+
+       /*
+        * First put all the tables to be added from the list to the tree
+        */
+       while ((t = MT_LIST_POP(&ps->toadd_tables, struct stktable *, shards[shard].in_bucket_toadd)) != NULL) {
+               int next_exp = HA_ATOMIC_LOAD(&t->shards[shard].next_exp);
+               /*
+                * We're already in the tree
+                */
+               if (tick_isset(t->shards[shard].in_bucket.key) &&
+                   tick_is_lt(t->shards[shard].in_bucket.key, next_exp))
+                       continue;
+
+               eb32_delete(&t->shards[shard].in_bucket);
+               t->shards[shard].in_bucket.key = next_exp;
+               eb32_insert(&ps->tables, &t->shards[shard].in_bucket);
+       }
+       table_eb = eb32_first(&ps->tables);
+
+       while (table_eb) {
+               struct eb32_node *tmpnode;
+               unsigned int next_exp_table = TICK_ETERNITY;
+
+               t = eb32_entry(table_eb, struct stktable, shards[shard].in_bucket);
                updt_locked = 0;
-               looped = 0;
 
-               if (HA_RWLOCK_TRYWRLOCK(STK_TABLE_LOCK, &t->shards[shard].sh_lock) != 0) {
-                       if (purged || failed_once) {
-                               /* already purged or second failed lock, yield and come back later */
-                               to_visit = 0;
-                               break;
-                       }
-                       /* make sure we succeed at least once */
-                       failed_once = 1;
-                       HA_RWLOCK_WRLOCK(STK_TABLE_LOCK, &t->shards[shard].sh_lock);
+               if (tick_is_lt(now_ms, table_eb->key)) {
+                       /*
+                        * Next expiration in the future, we can give up
+                        */
+                       if (!tick_isset(task_exp) || tick_is_lt(table_eb->key, task_exp))
+                               task_exp = table_eb->key;
+                       break;
                }
+               HA_RWLOCK_WRLOCK(STK_TABLE_LOCK, &t->shards[shard].sh_lock);
 
                eb = eb32_lookup_ge(&t->shards[shard].exps, now_ms - TIMER_LOOK_BACK);
 
@@ -964,9 +1005,6 @@ struct task *process_table_expire(struct task *task, void *context, unsigned int
                                 * half. Let's loop back to the beginning of the tree now if we
                                 * have not yet visited it.
                                 */
-                               if (looped)
-                                       break;
-                               looped = 1;
                                eb = eb32_first(&t->shards[shard].exps);
                                if (likely(!eb))
                                        break;
@@ -974,8 +1012,9 @@ struct task *process_table_expire(struct task *task, void *context, unsigned int
 
                        if (likely(tick_is_lt(now_ms, eb->key))) {
                                /* timer not expired yet, revisit it later */
-                               exp_next = eb->key;
-                               goto out_unlock;
+                               if (!tick_isset(task_exp) || tick_is_lt(eb->key, task_exp))
+                                       task_exp = eb->key;
+                               break;
                        }
 
                        /* Let's quit earlier if we currently hold the update lock */
@@ -1045,34 +1084,60 @@ struct task *process_table_expire(struct task *task, void *context, unsigned int
                        MT_LIST_DELETE(&ts->pend_updts);
                        eb32_delete(&ts->upd);
                        __stksess_free(t, ts);
-                       purged++;
                }
 
-               /* We have found no task to expire in any tree */
-               exp_next = TICK_ETERNITY;
-
-       out_unlock:
                if (updt_locked)
                        HA_RWLOCK_WRUNLOCK(STK_TABLE_UPDT_LOCK, &t->updt_lock);
 
-               task_exp = tick_first(task_exp, exp_next);
+               /*
+                * Now find the first element, so that we can reposition
+                * the table in the shard tree.
+                */
+               eb = eb32_lookup_ge(&t->shards[shard].exps, now_ms - TIMER_LOOK_BACK);
+               if (!eb)
+                       eb = eb32_first(&t->shards[shard].exps);
+
+               if (eb)
+                       next_exp_table = eb->key;
+               else
+                       next_exp_table = TICK_ETERNITY;
+
+               if (!tick_isset(task_exp) || (tick_isset(next_exp_table) && tick_is_lt(next_exp_table, task_exp)))
+                       task_exp = next_exp_table;
                HA_RWLOCK_WRUNLOCK(STK_TABLE_LOCK, &t->shards[shard].sh_lock);
+               tmpnode = eb32_next(table_eb);
 
-               shard++;
-               if (shard >= CONFIG_HAP_TBL_BUCKETS)
-                       shard = 0;
-       } while (to_visit > 0 && shard != init_shard);
+               if (table_eb->key != next_exp_table) {
+                       int old_exp;
+                       /*
+                        * We have to move the entry in the tree
+                        */
+                       old_exp = HA_ATOMIC_LOAD(&t->shards[shard].next_exp);
+                       if (old_exp >= table_eb->key) {
+                               HA_ATOMIC_CAS(&t->shards[shard].next_exp, &old_exp, next_exp_table);
+                       }
 
-       if (to_visit <= 0) {
+                       eb32_delete(table_eb);
+                       table_eb->key = TICK_ETERNITY;
+                       /*
+                        * If there's more entry, just put it back into the list,
+                        * it'll go back into the tree the next time the task runs.
+                        */
+                       if (next_exp_table != TICK_ETERNITY)
+                               MT_LIST_TRY_APPEND(&per_bucket[shard].toadd_tables, &t->shards[shard].in_bucket_toadd);
+               }
+               table_eb = tmpnode;
+       }
+
+       if (tick_is_le(task_exp, now_ms)) {
+               /*
+                * More to do, we should wake up immediately.
+                */
                task_wakeup(task, TASK_WOKEN_OTHER);
        } else {
-               /* Reset the task's expiration. We do this under the lock so as not
-                * to ruin a call to task_queue() in stktable_requeue_exp() if we
-                * were to update with TICK_ETERNITY.
+               /* Reset the task's expiration.
                 */
-               HA_RWLOCK_WRLOCK(STK_TABLE_LOCK, &t->lock);
                task->expire = task_exp;
-               HA_RWLOCK_WRUNLOCK(STK_TABLE_LOCK, &t->lock);
        }
 
        return task;
@@ -1086,7 +1151,6 @@ struct task *process_table_expire(struct task *task, void *context, unsigned int
  */
 int stktable_init(struct stktable *t, char **err_msg)
 {
-       static int operating_thread = 0;
        int peers_retval = 0;
        int shard;
        int i;
@@ -1098,6 +1162,7 @@ int stktable_init(struct stktable *t, char **err_msg)
                        t->shards[shard].keys = EB_ROOT_UNIQUE;
                        memset(&t->shards[shard].exps, 0, sizeof(t->shards[shard].exps));
                        HA_RWLOCK_INIT(&t->shards[shard].sh_lock);
+                       MT_LIST_INIT(&t->shards[shard].in_bucket_toadd);
                }
 
                t->updates = EB_ROOT_UNIQUE;
@@ -1105,15 +1170,6 @@ int stktable_init(struct stktable *t, char **err_msg)
 
                t->pool = create_pool("sticktables", sizeof(struct stksess) + round_ptr_size(t->data_size) + t->key_size, MEM_F_SHARED);
 
-               if ( t->expire ) {
-                       t->exp_task = task_new_on(operating_thread);
-                       if (!t->exp_task)
-                               goto mem_error;
-                       operating_thread = (operating_thread + 1) % global.nbthread;
-
-                       t->exp_task->process = process_table_expire;
-                       t->exp_task->context = (void *)t;
-               }
                if (t->peers.p && t->peers.p->peers_fe && !(t->peers.p->peers_fe->flags & (PR_FL_DISABLED|PR_FL_STOPPED))) {
                        peers_retval = peers_register_table(t->peers.p, t);
                }
@@ -1176,9 +1232,16 @@ int stktable_init(struct stktable *t, char **err_msg)
  */
 void stktable_deinit(struct stktable *t)
 {
+       int i;
+
        if (!t)
                return;
-       task_destroy(t->exp_task);
+       for (i = 0; i < CONFIG_HAP_TBL_BUCKETS; i++) {
+               HA_SPIN_LOCK(OTHER_LOCK, &per_bucket[i].lock);
+               eb32_delete(&t->shards[i].in_bucket);
+               MT_LIST_DELETE(&t->shards[i].in_bucket_toadd);
+               HA_SPIN_UNLOCK(OTHER_LOCK, &per_bucket[i].lock);
+       }
        tasklet_free(t->updt_task);
        ha_free(&t->pend_updts);
        pool_destroy(t->pool);
@@ -5897,11 +5960,25 @@ static int stkt_create_stk_ctr_pool(void)
 static void stkt_late_init(void)
 {
        struct sample_fetch *f;
+       int i;
 
        f = find_sample_fetch("src", strlen("src"));
        if (f)
                smp_fetch_src = f->process;
        stkt_create_stk_ctr_pool();
+
+       for (i = 0; i < CONFIG_HAP_TBL_BUCKETS; i++) {
+               MT_LIST_INIT(&per_bucket[i].toadd_tables);
+               per_bucket[i].tables = EB_ROOT;
+               per_bucket[i].exp_task = task_new_on(i % global.nbthread);
+               if (per_bucket[i].exp_task == NULL) {
+                       ha_alert("Failed to allocate per-shard task!\n");
+                       exit(1);
+               }
+               per_bucket[i].exp_task->process = process_tables_expire;
+               per_bucket[i].exp_task->context = &per_bucket[i];
+               HA_SPIN_INIT(&per_bucket[i].lock);
+       }
 }
 
 INITCALL0(STG_INIT_2, stkt_late_init);