*/
static inline int peer_getline(struct appctx *appctx)
{
- struct stconn *sc = appctx_sc(appctx);
int n;
- n = co_getline(sc_oc(sc), trash.area, trash.size);
- if (!n)
+ if (applet_get_inbuf(appctx) == NULL || !applet_input_data(appctx)) {
+ applet_need_more_data(appctx);
return 0;
+ }
+
+ n = applet_getline(appctx, trash.area, trash.size);
+ if (!n) {
+ applet_need_more_data(appctx);
+ return 0;
+ }
if (n < 0 || trash.area[n - 1] != '\n') {
appctx->st0 = PEER_SESS_ST_END;
else
trash.area[n - 1] = 0;
- co_skip(sc_oc(sc), n);
-
+ applet_skip_input(appctx, n);
return n;
}
uint32_t *msg_len, int *totl)
{
int reql;
- struct stconn *sc = appctx_sc(appctx);
char *cur;
- reql = co_getblk(sc_oc(sc), msg_head, 2 * sizeof(char), *totl);
+ reql = applet_getblk(appctx, msg_head, 2 * sizeof(char), *totl);
if (reql <= 0) /* closed or EOL not found */
goto incomplete;
/* Read and Decode message length */
msg_head += *totl;
msg_head_sz -= *totl;
- reql = co_data(sc_oc(sc)) - *totl;
+ reql = applet_input_data(appctx) - *totl;
if (reql > msg_head_sz)
reql = msg_head_sz;
- reql = co_getblk(sc_oc(sc), msg_head, reql, *totl);
+ reql = applet_getblk(appctx, msg_head, reql, *totl);
if (reql <= 0) /* closed */
goto incomplete;
return -1;
}
- reql = co_getblk(sc_oc(sc), trash.area, *msg_len, *totl);
+ reql = applet_getblk(appctx, trash.area, *msg_len, *totl);
if (reql <= 0) /* closed */
goto incomplete;
*totl += reql;
return 1;
incomplete:
- if (reql < 0 || (sc->flags & (SC_FL_SHUT_DONE|SC_FL_SHUT_WANTED))) {
+ if (reql < 0 || se_fl_test(appctx->sedesc, SE_FL_SHW)) {
/* there was an error or the message was truncated */
appctx->st0 = PEER_SESS_ST_END;
return -1;
char *p;
int reql;
struct peer *peer;
- struct stream *s = appctx_strm(appctx);
- struct peers *peers = strm_fe(s)->parent;
+ struct peers *peers = strm_fe(appctx_strm(appctx))->parent;
reql = peer_getline(appctx);
if (!reql)
*/
static void peer_io_handler(struct appctx *appctx)
{
- struct stconn *sc = appctx_sc(appctx);
- struct stream *s = __sc_strm(sc);
- struct peers *curpeers = strm_fe(s)->parent;
struct peer *curpeer = NULL;
int reql = 0;
int repl = 0;
int prev_state;
int msg_done = 0;
- if (unlikely(se_fl_test(appctx->sedesc, (SE_FL_EOS|SE_FL_ERROR)))) {
- co_skip(sc_oc(sc), co_data(sc_oc(sc)));
+ if (unlikely(applet_fl_test(appctx, APPCTX_FL_EOS|APPCTX_FL_ERROR))) {
+ applet_reset_input(appctx);
goto out;
}
- /* Check if the input buffer is available. */
- if (sc_ib(sc)->size == 0) {
- sc_need_room(sc, 0);
+ /* Check if the out buffer is available. */
+ if (!applet_get_outbuf(appctx)) {
+ applet_have_more_data(appctx);
goto out;
}
curpeer->statuscode = PEER_SESS_SC_SUCCESSCODE;
curpeer->last_hdshk = now_ms;
- init_connected_peer(curpeer, curpeers);
+ init_connected_peer(curpeer, curpeer->peers);
/* switch to waiting message state */
_HA_ATOMIC_INC(&connected_peers);
goto switchstate;
}
}
-
- if (sc_ic(sc)->flags & CF_WROTE_DATA)
- curpeer->statuscode = PEER_SESS_SC_CONNECTEDCODE;
+ curpeer->statuscode = PEER_SESS_SC_CONNECTEDCODE;
reql = peer_getline(appctx);
if (!reql)
curpeer->last_hdshk = now_ms;
/* Awake main task */
- task_wakeup(curpeers->sync_task, TASK_WOKEN_MSG);
+ task_wakeup(curpeer->peers->sync_task, TASK_WOKEN_MSG);
/* If status code is success */
if (curpeer->statuscode == PEER_SESS_SC_SUCCESSCODE) {
- init_connected_peer(curpeer, curpeers);
+ init_connected_peer(curpeer, curpeer->peers);
}
else {
if (curpeer->statuscode == PEER_SESS_SC_ERRVERSION)
curpeer->flags |= PEER_F_ALIVE;
/* skip consumed message */
- co_skip(sc_oc(sc), totl);
+ applet_skip_input(appctx, totl);
/* make sure we don't process too many at once */
if (msg_done >= peers_max_updates_at_once)
send_msgs:
if (curpeer->flags & PEER_F_HEARTBEAT) {
curpeer->flags &= ~PEER_F_HEARTBEAT;
- repl = peer_send_heartbeatmsg(appctx, curpeer, curpeers);
+ repl = peer_send_heartbeatmsg(appctx, curpeer, curpeer->peers);
if (repl <= 0) {
if (repl == -1)
goto out;
curpeer->tx_hbt++;
}
/* we get here when a peer_recv_msg() returns 0 in reql */
- repl = peer_send_msgs(appctx, curpeer, curpeers);
+ repl = peer_send_msgs(appctx, curpeer, curpeer->peers);
if (repl <= 0) {
if (repl == -1)
goto out;
HA_SPIN_UNLOCK(PEER_LOCK, &curpeer->lock);
curpeer = NULL;
}
- se_fl_set(appctx->sedesc, SE_FL_EOS|SE_FL_EOI);
- co_skip(sc_oc(sc), co_data(sc_oc(sc)));
+ applet_set_eos(appctx);
+ applet_reset_input(appctx);
goto out;
}
}
}
out:
- sc_opposite(sc)->flags |= SC_FL_RCV_ONCE;
+ /* sc_opposite(sc)->flags |= SC_FL_RCV_ONCE; */
if (curpeer)
HA_SPIN_UNLOCK(PEER_LOCK, &curpeer->lock);
.obj_type = OBJ_TYPE_APPLET,
.name = "<PEER>", /* used for logging */
.fct = peer_io_handler,
+ .rcv_buf = appctx_raw_rcv_buf,
+ .snd_buf = appctx_raw_snd_buf,
.init = peer_session_init,
.release = peer_session_release,
};