]> git.ipfire.org Git - thirdparty/dovecot/core.git/commitdiff
indexer: Remove worker-specific request queue
authorSiavash Tavakoli <siavash.tavakoli@open-xchange.com>
Mon, 21 Dec 2020 10:45:32 +0000 (10:45 +0000)
committeraki.tuomi <aki.tuomi@open-xchange.com>
Fri, 15 Jan 2021 18:14:24 +0000 (18:14 +0000)
- Each request is done in a single connection and indexer master disconnects
from worker after indexing job is complete.
- No need to keep a worker queue since it always have length of 1. Instead,
keep a pointer to indexer_request for each worker connection.
- If indexing is in progress for a user, don't try to use the same connection
when a new request arrives for the same user. Instead, move the request to the
back of the queue and wait for the in progress request to finish then
create a new connection and submit request to worker.

src/indexer/indexer-queue.c
src/indexer/indexer-queue.h
src/indexer/indexer.c
src/indexer/worker-connection.c
src/indexer/worker-connection.h

index cef92ca1b76e33a9f0b4f10c1bd8fa51c9b49675..977fbacf8cb00563fe910a474655feca23d3a6a4 100644 (file)
@@ -192,6 +192,14 @@ void indexer_queue_request_status(struct indexer_queue *queue,
        indexer_queue_request_status_int(queue, request, percentage);
 }
 
