/* Copyright (c) 2009-2010 Dovecot authors, see the included COPYING file */
+/*
+ This code contains the step 6 explained in dsync-brain-msgs.c:
+ It saves/copies new messages and gives new UIDs for conflicting messages.
+
+ The input is both workers' msg iterators' new_msgs and uid_conflicts
+ variables. They're first sorted by mailbox and secondarily by wanted
+ destination UID. Destination UIDs of conflicts should always be higher
+ than new messages'.
+
+ Mailboxes are handled one at a time:
+
+ 1. Go through all saved messages. If we've already seen an instance of this
+ message, try to copy it. Otherwise save a new instance of it.
+ 2. Some of the copies may fail because they're already expunged by that
+ time. Each failed copy marks its message as not saved, so it gets retried.
+ 3. UID conflicts are resolved by assigning a new UID to the message.
+ To avoid delays with remote dsync, this is done via worker API.
+ Internally the local worker copies the message to its new UID and
+ once the copy succeeds, the old UID is expunged. If the copy fails, it's
+ either due to message already being expunged or something more fatal.
+ 4. Once all messages are saved/copied, see if there are any failed copies.
+ If so, goto 1, but going through only the failed messages.
+ 5. If there are more mailboxes left, go to next one and goto 1.
+
+ Step 4 may require waiting for remote worker to send all replies.
+*/
+
#include "lib.h"
#include "array.h"
#include "istream.h"
}
if (--ctx->iter->save_results_left == 0 && !ctx->iter->adding_msgs)
dsync_brain_msg_sync_add_new_msgs(ctx->iter);
+ i_free(ctx);
+}
+
/* Drops the instance that iter->guid_hash currently maps msg's GUID to.
   If that instance has a successor (inst->next), the successor becomes the
   new hash value for the GUID; otherwise the GUID's entry is removed from
   the hash. Does nothing when the GUID isn't in the hash anymore.
   Called by dsync_brain_copy_callback() after a failed copy. */
+static void
+dsync_brain_sync_remove_guid_instance(struct dsync_brain_msg_iter *iter,
+ const struct dsync_brain_new_msg *msg)
+{
+ struct dsync_brain_guid_instance *inst;
+ void *orig_key, *orig_value;
+
+ if (!hash_table_lookup_full(iter->guid_hash, msg->msg->guid,
+ &orig_key, &orig_value)) {
+ /* another failed copy already removed it */
+ return;
+ }
+ inst = orig_value; /* instance the hash currently points to for this GUID */
+
+ if (inst->next == NULL)
+ hash_table_remove(iter->guid_hash, orig_key); /* no instances left for this GUID */
+ else
+ hash_table_update(iter->guid_hash, orig_key, inst->next); /* same key, next instance as value */
}
/* Result callback for a message copy. context is a
   struct dsync_brain_msg_copy_context (allocated with i_new() in
   dsync_brain_msg_sync_add_new_msg()); it is freed here.
   NOTE(review): the place where this callback gets registered is outside
   the visible hunks.

   On failure the message is flagged as not saved, iter->next_new_msg is
   moved back to it when necessary, and the GUID instance that the copy was
   based on is dropped from iter->guid_hash. */
