]> git.ipfire.org Git - thirdparty/dovecot/core.git/commitdiff
indexer: indexer-worker: Use connection.c functions
authorAki Tuomi <aki.tuomi@open-xchange.com>
Thu, 6 May 2021 09:03:16 +0000 (12:03 +0300)
committerTimo Sirainen <timo.sirainen@open-xchange.com>
Mon, 14 Jun 2021 14:56:40 +0000 (17:56 +0300)
src/indexer/indexer-worker.c
src/indexer/master-connection.c
src/indexer/master-connection.h

index 4f75dd4c5e7c27b0be45dd6e68b3891a662478b8..ef39aebb2f70282cfdf0792fc2c6c6c7c08698c8 100644 (file)
@@ -12,13 +12,13 @@ static struct mail_storage_service_ctx *storage_service;
 
 static void client_connected(struct master_service_connection *conn)
 {
-       if (master_conn != NULL) {
+       master_service_client_connection_accept(conn);
+
+       if (!master_connection_create(conn, storage_service)) {
                i_error("indexer-worker must be configured with client_limit=1");
-               return;
+               i_close_fd(&conn->fd);
+               master_service_client_connection_destroyed(master_service);
        }
-
-       master_service_client_connection_accept(conn);
-       master_conn = master_connection_create(conn->fd, storage_service);
 }
 
 static void drop_privileges(void)
@@ -75,8 +75,7 @@ int main(int argc, char *argv[])
 
        master_service_run(master_service, client_connected);
 
-       if (master_conn != NULL)
-               master_connection_destroy();
+       master_connections_destroy();
        mail_storage_service_deinit(&storage_service);
        master_service_deinit(&master_service);
         return 0;
index 2d484d7be436e97794d12ce72113bd50a94bb159..7cb426cc688d789abf3263cfcbeeb51c3bcd2b48 100644 (file)
@@ -4,7 +4,7 @@
 #include "connection.h"
 #include "ioloop.h"
 #include "istream.h"
-#include "write-full.h"
+#include "ostream.h"
 #include "strescape.h"
 #include "process-title.h"
 #include "master-service.h"
 #define INDEXER_PROTOCOL_MAJOR_VERSION 1
 #define INDEXER_PROTOCOL_MINOR_VERSION 0
 
-#define INDEXER_WORKER_HANDSHAKE "VERSION\tindexer-worker-master\t1\t0\n%u\n"
 #define INDEXER_MASTER_NAME "indexer-master-worker"
-
-struct master_connection *master_conn;
+#define INDEXER_WORKER_NAME "indexer-worker-master"
 
 struct master_connection {
        struct connection conn;
@@ -129,8 +127,8 @@ index_mailbox_precache(struct master_connection *conn, struct mailbox *box)
                                               sizeof(percentage_str), "%u\n",
                                               percentage) < 0)
                                        i_unreached();
-                               (void)write_full(conn->conn.fd_in, percentage_str,
-                                                strlen(percentage_str));
+                               o_stream_nsend_str(conn->conn.output,
+                                                  percentage_str);
                        }
                        indexer_worker_refresh_proctitle(username, box_vname,
                                                         counter, max);
@@ -240,9 +238,10 @@ index_mailbox(struct master_connection *conn, struct mail_user *user,
 }
 
 static int
-master_connection_input_line(struct master_connection *conn, const char *line)
+master_connection_input_args(struct connection *_conn, const char *const *args)
 {
-       const char *const *args = t_strsplit_tabescaped(line);
+       struct master_connection *conn =
+               container_of(_conn, struct master_connection, conn);
        struct mail_storage_service_input input;
        struct mail_storage_service_user *service_user;
        struct mail_user *user;
@@ -253,7 +252,8 @@ master_connection_input_line(struct master_connection *conn, const char *line)
        /* <username> <mailbox> <session ID> <max_recent_msgs> [i][o] */
        if (str_array_length(args) != 5 ||
            str_to_uint(args[3], &max_recent_msgs) < 0 || args[4][0] == '\0') {
-               i_error("Invalid input from master: %s", line);
+               i_error("Invalid input from master: %s",
+                       t_strarray_join(args, "\t"));
                return -1;
        }
 
@@ -285,74 +285,70 @@ master_connection_input_line(struct master_connection *conn, const char *line)
        }
 
        str = ret < 0 ? "-1\n" : "100\n";
-       return write_full(conn->conn.fd_in, str, strlen(str));
+       o_stream_nsend_str(conn->conn.output, str);
+       return ret;
 }
 