+void indexer_queue_move_head_to_tail(struct indexer_queue *queue)
+{
+       struct indexer_request *request = queue->head;
+
+       indexer_queue_request_remove(queue);
+       DLLIST2_APPEND(&queue->head, &queue->tail, request);
+}
+
 void indexer_queue_request_work(struct indexer_request *request)
 {
        request->working = TRUE;
index 2b5dd7a6c804dd25bb66e041b2fcd2b61f0f1eb6..fe56829faa989f43f42a046d6b695488ab5a5926 100644 (file)
@@ -58,6 +58,8 @@ void indexer_queue_request_remove(struct indexer_queue *queue);
 void indexer_queue_request_status(struct indexer_queue *queue,
                                  struct indexer_request *request,
                                  int percentage);
+/* Move the next request to the end of the queue. */
+void indexer_queue_move_head_to_tail(struct indexer_queue *queue);
 /* Start working on a request */
 void indexer_queue_request_work(struct indexer_request *request);
 /* Finish the request and free its memory. */
index 23b4cd57c66de831e878bba1c48c3d5fe917ebf3..504e3b32c6d59a35f0f3aefa886b7d0dfd4e853b 100644 (file)
@@ -59,7 +59,7 @@ static void worker_send_request(struct worker_connection *conn,
 static void queue_try_send_more(struct indexer_queue *queue)
 {
        struct worker_connection *conn;
-       struct indexer_request *request;
+       struct indexer_request *request, *first_moved_request = NULL;
 
        timeout_remove(&to_send_more);
 
@@ -67,14 +67,25 @@ static void queue_try_send_more(struct indexer_queue *queue)
                conn = worker_pool_find_username_connection(worker_pool,
                                                            request->username);
                if (conn != NULL) {
-                       /* there is already a worker handling this user.
-                          it must be the one doing the indexing. use the same
-                          connection for sending this next request. */
-               } else {
-                       /* try to find an empty worker */
-                       if (!worker_pool_get_connection(worker_pool, &conn))
+                       /* There is already a connection handling a request
+                        * for this user. Move the request to the back of the
+                        * queue and handle requests from other users.
+                        * Terminate if we went through all requests. */
+                       if (request == first_moved_request) {
+                               /* all requests are waiting for existing users
+                                  to finish. */
                                break;
+                       }
+                       if (first_moved_request == NULL)
+                               first_moved_request = request;
+                       indexer_queue_move_head_to_tail(queue);
+                       continue;
                }
+
+               /* create a new connection to a worker */
+               if (!worker_pool_get_connection(worker_pool, &conn))
+                       break;
+
                indexer_queue_request_remove(queue);
                worker_send_request(conn, request);
        }
@@ -87,19 +98,19 @@ static void queue_listen_callback(struct indexer_queue *queue)
 
 static void worker_status_callback(int percentage, void *context)
 {
-       struct worker_request *request = context;
+       struct worker_connection *conn = context;
+       struct indexer_request *request = worker_connection_get_request(conn);
 
        if (percentage >= 0 && percentage < 100) {
-               indexer_queue_request_status(queue, request->request,
+               indexer_queue_request_status(queue, request,
                                             percentage);
                return;
        }
 
-       indexer_queue_request_finish(queue, &request->request,
+       indexer_queue_request_finish(queue, &request,
                                     percentage == 100);
        if (worker_pool != NULL) /* not in deinit */
-               worker_pool_release_connection(worker_pool, request->conn);
-       i_free(request);
+               worker_pool_release_connection(worker_pool, conn);
 
        /* if this was the last request for the connection, we can send more
           through it. delay it a bit, since we may be coming here from
index 382d56d5140a171f2fd3343c1177b4c75f76bc49..35a3b95cb3418819e8b3b812892449ebed5b0f91 100644 (file)
@@ -32,8 +32,7 @@ struct worker_connection {
        struct ostream *output;
 
        char *request_username;
-       ARRAY(void *) request_contexts;
-       struct aqueue *request_queue;
+       struct indexer_request *request;
 
        unsigned int process_limit;
        bool version_received:1;
@@ -50,8 +49,6 @@ worker_connection_create(const char *socket_path,
        conn->socket_path = i_strdup(socket_path);
        conn->callback = callback;
        conn->fd = -1;
-       i_array_init(&conn->request_contexts, 32);
-       conn->request_queue = aqueue_init(&conn->request_contexts.arr);
        return conn;
 }
 
@@ -61,16 +58,12 @@ static void worker_connection_unref(struct worker_connection *conn)
        if (--conn->refcount > 0)
                return;
 
-       aqueue_deinit(&conn->request_queue);
-       array_free(&conn->request_contexts);
        i_free(conn->socket_path);
        i_free(conn);
 }
 
 static void worker_connection_disconnect(struct worker_connection *conn)
 {
-       unsigned int i, count = aqueue_count(conn->request_queue);
-
        if (conn->fd != -1) {
                io_remove(&conn->io);
                i_stream_destroy(&conn->input);
@@ -81,24 +74,8 @@ static void worker_connection_disconnect(struct worker_connection *conn)
                conn->fd = -1;
        }
 
-       /* cancel any pending requests */
-       if (count > 0) {
-               i_error("Indexer worker disconnected, "
-                       "discarding %u requests for %s",
-                       count, conn->request_username);
-       }
-
        /* conn->callback() can try to destroy us */
        conn->refcount++;
-       for (i = 0; i < count; i++) {
-               void *const *contextp =
-                       array_idx(&conn->request_contexts,
-                                 aqueue_idx(conn->request_queue, 0));
-               void *context = *contextp;
-
-               aqueue_delete_tail(conn->request_queue);
-               conn->callback(-1, context);
-       }
        i_free_and_null(conn->request_username);
        worker_connection_unref(conn);
 }
@@ -116,7 +93,6 @@ void worker_connection_destroy(struct worker_connection **_conn)
 static int
 worker_connection_input_line(struct worker_connection *conn, const char *line)
 {
-       void *const *contextp, *context;
        int percentage;
        /* return -1 -> error
                   0 -> request completed (100%)
@@ -124,32 +100,19 @@ worker_connection_input_line(struct worker_connection *conn, const char *line)
         */
        int ret = 1;
 
-       if (aqueue_count(conn->request_queue) == 0) {
-               i_error("Input from worker without pending requests: %s", line);
-               return -1;
-       }
-
        if (str_to_int(line, &percentage) < 0 ||
            percentage < -1 || percentage > 100) {
                i_error("Invalid input from worker: %s", line);
                return -1;
        }
 
-       contextp = array_idx(&conn->request_contexts,
-                            aqueue_idx(conn->request_queue, 0));
-       context = *contextp;
-       if (percentage < 0 || percentage == 100) {
-               /* the request is finished */
-               aqueue_delete_tail(conn->request_queue);
-               if (aqueue_count(conn->request_queue) == 0)
-                       i_free_and_null(conn->request_username);
-               if (percentage < 0)
-                       ret = -1;
-               else
-                       ret = 0;
-       }
+       /* is request finished */
+       if (percentage < 0)
+               ret = -1;
+       else if (percentage == 100)
+               ret = 0;
 
-       conn->callback(percentage, context);
+       conn->callback(percentage, conn);
        return ret;
 }
 
@@ -227,7 +190,7 @@ bool worker_connection_get_process_limit(struct worker_connection *conn,
 }
 
 void worker_connection_request(struct worker_connection *conn,
-                              const struct indexer_request *request,
+                              struct indexer_request *request,
                               void *context)
 {
        i_assert(worker_connection_is_connected(conn));
@@ -241,7 +204,7 @@ void worker_connection_request(struct worker_connection *conn,
                                request->username) == 0);
        }
 
-       aqueue_append(conn->request_queue, &context);
+       conn->request = request;
 
        T_BEGIN {
                string_t *str = t_str_new(128);
@@ -264,10 +227,16 @@ void worker_connection_request(struct worker_connection *conn,
 
 bool worker_connection_is_busy(struct worker_connection *conn)
 {
-       return aqueue_count(conn->request_queue) > 0;
+       return conn->request != NULL;
 }
 
 const char *worker_connection_get_username(struct worker_connection *conn)
 {
        return conn->request_username;
 }
+
+struct indexer_request *
+worker_connection_get_request(struct worker_connection *conn)
+{
+       return conn->request;
+}
index e734de77010052baa804d94922c87d8a8590c516..e6c320d5cb9d1a7646afbd181b06ca406be3d03b 100644 (file)
@@ -24,7 +24,7 @@ bool worker_connection_get_process_limit(struct worker_connection *conn,
    called as necessary with the given context. Requests can be queued, but
    only for the same username. */
 void worker_connection_request(struct worker_connection *conn,
-                              const struct indexer_request *request,
+                              struct indexer_request *request,
                               void *context);
 /* Returns TRUE if a request is being handled. */
 bool worker_connection_is_busy(struct worker_connection *conn);
@@ -32,4 +32,7 @@ bool worker_connection_is_busy(struct worker_connection *conn);
    or NULL if there are none. */
 const char *worker_connection_get_username(struct worker_connection *conn);
 
+struct indexer_request *
+worker_connection_get_request(struct worker_connection *conn);
+
 #endif