]> git.ipfire.org Git - thirdparty/dovecot/core.git/commitdiff
dsync: Try to commit transactions every dsync_commit_msgs_interval messages
authorTimo Sirainen <timo.sirainen@dovecot.fi>
Sun, 30 Apr 2017 09:31:48 +0000 (12:31 +0300)
committerGitLab <gitlab@git.dovecot.net>
Wed, 17 May 2017 10:35:49 +0000 (13:35 +0300)
This was first attempted to be implemented by
ec0cc8fa647794e44a1afaa448f495a713048dc4, but it was later partially
reverted by 5973d496b16721af6d2c1fa90b016aacddf13554. This current
commit should fix its problems.

src/doveadm/doveadm-dsync.c
src/doveadm/doveadm-settings.c
src/doveadm/doveadm-settings.h
src/doveadm/dsync/dsync-brain-mailbox.c
src/doveadm/dsync/dsync-brain-private.h
src/doveadm/dsync/dsync-brain.c
src/doveadm/dsync/dsync-brain.h
src/doveadm/dsync/dsync-ibc-stream.c
src/doveadm/dsync/dsync-ibc.h
src/doveadm/dsync/dsync-mailbox-import.c
src/doveadm/dsync/dsync-mailbox-import.h

index 75f1229788c4e7240cdd4a3265be7054c2834074..caf569f1d5760190243ebeebb96a957c8ca91a4e 100644 (file)
@@ -93,6 +93,7 @@ struct dsync_cmd_context {
        const char *error;
 
        unsigned int lock_timeout;
+       unsigned int import_commit_msgs_interval;
 
        bool lock:1;
        bool purge_remote:1;
@@ -592,6 +593,7 @@ cmd_dsync_run(struct doveadm_mail_cmd_context *_ctx, struct mail_user *user)
        set.virtual_all_box = ctx->virtual_all_box;
        memcpy(set.sync_box_guid, ctx->mailbox_guid, sizeof(set.sync_box_guid));
        set.lock_timeout_secs = ctx->lock_timeout;
+       set.import_commit_msgs_interval = ctx->import_commit_msgs_interval;
        set.state = ctx->state_input;
        set.mailbox_alt_char = doveadm_settings->dsync_alt_char[0];
 
@@ -1111,6 +1113,7 @@ static struct doveadm_mail_cmd_context *cmd_dsync_alloc(void)
        p_array_init(&ctx->namespace_prefixes, ctx->ctx.pool, 4);
         if ((doveadm_settings->parsed_features & DSYNC_FEATURE_EMPTY_HDR_WORKAROUND) != 0)
                 ctx->empty_hdr_workaround = TRUE;
+       ctx->import_commit_msgs_interval = doveadm_settings->dsync_commit_msgs_interval;
        return &ctx->ctx;
 }
 