-static void master_connection_input(struct master_connection *conn)
+static void master_connection_destroy(struct connection *connection)
 {
-       const char *line;
-       int ret;
+       connection_deinit(connection);
+       i_free(connection);
+       master_service_client_connection_destroyed(master_service);
+}
 
-       if (i_stream_read(conn->conn.input) < 0) {
-               master_connection_destroy();
-               return;
-       }
+static int master_connection_handshake_args(struct connection *connection,
+                                           const char *const *args)
+{
+       int ret;
+       if ((ret = connection_handshake_args_default(connection, args)) < 1)
+               return ret;
+       const char *limit = t_strdup_printf("%u\n",
+               master_service_get_process_limit(master_service));
+       o_stream_nsend_str(connection->output, limit);
+       return 1;
+}
 
-       if (!conn->conn.version_received) {
-               if ((line = i_stream_next_line(conn->conn.input)) == NULL)
-                       return;
+static struct connection_list *master_connection_list = NULL;
 
-               if (!version_string_verify(line, INDEXER_MASTER_NAME,
-                               INDEXER_PROTOCOL_MAJOR_VERSION)) {
-                       i_error("Indexer master not compatible with this master "
-                               "(mixed old and new binaries?)");
-                       master_connection_destroy();
-                       return;
-               }
-               conn->conn.version_received = TRUE;
-       }
+static const struct connection_vfuncs master_connection_vfuncs = {
+       .destroy = master_connection_destroy,
+       .input_args = master_connection_input_args,
+       .handshake_args = master_connection_handshake_args,
+};
 
-       while ((line = i_stream_next_line(conn->conn.input)) != NULL) {
-               T_BEGIN {
-                       ret = master_connection_input_line(conn, line);
-               } T_END;
-               if (ret < 0) {
-                       master_connection_destroy();
-                       break;
-               }
-       }
-}
+static const struct connection_settings master_connection_set = {
+       .service_name_in = INDEXER_MASTER_NAME,
+       .service_name_out = INDEXER_WORKER_NAME,
+       .major_version = INDEXER_PROTOCOL_MAJOR_VERSION,
+       .minor_version = INDEXER_PROTOCOL_MINOR_VERSION,
+       .input_max_size = SIZE_MAX,
+       .output_max_size = SIZE_MAX,
+};
 
-struct master_connection *
-master_connection_create(int fd, struct mail_storage_service_ctx *storage_service)
+bool
+master_connection_create(struct master_service_connection *master,
+                        struct mail_storage_service_ctx *storage_service)
 {
        struct master_connection *conn;
-       const char *handshake;
+
+       if (master_connection_list == NULL) {
+               master_connection_list =
+                       connection_list_init(&master_connection_set,
+                                            &master_connection_vfuncs);
+       } else if (master_connection_list->connections_count > 0) {
+               return FALSE;
+       }
 
        conn = i_new(struct master_connection, 1);
        conn->storage_service = storage_service;
-       conn->conn.fd_in = fd;
-       conn->conn.io = io_add(conn->conn.fd_in, IO_READ, master_connection_input, conn);
-       conn->conn.input = i_stream_create_fd(conn->conn.fd_in, SIZE_MAX);
+       connection_init_server(master_connection_list, &conn->conn,
+                              master->name, master->fd, master->fd);
 
-       handshake = t_strdup_printf(INDEXER_WORKER_HANDSHAKE,
-               master_service_get_process_limit(master_service));
-       (void)write_full(conn->conn.fd_in, handshake, strlen(handshake));
-       return conn;
+       return TRUE;
 }
 
-void master_connection_destroy(void)
+void master_connections_destroy(void)
 {
-       struct master_connection *conn = master_conn;
-
-       master_conn = NULL;
-
-       io_remove(&conn->conn.io);
-       i_stream_destroy(&conn->conn.input);
-
-       if (close(conn->conn.fd_in) < 0)
-               i_error("close(master conn) failed: %m");
-       i_free(conn);
-
-       master_service_client_connection_destroyed(master_service);
+       if (master_connection_list != NULL)
+               connection_list_deinit(&master_connection_list);
 }
index 85ac344695e1df4068df1b2adb137712ce2c8569..7dcb70e296d112f1f57f2208665934880a975e2e 100644 (file)
@@ -1,10 +1,8 @@
 #ifndef MASTER_CONNECTION_H
 #define MASTER_CONNECTION_H
 
-extern struct master_connection *master_conn;
-
-struct master_connection *
-master_connection_create(int fd, struct mail_storage_service_ctx *storage_service);
-void master_connection_destroy(void);
+bool master_connection_create(struct master_service_connection *conn,
+                             struct mail_storage_service_ctx *storage_service);
+void master_connections_destroy(void);
 
 #endif