#define INDEXER_PROTOCOL_MAJOR_VERSION 1
#define INDEXER_PROTOCOL_MINOR_VERSION 0
-#define INDEXER_MASTER_HANDSHAKE "VERSION\tindexer-master-worker\t1\t0\n"
+#define INDEXER_MASTER_NAME "indexer-master-worker"
#define INDEXER_WORKER_NAME "indexer-worker-master"
struct worker_connection {
unsigned int process_limit;
};
-static void worker_connection_unref(struct worker_connection *conn)
+static void worker_connection_call_callback(struct worker_connection *worker,
+ int percentage)
{
- i_assert(conn->refcount > 0);
- if (--conn->refcount > 0)
- return;
-
- i_free(conn->conn.base_name);
- i_free(conn);
+ if (worker->request != NULL)
+ worker->callback(percentage, &worker->conn);
+ if (percentage < 0 || percentage == 100)
+ worker->request = NULL;
}
-static void worker_connection_disconnect(struct connection *conn)
+static void worker_connection_destroy(struct connection *conn)
{
struct worker_connection *worker =
container_of(conn, struct worker_connection, conn);
- if (conn->fd_in != -1) {
- io_remove(&conn->io);
- i_stream_destroy(&conn->input);
- o_stream_destroy(&conn->output);
-
- if (close(conn->fd_in) < 0)
- i_error("close(%s) failed: %m", conn->base_name);
- conn->fd_in = -1;
- }
- /* conn->callback() can try to destroy us */
- worker->refcount++;
+ worker->request = NULL;
i_free_and_null(worker->request_username);
- worker_connection_unref(worker);
+ connection_deinit(conn);
}
-void worker_connection_destroy(struct connection **_conn)
+void worker_connection_unref(struct connection **_conn)
{
struct connection *conn = *_conn;
struct worker_connection *worker =
container_of(conn, struct worker_connection, conn);
- *_conn = NULL;
-
- worker_connection_disconnect(conn);
- worker_connection_unref(worker);
+ i_assert(worker->refcount > 0);
+ if (--worker->refcount > 0)
+ return;
+ worker_connection_destroy(conn);
+ i_free(conn);
}
static int
-worker_connection_input_line(struct connection *conn, const char *line)
+worker_connection_handshake_args(struct connection *conn, const char *const *args)
{
struct worker_connection *worker =
container_of(conn, struct worker_connection, conn);
- int percentage;
- /* return -1 -> error
- 0 -> request completed (100%)
- 1 -> request continues (<100%)
- */
- int ret = 1;
-
- if (str_to_int(line, &percentage) < 0 ||
- percentage < -1 || percentage > 100) {
- i_error("Invalid input from worker: %s", line);
+ int ret;
+ if (!conn->version_received) {
+ if ((ret = connection_handshake_args_default(conn, args)) < 1)
+ return ret;
+ /* we are not done yet */
+ return 0;
+ }
+ i_assert(worker->process_limit == 0);
+ if (str_to_uint(args[0], &worker->process_limit) < 0 ||
+ worker->process_limit == 0) {
+ e_error(conn->event, "Worker sent invalid process limit '%s'",
+ args[0]);
return -1;
}
-
- /* is request finished */
- if (percentage < 0)
- ret = -1;
- else if (percentage == 100)
- ret = 0;
-
- worker->callback(percentage, conn);
- return ret;
+ return 1;
}
-static void worker_connection_input(struct connection *conn)
+static int
+worker_connection_input_args(struct connection *conn, const char *const *args)
{
struct worker_connection *worker =
container_of(conn, struct worker_connection, conn);
- const char *line;
-
- if (i_stream_read(conn->input) < 0) {
- worker_connection_disconnect(conn);
- return;
- }
+ int percentage;
+ int ret = 1;
- if (!conn->version_received) {
- if ((line = i_stream_next_line(conn->input)) == NULL)
- return;
-
- if (!version_string_verify(line, INDEXER_WORKER_NAME,
- INDEXER_PROTOCOL_MAJOR_VERSION)) {
- i_error("Indexer worker not compatible with this master "
- "(mixed old and new binaries?)");
- worker_connection_disconnect(conn);
- return;
- }
- conn->version_received = TRUE;
- }
- if (worker->process_limit == 0) {
- if ((line = i_stream_next_line(conn->input)) == NULL)
- return;
- if (str_to_uint(line, &worker->process_limit) < 0 ||
- worker->process_limit == 0) {
- i_error("Indexer worker sent invalid handshake: %s",
- line);
- worker_connection_disconnect(conn);
- return;
- }
+ if (str_to_int(args[0], &percentage) < 0 ||
+ percentage < -1 || percentage > 100) {
+ e_error(conn->event, "Worker sent invalid progress '%s'", args[0]);
+ return -1;
}
- while ((line = i_stream_next_line(conn->input)) != NULL) {
- if (worker_connection_input_line(conn, line) <= 0) {
- break;
- }
- }
-}
+ if (percentage < 0)
+ ret = -1;
-int worker_connection_connect(struct connection *conn)
-{
- i_assert(conn->fd_in == -1);
+ worker_connection_call_callback(worker, percentage);
- conn->fd_in = net_connect_unix(conn->base_name);
- if (conn->fd_in == -1) {
- i_error("connect(%s) failed: %m", conn->base_name);
- return -1;
- }
- conn->io = io_add(conn->fd_in, IO_READ, worker_connection_input, conn);
- conn->input = i_stream_create_fd(conn->fd_in, SIZE_MAX);
- conn->output = o_stream_create_fd(conn->fd_in, SIZE_MAX);
- o_stream_set_no_error_handling(conn->output, TRUE);
- o_stream_nsend_str(conn->output, INDEXER_MASTER_HANDSHAKE);
- return 0;
+ return ret;
}
bool worker_connection_is_connected(struct connection *conn)
{
- return conn->fd_in != -1;
+ return !conn->disconnected;
}
bool worker_connection_get_process_limit(struct connection *conn,
return worker->request;
}
+static const struct connection_vfuncs worker_connection_vfuncs = {
+ .destroy = worker_connection_destroy,
+ .input_args = worker_connection_input_args,
+ .handshake_args = worker_connection_handshake_args,
+};
+
+static const struct connection_settings worker_connection_set = {
+ .service_name_in = INDEXER_WORKER_NAME,
+ .service_name_out = INDEXER_MASTER_NAME,
+ .major_version = INDEXER_PROTOCOL_MAJOR_VERSION,
+ .minor_version = INDEXER_PROTOCOL_MINOR_VERSION,
+ .input_max_size = SIZE_MAX,
+ .output_max_size = SIZE_MAX,
+ .client = TRUE,
+};
+
+struct connection_list *worker_connection_list_create(void)
+{
+ return connection_list_init(&worker_connection_set,
+ &worker_connection_vfuncs);
+}
+
struct connection *
worker_connection_create(const char *socket_path,
indexer_status_callback_t *callback,
conn = i_new(struct worker_connection, 1);
conn->refcount = 1;
- conn->conn.base_name = i_strdup(socket_path);
conn->callback = callback;
- conn->conn.fd_in = -1;
- DLLIST_PREPEND(&list->connections, &conn->conn);
+ connection_init_client_unix(list, &conn->conn, socket_path);
+
return &conn->conn;
}