]> git.ipfire.org Git - thirdparty/dovecot/core.git/commitdiff
imapc: Implement rollback for failed copies
authorMarkus Valentin <markus.valentin@open-xchange.com>
Thu, 11 Nov 2021 15:17:35 +0000 (16:17 +0100)
committerMarkus Valentin <markus.valentin@open-xchange.com>
Mon, 14 Mar 2022 13:06:49 +0000 (14:06 +0100)
src/lib-storage/index/imapc/imapc-save.c
src/lib-storage/index/imapc/imapc-storage.c
src/lib-storage/index/imapc/imapc-storage.h

index dd35633c39d9c7b7110d3138dc3e486bbfb4cf72..bea825ab5a334b44ef86ad7d64f5a7572adaf6f7 100644 (file)
@@ -43,6 +43,7 @@ struct imapc_save_cmd_context {
 };
 
 #define IMAPC_SAVECTX(s)       container_of(s, struct imapc_save_context, ctx)
+#define IMAPC_SERVER_CMDLINE_MAX_LEN   8000
 
 void imapc_transaction_save_rollback(struct mail_save_context *_ctx);
 static void imapc_mail_copy_bulk_flush(struct imapc_mailbox *mbox);
@@ -204,6 +205,47 @@ imapc_save_noop_callback(const struct imapc_command_reply *reply ATTR_UNUSED,
        imapc_client_stop(ctx->ctx->mbox->storage->client->client);
 }
 
+static void
+imapc_copy_rollback_store_callback(const struct imapc_command_reply *reply ATTR_UNUSED,
+                                  void *context)
+{
+       struct imapc_save_context *ctx = context;
+       /* Can't do much about a non successful STORE here */
+       if (reply->state != IMAPC_COMMAND_STATE_OK) {
+               e_error(ctx->src_mbox->box.event,
+                       "imapc: Failed to set \\Deleted flag for rolling back "
+                       "failed copy: %s", reply->text_full);
+               ctx->src_mbox->rollback_pending = FALSE;
+               ctx->finished = TRUE;
+               ctx->failed = TRUE;
+       } else {
+               i_assert(ctx->src_mbox->rollback_pending);
+       }
+       /* No need stop the imapc client here there is always an additional
+          expunge callback after this. */
+}
+
+static void
+imapc_copy_rollback_expunge_callback(const struct imapc_command_reply *reply ATTR_UNUSED,
+                                    void *context)
+{
+       struct imapc_save_context *ctx = context;
+
+       /* Can't do much about a non successful EXPUNGE here */
+       if (reply->state != IMAPC_COMMAND_STATE_OK) {
+               e_error(ctx->src_mbox->box.event,
+                       "imapc: Failed to expunge messages for rolling back "
+                       "failed copy: %s", reply->text_full);
+               ctx->src_mbox->rollback_pending = FALSE;
+               ctx->finished = TRUE;
+               ctx->failed = TRUE;
+       } else {
+               ctx->finished = TRUE;
+               ctx->src_mbox->rollback_pending = FALSE;
+       }
+       imapc_client_stop(ctx->src_mbox->storage->client->client);
+}
+
 static void
 imapc_append_keywords(string_t *str, struct mail_keywords *kw)
 {
@@ -312,6 +354,7 @@ void imapc_save_cancel(struct mail_save_context *_ctx)
        struct imapc_save_context *ctx = IMAPC_SAVECTX(_ctx);
 
        ctx->failed = TRUE;
+       (void)imapc_transaction_save_commit_pre(_ctx);
        (void)imapc_save_finish(_ctx);
 }
 
@@ -328,14 +371,16 @@ int imapc_transaction_save_commit_pre(struct mail_save_context *_ctx)
                _ctx->transaction->changes;
        uint32_t i, last_seq;
 
-       i_assert(ctx->finished);
+       i_assert(ctx->finished || ctx->failed);
 
        /* expunge all added messages from index before commit */
        last_seq = mail_index_view_get_messages_count(_ctx->transaction->view);
+       if (last_seq == 0)
+               return -1;
        for (i = 0; i < ctx->save_count; i++)
                mail_index_expunge(ctx->trans, last_seq - i);
 
