From: Timo Sirainen Date: Wed, 5 Jan 2022 22:00:21 +0000 (+0200) Subject: lib-master: anvil-client - Convert to connection API X-Git-Tag: 2.4.0~4504 X-Git-Url: http://git.ipfire.org/?a=commitdiff_plain;h=afa1e3e723f8b0d782516d9981b9d6a11d41101d;p=thirdparty%2Fdovecot%2Fcore.git lib-master: anvil-client - Convert to connection API --- diff --git a/src/lib-master/anvil-client.c b/src/lib-master/anvil-client.c index 84202c207f..1306165c56 100644 --- a/src/lib-master/anvil-client.c +++ b/src/lib-master/anvil-client.c @@ -11,6 +11,7 @@ #include "hostpid.h" #include "array.h" #include "aqueue.h" +#include "strescape.h" #include "master-service.h" #include "anvil-client.h" @@ -31,14 +32,34 @@ struct anvil_client { bool (*reconnect_callback)(void); enum anvil_client_flags flags; + bool deinitializing; }; -#define ANVIL_HANDSHAKE "VERSION\tanvil\t2\t0\n%s\t%s\n" #define ANVIL_INBUF_SIZE 1024 #define ANVIL_RECONNECT_MIN_SECS 5 #define ANVIL_QUERY_TIMEOUT_MSECS (1000*5) -static void anvil_client_disconnect(struct anvil_client *client); +static void anvil_client_destroy(struct connection *conn); +static int anvil_client_input_line(struct connection *conn, const char *line); + +static struct connection_list *anvil_connections; + +static struct connection_settings anvil_connections_set = { + .major_version = 2, + .minor_version = 0, + .service_name_out = "anvil-client", + .service_name_in = "anvil-server", + + .input_max_size = ANVIL_INBUF_SIZE, + .output_max_size = SIZE_MAX, + + .client = TRUE, +}; + +static struct connection_vfuncs anvil_connections_vfuncs = { + .destroy = anvil_client_destroy, + .input_line = anvil_client_input_line, +}; struct anvil_client * anvil_client_init(const char *path, bool (*reconnect_callback)(void), @@ -46,11 +67,15 @@ anvil_client_init(const char *path, bool (*reconnect_callback)(void), { struct anvil_client *client; + if (anvil_connections == NULL) { + anvil_connections = connection_list_init(&anvil_connections_set, + &anvil_connections_vfuncs); + } + client = i_new(struct anvil_client, 1); - client->conn.base_name = i_strdup(path); + connection_init_client_unix(anvil_connections, &client->conn, path); client->reconnect_callback = reconnect_callback; client->flags = flags; - client->conn.fd_in = -1; i_array_init(&client->queries_arr, 32); client->queries = aqueue_init(&client->queries_arr.arr); return client; @@ -62,12 +87,17 @@ void anvil_client_deinit(struct anvil_client **_client) *_client = NULL; - anvil_client_disconnect(client); + client->deinitializing = TRUE; + anvil_client_destroy(&client->conn); + array_free(&client->queries_arr); aqueue_deinit(&client->queries); - i_free(client->conn.base_name); i_assert(client->to_reconnect == NULL); + connection_deinit(&client->conn); i_free(client); + + if (anvil_connections->connections == NULL) + connection_list_deinit(&anvil_connections); } static void anvil_client_start_multiplex_input(struct anvil_client *client) @@ -75,6 +105,8 @@ static void anvil_client_start_multiplex_input(struct anvil_client *client) struct istream *orig_input = client->conn.input; client->conn.input = i_stream_create_multiplex(orig_input, ANVIL_INBUF_SIZE); i_stream_unref(&orig_input); + + connection_streams_changed(&client->conn); } static void @@ -86,9 +118,8 @@ anvil_client_start_multiplex_output(struct anvil_client *client) o_stream_unref(&orig_output); } -static void anvil_reconnect(struct anvil_client *client) +static void anvil_client_reconnect(struct anvil_client *client) { - anvil_client_disconnect(client); if (client->reconnect_callback != NULL) { if (!client->reconnect_callback()) { /* no reconnection */ @@ -100,7 +131,7 @@ static void anvil_reconnect(struct anvil_client *client) if (client->to_reconnect == NULL) { client->to_reconnect = timeout_add(ANVIL_RECONNECT_MIN_SECS*1000, - anvil_reconnect, client); + anvil_client_reconnect, client); } } else { client->last_reconnect = ioloop_time; @@ -108,67 +139,57 @@ static void anvil_reconnect(struct anvil_client *client) } } -static void anvil_input(struct anvil_client *client) +static int anvil_client_input_line(struct connection *conn, const char *line) { + struct anvil_client *client = + container_of(conn, struct anvil_client, conn); struct anvil_query *const *queries; struct anvil_query *query; - const char *line; unsigned int count; - queries = array_get(&client->queries_arr, &count); - while ((line = i_stream_read_next_line(client->conn.input)) != NULL) { - if (aqueue_count(client->queries) == 0) { - i_error("anvil: Unexpected input: %s", line); - continue; - } - - query = queries[aqueue_idx(client->queries, 0)]; - if (query->callback != NULL) T_BEGIN { - query->callback(line, query->context); - } T_END; - i_free(query); - aqueue_delete_tail(client->queries); + if (aqueue_count(client->queries) == 0) { + e_error(client->conn.event, "Unexpected input: %s", line); + return -1; } - if (client->conn.input->stream_errno != 0) { - i_error("read(%s) failed: %s", client->conn.base_name, - i_stream_get_error(client->conn.input)); - anvil_reconnect(client); - } else if (client->conn.input->eof) { - i_error("read(%s) failed: EOF", client->conn.base_name); - anvil_reconnect(client); - } else if (client->to_query != NULL) { + + queries = array_get(&client->queries_arr, &count); + query = queries[aqueue_idx(client->queries, 0)]; + if (query->callback != NULL) T_BEGIN { + query->callback(line, query->context); + } T_END; + i_free(query); + aqueue_delete_tail(client->queries); + + if (client->to_query != NULL) { if (aqueue_count(client->queries) == 0) timeout_remove(&client->to_query); else timeout_reset(client->to_query); } + return 1; } int anvil_client_connect(struct anvil_client *client, bool retry) { - int fd; + int ret; i_assert(client->conn.fd_in == -1); - fd = retry ? net_connect_unix_with_retries(client->conn.base_name, 5000) : - net_connect_unix(client->conn.base_name); - if (fd == -1) { + ret = retry ? connection_client_connect_with_retries(&client->conn, 5000) : + connection_client_connect(&client->conn); + if (ret < 0) { if (errno != ENOENT || (client->flags & ANVIL_CLIENT_FLAG_HIDE_ENOENT) == 0) { - i_error("net_connect_unix(%s) failed: %m", + e_error(client->conn.event, + "net_connect_unix(%s) failed: %m", client->conn.base_name); } return -1; } - timeout_remove(&client->to_reconnect); - client->conn.fd_in = fd; - client->conn.input = i_stream_create_fd(fd, ANVIL_INBUF_SIZE); - client->conn.output = o_stream_create_fd(fd, SIZE_MAX); - client->conn.io = io_add(fd, IO_READ, anvil_input, client); const char *anvil_handshake = - t_strdup_printf(ANVIL_HANDSHAKE, + t_strdup_printf("%s\t%s\n", master_service_get_name(master_service), my_pid); o_stream_nsend_str(client->conn.output, anvil_handshake); @@ -193,34 +214,35 @@ static void anvil_client_cancel_queries(struct anvil_client *client) timeout_remove(&client->to_query); } -static void anvil_client_disconnect(struct anvil_client *client) +static void anvil_client_destroy(struct connection *conn) { + struct anvil_client *client = + container_of(conn, struct anvil_client, conn); + + connection_disconnect(&client->conn); anvil_client_cancel_queries(client); - if (client->conn.fd_in != -1) { - io_remove(&client->conn.io); - i_stream_destroy(&client->conn.input); - o_stream_destroy(&client->conn.output); - net_disconnect(client->conn.fd_in); - client->conn.fd_in = -1; - } timeout_remove(&client->to_reconnect); + + if (!client->deinitializing) + anvil_client_reconnect(client); } static void anvil_client_timeout(struct anvil_client *client) { i_assert(aqueue_count(client->queries) > 0); - i_error("%s: Anvil queries timed out after %u secs - aborting queries", - client->conn.base_name, ANVIL_QUERY_TIMEOUT_MSECS/1000); + e_error(client->conn.event, + "Anvil queries timed out after %u secs - aborting queries", + ANVIL_QUERY_TIMEOUT_MSECS/1000); /* perhaps reconnect helps */ - anvil_reconnect(client); + anvil_client_destroy(&client->conn); } static int anvil_client_send(struct anvil_client *client, const char *cmd) { struct const_iovec iov[2]; - if (client->conn.fd_in == -1) { + if (client->conn.disconnected) { if (anvil_client_connect(client, FALSE) < 0) return -1; } @@ -284,5 +306,5 @@ void anvil_client_cmd(struct anvil_client *client, const char *cmd) bool anvil_client_is_connected(struct anvil_client *client) { - return client->conn.fd_in != -1; + return !client->conn.disconnected; }