git.ipfire.org Git - thirdparty/haproxy.git/commitdiff
MAJOR: peers: Remove the update lock by using a mt-list to deal with updates
authorChristopher Faulet <cfaulet@haproxy.com>
Tue, 14 Oct 2025 06:17:33 +0000 (08:17 +0200)
committerChristopher Faulet <cfaulet@haproxy.com>
Tue, 21 Oct 2025 13:11:14 +0000 (15:11 +0200)
In this patch, the update tree is replaced by a mt-list. It is a huge patch
with several changes. Main ones are in the function sending updates.

By using a list instead of a tree, we lose the order between updates and
the ability to restart for a given update using its id (the key to index
updates in the tree). However, to use the tree, it had to be locked and it
was a cause of contention between threads, leading the watchdog to kill the
process in worst cases. Because the idea is to split the updates by buckets
to divide the contention on updates, the order between updates will be lost
anyway. So, the tree can be replaced by a list. By using a mt-list, we can
also remove the update lock.

To be able to use a list instead of a tree, each peer must save its position
in the list, to be able to process new entries only at each loop. These
markers are "special" sticky sessions of type STKSESS_UPDT_MARKER. Of course,
these markers are not in any stick-tables but only in updates lists. And only
the owner of a marker can move it in the list. Each peer owns two markers for
each list (so two markers per shared table). The first one is used as a start
point for a loop, and the other one is used as a stop point. The way these
markers are moved in the list is not obvious, especially for the first one.

Updates sent during a full resync are now handled exactly the same way
as other updates. Only the moment the stop marker is set is different.

include/haproxy/peers-t.h
include/haproxy/stick_table-t.h
src/peers.c
src/stick_table.c

index 981b8963fa1e45d4db32c7459b67d33143841fcf..fd5da5f251c32c5b4778873cd1d0de249dccf441 100644 (file)
@@ -138,12 +138,12 @@ struct shared_table {
        int remote_id;
        int flags;
        unsigned int update_id;
+       struct stksess *last;
+       struct stksess *end;
        uint64_t remote_data;
        unsigned int remote_data_nbelem[STKTABLE_DATA_TYPES];
        unsigned int last_acked;
-       unsigned int last_pushed;
        unsigned int last_get;
-       unsigned int teaching_origin;
        struct shared_table *next;    /* next shared table in list */
 };
 
