const char *error;
unsigned int lock_timeout;
+ unsigned int import_commit_msgs_interval;
bool lock:1;
bool purge_remote:1;
set.virtual_all_box = ctx->virtual_all_box;
memcpy(set.sync_box_guid, ctx->mailbox_guid, sizeof(set.sync_box_guid));
set.lock_timeout_secs = ctx->lock_timeout;
+ set.import_commit_msgs_interval = ctx->import_commit_msgs_interval;
set.state = ctx->state_input;
set.mailbox_alt_char = doveadm_settings->dsync_alt_char[0];
p_array_init(&ctx->namespace_prefixes, ctx->ctx.pool, 4);
if ((doveadm_settings->parsed_features & DSYNC_FEATURE_EMPTY_HDR_WORKAROUND) != 0)
ctx->empty_hdr_workaround = TRUE;
+ ctx->import_commit_msgs_interval = doveadm_settings->dsync_commit_msgs_interval;
return &ctx->ctx;
}
DEF(SET_STR, director_username_hash),
DEF(SET_STR, doveadm_api_key),
DEF(SET_STR, dsync_features),
+ DEF(SET_UINT, dsync_commit_msgs_interval),
DEF(SET_STR, doveadm_http_rawlog_dir),
{ SET_STRLIST, "plugin", offsetof(struct doveadm_settings, plugin_envs), NULL },
.dsync_alt_char = "_",
.dsync_remote_cmd = "ssh -l%{login} %{host} doveadm dsync-server -u%u -U",
.dsync_features = "",
+ .dsync_commit_msgs_interval = 100,
.ssl_client_ca_dir = "",
.ssl_client_ca_file = "",
.director_username_hash = "%Lu",
const char *director_username_hash;
const char *doveadm_api_key;
const char *dsync_features;
+ unsigned int dsync_commit_msgs_interval;
const char *doveadm_http_rawlog_dir;
enum dsync_features parsed_features;
ARRAY(const char *) plugin_envs;
brain->sync_until_timestamp,
brain->sync_max_size,
brain->sync_flag,
+ brain->import_commit_msgs_interval,
import_flags);
}
uoff_t sync_max_size;
const char *sync_flag;
char alt_char;
+ unsigned int import_commit_msgs_interval;
unsigned int lock_timeout;
int lock_fd;
memcpy(brain->sync_box_guid, set->sync_box_guid,
sizeof(brain->sync_box_guid));
brain->lock_timeout = set->lock_timeout_secs;
+ brain->import_commit_msgs_interval = set->import_commit_msgs_interval;
brain->master_brain = TRUE;
dsync_brain_set_flags(brain, flags);
ibc_set.sync_type = sync_type;
ibc_set.hdr_hash_v2 = TRUE;
ibc_set.lock_timeout = set->lock_timeout_secs;
+ ibc_set.import_commit_msgs_interval = set->import_commit_msgs_interval;
/* reverse the backup direction for the slave */
ibc_set.brain_flags = flags & ~(DSYNC_BRAIN_FLAG_BACKUP_SEND |
DSYNC_BRAIN_FLAG_BACKUP_RECV);
/* If non-zero, use dsync lock file for this user */
unsigned int lock_timeout_secs;
+ /* If non-zero, importing will attempt to commit transaction after
+ saving this many messages. */
+ unsigned int import_commit_msgs_interval;
/* Input state for DSYNC_BRAIN_SYNC_TYPE_STATE */
const char *state;
};
"send_mail_requests backup_send backup_recv lock_timeout "
"no_mail_sync no_mailbox_renames no_backup_overwrite purge_remote "
"no_notify sync_since_timestamp sync_max_size sync_flags sync_until_timestamp"
- "virtual_all_box empty_hdr_workaround"
+ "virtual_all_box empty_hdr_workaround import_commit_msgs_interval"
},
{ .name = "mailbox_state",
.chr = 'S',
dsync_serializer_encode_add(encoder, "lock_timeout",
t_strdup_printf("%u", set->lock_timeout));
}
+ if (set->import_commit_msgs_interval > 0) {
+ dsync_serializer_encode_add(encoder, "import_commit_msgs_interval",
+ t_strdup_printf("%u", set->import_commit_msgs_interval));
+ }
if (set->sync_since_timestamp > 0) {
dsync_serializer_encode_add(encoder, "sync_since_timestamp",
t_strdup_printf("%ld", (long)set->sync_since_timestamp));
return DSYNC_IBC_RECV_RET_TRYAGAIN;
}
}
+ if (dsync_deserializer_decode_try(decoder, "import_commit_msgs_interval", &value)) {
+ if (str_to_uint(value, &set->import_commit_msgs_interval) < 0 ||
+ set->import_commit_msgs_interval == 0) {
+ dsync_ibc_input_error(ibc, decoder,
+ "Invalid import_commit_msgs_interval: %s", value);
+ return DSYNC_IBC_RECV_RET_TRYAGAIN;
+ }
+ }
if (dsync_deserializer_decode_try(decoder, "sync_since_timestamp", &value)) {
if (str_to_time(value, &set->sync_since_timestamp) < 0 ||
set->sync_since_timestamp == 0) {
enum dsync_brain_flags brain_flags;
bool hdr_hash_v2;
unsigned int lock_timeout;
+ unsigned int import_commit_msgs_interval;
};
void dsync_ibc_init_pipe(struct dsync_ibc **ibc1_r,
bool skip:1;
bool expunged:1;
bool copy_failed:1;
+ bool saved:1;
};
/* for quickly testing that two-way sync doesn't actually do any unexpected
uoff_t sync_max_size;
enum mailbox_transaction_flags transaction_flags;
unsigned int hdr_hash_version;
+ unsigned int commit_msgs_interval;
enum mail_flags sync_flag;
const char *sync_keyword;
uint32_t prev_uid, next_local_seq, local_uid_next;
uint64_t local_initial_highestmodseq, local_initial_highestpvtmodseq;
unsigned int import_pos, import_count;
+ unsigned int first_unsaved_idx, saves_since_commit;
enum mail_error mail_error;
time_t sync_until_timestamp,
uoff_t sync_max_size,
const char *sync_flag,
+ unsigned int commit_msgs_interval,
enum dsync_mailbox_import_flags flags)
{
struct dsync_mailbox_importer *importer;
else
importer->sync_keyword = p_strdup(pool, sync_flag);
}
+ importer->commit_msgs_interval = commit_msgs_interval;
importer->transaction_flags = MAILBOX_TRANSACTION_FLAG_SYNC;
if ((flags & DSYNC_MAILBOX_IMPORT_FLAG_NO_NOTIFY) != 0)
importer->transaction_flags |= MAILBOX_TRANSACTION_FLAG_NO_NOTIFY;
return importer->failed ? -1 : 0;
}
+static int
+importer_new_mail_final_uid_cmp(struct importer_new_mail *const *newmail1,
+ struct importer_new_mail *const *newmail2)
+{
+ if ((*newmail1)->final_uid < (*newmail2)->final_uid)
+ return -1;
+ if ((*newmail1)->final_uid > (*newmail2)->final_uid)
+ return 1;
+ return 0;
+}
+
static void
dsync_mailbox_import_assign_new_uids(struct dsync_mailbox_importer *importer)
{
newmail = *newmailp;
if (newmail->skip) {
/* already assigned */
+ i_assert(newmail->final_uid != 0);
continue;
}
}
importer->last_common_uid = common_uid_next-1;
importer->new_uids_assigned = TRUE;
+ /* Sort the newmails by their final_uid. This is used for tracking
+ whether an intermediate commit is allowed. */
+ array_sort(&importer->newmails, importer_new_mail_final_uid_cmp);
}
static int
array_append(&importer->wanted_uids, &uid, 1);
}
+static void
+dsync_mailbox_import_update_first_saved(struct dsync_mailbox_importer *importer)
+{
+ struct importer_new_mail *const *newmails;
+ unsigned int count;
+
+ newmails = array_get(&importer->newmails, &count);
+ while (importer->first_unsaved_idx < count) {
+ if (!newmails[importer->first_unsaved_idx]->saved)
+ break;
+ importer->first_unsaved_idx++;
+ }
+}
+
+static void
+dsync_mailbox_import_saved_newmail(struct dsync_mailbox_importer *importer,
+ struct importer_new_mail *newmail)
+{
+ dsync_mailbox_import_saved_uid(importer, newmail->final_uid);
+ newmail->saved = TRUE;
+
+ dsync_mailbox_import_update_first_saved(importer);
+ importer->saves_since_commit++;
+ /* we can commit only if all the upcoming mails will have UIDs that
+ are larger than we're committing.
+
+ Note that if any existing UIDs have been changed, the new UID is
+ usually higher than anything that is being saved so we can't do
+ an intermediate commit. It's too much extra work to try to handle
+ that situation. So here this never happens, because then
+ array_count(wanted_uids) is always higher than first_unsaved_idx. */
+ if (importer->saves_since_commit >= importer->commit_msgs_interval &&
+ importer->first_unsaved_idx == array_count(&importer->wanted_uids)) {
+ if (dsync_mailbox_import_commit(importer, FALSE) < 0)
+ importer->failed = TRUE;
+ importer->saves_since_commit = 0;
+ }
+}
+
static bool
dsync_msg_change_uid(struct dsync_mailbox_importer *importer,
uint32_t old_uid, uint32_t new_uid)
}
if (ret > 0) {
i_assert(save_ctx == NULL);
- dsync_mailbox_import_saved_uid(importer, newmail->final_uid);
+ dsync_mailbox_import_saved_newmail(importer, newmail);
return TRUE;
}
/* fallback to saving from remote stream */
&importer->mail_error));
importer->failed = TRUE;
} else {
- dsync_mailbox_import_saved_uid(importer,
- newmail->final_uid);
+ dsync_mailbox_import_saved_newmail(importer, newmail);
}
}
return TRUE;
time_t sync_until_timestamp,
uoff_t sync_max_size,
const char *sync_flag,
+ unsigned int commit_msgs_interval,
enum dsync_mailbox_import_flags flags);
int dsync_mailbox_import_attribute(struct dsync_mailbox_importer *importer,
const struct dsync_mailbox_attribute *attr);