#include "connection.h"
#include "ioloop.h"
#include "istream.h"
-#include "write-full.h"
+#include "ostream.h"
#include "strescape.h"
#include "process-title.h"
#include "master-service.h"
#define INDEXER_PROTOCOL_MAJOR_VERSION 1
#define INDEXER_PROTOCOL_MINOR_VERSION 0
-#define INDEXER_WORKER_HANDSHAKE "VERSION\tindexer-worker-master\t1\t0\n%u\n"
#define INDEXER_MASTER_NAME "indexer-master-worker"
-
-struct master_connection *master_conn;
+#define INDEXER_WORKER_NAME "indexer-worker-master"
struct master_connection {
struct connection conn;
sizeof(percentage_str), "%u\n",
percentage) < 0)
i_unreached();
- (void)write_full(conn->conn.fd_in, percentage_str,
- strlen(percentage_str));
+ o_stream_nsend_str(conn->conn.output,
+ percentage_str);
}
indexer_worker_refresh_proctitle(username, box_vname,
counter, max);
}
static int
-master_connection_input_line(struct master_connection *conn, const char *line)
+master_connection_input_args(struct connection *_conn, const char *const *args)
{
- const char *const *args = t_strsplit_tabescaped(line);
+ struct master_connection *conn =
+ container_of(_conn, struct master_connection, conn);
struct mail_storage_service_input input;
struct mail_storage_service_user *service_user;
struct mail_user *user;
/* <username> <mailbox> <session ID> <max_recent_msgs> [i][o] */
if (str_array_length(args) != 5 ||
str_to_uint(args[3], &max_recent_msgs) < 0 || args[4][0] == '\0') {
- i_error("Invalid input from master: %s", line);
+ i_error("Invalid input from master: %s",
+ t_strarray_join(args, "\t"));
return -1;
}
}
str = ret < 0 ? "-1\n" : "100\n";
- return write_full(conn->conn.fd_in, str, strlen(str));
+ o_stream_nsend_str(conn->conn.output, str);
+ return ret;
}
-static void master_connection_input(struct master_connection *conn)
+static void master_connection_destroy(struct connection *connection)
{
- const char *line;
- int ret;
+ connection_deinit(connection);
+ i_free(connection);
+ master_service_client_connection_destroyed(master_service);
+}
- if (i_stream_read(conn->conn.input) < 0) {
- master_connection_destroy();
- return;
- }
+static int master_connection_handshake_args(struct connection *connection,
+ const char *const *args)
+{
+ int ret;
+ if ((ret = connection_handshake_args_default(connection, args)) < 1)
+ return ret;
+ const char *limit = t_strdup_printf("%u\n",
+ master_service_get_process_limit(master_service));
+ o_stream_nsend_str(connection->output, limit);
+ return 1;
+}
- if (!conn->conn.version_received) {
- if ((line = i_stream_next_line(conn->conn.input)) == NULL)
- return;
+static struct connection_list *master_connection_list = NULL;
- if (!version_string_verify(line, INDEXER_MASTER_NAME,
- INDEXER_PROTOCOL_MAJOR_VERSION)) {
- i_error("Indexer master not compatible with this master "
- "(mixed old and new binaries?)");
- master_connection_destroy();
- return;
- }
- conn->conn.version_received = TRUE;
- }
+static const struct connection_vfuncs master_connection_vfuncs = {
+ .destroy = master_connection_destroy,
+ .input_args = master_connection_input_args,
+ .handshake_args = master_connection_handshake_args,
+};
- while ((line = i_stream_next_line(conn->conn.input)) != NULL) {
- T_BEGIN {
- ret = master_connection_input_line(conn, line);
- } T_END;
- if (ret < 0) {
- master_connection_destroy();
- break;
- }
- }
-}
+static const struct connection_settings master_connection_set = {
+ .service_name_in = INDEXER_MASTER_NAME,
+ .service_name_out = INDEXER_WORKER_NAME,
+ .major_version = INDEXER_PROTOCOL_MAJOR_VERSION,
+ .minor_version = INDEXER_PROTOCOL_MINOR_VERSION,
+ .input_max_size = SIZE_MAX,
+ .output_max_size = SIZE_MAX,
+};
-struct master_connection *
-master_connection_create(int fd, struct mail_storage_service_ctx *storage_service)
+bool
+master_connection_create(struct master_service_connection *master,
+ struct mail_storage_service_ctx *storage_service)
{
struct master_connection *conn;
- const char *handshake;
+
+ if (master_connection_list == NULL) {
+ master_connection_list =
+ connection_list_init(&master_connection_set,
+ &master_connection_vfuncs);
+ } else if (master_connection_list->connections_count > 0) {
+ return FALSE;
+ }
conn = i_new(struct master_connection, 1);
conn->storage_service = storage_service;
- conn->conn.fd_in = fd;
- conn->conn.io = io_add(conn->conn.fd_in, IO_READ, master_connection_input, conn);
- conn->conn.input = i_stream_create_fd(conn->conn.fd_in, SIZE_MAX);
+ connection_init_server(master_connection_list, &conn->conn,
+ master->name, master->fd, master->fd);
- handshake = t_strdup_printf(INDEXER_WORKER_HANDSHAKE,
- master_service_get_process_limit(master_service));
- (void)write_full(conn->conn.fd_in, handshake, strlen(handshake));
- return conn;
+ return TRUE;
}
-void master_connection_destroy(void)
+void master_connections_destroy(void)
{
- struct master_connection *conn = master_conn;
-
- master_conn = NULL;
-
- io_remove(&conn->conn.io);
- i_stream_destroy(&conn->conn.input);
-
- if (close(conn->conn.fd_in) < 0)
- i_error("close(master conn) failed: %m");
- i_free(conn);
-
- master_service_client_connection_destroyed(master_service);
+ if (master_connection_list != NULL)
+ connection_list_deinit(&master_connection_list);
}