]> git.ipfire.org Git - thirdparty/dovecot/core.git/commitdiff
plugins/fts: fts-indexer - Use connection.c functions
authorAki Tuomi <aki.tuomi@open-xchange.com>
Wed, 26 May 2021 10:55:52 +0000 (13:55 +0300)
committerTimo Sirainen <timo.sirainen@open-xchange.com>
Mon, 14 Jun 2021 14:56:40 +0000 (17:56 +0300)
src/plugins/fts/fts-indexer.c

index 293ab72124a89801ef70d420f8dad6edb865ebc2..1e30274b1e4007cfec3fedbb14de60156955e6c3 100644 (file)
@@ -3,9 +3,8 @@
 #include "lib.h"
 #include "ioloop.h"
 #include "connection.h"
-#include "net.h"
 #include "istream.h"
-#include "write-full.h"
+#include "ostream.h"
 #include "strescape.h"
 #include "time-util.h"
 #include "settings-parser.h"
 #include "fts-indexer.h"
 
 #define INDEXER_NOTIFY_INTERVAL_SECS 10
-
 #define INDEXER_SOCKET_NAME "indexer"
 #define INDEXER_WAIT_MSECS 250
-#define INDEXER_HANDSHAKE "VERSION\tindexer\t1\t0\n"
 
 struct fts_indexer_context {
        struct connection conn;
 
        struct mailbox *box;
+       struct ioloop *ioloop;
 
        struct timeval search_start_time, last_notify;
        unsigned int percentage;
-       unsigned int timeout_secs;
-       struct connection_list *connections;
+       struct connection_list *connection_list;
 
        bool notified:1;
        bool failed:1;
+       bool completed:1;
 };
 
-int fts_indexer_cmd(struct mail_user *user, const char *cmd,
-                   const char **path_r)
-{
-       const char *path;
-       int fd;
-
-       path = t_strconcat(user->set->base_dir,
-                          "/"INDEXER_SOCKET_NAME, NULL);
-       fd = net_connect_unix_with_retries(path, 1000);
-       if (fd == -1) {
-               i_error("net_connect_unix(%s) failed: %m", path);
-               return -1;
-       }
-
-       cmd = t_strconcat(INDEXER_HANDSHAKE, cmd, NULL);
-       if (write_full(fd, cmd, strlen(cmd)) < 0) {
-               i_error("write(%s) failed: %m", path);
-               i_close_fd(&fd);
-               return -1;
-       }
-       *path_r = path;
-       return fd;
-}
-
 static void fts_indexer_notify(struct fts_indexer_context *ctx)
 {
        unsigned long long elapsed_msecs, est_total_msecs;
@@ -89,127 +63,174 @@ static void fts_indexer_notify(struct fts_indexer_context *ctx)
        } T_END;
 }
 
+static int fts_indexer_more_int(struct fts_indexer_context *ctx)
+{
+       struct ioloop *prev_ioloop = current_ioloop;
+       struct timeout *to;
+
+       if (ctx->failed)
+               return -1;
+       if (ctx->completed)
+               return 1;
+
+       /* wait for a while for the reply. FIXME: once search API supports
+          asynchronous waits, get rid of this wait and use the mail IO loop */
+       io_loop_set_current(ctx->ioloop);
+       to = timeout_add_short(INDEXER_WAIT_MSECS, io_loop_stop, ctx->ioloop);
+       io_loop_run(ctx->ioloop);
+       timeout_remove(&to);
+       io_loop_set_current(prev_ioloop);
+
+       if (ctx->failed)
+               return -1;
+       if (ctx->completed)
+               return 1;
+       return 0;
+}
+
+int fts_indexer_more(struct fts_indexer_context *ctx)
+{
+       int ret;
+
+       if ((ret = fts_indexer_more_int(ctx)) < 0) {
+               mail_storage_set_internal_error(ctx->box->storage);
+               ctx->failed = TRUE;
+               return -1;
+       }
+
+       if (ret == 0)
+               fts_indexer_notify(ctx);
+
+       return ret;
+}
+
+static void fts_indexer_destroy(struct connection *conn)
+{
+       struct fts_indexer_context *ctx =
+               container_of(conn, struct fts_indexer_context, conn);
+       connection_deinit(conn);
+       if (!ctx->completed)
+               ctx->failed = TRUE;
+       ctx->completed = TRUE;
+}
+
 int fts_indexer_deinit(struct fts_indexer_context **_ctx)
 {
        struct fts_indexer_context *ctx = *_ctx;
-       int ret = ctx->failed ? -1 : 0;
-
+       i_assert(ctx != NULL);
        *_ctx = NULL;
-
-       i_stream_destroy(&ctx->conn.input);
-       if (close(ctx->conn.fd_in) < 0)
-               i_error("close(%s) failed: %m", ctx->conn.label);
+       if (!ctx->completed)
+               ctx->failed = TRUE;
+       int ret = ctx->failed ? -1 : 0;
        if (ctx->notified) {
                /* we notified at least once */
                ctx->box->storage->callbacks.
                        notify_ok(ctx->box, "Mailbox indexing finished",
                                  ctx->box->storage->callback_context);
        }
-       i_free(ctx->conn.label);
+       connection_list_deinit(&ctx->connection_list);
+       io_loop_set_current(ctx->ioloop);
+       io_loop_destroy(&ctx->ioloop);
        i_free(ctx);
        return ret;
 }
 
