/***********************************/
/* Current shared table sync state */
/***********************************/
-#define SHTABLE_F_TEACH_STAGE1 0x00000001 /* Teach state 1 complete */
-#define SHTABLE_F_TEACH_STAGE2 0x00000002 /* Teach state 2 complete */
-
#define PEER_RESYNC_TIMEOUT 5000 /* 5 seconds */
#define PEER_RECONNECT_TIMEOUT 5000 /* 5 seconds */
return peer_send_msg(appctx, peer_prepare_error_msg, &p);
}
-/*
- * Function used to lookup for recent stick-table updates associated with
- * <st> shared stick-table when a lesson must be taught a peer (learn state is not PEER_LR_ST_NOTASSIGNED).
- */
-static inline struct stksess *peer_teach_process_stksess_lookup(struct shared_table *st)
-{
- struct eb32_node *eb;
- struct stksess *ret;
-
- eb = eb32_lookup_ge(&st->table->updates, st->last_pushed+1);
- if (!eb) {
- eb = eb32_first(&st->table->updates);
- if (!eb || (eb->key == st->last_pushed)) {
- st->last_pushed = st->table->localupdate;
- return NULL;
- }
- }
-
- /* if distance between the last pushed and the retrieved key
- * is greater than the distance last_pushed and the local_update
- * this means we are beyond localupdate.
- */
- if ((eb->key - st->last_pushed) > (st->table->localupdate - st->last_pushed)) {
- st->last_pushed = st->table->localupdate;
- return NULL;
- }
-
- ret = eb32_entry(eb, struct stksess, upd);
- if (!_HA_ATOMIC_LOAD(&ret->seen))
- _HA_ATOMIC_STORE(&ret->seen, 1);
- return ret;
-}
-
-/*
- * Function used to lookup for recent stick-table updates associated with
- * <st> shared stick-table during teach state 1 step.
- */
-static inline struct stksess *peer_teach_stage1_stksess_lookup(struct shared_table *st)
-{
- struct eb32_node *eb;
- struct stksess *ret;
-
- eb = eb32_lookup_ge(&st->table->updates, st->last_pushed+1);
- if (!eb) {
- st->flags |= SHTABLE_F_TEACH_STAGE1;
- eb = eb32_first(&st->table->updates);
- if (eb)
- st->last_pushed = eb->key - 1;
- return NULL;
- }
-
- ret = eb32_entry(eb, struct stksess, upd);
- if (!_HA_ATOMIC_LOAD(&ret->seen))
- _HA_ATOMIC_STORE(&ret->seen, 1);
- return ret;
-}
-
-/*
- * Function used to lookup for recent stick-table updates associated with
- * <st> shared stick-table during teach state 2 step.
- */
-static inline struct stksess *peer_teach_stage2_stksess_lookup(struct shared_table *st)
-{
- struct eb32_node *eb;
- struct stksess *ret;
-
- eb = eb32_lookup_ge(&st->table->updates, st->last_pushed+1);
- if (!eb || eb->key > st->teaching_origin) {
- st->flags |= SHTABLE_F_TEACH_STAGE2;
- return NULL;
- }
-
- ret = eb32_entry(eb, struct stksess, upd);
- if (!_HA_ATOMIC_LOAD(&ret->seen))
- _HA_ATOMIC_STORE(&ret->seen, 1);
- return ret;
-}
-
/*
* Generic function to emit update messages for <st> stick-table when a lesson must
* be taught to the peer <p>.
*
- * This function temporary unlock/lock <st> when it sends stick-table updates or
- * when decrementing its refcount in case of any error when it sends this updates.
- * It must be called with the stick-table lock released.
+ * This function loops on the stick-table update list from the <last> to the
+ * <end> markers of the peer shared table. Before starting the loop, the <end>
+ * marker, if not in the update list, is appended to the end of the list. If the
+ * loop is interrupted, the <last> marker is inserted before the current sticky
+ * session, which becomes the restart point for the next call. If the <end>
+ * marker is reached, it means all updates that had to be sent have been sent.
+ * The <last> marker is then moved just after the <end> marker, and the <end>
+ * marker is removed from the update list.
+ *
+ * When a sticky session is processed, the element is locked.
*
* Return 0 if any message could not be built modifying the appcxt st0 to PEER_SESS_ST_END value.
* Returns -1 if there was not enough room left to send the message,
* If it returns 0 or -1, this function leave <st> locked if already locked when entering this function
* unlocked if not already locked when entering this function.
*/
-int peer_send_teachmsgs(struct appctx *appctx, struct peer *p,
- struct stksess *(*peer_stksess_lookup)(struct shared_table *),
- struct shared_table *st)
+int peer_send_teachmsgs(struct appctx *appctx, struct peer *p, struct shared_table *st)
{
int ret, new_pushed, use_timed;
int updates_sent = 0;
- int failed_once = 0;
+ struct stksess *ts = NULL;
+ struct mt_list back;
TRACE_ENTER(PEERS_EV_SESS_IO, appctx, p, st);
ret = 1;
use_timed = 0;
- if (st != p->last_local_table) {
- ret = peer_send_switchmsg(st, appctx);
- if (ret <= 0)
- goto out;
- p->last_local_table = st;
- }
-
- if (peer_stksess_lookup != peer_teach_process_stksess_lookup)
+ if (!(p->flags & PEER_F_TEACH_PROCESS))
use_timed = !(p->flags & PEER_F_DWNGRD);
/* We force new pushed to 1 to force identifier in update message */
new_pushed = 1;
- if (HA_RWLOCK_TRYRDLOCK(STK_TABLE_UPDT_LOCK, &st->table->updt_lock) != 0) {
- /* just don't engage here if there is any contention */
- applet_have_more_data(appctx);
- ret = -1;
- goto out_unlocked;
- }
-
- while (1) {
- struct stksess *ts;
- unsigned last_pushed;
-
- /* push local updates */
- ts = peer_stksess_lookup(st);
- if (!ts) {
+ MT_LIST_FOR_EACH_ENTRY_LOCKED(ts, &st->last->upd, upd, back) {
+ if (ts == st->end) {
ret = 1; // done
break;
}
- last_pushed = ts->upd.key;
- if (p->srv->shard && ts->shard != p->srv->shard) {
- /* Skip this entry */
- st->last_pushed = last_pushed;
- new_pushed = 1;
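+		/* When PEER_F_TEACH_PROCESS is not set, only local updates are
+		 * pushed to the peer. Otherwise both local and remote updates
+		 * are pushed, and anything else (marker entries for instance)
+		 * is skipped.
+		 */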
+ if (!(p->flags & PEER_F_TEACH_PROCESS) && ts->updt_type != STKSESS_UPDT_LOCAL)
continue;
+ else if (ts->updt_type != STKSESS_UPDT_LOCAL && ts->updt_type != STKSESS_UPDT_REMOTE)
+ continue;
+
+ if (st != p->last_local_table) {
+ ret = peer_send_switchmsg(st, appctx);
+ if (ret <= 0)
+ break;
+
+ p->last_local_table = st;
}
- HA_ATOMIC_INC(&ts->ref_cnt);
- HA_RWLOCK_RDUNLOCK(STK_TABLE_UPDT_LOCK, &st->table->updt_lock);
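+		/* Mark the entry as seen by at least one peer: an entry still in
+		 * the update list is only re-queued by stktable_touch_with_exp()
+		 * once it has been seen.
+		 */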
+ if (!_HA_ATOMIC_LOAD(&ts->seen))
+ _HA_ATOMIC_STORE(&ts->seen, 1);
- ret = peer_send_updatemsg(st, appctx, ts, st->update_id, new_pushed, use_timed);
+ if (p->srv->shard && ts->shard != p->srv->shard)
+ continue;
- if (HA_RWLOCK_TRYRDLOCK(STK_TABLE_UPDT_LOCK, &st->table->updt_lock) != 0) {
- if (failed_once) {
- /* we've already faced contention twice in this
- * loop, this is getting serious, do not insist
- * anymore and come back later
- */
- HA_ATOMIC_DEC(&ts->ref_cnt);
- applet_have_more_data(appctx);
- ret = -1;
- goto out_unlocked;
- }
- /* OK contention happens, for this one we'll wait on the
- * lock, but only once.
- */
- failed_once++;
- HA_RWLOCK_RDLOCK(STK_TABLE_UPDT_LOCK, &st->table->updt_lock);
+ if (updates_sent >= peers_max_updates_at_once) {
+ applet_have_more_data(appctx);
+ ret = -1;
+ }
+ else {
+ HA_ATOMIC_INC(&ts->ref_cnt);
+ ret = peer_send_updatemsg(st, appctx, ts, st->update_id, new_pushed, use_timed);
+ HA_ATOMIC_DEC(&ts->ref_cnt);
}
- HA_ATOMIC_DEC(&ts->ref_cnt);
- if (ret <= 0)
+ if (ret <= 0) {
+ /* Insert <last> marker before <ts> to process it again
+			/* Insert the <last> marker before <ts> so that it is
+			 * processed again on the next iteration. <ts> remains locked.
+ * Before:
+ * <back>: <A> <==============> <C>
+ *
+ * <last> <==> ... <A> <==> x x <==> <C> ...
+ *
+ * x <==> TS <==> x
+ *
+ * <back>.prev == <A> and <back>.prev->next == MT_LIST_BUSY
+ *
+ * Step 1: detach <last> from the list
+ * Step 2: insert <last> after <A>
+ * - <last>.prev = <A> (or <back>.prev)
+ * - <last>.next = MT_LIST_BUSY (or <back>.prev->next)
+ * Step 3: move <A> before <last>
+ * - <A>.next = <last> (or back.prev->next = <last>)
+ *
+ * Step 4: update <back>.prev to point on <last>
+ *
+ * After:
+ * <back>: <last> <==============> <C>
+ *
+ * ... <A> <==> <last> <==> x x <==> <C> ...
+ *
+ * x <==> TS <==> x
+ *
+ */
+ MT_LIST_DELETE(&st->last->upd);
+ st->last->upd.next = back.prev->next; /* == MT_LIST_BUSY */
+ __ha_barrier_atomic_store();
+ st->last->upd.prev = back.prev;
+ back.prev->next = &st->last->upd;
+ back.prev = &st->last->upd;
break;
+ }
- st->last_pushed = last_pushed;
p->last.table = st;
p->last.id = st->update_id;
st->update_id++;
/* identifier may not needed in next update message */
new_pushed = 0;
-
updates_sent++;
- if (updates_sent >= peers_max_updates_at_once) {
- applet_have_more_data(appctx);
- ret = -1;
- break;
- }
}
- out:
- HA_RWLOCK_RDUNLOCK(STK_TABLE_UPDT_LOCK, &st->table->updt_lock);
- out_unlocked:
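+	/* Everything between <last> and <end> was sent: move <last> to <end>'s
+	 * position and remove <end> from the update list, closing the teaching
+	 * window as described above.
+	 */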
+ if (ret == 1) {
+ MT_LIST_DELETE(&st->last->upd);
+ MT_LIST_APPEND(&st->end->upd, &st->last->upd);
+ MT_LIST_DELETE(&st->end->upd);
+ }
+
TRACE_LEAVE(PEERS_EV_SESS_IO, appctx, p, st);
return ret;
}
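+
+/* The walk above boils down to moving a cursor between two markers of a list.
+ * Below is a minimal single-threaded sketch with hypothetical helpers (next(),
+ * push(), move_before(), move_after(), unlink()); it ignores the MT_LIST
+ * locking and only illustrates how <last> and <end> delimit the entries to
+ * push and how the restart point is kept on interruption:
+ *
+ *	e = next(&last);
+ *	while (e != &end) {
+ *		if (!push(e)) {                 // no room left in the buffer
+ *			move_before(&last, e);  // restart point for the next call
+ *			return -1;
+ *		}
+ *		e = next(e);
+ *	}
+ *	move_after(&last, &end);                // everything was sent
+ *	unlink(&end);                           // close the teaching window
+ *	return 1;
+ */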
-/*
- * Function to emit update messages for <st> stick-table when a lesson must
- * be taught to the peer <p> (learn state is not PEER_LR_ST_NOTASSIGNED).
- *
- * Note that <st> shared stick-table is locked when calling this function, and
- * the lock is dropped then re-acquired.
- *
- * Return 0 if any message could not be built modifying the appcxt st0 to PEER_SESS_ST_END value.
- * Returns -1 if there was not enough room left to send the message,
- * any other negative returned value must be considered as an error with an appcxt st0
- * returned value equal to PEER_SESS_ST_END.
- */
-static inline int peer_send_teach_process_msgs(struct appctx *appctx, struct peer *p,
- struct shared_table *st)
-{
- TRACE_PROTO("send teach process messages", PEERS_EV_SESS_IO, appctx, p, st);
- return peer_send_teachmsgs(appctx, p, peer_teach_process_stksess_lookup, st);
-}
-
-/*
- * Function to emit update messages for <st> stick-table when a lesson must
- * be taught to the peer <p> during teach state 1 step. It must be called with
- * the stick-table lock released.
- *
- * Return 0 if any message could not be built modifying the appcxt st0 to PEER_SESS_ST_END value.
- * Returns -1 if there was not enough room left to send the message,
- * any other negative returned value must be considered as an error with an appcxt st0
- * returned value equal to PEER_SESS_ST_END.
- */
-static inline int peer_send_teach_stage1_msgs(struct appctx *appctx, struct peer *p,
- struct shared_table *st)
-{
- TRACE_PROTO("send teach stage1 messages", PEERS_EV_SESS_IO, appctx, p, st);
- return peer_send_teachmsgs(appctx, p, peer_teach_stage1_stksess_lookup, st);
-}
-
-/*
- * Function to emit update messages for <st> stick-table when a lesson must
- * be taught to the peer <p> during teach state 1 step. It must be called with
- * the stick-table lock released.
- *
- * Return 0 if any message could not be built modifying the appcxt st0 to PEER_SESS_ST_END value.
- * Returns -1 if there was not enough room left to send the message,
- * any other negative returned value must be considered as an error with an appcxt st0
- * returned value equal to PEER_SESS_ST_END.
- */
-static inline int peer_send_teach_stage2_msgs(struct appctx *appctx, struct peer *p,
- struct shared_table *st)
-{
- TRACE_PROTO("send teach stage2 messages", PEERS_EV_SESS_IO, appctx, p, st);
- return peer_send_teachmsgs(appctx, p, peer_teach_stage2_stksess_lookup, st);
-}
-
-
/*
* Function used to parse a stick-table update message after it has been received
* by <p> peer with <msg_cur> as address of the pointer to the position in the
TRACE_PROTO("Resync request message received", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_CTRL, appctx, peer);
/* prepare tables for a global push */
for (st = peer->tables; st; st = st->next) {
- st->teaching_origin = st->last_pushed;
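+		/* Reset this peer's teaching window so that it covers the whole
+		 * update list: <last> is moved to the head of the list and <end>
+		 * is appended to its tail.
+		 */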
+ MT_LIST_DELETE(&st->last->upd);
+ MT_LIST_INSERT(&st->table->updates, &st->last->upd);
+ MT_LIST_DELETE(&st->end->upd);
+ MT_LIST_APPEND(&st->table->updates, &st->end->upd);
st->flags = 0;
}
return 0;
}
peer->flags |= PEER_F_SYNCHED;
- for (st = peer->tables; st; st = st->next) {
- st->last_pushed = st->teaching_origin;
- st->flags = 0;
- }
+ for (st = peer->tables; st; st = st->next)
+ MT_LIST_DELETE(&st->end->upd);
/* reset teaching flags to 0 */
peer->flags &= ~PEER_TEACH_FLAGS;
st->last_acked = st->last_get;
}
- if (!(peer->flags & PEER_F_TEACH_PROCESS)) {
- int must_send;
-
- if (HA_RWLOCK_TRYRDLOCK(STK_TABLE_UPDT_LOCK, &st->table->updt_lock)) {
- applet_have_more_data(appctx);
- repl = -1;
+ if ((!(peer->flags & PEER_F_TEACH_PROCESS) && (peer->learnstate == PEER_LR_ST_NOTASSIGNED)) ||
+ (peer->flags & (PEER_F_TEACH_PROCESS|PEER_F_TEACH_FINISHED)) == PEER_F_TEACH_PROCESS) {
+ TRACE_PROTO("send teach messages", PEERS_EV_SESS_IO, appctx, peer, st);
+ repl = peer_send_teachmsgs(appctx, peer, st);
+ if (repl <= 0) {
+ peer->stop_local_table = peer->last_local_table;
goto end;
}
- must_send = (peer->learnstate == PEER_LR_ST_NOTASSIGNED) && (st->last_pushed != st->table->localupdate);
- HA_RWLOCK_RDUNLOCK(STK_TABLE_UPDT_LOCK, &st->table->updt_lock);
-
- if (must_send) {
- repl = peer_send_teach_process_msgs(appctx, peer, st);
- if (repl <= 0) {
- peer->stop_local_table = peer->last_local_table;
- goto end;
- }
- }
- }
- else if (!(peer->flags & PEER_F_TEACH_FINISHED)) {
- if (!(st->flags & SHTABLE_F_TEACH_STAGE1)) {
- repl = peer_send_teach_stage1_msgs(appctx, peer, st);
- if (repl <= 0) {
- peer->stop_local_table = peer->last_local_table;
- goto end;
- }
- }
-
- if (!(st->flags & SHTABLE_F_TEACH_STAGE2)) {
- repl = peer_send_teach_stage2_msgs(appctx, peer, st);
- if (repl <= 0) {
- peer->stop_local_table = peer->last_local_table;
- goto end;
- }
- }
}
if (st == last_local_table) {
TRACE_ENTER(PEERS_EV_SESS_IO|PEERS_EV_SESS_NEW, NULL, peer);
peer->heartbeat = tick_add(now_ms, MS_TO_TICKS(PEER_HEARTBEAT_TIMEOUT));
+ if (peer->local && !(appctx_is_back(peer->appctx))) {
+ /* If the local peer has established the connection (appctx is
+		 * on the frontend side), flag it to start teaching the lesson.
+ */
+ peer->flags |= PEER_F_TEACH_PROCESS;
+ peer->flags &= ~PEER_F_SYNCHED;
+		TRACE_STATE("peer elected to teach lesson to local peer", PEERS_EV_SESS_NEW, NULL, peer);
+ }
+
/* Init cursors */
for (st = peer->tables; st ; st = st->next) {
st->last_get = st->last_acked = 0;
-
- HA_RWLOCK_WRLOCK(STK_TABLE_UPDT_LOCK, &st->table->updt_lock);
- /* if st->last_pushed appears to be in future it means
- * that the last update is very old and we
- * remain unconnected a too long time to use this
- * acknowledgement as a reset.
- * We should update the protocol to be able to
- * signal the remote peer that it needs a full resync.
- * Here a partial fix consist to set st->last_pushed at
- * the max past value.
- */
- if (!(peer->flags & PEER_F_SYNCHED) || (int)(st->table->localupdate - st->last_pushed) < 0) {
- st->last_pushed = st->table->localupdate + (2147483648U);
- peer->flags &= ~PEER_F_SYNCHED;
- }
- st->teaching_origin = st->last_pushed;
st->flags = 0;
- HA_RWLOCK_WRUNLOCK(STK_TABLE_UPDT_LOCK, &st->table->updt_lock);
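+		/* If the peer is not synchronized, or if its <last> marker is not
+		 * in the update list yet, move <last> to the head of the list so
+		 * that teaching restarts from the oldest entry. The <end> marker
+		 * is always removed here; it is re-appended when a new teaching
+		 * window is set up.
+		 */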
+ if (!MT_LIST_INLIST(&st->last->upd) || !(peer->flags & PEER_F_SYNCHED)) {
+ MT_LIST_DELETE(&st->last->upd);
+ MT_LIST_INSERT(&st->table->updates, &st->last->upd);
+ }
+ MT_LIST_DELETE(&st->end->upd);
}
/* Awake main task to ack the new peer state */
/* reset teaching flags to 0 */
peer->flags &= ~PEER_TEACH_FLAGS;
- if (peer->local && !(appctx_is_back(peer->appctx))) {
- /* If the local peer has established the connection (appctx is
- * on the frontend side), flag it to start to teach lesson.
- */
- peer->flags |= PEER_F_TEACH_PROCESS;
- peer->flags &= ~PEER_F_SYNCHED;
- TRACE_STATE("peer elected to teach lesson to lacal peer", PEERS_EV_SESS_NEW, NULL, peer);
- }
-
/* Mark the peer as starting and wait the sync task */
peer->flags |= PEER_F_WAIT_SYNCTASK_ACK;
peer->appstate = PEER_APP_ST_STARTING;
/* Awake session if there is data to push */
for (st = peer->tables; st ; st = st->next) {
- if (st->last_pushed != st->table->localupdate) {
+ struct stksess *ts;
+ struct mt_list back;
+
+		// TODO: maybe this could be handled via an atomic flag?
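+		/* Walk the update list from this peer's <last> marker looking for
+		 * at least one pending local update; if one is found, the peer
+		 * applet must be woken up to push it.
+		 */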
+ MT_LIST_FOR_EACH_ENTRY_LOCKED(ts, &st->last->upd, upd, back) {
+ if (&ts->upd == &st->table->updates)
+ break;
+ if (&ts->upd != &st->table->updates && ts->updt_type == STKSESS_UPDT_LOCAL) {
+ update_to_push = 1;
+ break;
+ }
+ }
+ if (update_to_push == 1) {
/* wake up the peer handler to push local updates */
- update_to_push = 1;
/* There is no need to send a heartbeat message
* when some updates must be pushed. The remote
* peer will consider <peer> peer as alive when it will
/* current peer connection is active and established
* wake up all peer handlers to push remaining local updates */
for (st = peer->tables; st ; st = st->next) {
- if (st->last_pushed != st->table->localupdate) {
+ struct stksess *ts;
+ struct mt_list back;
+ int wakeup = 0;
+
+ MT_LIST_FOR_EACH_ENTRY_LOCKED(ts, &st->last->upd, upd, back) {
+ if (&ts->upd != &st->table->updates && ts->updt_type == STKSESS_UPDT_LOCAL) {
+ wakeup = 1;
+ break;
+ }
+ }
+ if (wakeup) {
appctx_wakeup(peer->appctx);
break;
}
id = curpeer->tables->local_id;
st->local_id = id + 1;
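+		/* Allocate the <last> and <end> markers delimiting this peer's
+		 * teaching window in the table's update list. They are typed as
+		 * STKSESS_UPDT_MARKER so that the teaching loop can recognize and
+		 * skip them.
+		 */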
+ st->last = calloc(1, sizeof(*st->last));
+ st->last->updt_type = STKSESS_UPDT_MARKER;
+ MT_LIST_INIT(&st->last->upd);
+
+ st->end = calloc(1, sizeof(*st->end));
+ st->end->updt_type = STKSESS_UPDT_MARKER;
+ MT_LIST_INIT(&st->end->upd);
+
/* If peer is local we inc table
* refcnt to protect against flush
* until this process pushed all
"flags=0x%x remote_data=0x%llx",
st, st->local_id, st->remote_id,
st->flags, (unsigned long long)st->remote_data);
- chunk_appendf(&trash, "\n last_acked=%u last_pushed=%u last_get=%u"
- " teaching_origin=%u",
- st->last_acked, st->last_pushed, st->last_get, st->teaching_origin);
- chunk_appendf(&trash, "\n table:%p id=%s update=%u localupdate=%u refcnt=%u",
- t, t->id, t->update, t->localupdate, t->refcnt);
+ chunk_appendf(&trash, "\n last_acked=%u last_get=%u",
+ st->last_acked, st->last_get);
+ chunk_appendf(&trash, "\n table:%p id=%s refcnt=%u",
+ t, t->id, t->refcnt);
if (flags & PEERS_SHOW_F_DICT) {
chunk_appendf(&trash, "\n TX dictionary cache:");
count = 0;
*/
int __stksess_kill(struct stktable *t, struct stksess *ts)
{
- int updt_locked = 0;
+ struct mt_list link;
if (HA_ATOMIC_LOAD(&ts->ref_cnt))
return 0;
return 0;
/* ... and that we didn't leave the update list for the tree */
- if (ts->upd.node.leaf_p) {
- updt_locked = 1;
- HA_RWLOCK_WRLOCK(STK_TABLE_UPDT_LOCK, &t->updt_lock);
- if (HA_ATOMIC_LOAD(&ts->ref_cnt))
- goto out_unlock;
+ if (MT_LIST_INLIST(&ts->upd)) {
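+		/* Lock the element and its neighbours in the update list. If the
+		 * refcount was grabbed in the meantime, relink the element as it
+		 * was (unlock_full) and give up. Otherwise reconnect the
+		 * neighbours without it (unlock_link) and leave the element
+		 * detached (unlock_self) before freeing it.
+		 */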
+ link = mt_list_lock_full(&ts->upd);
+ if (HA_ATOMIC_LOAD(&ts->ref_cnt)) {
+ mt_list_unlock_full(&ts->upd, link);
+ goto out;
+ }
+ mt_list_unlock_link(link);
+ mt_list_unlock_self(&ts->upd);
}
+
eb32_delete(&ts->exp);
- eb32_delete(&ts->upd);
ebmb_delete(&ts->key);
__stksess_free(t, ts);
- out_unlock:
- if (updt_locked)
- HA_RWLOCK_WRUNLOCK(STK_TABLE_UPDT_LOCK, &t->updt_lock);
+ out:
return 1;
}
ts->seen = 0;
ts->key.node.leaf_p = NULL;
ts->exp.node.leaf_p = NULL;
- ts->upd.node.leaf_p = NULL;
ts->updt_type = STKSESS_UPDT_NONE;
+ MT_LIST_INIT(&ts->upd);
MT_LIST_INIT(&ts->pend_updts);
ts->expire = tick_add(now_ms, MS_TO_TICKS(t->expire));
HA_RWLOCK_INIT(&ts->lock);
{
struct stksess *ts;
struct eb32_node *eb;
+ struct mt_list link;
int max_search; // no more than 50% misses
int max_per_shard;
int done_per_shard;
int batched = 0;
int to_batch;
- int updt_locked;
int failed_once = 0;
int looped;
int shard;
do {
done_per_shard = 0;
looped = 0;
- updt_locked = 0;
if (HA_RWLOCK_TRYWRLOCK(STK_TABLE_LOCK, &t->shards[shard].sh_lock) != 0) {
if (batched)
continue;
}
+ if (HA_ATOMIC_LOAD(&ts->ref_cnt))
+ goto requeue;
+
/* if the entry is in the update list, we must be extremely careful
- * because peers can see it at any moment and start to use it. Peers
- * will take the table's updt_lock for reading when doing that, and
- * with that lock held, will grab a ref_cnt before releasing the
- * lock. So we must take this lock as well and check the ref_cnt.
+		 * because peers can see it at any moment and start to use it. In that
+		 * case, peers lock the element, so do the same here to avoid any conflict.
*/
- if (!updt_locked) {
- updt_locked = 1;
- HA_RWLOCK_WRLOCK(STK_TABLE_UPDT_LOCK, &t->updt_lock);
+ MT_LIST_DELETE(&ts->pend_updts);
+
+ if (MT_LIST_INLIST(&ts->upd)) {
+ link = mt_list_lock_full(&ts->upd);
+ if (HA_ATOMIC_LOAD(&ts->ref_cnt)) {
+ mt_list_unlock_full(&ts->upd, link);
+ goto requeue;
+ }
+ mt_list_unlock_link(link);
+ mt_list_unlock_self(&ts->upd);
}
- /* now we're locked, new peers can't grab it anymore,
- * existing ones already have the ref_cnt.
- */
- if (HA_ATOMIC_LOAD(&ts->ref_cnt))
- goto requeue;
+
/* session expired, trash it */
ebmb_delete(&ts->key);
- MT_LIST_DELETE(&ts->pend_updts);
- eb32_delete(&ts->upd);
__stksess_free(t, ts);
batched++;
done_per_shard++;
break;
}
- if (updt_locked)
- HA_RWLOCK_WRUNLOCK(STK_TABLE_UPDT_LOCK, &t->updt_lock);
-
HA_RWLOCK_WRUNLOCK(STK_TABLE_LOCK, &t->shards[shard].sh_lock);
/*
}
/* Update the expiration timer for <ts> but do not touch its expiration node.
- * The table's expiration timer is updated if set.
- * The node will be also inserted into the update tree if needed, at a position
- * depending if the update is a local or coming from a remote node.
- * If <decrefcnt> is set, the ts entry's ref_cnt will be decremented. The table's
- * updt_lock may be taken for writes.
+ * The table's expiration timer is updated if set. <ts> will also be inserted
+ * into the pending update list, and moved from there to the update list. If
+ * <decrefcnt> is set, the <ts> entry's ref_cnt will be decremented.
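+ *
+ * The complete path of a local update is thus: stktable_touch_with_exp()
+ * queues the entry into the thread-group local <pend_updts> list, then the
+ * table's <updt_task> (stktable_add_pend_updates) appends it to the shared
+ * <updates> list, from which each peer walks the entries located between its
+ * <last> and <end> markers.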
*/
void stktable_touch_with_exp(struct stktable *t, struct stksess *ts, int local, int expire, int decrefcnt)
{
/* Check if this entry is not in the tree or not
* scheduled for at least one peer.
*/
- if (!ts->upd.node.leaf_p || _HA_ATOMIC_LOAD(&ts->seen)) {
+ if (!MT_LIST_INLIST(&ts->upd) || _HA_ATOMIC_LOAD(&ts->seen)) {
_HA_ATOMIC_STORE(&ts->updt_type, STKSESS_UPDT_LOCAL);
did_append = MT_LIST_TRY_APPEND(&t->pend_updts[tgid - 1], &ts->pend_updts);
}
}
else {
- if (!ts->upd.node.leaf_p) {
+ if (!MT_LIST_INLIST(&ts->upd)) {
_HA_ATOMIC_STORE(&ts->updt_type, STKSESS_UPDT_REMOTE);
did_append = MT_LIST_TRY_APPEND(&t->pend_updts[tgid - 1], &ts->pend_updts);
}
struct task *stktable_add_pend_updates(struct task *t, void *ctx, unsigned int state)
{
struct stktable *table = ctx;
- struct eb32_node *eb;
- int i = 0, is_local, cur_tgid = tgid - 1, empty_tgid = 0;
-
- /* we really don't want to wait on this one */
- if (HA_RWLOCK_TRYWRLOCK(STK_TABLE_LOCK, &table->updt_lock) != 0)
- goto leave;
+ int i = 0, cur_tgid = tgid - 1, empty_tgid = 0;
for (i = 0; i < STKTABLE_MAX_UPDATES_AT_ONCE; i++) {
struct stksess *stksess = MT_LIST_POP_LOCKED(&table->pend_updts[cur_tgid], typeof(stksess), pend_updts);
empty_tgid = 0;
if (cur_tgid == global.nbtgroups)
cur_tgid = 0;
- is_local = (stksess->updt_type == STKSESS_UPDT_LOCAL);
stksess->seen = 0;
- if (is_local) {
- stksess->upd.key = ++table->update;
- table->localupdate = table->update;
- eb32_delete(&stksess->upd);
- } else {
- stksess->upd.key = (++table->update) + (2147483648U);
- }
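+		/* (Re)queue the entry at the tail of the shared update list: if
+		 * it was already there (e.g. already seen by some peers), it is
+		 * simply moved to its new, most recent position.
+		 */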
+ MT_LIST_DELETE(&stksess->upd);
+ MT_LIST_APPEND(&table->updates, &stksess->upd);
- /* even though very unlikely, it seldom happens that the entry
- * is already in the tree (both for local and remote ones). We
- * must dequeue it and requeue it at its new position (e.g. it
- * might already have been seen by some peers).
- */
- eb32_delete(&stksess->upd);
- eb = eb32_insert(&table->updates, &stksess->upd);
- if (eb != &stksess->upd) {
- eb32_delete(eb);
- eb32_insert(&table->updates, &stksess->upd);
- }
/*
* Now that we're done inserting the stksess, unlock it.
* It is kept locked here to prevent a race condition
MT_LIST_INIT(&stksess->pend_updts);
}
- HA_RWLOCK_WRUNLOCK(STK_TABLE_UPDT_LOCK, &table->updt_lock);
-
-leave:
/* There's more to do, let's schedule another session */
if (empty_tgid < global.nbtgroups)
tasklet_wakeup(table->updt_task);
struct stktable *t;
struct stksess *ts;
struct eb32_node *table_eb, *eb;
- int updt_locked;
+ struct mt_list link;
int to_visit;
int task_exp;
int shard;
unsigned int next_exp_table = TICK_ETERNITY;
t = eb32_entry(table_eb, struct stktable, shards[shard].in_bucket);
- updt_locked = 0;
if (tick_is_lt(now_ms, table_eb->key)) {
/*
break;
}
- /* Let's quit earlier if we currently hold the update lock */
- to_visit -= 1 + 3 * updt_locked;
+ to_visit--;
/* timer looks expired, detach it from the queue */
ts = eb32_entry(eb, struct stksess, exp);
continue;
}
+ if (HA_ATOMIC_LOAD(&ts->ref_cnt))
+ goto requeue;
+
/* if the entry is in the update list, we must be extremely careful
- * because peers can see it at any moment and start to use it. Peers
- * will take the table's updt_lock for reading when doing that, and
- * with that lock held, will grab a ref_cnt before releasing the
- * lock. So we must take this lock as well and check the ref_cnt.
+		 * because peers can see it at any moment and start to use it. In that
+		 * case, peers lock the element, so do the same here to avoid any conflict.
*/
- if (!updt_locked) {
- updt_locked = 1;
- HA_RWLOCK_WRLOCK(STK_TABLE_UPDT_LOCK, &t->updt_lock);
+ MT_LIST_DELETE(&ts->pend_updts);
+ if (MT_LIST_INLIST(&ts->upd)) {
+ link = mt_list_lock_full(&ts->upd);
+ if (HA_ATOMIC_LOAD(&ts->ref_cnt)) {
+ mt_list_unlock_full(&ts->upd, link);
+ goto requeue;
+ }
+ mt_list_unlock_link(link);
+ mt_list_unlock_self(&ts->upd);
}
- /* now we're locked, new peers can't grab it anymore,
- * existing ones already have the ref_cnt.
- */
- if (HA_ATOMIC_LOAD(&ts->ref_cnt))
- goto requeue;
/* session expired, trash it */
ebmb_delete(&ts->key);
- MT_LIST_DELETE(&ts->pend_updts);
- eb32_delete(&ts->upd);
__stksess_free(t, ts);
}
- if (updt_locked)
- HA_RWLOCK_WRUNLOCK(STK_TABLE_UPDT_LOCK, &t->updt_lock);
-
/*
* Now find the first element, so that we can reposition
* the table in the shard tree.
if (!tick_isset(task_exp) || (tick_isset(next_exp_table) && tick_is_lt(next_exp_table, task_exp)))
task_exp = next_exp_table;
+
HA_RWLOCK_WRUNLOCK(STK_TABLE_LOCK, &t->shards[shard].sh_lock);
tmpnode = eb32_next(table_eb);
MT_LIST_INIT(&t->shards[shard].in_bucket_toadd);
}
- t->updates = EB_ROOT_UNIQUE;
+ MT_LIST_INIT(&t->updates);
t->pool = create_pool("sticktables", sizeof(struct stksess) + round_ptr_size(t->data_size) + t->key_size, MEM_F_SHARED);