]> git.ipfire.org Git - thirdparty/dovecot/core.git/commitdiff
director: Lots of fixes. It should be pretty stable now.
authorTimo Sirainen <tss@iki.fi>
Thu, 24 Jun 2010 19:29:27 +0000 (20:29 +0100)
committerTimo Sirainen <tss@iki.fi>
Thu, 24 Jun 2010 19:29:27 +0000 (20:29 +0100)
--HG--
branch : HEAD

12 files changed:
src/director/auth-connection.c
src/director/auth-connection.h
src/director/director-connection.c
src/director/director-connection.h
src/director/director-host.h
src/director/director-request.c
src/director/director-test.c
src/director/director-test.sh
src/director/director.c
src/director/director.h
src/director/doveadm-connection.c
src/director/main.c

index 76349bafc0375f6f2d930796bce64290954d9150..5c3021bc1cb89a80e5fbf958fd7d635b57561abe 100644 (file)
@@ -27,6 +27,8 @@ struct auth_connection {
 
 static struct auth_connection *auth_connections;
 
+static void auth_connection_disconnected(struct auth_connection **conn);
+
 static void auth_connection_input(struct auth_connection *conn)
 {
        char *line;
@@ -36,13 +38,13 @@ static void auth_connection_input(struct auth_connection *conn)
                return;
        case -1:
                /* disconnected */
-               auth_connection_deinit(&conn);
+               auth_connection_disconnected(&conn);
                return;
        case -2:
                /* buffer full */
                i_error("BUG: Auth server sent us more than %d bytes",
                        (int)AUTH_CLIENT_MAX_LINE_LENGTH);
-               auth_connection_deinit(&conn);
+               auth_connection_disconnected(&conn);
                return;
        }
 
@@ -103,12 +105,20 @@ void auth_connection_deinit(struct auth_connection **_conn)
 
                if (close(conn->fd) < 0)
                        i_error("close(auth connection) failed: %m");
-               conn->callback(NULL, conn->context);
        }
        i_free(conn->path);
        i_free(conn);
 }
 
+static void auth_connection_disconnected(struct auth_connection **_conn)
+{
+       struct auth_connection *conn = *_conn;
+
+       *_conn = NULL;
+       /* notify callback. it should deinit this connection */
+       conn->callback(NULL, conn->context);
+}
+
 void auth_connection_send(struct auth_connection *conn,
                          const void *data, size_t size)
 {
@@ -122,6 +132,6 @@ void auth_connections_deinit(void)
        while (auth_connections != NULL) {
                struct auth_connection *conn = auth_connections;
 
-               auth_connection_deinit(&conn);
+               auth_connection_disconnected(&conn);
        }
 }
index 6808fc993391f41c3083c48e7281776c28b8c362..c5d5063aa1c122c7d161d52f194b364f47eb1f57 100644 (file)
@@ -2,7 +2,7 @@
 #define AUTH_CONNECTION_H
 
 /* Called for each input line. This is also called with line=NULL if
-   connection gets disonnected. */
+   connection gets disconnected. */
 typedef void auth_input_callback(const char *line, void *context);
 
 struct auth_connection *auth_connection_init(const char *path);
index 55d5c0ec7e3620f2a89ec709978bd679521720c5..b5a573381e6ecf284896f7bea92a0a629cc20e3e 100644 (file)
@@ -7,6 +7,7 @@
 #include "istream.h"
 #include "ostream.h"
 #include "str.h"
+#include "llist.h"
 #include "master-service.h"
 #include "mail-host.h"
 #include "director.h"
 #define MAX_INBUF_SIZE 1024
 #define MAX_OUTBUF_SIZE (1024*1024*10)
 #define OUTBUF_FLUSH_THRESHOLD (1024*128)
+/* Max idling time while connecting/handshaking before disconnecting */
+#define DIRECTOR_CONNECTION_INIT_TIMEOUT_MSECS (2*1000)
 /* How long to wait for PONG after PING request */
 #define DIRECTOR_CONNECTION_PING_TIMEOUT_MSECS (2*1000)
 /* How long to wait to send PING when connection is idle */
 #define DIRECTOR_CONNECTION_PING_INTERVAL_MSECS (15*1000)
