#include "lib.h"
#include "ioloop.h"
#include "connection.h"
-#include "net.h"
#include "istream.h"
-#include "write-full.h"
+#include "ostream.h"
#include "strescape.h"
#include "time-util.h"
#include "settings-parser.h"
#include "fts-indexer.h"
#define INDEXER_NOTIFY_INTERVAL_SECS 10
-
#define INDEXER_SOCKET_NAME "indexer"
#define INDEXER_WAIT_MSECS 250
-#define INDEXER_HANDSHAKE "VERSION\tindexer\t1\t0\n"
struct fts_indexer_context {
struct connection conn;
struct mailbox *box;
+ struct ioloop *ioloop;
struct timeval search_start_time, last_notify;
unsigned int percentage;
- unsigned int timeout_secs;
- struct connection_list *connections;
+ struct connection_list *connection_list;
bool notified:1;
bool failed:1;
+ bool completed:1;
};
-int fts_indexer_cmd(struct mail_user *user, const char *cmd,
- const char **path_r)
-{
- const char *path;
- int fd;
-
- path = t_strconcat(user->set->base_dir,
- "/"INDEXER_SOCKET_NAME, NULL);
- fd = net_connect_unix_with_retries(path, 1000);
- if (fd == -1) {
- i_error("net_connect_unix(%s) failed: %m", path);
- return -1;
- }
-
- cmd = t_strconcat(INDEXER_HANDSHAKE, cmd, NULL);
- if (write_full(fd, cmd, strlen(cmd)) < 0) {
- i_error("write(%s) failed: %m", path);
- i_close_fd(&fd);
- return -1;
- }
- *path_r = path;
- return fd;
-}
-
static void fts_indexer_notify(struct fts_indexer_context *ctx)
{
unsigned long long elapsed_msecs, est_total_msecs;
} T_END;
}
+static int fts_indexer_more_int(struct fts_indexer_context *ctx)
+{
+ struct ioloop *prev_ioloop = current_ioloop;
+ struct timeout *to;
+
+ if (ctx->failed)
+ return -1;
+ if (ctx->completed)
+ return 1;
+
+ /* wait for a while for the reply. FIXME: once search API supports
+ asynchronous waits, get rid of this wait and use the mail IO loop */
+ io_loop_set_current(ctx->ioloop);
+ to = timeout_add_short(INDEXER_WAIT_MSECS, io_loop_stop, ctx->ioloop);
+ io_loop_run(ctx->ioloop);
+ timeout_remove(&to);
+ io_loop_set_current(prev_ioloop);
+
+ if (ctx->failed)
+ return -1;
+ if (ctx->completed)
+ return 1;
+ return 0;
+}
+
+int fts_indexer_more(struct fts_indexer_context *ctx)
+{
+ int ret;
+
+ if ((ret = fts_indexer_more_int(ctx)) < 0) {
+ mail_storage_set_internal_error(ctx->box->storage);
+ ctx->failed = TRUE;
+ return -1;
+ }
+
+ if (ret == 0)
+ fts_indexer_notify(ctx);
+
+ return ret;
+}
+
+static void fts_indexer_destroy(struct connection *conn)
+{
+ struct fts_indexer_context *ctx =
+ container_of(conn, struct fts_indexer_context, conn);
+ connection_deinit(conn);
+ if (!ctx->completed)
+ ctx->failed = TRUE;
+ ctx->completed = TRUE;
+}
+
int fts_indexer_deinit(struct fts_indexer_context **_ctx)
{
struct fts_indexer_context *ctx = *_ctx;
- int ret = ctx->failed ? -1 : 0;
-
+ i_assert(ctx != NULL);
*_ctx = NULL;
-
- i_stream_destroy(&ctx->conn.input);
- if (close(ctx->conn.fd_in) < 0)
- i_error("close(%s) failed: %m", ctx->conn.label);
+ if (!ctx->completed)
+ ctx->failed = TRUE;
+ int ret = ctx->failed ? -1 : 0;
if (ctx->notified) {
/* we notified at least once */
ctx->box->storage->callbacks.
notify_ok(ctx->box, "Mailbox indexing finished",
ctx->box->storage->callback_context);
}
- i_free(ctx->conn.label);
+ connection_list_deinit(&ctx->connection_list);
+ io_loop_set_current(ctx->ioloop);
+ io_loop_destroy(&ctx->ioloop);
i_free(ctx);
return ret;
}
-static int fts_indexer_input(struct fts_indexer_context *ctx)
+static int
+fts_indexer_input_args(struct connection *conn, const char *const *args)
{
- const char *line;
+ struct fts_indexer_context *ctx =
+ container_of(conn, struct fts_indexer_context, conn);
int percentage;
-
- while ((line = i_stream_read_next_line(ctx->conn.input)) != NULL) {
- /* initial reply: <tag> \t OK
- following: <tag> \t <percentage> */
- if (!str_begins(line, "1\t")) {
- i_error("indexer sent invalid reply: %s", line);
- return -1;
- }
- line += 2;
- if (strcmp(line, "OK") == 0)
- continue;
- if (str_to_int(line, &percentage) < 0 || percentage > 100) {
- i_error("indexer sent invalid percentage: %s", line);
- return -1;
- }
- if (percentage < 0) {
- /* indexing failed */
- i_error("indexer failed to index mailbox %s",
- ctx->box->vname);
- return -1;
- }
- ctx->percentage = percentage;
- if (percentage == 100) {
- /* finished */
- return 1;
- }
+ if (args[1] == NULL) {
+ i_error("indexer sent invalid reply");
+ return -1;
}
- if (ctx->conn.input->stream_errno != 0) {
- i_error("indexer read(%s) failed: %s",
- i_stream_get_name(ctx->conn.input),
- i_stream_get_error(ctx->conn.input));
+ if (strcmp(args[0], "1") != 0) {
+ i_error("indexer sent invalid reply");
return -1;
}
- if (ctx->conn.input->eof) {
- i_error("indexer disconnected unexpectedly");
+ if (strcmp(args[1], "OK") == 0)
+ return 1;
+ if (str_to_int(args[1], &percentage) < 0) {
+ i_error("indexer sent invalid progress: %s", args[1]);
+ ctx->failed = TRUE;
return -1;
}
- return 0;
+ if (percentage < 0) {
+ i_error("indexer failed to index mailbox %s", ctx->box->vname);
+ ctx->failed = TRUE;
+ return -1;
+ }
+ ctx->percentage = percentage;
+ if (ctx->percentage == 100)
+ ctx->completed = TRUE;
+ return 1;
}
-static int fts_indexer_more_int(struct fts_indexer_context *ctx)
+static void fts_indexer_client_connected(struct connection *conn, bool success)
{
- struct ioloop *ioloop;
- struct io *io;
- struct timeout *to;
- int ret;
-
- if ((ret = fts_indexer_input(ctx)) != 0)
- return ret;
-
- /* wait for a while for the reply. FIXME: once search API supports
- asynchronous waits, get rid of this wait and use the mail IO loop */
- ioloop = io_loop_create();
- io = io_add(ctx->conn.fd_in, IO_READ, io_loop_stop, ioloop);
- to = timeout_add_short(INDEXER_WAIT_MSECS, io_loop_stop, ioloop);
- io_loop_run(ioloop);
- io_remove(&io);
- timeout_remove(&to);
- io_loop_destroy(&ioloop);
-
- return fts_indexer_input(ctx);
+ struct fts_indexer_context *ctx =
+ container_of(conn, struct fts_indexer_context, conn);
+ if (!success) {
+ ctx->completed = TRUE;
+ ctx->failed = TRUE;
+ return;
+ }
+ ctx->failed = ctx->completed = FALSE;
+ const char *cmd = t_strdup_printf("PREPEND\t1\t%s\t%s\t0\t%s\n",
+ str_tabescape(ctx->box->storage->user->username),
+ str_tabescape(ctx->box->vname),
+ str_tabescape(ctx->box->storage->user->session_id));
+ o_stream_nsend_str(conn->output, cmd);
}
-int fts_indexer_more(struct fts_indexer_context *ctx)
+static void fts_indexer_idle_timeout(struct connection *conn)
{
- int ret, diff;
+ struct fts_indexer_context *ctx =
+ container_of(conn, struct fts_indexer_context, conn);
+ mail_storage_set_error(ctx->box->storage, MAIL_ERROR_INUSE,
+ "Timeout while waiting for indexing to finish");
+ ctx->failed = TRUE;
+ connection_disconnect(conn);
+}
- if ((ret = fts_indexer_more_int(ctx)) < 0) {
- mail_storage_set_internal_error(ctx->box->storage);
- ctx->failed = TRUE;
- return -1;
- }
+static const struct connection_settings indexer_client_set =
+{
+ .service_name_in = "indexer",
+ .service_name_out = "indexer",
+ .major_version = 1,
+ .minor_version = 0,
+ .client_connect_timeout_msecs = 2000,
+ .input_max_size = SIZE_MAX,
+ .output_max_size = IO_BLOCK_SIZE,
+ .client = TRUE,
+};
- if (ctx->timeout_secs > 0) {
- diff = ioloop_time - ctx->search_start_time.tv_sec;
- if (diff > (int)ctx->timeout_secs) {
- mail_storage_set_error(ctx->box->storage,
- MAIL_ERROR_INUSE,
- "Timeout while waiting for indexing to finish");
- ctx->failed = TRUE;
- return -1;
- }
- }
- if (ret == 0)
- fts_indexer_notify(ctx);
- return ret;
-}
+static const struct connection_vfuncs indexer_client_vfuncs =
+{
+ .destroy = fts_indexer_destroy,
+ .client_connected = fts_indexer_client_connected,
+ .input_args = fts_indexer_input_args,
+ .idle_timeout = fts_indexer_idle_timeout,
+};
int fts_indexer_init(struct fts_backend *backend, struct mailbox *box,
struct fts_indexer_context **ctx_r)
{
+ struct ioloop *prev_ioloop = current_ioloop;
struct fts_indexer_context *ctx;
struct mailbox_status status;
uint32_t last_uid, seq1, seq2;
- const char *path, *cmd, *value, *error;
- int fd;
+ const char *path, *value, *error;
+ unsigned int timeout_secs = 0;
+ int ret;
+
+ value = mail_user_plugin_getenv(box->storage->user, "fts_index_timeout");
+ if (value != NULL) {
+ if (settings_get_time(value, &timeout_secs, &error) < 0)
+ i_error("Invalid fts_index_timeout setting: %s", error);
+ return -1;
+ }
if (fts_backend_get_last_uid(backend, box, &last_uid) < 0)
return -1;
return 0;
}
- cmd = t_strdup_printf("PREPEND\t1\t%s\t%s\t0\t%s\n",
- str_tabescape(box->storage->user->username),
- str_tabescape(box->vname),
- str_tabescape(box->storage->user->session_id));
- fd = fts_indexer_cmd(box->storage->user, cmd, &path);
- if (fd == -1)
- return -1;
+ path = t_strconcat(box->storage->user->set->base_dir,
+ "/"INDEXER_SOCKET_NAME, NULL);
- /* connect to indexer and request immediate indexing of the mailbox */
ctx = i_new(struct fts_indexer_context, 1);
ctx->box = box;
- ctx->conn.label = i_strdup(path);
- ctx->conn.fd_in = fd;
- ctx->conn.input = i_stream_create_fd(fd, 128);
ctx->search_start_time = ioloop_timeval;
-
- value = mail_user_plugin_getenv(box->storage->user, "fts_index_timeout");
- if (value != NULL) {
- if (settings_get_time(value, &ctx->timeout_secs, &error) < 0)
- i_error("Invalid fts_index_timeout setting: %s", error);
- }
-
+ ctx->conn.event_parent = box->event;
+ ctx->ioloop = io_loop_create();
+ ctx->connection_list = connection_list_init(&indexer_client_set,
+ &indexer_client_vfuncs);
+ ctx->conn.input_idle_timeout_secs = timeout_secs;
+ connection_init_client_unix(ctx->connection_list, &ctx->conn,
+ path);
+ ret = connection_client_connect(&ctx->conn);
+ io_loop_set_current(prev_ioloop);
*ctx_r = ctx;
- return 1;
+ return ctx->failed || ret < 0 ? -1 : 1;
}