-       if (array_is_created(&ctx->dest_saved_uids)) {
+       if (!ctx->failed && array_is_created(&ctx->dest_saved_uids)) {
                changes->uid_validity = ctx->dest_uid_validity;
                array_append_array(&changes->saved_uids, &ctx->dest_saved_uids);
        }
@@ -369,6 +414,92 @@ void imapc_transaction_save_commit_post(struct mail_save_context *_ctx,
        imapc_transaction_save_rollback(_ctx);
 }
 
+static void
+imapc_expunge_construct_cmd_str(string_t *store_cmd,
+                               string_t *expunge_cmd,
+                               string_t *uids)
+{
+       str_append(store_cmd, "UID STORE ");
+       str_append_str(store_cmd, uids);
+       str_append(store_cmd, " +FLAGS (\\Deleted)");
+       str_append(expunge_cmd, "UID EXPUNGE ");
+       str_append_str(expunge_cmd, uids);
+       /* Clear already appened uids */
+       str_truncate(uids, 0);
+}
+
+static void
+imapc_expunge_send_cmd_str(struct imapc_save_context *ctx,
+                          string_t *uids)
+{
+       struct imapc_command *store_cmd, *expunge_cmd;
+
+       string_t *store_cmd_str, *expunge_cmd_str;
+       store_cmd_str = t_str_new(128);
+       expunge_cmd_str = t_str_new(128);
+
+       imapc_expunge_construct_cmd_str(store_cmd_str, expunge_cmd_str, uids);
+       /* Make sure line length is less than 8k */
+       i_assert(str_len(store_cmd_str) < IMAPC_SERVER_CMDLINE_MAX_LEN);
+       i_assert(str_len(expunge_cmd_str) < IMAPC_SERVER_CMDLINE_MAX_LEN);
+
+       store_cmd = imapc_client_mailbox_cmd(ctx->src_mbox->client_box,
+                                            imapc_copy_rollback_store_callback,
+                                            ctx);
+       expunge_cmd = imapc_client_mailbox_cmd(ctx->src_mbox->client_box,
+                                              imapc_copy_rollback_expunge_callback,
+                                              ctx);
+       ctx->src_mbox->rollback_pending = TRUE;
+       imapc_command_send(store_cmd, str_c(store_cmd_str));
+       imapc_command_send(expunge_cmd, str_c(expunge_cmd_str));
+}
+
+static void
+imapc_rollback_send_expunge(struct imapc_save_context *ctx)
+{
+       string_t *uids_str;
+       struct seqset_builder *seqset_builder;
+       struct seq_range_iter iter;
+       unsigned int i = 0;
+       uint32_t uid;
+
+       if (!array_not_empty(&ctx->src_mbox->copy_rollback_expunge_uids))
+               return;
+
+       uids_str = t_str_new(128);
+       seqset_builder = seqset_builder_init(uids_str);
+       seq_range_array_iter_init(&iter, &ctx->src_mbox->copy_rollback_expunge_uids);
+
+       /* Iterate over all uids that must be rolled back */
+       while (seq_range_array_iter_nth(&iter, i++, &uid)) {
+               /* Try to add the to the seqset builder while respecting
+                  the maximum length of IMAPC_SERVER_CMDLINE_MAX_LEN. */
+               if (!seqset_builder_try_add(seqset_builder,
+                                           IMAPC_SERVER_CMDLINE_MAX_LEN -
+                                           strlen("UID STORE  +FLAGS (\\Deleted)"),
+                                           uid)) {
+                       /* Maximum length is reached send the rollback
+                          and wait for it to be finished. */
+                       imapc_expunge_send_cmd_str(ctx, uids_str);
+                       while (ctx->src_mbox->rollback_pending)
+                               imapc_mailbox_run_nofetch(ctx->src_mbox);
+
+                       /* Truncate the uids_str and create a new
+                          seqset_builder for the next command */
+                       seqset_builder_deinit(&seqset_builder);
+                       str_truncate(uids_str, 0);
+                       seqset_builder = seqset_builder_init(uids_str);
+                       /* Make sure the current uid which is part of
+                          the next uid_str */
+                       seqset_builder_add(seqset_builder, uid);
+               }
+       }
+       if (str_len(uids_str) > 0)
+               imapc_expunge_send_cmd_str(ctx, uids_str);
+       while (ctx->src_mbox->rollback_pending)
+               imapc_mailbox_run_nofetch(ctx->src_mbox);
+}
+
 static void imapc_copy_bulk_ctx_deinit(struct imapc_save_context *ctx)
 {
        /* Clean up the pending copy and the context attached to it */
@@ -380,18 +511,40 @@ void imapc_transaction_save_rollback(struct mail_save_context *_ctx)
 {
        struct imapc_save_context *ctx = IMAPC_SAVECTX(_ctx);
 
-       /* FIXME: if we really want to rollback, we should expunge messages
-          we already saved */
+       if ((ctx->src_mbox != NULL && ctx->src_mbox->pending_copy_request != NULL) ||
+          !ctx->finished) {
+              /* There is still a pending copy which should not be send
+                 as rollback() is called or the transaction has not yet
+                 finished and rollback is called */
+              ctx->failed = TRUE;
+              (void)imapc_transaction_save_commit_pre(_ctx);
+
+              /* Clean up the pending copy and the context attached to it */
+              if (ctx->src_mbox->pending_copy_request != NULL)
+                      seqset_builder_deinit(&ctx->src_mbox->pending_copy_request->uidset_builder);
+
+              imapc_copy_bulk_ctx_deinit(ctx);
+              i_free(ctx->src_mbox->pending_copy_request);
+
+              imapc_client_stop(ctx->src_mbox->storage->client->client);
+       }
 
-       if (!ctx->finished)
-               imapc_save_cancel(_ctx);
+       /* Expunge all added messages from index */
+       if (ctx->failed && array_is_created(&ctx->dest_saved_uids)) {
+               i_assert(ctx->src_mbox != NULL);
+               seq_range_array_merge(&ctx->src_mbox->copy_rollback_expunge_uids, &ctx->dest_saved_uids);
+               /* Make sure context is not finished already */
+               ctx->finished = FALSE;
+               imapc_rollback_send_expunge(ctx);
+               array_free(&ctx->dest_saved_uids);
+       }
 
-       if (array_is_created(&ctx->dest_saved_uids))
+       if (ctx->finished || ctx->failed) {
                array_free(&ctx->dest_saved_uids);
-       i_free(ctx);
+               i_free(ctx);
+       }
 }
 
-
 static bool imapc_save_copyuid(struct imapc_save_context *ctx,
                               const struct imapc_command_reply *reply,
                               uint32_t *uid_r)
index 8b7edbe00ae1b3e5de76c552004d7b5a16468cc0..43d90de90bc6249bafeaa9dc3fadda609300890b 100644 (file)
@@ -540,6 +540,7 @@ imapc_mailbox_alloc(struct mail_storage *storage, struct mailbox_list *list,
        p_array_init(&mbox->fetch_requests, pool, 16);
        p_array_init(&mbox->untagged_fetch_contexts, pool, 16);
        p_array_init(&mbox->delayed_expunged_uids, pool, 16);
+       p_array_init(&mbox->copy_rollback_expunge_uids, pool, 16);
        mbox->pending_fetch_cmd = str_new(pool, 128);
        mbox->pending_copy_cmd = str_new(pool, 128);
        mbox->prev_mail_cache.fd = -1;
index 8af437455e7addd7c469636ee00aebb12df329e8..d5e51bcb1b29c53b0f08abe28d0848da27779648 100644 (file)
@@ -152,6 +152,7 @@ struct imapc_mailbox {
 
        ARRAY(uint64_t) rseq_modseqs;
        ARRAY_TYPE(seq_range) delayed_expunged_uids;
+       ARRAY_TYPE(seq_range) copy_rollback_expunge_uids;
        uint32_t sync_uid_validity;
        uint32_t sync_uid_next;
        uint64_t sync_highestmodseq;
@@ -179,6 +180,7 @@ struct imapc_mailbox {
        bool exists_received:1;
        bool state_fetching_uid1:1;
        bool state_fetched_success:1;
+       bool rollback_pending:1;
 };
 
 struct imapc_simple_context {