]> git.ipfire.org Git - thirdparty/dovecot/core.git/commitdiff
lib-master: anvil-client - Convert to connection API
authorTimo Sirainen <timo.sirainen@open-xchange.com>
Wed, 5 Jan 2022 22:00:21 +0000 (00:00 +0200)
committerTimo Sirainen <timo.sirainen@open-xchange.com>
Tue, 8 Feb 2022 09:48:24 +0000 (10:48 +0100)
src/lib-master/anvil-client.c

index 84202c207f70083ecd1cca361539ea8ab7e26997..1306165c56dca1f6793292851085a11753017594 100644 (file)
@@ -11,6 +11,7 @@
 #include "hostpid.h"
 #include "array.h"
 #include "aqueue.h"
+#include "strescape.h"
 #include "master-service.h"
 #include "anvil-client.h"
 
@@ -31,14 +32,34 @@ struct anvil_client {
 
        bool (*reconnect_callback)(void);
        enum anvil_client_flags flags;
+       bool deinitializing;
 };
 
-#define ANVIL_HANDSHAKE "VERSION\tanvil\t2\t0\n%s\t%s\n"
 #define ANVIL_INBUF_SIZE 1024
 #define ANVIL_RECONNECT_MIN_SECS 5
 #define ANVIL_QUERY_TIMEOUT_MSECS (1000*5)
 
-static void anvil_client_disconnect(struct anvil_client *client);
+static void anvil_client_destroy(struct connection *conn);
+static int anvil_client_input_line(struct connection *conn, const char *line);
+
+static struct connection_list *anvil_connections;
+
+static struct connection_settings anvil_connections_set = {
+       .major_version = 2,
+       .minor_version = 0,
+       .service_name_out = "anvil-client",
+       .service_name_in = "anvil-server",
+
+       .input_max_size = ANVIL_INBUF_SIZE,
+       .output_max_size = SIZE_MAX,
+
+       .client = TRUE,
+};
+
+static struct connection_vfuncs anvil_connections_vfuncs = {
+       .destroy = anvil_client_destroy,
+       .input_line = anvil_client_input_line,
+};
 
 struct anvil_client *
 anvil_client_init(const char *path, bool (*reconnect_callback)(void),
@@ -46,11 +67,15 @@ anvil_client_init(const char *path, bool (*reconnect_callback)(void),
 {
        struct anvil_client *client;
 
+       if (anvil_connections == NULL) {
+               anvil_connections = connection_list_init(&anvil_connections_set,
+                                                        &anvil_connections_vfuncs);
+       }
+
        client = i_new(struct anvil_client, 1);
-       client->conn.base_name = i_strdup(path);
+       connection_init_client_unix(anvil_connections, &client->conn, path);
        client->reconnect_callback = reconnect_callback;
        client->flags = flags;
-       client->conn.fd_in = -1;
        i_array_init(&client->queries_arr, 32);
        client->queries = aqueue_init(&client->queries_arr.arr);
        return client;
@@ -62,12 +87,17 @@ void anvil_client_deinit(struct anvil_client **_client)
 
        *_client = NULL;
 
-       anvil_client_disconnect(client);
+       client->deinitializing = TRUE;
+       anvil_client_destroy(&client->conn);
+
        array_free(&client->queries_arr);
        aqueue_deinit(&client->queries);
-       i_free(client->conn.base_name);
        i_assert(client->to_reconnect == NULL);
+       connection_deinit(&client->conn);
        i_free(client);
+
+       if (anvil_connections->connections == NULL)
+               connection_list_deinit(&anvil_connections);
 }
 
 static void anvil_client_start_multiplex_input(struct anvil_client *client)
@@ -75,6 +105,8 @@ static void anvil_client_start_multiplex_input(struct anvil_client *client)
        struct istream *orig_input = client->conn.input;
        client->conn.input = i_stream_create_multiplex(orig_input, ANVIL_INBUF_SIZE);
        i_stream_unref(&orig_input);
+
+       connection_streams_changed(&client->conn);
 }
 
 static void
@@ -86,9 +118,8 @@ anvil_client_start_multiplex_output(struct anvil_client *client)
        o_stream_unref(&orig_output);
 }
 
-static void anvil_reconnect(struct anvil_client *client)
+static void anvil_client_reconnect(struct anvil_client *client)
 {
-       anvil_client_disconnect(client);
        if (client->reconnect_callback != NULL) {
                if (!client->reconnect_callback()) {
                        /* no reconnection */
@@ -100,7 +131,7 @@ static void anvil_reconnect(struct anvil_client *client)
                if (client->to_reconnect == NULL) {
                        client->to_reconnect =
                                timeout_add(ANVIL_RECONNECT_MIN_SECS*1000,
-                                           anvil_reconnect, client);
+                                           anvil_client_reconnect, client);
                }
        } else {
                client->last_reconnect = ioloop_time;
@@ -108,67 +139,57 @@ static void anvil_reconnect(struct anvil_client *client)
        }
 }
 
