From: Aki Tuomi Date: Thu, 6 May 2021 07:29:23 +0000 (+0300) Subject: indexer: worker-connection - Use connection.h structures X-Git-Tag: 2.3.16~51 X-Git-Url: http://git.ipfire.org/?a=commitdiff_plain;h=4225108434fe5039f97f4fdf39086d1522c3ba7f;p=thirdparty%2Fdovecot%2Fcore.git indexer: worker-connection - Use connection.h structures --- diff --git a/src/indexer/indexer.c b/src/indexer/indexer.c index 504e3b32c6..711a0608db 100644 --- a/src/indexer/indexer.c +++ b/src/indexer/indexer.c @@ -12,7 +12,7 @@ #include "worker-connection.h" struct worker_request { - struct worker_connection *conn; + struct connection *conn; struct indexer_request *request; }; @@ -43,7 +43,7 @@ static void client_connected(struct master_service_connection *conn) (void)indexer_client_create(conn->fd, queue); } -static void worker_send_request(struct worker_connection *conn, +static void worker_send_request(struct connection *conn, struct indexer_request *request) { struct worker_request *wrequest; @@ -58,7 +58,7 @@ static void worker_send_request(struct worker_connection *conn, static void queue_try_send_more(struct indexer_queue *queue) { - struct worker_connection *conn; + struct connection *conn; struct indexer_request *request, *first_moved_request = NULL; timeout_remove(&to_send_more); @@ -98,7 +98,7 @@ static void queue_listen_callback(struct indexer_queue *queue) static void worker_status_callback(int percentage, void *context) { - struct worker_connection *conn = context; + struct connection *conn = context; struct indexer_request *request = worker_connection_get_request(conn); if (percentage >= 0 && percentage < 100) { diff --git a/src/indexer/worker-connection.c b/src/indexer/worker-connection.c index 72ccfbcbee..a2f619576a 100644 --- a/src/indexer/worker-connection.c +++ b/src/indexer/worker-connection.c @@ -3,6 +3,7 @@ #include "lib.h" #include "array.h" #include "aqueue.h" +#include "connection.h" #include "ioloop.h" #include "istream.h" #include "ostream.h" @@ -21,21 +22,16 @@ #define INDEXER_WORKER_NAME "indexer-worker-master" struct worker_connection { + struct connection conn; + int refcount; - char *socket_path; indexer_status_callback_t *callback; - int fd; - struct io *io; - struct istream *input; - struct ostream *output; - char *request_username; struct indexer_request *request; unsigned int process_limit; - bool version_received:1; }; static void worker_connection_unref(struct worker_connection *conn) @@ -44,41 +40,47 @@ static void worker_connection_unref(struct worker_connection *conn) if (--conn->refcount > 0) return; - i_free(conn->socket_path); + i_free(conn->conn.base_name); i_free(conn); } -static void worker_connection_disconnect(struct worker_connection *conn) +static void worker_connection_disconnect(struct connection *conn) { - if (conn->fd != -1) { + struct worker_connection *worker = + container_of(conn, struct worker_connection, conn); + if (conn->fd_in != -1) { io_remove(&conn->io); i_stream_destroy(&conn->input); o_stream_destroy(&conn->output); - if (close(conn->fd) < 0) - i_error("close(%s) failed: %m", conn->socket_path); - conn->fd = -1; + if (close(conn->fd_in) < 0) + i_error("close(%s) failed: %m", conn->base_name); + conn->fd_in = -1; } /* conn->callback() can try to destroy us */ - conn->refcount++; - i_free_and_null(conn->request_username); - worker_connection_unref(conn); + worker->refcount++; + i_free_and_null(worker->request_username); + worker_connection_unref(worker); } -void worker_connection_destroy(struct worker_connection **_conn) +void worker_connection_destroy(struct connection **_conn) { - struct worker_connection *conn = *_conn; + struct connection *conn = *_conn; + struct worker_connection *worker = + container_of(conn, struct worker_connection, conn); *_conn = NULL; worker_connection_disconnect(conn); - worker_connection_unref(conn); + worker_connection_unref(worker); } static int -worker_connection_input_line(struct worker_connection *conn, const char *line) +worker_connection_input_line(struct connection *conn, const char *line) { + struct worker_connection *worker = + container_of(conn, struct worker_connection, conn); int percentage; /* return -1 -> error 0 -> request completed (100%) @@ -98,12 +100,14 @@ worker_connection_input_line(struct worker_connection *conn, const char *line) else if (percentage == 100) ret = 0; - conn->callback(percentage, conn); + worker->callback(percentage, conn); return ret; } -static void worker_connection_input(struct worker_connection *conn) +static void worker_connection_input(struct connection *conn) { + struct worker_connection *worker = + container_of(conn, struct worker_connection, conn); const char *line; if (i_stream_read(conn->input) < 0) { @@ -124,11 +128,11 @@ static void worker_connection_input(struct worker_connection *conn) } conn->version_received = TRUE; } - if (conn->process_limit == 0) { + if (worker->process_limit == 0) { if ((line = i_stream_next_line(conn->input)) == NULL) return; - if (str_to_uint(line, &conn->process_limit) < 0 || - conn->process_limit == 0) { + if (str_to_uint(line, &worker->process_limit) < 0 || + worker->process_limit == 0) { i_error("Indexer worker sent invalid handshake: %s", line); worker_connection_disconnect(conn); @@ -143,54 +147,60 @@ static void worker_connection_input(struct worker_connection *conn) } } -int worker_connection_connect(struct worker_connection *conn) +int worker_connection_connect(struct connection *conn) { - i_assert(conn->fd == -1); + i_assert(conn->fd_in == -1); - conn->fd = net_connect_unix(conn->socket_path); - if (conn->fd == -1) { - i_error("connect(%s) failed: %m", conn->socket_path); + conn->fd_in = net_connect_unix(conn->base_name); + if (conn->fd_in == -1) { + i_error("connect(%s) failed: %m", conn->base_name); return -1; } - conn->io = io_add(conn->fd, IO_READ, worker_connection_input, conn); - conn->input = i_stream_create_fd(conn->fd, SIZE_MAX); - conn->output = o_stream_create_fd(conn->fd, SIZE_MAX); + conn->io = io_add(conn->fd_in, IO_READ, worker_connection_input, conn); + conn->input = i_stream_create_fd(conn->fd_in, SIZE_MAX); + conn->output = o_stream_create_fd(conn->fd_in, SIZE_MAX); o_stream_set_no_error_handling(conn->output, TRUE); o_stream_nsend_str(conn->output, INDEXER_MASTER_HANDSHAKE); return 0; } -bool worker_connection_is_connected(struct worker_connection *conn) +bool worker_connection_is_connected(struct connection *conn) { - return conn->fd != -1; + return conn->fd_in != -1; } -bool worker_connection_get_process_limit(struct worker_connection *conn, +bool worker_connection_get_process_limit(struct connection *conn, unsigned int *limit_r) { - if (conn->process_limit == 0) + struct worker_connection *worker = + container_of(conn, struct worker_connection, conn); + + if (worker->process_limit == 0) return FALSE; - *limit_r = conn->process_limit; + *limit_r = worker->process_limit; return TRUE; } -void worker_connection_request(struct worker_connection *conn, +void worker_connection_request(struct connection *conn, struct indexer_request *request, void *context) { + struct worker_connection *worker = + container_of(conn, struct worker_connection, conn); + i_assert(worker_connection_is_connected(conn)); i_assert(context != NULL); i_assert(request->index || request->optimize); - if (conn->request_username == NULL) - conn->request_username = i_strdup(request->username); + if (worker->request_username == NULL) + worker->request_username = i_strdup(request->username); else { - i_assert(strcmp(conn->request_username, + i_assert(strcmp(worker->request_username, request->username) == 0); } - conn->request = request; + worker->request = request; T_BEGIN { string_t *str = t_str_new(128); @@ -211,33 +221,38 @@ void worker_connection_request(struct worker_connection *conn, } T_END; } -bool worker_connection_is_busy(struct worker_connection *conn) +bool worker_connection_is_busy(struct connection *conn) { - return conn->request != NULL; + struct worker_connection *worker = + container_of(conn, struct worker_connection, conn); + return worker->request != NULL; } -const char *worker_connection_get_username(struct worker_connection *conn) +const char *worker_connection_get_username(struct connection *conn) { - return conn->request_username; + struct worker_connection *worker = + container_of(conn, struct worker_connection, conn); + return worker->request_username; } struct indexer_request * -worker_connection_get_request(struct worker_connection *conn) +worker_connection_get_request(struct connection *conn) { - return conn->request; + struct worker_connection *worker = + container_of(conn, struct worker_connection, conn); + return worker->request; } -struct worker_connection * +struct connection * worker_connection_create(const char *socket_path, indexer_status_callback_t *callback) { struct worker_connection *conn; conn = i_new(struct worker_connection, 1); - conn->refcount = 1; - conn->socket_path = i_strdup(socket_path); + conn->conn.base_name = i_strdup(socket_path); conn->callback = callback; - conn->fd = -1; - return conn; + conn->conn.fd_in = -1; + return &conn->conn; } diff --git a/src/indexer/worker-connection.h b/src/indexer/worker-connection.h index e6c320d5cb..80f83fb1b4 100644 --- a/src/indexer/worker-connection.h +++ b/src/indexer/worker-connection.h @@ -5,34 +5,34 @@ struct indexer_request; -struct worker_connection * +struct connection * worker_connection_create(const char *socket_path, indexer_status_callback_t *callback); -void worker_connection_destroy(struct worker_connection **conn); +void worker_connection_destroy(struct connection **conn); -int worker_connection_connect(struct worker_connection *conn); +int worker_connection_connect(struct connection *conn); /* Returns TRUE if worker is connected to (not necessarily handshaked yet) */ -bool worker_connection_is_connected(struct worker_connection *conn); +bool worker_connection_is_connected(struct connection *conn); /* After initial handshake the worker process tells how many of its kind can be at maximum. This returns the value, of FALSE if handshake isn't finished yet. */ -bool worker_connection_get_process_limit(struct worker_connection *conn, +bool worker_connection_get_process_limit(struct connection *conn, unsigned int *limit_r); /* Send a new indexing request for username+mailbox. The status callback is called as necessary with the given context. Requests can be queued, but only for the same username. */ -void worker_connection_request(struct worker_connection *conn, +void worker_connection_request(struct connection *conn, struct indexer_request *request, void *context); /* Returns TRUE if a request is being handled. */ -bool worker_connection_is_busy(struct worker_connection *conn); +bool worker_connection_is_busy(struct connection *conn); /* Returns username of the currently pending requests, or NULL if there are none. */ -const char *worker_connection_get_username(struct worker_connection *conn); +const char *worker_connection_get_username(struct connection *conn); struct indexer_request * -worker_connection_get_request(struct worker_connection *conn); +worker_connection_get_request(struct connection *conn); #endif diff --git a/src/indexer/worker-pool.c b/src/indexer/worker-pool.c index e7eef51007..14df98c439 100644 --- a/src/indexer/worker-pool.c +++ b/src/indexer/worker-pool.c @@ -12,7 +12,7 @@ struct worker_connection_list { struct worker_connection_list *prev, *next; - struct worker_connection *conn; + struct connection *conn; time_t last_use; }; @@ -62,9 +62,9 @@ bool worker_pool_have_busy_connections(struct worker_pool *pool) } static int worker_pool_add_connection(struct worker_pool *pool, - struct worker_connection **conn_r) + struct connection **conn_r) { - struct worker_connection *conn; + struct connection *conn; pool->connection_count++; conn = worker_connection_create(pool->socket_path, pool->callback); @@ -106,7 +106,7 @@ static unsigned int worker_pool_find_max_connections(struct worker_pool *pool) } bool worker_pool_get_connection(struct worker_pool *pool, - struct worker_connection **conn_r) + struct connection **conn_r) { struct worker_connection_list *list; unsigned int max_connections; @@ -125,7 +125,7 @@ bool worker_pool_get_connection(struct worker_pool *pool, } void worker_pool_release_connection(struct worker_pool *pool, - struct worker_connection *conn) + struct connection *conn) { struct worker_connection_list *list; @@ -141,7 +141,7 @@ void worker_pool_release_connection(struct worker_pool *pool, worker_connection_destroy(&conn); } -struct worker_connection * +struct connection * worker_pool_find_username_connection(struct worker_pool *pool, const char *username) { diff --git a/src/indexer/worker-pool.h b/src/indexer/worker-pool.h index 80ec4b0bc0..057d67f37c 100644 --- a/src/indexer/worker-pool.h +++ b/src/indexer/worker-pool.h @@ -3,7 +3,7 @@ #include "indexer.h" -struct worker_connection; +struct connection; struct worker_pool * worker_pool_init(const char *socket_path, indexer_status_callback_t *callback); @@ -12,11 +12,11 @@ void worker_pool_deinit(struct worker_pool **pool); bool worker_pool_have_busy_connections(struct worker_pool *pool); bool worker_pool_get_connection(struct worker_pool *pool, - struct worker_connection **conn_r); + struct connection **conn_r); void worker_pool_release_connection(struct worker_pool *pool, - struct worker_connection *conn); + struct connection *conn); -struct worker_connection * +struct connection * worker_pool_find_username_connection(struct worker_pool *pool, const char *username);