From: Markus Valentin Date: Thu, 11 Nov 2021 13:52:54 +0000 (+0100) Subject: imapc: Implement bulk copying for imapc X-Git-Tag: 2.3.19~34 X-Git-Url: http://git.ipfire.org/?a=commitdiff_plain;h=79b5dc60246f4db64f4e0f48dcd53c55bb20858c;p=thirdparty%2Fdovecot%2Fcore.git imapc: Implement bulk copying for imapc --- diff --git a/src/lib-storage/index/imapc/imapc-save.c b/src/lib-storage/index/imapc/imapc-save.c index b910e1d4c2..1d9af003d3 100644 --- a/src/lib-storage/index/imapc/imapc-save.c +++ b/src/lib-storage/index/imapc/imapc-save.c @@ -7,6 +7,8 @@ #include "ostream.h" #include "imap-date.h" #include "imap-util.h" +#include "imap-seqset.h" +#include "imap-quote.h" #include "index-mail.h" #include "mail-copy.h" #include "mailbox-list-private.h" @@ -14,11 +16,13 @@ #include "imapc-storage.h" #include "imapc-sync.h" #include "imapc-mail.h" +#include "seq-set-builder.h" struct imapc_save_context { struct mail_save_context ctx; struct imapc_mailbox *mbox; + struct imapc_mailbox *src_mbox; struct mail_index_transaction *trans; int fd; @@ -41,6 +45,7 @@ struct imapc_save_cmd_context { #define IMAPC_SAVECTX(s) container_of(s, struct imapc_save_context, ctx) void imapc_transaction_save_rollback(struct mail_save_context *_ctx); +static void imapc_mail_copy_bulk_flush(struct imapc_mailbox *mbox); struct mail_save_context * imapc_save_alloc(struct mailbox_transaction_context *t) @@ -54,6 +59,7 @@ imapc_save_alloc(struct mailbox_transaction_context *t) ctx = i_new(struct imapc_save_context, 1); ctx->ctx.transaction = t; ctx->mbox = mbox; + ctx->src_mbox = NULL; ctx->trans = t->itrans; ctx->fd = -1; t->save_ctx = &ctx->ctx; @@ -309,6 +315,12 @@ void imapc_save_cancel(struct mail_save_context *_ctx) (void)imapc_save_finish(_ctx); } +static void imapc_copy_bulk_finish(struct imapc_save_context *ctx) +{ + while (ctx->src_mbox != NULL && ctx->src_mbox->pending_copy_request != NULL) + imapc_mailbox_run_nofetch(ctx->src_mbox); +} + int imapc_transaction_save_commit_pre(struct mail_save_context *_ctx) { struct imapc_save_context *ctx = IMAPC_SAVECTX(_ctx); @@ -330,12 +342,40 @@ int imapc_transaction_save_commit_pre(struct mail_save_context *_ctx) return 0; } +int imapc_transaction_save_commit(struct mailbox_transaction_context *t) +{ + struct imapc_save_context *ctx = NULL; + struct imapc_mailbox *src_mbox = NULL; + + if (t->save_ctx != NULL) { + ctx = IMAPC_SAVECTX(t->save_ctx); + src_mbox = ctx->src_mbox; + } + + if (src_mbox != NULL && src_mbox->pending_copy_request != NULL) { + /* If there is still a copy command to send flush it now */ + imapc_mail_copy_bulk_flush(src_mbox); + imapc_copy_bulk_finish(ctx); + } + + if (ctx != NULL) + return ctx->failed ? -1 : 0; + return 0; +} + void imapc_transaction_save_commit_post(struct mail_save_context *_ctx, struct mail_index_transaction_commit_result *result ATTR_UNUSED) { imapc_transaction_save_rollback(_ctx); } +static void imapc_copy_bulk_ctx_deinit(struct imapc_save_context *ctx) +{ + /* Clean up the pending copy and the context attached to it */ + str_truncate(ctx->src_mbox->pending_copy_cmd, 0); + i_free(ctx->src_mbox->copy_dest_box); +} + void imapc_transaction_save_rollback(struct mail_save_context *_ctx) { struct imapc_save_context *ctx = IMAPC_SAVECTX(_ctx); @@ -379,6 +419,54 @@ static void imapc_save_copyuid(struct imapc_save_context *ctx, } } +static bool imapc_save_bulk_copyuid(struct imapc_save_context *ctx, + const struct imapc_command_reply *reply) +{ + ARRAY_TYPE(seq_range) dest_uidset, source_uidset; + const char *const *args; + uint32_t uid_validity; + + /* */ + args = t_strsplit(reply->resp_text_value, " "); + if (str_array_length(args) != 3) + return FALSE; + + if (str_to_uint32(args[0], &uid_validity) < 0) + return FALSE; + if (ctx->dest_uid_validity == 0) + ctx->dest_uid_validity = uid_validity; + else if (ctx->dest_uid_validity != uid_validity) + return FALSE; + + t_array_init(&source_uidset, 8); + t_array_init(&dest_uidset, 8); + + if (imap_seq_set_nostar_parse(args[1], &source_uidset) < 0) + return FALSE; + if (imap_seq_set_nostar_parse(args[2], &dest_uidset) < 0) + return FALSE; + + if (!array_is_created(&ctx->dest_saved_uids)) + i_array_init(&ctx->dest_saved_uids, 8); + + seq_range_array_merge(&ctx->dest_saved_uids, &dest_uidset); + return TRUE; +} + +static void imapc_copy_set_error(struct imapc_save_context *sctx, + const struct imapc_command_reply *reply) +{ + sctx->failed = TRUE; + + if (reply->state != IMAPC_COMMAND_STATE_BAD) + imapc_copy_error_from_reply(sctx->mbox->storage, + MAIL_ERROR_PARAMS, reply); + else + mailbox_set_critical(&sctx->mbox->box, + "imapc: COPY failed: %s", + reply->text_full); +} + static void imapc_copy_simple_callback(const struct imapc_command_reply *reply, void *context) @@ -409,27 +497,152 @@ imapc_copy_simple(struct mail_save_context *_ctx, struct mail *mail) { struct imapc_save_context *ctx = IMAPC_SAVECTX(_ctx); struct mailbox_transaction_context *_t = _ctx->transaction; - struct imapc_mailbox *src_mbox = IMAPC_MAILBOX(mail->box); struct imapc_save_cmd_context sctx; struct imapc_command *cmd; sctx.ret = -2; sctx.ctx = ctx; - cmd = imapc_client_mailbox_cmd(src_mbox->client_box, + cmd = imapc_client_mailbox_cmd(ctx->src_mbox->client_box, imapc_copy_simple_callback, &sctx); imapc_command_sendf(cmd, "UID COPY %u %s", mail->uid, _t->box->name); while (sctx.ret == -2) - imapc_mailbox_run(src_mbox); + imapc_mailbox_run(ctx->src_mbox); ctx->finished = TRUE; return sctx.ret; } +static void imapc_copy_bulk_callback(const struct imapc_command_reply *reply, + void *context) +{ + struct imapc_copy_request *request = context; + struct imapc_save_context *ctx = request->sctx; + struct imapc_mailbox *mbox = ctx->src_mbox; + + i_assert(mbox != NULL); + i_assert(request == mbox->pending_copy_request); + + /* Check the reply state and add uid's to index and + dest_saved_uids. */ + if (ctx->failed) { + /* If the saving already failed try to find UIDs already + copied from the reply so that rollback can expunge + them */ + if (null_strcasecmp(reply->resp_text_key, "COPYUID") == 0) { + (void)imapc_save_bulk_copyuid(ctx, reply); + imapc_transaction_save_rollback(&ctx->ctx); + } + } else if (reply->state == IMAPC_COMMAND_STATE_OK) { + if (reply->resp_text_key != NULL && + strcasecmp(reply->resp_text_key, "COPYUID") == 0 && + imapc_save_bulk_copyuid(ctx, reply)) { + ctx->finished = TRUE; + } + } else { + imapc_copy_set_error(ctx, reply); + } + + ctx->src_mbox->pending_copy_request = NULL; + i_free(request); + imapc_client_stop(mbox->storage->client->client); +} + +static void imapc_mail_copy_bulk_flush(struct imapc_mailbox *mbox) +{ + struct imapc_command *cmd; + + i_assert(mbox != NULL); + i_assert(mbox->pending_copy_request != NULL); + i_assert(mbox->client_box != NULL); + + cmd = imapc_client_mailbox_cmd(mbox->client_box, + imapc_copy_bulk_callback, + mbox->pending_copy_request); + + seqset_builder_deinit(&mbox->pending_copy_request->uidset_builder); + + str_append(mbox->pending_copy_cmd, " "); + imap_append_astring(mbox->pending_copy_cmd, mbox->copy_dest_box); + + imapc_command_send(cmd, str_c(mbox->pending_copy_cmd)); + + imapc_copy_bulk_ctx_deinit(mbox->pending_copy_request->sctx); +} + +static bool +imapc_mail_copy_bulk_try_merge(struct imapc_mailbox *mbox, uint32_t uid, + const char *box) +{ + i_assert(str_begins(str_c(mbox->pending_copy_cmd), "UID COPY ")); + + if (strcmp(box, mbox->copy_dest_box) != 0) { + /* Not the same mailbox merging not possible */ + return FALSE; + } + return seqset_builder_try_add(mbox->pending_copy_request->uidset_builder, + IMAPC_SERVER_CMDLINE_MAX_LEN, uid); +} + +static void +imapc_mail_copy_bulk_delayed_send_or_merge(struct imapc_save_context *ctx, + uint32_t uid, + const char *box) +{ + struct imapc_mailbox *mbox = ctx->src_mbox; + + if (mbox->pending_copy_request != NULL && + !imapc_mail_copy_bulk_try_merge(mbox, uid, box)) { + /* send the previous COPY and create new one after + waiting for this one to be finished. */ + imapc_mail_copy_bulk_flush(mbox); + imapc_copy_bulk_finish(mbox->pending_copy_request->sctx); + } + if (mbox->pending_copy_request == NULL) { + mbox->pending_copy_request = + i_new(struct imapc_copy_request, 1); + str_printfa(mbox->pending_copy_cmd, "UID COPY "); + mbox->pending_copy_request->uidset_builder = + seqset_builder_init(mbox->pending_copy_cmd); + seqset_builder_add(mbox->pending_copy_request->uidset_builder, + uid); + mbox->copy_dest_box = i_strdup(box); + } else { + i_assert(mbox->pending_copy_request->sctx == ctx); + } + mbox->pending_copy_request->sctx = ctx; +} + +static int +imapc_copy_bulk(struct imapc_save_context *ctx, struct mail *mail) +{ + struct imapc_mailbox *mbox = IMAPC_MAILBOX(ctx->ctx.transaction->box); + + imapc_mail_copy_bulk_delayed_send_or_merge(ctx, mail->uid, + imapc_mailbox_get_remote_name(mbox)); + imapc_save_add_to_index(ctx, 0); + + return ctx->failed ? -1 : 0; +} + +static bool imapc_is_mail_expunged(struct imapc_mailbox *mbox, uint32_t uid) +{ + if (array_is_created(&mbox->delayed_expunged_uids) && + seq_range_exists(&mbox->delayed_expunged_uids, uid)) + return TRUE; + if (mbox->delayed_sync_trans == NULL) + return FALSE; + + struct mail_index_view *view = + mail_index_transaction_get_view(mbox->delayed_sync_trans); + uint32_t seq; + return mail_index_lookup_seq(view, uid, &seq) && + mail_index_transaction_is_expunged(mbox->delayed_sync_trans, seq); +} + int imapc_copy(struct mail_save_context *_ctx, struct mail *mail) { struct imapc_save_context *ctx = IMAPC_SAVECTX(_ctx); struct mailbox_transaction_context *_t = _ctx->transaction; - struct imapc_mailbox *src_mbox; struct imapc_msgmap *src_msgmap; uint32_t rseq; int ret; @@ -437,10 +650,15 @@ int imapc_copy(struct mail_save_context *_ctx, struct mail *mail) i_assert((_t->flags & MAILBOX_TRANSACTION_FLAG_EXTERNAL) != 0); if (_t->box->storage == mail->box->storage) { - src_mbox = IMAPC_MAILBOX(mail->box); + /* Currently we don't support copying mails from multiple + different source mailboxes within the same transaction. */ + i_assert(ctx->src_mbox == NULL || &ctx->src_mbox->box == mail->box); + ctx->src_mbox = IMAPC_MAILBOX(mail->box); + if (!mail->expunged && imapc_is_mail_expunged(ctx->mbox, mail->uid)) + mail_set_expunged(mail); /* same server, we can use COPY for the mail */ src_msgmap = - imapc_client_mailbox_get_msgmap(src_mbox->client_box); + imapc_client_mailbox_get_msgmap(ctx->src_mbox->client_box); if (mail->expunged || !imapc_msgmap_uid_to_rseq(src_msgmap, mail->uid, &rseq)) { mail_storage_set_error(mail->box->storage, diff --git a/src/lib-storage/index/imapc/imapc-storage.c b/src/lib-storage/index/imapc/imapc-storage.c index d0aca8ab15..8b7edbe00a 100644 --- a/src/lib-storage/index/imapc/imapc-storage.c +++ b/src/lib-storage/index/imapc/imapc-storage.c @@ -541,6 +541,7 @@ imapc_mailbox_alloc(struct mail_storage *storage, struct mailbox_list *list, p_array_init(&mbox->untagged_fetch_contexts, pool, 16); p_array_init(&mbox->delayed_expunged_uids, pool, 16); mbox->pending_fetch_cmd = str_new(pool, 128); + mbox->pending_copy_cmd = str_new(pool, 128); mbox->prev_mail_cache.fd = -1; imapc_mailbox_register_callbacks(mbox); return &mbox->box; @@ -1282,6 +1283,15 @@ struct mail_storage imapc_storage = { } }; +static int +imapc_mailbox_transaction_commit(struct mailbox_transaction_context *t, + struct mail_transaction_commit_changes *changes_r) +{ + int ret = imapc_transaction_save_commit(t); + int ret2 = index_transaction_commit(t, changes_r); + return ret >= 0 && ret2 >= 0 ? 0 : -1; +} + struct mailbox imapc_mailbox = { .v = { index_storage_is_readonly, @@ -1310,7 +1320,7 @@ struct mailbox imapc_mailbox = { NULL, imapc_notify_changes, index_transaction_begin, - index_transaction_commit, + imapc_mailbox_transaction_commit, index_transaction_rollback, NULL, imapc_mail_alloc, diff --git a/src/lib-storage/index/imapc/imapc-storage.h b/src/lib-storage/index/imapc/imapc-storage.h index 6040c3389a..8af437455e 100644 --- a/src/lib-storage/index/imapc/imapc-storage.h +++ b/src/lib-storage/index/imapc/imapc-storage.h @@ -113,6 +113,11 @@ struct imapc_untagged_fetch_ctx { bool have_flags:1; }; +struct imapc_copy_request { + struct imapc_save_context *sctx; + struct seqset_builder *uidset_builder; +}; + struct imapc_mailbox { struct mailbox box; struct imapc_storage *storage; @@ -131,7 +136,12 @@ struct imapc_mailbox { sending soon (but still waiting to see if we can increase its UID range) */ string_t *pending_fetch_cmd; + /* if non-empty, contains the latest COPY command we're going to be + sending soon. */ + string_t *pending_copy_cmd; + char *copy_dest_box; struct imapc_fetch_request *pending_fetch_request; + struct imapc_copy_request *pending_copy_request; struct timeout *to_pending_fetch_send; ARRAY(struct imapc_mailbox_event_callback) untagged_callbacks; @@ -195,6 +205,7 @@ int imapc_save_finish(struct mail_save_context *ctx); void imapc_save_cancel(struct mail_save_context *ctx); int imapc_copy(struct mail_save_context *ctx, struct mail *mail); +int imapc_transaction_save_commit(struct mailbox_transaction_context *t); int imapc_transaction_save_commit_pre(struct mail_save_context *ctx); void imapc_transaction_save_commit_post(struct mail_save_context *ctx, struct mail_index_transaction_commit_result *result);