From: Timo Sirainen Date: Tue, 25 Oct 2022 09:49:03 +0000 (+0300) Subject: replicator: Fix user placement in replication queue X-Git-Tag: 2.3.20~28 X-Git-Url: http://git.ipfire.org/?a=commitdiff_plain;h=278d4a08f24abb03e88b6b85ccd946b6dd7ba738;p=thirdparty%2Fdovecot%2Fcore.git replicator: Fix user placement in replication queue Especially replicator queue importing and NOTIFY command could have placed the user to wrong place in the queue, because they modified the last sync timestamps afterwards. Fixed by splitting replicator_queue_add() into get/update/add(), so all the necessary changes can be done before replicator_queue_add() is used to place the user into the queue. --- diff --git a/src/replication/replicator/doveadm-connection.c b/src/replication/replicator/doveadm-connection.c index f1acba62e7..e3aeb1b01a 100644 --- a/src/replication/replicator/doveadm-connection.c +++ b/src/replication/replicator/doveadm-connection.c @@ -172,9 +172,12 @@ client_input_replicate(struct doveadm_connection *client, const char *const *arg full = strchr(args[1], 'f') != NULL; usermask = args[2]; if (strchr(usermask, '*') == NULL && strchr(usermask, '?') == NULL) { - user = replicator_queue_add(queue, usermask, priority); + struct replicator_user *user = + replicator_queue_get(queue, usermask); if (full) user->force_full_sync = TRUE; + replicator_queue_update(queue, user, priority); + replicator_queue_add(queue, user); o_stream_nsend_str(client->conn.output, "+1\n"); return 0; } @@ -184,9 +187,10 @@ client_input_replicate(struct doveadm_connection *client, const char *const *arg while ((user = replicator_queue_iter_next(iter)) != NULL) { if (!wildcard_match(user->username, usermask)) continue; - user = replicator_queue_add(queue, user->username, priority); if (full) user->force_full_sync = TRUE; + replicator_queue_update(queue, user, priority); + replicator_queue_add(queue, user); match_count++; } replicator_queue_iter_deinit(&iter); @@ -210,8 +214,9 @@ client_input_add(struct doveadm_connection *client, const char *const *args) } if (strchr(args[0], '*') == NULL && strchr(args[0], '?') == NULL) { - (void)replicator_queue_add(queue, args[0], - REPLICATION_PRIORITY_NONE); + struct replicator_user *user = + replicator_queue_get(queue, args[0]); + replicator_queue_add(queue, user); } else { replicator_queue_add_auth_users(queue, set->auth_socket_path, args[0], ioloop_time); @@ -255,11 +260,12 @@ client_input_notify(struct doveadm_connection *client, const char *const *args) return -1; } - user = replicator_queue_add(queue, args[0], REPLICATION_PRIORITY_NONE); + user = replicator_queue_get(queue, args[0]); if (args[1][0] == 'f') user->last_full_sync = ioloop_time; user->last_fast_sync = ioloop_time; user->last_update = ioloop_time; + replicator_queue_add(queue, user); if (args[2][0] != '\0') { i_free(user->state); diff --git a/src/replication/replicator/notify-connection.c b/src/replication/replicator/notify-connection.c index cfc2e20c09..ecf7639137 100644 --- a/src/replication/replicator/notify-connection.c +++ b/src/replication/replicator/notify-connection.c @@ -72,9 +72,12 @@ notify_connection_input_line(struct notify_connection *conn, const char *line) i_error("notify client sent invalid priority: %s", args[2]); return -1; } - if (priority != REPLICATION_PRIORITY_SYNC) - (void)replicator_queue_add(conn->queue, args[1], priority); - else if (args[3] == NULL || str_to_uint(args[3], &id) < 0) { + if (priority != REPLICATION_PRIORITY_SYNC) { + struct replicator_user *user = + replicator_queue_get(conn->queue, args[1]); + replicator_queue_update(conn->queue, user, priority); + replicator_queue_add(conn->queue, user); + } else if (args[3] == NULL || str_to_uint(args[3], &id) < 0) { i_error("notify client sent invalid sync id: %s", line); return -1; } else { diff --git a/src/replication/replicator/replicator-queue-auth.c b/src/replication/replicator/replicator-queue-auth.c index a71e9428c5..eedb814071 100644 --- a/src/replication/replicator/replicator-queue-auth.c +++ b/src/replication/replicator/replicator-queue-auth.c @@ -26,8 +26,9 @@ void replicator_queue_add_auth_users(struct replicator_queue *queue, full syncs for everyone whose state can't be found */ ctx = auth_master_user_list_init(auth_conn, usermask, &user_info); while ((username = auth_master_user_list_next(ctx)) != NULL) { - user = replicator_queue_add(queue, username, - REPLICATION_PRIORITY_NONE); + user = replicator_queue_get(queue, username); + replicator_queue_update(queue, user, REPLICATION_PRIORITY_NONE); + replicator_queue_add(queue, user); user->last_update = last_update; } if (auth_master_user_list_deinit(&ctx) < 0) diff --git a/src/replication/replicator/replicator-queue.c b/src/replication/replicator/replicator-queue.c index 430716a225..549cd36c5b 100644 --- a/src/replication/replicator/replicator-queue.c +++ b/src/replication/replicator/replicator-queue.c @@ -152,9 +152,8 @@ replicator_queue_lookup(struct replicator_queue *queue, const char *username) return hash_table_lookup(queue->user_hash, username); } -static struct replicator_user * -replicator_queue_add_int(struct replicator_queue *queue, const char *username, - enum replication_priority priority) +struct replicator_user * +replicator_queue_get(struct replicator_queue *queue, const char *username) { struct replicator_user *user; @@ -163,33 +162,35 @@ replicator_queue_add_int(struct replicator_queue *queue, const char *username, user = i_new(struct replicator_user, 1); user->refcount = 1; user->username = i_strdup(username); + user->last_update = ioloop_time; hash_table_insert(queue->user_hash, user->username, user); - } else { - if (user->priority > priority) { - /* user already has a higher priority than this */ - return user; - } if (!user->popped) - priorityq_remove(queue->user_queue, &user->item); + priorityq_add(queue->user_queue, &user->item); } - user->priority = priority; - user->last_update = ioloop_time; - - if (!user->popped) - priorityq_add(queue->user_queue, &user->item); return user; } -struct replicator_user * -replicator_queue_add(struct replicator_queue *queue, const char *username, - enum replication_priority priority) +void replicator_queue_update(struct replicator_queue *queue ATTR_UNUSED, + struct replicator_user *user, + enum replication_priority priority) { - struct replicator_user *user; + if (user->priority > priority) { + /* user already has a higher priority than this */ + return; + } + user->priority = priority; + user->last_update = ioloop_time; +} - user = replicator_queue_add_int(queue, username, priority); +void replicator_queue_add(struct replicator_queue *queue, + struct replicator_user *user) +{ + if (!user->popped) { + priorityq_remove(queue->user_queue, &user->item); + priorityq_add(queue->user_queue, &user->item); + } if (queue->change_callback != NULL) queue->change_callback(queue->change_context); - return user; } void replicator_queue_add_sync(struct replicator_queue *queue, @@ -200,8 +201,8 @@ void replicator_queue_add_sync(struct replicator_queue *queue, struct replicator_user *user; struct replicator_sync_lookup *lookup; - user = replicator_queue_add_int(queue, username, - REPLICATION_PRIORITY_SYNC); + user = replicator_queue_get(queue, username); + replicator_queue_update(queue, user, REPLICATION_PRIORITY_SYNC); lookup = array_append_space(&queue->sync_lookups); lookup->user = user; @@ -209,8 +210,7 @@ void replicator_queue_add_sync(struct replicator_queue *queue, lookup->context = context; lookup->wait_for_next_push = user->popped; - if (queue->change_callback != NULL) - queue->change_callback(queue->change_context); + replicator_queue_add(queue, user); } void replicator_queue_remove(struct replicator_queue *queue, @@ -362,8 +362,7 @@ replicator_queue_import_line(struct replicator_queue *queue, const char *line) return 0; } } - user = replicator_queue_add(queue, username, - tmp_user.priority); + user->priority = tmp_user.priority; user->last_update = tmp_user.last_update; user->last_fast_sync = tmp_user.last_fast_sync; user->last_full_sync = tmp_user.last_full_sync; @@ -371,6 +370,7 @@ replicator_queue_import_line(struct replicator_queue *queue, const char *line) user->last_sync_failed = tmp_user.last_sync_failed; i_free(user->state); user->state = i_strdup(state); + replicator_queue_add(queue, user); return 0; } diff --git a/src/replication/replicator/replicator-queue.h b/src/replication/replicator/replicator-queue.h index a298a5d9ed..81858316e7 100644 --- a/src/replication/replicator/replicator-queue.h +++ b/src/replication/replicator/replicator-queue.h @@ -46,11 +46,16 @@ bool replicator_user_unref(struct replicator_user **user); /* Lookup an existing user */ struct replicator_user * replicator_queue_lookup(struct replicator_queue *queue, const char *username); -/* Add a user to queue and return it. If the user already exists, it's updated - only if the new priority is higher. */ +/* Lookup or create a user and return it. Afterwards replicator_queue_add() + must be called to add/move the user to the proper place in the queue. */ struct replicator_user * -replicator_queue_add(struct replicator_queue *queue, const char *username, - enum replication_priority priority); +replicator_queue_get(struct replicator_queue *queue, const char *username); +/* Update user's priority, if it isn't already higher. */ +void replicator_queue_update(struct replicator_queue *queue, + struct replicator_user *user, + enum replication_priority priority); +void replicator_queue_add(struct replicator_queue *queue, + struct replicator_user *user); void replicator_queue_add_sync(struct replicator_queue *queue, const char *username, replicator_sync_callback_t *callback,