]> git.ipfire.org Git - thirdparty/dovecot/core.git/commitdiff
indexer: worker-connection - Use connection.c functions
authorAki Tuomi <aki.tuomi@open-xchange.com>
Thu, 6 May 2021 08:21:43 +0000 (11:21 +0300)
committerTimo Sirainen <timo.sirainen@open-xchange.com>
Mon, 14 Jun 2021 14:56:40 +0000 (17:56 +0300)
src/indexer/worker-connection.c
src/indexer/worker-connection.h
src/indexer/worker-pool.c

index fe52d9b3a2254d2ea5635e87e7b4b53b178cfc7e..90f536d2b2e3fba35832e100c76de27af10755a2 100644 (file)
@@ -19,7 +19,7 @@
 #define INDEXER_PROTOCOL_MAJOR_VERSION 1
 #define INDEXER_PROTOCOL_MINOR_VERSION 0
 
-#define INDEXER_MASTER_HANDSHAKE "VERSION\tindexer-master-worker\t1\t0\n"
+#define INDEXER_MASTER_NAME "indexer-master-worker"
 #define INDEXER_WORKER_NAME "indexer-worker-master"
 
 struct worker_connection {
@@ -35,139 +35,85 @@ struct worker_connection {
        unsigned int process_limit;
 };
 
-static void worker_connection_unref(struct worker_connection *conn)
+static void worker_connection_call_callback(struct worker_connection *worker,
+                                           int percentage)
 {
-       i_assert(conn->refcount > 0);
-       if (--conn->refcount > 0)
-               return;
-
-       i_free(conn->conn.base_name);
-       i_free(conn);
+       if (worker->request != NULL)
+               worker->callback(percentage, &worker->conn);
+       if (percentage < 0 || percentage == 100)
+               worker->request = NULL;
 }
 
-static void worker_connection_disconnect(struct connection *conn)
+static void worker_connection_destroy(struct connection *conn)
 {
        struct worker_connection *worker =
                container_of(conn, struct worker_connection, conn);
-       if (conn->fd_in != -1) {
-               io_remove(&conn->io);
-               i_stream_destroy(&conn->input);
-               o_stream_destroy(&conn->output);
-
-               if (close(conn->fd_in) < 0)
-                       i_error("close(%s) failed: %m", conn->base_name);
-               conn->fd_in = -1;
-       }
 
-       /* conn->callback() can try to destroy us */
-       worker->refcount++;
+       worker->request = NULL;
        i_free_and_null(worker->request_username);
-       worker_connection_unref(worker);
+       connection_deinit(conn);
 }
 
-void worker_connection_destroy(struct connection **_conn)
+void worker_connection_unref(struct connection **_conn)
 {
        struct connection *conn = *_conn;
        struct worker_connection *worker =
                container_of(conn, struct worker_connection, conn);
 
-       *_conn = NULL;
-
-       worker_connection_disconnect(conn);
-       worker_connection_unref(worker);
+       i_assert(worker->refcount > 0);
+       if (--worker->refcount > 0)
+               return;
+       worker_connection_destroy(conn);
+       i_free(conn);
 }
 
 static int
-worker_connection_input_line(struct connection *conn, const char *line)
+worker_connection_handshake_args(struct connection *conn, const char *const *args)
 {
        struct worker_connection *worker =
                container_of(conn, struct worker_connection, conn);
-       int percentage;
-       /* return -1 -> error
-                  0 -> request completed (100%)
-                  1 -> request continues (<100%)
-       */
-       int ret = 1;
-
-       if (str_to_int(line, &percentage) < 0 ||
-           percentage < -1 || percentage > 100) {
-               i_error("Invalid input from worker: %s", line);
+       int ret;
+       if (!conn->version_received) {
+               if ((ret = connection_handshake_args_default(conn, args)) < 1)
+                       return ret;
+               /* we are not done yet */
+               return 0;
+       }
+       i_assert(worker->process_limit == 0);
+       if (str_to_uint(args[0], &worker->process_limit) < 0 ||
+           worker->process_limit == 0) {
+               e_error(conn->event, "Worker sent invalid process limit '%s'",
+                       args[0]);
                return -1;
        }
-
-       /* is request finished */
-       if (percentage < 0)
-               ret = -1;
-       else if (percentage == 100)
-               ret = 0;
-
-       worker->callback(percentage, conn);
-       return ret;
+       return 1;
 }
 
-static void worker_connection_input(struct connection *conn)
+static int
+worker_connection_input_args(struct connection *conn, const char *const *args)
 {
        struct worker_connection *worker =
                container_of(conn, struct worker_connection, conn);
-       const char *line;
-
-       if (i_stream_read(conn->input) < 0) {
-               worker_connection_disconnect(conn);
-               return;
-       }
+       int percentage;
+       int ret = 1;
 
-       if (!conn->version_received) {
-               if ((line = i_stream_next_line(conn->input)) == NULL)
-                       return;
-
-               if (!version_string_verify(line, INDEXER_WORKER_NAME,
-                               INDEXER_PROTOCOL_MAJOR_VERSION)) {
-                       i_error("Indexer worker not compatible with this master "
-                               "(mixed old and new binaries?)");
-                       worker_connection_disconnect(conn);
-                       return;
-               }
-               conn->version_received = TRUE;
-       }
-       if (worker->process_limit == 0) {
-               if ((line = i_stream_next_line(conn->input)) == NULL)
-                       return;
-               if (str_to_uint(line, &worker->process_limit) < 0 ||
-                   worker->process_limit == 0) {
-                       i_error("Indexer worker sent invalid handshake: %s",
-                               line);
-                       worker_connection_disconnect(conn);
-                       return;
-               }
+       if (str_to_int(args[0], &percentage) < 0 ||
+           percentage < -1 || percentage > 100) {
+               e_error(conn->event, "Worker sent invalid progress '%s'", args[0]);
+               return -1;
        }
 
-       while ((line = i_stream_next_line(conn->input)) != NULL) {
-               if (worker_connection_input_line(conn, line) <= 0) {
-                       break;
-               }
-       }
-}
+       if (percentage < 0)
+               ret = -1;
 
-int worker_connection_connect(struct connection *conn)
-{
-       i_assert(conn->fd_in == -1);
+       worker_connection_call_callback(worker, percentage);
 
-       conn->fd_in = net_connect_unix(conn->base_name);
-       if (conn->fd_in == -1) {
-               i_error("connect(%s) failed: %m", conn->base_name);
-               return -1;
-       }
-       conn->io = io_add(conn->fd_in, IO_READ, worker_connection_input, conn);
-       conn->input = i_stream_create_fd(conn->fd_in, SIZE_MAX);
-       conn->output = o_stream_create_fd(conn->fd_in, SIZE_MAX);
-       o_stream_set_no_error_handling(conn->output, TRUE);
-       o_stream_nsend_str(conn->output, INDEXER_MASTER_HANDSHAKE);
-       return 0;
+       return ret;
 }
 
 bool worker_connection_is_connected(struct connection *conn)
 {
-       return conn->fd_in != -1;
+       return !conn->disconnected;
 }
 
 bool worker_connection_get_process_limit(struct connection *conn,
@@ -244,6 +190,28 @@ worker_connection_get_request(struct connection *conn)
        return worker->request;
 }
 
+static const struct connection_vfuncs worker_connection_vfuncs = {
+       .destroy = worker_connection_destroy,
+       .input_args = worker_connection_input_args,
+       .handshake_args = worker_connection_handshake_args,
+};
+
+static const struct connection_settings worker_connection_set = {
+       .service_name_in = INDEXER_WORKER_NAME,
+       .service_name_out = INDEXER_MASTER_NAME,
+       .major_version = INDEXER_PROTOCOL_MAJOR_VERSION,
+       .minor_version = INDEXER_PROTOCOL_MINOR_VERSION,
+       .input_max_size = SIZE_MAX,
+       .output_max_size = SIZE_MAX,
+       .client = TRUE,
+};
+
+struct connection_list *worker_connection_list_create(void)
+{
+       return connection_list_init(&worker_connection_set,
+                                   &worker_connection_vfuncs);
+}
+
 struct connection *
 worker_connection_create(const char *socket_path,
                         indexer_status_callback_t *callback,
@@ -253,9 +221,8 @@ worker_connection_create(const char *socket_path,
 
        conn = i_new(struct worker_connection, 1);
        conn->refcount = 1;
-       conn->conn.base_name = i_strdup(socket_path);
        conn->callback = callback;
-       conn->conn.fd_in = -1;
-       DLLIST_PREPEND(&list->connections, &conn->conn);
+       connection_init_client_unix(list, &conn->conn, socket_path);
+
        return &conn->conn;
 }
index 9b84a71221e75eb6770d618adf55c22311b396e2..58fd050660272656becf9ce58761a5ab83996e31 100644 (file)
@@ -10,11 +10,10 @@ struct connection *
 worker_connection_create(const char *socket_path,
                         indexer_status_callback_t *callback,
                         struct connection_list *list);
-void worker_connection_destroy(struct connection **conn);
+void worker_connection_unref(struct connection **_conn);
 
 struct connection_list *worker_connection_list_create(void);
 
-int worker_connection_connect(struct connection *conn);
 /* Returns TRUE if worker is connected to (not necessarily handshaked yet) */
 bool worker_connection_is_connected(struct connection *conn);
 
index 072ea50a6a11fa008436c5bf191385a6d555c8be..5fc380330b72a7888e9afc00e367be8c192bf5d1 100644 (file)
@@ -35,7 +35,8 @@ void worker_pool_deinit(struct worker_pool **_pool)
 
        *_pool = NULL;
 
-       connection_list_deinit(&pool->connection_list);
+       if (pool->connection_list != NULL)
+               connection_list_deinit(&pool->connection_list);
 
        i_free(pool->connection_list);
        i_free(pool->socket_path);