From: Aki Tuomi Date: Thu, 6 May 2021 09:03:16 +0000 (+0300) Subject: indexer: indexer-worker: Use connection.c functions X-Git-Tag: 2.3.16~45 X-Git-Url: http://git.ipfire.org/?a=commitdiff_plain;h=bf42ae1ae719a743d43fb252946f0eb4ec684c77;p=thirdparty%2Fdovecot%2Fcore.git indexer: indexer-worker: Use connection.c functions --- diff --git a/src/indexer/indexer-worker.c b/src/indexer/indexer-worker.c index 4f75dd4c5e..ef39aebb2f 100644 --- a/src/indexer/indexer-worker.c +++ b/src/indexer/indexer-worker.c @@ -12,13 +12,13 @@ static struct mail_storage_service_ctx *storage_service; static void client_connected(struct master_service_connection *conn) { - if (master_conn != NULL) { + master_service_client_connection_accept(conn); + + if (!master_connection_create(conn, storage_service)) { i_error("indexer-worker must be configured with client_limit=1"); - return; + i_close_fd(&conn->fd); + master_service_client_connection_destroyed(master_service); } - - master_service_client_connection_accept(conn); - master_conn = master_connection_create(conn->fd, storage_service); } static void drop_privileges(void) @@ -75,8 +75,7 @@ int main(int argc, char *argv[]) master_service_run(master_service, client_connected); - if (master_conn != NULL) - master_connection_destroy(); + master_connections_destroy(); mail_storage_service_deinit(&storage_service); master_service_deinit(&master_service); return 0; diff --git a/src/indexer/master-connection.c b/src/indexer/master-connection.c index 2d484d7be4..7cb426cc68 100644 --- a/src/indexer/master-connection.c +++ b/src/indexer/master-connection.c @@ -4,7 +4,7 @@ #include "connection.h" #include "ioloop.h" #include "istream.h" -#include "write-full.h" +#include "ostream.h" #include "strescape.h" #include "process-title.h" #include "master-service.h" @@ -20,10 +20,8 @@ #define INDEXER_PROTOCOL_MAJOR_VERSION 1 #define INDEXER_PROTOCOL_MINOR_VERSION 0 -#define INDEXER_WORKER_HANDSHAKE "VERSION\tindexer-worker-master\t1\t0\n%u\n" #define INDEXER_MASTER_NAME "indexer-master-worker" - -struct master_connection *master_conn; +#define INDEXER_WORKER_NAME "indexer-worker-master" struct master_connection { struct connection conn; @@ -129,8 +127,8 @@ index_mailbox_precache(struct master_connection *conn, struct mailbox *box) sizeof(percentage_str), "%u\n", percentage) < 0) i_unreached(); - (void)write_full(conn->conn.fd_in, percentage_str, - strlen(percentage_str)); + o_stream_nsend_str(conn->conn.output, + percentage_str); } indexer_worker_refresh_proctitle(username, box_vname, counter, max); @@ -240,9 +238,10 @@ index_mailbox(struct master_connection *conn, struct mail_user *user, } static int -master_connection_input_line(struct master_connection *conn, const char *line) +master_connection_input_args(struct connection *_conn, const char *const *args) { - const char *const *args = t_strsplit_tabescaped(line); + struct master_connection *conn = + container_of(_conn, struct master_connection, conn); struct mail_storage_service_input input; struct mail_storage_service_user *service_user; struct mail_user *user; @@ -253,7 +252,8 @@ master_connection_input_line(struct master_connection *conn, const char *line) /* [i][o] */ if (str_array_length(args) != 5 || str_to_uint(args[3], &max_recent_msgs) < 0 || args[4][0] == '\0') { - i_error("Invalid input from master: %s", line); + i_error("Invalid input from master: %s", + t_strarray_join(args, "\t")); return -1; } @@ -285,74 +285,70 @@ master_connection_input_line(struct master_connection *conn, const char *line) } str = ret < 0 ? "-1\n" : "100\n"; - return write_full(conn->conn.fd_in, str, strlen(str)); + o_stream_nsend_str(conn->conn.output, str); + return ret; } -static void master_connection_input(struct master_connection *conn) +static void master_connection_destroy(struct connection *connection) { - const char *line; - int ret; + connection_deinit(connection); + i_free(connection); + master_service_client_connection_destroyed(master_service); +} - if (i_stream_read(conn->conn.input) < 0) { - master_connection_destroy(); - return; - } +static int master_connection_handshake_args(struct connection *connection, + const char *const *args) +{ + int ret; + if ((ret = connection_handshake_args_default(connection, args)) < 1) + return ret; + const char *limit = t_strdup_printf("%u\n", + master_service_get_process_limit(master_service)); + o_stream_nsend_str(connection->output, limit); + return 1; +} - if (!conn->conn.version_received) { - if ((line = i_stream_next_line(conn->conn.input)) == NULL) - return; +static struct connection_list *master_connection_list = NULL; - if (!version_string_verify(line, INDEXER_MASTER_NAME, - INDEXER_PROTOCOL_MAJOR_VERSION)) { - i_error("Indexer master not compatible with this master " - "(mixed old and new binaries?)"); - master_connection_destroy(); - return; - } - conn->conn.version_received = TRUE; - } +static const struct connection_vfuncs master_connection_vfuncs = { + .destroy = master_connection_destroy, + .input_args = master_connection_input_args, + .handshake_args = master_connection_handshake_args, +}; - while ((line = i_stream_next_line(conn->conn.input)) != NULL) { - T_BEGIN { - ret = master_connection_input_line(conn, line); - } T_END; - if (ret < 0) { - master_connection_destroy(); - break; - } - } -} +static const struct connection_settings master_connection_set = { + .service_name_in = INDEXER_MASTER_NAME, + .service_name_out = INDEXER_WORKER_NAME, + .major_version = INDEXER_PROTOCOL_MAJOR_VERSION, + .minor_version = INDEXER_PROTOCOL_MINOR_VERSION, + .input_max_size = SIZE_MAX, + .output_max_size = SIZE_MAX, +}; -struct master_connection * -master_connection_create(int fd, struct mail_storage_service_ctx *storage_service) +bool +master_connection_create(struct master_service_connection *master, + struct mail_storage_service_ctx *storage_service) { struct master_connection *conn; - const char *handshake; + + if (master_connection_list == NULL) { + master_connection_list = + connection_list_init(&master_connection_set, + &master_connection_vfuncs); + } else if (master_connection_list->connections_count > 0) { + return FALSE; + } conn = i_new(struct master_connection, 1); conn->storage_service = storage_service; - conn->conn.fd_in = fd; - conn->conn.io = io_add(conn->conn.fd_in, IO_READ, master_connection_input, conn); - conn->conn.input = i_stream_create_fd(conn->conn.fd_in, SIZE_MAX); + connection_init_server(master_connection_list, &conn->conn, + master->name, master->fd, master->fd); - handshake = t_strdup_printf(INDEXER_WORKER_HANDSHAKE, - master_service_get_process_limit(master_service)); - (void)write_full(conn->conn.fd_in, handshake, strlen(handshake)); - return conn; + return TRUE; } -void master_connection_destroy(void) +void master_connections_destroy(void) { - struct master_connection *conn = master_conn; - - master_conn = NULL; - - io_remove(&conn->conn.io); - i_stream_destroy(&conn->conn.input); - - if (close(conn->conn.fd_in) < 0) - i_error("close(master conn) failed: %m"); - i_free(conn); - - master_service_client_connection_destroyed(master_service); + if (master_connection_list != NULL) + connection_list_deinit(&master_connection_list); } diff --git a/src/indexer/master-connection.h b/src/indexer/master-connection.h index 85ac344695..7dcb70e296 100644 --- a/src/indexer/master-connection.h +++ b/src/indexer/master-connection.h @@ -1,10 +1,8 @@ #ifndef MASTER_CONNECTION_H #define MASTER_CONNECTION_H -extern struct master_connection *master_conn; - -struct master_connection * -master_connection_create(int fd, struct mail_storage_service_ctx *storage_service); -void master_connection_destroy(void); +bool master_connection_create(struct master_service_connection *conn, + struct mail_storage_service_ctx *storage_service); +void master_connections_destroy(void); #endif