#include "hostpid.h"
#include "array.h"
#include "aqueue.h"
+#include "strescape.h"
#include "master-service.h"
#include "anvil-client.h"
bool (*reconnect_callback)(void);
enum anvil_client_flags flags;
+ bool deinitializing;
};
-#define ANVIL_HANDSHAKE "VERSION\tanvil\t2\t0\n%s\t%s\n"
#define ANVIL_INBUF_SIZE 1024
#define ANVIL_RECONNECT_MIN_SECS 5
#define ANVIL_QUERY_TIMEOUT_MSECS (1000*5)
-static void anvil_client_disconnect(struct anvil_client *client);
+static void anvil_client_destroy(struct connection *conn);
+static int anvil_client_input_line(struct connection *conn, const char *line);
+
+static struct connection_list *anvil_connections;
+
+static struct connection_settings anvil_connections_set = {
+ .major_version = 2,
+ .minor_version = 0,
+ .service_name_out = "anvil-client",
+ .service_name_in = "anvil-server",
+
+ .input_max_size = ANVIL_INBUF_SIZE,
+ .output_max_size = SIZE_MAX,
+
+ .client = TRUE,
+};
+
+static struct connection_vfuncs anvil_connections_vfuncs = {
+ .destroy = anvil_client_destroy,
+ .input_line = anvil_client_input_line,
+};
struct anvil_client *
anvil_client_init(const char *path, bool (*reconnect_callback)(void),
{
struct anvil_client *client;
+ if (anvil_connections == NULL) {
+ anvil_connections = connection_list_init(&anvil_connections_set,
+ &anvil_connections_vfuncs);
+ }
+
client = i_new(struct anvil_client, 1);
- client->conn.base_name = i_strdup(path);
+ connection_init_client_unix(anvil_connections, &client->conn, path);
client->reconnect_callback = reconnect_callback;
client->flags = flags;
- client->conn.fd_in = -1;
i_array_init(&client->queries_arr, 32);
client->queries = aqueue_init(&client->queries_arr.arr);
return client;
*_client = NULL;
- anvil_client_disconnect(client);
+ client->deinitializing = TRUE;
+ anvil_client_destroy(&client->conn);
+
array_free(&client->queries_arr);
aqueue_deinit(&client->queries);
- i_free(client->conn.base_name);
i_assert(client->to_reconnect == NULL);
+ connection_deinit(&client->conn);
i_free(client);
+
+ if (anvil_connections->connections == NULL)
+ connection_list_deinit(&anvil_connections);
}
static void anvil_client_start_multiplex_input(struct anvil_client *client)
struct istream *orig_input = client->conn.input;
client->conn.input = i_stream_create_multiplex(orig_input, ANVIL_INBUF_SIZE);
i_stream_unref(&orig_input);
+
+ connection_streams_changed(&client->conn);
}
static void
o_stream_unref(&orig_output);
}
-static void anvil_reconnect(struct anvil_client *client)
+static void anvil_client_reconnect(struct anvil_client *client)
{
- anvil_client_disconnect(client);
if (client->reconnect_callback != NULL) {
if (!client->reconnect_callback()) {
/* no reconnection */
if (client->to_reconnect == NULL) {
client->to_reconnect =
timeout_add(ANVIL_RECONNECT_MIN_SECS*1000,
- anvil_reconnect, client);
+ anvil_client_reconnect, client);
}
} else {
client->last_reconnect = ioloop_time;
}
}
-static void anvil_input(struct anvil_client *client)
+static int anvil_client_input_line(struct connection *conn, const char *line)
{
+ struct anvil_client *client =
+ container_of(conn, struct anvil_client, conn);
struct anvil_query *const *queries;
struct anvil_query *query;
- const char *line;
unsigned int count;
- queries = array_get(&client->queries_arr, &count);
- while ((line = i_stream_read_next_line(client->conn.input)) != NULL) {
- if (aqueue_count(client->queries) == 0) {
- i_error("anvil: Unexpected input: %s", line);
- continue;
- }
-
- query = queries[aqueue_idx(client->queries, 0)];
- if (query->callback != NULL) T_BEGIN {
- query->callback(line, query->context);
- } T_END;
- i_free(query);
- aqueue_delete_tail(client->queries);
+ if (aqueue_count(client->queries) == 0) {
+ e_error(client->conn.event, "Unexpected input: %s", line);
+ return -1;
}
- if (client->conn.input->stream_errno != 0) {
- i_error("read(%s) failed: %s", client->conn.base_name,
- i_stream_get_error(client->conn.input));
- anvil_reconnect(client);
- } else if (client->conn.input->eof) {
- i_error("read(%s) failed: EOF", client->conn.base_name);
- anvil_reconnect(client);
- } else if (client->to_query != NULL) {
+
+ queries = array_get(&client->queries_arr, &count);
+ query = queries[aqueue_idx(client->queries, 0)];
+ if (query->callback != NULL) T_BEGIN {
+ query->callback(line, query->context);
+ } T_END;
+ i_free(query);
+ aqueue_delete_tail(client->queries);
+
+ if (client->to_query != NULL) {
if (aqueue_count(client->queries) == 0)
timeout_remove(&client->to_query);
else
timeout_reset(client->to_query);
}
+ return 1;
}
int anvil_client_connect(struct anvil_client *client, bool retry)
{
- int fd;
+ int ret;
i_assert(client->conn.fd_in == -1);
- fd = retry ? net_connect_unix_with_retries(client->conn.base_name, 5000) :
- net_connect_unix(client->conn.base_name);
- if (fd == -1) {
+ ret = retry ? connection_client_connect_with_retries(&client->conn, 5000) :
+ connection_client_connect(&client->conn);
+ if (ret < 0) {
if (errno != ENOENT ||
(client->flags & ANVIL_CLIENT_FLAG_HIDE_ENOENT) == 0) {
- i_error("net_connect_unix(%s) failed: %m",
+ e_error(client->conn.event,
+ "net_connect_unix(%s) failed: %m",
client->conn.base_name);
}
return -1;
}
-
timeout_remove(&client->to_reconnect);
- client->conn.fd_in = fd;
- client->conn.input = i_stream_create_fd(fd, ANVIL_INBUF_SIZE);
- client->conn.output = o_stream_create_fd(fd, SIZE_MAX);
- client->conn.io = io_add(fd, IO_READ, anvil_input, client);
const char *anvil_handshake =
- t_strdup_printf(ANVIL_HANDSHAKE,
+ t_strdup_printf("%s\t%s\n",
master_service_get_name(master_service),
my_pid);
o_stream_nsend_str(client->conn.output, anvil_handshake);
timeout_remove(&client->to_query);
}
-static void anvil_client_disconnect(struct anvil_client *client)
+static void anvil_client_destroy(struct connection *conn)
{
+ struct anvil_client *client =
+ container_of(conn, struct anvil_client, conn);
+
+ connection_disconnect(&client->conn);
anvil_client_cancel_queries(client);
- if (client->conn.fd_in != -1) {
- io_remove(&client->conn.io);
- i_stream_destroy(&client->conn.input);
- o_stream_destroy(&client->conn.output);
- net_disconnect(client->conn.fd_in);
- client->conn.fd_in = -1;
- }
timeout_remove(&client->to_reconnect);
+
+ if (!client->deinitializing)
+ anvil_client_reconnect(client);
}
static void anvil_client_timeout(struct anvil_client *client)
{
i_assert(aqueue_count(client->queries) > 0);
- i_error("%s: Anvil queries timed out after %u secs - aborting queries",
- client->conn.base_name, ANVIL_QUERY_TIMEOUT_MSECS/1000);
+ e_error(client->conn.event,
+ "Anvil queries timed out after %u secs - aborting queries",
+ ANVIL_QUERY_TIMEOUT_MSECS/1000);
/* perhaps reconnect helps */
- anvil_reconnect(client);
+ anvil_client_destroy(&client->conn);
}
static int anvil_client_send(struct anvil_client *client, const char *cmd)
{
struct const_iovec iov[2];
- if (client->conn.fd_in == -1) {
+ if (client->conn.disconnected) {
if (anvil_client_connect(client, FALSE) < 0)
return -1;
}
bool anvil_client_is_connected(struct anvil_client *client)
{
- return client->conn.fd_in != -1;
+ return !client->conn.disconnected;
}