unsigned int me_received:1;
unsigned int handshake_received:1;
unsigned int ignore_host_events:1;
+ unsigned int handshake_sending_hosts:1;
};
static void director_connection_ping(struct director_connection *conn);
net_ip2addr(&user->host->ip),
net_ip2addr(&host->ip));
- /* change the host anyway. we'll also need to remove the user
- from the old host's user_count, because we can't keep track
- of the user for more than one host */
- user->host->user_count--;
- user->host = host;
- user->host->user_count++;
+ /* we want all the directors to redirect the user to same
+ server, but we don't want two directors fighting over which
+ server it belongs to, so always use the lower IP address */
+ if (net_ip_cmp(&user->host->ip, &host->ip) > 0) {
+ /* change the host. we'll also need to remove the user
+ from the old host's user_count, because we can't
+ keep track of the user for more than one host */
+ user->host->user_count--;
+ user->host = host;
+ user->host->user_count++;
+ }
ret = TRUE;
}
*user_r = user;
/* ignore whatever remote sends */
conn->ignore_host_events = TRUE;
}
+ conn->handshake_sending_hosts = TRUE;
return TRUE;
}
if (conn->ignore_host_events) {
/* remote is sending hosts in a handshake, but it doesn't have
a completed ring and we do. */
+ i_assert(conn->handshake_sending_hosts);
return TRUE;
}
}
if (update) {
- /* FIXME: 1) shouldn't be unconditional, 2) if we're not
- handshaking, we should do SYNC before making it visible */
- host->vhost_count = vhost_count;
+ mail_host_set_vhost_count(conn->dir->mail_hosts,
+ host, vhost_count);
director_update_host(conn->dir, conn->host, host);
}
return TRUE;
}
}
- if (dir->left != NULL && dir->right != NULL) {
+ if (dir->left != NULL && dir->right != NULL &&
+ dir->left->handshake_received && dir->right->handshake_received) {
/* we're connected to both directors. see if the ring is
finished by sending a SYNC. if we get it back, it's done. */
dir->sync_seq = ++dir->self_host->last_seq;
/* only incoming connections get DIRECTOR and HOST lists */
if (conn->in && strcmp(cmd, "DIRECTOR") == 0 && conn->me_received)
return director_cmd_director(conn, args);
- if (conn->in && strcmp(cmd, "HOST") == 0 && conn->me_received)
+
+ if (strcmp(cmd, "HOST") == 0) {
+ /* allow hosts from all connections always,
+ this could be an host update */
return director_cmd_host(conn, args);
- if (strcmp(cmd, "HOST-HAND-START") == 0)
- return director_cmd_host_hand_start(conn, args);
- if (strcmp(cmd, "HOST-HAND-END") == 0) {
- conn->ignore_host_events = TRUE;
+ }
+ if (conn->handshake_sending_hosts &&
+ strcmp(cmd, "HOST-HAND-END") == 0) {
+ conn->ignore_host_events = FALSE;
+ conn->handshake_sending_hosts = FALSE;
return TRUE;
}
+ if (conn->in && strcmp(cmd, "HOST-HAND-START") == 0 &&
+ conn->me_received)
+ return director_cmd_host_hand_start(conn, args);
/* only incoming connections get a USER list */
if (conn->in && strcmp(cmd, "USER") == 0 && conn->me_received)
return director_handshake_cmd_user(conn, args);
/* both get DONE */
- if (strcmp(cmd, "DONE") == 0 && !conn->handshake_received) {
+ if (strcmp(cmd, "DONE") == 0 && !conn->handshake_received &&
+ !conn->handshake_sending_hosts) {
director_handshake_cmd_done(conn);
return TRUE;
}
static bool director_connection_sync(struct director_connection *conn,
const char *const *args, const char *line)
{
+ struct director *dir = conn->dir;
struct director_host *host;
struct ip_addr ip;
unsigned int port, seq;
/* find the originating director. if we don't see it, it was already
removed and we can ignore this sync. */
- host = director_host_lookup(conn->dir, &ip, port);
+ host = director_host_lookup(dir, &ip, port);
if (host == NULL)
return TRUE;
if (host->self) {
- if (conn->dir->sync_seq != seq) {
+ if (dir->sync_seq != seq) {
/* stale SYNC event */
return TRUE;
}
- if (conn->dir->ring_handshaked)
+ if (!dir->ring_handshaked) {
+ /* the ring is handshaked */
+ director_set_ring_handshaked(dir);
+ return TRUE;
+ }
+
+ if (dir->ring_synced) {
+ i_error("Received SYNC from %s (seq=%u) "
+ "while already synced", conn->name, seq);
return TRUE;
+ }
- /* the ring is handshaked */
- director_set_ring_handshaked(conn->dir);
+ if (dir->debug) {
+ i_debug("Ring is synced (%s sent seq=%u)",
+ conn->name, seq);
+ }
+ dir->ring_synced = TRUE;
+ director_set_state_changed(dir);
return TRUE;
}
/* forward it to the connection on right */
- if (conn->dir->right != NULL) {
- director_connection_send(conn->dir->right,
+ if (dir->right != NULL) {
+ director_connection_send(dir->right,
t_strconcat(line, "\n", NULL));
}
return TRUE;
director_connection_ping_timeout, conn);
director_connection_send(conn, "PING\n");
}
+
+const char *director_connection_get_name(struct director_connection *conn)
+{
+ return conn->name;
+}
i_debug("Director ring handshaked");
dir->ring_handshaked = TRUE;
+ dir->ring_synced = TRUE;
director_set_state_changed(dir);
}
+static void director_sync(struct director *dir)
+{
+ /* we're synced again, once we receive this SYNC back */
+ dir->sync_seq++;
+ dir->ring_synced = FALSE;
+
+ if (dir->debug) {
+ i_debug("Ring is desynced (seq=%u, sending SYNC to %s)",
+ dir->sync_seq, director_connection_get_name(dir->right));
+ }
+
+ director_connection_send(dir->right, t_strdup_printf(
+ "SYNC\t%s\t%u\t%u\n", net_ip2addr(&dir->self_ip),
+ dir->self_port, dir->sync_seq));
+}
+
void director_update_host(struct director *dir, struct director_host *src,
struct mail_host *host)
{
director_update_send(dir, src, t_strdup_printf(
"HOST\t%s\t%u\n", net_ip2addr(&host->ip), host->vhost_count));
+ director_sync(dir);
}
void director_remove_host(struct director *dir, struct director_host *src,
"HOST-REMOVE\t%s\n", net_ip2addr(&host->ip)));
user_directory_remove_host(dir->users, host);
mail_host_remove(dir->mail_hosts, host);
+ director_sync(dir);
}
void director_flush_host(struct director *dir, struct director_host *src,
director_update_send(dir, src, t_strdup_printf(
"HOST-FLUSH\t%s\n", net_ip2addr(&host->ip)));
user_directory_remove_host(dir->users, host);
+ director_sync(dir);
}
void director_update_user(struct director *dir, struct director_host *src,
dir->state_change_callback = callback;
i_array_init(&dir->dir_hosts, 16);
i_array_init(&dir->pending_requests, 16);
- i_array_init(&dir->desynced_host_changes, 16);
dir->users = user_directory_init(set->director_user_expire);
dir->mail_hosts = mail_hosts_init();
return dir;
timeout_remove(&dir->to_request);
array_foreach(&dir->dir_hosts, hostp)
director_host_free(*hostp);
- array_free(&dir->desynced_host_changes);
array_free(&dir->pending_requests);
array_free(&dir->dir_hosts);
i_free(dir);