]> git.ipfire.org Git - thirdparty/dovecot/core.git/commitdiff
replicator: Fix user placement in replication queue
authorTimo Sirainen <timo.sirainen@open-xchange.com>
Tue, 25 Oct 2022 09:49:03 +0000 (12:49 +0300)
committerMartti Rannanjärvi <martti.rannanjarvi@open-xchange.com>
Wed, 2 Nov 2022 13:50:15 +0000 (15:50 +0200)
Especially replicator queue importing and NOTIFY command could have
placed the user to wrong place in the queue, because they modified the
last sync timestamps afterwards.

Fixed by splitting replicator_queue_add() into get/update/add(), so all
the necessary changes can be done before replicator_queue_add() is used
to place the user into the queue.

src/replication/replicator/doveadm-connection.c
src/replication/replicator/notify-connection.c
src/replication/replicator/replicator-queue-auth.c
src/replication/replicator/replicator-queue.c
src/replication/replicator/replicator-queue.h

index f1acba62e764d013915670c261a4cc5f9e77f839..e3aeb1b01ae70024949e0522c2fcdb5cd9eac943 100644 (file)
@@ -172,9 +172,12 @@ client_input_replicate(struct doveadm_connection *client, const char *const *arg
        full = strchr(args[1], 'f') != NULL;
        usermask = args[2];
        if (strchr(usermask, '*') == NULL && strchr(usermask, '?') == NULL) {
-               user = replicator_queue_add(queue, usermask, priority);
+               struct replicator_user *user =
+                       replicator_queue_get(queue, usermask);
                if (full)
                        user->force_full_sync = TRUE;
+               replicator_queue_update(queue, user, priority);
+               replicator_queue_add(queue, user);
                o_stream_nsend_str(client->conn.output, "+1\n");
                return 0;
        }
@@ -184,9 +187,10 @@ client_input_replicate(struct doveadm_connection *client, const char *const *arg
        while ((user = replicator_queue_iter_next(iter)) != NULL) {
                if (!wildcard_match(user->username, usermask))
                        continue;
-               user = replicator_queue_add(queue, user->username, priority);
                if (full)
                        user->force_full_sync = TRUE;
+               replicator_queue_update(queue, user, priority);
+               replicator_queue_add(queue, user);
                match_count++;
        }
        replicator_queue_iter_deinit(&iter);
@@ -210,8 +214,9 @@ client_input_add(struct doveadm_connection *client, const char *const *args)
        }
 
        if (strchr(args[0], '*') == NULL && strchr(args[0], '?') == NULL) {
-               (void)replicator_queue_add(queue, args[0],
-                                          REPLICATION_PRIORITY_NONE);
+               struct replicator_user *user =
+                       replicator_queue_get(queue, args[0]);
+               replicator_queue_add(queue, user);
        } else {
                replicator_queue_add_auth_users(queue, set->auth_socket_path,
                                                args[0], ioloop_time);
@@ -255,11 +260,12 @@ client_input_notify(struct doveadm_connection *client, const char *const *args)
                return -1;
        }
 
-       user = replicator_queue_add(queue, args[0], REPLICATION_PRIORITY_NONE);
+       user = replicator_queue_get(queue, args[0]);
        if (args[1][0] == 'f')
                user->last_full_sync = ioloop_time;
        user->last_fast_sync = ioloop_time;
        user->last_update = ioloop_time;
+       replicator_queue_add(queue, user);
 
        if (args[2][0] != '\0') {
                i_free(user->state);
index cfc2e20c091e5c9579d4810f2238259773dab0e5..ecf7639137bf4b9c62dc0253799141d9e40300cf 100644 (file)
@@ -72,9 +72,12 @@ notify_connection_input_line(struct notify_connection *conn, const char *line)
                i_error("notify client sent invalid priority: %s", args[2]);
                return -1;
        }
-       if (priority != REPLICATION_PRIORITY_SYNC)
-               (void)replicator_queue_add(conn->queue, args[1], priority);
-       else if (args[3] == NULL || str_to_uint(args[3], &id) < 0) {
+       if (priority != REPLICATION_PRIORITY_SYNC) {
+               struct replicator_user *user =
+                       replicator_queue_get(conn->queue, args[1]);
+               replicator_queue_update(conn->queue, user, priority);
+               replicator_queue_add(conn->queue, user);
+       } else if (args[3] == NULL || str_to_uint(args[3], &id) < 0) {
                i_error("notify client sent invalid sync id: %s", line);
                return -1;
        } else {
index a71e9428c557eb4453aa842fc5cbba358d88cb50..eedb8140713af960dad208df8faef4e7239a145d 100644 (file)
@@ -26,8 +26,9 @@ void replicator_queue_add_auth_users(struct replicator_queue *queue,
           full syncs for everyone whose state can't be found */
        ctx = auth_master_user_list_init(auth_conn, usermask, &user_info);
        while ((username = auth_master_user_list_next(ctx)) != NULL) {
-               user = replicator_queue_add(queue, username,
-                                           REPLICATION_PRIORITY_NONE);
+               user = replicator_queue_get(queue, username);
+               replicator_queue_update(queue, user, REPLICATION_PRIORITY_NONE);
+               replicator_queue_add(queue, user);
                user->last_update = last_update;
        }
        if (auth_master_user_list_deinit(&ctx) < 0)
index 430716a22590bbdb5a90b9f5120f45a59b716073..549cd36c5b8d92394594e28da360bffc494b443e 100644 (file)
@@ -152,9 +152,8 @@ replicator_queue_lookup(struct replicator_queue *queue, const char *username)
        return hash_table_lookup(queue->user_hash, username);
 }
 
-static struct replicator_user *
-replicator_queue_add_int(struct replicator_queue *queue, const char *username,
-                        enum replication_priority priority)
+struct replicator_user *
+replicator_queue_get(struct replicator_queue *queue, const char *username)
 {
        struct replicator_user *user;
 
@@ -163,33 +162,35 @@ replicator_queue_add_int(struct replicator_queue *queue, const char *username,
                user = i_new(struct replicator_user, 1);
                user->refcount = 1;
                user->username = i_strdup(username);
+               user->last_update = ioloop_time;
                hash_table_insert(queue->user_hash, user->username, user);
-       } else {
-               if (user->priority > priority) {
-                       /* user already has a higher priority than this */
-                       return user;
-               }
                if (!user->popped)
-                       priorityq_remove(queue->user_queue, &user->item);
+                       priorityq_add(queue->user_queue, &user->item);
        }
-       user->priority = priority;
-       user->last_update = ioloop_time;
-
-       if (!user->popped)
-               priorityq_add(queue->user_queue, &user->item);
        return user;
 }
 
-struct replicator_user *
-replicator_queue_add(struct replicator_queue *queue, const char *username,
-                    enum replication_priority priority)
+void replicator_queue_update(struct replicator_queue *queue ATTR_UNUSED,
+                            struct replicator_user *user,
+                            enum replication_priority priority)
 {
-       struct replicator_user *user;
+       if (user->priority > priority) {
+               /* user already has a higher priority than this */
+               return;
+       }
+       user->priority = priority;
+       user->last_update = ioloop_time;
+}
 
-       user = replicator_queue_add_int(queue, username, priority);
+void replicator_queue_add(struct replicator_queue *queue,
+                         struct replicator_user *user)
+{
+       if (!user->popped) {
+               priorityq_remove(queue->user_queue, &user->item);
+               priorityq_add(queue->user_queue, &user->item);
+       }
        if (queue->change_callback != NULL)
                queue->change_callback(queue->change_context);
-       return user;
 }
 
 void replicator_queue_add_sync(struct replicator_queue *queue,
@@ -200,8 +201,8 @@ void replicator_queue_add_sync(struct replicator_queue *queue,
        struct replicator_user *user;
        struct replicator_sync_lookup *lookup;
 
-       user = replicator_queue_add_int(queue, username,
-                                       REPLICATION_PRIORITY_SYNC);
+       user = replicator_queue_get(queue, username);
+       replicator_queue_update(queue, user, REPLICATION_PRIORITY_SYNC);
 
        lookup = array_append_space(&queue->sync_lookups);
        lookup->user = user;
@@ -209,8 +210,7 @@ void replicator_queue_add_sync(struct replicator_queue *queue,
        lookup->context = context;
        lookup->wait_for_next_push = user->popped;
 
-       if (queue->change_callback != NULL)
-               queue->change_callback(queue->change_context);
+       replicator_queue_add(queue, user);
 }
 
 void replicator_queue_remove(struct replicator_queue *queue,
@@ -362,8 +362,7 @@ replicator_queue_import_line(struct replicator_queue *queue, const char *line)
                                return 0;
                }
        }
-       user = replicator_queue_add(queue, username,
-                                   tmp_user.priority);
+       user->priority = tmp_user.priority;
        user->last_update = tmp_user.last_update;
        user->last_fast_sync = tmp_user.last_fast_sync;
        user->last_full_sync = tmp_user.last_full_sync;
@@ -371,6 +370,7 @@ replicator_queue_import_line(struct replicator_queue *queue, const char *line)
        user->last_sync_failed = tmp_user.last_sync_failed;
        i_free(user->state);
        user->state = i_strdup(state);
+       replicator_queue_add(queue, user);
        return 0;
 }
 
index a298a5d9ed8009e31350eeb6d42180997af6f266..81858316e724866e6168722f46e54ee9189ae54c 100644 (file)
@@ -46,11 +46,16 @@ bool replicator_user_unref(struct replicator_user **user);
 /* Lookup an existing user */
 struct replicator_user *
 replicator_queue_lookup(struct replicator_queue *queue, const char *username);
-/* Add a user to queue and return it. If the user already exists, it's updated
-   only if the new priority is higher. */
+/* Lookup or create a user and return it. Afterwards replicator_queue_add()
+   must be called to add/move the user to the proper place in the queue. */
 struct replicator_user *
-replicator_queue_add(struct replicator_queue *queue, const char *username,
-                    enum replication_priority priority);
+replicator_queue_get(struct replicator_queue *queue, const char *username);
+/* Update user's priority, if it isn't already higher. */
+void replicator_queue_update(struct replicator_queue *queue,
+                            struct replicator_user *user,
+                            enum replication_priority priority);
+void replicator_queue_add(struct replicator_queue *queue,
+                         struct replicator_user *user);
 void replicator_queue_add_sync(struct replicator_queue *queue,
                               const char *username,
                               replicator_sync_callback_t *callback,