#include <common/compat.h>
#include <common/config.h>
#include <common/time.h>
+#include <common/standard.h>
#include <types/global.h>
#include <types/listener.h>
#define PEER_F_TEACH_COMPLETE 0x00000010 /* All that we know already taught to current peer, used only for a local peer */
#define PEER_F_LEARN_ASSIGN 0x00000100 /* Current peer was assigned for a lesson */
#define PEER_F_LEARN_NOTUP2DATE 0x00000200 /* Learn from peer finished but peer is not up to date */
+#define PEER_F_DWNGRD 0x80000000 /* When this flag is enabled, we must downgrade the supported version announced during peer sessions. */
#define PEER_TEACH_RESET ~(PEER_F_TEACH_PROCESS|PEER_F_TEACH_FINISHED) /* PEER_F_TEACH_COMPLETE should never be reset */
#define PEER_LEARN_RESET ~(PEER_F_LEARN_ASSIGN|PEER_F_LEARN_NOTUP2DATE)
PEER_MSG_STKT_DEFINE,
PEER_MSG_STKT_SWITCH,
PEER_MSG_STKT_ACK,
+ PEER_MSG_STKT_UPDATE_TIMED,
+ PEER_MSG_STKT_INCUPDATE_TIMED,
};
/**********************************/
#define PEER_SESS_SC_ERRPEER 504 /* unknown peer */
#define PEER_SESSION_PROTO_NAME "HAProxyS"
+#define PEER_MAJOR_VER 2
+#define PEER_MINOR_VER 1
+#define PEER_DWNGRD_MINOR_VER 0
struct peers *peers = NULL;
static void peer_session_forceshutdown(struct stream * stream);
return i;
}
+/* Set the stick-table UPDATE message type byte at <msg_type> address,
+ * depending on <use_identifier> and <use_timed> boolean parameters.
+ * Always successful.
+ */
+static inline void peer_set_update_msg_type(char *msg_type, int use_identifier, int use_timed)
+{
+ if (use_timed) {
+ if (use_identifier)
+ *msg_type = PEER_MSG_STKT_UPDATE_TIMED;
+ else
+ *msg_type = PEER_MSG_STKT_INCUPDATE_TIMED;
+ }
+ else {
+ if (use_identifier)
+ *msg_type = PEER_MSG_STKT_UPDATE;
+ else
+ *msg_type = PEER_MSG_STKT_INCUPDATE;
+ }
+
+}
/*
* This prepare the data update message on the stick session <ts>, <st> is the considered
* stick table.
* If function returns 0, the caller should consider we were unable to encode this message (TODO:
* check size)
*/
-static int peer_prepare_updatemsg(struct stksess *ts, struct shared_table *st, char *msg, size_t size, int use_identifier)
+static int peer_prepare_updatemsg(struct stksess *ts, struct shared_table *st, char *msg, size_t size, int use_identifier, int use_timed)
{
uint32_t netinteger;
unsigned short datalen;
cursor += sizeof(netinteger);
}
+ if (use_timed) {
+ netinteger = htonl(tick_remain(now_ms, ts->expire));
+ memcpy(cursor, &netinteger, sizeof(netinteger));
+ cursor += sizeof(netinteger);
+ }
+
/* encode the key */
if (st->table->type == SMP_T_STR) {
int stlen = strlen((char *)ts->key.key);
/* prepare message header */
msg[0] = PEER_MSG_CLASS_STICKTABLE;
- if (use_identifier)
- msg[1] = PEER_MSG_STKT_UPDATE;
- else
- msg[1] = PEER_MSG_STKT_INCUPDATE;
-
+ peer_set_update_msg_type(&msg[1], use_identifier, use_timed);
cursor = &msg[2];
intencode(datalen, &cursor);
}
}
+/* Retrieve the major and minor versions of peers protocol
+ * announced by a remote peer. <str> is a null-terminated
+ * string with the following format: "<maj_ver>.<min_ver>".
+ */
+static int peer_get_version(const char *str,
+ unsigned int *maj_ver, unsigned int *min_ver)
+{
+ unsigned int majv, minv;
+ const char *pos, *saved;
+ const char *end;
+
+ saved = pos = str;
+ end = str + strlen(str);
+
+ majv = read_uint(&pos, end);
+ if (saved == pos || *pos++ != '.')
+ return -1;
+
+ saved = pos;
+ minv = read_uint(&pos, end);
+ if (saved == pos || pos != end)
+ return -1;
+
+ *maj_ver = majv;
+ *min_ver = minv;
+
+ return 0;
+}
/*
* IO Handler to handle message exchance with a peer
struct peers *curpeers = strm_fe(s)->parent;
int reql = 0;
int repl = 0;
+ size_t proto_len = strlen(PEER_SESSION_PROTO_NAME);
+ unsigned int maj_ver, min_ver;
while (1) {
switchstate:
+ maj_ver = min_ver = (unsigned int)-1;
switch(appctx->st0) {
case PEER_SESS_ST_ACCEPT:
appctx->ctx.peers.ptr = NULL;
bo_skip(si_oc(si), reql);
- /* test version */
- if (strcmp(PEER_SESSION_PROTO_NAME " 2.0", trash.str) != 0) {
+ /* test protocol */
+ if (strncmp(PEER_SESSION_PROTO_NAME " ", trash.str, proto_len + 1) != 0) {
+ appctx->st0 = PEER_SESS_ST_EXIT;
+ appctx->st1 = PEER_SESS_SC_ERRPROTO;
+ goto switchstate;
+ }
+ if (peer_get_version(trash.str + proto_len + 1, &maj_ver, &min_ver) == -1 ||
+ maj_ver != PEER_MAJOR_VER || min_ver > PEER_MINOR_VER) {
appctx->st0 = PEER_SESS_ST_EXIT;
appctx->st1 = PEER_SESS_SC_ERRVERSION;
- /* test protocol */
- if (strncmp(PEER_SESSION_PROTO_NAME " ", trash.str, strlen(PEER_SESSION_PROTO_NAME)+1) != 0)
- appctx->st1 = PEER_SESS_SC_ERRPROTO;
goto switchstate;
}
}
peer_session_forceshutdown(curpeer->stream);
}
+ if (maj_ver != (unsigned int)-1 && min_ver != (unsigned int)-1) {
+ if (min_ver == PEER_DWNGRD_MINOR_VER) {
+ curpeer->flags |= PEER_F_DWNGRD;
+ }
+ else {
+ curpeer->flags &= ~PEER_F_DWNGRD;
+ }
+ }
curpeer->stream = s;
curpeer->appctx = appctx;
appctx->ctx.peers.ptr = curpeer;
/* Send headers */
repl = snprintf(trash.str, trash.size,
- PEER_SESSION_PROTO_NAME " 2.0\n%s\n%s %d %d\n",
+ PEER_SESSION_PROTO_NAME " %u.%u\n%s\n%s %d %d\n",
+ PEER_MAJOR_VER,
+ (curpeer->flags & PEER_F_DWNGRD) ? PEER_DWNGRD_MINOR_VER : PEER_MINOR_VER,
curpeer->id,
localpeer,
(int)getpid(),
}
else {
+ if (curpeer->statuscode == PEER_SESS_SC_ERRVERSION)
+ curpeer->flags |= PEER_F_DWNGRD;
/* Status code is not success, abort */
appctx->st0 = PEER_SESS_ST_END;
goto switchstate;
}
else if (msg_head[1] == PEER_MSG_STKT_UPDATE
- || msg_head[1] == PEER_MSG_STKT_INCUPDATE) {
+ || msg_head[1] == PEER_MSG_STKT_INCUPDATE
+ || msg_head[1] == PEER_MSG_STKT_UPDATE_TIMED
+ || msg_head[1] == PEER_MSG_STKT_INCUPDATE_TIMED) {
struct shared_table *st = curpeer->remote_table;
uint32_t update;
+ int expire;
unsigned int data_type;
void *data_ptr;
if (!st)
goto ignore_msg;
- if (msg_head[1] == PEER_MSG_STKT_UPDATE) {
+ expire = MS_TO_TICKS(st->table->expire);
+
+ if (msg_head[1] == PEER_MSG_STKT_UPDATE ||
+ msg_head[1] == PEER_MSG_STKT_UPDATE_TIMED) {
if (msg_len < sizeof(update)) {
/* malformed message */
appctx->st0 = PEER_SESS_ST_ERRPROTO;
st->last_get++;
}
+ if (msg_head[1] == PEER_MSG_STKT_UPDATE_TIMED ||
+ msg_head[1] == PEER_MSG_STKT_INCUPDATE_TIMED) {
+ size_t expire_sz = sizeof expire;
+
+ if (msg_cur + expire_sz > msg_end) {
+ appctx->st0 = PEER_SESS_ST_ERRPROTO;
+ goto switchstate;
+ }
+ memcpy(&expire, msg_cur, expire_sz);
+ msg_cur += expire_sz;
+ expire = ntohl(expire);
+ }
+
newts = stksess_new(st->table, NULL);
if (!newts)
goto ignore_msg;
ts = stktable_lookup(st->table, newts);
if (ts) {
/* the entry already exist, we can free ours */
- stktable_touch(st->table, ts, 0);
+ stktable_touch_with_exp(st->table, ts, 0, tick_add(now_ms, expire));
stksess_free(st->table, newts);
newts = NULL;
}
struct eb32_node *eb;
/* create new entry */
- ts = stktable_store(st->table, newts, 0);
+ ts = stktable_store_with_exp(st->table, newts, 0, tick_add(now_ms, expire));
newts = NULL; /* don't reuse it */
ts->upd.key= (++st->table->update)+(2147483648U);
}
ts = eb32_entry(eb, struct stksess, upd);
- msglen = peer_prepare_updatemsg(ts, st, trash.str, trash.size, new_pushed);
+ msglen = peer_prepare_updatemsg(ts, st, trash.str, trash.size, new_pushed, 0);
if (!msglen) {
/* internal error: message does not fit in trash */
appctx->st0 = PEER_SESS_ST_END;
while (1) {
uint32_t msglen;
struct stksess *ts;
+ int use_timed;
/* push local updates */
if (!eb) {
}
ts = eb32_entry(eb, struct stksess, upd);
- msglen = peer_prepare_updatemsg(ts, st, trash.str, trash.size, new_pushed);
+ use_timed = !(curpeer->flags & PEER_F_DWNGRD);
+ msglen = peer_prepare_updatemsg(ts, st, trash.str, trash.size, new_pushed, use_timed);
if (!msglen) {
/* internal error: message does not fit in trash */
appctx->st0 = PEER_SESS_ST_END;
while (1) {
uint32_t msglen;
struct stksess *ts;
+ int use_timed;
/* push local updates */
if (!eb || eb->key > st->teaching_origin) {
}
ts = eb32_entry(eb, struct stksess, upd);
- msglen = peer_prepare_updatemsg(ts, st, trash.str, trash.size, new_pushed);
+ use_timed = !(curpeer->flags & PEER_F_DWNGRD);
+ msglen = peer_prepare_updatemsg(ts, st, trash.str, trash.size, new_pushed, use_timed);
if (!msglen) {
/* internal error: message does not fit in trash */
appctx->st0 = PEER_SESS_ST_END;
/* Update the expiration timer for <ts> but do not touch its expiration node.
* The table's expiration timer is updated if set.
*/
-struct stksess *stktable_touch(struct stktable *t, struct stksess *ts, int local)
+struct stksess *stktable_touch_with_exp(struct stktable *t, struct stksess *ts,
+ int local, int expire)
{
struct eb32_node * eb;
- ts->expire = tick_add(now_ms, MS_TO_TICKS(t->expire));
+ ts->expire = expire;
if (t->expire) {
t->exp_task->expire = t->exp_next = tick_first(ts->expire, t->exp_next);
task_queue(t->exp_task);
return ts;
}
+/* Update the expiration timer for <ts> but do not touch its expiration node.
+ * The table's expiration timer is updated if set. The date of expiration coming from
+ * <t> stick-table configuration.
+ */
+struct stksess *stktable_touch(struct stktable *t, struct stksess *ts, int local)
+{
+ int expire = tick_add(now_ms, MS_TO_TICKS(t->expire));
+
+ return stktable_touch_with_exp(t, ts, local, expire);
+}
+
/* Insert new sticky session <ts> in the table. It is assumed that it does not
* yet exist (the caller must check this). The table's timeout is updated if it
* is set. <ts> is returned.
return ts;
}
+/* Same function as stktable_store(), but with <expire> as supplementary argument
+ * to set the date of expiration of <ts> new sticky session thanks to
+ * stktable_touch_with_exp().
+ */
+struct stksess *stktable_store_with_exp(struct stktable *t, struct stksess *ts,
+ int local, int expire)
+{
+ ebmb_insert(&t->keys, &ts->key, t->key_size);
+ stktable_touch_with_exp(t, ts, local, expire);
+ ts->exp.key = ts->expire;
+ eb32_insert(&t->exps, &ts->exp);
+ return ts;
+}
+
/* Returns a valid or initialized stksess for the specified stktable_key in the
* specified table, or NULL if the key was NULL, or if no entry was found nor
* could be created. The entry's expiration is updated.