static struct auth_connection *auth_connections;
+static void auth_connection_disconnected(struct auth_connection **conn);
+
static void auth_connection_input(struct auth_connection *conn)
{
char *line;
return;
case -1:
/* disconnected */
- auth_connection_deinit(&conn);
+ auth_connection_disconnected(&conn);
return;
case -2:
/* buffer full */
i_error("BUG: Auth server sent us more than %d bytes",
(int)AUTH_CLIENT_MAX_LINE_LENGTH);
- auth_connection_deinit(&conn);
+ auth_connection_disconnected(&conn);
return;
}
if (close(conn->fd) < 0)
i_error("close(auth connection) failed: %m");
- conn->callback(NULL, conn->context);
}
i_free(conn->path);
i_free(conn);
}
+static void auth_connection_disconnected(struct auth_connection **_conn)
+{
+ struct auth_connection *conn = *_conn;
+
+ *_conn = NULL;
+ /* notify callback. it should deinit this connection */
+ conn->callback(NULL, conn->context);
+}
+
void auth_connection_send(struct auth_connection *conn,
const void *data, size_t size)
{
while (auth_connections != NULL) {
struct auth_connection *conn = auth_connections;
- auth_connection_deinit(&conn);
+ auth_connection_disconnected(&conn);
}
}
#define AUTH_CONNECTION_H
/* Called for each input line. This is also called with line=NULL if
- connection gets disonnected. */
+ connection gets disconnected. */
typedef void auth_input_callback(const char *line, void *context);
struct auth_connection *auth_connection_init(const char *path);
#include "istream.h"
#include "ostream.h"
#include "str.h"
+#include "llist.h"
#include "master-service.h"
#include "mail-host.h"
#include "director.h"
#define MAX_INBUF_SIZE 1024
#define MAX_OUTBUF_SIZE (1024*1024*10)
#define OUTBUF_FLUSH_THRESHOLD (1024*128)
+/* Max idling time while connecting/handshaking before disconnecting */
+#define DIRECTOR_CONNECTION_INIT_TIMEOUT_MSECS (2*1000)
/* How long to wait for PONG after PING request */
#define DIRECTOR_CONNECTION_PING_TIMEOUT_MSECS (2*1000)
/* How long to wait to send PING when connection is idle */
#define DIRECTOR_CONNECTION_PING_INTERVAL_MSECS (15*1000)
+/* How long to wait before sending PING while waiting for SYNC reply */
+#define DIRECTOR_CONNECTION_SYNC_TIMEOUT_MSECS 1000
struct director_connection {
+ struct director_connection *prev, *next;
+
struct director *dir;
- const char *name;
+ char *name;
/* for incoming connections the director host isn't known until
ME-line is received */
unsigned int ignore_host_events:1;
unsigned int handshake_sending_hosts:1;
unsigned int ping_waiting:1;
+ unsigned int sync_ping:1;
};
static void director_connection_ping(struct director_connection *conn);
+static void director_connection_disconnected(struct director_connection **conn);
static bool
director_args_parse_ip_port(struct director_connection *conn,
if (!conn->in)
return TRUE;
+ i_free(conn->name);
+ conn->name = i_strdup_printf("%s/left", host->name);
conn->host = host;
+ /* make sure we don't keep old sequence values across restarts */
+ host->last_seq = 0;
+
connect_str = t_strdup_printf("CONNECT\t%s\t%u\n",
net_ip2addr(&host->ip), host->port);
/* make sure this is the correct incoming connection */
one, but before that tell it to connect to the new one.
that message might not reach it, so also send the same
message to right side. */
+ i_warning("Replacing director connection %s with %s",
+ dir->left->host->name, host->name);
director_connection_send(dir->left, connect_str);
(void)o_stream_flush(dir->left->output);
director_connection_deinit(&dir->left);
return TRUE;
}
+static bool
+director_cmd_user(struct director_connection *conn, const char *const *args)
+{
+ unsigned int username_hash;
+ struct ip_addr ip;
+ struct mail_host *host;
+ struct user *user;
+
+ if (str_array_length(args) != 2 ||
+ str_to_uint(args[0], &username_hash) < 0 ||
+ net_addr2ip(args[1], &ip) < 0) {
+ i_error("director(%s): Invalid USER args", conn->name);
+ return FALSE;
+ }
+
+ host = mail_host_lookup(conn->dir->mail_hosts, &ip);
+ if (host == NULL) {
+ /* we probably just removed this host. */
+ return TRUE;
+ }
+
+ if (director_user_refresh(conn->dir, username_hash,
+ host, ioloop_time, &user))
+ director_update_user(conn->dir, conn->host, user);
+ return TRUE;
+}
+
static bool director_cmd_director(struct director_connection *conn,
const char *const *args)
{
hosts = mail_hosts_get(conn->dir->mail_hosts);
while (array_count(hosts) > 0) {
hostp = array_idx(hosts, 0);
- director_remove_host(conn->dir, conn->host, *hostp);
+ director_remove_host(conn->dir, NULL, NULL, *hostp);
}
} else if (!remote_ring_completed && conn->dir->ring_handshaked) {
/* ignore whatever remote sends */
return TRUE;
}
+static int
+director_cmd_is_seen(struct director_connection *conn,
+ const char *const **_args,
+ struct director_host **host_r)
+{
+ const char *const *args = *_args;
+ struct ip_addr ip;
+ unsigned int port, seq;
+ struct director_host *host;
+
+ if (str_array_length(args) < 3 ||
+ net_addr2ip(args[0], &ip) < 0 ||
+ str_to_uint(args[1], &port) < 0 ||
+ str_to_uint(args[2], &seq) < 0) {
+ i_error("director(%s): Command is missing parameters",
+ conn->name);
+ return -1;
+ }
+ *_args = args + 3;
+
+ host = director_host_lookup(conn->dir, &ip, port);
+ if (host == NULL) {
+ /* director is already gone, but we can't be sure if this
+ command was sent everywhere. re-send it as if it was from
+ ourself. */
+ *host_r = NULL;
+ } else {
+ if (seq <= host->last_seq) {
+ /* already seen this */
+ return 1;
+ }
+ *host_r = host;
+ host->last_seq = seq;
+ }
+ return 0;
+}
+
static bool
-director_cmd_host(struct director_connection *conn, const char *const *args)
+director_cmd_host_int(struct director_connection *conn, const char *const *args,
+ struct director_host *dir_host)
{
struct mail_host *host;
struct ip_addr ip;
if (update) {
mail_host_set_vhost_count(conn->dir->mail_hosts,
host, vhost_count);
- director_update_host(conn->dir, conn->host, host);
+ director_update_host(conn->dir, conn->host, dir_host, host);
}
return TRUE;
}
+static bool
+director_cmd_host_handshake(struct director_connection *conn,
+ const char *const *args)
+{
+ return director_cmd_host_int(conn, args, NULL);
+}
+
+static bool
+director_cmd_host(struct director_connection *conn, const char *const *args)
+{
+ struct director_host *dir_host;
+ int ret;
+
+ if ((ret = director_cmd_is_seen(conn, &args, &dir_host)) != 0)
+ return ret > 0;
+ return director_cmd_host_int(conn, args, dir_host);
+}
+
static bool
director_cmd_host_remove(struct director_connection *conn,
const char *const *args)
{
+ struct director_host *dir_host;
struct mail_host *host;
struct ip_addr ip;
+ int ret;
+
+ if ((ret = director_cmd_is_seen(conn, &args, &dir_host)) != 0)
+ return ret > 0;
if (str_array_length(args) != 1 ||
net_addr2ip(args[0], &ip) < 0) {
host = mail_host_lookup(conn->dir->mail_hosts, &ip);
if (host != NULL)
- director_remove_host(conn->dir, conn->host, host);
+ director_remove_host(conn->dir, conn->host, dir_host, host);
return TRUE;
}
director_cmd_host_flush(struct director_connection *conn,
const char *const *args)
{
+ struct director_host *dir_host;
struct mail_host *host;
struct ip_addr ip;
+ unsigned int seq;
+ int ret;
- if (str_array_length(args) != 1 ||
- net_addr2ip(args[0], &ip) < 0) {
+ if ((ret = director_cmd_is_seen(conn, &args, &dir_host)) != 0)
+ return ret > 0;
+
+ if (str_array_length(args) != 2 ||
+ net_addr2ip(args[0], &ip) < 0 ||
+ str_to_uint(args[1], &seq) < 0) {
i_error("director(%s): Invalid HOST-FLUSH args", conn->name);
return FALSE;
}
host = mail_host_lookup(conn->dir->mail_hosts, &ip);
if (host != NULL)
- director_flush_host(conn->dir, conn->host, host);
+ director_flush_host(conn->dir, conn->host, dir_host, host);
return TRUE;
}
dir->left->handshake_received && dir->right->handshake_received) {
/* we're connected to both directors. see if the ring is
finished by sending a SYNC. if we get it back, it's done. */
- dir->sync_seq = ++dir->self_host->last_seq;
+ dir->sync_seq++;
director_connection_send(dir->right,
t_strdup_printf("SYNC\t%s\t%u\t%u\n",
net_ip2addr(&dir->self_ip),
dir->self_port, dir->sync_seq));
}
- if (conn->to_ping == NULL) {
- conn->to_ping = timeout_add(DIRECTOR_CONNECTION_PING_INTERVAL_MSECS,
- director_connection_ping, conn);
- }
+ if (conn->to_ping != NULL)
+ timeout_remove(&conn->to_ping);
+ conn->to_ping = timeout_add(DIRECTOR_CONNECTION_PING_INTERVAL_MSECS,
+ director_connection_ping, conn);
}
static bool
if (strcmp(cmd, "HOST") == 0) {
/* allow hosts from all connections always,
this could be an host update */
- return director_cmd_host(conn, args);
+ if (conn->handshake_sending_hosts)
+ return director_cmd_host_handshake(conn, args);
+ else
+ return director_cmd_host(conn, args);
}
if (conn->handshake_sending_hosts &&
strcmp(cmd, "HOST-HAND-END") == 0) {
conn->me_received)
return director_cmd_host_hand_start(conn, args);
- /* only incoming connections get a USER list */
- if (conn->in && strcmp(cmd, "USER") == 0 && conn->me_received)
- return director_handshake_cmd_user(conn, args);
+ /* only incoming connections get a full USER list, but outgoing
+ connections can also receive USER updates during handshake and
+ it wouldn't be safe to ignore them. */
+ if (strcmp(cmd, "USER") == 0 && conn->me_received) {
+ if (conn->in)
+ return director_handshake_cmd_user(conn, args);
+ else
+ return director_cmd_user(conn, args);
+ }
/* both get DONE */
if (strcmp(cmd, "DONE") == 0 && !conn->handshake_received &&
!conn->handshake_sending_hosts) {
director_handshake_cmd_done(conn);
return TRUE;
}
- i_error("director(%s): Invalid handshake command: %s",
- conn->name, cmd);
+ i_error("director(%s): Invalid handshake command: %s "
+ "(in=%d me_received=%d)", conn->name, cmd,
+ conn->in, conn->me_received);
return FALSE;
}
-static bool
-director_cmd_user(struct director_connection *conn, const char *const *args)
-{
- unsigned int username_hash;
- struct ip_addr ip;
- struct mail_host *host;
- struct user *user;
-
- if (str_array_length(args) != 2 ||
- str_to_uint(args[0], &username_hash) < 0 ||
- net_addr2ip(args[1], &ip) < 0) {
- i_error("director(%s): Invalid USER args", conn->name);
- return FALSE;
- }
-
- host = mail_host_lookup(conn->dir->mail_hosts, &ip);
- if (host == NULL) {
- /* we probably just removed this host. */
- return TRUE;
- }
-
- if (director_user_refresh(conn->dir, username_hash,
- host, ioloop_time, &user))
- director_update_user(conn->dir, conn->host, user);
- return TRUE;
-}
-
static bool director_connection_sync(struct director_connection *conn,
const char *const *args, const char *line)
{
/* stale SYNC event */
return TRUE;
}
+
if (!dir->ring_handshaked) {
/* the ring is handshaked */
director_set_ring_handshaked(dir);
- return TRUE;
- }
-
- if (dir->ring_synced) {
+ } else if (dir->ring_synced) {
i_error("Received SYNC from %s (seq=%u) "
"while already synced", conn->name, seq);
return TRUE;
+ } else {
+ if (dir->debug) {
+ i_debug("Ring is synced (%s sent seq=%u)",
+ conn->name, seq);
+ }
+ director_set_ring_synced(dir);
}
-
- if (dir->debug) {
- i_debug("Ring is synced (%s sent seq=%u)",
- conn->name, seq);
- }
- dir->ring_synced = TRUE;
- director_set_state_changed(dir);
return TRUE;
}
}
}
- /* connect here, disconnect old one */
- if (dir->right != NULL)
- director_connection_deinit(&dir->right);
-
+ /* connect here */
(void)director_connect_host(dir, host);
return TRUE;
}
i_error("director(%s): Received empty line", conn->name);
return FALSE;
}
+
+ /* ping/pong is always handled */
+ if (strcmp(cmd, "PING") == 0) {
+ director_connection_send(conn, "PONG\n");
+ return TRUE;
+ }
+ if (strcmp(cmd, "PONG") == 0)
+ return director_cmd_pong(conn);
+
if (!conn->handshake_received) {
if (!director_connection_handle_handshake(conn, cmd, args)) {
/* invalid commands during handshake,
if (strcmp(cmd, "CONNECT") == 0)
return director_cmd_connect(conn, args);
- if (strcmp(cmd, "PING") == 0) {
- director_connection_send(conn, "PONG\n");
- return TRUE;
- }
- if (strcmp(cmd, "PONG") == 0)
- return director_cmd_pong(conn);
i_error("director(%s): Unknown command (in this state): %s",
conn->name, cmd);
return FALSE;
static void director_connection_input(struct director_connection *conn)
{
+ struct director *dir = conn->dir;
char *line;
bool ret;
return;
case -1:
/* disconnected */
- director_connection_deinit(&conn);
+ i_error("Director %s disconnected%s", conn->name,
+ conn->handshake_received ? "" :
+ " before handshake finished");
+ director_connection_disconnected(&conn);
return;
case -2:
/* buffer full */
- i_error("BUG: Director sent us more than %d bytes",
- MAX_INBUF_SIZE);
- director_connection_deinit(&conn);
+ i_error("BUG: Director %s sent us more than %d bytes",
+ conn->name, MAX_INBUF_SIZE);
+ director_connection_disconnected(&conn);
return;
}
+ director_sync_freeze(dir);
while ((line = i_stream_next_line(conn->input)) != NULL) {
T_BEGIN {
ret = director_connection_handle_line(conn, line);
} T_END;
if (!ret) {
- director_connection_deinit(&conn);
+ director_connection_disconnected(&conn);
break;
}
}
+ director_sync_thaw(dir);
}
static void director_connection_send_directors(struct director_connection *conn,
return o_stream_flush(conn->output);
}
+static void
+director_connection_init_timeout(struct director_connection *conn)
+{
+ if (conn->host != NULL)
+ conn->host->last_failed = ioloop_time;
+ if (!conn->connected)
+ i_error("director(%s): Connect timed out", conn->name);
+ else
+ i_error("director(%s): Handshaking timed out", conn->name);
+ director_connection_disconnected(&conn);
+}
+
static struct director_connection *
director_connection_init_common(struct director *dir, int fd)
{
conn->output = o_stream_create_fd(conn->fd, MAX_OUTBUF_SIZE, FALSE);
o_stream_set_flush_callback(conn->output,
director_connection_output, conn);
+ conn->to_ping = timeout_add(DIRECTOR_CONNECTION_INIT_TIMEOUT_MSECS,
+ director_connection_init_timeout, conn);
+ DLLIST_PREPEND(&dir->connections, conn);
return conn;
}
}
struct director_connection *
-director_connection_init_in(struct director *dir, int fd)
+director_connection_init_in(struct director *dir, int fd,
+ const struct ip_addr *ip)
{
struct director_connection *conn;
conn = director_connection_init_common(dir, fd);
conn->in = TRUE;
conn->connected = TRUE;
- conn->name = "<incoming>";
+ conn->name = i_strdup_printf("%s/in", net_ip2addr(ip));
conn->io = io_add(conn->fd, IO_READ, director_connection_input, conn);
director_connection_send_handshake(conn);
conn->host->last_failed = ioloop_time;
i_error("director(%s): connect() failed: %s", conn->name,
strerror(err));
- director_connection_deinit(&conn);
-
- /* try connecting to next server */
- director_connect(dir);
+ director_connection_disconnected(&conn);
return;
}
conn->connected = TRUE;
+ if (dir->right != NULL) {
+ /* see if we should disconnect or keep the existing
+ connection. */
+ if (director_host_cmp_to_self(conn->host, dir->right->host,
+ dir->self_host) <= 0) {
+ /* the old connection is the correct one */
+ director_connection_deinit(&conn);
+ return;
+ }
+ director_connection_deinit(&dir->right);
+ }
+ dir->right = conn;
+ i_free(conn->name);
+ conn->name = i_strdup_printf("%s/right", conn->host->name);
+
io_remove(&conn->io);
director_connection_send_handshake(conn);
{
struct director_connection *conn;
+ /* make sure we don't keep old sequence values across restarts */
+ host->last_seq = 0;
+
conn = director_connection_init_common(dir, fd);
- conn->name = host->name;
+ conn->name = i_strdup_printf("%s/out", host->name);
conn->host = host;
- conn->io = io_add(conn->fd, IO_WRITE,
+ /* use IO_READ instead of IO_WRITE, so that we don't assign
+ dir->right until remote has actually sent some data */
+ conn->io = io_add(conn->fd, IO_READ,
director_connection_connected, conn);
return conn;
}
void director_connection_deinit(struct director_connection **_conn)
{
struct director_connection *conn = *_conn;
+ struct director *dir = conn->dir;
*_conn = NULL;
- if (conn->dir->debug && conn->host != NULL) {
- i_debug("Director %s:%u disconnected",
- net_ip2addr(&conn->host->ip), conn->host->port);
- }
-
- if (conn->dir->left == conn)
- conn->dir->left = NULL;
- if (conn->dir->right == conn)
- conn->dir->right = NULL;
+ DLLIST_REMOVE(&dir->connections, conn);
+ if (dir->left == conn)
+ dir->left = NULL;
+ if (dir->right == conn)
+ dir->right = NULL;
+ if (conn->user_iter != NULL)
+ user_directory_iter_deinit(&conn->user_iter);
if (conn->to != NULL)
timeout_remove(&conn->to);
if (conn->to_ping != NULL)
if (conn->in)
master_service_client_connection_destroyed(master_service);
+ i_free(conn->name);
i_free(conn);
+
+ if (dir->left == NULL || dir->right == NULL) {
+ /* we aren't synced until we're again connected to a ring */
+ dir->sync_seq++;
+ dir->ring_synced = FALSE;
+ }
+}
+
+void director_connection_disconnected(struct director_connection **_conn)
+{
+ struct director_connection *conn = *_conn;
+ struct director *dir = conn->dir;
+
+ director_connection_deinit(_conn);
+ if (dir->right == NULL)
+ director_connect(dir);
}
static void director_connection_timeout(struct director_connection *conn)
{
- director_connection_deinit(&conn);
+ director_connection_disconnected(&conn);
}
void director_connection_send(struct director_connection *conn,
static void director_connection_ping_timeout(struct director_connection *conn)
{
i_error("director(%s): Ping timed out, disconnecting", conn->name);
- director_connection_deinit(&conn);
+ director_connection_disconnected(&conn);
}
static void director_connection_ping(struct director_connection *conn)
{
+ conn->sync_ping = FALSE;
if (conn->ping_waiting)
return;
- timeout_remove(&conn->to_ping);
+ if (conn->to_ping != NULL)
+ timeout_remove(&conn->to_ping);
conn->to_ping = timeout_add(DIRECTOR_CONNECTION_PING_TIMEOUT_MSECS,
director_connection_ping_timeout, conn);
director_connection_send(conn, "PING\n");
{
return conn->name;
}
+
+struct director_host *
+director_connection_get_host(struct director_connection *conn)
+{
+ return conn->host;
+}
+
+struct director_connection *
+director_connection_find_outgoing(struct director *dir,
+ struct director_host *host)
+{
+ struct director_connection *conn;
+
+ for (conn = dir->connections; conn != NULL; conn = conn->next) {
+ if (conn->host == host && !conn->in)
+ return conn;
+ }
+ return NULL;
+}
+
+void director_connection_cork(struct director_connection *conn)
+{
+ o_stream_cork(conn->output);
+}
+
+void director_connection_uncork(struct director_connection *conn)
+{
+ o_stream_uncork(conn->output);
+}
+
+void director_connection_wait_sync(struct director_connection *conn)
+{
+ /* switch to faster ping timeout. avoid reseting the timeout if it's
+ already fast. */
+ if (conn->ping_waiting || conn->sync_ping)
+ return;
+
+ if (conn->to_ping != NULL)
+ timeout_remove(&conn->to_ping);
+ conn->to_ping = timeout_add(DIRECTOR_CONNECTION_SYNC_TIMEOUT_MSECS,
+ director_connection_ping, conn);
+ conn->sync_ping = TRUE;
+}
struct director;
struct director_connection *
-director_connection_init_in(struct director *dir, int fd);
+director_connection_init_in(struct director *dir, int fd,
+ const struct ip_addr *ip);
struct director_connection *
director_connection_init_out(struct director *dir, int fd,
struct director_host *host);
void director_connection_send(struct director_connection *conn,
const char *data);
+void director_connection_wait_sync(struct director_connection *conn);
void director_connection_send_except(struct director_connection *conn,
struct director_host *skip_host,
const char *data);
const char *director_connection_get_name(struct director_connection *conn);
+struct director_host *
+director_connection_get_host(struct director_connection *conn);
+struct director_connection *
+director_connection_find_outgoing(struct director *dir,
+ struct director_host *host);
+
+void director_connection_cork(struct director_connection *conn);
+void director_connection_uncork(struct director_connection *conn);
#endif
/* name contains "ip:port" */
char *name;
-
- /* each command between directors contains an increasing sequence.
- if director A gets conflicting information about director B, it can
- trust the one that has the highest sequence. */
+ /* change commands each have originating host and originating sequence.
+ we'll keep track of the highest sequence we've seen from the host.
+ if we find a lower sequence, we've already handled the command and
+ it can be ignored (or: it must be ignored to avoid potential command
+ loops) */
unsigned int last_seq;
-
/* Last time host was detected to be down/broken */
time_t last_failed;
-
/* we are this director */
unsigned int self:1;
};
#include "director-request.h"
#define DIRECTOR_REQUEST_TIMEOUT_SECS 30
+#define RING_NOCONN_WARNING_DELAY_MSECS (2*1000)
struct director_request {
struct director *dir;
array_append(&dir->pending_requests, &request, 1);
}
+static void ring_noconn_warning(struct director *dir)
+{
+ if (!dir->ring_handshaked) {
+ i_warning("Delaying all requests "
+ "until all directors have connected");
+ } else {
+ i_warning("Delaying new user requests until ring is synced");
+ }
+ dir->ring_handshake_warning_sent = TRUE;
+ timeout_remove(&dir->to_handshake_warning);
+}
+
+static void ring_log_delayed_warning(struct director *dir)
+{
+ if (dir->ring_handshake_warning_sent ||
+ dir->to_handshake_warning != NULL)
+ return;
+
+ dir->to_handshake_warning = timeout_add(RING_NOCONN_WARNING_DELAY_MSECS,
+ ring_noconn_warning, dir);
+}
+
bool director_request_continue(struct director_request *request)
{
struct director *dir = request->dir;
if (!dir->ring_handshaked) {
/* delay requests until ring handshaking is complete */
- if (!dir->ring_handshake_warning_sent) {
- i_warning("Delaying requests until all "
- "directors have connected");
- dir->ring_handshake_warning_sent = TRUE;
- }
+ ring_log_delayed_warning(dir);
return FALSE;
}
else {
if (!dir->ring_synced) {
/* delay adding new users until ring is again synced */
- if (dir->debug)
- i_debug("Delaying request until ring is synced");
+ ring_log_delayed_warning(dir);
return FALSE;
}
host = mail_host_get_by_hash(dir->mail_hosts,
#define USER_TIMEOUT_MSECS (1000*60)
#define ADMIN_RANDOM_TIMEOUT_MSECS 500
#define DIRECTOR_CONN_MAX_DELAY_MSECS 100
+#define DIRECTOR_DISCONNECT_TIMEOUT_SECS 10
struct host {
int refcount;
struct io *io;
struct istream *input;
struct timeout *to_random;
+ bool pending_command;
};
static struct imap_client *imap_clients;
static struct hash_table *hosts;
static ARRAY_DEFINE(hosts_array, struct host *);
static struct admin_connection *admin;
+static struct timeout *to_disconnect;
static void imap_client_destroy(struct imap_client **client);
static void director_connection_destroy(struct director_connection **conn);
director_connection_create(int in_fd, const struct ip_addr *local_ip)
{
struct director_connection *conn;
+ int out_fd;
+
+ out_fd = net_connect_ip(local_ip, DIRECTOR_OUT_PORT, NULL);
+ if (out_fd == -1) {
+ (void)close(in_fd);
+ return;
+ }
conn = i_new(struct director_connection, 1);
conn->in_fd = in_fd;
conn->in_io = io_add(conn->in_fd, IO_READ,
director_connection_in_input, conn);
- conn->out_fd = net_connect_ip(local_ip, DIRECTOR_OUT_PORT, NULL);
+ conn->out_fd = out_fd;
conn->out_input = i_stream_create_fd(conn->out_fd, (size_t)-1, FALSE);
conn->out_output = o_stream_create_fd(conn->out_fd, (size_t)-1, FALSE);
conn->out_io = io_add(conn->out_fd, IO_READ,
while ((line = i_stream_read_next_line(conn->input)) != NULL) {
if (strcmp(line, "OK") != 0)
i_error("director-doveadm: Unexpected input: %s", line);
+ conn->pending_command = FALSE;
}
if (conn->input->stream_errno != 0 || conn->input->eof)
i_fatal("director-doveadm: Connection lost");
struct host *const *hosts;
unsigned int i, count;
+ if (conn->pending_command)
+ return;
+
hosts = array_get(&hosts_array, &count);
i = rand() % count;
admin_send(conn, t_strdup_printf("HOST-SET\t%s\t%u\n",
net_ip2addr(&hosts[i]->ip), hosts[i]->vhost_count));
+ conn->pending_command = TRUE;
}
static struct admin_connection *admin_connect(const char *path)
struct admin_connection *conn = *_conn;
*_conn = NULL;
- //timeout_remove(&conn->to_random);
+ if (conn->to_random != NULL)
+ timeout_remove(&conn->to_random);
i_stream_destroy(&conn->input);
io_remove(&conn->io);
net_disconnect(conn->fd);
net_set_nonblock(admin->fd, TRUE);
}
+static void
+director_connection_disconnect_timeout(void *context ATTR_UNUSED)
+{
+ struct director_connection *conn;
+ unsigned int i, count = 0;
+
+ for (conn = director_connections; conn != NULL; conn = conn->next)
+ count++;
+
+ if (count != 0) {
+ i = 0; count = rand() % count;
+ for (conn = director_connections; i < count; conn = conn->next)
+ i++;
+ director_connection_destroy(&conn);
+ }
+}
+
static void main_init(const char *admin_path)
{
users = hash_table_create(default_pool, default_pool, 0,
admin = admin_connect(admin_path);
admin_send(admin, "HOST-LIST\n");
admin_read_hosts(admin);
+
+ to_disconnect =
+ timeout_add(1000*(1 + rand()%DIRECTOR_DISCONNECT_TIMEOUT_SECS),
+ director_connection_disconnect_timeout, NULL);
}
static void main_deinit(void)
imap_client_destroy(&client);
}
+ timeout_remove(&to_disconnect);
while (director_connections != NULL) {
struct director_connection *conn = director_connections;
director_connection_destroy(&conn);
listen = 127.0.1.$i
base_dir = /var/run/dovecot$i
-!include dovecot-director-common.conf
+!include dovecot-director-common.conf.inc
EOF
done
-cat > dovecot-director-common.conf <<EOF
+cat > dovecot-director-common.conf.inc <<EOF
log_path = /var/log/dovecot.log
info_log_path = /var/log/dovecot-access.log
director_servers =$dirs
echo
echo "Start up dovecot instances:"
echo
-echo "dovecot -c dovecot-test.conf"
-i=0
-while [ $i != $director_count ]; do
- i=`expr $i + 1`
- echo "dovecot -c dovecot-director$i.conf"
-done
+echo 'for conf in dovecot*.conf; do dovecot -c $conf; done'
echo
echo "Start testing:"
echo
#include "director.h"
#define DIRECTOR_RECONNECT_RETRY_SECS 60
+#define DIRECTOR_RECONNECT_TIMEOUT_MSECS (30*1000)
static bool director_is_self_ip_set(struct director *dir)
{
unsigned int port;
int fd;
- i_assert(dir->right == NULL);
+ if (director_connection_find_outgoing(dir, host) != NULL)
+ return 0;
if (dir->debug) {
i_debug("Connecting to %s:%u",
return -1;
}
- dir->right = director_connection_init_out(dir, fd, host);
+ director_connection_init_out(dir, fd, host);
return 0;
}
+static struct director_host *
+director_get_preferred_right_host(struct director *dir)
+{
+ struct director_host *const *hosts;
+ unsigned int count, self_idx;
+
+ hosts = array_get(&dir->dir_hosts, &count);
+ if (count == 1)
+ return NULL;
+
+ self_idx = director_find_self_idx(dir);
+ return hosts[(self_idx + 1) % count];
+}
+
void director_connect(struct director *dir)
{
struct director_host *const *hosts;
}
if (i == count) {
/* we're the only one */
- director_set_ring_handshaked(dir);
+ if (dir->left != NULL) {
+ /* since we couldn't connect to it,
+ it must have failed recently */
+ director_connection_deinit(&dir->left);
+ }
+ if (!dir->ring_handshaked)
+ director_set_ring_handshaked(dir);
+ else
+ director_set_ring_synced(dir);
}
}
void director_set_ring_handshaked(struct director *dir)
{
+ i_assert(!dir->ring_handshaked);
+
+ if (dir->to_handshake_warning != NULL)
+ timeout_remove(&dir->to_handshake_warning);
if (dir->ring_handshake_warning_sent) {
i_warning("Directors have been connected, "
- "continuing delayed connections");
+ "continuing delayed requests");
dir->ring_handshake_warning_sent = FALSE;
}
if (dir->debug)
i_debug("Director ring handshaked");
dir->ring_handshaked = TRUE;
+ director_set_ring_synced(dir);
+}
+
+static void director_reconnect_timeout(struct director *dir)
+{
+ struct director_host *cur_host, *preferred_host =
+ director_get_preferred_right_host(dir);
+
+ cur_host = dir->right == NULL ? NULL :
+ director_connection_get_host(dir->right);
+
+ if (cur_host != preferred_host)
+ (void)director_connect_host(dir, preferred_host);
+ else {
+ /* the connection hasn't finished sync yet.
+ keep this timeout for now. */
+ }
+}
+
+void director_set_ring_synced(struct director *dir)
+{
+ struct director_host *host;
+
+ i_assert(!dir->ring_synced);
+ i_assert((dir->left != NULL && dir->right != NULL) ||
+ (dir->left == NULL && dir->right == NULL));
+
+ if (dir->to_handshake_warning != NULL)
+ timeout_remove(&dir->to_handshake_warning);
+ if (dir->ring_handshake_warning_sent) {
+ i_warning("Ring is synced, continuing delayed requests");
+ dir->ring_handshake_warning_sent = FALSE;
+ }
+
+ host = dir->right == NULL ? NULL :
+ director_connection_get_host(dir->right);
+ if (host != director_get_preferred_right_host(dir)) {
+ /* try to reconnect to preferred host later */
+ if (dir->to_reconnect == NULL) {
+ dir->to_reconnect =
+ timeout_add(DIRECTOR_RECONNECT_TIMEOUT_MSECS,
+ director_reconnect_timeout, dir);
+ }
+ } else {
+ if (dir->to_reconnect != NULL)
+ timeout_remove(&dir->to_reconnect);
+ }
+
dir->ring_synced = TRUE;
director_set_state_changed(dir);
}
static void director_sync(struct director *dir)
{
- /* we're synced again, once we receive this SYNC back */
+ if (dir->sync_frozen) {
+ dir->sync_pending = TRUE;
+ return;
+ }
+ if (dir->right == NULL) {
+ i_assert(!dir->ring_synced ||
+ (dir->left == NULL && dir->right == NULL));
+ return;
+ }
+
+ /* we're synced again when we receive this SYNC back */
dir->sync_seq++;
dir->ring_synced = FALSE;
director_connection_get_name(dir->right));
}
+ if (dir->left != NULL)
+ director_connection_wait_sync(dir->left);
+ director_connection_wait_sync(dir->right);
director_connection_send(dir->right, t_strdup_printf(
"SYNC\t%s\t%u\t%u\n", net_ip2addr(&dir->self_ip),
dir->self_port, dir->sync_seq));
}
+void director_sync_freeze(struct director *dir)
+{
+ i_assert(!dir->sync_frozen);
+ i_assert(!dir->sync_pending);
+
+ if (dir->left != NULL)
+ director_connection_cork(dir->left);
+ if (dir->right != NULL)
+ director_connection_cork(dir->right);
+ dir->sync_frozen = TRUE;
+}
+
+void director_sync_thaw(struct director *dir)
+{
+ i_assert(dir->sync_frozen);
+
+ dir->sync_frozen = FALSE;
+ if (dir->sync_pending) {
+ dir->sync_pending = FALSE;
+ director_sync(dir);
+ }
+ if (dir->left != NULL)
+ director_connection_uncork(dir->left);
+ if (dir->right != NULL)
+ director_connection_uncork(dir->right);
+}
+
void director_update_host(struct director *dir, struct director_host *src,
+ struct director_host *orig_src,
struct mail_host *host)
{
+ /* update state in case this is the first mail host being added */
director_set_state_changed(dir);
+ if (orig_src == NULL) {
+ orig_src = dir->self_host;
+ orig_src->last_seq++;
+ }
+
director_update_send(dir, src, t_strdup_printf(
- "HOST\t%s\t%u\n", net_ip2addr(&host->ip), host->vhost_count));
+ "HOST\t%s\t%u\t%u\t%s\t%u\n",
+ net_ip2addr(&orig_src->ip), orig_src->port, orig_src->last_seq,
+ net_ip2addr(&host->ip), host->vhost_count));
director_sync(dir);
}
void director_remove_host(struct director *dir, struct director_host *src,
+ struct director_host *orig_src,
struct mail_host *host)
{
- director_update_send(dir, src, t_strdup_printf(
- "HOST-REMOVE\t%s\n", net_ip2addr(&host->ip)));
+ if (src != NULL) {
+ if (orig_src == NULL) {
+ orig_src = dir->self_host;
+ orig_src->last_seq++;
+ }
+
+ director_update_send(dir, src, t_strdup_printf(
+ "HOST-REMOVE\t%s\t%u\t%u\t%s\n",
+ net_ip2addr(&orig_src->ip), orig_src->port,
+ orig_src->last_seq, net_ip2addr(&host->ip)));
+ }
+
user_directory_remove_host(dir->users, host);
mail_host_remove(dir->mail_hosts, host);
director_sync(dir);
}
void director_flush_host(struct director *dir, struct director_host *src,
+ struct director_host *orig_src,
struct mail_host *host)
{
+ if (orig_src == NULL) {
+ orig_src = dir->self_host;
+ orig_src->last_seq++;
+ }
+
director_update_send(dir, src, t_strdup_printf(
- "HOST-FLUSH\t%s\n", net_ip2addr(&host->ip)));
+ "HOST-FLUSH\t%s\t%u\t%u\t%s\n",
+ net_ip2addr(&orig_src->ip), orig_src->port, orig_src->last_seq,
+ net_ip2addr(&host->ip)));
user_directory_remove_host(dir->users, host);
director_sync(dir);
}
user_directory_deinit(&dir->users);
mail_hosts_deinit(&dir->mail_hosts);
mail_hosts_deinit(&dir->orig_config_hosts);
+ if (dir->to_reconnect != NULL)
+ timeout_remove(&dir->to_reconnect);
+ if (dir->to_handshake_warning != NULL)
+ timeout_remove(&dir->to_handshake_warning);
if (dir->to_request != NULL)
timeout_remove(&dir->to_request);
array_foreach(&dir->dir_hosts, hostp)
typedef void director_state_change_callback_t(struct director *dir);
-struct director_host_change {
- /* originating director for this change. keep ip/port here separately,
- because by the time its sync comes, the director itself may have
- already been removed. */
- struct ip_addr ip;
- unsigned int port;
- /* highest change sequence from this director */
- unsigned int seq;
-};
-
struct director {
const struct director_settings *set;
struct director_host *self_host;
struct director_connection *left, *right;
+ /* all director connections */
+ struct director_connection *connections;
+ struct timeout *to_reconnect;
/* current mail hosts */
struct mail_host_list *mail_hosts;
/* these requests are waiting for directors to be in synced */
ARRAY_DEFINE(pending_requests, struct director_request *);
struct timeout *to_request;
+ struct timeout *to_handshake_warning;
director_state_change_callback_t *state_change_callback;
unsigned int ring_handshaked:1;
unsigned int ring_handshake_warning_sent:1;
unsigned int ring_synced:1;
+ unsigned int sync_frozen:1;
+ unsigned int sync_pending:1;
unsigned int debug:1;
};
void director_connect(struct director *dir);
void director_set_ring_handshaked(struct director *dir);
+void director_set_ring_synced(struct director *dir);
void director_set_state_changed(struct director *dir);
void director_update_host(struct director *dir, struct director_host *src,
+ struct director_host *orig_src,
struct mail_host *host);
void director_remove_host(struct director *dir, struct director_host *src,
+ struct director_host *orig_src,
struct mail_host *host);
void director_flush_host(struct director *dir, struct director_host *src,
+ struct director_host *orig_src,
struct mail_host *host);
void director_update_user(struct director *dir, struct director_host *src,
struct user *user);
+void director_sync_freeze(struct director *dir);
+void director_sync_thaw(struct director *dir);
+
/* Send data to all directors using both left and right connections
(unless they're the same). */
void director_update_send(struct director *dir, struct director_host *src,
host = mail_host_add_ip(dir->mail_hosts, &ip);
if (vhost_count != -1U)
mail_host_set_vhost_count(dir->mail_hosts, host, vhost_count);
- director_update_host(dir, dir->self_host, host);
+ director_update_host(dir, dir->self_host, NULL, host);
o_stream_send(conn->output, "OK\n", 3);
return TRUE;
if (host == NULL)
o_stream_send_str(conn->output, "NOTFOUND\n");
else {
- director_remove_host(conn->dir, conn->dir->self_host, host);
+ director_remove_host(conn->dir, conn->dir->self_host,
+ NULL, host);
o_stream_send(conn->output, "OK\n", 3);
}
return TRUE;
{
struct mail_host *const *hostp;
- array_foreach(mail_hosts_get(conn->dir->mail_hosts), hostp)
- director_flush_host(conn->dir, conn->dir->self_host, *hostp);
+ array_foreach(mail_hosts_get(conn->dir->mail_hosts), hostp) {
+ director_flush_host(conn->dir, conn->dir->self_host,
+ NULL, *hostp);
+ }
o_stream_send(conn->output, "OK\n", 3);
}
if (host == NULL)
o_stream_send_str(conn->output, "NOTFOUND\n");
else {
- director_flush_host(conn->dir, conn->dir->self_host, host);
+ director_flush_host(conn->dir, conn->dir->self_host,
+ NULL, host);
o_stream_send(conn->output, "OK\n", 3);
}
return TRUE;
return -1;
}
- director_connection_init_in(director, fd);
+ director_connection_init_in(director, fd, ip);
return 0;
}
&director_setting_parser_info,
NULL
};
- unsigned int test_port;
+ unsigned int test_port = 0;
const char *error;
bool debug = FALSE;
int c;