index c41d013c0fed22d2509a2a63ee9b6eafc59125c5..9c031b72882ccfded2e9d526d963d70944e28041 100644 (file)
@@ -72,6 +72,7 @@ static const struct setting_define doveadm_setting_defines[] = {
        DEF(SET_STR, director_username_hash),
        DEF(SET_STR, doveadm_api_key),
        DEF(SET_STR, dsync_features),
+       DEF(SET_UINT, dsync_commit_msgs_interval),
        DEF(SET_STR, doveadm_http_rawlog_dir),
 
        { SET_STRLIST, "plugin", offsetof(struct doveadm_settings, plugin_envs), NULL },
@@ -95,6 +96,7 @@ const struct doveadm_settings doveadm_default_settings = {
        .dsync_alt_char = "_",
        .dsync_remote_cmd = "ssh -l%{login} %{host} doveadm dsync-server -u%u -U",
        .dsync_features = "",
+       .dsync_commit_msgs_interval = 100,
        .ssl_client_ca_dir = "",
        .ssl_client_ca_file = "",
        .director_username_hash = "%Lu",
index 8bfa39c7021c0173786bfc1292eef4e51db0390b..2990a91e8ffebd2f55b5f1af935aed82acd75afc 100644 (file)
@@ -29,6 +29,7 @@ struct doveadm_settings {
        const char *director_username_hash;
        const char *doveadm_api_key;
        const char *dsync_features;
+       unsigned int dsync_commit_msgs_interval;
        const char *doveadm_http_rawlog_dir;
        enum dsync_features parsed_features;
        ARRAY(const char *) plugin_envs;
index ab7602390444a41a8252022a128823ea9a3140bd..937338347e1c39d8276bc9a42a642b5195b36682 100644 (file)
@@ -236,6 +236,7 @@ dsync_brain_sync_mailbox_init_remote(struct dsync_brain *brain,
                                          brain->sync_until_timestamp,
                                          brain->sync_max_size,
                                          brain->sync_flag,
+                                         brain->import_commit_msgs_interval,
                                          import_flags);
 }
 
index 1de0313baabe4d257cd33e25a2e72a79db7a5d45..b97b57836449dbdb724e270b147daaf097854ab5 100644 (file)
@@ -62,6 +62,7 @@ struct dsync_brain {
        uoff_t sync_max_size;
        const char *sync_flag;
        char alt_char;
+       unsigned int import_commit_msgs_interval;
 
        unsigned int lock_timeout;
        int lock_fd;
index 17f695a912abd98d828418a78ec5c7612e26761b..cb8d290f9242fc71d167f227cba752848d22c615 100644 (file)
@@ -221,6 +221,7 @@ dsync_brain_master_init(struct mail_user *user, struct dsync_ibc *ibc,
        memcpy(brain->sync_box_guid, set->sync_box_guid,
               sizeof(brain->sync_box_guid));
        brain->lock_timeout = set->lock_timeout_secs;
+       brain->import_commit_msgs_interval = set->import_commit_msgs_interval;
        brain->master_brain = TRUE;
        dsync_brain_set_flags(brain, flags);
 
@@ -260,6 +261,7 @@ dsync_brain_master_init(struct mail_user *user, struct dsync_ibc *ibc,
        ibc_set.sync_type = sync_type;
        ibc_set.hdr_hash_v2 = TRUE;
        ibc_set.lock_timeout = set->lock_timeout_secs;
+       ibc_set.import_commit_msgs_interval = set->import_commit_msgs_interval;
        /* reverse the backup direction for the slave */
        ibc_set.brain_flags = flags & ~(DSYNC_BRAIN_FLAG_BACKUP_SEND |
                                        DSYNC_BRAIN_FLAG_BACKUP_RECV);
index 5027378a11e8771147a4b0738dfc7ca720c0a44f..929b80344bda9a3572f52e68be34979cce1143ca 100644 (file)
@@ -79,6 +79,9 @@ struct dsync_brain_settings {
 
        /* If non-zero, use dsync lock file for this user */
        unsigned int lock_timeout_secs;
+       /* If non-zero, importing will attempt to commit transaction after
+          saving this many messages. */
+       unsigned int import_commit_msgs_interval;
        /* Input state for DSYNC_BRAIN_SYNC_TYPE_STATE */
        const char *state;
 };
index ecd6327cd26f7819b507bc186789842be6b5fc74..eebde83d6d11133d5eac6ce256357d84214d87b2 100644 (file)
@@ -78,7 +78,7 @@ static const struct {
                "send_mail_requests backup_send backup_recv lock_timeout "
                "no_mail_sync no_mailbox_renames no_backup_overwrite purge_remote "
                "no_notify sync_since_timestamp sync_max_size sync_flags sync_until_timestamp"
-               "virtual_all_box empty_hdr_workaround"
+               "virtual_all_box empty_hdr_workaround import_commit_msgs_interval"
        },
        { .name = "mailbox_state",
          .chr = 'S',
@@ -704,6 +704,10 @@ dsync_ibc_stream_send_handshake(struct dsync_ibc *_ibc,
                dsync_serializer_encode_add(encoder, "lock_timeout",
                        t_strdup_printf("%u", set->lock_timeout));
        }
+       if (set->import_commit_msgs_interval > 0) {
+               dsync_serializer_encode_add(encoder, "import_commit_msgs_interval",
+                       t_strdup_printf("%u", set->import_commit_msgs_interval));
+       }
        if (set->sync_since_timestamp > 0) {
                dsync_serializer_encode_add(encoder, "sync_since_timestamp",
                        t_strdup_printf("%ld", (long)set->sync_since_timestamp));
@@ -820,6 +824,14 @@ dsync_ibc_stream_recv_handshake(struct dsync_ibc *_ibc,
                        return DSYNC_IBC_RECV_RET_TRYAGAIN;
                }
        }
+       if (dsync_deserializer_decode_try(decoder, "import_commit_msgs_interval", &value)) {
+               if (str_to_uint(value, &set->import_commit_msgs_interval) < 0 ||
+                   set->import_commit_msgs_interval == 0) {
+                       dsync_ibc_input_error(ibc, decoder,
+                               "Invalid import_commit_msgs_interval: %s", value);
+                       return DSYNC_IBC_RECV_RET_TRYAGAIN;
+               }
+       }
        if (dsync_deserializer_decode_try(decoder, "sync_since_timestamp", &value)) {
                if (str_to_time(value, &set->sync_since_timestamp) < 0 ||
                    set->sync_since_timestamp == 0) {
index db02a04aaed9500be13204df89728a38de646842..7cc8ece9d85fd852f78a0b5b2bcb83bad4520c52 100644 (file)
@@ -69,6 +69,7 @@ struct dsync_ibc_settings {
        enum dsync_brain_flags brain_flags;
        bool hdr_hash_v2;
        unsigned int lock_timeout;
+       unsigned int import_commit_msgs_interval;
 };
 
 void dsync_ibc_init_pipe(struct dsync_ibc **ibc1_r,
index 16ddab8353dd8b286fa2aa7e45bd66a3a95b3c32..f3f7032ae1ff09b57a4c52651023df1566a5626f 100644 (file)
@@ -44,6 +44,7 @@ struct importer_new_mail {
        bool skip:1;
        bool expunged:1;
        bool copy_failed:1;
+       bool saved:1;
 };
 
 /* for quickly testing that two-way sync doesn't actually do any unexpected
@@ -66,6 +67,7 @@ struct dsync_mailbox_importer {
        uoff_t sync_max_size;
        enum mailbox_transaction_flags transaction_flags;
        unsigned int hdr_hash_version;
+       unsigned int commit_msgs_interval;
 
        enum mail_flags sync_flag;
        const char *sync_keyword;
@@ -106,6 +108,7 @@ struct dsync_mailbox_importer {
        uint32_t prev_uid, next_local_seq, local_uid_next;
        uint64_t local_initial_highestmodseq, local_initial_highestpvtmodseq;
        unsigned int import_pos, import_count;
+       unsigned int first_unsaved_idx, saves_since_commit;
 
        enum mail_error mail_error;
 
@@ -222,6 +225,7 @@ dsync_mailbox_import_init(struct mailbox *box,
                          time_t sync_until_timestamp,
                          uoff_t sync_max_size,
                          const char *sync_flag,
+                         unsigned int commit_msgs_interval,
                          enum dsync_mailbox_import_flags flags)
 {
        struct dsync_mailbox_importer *importer;
@@ -257,6 +261,7 @@ dsync_mailbox_import_init(struct mailbox *box,
                else
                        importer->sync_keyword = p_strdup(pool, sync_flag);
        }
+       importer->commit_msgs_interval = commit_msgs_interval;
        importer->transaction_flags = MAILBOX_TRANSACTION_FLAG_SYNC;
        if ((flags & DSYNC_MAILBOX_IMPORT_FLAG_NO_NOTIFY) != 0)
                importer->transaction_flags |= MAILBOX_TRANSACTION_FLAG_NO_NOTIFY;
@@ -1817,6 +1822,17 @@ int dsync_mailbox_import_change(struct dsync_mailbox_importer *importer,
        return importer->failed ? -1 : 0;
 }
 
+static int
+importer_new_mail_final_uid_cmp(struct importer_new_mail *const *newmail1,
+                               struct importer_new_mail *const *newmail2)
+{
+       if ((*newmail1)->final_uid < (*newmail2)->final_uid)
+               return -1;
+       if ((*newmail1)->final_uid > (*newmail2)->final_uid)
+               return 1;
+       return 0;
+}
+
 static void
 dsync_mailbox_import_assign_new_uids(struct dsync_mailbox_importer *importer)
 {
@@ -1829,6 +1845,7 @@ dsync_mailbox_import_assign_new_uids(struct dsync_mailbox_importer *importer)
                newmail = *newmailp;
                if (newmail->skip) {
                        /* already assigned */
+                       i_assert(newmail->final_uid != 0);
                        continue;
                }
 
@@ -1856,6 +1873,9 @@ dsync_mailbox_import_assign_new_uids(struct dsync_mailbox_importer *importer)
        }
        importer->last_common_uid = common_uid_next-1;
        importer->new_uids_assigned = TRUE;
+       /* Sort the newmails by their final_uid. This is used for tracking
+          whether an intermediate commit is allowed. */
+       array_sort(&importer->newmails, importer_new_mail_final_uid_cmp);
 }
 
 static int
@@ -1901,6 +1921,45 @@ dsync_mailbox_import_saved_uid(struct dsync_mailbox_importer *importer,
        array_append(&importer->wanted_uids, &uid, 1);
 }
 
+static void
+dsync_mailbox_import_update_first_saved(struct dsync_mailbox_importer *importer)
+{
+       struct importer_new_mail *const *newmails;
+       unsigned int count;
+
+       newmails = array_get(&importer->newmails, &count);
+       while (importer->first_unsaved_idx < count) {
+               if (!newmails[importer->first_unsaved_idx]->saved)
+                       break;
+               importer->first_unsaved_idx++;
+       }
+}
+
+static void
+dsync_mailbox_import_saved_newmail(struct dsync_mailbox_importer *importer,
+                                  struct importer_new_mail *newmail)
+{
+       dsync_mailbox_import_saved_uid(importer, newmail->final_uid);
+       newmail->saved = TRUE;
+
+       dsync_mailbox_import_update_first_saved(importer);
+       importer->saves_since_commit++;
+       /* we can commit only if all the upcoming mails will have UIDs that
+          are larger than we're committing.
+
+          Note that if any existing UIDs have been changed, the new UID is
+          usually higher than anything that is being saved so we can't do
+          an intermediate commit. It's too much extra work to try to handle
+          that situation. So here this never happens, because then
+          array_count(wanted_uids) is always higher than first_unsaved_idx. */
+       if (importer->saves_since_commit >= importer->commit_msgs_interval &&
+           importer->first_unsaved_idx == array_count(&importer->wanted_uids)) {
+               if (dsync_mailbox_import_commit(importer, FALSE) < 0)
+                       importer->failed = TRUE;
+               importer->saves_since_commit = 0;
+       }
+}
+
 static bool
 dsync_msg_change_uid(struct dsync_mailbox_importer *importer,
                     uint32_t old_uid, uint32_t new_uid)
@@ -2367,7 +2426,7 @@ dsync_mailbox_save_body(struct dsync_mailbox_importer *importer,
        }
        if (ret > 0) {
                i_assert(save_ctx == NULL);
-               dsync_mailbox_import_saved_uid(importer, newmail->final_uid);
+               dsync_mailbox_import_saved_newmail(importer, newmail);
                return TRUE;
        }
        /* fallback to saving from remote stream */
@@ -2447,8 +2506,7 @@ dsync_mailbox_save_body(struct dsync_mailbox_importer *importer,
                                                                &importer->mail_error));
                        importer->failed = TRUE;
                } else {
-                       dsync_mailbox_import_saved_uid(importer,
-                                                      newmail->final_uid);
+                       dsync_mailbox_import_saved_newmail(importer, newmail);
                }
        }
        return TRUE;
index 50d74abf6c3d4a8a9e44b14455e36703c07d7c00..b9babe5c119a1927ff346b6e90700f744850c123 100644 (file)
@@ -36,6 +36,7 @@ dsync_mailbox_import_init(struct mailbox *box,
                          time_t sync_until_timestamp,
                          uoff_t sync_max_size,
                          const char *sync_flag,
+                         unsigned int commit_msgs_interval,
                          enum dsync_mailbox_import_flags flags);
 int dsync_mailbox_import_attribute(struct dsync_mailbox_importer *importer,
                                   const struct dsync_mailbox_attribute *attr);