-static int fts_indexer_input(struct fts_indexer_context *ctx)
+static int
+fts_indexer_input_args(struct connection *conn, const char *const *args)
 {
-       const char *line;
+       struct fts_indexer_context *ctx =
+               container_of(conn, struct fts_indexer_context, conn);
        int percentage;
-
-       while ((line = i_stream_read_next_line(ctx->conn.input)) != NULL) {
-               /* initial reply: <tag> \t OK
-                  following: <tag> \t <percentage> */
-               if (!str_begins(line, "1\t")) {
-                       i_error("indexer sent invalid reply: %s", line);
-                       return -1;
-               }
-               line += 2;
-               if (strcmp(line, "OK") == 0)
-                       continue;
-               if (str_to_int(line, &percentage) < 0 || percentage > 100) {
-                       i_error("indexer sent invalid percentage: %s", line);
-                       return -1;
-               }
-               if (percentage < 0) {
-                       /* indexing failed */
-                       i_error("indexer failed to index mailbox %s",
-                               ctx->box->vname);
-                       return -1;
-               }
-               ctx->percentage = percentage;
-               if (percentage == 100) {
-                       /* finished */
-                       return 1;
-               }
+       if (args[1] == NULL) {
+               i_error("indexer sent invalid reply");
+               return -1;
        }
-       if (ctx->conn.input->stream_errno != 0) {
-               i_error("indexer read(%s) failed: %s",
-                       i_stream_get_name(ctx->conn.input),
-                       i_stream_get_error(ctx->conn.input));
+       if (strcmp(args[0], "1") != 0) {
+               i_error("indexer sent invalid reply");
                return -1;
        }
-       if (ctx->conn.input->eof) {
-               i_error("indexer disconnected unexpectedly");
+       if (strcmp(args[1], "OK") == 0)
+               return 1;
+       if (str_to_int(args[1], &percentage) < 0) {
+               i_error("indexer sent invalid progress: %s", args[1]);
+               ctx->failed = TRUE;
                return -1;
        }
-       return 0;
+       if (percentage < 0) {
+               i_error("indexer failed to index mailbox %s", ctx->box->vname);
+               ctx->failed = TRUE;
+               return -1;
+       }
+       ctx->percentage = percentage;
+       if (ctx->percentage == 100)
+               ctx->completed = TRUE;
+       return 1;
 }
 
-static int fts_indexer_more_int(struct fts_indexer_context *ctx)
+static void fts_indexer_client_connected(struct connection *conn, bool success)
 {
-       struct ioloop *ioloop;
-       struct io *io;
-       struct timeout *to;
-       int ret;
-
-       if ((ret = fts_indexer_input(ctx)) != 0)
-               return ret;
-
-       /* wait for a while for the reply. FIXME: once search API supports
-          asynchronous waits, get rid of this wait and use the mail IO loop */
-       ioloop = io_loop_create();
-       io = io_add(ctx->conn.fd_in, IO_READ, io_loop_stop, ioloop);
-       to = timeout_add_short(INDEXER_WAIT_MSECS, io_loop_stop, ioloop);
-       io_loop_run(ioloop);
-       io_remove(&io);
-       timeout_remove(&to);
-       io_loop_destroy(&ioloop);
-
-       return fts_indexer_input(ctx);
+       struct fts_indexer_context *ctx =
+               container_of(conn, struct fts_indexer_context, conn);
+       if (!success) {
+               ctx->completed = TRUE;
+               ctx->failed = TRUE;
+               return;
+       }
+       ctx->failed = ctx->completed = FALSE;
+       const char *cmd = t_strdup_printf("PREPEND\t1\t%s\t%s\t0\t%s\n",
+                             str_tabescape(ctx->box->storage->user->username),
+                             str_tabescape(ctx->box->vname),
+                             str_tabescape(ctx->box->storage->user->session_id));
+       o_stream_nsend_str(conn->output, cmd);
 }
 
-int fts_indexer_more(struct fts_indexer_context *ctx)
+static void fts_indexer_idle_timeout(struct connection *conn)
 {
-       int ret, diff;
+       struct fts_indexer_context *ctx =
+               container_of(conn, struct fts_indexer_context, conn);
+       mail_storage_set_error(ctx->box->storage, MAIL_ERROR_INUSE,
+                              "Timeout while waiting for indexing to finish");
+       ctx->failed = TRUE;
+       connection_disconnect(conn);
+}
 
-       if ((ret = fts_indexer_more_int(ctx)) < 0) {
-               mail_storage_set_internal_error(ctx->box->storage);
-               ctx->failed = TRUE;
-               return -1;
-       }
+static const struct connection_settings indexer_client_set =
+{
+       .service_name_in = "indexer",
+       .service_name_out = "indexer",
+       .major_version = 1,
+       .minor_version = 0,
+       .client_connect_timeout_msecs = 2000,
+       .input_max_size = SIZE_MAX,
+       .output_max_size = IO_BLOCK_SIZE,
+       .client = TRUE,
+};
 
-       if (ctx->timeout_secs > 0) {
-               diff = ioloop_time - ctx->search_start_time.tv_sec;
-               if (diff > (int)ctx->timeout_secs) {
-                       mail_storage_set_error(ctx->box->storage,
-                               MAIL_ERROR_INUSE,
-                               "Timeout while waiting for indexing to finish");
-                       ctx->failed = TRUE;
-                       return -1;
-               }
-       }
-       if (ret == 0)
-               fts_indexer_notify(ctx);
-       return ret;
-}
+static const struct connection_vfuncs indexer_client_vfuncs =
+{
+       .destroy = fts_indexer_destroy,
+       .client_connected = fts_indexer_client_connected,
+       .input_args = fts_indexer_input_args,
+       .idle_timeout = fts_indexer_idle_timeout,
+};
 
 int fts_indexer_init(struct fts_backend *backend, struct mailbox *box,
                     struct fts_indexer_context **ctx_r)
 {
+       struct ioloop *prev_ioloop = current_ioloop;
        struct fts_indexer_context *ctx;
        struct mailbox_status status;
        uint32_t last_uid, seq1, seq2;
-       const char *path, *cmd, *value, *error;
-       int fd;
+       const char *path, *value, *error;
+       unsigned int timeout_secs = 0;
+       int ret;
+
+       value = mail_user_plugin_getenv(box->storage->user, "fts_index_timeout");
+       if (value != NULL) {
+               if (settings_get_time(value, &timeout_secs, &error) < 0)
+                       i_error("Invalid fts_index_timeout setting: %s", error);
+               return -1;
+       }
 
        if (fts_backend_get_last_uid(backend, box, &last_uid) < 0)
                return -1;
@@ -226,28 +247,21 @@ int fts_indexer_init(struct fts_backend *backend, struct mailbox *box,
                return 0;
        }
 
-       cmd = t_strdup_printf("PREPEND\t1\t%s\t%s\t0\t%s\n",
-                             str_tabescape(box->storage->user->username),
-                             str_tabescape(box->vname),
-                             str_tabescape(box->storage->user->session_id));
-       fd = fts_indexer_cmd(box->storage->user, cmd, &path);
-       if (fd == -1)
-               return -1;
+       path = t_strconcat(box->storage->user->set->base_dir,
+                          "/"INDEXER_SOCKET_NAME, NULL);
 
-       /* connect to indexer and request immediate indexing of the mailbox */
        ctx = i_new(struct fts_indexer_context, 1);
        ctx->box = box;
-       ctx->conn.label = i_strdup(path);
-       ctx->conn.fd_in = fd;
-       ctx->conn.input = i_stream_create_fd(fd, 128);
        ctx->search_start_time = ioloop_timeval;
-
-       value = mail_user_plugin_getenv(box->storage->user, "fts_index_timeout");
-       if (value != NULL) {
-               if (settings_get_time(value, &ctx->timeout_secs, &error) < 0)
-                       i_error("Invalid fts_index_timeout setting: %s", error);
-       }
-
+       ctx->conn.event_parent = box->event;
+       ctx->ioloop = io_loop_create();
+       ctx->connection_list = connection_list_init(&indexer_client_set,
+                                                   &indexer_client_vfuncs);
+       ctx->conn.input_idle_timeout_secs = timeout_secs;
+       connection_init_client_unix(ctx->connection_list, &ctx->conn,
+                                   path);
+       ret = connection_client_connect(&ctx->conn);
+       io_loop_set_current(prev_ioloop);
        *ctx_r = ctx;
-       return 1;
+       return ctx->failed || ret < 0 ? -1 : 1;
 }