From: Timo Sirainen Date: Sat, 10 Jul 2010 14:56:45 +0000 (+0100) Subject: dsync: Fixed assert-crashing when messages couldn't be sent fast enough to remote X-Git-Tag: 2.0.rc3~101 X-Git-Url: http://git.ipfire.org/gitweb.cgi?a=commitdiff_plain;h=ef7c0451e00f5f49e0c3f4266abaf78b8d383b3a;p=thirdparty%2Fdovecot%2Fcore.git dsync: Fixed assert-crashing when messages couldn't be sent fast enough to remote --HG-- branch : HEAD --- diff --git a/src/dsync/dsync-brain-msgs-new.c b/src/dsync/dsync-brain-msgs-new.c index 4017419337..3ac0d1477a 100644 --- a/src/dsync/dsync-brain-msgs-new.c +++ b/src/dsync/dsync-brain-msgs-new.c @@ -48,6 +48,15 @@ struct dsync_brain_msg_save_context { static void dsync_brain_msg_sync_add_new_msgs(struct dsync_brain_msg_iter *iter); +static void msg_save_callback(void *context) +{ + struct dsync_brain_msg_save_context *ctx = context; + + if (--ctx->iter->save_results_left == 0 && !ctx->iter->adding_msgs) + dsync_brain_msg_sync_add_new_msgs(ctx->iter); + i_free(ctx); +} + static void msg_get_callback(enum dsync_msg_get_result result, const struct dsync_msg_static_data *data, void *context) @@ -56,6 +65,8 @@ static void msg_get_callback(enum dsync_msg_get_result result, const struct dsync_brain_mailbox *mailbox; struct istream *input; + i_assert(ctx->iter->save_results_left > 0); + mailbox = array_idx(&ctx->iter->sync->mailboxes, ctx->mailbox_idx); switch (result) { case DSYNC_MSG_GET_RESULT_SUCCESS: @@ -64,21 +75,21 @@ static void msg_get_callback(enum dsync_msg_get_result result, dsync_worker_select_mailbox(ctx->iter->worker, &mailbox->box); input = data->input; - dsync_worker_msg_save(ctx->iter->worker, ctx->msg, data); + dsync_worker_msg_save(ctx->iter->worker, ctx->msg, data, + msg_save_callback, ctx); i_stream_unref(&input); break; case DSYNC_MSG_GET_RESULT_EXPUNGED: /* mail got expunged during sync. just skip this. */ + msg_save_callback(ctx); break; case DSYNC_MSG_GET_RESULT_FAILED: i_error("msg-get failed: box=%s uid=%u guid=%s", mailbox->box.name, ctx->msg->uid, ctx->msg->guid); dsync_brain_fail(ctx->iter->sync->brain); + msg_save_callback(ctx); break; } - if (--ctx->iter->save_results_left == 0 && !ctx->iter->adding_msgs) - dsync_brain_msg_sync_add_new_msgs(ctx->iter); - i_free(ctx); } static void @@ -297,8 +308,9 @@ dsync_brain_msg_sync_add_new_msgs(struct dsync_brain_msg_iter *iter) /* all messages saved for this mailbox. continue with saving its conflicts and waiting for copies to finish. */ dsync_brain_mailbox_save_conflicts(iter); - if (iter->copy_results_left > 0) { - /* wait for copies to finish */ + if (iter->save_results_left > 0 || + iter->copy_results_left > 0) { + /* wait for saves/copies to finish */ return; } diff --git a/src/dsync/dsync-proxy-client.c b/src/dsync/dsync-proxy-client.c index 7bf902c9a8..d804dd07f7 100644 --- a/src/dsync/dsync-proxy-client.c +++ b/src/dsync/dsync-proxy-client.c @@ -56,6 +56,8 @@ struct proxy_client_dsync_worker { mailbox_guid_t selected_box_guid; + dsync_worker_save_callback_t *save_callback; + void *save_context; struct istream *save_input; struct io *save_io; bool save_input_last_lf; @@ -907,6 +909,7 @@ proxy_client_worker_msg_copy(struct dsync_worker *_worker, static void proxy_client_send_stream(struct proxy_client_dsync_worker *worker) { + dsync_worker_save_callback_t *callback; const unsigned char *data; size_t size; int ret; @@ -947,13 +950,20 @@ static void proxy_client_send_stream(struct proxy_client_dsync_worker *worker) i_assert(!i_stream_have_bytes_left(worker->save_input)); o_stream_send(worker->output, "\n.\n", 3); } + + callback = worker->save_callback; + worker->save_callback = NULL; i_stream_unref(&worker->save_input); + + callback(worker->save_context); } static void proxy_client_worker_msg_save(struct dsync_worker *_worker, const struct dsync_message *msg, - const struct dsync_msg_static_data *data) + const struct dsync_msg_static_data *data, + dsync_worker_save_callback_t *callback, + void *context) { struct proxy_client_dsync_worker *worker = (struct proxy_client_dsync_worker *)_worker; @@ -972,6 +982,8 @@ proxy_client_worker_msg_save(struct dsync_worker *_worker, } T_END; i_assert(worker->save_io == NULL); + worker->save_callback = callback; + worker->save_context = context; worker->save_input = data->input; worker->save_input_last_lf = TRUE; i_stream_ref(worker->save_input); diff --git a/src/dsync/dsync-proxy-server-cmd.c b/src/dsync/dsync-proxy-server-cmd.c index 0c521f6e8f..52abd68cea 100644 --- a/src/dsync/dsync-proxy-server-cmd.c +++ b/src/dsync/dsync-proxy-server-cmd.c @@ -415,6 +415,13 @@ cmd_msg_copy(struct dsync_proxy_server *server, const char *const *args) return 1; } +static void cmd_msg_save_callback(void *context) +{ + struct dsync_proxy_server *server = context; + + server->save_finished = TRUE; +} + static int cmd_msg_save(struct dsync_proxy_server *server, const char *const *args) { @@ -437,10 +444,13 @@ cmd_msg_save(struct dsync_proxy_server *server, const char *const *args) } /* we rely on save reading the entire input */ + server->save_finished = FALSE; net_set_nonblock(server->fd_in, FALSE); - dsync_worker_msg_save(server->worker, &msg, &data); + dsync_worker_msg_save(server->worker, &msg, &data, + cmd_msg_save_callback, server); net_set_nonblock(server->fd_in, TRUE); ret = dsync_worker_has_failed(server->worker) ? -1 : 1; + i_assert(server->save_finished); i_assert(data.input->eof || ret < 0); i_stream_destroy(&data.input); return ret; diff --git a/src/dsync/dsync-proxy-server.h b/src/dsync/dsync-proxy-server.h index 44a48b00f6..4354e914ce 100644 --- a/src/dsync/dsync-proxy-server.h +++ b/src/dsync/dsync-proxy-server.h @@ -32,6 +32,7 @@ struct dsync_proxy_server { unsigned int handshake_received:1; unsigned int subs_sending_unsubscriptions:1; + unsigned int save_finished:1; unsigned int finished:1; }; diff --git a/src/dsync/dsync-worker-local.c b/src/dsync/dsync-worker-local.c index 332ffd2c0f..3d95f7507a 100644 --- a/src/dsync/dsync-worker-local.c +++ b/src/dsync/dsync-worker-local.c @@ -95,6 +95,8 @@ struct local_dsync_worker { struct io *save_io; struct mail_save_context *save_ctx; struct istream *save_input; + dsync_worker_save_callback_t *save_callback; + void *save_context; dsync_worker_finish_callback_t *finish_callback; void *finish_context; @@ -1516,6 +1518,7 @@ static void local_worker_save_msg_continue(struct local_dsync_worker *worker) { struct mailbox *dest_box = worker->ext_mail->box; + dsync_worker_save_callback_t *callback; int ret; while ((ret = i_stream_read(worker->save_input)) > 0) { @@ -1552,14 +1555,20 @@ local_worker_save_msg_continue(struct local_dsync_worker *worker) mail_storage_get_last_error(storage, NULL)); dsync_worker_set_failure(&worker->worker); } + callback = worker->save_callback; + worker->save_callback = NULL; i_stream_unref(&worker->save_input); + + callback(worker->save_context); dsync_worker_try_finish(worker); } static void local_worker_msg_save(struct dsync_worker *_worker, const struct dsync_message *msg, - const struct dsync_msg_static_data *data) + const struct dsync_msg_static_data *data, + dsync_worker_save_callback_t *callback, + void *context) { struct local_dsync_worker *worker = (struct local_dsync_worker *)_worker; @@ -1582,9 +1591,12 @@ local_worker_msg_save(struct dsync_worker *_worker, mailbox_get_vname(dest_box), mail_storage_get_last_error(storage, NULL)); dsync_worker_set_failure(_worker); + callback(context); return; } + worker->save_callback = callback; + worker->save_context = context; worker->save_input = data->input; worker->save_ctx = save_ctx; i_stream_ref(worker->save_input); diff --git a/src/dsync/dsync-worker-private.h b/src/dsync/dsync-worker-private.h index 86eddcde9d..6cf1a05b38 100644 --- a/src/dsync/dsync-worker-private.h +++ b/src/dsync/dsync-worker-private.h @@ -62,7 +62,9 @@ struct dsync_worker_vfuncs { dsync_worker_copy_callback_t *callback, void *context); void (*msg_save)(struct dsync_worker *worker, const struct dsync_message *msg, - const struct dsync_msg_static_data *data); + const struct dsync_msg_static_data *data, + dsync_worker_save_callback_t *callback, + void *context); void (*msg_save_cancel)(struct dsync_worker *worker); void (*msg_get)(struct dsync_worker *worker, const mailbox_guid_t *mailbox, uint32_t uid, diff --git a/src/dsync/dsync-worker.c b/src/dsync/dsync-worker.c index dddf0ce464..77599e17f9 100644 --- a/src/dsync/dsync-worker.c +++ b/src/dsync/dsync-worker.c @@ -222,11 +222,14 @@ void dsync_worker_msg_copy(struct dsync_worker *worker, void dsync_worker_msg_save(struct dsync_worker *worker, const struct dsync_message *msg, - const struct dsync_msg_static_data *data) + const struct dsync_msg_static_data *data, + dsync_worker_save_callback_t *callback, + void *context) { if (!worker->readonly) { if (!worker->failed) T_BEGIN { - worker->v.msg_save(worker, msg, data); + worker->v.msg_save(worker, msg, data, + callback, context); } T_END; } else { const unsigned char *d; @@ -234,6 +237,7 @@ void dsync_worker_msg_save(struct dsync_worker *worker, while ((i_stream_read_data(data->input, &d, &size, 0)) > 0) i_stream_skip(data->input, size); + callback(context); } } diff --git a/src/dsync/dsync-worker.h b/src/dsync/dsync-worker.h index 951a25cf97..3b858dc444 100644 --- a/src/dsync/dsync-worker.h +++ b/src/dsync/dsync-worker.h @@ -23,6 +23,7 @@ struct dsync_worker_unsubscription { }; typedef void dsync_worker_copy_callback_t(bool success, void *context); +typedef void dsync_worker_save_callback_t(void *context); typedef void dsync_worker_msg_callback_t(enum dsync_msg_get_result result, const struct dsync_msg_static_data *data, void *context); @@ -133,11 +134,13 @@ void dsync_worker_msg_copy(struct dsync_worker *worker, const struct dsync_message *dest_msg, dsync_worker_copy_callback_t *callback, void *context); -/* Save given message from the given input stream. The stream is destroyed once - saving is finished. */ +/* Save given message from the given input stream. Once saving is finished, + the given callback is called and the stream is destroyed. */ void dsync_worker_msg_save(struct dsync_worker *worker, const struct dsync_message *msg, - const struct dsync_msg_static_data *data); + const struct dsync_msg_static_data *data, + dsync_worker_save_callback_t *callback, + void *context); /* Cancel any pending saves */ void dsync_worker_msg_save_cancel(struct dsync_worker *worker); /* Get message data for saving. The callback is called once when the static diff --git a/src/dsync/test-dsync-worker.c b/src/dsync/test-dsync-worker.c index c05936d0cc..e784138e3c 100644 --- a/src/dsync/test-dsync-worker.c +++ b/src/dsync/test-dsync-worker.c @@ -388,7 +388,9 @@ test_worker_msg_copy(struct dsync_worker *_worker, static void test_worker_msg_save(struct dsync_worker *_worker, const struct dsync_message *msg, - const struct dsync_msg_static_data *data) + const struct dsync_msg_static_data *data, + dsync_worker_save_callback_t *callback, + void *context) { struct test_dsync_worker *worker = (struct test_dsync_worker *)_worker; struct test_dsync_msg_event *event; @@ -408,6 +410,8 @@ test_worker_msg_save(struct dsync_worker *_worker, } i_assert(ret == -1); event->save_body = p_strdup(worker->tmp_pool, str_c(body)); + + callback(context); } static void