From: Aki Tuomi Date: Thu, 29 Jun 2023 15:11:26 +0000 (+0300) Subject: global: Remove replicator X-Git-Tag: 2.4.0~2649 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=4c04e4c30fd4817a8b0e11d04d9681173f696f41;p=thirdparty%2Fdovecot%2Fcore.git global: Remove replicator --- diff --git a/configure.ac b/configure.ac index f09d0254ad..74090b44a1 100644 --- a/configure.ac +++ b/configure.ac @@ -804,9 +804,6 @@ src/pop3/Makefile src/pop3-login/Makefile src/submission/Makefile src/submission-login/Makefile -src/replication/Makefile -src/replication/aggregator/Makefile -src/replication/replicator/Makefile src/stats/Makefile src/util/Makefile src/plugins/Makefile @@ -828,7 +825,6 @@ src/plugins/pop3-migration/Makefile src/plugins/quota/Makefile src/plugins/quota-clone/Makefile src/plugins/imap-quota/Makefile -src/plugins/replication/Makefile src/plugins/trash/Makefile src/plugins/virtual/Makefile src/plugins/welcome/Makefile diff --git a/src/Makefile.am b/src/Makefile.am index 5f464f50b4..408933e9c1 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -66,7 +66,6 @@ SUBDIRS = \ lmtp \ log \ config \ - replication \ util \ doveadm \ stats \ diff --git a/src/doveadm/Makefile.am b/src/doveadm/Makefile.am index 0460fcc0cd..eafecc5f37 100644 --- a/src/doveadm/Makefile.am +++ b/src/doveadm/Makefile.am @@ -80,7 +80,6 @@ doveadm_common_cmds = \ doveadm-master.c \ doveadm-mutf7.c \ doveadm-penalty.c \ - doveadm-replicator.c \ doveadm-sis.c \ doveadm-stats.c \ doveadm-who.c diff --git a/src/doveadm/doveadm-cmd.c b/src/doveadm/doveadm-cmd.c index 2530998af9..1f64fd3446 100644 --- a/src/doveadm/doveadm-cmd.c +++ b/src/doveadm/doveadm-cmd.c @@ -115,7 +115,6 @@ void doveadm_cmds_init(void) doveadm_register_instance_commands(); doveadm_register_log_commands(); - doveadm_register_replicator_commands(); doveadm_register_dict_commands(); doveadm_register_fs_commands(); } diff --git a/src/doveadm/doveadm-cmd.h b/src/doveadm/doveadm-cmd.h index 3c1b47d364..739610d1e4 100644 --- a/src/doveadm/doveadm-cmd.h +++ b/src/doveadm/doveadm-cmd.h @@ -13,7 +13,6 @@ void doveadm_register_auth_server_commands(void); void doveadm_register_log_commands(void); void doveadm_register_instance_commands(void); void doveadm_register_mount_commands(void); -void doveadm_register_replicator_commands(void); void doveadm_register_dict_commands(void); void doveadm_register_fs_commands(void); diff --git a/src/doveadm/doveadm-dsync.c b/src/doveadm/doveadm-dsync.c index 14b6903ebc..514ce862f2 100644 --- a/src/doveadm/doveadm-dsync.c +++ b/src/doveadm/doveadm-dsync.c @@ -111,7 +111,6 @@ struct dsync_cmd_context { bool remote_user_prefix:1; bool no_mail_sync:1; bool local_location_from_arg:1; - bool replicator_notify:1; bool exited:1; bool empty_hdr_workaround:1; bool no_header_hashes:1; @@ -638,51 +637,6 @@ cmd_dsync_ibc_stream_init(struct dsync_cmd_context *ctx, name, temp_prefix, ctx->io_timeout_secs); } -static void -dsync_replicator_notify(struct dsync_cmd_context *ctx, - enum dsync_brain_sync_type sync_type, - const char *state_str) -{ -#define REPLICATOR_HANDSHAKE "VERSION\treplicator-doveadm-client\t1\t0\n" - const char *path; - string_t *str; - int fd; - - path = t_strdup_printf("%s/replicator-doveadm", - ctx->ctx.cur_mail_user->set->base_dir); - fd = net_connect_unix(path); - if (fd == -1) { - if (errno == ECONNREFUSED || errno == ENOENT) { - /* replicator not running on this server. ignore. */ - return; - } - e_error(ctx->ctx.cctx->event, - "net_connect_unix(%s) failed: %m", path); - return; - } - fd_set_nonblock(fd, FALSE); - - str = t_str_new(128); - str_append(str, REPLICATOR_HANDSHAKE"NOTIFY\t"); - str_append_tabescaped(str, ctx->ctx.cur_mail_user->username); - str_append_c(str, '\t'); - if (sync_type == DSYNC_BRAIN_SYNC_TYPE_FULL) - str_append_c(str, 'f'); - str_append_c(str, '\t'); - str_append_tabescaped(str, state_str); - str_append_c(str, '\n'); - if (write_full(fd, str_data(str), str_len(str)) < 0) { - e_error(ctx->ctx.cctx->event, - "write(%s) failed: %m", path); - } - /* we only wanted to notify replicator. we don't care enough about the - answer to wait for it. */ - if (close(fd) < 0) { - e_error(ctx->ctx.cctx->event, - "close(%s) failed: %m", path); - } -} - static void dsync_errors_finish(struct dsync_cmd_context *ctx) { if (ctx->err_stream == NULL) @@ -711,19 +665,10 @@ cmd_dsync_run(struct doveadm_mail_cmd_context *_ctx, struct mail_user *user) const char *const *strp; enum dsync_brain_flags brain_flags; enum mail_error mail_error = 0, mail_error2; - bool cli = (cctx->conn_type == DOVEADM_CONNECTION_TYPE_CLI); const char *changes_during_sync, *changes_during_sync2 = NULL; bool remote_only_changes; int ret = 0; - /* replicator_notify indicates here automated attempt, - we still want to allow manual sync/backup */ - if (!cli && ctx->replicator_notify && - mail_user_plugin_getenv_bool(_ctx->cur_mail_user, "noreplicate")) { - ctx->ctx.exit_code = DOVEADM_EX_NOREPLICATE; - return -1; - } - i_zero(&set); if (cctx->remote_ip.family != 0) { /* include the doveadm client's IP address in the ps output */ @@ -837,8 +782,7 @@ cmd_dsync_run(struct doveadm_mail_cmd_context *_ctx, struct mail_user *user) changes_during_sync = dsync_brain_get_unexpected_changes_reason(brain, &remote_only_changes); if (changes_during_sync != NULL || changes_during_sync2 != NULL) { - /* don't log a warning when running via doveadm server - (e.g. called by replicator) */ + /* don't log a warning when running via doveadm server */ const char *msg = t_strdup_printf( "Mailbox changes caused a desync. " "You may want to run dsync again: %s", @@ -919,11 +863,6 @@ static void dsync_connected_callback(const struct doveadm_server_reply *reply, case EX_NOUSER: ctx->error = "Unknown user in remote"; break; - case DOVEADM_EX_NOREPLICATE: - if (doveadm_debug) - e_debug(ctx->ctx.cctx->event, - "user is disabled for replication"); - break; default: ctx->error = p_strdup_printf(ctx->ctx.pool, "Failed to start remote dsync-server command: " @@ -946,8 +885,6 @@ static void dsync_server_run_command(struct dsync_cmd_context *ctx, str_append_tabescaped(cmd, cctx->username); str_append(cmd, "\tdsync-server\t-u"); str_append_tabescaped(cmd, cctx->username); - if (ctx->replicator_notify) - str_append(cmd, "\t-U"); str_append_c(cmd, '\n'); ctx->tcp_conn = conn; @@ -1240,8 +1177,6 @@ static void cmd_dsync_preinit(struct doveadm_mail_cmd_context *_ctx) i_fatal("Invalid -I parameter '%s': %s", value_str, error); (void)doveadm_cmd_param_uint32(cctx, "timeout", &ctx->io_timeout_secs); - ctx->replicator_notify = - doveadm_cmd_param_flag(cctx, "replicator-notify"); if (!doveadm_cmd_param_array(cctx, "destination", &ctx->destination)) ctx->destination = empty_str_array; @@ -1295,20 +1230,11 @@ cmd_dsync_server_run(struct doveadm_mail_cmd_context *_ctx, bool cli = (cctx->conn_type == DOVEADM_CONNECTION_TYPE_CLI); struct dsync_ibc *ibc; struct dsync_brain *brain; - string_t *temp_prefix, *state_str = NULL; - enum dsync_brain_sync_type sync_type; + string_t *temp_prefix; const char *name, *process_title_prefix = ""; enum mail_error mail_error; if (!cli) { - /* replicator_notify indicates here automated attempt, - we still want to allow manual sync/backup */ - if (ctx->replicator_notify && - mail_user_plugin_getenv_bool(_ctx->cur_mail_user, "noreplicate")) { - _ctx->exit_code = DOVEADM_EX_NOREPLICATE; - return -1; - } - /* doveadm-server connection. start with a success reply. after that follows the regular dsync protocol. */ ctx->fd_in = ctx->fd_out = -1; @@ -1346,12 +1272,6 @@ cmd_dsync_server_run(struct doveadm_mail_cmd_context *_ctx, /* io_loop_run() deactivates the context - put it back */ mail_storage_service_io_activate_user(ctx->ctx.cur_service_user); - if (ctx->replicator_notify) { - state_str = t_str_new(128); - dsync_brain_get_state(brain, state_str); - } - sync_type = dsync_brain_get_sync_type(brain); - if (dsync_brain_deinit(&brain, &mail_error) < 0) doveadm_mail_failed_error(_ctx, mail_error); dsync_ibc_deinit(&ibc); @@ -1364,8 +1284,6 @@ cmd_dsync_server_run(struct doveadm_mail_cmd_context *_ctx, i_stream_unref(&ctx->input); o_stream_unref(&ctx->output); - if (ctx->replicator_notify && _ctx->exit_code == 0) - dsync_replicator_notify(ctx, sync_type, str_c(state_str)); return _ctx->exit_code == 0 ? 0 : -1; } @@ -1381,7 +1299,6 @@ cmd_dsync_server_init(struct doveadm_mail_cmd_context *_ctx) (void)doveadm_cmd_param_str(cctx, "rawlog", &ctx->rawlog_path); (void)doveadm_cmd_param_uint32(cctx, "timeout", &ctx->io_timeout_secs); - ctx->replicator_notify = doveadm_cmd_param_flag(cctx, "replicator-notify"); } static struct doveadm_mail_cmd_context *cmd_dsync_server_alloc(void) @@ -1404,7 +1321,6 @@ DOVEADM_CMD_MAIL_COMMON \ DOVEADM_CMD_PARAM('f', "full-sync", CMD_PARAM_BOOL, 0) \ DOVEADM_CMD_PARAM('P', "purge-remote", CMD_PARAM_BOOL, 0) \ DOVEADM_CMD_PARAM('R', "reverse-sync", CMD_PARAM_BOOL, 0) \ -DOVEADM_CMD_PARAM('U', "replicator-notify", CMD_PARAM_BOOL, 0) \ DOVEADM_CMD_PARAM('l', "lock-timeout", CMD_PARAM_INT64, CMD_PARAM_FLAG_UNSIGNED) \ DOVEADM_CMD_PARAM('r', "rawlog", CMD_PARAM_STR, 0) \ DOVEADM_CMD_PARAM('m', "mailbox", CMD_PARAM_STR, 0) \ @@ -1458,7 +1374,6 @@ DOVEADM_CMD_MAIL_COMMON DOVEADM_CMD_PARAM('E', "legacy-dsync", CMD_PARAM_BOOL, 0) DOVEADM_CMD_PARAM('r', "rawlog", CMD_PARAM_STR, 0) DOVEADM_CMD_PARAM('T', "timeout", CMD_PARAM_INT64, CMD_PARAM_FLAG_UNSIGNED) -DOVEADM_CMD_PARAM('U', "replicator-notify", CMD_PARAM_BOOL, 0) /* previously dsync-server could have been added twice to the parameters */ DOVEADM_CMD_PARAM('\0', "ignore-arg", CMD_PARAM_STR, CMD_PARAM_FLAG_POSITIONAL) DOVEADM_CMD_PARAMS_END diff --git a/src/doveadm/doveadm-replicator.c b/src/doveadm/doveadm-replicator.c deleted file mode 100644 index 136d725036..0000000000 --- a/src/doveadm/doveadm-replicator.c +++ /dev/null @@ -1,381 +0,0 @@ -/* Copyright (c) 2013-2018 Dovecot authors, see the included COPYING file */ - -#include "lib.h" -#include "ioloop.h" -#include "str.h" -#include "strescape.h" -#include "istream.h" -#include "write-full.h" -#include "master-service.h" -#include "doveadm.h" -#include "doveadm-print.h" - -#include -#include -#include - -struct replicator_context { - const char *socket_path; - const char *priority; - const char *user_mask, *username; - struct istream *input; - bool full_sync; -}; - -extern struct doveadm_cmd_ver2 doveadm_cmd_replicator[]; - -static void replicator_cmd_help(const struct doveadm_cmd_ver2 *cmd) ATTR_NORETURN; - -static void -replicator_send(struct replicator_context *ctx, const char *data) -{ - if (write_full(i_stream_get_fd(ctx->input), data, strlen(data)) < 0) - i_fatal("write(%s) failed: %m", ctx->socket_path); -} - -static void replicator_connect(struct replicator_context *ctx) -{ -#define REPLICATOR_HANDSHAKE "VERSION\treplicator-doveadm-client\t1\t0\n" - const char *line; - int fd; - - fd = doveadm_connect(ctx->socket_path); - net_set_nonblock(fd, FALSE); - - ctx->input = i_stream_create_fd_autoclose(&fd, SIZE_MAX); - replicator_send(ctx, REPLICATOR_HANDSHAKE); - - alarm(5); - line = i_stream_read_next_line(ctx->input); - alarm(0); - if (line == NULL) { - if (ctx->input->stream_errno != 0) { - i_fatal("read(%s) failed: %s", ctx->socket_path, - i_stream_get_error(ctx->input)); - } else if (ctx->input->eof) - i_fatal("%s disconnected", ctx->socket_path); - else - i_fatal("read(%s) timed out", ctx->socket_path); - } - if (!version_string_verify(line, "replicator-doveadm-server", 1)) { - i_fatal_status(EX_PROTOCOL, - "%s not a compatible replicator-doveadm socket", - ctx->socket_path); - } -} - -static void replicator_disconnect(struct replicator_context *ctx) -{ - if (ctx->input->stream_errno != 0) { - i_fatal("read(%s) failed: %s", ctx->socket_path, - i_stream_get_error(ctx->input)); - } - i_stream_destroy(&ctx->input); -} - -static struct replicator_context * -cmd_replicator_init(struct doveadm_cmd_context *cctx) -{ - struct replicator_context *ctx; - - ctx = t_new(struct replicator_context, 1); - ctx->socket_path = t_strconcat(doveadm_settings->base_dir, - "/replicator-doveadm", NULL); - - (void)doveadm_cmd_param_str(cctx, "socket-path", &ctx->socket_path); - (void)doveadm_cmd_param_bool(cctx, "full-sync", &ctx->full_sync); - (void)doveadm_cmd_param_str(cctx, "priority", &ctx->priority); - (void)doveadm_cmd_param_str(cctx, "user-mask", &ctx->user_mask); - (void)doveadm_cmd_param_str(cctx, "user", &ctx->username); - - replicator_connect(ctx); - return ctx; -} - -static const char *time_formatted_hms(unsigned int secs) -{ - return t_strdup_printf("%02d:%02d:%02d", secs/3600, - (secs/60)%60, secs%60); -} - -static const char *time_ago(time_t t) -{ - int diff = ioloop_time - t; - - if (t == 0) - return "-"; - return time_formatted_hms(diff); -} - -static void cmd_replicator_status_overview(struct replicator_context *ctx) -{ - char *line, *value; - - doveadm_print_init(DOVEADM_PRINT_TYPE_TABLE); - doveadm_print_header("field", "field", - DOVEADM_PRINT_HEADER_FLAG_HIDE_TITLE); - doveadm_print_header("value", "value", - DOVEADM_PRINT_HEADER_FLAG_HIDE_TITLE); - - replicator_send(ctx, "STATUS\n"); - while ((line = i_stream_read_next_line(ctx->input)) != NULL) { - if (*line == '\0') - break; - value = strchr(line, '\t'); - if (value != NULL) - *value++ = '\0'; - else - value = ""; - doveadm_print(line); - doveadm_print(value); - } - replicator_disconnect(ctx); -} - -static void cmd_replicator_status(struct doveadm_cmd_context *cctx) -{ - struct replicator_context *ctx; - const char *line, *const *args; - time_t last_fast, last_full, last_success; - unsigned int next_secs; - - ctx = cmd_replicator_init(cctx); - if (ctx->user_mask == NULL) { - cmd_replicator_status_overview(ctx); - return; - } - - doveadm_print_init(DOVEADM_PRINT_TYPE_TABLE); - doveadm_print_header("username", "username", - DOVEADM_PRINT_HEADER_FLAG_EXPAND); - doveadm_print_header_simple("priority"); - doveadm_print_header_simple("fast sync"); - doveadm_print_header_simple("full sync"); - doveadm_print_header_simple("success sync"); - doveadm_print_header_simple("failed"); - doveadm_print_header_simple("next sync secs"); - - replicator_send(ctx, t_strdup_printf("STATUS\t%s\n", - str_tabescape(ctx->user_mask))); - while ((line = i_stream_read_next_line(ctx->input)) != NULL) { - if (*line == '\0') - break; - T_BEGIN { - args = t_strsplit_tabescaped(line); - if (str_array_length(args) >= 6 && - str_to_time(args[2], &last_fast) == 0 && - str_to_time(args[3], &last_full) == 0 && - str_to_time(args[5], &last_success) == 0 && - str_to_uint(args[6], &next_secs) == 0) { - doveadm_print(args[0]); - doveadm_print(args[1]); - doveadm_print(time_ago(last_fast)); - doveadm_print(time_ago(last_full)); - doveadm_print(time_ago(last_success)); - doveadm_print(args[4][0] == '0' ? "-" : "y"); - doveadm_print(time_formatted_hms(next_secs)); - } - } T_END; - } - if (line == NULL) { - e_error(cctx->event, "Replicator disconnected unexpectedly"); - doveadm_exit_code = EX_TEMPFAIL; - } - replicator_disconnect(ctx); -} - -static void cmd_replicator_dsync_status(struct doveadm_cmd_context *cctx) -{ - struct replicator_context *ctx; - const char *line; - unsigned int i; - - ctx = cmd_replicator_init(cctx); - - doveadm_print_init(DOVEADM_PRINT_TYPE_TABLE); - doveadm_print_header("username", "username", - DOVEADM_PRINT_HEADER_FLAG_EXPAND); - doveadm_print_header_simple("type"); - doveadm_print_header_simple("status"); - - replicator_send(ctx, "STATUS-DSYNC\n"); - while ((line = i_stream_read_next_line(ctx->input)) != NULL) { - if (*line == '\0') - break; - T_BEGIN { - const char *const *args = t_strsplit_tabescaped(line); - - for (i = 0; i < 3; i++) { - if (args[i] == NULL) - break; - doveadm_print(args[i]); - } - for (; i < 3; i++) - doveadm_print(""); - } T_END; - } - replicator_disconnect(ctx); -} - -static void cmd_replicator_replicate(struct doveadm_cmd_context *cctx) -{ - struct replicator_context *ctx; - string_t *str; - const char *line; - - ctx = cmd_replicator_init(cctx); - if (ctx->user_mask == NULL) - replicator_cmd_help(cctx->cmd); - - str = t_str_new(128); - str_append(str, "REPLICATE\t"); - if (ctx->priority == NULL) - str_append_tabescaped(str, "low"); - else - str_append_tabescaped(str, ctx->priority); - str_append_c(str, '\t'); - if (ctx->full_sync) - str_append_c(str, 'f'); - str_append_c(str, '\t'); - str_append_tabescaped(str, ctx->user_mask); - str_append_c(str, '\n'); - replicator_send(ctx, str_c(str)); - - doveadm_print_init(DOVEADM_PRINT_TYPE_FLOW); - doveadm_print_header("result", "result", - DOVEADM_PRINT_HEADER_FLAG_HIDE_TITLE); - - line = i_stream_read_next_line(ctx->input); - if (line == NULL) { - e_error(cctx->event, "Replicator disconnected unexpectedly"); - doveadm_exit_code = EX_TEMPFAIL; - } else if (line[0] != '+') { - e_error(cctx->event, "Replicator failed: %s", line+1); - doveadm_exit_code = EX_USAGE; - } else { - doveadm_print(t_strdup_printf("%s users updated", line+1)); - } - replicator_disconnect(ctx); -} - -static void cmd_replicator_add(struct doveadm_cmd_context *cctx) -{ - struct replicator_context *ctx; - string_t *str; - const char *line; - - ctx = cmd_replicator_init(cctx); - if (ctx->user_mask == NULL) - replicator_cmd_help(cctx->cmd); - - str = t_str_new(128); - str_append(str, "ADD\t"); - str_append_tabescaped(str, ctx->user_mask); - str_append_c(str, '\n'); - replicator_send(ctx, str_c(str)); - - line = i_stream_read_next_line(ctx->input); - if (line == NULL) { - e_error(cctx->event, "Replicator disconnected unexpectedly"); - doveadm_exit_code = EX_TEMPFAIL; - } else if (line[0] != '+') { - e_error(cctx->event, "Replicator failed: %s", line+1); - doveadm_exit_code = EX_USAGE; - } - replicator_disconnect(ctx); -} - -static void cmd_replicator_remove(struct doveadm_cmd_context *cctx) -{ - struct replicator_context *ctx; - string_t *str; - const char *line; - - ctx = cmd_replicator_init(cctx); - if (ctx->username == NULL) - replicator_cmd_help(cctx->cmd); - - str = t_str_new(128); - str_append(str, "REMOVE\t"); - str_append_tabescaped(str, ctx->username); - str_append_c(str, '\n'); - replicator_send(ctx, str_c(str)); - - line = i_stream_read_next_line(ctx->input); - if (line == NULL) { - e_error(cctx->event, "Replicator disconnected unexpectedly"); - doveadm_exit_code = EX_TEMPFAIL; - } else if (line[0] != '+') { - e_error(cctx->event, "Replicator failed: %s", line+1); - doveadm_exit_code = EX_USAGE; - } - replicator_disconnect(ctx); -} - -struct doveadm_cmd_ver2 doveadm_cmd_replicator[] = { -{ - .name = "replicator status", - .cmd = cmd_replicator_status, - .usage = "[-a ] []", -DOVEADM_CMD_PARAMS_START -DOVEADM_CMD_PARAM('a', "socket-path", CMD_PARAM_STR, 0) -DOVEADM_CMD_PARAM('\0', "user-mask", CMD_PARAM_STR, CMD_PARAM_FLAG_POSITIONAL) -DOVEADM_CMD_PARAMS_END -}, -{ - .name = "replicator dsync-status", - .cmd = cmd_replicator_dsync_status, - .usage = "[-a ]", -DOVEADM_CMD_PARAMS_START -DOVEADM_CMD_PARAM('a', "socket-path", CMD_PARAM_STR, 0) -DOVEADM_CMD_PARAMS_END -}, -{ - .name = "replicator replicate", - .cmd = cmd_replicator_replicate, - .usage = "[-a ] [-f] [-p ] ", -DOVEADM_CMD_PARAMS_START -DOVEADM_CMD_PARAM('a', "socket-path", CMD_PARAM_STR, 0) -DOVEADM_CMD_PARAM('f', "full-sync", CMD_PARAM_BOOL, 0) -DOVEADM_CMD_PARAM('p', "priority", CMD_PARAM_STR, 0) -DOVEADM_CMD_PARAM('\0', "user-mask", CMD_PARAM_STR, CMD_PARAM_FLAG_POSITIONAL) -DOVEADM_CMD_PARAMS_END -}, -{ - .name = "replicator add", - .cmd = cmd_replicator_add, - .usage = "[-a ] ", -DOVEADM_CMD_PARAMS_START -DOVEADM_CMD_PARAM('a', "socket-path", CMD_PARAM_STR, 0) -DOVEADM_CMD_PARAM('\0', "user-mask", CMD_PARAM_STR, CMD_PARAM_FLAG_POSITIONAL) -DOVEADM_CMD_PARAMS_END -}, -{ - .name = "replicator remove", - .cmd = cmd_replicator_remove, - .usage = "[-a ] ", -DOVEADM_CMD_PARAMS_START -DOVEADM_CMD_PARAM('a', "socket-path", CMD_PARAM_STR, 0) -DOVEADM_CMD_PARAM('\0', "user", CMD_PARAM_STR, CMD_PARAM_FLAG_POSITIONAL) -DOVEADM_CMD_PARAMS_END -}, -}; - -static void replicator_cmd_help(const struct doveadm_cmd_ver2 *cmd) -{ - unsigned int i; - - for (i = 0; i < N_ELEMENTS(doveadm_cmd_replicator); i++) { - if (doveadm_cmd_replicator[i].cmd == cmd->cmd) - help_ver2(&doveadm_cmd_replicator[i]); - } - i_unreached(); -} - -void doveadm_register_replicator_commands(void) -{ - unsigned int i; - - for (i = 0; i < N_ELEMENTS(doveadm_cmd_replicator); i++) - doveadm_cmd_register_ver2(&doveadm_cmd_replicator[i]); -} diff --git a/src/lib-doveadm/doveadm-protocol.c b/src/lib-doveadm/doveadm-protocol.c index 904de01fc1..275aef9983 100644 --- a/src/lib-doveadm/doveadm-protocol.c +++ b/src/lib-doveadm/doveadm-protocol.c @@ -17,7 +17,6 @@ static const struct exit_code_str { { EX_PROTOCOL, "PROTOCOL" }, { EX_DATAERR, "DATAERR" }, { DOVEADM_EX_CHANGED, "CHANGED" }, - { DOVEADM_EX_NOREPLICATE, "NOREPLICATE" }, { DOVEADM_EX_REFERRAL, "REFERRAL" }, { DOVEADM_EX_NOTFOUND, "NOTFOUND" }, { DOVEADM_EX_EXPIRED, "EXPIRED" }, diff --git a/src/lib-doveadm/doveadm-protocol.h b/src/lib-doveadm/doveadm-protocol.h index 89c8d524db..34741a25d6 100644 --- a/src/lib-doveadm/doveadm-protocol.h +++ b/src/lib-doveadm/doveadm-protocol.h @@ -18,7 +18,6 @@ #define DOVEADM_EX_UNKNOWN -1 #define DOVEADM_EX_CHANGED 2 -#define DOVEADM_EX_NOREPLICATE 1001 #define DOVEADM_EX_REFERRAL 1002 #define DOVEADM_EX_EXPIRED 1003 diff --git a/src/plugins/Makefile.am b/src/plugins/Makefile.am index ba6e0eee17..5dfef37c8e 100644 --- a/src/plugins/Makefile.am +++ b/src/plugins/Makefile.am @@ -30,7 +30,6 @@ SUBDIRS = \ quota-clone \ imap-quota \ pop3-migration \ - replication \ mail-compress \ mail-crypt \ trash \ diff --git a/src/plugins/replication/Makefile.am b/src/plugins/replication/Makefile.am deleted file mode 100644 index 461935cebc..0000000000 --- a/src/plugins/replication/Makefile.am +++ /dev/null @@ -1,25 +0,0 @@ -AM_CPPFLAGS = \ - -I$(top_srcdir)/src/lib \ - -I$(top_srcdir)/src/lib-mail \ - -I$(top_srcdir)/src/lib-imap \ - -I$(top_srcdir)/src/lib-index \ - -I$(top_srcdir)/src/lib-storage \ - -I$(top_srcdir)/src/replication \ - -I$(top_srcdir)/src/plugins/notify - -NOPLUGIN_LDFLAGS = -lib20_replication_plugin_la_LDFLAGS = -module -avoid-version - -module_LTLIBRARIES = \ - lib20_replication_plugin.la - -if DOVECOT_PLUGIN_DEPS -lib20_replication_plugin_la_LIBADD = \ - ../notify/lib15_notify_plugin.la -endif - -lib20_replication_plugin_la_SOURCES = \ - replication-plugin.c - -noinst_HEADERS = \ - replication-plugin.h diff --git a/src/plugins/replication/replication-plugin.c b/src/plugins/replication/replication-plugin.c deleted file mode 100644 index e846e52fbc..0000000000 --- a/src/plugins/replication/replication-plugin.c +++ /dev/null @@ -1,425 +0,0 @@ -/* Copyright (c) 2013-2018 Dovecot authors, see the included COPYING file */ - -#include "lib.h" -#include "array.h" -#include "str.h" -#include "strescape.h" -#include "ioloop.h" -#include "net.h" -#include "write-full.h" -#include "mail-user.h" -#include "mail-namespace.h" -#include "mail-storage-private.h" -#include "notify-plugin.h" -#include "replication-common.h" -#include "replication-plugin.h" - - -#define REPLICATION_SOCKET_NAME "replication-notify" -#define REPLICATION_FIFO_NAME "replication-notify-fifo" -#define REPLICATION_NOTIFY_DELAY_MSECS 500 - -#define REPLICATION_USER_CONTEXT(obj) \ - MODULE_CONTEXT(obj, replication_user_module) - -struct replication_user { - union mail_user_module_context module_ctx; - - struct event *event; - const char *socket_path; - - struct timeout *to; - enum replication_priority priority; - unsigned int sync_secs; -}; - -struct replication_mail_txn_context { - struct mail_namespace *ns; - bool new_messages; - bool sync_trans; - char *reason; -}; - -static struct event_category event_category_replication = { - .name = "replication" -}; - -static MODULE_CONTEXT_DEFINE_INIT(replication_user_module, - &mail_user_module_register); -static int fifo_fd; -static bool fifo_failed; -static char *fifo_path; - -static int -replication_fifo_notify(struct mail_user *user, - enum replication_priority priority, const char **error_r) -{ - string_t *str; - ssize_t ret; - - if (fifo_failed) { - *error_r = ""; - return -1; - } - if (fifo_fd == -1) { - fifo_fd = open(fifo_path, O_WRONLY | O_NONBLOCK); - if (fifo_fd == -1) { - *error_r = t_strdup_printf("open(%s) failed: %m", fifo_path); - fifo_failed = TRUE; - return -1; - } - } - /* \t */ - str = t_str_new(256); - str_append_tabescaped(str, user->username); - str_append_c(str, '\t'); - switch (priority) { - case REPLICATION_PRIORITY_NONE: - case REPLICATION_PRIORITY_SYNC: - i_unreached(); - case REPLICATION_PRIORITY_LOW: - str_append(str, "low"); - break; - case REPLICATION_PRIORITY_HIGH: - str_append(str, "high"); - break; - } - str_append_c(str, '\n'); - ret = write(fifo_fd, str_data(str), str_len(str)); - i_assert(ret != 0); - if (ret == (ssize_t)str_len(str)) - return 1; - - if (ret > 0) { - *error_r = t_strdup_printf( - "write(%s) wrote partial data", fifo_path); - } else if (errno == EAGAIN) { - /* busy, try again later */ - return 0; - } else if (errno != EPIPE) { - *error_r = t_strdup_printf("write(%s) failed: %m", fifo_path); - } else { - *error_r = ""; - /* server was probably restarted, don't bother logging this. */ - } - - if (close(fifo_fd) < 0 && **error_r == '\0') - *error_r = t_strdup_printf("close(%s) failed: %m", fifo_path); - fifo_fd = -1; - return -1; -} - -static void replication_notify_now(struct mail_user *user) -{ - struct replication_user *ruser = REPLICATION_USER_CONTEXT(user); - const char *error; - int ret; - - i_assert(ruser != NULL); - i_assert(ruser->priority != REPLICATION_PRIORITY_NONE); - i_assert(ruser->priority != REPLICATION_PRIORITY_SYNC); - - if ((ret = replication_fifo_notify(user, ruser->priority, &error)) < 0 && - !fifo_failed) { - /* retry once, in case replication server was restarted */ - ret = replication_fifo_notify(user, ruser->priority, &error); - } - if (ret < 0 && *error != '\0') - e_error(ruser->event, "%s", error); - if (ret != 0) { - timeout_remove(&ruser->to); - ruser->priority = REPLICATION_PRIORITY_NONE; - } -} - -static int replication_notify_sync(struct mail_user *user) -{ - struct replication_user *ruser = REPLICATION_USER_CONTEXT(user); - string_t *str; - char buf[1024]; - int fd; - ssize_t ret; - bool success = FALSE; - - i_assert(ruser != NULL); - - fd = net_connect_unix(ruser->socket_path); - if (fd == -1) { - e_error(ruser->event, - "net_connect_unix(%s) failed: %m", ruser->socket_path); - return -1; - } - net_set_nonblock(fd, FALSE); - - /* \t "sync" */ - str = t_str_new(256); - str_append_tabescaped(str, user->username); - str_append(str, "\tsync\n"); - alarm(ruser->sync_secs); - if (write_full(fd, str_data(str), str_len(str)) < 0) { - e_error(ruser->event, "write(%s) failed: %m", ruser->socket_path); - } else { - /* + | - */ - ret = read(fd, buf, sizeof(buf)); - if (ret < 0) { - if (errno != EINTR) { - e_error(ruser->event, "read(%s) failed: %m", - ruser->socket_path); - } else { - e_warning(ruser->event, - "Sync failure: Timeout in %u secs", - ruser->sync_secs); - } - } else if (ret == 0) { - e_error(ruser->event, - "read(%s) failed: EOF", ruser->socket_path); - } else if (buf[0] == '+') { - /* success */ - success = TRUE; - } else if (buf[0] == '-') { - /* failure */ - if (buf[ret-1] == '\n') ret--; - e_warning(ruser->event, - "Sync failure: %s", t_strndup(buf+1, ret-1)); - e_warning(ruser->event, - "Remote sent invalid input: %s", - t_strndup(buf, ret)); - } - } - alarm(0); - if (close(fd) < 0) - e_error(ruser->event, "close(%s) failed: %m", ruser->socket_path); - return success ? 0 : -1; -} - -static void replication_notify(struct mail_namespace *ns, - enum replication_priority priority, - const char *event) -{ - struct replication_user *ruser; - - ruser = REPLICATION_USER_CONTEXT(ns->user); - if (ruser == NULL) - return; - - e_debug(ruser->event, "Replication requested by '%s', priority=%d", - event, priority); - - if (priority == REPLICATION_PRIORITY_SYNC) { - if (replication_notify_sync(ns->user) == 0) { - timeout_remove(&ruser->to); - ruser->priority = REPLICATION_PRIORITY_NONE; - return; - } - /* sync replication failed, try as "high" via fifo */ - priority = REPLICATION_PRIORITY_HIGH; - } - - if (ruser->priority < priority) - ruser->priority = priority; - if (ruser->to == NULL) { - ruser->to = timeout_add_short(REPLICATION_NOTIFY_DELAY_MSECS, - replication_notify_now, ns->user); - } -} - -static void * -replication_mail_transaction_begin(struct mailbox_transaction_context *t) -{ - struct replication_mail_txn_context *ctx; - - ctx = i_new(struct replication_mail_txn_context, 1); - ctx->ns = mailbox_get_namespace(t->box); - ctx->reason = i_strdup(t->reason); - if ((t->flags & MAILBOX_TRANSACTION_FLAG_SYNC) != 0) { - /* Transaction is from dsync. Don't trigger replication back. */ - ctx->sync_trans = TRUE; - } - return ctx; -} - -static void replication_mail_save(void *txn, struct mail *mail ATTR_UNUSED) -{ - struct replication_mail_txn_context *ctx = - (struct replication_mail_txn_context *)txn; - - ctx->new_messages = TRUE; -} - -static void replication_mail_copy(void *txn, struct mail *src, - struct mail *dst) -{ - struct replication_mail_txn_context *ctx = - (struct replication_mail_txn_context *)txn; - - if (src->box->storage != dst->box->storage) { - /* copy between storages, e.g. new mail delivery */ - ctx->new_messages = TRUE; - } else { - /* copy within storage, which isn't as high priority since the - mail already exists. and especially copies to Trash or to - lazy-expunge namespace is pretty low priority. */ - } -} - -static bool -replication_want_sync_changes(const struct mail_transaction_commit_changes *changes) -{ - /* Replication needs to be triggered on all the user-visible changes, - but not e.g. due to writes to cache file. */ - return (changes->changes_mask & - ENUM_NEGATE(MAIL_INDEX_TRANSACTION_CHANGE_OTHERS)) != 0; -} - -static void -replication_mail_transaction_commit(void *txn, - struct mail_transaction_commit_changes *changes) -{ - struct replication_mail_txn_context *ctx = - (struct replication_mail_txn_context *)txn; - struct replication_user *ruser = - REPLICATION_USER_CONTEXT(ctx->ns->user); - enum replication_priority priority; - - if (ruser != NULL && !ctx->sync_trans && - (ctx->new_messages || replication_want_sync_changes(changes))) { - priority = !ctx->new_messages ? REPLICATION_PRIORITY_LOW : - ruser->sync_secs == 0 ? REPLICATION_PRIORITY_HIGH : - REPLICATION_PRIORITY_SYNC; - replication_notify(ctx->ns, priority, ctx->reason); - } - i_free(ctx->reason); - i_free(ctx); -} - -static void replication_mailbox_create(struct mailbox *box) -{ - replication_notify(mailbox_get_namespace(box), - REPLICATION_PRIORITY_LOW, "mailbox create"); -} - -static void -replication_mailbox_delete_commit(void *txn ATTR_UNUSED, - struct mailbox *box) -{ - replication_notify(mailbox_get_namespace(box), - REPLICATION_PRIORITY_LOW, "mailbox delete"); -} - -static void -replication_mailbox_rename(struct mailbox *src ATTR_UNUSED, - struct mailbox *dest) -{ - replication_notify(mailbox_get_namespace(dest), - REPLICATION_PRIORITY_LOW, "mailbox rename"); -} - -static void replication_mailbox_set_subscribed(struct mailbox *box, - bool subscribed ATTR_UNUSED) -{ - replication_notify(mailbox_get_namespace(box), - REPLICATION_PRIORITY_LOW, "mailbox subscribe"); -} - -static void replication_user_deinit(struct mail_user *user) -{ - struct replication_user *ruser = REPLICATION_USER_CONTEXT(user); - - i_assert(ruser != NULL); - - if (ruser->to != NULL) { - replication_notify_now(user); - if (ruser->to != NULL) { - e_warning(ruser->event, - "%s: Couldn't send final notification " - "due to fifo being busy", fifo_path); - timeout_remove(&ruser->to); - } - } - - event_unref(&ruser->event); - ruser->module_ctx.super.deinit(user); -} - -static void replication_user_created(struct mail_user *user) -{ - struct mail_user_vfuncs *v = user->vlast; - struct replication_user *ruser; - struct event *event; - const char *value; - - event = event_create(user->event); - event_set_append_log_prefix(event, "replication: "); - event_add_category(event, &event_category_replication); - - value = mail_user_plugin_getenv(user, "mail_replica"); - if (value == NULL || value[0] == '\0') { - e_debug(event, "No mail_replica setting - replication disabled"); - event_unref(&event); - return; - } - - if (user->dsyncing) { - /* we're running dsync, which means that the remote is telling - us about a change. don't trigger a replication back to it */ - e_debug(event, "We're running dsync - replication disabled"); - event_unref(&event); - return; - } - - ruser = p_new(user->pool, struct replication_user, 1); - ruser->module_ctx.super = *v; - ruser->event = event; - user->vlast = &ruser->module_ctx.super; - v->deinit = replication_user_deinit; - MODULE_CONTEXT_SET(user, replication_user_module, ruser); - - if (fifo_path == NULL) { - /* we'll assume that all users have the same base_dir. - they really should. */ - fifo_path = i_strconcat(user->set->base_dir, - "/"REPLICATION_FIFO_NAME, NULL); - } - ruser->socket_path = p_strconcat(user->pool, user->set->base_dir, - "/"REPLICATION_SOCKET_NAME, NULL); - value = mail_user_plugin_getenv(user, "replication_sync_timeout"); - if (value != NULL && str_to_uint(value, &ruser->sync_secs) < 0) { - e_error(event, "Invalid replication_sync_timeout value: %s", - value); - } -} - -static const struct notify_vfuncs replication_vfuncs = { - .mail_transaction_begin = replication_mail_transaction_begin, - .mail_save = replication_mail_save, - .mail_copy = replication_mail_copy, - .mail_transaction_commit = replication_mail_transaction_commit, - .mailbox_create = replication_mailbox_create, - .mailbox_delete_commit = replication_mailbox_delete_commit, - .mailbox_rename = replication_mailbox_rename, - .mailbox_set_subscribed = replication_mailbox_set_subscribed -}; - -static struct notify_context *replication_ctx; - -static struct mail_storage_hooks replication_mail_storage_hooks = { - .mail_user_created = replication_user_created -}; - -void replication_plugin_init(struct module *module) -{ - fifo_fd = -1; - replication_ctx = notify_register(&replication_vfuncs); - mail_storage_hooks_add(module, &replication_mail_storage_hooks); -} - -void replication_plugin_deinit(void) -{ - i_close_fd_path(&fifo_fd, fifo_path); - i_free_and_null(fifo_path); - - mail_storage_hooks_remove(&replication_mail_storage_hooks); - notify_unregister(&replication_ctx); -} - -const char *replication_plugin_dependencies[] = { "notify", NULL }; diff --git a/src/plugins/replication/replication-plugin.h b/src/plugins/replication/replication-plugin.h deleted file mode 100644 index 7fa344fec1..0000000000 --- a/src/plugins/replication/replication-plugin.h +++ /dev/null @@ -1,9 +0,0 @@ -#ifndef REPLICATION_PLUGIN_H -#define REPLICATION_PLUGIN_H - -extern const char *replication_plugin_dependencies[]; - -void replication_plugin_init(struct module *module); -void replication_plugin_deinit(void); - -#endif diff --git a/src/replication/Makefile.am b/src/replication/Makefile.am deleted file mode 100644 index c9ddbd088b..0000000000 --- a/src/replication/Makefile.am +++ /dev/null @@ -1,4 +0,0 @@ -SUBDIRS = aggregator replicator - -noinst_HEADERS = \ - replication-common.h diff --git a/src/replication/aggregator/Makefile.am b/src/replication/aggregator/Makefile.am deleted file mode 100644 index a0b350131b..0000000000 --- a/src/replication/aggregator/Makefile.am +++ /dev/null @@ -1,29 +0,0 @@ -pkglibexecdir = $(libexecdir)/dovecot - -pkglibexec_PROGRAMS = aggregator - -AM_CPPFLAGS = \ - -I$(top_srcdir)/src/lib \ - -I$(top_srcdir)/src/lib-settings \ - -I$(top_srcdir)/src/lib-auth \ - -I$(top_srcdir)/src/lib-master \ - -I$(top_srcdir)/src/replication \ - -DPKG_STATEDIR=\""$(statedir)"\" \ - $(BINARY_CFLAGS) - -aggregator_LDFLAGS = -export-dynamic \ - $(BINARY_LDFLAGS) - -aggregator_LDADD = $(LIBDOVECOT) -aggregator_DEPENDENCIES = $(LIBDOVECOT_DEPS) - -aggregator_SOURCES = \ - aggregator.c \ - aggregator-settings.c \ - notify-connection.c \ - replicator-connection.c - -noinst_HEADERS = \ - aggregator-settings.h \ - notify-connection.h \ - replicator-connection.h diff --git a/src/replication/aggregator/aggregator-settings.c b/src/replication/aggregator/aggregator-settings.c deleted file mode 100644 index c654d102f1..0000000000 --- a/src/replication/aggregator/aggregator-settings.c +++ /dev/null @@ -1,95 +0,0 @@ -/* Copyright (c) 2013-2018 Dovecot authors, see the included COPYING file */ - -#include "lib.h" -#include "buffer.h" -#include "settings-parser.h" -#include "service-settings.h" -#include "aggregator-settings.h" - -/* */ -static struct file_listener_settings aggregator_unix_listeners_array[] = { - { - .path = "replication-notify", - .mode = 0600, - .user = "", - .group = "", - }, -}; -static struct file_listener_settings *aggregator_unix_listeners[] = { - &aggregator_unix_listeners_array[0] -}; -static buffer_t aggregator_unix_listeners_buf = { - { { aggregator_unix_listeners, sizeof(aggregator_unix_listeners) } } -}; - -static struct file_listener_settings aggregator_fifo_listeners_array[] = { - { - .path = "replication-notify-fifo", - .mode = 0600, - .user = "", - .group = "", - }, -}; -static struct file_listener_settings *aggregator_fifo_listeners[] = { - &aggregator_fifo_listeners_array[0] -}; -static buffer_t aggregator_fifo_listeners_buf = { - { { aggregator_fifo_listeners, sizeof(aggregator_fifo_listeners) } } -}; -/* */ - -struct service_settings aggregator_service_settings = { - .name = "aggregator", - .protocol = "", - .type = "", - .executable = "aggregator", - .user = "$default_internal_user", - .group = "", - .privileged_group = "", - .extra_groups = "", - .chroot = ".", - - .drop_priv_before_exec = FALSE, - - .process_min_avail = 0, - .process_limit = 0, - .client_limit = 0, - .service_count = 0, - .idle_kill = 0, - .vsz_limit = UOFF_T_MAX, - - .unix_listeners = { { &aggregator_unix_listeners_buf, - sizeof(aggregator_unix_listeners[0]) } }, - .fifo_listeners = { { &aggregator_fifo_listeners_buf, - sizeof(aggregator_fifo_listeners[0]) } }, - .inet_listeners = ARRAY_INIT -}; - -#undef DEF -#define DEF(type, name) \ - SETTING_DEFINE_STRUCT_##type(#name, name, struct aggregator_settings) - -static const struct setting_define aggregator_setting_defines[] = { - DEF(STR, replicator_host), - DEF(IN_PORT, replicator_port), - - SETTING_DEFINE_LIST_END -}; - -const struct aggregator_settings aggregator_default_settings = { - .replicator_host = "replicator", - .replicator_port = 0 -}; - -const struct setting_parser_info aggregator_setting_parser_info = { - .module_name = "aggregator", - .defines = aggregator_setting_defines, - .defaults = &aggregator_default_settings, - - .type_offset = SIZE_MAX, - .struct_size = sizeof(struct aggregator_settings), - - .parent_offset = SIZE_MAX -}; - -const struct aggregator_settings *aggregator_settings; diff --git a/src/replication/aggregator/aggregator-settings.h b/src/replication/aggregator/aggregator-settings.h deleted file mode 100644 index 1ada71de06..0000000000 --- a/src/replication/aggregator/aggregator-settings.h +++ /dev/null @@ -1,13 +0,0 @@ -#ifndef AGGREGATOR_SETTINGS_H -#define AGGREGATOR_SETTINGS_H - -struct aggregator_settings { - const char *replicator_host; - in_port_t replicator_port; -}; - -extern const struct setting_parser_info aggregator_setting_parser_info; -extern const struct aggregator_settings *aggregator_settings; -extern struct event *aggregator_event; - -#endif diff --git a/src/replication/aggregator/aggregator.c b/src/replication/aggregator/aggregator.c deleted file mode 100644 index f251f7edc1..0000000000 --- a/src/replication/aggregator/aggregator.c +++ /dev/null @@ -1,88 +0,0 @@ -/* Copyright (c) 2013-2018 Dovecot authors, see the included COPYING file */ - -#include "lib.h" -#include "restrict-access.h" -#include "master-service.h" -#include "master-service-settings.h" -#include "aggregator-settings.h" -#include "notify-connection.h" -#include "replicator-connection.h" - -struct replicator_connection *replicator; -struct event *aggregator_event; - -static struct event_category event_category_replication = { - .name = "replication" -}; - -static void client_connected(struct master_service_connection *conn) -{ - const char *name; - - master_service_client_connection_accept(conn); - if (conn->remote_port == 0) - name = conn->name; - else - name = net_ipport2str(&conn->remote_ip, conn->remote_port); - notify_connection_create(conn->fd, conn->fifo, name); -} - -static void main_preinit(void) -{ - struct ip_addr *ips; - unsigned int ips_count; - const struct aggregator_settings *set; - int ret; - - set = master_service_settings_get_root_set(master_service, - &aggregator_setting_parser_info); - - aggregator_event = event_create(NULL); - event_add_category(aggregator_event, &event_category_replication); - - if (set->replicator_port != 0) { - ret = net_gethostbyname(set->replicator_host, &ips, &ips_count); - if (ret != 0) { - i_fatal("replicator_host: gethostbyname(%s) failed: %s", - set->replicator_host, net_gethosterror(ret)); - } - replicator = replicator_connection_create_inet(ips, ips_count, - set->replicator_port, - notify_connection_sync_callback); - } else { - replicator = replicator_connection_create_unix(set->replicator_host, - notify_connection_sync_callback); - } -} - -int main(int argc, char *argv[]) -{ - const struct setting_parser_info *set_roots[] = { - &aggregator_setting_parser_info, - NULL - }; - const char *error; - - master_service = master_service_init("aggregator", 0, &argc, &argv, ""); - if (master_getopt(master_service) > 0) - return FATAL_DEFAULT; - - if (master_service_settings_read_simple(master_service, set_roots, - &error) < 0) - i_fatal("Error reading configuration: %s", error); - master_service_init_log(master_service); - - main_preinit(); - - restrict_access_by_env(RESTRICT_ACCESS_FLAG_ALLOW_ROOT, NULL); - restrict_access_allow_coredumps(TRUE); - master_service_init_finish(master_service); - - master_service_run(master_service, client_connected); - - notify_connections_destroy_all(); - replicator_connection_destroy(&replicator); - event_unref(&aggregator_event); - master_service_deinit(&master_service); - return 0; -} diff --git a/src/replication/aggregator/notify-connection.c b/src/replication/aggregator/notify-connection.c deleted file mode 100644 index 6beb3d9338..0000000000 --- a/src/replication/aggregator/notify-connection.c +++ /dev/null @@ -1,174 +0,0 @@ -/* Copyright (c) 2013-2018 Dovecot authors, see the included COPYING file */ - -#include "lib.h" -#include "ioloop.h" -#include "net.h" -#include "istream.h" -#include "ostream.h" -#include "llist.h" -#include "strescape.h" -#include "master-service.h" -#include "replication-common.h" -#include "replicator-connection.h" -#include "notify-connection.h" -#include "aggregator-settings.h" - -#define MAX_INBUF_SIZE 8192 - -#define CONNECTION_IS_FIFO(conn) \ - ((conn)->output == NULL) - -struct notify_connection { - struct notify_connection *prev, *next; - struct event *event; - int refcount; - - int fd; - struct io *io; - struct istream *input; - struct ostream *output; -}; - -static struct notify_connection *conns = NULL; - -static void notify_connection_unref(struct notify_connection *conn); -static void notify_connection_destroy(struct notify_connection *conn); - -static bool notify_input_error(struct notify_connection *conn) -{ - if (CONNECTION_IS_FIFO(conn)) - return TRUE; - notify_connection_destroy(conn); - return FALSE; -} - -void notify_connection_sync_callback(bool success, void *context) -{ - struct notify_connection *conn = context; - - e_debug(conn->event, "Sending %s result", - success ? "success" : "failure"); - o_stream_nsend_str(conn->output, success ? "+\n" : "-\n"); - notify_connection_unref(conn); -} - -static int -notify_input_line(struct notify_connection *conn, const char *line, - const char **error_r) -{ - const char *const *args; - enum replication_priority priority; - - /* \t */ - args = t_strsplit_tabescaped(line); - if (str_array_length(args) < 2) { - *error_r = "Client sent invalid input"; - return -1; - } - if (replication_priority_parse(args[1], &priority) < 0) { - *error_r = t_strdup_printf( - "Client sent invalid priority: %s", args[1]); - return -1; - } - - e_debug(conn->event, "Received priority %s request for %s", - args[1], args[0]); - - if (priority != REPLICATION_PRIORITY_SYNC) - replicator_connection_notify(replicator, args[0], priority); - else { - conn->refcount++; - replicator_connection_notify_sync(replicator, args[0], conn); - } - return 0; -} - -static void notify_input(struct notify_connection *conn) -{ - const char *line; - int ret; - const char *error; - - switch (i_stream_read(conn->input)) { - case -2: - /* buffer full */ - e_error(conn->event, "Client sent too long line"); - (void)notify_input_error(conn); - return; - case -1: - /* disconnected */ - notify_connection_destroy(conn); - return; - } - - while ((line = i_stream_next_line(conn->input)) != NULL) { - T_BEGIN { - ret = notify_input_line(conn, line, &error); - if (ret < 0) - e_error(conn->event, "%s", error); - } T_END; - if (ret < 0) { - if (!notify_input_error(conn)) - return; - } - } -} - -void notify_connection_create(int fd, bool fifo, const char *name) -{ - struct notify_connection *conn; - - conn = i_new(struct notify_connection, 1); - conn->refcount = 1; - conn->fd = fd; - conn->io = io_add(fd, IO_READ, notify_input, conn); - conn->input = i_stream_create_fd(fd, MAX_INBUF_SIZE); - i_stream_set_name(conn->input, name); - conn->event = event_create(NULL); - event_set_append_log_prefix(conn->event, - t_strdup_printf("notify(%s): ", name)); - if (!fifo) { - conn->output = o_stream_create_fd(fd, SIZE_MAX); - o_stream_set_no_error_handling(conn->output, TRUE); - } - - DLLIST_PREPEND(&conns, conn); -} - -static void notify_connection_unref(struct notify_connection *conn) -{ - i_assert(conn->refcount > 0); - if (--conn->refcount > 0) - return; - - i_stream_destroy(&conn->input); - o_stream_destroy(&conn->output); - event_unref(&conn->event); - i_free(conn); -} - -static void notify_connection_destroy(struct notify_connection *conn) -{ - i_assert(conn->fd != -1); - - e_debug(conn->event, "Disconnected"); - - if (!CONNECTION_IS_FIFO(conn)) - master_service_client_connection_destroyed(master_service); - - DLLIST_REMOVE(&conns, conn); - - io_remove(&conn->io); - i_stream_close(conn->input); - o_stream_close(conn->output); - net_disconnect(conn->fd); - conn->fd = -1; - - notify_connection_unref(conn); -} - -void notify_connections_destroy_all(void) -{ - while (conns != NULL) - notify_connection_destroy(conns); -} diff --git a/src/replication/aggregator/notify-connection.h b/src/replication/aggregator/notify-connection.h deleted file mode 100644 index ba3b27b04e..0000000000 --- a/src/replication/aggregator/notify-connection.h +++ /dev/null @@ -1,9 +0,0 @@ -#ifndef NOTIFY_CONNECTION_H -#define NOTIFY_CONNECTION_H - -void notify_connection_create(int fd, bool fifo, const char *name); -void notify_connections_destroy_all(void); - -void notify_connection_sync_callback(bool success, void *context); - -#endif diff --git a/src/replication/aggregator/replicator-connection.c b/src/replication/aggregator/replicator-connection.c deleted file mode 100644 index 9fab8f02e9..0000000000 --- a/src/replication/aggregator/replicator-connection.c +++ /dev/null @@ -1,352 +0,0 @@ -/* Copyright (c) 2013-2018 Dovecot authors, see the included COPYING file */ - -#include "lib.h" -#include "ioloop.h" -#include "net.h" -#include "istream.h" -#include "ostream.h" -#include "buffer.h" -#include "hash.h" -#include "llist.h" -#include "strescape.h" -#include "replicator-connection.h" -#include "aggregator-settings.h" - -#define MAX_INBUF_SIZE 1024 -#define REPLICATOR_RECONNECT_MSECS 5000 -#define REPLICATOR_MEMBUF_MAX_SIZE 1024*1024 -#define REPLICATOR_HANDSHAKE "VERSION\treplicator-notify\t1\t0\n" - -struct replicator_connection { - char *path; - struct event *event; - struct ip_addr *ips; - unsigned int ips_count, ip_idx; - in_port_t port; - - int fd; - struct io *io; - struct istream *input; - struct ostream *output; - struct timeout *to; - - buffer_t *queue[REPLICATION_PRIORITY_SYNC + 1]; - - HASH_TABLE(void *, void *) requests; - unsigned int request_id_counter; - replicator_sync_callback_t *callback; -}; - -static void replicator_connection_disconnect(struct replicator_connection *conn); - -static int -replicator_input_line(struct replicator_connection *conn, const char *line) -{ - void *context; - unsigned int id; - - /* <+|-> \t */ - if ((line[0] != '+' && line[0] != '-') || line[1] != '\t' || - str_to_uint(line+2, &id) < 0 || id == 0) { - e_error(conn->event, "Replicator sent invalid input: %s", line); - return -1; - } - - context = hash_table_lookup(conn->requests, POINTER_CAST(id)); - if (context == NULL) { - e_error(conn->event, "Replicator sent invalid ID: %u", id); - return -1; - } - - e_debug(conn->event, "Request id %u has %s", - id, line[0] == '+' ? "succeeded" : "failed"); - hash_table_remove(conn->requests, POINTER_CAST(id)); - conn->callback(line[0] == '+', context); - return 0; -} - -static void replicator_input(struct replicator_connection *conn) -{ - const char *line; - - switch (i_stream_read(conn->input)) { - case -2: - /* buffer full */ - e_error(conn->event, "Replicator sent too long line"); - replicator_connection_disconnect(conn); - return; - case -1: - /* disconnected */ - replicator_connection_disconnect(conn); - return; - } - - while ((line = i_stream_next_line(conn->input)) != NULL) - (void)replicator_input_line(conn, line); -} - -static bool -replicator_send_buf(struct replicator_connection *conn, buffer_t *buf) -{ - const unsigned char *data = buf->data; - size_t len = IO_BLOCK_SIZE; - - /* try to send about IO_BLOCK_SIZE amount of data, - but only full lines */ - if (len >= buf->used) - len = buf->used - 1; - for (;; len++) { - i_assert(len < buf->used); /* there is always LF */ - if (data[len] == '\n') { - len++; - break; - } - } - - if (o_stream_send(conn->output, data, len) < 0) { - replicator_connection_disconnect(conn); - return FALSE; - } - buffer_delete(buf, 0, len); - return TRUE; -} - -static int replicator_output(struct replicator_connection *conn) -{ - enum replication_priority p; - - if (o_stream_flush(conn->output) < 0) { - replicator_connection_disconnect(conn); - return 1; - } - - for (p = REPLICATION_PRIORITY_SYNC;;) { - if (o_stream_get_buffer_used_size(conn->output) > 0) { - o_stream_set_flush_pending(conn->output, TRUE); - break; - } - /* output buffer is empty, send more data */ - if (conn->queue[p]->used > 0) { - if (!replicator_send_buf(conn, conn->queue[p])) - break; - } else { - if (p == REPLICATION_PRIORITY_LOW) - break; - p--; - } - } - return 1; -} - -static void replicator_connection_connect(struct replicator_connection *conn) -{ - unsigned int n; - int fd = -1; - - if (conn->fd != -1) - return; - - if (conn->port == 0) { - event_set_append_log_prefix(conn->event, - t_strdup_printf("(unix:%s): ", conn->path)); - e_debug(conn->event, "Connecting to replicator"); - fd = net_connect_unix(conn->path); - if (fd == -1) - e_error(conn->event, "net_connect_unix(%s) failed: %m", - conn->path); - } else { - for (n = 0; n < conn->ips_count; n++) { - unsigned int idx = conn->ip_idx; - - conn->ip_idx = (conn->ip_idx + 1) % conn->ips_count; - event_set_append_log_prefix(conn->event, t_strdup_printf( - "(%s): ", net_ipport2str(&conn->ips[idx], conn->port))); - e_debug(conn->event, "Connecting to replicator"); - fd = net_connect_ip(&conn->ips[idx], conn->port, NULL); - if (fd != -1) - break; - e_error(conn->event, "connect(%s, %u) failed: %m", - net_ip2addr(&conn->ips[idx]), conn->port); - } - } - - if (fd == -1) { - event_set_append_log_prefix(conn->event, ""); - if (conn->to == NULL) { - e_debug(conn->event, "Reconnecting in %u msecs", REPLICATOR_RECONNECT_MSECS); - conn->to = timeout_add(REPLICATOR_RECONNECT_MSECS, - replicator_connection_connect, - conn); - } - return; - } - - timeout_remove(&conn->to); - conn->fd = fd; - conn->io = io_add(fd, IO_READ, replicator_input, conn); - conn->input = i_stream_create_fd(fd, MAX_INBUF_SIZE); - conn->output = o_stream_create_fd(fd, SIZE_MAX); - e_debug(conn->event, "Sending handshake"); - o_stream_set_no_error_handling(conn->output, TRUE); - o_stream_nsend_str(conn->output, REPLICATOR_HANDSHAKE); - o_stream_set_flush_callback(conn->output, replicator_output, conn); -} - -static void replicator_abort_all_requests(struct replicator_connection *conn) -{ - struct hash_iterate_context *iter; - void *key, *value; - - e_debug(conn->event, "Aborting all requests"); - iter = hash_table_iterate_init(conn->requests); - while (hash_table_iterate(iter, conn->requests, &key, &value)) - conn->callback(FALSE, value); - hash_table_iterate_deinit(&iter); - hash_table_clear(conn->requests, TRUE); -} - -static void replicator_connection_disconnect(struct replicator_connection *conn) -{ - if (conn->fd == -1) - return; - - e_debug(conn->event, "Disconnecting"); - replicator_abort_all_requests(conn); - io_remove(&conn->io); - i_stream_destroy(&conn->input); - o_stream_destroy(&conn->output); - net_disconnect(conn->fd); - event_set_append_log_prefix(conn->event, ""); - conn->fd = -1; -} - -static struct replicator_connection *replicator_connection_create(void) -{ - struct replicator_connection *conn; - unsigned int i; - - conn = i_new(struct replicator_connection, 1); - conn->fd = -1; - conn->event = event_create(NULL); - hash_table_create_direct(&conn->requests, default_pool, 0); - for (i = REPLICATION_PRIORITY_LOW; i <= REPLICATION_PRIORITY_SYNC; i++) - conn->queue[i] = buffer_create_dynamic(default_pool, 1024); - return conn; -} - -struct replicator_connection * -replicator_connection_create_unix(const char *path, - replicator_sync_callback_t *callback) -{ - struct replicator_connection *conn; - - conn = replicator_connection_create(); - conn->callback = callback; - conn->path = i_strdup(path); - return conn; -} - -struct replicator_connection * -replicator_connection_create_inet(const struct ip_addr *ips, - unsigned int ips_count, in_port_t port, - replicator_sync_callback_t *callback) -{ - struct replicator_connection *conn; - - i_assert(ips_count > 0); - - conn = replicator_connection_create(); - conn->callback = callback; - conn->ips = i_new(struct ip_addr, ips_count); - memcpy(conn->ips, ips, sizeof(*ips) * ips_count); - conn->ips_count = ips_count; - conn->port = port; - return conn; -} - -void replicator_connection_destroy(struct replicator_connection **_conn) -{ - struct replicator_connection *conn = *_conn; - unsigned int i; - - *_conn = NULL; - replicator_connection_disconnect(conn); - - for (i = REPLICATION_PRIORITY_LOW; i <= REPLICATION_PRIORITY_SYNC; i++) - buffer_free(&conn->queue[i]); - - timeout_remove(&conn->to); - hash_table_destroy(&conn->requests); - event_unref(&conn->event); - i_free(conn->ips); - i_free(conn->path); - i_free(conn); -} - -static void -replicator_send(struct replicator_connection *conn, - enum replication_priority priority, const char *data) -{ - size_t data_len = strlen(data); - - if (conn->fd != -1 && - o_stream_get_buffer_used_size(conn->output) == 0) { - /* we can send data immediately */ - o_stream_nsend(conn->output, data, data_len); - } else if (conn->queue[priority]->used + data_len >= - REPLICATOR_MEMBUF_MAX_SIZE) { - /* FIXME: compress duplicates, start writing to file */ - } else { - /* queue internally to separate queues */ - buffer_append(conn->queue[priority], data, data_len); - if (conn->output != NULL) - o_stream_set_flush_pending(conn->output, TRUE); - } -} - -void replicator_connection_notify(struct replicator_connection *conn, - const char *username, - enum replication_priority priority) -{ - const char *priority_str = ""; - - replicator_connection_connect(conn); - - switch (priority) { - case REPLICATION_PRIORITY_NONE: - case REPLICATION_PRIORITY_SYNC: - i_unreached(); - case REPLICATION_PRIORITY_LOW: - priority_str = "low"; - break; - case REPLICATION_PRIORITY_HIGH: - priority_str = "high"; - break; - } - - T_BEGIN { - e_debug(conn->event, "Requesting replication of %s priority for user %s", - priority_str, username); - replicator_send(conn, priority, t_strdup_printf( - "U\t%s\t%s\n", str_tabescape(username), priority_str)); - } T_END; -} - -void replicator_connection_notify_sync(struct replicator_connection *conn, - const char *username, void *context) -{ - unsigned int id; - - replicator_connection_connect(conn); - - id = ++conn->request_id_counter; - if (id == 0) id++; - hash_table_insert(conn->requests, POINTER_CAST(id), context); - - T_BEGIN { - e_debug(conn->event, "Requesting synchronization with id %u for user %s", - id, username); - replicator_send(conn, REPLICATION_PRIORITY_SYNC, t_strdup_printf( - "U\t%s\tsync\t%u\n", str_tabescape(username), id)); - } T_END; -} diff --git a/src/replication/aggregator/replicator-connection.h b/src/replication/aggregator/replicator-connection.h deleted file mode 100644 index bc2c82a64c..0000000000 --- a/src/replication/aggregator/replicator-connection.h +++ /dev/null @@ -1,25 +0,0 @@ -#ifndef REPLICATOR_CONNECTION_H -#define REPLICATOR_CONNECTION_H - -#include "replication-common.h" - -typedef void replicator_sync_callback_t(bool success, void *context); - -struct replicator_connection * -replicator_connection_create_unix(const char *path, - replicator_sync_callback_t *callback); -struct replicator_connection * -replicator_connection_create_inet(const struct ip_addr *ips, - unsigned int ips_count, in_port_t port, - replicator_sync_callback_t *callback); -void replicator_connection_destroy(struct replicator_connection **conn); - -void replicator_connection_notify(struct replicator_connection *conn, - const char *username, - enum replication_priority priority); -void replicator_connection_notify_sync(struct replicator_connection *conn, - const char *username, void *context); - -extern struct replicator_connection *replicator; - -#endif diff --git a/src/replication/replication-common.h b/src/replication/replication-common.h deleted file mode 100644 index 77f711c9ba..0000000000 --- a/src/replication/replication-common.h +++ /dev/null @@ -1,48 +0,0 @@ -#ifndef REPLICATION_COMMON_H -#define REPLICATION_COMMON_H - -enum replication_priority { - /* user is fully replicated, as far as we know */ - REPLICATION_PRIORITY_NONE = 0, - /* flag changes, expunges, etc. */ - REPLICATION_PRIORITY_LOW, - /* new emails */ - REPLICATION_PRIORITY_HIGH, - /* synchronously wait for new emails to be replicated */ - REPLICATION_PRIORITY_SYNC -}; - -static inline const char * -replicator_priority_to_str(enum replication_priority priority) -{ - switch (priority) { - case REPLICATION_PRIORITY_NONE: - return "none"; - case REPLICATION_PRIORITY_LOW: - return "low"; - case REPLICATION_PRIORITY_HIGH: - return "high"; - case REPLICATION_PRIORITY_SYNC: - return "sync"; - } - i_unreached(); -} - -static inline int -replication_priority_parse(const char *str, - enum replication_priority *priority_r) -{ - if (strcmp(str, "none") == 0) - *priority_r = REPLICATION_PRIORITY_NONE; - else if (strcmp(str, "low") == 0) - *priority_r = REPLICATION_PRIORITY_LOW; - else if (strcmp(str, "high") == 0) - *priority_r = REPLICATION_PRIORITY_HIGH; - else if (strcmp(str, "sync") == 0) - *priority_r = REPLICATION_PRIORITY_SYNC; - else - return -1; - return 0; -} - -#endif diff --git a/src/replication/replicator/Makefile.am b/src/replication/replicator/Makefile.am deleted file mode 100644 index 0ae85377bb..0000000000 --- a/src/replication/replicator/Makefile.am +++ /dev/null @@ -1,61 +0,0 @@ -pkglibexecdir = $(libexecdir)/dovecot - -pkglibexec_PROGRAMS = replicator - -AM_CPPFLAGS = \ - -I$(top_srcdir)/src/lib \ - -I$(top_srcdir)/src/lib-test \ - -I$(top_srcdir)/src/lib-settings \ - -I$(top_srcdir)/src/lib-auth-client \ - -I$(top_srcdir)/src/lib-master \ - -I$(top_srcdir)/src/replication \ - -DPKG_STATEDIR=\""$(statedir)"\" \ - $(BINARY_CFLAGS) - -replicator_LDFLAGS = -export-dynamic \ - $(BINARY_LDFLAGS) - -replicator_LDADD = $(LIBDOVECOT) -replicator_DEPENDENCIES = $(LIBDOVECOT_DEPS) - -replicator_SOURCES = \ - doveadm-connection.c \ - dsync-client.c \ - replicator.c \ - replicator-brain.c \ - replicator-queue.c \ - replicator-queue-auth.c \ - replicator-settings.c \ - notify-connection.c - -noinst_HEADERS = \ - doveadm-connection.h \ - dsync-client.h \ - replicator-brain.h \ - replicator-queue.h \ - replicator-queue-private.h \ - replicator-settings.h \ - notify-connection.h - -test_programs = \ - test-replicator-queue - -noinst_PROGRAMS = $(test_programs) - -test_libs = \ - ../../lib-test/libtest.la \ - ../../lib/liblib.la - -test_deps = $(test_libs) - -test_replicator_queue_SOURCES = \ - replicator-queue.c \ - replicator-settings.c \ - test-replicator-queue.c -test_replicator_queue_LDADD = $(test_libs) -test_replicator_queue_DEPENDENCIES = $(test_deps) - -check-local: - for bin in $(test_programs); do \ - if ! $(RUN_TEST) ./$$bin; then exit 1; fi; \ - done diff --git a/src/replication/replicator/doveadm-connection.c b/src/replication/replicator/doveadm-connection.c deleted file mode 100644 index ff7c936e80..0000000000 --- a/src/replication/replicator/doveadm-connection.c +++ /dev/null @@ -1,366 +0,0 @@ -/* Copyright (c) 2013-2018 Dovecot authors, see the included COPYING file */ - -#include "lib.h" -#include "array.h" -#include "connection.h" -#include "ostream.h" -#include "str.h" -#include "strescape.h" -#include "wildcard-match.h" -#include "master-service.h" -#include "replicator-brain.h" -#include "replicator-queue.h" -#include "replicator-settings.h" -#include "dsync-client.h" -#include "doveadm-connection.h" - -#include - -#define REPLICATOR_DOVEADM_MAJOR_VERSION 1 -#define REPLICATOR_DOVEADM_MINOR_VERSION 0 - -struct doveadm_connection { - struct connection conn; - struct replicator_brain *brain; -}; -static struct connection_list *doveadm_connections; - -static int client_input_status_overview(struct doveadm_connection *client) -{ - struct replicator_queue *queue = - replicator_brain_get_queue(client->brain); - struct replicator_queue_iter *iter; - struct replicator_user *user; - enum replication_priority priority; - unsigned int pending_counts[REPLICATION_PRIORITY_SYNC+1]; - unsigned int user_count, next_secs, pending_failed_count; - unsigned int pending_full_resync_count, waiting_failed_count; - string_t *str = t_str_new(256); - - memset(pending_counts, 0, sizeof(pending_counts)); - pending_failed_count = 0; waiting_failed_count = 0; - pending_full_resync_count = 0; - - user_count = 0; - iter = replicator_queue_iter_init(queue); - while ((user = replicator_queue_iter_next(iter)) != NULL) { - if (user->priority != REPLICATION_PRIORITY_NONE) - pending_counts[user->priority]++; - else if (replicator_queue_want_sync_now(user, &next_secs)) { - if (user->last_sync_failed) - pending_failed_count++; - else - pending_full_resync_count++; - } else { - if (user->last_sync_failed) - waiting_failed_count++; - } - user_count++; - } - replicator_queue_iter_deinit(&iter); - - for (priority = REPLICATION_PRIORITY_SYNC; priority > 0; priority--) { - str_printfa(str, "Queued '%s' requests\t%u\n", - replicator_priority_to_str(priority), - pending_counts[priority]); - } - str_printfa(str, "Queued 'failed' requests\t%u\n", - pending_failed_count); - str_printfa(str, "Queued 'full resync' requests\t%u\n", - pending_full_resync_count); - str_printfa(str, "Waiting 'failed' requests\t%u\n", - waiting_failed_count); - str_printfa(str, "Total number of known users\t%u\n", user_count); - str_append_c(str, '\n'); - o_stream_nsend(client->conn.output, str_data(str), str_len(str)); - return 0; -} - -static int -client_input_status(struct doveadm_connection *client, const char *const *args) -{ - struct replicator_queue *queue = - replicator_brain_get_queue(client->brain); - struct replicator_queue_iter *iter; - struct replicator_user *user; - const char *mask = args[0]; - unsigned int next_secs; - string_t *str = t_str_new(128); - - if (mask == NULL) - return client_input_status_overview(client); - - iter = replicator_queue_iter_init(queue); - while ((user = replicator_queue_iter_next(iter)) != NULL) { - if (!wildcard_match(user->username, mask)) - continue; - - str_truncate(str, 0); - str_append_tabescaped(str, user->username); - str_append_c(str, '\t'); - str_append(str, replicator_priority_to_str(user->priority)); - if (replicator_queue_want_sync_now(user, &next_secs)) - next_secs = 0; - str_printfa(str, "\t%lld\t%lld\t%d\t%lld\t%u\n", - (long long)user->last_fast_sync, - (long long)user->last_full_sync, - user->last_sync_failed ? 1 : 0, - (long long)user->last_successful_sync, - next_secs); - o_stream_nsend(client->conn.output, str_data(str), str_len(str)); - } - replicator_queue_iter_deinit(&iter); - o_stream_nsend(client->conn.output, "\n", 1); - return 0; -} - -static int -client_input_status_dsyncs(struct doveadm_connection *client) -{ - string_t *str = t_str_new(256); - const ARRAY_TYPE(dsync_client) *clients; - struct dsync_client *dsync_client; - const char *username; - - clients = replicator_brain_get_dsync_clients(client->brain); - array_foreach_elem(clients, dsync_client) { - username = dsync_client_get_username(dsync_client); - if (username != NULL) { - str_append_tabescaped(str, username); - str_append_c(str, '\t'); - switch (dsync_client_get_type(dsync_client)) { - case DSYNC_TYPE_FULL: - str_append(str, "full"); - break; - case DSYNC_TYPE_NORMAL: - str_append(str, "normal"); - break; - case DSYNC_TYPE_INCREMENTAL: - str_append(str, "incremental"); - break; - } - } else { - str_append(str, "\t-"); - } - str_append_c(str, '\t'); - str_append_tabescaped(str, dsync_client_get_state(dsync_client)); - str_append_c(str, '\n'); - } - - str_append_c(str, '\n'); - o_stream_nsend(client->conn.output, str_data(str), str_len(str)); - return 0; -} - -static int -client_input_replicate(struct doveadm_connection *client, const char *const *args) -{ - struct replicator_queue *queue = - replicator_brain_get_queue(client->brain); - struct replicator_queue_iter *iter; - struct replicator_user *user; - const char *usermask; - enum replication_priority priority; - unsigned int match_count; - bool full; - - /* | */ - if (str_array_length(args) != 3) { - e_error(client->conn.event, "REPLICATE: Invalid parameters"); - return -1; - } - if (replication_priority_parse(args[0], &priority) < 0) { - o_stream_nsend_str(client->conn.output, "-Invalid priority\n"); - return 0; - } - full = strchr(args[1], 'f') != NULL; - usermask = args[2]; - if (strchr(usermask, '*') == NULL && strchr(usermask, '?') == NULL) { - struct replicator_user *user = - replicator_queue_get(queue, usermask); - if (full) - user->force_full_sync = TRUE; - e_debug(client->conn.event, "user %s: doveadm REPLICATE command (priority=%d full=%c)", - user->username, priority, full ? 'y' : 'n'); - replicator_queue_update(queue, user, priority); - replicator_queue_add(queue, user); - o_stream_nsend_str(client->conn.output, "+1\n"); - return 0; - } - - match_count = 0; - iter = replicator_queue_iter_init(queue); - while ((user = replicator_queue_iter_next(iter)) != NULL) { - if (!wildcard_match(user->username, usermask)) - continue; - if (full) - user->force_full_sync = TRUE; - e_debug(client->conn.event, "user %s: doveadm REPLICATE command (priority=%d full=%c)", - user->username, priority, full ? 'y' : 'n'); - replicator_queue_update(queue, user, priority); - replicator_queue_add(queue, user); - match_count++; - } - replicator_queue_iter_deinit(&iter); - o_stream_nsend_str(client->conn.output, - t_strdup_printf("+%u\n", match_count)); - return 0; -} - -static int -client_input_add(struct doveadm_connection *client, const char *const *args) -{ - struct replicator_queue *queue = - replicator_brain_get_queue(client->brain); - const struct replicator_settings *set = - replicator_brain_get_settings(client->brain); - - /* */ - if (str_array_length(args) != 1) { - e_error(client->conn.event, "ADD: Invalid parameters"); - return -1; - } - - if (strchr(args[0], '*') == NULL && strchr(args[0], '?') == NULL) { - struct replicator_user *user = - replicator_queue_get(queue, args[0]); - e_debug(client->conn.event, "user %s: doveadm ADD command", - user->username); - replicator_queue_add(queue, user); - } else { - e_debug(client->conn.event, "doveadm ADD command: Add usermask '%s'", - args[0]); - replicator_queue_add_auth_users(queue, set->auth_socket_path, - args[0], ioloop_time); - } - o_stream_nsend_str(client->conn.output, "+\n"); - return 0; -} - -static int -client_input_remove(struct doveadm_connection *client, const char *const *args) -{ - struct replicator_queue *queue = - replicator_brain_get_queue(client->brain); - struct replicator_user *user; - - /* */ - if (str_array_length(args) != 1) { - e_error(client->conn.event, "REMOVE: Invalid parameters"); - return -1; - } - user = replicator_queue_lookup(queue, args[0]); - if (user == NULL) - o_stream_nsend_str(client->conn.output, "-User not found\n"); - else { - replicator_queue_remove(queue, &user); - o_stream_nsend_str(client->conn.output, "+\n"); - } - return 0; -} - -static int -client_input_notify(struct doveadm_connection *client, const char *const *args) -{ - struct replicator_queue *queue = - replicator_brain_get_queue(client->brain); - struct replicator_user *user; - - /* */ - if (str_array_length(args) < 3) { - e_error(client->conn.event, "NOTIFY: Invalid parameters"); - return -1; - } - - bool full = args[1][0] == 'f'; - user = replicator_queue_get(queue, args[0]); - if (full) - user->last_full_sync = ioloop_time; - user->last_fast_sync = ioloop_time; - user->last_update = ioloop_time; - e_debug(client->conn.event, "user %s: doveadm NOTIFY command (full=%c)", - user->username, full ? 'y' : 'n'); - replicator_queue_add(queue, user); - - if (args[2][0] != '\0') { - i_free(user->state); - user->state = i_strdup(args[2]); - } - o_stream_nsend_str(client->conn.output, "+\n"); - return 0; -} - -static int client_input_args(struct connection *conn, const char *const *args) -{ - struct doveadm_connection *client = (struct doveadm_connection *)conn; - const char *cmd = args[0]; - - if (cmd == NULL) { - e_error(client->conn.event, "Empty command"); - return 0; - } - args++; - - if (strcmp(cmd, "STATUS") == 0) - return client_input_status(client, args); - else if (strcmp(cmd, "STATUS-DSYNC") == 0) - return client_input_status_dsyncs(client); - else if (strcmp(cmd, "REPLICATE") == 0) - return client_input_replicate(client, args); - else if (strcmp(cmd, "ADD") == 0) - return client_input_add(client, args); - else if (strcmp(cmd, "REMOVE") == 0) - return client_input_remove(client, args); - else if (strcmp(cmd, "NOTIFY") == 0) - return client_input_notify(client, args); - e_error(client->conn.event, "Unknown command: %s", cmd); - return -1; -} - -static void client_destroy(struct connection *conn) -{ - struct doveadm_connection *client = (struct doveadm_connection *)conn; - - connection_deinit(&client->conn); - i_free(client); - - master_service_client_connection_destroyed(master_service); -} - -void doveadm_connection_create(struct replicator_brain *brain, int fd) -{ - struct doveadm_connection *client; - - client = i_new(struct doveadm_connection, 1); - client->brain = brain; - connection_init_server(doveadm_connections, &client->conn, - "doveadm-client", fd, fd); - event_add_category(client->conn.event, &event_category_replication); -} - -static struct connection_settings doveadm_conn_set = { - .service_name_in = "replicator-doveadm-client", - .service_name_out = "replicator-doveadm-server", - .major_version = REPLICATOR_DOVEADM_MAJOR_VERSION, - .minor_version = REPLICATOR_DOVEADM_MINOR_VERSION, - - .input_max_size = SIZE_MAX, - .output_max_size = SIZE_MAX, - .client = FALSE -}; - -static const struct connection_vfuncs doveadm_conn_vfuncs = { - .destroy = client_destroy, - .input_args = client_input_args -}; - -void doveadm_connections_init(void) -{ - doveadm_connections = connection_list_init(&doveadm_conn_set, - &doveadm_conn_vfuncs); -} - -void doveadm_connections_deinit(void) -{ - connection_list_deinit(&doveadm_connections); -} diff --git a/src/replication/replicator/doveadm-connection.h b/src/replication/replicator/doveadm-connection.h deleted file mode 100644 index 066fc7bc44..0000000000 --- a/src/replication/replicator/doveadm-connection.h +++ /dev/null @@ -1,11 +0,0 @@ -#ifndef DOVEADM_CONNECTION_H -#define DOVEADM_CONNECTION_H - -struct replicator_brain; - -void doveadm_connection_create(struct replicator_brain *brain, int fd); - -void doveadm_connections_init(void); -void doveadm_connections_deinit(void); - -#endif diff --git a/src/replication/replicator/dsync-client.c b/src/replication/replicator/dsync-client.c deleted file mode 100644 index 939dc000ce..0000000000 --- a/src/replication/replicator/dsync-client.c +++ /dev/null @@ -1,278 +0,0 @@ -/* Copyright (c) 2013-2018 Dovecot authors, see the included COPYING file */ - -#include "lib.h" -#include "ioloop.h" -#include "net.h" -#include "istream.h" -#include "ostream.h" -#include "str.h" -#include "strescape.h" -#include "replicator-settings.h" -#include "dsync-client.h" - -#include - -#define DSYNC_FAIL_TIMEOUT_MSECS (1000*5) -#define DOVEADM_HANDSHAKE "VERSION\tdoveadm-server\t1\t0\n" - -struct dsync_client { - char *path; - int fd; - struct io *io; - struct istream *input; - struct ostream *output; - struct timeout *to; - struct event *event; - - char *dsync_params; - char *username; - char *state; - enum dsync_type sync_type; - dsync_callback_t *callback; - void *context; - - time_t last_connect_failure; - bool handshaked:1; - bool cmd_sent:1; -}; - -struct dsync_client * -dsync_client_init(const char *path, const char *dsync_params) -{ - struct dsync_client *client; - - client = i_new(struct dsync_client, 1); - client->path = i_strdup(path); - client->fd = -1; - client->dsync_params = i_strdup(dsync_params); - client->event = event_create(NULL); - event_add_category(client->event, &event_category_replication); - event_set_append_log_prefix(client->event, t_strdup_printf( - "%s: ", client->path)); - return client; -} - -static void dsync_callback(struct dsync_client *client, - const char *state, enum dsync_reply reply) -{ - dsync_callback_t *callback = client->callback; - void *context = client->context; - - timeout_remove(&client->to); - - client->callback = NULL; - client->context = NULL; - - /* make sure callback doesn't try to reuse this connection, since - we can't currently handle it */ - i_assert(!client->cmd_sent); - client->cmd_sent = TRUE; - callback(reply, state, context); - client->cmd_sent = FALSE; -} - -static void dsync_close(struct dsync_client *client) -{ - client->cmd_sent = FALSE; - client->handshaked = FALSE; - i_free_and_null(client->state); - i_free_and_null(client->username); - - if (client->fd == -1) - return; - - io_remove(&client->io); - o_stream_destroy(&client->output); - i_stream_destroy(&client->input); - i_close_fd(&client->fd); -} - -static void dsync_disconnect(struct dsync_client *client) -{ - dsync_close(client); - if (client->callback != NULL) - dsync_callback(client, "", DSYNC_REPLY_FAIL); -} - -void dsync_client_deinit(struct dsync_client **_client) -{ - struct dsync_client *client = *_client; - - *_client = NULL; - - dsync_disconnect(client); - event_unref(&client->event); - i_free(client->dsync_params); - i_free(client->path); - i_free(client); -} - -static int dsync_input_line(struct dsync_client *client, const char *line) -{ - const char *state; - - if (!client->handshaked) { - if (strcmp(line, "+") != 0) { - e_error(client->event, "Unexpected handshake: %s", line); - return -1; - } - client->handshaked = TRUE; - return 0; - } - if (client->callback == NULL) { - e_error(client->event, "Unexpected input: %s", line); - return -1; - } - if (client->state == NULL) { - client->state = i_strdup(t_strcut(line, '\t')); - return 0; - } - state = t_strdup(client->state); - line = t_strdup(line); - dsync_close(client); - - if (line[0] == '+') - dsync_callback(client, state, DSYNC_REPLY_OK); - else if (line[0] == '-') { - if (strcmp(line+1, "NOUSER") == 0) - dsync_callback(client, "", DSYNC_REPLY_NOUSER); - else if (strcmp(line+1, "NOREPLICATE") == 0) - dsync_callback(client, "", DSYNC_REPLY_NOREPLICATE); - else - dsync_callback(client, line+1, DSYNC_REPLY_FAIL); - } else { - e_error(client->event, "Invalid input: %s", line); - return -1; - } - /* FIXME: disconnect after each request for now. - doveadm server's getopt() handling seems to break otherwise. - also with multiple UIDs doveadm-server fails because setid() fails */ - return -1; -} - -static void dsync_input(struct dsync_client *client) -{ - const char *line; - - while ((line = i_stream_read_next_line(client->input)) != NULL) { - if (dsync_input_line(client, line) < 0) { - dsync_disconnect(client); - return; - } - } - if (client->input->eof) - dsync_disconnect(client); -} - -static int dsync_connect(struct dsync_client *client) -{ - if (client->fd != -1) - return 0; - - if (client->last_connect_failure == ioloop_time) - return -1; - - client->fd = net_connect_unix(client->path); - if (client->fd == -1) { - e_error(client->event, "net_connect_unix() failed: %m"); - client->last_connect_failure = ioloop_time; - return -1; - } - client->last_connect_failure = 0; - client->io = io_add(client->fd, IO_READ, dsync_input, client); - client->input = i_stream_create_fd(client->fd, SIZE_MAX); - client->output = o_stream_create_fd(client->fd, SIZE_MAX); - o_stream_set_no_error_handling(client->output, TRUE); - o_stream_nsend_str(client->output, DOVEADM_HANDSHAKE); - return 0; -} - -static void dsync_fail_timeout(struct dsync_client *client) -{ - dsync_disconnect(client); -} - -void dsync_client_sync(struct dsync_client *client, - const char *username, const char *state, bool full, - dsync_callback_t *callback, void *context) -{ - string_t *cmd; - unsigned int pos; - char *p; - - i_assert(callback != NULL); - i_assert(!dsync_client_is_busy(client)); - - client->username = i_strdup(username); - client->cmd_sent = TRUE; - client->callback = callback; - client->context = context; - if (full) - client->sync_type = DSYNC_TYPE_FULL; - else if (state != NULL && state[0] != '\0') - client->sync_type = DSYNC_TYPE_INCREMENTAL; - else - client->sync_type = DSYNC_TYPE_NORMAL; - - if (dsync_connect(client) < 0) { - i_assert(client->to == NULL); - client->to = timeout_add(DSYNC_FAIL_TIMEOUT_MSECS, - dsync_fail_timeout, client); - } else { - /* [] */ - cmd = t_str_new(256); - str_append_c(cmd, '\t'); - str_append_tabescaped(cmd, username); - str_append(cmd, "\tsync\t"); - pos = str_len(cmd); - /* insert the parameters. we can do it simply by converting - spaces into tabs, it's unlikely we'll ever need anything - more complex here. */ - str_append(cmd, client->dsync_params); - p = str_c_modifiable(cmd) + pos; - for (; *p != '\0'; p++) { - if (*p == ' ') - *p = '\t'; - } - if (full) - str_append(cmd, "\t-f"); - str_append(cmd, "\t-s\t"); - if (state != NULL) - str_append(cmd, state); - str_append_c(cmd, '\n'); - o_stream_nsend(client->output, str_data(cmd), str_len(cmd)); - } -} - -bool dsync_client_is_busy(struct dsync_client *client) -{ - return client->cmd_sent; -} - -const char *dsync_client_get_username(struct dsync_client *conn) -{ - return conn->username; -} - -enum dsync_type dsync_client_get_type(struct dsync_client *conn) -{ - return conn->sync_type; -} - -const char *dsync_client_get_state(struct dsync_client *conn) -{ - if (conn->fd == -1) { - if (conn->last_connect_failure == 0) - return "Not connected"; - return t_strdup_printf("Failed to connect to '%s' - last attempt %ld secs ago", conn->path, - (long)(ioloop_time - conn->last_connect_failure)); - } - if (!dsync_client_is_busy(conn)) - return "Idle"; - if (!conn->handshaked) - return "Waiting for handshake"; - if (conn->state == NULL) - return "Waiting for dsync to finish"; - else - return "Waiting for dsync to finish (second line)"; -} diff --git a/src/replication/replicator/dsync-client.h b/src/replication/replicator/dsync-client.h deleted file mode 100644 index c55b815bbf..0000000000 --- a/src/replication/replicator/dsync-client.h +++ /dev/null @@ -1,37 +0,0 @@ -#ifndef DSYNC_CLIENT_H -#define DSYNC_CLIENT_H - -struct dsync_client; - -enum dsync_reply { - DSYNC_REPLY_OK, - DSYNC_REPLY_FAIL, - DSYNC_REPLY_NOUSER, - DSYNC_REPLY_NOREPLICATE, -}; - -enum dsync_type { - DSYNC_TYPE_FULL, - DSYNC_TYPE_NORMAL, - DSYNC_TYPE_INCREMENTAL -}; - -ARRAY_DEFINE_TYPE(dsync_client, struct dsync_client *); - -typedef void dsync_callback_t(enum dsync_reply reply, - const char *state, void *context); - -struct dsync_client * -dsync_client_init(const char *path, const char *dsync_params); -void dsync_client_deinit(struct dsync_client **conn); - -void dsync_client_sync(struct dsync_client *conn, - const char *username, const char *state, bool full, - dsync_callback_t *callback, void *context); -bool dsync_client_is_busy(struct dsync_client *conn); - -const char *dsync_client_get_username(struct dsync_client *conn); -enum dsync_type dsync_client_get_type(struct dsync_client *conn); -const char *dsync_client_get_state(struct dsync_client *conn); - -#endif diff --git a/src/replication/replicator/notify-connection.c b/src/replication/replicator/notify-connection.c deleted file mode 100644 index 1e8634f0f8..0000000000 --- a/src/replication/replicator/notify-connection.c +++ /dev/null @@ -1,219 +0,0 @@ -/* Copyright (c) 2013-2018 Dovecot authors, see the included COPYING file */ - -#include "lib.h" -#include "llist.h" -#include "istream.h" -#include "ostream.h" -#include "strescape.h" -#include "master-service.h" -#include "replicator-settings.h" -#include "replicator-queue-private.h" -#include "notify-connection.h" - -#include - -#define MAX_INBUF_SIZE (1024*64) -#define NOTIFY_CLIENT_PROTOCOL_MAJOR_VERSION 1 -#define NOTIFY_CLIENT_PROTOCOL_MINOR_VERSION 0 - -struct notify_connection { - struct notify_connection *prev, *next; - int refcount; - - int fd; - struct io *io; - struct istream *input; - struct ostream *output; - struct event *event; - - struct replicator_queue *queue; - - bool version_received:1; - bool destroyed:1; -}; - -struct notify_sync_request { - struct notify_connection *conn; - unsigned int id; -}; - -static struct notify_connection *connections; - -static void notify_connection_destroy(struct notify_connection *conn); - -static void notify_sync_callback(bool success, void *context) -{ - struct notify_sync_request *request = context; - - o_stream_nsend_str(request->conn->output, t_strdup_printf( - "%c\t%u\n", success ? '+' : '-', request->id)); - - notify_connection_unref(&request->conn); - i_free(request); -} - -static int -notify_connection_input_line(struct notify_connection *conn, const char *line) -{ - struct notify_sync_request *request; - const char *const *args; - enum replication_priority priority; - unsigned int id; - - /* U \t \t [\t ] */ - args = t_strsplit_tabescaped(line); - if (str_array_length(args) < 2) { - e_error(conn->event, - "notify client sent invalid input: %s", line); - return -1; - } - if (strcmp(args[0], "U") != 0) { - e_error(conn->event, "notify client sent unknown command: %s", - args[0]); - return -1; - } - if (replication_priority_parse(args[2], &priority) < 0) { - e_error(conn->event, "notify client sent invalid priority: %s", - args[2]); - return -1; - } - if (priority != REPLICATION_PRIORITY_SYNC) { - struct replicator_user *user = - replicator_queue_get(conn->queue, args[1]); - e_debug(conn->event, "user %s: notification from client (priority=%d)", - user->username, priority); - replicator_queue_update(conn->queue, user, priority); - replicator_queue_add(conn->queue, user); - } else if (args[3] == NULL || str_to_uint(args[3], &id) < 0) { - e_error(conn->event, "notify client sent invalid sync id: %s", - line); - return -1; - } else { - request = i_new(struct notify_sync_request, 1); - request->conn = conn; - request->id = id; - notify_connection_ref(conn); - struct replicator_user *user = - replicator_queue_get(conn->queue, args[1]); - e_debug(conn->event, "user %s: sync notification from client", - user->username); - replicator_queue_update(conn->queue, user, - REPLICATION_PRIORITY_SYNC); - replicator_queue_add_sync_callback(conn->queue, user, - notify_sync_callback, - request); - } - return 0; -} - -static void notify_connection_input(struct notify_connection *conn) -{ - const char *line; - int ret; - - switch (i_stream_read(conn->input)) { - case -2: - e_error(conn->event, - "BUG: Client connection sent too much data"); - notify_connection_destroy(conn); - return; - case -1: - notify_connection_destroy(conn); - return; - } - - if (!conn->version_received) { - if ((line = i_stream_next_line(conn->input)) == NULL) - return; - - if (!version_string_verify(line, "replicator-notify", - NOTIFY_CLIENT_PROTOCOL_MAJOR_VERSION)) { - e_error(conn->event, - "Notify client not compatible with this server " - "(mixed old and new binaries?)"); - notify_connection_destroy(conn); - return; - } - conn->version_received = TRUE; - } - - while ((line = i_stream_next_line(conn->input)) != NULL) { - T_BEGIN { - ret = notify_connection_input_line(conn, line); - } T_END; - if (ret < 0) { - notify_connection_destroy(conn); - break; - } - } -} - -struct notify_connection * -notify_connection_create(int fd, struct replicator_queue *queue) -{ - struct notify_connection *conn; - - i_assert(fd >= 0); - - conn = i_new(struct notify_connection, 1); - conn->refcount = 1; - conn->event = event_create(queue->event); - event_add_category(conn->event, &event_category_replication); - conn->queue = queue; - conn->fd = fd; - conn->input = i_stream_create_fd(fd, MAX_INBUF_SIZE); - conn->output = o_stream_create_fd(fd, SIZE_MAX); - o_stream_set_no_error_handling(conn->output, TRUE); - conn->io = io_add(fd, IO_READ, notify_connection_input, conn); - conn->queue = queue; - - DLLIST_PREPEND(&connections, conn); - return conn; -} - -static void notify_connection_destroy(struct notify_connection *conn) -{ - if (conn->destroyed) - return; - conn->destroyed = TRUE; - - DLLIST_REMOVE(&connections, conn); - - io_remove(&conn->io); - i_stream_close(conn->input); - o_stream_close(conn->output); - i_close_fd(&conn->fd); - - notify_connection_unref(&conn); - master_service_client_connection_destroyed(master_service); -} - -void notify_connection_ref(struct notify_connection *conn) -{ - i_assert(conn->refcount > 0); - - conn->refcount++; -} - -void notify_connection_unref(struct notify_connection **_conn) -{ - struct notify_connection *conn = *_conn; - - i_assert(conn->refcount > 0); - - *_conn = NULL; - if (--conn->refcount > 0) - return; - - notify_connection_destroy(conn); - i_stream_unref(&conn->input); - o_stream_unref(&conn->output); - event_unref(&conn->event); - i_free(conn); -} - -void notify_connections_destroy_all(void) -{ - while (connections != NULL) - notify_connection_destroy(connections); -} diff --git a/src/replication/replicator/notify-connection.h b/src/replication/replicator/notify-connection.h deleted file mode 100644 index fa62fc7a9f..0000000000 --- a/src/replication/replicator/notify-connection.h +++ /dev/null @@ -1,13 +0,0 @@ -#ifndef NOTIFY_CONNECTION_H -#define NOTIFY_CONNECTION_H - -struct replicator_queue; - -struct notify_connection * -notify_connection_create(int fd, struct replicator_queue *queue); -void notify_connection_ref(struct notify_connection *conn); -void notify_connection_unref(struct notify_connection **conn); - -void notify_connections_destroy_all(void); - -#endif diff --git a/src/replication/replicator/replicator-brain.c b/src/replication/replicator/replicator-brain.c deleted file mode 100644 index 8e40b8887b..0000000000 --- a/src/replication/replicator/replicator-brain.c +++ /dev/null @@ -1,239 +0,0 @@ -/* Copyright (c) 2013-2018 Dovecot authors, see the included COPYING file */ - -#include "lib.h" -#include "array.h" -#include "ioloop.h" -#include "dsync-client.h" -#include "replicator-settings.h" -#include "replicator-queue.h" -#include "replicator-brain.h" - -struct replicator_sync_context { - struct replicator_brain *brain; - struct replicator_user *user; - struct event *event; -}; - -struct replicator_brain { - pool_t pool; - struct replicator_queue *queue; - const struct replicator_settings *set; - struct timeout *to; - struct event *event; - - ARRAY_TYPE(dsync_client) dsync_clients; - - bool deinitializing:1; -}; - -static void replicator_brain_fill(struct replicator_brain *brain); - -static void replicator_brain_timeout(struct replicator_brain *brain) -{ - e_debug(brain->event, "Delayed handling of changed queue"); - - timeout_remove(&brain->to); - replicator_brain_fill(brain); -} - -static void replicator_brain_queue_changed(void *context) -{ - struct replicator_brain *brain = context; - - /* Delay a bit filling the replication. We could have gotten here - before the replicator_user change was fully filled out. */ - timeout_remove(&brain->to); - brain->to = timeout_add_short(0, replicator_brain_timeout, brain); -} - -struct replicator_brain * -replicator_brain_init(struct replicator_queue *queue, - const struct replicator_settings *set) -{ - struct replicator_brain *brain; - pool_t pool; - - pool = pool_alloconly_create("replication brain", 1024); - brain = p_new(pool, struct replicator_brain, 1); - brain->pool = pool; - brain->queue = queue; - brain->set = set; - brain->event = event_create(NULL); - event_add_category(brain->event, &event_category_replication); - - p_array_init(&brain->dsync_clients, pool, 16); - replicator_queue_set_change_callback(queue, - replicator_brain_queue_changed, brain); - replicator_brain_fill(brain); - return brain; -} - -void replicator_brain_deinit(struct replicator_brain **_brain) -{ - struct replicator_brain *brain = *_brain; - struct dsync_client *conn; - - *_brain = NULL; - - brain->deinitializing = TRUE; - array_foreach_elem(&brain->dsync_clients, conn) - dsync_client_deinit(&conn); - timeout_remove(&brain->to); - event_unref(&brain->event); - pool_unref(&brain->pool); -} - -struct replicator_queue * -replicator_brain_get_queue(struct replicator_brain *brain) -{ - return brain->queue; -} - -const struct replicator_settings * -replicator_brain_get_settings(struct replicator_brain *brain) -{ - return brain->set; -} - -const ARRAY_TYPE(dsync_client) * -replicator_brain_get_dsync_clients(struct replicator_brain *brain) -{ - return &brain->dsync_clients; -} - -static struct dsync_client * -get_dsync_client(struct replicator_brain *brain) -{ - struct dsync_client *conn; - - array_foreach_elem(&brain->dsync_clients, conn) { - if (!dsync_client_is_busy(conn)) - return conn; - } - if (array_count(&brain->dsync_clients) == - brain->set->replication_max_conns) - return NULL; - - conn = dsync_client_init(brain->set->doveadm_socket_path, - brain->set->replication_dsync_parameters); - array_push_back(&brain->dsync_clients, &conn); - return conn; -} - -static void dsync_callback(enum dsync_reply reply, const char *state, - void *context) -{ - struct replicator_sync_context *ctx = context; - struct replicator_user *user = ctx->user; - - if (!replicator_user_unref(&user)) { - e_debug(ctx->event, "User was already removed"); - /* user was already removed */ - } else if (reply == DSYNC_REPLY_NOUSER || - reply == DSYNC_REPLY_NOREPLICATE) { - /* user no longer exists, or is not wanted for replication, - remove from replication */ - if (reply == DSYNC_REPLY_NOUSER) { - e_debug(ctx->event, "User does not exist"); - } else { - e_debug(ctx->event, "User has 'noreplicate' flag and " - "will not be replicated"); - } - replicator_queue_remove(ctx->brain->queue, &ctx->user); - } else { - i_free(ctx->user->state); - ctx->user->last_sync_failed = reply != DSYNC_REPLY_OK; - if (reply == DSYNC_REPLY_OK) { - e_debug(ctx->event, "User was successfully synced"); - ctx->user->state = i_strdup_empty(state); - ctx->user->last_successful_sync = ioloop_time; - } else { - e_debug(ctx->event, "User sync failed: %s", state); - } - replicator_queue_push(ctx->brain->queue, ctx->user); - } - event_unref(&ctx->event); - - if (!ctx->brain->deinitializing) - replicator_brain_fill(ctx->brain); - i_free(ctx); -} - -static bool -dsync_replicate(struct replicator_brain *brain, struct replicator_user *user) -{ - struct replicator_sync_context *ctx; - struct dsync_client *conn; - time_t next_full_sync; - bool full; - struct event *event = event_create(brain->event); - event_set_append_log_prefix(event, t_strdup_printf( - "%s: ", user->username)); - event_add_str(event, "user", user->username); - - conn = get_dsync_client(brain); - if (conn == NULL) { - e_debug(event, "Delay replication - dsync queue is full"); - event_unref(&event); - return FALSE; - } - - next_full_sync = user->last_full_sync + - brain->set->replication_full_sync_interval; - full = next_full_sync <= ioloop_time; - /* update the sync times immediately. if the replication fails we still - wouldn't want it to be retried immediately. */ - user->last_fast_sync = ioloop_time; - if (full || user->force_full_sync) { - user->last_full_sync = ioloop_time; - user->force_full_sync = FALSE; - } - /* reset priority also. if more updates arrive during replication - we'll do another replication to make sure nothing gets lost */ - user->priority = REPLICATION_PRIORITY_NONE; - - ctx = i_new(struct replicator_sync_context, 1); - ctx->brain = brain; - ctx->user = user; - ctx->event = event; - - e_debug(ctx->event, "Starting %s replication", - full ? "full" : "incremental"); - - replicator_user_ref(user); - dsync_client_sync(conn, user->username, user->state, full, - dsync_callback, ctx); - return TRUE; -} - -static bool replicator_brain_fill_next(struct replicator_brain *brain) -{ - struct replicator_user *user; - unsigned int next_secs; - - user = replicator_queue_pop(brain->queue, &next_secs); - if (user == NULL) { - e_debug(brain->event, "Got no user from queue, waiting for %u seconds", - next_secs); - /* nothing more to do */ - timeout_remove(&brain->to); - brain->to = timeout_add(next_secs * 1000, - replicator_brain_timeout, brain); - return FALSE; - } - - if (!dsync_replicate(brain, user)) { - /* all connections were full, put the user back to queue */ - e_debug(brain->event, "Could not replicate %s - pushing back to queue", - user->username); - replicator_queue_push(brain->queue, user); - return FALSE; - } - /* replication started for the user */ - return TRUE; -} - -static void replicator_brain_fill(struct replicator_brain *brain) -{ - while (replicator_brain_fill_next(brain)) ; -} diff --git a/src/replication/replicator/replicator-brain.h b/src/replication/replicator/replicator-brain.h deleted file mode 100644 index 6e9ae23c8c..0000000000 --- a/src/replication/replicator/replicator-brain.h +++ /dev/null @@ -1,20 +0,0 @@ -#ifndef REPLICATOR_BRAIN_H -#define REPLICATOR_BRAIN_H - -struct replicator_settings; -struct replicator_queue; - -struct replicator_brain * -replicator_brain_init(struct replicator_queue *queue, - const struct replicator_settings *set); -void replicator_brain_deinit(struct replicator_brain **brain); - -struct replicator_queue * -replicator_brain_get_queue(struct replicator_brain *brain); -const struct replicator_settings * -replicator_brain_get_settings(struct replicator_brain *brain); - -const ARRAY_TYPE(dsync_client) * -replicator_brain_get_dsync_clients(struct replicator_brain *brain); - -#endif diff --git a/src/replication/replicator/replicator-queue-auth.c b/src/replication/replicator/replicator-queue-auth.c deleted file mode 100644 index 7aed1907cd..0000000000 --- a/src/replication/replicator/replicator-queue-auth.c +++ /dev/null @@ -1,41 +0,0 @@ -/* Copyright (c) 2013-2018 Dovecot authors, see the included COPYING file */ - -#include "lib.h" -#include "auth-master.h" -#include "replicator-queue-private.h" - -#define REPLICATOR_AUTH_SERVICE_NAME "replicator" - -void replicator_queue_add_auth_users(struct replicator_queue *queue, - const char *auth_socket_path, - const char *usermask, time_t last_update) -{ - struct auth_master_connection *auth_conn; - struct auth_master_user_list_ctx *ctx; - struct auth_user_info user_info; - struct replicator_user *user; - const char *username; - - e_debug(queue->event, "Add users from userdb with usermask '%s'", - usermask); - - auth_conn = auth_master_init(auth_socket_path, - AUTH_MASTER_FLAG_NO_IDLE_TIMEOUT); - - i_zero(&user_info); - user_info.service = REPLICATOR_AUTH_SERVICE_NAME; - - /* add all users into replication queue, so that we can start doing - full syncs for everyone whose state can't be found */ - ctx = auth_master_user_list_init(auth_conn, usermask, &user_info); - while ((username = auth_master_user_list_next(ctx)) != NULL) { - user = replicator_queue_get(queue, username); - replicator_queue_update(queue, user, REPLICATION_PRIORITY_NONE); - replicator_queue_add(queue, user); - user->last_update = last_update; - } - if (auth_master_user_list_deinit(&ctx) < 0) - e_error(queue->event, - "listing users failed, can't replicate existing data"); - auth_master_deinit(&auth_conn); -} diff --git a/src/replication/replicator/replicator-queue-private.h b/src/replication/replicator/replicator-queue-private.h deleted file mode 100644 index fe57c1d787..0000000000 --- a/src/replication/replicator/replicator-queue-private.h +++ /dev/null @@ -1,21 +0,0 @@ -#ifndef REPLICATOR_QUEUE_PRIVATE_H -#define REPLICATOR_QUEUE_PRIVATE_H - -#include "replicator-queue.h" - -struct replicator_queue { - struct priorityq *user_queue; - struct event *event; - /* username => struct replicator_user* */ - HASH_TABLE(char *, struct replicator_user *) user_hash; - - ARRAY(struct replicator_sync_lookup) sync_lookups; - - unsigned int full_sync_interval; - unsigned int failure_resync_interval; - - void (*change_callback)(void *context); - void *change_context; -}; - -#endif diff --git a/src/replication/replicator/replicator-queue.c b/src/replication/replicator/replicator-queue.c deleted file mode 100644 index 17c76fbcc8..0000000000 --- a/src/replication/replicator/replicator-queue.c +++ /dev/null @@ -1,534 +0,0 @@ -/* Copyright (c) 2013-2018 Dovecot authors, see the included COPYING file */ - -#include "lib.h" -#include "array.h" -#include "ioloop.h" -#include "istream.h" -#include "ostream.h" -#include "str.h" -#include "strescape.h" -#include "hash.h" -#include "replicator-queue-private.h" -#include "replicator-settings.h" - -#include -#include - -struct replicator_sync_lookup { - struct replicator_user *user; - - replicator_sync_callback_t *callback; - void *context; - - bool wait_for_next_push; -}; - -struct replicator_queue_iter { - struct replicator_queue *queue; - struct hash_iterate_context *iter; -}; - -static unsigned int replicator_full_sync_interval = 0; -static unsigned int replicator_failure_resync_interval = 0; - -static time_t replicator_user_next_sync_time(const struct replicator_user *user) -{ - /* The idea is that the higher the priority, the more likely it will - be prioritized over low priority syncs. However, to avoid permanent - starvation of lower priority users, the priority boost is only - temporary. - - The REPLICATION_PRIORITY_*_SECS macros effectively specify how long - lower priority requests are allowed to be waiting. */ -#define REPLICATION_PRIORITY_LOW_SECS (60*15) -#define REPLICATION_PRIORITY_HIGH_SECS (60*30) -#define REPLICATION_PRIORITY_SYNC_SECS (60*45) - /* When priority != none, user needs to be replicated ASAP. - The question is just whether the queue is already busy and other - users need to be synced even more faster. */ - if (user->last_fast_sync == 0) { - /* User has never been synced yet. These will be replicated - first. Still, try to replicate higher priority users faster - than lower priority users. */ - if (user->priority != REPLICATION_PRIORITY_NONE) - return REPLICATION_PRIORITY_SYNC - user->priority; - } - switch (user->priority) { - case REPLICATION_PRIORITY_NONE: - break; - case REPLICATION_PRIORITY_LOW: - i_assert(user->last_update >= REPLICATION_PRIORITY_LOW_SECS); - return user->last_update - REPLICATION_PRIORITY_LOW_SECS; - case REPLICATION_PRIORITY_HIGH: - i_assert(user->last_update >= REPLICATION_PRIORITY_HIGH_SECS); - return user->last_update - REPLICATION_PRIORITY_HIGH_SECS; - case REPLICATION_PRIORITY_SYNC: - i_assert(user->last_update >= REPLICATION_PRIORITY_HIGH_SECS); - return user->last_update - REPLICATION_PRIORITY_SYNC_SECS; - } - if (user->last_sync_failed) { - /* failures need to be retried at specific intervals */ - return user->last_fast_sync + - replicator_failure_resync_interval; - } - /* full resyncs should be done at configured intervals */ - return user->last_full_sync + replicator_full_sync_interval; -} - -static int user_priority_cmp(const void *p1, const void *p2) -{ - const struct replicator_user *user1 = p1, *user2 = p2; - time_t next_sync1 = replicator_user_next_sync_time(user1); - time_t next_sync2 = replicator_user_next_sync_time(user2); - if (next_sync1 < next_sync2) - return -1; - if (next_sync1 > next_sync2) - return 1; - return 0; -} - -struct replicator_queue * -replicator_queue_init(unsigned int full_sync_interval, - unsigned int failure_resync_interval) -{ - struct replicator_queue *queue; - - /* priorityq callback needs to access these */ - i_assert(replicator_full_sync_interval == 0 || - replicator_full_sync_interval == full_sync_interval); - replicator_full_sync_interval = full_sync_interval; - i_assert(replicator_failure_resync_interval == 0 || - replicator_failure_resync_interval == failure_resync_interval); - replicator_full_sync_interval = full_sync_interval; - replicator_failure_resync_interval = failure_resync_interval; - - queue = i_new(struct replicator_queue, 1); - queue->full_sync_interval = full_sync_interval; - queue->failure_resync_interval = failure_resync_interval; - queue->user_queue = priorityq_init(user_priority_cmp, 1024); - hash_table_create(&queue->user_hash, default_pool, 1024, - str_hash, strcmp); - i_array_init(&queue->sync_lookups, 32); - queue->event = event_create(NULL); - event_add_category(queue->event, &event_category_replication); - return queue; -} - -void replicator_queue_deinit(struct replicator_queue **_queue) -{ - struct replicator_queue *queue = *_queue; - struct priorityq_item *item; - - *_queue = NULL; - - queue->change_callback = NULL; - - while ((item = priorityq_pop(queue->user_queue)) != NULL) { - struct replicator_user *user = (struct replicator_user *)item; - - user->popped = TRUE; - replicator_queue_remove(queue, &user); - } - - priorityq_deinit(&queue->user_queue); - hash_table_destroy(&queue->user_hash); - i_assert(array_count(&queue->sync_lookups) == 0); - array_free(&queue->sync_lookups); - event_unref(&queue->event); - i_free(queue); -} - -void replicator_queue_set_change_callback(struct replicator_queue *queue, - void (*callback)(void *context), - void *context) -{ - queue->change_callback = callback; - queue->change_context = context; -} - -void replicator_user_ref(struct replicator_user *user) -{ - i_assert(user->refcount > 0); - user->refcount++; -} - -bool replicator_user_unref(struct replicator_user **_user) -{ - struct replicator_user *user = *_user; - - i_assert(user->refcount > 0); - *_user = NULL; - if (--user->refcount > 0) - return TRUE; - - i_free(user->state); - i_free(user->username); - i_free(user); - return FALSE; -} - -struct replicator_user * -replicator_queue_lookup(struct replicator_queue *queue, const char *username) -{ - return hash_table_lookup(queue->user_hash, username); -} - -struct replicator_user * -replicator_queue_get(struct replicator_queue *queue, const char *username) -{ - struct replicator_user *user; - - user = replicator_queue_lookup(queue, username); - if (user == NULL) { - e_debug(queue->event, "user %s: User not found from queue - adding", username); - user = i_new(struct replicator_user, 1); - user->refcount = 1; - user->username = i_strdup(username); - user->last_update = ioloop_time; - hash_table_insert(queue->user_hash, user->username, user); - if (!user->popped) - priorityq_add(queue->user_queue, &user->item); - } - return user; -} - -void replicator_queue_update(struct replicator_queue *queue, - struct replicator_user *user, - enum replication_priority priority) -{ - if (user->priority >= priority) { - /* user already has at least this high priority */ - e_debug(queue->event, "user %s: Ignoring priority %u update, " - "since user already has priority=%u", - user->username, priority, user->priority); - return; - } - e_debug(queue->event, "user %s: Updating priority %u -> %u", - user->username, user->priority, priority); - user->priority = priority; - user->last_update = ioloop_time; -} - -void replicator_queue_add(struct replicator_queue *queue, - struct replicator_user *user) -{ - if (!user->popped) { - priorityq_remove(queue->user_queue, &user->item); - priorityq_add(queue->user_queue, &user->item); - } - if (queue->change_callback != NULL) { - e_debug(queue->event, "user %s: Queue changed - calling callback", - user->username); - queue->change_callback(queue->change_context); - } -} - -void replicator_queue_add_sync_callback(struct replicator_queue *queue, - struct replicator_user *user, - replicator_sync_callback_t *callback, - void *context) -{ - struct replicator_sync_lookup *lookup; - - i_assert(user->priority == REPLICATION_PRIORITY_SYNC); - - lookup = array_append_space(&queue->sync_lookups); - lookup->user = user; - lookup->callback = callback; - lookup->context = context; - lookup->wait_for_next_push = user->popped; - - replicator_queue_add(queue, user); -} - -void replicator_queue_remove(struct replicator_queue *queue, - struct replicator_user **_user) -{ - struct replicator_user *user = *_user; - - *_user = NULL; - e_debug(queue->event, "user %s: Removing user from queue", user->username); - if (!user->popped) - priorityq_remove(queue->user_queue, &user->item); - hash_table_remove(queue->user_hash, user->username); - - if (queue->change_callback != NULL) { - e_debug(queue->event, "user %s: Queue changed - calling callback", - user->username); - queue->change_callback(queue->change_context); - } - replicator_user_unref(&user); -} - -unsigned int replicator_queue_count(struct replicator_queue *queue) -{ - return priorityq_count(queue->user_queue); -} - -bool replicator_queue_want_sync_now(struct replicator_user *user, - unsigned int *next_secs_r) -{ - time_t next_sync = replicator_user_next_sync_time(user); - if (next_sync <= ioloop_time) { - *next_secs_r = 0; - return TRUE; - } - *next_secs_r = next_sync - ioloop_time; - return FALSE; -} - -struct replicator_user * -replicator_queue_peek(struct replicator_queue *queue, - unsigned int *next_secs_r) -{ - struct priorityq_item *item; - struct replicator_user *user; - - item = priorityq_peek(queue->user_queue); - if (item == NULL) { - /* no users defined. we shouldn't normally get here */ - *next_secs_r = 3600; - return NULL; - } - user = (struct replicator_user *)item; - (void)replicator_queue_want_sync_now(user, next_secs_r); - return user; -} - -struct replicator_user * -replicator_queue_pop(struct replicator_queue *queue, - unsigned int *next_secs_r) -{ - struct replicator_user *user; - - user = replicator_queue_peek(queue, next_secs_r); - if (*next_secs_r > 0) { - /* we don't want to sync the user yet */ - return NULL; - } - if (user != NULL) { - priorityq_remove(queue->user_queue, &user->item); - user->popped = TRUE; - } - return user; -} - -static void -replicator_queue_handle_sync_lookups(struct replicator_queue *queue, - struct replicator_user *user) -{ - struct replicator_sync_lookup *lookups; - ARRAY(struct replicator_sync_lookup) callbacks; - unsigned int i, count; - bool success = !user->last_sync_failed; - - t_array_init(&callbacks, 8); - lookups = array_get_modifiable(&queue->sync_lookups, &count); - for (i = 0; i < count; ) { - if (lookups[i].user != user) - i++; - else if (lookups[i].wait_for_next_push) { - /* another sync request came while user was being - replicated */ - i_assert(user->priority == REPLICATION_PRIORITY_SYNC); - lookups[i].wait_for_next_push = FALSE; - i++; - } else { - array_push_back(&callbacks, &lookups[i]); - array_delete(&queue->sync_lookups, i, 1); - } - } - - e_debug(queue->event, "user %s: Handled sync lookups", user->username); - - array_foreach_modifiable(&callbacks, lookups) - lookups->callback(success, lookups->context); -} - -void replicator_queue_push(struct replicator_queue *queue, - struct replicator_user *user) -{ - i_assert(user->popped); - - priorityq_add(queue->user_queue, &user->item); - user->popped = FALSE; - - T_BEGIN { - replicator_queue_handle_sync_lookups(queue, user); - } T_END; -} - -static int -replicator_queue_import_line(struct replicator_queue *queue, const char *line) -{ - const char *const *args, *username, *state; - unsigned int priority; - struct replicator_user *user, tmp_user; - - /* - */ - args = t_strsplit_tabescaped(line); - if (str_array_length(args) < 7) - return -1; - - i_zero(&tmp_user); - username = args[0]; - state = t_strdup_noconst(args[6]); - if (username[0] == '\0' || - str_to_uint(args[1], &priority) < 0 || - str_to_time(args[2], &tmp_user.last_update) < 0 || - str_to_time(args[3], &tmp_user.last_fast_sync) < 0 || - str_to_time(args[4], &tmp_user.last_full_sync) < 0) - return -1; - tmp_user.priority = priority; - tmp_user.last_sync_failed = args[5][0] != '0'; - - if (str_array_length(args) >= 8) { - if (str_to_time(args[7], &tmp_user.last_successful_sync) < 0) - return -1; - } else { - tmp_user.last_successful_sync = 0; - /* On-disk format didn't have this yet */ - } - - user = hash_table_lookup(queue->user_hash, username); - if (user != NULL) { - if (user->last_update > tmp_user.last_update) { - /* we already have a newer state */ - return 0; - } - if (user->last_update == tmp_user.last_update) { - /* either one of these could be newer. use the one - with higher priority. */ - if (user->priority > tmp_user.priority) - return 0; - } - } else { - user = replicator_queue_get(queue, username); - } - user->priority = tmp_user.priority; - user->last_update = tmp_user.last_update; - user->last_fast_sync = tmp_user.last_fast_sync; - user->last_full_sync = tmp_user.last_full_sync; - user->last_successful_sync = tmp_user.last_successful_sync; - user->last_sync_failed = tmp_user.last_sync_failed; - i_free(user->state); - user->state = i_strdup(state); - replicator_queue_add(queue, user); - return 0; -} - -int replicator_queue_import(struct replicator_queue *queue, const char *path) -{ - struct istream *input; - const char *line; - int fd, ret = 0; - - e_debug(queue->event, "Importing queue from %s", path); - - fd = open(path, O_RDONLY); - if (fd == -1) { - if (errno == ENOENT) - return 0; - e_error(queue->event, "open(%s) failed: %m", path); - return -1; - } - - input = i_stream_create_fd_autoclose(&fd, SIZE_MAX); - while ((line = i_stream_read_next_line(input)) != NULL) { - T_BEGIN { - ret = replicator_queue_import_line(queue, line); - } T_END; - if (ret < 0) { - e_error(queue->event, - "Corrupted replicator record in %s: %s", path, line); - break; - } - } - if (input->stream_errno != 0) { - e_error(queue->event, "read(%s) failed: %s", path, i_stream_get_error(input)); - ret = -1; - } - i_stream_destroy(&input); - return ret; -} - -static void -replicator_queue_export_user(struct replicator_user *user, string_t *str) -{ - str_append_tabescaped(str, user->username); - str_printfa(str, "\t%d\t%lld\t%lld\t%lld\t%d\t", (int)user->priority, - (long long)user->last_update, - (long long)user->last_fast_sync, - (long long)user->last_full_sync, - user->last_sync_failed ? 1 : 0); - if (user->state != NULL) - str_append_tabescaped(str, user->state); - str_printfa(str, "\t%lld\n", (long long)user->last_successful_sync); -} - -int replicator_queue_export(struct replicator_queue *queue, const char *path) -{ - struct replicator_queue_iter *iter; - struct replicator_user *user; - struct ostream *output; - string_t *str; - int fd, ret = 0; - - fd = creat(path, 0600); - if (fd == -1) { - e_error(queue->event, "creat(%s) failed: %m", path); - return -1; - } - output = o_stream_create_fd_file_autoclose(&fd, 0); - o_stream_cork(output); - - str = t_str_new(128); - iter = replicator_queue_iter_init(queue); - while ((user = replicator_queue_iter_next(iter)) != NULL) { - str_truncate(str, 0); - replicator_queue_export_user(user, str); - if (o_stream_send(output, str_data(str), str_len(str)) < 0) - break; - } - replicator_queue_iter_deinit(&iter); - if (o_stream_finish(output) < 0) { - e_error(queue->event, "write(%s) failed: %s", path, o_stream_get_error(output)); - ret = -1; - } - o_stream_destroy(&output); - return ret; -} - -struct replicator_queue_iter * -replicator_queue_iter_init(struct replicator_queue *queue) -{ - struct replicator_queue_iter *iter; - - iter = i_new(struct replicator_queue_iter, 1); - iter->queue = queue; - iter->iter = hash_table_iterate_init(queue->user_hash); - return iter; -} - -struct replicator_user * -replicator_queue_iter_next(struct replicator_queue_iter *iter) -{ - struct replicator_user *user; - char *username; - - if (!hash_table_iterate(iter->iter, iter->queue->user_hash, - &username, &user)) - return NULL; - return user; -} - -void replicator_queue_iter_deinit(struct replicator_queue_iter **_iter) -{ - struct replicator_queue_iter *iter = *_iter; - - *_iter = NULL; - - hash_table_iterate_deinit(&iter->iter); - i_free(iter); -} diff --git a/src/replication/replicator/replicator-queue.h b/src/replication/replicator/replicator-queue.h deleted file mode 100644 index 4e021e20bd..0000000000 --- a/src/replication/replicator/replicator-queue.h +++ /dev/null @@ -1,104 +0,0 @@ -#ifndef REPLICATOR_QUEUE_H -#define REPLICATOR_QUEUE_H - -#include "priorityq.h" -#include "replication-common.h" - -struct replicator_user { - struct priorityq_item item; - - char *username; - /* dsync state for incremental syncing */ - char *state; - /* last time this user's state was updated */ - time_t last_update; - /* last_fast_sync is always >= last_full_sync. */ - time_t last_fast_sync, last_full_sync, last_successful_sync; - - int refcount; - enum replication_priority priority; - /* User isn't currently in replication queue */ - bool popped:1; - /* Last replication sync failed */ - bool last_sync_failed:1; - /* Force a full sync on the next replication */ - bool force_full_sync:1; -}; - -typedef void replicator_sync_callback_t(bool success, void *context); - -struct replicator_queue * -replicator_queue_init(unsigned int full_sync_interval, - unsigned int failure_resync_interval); -void replicator_queue_deinit(struct replicator_queue **queue); - -/* Call the specified callback when data is added/removed/moved in queue - via _add(), _add_sync() or _remove() functions (not push/pop). */ -void replicator_queue_set_change_callback(struct replicator_queue *queue, - void (*callback)(void *context), - void *context); - -/* Reference the user */ -void replicator_user_ref(struct replicator_user *user); -/* Unreference the user. Returns TRUE if refcount is still >0. */ -bool replicator_user_unref(struct replicator_user **user); - -/* Lookup an existing user */ -struct replicator_user * -replicator_queue_lookup(struct replicator_queue *queue, const char *username); -/* Lookup or create a user and return it. Afterwards replicator_queue_add() - must be called to add/move the user to the proper place in the queue. */ -struct replicator_user * -replicator_queue_get(struct replicator_queue *queue, const char *username); -/* Update user's priority if it's currently lower. */ -void replicator_queue_update(struct replicator_queue *queue, - struct replicator_user *user, - enum replication_priority priority); -void replicator_queue_add(struct replicator_queue *queue, - struct replicator_user *user); -/* Call the callback when user with SYNC priority has finished syncing. */ -void replicator_queue_add_sync_callback(struct replicator_queue *queue, - struct replicator_user *user, - replicator_sync_callback_t *callback, - void *context); -/* Remove user from replication queue and free it. */ -void replicator_queue_remove(struct replicator_queue *queue, - struct replicator_user **user); -/* Return the number of users in the queue. */ -unsigned int replicator_queue_count(struct replicator_queue *queue); - -/* Return the next user from replication queue and how many seconds from now - the returned user should be synced (0 = immediately). Returns NULL only if - there are no users in the queue. */ -struct replicator_user * -replicator_queue_peek(struct replicator_queue *queue, - unsigned int *next_secs_r); -/* Return the next user from replication queue, and remove it from the queue. - If there's nothing to be replicated currently, returns NULL and sets - next_secs_r to when there should be more work to do. */ -struct replicator_user * -replicator_queue_pop(struct replicator_queue *queue, - unsigned int *next_secs_r); -/* Add user back to queue. */ -void replicator_queue_push(struct replicator_queue *queue, - struct replicator_user *user); - -int replicator_queue_import(struct replicator_queue *queue, const char *path); -int replicator_queue_export(struct replicator_queue *queue, const char *path); - -/* Returns TRUE if user replication can be started now, FALSE if not. When - returning FALSE, next_secs_r is set to user's next replication time. */ -bool replicator_queue_want_sync_now(struct replicator_user *user, - unsigned int *next_secs_r); -/* Iterate through all users in the queue. */ -struct replicator_queue_iter * -replicator_queue_iter_init(struct replicator_queue *queue); -struct replicator_user * -replicator_queue_iter_next(struct replicator_queue_iter *iter); -void replicator_queue_iter_deinit(struct replicator_queue_iter **iter); - -void replicator_queue_add_auth_users(struct replicator_queue *queue, - const char *auth_socket_path, - const char *usermask, time_t last_update); - -#endif diff --git a/src/replication/replicator/replicator-settings.c b/src/replication/replicator/replicator-settings.c deleted file mode 100644 index 25e2238d66..0000000000 --- a/src/replication/replicator/replicator-settings.c +++ /dev/null @@ -1,101 +0,0 @@ -/* Copyright (c) 2013-2018 Dovecot authors, see the included COPYING file */ - -#include "lib.h" -#include "buffer.h" -#include "settings-parser.h" -#include "service-settings.h" -#include "replicator-settings.h" - -struct event_category event_category_replication = { - .name = "replication" -}; - -/* */ -static struct file_listener_settings replicator_unix_listeners_array[] = { - { - .path = "replicator", - .mode = 0600, - .user = "$default_internal_user", - .group = "", - }, - { - .path = "replicator-doveadm", - .type = "doveadm", - .mode = 0, - .user = "$default_internal_user", - .group = "", - }, -}; -static struct file_listener_settings *replicator_unix_listeners[] = { - &replicator_unix_listeners_array[0], - &replicator_unix_listeners_array[1] -}; -static buffer_t replicator_unix_listeners_buf = { - { { replicator_unix_listeners, sizeof(replicator_unix_listeners) } } -}; -/* */ - -struct service_settings replicator_service_settings = { - .name = "replicator", - .protocol = "", - .type = "", - .executable = "replicator", - .user = "", - .group = "", - .privileged_group = "", - .extra_groups = "", - .chroot = "", - - .drop_priv_before_exec = FALSE, - - .process_min_avail = 0, - .process_limit = 1, - .client_limit = 0, - .service_count = 0, - .idle_kill = UINT_MAX, - .vsz_limit = UOFF_T_MAX, - - .unix_listeners = { { &replicator_unix_listeners_buf, - sizeof(replicator_unix_listeners[0]) } }, - .fifo_listeners = ARRAY_INIT, - .inet_listeners = ARRAY_INIT, - - .process_limit_1 = TRUE -}; - -#undef DEF -#define DEF(type, name) \ - SETTING_DEFINE_STRUCT_##type(#name, name, struct replicator_settings) - -static const struct setting_define replicator_setting_defines[] = { - DEF(STR, auth_socket_path), - DEF(STR, doveadm_socket_path), - DEF(STR, replication_dsync_parameters), - - DEF(TIME, replication_full_sync_interval), - DEF(UINT, replication_max_conns), - - SETTING_DEFINE_LIST_END -}; - -const struct replicator_settings replicator_default_settings = { - .auth_socket_path = "auth-userdb", - .doveadm_socket_path = "doveadm-server", - .replication_dsync_parameters = "-d -N -l 30 -U", - - .replication_full_sync_interval = 60*60*24, - .replication_max_conns = 10 -}; - -const struct setting_parser_info replicator_setting_parser_info = { - .module_name = "replicator", - .defines = replicator_setting_defines, - .defaults = &replicator_default_settings, - - .type_offset = SIZE_MAX, - .struct_size = sizeof(struct replicator_settings), - - .parent_offset = SIZE_MAX -}; - -const struct replicator_settings *replicator_settings; diff --git a/src/replication/replicator/replicator-settings.h b/src/replication/replicator/replicator-settings.h deleted file mode 100644 index 5e0f6569cb..0000000000 --- a/src/replication/replicator/replicator-settings.h +++ /dev/null @@ -1,17 +0,0 @@ -#ifndef REPLICATOR_SETTINGS_H -#define REPLICATOR_SETTINGS_H - -struct replicator_settings { - const char *auth_socket_path; - const char *doveadm_socket_path; - const char *replication_dsync_parameters; - - unsigned int replication_full_sync_interval; - unsigned int replication_max_conns; -}; - -extern const struct setting_parser_info replicator_setting_parser_info; -extern const struct replicator_settings *replicator_settings; -extern struct event_category event_category_replication; - -#endif diff --git a/src/replication/replicator/replicator.c b/src/replication/replicator/replicator.c deleted file mode 100644 index 37ea293e38..0000000000 --- a/src/replication/replicator/replicator.c +++ /dev/null @@ -1,118 +0,0 @@ -/* Copyright (c) 2013-2018 Dovecot authors, see the included COPYING file */ - -#include "lib.h" -#include "ioloop.h" -#include "restrict-access.h" -#include "auth-master.h" -#include "master-service.h" -#include "master-service-settings.h" -#include "notify-connection.h" -#include "doveadm-connection.h" -#include "replicator-brain.h" -#include "replicator-queue.h" -#include "replicator-settings.h" - -#define REPLICATOR_DB_DUMP_INTERVAL_MSECS (1000*60*15) -/* if syncing fails, try again in 5 minutes */ -#define REPLICATOR_FAILURE_RESYNC_INTERVAL_SECS (60*5) -#define REPLICATOR_DB_FNAME "replicator.db" - -static struct replicator_queue *queue; -static struct replicator_brain *brain; -static const struct master_service_settings *service_set; -static const struct replicator_settings *set; -static struct timeout *to_dump; - -static void client_connected(struct master_service_connection *conn) -{ - const char *type; - - master_service_client_connection_accept(conn); - type = master_service_connection_get_type(conn); - if (strcmp(type, "doveadm") == 0) - doveadm_connection_create(brain, conn->fd); - else - (void)notify_connection_create(conn->fd, queue); -} - -static void replication_add_users(struct replicator_queue *queue) -{ - const char *path; - - replicator_queue_add_auth_users(queue, set->auth_socket_path, "*", 0); - - /* add updates from replicator db, if it exists */ - path = t_strconcat(service_set->state_dir, "/"REPLICATOR_DB_FNAME, NULL); - (void)replicator_queue_import(queue, path); -} - -static void ATTR_NULL(1) -replicator_dump_timeout(void *context ATTR_UNUSED) -{ - const char *path; - - path = t_strconcat(service_set->state_dir, "/"REPLICATOR_DB_FNAME, NULL); - (void)replicator_queue_export(queue, path); -} - -static void main_init(void) -{ - service_set = master_service_settings_get(master_service); - set = master_service_settings_get_root_set(master_service, - &replicator_setting_parser_info); - - queue = replicator_queue_init(set->replication_full_sync_interval, - REPLICATOR_FAILURE_RESYNC_INTERVAL_SECS); - replication_add_users(queue); - to_dump = timeout_add(REPLICATOR_DB_DUMP_INTERVAL_MSECS, - replicator_dump_timeout, NULL); - brain = replicator_brain_init(queue, set); - doveadm_connections_init(); -} - -static void main_deinit(void) -{ - const char *path; - - doveadm_connections_deinit(); - notify_connections_destroy_all(); - replicator_brain_deinit(&brain); - timeout_remove(&to_dump); - path = t_strconcat(service_set->state_dir, "/"REPLICATOR_DB_FNAME, NULL); - (void)replicator_queue_export(queue, path); - replicator_queue_deinit(&queue); -} - -int main(int argc, char *argv[]) -{ - const struct setting_parser_info *set_roots[] = { - &replicator_setting_parser_info, - NULL - }; - const enum master_service_flags service_flags = - MASTER_SERVICE_FLAG_NO_IDLE_DIE; - const char *error; - - master_service = master_service_init("replicator", service_flags, - &argc, &argv, ""); - if (master_getopt(master_service) > 0) - return FATAL_DEFAULT; - - if (master_service_settings_read_simple(master_service, set_roots, - &error) < 0) - i_fatal("Error reading configuration: %s", error); - master_service_init_log(master_service); - - restrict_access_by_env(RESTRICT_ACCESS_FLAG_ALLOW_ROOT, NULL); - restrict_access_allow_coredumps(TRUE); - /* finish init before we get list of users from auth, because that - can take long enough for master process to kill us otherwise. */ - master_service_init_finish(master_service); - - main_init(); - master_service_run(master_service, client_connected); - main_deinit(); - - master_service_deinit(&master_service); - return 0; -} diff --git a/src/replication/replicator/test-replicator-queue.c b/src/replication/replicator/test-replicator-queue.c deleted file mode 100644 index 9ffa5ff54d..0000000000 --- a/src/replication/replicator/test-replicator-queue.c +++ /dev/null @@ -1,260 +0,0 @@ -/* Copyright (c) 2022 Dovecot authors, see the included COPYING file */ - -#include "lib.h" -#include "ioloop.h" -#include "test-common.h" -#include "replicator-queue.h" - -#define TEST_REPLICATION_FULL_SYNC_INTERVAL 60 -#define TEST_REPLICATION_FAILURE_RESYNC_INTERVAL 10 - -static void test_replicator_queue(void) -{ - struct replicator_queue *queue; - struct replicator_user *user1, *user2, *user3, *user4; - unsigned int next_secs; - - test_begin("replicator queue"); - queue = replicator_queue_init(TEST_REPLICATION_FULL_SYNC_INTERVAL, - TEST_REPLICATION_FAILURE_RESYNC_INTERVAL); - ioloop_time = time(NULL); - - /* 1) Add users */ - - /* add the 1st user with priority=none */ - user1 = replicator_queue_get(queue, "user1"); - replicator_queue_update(queue, user1, REPLICATION_PRIORITY_NONE); - replicator_queue_add(queue, user1); - test_assert(replicator_queue_count(queue) == 1); - test_assert(replicator_queue_peek(queue, &next_secs) == user1 && next_secs == 0); - - /* add the 2nd user with priority=none */ - user2 = replicator_queue_get(queue, "user2"); - replicator_queue_update(queue, user2, REPLICATION_PRIORITY_NONE); - replicator_queue_add(queue, user2); - test_assert(replicator_queue_count(queue) == 2); - test_assert(replicator_queue_peek(queue, &next_secs) == user1 && next_secs == 0); - - /* add the 3rd user with priority=none */ - user3 = replicator_queue_get(queue, "user3"); - replicator_queue_update(queue, user3, REPLICATION_PRIORITY_NONE); - replicator_queue_add(queue, user3); - test_assert(replicator_queue_count(queue) == 3); - test_assert(replicator_queue_peek(queue, &next_secs) == user1 && next_secs == 0); - - /* 2) User hasn't been synced yet, but priority is updated */ - - /* update the 2nd user's priority to low */ - user2 = replicator_queue_get(queue, "user2"); - replicator_queue_update(queue, user2, REPLICATION_PRIORITY_LOW); - replicator_queue_add(queue, user2); - test_assert(replicator_queue_peek(queue, &next_secs) == user2 && next_secs == 0); - - /* update the 1st user's priority to high */ - user1 = replicator_queue_get(queue, "user1"); - replicator_queue_update(queue, user1, REPLICATION_PRIORITY_HIGH); - replicator_queue_add(queue, user1); - test_assert(replicator_queue_peek(queue, &next_secs) == user1 && next_secs == 0); - - /* update the 2nd user's priority to sync */ - user2 = replicator_queue_get(queue, "user2"); - replicator_queue_update(queue, user2, REPLICATION_PRIORITY_SYNC); - replicator_queue_add(queue, user2); - test_assert(replicator_queue_peek(queue, &next_secs) == user2 && next_secs == 0); - - /* 3) User hasn't been synced, and priority is being updated. - user1 was synced 1 second before user2. */ - user1->last_fast_sync = ioloop_time; - user1->last_full_sync = ioloop_time; - user1->priority = REPLICATION_PRIORITY_NONE; - replicator_queue_add(queue, user1); - ioloop_time++; - user2->last_fast_sync = ioloop_time; - user2->last_full_sync = ioloop_time; - user2->priority = REPLICATION_PRIORITY_NONE; - replicator_queue_add(queue, user2); - ioloop_time++; - user3->last_fast_sync = ioloop_time; - user3->last_full_sync = ioloop_time; - user3->priority = REPLICATION_PRIORITY_NONE; - replicator_queue_add(queue, user3); - test_assert(replicator_queue_peek(queue, &next_secs) == user1 && next_secs > 0); - - /* update the 2nd user's priority to low */ - user2 = replicator_queue_get(queue, "user2"); - replicator_queue_update(queue, user2, REPLICATION_PRIORITY_LOW); - replicator_queue_add(queue, user2); - test_assert(replicator_queue_peek(queue, &next_secs) == user2 && next_secs == 0); - - /* update the 1st user's priority to high */ - user1 = replicator_queue_get(queue, "user1"); - replicator_queue_update(queue, user1, REPLICATION_PRIORITY_HIGH); - replicator_queue_add(queue, user1); - test_assert(replicator_queue_peek(queue, &next_secs) == user1 && next_secs == 0); - - /* update the 2nd user's priority to sync */ - user2 = replicator_queue_get(queue, "user2"); - replicator_queue_update(queue, user2, REPLICATION_PRIORITY_SYNC); - replicator_queue_add(queue, user2); - test_assert(replicator_queue_peek(queue, &next_secs) == user2 && next_secs == 0); - - /* 4) Test failed sync with a new user */ - user1->priority = REPLICATION_PRIORITY_NONE; - replicator_queue_add(queue, user1); - user2->priority = REPLICATION_PRIORITY_NONE; - replicator_queue_add(queue, user2); - - user4 = replicator_queue_get(queue, "user4"); - user4->last_fast_sync = ioloop_time - 5; - user4->last_sync_failed = TRUE; - replicator_queue_add(queue, user4); - - test_assert(replicator_queue_count(queue) == 4); - test_assert(replicator_queue_peek(queue, &next_secs) == user4 && - next_secs == TEST_REPLICATION_FAILURE_RESYNC_INTERVAL - 5); - - /* low priority sync is prioritized over failed sync */ - replicator_queue_update(queue, user1, REPLICATION_PRIORITY_LOW); - replicator_queue_add(queue, user1); - test_assert(replicator_queue_peek(queue, &next_secs) == user1 && next_secs == 0); - - /* However, if the last failure was old enough it will be before - the low priority one. Test the edge case. */ - user4->last_fast_sync = ioloop_time - - TEST_REPLICATION_FAILURE_RESYNC_INTERVAL - - (60*15) - 1; - replicator_queue_add(queue, user4); - test_assert(replicator_queue_peek(queue, &next_secs) == user4 && next_secs == 0); - user4->last_fast_sync++; - replicator_queue_add(queue, user4); - test_assert(replicator_queue_peek(queue, &next_secs) == user1 && next_secs == 0); - - /* 5) Test priority starvation */ - - /* high priority is normally prioritized over low priority */ - i_assert(user1->priority == REPLICATION_PRIORITY_LOW); - user2 = replicator_queue_get(queue, "user2"); - replicator_queue_update(queue, user2, REPLICATION_PRIORITY_HIGH); - replicator_queue_add(queue, user2); - test_assert(replicator_queue_peek(queue, &next_secs) == user2 && next_secs == 0); - - /* if low priority is old enough, it gets prioritized over high */ - user1->last_update = ioloop_time - (60*15) - 1; - replicator_queue_add(queue, user1); - test_assert(replicator_queue_peek(queue, &next_secs) == user1 && next_secs == 0); - user1->last_update++; - replicator_queue_add(queue, user1); - test_assert(replicator_queue_peek(queue, &next_secs) == user2 && next_secs == 0); - - /* similarly low priority eventually gets prioritized over sync - priority */ - replicator_queue_update(queue, user2, REPLICATION_PRIORITY_SYNC); - replicator_queue_add(queue, user2); - user1->last_update = ioloop_time - (60*30) - 1; - replicator_queue_add(queue, user1); - test_assert(replicator_queue_peek(queue, &next_secs) == user1 && next_secs == 0); - user1->last_update++; - replicator_queue_add(queue, user1); - test_assert(replicator_queue_peek(queue, &next_secs) == user2 && next_secs == 0); - - /* likewise for none priority also */ - user1->priority = REPLICATION_PRIORITY_NONE; - user1->last_update = ioloop_time; - user1->last_fast_sync = ioloop_time; - user1->last_full_sync = ioloop_time - (60*45) - - TEST_REPLICATION_FULL_SYNC_INTERVAL - 1; - replicator_queue_add(queue, user1); - test_assert(replicator_queue_peek(queue, &next_secs) == user1 && next_secs == 0); - user1->last_full_sync++; - replicator_queue_add(queue, user1); - test_assert(replicator_queue_peek(queue, &next_secs) == user2 && next_secs == 0); - - replicator_queue_deinit(&queue); - test_end(); -} - -static void test_replicator_queue_verify_drained(struct replicator_queue *queue) -{ - struct replicator_queue_iter *iter = - replicator_queue_iter_init(queue); - struct replicator_user *user; - while ((user = replicator_queue_iter_next(iter)) != NULL) { - i_assert(user->priority == REPLICATION_PRIORITY_NONE); - i_assert(user->last_sync_failed || - ioloop_time - user->last_full_sync < TEST_REPLICATION_FULL_SYNC_INTERVAL); - } - replicator_queue_iter_deinit(&iter); -} - -static void test_replicator_queue_drain(struct replicator_queue *queue) -{ - struct replicator_user *user; - unsigned int next_secs; - enum replication_priority prev_priority = REPLICATION_PRIORITY_SYNC; - time_t prev_sync = INT_MAX; - - while ((user = replicator_queue_pop(queue, &next_secs)) != NULL) { - if (user->priority < prev_priority) { - prev_sync = INT_MAX; - } else { - test_assert(user->priority == prev_priority); - if (user->priority == REPLICATION_PRIORITY_NONE) { - test_assert(user->last_full_sync <= prev_sync); - prev_sync = user->last_full_sync; - } else { - test_assert(user->last_fast_sync <= prev_sync); - prev_sync = user->last_fast_sync; - } - } - user->priority = REPLICATION_PRIORITY_NONE; - user->last_fast_sync = user->last_full_sync = ioloop_time-1; - /* dsync runs here */ - if (i_rand_limit(5) == 0) - user->last_sync_failed = TRUE; - else { - user->last_successful_sync = ioloop_time; - user->last_sync_failed = FALSE; - } - replicator_queue_push(queue, user); - } - test_replicator_queue_verify_drained(queue); -} - -static void test_replicator_queue_random(void) -{ - struct replicator_queue *queue; - struct replicator_user *user; - - test_begin("replicator queue random"); - queue = replicator_queue_init(TEST_REPLICATION_FULL_SYNC_INTERVAL, - TEST_REPLICATION_FAILURE_RESYNC_INTERVAL); - /* fill some users */ - ioloop_time = time(NULL); - for (unsigned int i = 0; i < 1000; i++) T_BEGIN { - enum replication_priority priority = - i_rand_minmax(REPLICATION_PRIORITY_NONE, - REPLICATION_PRIORITY_SYNC); - const char *username = - t_strdup_printf("test%u", i_rand_minmax(1, 200)); - user = replicator_queue_get(queue, username); - replicator_queue_update(queue, user, priority); - replicator_queue_add(queue, user); - ioloop_time++; - } T_END; - for (unsigned int i = 0; i < 1000; i++) { - test_replicator_queue_drain(queue); - ioloop_time++; - } - replicator_queue_deinit(&queue); - test_end(); -} - -int main(void) -{ - static void (*const test_functions[])(void) = { - test_replicator_queue, - test_replicator_queue_random, - NULL - }; - return test_run(test_functions); -}