]> git.ipfire.org Git - thirdparty/dovecot/core.git/commitdiff
dsync: Commit large transactions every 100 new messages.
authorTimo Sirainen <tss@iki.fi>
Mon, 8 Apr 2013 15:14:32 +0000 (18:14 +0300)
committerTimo Sirainen <tss@iki.fi>
Mon, 8 Apr 2013 15:14:32 +0000 (18:14 +0300)
This way if the dsync crashes or transaction fails in the middle, the next
run can finish faster. Also the rollbacking finishes faster.

src/doveadm/dsync/dsync-mailbox-import.c

index 7e956e9326a698ea87aa3ec828f692a7fbcafb2e..f0c90f45d4e0d4c7c49cdaf86027b0aafde9b277 100644 (file)
@@ -13,6 +13,8 @@
 #include "dsync-mailbox.h"
 #include "dsync-mailbox-import.h"
 
+#define DSYNC_COMMIT_MSGS_INTERVAL 100
+
 struct importer_mail {
        const char *guid;
        uint32_t uid;
@@ -79,6 +81,7 @@ struct dsync_mailbox_importer {
 
        ARRAY(struct importer_new_mail *) newmails;
        ARRAY_TYPE(uint32_t) wanted_uids;
+       ARRAY_TYPE(uint32_t) saved_uids;
        uint32_t highest_wanted_uid;
 
        ARRAY(struct dsync_mail_request) mail_requests;
@@ -104,6 +107,8 @@ struct dsync_mailbox_importer {
 static void dsync_mailbox_save_newmails(struct dsync_mailbox_importer *importer,
                                        const struct dsync_mail *mail,
                                        struct importer_new_mail *all_newmails);
+static int dsync_mailbox_import_commit(struct dsync_mailbox_importer *importer,
+                                      bool final);
 
 static void
 dsync_mailbox_import_search_init(struct dsync_mailbox_importer *importer)
@@ -128,6 +133,22 @@ dsync_mailbox_import_search_init(struct dsync_mailbox_importer *importer)
        importer->cur_mail_skip = TRUE;
 }
 
+static void
+dsync_mailbox_import_transaction_begin(struct dsync_mailbox_importer *importer)
+{
+       const enum mailbox_transaction_flags ext_trans_flags =
+               MAILBOX_TRANSACTION_FLAG_SYNC |
+               MAILBOX_TRANSACTION_FLAG_EXTERNAL |
+               MAILBOX_TRANSACTION_FLAG_ASSIGN_UIDS;
+
+       importer->trans = mailbox_transaction_begin(importer->box,
+                                               MAILBOX_TRANSACTION_FLAG_SYNC);
+       importer->ext_trans = mailbox_transaction_begin(importer->box,
+                                                       ext_trans_flags);
+       importer->mail = mail_alloc(importer->trans, 0, NULL);
+       importer->ext_mail = mail_alloc(importer->ext_trans, 0, NULL);
+}
+
 struct dsync_mailbox_importer *
 dsync_mailbox_import_init(struct mailbox *box,
                          struct dsync_transaction_log_scan *log_scan,
@@ -140,10 +161,6 @@ dsync_mailbox_import_init(struct mailbox *box,
                          uint64_t remote_highest_pvt_modseq,
                          enum dsync_mailbox_import_flags flags)
 {
-       const enum mailbox_transaction_flags ext_trans_flags =
-               MAILBOX_TRANSACTION_FLAG_SYNC |
-               MAILBOX_TRANSACTION_FLAG_EXTERNAL |
-               MAILBOX_TRANSACTION_FLAG_ASSIGN_UIDS;
        struct dsync_mailbox_importer *importer;
        struct mailbox_status status;
        pool_t pool;
@@ -170,12 +187,9 @@ dsync_mailbox_import_init(struct mailbox *box,
        i_array_init(&importer->maybe_saves, 128);
        i_array_init(&importer->newmails, 128);
        i_array_init(&importer->wanted_uids, 128);
+       i_array_init(&importer->saved_uids, 128);
 
-       importer->trans = mailbox_transaction_begin(importer->box,
-               MAILBOX_TRANSACTION_FLAG_SYNC);
-       importer->ext_trans = mailbox_transaction_begin(box, ext_trans_flags);
-       importer->mail = mail_alloc(importer->trans, 0, NULL);
-       importer->ext_mail = mail_alloc(importer->ext_trans, 0, NULL);
+       dsync_mailbox_import_transaction_begin(importer);
 
        if ((flags & DSYNC_MAILBOX_IMPORT_FLAG_WANT_MAIL_REQUESTS) != 0) {
                i_array_init(&importer->mail_requests, 128);
@@ -1498,12 +1512,21 @@ dsync_mailbox_import_local_uid(struct dsync_mailbox_importer *importer,
 }
 
 static void
-dsync_mailbox_import_want_uid(struct dsync_mailbox_importer *importer,
-                             uint32_t uid)
+dsync_mailbox_import_saved_uid(struct dsync_mailbox_importer *importer,
+                              uint32_t uid)
 {
+       i_assert(importer->search_ctx == NULL);
+
        if (importer->highest_wanted_uid < uid)
                importer->highest_wanted_uid = uid;
        array_append(&importer->wanted_uids, &uid, 1);
+
+       /* commit the transaction once in a while, so if we fail we don't
+          rollback everything. */
+       if (array_count(&importer->wanted_uids) % DSYNC_COMMIT_MSGS_INTERVAL == 0) {
+               if (dsync_mailbox_import_commit(importer, FALSE) < 0)
+                       importer->failed = TRUE;
+       }
 }
 
 static bool
@@ -1522,7 +1545,7 @@ dsync_msg_change_uid(struct dsync_mailbox_importer *importer,
        mailbox_save_set_uid(save_ctx, new_uid);
        if (mailbox_move(&save_ctx, importer->mail) < 0)
                return FALSE;
-       dsync_mailbox_import_want_uid(importer, new_uid);
+       dsync_mailbox_import_saved_uid(importer, new_uid);
        return TRUE;
 }
 
@@ -1737,6 +1760,15 @@ void dsync_mailbox_import_changes_finish(struct dsync_mailbox_importer *importer
        /* save mails from local sources where possible,
           request the rest from remote */
        dsync_mailbox_import_handle_local_mails(importer);
+
+       if (importer->search_ctx != NULL) {
+               if (mailbox_search_deinit(&importer->search_ctx) < 0) {
+                       i_error("Mailbox %s: Search failed: %s",
+                               mailbox_get_vname(importer->box),
+                               mailbox_get_last_error(importer->box, NULL));
+                       importer->failed = TRUE;
+               }
+       }
 }
 
 const struct dsync_mail_request *
@@ -1872,7 +1904,7 @@ static void dsync_mailbox_save_body(struct dsync_mailbox_importer *importer,
        }
        if (ret > 0) {
                i_assert(save_ctx == NULL);
-               dsync_mailbox_import_want_uid(importer, newmail->final_uid);
+               dsync_mailbox_import_saved_uid(importer, newmail->final_uid);
                return;
        }
        /* fallback to saving from remote stream */
@@ -1920,8 +1952,8 @@ static void dsync_mailbox_save_body(struct dsync_mailbox_importer *importer,
                                mailbox_get_last_error(importer->box, NULL));
                        importer->failed = TRUE;
                } else {
-                       dsync_mailbox_import_want_uid(importer,
-                                                     newmail->final_uid);
+                       dsync_mailbox_import_saved_uid(importer,
+                                                      newmail->final_uid);
                }
        }
 }
@@ -2031,24 +2063,22 @@ reassign_uids_in_seq_range(struct mailbox *box,
 
 static int
 reassign_unwanted_uids(struct dsync_mailbox_importer *importer,
-                      const struct mail_transaction_commit_changes *changes,
                       bool *changes_during_sync_r)
 {
        ARRAY_TYPE(seq_range) unwanted_uids;
-       struct seq_range_iter iter;
-       const uint32_t *wanted_uids;
-       uint32_t saved_uid, highest_seen_uid;
-       unsigned int i, n, wanted_count;
+       const uint32_t *wanted_uids, *saved_uids;
+       uint32_t highest_seen_uid;
+       unsigned int i, wanted_count, saved_count;
        int ret = 0;
 
        wanted_uids = array_get(&importer->wanted_uids, &wanted_count);
-       if (wanted_count == 0) {
-               i_assert(array_count(&changes->saved_uids) == 0);
+       saved_uids = array_get(&importer->saved_uids, &saved_count);
+       i_assert(wanted_count == saved_count);
+       if (wanted_count == 0)
                return 0;
-       }
        /* wanted_uids contains the UIDs we tried to save mails with.
           if nothing changed during dsync, we should have the expected UIDs
-          (changes->saved_uids) and all is well.
+          (saved_uids) and all is well.
 
           if any new messages got inserted during dsync, we'll need to fix up
           the UIDs and let the next dsync fix up the other side. for example:
@@ -2069,14 +2099,11 @@ reassign_unwanted_uids(struct dsync_mailbox_importer *importer,
        i_assert(importer->local_uid_next <= highest_seen_uid);
        seq_range_array_add_range(&unwanted_uids,
                                  importer->local_uid_next, highest_seen_uid);
-       seq_range_array_iter_init(&iter, &changes->saved_uids); i = n = 0;
-       while (seq_range_array_iter_nth(&iter, n++, &saved_uid)) {
+       for (i = 0; i < wanted_count; i++) {
                i_assert(i < wanted_count);
-               if (saved_uid == wanted_uids[i])
-                       seq_range_array_remove(&unwanted_uids, saved_uid);
-               i++;
+               if (saved_uids[i] == wanted_uids[i])
+                       seq_range_array_remove(&unwanted_uids, saved_uids[i]);
        }
-       i_assert(i == wanted_count);
 
        ret = reassign_uids_in_seq_range(importer->box, &unwanted_uids);
        if (ret == 0) {
@@ -2088,12 +2115,17 @@ reassign_unwanted_uids(struct dsync_mailbox_importer *importer,
        return ret < 0 ? -1 : 0;
 }
 
-static int dsync_mailbox_import_commit(struct dsync_mailbox_importer *importer,
-                                      bool *changes_during_sync_r)
+static int
+dsync_mailbox_import_commit(struct dsync_mailbox_importer *importer, bool final)
 {
        struct mail_transaction_commit_changes changes;
-       struct mailbox_update update;
-       int ret = 0;
+       struct seq_range_iter iter;
+       uint32_t uid;
+       unsigned int n;
+       int ret = importer->failed ? -1 : 0;
+
+       mail_free(&importer->mail);
+       mail_free(&importer->ext_mail);
 
        /* commit saves */
        if (mailbox_transaction_commit_get_changes(&importer->ext_trans,
@@ -2101,33 +2133,59 @@ static int dsync_mailbox_import_commit(struct dsync_mailbox_importer *importer,
                i_error("Mailbox %s: Save commit failed: %s",
                        mailbox_get_vname(importer->box),
                        mailbox_get_last_error(importer->box, NULL));
+               /* removed wanted_uids that weren't actually saved */
+               array_delete(&importer->wanted_uids,
+                            array_count(&importer->saved_uids),
+                            array_count(&importer->wanted_uids) -
+                            array_count(&importer->saved_uids));
                mailbox_transaction_rollback(&importer->trans);
-               return -1;
-       }
-
-       /* commit flag changes and expunges */
-       if (mailbox_transaction_commit(&importer->trans) < 0) {
-               i_error("Mailbox %s: Commit failed: %s",
-                       mailbox_get_vname(importer->box),
-                       mailbox_get_last_error(importer->box, NULL));
+               ret = -1;
+       } else {
+               /* remember the UIDs that were successfully saved */
+               seq_range_array_iter_init(&iter, &changes.saved_uids); n = 0;
+               while (seq_range_array_iter_nth(&iter, n++, &uid))
+                       array_append(&importer->saved_uids, &uid, 1);
                pool_unref(&changes.pool);
-               return -1;
+
+               /* commit flag changes and expunges */
+               if (mailbox_transaction_commit(&importer->trans) < 0) {
+                       i_error("Mailbox %s: Commit failed: %s",
+                               mailbox_get_vname(importer->box),
+                               mailbox_get_last_error(importer->box, NULL));
+                       ret = -1;
+               }
        }
 
-       /* update mailbox metadata. */
-       memset(&update, 0, sizeof(update));
-       update.min_next_uid = importer->remote_uid_next;
-       update.min_first_recent_uid =
-               I_MIN(importer->last_common_uid+1,
-                     importer->remote_first_recent_uid);
-       update.min_highest_modseq = importer->remote_highest_modseq;
-       update.min_highest_pvt_modseq = importer->remote_highest_pvt_modseq;
+       if (!final)
+               dsync_mailbox_import_transaction_begin(importer);
+       return ret;
+}
 
-       if (mailbox_update(importer->box, &update) < 0) {
-               i_error("Mailbox %s: Update failed: %s",
-                       mailbox_get_vname(importer->box),
-                       mailbox_get_last_error(importer->box, NULL));
-               ret = -1;
+static int dsync_mailbox_import_finish(struct dsync_mailbox_importer *importer,
+                                      bool *changes_during_sync_r)
+{
+       struct mailbox_update update;
+       int ret;
+
+       ret = dsync_mailbox_import_commit(importer, TRUE);
+
+       if (ret == 0) {
+               /* update mailbox metadata if we successfully saved
+                  everything. */
+               memset(&update, 0, sizeof(update));
+               update.min_next_uid = importer->remote_uid_next;
+               update.min_first_recent_uid =
+                       I_MIN(importer->last_common_uid+1,
+                             importer->remote_first_recent_uid);
+               update.min_highest_modseq = importer->remote_highest_modseq;
+               update.min_highest_pvt_modseq = importer->remote_highest_pvt_modseq;
+
+               if (mailbox_update(importer->box, &update) < 0) {
+                       i_error("Mailbox %s: Update failed: %s",
+                               mailbox_get_vname(importer->box),
+                               mailbox_get_last_error(importer->box, NULL));
+                       ret = -1;
+               }
        }
 
        /* sync mailbox to finish flag changes and expunges. */
@@ -2137,11 +2195,12 @@ static int dsync_mailbox_import_commit(struct dsync_mailbox_importer *importer,
                        mailbox_get_last_error(importer->box, NULL));
                ret = -1;
        }
-
-       if (reassign_unwanted_uids(importer, &changes,
-                                  changes_during_sync_r) < 0)
-               ret = -1;
-       pool_unref(&changes.pool);
+       if (ret == 0) {
+               /* give new UIDs to messages that got saved with unwanted UIDs.
+                  do it only if the whole transaction succeeded. */
+               if (reassign_unwanted_uids(importer, changes_during_sync_r) < 0)
+                       ret = -1;
+       }
        return ret;
 }
 
@@ -2221,10 +2280,7 @@ int dsync_mailbox_import_deinit(struct dsync_mailbox_importer **_importer,
                        importer->failed = TRUE;
                }
        }
-       mail_free(&importer->mail);
-       mail_free(&importer->ext_mail);
-
-       if (dsync_mailbox_import_commit(importer, changes_during_sync_r) < 0)
+       if (dsync_mailbox_import_finish(importer, changes_during_sync_r) < 0)
                importer->failed = TRUE;
 
        hash_table_destroy(&importer->import_guids);
@@ -2232,6 +2288,7 @@ int dsync_mailbox_import_deinit(struct dsync_mailbox_importer **_importer,
        array_free(&importer->maybe_expunge_uids);
        array_free(&importer->maybe_saves);
        array_free(&importer->wanted_uids);
+       array_free(&importer->saved_uids);
        array_free(&importer->newmails);
        if (array_is_created(&importer->mail_requests))
                array_free(&importer->mail_requests);