index c4434e89847cf86d396cbc7690ffe0a0f81ef725..64ace526d2c19b44286b1bd10920b0189484497f 100644 (file)
@@ -160,8 +160,8 @@ struct stksess {
        int shard;                /* shard number used by peers */
        int seen;                 /* 0 only when no peer has seen this entry yet */
        struct eb32_node exp;     /* ebtree node used to hold the session in expiration tree */
-       struct eb32_node upd;     /* ebtree node used to hold the update sequence tree */
-       struct mt_list pend_updts;/* list of entries to be inserted/moved in the update sequence tree */
+       struct mt_list upd;       /* entry in the table's update sequence list */
+       struct mt_list pend_updts;/* entry in list of pending updates  */
        unsigned int updt_type;   /* One of STKSESS_UPDT_* value */
        struct ebmb_node key;     /* ebtree node used to hold the session in table */
        /* WARNING! do not put anything after <keys>, it's used by the key */
@@ -232,17 +232,11 @@ struct stktable {
        unsigned int current;     /* number of sticky sessions currently in table */
        THREAD_ALIGN(64);
 
-       struct eb_root updates;   /* head of sticky updates sequence tree, uses updt_lock */
-       struct mt_list *pend_updts; /* list of updates to be added to the update sequence tree, one per thread-group */
-       unsigned int update;      /* uses updt_lock */
-       unsigned int localupdate; /* uses updt_lock */
+       struct mt_list updates;     /* list of sticky updates sequence */
+       struct mt_list *pend_updts; /* list of updates to be added to the update sequence list, one per thread-group */
        struct tasklet *updt_task;/* tasklet responsible for pushing the pending updates into the tree */
 
-       THREAD_ALIGN(64);
-       /* this lock is heavily used and must be on its own cache line */
-       __decl_thread(HA_RWLOCK_T updt_lock); /* lock protecting the updates part */
-
-       /* rarely used config stuff below (should not interfere with updt_lock) */
+       /* rarely used config stuff below */
        struct proxy *proxies_list; /* The list of proxies which reference this stick-table. */
        struct {
                const char *file;     /* The file where the stick-table is declared (global name). */
index 7f041cdfcbf1c0c65d8efe3012db06cd321a8364..3ef6052fafc28940d81db58908a4e9de6c618396 100644 (file)
@@ -52,9 +52,6 @@
 /***********************************/
 /* Current shared table sync state */
 /***********************************/
-#define SHTABLE_F_TEACH_STAGE1      0x00000001 /* Teach state 1 complete */
-#define SHTABLE_F_TEACH_STAGE2      0x00000002 /* Teach state 2 complete */
-
 
 #define PEER_RESYNC_TIMEOUT         5000 /* 5 seconds */
 #define PEER_RECONNECT_TIMEOUT      5000 /* 5 seconds */
@@ -1519,91 +1516,20 @@ static inline int peer_send_error_protomsg(struct appctx *appctx)
        return peer_send_msg(appctx, peer_prepare_error_msg, &p);
 }
 
-/*
- * Function used to lookup for recent stick-table updates associated with
- * <st> shared stick-table when a lesson must be taught a peer (learn state is not PEER_LR_ST_NOTASSIGNED).
- */
-static inline struct stksess *peer_teach_process_stksess_lookup(struct shared_table *st)
-{
-       struct eb32_node *eb;
-       struct stksess *ret;
-
-       eb = eb32_lookup_ge(&st->table->updates, st->last_pushed+1);
-       if (!eb) {
-               eb = eb32_first(&st->table->updates);
-               if (!eb || (eb->key == st->last_pushed)) {
-                       st->last_pushed = st->table->localupdate;
-                       return NULL;
-               }
-       }
-
-       /* if distance between the last pushed and the retrieved key
-        * is greater than the distance last_pushed and the local_update
-        * this means we are beyond localupdate.
-        */
-       if ((eb->key - st->last_pushed) > (st->table->localupdate - st->last_pushed)) {
-               st->last_pushed = st->table->localupdate;
-               return NULL;
-       }
-
-       ret = eb32_entry(eb, struct stksess, upd);
-       if (!_HA_ATOMIC_LOAD(&ret->seen))
-               _HA_ATOMIC_STORE(&ret->seen, 1);
-       return ret;
-}
-
-/*
- * Function used to lookup for recent stick-table updates associated with
- * <st> shared stick-table during teach state 1 step.
- */
-static inline struct stksess *peer_teach_stage1_stksess_lookup(struct shared_table *st)
-{
-       struct eb32_node *eb;
-       struct stksess *ret;
-
-       eb = eb32_lookup_ge(&st->table->updates, st->last_pushed+1);
-       if (!eb) {
-               st->flags |= SHTABLE_F_TEACH_STAGE1;
-               eb = eb32_first(&st->table->updates);
-               if (eb)
-                       st->last_pushed = eb->key - 1;
-               return NULL;
-       }
-
-       ret = eb32_entry(eb, struct stksess, upd);
-       if (!_HA_ATOMIC_LOAD(&ret->seen))
-               _HA_ATOMIC_STORE(&ret->seen, 1);
-       return ret;
-}
-
-/*
- * Function used to lookup for recent stick-table updates associated with
- * <st> shared stick-table during teach state 2 step.
- */
-static inline struct stksess *peer_teach_stage2_stksess_lookup(struct shared_table *st)
-{
-       struct eb32_node *eb;
-       struct stksess *ret;
-
-       eb = eb32_lookup_ge(&st->table->updates, st->last_pushed+1);
-       if (!eb || eb->key > st->teaching_origin) {
-               st->flags |= SHTABLE_F_TEACH_STAGE2;
-               return NULL;
-       }
-
-       ret = eb32_entry(eb, struct stksess, upd);
-       if (!_HA_ATOMIC_LOAD(&ret->seen))
-               _HA_ATOMIC_STORE(&ret->seen, 1);
-       return ret;
-}
-
 /*
  * Generic function to emit update messages for <st> stick-table when a lesson must
  * be taught to the peer <p>.
  *
- * This function temporary unlock/lock <st> when it sends stick-table updates or
- * when decrementing its refcount in case of any error when it sends this updates.
- * It must be called with the stick-table lock released.
+ * This function loops on the stick-table update list from the <last> to the
+ * <end> markers of the peer shared table. Before starting the loop, the <end>
+ * marker, if not in the update list, is appended to the end of the list. If the
+ * loop is interrupted, the <last> marker is inserted before the current sticky
+ * session. It is the restart point for the next time. If the <end> marker is
+ * reached, it means all updates that should be sent were sent. The <last>
+ * marker is moved just after the <end> marker and this last one is removed
+ * from the update list.
+ *
+ * When a sticky session is processed, the element is locked.
  *
  * Return 0 if any message could not be built modifying the appcxt st0 to PEER_SESS_ST_END value.
  * Returns -1 if there was not enough room left to send the message,
@@ -1612,86 +1538,98 @@ static inline struct stksess *peer_teach_stage2_stksess_lookup(struct shared_tab
  * If it returns 0 or -1, this function leave <st> locked if already locked when entering this function
  * unlocked if not already locked when entering this function.
  */
-int peer_send_teachmsgs(struct appctx *appctx, struct peer *p,
-                        struct stksess *(*peer_stksess_lookup)(struct shared_table *),
-                        struct shared_table *st)
+int peer_send_teachmsgs(struct appctx *appctx, struct peer *p, struct shared_table *st)
 {
        int ret, new_pushed, use_timed;
        int updates_sent = 0;
-       int failed_once = 0;
+       struct stksess *ts = NULL;
+       struct mt_list back;
 
        TRACE_ENTER(PEERS_EV_SESS_IO, appctx, p, st);
 
        ret = 1;
        use_timed = 0;
-       if (st != p->last_local_table) {
-               ret = peer_send_switchmsg(st, appctx);
-               if (ret <= 0)
-                       goto out;
 
-               p->last_local_table = st;
-       }
-
-       if (peer_stksess_lookup != peer_teach_process_stksess_lookup)
+       if (!(p->flags & PEER_F_TEACH_PROCESS))
                use_timed = !(p->flags & PEER_F_DWNGRD);
 
        /* We force new pushed to 1 to force identifier in update message */
        new_pushed = 1;
 
-       if (HA_RWLOCK_TRYRDLOCK(STK_TABLE_UPDT_LOCK, &st->table->updt_lock) != 0) {
-               /* just don't engage here if there is any contention */
-               applet_have_more_data(appctx);
-               ret = -1;
-               goto out_unlocked;
-       }
-
-       while (1) {
-               struct stksess *ts;
-               unsigned last_pushed;
-
-               /* push local updates */
-               ts = peer_stksess_lookup(st);
-               if (!ts) {
+       MT_LIST_FOR_EACH_ENTRY_LOCKED(ts, &st->last->upd, upd, back) {
+               if (ts == st->end) {
                        ret = 1; // done
                        break;
                }
 
-               last_pushed = ts->upd.key;
-               if (p->srv->shard && ts->shard != p->srv->shard) {
-                       /* Skip this entry */
-                       st->last_pushed = last_pushed;
-                       new_pushed = 1;
+               if (!(p->flags & PEER_F_TEACH_PROCESS) && ts->updt_type != STKSESS_UPDT_LOCAL)
                        continue;
+               else if (ts->updt_type != STKSESS_UPDT_LOCAL && ts->updt_type != STKSESS_UPDT_REMOTE)
+                       continue;
+
+               if (st != p->last_local_table) {
+                       ret = peer_send_switchmsg(st, appctx);
+                       if (ret <= 0)
+                               break;
+
+                       p->last_local_table = st;
                }
 
-               HA_ATOMIC_INC(&ts->ref_cnt);
-               HA_RWLOCK_RDUNLOCK(STK_TABLE_UPDT_LOCK, &st->table->updt_lock);
+               if (!_HA_ATOMIC_LOAD(&ts->seen))
+                       _HA_ATOMIC_STORE(&ts->seen, 1);
 
-               ret = peer_send_updatemsg(st, appctx, ts, st->update_id, new_pushed, use_timed);
+               if (p->srv->shard && ts->shard != p->srv->shard)
+                       continue;
 
-               if (HA_RWLOCK_TRYRDLOCK(STK_TABLE_UPDT_LOCK, &st->table->updt_lock) != 0) {
-                       if (failed_once) {
-                               /* we've already faced contention twice in this
-                                * loop, this is getting serious, do not insist
-                                * anymore and come back later
-                                */
-                               HA_ATOMIC_DEC(&ts->ref_cnt);
-                               applet_have_more_data(appctx);
-                               ret = -1;
-                               goto out_unlocked;
-                       }
-                       /* OK contention happens, for this one we'll wait on the
-                        * lock, but only once.
-                        */
-                       failed_once++;
-                       HA_RWLOCK_RDLOCK(STK_TABLE_UPDT_LOCK, &st->table->updt_lock);
+               if (updates_sent >= peers_max_updates_at_once) {
+                       applet_have_more_data(appctx);
+                       ret = -1;
+               }
+               else {
+                       HA_ATOMIC_INC(&ts->ref_cnt);
+                       ret = peer_send_updatemsg(st, appctx, ts, st->update_id, new_pushed, use_timed);
+                       HA_ATOMIC_DEC(&ts->ref_cnt);
                }
 
-               HA_ATOMIC_DEC(&ts->ref_cnt);
-               if (ret <= 0)
+               if (ret <= 0) {
+                       /* Insert <last> marker before <ts> to process it again
+                        * on next iteration. <ts> remains locked
+                        *
+                        * Before:
+                        *           <back>: <A> <==============> <C>
+                        *
+                        *   <last> <==> ... <A> <==> x    x <==> <C> ...
+                        *
+                        *                       x <==> TS <==> x
+                        *
+                        *  <back>.prev == <A> and <back>.prev->next == MT_LIST_BUSY
+                        *
+                        *  Step 1: detach <last> from the list
+                        *  Step 2: insert <last> after <A>
+                        *           - <last>.prev = <A> (or <back>.prev)
+                        *           - <last>.next = MT_LIST_BUSY (or <back>.prev->next)
+                        *  Step 3: move <A> before <last>
+                        *           - <A>.next = <last> (or back.prev->next = <last>)
+                        *
+                        *  Step 4: update <back>.prev to point on <last>
+                        *
+                        *  After:
+                        *        <back>: <last> <==============> <C>
+                        *
+                        *   ... <A> <==> <last> <==> x    x <==> <C> ...
+                        *
+                        *                       x <==> TS <==> x
+                        *
+                        */
+                       MT_LIST_DELETE(&st->last->upd);
+                       st->last->upd.next = back.prev->next; /* == MT_LIST_BUSY */
+                       __ha_barrier_atomic_store();
+                       st->last->upd.prev = back.prev;
+                       back.prev->next = &st->last->upd;
+                       back.prev = &st->last->upd;
                        break;
+               }
 
-               st->last_pushed = last_pushed;
                p->last.table = st;
                p->last.id = st->update_id;
                st->update_id++;
@@ -1699,76 +1637,19 @@ int peer_send_teachmsgs(struct appctx *appctx, struct peer *p,
 
                /* identifier may not needed in next update message */
                new_pushed = 0;
-
                updates_sent++;
-               if (updates_sent >= peers_max_updates_at_once) {
-                       applet_have_more_data(appctx);
-                       ret = -1;
-                       break;
-               }
        }
 
- out:
-       HA_RWLOCK_RDUNLOCK(STK_TABLE_UPDT_LOCK, &st->table->updt_lock);
- out_unlocked:
+       if (ret == 1) {
+               MT_LIST_DELETE(&st->last->upd);
+               MT_LIST_APPEND(&st->end->upd, &st->last->upd);
+               MT_LIST_DELETE(&st->end->upd);
+       }
+
        TRACE_LEAVE(PEERS_EV_SESS_IO, appctx, p, st);
        return ret;
 }
 
-/*
- * Function to emit update messages for <st> stick-table when a lesson must
- * be taught to the peer <p> (learn state is not PEER_LR_ST_NOTASSIGNED).
- *
- * Note that <st> shared stick-table is locked when calling this function, and
- * the lock is dropped then re-acquired.
- *
- * Return 0 if any message could not be built modifying the appcxt st0 to PEER_SESS_ST_END value.
- * Returns -1 if there was not enough room left to send the message,
- * any other negative returned value must  be considered as an error with an appcxt st0
- * returned value equal to PEER_SESS_ST_END.
- */
-static inline int peer_send_teach_process_msgs(struct appctx *appctx, struct peer *p,
-                                               struct shared_table *st)
-{
-       TRACE_PROTO("send teach process messages", PEERS_EV_SESS_IO, appctx, p, st);
-       return peer_send_teachmsgs(appctx, p, peer_teach_process_stksess_lookup, st);
-}
-
-/*
- * Function to emit update messages for <st> stick-table when a lesson must
- * be taught to the peer <p> during teach state 1 step. It must be called with
- * the stick-table lock released.
- *
- * Return 0 if any message could not be built modifying the appcxt st0 to PEER_SESS_ST_END value.
- * Returns -1 if there was not enough room left to send the message,
- * any other negative returned value must  be considered as an error with an appcxt st0
- * returned value equal to PEER_SESS_ST_END.
- */
-static inline int peer_send_teach_stage1_msgs(struct appctx *appctx, struct peer *p,
-                                              struct shared_table *st)
-{
-       TRACE_PROTO("send teach stage1 messages", PEERS_EV_SESS_IO, appctx, p, st);
-       return peer_send_teachmsgs(appctx, p, peer_teach_stage1_stksess_lookup, st);
-}
-
-/*
- * Function to emit update messages for <st> stick-table when a lesson must
- * be taught to the peer <p> during teach state 1 step. It must be called with
- * the stick-table lock released.
- *
- * Return 0 if any message could not be built modifying the appcxt st0 to PEER_SESS_ST_END value.
- * Returns -1 if there was not enough room left to send the message,
- * any other negative returned value must  be considered as an error with an appcxt st0
- * returned value equal to PEER_SESS_ST_END.
- */
-static inline int peer_send_teach_stage2_msgs(struct appctx *appctx, struct peer *p,
-                                              struct shared_table *st)
-{
-       TRACE_PROTO("send teach stage2 messages", PEERS_EV_SESS_IO, appctx, p, st);
-       return peer_send_teachmsgs(appctx, p, peer_teach_stage2_stksess_lookup, st);
-}
-
-
 /*
  * Function used to parse a stick-table update message after it has been received
  * by <p> peer with <msg_cur> as address of the pointer to the position in the
@@ -2578,7 +2459,10 @@ static inline int peer_treat_awaited_msg(struct appctx *appctx, struct peer *pee
                        TRACE_PROTO("Resync request message received", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_CTRL, appctx, peer);
                        /* prepare tables for a global push */
                        for (st = peer->tables; st; st = st->next) {
-                               st->teaching_origin = st->last_pushed;
+                               MT_LIST_DELETE(&st->last->upd);
+                               MT_LIST_INSERT(&st->table->updates, &st->last->upd);
+                               MT_LIST_DELETE(&st->end->upd);
+                               MT_LIST_APPEND(&st->table->updates, &st->end->upd);
                                st->flags = 0;
                        }
 
@@ -2622,10 +2506,8 @@ static inline int peer_treat_awaited_msg(struct appctx *appctx, struct peer *pee
                                return 0;
                        }
                        peer->flags |= PEER_F_SYNCHED;
-                       for (st = peer->tables; st; st = st->next) {
-                               st->last_pushed = st->teaching_origin;
-                               st->flags = 0;
-                       }
+                       for (st = peer->tables; st; st = st->next)
+                               MT_LIST_DELETE(&st->end->upd);
 
                        /* reset teaching flags to 0 */
                        peer->flags &= ~PEER_TEACH_FLAGS;
@@ -2745,41 +2627,14 @@ int peer_send_msgs(struct appctx *appctx,
                                st->last_acked = st->last_get;
                        }
 
-                       if (!(peer->flags & PEER_F_TEACH_PROCESS)) {
-                               int must_send;
-
-                               if (HA_RWLOCK_TRYRDLOCK(STK_TABLE_UPDT_LOCK, &st->table->updt_lock)) {
-                                       applet_have_more_data(appctx);
-                                       repl = -1;
+                       if ((!(peer->flags & PEER_F_TEACH_PROCESS) && (peer->learnstate == PEER_LR_ST_NOTASSIGNED)) ||
+                           (peer->flags & (PEER_F_TEACH_PROCESS|PEER_F_TEACH_FINISHED)) == PEER_F_TEACH_PROCESS) {
+                               TRACE_PROTO("send teach messages", PEERS_EV_SESS_IO, appctx, peer, st);
+                               repl = peer_send_teachmsgs(appctx, peer, st);
+                               if (repl <= 0) {
+                                       peer->stop_local_table = peer->last_local_table;
                                        goto end;
                                }
-                               must_send = (peer->learnstate == PEER_LR_ST_NOTASSIGNED) && (st->last_pushed != st->table->localupdate);
-                               HA_RWLOCK_RDUNLOCK(STK_TABLE_UPDT_LOCK, &st->table->updt_lock);
-
-                               if (must_send) {
-                                       repl = peer_send_teach_process_msgs(appctx, peer, st);
-                                       if (repl <= 0) {
-                                               peer->stop_local_table = peer->last_local_table;
-                                               goto end;
-                                       }
-                               }
-                       }
-                       else if (!(peer->flags & PEER_F_TEACH_FINISHED)) {
-                               if (!(st->flags & SHTABLE_F_TEACH_STAGE1)) {
-                                       repl = peer_send_teach_stage1_msgs(appctx, peer, st);
-                                       if (repl <= 0) {
-                                               peer->stop_local_table = peer->last_local_table;
-                                               goto end;
-                                       }
-                               }
-
-                               if (!(st->flags & SHTABLE_F_TEACH_STAGE2)) {
-                                       repl = peer_send_teach_stage2_msgs(appctx, peer, st);
-                                       if (repl <= 0) {
-                                               peer->stop_local_table = peer->last_local_table;
-                                               goto end;
-                                       }
-                               }
                        }
 
                        if (st == last_local_table) {
@@ -2955,28 +2810,25 @@ static inline void init_connected_peer(struct peer *peer, struct peers *peers)
        TRACE_ENTER(PEERS_EV_SESS_IO|PEERS_EV_SESS_NEW, NULL, peer);
        peer->heartbeat = tick_add(now_ms, MS_TO_TICKS(PEER_HEARTBEAT_TIMEOUT));
 
+       if (peer->local && !(appctx_is_back(peer->appctx))) {
+               /* If the local peer has established the connection (appctx is
+                * on the frontend side), flag it to start to teach lesson.
+                */
+                peer->flags |= PEER_F_TEACH_PROCESS;
+               peer->flags &= ~PEER_F_SYNCHED;
+               TRACE_STATE("peer elected to teach lesson to lacal peer", PEERS_EV_SESS_NEW, NULL, peer);
+       }
+
        /* Init cursors */
        for (st = peer->tables; st ; st = st->next) {
                st->last_get = st->last_acked = 0;
-
-               HA_RWLOCK_WRLOCK(STK_TABLE_UPDT_LOCK, &st->table->updt_lock);
-               /* if st->last_pushed appears to be in future it means
-                * that the last update is very old and we
-                * remain unconnected a too long time to use this
-                * acknowledgement as a reset.
-                * We should update the protocol to be able to
-                * signal the remote peer that it needs a full resync.
-                * Here a partial fix consist to set st->last_pushed at
-                * the max past value.
-                */
-               if (!(peer->flags & PEER_F_SYNCHED) || (int)(st->table->localupdate - st->last_pushed) < 0) {
-                       st->last_pushed = st->table->localupdate + (2147483648U);
-                       peer->flags &= ~PEER_F_SYNCHED;
-               }
-               st->teaching_origin = st->last_pushed;
                st->flags = 0;
 
-               HA_RWLOCK_WRUNLOCK(STK_TABLE_UPDT_LOCK, &st->table->updt_lock);
+               if (!MT_LIST_INLIST(&st->last->upd) || !(peer->flags & PEER_F_SYNCHED)) {
+                       MT_LIST_DELETE(&st->last->upd);
+                       MT_LIST_INSERT(&st->table->updates, &st->last->upd);
+               }
+               MT_LIST_DELETE(&st->end->upd);
        }
 
        /* Awake main task to ack the new peer state */
@@ -2988,15 +2840,6 @@ static inline void init_connected_peer(struct peer *peer, struct peers *peers)
         /* reset teaching flags to 0 */
         peer->flags &= ~PEER_TEACH_FLAGS;
 
-       if (peer->local && !(appctx_is_back(peer->appctx))) {
-               /* If the local peer has established the connection (appctx is
-                * on the frontend side), flag it to start to teach lesson.
-                */
-                peer->flags |= PEER_F_TEACH_PROCESS;
-               peer->flags &= ~PEER_F_SYNCHED;
-               TRACE_STATE("peer elected to teach lesson to lacal peer", PEERS_EV_SESS_NEW, NULL, peer);
-       }
-
        /* Mark the peer as starting and wait the sync task */
        peer->flags |= PEER_F_WAIT_SYNCTASK_ACK;
        peer->appstate = PEER_APP_ST_STARTING;
@@ -3678,9 +3521,20 @@ static void __process_running_peer_sync(struct task *task, struct peers *peers,
 
                                        /* Awake session if there is data to push */
                                        for (st = peer->tables; st ; st = st->next) {
-                                               if (st->last_pushed != st->table->localupdate) {
+                                               struct stksess *ts;
+                                               struct mt_list back;
+
+                                               // TODO: maybe handle this via an atomic flag?
+                                               MT_LIST_FOR_EACH_ENTRY_LOCKED(ts, &st->last->upd, upd, back) {
+                                                       if (&ts->upd == &st->table->updates)
+                                                               break;
+                                                       if (&ts->upd != &st->table->updates && ts->updt_type == STKSESS_UPDT_LOCAL) {
+                                                               update_to_push = 1;
+                                                               break;
+                                                       }
+                                               }
+                                               if (update_to_push == 1) {
                                                        /* wake up the peer handler to push local updates */
-                                                       update_to_push = 1;
                                                        /* There is no need to send a heartbeat message
                                                         * when some updates must be pushed. The remote
                                                         * peer will consider <peer> peer as alive when it will
@@ -3867,7 +3721,17 @@ static void __process_stopping_peer_sync(struct task *task, struct peers *peers,
                /* current peer connection is active and established
                 * wake up all peer handlers to push remaining local updates */
                for (st = peer->tables; st ; st = st->next) {
-                       if (st->last_pushed != st->table->localupdate) {
+                       struct stksess *ts;
+                       struct mt_list back;
+                       int wakeup = 0;
+
+                       MT_LIST_FOR_EACH_ENTRY_LOCKED(ts, &st->last->upd, upd, back) {
+                               if (&ts->upd != &st->table->updates && ts->updt_type == STKSESS_UPDT_LOCAL) {
+                                       wakeup = 1;
+                                       break;
+                               }
+                       }
+                       if (wakeup) {
                                appctx_wakeup(peer->appctx);
                                break;
                        }
@@ -4102,6 +3966,14 @@ int peers_register_table(struct peers *peers, struct stktable *table)
                        id = curpeer->tables->local_id;
                st->local_id = id + 1;
 
+               st->last = calloc(1, sizeof(*st->last));
+               st->last->updt_type = STKSESS_UPDT_MARKER;
+               MT_LIST_INIT(&st->last->upd);
+
+               st->end = calloc(1, sizeof(*st->end));
+               st->end->updt_type = STKSESS_UPDT_MARKER;
+               MT_LIST_INIT(&st->end->upd);
+
                /* If peer is local we inc table
                 * refcnt to protect against flush
                 * until this process pushed all
@@ -4314,11 +4186,10 @@ static int peers_dump_peer(struct buffer *msg, struct appctx *appctx, struct pee
                                      "flags=0x%x remote_data=0x%llx",
                                      st, st->local_id, st->remote_id,
                                      st->flags, (unsigned long long)st->remote_data);
-                       chunk_appendf(&trash, "\n              last_acked=%u last_pushed=%u last_get=%u"
-                                     " teaching_origin=%u",
-                                     st->last_acked, st->last_pushed, st->last_get, st->teaching_origin);
-                       chunk_appendf(&trash, "\n              table:%p id=%s update=%u localupdate=%u refcnt=%u",
-                                     t, t->id, t->update, t->localupdate, t->refcnt);
+                       chunk_appendf(&trash, "\n              last_acked=%u last_get=%u",
+                                     st->last_acked, st->last_get);
+                       chunk_appendf(&trash, "\n              table:%p id=%s refcnt=%u",
+                                     t, t->id, t->refcnt);
                        if (flags & PEERS_SHOW_F_DICT) {
                                chunk_appendf(&trash, "\n        TX dictionary cache:");
                                count = 0;
index 5cd37a6671a7424b6cd4568876d9b365df168aa4..b5a90c8eeef45a9dea31d36ef6173d9cb47abaa9 100644 (file)
@@ -130,7 +130,7 @@ void stksess_free(struct stktable *t, struct stksess *ts)
  */
 int __stksess_kill(struct stktable *t, struct stksess *ts)
 {
-       int updt_locked = 0;
+       struct mt_list link;
 
        if (HA_ATOMIC_LOAD(&ts->ref_cnt))
                return 0;
@@ -143,20 +143,21 @@ int __stksess_kill(struct stktable *t, struct stksess *ts)
                return 0;
 
        /* ... and that we didn't leave the update list for the tree */
-       if (ts->upd.node.leaf_p) {
-               updt_locked = 1;
-               HA_RWLOCK_WRLOCK(STK_TABLE_UPDT_LOCK, &t->updt_lock);
-               if (HA_ATOMIC_LOAD(&ts->ref_cnt))
-                       goto out_unlock;
+       if (MT_LIST_INLIST(&ts->upd)) {
+               link = mt_list_lock_full(&ts->upd);
+               if (HA_ATOMIC_LOAD(&ts->ref_cnt)) {
+                       mt_list_unlock_full(&ts->upd, link);
+                       goto out;
+               }
+               mt_list_unlock_link(link);
+               mt_list_unlock_self(&ts->upd);
        }
+
        eb32_delete(&ts->exp);
-       eb32_delete(&ts->upd);
        ebmb_delete(&ts->key);
        __stksess_free(t, ts);
 
-  out_unlock:
-       if (updt_locked)
-               HA_RWLOCK_WRUNLOCK(STK_TABLE_UPDT_LOCK, &t->updt_lock);
+  out:
        return 1;
 }
 
@@ -269,8 +270,8 @@ static struct stksess *__stksess_init(struct stktable *t, struct stksess * ts)
        ts->seen = 0;
        ts->key.node.leaf_p = NULL;
        ts->exp.node.leaf_p = NULL;
-       ts->upd.node.leaf_p = NULL;
        ts->updt_type = STKSESS_UPDT_NONE;
+       MT_LIST_INIT(&ts->upd);
        MT_LIST_INIT(&ts->pend_updts);
        ts->expire = tick_add(now_ms, MS_TO_TICKS(t->expire));
        HA_RWLOCK_INIT(&ts->lock);
@@ -287,12 +288,12 @@ int stktable_trash_oldest(struct stktable *t)
 {
        struct stksess *ts;
        struct eb32_node *eb;
+       struct mt_list link;
        int max_search; // no more than 50% misses
        int max_per_shard;
        int done_per_shard;
        int batched = 0;
        int to_batch;
-       int updt_locked;
        int failed_once = 0;
        int looped;
        int shard;
@@ -309,7 +310,6 @@ int stktable_trash_oldest(struct stktable *t)
        do {
                done_per_shard = 0;
                looped = 0;
-               updt_locked = 0;
 
                if (HA_RWLOCK_TRYWRLOCK(STK_TABLE_LOCK, &t->shards[shard].sh_lock) != 0) {
                        if (batched)
@@ -381,26 +381,28 @@ int stktable_trash_oldest(struct stktable *t)
                                continue;
                        }
 
+                       if (HA_ATOMIC_LOAD(&ts->ref_cnt))
+                               goto requeue;
+
                        /* if the entry is in the update list, we must be extremely careful
-                        * because peers can see it at any moment and start to use it. Peers
-                        * will take the table's updt_lock for reading when doing that, and
-                        * with that lock held, will grab a ref_cnt before releasing the
-                        * lock. So we must take this lock as well and check the ref_cnt.
+                        * because peers can see it at any moment and start to use it. In this case,
+                        * peers will lock the element. So do the same here to avoid any conflict
                         */
-                       if (!updt_locked) {
-                               updt_locked = 1;
-                               HA_RWLOCK_WRLOCK(STK_TABLE_UPDT_LOCK, &t->updt_lock);
+                       MT_LIST_DELETE(&ts->pend_updts);
+
+                       if (MT_LIST_INLIST(&ts->upd)) {
+                               link = mt_list_lock_full(&ts->upd);
+                               if (HA_ATOMIC_LOAD(&ts->ref_cnt)) {
+                                       mt_list_unlock_full(&ts->upd, link);
+                                       goto requeue;
+                               }
+                               mt_list_unlock_link(link);
+                               mt_list_unlock_self(&ts->upd);
                        }
-                       /* now we're locked, new peers can't grab it anymore,
-                        * existing ones already have the ref_cnt.
-                        */
-                       if (HA_ATOMIC_LOAD(&ts->ref_cnt))
-                               goto requeue;
+
 
                        /* session expired, trash it */
                        ebmb_delete(&ts->key);
-                       MT_LIST_DELETE(&ts->pend_updts);
-                       eb32_delete(&ts->upd);
                        __stksess_free(t, ts);
                        batched++;
                        done_per_shard++;
@@ -410,9 +412,6 @@ int stktable_trash_oldest(struct stktable *t)
                                break;
                }
 
-               if (updt_locked)
-                       HA_RWLOCK_WRUNLOCK(STK_TABLE_UPDT_LOCK, &t->updt_lock);
-
                HA_RWLOCK_WRUNLOCK(STK_TABLE_LOCK, &t->shards[shard].sh_lock);
 
                /*
@@ -617,11 +616,9 @@ struct stksess *stktable_lookup(struct stktable *t, struct stksess *ts)
 }
 
 /* Update the expiration timer for <ts> but do not touch its expiration node.
- * The table's expiration timer is updated if set.
- * The node will be also inserted into the update tree if needed, at a position
- * depending if the update is a local or coming from a remote node.
- * If <decrefcnt> is set, the ts entry's ref_cnt will be decremented. The table's
- * updt_lock may be taken for writes.
+ * The table's expiration timer is updated if set.  <ts> will also be inserted
+ * into the pending update list to be added later in the update list. If
+ * <decrefcnt> is set, the <ts> entry's ref_cnt will be decremented.
  */
 void stktable_touch_with_exp(struct stktable *t, struct stksess *ts, int local, int expire, int decrefcnt)
 {
@@ -639,13 +636,13 @@ void stktable_touch_with_exp(struct stktable *t, struct stksess *ts, int local,
                        /* Check if this entry is not in the tree or not
                         * scheduled for at least one peer.
                         */
-                       if (!ts->upd.node.leaf_p || _HA_ATOMIC_LOAD(&ts->seen)) {
+                       if (!MT_LIST_INLIST(&ts->upd) || _HA_ATOMIC_LOAD(&ts->seen)) {
                                _HA_ATOMIC_STORE(&ts->updt_type, STKSESS_UPDT_LOCAL);
                                did_append = MT_LIST_TRY_APPEND(&t->pend_updts[tgid - 1], &ts->pend_updts);
                        }
                }
                else {
-                       if (!ts->upd.node.leaf_p) {
+                       if (!MT_LIST_INLIST(&ts->upd)) {
                                _HA_ATOMIC_STORE(&ts->updt_type, STKSESS_UPDT_REMOTE);
                                did_append = MT_LIST_TRY_APPEND(&t->pend_updts[tgid - 1], &ts->pend_updts);
                        }
@@ -833,12 +830,7 @@ struct stksess *stktable_get_entry(struct stktable *table, struct stktable_key *
 struct task *stktable_add_pend_updates(struct task *t, void *ctx, unsigned int state)
 {
        struct stktable *table = ctx;
-       struct eb32_node *eb;
-       int i = 0, is_local, cur_tgid = tgid - 1, empty_tgid = 0;
-
-       /* we really don't want to wait on this one */
-       if (HA_RWLOCK_TRYWRLOCK(STK_TABLE_LOCK, &table->updt_lock) != 0)
-               goto leave;
+       int i = 0, cur_tgid = tgid - 1, empty_tgid = 0;
 
        for (i = 0; i < STKTABLE_MAX_UPDATES_AT_ONCE; i++) {
                struct stksess *stksess = MT_LIST_POP_LOCKED(&table->pend_updts[cur_tgid], typeof(stksess), pend_updts);
@@ -857,27 +849,10 @@ struct task *stktable_add_pend_updates(struct task *t, void *ctx, unsigned int s
                empty_tgid = 0;
                if (cur_tgid == global.nbtgroups)
                        cur_tgid = 0;
-               is_local = (stksess->updt_type == STKSESS_UPDT_LOCAL);
                stksess->seen = 0;
-               if (is_local) {
-                       stksess->upd.key = ++table->update;
-                       table->localupdate = table->update;
-                       eb32_delete(&stksess->upd);
-               } else {
-                       stksess->upd.key = (++table->update) + (2147483648U);
-               }
+               MT_LIST_DELETE(&stksess->upd);
+               MT_LIST_APPEND(&table->updates, &stksess->upd);
 
-               /* even though very unlikely, it seldom happens that the entry
-                * is already in the tree (both for local and remote ones). We
-                * must dequeue it and requeue it at its new position (e.g. it
-                * might already have been seen by some peers).
-                */
-               eb32_delete(&stksess->upd);
-               eb = eb32_insert(&table->updates, &stksess->upd);
-               if (eb != &stksess->upd)  {
-                       eb32_delete(eb);
-                       eb32_insert(&table->updates, &stksess->upd);
-               }
                /*
                 * Now that we're done inserting the stksess, unlock it.
                 * It is kept locked here to prevent a race condition
@@ -887,9 +862,6 @@ struct task *stktable_add_pend_updates(struct task *t, void *ctx, unsigned int s
                MT_LIST_INIT(&stksess->pend_updts);
        }
 
-       HA_RWLOCK_WRUNLOCK(STK_TABLE_UPDT_LOCK, &table->updt_lock);
-
-leave:
        /* There's more to do, let's schedule another session */
        if (empty_tgid < global.nbtgroups)
                tasklet_wakeup(table->updt_task);
@@ -957,7 +929,7 @@ struct task *process_tables_expire(struct task *task, void *context, unsigned in
        struct stktable *t;
        struct stksess *ts;
        struct eb32_node *table_eb, *eb;
-       int updt_locked;
+       struct mt_list link;
        int to_visit;
        int task_exp;
        int shard;
@@ -991,7 +963,6 @@ struct task *process_tables_expire(struct task *task, void *context, unsigned in
                unsigned int next_exp_table = TICK_ETERNITY;
 
                t = eb32_entry(table_eb, struct stktable, shards[shard].in_bucket);
-               updt_locked = 0;
 
                if (tick_is_lt(now_ms, table_eb->key)) {
                        /*
@@ -1024,8 +995,7 @@ struct task *process_tables_expire(struct task *task, void *context, unsigned in
                                break;
                        }
 
-                       /* Let's quit earlier if we currently hold the update lock */
-                       to_visit -= 1 + 3 * updt_locked;
+                       to_visit--;
 
                        /* timer looks expired, detach it from the queue */
                        ts = eb32_entry(eb, struct stksess, exp);
@@ -1070,32 +1040,29 @@ struct task *process_tables_expire(struct task *task, void *context, unsigned in
                                continue;
                        }
 
+                       if (HA_ATOMIC_LOAD(&ts->ref_cnt))
+                               goto requeue;
+
                        /* if the entry is in the update list, we must be extremely careful
-                        * because peers can see it at any moment and start to use it. Peers
-                        * will take the table's updt_lock for reading when doing that, and
-                        * with that lock held, will grab a ref_cnt before releasing the
-                        * lock. So we must take this lock as well and check the ref_cnt.
+                        * because peers can see it at any moment and start to use it. In this case,
+                        * peers will lock the element. So do the same here to avoid any conflict
                         */
-                       if (!updt_locked) {
-                               updt_locked = 1;
-                               HA_RWLOCK_WRLOCK(STK_TABLE_UPDT_LOCK, &t->updt_lock);
+                       MT_LIST_DELETE(&ts->pend_updts);
+                       if (MT_LIST_INLIST(&ts->upd)) {
+                               link = mt_list_lock_full(&ts->upd);
+                               if (HA_ATOMIC_LOAD(&ts->ref_cnt)) {
+                                       mt_list_unlock_full(&ts->upd, link);
+                                       goto requeue;
+                               }
+                               mt_list_unlock_link(link);
+                               mt_list_unlock_self(&ts->upd);
                        }
-                       /* now we're locked, new peers can't grab it anymore,
-                        * existing ones already have the ref_cnt.
-                        */
-                       if (HA_ATOMIC_LOAD(&ts->ref_cnt))
-                               goto requeue;
 
                        /* session expired, trash it */
                        ebmb_delete(&ts->key);
-                       MT_LIST_DELETE(&ts->pend_updts);
-                       eb32_delete(&ts->upd);
                        __stksess_free(t, ts);
                }
 
-               if (updt_locked)
-                       HA_RWLOCK_WRUNLOCK(STK_TABLE_UPDT_LOCK, &t->updt_lock);
-
                /*
                 * Now find the first element, so that we can reposition
                 * the table in the shard tree.
@@ -1111,6 +1078,7 @@ struct task *process_tables_expire(struct task *task, void *context, unsigned in
 
                if (!tick_isset(task_exp) || (tick_isset(next_exp_table) && tick_is_lt(next_exp_table, task_exp)))
                        task_exp = next_exp_table;
+
                HA_RWLOCK_WRUNLOCK(STK_TABLE_LOCK, &t->shards[shard].sh_lock);
                tmpnode = eb32_next(table_eb);
 
@@ -1172,7 +1140,7 @@ int stktable_init(struct stktable *t, char **err_msg)
                        MT_LIST_INIT(&t->shards[shard].in_bucket_toadd);
                }
 
-               t->updates = EB_ROOT_UNIQUE;
+               MT_LIST_INIT(&t->updates);
 
                t->pool = create_pool("sticktables", sizeof(struct stksess) + round_ptr_size(t->data_size) + t->key_size, MEM_F_SHARED);