From: Markus Valentin Date: Thu, 11 Nov 2021 15:17:35 +0000 (+0100) Subject: imapc: Implement rollback for failed copies X-Git-Tag: 2.3.19~31 X-Git-Url: http://git.ipfire.org/?a=commitdiff_plain;h=09bfbb4cb12d8ce86713b68a736e5b7144af876e;p=thirdparty%2Fdovecot%2Fcore.git imapc: Implement rollback for failed copies --- diff --git a/src/lib-storage/index/imapc/imapc-save.c b/src/lib-storage/index/imapc/imapc-save.c index dd35633c39..bea825ab5a 100644 --- a/src/lib-storage/index/imapc/imapc-save.c +++ b/src/lib-storage/index/imapc/imapc-save.c @@ -43,6 +43,7 @@ struct imapc_save_cmd_context { }; #define IMAPC_SAVECTX(s) container_of(s, struct imapc_save_context, ctx) +#define IMAPC_SERVER_CMDLINE_MAX_LEN 8000 void imapc_transaction_save_rollback(struct mail_save_context *_ctx); static void imapc_mail_copy_bulk_flush(struct imapc_mailbox *mbox); @@ -204,6 +205,47 @@ imapc_save_noop_callback(const struct imapc_command_reply *reply ATTR_UNUSED, imapc_client_stop(ctx->ctx->mbox->storage->client->client); } +static void +imapc_copy_rollback_store_callback(const struct imapc_command_reply *reply ATTR_UNUSED, + void *context) +{ + struct imapc_save_context *ctx = context; + /* Can't do much about a non successful STORE here */ + if (reply->state != IMAPC_COMMAND_STATE_OK) { + e_error(ctx->src_mbox->box.event, + "imapc: Failed to set \\Deleted flag for rolling back " + "failed copy: %s", reply->text_full); + ctx->src_mbox->rollback_pending = FALSE; + ctx->finished = TRUE; + ctx->failed = TRUE; + } else { + i_assert(ctx->src_mbox->rollback_pending); + } + /* No need stop the imapc client here there is always an additional + expunge callback after this. */ +} + +static void +imapc_copy_rollback_expunge_callback(const struct imapc_command_reply *reply ATTR_UNUSED, + void *context) +{ + struct imapc_save_context *ctx = context; + + /* Can't do much about a non successful EXPUNGE here */ + if (reply->state != IMAPC_COMMAND_STATE_OK) { + e_error(ctx->src_mbox->box.event, + "imapc: Failed to expunge messages for rolling back " + "failed copy: %s", reply->text_full); + ctx->src_mbox->rollback_pending = FALSE; + ctx->finished = TRUE; + ctx->failed = TRUE; + } else { + ctx->finished = TRUE; + ctx->src_mbox->rollback_pending = FALSE; + } + imapc_client_stop(ctx->src_mbox->storage->client->client); +} + static void imapc_append_keywords(string_t *str, struct mail_keywords *kw) { @@ -312,6 +354,7 @@ void imapc_save_cancel(struct mail_save_context *_ctx) struct imapc_save_context *ctx = IMAPC_SAVECTX(_ctx); ctx->failed = TRUE; + (void)imapc_transaction_save_commit_pre(_ctx); (void)imapc_save_finish(_ctx); } @@ -328,14 +371,16 @@ int imapc_transaction_save_commit_pre(struct mail_save_context *_ctx) _ctx->transaction->changes; uint32_t i, last_seq; - i_assert(ctx->finished); + i_assert(ctx->finished || ctx->failed); /* expunge all added messages from index before commit */ last_seq = mail_index_view_get_messages_count(_ctx->transaction->view); + if (last_seq == 0) + return -1; for (i = 0; i < ctx->save_count; i++) mail_index_expunge(ctx->trans, last_seq - i); - if (array_is_created(&ctx->dest_saved_uids)) { + if (!ctx->failed && array_is_created(&ctx->dest_saved_uids)) { changes->uid_validity = ctx->dest_uid_validity; array_append_array(&changes->saved_uids, &ctx->dest_saved_uids); } @@ -369,6 +414,92 @@ void imapc_transaction_save_commit_post(struct mail_save_context *_ctx, imapc_transaction_save_rollback(_ctx); } +static void +imapc_expunge_construct_cmd_str(string_t *store_cmd, + string_t *expunge_cmd, + string_t *uids) +{ + str_append(store_cmd, "UID STORE "); + str_append_str(store_cmd, uids); + str_append(store_cmd, " +FLAGS (\\Deleted)"); + str_append(expunge_cmd, "UID EXPUNGE "); + str_append_str(expunge_cmd, uids); + /* Clear already appened uids */ + str_truncate(uids, 0); +} + +static void +imapc_expunge_send_cmd_str(struct imapc_save_context *ctx, + string_t *uids) +{ + struct imapc_command *store_cmd, *expunge_cmd; + + string_t *store_cmd_str, *expunge_cmd_str; + store_cmd_str = t_str_new(128); + expunge_cmd_str = t_str_new(128); + + imapc_expunge_construct_cmd_str(store_cmd_str, expunge_cmd_str, uids); + /* Make sure line length is less than 8k */ + i_assert(str_len(store_cmd_str) < IMAPC_SERVER_CMDLINE_MAX_LEN); + i_assert(str_len(expunge_cmd_str) < IMAPC_SERVER_CMDLINE_MAX_LEN); + + store_cmd = imapc_client_mailbox_cmd(ctx->src_mbox->client_box, + imapc_copy_rollback_store_callback, + ctx); + expunge_cmd = imapc_client_mailbox_cmd(ctx->src_mbox->client_box, + imapc_copy_rollback_expunge_callback, + ctx); + ctx->src_mbox->rollback_pending = TRUE; + imapc_command_send(store_cmd, str_c(store_cmd_str)); + imapc_command_send(expunge_cmd, str_c(expunge_cmd_str)); +} + +static void +imapc_rollback_send_expunge(struct imapc_save_context *ctx) +{ + string_t *uids_str; + struct seqset_builder *seqset_builder; + struct seq_range_iter iter; + unsigned int i = 0; + uint32_t uid; + + if (!array_not_empty(&ctx->src_mbox->copy_rollback_expunge_uids)) + return; + + uids_str = t_str_new(128); + seqset_builder = seqset_builder_init(uids_str); + seq_range_array_iter_init(&iter, &ctx->src_mbox->copy_rollback_expunge_uids); + + /* Iterate over all uids that must be rolled back */ + while (seq_range_array_iter_nth(&iter, i++, &uid)) { + /* Try to add the to the seqset builder while respecting + the maximum length of IMAPC_SERVER_CMDLINE_MAX_LEN. */ + if (!seqset_builder_try_add(seqset_builder, + IMAPC_SERVER_CMDLINE_MAX_LEN - + strlen("UID STORE +FLAGS (\\Deleted)"), + uid)) { + /* Maximum length is reached send the rollback + and wait for it to be finished. */ + imapc_expunge_send_cmd_str(ctx, uids_str); + while (ctx->src_mbox->rollback_pending) + imapc_mailbox_run_nofetch(ctx->src_mbox); + + /* Truncate the uids_str and create a new + seqset_builder for the next command */ + seqset_builder_deinit(&seqset_builder); + str_truncate(uids_str, 0); + seqset_builder = seqset_builder_init(uids_str); + /* Make sure the current uid which is part of + the next uid_str */ + seqset_builder_add(seqset_builder, uid); + } + } + if (str_len(uids_str) > 0) + imapc_expunge_send_cmd_str(ctx, uids_str); + while (ctx->src_mbox->rollback_pending) + imapc_mailbox_run_nofetch(ctx->src_mbox); +} + static void imapc_copy_bulk_ctx_deinit(struct imapc_save_context *ctx) { /* Clean up the pending copy and the context attached to it */ @@ -380,18 +511,40 @@ void imapc_transaction_save_rollback(struct mail_save_context *_ctx) { struct imapc_save_context *ctx = IMAPC_SAVECTX(_ctx); - /* FIXME: if we really want to rollback, we should expunge messages - we already saved */ + if ((ctx->src_mbox != NULL && ctx->src_mbox->pending_copy_request != NULL) || + !ctx->finished) { + /* There is still a pending copy which should not be send + as rollback() is called or the transaction has not yet + finished and rollback is called */ + ctx->failed = TRUE; + (void)imapc_transaction_save_commit_pre(_ctx); + + /* Clean up the pending copy and the context attached to it */ + if (ctx->src_mbox->pending_copy_request != NULL) + seqset_builder_deinit(&ctx->src_mbox->pending_copy_request->uidset_builder); + + imapc_copy_bulk_ctx_deinit(ctx); + i_free(ctx->src_mbox->pending_copy_request); + + imapc_client_stop(ctx->src_mbox->storage->client->client); + } - if (!ctx->finished) - imapc_save_cancel(_ctx); + /* Expunge all added messages from index */ + if (ctx->failed && array_is_created(&ctx->dest_saved_uids)) { + i_assert(ctx->src_mbox != NULL); + seq_range_array_merge(&ctx->src_mbox->copy_rollback_expunge_uids, &ctx->dest_saved_uids); + /* Make sure context is not finished already */ + ctx->finished = FALSE; + imapc_rollback_send_expunge(ctx); + array_free(&ctx->dest_saved_uids); + } - if (array_is_created(&ctx->dest_saved_uids)) + if (ctx->finished || ctx->failed) { array_free(&ctx->dest_saved_uids); - i_free(ctx); + i_free(ctx); + } } - static bool imapc_save_copyuid(struct imapc_save_context *ctx, const struct imapc_command_reply *reply, uint32_t *uid_r) diff --git a/src/lib-storage/index/imapc/imapc-storage.c b/src/lib-storage/index/imapc/imapc-storage.c index 8b7edbe00a..43d90de90b 100644 --- a/src/lib-storage/index/imapc/imapc-storage.c +++ b/src/lib-storage/index/imapc/imapc-storage.c @@ -540,6 +540,7 @@ imapc_mailbox_alloc(struct mail_storage *storage, struct mailbox_list *list, p_array_init(&mbox->fetch_requests, pool, 16); p_array_init(&mbox->untagged_fetch_contexts, pool, 16); p_array_init(&mbox->delayed_expunged_uids, pool, 16); + p_array_init(&mbox->copy_rollback_expunge_uids, pool, 16); mbox->pending_fetch_cmd = str_new(pool, 128); mbox->pending_copy_cmd = str_new(pool, 128); mbox->prev_mail_cache.fd = -1; diff --git a/src/lib-storage/index/imapc/imapc-storage.h b/src/lib-storage/index/imapc/imapc-storage.h index 8af437455e..d5e51bcb1b 100644 --- a/src/lib-storage/index/imapc/imapc-storage.h +++ b/src/lib-storage/index/imapc/imapc-storage.h @@ -152,6 +152,7 @@ struct imapc_mailbox { ARRAY(uint64_t) rseq_modseqs; ARRAY_TYPE(seq_range) delayed_expunged_uids; + ARRAY_TYPE(seq_range) copy_rollback_expunge_uids; uint32_t sync_uid_validity; uint32_t sync_uid_next; uint64_t sync_highestmodseq; @@ -179,6 +180,7 @@ struct imapc_mailbox { bool exists_received:1; bool state_fetching_uid1:1; bool state_fetched_success:1; + bool rollback_pending:1; }; struct imapc_simple_context {