}
}
-struct director_user_kill_finish_ctx {
- struct director *dir;
- unsigned int username_hash;
- struct ip_addr host_ip;
- struct user *user;
- struct program_client *pclient;
- struct ostream *reply;
- char *socket_path;
-};
-
static void
-director_flush_user_continue(int result,
- struct director_user_kill_finish_ctx *ctx)
+director_flush_user_continue(int result, struct director_kill_context *ctx)
{
+ struct director *dir = ctx->dir;
struct user *user =
- user_directory_lookup(ctx->dir->users, ctx->username_hash);
+ user_directory_lookup(dir->users, ctx->username_hash);
+
+ ctx->callback_pending = FALSE;
- if (user == NULL) {
- dir_debug("User %u freed while flushing, result=%d",
- ctx->username_hash, result);
- } else if (user->kill_state != USER_KILL_STATE_FLUSHING) {
- dir_debug("User %u move state changed while flushing, result=%d",
- ctx->username_hash, result);
- } else {
- dir_debug("Flushing user %u finished, result=%d",
- ctx->username_hash, result);
- director_user_kill_finish_delayed(ctx->dir, user,
- result == 1);
- }
if (result == 0) {
struct istream *is = iostream_temp_finish(&ctx->reply, (size_t)-1);
char *data;
o_stream_unref(&ctx->reply);
}
program_client_destroy(&ctx->pclient);
- i_free(ctx->socket_path);
- i_free(ctx);
+
+ if (!DIRECTOR_KILL_CONTEXT_IS_VALID(user, ctx)) {
+ /* user was already freed - ignore */
+ dir_debug("User %u freed while flushing, result=%d",
+ ctx->username_hash, result);
+ i_assert(ctx->to_move == NULL);
+ i_free(ctx);
+ } else {
+ /* ctx is freed later via user->kill_ctx */
+ dir_debug("Flushing user %u finished, result=%d",
+ ctx->username_hash, result);
+ director_user_kill_finish_delayed(dir, user, result == 1);
+ }
}
static void
director_flush_user(struct director *dir, struct user *user)
{
+ struct director_kill_context *ctx = user->kill_ctx;
struct var_expand_table tab[] = {
{ 'i', net_ip2addr(&user->host->ip), "ip" },
{ 'h', user->host->hostname, "host" },
would be redundant since they're all supposed to be performing the
same flush task to the same backend. */
if (*dir->set->director_flush_socket == '\0' ||
- !user->kill_is_self_initiated) {
+ !ctx->kill_is_self_initiated) {
director_user_kill_finish_delayed(dir, user, FALSE);
return;
}
- struct director_user_kill_finish_ctx *ctx =
- i_new(struct director_user_kill_finish_ctx, 1);
- ctx->username_hash = user->username_hash;
ctx->host_ip = user->host->ip;
- ctx->dir = dir;
string_t *s_sock = str_new(default_pool, 32);
var_expand(s_sock, dir->set->director_flush_socket, tab);
const char *const args[] = {"FLUSH",
t_strdup_printf("%u", user->username_hash), NULL};
- user->kill_state = USER_KILL_STATE_FLUSHING;
+ ctx->kill_state = USER_KILL_STATE_FLUSHING;
dir_debug("Flushing user %u via %s", user->username_hash,
ctx->socket_path);
net_ip2addr(&user->host->ip)));
o_stream_set_no_error_handling(ctx->reply, TRUE);
program_client_set_output(ctx->pclient, ctx->reply);
+ ctx->callback_pending = TRUE;
program_client_run_async(ctx->pclient, director_flush_user_continue, ctx);
}
-static void director_user_move_free(struct director *dir, struct user *user)
+static void director_user_move_free(struct user *user)
{
- i_assert(user->to_move != NULL);
+ struct director *dir = user->kill_ctx->dir;
+ struct director_kill_context *kill_ctx = user->kill_ctx;
+
+ i_assert(kill_ctx != NULL);
dir_debug("User %u move finished at state=%s", user->username_hash,
- user_kill_state_names[user->kill_state]);
+ user_kill_state_names[kill_ctx->kill_state]);
- user->kill_is_self_initiated = FALSE;
- user->kill_state = USER_KILL_STATE_NONE;
- timeout_remove(&user->to_move);
+ if (kill_ctx->to_move != NULL)
+ timeout_remove(&kill_ctx->to_move);
+ i_free(kill_ctx->socket_path);
+ i_free(kill_ctx);
+ user->kill_ctx = NULL;
i_assert(dir->users_moving_count > 0);
dir->users_moving_count--;
}
static void
-director_user_kill_finish_delayed_to(struct director_user_kill_finish_ctx *ctx)
+director_user_kill_finish_delayed_to(struct user *user)
{
- i_assert(ctx->user->kill_state == USER_KILL_STATE_DELAY);
+ i_assert(user->kill_ctx != NULL);
+ i_assert(user->kill_ctx->kill_state == USER_KILL_STATE_DELAY);
- director_user_move_free(ctx->dir, ctx->user);
- i_free(ctx);
+ director_user_move_free(user);
}
static void
director_user_kill_finish_delayed(struct director *dir, struct user *user,
bool skip_delay)
{
- struct director_user_kill_finish_ctx *ctx;
-
if (skip_delay) {
- director_user_move_free(dir, user);
+ user->kill_ctx->kill_state = USER_KILL_STATE_NONE;
+ director_user_move_free(user);
return;
}
- ctx = i_new(struct director_user_kill_finish_ctx, 1);
- ctx->dir = dir;
- ctx->user = user;
-
- user->kill_state = USER_KILL_STATE_DELAY;
+ user->kill_ctx->kill_state = USER_KILL_STATE_DELAY;
/* wait for a while for the kills to finish in the backend server,
so there are no longer any processes running for the user before we
start letting new in connections to the new server. */
- timeout_remove(&user->to_move);
- user->to_move = timeout_add(dir->set->director_user_kick_delay * 1000,
- director_user_kill_finish_delayed_to, ctx);
+ timeout_remove(&user->kill_ctx->to_move);
+ user->kill_ctx->to_move =
+ timeout_add(dir->set->director_user_kick_delay * 1000,
+ director_user_kill_finish_delayed_to, user);
}
-struct director_kill_context {
- struct director *dir;
- unsigned int username_hash;
- bool kill_is_self_initiated;
-};
-
static void
director_finish_user_kill(struct director *dir, struct user *user, bool self)
{
- i_assert(user->kill_state != USER_KILL_STATE_FLUSHING);
- i_assert(user->kill_state != USER_KILL_STATE_DELAY);
+ struct director_kill_context *kill_ctx = user->kill_ctx;
+
+ i_assert(kill_ctx != NULL);
+ i_assert(kill_ctx->kill_state != USER_KILL_STATE_FLUSHING);
+ i_assert(kill_ctx->kill_state != USER_KILL_STATE_DELAY);
if (dir->right == NULL) {
/* we're alone */
director_flush_user(dir, user);
} else if (self ||
- user->kill_state == USER_KILL_STATE_KILLING_NOTIFY_RECEIVED) {
+ kill_ctx->kill_state == USER_KILL_STATE_KILLING_NOTIFY_RECEIVED) {
director_connection_send(dir->right, t_strdup_printf(
"USER-KILLED\t%u\n", user->username_hash));
- user->kill_state = USER_KILL_STATE_KILLED_WAITING_FOR_EVERYONE;
+ kill_ctx->kill_state = USER_KILL_STATE_KILLED_WAITING_FOR_EVERYONE;
} else {
- i_assert(user->kill_state == USER_KILL_STATE_KILLING);
- user->kill_state = USER_KILL_STATE_KILLED_WAITING_FOR_NOTIFY;
+ i_assert(kill_ctx->kill_state == USER_KILL_STATE_KILLING);
+ kill_ctx->kill_state = USER_KILL_STATE_KILLED_WAITING_FOR_NOTIFY;
}
}
break;
}
+ ctx->callback_pending = FALSE;
+
user = user_directory_lookup(ctx->dir->users, ctx->username_hash);
- if (user == NULL) {
+ if (!DIRECTOR_KILL_CONTEXT_IS_VALID(user, ctx)) {
/* user was already freed - ignore */
- } else if (user->kill_state == USER_KILL_STATE_KILLING ||
- user->kill_state == USER_KILL_STATE_KILLING_NOTIFY_RECEIVED) {
+ i_assert(ctx->to_move == NULL);
+ i_free(ctx);
+ } else {
+ i_assert(ctx->kill_state == USER_KILL_STATE_KILLING ||
+ ctx->kill_state == USER_KILL_STATE_KILLING_NOTIFY_RECEIVED);
/* we were still waiting for the kill notification */
director_finish_user_kill(ctx->dir, user, ctx->kill_is_self_initiated);
- } else {
- /* we don't currently want to kill the user */
}
- i_free(ctx);
}
static void director_user_move_throttled(unsigned int new_events_count,
static void director_user_move_timeout(struct user *user)
{
- i_assert(user->kill_state != USER_KILL_STATE_FLUSHING);
- i_assert(user->kill_state != USER_KILL_STATE_DELAY);
+ i_assert(user->kill_ctx != NULL);
+ i_assert(user->kill_ctx->kill_state != USER_KILL_STATE_FLUSHING);
+ i_assert(user->kill_ctx->kill_state != USER_KILL_STATE_DELAY);
if (log_throttle_accept(user_move_throttle)) {
i_error("Finishing user %u move timed out, "
"its state may now be inconsistent (state=%s)",
user->username_hash,
- user_kill_state_names[user->kill_state]);
+ user_kill_state_names[user->kill_ctx->kill_state]);
}
-
- /* FIXME: shouldn't use global director, but for now there's no easy
- way to get access to it otherwise */
- director_user_move_free(director, user);
+ director_user_move_free(user);
}
void director_move_user(struct director *dir, struct director_host *src,
user->timestamp = ioloop_time;
}
if (!USER_IS_BEING_KILLED(user)) {
- ctx = i_new(struct director_kill_context, 1);
+ user->kill_ctx = ctx = i_new(struct director_kill_context, 1);
ctx->dir = dir;
ctx->username_hash = username_hash;
ctx->kill_is_self_initiated = src->self;
dir->users_moving_count++;
- user->to_move = timeout_add(DIRECTOR_USER_MOVE_TIMEOUT_MSECS,
- director_user_move_timeout, user);
- user->kill_is_self_initiated = src->self;
- user->kill_state = USER_KILL_STATE_KILLING;
+ ctx->to_move = timeout_add(DIRECTOR_USER_MOVE_TIMEOUT_MSECS,
+ director_user_move_timeout, user);
+ ctx->kill_state = USER_KILL_STATE_KILLING;
+
cmd = t_strdup_printf("proxy\t*\tKICK-DIRECTOR-HASH\t%u",
username_hash);
+ ctx->callback_pending = TRUE;
ipc_client_cmd(dir->ipc_proxy, cmd,
director_kill_user_callback, ctx);
} else {
finished. We'll just continue wherever we left off
earlier. */
dir_debug("User %u move restarted - previous kill_state=%s",
- username_hash, user_kill_state_names[user->kill_state]);
+ username_hash, user_kill_state_names[user->kill_ctx->kill_state]);
}
if (orig_src == NULL) {
struct user *user;
user = user_directory_lookup(dir->users, username_hash);
- if (user == NULL)
+ if (user == NULL || !USER_IS_BEING_KILLED(user))
return;
- switch (user->kill_state) {
+ switch (user->kill_ctx->kill_state) {
case USER_KILL_STATE_KILLING:
- user->kill_state = USER_KILL_STATE_KILLING_NOTIFY_RECEIVED;
+ user->kill_ctx->kill_state = USER_KILL_STATE_KILLING_NOTIFY_RECEIVED;
break;
case USER_KILL_STATE_KILLED_WAITING_FOR_NOTIFY:
director_finish_user_kill(dir, user, TRUE);
break;
case USER_KILL_STATE_KILLING_NOTIFY_RECEIVED:
dir_debug("User %u kill_state=%s - ignoring USER-KILLED",
- username_hash, user_kill_state_names[user->kill_state]);
+ username_hash, user_kill_state_names[user->kill_ctx->kill_state]);
break;
case USER_KILL_STATE_NONE:
case USER_KILL_STATE_FLUSHING:
username_hash);
return;
}
- if (user->kill_state != USER_KILL_STATE_KILLED_WAITING_FOR_EVERYONE) {
+ if (!USER_IS_BEING_KILLED(user)) {
+ dir_debug("User %u is no longer being killed - ignoring USER-KILLED-EVERYWHERE",
+ username_hash);
+ return;
+ }
+ if (user->kill_ctx->kill_state != USER_KILL_STATE_KILLED_WAITING_FOR_EVERYONE) {
dir_debug("User %u kill_state=%s - ignoring USER-KILLED-EVERYWHERE",
- username_hash, user_kill_state_names[user->kill_state]);
+ username_hash, user_kill_state_names[user->kill_ctx->kill_state]);
return;
}
}
}
+static void director_user_freed(struct user *user)
+{
+ if (user->kill_ctx != NULL) {
+ /* director_user_expire is very short. user expired before
+ moving the user finished or timed out. */
+ if (user->kill_ctx->callback_pending) {
+ /* kill_ctx is used as a callback parameter.
+ only remove the timeout and finish the free later. */
+ if (user->kill_ctx->to_move != NULL)
+ timeout_remove(&user->kill_ctx->to_move);
+ } else {
+ director_user_move_free(user);
+ }
+ }
+}
+
struct director *
director_init(const struct director_settings *set,
const struct ip_addr *listen_ip, in_port_t listen_port,
i_array_init(&dir->pending_requests, 16);
i_array_init(&dir->connections, 8);
dir->users = user_directory_init(set->director_user_expire,
- set->director_username_hash);
+ set->director_username_hash,
+ director_user_freed);
dir->mail_hosts = mail_hosts_init(set->director_consistent_hashing);
dir->ipc_proxy = ipc_client_init(DIRECTOR_IPC_PROXY_PATH);