replicator_queue_get(queue, usermask);
if (full)
user->force_full_sync = TRUE;
+ e_debug(client->conn.event, "user %s: doveadm REPLICATE command (priority=%d full=%c)",
+ user->username, priority, full ? 'y' : 'n');
replicator_queue_update(queue, user, priority);
replicator_queue_add(queue, user);
o_stream_nsend_str(client->conn.output, "+1\n");
continue;
if (full)
user->force_full_sync = TRUE;
+ e_debug(client->conn.event, "user %s: doveadm REPLICATE command (priority=%d full=%c)",
+ user->username, priority, full ? 'y' : 'n');
replicator_queue_update(queue, user, priority);
replicator_queue_add(queue, user);
match_count++;
if (strchr(args[0], '*') == NULL && strchr(args[0], '?') == NULL) {
struct replicator_user *user =
replicator_queue_get(queue, args[0]);
+ e_debug(client->conn.event, "user %s: doveadm ADD command",
+ user->username);
replicator_queue_add(queue, user);
} else {
+ e_debug(client->conn.event, "doveadm ADD command: Add usermask '%s'",
+ args[0]);
replicator_queue_add_auth_users(queue, set->auth_socket_path,
args[0], ioloop_time);
}
return -1;
}
+ bool full = args[1][0] == 'f';
user = replicator_queue_get(queue, args[0]);
- if (args[1][0] == 'f')
+ if (full)
user->last_full_sync = ioloop_time;
user->last_fast_sync = ioloop_time;
user->last_update = ioloop_time;
+ e_debug(client->conn.event, "user %s: doveadm NOTIFY command (full=%c)",
+ user->username, full ? 'y' : 'n');
replicator_queue_add(queue, user);
if (args[2][0] != '\0') {
if (priority != REPLICATION_PRIORITY_SYNC) {
struct replicator_user *user =
replicator_queue_get(conn->queue, args[1]);
+ e_debug(conn->event, "user %s: notification from client (priority=%d)",
+ user->username, priority);
replicator_queue_update(conn->queue, user, priority);
replicator_queue_add(conn->queue, user);
} else if (args[3] == NULL || str_to_uint(args[3], &id) < 0) {
notify_connection_ref(conn);
struct replicator_user *user =
replicator_queue_get(conn->queue, args[1]);
+ e_debug(conn->event, "user %s: sync notification from client",
+ user->username);
replicator_queue_update(conn->queue, user,
REPLICATION_PRIORITY_SYNC);
replicator_queue_add_sync_callback(conn->queue, user,
struct replicator_sync_context {
struct replicator_brain *brain;
struct replicator_user *user;
+ struct event *event;
};
struct replicator_brain {
struct replicator_queue *queue;
const struct replicator_settings *set;
struct timeout *to;
+ struct event *event;
ARRAY_TYPE(dsync_client) dsync_clients;
static void replicator_brain_timeout(struct replicator_brain *brain)
{
+ e_debug(brain->event, "Delayed handling of changed queue");
+
timeout_remove(&brain->to);
replicator_brain_fill(brain);
}
brain->pool = pool;
brain->queue = queue;
brain->set = set;
+ brain->event = event_create(NULL);
+ event_add_category(brain->event, &event_category_replication);
+
p_array_init(&brain->dsync_clients, pool, 16);
replicator_queue_set_change_callback(queue,
replicator_brain_queue_changed, brain);
array_foreach_elem(&brain->dsync_clients, conn)
dsync_client_deinit(&conn);
timeout_remove(&brain->to);
+ event_unref(&brain->event);
pool_unref(&brain->pool);
}
struct replicator_user *user = ctx->user;
if (!replicator_user_unref(&user)) {
+ e_debug(ctx->event, "User was already removed");
/* user was already removed */
} else if (reply == DSYNC_REPLY_NOUSER ||
reply == DSYNC_REPLY_NOREPLICATE) {
/* user no longer exists, or is not wanted for replication,
remove from replication */
+ if (reply == DSYNC_REPLY_NOUSER) {
+ e_debug(ctx->event, "User does not exist");
+ } else {
+ e_debug(ctx->event, "User has 'noreplicate' flag and "
+ "will not be replicated");
+ }
replicator_queue_remove(ctx->brain->queue, &ctx->user);
} else {
i_free(ctx->user->state);
ctx->user->state = i_strdup_empty(state);
ctx->user->last_sync_failed = reply != DSYNC_REPLY_OK;
- if (reply == DSYNC_REPLY_OK)
+ if (reply == DSYNC_REPLY_OK) {
+ e_debug(ctx->event, "User was successfully synced");
ctx->user->last_successful_sync = ioloop_time;
+ } else {
+ e_debug(ctx->event, "User sync failed");
+ }
replicator_queue_push(ctx->brain->queue, ctx->user);
}
+ event_unref(&ctx->event);
+
if (!ctx->brain->deinitializing)
replicator_brain_fill(ctx->brain);
i_free(ctx);
struct dsync_client *conn;
time_t next_full_sync;
bool full;
+ struct event *event = event_create(brain->event);
+ event_set_append_log_prefix(event, t_strdup_printf(
+ "%s: ", user->username));
+ event_add_str(event, "user", user->username);
conn = get_dsync_client(brain);
- if (conn == NULL)
+ if (conn == NULL) {
+ e_debug(event, "Delay replication - dsync queue is full");
+ event_unref(&event);
return FALSE;
+ }
next_full_sync = user->last_full_sync +
brain->set->replication_full_sync_interval;
ctx = i_new(struct replicator_sync_context, 1);
ctx->brain = brain;
ctx->user = user;
+ ctx->event = event;
+
+ e_debug(ctx->event, "Starting %s replication",
+ full ? "full" : "incremental");
+
replicator_user_ref(user);
dsync_client_sync(conn, user->username, user->state, full,
dsync_callback, ctx);
user = replicator_queue_pop(brain->queue, &next_secs);
if (user == NULL) {
+ e_debug(brain->event, "Got no user from queue, waiting for %u seconds",
+ next_secs);
/* nothing more to do */
timeout_remove(&brain->to);
brain->to = timeout_add(next_secs * 1000,
if (!dsync_replicate(brain, user)) {
/* all connections were full, put the user back to queue */
+ e_debug(brain->event, "Could not replicate %s - pushing back to queue",
+ user->username);
replicator_queue_push(brain->queue, user);
return FALSE;
}
struct replicator_user *user;
const char *username;
+ e_debug(queue->event, "Add users from userdb with usermask '%s'",
+ usermask);
+
auth_conn = auth_master_init(auth_socket_path,
AUTH_MASTER_FLAG_NO_IDLE_TIMEOUT);
user = replicator_queue_lookup(queue, username);
if (user == NULL) {
+ e_debug(queue->event, "user %s: User not found from queue - adding", username);
user = i_new(struct replicator_user, 1);
user->refcount = 1;
user->username = i_strdup(username);
return user;
}
-void replicator_queue_update(struct replicator_queue *queue ATTR_UNUSED,
+void replicator_queue_update(struct replicator_queue *queue,
struct replicator_user *user,
enum replication_priority priority)
{
if (user->priority >= priority) {
/* user already has at least this high priority */
+ e_debug(queue->event, "user %s: Ignoring priority %u update, "
+ "since user already has priority=%u",
+ user->username, priority, user->priority);
return;
}
+ e_debug(queue->event, "user %s: Updating priority %u -> %u",
+ user->username, user->priority, priority);
user->priority = priority;
user->last_update = ioloop_time;
}
priorityq_remove(queue->user_queue, &user->item);
priorityq_add(queue->user_queue, &user->item);
}
- if (queue->change_callback != NULL)
+ if (queue->change_callback != NULL) {
+ e_debug(queue->event, "user %s: Queue changed - calling callback",
+ user->username);
queue->change_callback(queue->change_context);
+ }
}
void replicator_queue_add_sync_callback(struct replicator_queue *queue,
struct replicator_user *user = *_user;
*_user = NULL;
+ e_debug(queue->event, "user %s: Removing user from queue", user->username);
if (!user->popped)
priorityq_remove(queue->user_queue, &user->item);
hash_table_remove(queue->user_hash, user->username);
replicator_user_unref(&user);
- if (queue->change_callback != NULL)
+ if (queue->change_callback != NULL) {
+ e_debug(queue->event, "user %s: Queue changed - calling callback",
+ user->username);
queue->change_callback(queue->change_context);
+ }
}
unsigned int replicator_queue_count(struct replicator_queue *queue)
}
}
+ e_debug(queue->event, "user %s: Handled sync lookups", user->username);
+
array_foreach_modifiable(&callbacks, lookups)
lookups->callback(success, lookups->context);
}
const char *line;
int fd, ret = 0;
+ e_debug(queue->event, "Importing queue from %s", path);
+
fd = open(path, O_RDONLY);
if (fd == -1) {
if (errno == ENOENT)