]> git.ipfire.org Git - thirdparty/dovecot/core.git/commitdiff
imapc: Implement bulk copying for imapc
authorMarkus Valentin <markus.valentin@open-xchange.com>
Thu, 11 Nov 2021 13:52:54 +0000 (14:52 +0100)
committerMarkus Valentin <markus.valentin@open-xchange.com>
Mon, 14 Mar 2022 13:06:49 +0000 (14:06 +0100)
src/lib-storage/index/imapc/imapc-save.c
src/lib-storage/index/imapc/imapc-storage.c
src/lib-storage/index/imapc/imapc-storage.h

index b910e1d4c2053e29c1d1bb0b155fe92646afe31a..1d9af003d30ef26703e09a714112de8f857fff46 100644 (file)
@@ -7,6 +7,8 @@
 #include "ostream.h"
 #include "imap-date.h"
 #include "imap-util.h"
+#include "imap-seqset.h"
+#include "imap-quote.h"
 #include "index-mail.h"
 #include "mail-copy.h"
 #include "mailbox-list-private.h"
 #include "imapc-storage.h"
 #include "imapc-sync.h"
 #include "imapc-mail.h"
+#include "seq-set-builder.h"
 
 struct imapc_save_context {
        struct mail_save_context ctx;
 
        struct imapc_mailbox *mbox;
+       struct imapc_mailbox *src_mbox;
        struct mail_index_transaction *trans;
 
        int fd;
@@ -41,6 +45,7 @@ struct imapc_save_cmd_context {
 #define IMAPC_SAVECTX(s)       container_of(s, struct imapc_save_context, ctx)
 
 void imapc_transaction_save_rollback(struct mail_save_context *_ctx);
+static void imapc_mail_copy_bulk_flush(struct imapc_mailbox *mbox);
 
 struct mail_save_context *
 imapc_save_alloc(struct mailbox_transaction_context *t)
@@ -54,6 +59,7 @@ imapc_save_alloc(struct mailbox_transaction_context *t)
                ctx = i_new(struct imapc_save_context, 1);
                ctx->ctx.transaction = t;
                ctx->mbox = mbox;
+               ctx->src_mbox = NULL;
                ctx->trans = t->itrans;
                ctx->fd = -1;
                t->save_ctx = &ctx->ctx;
@@ -309,6 +315,12 @@ void imapc_save_cancel(struct mail_save_context *_ctx)
        (void)imapc_save_finish(_ctx);
 }
 
+static void imapc_copy_bulk_finish(struct imapc_save_context *ctx)
+{
+       while (ctx->src_mbox != NULL && ctx->src_mbox->pending_copy_request != NULL)
+               imapc_mailbox_run_nofetch(ctx->src_mbox);
+}
+
 int imapc_transaction_save_commit_pre(struct mail_save_context *_ctx)
 {
        struct imapc_save_context *ctx = IMAPC_SAVECTX(_ctx);
@@ -330,12 +342,40 @@ int imapc_transaction_save_commit_pre(struct mail_save_context *_ctx)
        return 0;
 }
 
+int imapc_transaction_save_commit(struct mailbox_transaction_context *t)
+{
+       struct imapc_save_context *ctx = NULL;
+       struct imapc_mailbox *src_mbox = NULL;
+
+       if (t->save_ctx != NULL) {
+               ctx = IMAPC_SAVECTX(t->save_ctx);
+               src_mbox = ctx->src_mbox;
+       }
+
+       if (src_mbox != NULL && src_mbox->pending_copy_request != NULL) {
+              /* If there is still a copy command to send flush it now */
+              imapc_mail_copy_bulk_flush(src_mbox);
+              imapc_copy_bulk_finish(ctx);
+       }
+
+       if (ctx != NULL)
+              return ctx->failed ? -1 : 0;
+       return 0;
+}
+
 void imapc_transaction_save_commit_post(struct mail_save_context *_ctx,
                                        struct mail_index_transaction_commit_result *result ATTR_UNUSED)
 {
        imapc_transaction_save_rollback(_ctx);
 }
 
+static void imapc_copy_bulk_ctx_deinit(struct imapc_save_context *ctx)
+{
+       /* Clean up the pending copy and the context attached to it */
+       str_truncate(ctx->src_mbox->pending_copy_cmd, 0);
+       i_free(ctx->src_mbox->copy_dest_box);
+}
+
 void imapc_transaction_save_rollback(struct mail_save_context *_ctx)
 {
        struct imapc_save_context *ctx = IMAPC_SAVECTX(_ctx);
@@ -379,6 +419,54 @@ static void imapc_save_copyuid(struct imapc_save_context *ctx,
        }
 }
 
+static bool imapc_save_bulk_copyuid(struct imapc_save_context *ctx,
+                                   const struct imapc_command_reply *reply)
+{
+       ARRAY_TYPE(seq_range) dest_uidset, source_uidset;
+       const char *const *args;
+       uint32_t uid_validity;
+
+       /* <uidvalidity> <source uid-set> <dest uid-set> */
+       args = t_strsplit(reply->resp_text_value, " ");
+       if (str_array_length(args) != 3)
+               return FALSE;
+
+       if (str_to_uint32(args[0], &uid_validity) < 0)
+               return FALSE;
+       if (ctx->dest_uid_validity == 0)
+               ctx->dest_uid_validity = uid_validity;
+       else if (ctx->dest_uid_validity != uid_validity)
+               return FALSE;
+
+       t_array_init(&source_uidset, 8);
+       t_array_init(&dest_uidset, 8);
+
+       if (imap_seq_set_nostar_parse(args[1], &source_uidset) < 0)
+               return FALSE;
+       if (imap_seq_set_nostar_parse(args[2], &dest_uidset) < 0)
+               return FALSE;
+
+       if (!array_is_created(&ctx->dest_saved_uids))
+               i_array_init(&ctx->dest_saved_uids, 8);
+
+       seq_range_array_merge(&ctx->dest_saved_uids, &dest_uidset);
+       return TRUE;
+}
+
+static void imapc_copy_set_error(struct imapc_save_context *sctx,
+                                const struct imapc_command_reply *reply)
+{
+       sctx->failed = TRUE;
+
+       if (reply->state != IMAPC_COMMAND_STATE_BAD)
+               imapc_copy_error_from_reply(sctx->mbox->storage,
+                                           MAIL_ERROR_PARAMS, reply);
+       else
+               mailbox_set_critical(&sctx->mbox->box,
+                                    "imapc: COPY failed: %s",
+                                    reply->text_full);
+}
+
 static void
 imapc_copy_simple_callback(const struct imapc_command_reply *reply,
                           void *context)
@@ -409,27 +497,152 @@ imapc_copy_simple(struct mail_save_context *_ctx, struct mail *mail)
 {
        struct imapc_save_context *ctx = IMAPC_SAVECTX(_ctx);
        struct mailbox_transaction_context *_t = _ctx->transaction;
-       struct imapc_mailbox *src_mbox = IMAPC_MAILBOX(mail->box);
        struct imapc_save_cmd_context sctx;
        struct imapc_command *cmd;
 
        sctx.ret = -2;
        sctx.ctx = ctx;
-       cmd = imapc_client_mailbox_cmd(src_mbox->client_box,
+       cmd = imapc_client_mailbox_cmd(ctx->src_mbox->client_box,
                                       imapc_copy_simple_callback,
                                       &sctx);
        imapc_command_sendf(cmd, "UID COPY %u %s", mail->uid, _t->box->name);
        while (sctx.ret == -2)
-               imapc_mailbox_run(src_mbox);
+               imapc_mailbox_run(ctx->src_mbox);
        ctx->finished = TRUE;
        return sctx.ret;
 }
 
+static void imapc_copy_bulk_callback(const struct imapc_command_reply *reply,
+                                    void *context)
+{
+       struct imapc_copy_request *request = context;
+       struct imapc_save_context *ctx = request->sctx;
+       struct imapc_mailbox *mbox = ctx->src_mbox;
+
+       i_assert(mbox != NULL);
+       i_assert(request == mbox->pending_copy_request);
+
+       /* Check the reply state and add uid's to index and
+          dest_saved_uids. */
+       if (ctx->failed) {
+               /* If the saving already failed try to find UIDs already
+                  copied from the reply so that rollback can expunge
+                  them */
+               if (null_strcasecmp(reply->resp_text_key, "COPYUID") == 0) {
+                       (void)imapc_save_bulk_copyuid(ctx, reply);
+                       imapc_transaction_save_rollback(&ctx->ctx);
+               }
+       } else if (reply->state == IMAPC_COMMAND_STATE_OK) {
+               if (reply->resp_text_key != NULL &&
+                  strcasecmp(reply->resp_text_key, "COPYUID") == 0 &&
+                  imapc_save_bulk_copyuid(ctx, reply)) {
+                       ctx->finished = TRUE;
+               }
+       } else {
+               imapc_copy_set_error(ctx, reply);
+       }
+
+       ctx->src_mbox->pending_copy_request = NULL;
+       i_free(request);
+       imapc_client_stop(mbox->storage->client->client);
+}
+
+static void imapc_mail_copy_bulk_flush(struct imapc_mailbox *mbox)
+{
+       struct imapc_command *cmd;
+
+       i_assert(mbox != NULL);
+       i_assert(mbox->pending_copy_request != NULL);
+       i_assert(mbox->client_box != NULL);
+
+       cmd = imapc_client_mailbox_cmd(mbox->client_box,
+                                      imapc_copy_bulk_callback,
+                                      mbox->pending_copy_request);
+
+       seqset_builder_deinit(&mbox->pending_copy_request->uidset_builder);
+
+       str_append(mbox->pending_copy_cmd, " ");
+       imap_append_astring(mbox->pending_copy_cmd, mbox->copy_dest_box);
+
+       imapc_command_send(cmd, str_c(mbox->pending_copy_cmd));
+
+       imapc_copy_bulk_ctx_deinit(mbox->pending_copy_request->sctx);
+}
+
+static bool
+imapc_mail_copy_bulk_try_merge(struct imapc_mailbox *mbox, uint32_t uid,
+                              const char *box)
+{
+       i_assert(str_begins(str_c(mbox->pending_copy_cmd), "UID COPY "));
+
+       if (strcmp(box, mbox->copy_dest_box) != 0) {
+               /* Not the same mailbox merging not possible */
+               return FALSE;
+       }
+       return seqset_builder_try_add(mbox->pending_copy_request->uidset_builder,
+                                     IMAPC_SERVER_CMDLINE_MAX_LEN, uid);
+}
+
+static void
+imapc_mail_copy_bulk_delayed_send_or_merge(struct imapc_save_context *ctx,
+                                          uint32_t uid,
+                                          const char *box)
+{
+       struct imapc_mailbox *mbox = ctx->src_mbox;
+
+       if (mbox->pending_copy_request != NULL &&
+           !imapc_mail_copy_bulk_try_merge(mbox, uid, box)) {
+               /* send the previous COPY and create new one after
+                  waiting for this one to be finished. */
+               imapc_mail_copy_bulk_flush(mbox);
+               imapc_copy_bulk_finish(mbox->pending_copy_request->sctx);
+       }
+       if (mbox->pending_copy_request == NULL) {
+               mbox->pending_copy_request =
+                       i_new(struct imapc_copy_request, 1);
+               str_printfa(mbox->pending_copy_cmd, "UID COPY ");
+               mbox->pending_copy_request->uidset_builder =
+                       seqset_builder_init(mbox->pending_copy_cmd);
+               seqset_builder_add(mbox->pending_copy_request->uidset_builder,
+                                  uid);
+               mbox->copy_dest_box = i_strdup(box);
+       } else {
+               i_assert(mbox->pending_copy_request->sctx == ctx);
+       }
+       mbox->pending_copy_request->sctx = ctx;
+}
+
+static int
+imapc_copy_bulk(struct imapc_save_context *ctx, struct mail *mail)
+{
+       struct imapc_mailbox *mbox = IMAPC_MAILBOX(ctx->ctx.transaction->box);
+
+       imapc_mail_copy_bulk_delayed_send_or_merge(ctx, mail->uid,
+                                                  imapc_mailbox_get_remote_name(mbox));
+       imapc_save_add_to_index(ctx, 0);
+
+       return ctx->failed ? -1 : 0;
+}
+
+static bool imapc_is_mail_expunged(struct imapc_mailbox *mbox, uint32_t uid)
+{
+       if (array_is_created(&mbox->delayed_expunged_uids) &&
+           seq_range_exists(&mbox->delayed_expunged_uids, uid))
+               return TRUE;
+       if (mbox->delayed_sync_trans == NULL)
+               return FALSE;
+
+       struct mail_index_view *view =
+               mail_index_transaction_get_view(mbox->delayed_sync_trans);
+       uint32_t seq;
+       return mail_index_lookup_seq(view, uid, &seq) &&
+               mail_index_transaction_is_expunged(mbox->delayed_sync_trans, seq);
+}
+
 int imapc_copy(struct mail_save_context *_ctx, struct mail *mail)
 {
        struct imapc_save_context *ctx = IMAPC_SAVECTX(_ctx);
        struct mailbox_transaction_context *_t = _ctx->transaction;
-       struct imapc_mailbox *src_mbox;
        struct imapc_msgmap *src_msgmap;
        uint32_t rseq;
        int ret;
@@ -437,10 +650,15 @@ int imapc_copy(struct mail_save_context *_ctx, struct mail *mail)
        i_assert((_t->flags & MAILBOX_TRANSACTION_FLAG_EXTERNAL) != 0);
 
        if (_t->box->storage == mail->box->storage) {
-               src_mbox = IMAPC_MAILBOX(mail->box);
+               /* Currently we don't support copying mails from multiple
+                  different source mailboxes within the same transaction. */
+               i_assert(ctx->src_mbox == NULL || &ctx->src_mbox->box == mail->box);
+               ctx->src_mbox = IMAPC_MAILBOX(mail->box);
+               if (!mail->expunged && imapc_is_mail_expunged(ctx->mbox, mail->uid))
+                       mail_set_expunged(mail);
                /* same server, we can use COPY for the mail */
                src_msgmap =
-                       imapc_client_mailbox_get_msgmap(src_mbox->client_box);
+                       imapc_client_mailbox_get_msgmap(ctx->src_mbox->client_box);
                if (mail->expunged ||
                    !imapc_msgmap_uid_to_rseq(src_msgmap, mail->uid, &rseq)) {
                        mail_storage_set_error(mail->box->storage,
index d0aca8ab15b415f4e6eede6a70832e48a3d0b1fa..8b7edbe00ae1b3e5de76c552004d7b5a16468cc0 100644 (file)
@@ -541,6 +541,7 @@ imapc_mailbox_alloc(struct mail_storage *storage, struct mailbox_list *list,
        p_array_init(&mbox->untagged_fetch_contexts, pool, 16);
        p_array_init(&mbox->delayed_expunged_uids, pool, 16);
        mbox->pending_fetch_cmd = str_new(pool, 128);
+       mbox->pending_copy_cmd = str_new(pool, 128);
        mbox->prev_mail_cache.fd = -1;
        imapc_mailbox_register_callbacks(mbox);
        return &mbox->box;
@@ -1282,6 +1283,15 @@ struct mail_storage imapc_storage = {
        }
 };
 
+static int
+imapc_mailbox_transaction_commit(struct mailbox_transaction_context *t,
+                                struct mail_transaction_commit_changes *changes_r)
+{
+       int ret = imapc_transaction_save_commit(t);
+       int ret2 = index_transaction_commit(t, changes_r);
+       return ret >= 0 && ret2 >= 0 ? 0 : -1;
+}
+
 struct mailbox imapc_mailbox = {
        .v = {
                index_storage_is_readonly,
@@ -1310,7 +1320,7 @@ struct mailbox imapc_mailbox = {
                NULL,
                imapc_notify_changes,
                index_transaction_begin,
-               index_transaction_commit,
+               imapc_mailbox_transaction_commit,
                index_transaction_rollback,
                NULL,
                imapc_mail_alloc,
index 6040c3389a8d0ad936cee641dd9b54c311423c83..8af437455e7addd7c469636ee00aebb12df329e8 100644 (file)
@@ -113,6 +113,11 @@ struct imapc_untagged_fetch_ctx {
        bool have_flags:1;
 };
 
+struct imapc_copy_request {
+       struct imapc_save_context *sctx;
+       struct seqset_builder *uidset_builder;
+};
+
 struct imapc_mailbox {
        struct mailbox box;
        struct imapc_storage *storage;
@@ -131,7 +136,12 @@ struct imapc_mailbox {
           sending soon (but still waiting to see if we can increase its
           UID range) */
        string_t *pending_fetch_cmd;
+       /* if non-empty, contains the latest COPY command we're going to be
+          sending soon. */
+       string_t *pending_copy_cmd;
+       char *copy_dest_box;
        struct imapc_fetch_request *pending_fetch_request;
+       struct imapc_copy_request *pending_copy_request;
        struct timeout *to_pending_fetch_send;
 
        ARRAY(struct imapc_mailbox_event_callback) untagged_callbacks;
@@ -195,6 +205,7 @@ int imapc_save_finish(struct mail_save_context *ctx);
 void imapc_save_cancel(struct mail_save_context *ctx);
 int imapc_copy(struct mail_save_context *ctx, struct mail *mail);
 
+int imapc_transaction_save_commit(struct mailbox_transaction_context *t);
 int imapc_transaction_save_commit_pre(struct mail_save_context *ctx);
 void imapc_transaction_save_commit_post(struct mail_save_context *ctx,
                                        struct mail_index_transaction_commit_result *result);