-static void anvil_input(struct anvil_client *client)
+static int anvil_client_input_line(struct connection *conn, const char *line)
 {
+       struct anvil_client *client =
+               container_of(conn, struct anvil_client, conn);
        struct anvil_query *const *queries;
        struct anvil_query *query;
-       const char *line;
        unsigned int count;
 
-       queries = array_get(&client->queries_arr, &count);
-       while ((line = i_stream_read_next_line(client->conn.input)) != NULL) {
-               if (aqueue_count(client->queries) == 0) {
-                       i_error("anvil: Unexpected input: %s", line);
-                       continue;
-               }
-
-               query = queries[aqueue_idx(client->queries, 0)];
-               if (query->callback != NULL) T_BEGIN {
-                       query->callback(line, query->context);
-               } T_END;
-               i_free(query);
-               aqueue_delete_tail(client->queries);
+       if (aqueue_count(client->queries) == 0) {
+               e_error(client->conn.event, "Unexpected input: %s", line);
+               return -1;
        }
-       if (client->conn.input->stream_errno != 0) {
-               i_error("read(%s) failed: %s", client->conn.base_name,
-                       i_stream_get_error(client->conn.input));
-               anvil_reconnect(client);
-       } else if (client->conn.input->eof) {
-               i_error("read(%s) failed: EOF", client->conn.base_name);
-               anvil_reconnect(client);
-       } else if (client->to_query != NULL) {
+
+       queries = array_get(&client->queries_arr, &count);
+       query = queries[aqueue_idx(client->queries, 0)];
+       if (query->callback != NULL) T_BEGIN {
+               query->callback(line, query->context);
+       } T_END;
+       i_free(query);
+       aqueue_delete_tail(client->queries);
+
+       if (client->to_query != NULL) {
                if (aqueue_count(client->queries) == 0)
                        timeout_remove(&client->to_query);
                else
                        timeout_reset(client->to_query);
        }
+       return 1;
 }
 
 int anvil_client_connect(struct anvil_client *client, bool retry)
 {
-       int fd;
+       int ret;
 
        i_assert(client->conn.fd_in == -1);
 
-       fd = retry ? net_connect_unix_with_retries(client->conn.base_name, 5000) :
-               net_connect_unix(client->conn.base_name);
-       if (fd == -1) {
+       ret = retry ? connection_client_connect_with_retries(&client->conn, 5000) :
+               connection_client_connect(&client->conn);
+       if (ret < 0) {
                if (errno != ENOENT ||
                    (client->flags & ANVIL_CLIENT_FLAG_HIDE_ENOENT) == 0) {
-                       i_error("net_connect_unix(%s) failed: %m",
+                       e_error(client->conn.event,
+                               "net_connect_unix(%s) failed: %m",
                                client->conn.base_name);
                }
                return -1;
        }
-
        timeout_remove(&client->to_reconnect);
 
-       client->conn.fd_in = fd;
-       client->conn.input = i_stream_create_fd(fd, ANVIL_INBUF_SIZE);
-       client->conn.output = o_stream_create_fd(fd, SIZE_MAX);
-       client->conn.io = io_add(fd, IO_READ, anvil_input, client);
        const char *anvil_handshake =
-               t_strdup_printf(ANVIL_HANDSHAKE,
+               t_strdup_printf("%s\t%s\n",
                                master_service_get_name(master_service),
                                my_pid);
        o_stream_nsend_str(client->conn.output, anvil_handshake);
@@ -193,34 +214,35 @@ static void anvil_client_cancel_queries(struct anvil_client *client)
        timeout_remove(&client->to_query);
 }
 
-static void anvil_client_disconnect(struct anvil_client *client)
+static void anvil_client_destroy(struct connection *conn)
 {
+       struct anvil_client *client =
+               container_of(conn, struct anvil_client, conn);
+
+       connection_disconnect(&client->conn);
        anvil_client_cancel_queries(client);
-       if (client->conn.fd_in != -1) {
-               io_remove(&client->conn.io);
-               i_stream_destroy(&client->conn.input);
-               o_stream_destroy(&client->conn.output);
-               net_disconnect(client->conn.fd_in);
-               client->conn.fd_in = -1;
-       }
        timeout_remove(&client->to_reconnect);
+
+       if (!client->deinitializing)
+               anvil_client_reconnect(client);
 }
 
 static void anvil_client_timeout(struct anvil_client *client)
 {
        i_assert(aqueue_count(client->queries) > 0);
 
-       i_error("%s: Anvil queries timed out after %u secs - aborting queries",
-               client->conn.base_name, ANVIL_QUERY_TIMEOUT_MSECS/1000);
+       e_error(client->conn.event,
+               "Anvil queries timed out after %u secs - aborting queries",
+               ANVIL_QUERY_TIMEOUT_MSECS/1000);
        /* perhaps reconnect helps */
-       anvil_reconnect(client);
+       anvil_client_destroy(&client->conn);
 }
 
 static int anvil_client_send(struct anvil_client *client, const char *cmd)
 {
        struct const_iovec iov[2];
 
-       if (client->conn.fd_in == -1) {
+       if (client->conn.disconnected) {
                if (anvil_client_connect(client, FALSE) < 0)
                        return -1;
        }
@@ -284,5 +306,5 @@ void anvil_client_cmd(struct anvil_client *client, const char *cmd)
 
 bool anvil_client_is_connected(struct anvil_client *client)
 {
-       return client->conn.fd_in != -1;
+       return !client->conn.disconnected;
 }