]> git.ipfire.org Git - thirdparty/dovecot/core.git/commitdiff
indexer: Handle more requests whenever indexer-worker connection closes
authorTimo Sirainen <timo.sirainen@open-xchange.com>
Tue, 7 Sep 2021 13:56:16 +0000 (16:56 +0300)
committeraki.tuomi <aki.tuomi@open-xchange.com>
Wed, 15 Sep 2021 10:14:44 +0000 (10:14 +0000)
Previously this was done only when worker process sent a "request finished"
notification. Crashing worker processes could have caused the queue to get
stuck until more requests were added to the queue.

src/indexer/indexer.c
src/indexer/worker-connection.c
src/indexer/worker-connection.h
src/indexer/worker-pool.c
src/indexer/worker-pool.h

index 219af539f4c53063a53f2ca9b0a42f4d2b0f22e4..04ba8e6ed8025c8991bb961ce5ab27cb07bc22e4 100644 (file)
@@ -19,7 +19,6 @@ struct worker_request {
 static const struct master_service_settings *set;
 static struct indexer_queue *queue;
 static struct worker_pool *worker_pool;
-static struct timeout *to_send_more;
 
 void indexer_refresh_proctitle(void)
 {
@@ -61,8 +60,6 @@ static void queue_try_send_more(struct indexer_queue *queue)
        struct connection *conn;
        struct indexer_request *request, *first_moved_request = NULL;
 
-       timeout_remove(&to_send_more);
-
        while ((request = indexer_queue_request_peek(queue)) != NULL) {
                conn = worker_pool_find_username_connection(worker_pool,
                                                            request->username);
@@ -106,12 +103,12 @@ worker_status_callback(int percentage, struct indexer_request *request)
 
        indexer_queue_request_finish(queue, &request,
                                     percentage == 100);
+}
 
-       /* if this was the last request for the connection, we can send more
-          through it. delay it a bit, since we may be coming here from
-          worker_connection_disconnect() and we want to finish it up. */
-       if (to_send_more == NULL)
-               to_send_more = timeout_add_short(0, queue_try_send_more, queue);
+static void worker_avail_callback(void)
+{
+       /* A new worker became available. Try to shrink the queue. */
+       queue_try_send_more(queue);
 }
 
 int main(int argc, char *argv[])
@@ -135,7 +132,8 @@ int main(int argc, char *argv[])
        queue = indexer_queue_init(indexer_client_status_callback);
        indexer_queue_set_listen_callback(queue, queue_listen_callback);
        worker_pool = worker_pool_init("indexer-worker",
-                                      worker_status_callback);
+                                      worker_status_callback,
+                                      worker_avail_callback);
        master_service_init_finish(master_service);
 
        master_service_run(master_service, client_connected);
@@ -144,7 +142,6 @@ int main(int argc, char *argv[])
        indexer_clients_destroy_all();
        worker_pool_deinit(&worker_pool);
        indexer_queue_deinit(&queue);
-       timeout_remove(&to_send_more);
 
        master_service_deinit(&master_service);
         return 0;
index 871974d6a75bb59ecbeebad0125d972a57aa9988..02a07ebdce3e0156ff44d69116b955d5137eb459 100644 (file)
@@ -26,6 +26,7 @@ struct worker_connection {
        struct connection conn;
 
        indexer_status_callback_t *callback;
+       worker_available_callback_t *avail_callback;
 
        char *request_username;
        struct indexer_request *request;
@@ -50,6 +51,8 @@ void worker_connection_destroy(struct connection *conn)
        worker->request = NULL;
        i_free_and_null(worker->request_username);
        connection_deinit(conn);
+
+       worker->avail_callback();
        i_free(conn);
 }
 
@@ -188,12 +191,14 @@ struct connection_list *worker_connection_list_create(void)
 struct connection *
 worker_connection_create(const char *socket_path,
                         indexer_status_callback_t *callback,
+                        worker_available_callback_t *avail_callback,
                         struct connection_list *list)
 {
        struct worker_connection *conn;
 
        conn = i_new(struct worker_connection, 1);
        conn->callback = callback;
+       conn->avail_callback = avail_callback;
        connection_init_client_unix(list, &conn->conn, socket_path);
 
        return &conn->conn;
index aea46e0860266302f34bbf3dd30d4d39b549b1f3..a20e7af482fd1be61dcc73b6029676be21f5802e 100644 (file)
@@ -6,9 +6,12 @@
 struct indexer_request;
 struct connection_list;
 
+typedef void worker_available_callback_t(void);
+
 struct connection *
 worker_connection_create(const char *socket_path,
                         indexer_status_callback_t *callback,
+                        worker_available_callback_t *avail_callback,
                         struct connection_list *list);
 void worker_connection_destroy(struct connection *conn);
 
index 991aaabee56b03b2dadd01534c5bcd205da7e009..36fd16f1b653c88b47aada0ee21a90f2ba9f2c5e 100644 (file)
 struct worker_pool {
        char *socket_path;
        indexer_status_callback_t *callback;
+       worker_available_callback_t *avail_callback;
 
        struct connection_list *connection_list;
 };
 
 struct worker_pool *
-worker_pool_init(const char *socket_path, indexer_status_callback_t *callback)
+worker_pool_init(const char *socket_path, indexer_status_callback_t *callback,
+                worker_available_callback_t *avail_callback)
 {
        struct worker_pool *pool;
 
        pool = i_new(struct worker_pool, 1);
        pool->socket_path = i_strdup(socket_path);
        pool->callback = callback;
+       pool->avail_callback = avail_callback;
        pool->connection_list = worker_connection_list_create();
        return pool;
 }
@@ -58,6 +61,7 @@ static int worker_pool_add_connection(struct worker_pool *pool,
        struct connection *conn;
 
        conn = worker_connection_create(pool->socket_path, pool->callback,
+                                       pool->avail_callback,
                                        pool->connection_list);
        if (connection_client_connect(conn) < 0) {
                worker_connection_destroy(conn);
index 3f114d2024e21b1613c5f60f04a37ccc0fb052ea..bcab8287ae86bafcde42cc66a7611f1a998fb444 100644 (file)
@@ -2,11 +2,13 @@
 #define WORKER_POOL_H
 
 #include "indexer.h"
+#include "worker-connection.h"
 
 struct connection;
 
 struct worker_pool *
-worker_pool_init(const char *socket_path, indexer_status_callback_t *callback);
+worker_pool_init(const char *socket_path, indexer_status_callback_t *callback,
+                worker_available_callback_t *avail_callback);
 void worker_pool_deinit(struct worker_pool **pool);
 
 bool worker_pool_have_busy_connections(struct worker_pool *pool);