]> git.ipfire.org Git - thirdparty/dovecot/core.git/commitdiff
indexer: worker-connection - Use connection.h structures
authorAki Tuomi <aki.tuomi@open-xchange.com>
Thu, 6 May 2021 07:29:23 +0000 (10:29 +0300)
committerTimo Sirainen <timo.sirainen@open-xchange.com>
Mon, 14 Jun 2021 14:56:40 +0000 (17:56 +0300)
src/indexer/indexer.c
src/indexer/worker-connection.c
src/indexer/worker-connection.h
src/indexer/worker-pool.c
src/indexer/worker-pool.h

index 504e3b32c6d59a35f0f3aefa886b7d0dfd4e853b..711a0608db8008aa5380eb4278611bca1e8f146f 100644 (file)
@@ -12,7 +12,7 @@
 #include "worker-connection.h"
 
 struct worker_request {
-       struct worker_connection *conn;
+       struct connection *conn;
        struct indexer_request *request;
 };
 
@@ -43,7 +43,7 @@ static void client_connected(struct master_service_connection *conn)
        (void)indexer_client_create(conn->fd, queue);
 }
 
-static void worker_send_request(struct worker_connection *conn,
+static void worker_send_request(struct connection *conn,
                                struct indexer_request *request)
 {
        struct worker_request *wrequest;
@@ -58,7 +58,7 @@ static void worker_send_request(struct worker_connection *conn,
 
 static void queue_try_send_more(struct indexer_queue *queue)
 {
-       struct worker_connection *conn;
+       struct connection *conn;
        struct indexer_request *request, *first_moved_request = NULL;
 
        timeout_remove(&to_send_more);
@@ -98,7 +98,7 @@ static void queue_listen_callback(struct indexer_queue *queue)
 
 static void worker_status_callback(int percentage, void *context)
 {
-       struct worker_connection *conn = context;
+       struct connection *conn = context;
        struct indexer_request *request = worker_connection_get_request(conn);
 
        if (percentage >= 0 && percentage < 100) {
index 72ccfbcbee09870e387cb39dff8161b4c8cc1434..a2f619576a07ad8d2fef52c378747398e092b2b6 100644 (file)
@@ -3,6 +3,7 @@
 #include "lib.h"
 #include "array.h"
 #include "aqueue.h"
+#include "connection.h"
 #include "ioloop.h"
 #include "istream.h"
 #include "ostream.h"
 #define INDEXER_WORKER_NAME "indexer-worker-master"
 
 struct worker_connection {
+       struct connection conn;
+
        int refcount;
 
-       char *socket_path;
        indexer_status_callback_t *callback;
 
-       int fd;
-       struct io *io;
-       struct istream *input;
-       struct ostream *output;
-
        char *request_username;
        struct indexer_request *request;
 
        unsigned int process_limit;
-       bool version_received:1;
 };
 
 static void worker_connection_unref(struct worker_connection *conn)
@@ -44,41 +40,47 @@ static void worker_connection_unref(struct worker_connection *conn)
        if (--conn->refcount > 0)
                return;
 
-       i_free(conn->socket_path);
+       i_free(conn->conn.base_name);
        i_free(conn);
 }
 
-static void worker_connection_disconnect(struct worker_connection *conn)
+static void worker_connection_disconnect(struct connection *conn)
 {
-       if (conn->fd != -1) {
+       struct worker_connection *worker =
+               container_of(conn, struct worker_connection, conn);
+       if (conn->fd_in != -1) {
                io_remove(&conn->io);
                i_stream_destroy(&conn->input);
                o_stream_destroy(&conn->output);
 
-               if (close(conn->fd) < 0)
-                       i_error("close(%s) failed: %m", conn->socket_path);
-               conn->fd = -1;
+               if (close(conn->fd_in) < 0)
+                       i_error("close(%s) failed: %m", conn->base_name);
+               conn->fd_in = -1;
        }
 
        /* conn->callback() can try to destroy us */
-       conn->refcount++;
-       i_free_and_null(conn->request_username);
-       worker_connection_unref(conn);
+       worker->refcount++;
+       i_free_and_null(worker->request_username);
+       worker_connection_unref(worker);
 }
 
-void worker_connection_destroy(struct worker_connection **_conn)
+void worker_connection_destroy(struct connection **_conn)
 {
-       struct worker_connection *conn = *_conn;
+       struct connection *conn = *_conn;
+       struct worker_connection *worker =
+               container_of(conn, struct worker_connection, conn);
 
        *_conn = NULL;
 
        worker_connection_disconnect(conn);
-       worker_connection_unref(conn);
+       worker_connection_unref(worker);
 }
 
 static int
-worker_connection_input_line(struct worker_connection *conn, const char *line)
+worker_connection_input_line(struct connection *conn, const char *line)
 {
+       struct worker_connection *worker =
+               container_of(conn, struct worker_connection, conn);
        int percentage;
        /* return -1 -> error
                   0 -> request completed (100%)
@@ -98,12 +100,14 @@ worker_connection_input_line(struct worker_connection *conn, const char *line)
        else if (percentage == 100)
                ret = 0;
 
-       conn->callback(percentage, conn);
+       worker->callback(percentage, conn);
        return ret;
 }
 
-static void worker_connection_input(struct worker_connection *conn)
+static void worker_connection_input(struct connection *conn)
 {
+       struct worker_connection *worker =
+               container_of(conn, struct worker_connection, conn);
        const char *line;
 
        if (i_stream_read(conn->input) < 0) {
@@ -124,11 +128,11 @@ static void worker_connection_input(struct worker_connection *conn)
                }
                conn->version_received = TRUE;
        }
-       if (conn->process_limit == 0) {
+       if (worker->process_limit == 0) {
                if ((line = i_stream_next_line(conn->input)) == NULL)
                        return;
-               if (str_to_uint(line, &conn->process_limit) < 0 ||
-                   conn->process_limit == 0) {
+               if (str_to_uint(line, &worker->process_limit) < 0 ||
+                   worker->process_limit == 0) {
                        i_error("Indexer worker sent invalid handshake: %s",
                                line);
                        worker_connection_disconnect(conn);
@@ -143,54 +147,60 @@ static void worker_connection_input(struct worker_connection *conn)
        }
 }
 
-int worker_connection_connect(struct worker_connection *conn)
+int worker_connection_connect(struct connection *conn)
 {
-       i_assert(conn->fd == -1);
+       i_assert(conn->fd_in == -1);
 
-       conn->fd = net_connect_unix(conn->socket_path);
-       if (conn->fd == -1) {
-               i_error("connect(%s) failed: %m", conn->socket_path);
+       conn->fd_in = net_connect_unix(conn->base_name);
+       if (conn->fd_in == -1) {
+               i_error("connect(%s) failed: %m", conn->base_name);
                return -1;
        }
-       conn->io = io_add(conn->fd, IO_READ, worker_connection_input, conn);
-       conn->input = i_stream_create_fd(conn->fd, SIZE_MAX);
-       conn->output = o_stream_create_fd(conn->fd, SIZE_MAX);
+       conn->io = io_add(conn->fd_in, IO_READ, worker_connection_input, conn);
+       conn->input = i_stream_create_fd(conn->fd_in, SIZE_MAX);
+       conn->output = o_stream_create_fd(conn->fd_in, SIZE_MAX);
        o_stream_set_no_error_handling(conn->output, TRUE);
        o_stream_nsend_str(conn->output, INDEXER_MASTER_HANDSHAKE);
        return 0;
 }
 
-bool worker_connection_is_connected(struct worker_connection *conn)
+bool worker_connection_is_connected(struct connection *conn)
 {
-       return conn->fd != -1;
+       return conn->fd_in != -1;
 }
 
-bool worker_connection_get_process_limit(struct worker_connection *conn,
+bool worker_connection_get_process_limit(struct connection *conn,
                                         unsigned int *limit_r)
 {
-       if (conn->process_limit == 0)
+       struct worker_connection *worker =
+               container_of(conn, struct worker_connection, conn);
+
+       if (worker->process_limit == 0)
                return FALSE;
 
-       *limit_r = conn->process_limit;
+       *limit_r = worker->process_limit;
        return TRUE;
 }
 
-void worker_connection_request(struct worker_connection *conn,
+void worker_connection_request(struct connection *conn,
                               struct indexer_request *request,
                               void *context)
 {
+       struct worker_connection *worker =
+               container_of(conn, struct worker_connection, conn);
+
        i_assert(worker_connection_is_connected(conn));
        i_assert(context != NULL);
        i_assert(request->index || request->optimize);
 
-       if (conn->request_username == NULL)
-               conn->request_username = i_strdup(request->username);
+       if (worker->request_username == NULL)
+               worker->request_username = i_strdup(request->username);
        else {
-               i_assert(strcmp(conn->request_username,
+               i_assert(strcmp(worker->request_username,
                                request->username) == 0);
        }
 
-       conn->request = request;
+       worker->request = request;
 
        T_BEGIN {
                string_t *str = t_str_new(128);
@@ -211,33 +221,38 @@ void worker_connection_request(struct worker_connection *conn,
        } T_END;
 }
 
-bool worker_connection_is_busy(struct worker_connection *conn)
+bool worker_connection_is_busy(struct connection *conn)
 {
-       return conn->request != NULL;
+       struct worker_connection *worker =
+               container_of(conn, struct worker_connection, conn);
+       return worker->request != NULL;
 }
 
-const char *worker_connection_get_username(struct worker_connection *conn)
+const char *worker_connection_get_username(struct connection *conn)
 {
-       return conn->request_username;
+       struct worker_connection *worker =
+               container_of(conn, struct worker_connection, conn);
+       return worker->request_username;
 }
 
 struct indexer_request *
-worker_connection_get_request(struct worker_connection *conn)
+worker_connection_get_request(struct connection *conn)
 {
-       return conn->request;
+       struct worker_connection *worker =
+               container_of(conn, struct worker_connection, conn);
+       return worker->request;
 }
 
-struct worker_connection *
+struct connection *
 worker_connection_create(const char *socket_path,
                         indexer_status_callback_t *callback)
 {
        struct worker_connection *conn;
 
        conn = i_new(struct worker_connection, 1);
-
        conn->refcount = 1;
-       conn->socket_path = i_strdup(socket_path);
+       conn->conn.base_name = i_strdup(socket_path);
        conn->callback = callback;
-       conn->fd = -1;
-       return conn;
+       conn->conn.fd_in = -1;
+       return &conn->conn;
 }
index e6c320d5cb9d1a7646afbd181b06ca406be3d03b..80f83fb1b4d69c45d31504baabf160b20651122c 100644 (file)
@@ -5,34 +5,34 @@
 
 struct indexer_request;
 
-struct worker_connection *
+struct connection *
 worker_connection_create(const char *socket_path,
                         indexer_status_callback_t *callback);
-void worker_connection_destroy(struct worker_connection **conn);
+void worker_connection_destroy(struct connection **conn);
 
-int worker_connection_connect(struct worker_connection *conn);
+int worker_connection_connect(struct connection *conn);
 /* Returns TRUE if worker is connected to (not necessarily handshaked yet) */
-bool worker_connection_is_connected(struct worker_connection *conn);
+bool worker_connection_is_connected(struct connection *conn);
 
 /* After initial handshake the worker process tells how many of its kind
    can be at maximum. This returns the value, of FALSE if handshake isn't
    finished yet. */
-bool worker_connection_get_process_limit(struct worker_connection *conn,
+bool worker_connection_get_process_limit(struct connection *conn,
                                         unsigned int *limit_r);
 
 /* Send a new indexing request for username+mailbox. The status callback is
    called as necessary with the given context. Requests can be queued, but
    only for the same username. */
-void worker_connection_request(struct worker_connection *conn,
+void worker_connection_request(struct connection *conn,
                               struct indexer_request *request,
                               void *context);
 /* Returns TRUE if a request is being handled. */
-bool worker_connection_is_busy(struct worker_connection *conn);
+bool worker_connection_is_busy(struct connection *conn);
 /* Returns username of the currently pending requests,
    or NULL if there are none. */
-const char *worker_connection_get_username(struct worker_connection *conn);
+const char *worker_connection_get_username(struct connection *conn);
 
 struct indexer_request *
-worker_connection_get_request(struct worker_connection *conn);
+worker_connection_get_request(struct connection *conn);
 
 #endif
index e7eef51007f0fe48df2b2acc463f5ea203b9d25b..14df98c43941bb2c7a95be11d6b273ec5dda785b 100644 (file)
@@ -12,7 +12,7 @@
 struct worker_connection_list {
        struct worker_connection_list *prev, *next;
 
-       struct worker_connection *conn;
+       struct connection *conn;
        time_t last_use;
 };
 
@@ -62,9 +62,9 @@ bool worker_pool_have_busy_connections(struct worker_pool *pool)
 }
 
 static int worker_pool_add_connection(struct worker_pool *pool,
-                                     struct worker_connection **conn_r)
+                                     struct connection **conn_r)
 {
-       struct worker_connection *conn;
+       struct connection *conn;
 
        pool->connection_count++;
        conn = worker_connection_create(pool->socket_path, pool->callback);
@@ -106,7 +106,7 @@ static unsigned int worker_pool_find_max_connections(struct worker_pool *pool)
 }
 
 bool worker_pool_get_connection(struct worker_pool *pool,
-                               struct worker_connection **conn_r)
+                               struct connection **conn_r)
 {
        struct worker_connection_list *list;
        unsigned int max_connections;
@@ -125,7 +125,7 @@ bool worker_pool_get_connection(struct worker_pool *pool,
 }
 
 void worker_pool_release_connection(struct worker_pool *pool,
-                                   struct worker_connection *conn)
+                                   struct connection *conn)
 {
        struct worker_connection_list *list;
 
@@ -141,7 +141,7 @@ void worker_pool_release_connection(struct worker_pool *pool,
        worker_connection_destroy(&conn);
 }
 
-struct worker_connection *
+struct connection *
 worker_pool_find_username_connection(struct worker_pool *pool,
                                     const char *username)
 {
index 80ec4b0bc0a090dbe100526e3c9a545853916121..057d67f37c2606a3cdafce09c62deb165afa3874 100644 (file)
@@ -3,7 +3,7 @@
 
 #include "indexer.h"
 
-struct worker_connection;
+struct connection;
 
 struct worker_pool *
 worker_pool_init(const char *socket_path, indexer_status_callback_t *callback);
@@ -12,11 +12,11 @@ void worker_pool_deinit(struct worker_pool **pool);
 bool worker_pool_have_busy_connections(struct worker_pool *pool);
 
 bool worker_pool_get_connection(struct worker_pool *pool,
-                               struct worker_connection **conn_r);
+                               struct connection **conn_r);
 void worker_pool_release_connection(struct worker_pool *pool,
-                                   struct worker_connection *conn);
+                                   struct connection *conn);
 
-struct worker_connection *
+struct connection *
 worker_pool_find_username_connection(struct worker_pool *pool,
                                     const char *username);