struct shared_table *table; /* shared table */
struct peer *peer; /* current peer */
struct stream *stream; /* current transport stream */
+ struct appctx *appctx; /* the appctx running it */
unsigned int flags; /* peer session flags */
unsigned int statuscode; /* current/last session status code */
unsigned int update; /* current peer acked update */
if (ps) {
if (ps->stream == s) {
ps->stream = NULL;
+ ps->appctx = NULL;
if (ps->flags & PEER_F_LEARN_ASSIGN) {
/* unassign current peer for learning */
ps->flags &= ~(PEER_F_LEARN_ASSIGN);
peer_session_forceshutdown(ps->stream);
}
ps->stream = s;
+ ps->appctx = appctx;
break;
}
}
actconn++;
totalconn++;
+ ps->appctx = appctx;
+ ps->stream = s;
return s;
/* Error unrolling */
st->flags |= SHTABLE_F_RESYNC_ASSIGN;
/* awake peer stream task to handle a request of resync */
- task_wakeup(ps->stream->task, TASK_WOKEN_MSG);
+ appctx_wakeup(ps->appctx);
}
else if ((int)(ps->pushed - ps->table->table->localupdate) < 0) {
/* awake peer stream task to push local updates */
- task_wakeup(ps->stream->task, TASK_WOKEN_MSG);
+ appctx_wakeup(ps->appctx);
}
/* else do nothing */
} /* SUCCESSCODE */
if (ps->stream) {
peer_session_forceshutdown(ps->stream);
ps->stream = NULL;
+ ps->appctx = NULL;
}
}
}
* or during previous connect, peer replies a try again statuscode */
/* connect to the peer */
- ps->stream = peer_session_create(ps->peer, ps);
+ peer_session_create(ps->peer, ps);
}
else {
/* Other error cases */
(int)(ps->pushed - ps->table->table->localupdate) < 0) {
/* current stream active and established
awake stream to push remaining local updates */
- task_wakeup(ps->stream->task, TASK_WOKEN_MSG);
+ appctx_wakeup(ps->appctx);
}
} /* stopping */
/* Wakeup for re-connect */