flush_dcache(peer);
/* Re-init current table pointers to force announcement on re-connect */
- peer->remote_table = peer->last_local_table = NULL;
+ peer->remote_table = peer->last_local_table = peer->stop_local_table = NULL;
peer->appctx = NULL;
if (peer->flags & PEER_F_LEARN_ASSIGN) {
/* unassign current peer for learning */
* Returns 1 if succeeded, or -1 or 0 if failed.
* -1 means an internal error occurred, 0 is for a peer protocol error leading
* to a peer state change (from the peer I/O handler point of view).
+ *
+ * - peer->last_local_table is the last table for which we send an update
+ * messages.
+ *
+ * - peer->stop_local_table is the last evaluated table. It is unset when the
+ * teaching process starts. But we use it as a
+ * restart point when the loop is interrupted. It is
+ * especially useful whe the number of tables exceeds
+ * peers_max_updates_at_once value.
+ *
+ * When a teaching lopp is started, the peer's last_local_table is saved in a
+ * local variable. This variable is used as a finish point. When the crrent
+ * table is equal to it, it means all tables were evaluated, all updates where
+ * sent and the teaching process is finished.
+ *
+ * peer->stop_local_table is always NULL when the teaching process begins. It is
+ * only reset at the end. In the mean time, it always point on a table.
*/
+
static inline int peer_send_msgs(struct appctx *appctx,
struct peer *peer, struct peers *peers)
{
if (peer->tables) {
struct shared_table *st;
struct shared_table *last_local_table;
- int updates_sent = 0;
+ int updates = 0;
last_local_table = peer->last_local_table;
if (!last_local_table)
last_local_table = peer->tables;
- st = last_local_table->next;
+ if (!peer->stop_local_table)
+ peer->stop_local_table = last_local_table;
+ st = peer->stop_local_table->next;
while (1) {
if (!st)
st = peer->tables;
-
/* It remains some updates to ack */
if (st->last_get != st->last_acked) {
repl = peer_send_ackmsg(st, appctx);
repl = peer_send_teach_process_msgs(appctx, peer, st);
if (repl <= 0) {
HA_RWLOCK_WRUNLOCK(STK_TABLE_LOCK, &st->table->lock);
+ peer->stop_local_table = peer->last_local_table;
return repl;
}
}
else if (!(peer->flags & PEER_F_TEACH_FINISHED)) {
if (!(st->flags & SHTABLE_F_TEACH_STAGE1)) {
repl = peer_send_teach_stage1_msgs(appctx, peer, st);
- if (repl <= 0)
+ if (repl <= 0) {
+ peer->stop_local_table = peer->last_local_table;
return repl;
+ }
}
if (!(st->flags & SHTABLE_F_TEACH_STAGE2)) {
repl = peer_send_teach_stage2_msgs(appctx, peer, st);
- if (repl <= 0)
+ if (repl <= 0) {
+ peer->stop_local_table = peer->last_local_table;
return repl;
+ }
}
}
- if (st == last_local_table)
+ if (st == last_local_table) {
+ peer->stop_local_table = NULL;
break;
- st = st->next;
+ }
- updates_sent++;
- if (updates_sent >= peers_max_updates_at_once) {
+ /* This one is to be sure to restart from <st->next> if we are interrupted
+ * because of peer_send_teach_stage2_msgs or because buffer is full
+ * when sedning an ackmsg. In both cases current <st> was evaluated and
+ * we must restart from <st->next>
+ */
+ peer->stop_local_table = st;
+
+ updates++;
+ if (updates >= peers_max_updates_at_once) {
/* pretend we're full so that we get back ASAP */
struct stconn *sc = appctx_sc(appctx);
sc_need_room(sc, 0);
return -1;
}
+
+ st = st->next;
}
}