const struct ist where, const struct ist func,
const void *a1, const void *a2, const void *a3, const void *a4);
+static const char *statuscode_str(int statuscode);
+static const char *peer_app_state_str(enum peer_app_state appstate);
+static const char *peer_learn_state_str(enum peer_learn_state learnstate);
+static const char *peer_applet_state_str(int state);
+
static const struct trace_event peers_trace_events[] = {
-#define PEERS_EV_UPDTMSG (1 << 0)
- { .mask = PEERS_EV_UPDTMSG, .name = "updtmsg", .desc = "update message received" },
-#define PEERS_EV_ACKMSG (1 << 1)
- { .mask = PEERS_EV_ACKMSG, .name = "ackmsg", .desc = "ack message received" },
-#define PEERS_EV_SWTCMSG (1 << 2)
- { .mask = PEERS_EV_SWTCMSG, .name = "swtcmsg", .desc = "switch message received" },
-#define PEERS_EV_DEFMSG (1 << 3)
- { .mask = PEERS_EV_DEFMSG, .name = "defmsg", .desc = "definition message received" },
-#define PEERS_EV_CTRLMSG (1 << 4)
- { .mask = PEERS_EV_CTRLMSG, .name = "ctrlmsg", .desc = "control message sent/received" },
-#define PEERS_EV_SESSREL (1 << 5)
- { .mask = PEERS_EV_SESSREL, .name = "sessrl", .desc = "peer session releasing" },
-#define PEERS_EV_PROTOERR (1 << 6)
- { .mask = PEERS_EV_PROTOERR, .name = "protoerr", .desc = "protocol error" },
+#define PEERS_EV_SESS_NEW (1ULL << 0)
+ { .mask = PEERS_EV_SESS_NEW, .name = "sess_new", .desc = "create new peer session" },
+#define PEERS_EV_SESS_END (1ULL << 1)
+ { .mask = PEERS_EV_SESS_END, .name = "sess_end", .desc = "peer session terminated" },
+#define PEERS_EV_SESS_ERR (1ULL << 2)
+ { .mask = PEERS_EV_SESS_ERR, .name = "sess_err", .desc = "error on peer session" },
+#define PEERS_EV_SESS_SHUT (1ULL << 3)
+ { .mask = PEERS_EV_SESS_SHUT, .name = "sess_shut", .desc = "peer session shutdown" },
+#define PEERS_EV_SESS_WAKE (1ULL << 4)
+ { .mask = PEERS_EV_SESS_WAKE, .name = "sess_wakeup", .desc = "peer session wakeup" },
+#define PEERS_EV_SESS_RESYNC (1ULL << 5)
+ { .mask = PEERS_EV_SESS_RESYNC, .name = "sess_resync", .desc = "peer session resync" },
+#define PEERS_EV_SESS_IO (1ULL << 6)
+ { .mask = PEERS_EV_SESS_IO, .name = "sess_io", .desc = "peer session I/O" },
+
+#define PEERS_EV_RX_MSG (1ULL << 7)
+ { .mask = PEERS_EV_RX_MSG, .name = "rx_msg", .desc = "message received" },
+#define PEERS_EV_RX_BLK (1ULL << 8)
+ { .mask = PEERS_EV_RX_BLK, .name = "rx_blocked", .desc = "receive blocked" },
+#define PEERS_EV_RX_ERR (1ULL << 9)
+ { .mask = PEERS_EV_RX_ERR, .name = "rx_error", .desc = "receive error" },
+
+#define PEERS_EV_TX_MSG (1ULL << 10)
+ { .mask = PEERS_EV_TX_MSG, .name = "tx_msg", .desc = "message sent" },
+#define PEERS_EV_TX_BLK (1ULL << 11)
+ { .mask = PEERS_EV_TX_BLK, .name = "tx_blocked", .desc = "send blocked" },
+#define PEERS_EV_TX_ERR (1ULL << 12)
+ { .mask = PEERS_EV_TX_ERR, .name = "tx_error", .desc = "send error" },
+
+
+#define PEERS_EV_PROTO_ERR (1ULL << 13)
+ { .mask = PEERS_EV_PROTO_ERR, .name = "proto_error", .desc = "protocol error" },
+#define PEERS_EV_PROTO_HELLO (1ULL << 14)
+ { .mask = PEERS_EV_PROTO_HELLO, .name = "proto_hello", .desc = "protocol hello mesage" },
+#define PEERS_EV_PROTO_SUCCESS (1ULL << 15)
+ { .mask = PEERS_EV_PROTO_SUCCESS, .name = "proto_success", .desc = "protocol success message" },
+#define PEERS_EV_PROTO_UPDATE (1ULL << 16)
+ { .mask = PEERS_EV_PROTO_UPDATE, .name = "proto_update", .desc = "protocol UPDATE message" },
+#define PEERS_EV_PROTO_ACK (1ULL << 17)
+ { .mask = PEERS_EV_PROTO_ACK, .name = "proto_ack", .desc = "protocol ACK message" },
+#define PEERS_EV_PROTO_SWITCH (1ULL << 18)
+ { .mask = PEERS_EV_PROTO_SWITCH, .name = "proto_switch", .desc = "protocol TABLE SWITCH message" },
+#define PEERS_EV_PROTO_DEF (1ULL << 19)
+ { .mask = PEERS_EV_PROTO_DEF, .name = "proto_def", .desc = "protocol TABLE DEFINITION message" },
+#define PEERS_EV_PROTO_CTRL (1ULL << 20)
+ { .mask = PEERS_EV_PROTO_CTRL, .name = "proto_ctrl", .desc = "protocol control message" },
{ }
};
static const struct name_desc peers_trace_lockon_args[4] = {
- /* arg1 */ { /* already used by the connection */ },
- /* arg2 */ { .name="peers", .desc="Peers protocol" },
- /* arg3 */ { },
+ /* arg1 */ { /* already used by the appctx */ },
+ /* arg2 */ { .name="peer", .desc="Peer" },
+ /* arg3 */ { .name="peers", .desc="Peers" },
/* arg4 */ { }
};
static const struct name_desc peers_trace_decoding[] = {
#define PEERS_VERB_CLEAN 1
{ .name="clean", .desc="only user-friendly stuff, generally suitable for level \"user\"" },
+#define PEERS_VERB_MINIMAL 2
+ { .name="minimal", .desc="report only peer state and flags, no real decoding" },
+#define PEERS_VERB_SIMPLE 3
+ { .name="simple", .desc="add simple info about messages when available" },
+#define PEERS_VERB_ADVANCED 4
+ { .name="advanced", .desc="add more info about messages when available" },
+#define PEERS_VERB_COMPLETE 5
+ { .name="complete", .desc="add full data dump when available" },
{ /* end */ }
};
struct trace_source trace_peers = {
.name = IST("peers"),
.desc = "Peers protocol",
- .arg_def = TRC_ARG1_CONN, /* TRACE()'s first argument is always a connection */
+ .arg_def = TRC_ARG1_APPCTX,
.default_cb = peers_trace,
.known_events = peers_trace_events,
.lockon_args = peers_trace_lockon_args,
};
/* Return peer control message types as strings (only for debugging purpose). */
-static inline char *ctrl_msg_type_str(unsigned int type)
+static inline __maybe_unused char *ctrl_msg_type_str(unsigned int type)
{
switch (type) {
case PEER_MSG_CTRL_RESYNCREQ:
const struct ist where, const struct ist func,
const void *a1, const void *a2, const void *a3, const void *a4)
{
- if (mask & (PEERS_EV_UPDTMSG|PEERS_EV_ACKMSG|PEERS_EV_SWTCMSG)) {
- if (a2) {
- const struct peer *peer = a2;
-
- chunk_appendf(&trace_buf, " peer=%s", peer->id);
- }
- if (a3) {
- const char *p = a3;
-
- chunk_appendf(&trace_buf, " @%p", p);
- }
- if (a4) {
- const size_t *val = a4;
-
- chunk_appendf(&trace_buf, " %llu", (unsigned long long)*val);
- }
- }
-
- if (mask & PEERS_EV_DEFMSG) {
- if (a2) {
- const struct peer *peer = a2;
-
- chunk_appendf(&trace_buf, " peer=%s", peer->id);
- }
- if (a3) {
- const char *p = a3;
-
- chunk_appendf(&trace_buf, " @%p", p);
- }
- if (a4) {
- const int *val = a4;
-
- chunk_appendf(&trace_buf, " %d", *val);
- }
- }
-
- if (mask & PEERS_EV_CTRLMSG) {
- if (a2) {
- const unsigned char *ctrl_msg_type = a2;
-
- chunk_appendf(&trace_buf, " %s", ctrl_msg_type_str(*ctrl_msg_type));
-
- }
- if (a3) {
- const char *local_peer = a3;
-
- chunk_appendf(&trace_buf, " %s", local_peer);
- }
+ const struct appctx *appctx = a1;
+ const struct peer *peer = a2;
+ const struct peers *peers = NULL;
+ const struct shared_table *st = a3;
+
+ if (!peer && appctx)
+ peer = appctx->svcctx;
+ if (!peer || src->verbosity < PEERS_VERB_CLEAN)
+ return;
+ if (!peers)
+ peers = peer->peers;
+ if (!appctx)
+ appctx = peer->appctx;
- if (a4) {
- const char *remote_peer = a4;
+ chunk_appendf(&trace_buf, " : [%c,%s] <%s/%s> ",
+ (appctx ? (appctx_is_back(appctx) ? 'B' : 'F') : '-'),
+ (appctx ? peer_applet_state_str(appctx->st0) : "-"),
+ peers->id, peer->id);
- chunk_appendf(&trace_buf, " -> %s", remote_peer);
- }
- }
+ if (peer->local)
+ chunk_appendf(&trace_buf, "RELOADING(%s) ", stopping ? "old" : "new");
- if (mask & (PEERS_EV_SESSREL|PEERS_EV_PROTOERR)) {
- if (a2) {
- const struct peer *peer = a2;
- struct peers *peers = NULL;
+ if (src->verbosity == PEERS_VERB_CLEAN)
+ return;
- if (peer->appctx)
- peers = peer->peers;
+ chunk_appendf(&trace_buf, "peer=(.fl=0x%08x, .app=%s, .learn=%s, .teach=%s, status=%s, ",
+ peer->flags, peer_app_state_str(peer->appstate), peer_learn_state_str(peer->learnstate),
+ ((peer->flags & PEER_TEACH_FLAGS) == PEER_F_TEACH_PROCESS ? "PROCESS" :
+ ((peer->flags & PEER_F_TEACH_FINISHED) ? "FINISHED" : "NONE")),
+ statuscode_str(peer->statuscode));
+
+ chunk_appendf(&trace_buf, ".reco=%s, ", (peer->reconnect
+ ? (tick_is_expired(peer->reconnect, now_ms)
+ ? "<PAST>"
+ : human_time(TICKS_TO_MS(peer->reconnect - now_ms), TICKS_TO_MS(1000)))
+ : "<NEVER>"));
+
+ chunk_appendf(&trace_buf, ".heart=%s, ", (peer->heartbeat
+ ? (tick_is_expired(peer->heartbeat, now_ms)
+ ? "<PAST>"
+ : human_time(TICKS_TO_MS(peer->heartbeat - now_ms), TICKS_TO_MS(1000)))
+ : "<NEVER>"));
+
+ chunk_appendf(&trace_buf, ".last_hdshk=%s) ", (peer->last_hdshk
+ ? (tick_is_expired(peer->last_hdshk, now_ms)
+ ? "<PAST>"
+ : human_time(TICKS_TO_MS(peer->last_hdshk - now_ms), TICKS_TO_MS(1000)))
+ : "<NEVER>"));
+
+ if (st)
+ chunk_appendf(&trace_buf, "st=(.id=%s, .fl=0x%08x, .pushed=%u, .acked=%u) ",
+ st->table->id, st->flags, st->last_pushed, st->last_acked);
+
+ if (src->verbosity == PEERS_VERB_MINIMAL)
+ return;
- if (peers)
- chunk_appendf(&trace_buf, " %s", peers->local->id);
- chunk_appendf(&trace_buf, " -> %s", peer->id);
- }
+ if (appctx)
+ chunk_appendf(&trace_buf, "appctx=(%p, .fl=0x%08x, .st0=%d, .st1=%d) ",
+ appctx, appctx->flags, appctx->st0, appctx->st1);
- if (a3) {
- const int *prev_state = a3;
+ if (peers)
+ chunk_appendf(&trace_buf, "peers=(.fl=0x%08x, local=%s) ",
+ peers->flags, peers->local->id);
- chunk_appendf(&trace_buf, " prev_state=%d\n", *prev_state);
- }
- }
+ if (src->verbosity == PEERS_VERB_SIMPLE)
+ return;
}
static const char *statuscode_str(int statuscode)
}
}
+static const char *peer_applet_state_str(int state)
+{
+ switch (state) {
+ case PEER_SESS_ST_ACCEPT: return "ACCEPT";
+ case PEER_SESS_ST_GETVERSION: return "GETVERSION";
+ case PEER_SESS_ST_GETHOST: return "GETHOST";
+ case PEER_SESS_ST_GETPEER: return "GETPEER";
+ case PEER_SESS_ST_SENDSUCCESS: return "SENDSUCCESS";
+ case PEER_SESS_ST_CONNECT: return "CONNECT";
+ case PEER_SESS_ST_GETSTATUS: return "GETSTATUS";
+ case PEER_SESS_ST_WAITMSG: return "WAITMSG";
+ case PEER_SESS_ST_EXIT: return "EXIT";
+ case PEER_SESS_ST_ERRPROTO: return "ERRPROTO";
+ case PEER_SESS_ST_ERRSIZE: return "ERRSIZE";
+ case PEER_SESS_ST_END: return "END";
+ default: return "UNKNOWN";
+ }
+}
+
/* This function encode an uint64 to 'dynamic' length format.
The encoded value is written at address *str, and the
caller must assure that size after *str is large enough.
/* Mark the peer as stopping and wait for the sync task */
peer->flags |= PEER_F_WAIT_SYNCTASK_ACK;
peer->appstate = PEER_APP_ST_STOPPING;
-
+ TRACE_STATE("peer session stopping", PEERS_EV_SESS_END, peer->appctx, peer);
task_wakeup(peers->sync_task, TASK_WOKEN_MSG);
}
struct stream *s;
struct sockaddr_storage *addr = NULL;
+ TRACE_ENTER(PEERS_EV_SESS_NEW, appctx, peer);
if (!sockaddr_alloc(&addr, &peer->srv->addr, sizeof(peer->srv->addr)))
goto out_error;
set_host_port(addr, peer->srv->svc_port);
s->do_log = NULL;
s->uniq_id = 0;
-
_HA_ATOMIC_INC(&active_peers);
+ TRACE_LEAVE(PEERS_EV_SESS_NEW, appctx, peer);
return 0;
out_free_addr:
sockaddr_free(&addr);
out_error:
+ TRACE_ERROR("peer session init failed", PEERS_EV_SESS_NEW|PEERS_EV_SESS_END|PEERS_EV_SESS_ERR, NULL, peer);
return -1;
}
{
struct peer *peer = appctx->svcctx;
- TRACE_PROTO("releasing peer session", PEERS_EV_SESSREL, NULL, peer);
/* appctx->svcctx is not a peer session */
if (appctx->st0 < PEER_SESS_ST_SENDSUCCESS)
return;
__peer_session_deinit(peer);
peer->flags &= ~PEER_F_ALIVE;
HA_SPIN_UNLOCK(PEER_LOCK, &peer->lock);
+ TRACE_STATE("peer session released", PEERS_EV_SESS_END, appctx, peer);
}
}
*/
static inline int peer_getline(struct appctx *appctx)
{
- int n;
+ int n = 0;
+ TRACE_ENTER(PEERS_EV_SESS_IO|PEERS_EV_RX_MSG, appctx);
if (applet_get_inbuf(appctx) == NULL || !applet_input_data(appctx)) {
applet_need_more_data(appctx);
- return 0;
+ goto out;
}
n = applet_getline(appctx, trash.area, trash.size);
if (!n) {
applet_need_more_data(appctx);
- return 0;
+ goto out;
}
if (n < 0 || trash.area[n - 1] != '\n') {
appctx->st0 = PEER_SESS_ST_END;
+ TRACE_ERROR("failed to receive data (channel closed or full)", PEERS_EV_SESS_IO|PEERS_EV_RX_ERR, appctx);
return -1;
}
trash.area[n - 1] = 0;
applet_skip_input(appctx, n);
+ out:
+ TRACE_LEAVE(PEERS_EV_SESS_IO|PEERS_EV_RX_MSG, appctx);
return n;
}
{
int ret, msglen;
+ TRACE_ENTER(PEERS_EV_SESS_IO|PEERS_EV_TX_MSG, appctx);
msglen = peer_prepare_msg(trash.area, trash.size, params);
if (!msglen) {
/* internal error: message does not fit in trash */
appctx->st0 = PEER_SESS_ST_END;
+ TRACE_ERROR("failed to send data (message too long)", PEERS_EV_SESS_IO|PEERS_EV_TX_ERR, appctx);
return 0;
}
/* message to buffer */
ret = applet_putblk(appctx, trash.area, msglen);
if (ret <= 0) {
- if (ret != -1)
+ if (ret != -1) {
+ TRACE_ERROR("failed to send data (channel closed)", PEERS_EV_SESS_IO|PEERS_EV_TX_ERR, appctx);
appctx->st0 = PEER_SESS_ST_END;
+ }
}
+ TRACE_LEAVE(PEERS_EV_SESS_IO|PEERS_EV_TX_MSG, appctx);
return ret;
}
.hello.peer = peer,
};
+ TRACE_PROTO("send hello message", PEERS_EV_SESS_IO|PEERS_EV_TX_MSG|PEERS_EV_PROTO_HELLO, appctx, peer);
return peer_send_msg(appctx, peer_prepare_hellomsg, &p);
}
*/
static inline int peer_send_status_successmsg(struct appctx *appctx)
{
+ TRACE_PROTO("send status sucess message", PEERS_EV_SESS_IO|PEERS_EV_TX_MSG|PEERS_EV_PROTO_SUCCESS, appctx);
return peer_send_msg(appctx, peer_prepare_status_successmsg, NULL);
}
.error_status.st1 = appctx->st1,
};
+ TRACE_PROTO("send status error message", PEERS_EV_SESS_IO|PEERS_EV_TX_MSG|PEERS_EV_PROTO_ERR, appctx);
return peer_send_msg(appctx, peer_prepare_status_errormsg, &p);
}
.swtch.shared_table = st,
};
+ TRACE_PROTO("send table switch message", PEERS_EV_SESS_IO|PEERS_EV_TX_MSG|PEERS_EV_PROTO_SWITCH, appctx, NULL, st);
return peer_send_msg(appctx, peer_prepare_switchmsg, &p);
}
.ack.shared_table = st,
};
+ TRACE_PROTO("send ack message", PEERS_EV_SESS_IO|PEERS_EV_TX_MSG|PEERS_EV_PROTO_ACK, appctx, NULL, st);
return peer_send_msg(appctx, peer_prepare_ackmsg, &p);
}
},
};
+ TRACE_PROTO("send update message", PEERS_EV_SESS_IO|PEERS_EV_TX_MSG|PEERS_EV_PROTO_UPDATE, appctx, NULL, st);
return peer_send_msg(appctx, peer_prepare_updatemsg, &p);
}
.control.head = { PEER_MSG_CLASS_CONTROL, PEER_MSG_CTRL_RESYNCREQ, },
};
- TRACE_PROTO("send control message", PEERS_EV_CTRLMSG,
- NULL, &p.control.head[1], peers->local->id, peer->id);
-
+ TRACE_PROTO("send resync request message", PEERS_EV_SESS_IO|PEERS_EV_TX_MSG|PEERS_EV_PROTO_CTRL, appctx, peer);
return peer_send_msg(appctx, peer_prepare_control_msg, &p);
}
.control.head = { PEER_MSG_CLASS_CONTROL, PEER_MSG_CTRL_RESYNCCONFIRM, },
};
- TRACE_PROTO("send control message", PEERS_EV_CTRLMSG,
- NULL, &p.control.head[1], peers->local->id, peer->id);
-
+ TRACE_PROTO("send resync confirm message", PEERS_EV_SESS_IO|PEERS_EV_TX_MSG|PEERS_EV_PROTO_CTRL, appctx, peer);
return peer_send_msg(appctx, peer_prepare_control_msg, &p);
}
p.control.head[1] = (HA_ATOMIC_LOAD(&peers->flags) & PEERS_RESYNC_STATEMASK) == PEERS_RESYNC_FINISHED ?
PEER_MSG_CTRL_RESYNCFINISHED : PEER_MSG_CTRL_RESYNCPARTIAL;
- TRACE_PROTO("send control message", PEERS_EV_CTRLMSG,
- NULL, &p.control.head[1], peers->local->id, peer->id);
-
+ TRACE_PROTO("send full resync finish message", PEERS_EV_SESS_IO|PEERS_EV_TX_MSG|PEERS_EV_PROTO_CTRL, appctx, peer);
return peer_send_msg(appctx, peer_prepare_control_msg, &p);
}
.control.head = { PEER_MSG_CLASS_CONTROL, PEER_MSG_CTRL_HEARTBEAT, },
};
- TRACE_PROTO("send control message", PEERS_EV_CTRLMSG,
- NULL, &p.control.head[1], peers->local->id, peer->id);
-
+ TRACE_PROTO("send heartbeat message", PEERS_EV_SESS_IO|PEERS_EV_TX_MSG|PEERS_EV_PROTO_CTRL, appctx, peer);
return peer_send_msg(appctx, peer_prepare_control_msg, &p);
}
.error.head = { PEER_MSG_CLASS_ERROR, PEER_MSG_ERR_SIZELIMIT, },
};
+ TRACE_PROTO("send error size limit message", PEERS_EV_SESS_IO|PEERS_EV_TX_MSG|PEERS_EV_PROTO_ERR, appctx);
return peer_send_msg(appctx, peer_prepare_error_msg, &p);
}
.error.head = { PEER_MSG_CLASS_ERROR, PEER_MSG_ERR_PROTOCOL, },
};
+ TRACE_PROTO("send protocol error message", PEERS_EV_SESS_IO|PEERS_EV_TX_MSG|PEERS_EV_PROTO_ERR, appctx);
return peer_send_msg(appctx, peer_prepare_error_msg, &p);
}
int updates_sent = 0;
int failed_once = 0;
+ TRACE_ENTER(PEERS_EV_SESS_IO, appctx, p, st);
+
ret = 1;
use_timed = 0;
if (st != p->last_local_table) {
ret = peer_send_switchmsg(st, appctx);
if (ret <= 0)
- return ret;
+ goto out_unlocked;
p->last_local_table = st;
+ TRACE_PRINTF(TRACE_LEVEL_DEVELOPER, PEERS_EV_PROTO_SWITCH, appctx, NULL, st, NULL,
+ "table switch message sent (table=%s)", st->table->id);
}
if (peer_stksess_lookup != peer_teach_process_stksess_lookup)
break;
st->last_pushed = updateid;
+ TRACE_PRINTF(TRACE_LEVEL_DEVELOPER, PEERS_EV_PROTO_UPDATE, appctx, NULL, st, NULL,
+ "update message sent (table=%s, updateid=%u)", st->table->id, st->last_pushed);
if (peer_stksess_lookup == peer_teach_process_stksess_lookup) {
uint commitid = _HA_ATOMIC_LOAD(&st->table->commitupdate);
out:
HA_RWLOCK_RDUNLOCK(STK_TABLE_UPDT_LOCK, &st->table->updt_lock);
out_unlocked:
+ TRACE_LEAVE(PEERS_EV_SESS_IO, appctx, p, st);
return ret;
}
static inline int peer_send_teach_process_msgs(struct appctx *appctx, struct peer *p,
struct shared_table *st)
{
+ TRACE_PROTO("send teach process messages", PEERS_EV_SESS_IO, appctx, p, st);
return peer_send_teachmsgs(appctx, p, peer_teach_process_stksess_lookup, st);
}
static inline int peer_send_teach_stage1_msgs(struct appctx *appctx, struct peer *p,
struct shared_table *st)
{
+ TRACE_PROTO("send teach stage1 messages", PEERS_EV_SESS_IO, appctx, p, st);
return peer_send_teachmsgs(appctx, p, peer_teach_stage1_stksess_lookup, st);
}
static inline int peer_send_teach_stage2_msgs(struct appctx *appctx, struct peer *p,
struct shared_table *st)
{
+ TRACE_PROTO("send teach stage2 messages", PEERS_EV_SESS_IO, appctx, p, st);
return peer_send_teachmsgs(appctx, p, peer_teach_stage2_stksess_lookup, st);
}
void *data_ptr;
char *msg_save;
- TRACE_ENTER(PEERS_EV_UPDTMSG, NULL, p);
+ TRACE_ENTER(PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_UPDATE, appctx, p, st);
+
/* Here we have data message */
- if (!st)
+ if (!st) {
+ TRACE_PROTO("ignore update message: no remote table", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_UPDATE, appctx, p);
goto ignore_msg;
+ }
table = st->table;
if (updt) {
if (msg_len < sizeof(update)) {
- TRACE_PROTO("malformed message", PEERS_EV_UPDTMSG, NULL, p);
+ TRACE_ERROR("malformed update message: message too small", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_ERR, appctx, p, st);
goto malformed_exit;
}
size_t expire_sz = sizeof expire;
if (*msg_cur + expire_sz > msg_end) {
- TRACE_PROTO("malformed message", PEERS_EV_UPDTMSG,
- NULL, p, *msg_cur);
- TRACE_PROTO("malformed message", PEERS_EV_UPDTMSG,
- NULL, p, msg_end, &expire_sz);
+ TRACE_ERROR("malformed update message: wrong expiration size", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_ERR, appctx, p, st);
goto malformed_exit;
}
}
newts = stksess_new(table, NULL);
- if (!newts)
+ if (!newts) {
+ TRACE_PROTO("ignore update message: failed to get a new sticky session", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_UPDATE, appctx, p, st);
goto ignore_msg;
+ }
if (table->type == SMP_T_STR) {
unsigned int to_read, to_store;
to_read = intdecode(msg_cur, msg_end);
if (!*msg_cur) {
- TRACE_PROTO("malformed message", PEERS_EV_UPDTMSG, NULL, p);
+ TRACE_ERROR("malformed update message: invalid string length", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_ERR, appctx, p, st);
goto malformed_free_newts;
}
to_store = MIN(to_read, table->key_size - 1);
if (*msg_cur + to_store > msg_end) {
- TRACE_PROTO("malformed message", PEERS_EV_UPDTMSG,
- NULL, p, *msg_cur);
- TRACE_PROTO("malformed message", PEERS_EV_UPDTMSG,
- NULL, p, msg_end, &to_store);
+ TRACE_ERROR("malformed update message: invalid string (too big)", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_ERR, appctx, p, st);
goto malformed_free_newts;
}
unsigned int netinteger;
if (*msg_cur + sizeof(netinteger) > msg_end) {
- TRACE_PROTO("malformed message", PEERS_EV_UPDTMSG,
- NULL, p, *msg_cur);
- TRACE_PROTO("malformed message", PEERS_EV_UPDTMSG,
- NULL, p, msg_end);
+ TRACE_ERROR("malformed update message: invalid integer (too big)", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_ERR, appctx, p, st);
goto malformed_free_newts;
}
}
else {
if (*msg_cur + table->key_size > msg_end) {
- TRACE_PROTO("malformed message", PEERS_EV_UPDTMSG,
- NULL, p, *msg_cur);
- TRACE_PROTO("malformed message", PEERS_EV_UPDTMSG,
- NULL, p, msg_end, &table->key_size);
+ TRACE_ERROR("malformed update message: invalid key (too big)", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_ERR, appctx, p, st);
goto malformed_free_newts;
}
for (idx = 0; idx < st->remote_data_nbelem[data_type]; idx++) {
decoded_int = intdecode(msg_cur, msg_end);
if (!*msg_cur) {
- TRACE_PROTO("malformed message", PEERS_EV_UPDTMSG, NULL, p);
+ TRACE_ERROR("malformed update message: invalid integer data", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_ERR, appctx, p, st);
goto malformed_unlock;
}
for (idx = 0; idx < st->remote_data_nbelem[data_type]; idx++) {
decoded_int = intdecode(msg_cur, msg_end);
if (!*msg_cur) {
- TRACE_PROTO("malformed message", PEERS_EV_UPDTMSG, NULL, p);
+ TRACE_ERROR("malformed update message: invalid unsigned integer data", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_ERR, appctx, p, st);
goto malformed_unlock;
}
for (idx = 0; idx < st->remote_data_nbelem[data_type]; idx++) {
decoded_int = intdecode(msg_cur, msg_end);
if (!*msg_cur) {
- TRACE_PROTO("malformed message", PEERS_EV_UPDTMSG, NULL, p);
+ TRACE_ERROR("malformed update message: invalid unsigned long data", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_ERR, appctx, p, st);
goto malformed_unlock;
}
decoded_int = intdecode(msg_cur, msg_end);
if (!*msg_cur) {
- TRACE_PROTO("malformed message", PEERS_EV_UPDTMSG, NULL, p);
+ TRACE_ERROR("malformed update message: invalid freq_ctr data", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_ERR, appctx, p, st);
+ /* TRACE_PROTO("malformed message", PEERS_EV_UPDTMSG, NULL, p); */
goto malformed_unlock;
}
data.curr_tick = tick_add(now_ms, -decoded_int) & ~0x1;
data.curr_ctr = intdecode(msg_cur, msg_end);
if (!*msg_cur) {
- TRACE_PROTO("malformed message", PEERS_EV_UPDTMSG, NULL, p);
+ TRACE_ERROR("malformed update message: invalid freq_ctr data", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_ERR, appctx, p, st);
goto malformed_unlock;
}
data.prev_ctr = intdecode(msg_cur, msg_end);
if (!*msg_cur) {
- TRACE_PROTO("malformed message", PEERS_EV_UPDTMSG, NULL, p);
+ TRACE_ERROR("malformed update message: invalid freq_ctr data", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_ERR, appctx, p, st);
goto malformed_unlock;
}
}
decoded_int = intdecode(msg_cur, msg_end);
if (!*msg_cur) {
- TRACE_PROTO("malformed message", PEERS_EV_UPDTMSG, NULL, p);
+ TRACE_ERROR("malformed update message: invalid data value", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_ERR, appctx, p, st);
goto malformed_unlock;
}
data.curr_tick = tick_add(now_ms, -decoded_int) & ~0x1;
data.curr_ctr = intdecode(msg_cur, msg_end);
if (!*msg_cur) {
- TRACE_PROTO("malformed message", PEERS_EV_UPDTMSG, NULL, p);
+ TRACE_ERROR("malformed update message: invalid freq_ctr value", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_ERR, appctx, p, st);
goto malformed_unlock;
}
data.prev_ctr = intdecode(msg_cur, msg_end);
if (!*msg_cur) {
- TRACE_PROTO("malformed message", PEERS_EV_UPDTMSG, NULL, p);
+ TRACE_ERROR("malformed update message: invalid freq_ctr value", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_ERR, appctx, p, st);
goto malformed_unlock;
}
}
data_len = decoded_int;
if (*msg_cur + data_len > msg_end) {
- TRACE_PROTO("malformed message", PEERS_EV_UPDTMSG,
- NULL, p, *msg_cur);
- TRACE_PROTO("malformed message", PEERS_EV_UPDTMSG,
- NULL, p, msg_end, &data_len);
+ TRACE_ERROR("malformed update message: invalid dict value", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_ERR, appctx, p, st);
goto malformed_unlock;
}
end = *msg_cur + data_len;
id = intdecode(msg_cur, end);
if (!*msg_cur || !id) {
- TRACE_PROTO("malformed message", PEERS_EV_UPDTMSG,
- NULL, p, *msg_cur, &id);
+ TRACE_ERROR("malformed update message: invalid dict value", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_ERR, appctx, p, st);
goto malformed_unlock;
}
if (*msg_cur == end) {
/* Dictionary entry key without value. */
if (id > dc->max_entries) {
- TRACE_PROTO("malformed message", PEERS_EV_UPDTMSG,
- NULL, p, NULL, &id);
+ TRACE_ERROR("malformed update message: invalid dict value", PEERS_EV_SESS_IO|PEERS_EV_PROTO_ERR, appctx, p, st);
goto malformed_unlock;
}
/* IDs sent over the network are numbered from 1. */
value_len = intdecode(msg_cur, end);
if (!*msg_cur || *msg_cur + value_len > end ||
unlikely(value_len + 1 >= chunk->size)) {
- TRACE_PROTO("malformed message", PEERS_EV_UPDTMSG,
- NULL, p, *msg_cur, &value_len);
- TRACE_PROTO("malformed message", PEERS_EV_UPDTMSG,
- NULL, p, end, &chunk->size);
+ TRACE_ERROR("malformed update message: invalid dict value", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_ERR, appctx, p, st);
goto malformed_unlock;
}
goto update_wts;
}
+ TRACE_PRINTF(TRACE_LEVEL_DEVELOPER, PEERS_EV_PROTO_UPDATE, appctx, p, NULL, NULL,
+ "Update message successfully processed (table=%s, updateid=%u)", st->table->id, st->last_get);
+
ignore_msg:
- TRACE_LEAVE(PEERS_EV_UPDTMSG, NULL, p);
+ TRACE_LEAVE(PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_UPDATE, appctx, p, st);
return 1;
malformed_unlock:
/* malformed message */
HA_RWLOCK_WRUNLOCK(STK_SESS_LOCK, &ts->lock);
stktable_touch_remote(st->table, ts, 1);
- appctx->st0 = PEER_SESS_ST_ERRPROTO;
- TRACE_DEVEL("leaving in error", PEERS_EV_UPDTMSG);
- return 0;
+ goto malformed_exit;
malformed_free_newts:
/* malformed message */
stksess_free(st->table, newts);
malformed_exit:
appctx->st0 = PEER_SESS_ST_ERRPROTO;
- TRACE_DEVEL("leaving in error", PEERS_EV_UPDTMSG);
+ TRACE_DEVEL("leaving in error", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_ERR, appctx, p, st);
return 0;
}
/* ack message */
uint32_t table_id ;
uint32_t update;
- struct shared_table *st;
+ struct shared_table *st = NULL;
+ int ret = 1;
+ TRACE_ENTER(PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_ACK, appctx, p);
/* ignore ack during teaching process */
- if (p->flags & PEER_F_TEACH_PROCESS)
- return 1;
+ if (p->flags & PEER_F_TEACH_PROCESS) {
+ TRACE_DEVEL("Ignore ack during teaching process", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_ACK, appctx, p);
+ goto end;
+ }
table_id = intdecode(msg_cur, msg_end);
if (!*msg_cur || (*msg_cur + sizeof(update) > msg_end)) {
- /* malformed message */
-
- TRACE_PROTO("malformed message", PEERS_EV_ACKMSG,
- NULL, p, *msg_cur);
+ TRACE_ERROR("malformed ackk message: no table id", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_ERR, appctx, p);
appctx->st0 = PEER_SESS_ST_ERRPROTO;
- return 0;
+ ret = 0;
+ goto end;
}
memcpy(&update, *msg_cur, sizeof(update));
update = ntohl(update);
-
for (st = p->tables; st; st = st->next) {
if (st->local_id == table_id) {
st->update = update;
+ TRACE_PRINTF(TRACE_LEVEL_DEVELOPER, PEERS_EV_PROTO_ACK, appctx, p, NULL, NULL,
+ "Ack message successfully process (table=%s, updateid=%u)", st->table->id, st->update);
break;
}
}
- return 1;
+ end:
+ TRACE_LEAVE(PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_ACK, appctx, p, st);
+ return ret;
}
/*
struct shared_table *st;
int table_id;
+ TRACE_ENTER(PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_SWITCH, appctx, p);
table_id = intdecode(msg_cur, msg_end);
if (!*msg_cur) {
- TRACE_PROTO("malformed message", PEERS_EV_SWTCMSG, NULL, p);
- /* malformed message */
+ TRACE_ERROR("malformed table switch message: no table id", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_ERR, appctx, p);
appctx->st0 = PEER_SESS_ST_ERRPROTO;
return 0;
}
for (st = p->tables; st; st = st->next) {
if (st->remote_id == table_id) {
p->remote_table = st;
+ TRACE_PRINTF(TRACE_LEVEL_DEVELOPER, PEERS_EV_PROTO_SWITCH, appctx, p, NULL, NULL,
+ "table switch message successfully process (table=%s)", st->table->id);
break;
}
}
+ TRACE_LEAVE(PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_SWITCH, appctx, p, st);
return 1;
}
int table_id;
uint64_t table_data;
+ TRACE_ENTER(PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_DEF, appctx, p);
table_id = intdecode(msg_cur, msg_end);
if (!*msg_cur) {
- TRACE_PROTO("malformed message", PEERS_EV_DEFMSG, NULL, p);
+ TRACE_ERROR("malformed table definition message: no table id", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_ERR, appctx, p);
goto malformed_exit;
}
table_id_len = intdecode(msg_cur, msg_end);
if (!*msg_cur) {
- TRACE_PROTO("malformed message", PEERS_EV_DEFMSG, NULL, p, *msg_cur);
+ TRACE_ERROR("malformed table definition message: no table name length", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_ERR, appctx, p);
goto malformed_exit;
}
p->remote_table = NULL;
if (!table_id_len || (*msg_cur + table_id_len) >= msg_end) {
- TRACE_PROTO("malformed message", PEERS_EV_DEFMSG, NULL, p, *msg_cur, &table_id_len);
+ TRACE_ERROR("malformed table definition message: no table name", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_ERR, appctx, p);
goto malformed_exit;
}
}
if (!p->remote_table) {
- TRACE_PROTO("ignored message", PEERS_EV_DEFMSG, NULL, p);
+ TRACE_PROTO("ignore table definition message: table not found", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_DEF, appctx, p);
goto ignore_msg;
}
*msg_cur += table_id_len;
if (*msg_cur >= msg_end) {
- TRACE_PROTO("malformed message", PEERS_EV_DEFMSG, NULL, p);
+ TRACE_ERROR("malformed table definition message: truncated message", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_ERR, appctx, p);
goto malformed_exit;
}
table_type = intdecode(msg_cur, msg_end);
if (!*msg_cur) {
- TRACE_PROTO("malformed message", PEERS_EV_DEFMSG, NULL, p);
+ TRACE_ERROR("malformed table definition message: no table type", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_ERR, appctx, p);
goto malformed_exit;
}
table_keylen = intdecode(msg_cur, msg_end);
if (!*msg_cur) {
- TRACE_PROTO("malformed message", PEERS_EV_DEFMSG, NULL, p);
+ TRACE_ERROR("malformed table definition message: no key length", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_ERR, appctx, p);
goto malformed_exit;
}
table_data = intdecode(msg_cur, msg_end);
if (!*msg_cur) {
- TRACE_PROTO("malformed message", PEERS_EV_DEFMSG, NULL, p);
+ TRACE_ERROR("malformed table definition message: no data type", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_ERR, appctx, p);
goto malformed_exit;
}
if (p->remote_table->table->type != peer_int_key_type[table_type]
|| p->remote_table->table->key_size != table_keylen) {
p->remote_table = NULL;
- TRACE_PROTO("ignored message", PEERS_EV_DEFMSG, NULL, p);
+ TRACE_PROTO("ignore table definition message: no key/type match", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_DEF, appctx, p);
goto ignore_msg;
}
type = intdecode(msg_cur, msg_end);
if (!*msg_cur) {
p->remote_table = NULL;
- TRACE_PROTO("missing meta data for array", PEERS_EV_DEFMSG, NULL, p);
+ TRACE_PROTO("ignore table definition message: missing meta data for array", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_DEF, appctx, p);
goto ignore_msg;
}
/* check if the data_type match the current from the bitfield */
if (type != data_type) {
p->remote_table = NULL;
- TRACE_PROTO("meta data mismatch type", PEERS_EV_DEFMSG, NULL, p);
+ TRACE_PROTO("ignore table definition message: meta data mismatch type", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_DEF, appctx, p);
goto ignore_msg;
}
p->remote_table->remote_data_nbelem[type] = intdecode(msg_cur, msg_end);
if (!*msg_cur) {
p->remote_table = NULL;
- TRACE_PROTO("missing array size meta data for array", PEERS_EV_DEFMSG, NULL, p);
+ TRACE_PROTO("ignore table definition message: missing array size meta data for array", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_DEF, appctx, p);
goto ignore_msg;
}
intdecode(msg_cur, msg_end);
if (!*msg_cur) {
p->remote_table = NULL;
- TRACE_PROTO("missing period for frqp", PEERS_EV_DEFMSG, NULL, p);
+ TRACE_PROTO("ignore table definition message: missing period for frqp", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_DEF, appctx, p);
goto ignore_msg;
}
}
type = intdecode(msg_cur, msg_end);
if (!*msg_cur) {
p->remote_table = NULL;
- TRACE_PROTO("missing meta data for frqp", PEERS_EV_DEFMSG, NULL, p);
+ TRACE_PROTO("ignore table definition message: missing data for frqp", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_DEF, appctx, p);
goto ignore_msg;
}
/* check if the data_type match the current from the bitfield */
if (type != data_type) {
p->remote_table = NULL;
- TRACE_PROTO("meta data mismatch type", PEERS_EV_DEFMSG, NULL, p);
+ TRACE_PROTO("ignore table definition message: meta data mismatch", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_DEF, appctx, p);
goto ignore_msg;
}
intdecode(msg_cur, msg_end);
if (!*msg_cur) {
p->remote_table = NULL;
- TRACE_PROTO("missing period for frqp", PEERS_EV_DEFMSG, NULL, p);
+ TRACE_PROTO("ignore table definition message: mismatch period for frqp", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_DEF, appctx, p);
goto ignore_msg;
}
}
if (table_data & (1ULL << data_type)) {
if (stktable_data_types[data_type].is_array) {
p->remote_table = NULL;
- TRACE_PROTO("missing array size meta data for array", PEERS_EV_DEFMSG, NULL, p);
+ TRACE_PROTO("ignore table definition message: missing array size meta data for array", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_DEF, appctx, p);
goto ignore_msg;
}
}
p->remote_table->remote_data = table_data;
p->remote_table->remote_id = table_id;
+ TRACE_PRINTF(TRACE_LEVEL_DEVELOPER, PEERS_EV_PROTO_DEF, appctx, p, NULL, NULL,
+ "table definition message successfully process (table=%s)", p->remote_table->table->id);
+
ignore_msg:
+ TRACE_LEAVE(PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_DEF, appctx, p);
return 1;
malformed_exit:
/* malformed message */
appctx->st0 = PEER_SESS_ST_ERRPROTO;
+ TRACE_DEVEL("leaving in error", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_ERR, appctx, p);
return 0;
}
int reql;
char *cur;
+ TRACE_ENTER(PEERS_EV_SESS_IO|PEERS_EV_RX_MSG, appctx);
+
reql = applet_getblk(appctx, msg_head, 2 * sizeof(char), *totl);
if (reql <= 0) /* closed or EOL not found */
goto incomplete;
goto incomplete;
/* malformed message */
- TRACE_PROTO("malformed message: too large length encoding", PEERS_EV_UPDTMSG);
+ TRACE_PROTO("malformed message: bad message length encoding", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_ERR, appctx);
appctx->st0 = PEER_SESS_ST_ERRPROTO;
return -1;
}
if (*msg_len > trash.size) {
/* Status code is not success, abort */
appctx->st0 = PEER_SESS_ST_ERRSIZE;
+ TRACE_PROTO("malformed message: too large length encoding", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_ERR, appctx);
return -1;
}
*totl += reql;
}
+ TRACE_LEAVE(PEERS_EV_SESS_IO|PEERS_EV_RX_MSG, appctx);
return 1;
incomplete:
if (reql < 0 || se_fl_test(appctx->sedesc, SE_FL_SHW)) {
/* there was an error or the message was truncated */
appctx->st0 = PEER_SESS_ST_END;
+ TRACE_ERROR("error or messafe truncated", PEERS_EV_SESS_IO|PEERS_EV_RX_ERR, appctx);
return -1;
}
+ TRACE_LEAVE(PEERS_EV_SESS_IO|PEERS_EV_RX_MSG, appctx);
return 0;
}
{
struct peers *peers = peer->peers;
+ TRACE_ENTER(PEERS_EV_SESS_IO|PEERS_EV_RX_MSG, appctx, peer);
+
if (msg_head[0] == PEER_MSG_CLASS_CONTROL) {
if (msg_head[1] == PEER_MSG_CTRL_RESYNCREQ) {
struct shared_table *st;
/* Reset message: remote need resync */
-
- TRACE_PROTO("received control message", PEERS_EV_CTRLMSG,
- NULL, &msg_head[1], peers->local->id, peer->id);
+ TRACE_PROTO("Resync request message received", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_CTRL, appctx, peer);
/* prepare tables for a global push */
for (st = peer->tables; st; st = st->next) {
st->teaching_origin = st->last_pushed = st->update;
/* flag to start to teach lesson */
peer->flags |= (PEER_F_TEACH_PROCESS|PEER_F_DBG_RESYNC_REQUESTED);
+ TRACE_STATE("peer elected to teach leasson to remote peer", PEERS_EV_SESS_RESYNC|PEERS_EV_PROTO_CTRL, appctx, peer);
}
else if (msg_head[1] == PEER_MSG_CTRL_RESYNCFINISHED) {
- TRACE_PROTO("received control message", PEERS_EV_CTRLMSG,
- NULL, &msg_head[1], peers->local->id, peer->id);
+ TRACE_PROTO("Full resync finished message received", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_CTRL, appctx, peer);
if (peer->learnstate == PEER_LR_ST_PROCESSING) {
peer->learnstate = PEER_LR_ST_FINISHED;
peer->flags |= PEER_F_WAIT_SYNCTASK_ACK;
task_wakeup(peers->sync_task, TASK_WOKEN_MSG);
+ TRACE_STATE("Full resync finished", PEERS_EV_SESS_RESYNC|PEERS_EV_PROTO_CTRL, appctx, peer);
}
peer->confirm++;
}
else if (msg_head[1] == PEER_MSG_CTRL_RESYNCPARTIAL) {
- TRACE_PROTO("received control message", PEERS_EV_CTRLMSG,
- NULL, &msg_head[1], peers->local->id, peer->id);
+ TRACE_PROTO("Partial resync finished message received", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_CTRL, appctx, peer);
if (peer->learnstate == PEER_LR_ST_PROCESSING) {
peer->learnstate = PEER_LR_ST_FINISHED;
peer->flags |= (PEER_F_LEARN_NOTUP2DATE|PEER_F_WAIT_SYNCTASK_ACK);
task_wakeup(peers->sync_task, TASK_WOKEN_MSG);
+ TRACE_STATE("partial resync finished", PEERS_EV_SESS_RESYNC|PEERS_EV_PROTO_CTRL, appctx, peer);
}
peer->confirm++;
}
else if (msg_head[1] == PEER_MSG_CTRL_RESYNCCONFIRM) {
struct shared_table *st;
- TRACE_PROTO("received control message", PEERS_EV_CTRLMSG,
- NULL, &msg_head[1], peers->local->id, peer->id);
+ TRACE_PROTO("Resync confirm message received", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_CTRL, appctx, peer);
/* If stopping state */
if (stopping) {
/* Close session, push resync no more needed */
peer->flags |= PEER_F_LOCAL_TEACH_COMPLETE;
appctx->st0 = PEER_SESS_ST_END;
+ TRACE_STATE("process stopping, stop any resync", PEERS_EV_SESS_RESYNC|PEERS_EV_PROTO_CTRL, appctx, peer);
return 0;
}
for (st = peer->tables; st; st = st->next) {
/* reset teaching flags to 0 */
peer->flags &= ~PEER_TEACH_FLAGS;
+ TRACE_STATE("Stop teaching", PEERS_EV_SESS_RESYNC|PEERS_EV_PROTO_CTRL, appctx, peer);
}
else if (msg_head[1] == PEER_MSG_CTRL_HEARTBEAT) {
- TRACE_PROTO("received control message", PEERS_EV_CTRLMSG,
- NULL, &msg_head[1], peers->local->id, peer->id);
+ TRACE_PROTO("Heartbeat message received", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_CTRL, appctx, peer);
peer->reconnect = tick_add(now_ms, MS_TO_TICKS(PEER_RECONNECT_TIMEOUT));
peer->rx_hbt++;
}
}
else if (msg_head[0] == PEER_MSG_CLASS_STICKTABLE) {
if (msg_head[1] == PEER_MSG_STKT_DEFINE) {
+ TRACE_PROTO("Table definition message received", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_CTRL, appctx, peer);
if (!peer_treat_definemsg(appctx, peer, msg_cur, msg_end, totl))
return 0;
}
else if (msg_head[1] == PEER_MSG_STKT_SWITCH) {
+ TRACE_PROTO("Table switch message received", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_CTRL, appctx, peer);
if (!peer_treat_switchmsg(appctx, peer, msg_cur, msg_end))
return 0;
}
msg_head[1] == PEER_MSG_STKT_INCUPDATE_TIMED) {
int update, expire;
+ TRACE_PROTO("Update message received", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_UPDATE, appctx, peer);
update = msg_head[1] == PEER_MSG_STKT_UPDATE || msg_head[1] == PEER_MSG_STKT_UPDATE_TIMED;
expire = msg_head[1] == PEER_MSG_STKT_UPDATE_TIMED || msg_head[1] == PEER_MSG_STKT_INCUPDATE_TIMED;
if (!peer_treat_updatemsg(appctx, peer, update, expire,
}
else if (msg_head[1] == PEER_MSG_STKT_ACK) {
+ TRACE_PROTO("Ack message received", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_ACK, appctx, peer);
if (!peer_treat_ackmsg(appctx, peer, msg_cur, msg_end))
return 0;
}
}
else if (msg_head[0] == PEER_MSG_CLASS_RESERVED) {
appctx->st0 = PEER_SESS_ST_ERRPROTO;
+ TRACE_PROTO("malformed message: reserved", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_ERR, appctx, peer);
return 0;
}
+ TRACE_LEAVE(PEERS_EV_SESS_IO|PEERS_EV_RX_MSG, appctx, peer);
return 1;
}
int peer_send_msgs(struct appctx *appctx,
struct peer *peer, struct peers *peers)
{
- int repl;
+ int repl = 1;
+
+ TRACE_ENTER(PEERS_EV_SESS_IO, appctx, peer);
/* Need to request a resync (only possible for a remote peer at this stage) */
if (peer->learnstate == PEER_LR_ST_ASSIGNED) {
BUG_ON(peer->local);
repl = peer_send_resync_reqmsg(appctx, peer, peers);
if (repl <= 0)
- return repl;
+ goto end;
peer->learnstate = PEER_LR_ST_PROCESSING;
+ TRACE_STATE("Start processing resync", PEERS_EV_SESS_IO|PEERS_EV_SESS_RESYNC, appctx, peer);
}
/* Nothing to read, now we start to write */
if (st->last_get != st->last_acked) {
repl = peer_send_ackmsg(st, appctx);
if (repl <= 0)
- return repl;
+ goto end;
st->last_acked = st->last_get;
+ TRACE_PRINTF(TRACE_LEVEL_PROTO, PEERS_EV_PROTO_ACK, appctx, NULL, st, NULL,
+ "ack message sent (table=%s, updateid=%u)", st->table->id, st->last_acked);
}
if (!(peer->flags & PEER_F_TEACH_PROCESS)) {
if (HA_RWLOCK_TRYRDLOCK(STK_TABLE_UPDT_LOCK, &st->table->updt_lock)) {
applet_have_more_data(appctx);
- return -1;
+ repl = -1;
+ goto end;
}
must_send = (peer->learnstate == PEER_LR_ST_NOTASSIGNED) && (st->last_pushed != st->table->localupdate);
HA_RWLOCK_RDUNLOCK(STK_TABLE_UPDT_LOCK, &st->table->updt_lock);
repl = peer_send_teach_process_msgs(appctx, peer, st);
if (repl <= 0) {
peer->stop_local_table = peer->last_local_table;
- return repl;
+ goto end;
}
}
}
repl = peer_send_teach_stage1_msgs(appctx, peer, st);
if (repl <= 0) {
peer->stop_local_table = peer->last_local_table;
- return repl;
+ goto end;
}
}
repl = peer_send_teach_stage2_msgs(appctx, peer, st);
if (repl <= 0) {
peer->stop_local_table = peer->last_local_table;
- return repl;
+ goto end;
}
}
}
updates++;
if (updates >= peers_max_updates_at_once) {
applet_have_more_data(appctx);
- return -1;
+ repl = -1;
+ goto end;
}
st = st->next;
if ((peer->flags & PEER_F_TEACH_PROCESS) && !(peer->flags & PEER_F_TEACH_FINISHED)) {
repl = peer_send_resync_finishedmsg(appctx, peer, peers);
if (repl <= 0)
- return repl;
+ goto end;
/* flag finished message sent */
peer->flags |= PEER_F_TEACH_FINISHED;
+ TRACE_STATE("full/partial resync finished", PEERS_EV_SESS_IO|PEERS_EV_SESS_RESYNC, appctx, peer);
}
/* Confirm finished or partial messages */
while (peer->confirm) {
repl = peer_send_resync_confirmsg(appctx, peer, peers);
if (repl <= 0)
- return repl;
-
+ goto end;
+ TRACE_STATE("Confirm resync is finished", PEERS_EV_SESS_IO|PEERS_EV_SESS_RESYNC, appctx, peer);
peer->confirm--;
}
- return 1;
+ repl = 1;
+ end:
+ TRACE_LEAVE(PEERS_EV_SESS_IO, appctx, peer);
+ return repl;
}
/*
if (strncmp(PEER_SESSION_PROTO_NAME " ", trash.area, proto_len + 1) != 0) {
appctx->st0 = PEER_SESS_ST_EXIT;
appctx->st1 = PEER_SESS_SC_ERRPROTO;
+ TRACE_ERROR("protocol error: invalid version line", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_ERR, appctx);
return -1;
}
if (peer_get_version(trash.area + proto_len + 1, maj_ver, min_ver) == -1 ||
*maj_ver != PEER_MAJOR_VER || *min_ver > PEER_MINOR_VER) {
appctx->st0 = PEER_SESS_ST_EXIT;
appctx->st1 = PEER_SESS_SC_ERRVERSION;
+ TRACE_ERROR("protocol error: invalid version", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_ERR, appctx);
return -1;
}
+ TRACE_DATA("version line received", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_HELLO, appctx);
return 1;
}
if (strcmp(localpeer, trash.area) != 0) {
appctx->st0 = PEER_SESS_ST_EXIT;
appctx->st1 = PEER_SESS_SC_ERRHOST;
+ TRACE_ERROR("protocol error: wrong host", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_ERR, appctx);
return -1;
}
+ TRACE_DATA("host line received", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_HELLO, appctx);
return 1;
}
if (!p) {
appctx->st0 = PEER_SESS_ST_EXIT;
appctx->st1 = PEER_SESS_SC_ERRPROTO;
+ TRACE_ERROR("protocol error: invalid peer line", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_ERR, appctx);
return -1;
}
*p = 0;
if (!peer) {
appctx->st0 = PEER_SESS_ST_EXIT;
appctx->st1 = PEER_SESS_SC_ERRPEER;
+ TRACE_ERROR("protocol error: unknown peer", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_ERR, appctx);
return -1;
}
*curpeer = peer;
+ TRACE_DATA("peer line received", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_HELLO, appctx, peer);
return 1;
}
{
struct shared_table *st;
+ TRACE_ENTER(PEERS_EV_SESS_IO|PEERS_EV_SESS_NEW, NULL, peer);
peer->heartbeat = tick_add(now_ms, MS_TO_TICKS(PEER_HEARTBEAT_TIMEOUT));
/* Init cursors */
* on the frontend side), flag it to start to teach lesson.
*/
peer->flags |= PEER_F_TEACH_PROCESS;
+ TRACE_STATE("peer elected to teach lesson to local peer", PEERS_EV_SESS_NEW|PEERS_EV_SESS_RESYNC, NULL, peer);
}
/* Mark the peer as starting and wait the sync task */
peer->flags |= PEER_F_WAIT_SYNCTASK_ACK;
peer->appstate = PEER_APP_ST_STARTING;
+ TRACE_STATE("peer session starting", PEERS_EV_SESS_NEW, NULL, peer);
+ TRACE_LEAVE(PEERS_EV_SESS_IO|PEERS_EV_SESS_NEW, NULL, peer);
}
/*
int prev_state;
int msg_done = 0;
+ TRACE_ENTER(PEERS_EV_SESS_IO, appctx);
+
if (unlikely(applet_fl_test(appctx, APPCTX_FL_EOS|APPCTX_FL_ERROR))) {
applet_reset_input(appctx);
goto out;
__fallthrough;
case PEER_SESS_ST_GETVERSION:
prev_state = appctx->st0;
+ TRACE_STATE("get version line", PEERS_EV_SESS_IO, appctx);
reql = peer_getline_version(appctx, &maj_ver, &min_ver);
if (reql <= 0) {
if (!reql)
__fallthrough;
case PEER_SESS_ST_GETHOST:
prev_state = appctx->st0;
+ TRACE_STATE("get host line", PEERS_EV_SESS_IO, appctx);
reql = peer_getline_host(appctx);
if (reql <= 0) {
if (!reql)
__fallthrough;
case PEER_SESS_ST_GETPEER: {
prev_state = appctx->st0;
+ TRACE_STATE("get peer line", PEERS_EV_SESS_IO, appctx);
reql = peer_getline_last(appctx, &curpeer);
if (reql <= 0) {
if (!reql)
/* Local connection, reply a retry */
appctx->st0 = PEER_SESS_ST_EXIT;
appctx->st1 = PEER_SESS_SC_TRYAGAIN;
+ TRACE_STATE("local connection, retry", PEERS_EV_SESS_IO|PEERS_EV_SESS_END, appctx, curpeer);
goto switchstate;
}
+ TRACE_STATE("release old session", PEERS_EV_SESS_IO|PEERS_EV_SESS_END, appctx, curpeer);
/* we're killing a connection, we must apply a random delay before
* retrying otherwise the other end will do the same and we can loop
* for a while.
curpeer = appctx->svcctx;
HA_SPIN_LOCK(PEER_LOCK, &curpeer->lock);
if (curpeer->appctx != appctx) {
+ TRACE_STATE("release old session", PEERS_EV_SESS_IO|PEERS_EV_SESS_END, appctx, curpeer);
appctx->st0 = PEER_SESS_ST_END;
goto switchstate;
}
}
+ TRACE_STATE("send success", PEERS_EV_SESS_IO, appctx, curpeer);
repl = peer_send_status_successmsg(appctx);
if (repl <= 0) {
if (repl == -1)
/* switch to waiting message state */
_HA_ATOMIC_INC(&connected_peers);
appctx->st0 = PEER_SESS_ST_WAITMSG;
+ TRACE_STATE("connected, now wait for messages", PEERS_EV_SESS_IO, appctx, curpeer);
goto switchstate;
}
case PEER_SESS_ST_CONNECT: {
curpeer = appctx->svcctx;
HA_SPIN_LOCK(PEER_LOCK, &curpeer->lock);
if (curpeer->appctx != appctx) {
+ TRACE_STATE("release old session", PEERS_EV_SESS_IO|PEERS_EV_SESS_END, appctx, curpeer);
appctx->st0 = PEER_SESS_ST_END;
goto switchstate;
}
}
+ TRACE_STATE("send hello message", PEERS_EV_SESS_IO, appctx, curpeer);
repl = peer_send_hellomsg(appctx, curpeer);
if (repl <= 0) {
if (repl == -1)
curpeer = appctx->svcctx;
HA_SPIN_LOCK(PEER_LOCK, &curpeer->lock);
if (curpeer->appctx != appctx) {
+ TRACE_STATE("release old session", PEERS_EV_SESS_IO|PEERS_EV_SESS_END, appctx, curpeer);
appctx->st0 = PEER_SESS_ST_END;
goto switchstate;
}
}
curpeer->statuscode = PEER_SESS_SC_CONNECTEDCODE;
+ TRACE_STATE("get status", PEERS_EV_SESS_IO, appctx, curpeer);
reql = peer_getline(appctx);
if (!reql)
}
_HA_ATOMIC_INC(&connected_peers);
appctx->st0 = PEER_SESS_ST_WAITMSG;
+ TRACE_STATE("connected, now wait for messages", PEERS_EV_SESS_IO, appctx, curpeer);
}
__fallthrough;
case PEER_SESS_ST_WAITMSG: {
curpeer = appctx->svcctx;
HA_SPIN_LOCK(PEER_LOCK, &curpeer->lock);
if (curpeer->appctx != appctx) {
+ TRACE_STATE("release old session", PEERS_EV_SESS_IO|PEERS_EV_SESS_END, appctx, curpeer);
appctx->st0 = PEER_SESS_ST_END;
goto switchstate;
}
if (curpeer->flags & PEER_F_WAIT_SYNCTASK_ACK) {
applet_wont_consume(appctx);
+ TRACE_STATE("peer is waiting for sync task", PEERS_EV_SESS_IO, appctx, curpeer);
goto out;
}
applet_will_consume(appctx);
/* local peer is assigned of a lesson, start it */
- if (curpeer->learnstate == PEER_LR_ST_ASSIGNED && curpeer->local)
+ if (curpeer->learnstate == PEER_LR_ST_ASSIGNED && curpeer->local) {
curpeer->learnstate = PEER_LR_ST_PROCESSING;
+ TRACE_STATE("peer starts to learn", PEERS_EV_SESS_IO, appctx, curpeer);
+ }
reql = peer_recv_msg(appctx, (char *)msg_head, sizeof msg_head, &msg_len, &totl);
if (reql <= 0) {
if (prev_state == PEER_SESS_ST_WAITMSG)
_HA_ATOMIC_DEC(&connected_peers);
prev_state = appctx->st0;
+ TRACE_STATE("send status error message", PEERS_EV_SESS_IO|PEERS_EV_SESS_ERR, appctx, curpeer);
if (peer_send_status_errormsg(appctx) == -1)
goto out;
appctx->st0 = PEER_SESS_ST_END;
if (prev_state == PEER_SESS_ST_WAITMSG)
_HA_ATOMIC_DEC(&connected_peers);
prev_state = appctx->st0;
+ TRACE_STATE("send error size message", PEERS_EV_SESS_IO|PEERS_EV_SESS_ERR, appctx, curpeer);
if (peer_send_error_size_limitmsg(appctx) == -1)
goto out;
appctx->st0 = PEER_SESS_ST_END;
goto switchstate;
}
case PEER_SESS_ST_ERRPROTO: {
- TRACE_PROTO("protocol error", PEERS_EV_PROTOERR,
- NULL, curpeer, &prev_state);
if (curpeer)
curpeer->proto_err++;
if (prev_state == PEER_SESS_ST_WAITMSG)
_HA_ATOMIC_DEC(&connected_peers);
prev_state = appctx->st0;
- if (peer_send_error_protomsg(appctx) == -1) {
- TRACE_PROTO("could not send error message", PEERS_EV_PROTOERR);
+ TRACE_STATE("send proto error message", PEERS_EV_SESS_IO|PEERS_EV_SESS_ERR, appctx, curpeer);
+ if (peer_send_error_protomsg(appctx) == -1)
goto out;
- }
appctx->st0 = PEER_SESS_ST_END;
prev_state = appctx->st0;
}
if (prev_state == PEER_SESS_ST_WAITMSG)
_HA_ATOMIC_DEC(&connected_peers);
prev_state = appctx->st0;
+ TRACE_STATE("Terminate peer session", PEERS_EV_SESS_IO|PEERS_EV_SESS_END, appctx, curpeer);
if (curpeer) {
HA_SPIN_UNLOCK(PEER_LOCK, &curpeer->lock);
curpeer = NULL;
if (curpeer)
HA_SPIN_UNLOCK(PEER_LOCK, &curpeer->lock);
+
+ TRACE_LEAVE(PEERS_EV_SESS_IO, appctx, curpeer);
return;
}
if (appctx->applet != &peer_applet)
return;
+ TRACE_STATE("peer session shutdown", PEERS_EV_SESS_SHUT|PEERS_EV_SESS_END, appctx, peer);
__peer_session_deinit(peer);
appctx->st0 = PEER_SESS_ST_END;
unsigned int thr = 0;
int idx;
+ TRACE_ENTER(PEERS_EV_SESS_NEW, NULL, peer);
+
peer->new_conn++;
peer->reconnect = tick_add(now_ms, (stopping ? MS_TO_TICKS(PEER_LOCAL_RECONNECT_TIMEOUT) : MS_TO_TICKS(PEER_RECONNECT_TIMEOUT)));
peer->heartbeat = TICK_ETERNITY;
for (idx = 0; idx < global.nbthread; idx++)
thr = peers->applet_count[idx] < peers->applet_count[thr] ? idx : thr;
appctx = appctx_new_on(&peer_applet, NULL, thr);
- if (!appctx)
+ if (!appctx) {
+ TRACE_ERROR("peer APPCTX creation failed", PEERS_EV_SESS_NEW|PEERS_EV_SESS_END|PEERS_EV_SESS_ERR, NULL, peer);
goto out_close;
+ }
appctx->svcctx = (void *)peer;
appctx->st0 = PEER_SESS_ST_CONNECT;
HA_ATOMIC_INC(&peers->applet_count[thr]);
appctx_wakeup(appctx);
+
+ TRACE_LEAVE(PEERS_EV_SESS_NEW, appctx, peer);
return appctx;
out_close:
/* Partial resync */
flags |= (peer->local ? PEERS_F_DBG_RESYNC_LOCALPARTIAL : PEERS_F_DBG_RESYNC_REMOTEPARTIAL);
peers->resync_timeout = tick_add(now_ms, MS_TO_TICKS(PEER_RESYNC_TIMEOUT));
+ TRACE_STATE("learning finished, peer session partially resync", PEERS_EV_SESS_RESYNC, NULL, peer);
}
else {
/* Full resync */
if (!commit_a_finish) {
/* it remains some shard to request, we schedule a new request */
peers->resync_timeout = tick_add(now_ms, MS_TO_TICKS(PEER_RESYNC_TIMEOUT));
+ TRACE_STATE("Resync in progress, some shard not resync yet", PEERS_EV_SESS_RESYNC, NULL, peer);
}
}
if (commit_a_finish) {
flags |= (PEERS_F_RESYNC_LOCAL_FINISHED|PEERS_F_RESYNC_REMOTE_FINISHED);
flags |= (peer->local ? PEERS_F_DBG_RESYNC_LOCALFINISHED : PEERS_F_DBG_RESYNC_REMOTEFINISHED);
+ TRACE_STATE("learning finished, peer session fully resync", PEERS_EV_SESS_RESYNC, NULL, peer);
}
}
peer->learnstate = PEER_LR_ST_NOTASSIGNED;
if (peer->appstate == PEER_APP_ST_STOPPING) {
clear_peer_learning_status(peer);
peer->appstate = PEER_APP_ST_STOPPED;
+ TRACE_STATE("peer session now stopped", PEERS_EV_SESS_END, NULL, peer);
}
else if (peer->appstate == PEER_APP_ST_STARTING) {
clear_peer_learning_status(peer);
/* assign local peer for a lesson */
peer->learnstate = PEER_LR_ST_ASSIGNED;
HA_ATOMIC_OR(&peers->flags, PEERS_F_RESYNC_ASSIGN|PEERS_F_DBG_RESYNC_LOCALASSIGN);
+ TRACE_STATE("peer session assigned for a local resync", PEERS_EV_SESS_RESYNC|PEERS_EV_SESS_WAKE, NULL, peer);
}
}
else if (!peer->local) {
/* assign remote peer for a lesson */
peer->learnstate = PEER_LR_ST_ASSIGNED;
HA_ATOMIC_OR(&peers->flags, PEERS_F_RESYNC_ASSIGN|PEERS_F_DBG_RESYNC_REMOTEASSIGN);
+ TRACE_STATE("peer session assigned for a remote resync", PEERS_EV_SESS_RESYNC|PEERS_EV_SESS_WAKE, NULL, peer);
}
}
peer->appstate = PEER_APP_ST_RUNNING;
+ TRACE_STATE("peer session running", PEERS_EV_SESS_NEW|PEERS_EV_SESS_WAKE, NULL, peer);
appctx_wakeup(peer->appctx);
}
}
/* assign peer for the lesson */
peer->learnstate = PEER_LR_ST_ASSIGNED;
HA_ATOMIC_OR(&peers->flags, PEERS_F_RESYNC_ASSIGN|PEERS_F_DBG_RESYNC_REMOTEASSIGN);
+ TRACE_STATE("peer session assigned for a remote resync", PEERS_EV_SESS_RESYNC|PEERS_EV_SESS_WAKE, NULL, peer);
/* wake up peer handler to handle a request of resync */
appctx_wakeup(peer->appctx);
/* We are going to send updates, let's ensure we will
* come back to send heartbeat messages or to reconnect.
*/
+ TRACE_DEVEL("wakeup peer session to send update", PEERS_EV_SESS_WAKE, NULL, peer);
task->expire = tick_first(peer->reconnect, peer->heartbeat);
appctx_wakeup(peer->appctx);
break;
* Flag it as not alive again for the next period.
*/
peer->flags &= ~PEER_F_ALIVE;
+ TRACE_STATE("unresponsive peer session detected", PEERS_EV_SESS_SHUT, NULL, peer);
peer->reconnect = tick_add(now_ms, MS_TO_TICKS(PEER_RECONNECT_TIMEOUT));
}
else {
peer->reconnect = tick_add(now_ms, MS_TO_TICKS(50 + ha_random() % 2000));
peer->heartbeat = TICK_ETERNITY;
+ TRACE_STATE("dead peer session, force shutdown", PEERS_EV_SESS_SHUT, NULL, peer);
peer_session_forceshutdown(peer);
sync_peer_app_state(peers, peer);
peer->no_hbt++;
else if (tick_is_expired(peer->heartbeat, now_ms)) {
peer->heartbeat = tick_add(now_ms, MS_TO_TICKS(PEER_HEARTBEAT_TIMEOUT));
peer->flags |= PEER_F_HEARTBEAT;
+ TRACE_DEVEL("wakeup peer session to send heartbeat message", PEERS_EV_SESS_WAKE, NULL, peer);
appctx_wakeup(peer->appctx);
}
task->expire = tick_first(peer->reconnect, peer->heartbeat);