full = strchr(args[1], 'f') != NULL;
usermask = args[2];
if (strchr(usermask, '*') == NULL && strchr(usermask, '?') == NULL) {
- user = replicator_queue_add(queue, usermask, priority);
+ struct replicator_user *user =
+ replicator_queue_get(queue, usermask);
if (full)
user->force_full_sync = TRUE;
+ replicator_queue_update(queue, user, priority);
+ replicator_queue_add(queue, user);
o_stream_nsend_str(client->conn.output, "+1\n");
return 0;
}
while ((user = replicator_queue_iter_next(iter)) != NULL) {
if (!wildcard_match(user->username, usermask))
continue;
- user = replicator_queue_add(queue, user->username, priority);
if (full)
user->force_full_sync = TRUE;
+ replicator_queue_update(queue, user, priority);
+ replicator_queue_add(queue, user);
match_count++;
}
replicator_queue_iter_deinit(&iter);
}
if (strchr(args[0], '*') == NULL && strchr(args[0], '?') == NULL) {
- (void)replicator_queue_add(queue, args[0],
- REPLICATION_PRIORITY_NONE);
+ struct replicator_user *user =
+ replicator_queue_get(queue, args[0]);
+ replicator_queue_add(queue, user);
} else {
replicator_queue_add_auth_users(queue, set->auth_socket_path,
args[0], ioloop_time);
return -1;
}
- user = replicator_queue_add(queue, args[0], REPLICATION_PRIORITY_NONE);
+ user = replicator_queue_get(queue, args[0]);
if (args[1][0] == 'f')
user->last_full_sync = ioloop_time;
user->last_fast_sync = ioloop_time;
user->last_update = ioloop_time;
+ replicator_queue_add(queue, user);
if (args[2][0] != '\0') {
i_free(user->state);
i_error("notify client sent invalid priority: %s", args[2]);
return -1;
}
- if (priority != REPLICATION_PRIORITY_SYNC)
- (void)replicator_queue_add(conn->queue, args[1], priority);
- else if (args[3] == NULL || str_to_uint(args[3], &id) < 0) {
+ if (priority != REPLICATION_PRIORITY_SYNC) {
+ struct replicator_user *user =
+ replicator_queue_get(conn->queue, args[1]);
+ replicator_queue_update(conn->queue, user, priority);
+ replicator_queue_add(conn->queue, user);
+ } else if (args[3] == NULL || str_to_uint(args[3], &id) < 0) {
i_error("notify client sent invalid sync id: %s", line);
return -1;
} else {
full syncs for everyone whose state can't be found */
ctx = auth_master_user_list_init(auth_conn, usermask, &user_info);
while ((username = auth_master_user_list_next(ctx)) != NULL) {
- user = replicator_queue_add(queue, username,
- REPLICATION_PRIORITY_NONE);
+ user = replicator_queue_get(queue, username);
+ replicator_queue_update(queue, user, REPLICATION_PRIORITY_NONE);
+ replicator_queue_add(queue, user);
user->last_update = last_update;
}
if (auth_master_user_list_deinit(&ctx) < 0)
return hash_table_lookup(queue->user_hash, username);
}
-static struct replicator_user *
-replicator_queue_add_int(struct replicator_queue *queue, const char *username,
- enum replication_priority priority)
+struct replicator_user *
+replicator_queue_get(struct replicator_queue *queue, const char *username)
{
struct replicator_user *user;
user = i_new(struct replicator_user, 1);
user->refcount = 1;
user->username = i_strdup(username);
+ user->last_update = ioloop_time;
hash_table_insert(queue->user_hash, user->username, user);
- } else {
- if (user->priority > priority) {
- /* user already has a higher priority than this */
- return user;
- }
if (!user->popped)
- priorityq_remove(queue->user_queue, &user->item);
+ priorityq_add(queue->user_queue, &user->item);
}
- user->priority = priority;
- user->last_update = ioloop_time;
-
- if (!user->popped)
- priorityq_add(queue->user_queue, &user->item);
return user;
}
-struct replicator_user *
-replicator_queue_add(struct replicator_queue *queue, const char *username,
- enum replication_priority priority)
+void replicator_queue_update(struct replicator_queue *queue ATTR_UNUSED,
+ struct replicator_user *user,
+ enum replication_priority priority)
{
- struct replicator_user *user;
+ if (user->priority > priority) {
+ /* user already has a higher priority than this */
+ return;
+ }
+ user->priority = priority;
+ user->last_update = ioloop_time;
+}
- user = replicator_queue_add_int(queue, username, priority);
+void replicator_queue_add(struct replicator_queue *queue,
+ struct replicator_user *user)
+{
+ if (!user->popped) {
+ priorityq_remove(queue->user_queue, &user->item);
+ priorityq_add(queue->user_queue, &user->item);
+ }
if (queue->change_callback != NULL)
queue->change_callback(queue->change_context);
- return user;
}
void replicator_queue_add_sync(struct replicator_queue *queue,
struct replicator_user *user;
struct replicator_sync_lookup *lookup;
- user = replicator_queue_add_int(queue, username,
- REPLICATION_PRIORITY_SYNC);
+ user = replicator_queue_get(queue, username);
+ replicator_queue_update(queue, user, REPLICATION_PRIORITY_SYNC);
lookup = array_append_space(&queue->sync_lookups);
lookup->user = user;
lookup->context = context;
lookup->wait_for_next_push = user->popped;
- if (queue->change_callback != NULL)
- queue->change_callback(queue->change_context);
+ replicator_queue_add(queue, user);
}
void replicator_queue_remove(struct replicator_queue *queue,
return 0;
}
}
- user = replicator_queue_add(queue, username,
- tmp_user.priority);
+ user->priority = tmp_user.priority;
user->last_update = tmp_user.last_update;
user->last_fast_sync = tmp_user.last_fast_sync;
user->last_full_sync = tmp_user.last_full_sync;
user->last_sync_failed = tmp_user.last_sync_failed;
i_free(user->state);
user->state = i_strdup(state);
+ replicator_queue_add(queue, user);
return 0;
}
/* Lookup an existing user */
struct replicator_user *
replicator_queue_lookup(struct replicator_queue *queue, const char *username);
-/* Add a user to queue and return it. If the user already exists, it's updated
- only if the new priority is higher. */
+/* Lookup or create a user and return it. Afterwards replicator_queue_add()
+ must be called to add/move the user to the proper place in the queue. */
struct replicator_user *
-replicator_queue_add(struct replicator_queue *queue, const char *username,
- enum replication_priority priority);
+replicator_queue_get(struct replicator_queue *queue, const char *username);
+/* Update user's priority, if it isn't already higher. */
+void replicator_queue_update(struct replicator_queue *queue,
+ struct replicator_user *user,
+ enum replication_priority priority);
+void replicator_queue_add(struct replicator_queue *queue,
+ struct replicator_user *user);
void replicator_queue_add_sync(struct replicator_queue *queue,
const char *username,
replicator_sync_callback_t *callback,