static void dsync_brain_copy_callback(bool success, void *context)
{
struct dsync_brain_msg_copy_context *ctx = context;
- const struct dsync_brain_new_msg *msg;
- struct dsync_brain_guid_instance *inst;
+ struct dsync_brain_new_msg *msg;
if (!success) {
/* mark the guid instance invalid and try again later */
- msg = array_idx(&ctx->iter->new_msgs, ctx->msg_idx);
- inst = hash_table_lookup(ctx->iter->guid_hash, msg->msg->guid);
- inst->failed = TRUE;
- array_append(&ctx->iter->copy_retry_indexes, &ctx->msg_idx, 1);
+ msg = array_idx_modifiable(&ctx->iter->new_msgs, ctx->msg_idx);
+ i_assert(msg->saved); /* set by dsync_brain_msg_sync_add_new_msg() */
+ msg->saved = FALSE; /* dsync_brain_mailbox_add_new_msgs() skips only saved messages */
+
+ if (ctx->iter->next_new_msg > ctx->msg_idx)
+ ctx->iter->next_new_msg = ctx->msg_idx; /* the add loop starts scanning from next_new_msg */
+
+ dsync_brain_sync_remove_guid_instance(ctx->iter, msg);
}
/* this was the last pending copy result and the add loop isn't running:
   resume adding messages */
if (--ctx->iter->copy_results_left == 0 && !ctx->iter->adding_msgs)
dsync_brain_msg_sync_add_new_msgs(ctx->iter);
+ i_free(ctx);
}
static int
dsync_brain_msg_sync_add_new_msg(struct dsync_brain_msg_iter *dest_iter,
const mailbox_guid_t *src_mailbox,
unsigned int msg_idx,
- const struct dsync_brain_new_msg *msg)
+ struct dsync_brain_new_msg *msg)
{
struct dsync_brain_msg_save_context *save_ctx;
struct dsync_brain_msg_copy_context *copy_ctx;
const struct dsync_brain_guid_instance *inst;
const struct dsync_brain_mailbox *inst_box;
+ msg->saved = TRUE;
+
inst = hash_table_lookup(dest_iter->guid_hash, msg->msg->guid);
if (inst != NULL) {
/* we can save this by copying an existing message */
inst_box = array_idx(&dest_iter->sync->mailboxes,
inst->mailbox_idx);
- copy_ctx = p_new(dest_iter->sync->pool,
- struct dsync_brain_msg_copy_context, 1);
+ copy_ctx = i_new(struct dsync_brain_msg_copy_context, 1);
copy_ctx->iter = dest_iter;
copy_ctx->msg_idx = msg_idx;
dest_iter->sync->src_msg_iter :
dest_iter->sync->dest_msg_iter;
- save_ctx = p_new(src_iter->sync->pool,
- struct dsync_brain_msg_save_context, 1);
+ save_ctx = i_new(struct dsync_brain_msg_save_context, 1);
save_ctx->iter = dest_iter;
- save_ctx->msg = dsync_message_dup(src_iter->sync->pool,
- msg->msg);
+ save_ctx->msg = msg->msg;
save_ctx->mailbox_idx = dest_iter->mailbox_idx;
dest_iter->adding_msgs = TRUE;
dsync_brain_mailbox_add_new_msgs(struct dsync_brain_msg_iter *iter,
const mailbox_guid_t *mailbox_guid)
{
- const struct dsync_brain_new_msg *msgs;
+ struct dsync_brain_new_msg *msgs;
unsigned int i, msg_count;
bool ret = TRUE;
- msgs = array_get(&iter->new_msgs, &msg_count);
+ msgs = array_get_modifiable(&iter->new_msgs, &msg_count);
for (i = iter->next_new_msg; i < msg_count; i++) {
+ if (msgs[i].saved)
+ continue;
if (msgs[i].mailbox_idx != iter->mailbox_idx) {
i_assert(msgs[i].mailbox_idx > iter->mailbox_idx);
ret = FALSE;
}
/* dsync_brain_msg_sync_finish(): called by
   dsync_brain_msg_sync_select_mailbox() when iter has no mailboxes left.
   Marks the iter as having sent its messages and clears its worker's input
   callback. The brain is advanced to its next state only when both iters
   have msgs_sent set, neither has save results pending and both workers'
   output flushes return > 0.
   NOTE(review): "> 0" presumably means the output was fully flushed --
   confirm against dsync_worker_output_flush(). */
static void
-dsync_brain_mailbox_retry_copies(struct dsync_brain_msg_iter *iter,
- const mailbox_guid_t *mailbox_guid)
+dsync_brain_msg_sync_finish(struct dsync_brain_msg_iter *iter)
{
- const uint32_t *indexes;
- const struct dsync_brain_new_msg *msgs;
- unsigned int i, msg_idx, idx_count, msg_count;
- struct dsync_brain_guid_instance *inst;
- const char *guid_str;
- void *orig_key, *orig_value;
+ struct dsync_brain_mailbox_sync *sync = iter->sync;
- /* first remove GUID instances that had failed. */
- msgs = array_get(&iter->new_msgs, &msg_count);
- indexes = array_get(&iter->copy_retry_indexes, &idx_count);
- for (i = 0; i < idx_count; i++) {
- guid_str = msgs[indexes[i]].msg->guid;
- if (hash_table_lookup_full(iter->guid_hash, guid_str,
- &orig_key, &orig_value))
- inst = orig_value;
- else
- inst = NULL;
- if (inst != NULL && inst->failed) {
- inst = inst->next;
- if (inst == NULL)
- hash_table_remove(iter->guid_hash, guid_str);
- else {
- hash_table_update(iter->guid_hash, orig_key,
- inst);
- }
- }
- }
+ iter->msgs_sent = TRUE;
- /* try saving again. there probably weren't many of them, so don't
- worry about filling output buffer. */
- for (i = 0; i < idx_count; i++) {
- msg_idx = indexes[i];
- // FIXME: if buffer fills, we assert-crash
- (void)dsync_brain_msg_sync_add_new_msg(iter, mailbox_guid,
- msg_idx, &msgs[msg_idx]);
- }
+ /* done with all mailboxes from this iter */
+ dsync_worker_set_input_callback(iter->worker, NULL, NULL);
- /* if we copied anything, we'll again have to wait for the results */
- array_clear(&iter->copy_retry_indexes);
/* whichever iter finishes last (with nothing pending on either side)
   moves the brain forward */
+ if (sync->src_msg_iter->msgs_sent &&
+ sync->dest_msg_iter->msgs_sent &&
+ sync->src_msg_iter->save_results_left == 0 &&
+ sync->dest_msg_iter->save_results_left == 0 &&
+ dsync_worker_output_flush(sync->dest_worker) > 0 &&
+ dsync_worker_output_flush(sync->src_worker) > 0) {
+ sync->brain->state++;
+ dsync_brain_sync(sync->brain);
+ }
}
-static void
-dsync_brain_msg_sync_add_new_msgs(struct dsync_brain_msg_iter *iter)
/* Selects the mailbox at iter->mailbox_idx on iter's worker and returns
   TRUE. When the iter has neither new messages nor UID conflicts at all
   (the check looks at the whole arrays, not at the current mailbox), every
   remaining mailbox is skipped. Once no mailboxes are left,
   dsync_brain_msg_sync_finish() is called and FALSE is returned. */
+static bool
+dsync_brain_msg_sync_select_mailbox(struct dsync_brain_msg_iter *iter)
{
const struct dsync_brain_mailbox *mailbox;
- const mailbox_guid_t *mailbox_guid;
while (iter->mailbox_idx < array_count(&iter->sync->mailboxes)) {
- mailbox = array_idx(&iter->sync->mailboxes, iter->mailbox_idx);
- mailbox_guid = &mailbox->box.mailbox_guid;
-
- if (array_count(&iter->new_msgs) == 0) {
- /* optimization: don't even bother selecting the
+ if (array_count(&iter->new_msgs) == 0 &&
+ array_count(&iter->uid_conflicts) == 0) {
+ /* optimization: don't even bother selecting this
mailbox */
iter->mailbox_idx++;
continue;
}
+ mailbox = array_idx(&iter->sync->mailboxes, iter->mailbox_idx);
dsync_worker_select_mailbox(iter->worker, &mailbox->box);
+ return TRUE;
+ }
/* mailbox_idx ran past the last mailbox */
+ dsync_brain_msg_sync_finish(iter);
+ return FALSE;
+}
+static void
+dsync_brain_msg_sync_add_new_msgs(struct dsync_brain_msg_iter *iter)
+{
+ const struct dsync_brain_mailbox *mailbox;
+ const mailbox_guid_t *mailbox_guid;
+
+ do {
+ mailbox = array_idx(&iter->sync->mailboxes, iter->mailbox_idx);
+ mailbox_guid = &mailbox->box.mailbox_guid;
if (dsync_brain_mailbox_add_new_msgs(iter, mailbox_guid)) {
/* continue later */
return;
/* all messages saved for this mailbox. continue with saving
its conflicts and waiting for copies to finish. */
dsync_brain_mailbox_save_conflicts(iter);
-
- while (iter->copy_results_left == 0 &&
- array_count(&iter->copy_retry_indexes) > 0)
- dsync_brain_mailbox_retry_copies(iter, mailbox_guid);
-
if (iter->copy_results_left > 0) {
/* wait for copies to finish */
return;
/* done with this mailbox, try the next one */
iter->mailbox_idx++;
- }
- iter->msgs_sent = TRUE;
-
- /* done with all mailboxes from this iter */
- dsync_worker_set_input_callback(iter->worker, NULL, NULL);
-
- if (iter->sync->src_msg_iter->msgs_sent &&
- iter->sync->dest_msg_iter->msgs_sent &&
- iter->sync->src_msg_iter->save_results_left == 0 &&
- iter->sync->dest_msg_iter->save_results_left == 0 &&
- dsync_worker_output_flush(iter->sync->dest_worker) > 0 &&
- dsync_worker_output_flush(iter->sync->src_worker) > 0) {
- iter->sync->brain->state++;
- dsync_brain_sync(iter->sync->brain);
- }
+ } while (dsync_brain_msg_sync_select_mailbox(iter));
}
static void dsync_worker_new_msg_output(void *context)
{
iter->mailbox_idx = 0;
+ /* sort input by 1) mailbox, 2) new message UID */
array_sort(&iter->new_msgs, dsync_brain_new_msg_cmp);
array_sort(&iter->uid_conflicts, dsync_brain_uid_conflict_cmp);
dsync_worker_set_input_callback(iter->worker, NULL, iter);
dsync_worker_set_output_callback(iter->worker,
dsync_worker_new_msg_output, iter);
- dsync_brain_msg_sync_add_new_msgs(iter);
+
+ if (dsync_brain_msg_sync_select_mailbox(iter))
+ dsync_brain_msg_sync_add_new_msgs(iter);
}
void dsync_brain_msg_sync_new_msgs(struct dsync_brain_mailbox_sync *sync)
dsync_brain_msg_iter_sync_new_msgs(sync->src_msg_iter);
dsync_brain_msg_iter_sync_new_msgs(sync->dest_msg_iter);
}
-
-static void
-sync_iter_resolve_uid_conflicts(struct dsync_brain_msg_iter *iter)
-{
- const struct dsync_brain_uid_conflict *conflicts;
- const struct dsync_brain_mailbox *mailboxes, *mailbox;
- unsigned int i, count, mailbox_count;
-
- mailboxes = array_get(&iter->sync->mailboxes, &mailbox_count);
- conflicts = array_get(&iter->uid_conflicts, &count);
- for (i = 0; i < count; i++) {
- mailbox = &mailboxes[conflicts[i].mailbox_idx];
- dsync_worker_select_mailbox(iter->worker, &mailbox->box);
- dsync_worker_msg_update_uid(iter->worker, conflicts[i].old_uid,
- conflicts[i].new_uid);
- }
-}
-
-void dsync_brain_msg_sync_resolve_uid_conflicts(struct dsync_brain_mailbox_sync *sync)
-{
- sync_iter_resolve_uid_conflicts(sync->src_msg_iter);
- sync_iter_resolve_uid_conflicts(sync->dest_msg_iter);
-}
/* Copyright (c) 2009-2010 Dovecot authors, see the included COPYING file */
+/* This code synchronizes messages in all mailboxes between two workers.
+ The "src" and "dest" terms don't really have anything to do with reality,
+ they're both treated equal.
+
+ 1. Iterate through all messages in all (wanted) mailboxes. The mailboxes
+ are iterated in the same order and messages in ascending order.
+ All of the expunged messages at the end of mailbox (i.e.
+ last_existing_uid+1 .. next_uid-1) are also returned with
+ DSYNC_MAIL_FLAG_EXPUNGED set. We only care about the end of the mailbox,
+ because we can detect UID conflicts for messages in the middle by looking
+ at the next existing message and seeing if it has UID conflict.
+ 2. For each seen non-expunged message, save it to GUID instance hash table:
+ message GUID => linked list of { uid, mailbox }
+ 3. Each message in a mailbox is matched between the two workers as long as
+ both have messages left (the last ones may be expunged).
+ The possibilities are:
+
+ i) We don't know the GUIDs of both messages:
+
+ a) Message is expunged in both. Do nothing.
+ b) Message is expunged in only one of them. If there have been no UID
+ conflicts seen so far, expunge the message in the other one.
+ Otherwise, give the existing message a new UID (at step 6).
+
+ ii) We know GUIDs of both messages (one/both of them may be expunged):
+
+ a) Messages have conflicting GUIDs. Give new UIDs for the non-expunged
+ message(s) (at step 6).
+ b) Messages have matching GUIDs and one of them is expunged.
+ Expunge also the other one. (We don't need to care about previous
+ UID conflicts here, because we know this message is the same with
+ both workers, since they have the same GUID.)
+ c) Messages have matching GUIDs and both of them exist. Sync flags from
+ whichever has the higher modseq. If both modseqs equal but flags
+ don't, pick the one that has more flags. If even the flag count is
+ the same, just pick one of them.
+ 4. One of the workers may have messages left in the mailbox. Copy these
+ (non-expunged) messages to the other worker (at step 6).
+ 5. If there are more mailboxes left, go to next one and goto 2.
+
+ 6. Copy the new messages and give new UIDs to conflicting messages.
+ This code exists in dsync-brain-msgs-new.c
+*/
+
#include "lib.h"
#include "array.h"
#include "hash.h"
new_msg->msg->uid = new_uid;
}
/* Decides which of two messages has the metadata that should win.
   Returns -1 when m1 wins, 1 when m2 wins and 0 when the modseqs, flags
   and keywords are all equal.

   Checks, in order:
   1. the higher modseq wins,
   2. the larger total of keywords plus set flag bits below MAIL_RECENT
      wins,
   3. on a full tie m1 wins. */
+static int
+dsync_message_flag_importance_cmp(const struct dsync_message *m1,
+ const struct dsync_message *m2)
+{
+ unsigned int i, count1, count2;
+
+ if (m1->modseq > m2->modseq)
+ return -1;
+ else if (m1->modseq < m2->modseq)
+ return 1;
+
+ if (m1->flags == m2->flags &&
+ dsync_keyword_list_equals(m1->keywords, m2->keywords))
+ return 0;
+
+ /* modseqs match, but flags aren't the same. pick the one that
+ has more flags. */
+ count1 = str_array_length(m1->keywords);
+ count2 = str_array_length(m2->keywords);
/* i steps through the single-bit masks 1, 2, 4, ... and stops at
   MAIL_RECENT, so MAIL_RECENT itself and any higher bits aren't counted.
   NOTE(review): this terminates only if MAIL_RECENT is a single-bit
   value -- confirm. */
+ for (i = 1; i != MAIL_RECENT; i <<= 1) {
+ if ((m1->flags & i) != 0)
+ count1++;
+ if ((m2->flags & i) != 0)
+ count2++;
+ }
+ if (count1 > count2)
+ return -1;
+ else if (count1 < count2)
+ return 1;
+
+ /* they even have the same number of flags. don't bother with further
+ guessing, just pick the first one. */
+ return -1;
+}
+
/* Syncs the metadata of a message pair. The winner is chosen by
   dsync_message_flag_importance_cmp() and its metadata is sent to the
   other side's worker. Nothing is sent when the comparison returns 0. */
static void dsync_brain_msg_sync_existing(struct dsync_brain_mailbox_sync *sync,
struct dsync_message *src_msg,
struct dsync_message *dest_msg)
{
- if (src_msg->modseq > dest_msg->modseq)
+ int ret;
+
+ ret = dsync_message_flag_importance_cmp(src_msg, dest_msg);
+ if (ret < 0) /* src wins: dest gets src's metadata */
dsync_worker_msg_update_metadata(sync->dest_worker, src_msg);
- else if (src_msg->modseq < dest_msg->modseq)
+ else if (ret > 0) /* dest wins: src gets dest's metadata */
dsync_worker_msg_update_metadata(sync->src_worker, dest_msg);
- else if (src_msg->flags != dest_msg->flags ||
- !dsync_keyword_list_equals(src_msg->keywords,
- dest_msg->keywords)) {
- /* modseqs match, but flags aren't the same. we can't really
- know which one we should use, so just pick one. */
- dsync_worker_msg_update_metadata(sync->dest_worker, src_msg);
- }
}
static int dsync_brain_msg_sync_pair(struct dsync_brain_mailbox_sync *sync)
src_expunged = (src_msg->flags & DSYNC_MAIL_FLAG_EXPUNGED) != 0;
dest_expunged = (dest_msg->flags & DSYNC_MAIL_FLAG_EXPUNGED) != 0;
+ /* If a message is expunged, it's guaranteed to have a 128bit GUID.
+ If the other message isn't expunged, we'll need to convert its GUID
+ to the 128bit GUID form (if it's not already) so that we can compare
+ them. */
if (src_expunged) {
src_guid = src_msg->guid;
dest_guid = dsync_get_guid_128_str(dest_msg->guid,
dest_guid = dest_msg->guid;
}
+ /* FIXME: checking for sync->uid_conflict isn't fully reliable here.
+ we should be checking if the next matching message pair has a
+ conflict, not if the previous pair had one. */
if (src_msg->uid < dest_msg->uid) {
/* message has been expunged from dest. */
if (src_expunged) {
return TRUE;
}
-static void dsync_brain_msg_sync_finish(struct dsync_brain_mailbox_sync *sync)
-{
- /* synced all existing messages. now add the new messages. */
- if (dsync_worker_msg_iter_deinit(&sync->src_msg_iter->iter) < 0 ||
- dsync_worker_msg_iter_deinit(&sync->dest_msg_iter->iter) < 0)
- dsync_brain_fail(sync->brain);
-
- dsync_brain_msg_sync_new_msgs(sync);
-}
-
/* Continues syncing messages mailbox by mailbox. Each time
   dsync_brain_msg_sync_mailbox_more() returns TRUE, the UID conflict flag
   is reset, sync->wanted_mailbox_idx is advanced and that mailbox is
   selected on both workers. After the last mailbox both workers' message
   iterators are deinitialized and dsync_brain_msg_sync_new_msgs() is
   started; if a deinit fails, dsync_brain_fail() is called instead. */
void dsync_brain_msg_sync_more(struct dsync_brain_mailbox_sync *sync)
{
const struct dsync_brain_mailbox *mailboxes;
- unsigned int count, mailbox_idx;
+ unsigned int count, mailbox_idx = 0;
mailboxes = array_get(&sync->mailboxes, &count);
while (dsync_brain_msg_sync_mailbox_more(sync)) {
/* sync the next mailbox */
sync->uid_conflict = FALSE;
mailbox_idx = ++sync->wanted_mailbox_idx;
- if (mailbox_idx >= count) {
- dsync_brain_msg_sync_finish(sync);
- return;
- }
+ if (mailbox_idx >= count)
+ break;
+
dsync_worker_select_mailbox(sync->src_worker,
&mailboxes[mailbox_idx].box);
dsync_worker_select_mailbox(sync->dest_worker,
&mailboxes[mailbox_idx].box);
}
/* mailbox_idx is still 0 if the loop body never ran, so with a non-empty
   mailbox list this also returns when the very first
   dsync_brain_msg_sync_mailbox_more() call returned FALSE.
   NOTE(review): with count == 0 this check can never be true -- confirm
   that an empty mailbox list is handled as intended. */
+ if (mailbox_idx < count) {
+ /* output buffer is full */
+ return;
+ }
+
+ /* finished with all mailboxes. */
/* NOTE(review): because || short-circuits, the dest iterator's deinit is
   not called when the src deinit fails -- confirm that's acceptable. */
+ if (dsync_worker_msg_iter_deinit(&sync->src_msg_iter->iter) < 0 ||
+ dsync_worker_msg_iter_deinit(&sync->dest_msg_iter->iter) < 0) {
+ dsync_brain_fail(sync->brain);
+ return;
+ }
+
+ dsync_brain_msg_sync_new_msgs(sync);
}
static void dsync_worker_msg_callback(void *context)
iter->worker = worker;
i_array_init(&iter->uid_conflicts, 128);
i_array_init(&iter->new_msgs, 128);
- i_array_init(&iter->copy_retry_indexes, 32);
iter->guid_hash = hash_table_create(default_pool, sync->pool, 10000,
strcase_hash,
(hash_cmp_callback_t *)strcasecmp);
hash_table_destroy(&iter->guid_hash);
array_free(&iter->uid_conflicts);
array_free(&iter->new_msgs);
- array_free(&iter->copy_retry_indexes);
}
static void