From: Timo Sirainen Date: Tue, 7 Sep 2021 13:56:16 +0000 (+0300) Subject: indexer: Handle more requests whenever indexer-worker connection closes X-Git-Tag: 2.3.17~125 X-Git-Url: http://git.ipfire.org/?a=commitdiff_plain;h=2f8bdc93727099c11b443f842a9d412ab748618c;p=thirdparty%2Fdovecot%2Fcore.git indexer: Handle more requests whenever indexer-worker connection closes Previously this was done only when worker process sent a "request finished" notification. Crashing worker processes could have caused the queue to get stuck until more requests were added to the queue. --- diff --git a/src/indexer/indexer.c b/src/indexer/indexer.c index 219af539f4..04ba8e6ed8 100644 --- a/src/indexer/indexer.c +++ b/src/indexer/indexer.c @@ -19,7 +19,6 @@ struct worker_request { static const struct master_service_settings *set; static struct indexer_queue *queue; static struct worker_pool *worker_pool; -static struct timeout *to_send_more; void indexer_refresh_proctitle(void) { @@ -61,8 +60,6 @@ static void queue_try_send_more(struct indexer_queue *queue) struct connection *conn; struct indexer_request *request, *first_moved_request = NULL; - timeout_remove(&to_send_more); - while ((request = indexer_queue_request_peek(queue)) != NULL) { conn = worker_pool_find_username_connection(worker_pool, request->username); @@ -106,12 +103,12 @@ worker_status_callback(int percentage, struct indexer_request *request) indexer_queue_request_finish(queue, &request, percentage == 100); +} - /* if this was the last request for the connection, we can send more - through it. delay it a bit, since we may be coming here from - worker_connection_disconnect() and we want to finish it up. */ - if (to_send_more == NULL) - to_send_more = timeout_add_short(0, queue_try_send_more, queue); +static void worker_avail_callback(void) +{ + /* A new worker became available. Try to shrink the queue. */ + queue_try_send_more(queue); } int main(int argc, char *argv[]) @@ -135,7 +132,8 @@ int main(int argc, char *argv[]) queue = indexer_queue_init(indexer_client_status_callback); indexer_queue_set_listen_callback(queue, queue_listen_callback); worker_pool = worker_pool_init("indexer-worker", - worker_status_callback); + worker_status_callback, + worker_avail_callback); master_service_init_finish(master_service); master_service_run(master_service, client_connected); @@ -144,7 +142,6 @@ int main(int argc, char *argv[]) indexer_clients_destroy_all(); worker_pool_deinit(&worker_pool); indexer_queue_deinit(&queue); - timeout_remove(&to_send_more); master_service_deinit(&master_service); return 0; diff --git a/src/indexer/worker-connection.c b/src/indexer/worker-connection.c index 871974d6a7..02a07ebdce 100644 --- a/src/indexer/worker-connection.c +++ b/src/indexer/worker-connection.c @@ -26,6 +26,7 @@ struct worker_connection { struct connection conn; indexer_status_callback_t *callback; + worker_available_callback_t *avail_callback; char *request_username; struct indexer_request *request; @@ -50,6 +51,8 @@ void worker_connection_destroy(struct connection *conn) worker->request = NULL; i_free_and_null(worker->request_username); connection_deinit(conn); + + worker->avail_callback(); i_free(conn); } @@ -188,12 +191,14 @@ struct connection_list *worker_connection_list_create(void) struct connection * worker_connection_create(const char *socket_path, indexer_status_callback_t *callback, + worker_available_callback_t *avail_callback, struct connection_list *list) { struct worker_connection *conn; conn = i_new(struct worker_connection, 1); conn->callback = callback; + conn->avail_callback = avail_callback; connection_init_client_unix(list, &conn->conn, socket_path); return &conn->conn; diff --git a/src/indexer/worker-connection.h b/src/indexer/worker-connection.h index aea46e0860..a20e7af482 100644 --- a/src/indexer/worker-connection.h +++ b/src/indexer/worker-connection.h @@ -6,9 +6,12 @@ struct indexer_request; struct connection_list; +typedef void worker_available_callback_t(void); + struct connection * worker_connection_create(const char *socket_path, indexer_status_callback_t *callback, + worker_available_callback_t *avail_callback, struct connection_list *list); void worker_connection_destroy(struct connection *conn); diff --git a/src/indexer/worker-pool.c b/src/indexer/worker-pool.c index 991aaabee5..36fd16f1b6 100644 --- a/src/indexer/worker-pool.c +++ b/src/indexer/worker-pool.c @@ -13,18 +13,21 @@ struct worker_pool { char *socket_path; indexer_status_callback_t *callback; + worker_available_callback_t *avail_callback; struct connection_list *connection_list; }; struct worker_pool * -worker_pool_init(const char *socket_path, indexer_status_callback_t *callback) +worker_pool_init(const char *socket_path, indexer_status_callback_t *callback, + worker_available_callback_t *avail_callback) { struct worker_pool *pool; pool = i_new(struct worker_pool, 1); pool->socket_path = i_strdup(socket_path); pool->callback = callback; + pool->avail_callback = avail_callback; pool->connection_list = worker_connection_list_create(); return pool; } @@ -58,6 +61,7 @@ static int worker_pool_add_connection(struct worker_pool *pool, struct connection *conn; conn = worker_connection_create(pool->socket_path, pool->callback, + pool->avail_callback, pool->connection_list); if (connection_client_connect(conn) < 0) { worker_connection_destroy(conn); diff --git a/src/indexer/worker-pool.h b/src/indexer/worker-pool.h index 3f114d2024..bcab8287ae 100644 --- a/src/indexer/worker-pool.h +++ b/src/indexer/worker-pool.h @@ -2,11 +2,13 @@ #define WORKER_POOL_H #include "indexer.h" +#include "worker-connection.h" struct connection; struct worker_pool * -worker_pool_init(const char *socket_path, indexer_status_callback_t *callback); +worker_pool_init(const char *socket_path, indexer_status_callback_t *callback, + worker_available_callback_t *avail_callback); void worker_pool_deinit(struct worker_pool **pool); bool worker_pool_have_busy_connections(struct worker_pool *pool);