/* peer session identified */
if (peer) {
+ SPIN_LOCK(PEER_LOCK, &peer->lock);
if (peer->appctx == appctx) {
/* Re-init current table pointers to force announcement on re-connect */
peer->remote_table = peer->last_local_table = NULL;
peer->flags &= PEER_TEACH_RESET;
peer->flags &= PEER_LEARN_RESET;
}
+ SPIN_UNLOCK(PEER_LOCK, &peer->lock);
task_wakeup(peers->sync_task, TASK_WOKEN_MSG);
}
}
struct stream_interface *si = appctx->owner;
struct stream *s = si_strm(si);
struct peers *curpeers = strm_fe(s)->parent;
+ struct peer *curpeer = NULL;
int reql = 0;
int repl = 0;
size_t proto_len = strlen(PEER_SESSION_PROTO_NAME);
appctx->st0 = PEER_SESS_ST_GETPEER;
/* fall through */
case PEER_SESS_ST_GETPEER: {
- struct peer *curpeer;
char *p;
reql = co_getline(si_oc(si), trash.str, trash.size);
if (reql <= 0) { /* closed or EOL not found */
goto switchstate;
}
+ SPIN_LOCK(PEER_LOCK, &curpeer->lock);
if (curpeer->appctx && curpeer->appctx != appctx) {
if (curpeer->local) {
/* Local connection, reply a retry */
appctx->st1 = PEER_SESS_SC_TRYAGAIN;
goto switchstate;
}
+
+ /* we're killing a connection, we must apply a random delay before
+ * retrying otherwise the other end will do the same and we can loop
+ * for a while.
+ */
+ curpeer->reconnect = tick_add(now_ms, MS_TO_TICKS(50 + random() % 2000));
peer_session_forceshutdown(curpeer->appctx);
}
if (maj_ver != (unsigned int)-1 && min_ver != (unsigned int)-1) {
/* fall through */
}
case PEER_SESS_ST_SENDSUCCESS: {
- struct peer *curpeer = appctx->ctx.peers.ptr;
struct shared_table *st;
+ if (!curpeer) {
+ curpeer = appctx->ctx.peers.ptr;
+ SPIN_LOCK(PEER_LOCK, &curpeer->lock);
+ if (curpeer->appctx != appctx) {
+ appctx->st0 = PEER_SESS_ST_END;
+ goto switchstate;
+ }
+ }
repl = snprintf(trash.str, trash.size, "%d\n", PEER_SESS_SC_SUCCESSCODE);
repl = ci_putblk(si_ic(si), trash.str, repl);
if (repl <= 0) {
goto switchstate;
}
case PEER_SESS_ST_CONNECT: {
- struct peer *curpeer = appctx->ctx.peers.ptr;
+
+ if (!curpeer) {
+ curpeer = appctx->ctx.peers.ptr;
+ SPIN_LOCK(PEER_LOCK, &curpeer->lock);
+ if (curpeer->appctx != appctx) {
+ appctx->st0 = PEER_SESS_ST_END;
+ goto switchstate;
+ }
+ }
/* Send headers */
repl = snprintf(trash.str, trash.size,
/* fall through */
}
case PEER_SESS_ST_GETSTATUS: {
- struct peer *curpeer = appctx->ctx.peers.ptr;
struct shared_table *st;
+ if (!curpeer) {
+ curpeer = appctx->ctx.peers.ptr;
+ SPIN_LOCK(PEER_LOCK, &curpeer->lock);
+ if (curpeer->appctx != appctx) {
+ appctx->st0 = PEER_SESS_ST_END;
+ goto switchstate;
+ }
+ }
+
if (si_ic(si)->flags & CF_WRITE_PARTIAL)
curpeer->statuscode = PEER_SESS_SC_CONNECTEDCODE;
/* fall through */
}
case PEER_SESS_ST_WAITMSG: {
- struct peer *curpeer = appctx->ctx.peers.ptr;
struct stksess *ts, *newts = NULL;
uint32_t msg_len = 0;
char *msg_cur = trash.str;
unsigned char msg_head[7];
int totl = 0;
+ if (!curpeer) {
+ curpeer = appctx->ctx.peers.ptr;
+ SPIN_LOCK(PEER_LOCK, &curpeer->lock);
+ if (curpeer->appctx != appctx) {
+ appctx->st0 = PEER_SESS_ST_END;
+ goto switchstate;
+ }
+ }
+
reql = co_getblk(si_oc(si), (char *)msg_head, 2*sizeof(unsigned char), totl);
if (reql <= 0) /* closed or EOL not found */
goto incomplete;
}
if (!(curpeer->flags & PEER_F_TEACH_PROCESS)) {
+ SPIN_LOCK(STK_TABLE_LOCK, &st->table->lock);
if (!(curpeer->flags & PEER_F_LEARN_ASSIGN) &&
((int)(st->last_pushed - st->table->localupdate) < 0)) {
struct eb32_node *eb;
/* We force new pushed to 1 to force identifier in update message */
new_pushed = 1;
- SPIN_LOCK(STK_TABLE_LOCK, &st->table->lock);
while (1) {
uint32_t msglen;
struct stksess *ts;
/* identifier may not needed in next update message */
new_pushed = 0;
}
- SPIN_UNLOCK(STK_TABLE_LOCK, &st->table->lock);
}
+ SPIN_UNLOCK(STK_TABLE_LOCK, &st->table->lock);
}
else {
if (!(st->flags & SHTABLE_F_TEACH_STAGE1)) {
/* fall through */
}
case PEER_SESS_ST_END: {
+ if (curpeer) {
+ SPIN_UNLOCK(PEER_LOCK, &curpeer->lock);
+ curpeer = NULL;
+ }
si_shutw(si);
si_shutr(si);
si_ic(si)->flags |= CF_READ_NULL;
}
out:
si_oc(si)->flags |= CF_READ_DONTWAIT;
+
+ if (curpeer)
+ SPIN_UNLOCK(PEER_LOCK, &curpeer->lock);
return;
full:
si_applet_cant_put(si);
*/
static void peer_session_forceshutdown(struct appctx *appctx)
{
- struct peer *ps;
-
/* Note that the peer sessions which have just been created
* (->st0 == PEER_SESS_ST_CONNECT) must not
* be shutdown, if not, the TCP session will never be closed
if (appctx->applet != &peer_applet)
return;
- ps = appctx->ctx.peers.ptr;
- /* we're killing a connection, we must apply a random delay before
- * retrying otherwise the other end will do the same and we can loop
- * for a while.
- */
- if (ps)
- ps->reconnect = tick_add(now_ms, MS_TO_TICKS(50 + random() % 2000));
-
appctx->st0 = PEER_SESS_ST_END;
- appctx->ctx.peers.ptr = NULL;
appctx_wakeup(appctx);
}
return NULL;
}
+ /* Acquire lock for all peers of the section */
+ for (ps = peers->remote; ps; ps = ps->next)
+ SPIN_LOCK(PEER_LOCK, &ps->lock);
+
if (!stopping) {
/* Normal case (not soft stop)*/
/* disconnect all connected peers */
for (ps = peers->remote; ps; ps = ps->next) {
+ /* we're killing a connection, we must apply a random delay before
+ * retrying otherwise the other end will do the same and we can loop
+ * for a while.
+ */
+ ps->reconnect = tick_add(now_ms, MS_TO_TICKS(50 + random() % 2000));
if (ps->appctx) {
peer_session_forceshutdown(ps->appctx);
ps->appctx = NULL;
}
}
} /* stopping */
+
+ /* Release lock for all peers of the section */
+ for (ps = peers->remote; ps; ps = ps->next)
+ SPIN_UNLOCK(PEER_LOCK, &ps->lock);
+
/* Wakeup for re-connect */
return task;
}