+/* How long to wait before sending PING while waiting for SYNC reply */
+#define DIRECTOR_CONNECTION_SYNC_TIMEOUT_MSECS 1000
 
 struct director_connection {
+       struct director_connection *prev, *next;
+
        struct director *dir;
-       const char *name;
+       char *name;
 
        /* for incoming connections the director host isn't known until
           ME-line is received */
@@ -54,9 +61,11 @@ struct director_connection {
        unsigned int ignore_host_events:1;
        unsigned int handshake_sending_hosts:1;
        unsigned int ping_waiting:1;
+       unsigned int sync_ping:1;
 };
 
 static void director_connection_ping(struct director_connection *conn);
+static void director_connection_disconnected(struct director_connection **conn);
 
 static bool
 director_args_parse_ip_port(struct director_connection *conn,
@@ -102,7 +111,12 @@ static bool director_cmd_me(struct director_connection *conn,
        if (!conn->in)
                return TRUE;
 
+       i_free(conn->name);
+       conn->name = i_strdup_printf("%s/left", host->name);
        conn->host = host;
+       /* make sure we don't keep old sequence values across restarts */
+       host->last_seq = 0;
+
        connect_str = t_strdup_printf("CONNECT\t%s\t%u\n",
                                      net_ip2addr(&host->ip), host->port);
        /* make sure this is the correct incoming connection */
@@ -135,6 +149,8 @@ static bool director_cmd_me(struct director_connection *conn,
                   one, but before that tell it to connect to the new one.
                   that message might not reach it, so also send the same
                   message to right side. */
+               i_warning("Replacing director connection %s with %s",
+                         dir->left->host->name, host->name);
                director_connection_send(dir->left, connect_str);
                (void)o_stream_flush(dir->left->output);
                director_connection_deinit(&dir->left);
@@ -227,6 +243,33 @@ director_handshake_cmd_user(struct director_connection *conn,
        return TRUE;
 }
 
+static bool
+director_cmd_user(struct director_connection *conn, const char *const *args)
+{
+       unsigned int username_hash;
+       struct ip_addr ip;
+       struct mail_host *host;
+       struct user *user;
+
+       if (str_array_length(args) != 2 ||
+           str_to_uint(args[0], &username_hash) < 0 ||
+           net_addr2ip(args[1], &ip) < 0) {
+               i_error("director(%s): Invalid USER args", conn->name);
+               return FALSE;
+       }
+
+       host = mail_host_lookup(conn->dir->mail_hosts, &ip);
+       if (host == NULL) {
+               /* we probably just removed this host. */
+               return TRUE;
+       }
+
+       if (director_user_refresh(conn->dir, username_hash,
+                                 host, ioloop_time, &user))
+               director_update_user(conn->dir, conn->host, user);
+       return TRUE;
+}
+
 static bool director_cmd_director(struct director_connection *conn,
                                  const char *const *args)
 {
@@ -269,7 +312,7 @@ director_cmd_host_hand_start(struct director_connection *conn,
                hosts = mail_hosts_get(conn->dir->mail_hosts);
                while (array_count(hosts) > 0) {
                        hostp = array_idx(hosts, 0);
-                       director_remove_host(conn->dir, conn->host, *hostp);
+                       director_remove_host(conn->dir, NULL, NULL, *hostp);
                }
        } else if (!remote_ring_completed && conn->dir->ring_handshaked) {
                /* ignore whatever remote sends */
@@ -279,8 +322,46 @@ director_cmd_host_hand_start(struct director_connection *conn,
        return TRUE;
 }
 
+static int
+director_cmd_is_seen(struct director_connection *conn,
+                    const char *const **_args,
+                    struct director_host **host_r)
+{
+       const char *const *args = *_args;
+       struct ip_addr ip;
+       unsigned int port, seq;
+       struct director_host *host;
+
+       if (str_array_length(args) < 3 ||
+           net_addr2ip(args[0], &ip) < 0 ||
+           str_to_uint(args[1], &port) < 0 ||
+           str_to_uint(args[2], &seq) < 0) {
+               i_error("director(%s): Command is missing parameters",
+                       conn->name);
+               return -1;
+       }
+       *_args = args + 3;
+
+       host = director_host_lookup(conn->dir, &ip, port);
+       if (host == NULL) {
+               /* director is already gone, but we can't be sure if this
+                  command was sent everywhere. re-send it as if it was from
+                  ourself. */
+               *host_r = NULL;
+       } else {
+               if (seq <= host->last_seq) {
+                       /* already seen this */
+                       return 1;
+               }
+               *host_r = host;
+               host->last_seq = seq;
+       }
+       return 0;
+}
+
 static bool
-director_cmd_host(struct director_connection *conn, const char *const *args)
+director_cmd_host_int(struct director_connection *conn, const char *const *args,
+                     struct director_host *dir_host)
 {
        struct mail_host *host;
        struct ip_addr ip;
@@ -311,17 +392,40 @@ director_cmd_host(struct director_connection *conn, const char *const *args)
        if (update) {
                mail_host_set_vhost_count(conn->dir->mail_hosts,
                                          host, vhost_count);
-               director_update_host(conn->dir, conn->host, host);
+               director_update_host(conn->dir, conn->host, dir_host, host);
        }
        return TRUE;
 }
 
+static bool
+director_cmd_host_handshake(struct director_connection *conn,
+                           const char *const *args)
+{
+       return director_cmd_host_int(conn, args, NULL);
+}
+
+static bool
+director_cmd_host(struct director_connection *conn, const char *const *args)
+{
+       struct director_host *dir_host;
+       int ret;
+
+       if ((ret = director_cmd_is_seen(conn, &args, &dir_host)) != 0)
+               return ret > 0;
+       return director_cmd_host_int(conn, args, dir_host);
+}
+
 static bool
 director_cmd_host_remove(struct director_connection *conn,
                         const char *const *args)
 {
+       struct director_host *dir_host;
        struct mail_host *host;
        struct ip_addr ip;
+       int ret;
+
+       if ((ret = director_cmd_is_seen(conn, &args, &dir_host)) != 0)
+               return ret > 0;
 
        if (str_array_length(args) != 1 ||
            net_addr2ip(args[0], &ip) < 0) {
@@ -331,7 +435,7 @@ director_cmd_host_remove(struct director_connection *conn,
 
        host = mail_host_lookup(conn->dir->mail_hosts, &ip);
        if (host != NULL)
-               director_remove_host(conn->dir, conn->host, host);
+               director_remove_host(conn->dir, conn->host, dir_host, host);
        return TRUE;
 }
 
@@ -339,18 +443,25 @@ static bool
 director_cmd_host_flush(struct director_connection *conn,
                         const char *const *args)
 {
+       struct director_host *dir_host;
        struct mail_host *host;
        struct ip_addr ip;
+       unsigned int seq;
+       int ret;
 
-       if (str_array_length(args) != 1 ||
-           net_addr2ip(args[0], &ip) < 0) {
+       if ((ret = director_cmd_is_seen(conn, &args, &dir_host)) != 0)
+               return ret > 0;
+
+       if (str_array_length(args) != 2 ||
+           net_addr2ip(args[0], &ip) < 0 ||
+           str_to_uint(args[1], &seq) < 0) {
                i_error("director(%s): Invalid HOST-FLUSH args", conn->name);
                return FALSE;
        }
 
        host = mail_host_lookup(conn->dir->mail_hosts, &ip);
        if (host != NULL)
-               director_flush_host(conn->dir, conn->host, host);
+               director_flush_host(conn->dir, conn->host, dir_host, host);
        return TRUE;
 }
 
@@ -380,16 +491,16 @@ static void director_handshake_cmd_done(struct director_connection *conn)
            dir->left->handshake_received && dir->right->handshake_received) {
                /* we're connected to both directors. see if the ring is
                   finished by sending a SYNC. if we get it back, it's done. */
-               dir->sync_seq = ++dir->self_host->last_seq;
+               dir->sync_seq++;
                director_connection_send(dir->right,
                        t_strdup_printf("SYNC\t%s\t%u\t%u\n",
                                        net_ip2addr(&dir->self_ip),
                                        dir->self_port, dir->sync_seq));
        }
-       if (conn->to_ping == NULL) {
-               conn->to_ping = timeout_add(DIRECTOR_CONNECTION_PING_INTERVAL_MSECS,
-                                           director_connection_ping, conn);
-       }
+       if (conn->to_ping != NULL)
+               timeout_remove(&conn->to_ping);
+       conn->to_ping = timeout_add(DIRECTOR_CONNECTION_PING_INTERVAL_MSECS,
+                                   director_connection_ping, conn);
 }
 
 static bool
@@ -444,7 +555,10 @@ director_connection_handle_handshake(struct director_connection *conn,
        if (strcmp(cmd, "HOST") == 0) {
                /* allow hosts from all connections always,
                   this could be an host update */
-               return director_cmd_host(conn, args);
+               if (conn->handshake_sending_hosts)
+                       return director_cmd_host_handshake(conn, args);
+               else
+                       return director_cmd_host(conn, args);
        }
        if (conn->handshake_sending_hosts &&
            strcmp(cmd, "HOST-HAND-END") == 0) {
@@ -456,47 +570,27 @@ director_connection_handle_handshake(struct director_connection *conn,
            conn->me_received)
                return director_cmd_host_hand_start(conn, args);
 
-       /* only incoming connections get a USER list */
-       if (conn->in && strcmp(cmd, "USER") == 0 && conn->me_received)
-               return director_handshake_cmd_user(conn, args);
+       /* only incoming connections get a full USER list, but outgoing
+          connections can also receive USER updates during handshake and
+          it wouldn't be safe to ignore them. */
+       if (strcmp(cmd, "USER") == 0 && conn->me_received) {
+               if (conn->in)
+                       return director_handshake_cmd_user(conn, args);
+               else
+                       return director_cmd_user(conn, args);
+       }
        /* both get DONE */
        if (strcmp(cmd, "DONE") == 0 && !conn->handshake_received &&
            !conn->handshake_sending_hosts) {
                director_handshake_cmd_done(conn);
                return TRUE;
        }
-       i_error("director(%s): Invalid handshake command: %s",
-               conn->name, cmd);
+       i_error("director(%s): Invalid handshake command: %s "
+               "(in=%d me_received=%d)", conn->name, cmd,
+               conn->in, conn->me_received);
        return FALSE;
 }
 
-static bool
-director_cmd_user(struct director_connection *conn, const char *const *args)
-{
-       unsigned int username_hash;
-       struct ip_addr ip;
-       struct mail_host *host;
-       struct user *user;
-
-       if (str_array_length(args) != 2 ||
-           str_to_uint(args[0], &username_hash) < 0 ||
-           net_addr2ip(args[1], &ip) < 0) {
-               i_error("director(%s): Invalid USER args", conn->name);
-               return FALSE;
-       }
-
-       host = mail_host_lookup(conn->dir->mail_hosts, &ip);
-       if (host == NULL) {
-               /* we probably just removed this host. */
-               return TRUE;
-       }
-
-       if (director_user_refresh(conn->dir, username_hash,
-                                 host, ioloop_time, &user))
-               director_update_user(conn->dir, conn->host, user);
-       return TRUE;
-}
-
 static bool director_connection_sync(struct director_connection *conn,
                                     const char *const *args, const char *line)
 {
@@ -523,24 +617,21 @@ static bool director_connection_sync(struct director_connection *conn,
                        /* stale SYNC event */
                        return TRUE;
                }
+
                if (!dir->ring_handshaked) {
                        /* the ring is handshaked */
                        director_set_ring_handshaked(dir);
-                       return TRUE;
-               }
-
-               if (dir->ring_synced) {
+               } else if (dir->ring_synced) {
                        i_error("Received SYNC from %s (seq=%u) "
                                "while already synced", conn->name, seq);
                        return TRUE;
+               } else {
+                       if (dir->debug) {
+                               i_debug("Ring is synced (%s sent seq=%u)",
+                                       conn->name, seq);
+                       }
+                       director_set_ring_synced(dir);
                }
-
-               if (dir->debug) {
-                       i_debug("Ring is synced (%s sent seq=%u)",
-                               conn->name, seq);
-               }
-               dir->ring_synced = TRUE;
-               director_set_state_changed(dir);
                return TRUE;
        }
 
@@ -597,10 +688,7 @@ static bool director_cmd_connect(struct director_connection *conn,
                }
        }
 
-       /* connect here, disconnect old one */
-       if (dir->right != NULL)
-               director_connection_deinit(&dir->right);
-
+       /* connect here */
        (void)director_connect_host(dir, host);
        return TRUE;
 }
@@ -629,6 +717,15 @@ director_connection_handle_line(struct director_connection *conn,
                i_error("director(%s): Received empty line", conn->name);
                return FALSE;
        }
+
+       /* ping/pong is always handled */
+       if (strcmp(cmd, "PING") == 0) {
+               director_connection_send(conn, "PONG\n");
+               return TRUE;
+       }
+       if (strcmp(cmd, "PONG") == 0)
+               return director_cmd_pong(conn);
+
        if (!conn->handshake_received) {
                if (!director_connection_handle_handshake(conn, cmd, args)) {
                        /* invalid commands during handshake,
@@ -655,12 +752,6 @@ director_connection_handle_line(struct director_connection *conn,
        if (strcmp(cmd, "CONNECT") == 0)
                return director_cmd_connect(conn, args);
 
-       if (strcmp(cmd, "PING") == 0) {
-               director_connection_send(conn, "PONG\n");
-               return TRUE;
-       }
-       if (strcmp(cmd, "PONG") == 0)
-               return director_cmd_pong(conn);
        i_error("director(%s): Unknown command (in this state): %s",
                conn->name, cmd);
        return FALSE;
@@ -668,6 +759,7 @@ director_connection_handle_line(struct director_connection *conn,
 
 static void director_connection_input(struct director_connection *conn)
 {
+       struct director *dir = conn->dir;
        char *line;
        bool ret;
 
@@ -678,26 +770,31 @@ static void director_connection_input(struct director_connection *conn)
                return;
        case -1:
                /* disconnected */
-               director_connection_deinit(&conn);
+               i_error("Director %s disconnected%s", conn->name,
+                       conn->handshake_received ? "" :
+                       " before handshake finished");
+               director_connection_disconnected(&conn);
                return;
        case -2:
                /* buffer full */
-               i_error("BUG: Director sent us more than %d bytes",
-                       MAX_INBUF_SIZE);
-               director_connection_deinit(&conn);
+               i_error("BUG: Director %s sent us more than %d bytes",
+                       conn->name, MAX_INBUF_SIZE);
+               director_connection_disconnected(&conn);
                return;
        }
 
+       director_sync_freeze(dir);
        while ((line = i_stream_next_line(conn->input)) != NULL) {
                T_BEGIN {
                        ret = director_connection_handle_line(conn, line);
                } T_END;
 
                if (!ret) {
-                       director_connection_deinit(&conn);
+                       director_connection_disconnected(&conn);
                        break;
                }
        }
+       director_sync_thaw(dir);
 }
 
 static void director_connection_send_directors(struct director_connection *conn,
@@ -773,6 +870,18 @@ static int director_connection_output(struct director_connection *conn)
                return o_stream_flush(conn->output);
 }
 
+static void
+director_connection_init_timeout(struct director_connection *conn)
+{
+       if (conn->host != NULL)
+               conn->host->last_failed = ioloop_time;
+       if (!conn->connected)
+               i_error("director(%s): Connect timed out", conn->name);
+       else
+               i_error("director(%s): Handshaking timed out", conn->name);
+       director_connection_disconnected(&conn);
+}
+
 static struct director_connection *
 director_connection_init_common(struct director *dir, int fd)
 {
@@ -785,6 +894,9 @@ director_connection_init_common(struct director *dir, int fd)
        conn->output = o_stream_create_fd(conn->fd, MAX_OUTBUF_SIZE, FALSE);
        o_stream_set_flush_callback(conn->output,
                                    director_connection_output, conn);
+       conn->to_ping = timeout_add(DIRECTOR_CONNECTION_INIT_TIMEOUT_MSECS,
+                                   director_connection_init_timeout, conn);
+       DLLIST_PREPEND(&dir->connections, conn);
        return conn;
 }
 
@@ -798,14 +910,15 @@ static void director_connection_send_handshake(struct director_connection *conn)
 }
 
 struct director_connection *
-director_connection_init_in(struct director *dir, int fd)
+director_connection_init_in(struct director *dir, int fd,
+                           const struct ip_addr *ip)
 {
        struct director_connection *conn;
 
        conn = director_connection_init_common(dir, fd);
        conn->in = TRUE;
        conn->connected = TRUE;
-       conn->name = "<incoming>";
+       conn->name = i_strdup_printf("%s/in", net_ip2addr(ip));
        conn->io = io_add(conn->fd, IO_READ, director_connection_input, conn);
 
        director_connection_send_handshake(conn);
@@ -822,14 +935,26 @@ static void director_connection_connected(struct director_connection *conn)
                conn->host->last_failed = ioloop_time;
                i_error("director(%s): connect() failed: %s", conn->name,
                        strerror(err));
-               director_connection_deinit(&conn);
-
-               /* try connecting to next server */
-               director_connect(dir);
+               director_connection_disconnected(&conn);
                return;
        }
        conn->connected = TRUE;
 
+       if (dir->right != NULL) {
+               /* see if we should disconnect or keep the existing
+                  connection. */
+               if (director_host_cmp_to_self(conn->host, dir->right->host,
+                                             dir->self_host) <= 0) {
+                       /* the old connection is the correct one */
+                       director_connection_deinit(&conn);
+                       return;
+               }
+               director_connection_deinit(&dir->right);
+       }
+       dir->right = conn;
+       i_free(conn->name);
+       conn->name = i_strdup_printf("%s/right", conn->host->name);
+
        io_remove(&conn->io);
 
        director_connection_send_handshake(conn);
@@ -847,10 +972,15 @@ director_connection_init_out(struct director *dir, int fd,
 {
        struct director_connection *conn;
 
+       /* make sure we don't keep old sequence values across restarts */
+       host->last_seq = 0;
+
        conn = director_connection_init_common(dir, fd);
-       conn->name = host->name;
+       conn->name = i_strdup_printf("%s/out", host->name);
        conn->host = host;
-       conn->io = io_add(conn->fd, IO_WRITE,
+       /* use IO_READ instead of IO_WRITE, so that we don't assign
+          dir->right until remote has actually sent some data */
+       conn->io = io_add(conn->fd, IO_READ,
                          director_connection_connected, conn);
        return conn;
 }
@@ -858,19 +988,18 @@ director_connection_init_out(struct director *dir, int fd,
 void director_connection_deinit(struct director_connection **_conn)
 {
        struct director_connection *conn = *_conn;
+       struct director *dir = conn->dir;
 
        *_conn = NULL;
 
-       if (conn->dir->debug && conn->host != NULL) {
-               i_debug("Director %s:%u disconnected",
-                       net_ip2addr(&conn->host->ip), conn->host->port);
-       }
-
-       if (conn->dir->left == conn)
-               conn->dir->left = NULL;
-       if (conn->dir->right == conn)
-               conn->dir->right = NULL;
+       DLLIST_REMOVE(&dir->connections, conn);
+       if (dir->left == conn)
+               dir->left = NULL;
+       if (dir->right == conn)
+               dir->right = NULL;
 
+       if (conn->user_iter != NULL)
+               user_directory_iter_deinit(&conn->user_iter);
        if (conn->to != NULL)
                timeout_remove(&conn->to);
        if (conn->to_ping != NULL)
@@ -884,12 +1013,29 @@ void director_connection_deinit(struct director_connection **_conn)
 
        if (conn->in)
                master_service_client_connection_destroyed(master_service);
+       i_free(conn->name);
        i_free(conn);
+
+       if (dir->left == NULL || dir->right == NULL) {
+               /* we aren't synced until we're again connected to a ring */
+               dir->sync_seq++;
+               dir->ring_synced = FALSE;
+       }
+}
+
+void director_connection_disconnected(struct director_connection **_conn)
+{
+       struct director_connection *conn = *_conn;
+       struct director *dir = conn->dir;
+
+       director_connection_deinit(_conn);
+       if (dir->right == NULL)
+               director_connect(dir);
 }
 
 static void director_connection_timeout(struct director_connection *conn)
 {
-       director_connection_deinit(&conn);
+       director_connection_disconnected(&conn);
 }
 
 void director_connection_send(struct director_connection *conn,
@@ -925,15 +1071,17 @@ void director_connection_send_except(struct director_connection *conn,
 static void director_connection_ping_timeout(struct director_connection *conn)
 {
        i_error("director(%s): Ping timed out, disconnecting", conn->name);
-       director_connection_deinit(&conn);
+       director_connection_disconnected(&conn);
 }
 
 static void director_connection_ping(struct director_connection *conn)
 {
+       conn->sync_ping = FALSE;
        if (conn->ping_waiting)
                return;
 
-       timeout_remove(&conn->to_ping);
+       if (conn->to_ping != NULL)
+               timeout_remove(&conn->to_ping);
        conn->to_ping = timeout_add(DIRECTOR_CONNECTION_PING_TIMEOUT_MSECS,
                                    director_connection_ping_timeout, conn);
        director_connection_send(conn, "PING\n");
@@ -944,3 +1092,46 @@ const char *director_connection_get_name(struct director_connection *conn)
 {
        return conn->name;
 }
+
+struct director_host *
+director_connection_get_host(struct director_connection *conn)
+{
+       return conn->host;
+}
+
+struct director_connection *
+director_connection_find_outgoing(struct director *dir,
+                                 struct director_host *host)
+{
+       struct director_connection *conn;
+
+       for (conn = dir->connections; conn != NULL; conn = conn->next) {
+               if (conn->host == host && !conn->in)
+                       return conn;
+       }
+       return NULL;
+}
+
+void director_connection_cork(struct director_connection *conn)
+{
+       o_stream_cork(conn->output);
+}
+
+void director_connection_uncork(struct director_connection *conn)
+{
+       o_stream_uncork(conn->output);
+}
+
+void director_connection_wait_sync(struct director_connection *conn)
+{
+       /* switch to faster ping timeout. avoid reseting the timeout if it's
+          already fast. */
+       if (conn->ping_waiting || conn->sync_ping)
+               return;
+
+       if (conn->to_ping != NULL)
+               timeout_remove(&conn->to_ping);
+       conn->to_ping = timeout_add(DIRECTOR_CONNECTION_SYNC_TIMEOUT_MSECS,
+                                   director_connection_ping, conn);
+       conn->sync_ping = TRUE;
+}
index e2272a43f666e35951449847fd31370dd87c63fc..9e248e52dec73e2ef09dc2a06bd2fadb101645a8 100644 (file)
@@ -5,7 +5,8 @@ struct director_host;
 struct director;
 
 struct director_connection *
-director_connection_init_in(struct director *dir, int fd);
+director_connection_init_in(struct director *dir, int fd,
+                           const struct ip_addr *ip);
 struct director_connection *
 director_connection_init_out(struct director *dir, int fd,
                             struct director_host *host);
@@ -13,10 +14,19 @@ void director_connection_deinit(struct director_connection **conn);
 
 void director_connection_send(struct director_connection *conn,
                              const char *data);
+void director_connection_wait_sync(struct director_connection *conn);
 void director_connection_send_except(struct director_connection *conn,
                                     struct director_host *skip_host,
                                     const char *data);
 
 const char *director_connection_get_name(struct director_connection *conn);
+struct director_host *
+director_connection_get_host(struct director_connection *conn);
+struct director_connection *
+director_connection_find_outgoing(struct director *dir,
+                                 struct director_host *host);
+
+void director_connection_cork(struct director_connection *conn);
+void director_connection_uncork(struct director_connection *conn);
 
 #endif
index 2b8f84b2927e238f7c995ca4166a15bbe951983f..3dcab70fdbe4359996ca62c55f3b6661c0e26fe1 100644 (file)
@@ -11,15 +11,14 @@ struct director_host {
 
        /* name contains "ip:port" */
        char *name;
-
-       /* each command between directors contains an increasing sequence.
-          if director A gets conflicting information about director B, it can
-          trust the one that has the highest sequence. */
+       /* change commands each have originating host and originating sequence.
+          we'll keep track of the highest sequence we've seen from the host.
+          if we find a lower sequence, we've already handled the command and
+          it can be ignored (or: it must be ignored to avoid potential command
+          loops) */
        unsigned int last_seq;
-
        /* Last time host was detected to be down/broken */
        time_t last_failed;
-
        /* we are this director */
        unsigned int self:1;
 };
index 716232a0a84466f248fee6e55c4c1d9ada4382ed..f0326b05d8ac6397ec3342a93c199e27b5192332 100644 (file)
@@ -9,6 +9,7 @@
 #include "director-request.h"
 
 #define DIRECTOR_REQUEST_TIMEOUT_SECS 30
+#define RING_NOCONN_WARNING_DELAY_MSECS (2*1000)
 
 struct director_request {
        struct director *dir;
@@ -66,6 +67,28 @@ void director_request(struct director *dir, const char *username,
        array_append(&dir->pending_requests, &request, 1);
 }
 
+static void ring_noconn_warning(struct director *dir)
+{
+       if (!dir->ring_handshaked) {
+               i_warning("Delaying all requests "
+                         "until all directors have connected");
+       } else {
+               i_warning("Delaying new user requests until ring is synced");
+       }
+       dir->ring_handshake_warning_sent = TRUE;
+       timeout_remove(&dir->to_handshake_warning);
+}
+
+static void ring_log_delayed_warning(struct director *dir)
+{
+       if (dir->ring_handshake_warning_sent ||
+           dir->to_handshake_warning != NULL)
+               return;
+
+       dir->to_handshake_warning = timeout_add(RING_NOCONN_WARNING_DELAY_MSECS,
+                                               ring_noconn_warning, dir);
+}
+
 bool director_request_continue(struct director_request *request)
 {
        struct director *dir = request->dir;
@@ -74,11 +97,7 @@ bool director_request_continue(struct director_request *request)
 
        if (!dir->ring_handshaked) {
                /* delay requests until ring handshaking is complete */
-               if (!dir->ring_handshake_warning_sent) {
-                       i_warning("Delaying requests until all "
-                                 "directors have connected");
-                       dir->ring_handshake_warning_sent = TRUE;
-               }
+               ring_log_delayed_warning(dir);
                return FALSE;
        }
 
@@ -88,8 +107,7 @@ bool director_request_continue(struct director_request *request)
        else {
                if (!dir->ring_synced) {
                        /* delay adding new users until ring is again synced */
-                       if (dir->debug)
-                               i_debug("Delaying request until ring is synced");
+                       ring_log_delayed_warning(dir);
                        return FALSE;
                }
                host = mail_host_get_by_hash(dir->mail_hosts,
index 81150e2cbd1a757b07098eda3689f845b124681e..f1ef8a269f11c1a259fe90baf2e3bdf7ef417ba3 100644 (file)
@@ -36,6 +36,7 @@
 #define USER_TIMEOUT_MSECS (1000*60)
 #define ADMIN_RANDOM_TIMEOUT_MSECS 500
 #define DIRECTOR_CONN_MAX_DELAY_MSECS 100
+#define DIRECTOR_DISCONNECT_TIMEOUT_SECS 10
 
 struct host {
        int refcount;
@@ -83,6 +84,7 @@ struct admin_connection {
        struct io *io;
        struct istream *input;
        struct timeout *to_random;
+       bool pending_command;
 };
 
 static struct imap_client *imap_clients;
@@ -91,6 +93,7 @@ static struct hash_table *users;
 static struct hash_table *hosts;
 static ARRAY_DEFINE(hosts_array, struct host *);
 static struct admin_connection *admin;
+static struct timeout *to_disconnect;
 
 static void imap_client_destroy(struct imap_client **client);
 static void director_connection_destroy(struct director_connection **conn);
@@ -330,6 +333,13 @@ static void
 director_connection_create(int in_fd, const struct ip_addr *local_ip)
 {
        struct director_connection *conn;
+       int out_fd;
+
+       out_fd = net_connect_ip(local_ip, DIRECTOR_OUT_PORT, NULL);
+       if (out_fd == -1) {
+               (void)close(in_fd);
+               return;
+       }
 
        conn = i_new(struct director_connection, 1);
        conn->in_fd = in_fd;
@@ -338,7 +348,7 @@ director_connection_create(int in_fd, const struct ip_addr *local_ip)
        conn->in_io = io_add(conn->in_fd, IO_READ,
                             director_connection_in_input, conn);
 
-       conn->out_fd = net_connect_ip(local_ip, DIRECTOR_OUT_PORT, NULL);
+       conn->out_fd = out_fd;
        conn->out_input = i_stream_create_fd(conn->out_fd, (size_t)-1, FALSE);
        conn->out_output = o_stream_create_fd(conn->out_fd, (size_t)-1, FALSE);
        conn->out_io = io_add(conn->out_fd, IO_READ,
@@ -404,6 +414,7 @@ static void admin_input(struct admin_connection *conn)
        while ((line = i_stream_read_next_line(conn->input)) != NULL) {
                if (strcmp(line, "OK") != 0)
                        i_error("director-doveadm: Unexpected input: %s", line);
+               conn->pending_command = FALSE;
        }
        if (conn->input->stream_errno != 0 || conn->input->eof)
                i_fatal("director-doveadm: Connection lost");
@@ -414,6 +425,9 @@ static void admin_random_action(struct admin_connection *conn)
        struct host *const *hosts;
        unsigned int i, count;
 
+       if (conn->pending_command)
+               return;
+
        hosts = array_get(&hosts_array, &count);
        i = rand() % count;
 
@@ -421,6 +435,7 @@ static void admin_random_action(struct admin_connection *conn)
 
        admin_send(conn, t_strdup_printf("HOST-SET\t%s\t%u\n",
                net_ip2addr(&hosts[i]->ip), hosts[i]->vhost_count));
+       conn->pending_command = TRUE;
 }
 
 static struct admin_connection *admin_connect(const char *path)
@@ -458,7 +473,8 @@ static void admin_disconnect(struct admin_connection **_conn)
        struct admin_connection *conn = *_conn;
 
        *_conn = NULL;
-       //timeout_remove(&conn->to_random);
+       if (conn->to_random != NULL)
+               timeout_remove(&conn->to_random);
        i_stream_destroy(&conn->input);
        io_remove(&conn->io);
        net_disconnect(conn->fd);
@@ -492,6 +508,23 @@ static void admin_read_hosts(struct admin_connection *conn)
        net_set_nonblock(admin->fd, TRUE);
 }
 
+static void
+director_connection_disconnect_timeout(void *context ATTR_UNUSED)
+{
+       struct director_connection *conn;
+       unsigned int i, count = 0;
+
+       for (conn = director_connections; conn != NULL; conn = conn->next)
+               count++;
+
+       if (count != 0) {
+               i = 0; count = rand() % count;
+               for (conn = director_connections; i < count; conn = conn->next)
+                       i++;
+               director_connection_destroy(&conn);
+       }
+}
+
 static void main_init(const char *admin_path)
 {
        users = hash_table_create(default_pool, default_pool, 0,
@@ -504,6 +537,10 @@ static void main_init(const char *admin_path)
        admin = admin_connect(admin_path);
        admin_send(admin, "HOST-LIST\n");
        admin_read_hosts(admin);
+
+       to_disconnect =
+               timeout_add(1000*(1 + rand()%DIRECTOR_DISCONNECT_TIMEOUT_SECS),
+                           director_connection_disconnect_timeout, NULL);
 }
 
 static void main_deinit(void)
@@ -516,6 +553,7 @@ static void main_deinit(void)
                imap_client_destroy(&client);
        }
 
+       timeout_remove(&to_disconnect);
        while (director_connections != NULL) {
                struct director_connection *conn = director_connections;
                director_connection_destroy(&conn);
index 9813c9fb229c94218450dbc8804b84c580df8f82..4fa1ac0a04302ed535534fe784da2e7e23c65b7f 100755 (executable)
@@ -15,11 +15,11 @@ while [ $i != $director_count ]; do
 listen = 127.0.1.$i
 base_dir = /var/run/dovecot$i
 
-!include dovecot-director-common.conf
+!include dovecot-director-common.conf.inc
 EOF
 done
 
-cat > dovecot-director-common.conf <<EOF
+cat > dovecot-director-common.conf.inc <<EOF
 log_path = /var/log/dovecot.log
 info_log_path = /var/log/dovecot-access.log
 director_servers =$dirs
@@ -83,12 +83,7 @@ EOF
 echo
 echo "Start up dovecot instances:"
 echo
-echo "dovecot -c dovecot-test.conf"
-i=0
-while [ $i != $director_count ]; do
-  i=`expr $i + 1`
-  echo "dovecot -c dovecot-director$i.conf"
-done
+echo 'for conf in dovecot*.conf; do dovecot -c $conf; done'
 echo
 echo "Start testing:"
 echo
index 2444a5995a0da4bc34677150b9444e34aa65a963..fdd832e986eed2ae2a82964a94d41dbfe139c938 100644 (file)
@@ -11,6 +11,7 @@
 #include "director.h"
 
 #define DIRECTOR_RECONNECT_RETRY_SECS 60
+#define DIRECTOR_RECONNECT_TIMEOUT_MSECS (30*1000)
 
 static bool director_is_self_ip_set(struct director *dir)
 {
@@ -79,7 +80,8 @@ int director_connect_host(struct director *dir, struct director_host *host)
        unsigned int port;
        int fd;
 
-       i_assert(dir->right == NULL);
+       if (director_connection_find_outgoing(dir, host) != NULL)
+               return 0;
 
        if (dir->debug) {
                i_debug("Connecting to %s:%u",
@@ -93,10 +95,24 @@ int director_connect_host(struct director *dir, struct director_host *host)
                return -1;
        }
 
-       dir->right = director_connection_init_out(dir, fd, host);
+       director_connection_init_out(dir, fd, host);
        return 0;
 }
 
+static struct director_host *
+director_get_preferred_right_host(struct director *dir)
+{
+       struct director_host *const *hosts;
+       unsigned int count, self_idx;
+
+       hosts = array_get(&dir->dir_hosts, &count);
+       if (count == 1)
+               return NULL;
+
+       self_idx = director_find_self_idx(dir);
+       return hosts[(self_idx + 1) % count];
+}
+
 void director_connect(struct director *dir)
 {
        struct director_host *const *hosts;
@@ -122,28 +138,98 @@ void director_connect(struct director *dir)
        }
        if (i == count) {
                /* we're the only one */
-               director_set_ring_handshaked(dir);
+               if (dir->left != NULL) {
+                       /* since we couldn't connect to it,
+                          it must have failed recently */
+                       director_connection_deinit(&dir->left);
+               }
+               if (!dir->ring_handshaked)
+                       director_set_ring_handshaked(dir);
+               else
+                       director_set_ring_synced(dir);
        }
 }
 
 void director_set_ring_handshaked(struct director *dir)
 {
+       i_assert(!dir->ring_handshaked);
+
+       if (dir->to_handshake_warning != NULL)
+               timeout_remove(&dir->to_handshake_warning);
        if (dir->ring_handshake_warning_sent) {
                i_warning("Directors have been connected, "
-                         "continuing delayed connections");
+                         "continuing delayed requests");
                dir->ring_handshake_warning_sent = FALSE;
        }
        if (dir->debug)
                i_debug("Director ring handshaked");
 
        dir->ring_handshaked = TRUE;
+       director_set_ring_synced(dir);
+}
+
+static void director_reconnect_timeout(struct director *dir)
+{
+       struct director_host *cur_host, *preferred_host =
+               director_get_preferred_right_host(dir);
+
+       cur_host = dir->right == NULL ? NULL :
+               director_connection_get_host(dir->right);
+
+       if (cur_host != preferred_host)
+               (void)director_connect_host(dir, preferred_host);
+       else {
+               /* the connection hasn't finished sync yet.
+                  keep this timeout for now. */
+       }
+}
+
+void director_set_ring_synced(struct director *dir)
+{
+       struct director_host *host;
+
+       i_assert(!dir->ring_synced);
+       i_assert((dir->left != NULL && dir->right != NULL) ||
+                (dir->left == NULL && dir->right == NULL));
+
+       if (dir->to_handshake_warning != NULL)
+               timeout_remove(&dir->to_handshake_warning);
+       if (dir->ring_handshake_warning_sent) {
+               i_warning("Ring is synced, continuing delayed requests");
+               dir->ring_handshake_warning_sent = FALSE;
+       }
+
+       host = dir->right == NULL ? NULL :
+               director_connection_get_host(dir->right);
+       if (host != director_get_preferred_right_host(dir)) {
+               /* try to reconnect to preferred host later */
+               if (dir->to_reconnect == NULL) {
+                       dir->to_reconnect =
+                               timeout_add(DIRECTOR_RECONNECT_TIMEOUT_MSECS,
+                                           director_reconnect_timeout, dir);
+               }
+       } else {
+               if (dir->to_reconnect != NULL)
+                       timeout_remove(&dir->to_reconnect);
+       }
+
        dir->ring_synced = TRUE;
        director_set_state_changed(dir);
 }
 
 static void director_sync(struct director *dir)
 {
-       /* we're synced again, once we receive this SYNC back */
+       if (dir->sync_frozen) {
+               dir->sync_pending = TRUE;
+               return;
+       }
+       if (dir->right == NULL) {
+               i_assert(!dir->ring_synced ||
+                        (dir->left == NULL && dir->right == NULL));
+               return;
+       }
+
+       /* we're synced again when we receive this SYNC back */
        dir->sync_seq++;
        dir->ring_synced = FALSE;
 
@@ -153,36 +239,94 @@ static void director_sync(struct director *dir)
                        director_connection_get_name(dir->right));
        }
 
+       if (dir->left != NULL)
+               director_connection_wait_sync(dir->left);
+       director_connection_wait_sync(dir->right);
        director_connection_send(dir->right, t_strdup_printf(
                "SYNC\t%s\t%u\t%u\n", net_ip2addr(&dir->self_ip),
                dir->self_port, dir->sync_seq));
 }
 
+void director_sync_freeze(struct director *dir)
+{
+       i_assert(!dir->sync_frozen);
+       i_assert(!dir->sync_pending);
+
+       if (dir->left != NULL)
+               director_connection_cork(dir->left);
+       if (dir->right != NULL)
+               director_connection_cork(dir->right);
+       dir->sync_frozen = TRUE;
+}
+
+void director_sync_thaw(struct director *dir)
+{
+       i_assert(dir->sync_frozen);
+
+       dir->sync_frozen = FALSE;
+       if (dir->sync_pending) {
+               dir->sync_pending = FALSE;
+               director_sync(dir);
+       }
+       if (dir->left != NULL)
+               director_connection_uncork(dir->left);
+       if (dir->right != NULL)
+               director_connection_uncork(dir->right);
+}
+
 void director_update_host(struct director *dir, struct director_host *src,
+                         struct director_host *orig_src,
                          struct mail_host *host)
 {
+       /* update state in case this is the first mail host being added */
        director_set_state_changed(dir);
 
+       if (orig_src == NULL) {
+               orig_src = dir->self_host;
+               orig_src->last_seq++;
+       }
+
        director_update_send(dir, src, t_strdup_printf(
-               "HOST\t%s\t%u\n", net_ip2addr(&host->ip), host->vhost_count));
+               "HOST\t%s\t%u\t%u\t%s\t%u\n",
+               net_ip2addr(&orig_src->ip), orig_src->port, orig_src->last_seq,
+               net_ip2addr(&host->ip), host->vhost_count));
        director_sync(dir);
 }
 
 void director_remove_host(struct director *dir, struct director_host *src,
+                         struct director_host *orig_src,
                          struct mail_host *host)
 {
-       director_update_send(dir, src, t_strdup_printf(
-               "HOST-REMOVE\t%s\n", net_ip2addr(&host->ip)));
+       if (src != NULL) {
+               if (orig_src == NULL) {
+                       orig_src = dir->self_host;
+                       orig_src->last_seq++;
+               }
+
+               director_update_send(dir, src, t_strdup_printf(
+                       "HOST-REMOVE\t%s\t%u\t%u\t%s\n",
+                       net_ip2addr(&orig_src->ip), orig_src->port,
+                       orig_src->last_seq, net_ip2addr(&host->ip)));
+       }
+
        user_directory_remove_host(dir->users, host);
        mail_host_remove(dir->mail_hosts, host);
        director_sync(dir);
 }
 
 void director_flush_host(struct director *dir, struct director_host *src,
+                        struct director_host *orig_src,
                         struct mail_host *host)
 {
+       if (orig_src == NULL) {
+               orig_src = dir->self_host;
+               orig_src->last_seq++;
+       }
+
        director_update_send(dir, src, t_strdup_printf(
-               "HOST-FLUSH\t%s\n", net_ip2addr(&host->ip)));
+               "HOST-FLUSH\t%s\t%u\t%u\t%s\n",
+               net_ip2addr(&orig_src->ip), orig_src->port, orig_src->last_seq,
+               net_ip2addr(&host->ip)));
        user_directory_remove_host(dir->users, host);
        director_sync(dir);
 }
@@ -245,6 +389,10 @@ void director_deinit(struct director **_dir)
        user_directory_deinit(&dir->users);
        mail_hosts_deinit(&dir->mail_hosts);
        mail_hosts_deinit(&dir->orig_config_hosts);
+       if (dir->to_reconnect != NULL)
+               timeout_remove(&dir->to_reconnect);
+       if (dir->to_handshake_warning != NULL)
+               timeout_remove(&dir->to_handshake_warning);
        if (dir->to_request != NULL)
                timeout_remove(&dir->to_request);
        array_foreach(&dir->dir_hosts, hostp)
index 1782dc2473b3129e7a84d94dd4af68f9f6591094..9e3afe1c56f374ec820daaefa08af218892552e9 100644 (file)
@@ -10,16 +10,6 @@ struct user;
 
 typedef void director_state_change_callback_t(struct director *dir);
 
-struct director_host_change {
-       /* originating director for this change. keep ip/port here separately,
-          because by the time its sync comes, the director itself may have
-          already been removed. */
-       struct ip_addr ip;
-       unsigned int port;
-       /* highest change sequence from this director */
-       unsigned int seq;
-};
-
 struct director {
        const struct director_settings *set;
 
@@ -31,6 +21,9 @@ struct director {
 
        struct director_host *self_host;
        struct director_connection *left, *right;
+       /* all director connections */
+       struct director_connection *connections;
+       struct timeout *to_reconnect;
 
        /* current mail hosts */
        struct mail_host_list *mail_hosts;
@@ -43,6 +36,7 @@ struct director {
        /* these requests are waiting for directors to be in synced */
        ARRAY_DEFINE(pending_requests, struct director_request *);
        struct timeout *to_request;
+       struct timeout *to_handshake_warning;
 
        director_state_change_callback_t *state_change_callback;
 
@@ -56,6 +50,8 @@ struct director {
        unsigned int ring_handshaked:1;
        unsigned int ring_handshake_warning_sent:1;
        unsigned int ring_synced:1;
+       unsigned int sync_frozen:1;
+       unsigned int sync_pending:1;
        unsigned int debug:1;
 };
 
@@ -73,17 +69,24 @@ void director_deinit(struct director **dir);
 void director_connect(struct director *dir);
 
 void director_set_ring_handshaked(struct director *dir);
+void director_set_ring_synced(struct director *dir);
 void director_set_state_changed(struct director *dir);
 
 void director_update_host(struct director *dir, struct director_host *src,
+                         struct director_host *orig_src,
                          struct mail_host *host);
 void director_remove_host(struct director *dir, struct director_host *src,
+                         struct director_host *orig_src,
                          struct mail_host *host);
 void director_flush_host(struct director *dir, struct director_host *src,
+                        struct director_host *orig_src,
                         struct mail_host *host);
 void director_update_user(struct director *dir, struct director_host *src,
                          struct user *user);
 
+void director_sync_freeze(struct director *dir);
+void director_sync_thaw(struct director *dir);
+
 /* Send data to all directors using both left and right connections
    (unless they're the same). */
 void director_update_send(struct director *dir, struct director_host *src,
index 6cd3004b4cb7bb6c5cd648e3c1e589e537c1a917..bb31c18ce7e6fd1100a41698ddae2121b8e9d3a0 100644 (file)
@@ -91,7 +91,7 @@ doveadm_cmd_host_set(struct doveadm_connection *conn, const char *line)
                host = mail_host_add_ip(dir->mail_hosts, &ip);
        if (vhost_count != -1U)
                mail_host_set_vhost_count(dir->mail_hosts, host, vhost_count);
-       director_update_host(dir, dir->self_host, host);
+       director_update_host(dir, dir->self_host, NULL, host);
 
        o_stream_send(conn->output, "OK\n", 3);
        return TRUE;
@@ -111,7 +111,8 @@ doveadm_cmd_host_remove(struct doveadm_connection *conn, const char *line)
        if (host == NULL)
                o_stream_send_str(conn->output, "NOTFOUND\n");
        else {
-               director_remove_host(conn->dir, conn->dir->self_host, host);
+               director_remove_host(conn->dir, conn->dir->self_host,
+                                    NULL, host);
                o_stream_send(conn->output, "OK\n", 3);
        }
        return TRUE;
@@ -122,8 +123,10 @@ doveadm_cmd_host_flush_all(struct doveadm_connection *conn)
 {
        struct mail_host *const *hostp;
 
-       array_foreach(mail_hosts_get(conn->dir->mail_hosts), hostp)
-               director_flush_host(conn->dir, conn->dir->self_host, *hostp);
+       array_foreach(mail_hosts_get(conn->dir->mail_hosts), hostp) {
+               director_flush_host(conn->dir, conn->dir->self_host,
+                                   NULL, *hostp);
+       }
        o_stream_send(conn->output, "OK\n", 3);
 }
 
@@ -146,7 +149,8 @@ doveadm_cmd_host_flush(struct doveadm_connection *conn, const char *line)
        if (host == NULL)
                o_stream_send_str(conn->output, "NOTFOUND\n");
        else {
-               director_flush_host(conn->dir, conn->dir->self_host, host);
+               director_flush_host(conn->dir, conn->dir->self_host,
+                                   NULL, host);
                o_stream_send(conn->output, "OK\n", 3);
        }
        return TRUE;
index ea5047b0610b55c903d20b20cae3b8ecb69a50ca..d0c108464ca670fc8b920316c8fb0cd89fae6bfa 100644 (file)
@@ -34,7 +34,7 @@ static int director_client_connected(int fd, const struct ip_addr *ip)
                return -1;
        }
 
-       director_connection_init_in(director, fd);
+       director_connection_init_in(director, fd, ip);
        return 0;
 }
 
@@ -166,7 +166,7 @@ int main(int argc, char *argv[])
                &director_setting_parser_info,
                NULL
        };
-       unsigned int test_port;
+       unsigned int test_port = 0;
        const char *error;
        bool debug = FALSE;
        int c;