From: Aki Tuomi Date: Wed, 26 May 2021 10:55:52 +0000 (+0300) Subject: plugins/fts: fts-indexer - Use connection.c functions X-Git-Tag: 2.3.16~38 X-Git-Url: http://git.ipfire.org/?a=commitdiff_plain;h=cf114f90e0ba25c18db846ee582e3a130bd52949;p=thirdparty%2Fdovecot%2Fcore.git plugins/fts: fts-indexer - Use connection.c functions --- diff --git a/src/plugins/fts/fts-indexer.c b/src/plugins/fts/fts-indexer.c index 293ab72124..1e30274b1e 100644 --- a/src/plugins/fts/fts-indexer.c +++ b/src/plugins/fts/fts-indexer.c @@ -3,9 +3,8 @@ #include "lib.h" #include "ioloop.h" #include "connection.h" -#include "net.h" #include "istream.h" -#include "write-full.h" +#include "ostream.h" #include "strescape.h" #include "time-util.h" #include "settings-parser.h" @@ -15,49 +14,24 @@ #include "fts-indexer.h" #define INDEXER_NOTIFY_INTERVAL_SECS 10 - #define INDEXER_SOCKET_NAME "indexer" #define INDEXER_WAIT_MSECS 250 -#define INDEXER_HANDSHAKE "VERSION\tindexer\t1\t0\n" struct fts_indexer_context { struct connection conn; struct mailbox *box; + struct ioloop *ioloop; struct timeval search_start_time, last_notify; unsigned int percentage; - unsigned int timeout_secs; - struct connection_list *connections; + struct connection_list *connection_list; bool notified:1; bool failed:1; + bool completed:1; }; -int fts_indexer_cmd(struct mail_user *user, const char *cmd, - const char **path_r) -{ - const char *path; - int fd; - - path = t_strconcat(user->set->base_dir, - "/"INDEXER_SOCKET_NAME, NULL); - fd = net_connect_unix_with_retries(path, 1000); - if (fd == -1) { - i_error("net_connect_unix(%s) failed: %m", path); - return -1; - } - - cmd = t_strconcat(INDEXER_HANDSHAKE, cmd, NULL); - if (write_full(fd, cmd, strlen(cmd)) < 0) { - i_error("write(%s) failed: %m", path); - i_close_fd(&fd); - return -1; - } - *path_r = path; - return fd; -} - static void fts_indexer_notify(struct fts_indexer_context *ctx) { unsigned long long elapsed_msecs, est_total_msecs; @@ -89,127 +63,174 @@ static void fts_indexer_notify(struct fts_indexer_context *ctx) } T_END; } +static int fts_indexer_more_int(struct fts_indexer_context *ctx) +{ + struct ioloop *prev_ioloop = current_ioloop; + struct timeout *to; + + if (ctx->failed) + return -1; + if (ctx->completed) + return 1; + + /* wait for a while for the reply. FIXME: once search API supports + asynchronous waits, get rid of this wait and use the mail IO loop */ + io_loop_set_current(ctx->ioloop); + to = timeout_add_short(INDEXER_WAIT_MSECS, io_loop_stop, ctx->ioloop); + io_loop_run(ctx->ioloop); + timeout_remove(&to); + io_loop_set_current(prev_ioloop); + + if (ctx->failed) + return -1; + if (ctx->completed) + return 1; + return 0; +} + +int fts_indexer_more(struct fts_indexer_context *ctx) +{ + int ret; + + if ((ret = fts_indexer_more_int(ctx)) < 0) { + mail_storage_set_internal_error(ctx->box->storage); + ctx->failed = TRUE; + return -1; + } + + if (ret == 0) + fts_indexer_notify(ctx); + + return ret; +} + +static void fts_indexer_destroy(struct connection *conn) +{ + struct fts_indexer_context *ctx = + container_of(conn, struct fts_indexer_context, conn); + connection_deinit(conn); + if (!ctx->completed) + ctx->failed = TRUE; + ctx->completed = TRUE; +} + int fts_indexer_deinit(struct fts_indexer_context **_ctx) { struct fts_indexer_context *ctx = *_ctx; - int ret = ctx->failed ? -1 : 0; - + i_assert(ctx != NULL); *_ctx = NULL; - - i_stream_destroy(&ctx->conn.input); - if (close(ctx->conn.fd_in) < 0) - i_error("close(%s) failed: %m", ctx->conn.label); + if (!ctx->completed) + ctx->failed = TRUE; + int ret = ctx->failed ? -1 : 0; if (ctx->notified) { /* we notified at least once */ ctx->box->storage->callbacks. notify_ok(ctx->box, "Mailbox indexing finished", ctx->box->storage->callback_context); } - i_free(ctx->conn.label); + connection_list_deinit(&ctx->connection_list); + io_loop_set_current(ctx->ioloop); + io_loop_destroy(&ctx->ioloop); i_free(ctx); return ret; } -static int fts_indexer_input(struct fts_indexer_context *ctx) +static int +fts_indexer_input_args(struct connection *conn, const char *const *args) { - const char *line; + struct fts_indexer_context *ctx = + container_of(conn, struct fts_indexer_context, conn); int percentage; - - while ((line = i_stream_read_next_line(ctx->conn.input)) != NULL) { - /* initial reply: \t OK - following: \t */ - if (!str_begins(line, "1\t")) { - i_error("indexer sent invalid reply: %s", line); - return -1; - } - line += 2; - if (strcmp(line, "OK") == 0) - continue; - if (str_to_int(line, &percentage) < 0 || percentage > 100) { - i_error("indexer sent invalid percentage: %s", line); - return -1; - } - if (percentage < 0) { - /* indexing failed */ - i_error("indexer failed to index mailbox %s", - ctx->box->vname); - return -1; - } - ctx->percentage = percentage; - if (percentage == 100) { - /* finished */ - return 1; - } + if (args[1] == NULL) { + i_error("indexer sent invalid reply"); + return -1; } - if (ctx->conn.input->stream_errno != 0) { - i_error("indexer read(%s) failed: %s", - i_stream_get_name(ctx->conn.input), - i_stream_get_error(ctx->conn.input)); + if (strcmp(args[0], "1") != 0) { + i_error("indexer sent invalid reply"); return -1; } - if (ctx->conn.input->eof) { - i_error("indexer disconnected unexpectedly"); + if (strcmp(args[1], "OK") == 0) + return 1; + if (str_to_int(args[1], &percentage) < 0) { + i_error("indexer sent invalid progress: %s", args[1]); + ctx->failed = TRUE; return -1; } - return 0; + if (percentage < 0) { + i_error("indexer failed to index mailbox %s", ctx->box->vname); + ctx->failed = TRUE; + return -1; + } + ctx->percentage = percentage; + if (ctx->percentage == 100) + ctx->completed = TRUE; + return 1; } -static int fts_indexer_more_int(struct fts_indexer_context *ctx) +static void fts_indexer_client_connected(struct connection *conn, bool success) { - struct ioloop *ioloop; - struct io *io; - struct timeout *to; - int ret; - - if ((ret = fts_indexer_input(ctx)) != 0) - return ret; - - /* wait for a while for the reply. FIXME: once search API supports - asynchronous waits, get rid of this wait and use the mail IO loop */ - ioloop = io_loop_create(); - io = io_add(ctx->conn.fd_in, IO_READ, io_loop_stop, ioloop); - to = timeout_add_short(INDEXER_WAIT_MSECS, io_loop_stop, ioloop); - io_loop_run(ioloop); - io_remove(&io); - timeout_remove(&to); - io_loop_destroy(&ioloop); - - return fts_indexer_input(ctx); + struct fts_indexer_context *ctx = + container_of(conn, struct fts_indexer_context, conn); + if (!success) { + ctx->completed = TRUE; + ctx->failed = TRUE; + return; + } + ctx->failed = ctx->completed = FALSE; + const char *cmd = t_strdup_printf("PREPEND\t1\t%s\t%s\t0\t%s\n", + str_tabescape(ctx->box->storage->user->username), + str_tabescape(ctx->box->vname), + str_tabescape(ctx->box->storage->user->session_id)); + o_stream_nsend_str(conn->output, cmd); } -int fts_indexer_more(struct fts_indexer_context *ctx) +static void fts_indexer_idle_timeout(struct connection *conn) { - int ret, diff; + struct fts_indexer_context *ctx = + container_of(conn, struct fts_indexer_context, conn); + mail_storage_set_error(ctx->box->storage, MAIL_ERROR_INUSE, + "Timeout while waiting for indexing to finish"); + ctx->failed = TRUE; + connection_disconnect(conn); +} - if ((ret = fts_indexer_more_int(ctx)) < 0) { - mail_storage_set_internal_error(ctx->box->storage); - ctx->failed = TRUE; - return -1; - } +static const struct connection_settings indexer_client_set = +{ + .service_name_in = "indexer", + .service_name_out = "indexer", + .major_version = 1, + .minor_version = 0, + .client_connect_timeout_msecs = 2000, + .input_max_size = SIZE_MAX, + .output_max_size = IO_BLOCK_SIZE, + .client = TRUE, +}; - if (ctx->timeout_secs > 0) { - diff = ioloop_time - ctx->search_start_time.tv_sec; - if (diff > (int)ctx->timeout_secs) { - mail_storage_set_error(ctx->box->storage, - MAIL_ERROR_INUSE, - "Timeout while waiting for indexing to finish"); - ctx->failed = TRUE; - return -1; - } - } - if (ret == 0) - fts_indexer_notify(ctx); - return ret; -} +static const struct connection_vfuncs indexer_client_vfuncs = +{ + .destroy = fts_indexer_destroy, + .client_connected = fts_indexer_client_connected, + .input_args = fts_indexer_input_args, + .idle_timeout = fts_indexer_idle_timeout, +}; int fts_indexer_init(struct fts_backend *backend, struct mailbox *box, struct fts_indexer_context **ctx_r) { + struct ioloop *prev_ioloop = current_ioloop; struct fts_indexer_context *ctx; struct mailbox_status status; uint32_t last_uid, seq1, seq2; - const char *path, *cmd, *value, *error; - int fd; + const char *path, *value, *error; + unsigned int timeout_secs = 0; + int ret; + + value = mail_user_plugin_getenv(box->storage->user, "fts_index_timeout"); + if (value != NULL) { + if (settings_get_time(value, &timeout_secs, &error) < 0) + i_error("Invalid fts_index_timeout setting: %s", error); + return -1; + } if (fts_backend_get_last_uid(backend, box, &last_uid) < 0) return -1; @@ -226,28 +247,21 @@ int fts_indexer_init(struct fts_backend *backend, struct mailbox *box, return 0; } - cmd = t_strdup_printf("PREPEND\t1\t%s\t%s\t0\t%s\n", - str_tabescape(box->storage->user->username), - str_tabescape(box->vname), - str_tabescape(box->storage->user->session_id)); - fd = fts_indexer_cmd(box->storage->user, cmd, &path); - if (fd == -1) - return -1; + path = t_strconcat(box->storage->user->set->base_dir, + "/"INDEXER_SOCKET_NAME, NULL); - /* connect to indexer and request immediate indexing of the mailbox */ ctx = i_new(struct fts_indexer_context, 1); ctx->box = box; - ctx->conn.label = i_strdup(path); - ctx->conn.fd_in = fd; - ctx->conn.input = i_stream_create_fd(fd, 128); ctx->search_start_time = ioloop_timeval; - - value = mail_user_plugin_getenv(box->storage->user, "fts_index_timeout"); - if (value != NULL) { - if (settings_get_time(value, &ctx->timeout_secs, &error) < 0) - i_error("Invalid fts_index_timeout setting: %s", error); - } - + ctx->conn.event_parent = box->event; + ctx->ioloop = io_loop_create(); + ctx->connection_list = connection_list_init(&indexer_client_set, + &indexer_client_vfuncs); + ctx->conn.input_idle_timeout_secs = timeout_secs; + connection_init_client_unix(ctx->connection_list, &ctx->conn, + path); + ret = connection_client_connect(&ctx->conn); + io_loop_set_current(prev_ioloop); *ctx_r = ctx; - return 1; + return ctx->failed || ret < 0 ? -1 : 1; }