int ret, new_pushed, use_timed;
int updates_sent = 0;
struct stksess *ts = NULL;
- struct mt_list back;
TRACE_ENTER(PEERS_EV_SESS_IO, appctx, p, st);
/* We force new pushed to 1 to force identifier in update message */
new_pushed = 1;
- MT_LIST_FOR_EACH_ENTRY_LOCKED(ts, &st->last->upd, upd, back) {
- if (ts == st->end) {
- ret = 1; // done
+ while (1) {
+ struct mt_list el1, el2;
+
+ if (st->last->upd.next == &st->end->upd || st->last->upd.next == &st->table->updates)
+ break;
+
+ if (buffer_almost_full(&appctx->outbuf)) {
+ applet_have_more_data(appctx);
+ ret = -1;
break;
}
- if (!(p->flags & PEER_F_TEACH_PROCESS) && ts->updt_type != STKSESS_UPDT_LOCAL)
- continue;
- else if (ts->updt_type != STKSESS_UPDT_LOCAL && ts->updt_type != STKSESS_UPDT_REMOTE)
+ if (updates_sent >= peers_max_updates_at_once) {
+ applet_have_more_data(appctx);
+ ret = -1;
+ break;
+ }
+
+ el1 = mt_list_try_lock_full(&st->last->upd);
+ if (el1.prev == NULL) {
+ applet_have_more_data(appctx);
+ ret = -1;
+ break;
+ }
+ el2 = mt_list_lock_next(el1.next);
+ /* el2 = mt_list_try_lock_next(el1.next); */
+ /* if (el2.next == NULL) { */
+ /* mt_list_unlock_full(&st->last->upd, el1); */
+ /* applet_have_more_data(appctx); */
+ /* ret = -1; */
+ /* break; */
+ /* } */
+
+ ts = MT_LIST_ELEM(el1.next, typeof(ts), upd);
+ HA_ATOMIC_INC(&ts->ref_cnt);
+
+ mt_list_unlock_full(&st->last->upd, el2);
+ mt_list_unlock_link(el1);
+
+ if ((!(p->flags & PEER_F_TEACH_PROCESS) && ts->updt_type != STKSESS_UPDT_LOCAL) ||
+ (ts->updt_type != STKSESS_UPDT_LOCAL && ts->updt_type != STKSESS_UPDT_REMOTE) ||
+ (p->srv->shard && ts->shard != p->srv->shard) ||
+ ts->updt_type == STKSESS_UPDT_MARKER) {
+ HA_ATOMIC_DEC(&ts->ref_cnt);
continue;
+ }
if (st != p->last_local_table) {
ret = peer_send_switchmsg(st, appctx);
- if (ret <= 0)
+ if (ret <= 0) {
+ HA_ATOMIC_DEC(&ts->ref_cnt);
break;
-
+ }
p->last_local_table = st;
}
if (!_HA_ATOMIC_LOAD(&ts->seen))
_HA_ATOMIC_STORE(&ts->seen, 1);
- if (p->srv->shard && ts->shard != p->srv->shard)
- continue;
-
- if (updates_sent >= peers_max_updates_at_once) {
- applet_have_more_data(appctx);
- ret = -1;
- }
- else {
- HA_ATOMIC_INC(&ts->ref_cnt);
- ret = peer_send_updatemsg(st, appctx, ts, st->update_id, new_pushed, use_timed);
- HA_ATOMIC_DEC(&ts->ref_cnt);
- }
-
- if (ret <= 0) {
- /* Insert <last> marker before <ts> to process it again
- * on next iteration. <ts> remains locked
- *
- * Before:
- * <back>: <A> <==============> <C>
- *
- * <last> <==> ... <A> <==> x x <==> <C> ...
- *
- * x <==> TS <==> x
- *
- * <back>.prev == <A> and <back>.prev->next == MT_LIST_BUSY
- *
- * Step 1: detach <last> from the list
- * Step 2: insert <last> after <A>
- * - <last>.prev = <A> (or <back>.prev)
- * - <last>.next = MT_LIST_BUSY (or <back>.prev->next)
- * Step 3: move <A> before <last>
- * - <A>.next = <last> (or back.prev->next = <last>)
- *
- * Step 4: update <back>.prev to point on <last>
- *
- * After:
- * <back>: <last> <==============> <C>
- *
- * ... <A> <==> <last> <==> x x <==> <C> ...
- *
- * x <==> TS <==> x
- *
- */
- MT_LIST_DELETE(&st->last->upd);
- st->last->upd.next = back.prev->next; /* == MT_LIST_BUSY */
- __ha_barrier_atomic_store();
- st->last->upd.prev = back.prev;
- back.prev->next = &st->last->upd;
- back.prev = &st->last->upd;
+ ret = peer_send_updatemsg(st, appctx, ts, st->update_id, new_pushed, use_timed);
+ HA_ATOMIC_DEC(&ts->ref_cnt);
+ if (ret <= 0)
break;
- }
p->last.table = st;
p->last.id = st->update_id;
updates_sent++;
}
- if (ret == 1) {
- MT_LIST_DELETE(&st->last->upd);
- MT_LIST_APPEND(&st->end->upd, &st->last->upd);
+ if (ret == 1)
MT_LIST_DELETE(&st->end->upd);
- }
-
+ end:
TRACE_LEAVE(PEERS_EV_SESS_IO, appctx, p, st);
return ret;
}