]> git.ipfire.org Git - thirdparty/dovecot/core.git/commitdiff
dsync: Fixed assert-crashing when messages couldn't be sent fast enough to remote
authorTimo Sirainen <tss@iki.fi>
Sat, 10 Jul 2010 14:56:45 +0000 (15:56 +0100)
committerTimo Sirainen <tss@iki.fi>
Sat, 10 Jul 2010 14:56:45 +0000 (15:56 +0100)
--HG--
branch : HEAD

src/dsync/dsync-brain-msgs-new.c
src/dsync/dsync-proxy-client.c
src/dsync/dsync-proxy-server-cmd.c
src/dsync/dsync-proxy-server.h
src/dsync/dsync-worker-local.c
src/dsync/dsync-worker-private.h
src/dsync/dsync-worker.c
src/dsync/dsync-worker.h
src/dsync/test-dsync-worker.c

index 40174193370b1cc482433473a6fbff330c1c089c..3ac0d1477a358533cb91cd75d2cd1ff2a3665b4e 100644 (file)
@@ -48,6 +48,15 @@ struct dsync_brain_msg_save_context {
 static void
 dsync_brain_msg_sync_add_new_msgs(struct dsync_brain_msg_iter *iter);
 
+static void msg_save_callback(void *context)
+{
+       struct dsync_brain_msg_save_context *ctx = context;
+
+       if (--ctx->iter->save_results_left == 0 && !ctx->iter->adding_msgs)
+               dsync_brain_msg_sync_add_new_msgs(ctx->iter);
+       i_free(ctx);
+}
+
 static void msg_get_callback(enum dsync_msg_get_result result,
                             const struct dsync_msg_static_data *data,
                             void *context)
@@ -56,6 +65,8 @@ static void msg_get_callback(enum dsync_msg_get_result result,
        const struct dsync_brain_mailbox *mailbox;
        struct istream *input;
 
+       i_assert(ctx->iter->save_results_left > 0);
+
        mailbox = array_idx(&ctx->iter->sync->mailboxes, ctx->mailbox_idx);
        switch (result) {
        case DSYNC_MSG_GET_RESULT_SUCCESS:
@@ -64,21 +75,21 @@ static void msg_get_callback(enum dsync_msg_get_result result,
                dsync_worker_select_mailbox(ctx->iter->worker, &mailbox->box);
 
                input = data->input;
-               dsync_worker_msg_save(ctx->iter->worker, ctx->msg, data);
+               dsync_worker_msg_save(ctx->iter->worker, ctx->msg, data,
+                                     msg_save_callback, ctx);
                i_stream_unref(&input);
                break;
        case DSYNC_MSG_GET_RESULT_EXPUNGED:
                /* mail got expunged during sync. just skip this. */
+               msg_save_callback(ctx);
                break;
        case DSYNC_MSG_GET_RESULT_FAILED:
                i_error("msg-get failed: box=%s uid=%u guid=%s",
                        mailbox->box.name, ctx->msg->uid, ctx->msg->guid);
                dsync_brain_fail(ctx->iter->sync->brain);
+               msg_save_callback(ctx);
                break;
        }
-       if (--ctx->iter->save_results_left == 0 && !ctx->iter->adding_msgs)
-               dsync_brain_msg_sync_add_new_msgs(ctx->iter);
-       i_free(ctx);
 }
 
 static void
@@ -297,8 +308,9 @@ dsync_brain_msg_sync_add_new_msgs(struct dsync_brain_msg_iter *iter)
                /* all messages saved for this mailbox. continue with saving
                   its conflicts and waiting for copies to finish. */
                dsync_brain_mailbox_save_conflicts(iter);
-               if (iter->copy_results_left > 0) {
-                       /* wait for copies to finish */
+               if (iter->save_results_left > 0 ||
+                   iter->copy_results_left > 0) {
+                       /* wait for saves/copies to finish */
                        return;
                }
 
index 7bf902c9a8843b4fb33d69cfb2967efb1a08ca09..d804dd07f7d02d4e84e66e75a8578d5ca3ba1ec8 100644 (file)
@@ -56,6 +56,8 @@ struct proxy_client_dsync_worker {
 
        mailbox_guid_t selected_box_guid;
 
+       dsync_worker_save_callback_t *save_callback;
+       void *save_context;
        struct istream *save_input;
        struct io *save_io;
        bool save_input_last_lf;
@@ -907,6 +909,7 @@ proxy_client_worker_msg_copy(struct dsync_worker *_worker,
 
 static void proxy_client_send_stream(struct proxy_client_dsync_worker *worker)
 {
+       dsync_worker_save_callback_t *callback;
        const unsigned char *data;
        size_t size;
        int ret;
@@ -947,13 +950,20 @@ static void proxy_client_send_stream(struct proxy_client_dsync_worker *worker)
                i_assert(!i_stream_have_bytes_left(worker->save_input));
                o_stream_send(worker->output, "\n.\n", 3);
        }
+
+       callback = worker->save_callback;
+       worker->save_callback = NULL;
        i_stream_unref(&worker->save_input);
+
+       callback(worker->save_context);
 }
 
 static void
 proxy_client_worker_msg_save(struct dsync_worker *_worker,
                             const struct dsync_message *msg,
-                            const struct dsync_msg_static_data *data)
+                            const struct dsync_msg_static_data *data,
+                            dsync_worker_save_callback_t *callback,
+                            void *context)
 {
        struct proxy_client_dsync_worker *worker =
                (struct proxy_client_dsync_worker *)_worker;
@@ -972,6 +982,8 @@ proxy_client_worker_msg_save(struct dsync_worker *_worker,
        } T_END;
 
        i_assert(worker->save_io == NULL);
+       worker->save_callback = callback;
+       worker->save_context = context;
        worker->save_input = data->input;
        worker->save_input_last_lf = TRUE;
        i_stream_ref(worker->save_input);
index 0c521f6e8f83382e0f95177e0ad2b9be414552ae..52abd68ceaba5bc25cc65d8e55fdef1a0653d1e5 100644 (file)
@@ -415,6 +415,13 @@ cmd_msg_copy(struct dsync_proxy_server *server, const char *const *args)
        return 1;
 }
 
+static void cmd_msg_save_callback(void *context)
+{
+       struct dsync_proxy_server *server = context;
+
+       server->save_finished = TRUE;
+}
+
 static int
 cmd_msg_save(struct dsync_proxy_server *server, const char *const *args)
 {
@@ -437,10 +444,13 @@ cmd_msg_save(struct dsync_proxy_server *server, const char *const *args)
        }
 
        /* we rely on save reading the entire input */
+       server->save_finished = FALSE;
        net_set_nonblock(server->fd_in, FALSE);
-       dsync_worker_msg_save(server->worker, &msg, &data);
+       dsync_worker_msg_save(server->worker, &msg, &data,
+                             cmd_msg_save_callback, server);
        net_set_nonblock(server->fd_in, TRUE);
        ret = dsync_worker_has_failed(server->worker) ? -1 : 1;
+       i_assert(server->save_finished);
        i_assert(data.input->eof || ret < 0);
        i_stream_destroy(&data.input);
        return ret;
index 44a48b00f6f35fca519abd9b720388054cec8a96..4354e914ce029774382726eb93cbefbdb6c50015 100644 (file)
@@ -32,6 +32,7 @@ struct dsync_proxy_server {
 
        unsigned int handshake_received:1;
        unsigned int subs_sending_unsubscriptions:1;
+       unsigned int save_finished:1;
        unsigned int finished:1;
 };
 
index 332ffd2c0ff4473dd6ed2747a69c9299dc4527b1..3d95f7507ad7cd33c53bfa568173698f8990dfe1 100644 (file)
@@ -95,6 +95,8 @@ struct local_dsync_worker {
        struct io *save_io;
        struct mail_save_context *save_ctx;
        struct istream *save_input;
+       dsync_worker_save_callback_t *save_callback;
+       void *save_context;
 
        dsync_worker_finish_callback_t *finish_callback;
        void *finish_context;
@@ -1516,6 +1518,7 @@ static void
 local_worker_save_msg_continue(struct local_dsync_worker *worker)
 {
        struct mailbox *dest_box = worker->ext_mail->box;
+       dsync_worker_save_callback_t *callback;
        int ret;
 
        while ((ret = i_stream_read(worker->save_input)) > 0) {
@@ -1552,14 +1555,20 @@ local_worker_save_msg_continue(struct local_dsync_worker *worker)
                        mail_storage_get_last_error(storage, NULL));
                dsync_worker_set_failure(&worker->worker);
        }
+       callback = worker->save_callback;
+       worker->save_callback = NULL;
        i_stream_unref(&worker->save_input);
+
+       callback(worker->save_context);
        dsync_worker_try_finish(worker);
 }
 
 static void
 local_worker_msg_save(struct dsync_worker *_worker,
                      const struct dsync_message *msg,
-                     const struct dsync_msg_static_data *data)
+                     const struct dsync_msg_static_data *data,
+                     dsync_worker_save_callback_t *callback,
+                     void *context)
 {
        struct local_dsync_worker *worker =
                (struct local_dsync_worker *)_worker;
@@ -1582,9 +1591,12 @@ local_worker_msg_save(struct dsync_worker *_worker,
                        mailbox_get_vname(dest_box),
                        mail_storage_get_last_error(storage, NULL));
                dsync_worker_set_failure(_worker);
+               callback(context);
                return;
        }
 
+       worker->save_callback = callback;
+       worker->save_context = context;
        worker->save_input = data->input;
        worker->save_ctx = save_ctx;
        i_stream_ref(worker->save_input);
index 86eddcde9d4222405b8eaa3c47c58b360c8f7798..6cf1a05b3809ca25b52a20d75c993d44e5e4763a 100644 (file)
@@ -62,7 +62,9 @@ struct dsync_worker_vfuncs {
                         dsync_worker_copy_callback_t *callback, void *context);
        void (*msg_save)(struct dsync_worker *worker,
                         const struct dsync_message *msg,
-                        const struct dsync_msg_static_data *data);
+                        const struct dsync_msg_static_data *data,
+                        dsync_worker_save_callback_t *callback,
+                        void *context);
        void (*msg_save_cancel)(struct dsync_worker *worker);
        void (*msg_get)(struct dsync_worker *worker,
                        const mailbox_guid_t *mailbox, uint32_t uid,
index dddf0ce464c874c08effe970c54d909757cdc529..77599e17f9425431f87f832fc3765df9d8d51012 100644 (file)
@@ -222,11 +222,14 @@ void dsync_worker_msg_copy(struct dsync_worker *worker,
 
 void dsync_worker_msg_save(struct dsync_worker *worker,
                           const struct dsync_message *msg,
-                          const struct dsync_msg_static_data *data)
+                          const struct dsync_msg_static_data *data,
+                          dsync_worker_save_callback_t *callback,
+                          void *context)
 {
        if (!worker->readonly) {
                if (!worker->failed) T_BEGIN {
-                       worker->v.msg_save(worker, msg, data);
+                       worker->v.msg_save(worker, msg, data,
+                                          callback, context);
                } T_END;
        } else {
                const unsigned char *d;
@@ -234,6 +237,7 @@ void dsync_worker_msg_save(struct dsync_worker *worker,
 
                while ((i_stream_read_data(data->input, &d, &size, 0)) > 0)
                        i_stream_skip(data->input, size);
+               callback(context);
        }
 }
 
index 951a25cf9779760e42155ecd6e1f81979f11c0e1..3b858dc444cc2108cf13fec36aeab95f3424368c 100644 (file)
@@ -23,6 +23,7 @@ struct dsync_worker_unsubscription {
 };
 
 typedef void dsync_worker_copy_callback_t(bool success, void *context);
+typedef void dsync_worker_save_callback_t(void *context);
 typedef void dsync_worker_msg_callback_t(enum dsync_msg_get_result result,
                                         const struct dsync_msg_static_data *data,
                                         void *context);
@@ -133,11 +134,13 @@ void dsync_worker_msg_copy(struct dsync_worker *worker,
                           const struct dsync_message *dest_msg,
                           dsync_worker_copy_callback_t *callback,
                           void *context);
-/* Save given message from the given input stream. The stream is destroyed once
-   saving is finished. */
+/* Save given message from the given input stream. Once saving is finished,
+   the given callback is called and the stream is destroyed. */
 void dsync_worker_msg_save(struct dsync_worker *worker,
                           const struct dsync_message *msg,
-                          const struct dsync_msg_static_data *data);
+                          const struct dsync_msg_static_data *data,
+                          dsync_worker_save_callback_t *callback,
+                          void *context);
 /* Cancel any pending saves */
 void dsync_worker_msg_save_cancel(struct dsync_worker *worker);
 /* Get message data for saving. The callback is called once when the static
index c05936d0cc7d002d945688a4d5e2171865b91e81..e784138e3c4918e8ffc5071e61d8d94ab9cf588e 100644 (file)
@@ -388,7 +388,9 @@ test_worker_msg_copy(struct dsync_worker *_worker,
 static void
 test_worker_msg_save(struct dsync_worker *_worker,
                     const struct dsync_message *msg,
-                    const struct dsync_msg_static_data *data)
+                    const struct dsync_msg_static_data *data,
+                    dsync_worker_save_callback_t *callback,
+                    void *context)
 {
        struct test_dsync_worker *worker = (struct test_dsync_worker *)_worker;
        struct test_dsync_msg_event *event;
@@ -408,6 +410,8 @@ test_worker_msg_save(struct dsync_worker *_worker,
        }
        i_assert(ret == -1);
        event->save_body = p_strdup(worker->tmp_pool, str_c(body));
+
+       callback(context);
 }
 
 static void