net_addr2ip(args[0], &ip) < 0 ||
str_to_uint(args[1], &port) < 0 ||
str_to_uint(args[2], &seq) < 0) {
- i_error("director(%s): Command is missing parameters",
- conn->name);
+ i_error("director(%s): Command is missing parameters: %s",
+ conn->name, t_strarray_join(args, " "));
return -1;
}
*_args = args + 3;
return TRUE;
}
+/* Handle a USER-MOVE command received from another director.
+   director_cmd_is_seen() is given a pointer to args and may advance it;
+   the two fields left after it are <username hash> and <mail host IP>.
+   Returns FALSE if the command is invalid. */
+static bool
+director_cmd_user_move(struct director_connection *conn,
+		       const char *const *args)
+{
+	struct director_host *dir_host;
+	struct mail_host *target;
+	struct ip_addr target_ip;
+	unsigned int hash;
+	int seen;
+
+	seen = director_cmd_is_seen(conn, &args, &dir_host);
+	if (seen != 0) {
+		/* processing stops here; only a positive result counts
+		   as success */
+		return seen > 0;
+	}
+
+	if (str_array_length(args) != 2 ||
+	    str_to_uint(args[0], &hash) < 0 ||
+	    net_addr2ip(args[1], &target_ip) < 0) {
+		i_error("director(%s): Invalid USER-MOVE args", conn->name);
+		return FALSE;
+	}
+
+	target = mail_host_lookup(conn->dir->mail_hosts, &target_ip);
+	if (target == NULL) {
+		/* mail host isn't known to us: ignore the command */
+		return TRUE;
+	}
+	director_move_user(conn->dir, conn->host, dir_host, hash, target);
+	return TRUE;
+}
+
+/* Handle a USER-KILLED command received from another director.
+   The only argument is the username hash whose connections the sender has
+   finished killing. Returns FALSE if the arguments are invalid. */
+static bool
+director_cmd_user_killed(struct director_connection *conn,
+			 const char *const *args)
+{
+	unsigned int username_hash;
+
+	if (str_array_length(args) != 1 ||
+	    str_to_uint(args[0], &username_hash) < 0) {
+		i_error("director(%s): Invalid USER-KILLED args", conn->name);
+		return FALSE;
+	}
+
+	director_user_killed(conn->dir, username_hash);
+	return TRUE;
+}
+
+/* Handle a USER-KILLED-EVERYWHERE command received from another director.
+   director_cmd_is_seen() is given a pointer to args and may advance it;
+   the single field left after it is the username hash.
+   Returns FALSE if the command is invalid. */
+static bool
+director_cmd_user_killed_everywhere(struct director_connection *conn,
+				    const char *const *args)
+{
+	struct director_host *dir_host;
+	unsigned int hash;
+	int seen;
+
+	seen = director_cmd_is_seen(conn, &args, &dir_host);
+	if (seen != 0) {
+		/* processing stops here; only a positive result counts
+		   as success */
+		return seen > 0;
+	}
+
+	if (str_array_length(args) != 1 ||
+	    str_to_uint(args[0], &hash) < 0) {
+		i_error("director(%s): Invalid USER-KILLED-EVERYWHERE args",
+			conn->name);
+		return FALSE;
+	}
+
+	director_user_killed_everywhere(conn->dir, conn->host, dir_host, hash);
+	return TRUE;
+}
+
static void director_handshake_cmd_done(struct director_connection *conn)
{
struct director *dir = conn->dir;
return director_cmd_host_remove(conn, args);
if (strcmp(cmd, "HOST-FLUSH") == 0)
return director_cmd_host_flush(conn, args);
+ if (strcmp(cmd, "USER-MOVE") == 0)
+ return director_cmd_user_move(conn, args);
+ if (strcmp(cmd, "USER-KILLED") == 0)
+ return director_cmd_user_killed(conn, args);
+ if (strcmp(cmd, "USER-KILLED-EVERYWHERE") == 0)
+ return director_cmd_user_killed_everywhere(conn, args);
if (strcmp(cmd, "DIRECTOR") == 0)
return director_cmd_director(conn, args);
if (strcmp(cmd, "SYNC") == 0)
}
user = user_directory_lookup(dir->users, request->username_hash);
- if (user != NULL)
+ if (user != NULL) {
+ if (user->kill_state != USER_KILL_STATE_NONE) {
+ /* delay processing this user's connections until
+ its existing connections have been killed */
+ return FALSE;
+ }
user_directory_refresh(dir->users, user);
- else {
+ } else {
if (!dir->ring_synced) {
/* delay adding new users until ring is again synced */
ring_log_delayed_warning(dir);
while [ $i != $director_count ]; do
i=`expr $i + 1`
dirs="$dirs 127.0.1.$i"
- echo "director 127.0.1.$i"
+ echo "127.0.1.$i director"
cat > dovecot-director$i.conf <<EOF
listen = 127.0.1.$i
base_dir = /var/run/dovecot$i
info_log_path = /var/log/dovecot-access.log
director_servers =$dirs
director_mail_servers = 127.0.0.1-127.0.0.255
+disable_plaintext_auth = no
ssl = no
service director {
#include "ioloop.h"
#include "array.h"
#include "str.h"
+#include "ipc-client.h"
#include "user-directory.h"
#include "mail-host.h"
#include "director-host.h"
#include "director-connection.h"
#include "director.h"
+#define DIRECTOR_IPC_PROXY_PATH "ipc"
+
#define DIRECTOR_RECONNECT_RETRY_SECS 60
#define DIRECTOR_RECONNECT_TIMEOUT_MSECS (30*1000)
+#define DIRECTOR_USER_MOVE_TIMEOUT_MSECS (30*1000)
+#define DIRECTOR_USER_MOVE_FINISH_DELAY_MSECS (12*1000)
static bool director_is_self_ip_set(struct director *dir)
{
net_ip2addr(&user->host->ip)));
}
+struct director_user_kill_finish_ctx { /* context for director_user_kill_finish_delayed_to() */
+ struct director *dir; /* director whose state_change_callback is called when the delay ends */
+ struct user *user; /* user whose kill_state is reset to USER_KILL_STATE_NONE */
+};
+
+/* Timeout handler that ends the USER_KILL_STATE_DELAY period: the user's
+   kill state is cleared, its move timeout is removed and the director's
+   state change callback is invoked. The context is freed here. */
+static void
+director_user_kill_finish_delayed_to(struct director_user_kill_finish_ctx *ctx)
+{
+	struct director *dir = ctx->dir;
+	struct user *user = ctx->user;
+
+	i_assert(user->kill_state == USER_KILL_STATE_DELAY);
+
+	user->kill_state = USER_KILL_STATE_NONE;
+	timeout_remove(&user->to_move);
+
+	dir->state_change_callback(dir);
+	i_free(ctx);
+}
+
+/* Put the user into USER_KILL_STATE_DELAY and replace its current move
+   timeout with one that fires director_user_kill_finish_delayed_to()
+   after DIRECTOR_USER_MOVE_FINISH_DELAY_MSECS. */
+static void
+director_user_kill_finish_delayed(struct director *dir, struct user *user)
+{
+	struct director_user_kill_finish_ctx *finish_ctx;
+
+	user->kill_state = USER_KILL_STATE_DELAY;
+	timeout_remove(&user->to_move);
+
+	/* freed by the timeout handler */
+	finish_ctx = i_new(struct director_user_kill_finish_ctx, 1);
+	finish_ctx->user = user;
+	finish_ctx->dir = dir;
+
+	user->to_move = timeout_add(DIRECTOR_USER_MOVE_FINISH_DELAY_MSECS,
+				    director_user_kill_finish_delayed_to,
+				    finish_ctx);
+}
+
+struct director_kill_context { /* context passed to director_kill_user_callback() */
+ struct director *dir; /* director whose user directory is searched in the callback */
+ unsigned int username_hash; /* hash of the user whose connections are being killed */
+ bool self; /* copied from src->self in director_move_user() */
+};
+
+/* Called when this director's own kill of the user's connections is done.
+   self=TRUE forces USER-KILLED to be sent to the right side even if no
+   USER-KILLED notification has been received yet. */
+static void
+director_finish_user_kill(struct director *dir, struct user *user, bool self)
+{
+	if (dir->right == NULL || dir->right == dir->left) {
+		/* we're alone */
+		director_user_kill_finish_delayed(dir, user);
+		return;
+	}
+
+	if (!self &&
+	    user->kill_state != USER_KILL_STATE_KILLING_NOTIFY_RECEIVED) {
+		/* USER-KILLED hasn't been received yet, so hold ours back */
+		i_assert(user->kill_state == USER_KILL_STATE_KILLING);
+		user->kill_state = USER_KILL_STATE_KILLED_WAITING_FOR_NOTIFY;
+		return;
+	}
+
+	/* pass the notification on to the right side */
+	director_connection_send(dir->right, t_strdup_printf(
+		"USER-KILLED\t%u\n", user->username_hash));
+	user->kill_state = USER_KILL_STATE_KILLED_WAITING_FOR_EVERYONE;
+}
+
+/* IPC callback for the KILL-DIRECTOR-HASH command sent by
+   director_move_user(). On OK or ERROR the kill is treated as finished
+   (errors are only logged) and the user's kill state is advanced.
+   The context allocated by director_move_user() is freed here; previously
+   it was never freed, which leaked one context per move.
+   NOTE(review): this assumes OK/ERROR is delivered exactly once per
+   command and that the IPC client doesn't free the context itself. */
+static void director_kill_user_callback(enum ipc_client_cmd_state state,
+					const char *data, void *context)
+{
+	struct director_kill_context *ctx = context;
+	struct director *dir;
+	struct user *user;
+	unsigned int username_hash;
+	bool self;
+
+	switch (state) {
+	case IPC_CLIENT_CMD_STATE_REPLY:
+		/* presumably an intermediate reply: keep the context */
+		return;
+	case IPC_CLIENT_CMD_STATE_OK:
+		break;
+	case IPC_CLIENT_CMD_STATE_ERROR:
+		i_error("Failed to kill user %u connections: %s",
+			ctx->username_hash, data);
+		/* we can't really do anything but continue anyway */
+		break;
+	}
+
+	/* copy what is still needed so the context can be released on
+	   every remaining return path */
+	dir = ctx->dir;
+	username_hash = ctx->username_hash;
+	self = ctx->self;
+	i_free(ctx);
+
+	user = user_directory_lookup(dir->users, username_hash);
+	if (user == NULL || user->kill_state == USER_KILL_STATE_NONE)
+		return;
+
+	director_finish_user_kill(dir, user, self);
+}
+
+/* Timeout handler for a move that didn't finish within
+   DIRECTOR_USER_MOVE_TIMEOUT_MSECS: log the problem and clear the user's
+   kill state and move timeout. */
+static void director_user_move_timeout(struct user *user)
+{
+	i_error("Finishing user %u move timed out, "
+		"its state may now be inconsistent", user->username_hash);
+
+	timeout_remove(&user->to_move);
+	user->kill_state = USER_KILL_STATE_NONE;
+}
+
+/* Move the user identified by username_hash to the given mail host and
+   forward the move to the other directors.
+
+   src is the director the update came from; its "self" flag is stored in
+   the kill context. orig_src is the director that originated the move;
+   NULL means this director, in which case its last_seq is incremented.
+
+   Overall flow:
+
+   1. The user's host is changed and its kill state is set, which delays
+      the user's future connections until all directors have killed the
+      existing connections and notified us about it.
+
+   2. The other directors are told about the move with USER-MOVE.
+
+   3. When the kill callback is called, the other directors are told with
+      USER-KILLED that we're done killing the user.
+
+   4. A director that gets a duplicate USER-KILLED is responsible for
+      notifying all directors that the user is completely killed.
+
+   5. After the USER-KILLED-EVERYWHERE notification is received, new
+      connections are again allowed for the user. */
+void director_move_user(struct director *dir, struct director_host *src,
+			struct director_host *orig_src,
+			unsigned int username_hash, struct mail_host *host)
+{
+	struct director_kill_context *kill_ctx;
+	struct user *user;
+	const char *kill_cmd;
+
+	user = user_directory_lookup(dir->users, username_hash);
+	if (user != NULL) {
+		if (user->host == host) {
+			/* user is already in this host */
+			return;
+		}
+		/* transfer the user from its old host to the new one */
+		user->host->user_count--;
+		host->user_count++;
+		user->host = host;
+		user->timestamp = ioloop_time;
+	} else {
+		user = user_directory_add(dir->users, username_hash,
+					  host, ioloop_time);
+	}
+
+	if (user->kill_state == USER_KILL_STATE_NONE) {
+		/* no kill is running yet for this user: start one */
+		kill_ctx = i_new(struct director_kill_context, 1);
+		kill_ctx->dir = dir;
+		kill_ctx->username_hash = username_hash;
+		kill_ctx->self = src->self;
+
+		user->to_move = timeout_add(DIRECTOR_USER_MOVE_TIMEOUT_MSECS,
+					    director_user_move_timeout, user);
+		user->kill_state = USER_KILL_STATE_KILLING;
+
+		kill_cmd = t_strdup_printf("proxy\t*\tKILL-DIRECTOR-HASH\t%u",
+					   username_hash);
+		ipc_client_cmd(dir->ipc_proxy, kill_cmd,
+			       director_kill_user_callback, kill_ctx);
+	}
+
+	if (orig_src == NULL) {
+		/* the move originates from us */
+		orig_src = dir->self_host;
+		orig_src->last_seq++;
+	}
+	director_update_send(dir, src, t_strdup_printf(
+		"USER-MOVE\t%s\t%u\t%u\t%u\t%s\n",
+		net_ip2addr(&orig_src->ip), orig_src->port, orig_src->last_seq,
+		user->username_hash, net_ip2addr(&user->host->ip)));
+}
+
+/* Process a USER-KILLED notification for the given username hash.
+   What happens depends on how far our own kill of the user has got;
+   unknown users are ignored. */
+void director_user_killed(struct director *dir, unsigned int username_hash)
+{
+	struct user *user;
+	enum user_kill_state state;
+
+	user = user_directory_lookup(dir->users, username_hash);
+	if (user == NULL)
+		return;
+
+	state = user->kill_state;
+	if (state == USER_KILL_STATE_KILLING) {
+		/* our kill is still running: remember the notification */
+		user->kill_state = USER_KILL_STATE_KILLING_NOTIFY_RECEIVED;
+	} else if (state == USER_KILL_STATE_KILLED_WAITING_FOR_NOTIFY) {
+		/* this is the notification we were waiting for */
+		director_finish_user_kill(dir, user, TRUE);
+	} else if (state == USER_KILL_STATE_KILLED_WAITING_FOR_EVERYONE) {
+		/* we already sent USER-KILLED ourselves, so announce that
+		   the user is killed everywhere */
+		director_user_killed_everywhere(dir, dir->self_host,
+						NULL, username_hash);
+	}
+	/* USER_KILL_STATE_NONE, USER_KILL_STATE_DELAY and
+	   USER_KILL_STATE_KILLING_NOTIFY_RECEIVED: nothing to do */
+}
+
+/* Process a USER-KILLED-EVERYWHERE notification. It is ignored unless the
+   user exists and is in USER_KILL_STATE_KILLED_WAITING_FOR_EVERYONE.
+   Otherwise the user enters the final delay state and the notification
+   is forwarded. orig_src is the originating director; NULL means this
+   director, in which case its last_seq is incremented. */
+void director_user_killed_everywhere(struct director *dir,
+				     struct director_host *src,
+				     struct director_host *orig_src,
+				     unsigned int username_hash)
+{
+	struct user *user;
+
+	user = user_directory_lookup(dir->users, username_hash);
+	if (user == NULL)
+		return;
+	if (user->kill_state != USER_KILL_STATE_KILLED_WAITING_FOR_EVERYONE)
+		return;
+
+	director_user_kill_finish_delayed(dir, user);
+
+	if (orig_src == NULL) {
+		/* the notification originates from us */
+		orig_src = dir->self_host;
+		orig_src->last_seq++;
+	}
+	director_update_send(dir, src, t_strdup_printf(
+		"USER-KILLED-EVERYWHERE\t%s\t%u\t%u\t%u\n",
+		net_ip2addr(&orig_src->ip), orig_src->port, orig_src->last_seq,
+		user->username_hash));
+}
+
void director_set_state_changed(struct director *dir)
{
dir->state_change_callback(dir);
director_state_change_callback_t *callback)
{
struct director *dir;
+ const char *path;
dir = i_new(struct director, 1);
dir->set = set;
i_array_init(&dir->pending_requests, 16);
dir->users = user_directory_init(set->director_user_expire);
dir->mail_hosts = mail_hosts_init();
+
+ path = t_strconcat(set->base_dir, "/" DIRECTOR_IPC_PROXY_PATH, NULL);
+ dir->ipc_proxy = ipc_client_init(path);
return dir;
}
user_directory_deinit(&dir->users);
mail_hosts_deinit(&dir->mail_hosts);
mail_hosts_deinit(&dir->orig_config_hosts);
+
+ ipc_client_deinit(&dir->ipc_proxy);
if (dir->to_reconnect != NULL)
timeout_remove(&dir->to_reconnect);
if (dir->to_handshake_warning != NULL)
/* director hosts are sorted by IP (and port) */
ARRAY_DEFINE(dir_hosts, struct director_host *);
+ struct ipc_client *ipc_proxy;
unsigned int sync_seq;
/* director ring handshaking is complete.
struct mail_host *host);
void director_update_user(struct director *dir, struct director_host *src,
struct user *user);
+void director_move_user(struct director *dir, struct director_host *src,
+ struct director_host *orig_src,
+ unsigned int username_hash, struct mail_host *host);
+void director_user_killed(struct director *dir, unsigned int username_hash);
+void director_user_killed_everywhere(struct director *dir,
+ struct director_host *src,
+ struct director_host *orig_src,
+ unsigned int username_hash);
void director_sync_freeze(struct director *dir);
void director_sync_thaw(struct director *dir);
return TRUE;
}
+/* Handle doveadm's USER-MOVE command. line is "<user>\t<mail host IP>",
+   where <user> is either a numeric username hash or a username that is
+   hashed here.
+   Replies "NOTFOUND" if the mail host is unknown, "TRYAGAIN" if the user
+   is currently being killed/moved, and "OK" after the move has been
+   started. Returns FALSE if the parameters are invalid. */
+static bool
+doveadm_cmd_user_move(struct doveadm_connection *conn, const char *line)
+{
+	unsigned int username_hash;
+	const char *const *args;
+	struct user *user;
+	struct mail_host *host;
+	struct ip_addr ip;
+
+	args = t_strsplit(line, "\t");
+	if (args[0] == NULL || args[1] == NULL ||
+	    net_addr2ip(args[1], &ip) < 0) {
+		i_error("doveadm sent invalid USER-MOVE parameters: %s", line);
+		return FALSE;
+	}
+	host = mail_host_lookup(conn->dir->mail_hosts, &ip);
+	if (host == NULL) {
+		o_stream_send_str(conn->output, "NOTFOUND\n");
+		return TRUE;
+	}
+
+	if (str_to_uint(args[0], &username_hash) < 0) {
+		/* not a number, so it's a username. Hash only the user
+		   field: hashing the whole line would also include the
+		   host and produce a different hash. */
+		username_hash = user_directory_get_username_hash(args[0]);
+	}
+	user = user_directory_lookup(conn->dir->users, username_hash);
+	if (user != NULL && user->kill_state != USER_KILL_STATE_NONE) {
+		/* a previous move of this user hasn't finished yet */
+		o_stream_send_str(conn->output, "TRYAGAIN\n");
+		return TRUE;
+	}
+
+	director_move_user(conn->dir, conn->dir->self_host, NULL,
+			   username_hash, host);
+	o_stream_send_str(conn->output, "OK\n");
+	return TRUE;
+}
+
static void doveadm_connection_input(struct doveadm_connection *conn)
{
const char *line, *cmd, *args;
ret = doveadm_cmd_user_lookup(conn, args);
else if (strcmp(cmd, "USER-LIST") == 0)
ret = doveadm_cmd_user_list(conn, args);
+ else if (strcmp(cmd, "USER-MOVE") == 0)
+ ret = doveadm_cmd_user_move(conn, args);
else {
i_error("doveadm sent unknown command: %s", line);
ret = FALSE;
unsigned int user_directory_get_username_hash(const char *username)
{
+ /* NOTE: If you modify this, modify also
+ director_username_hash() in login-common/login-proxy.c */
unsigned char md5[MD5_RESULTLEN];
unsigned int i, hash = 0;
#ifndef USER_DIRECTORY_H
#define USER_DIRECTORY_H
+enum user_kill_state {
+ /* No kill is in progress for this user */
+ USER_KILL_STATE_NONE,
+ /* The kill of the user's connections has been started but not finished */
+ USER_KILL_STATE_KILLING,
+ /* Like above, but our left side has already announced that it has
+ finished killing its connections for this user */
+ USER_KILL_STATE_KILLING_NOTIFY_RECEIVED,
+ /* Our own kill is done, but USER-KILLED is sent to our right side
+ only after the left side has announced that it has finished
+ killing its connections for this user */
+ USER_KILL_STATE_KILLED_WAITING_FOR_NOTIFY,
+ /* Our own kill is done and USER-KILLED has been sent. We stay here
+ until the user is known to have been killed everywhere. */
+ USER_KILL_STATE_KILLED_WAITING_FOR_EVERYONE,
+ /* Wait for a while so the user's connections have time to actually die */
+ USER_KILL_STATE_DELAY
+};
+
struct user {
/* sorted by time */
struct user *prev, *next;
unsigned int timestamp;
struct mail_host *host;
+
+ /* Move timeout to make sure user's connections won't silently hang
+ indefinitely if there is some trouble moving it. */
+ struct timeout *to_move;
+ /* If not USER_KILL_STATE_NONE, don't allow new connections until all
+ directors have killed the user's connections. */
+ enum user_kill_state kill_state;
};
/* Create a new directory. Users are dropped if their time gets older
director_disconnect(ctx);
}
+/* "doveadm director move <user> <host>": hash the username, send
+   USER-MOVE with the hash and the host's first IP to the director and
+   report the director's reply. */
+static void cmd_director_move(int argc, char *argv[])
+{
+	struct director_context *ctx;
+	struct ip_addr *ips;
+	unsigned int ips_count, user_hash;
+	const char *host, *reply, *ip_str;
+
+	ctx = cmd_director_init(argc, argv, "a:", cmd_director_move);
+	/* exactly two arguments are expected: <user> <host> */
+	if (argv[optind] == NULL || argv[optind+1] == NULL ||
+	    argv[optind+2] != NULL)
+		director_cmd_help(cmd_director_move);
+
+	user_hash = director_username_hash(argv[optind]);
+	optind++;
+	host = argv[optind];
+
+	director_get_host(host, &ips, &ips_count);
+	ip_str = net_ip2addr(&ips[0]);
+	director_send(ctx, t_strdup_printf(
+		"USER-MOVE\t%u\t%s\n", user_hash, ip_str));
+
+	reply = i_stream_read_next_line(ctx->input);
+	if (reply == NULL) {
+		fprintf(stderr, "failed\n");
+	} else if (strcmp(reply, "OK") == 0) {
+		/* success is reported only in verbose mode */
+		if (doveadm_verbose)
+			printf("User hash %u moved to %s\n", user_hash, ip_str);
+	} else if (strcmp(reply, "NOTFOUND") == 0) {
+		fprintf(stderr, "Host '%s' doesn't exist\n", ip_str);
+	} else if (strcmp(reply, "TRYAGAIN") == 0) {
+		fprintf(stderr, "User is already being moved, "
+			"wait a while for it to be finished\n");
+	} else {
+		fprintf(stderr, "failed: %s\n", reply);
+	}
+	director_disconnect(ctx);
+}
+
static void cmd_director_flush_all(struct director_context *ctx)
{
const char *line;
"[-a <director socket path>] <host> [<vhost count>]" },
{ cmd_director_remove, "director remove",
"[-a <director socket path>] <host>" },
+ { cmd_director_move, "director move",
+ "[-a <director socket path>] <user> <host>" },
{ cmd_director_flush, "director flush",
"[-a <director socket path>] <host>|all" },
{ cmd_director_dump, "director dump",
ipc_cmd_success_reply(&cmd, t_strdup_printf("%u", count));
}
+/* Return the director hash of a username: the first sizeof(unsigned int)
+   bytes of MD5(username), with the first byte as the most significant.
+
+   NOTE: If you modify this, modify also
+   user_directory_get_username_hash() in director/user-directory.c */
+static unsigned int director_username_hash(const char *username)
+{
+	unsigned char md5[MD5_RESULTLEN];
+	unsigned int hash = 0, pos;
+
+	md5_get_digest(username, strlen(username), md5);
+	for (pos = 0; pos < sizeof(hash); pos++) {
+		hash <<= CHAR_BIT;
+		hash |= md5[pos];
+	}
+	return hash;
+}
+
+/* IPC command KILL-DIRECTOR-HASH <hash>: kill every proxy connection,
+   established or pending, whose virtual user hashes to <hash>. The
+   number of killed connections is sent as the reply. */
+static void
+login_proxy_cmd_kill_director_hash(struct ipc_cmd *cmd, const char *const *args)
+{
+	struct login_proxy *proxy, *next;
+	unsigned int hash, killed = 0;
+
+	if (args[0] == NULL || str_to_uint(args[0], &hash) < 0) {
+		ipc_cmd_fail(&cmd, "Invalid parameters");
+		return;
+	}
+
+	/* the next pointer is saved first, because the current proxy may
+	   be gone after it has been killed */
+	proxy = login_proxies;
+	while (proxy != NULL) {
+		next = proxy->next;
+		if (director_username_hash(proxy->client->virtual_user) == hash) {
+			login_proxy_free_reason(&proxy, KILLED_BY_ADMIN_REASON);
+			killed++;
+		}
+		proxy = next;
+	}
+
+	proxy = login_proxies_pending;
+	while (proxy != NULL) {
+		next = proxy->next;
+		if (director_username_hash(proxy->client->virtual_user) == hash) {
+			client_destroy(proxy->client, "Connection killed");
+			killed++;
+		}
+		proxy = next;
+	}
+	ipc_cmd_success_reply(&cmd, t_strdup_printf("%u", killed));
+}
+
static void
login_proxy_cmd_list_reply(struct ipc_cmd *cmd,
struct login_proxy *proxy)
args++;
if (strcmp(name, "KILL") == 0)
login_proxy_cmd_kill(cmd, args);
+ else if (strcmp(name, "KILL-DIRECTOR-HASH") == 0)
+ login_proxy_cmd_kill_director_hash(cmd, args);
else if (strcmp(name, "LIST") == 0)
login_proxy_cmd_list(cmd, args);
else