#define DIRECTOR_WAIT_DISCONNECT_SECS 10
#define DIRECTOR_HANDSHAKE_WARN_SECS 29
#define DIRECTOR_HANDSHAKE_BYTES_LOG_MIN_SECS (60*30)
+#define DIRECTOR_MAX_SYNC_SEQ_DUPLICATES 4
#if DIRECTOR_CONNECTION_DONE_TIMEOUT_MSECS <= DIRECTOR_CONNECTION_PING_TIMEOUT_MSECS
# error DIRECTOR_CONNECTION_DONE_TIMEOUT_MSECS is too low
dir->sync_seq++;
director_set_ring_unsynced(dir);
director_sync_send(dir, dir->self_host, dir->sync_seq,
- DIRECTOR_VERSION_MINOR);
+ DIRECTOR_VERSION_MINOR, ioloop_time);
}
director_connection_set_ping_timeout(conn);
}
return 0;
}
-static void
+static bool
director_connection_sync_host(struct director_connection *conn,
struct director_host *host,
- uint32_t seq, unsigned int minor_version)
+ uint32_t seq, unsigned int minor_version,
+ unsigned int timestamp)
{
struct director *dir = conn->dir;
if (host->self) {
if (dir->sync_seq != seq) {
/* stale SYNC event */
- return;
+ return FALSE;
}
/* sync_seq increases when we get disconnected, so we must be
successfully connected to both directions */
conn->name, seq);
director_set_ring_synced(dir);
}
- } else if (dir->right != NULL) {
- /* forward it to the connection on right */
- director_sync_send(dir, host, seq, minor_version);
+ } else {
+ if (seq < host->last_sync_seq) {
+ /* stale SYNC event */
+ return FALSE;
+ } else if (host->last_sync_seq != seq ||
+ timestamp < host->last_sync_timestamp) {
+ host->last_sync_seq = seq;
+ host->last_sync_timestamp = timestamp;
+ host->last_sync_seq_counter = 1;
+ } else if (++host->last_sync_seq_counter >
+ DIRECTOR_MAX_SYNC_SEQ_DUPLICATES) {
+ /* we've received this too many times already */
+ return FALSE;
+ }
+
+ if (dir->right != NULL) {
+ /* forward it to the connection on right */
+ director_sync_send(dir, host, seq, minor_version,
+ timestamp);
+ }
}
+ return TRUE;
}
static bool director_connection_sync(struct director_connection *conn,
struct director *dir = conn->dir;
struct director_host *host;
struct ip_addr ip;
- unsigned int port, seq, minor_version = 0;
+ unsigned int port, seq, minor_version = 0, timestamp = ioloop_time;
if (str_array_length(args) < 3 ||
!director_args_parse_ip_port(conn, args, &ip, &port) ||
director_cmd_error(conn, "Invalid parameters");
return FALSE;
}
- if (args[3] != NULL)
+ if (args[3] != NULL) {
minor_version = atoi(args[3]);
+ if (args[4] != NULL && str_to_uint(args[4], ×tamp) < 0) {
+ director_cmd_error(conn, "Invalid parameters");
+ return FALSE;
+ }
+ }
/* find the originating director. if we don't see it, it was already
removed and we can ignore this sync. */
host = director_host_lookup(dir, &ip, port);
if (host != NULL) {
- director_connection_sync_host(conn, host, seq,
- minor_version);
+ if (!director_connection_sync_host(conn, host, seq,
+ minor_version, timestamp))
+ return TRUE;
}
- if (host == NULL || !host->self)
+ if ((host == NULL || !host->self) &&
+ dir->self_host->last_sync_timestamp != ioloop_time)
(void)director_resend_sync(dir);
return TRUE;
}
}
void director_sync_send(struct director *dir, struct director_host *host,
- uint32_t seq, unsigned int minor_version)
+ uint32_t seq, unsigned int minor_version,
+ unsigned int timestamp)
{
string_t *str;
net_ip2addr(&host->ip), host->port, seq);
if (minor_version > 0 &&
director_connection_get_minor_version(dir->right) > 0) {
- /* only minor_version>0 supports this parameter */
- str_printfa(str, "\t%u", minor_version);
+ /* only minor_version>0 supports extra parameters */
+ str_printfa(str, "\t%u\t%u", minor_version, timestamp);
}
str_append_c(str, '\n');
director_connection_send(dir->right, str_c(str));
{
if (!dir->ring_synced && dir->left != NULL && dir->right != NULL) {
/* send a new SYNC in case the previous one got dropped */
+ dir->self_host->last_sync_timestamp = ioloop_time;
director_sync_send(dir, dir->self_host, dir->sync_seq,
- DIRECTOR_VERSION_MINOR);
+ DIRECTOR_VERSION_MINOR, ioloop_time);
if (dir->to_sync != NULL)
timeout_reset(dir->to_sync);
return TRUE;
director_connection_set_synced(dir->left, FALSE);
director_connection_set_synced(dir->right, FALSE);
director_sync_send(dir, dir->self_host, dir->sync_seq,
- DIRECTOR_VERSION_MINOR);
+ DIRECTOR_VERSION_MINOR, ioloop_time);
}
void director_sync_freeze(struct director *dir)