#include "strescape.h"
#include "log-throttle.h"
#include "ipc-client.h"
+#include "program-client.h"
+#include "var-expand.h"
+#include "istream.h"
+#include "ostream.h"
+#include "iostream-temp.h"
#include "user-directory.h"
#include "mail-host.h"
#include "director-host.h"
#include "director.h"
#define DIRECTOR_IPC_PROXY_PATH "ipc"
-
#define DIRECTOR_RECONNECT_RETRY_SECS 60
#define DIRECTOR_RECONNECT_TIMEOUT_MSECS (30*1000)
#define DIRECTOR_USER_MOVE_TIMEOUT_MSECS (30*1000)
.unthrottle_at_max_per_interval = 2,
};
+static void
+director_user_kill_finish_delayed(struct director *dir, struct user *user,
+ bool skip_delay);
+
static bool director_is_self_ip_set(struct director *dir)
{
struct ip_addr ip;
struct director_user_kill_finish_ctx {
struct director *dir;
+ unsigned int username_hash;
struct user *user;
+ struct program_client *pclient;
+ struct ostream *reply;
+ char *socket_path;
};
+static void
+director_flush_user_continue(int result,
+ struct director_user_kill_finish_ctx *ctx)
+{
+ struct user *user =
+ user_directory_lookup(ctx->dir->users, ctx->username_hash);
+
+ if (user != NULL)
+ director_user_kill_finish_delayed(ctx->dir, user,
+ result == 1);
+ if (result == 0) {
+ struct istream *is = iostream_temp_finish(&ctx->reply, (size_t)-1);
+ char *data;
+ i_stream_set_return_partial_line(is, TRUE);
+ data = i_stream_read_next_line(is);
+ i_error("%s: Failed to flush user hash %u in host %s: %s",
+ ctx->socket_path,
+ user->username_hash,
+ net_ip2addr(&user->host->ip),
+ data == NULL ? "(no output to stdout)" : data);
+ while((data = i_stream_read_next_line(is)) != NULL) {
+ i_error("%s: Failed to flush user hash %u in host %s: %s",
+ ctx->socket_path,
+ user->username_hash,
+ net_ip2addr(&user->host->ip),
+ data);
+ }
+ i_stream_unref(&is);
+ } else {
+ o_stream_unref(&ctx->reply);
+ }
+ program_client_destroy(&ctx->pclient);
+ i_free(ctx->socket_path);
+ i_free(ctx);
+}
+
+static void
+director_flush_user(struct director *dir, struct user *user)
+{
+ struct var_expand_table tab[] = {
+ { 'i', net_ip2addr(&user->host->ip), "ip" },
+ { 'h', user->host->hostname, "host" },
+ { '\0', NULL, NULL }
+ };
+
+ /* execute flush script, if set */
+ if (*dir->set->director_flush_socket == '\0') {
+ director_user_kill_finish_delayed(dir, user, FALSE);
+ return;
+ }
+
+ struct director_user_kill_finish_ctx *ctx =
+ i_new(struct director_user_kill_finish_ctx, 1);
+ ctx->username_hash = user->username_hash;
+ ctx->dir = dir;
+
+ string_t *s_sock = str_new(default_pool, 32);
+ var_expand(s_sock, dir->set->director_flush_socket, tab);
+ ctx->socket_path = str_free_without_data(&s_sock);
+
+ const char *error;
+ struct program_client_settings set = {
+ .client_connect_timeout_msecs = 10000,
+ };
+
+ restrict_access_init(&set.restrict_set);
+
+ const char *const args[] = {"FLUSH",
+ t_strdup_printf("%u", user->username_hash), NULL};
+
+ if ((program_client_create(ctx->socket_path, args, &set, FALSE,
+ &ctx->pclient, &error)) != 0) {
+ i_error("%s: Failed to flush user hash %u in host %s: %s",
+ ctx->socket_path,
+ user->username_hash,
+ net_ip2addr(&user->host->ip),
+ error);
+ director_flush_user_continue(0, ctx);
+ return;
+ }
+
+ ctx->reply =
+ iostream_temp_create_named("/tmp", 0,
+ t_strdup_printf("flush response from %s",
+ net_ip2addr(&user->host->ip)));
+ o_stream_set_no_error_handling(ctx->reply, TRUE);
+ program_client_set_output(ctx->pclient, ctx->reply);
+ program_client_run_async(ctx->pclient, director_flush_user_continue, ctx);
+}
+
static void
director_user_kill_finish_delayed_to(struct director_user_kill_finish_ctx *ctx)
{
i_assert(ctx->user->kill_state == USER_KILL_STATE_DELAY);
ctx->user->kill_state = USER_KILL_STATE_NONE;
- timeout_remove(&ctx->user->to_move);
+ if (ctx->user->to_move != NULL)
+ timeout_remove(&ctx->user->to_move);
ctx->dir->state_change_callback(ctx->dir);
i_free(ctx);
}
static void
-director_user_kill_finish_delayed(struct director *dir, struct user *user)
+director_user_kill_finish_delayed(struct director *dir, struct user *user,
+ bool skip_delay)
{
struct director_user_kill_finish_ctx *ctx;
+ timeout_remove(&user->to_move);
+
+ if (skip_delay) {
+ user->kill_state = USER_KILL_STATE_NONE;
+ dir->state_change_callback(dir);
+ return;
+ }
+
ctx = i_new(struct director_user_kill_finish_ctx, 1);
ctx->dir = dir;
ctx->user = user;
user->kill_state = USER_KILL_STATE_DELAY;
- timeout_remove(&user->to_move);
/* wait for a while for the kills to finish in the backend server,
so there are no longer any processes running for the user before we
if (dir->right == NULL) {
/* we're alone */
- director_user_kill_finish_delayed(dir, user);
+ director_flush_user(dir, user);
} else if (self ||
user->kill_state == USER_KILL_STATE_KILLING_NOTIFY_RECEIVED) {
director_connection_send(dir->right, t_strdup_printf(
user->kill_state != USER_KILL_STATE_KILLED_WAITING_FOR_EVERYONE)
return;
- director_user_kill_finish_delayed(dir, user);
+ director_flush_user(dir, user);
if (orig_src == NULL) {
orig_src = dir->self_host;