#include "lib.h"
#include "ioloop.h"
#include "net.h"
+#include "connection.h"
#include "istream.h"
#include "ostream.h"
#include "istream-multiplex.h"
};
struct anvil_client {
- char *path;
- int fd;
- struct istream *input;
- struct ostream *output;
- struct io *io;
+ struct connection conn;
struct timeout *to_query;
struct timeout *to_reconnect;
struct anvil_client *client;
client = i_new(struct anvil_client, 1);
- client->path = i_strdup(path);
+ client->conn.base_name = i_strdup(path);
client->reconnect_callback = reconnect_callback;
client->flags = flags;
- client->fd = -1;
+ client->conn.fd_in = -1;
i_array_init(&client->queries_arr, 32);
client->queries = aqueue_init(&client->queries_arr.arr);
return client;
anvil_client_disconnect(client);
array_free(&client->queries_arr);
aqueue_deinit(&client->queries);
- i_free(client->path);
+ i_free(client->conn.base_name);
i_assert(client->to_reconnect == NULL);
i_free(client);
}
static void anvil_client_start_multiplex_input(struct anvil_client *client)
{
- struct istream *orig_input = client->input;
- client->input = i_stream_create_multiplex(orig_input, ANVIL_INBUF_SIZE);
+ struct istream *orig_input = client->conn.input;
+ client->conn.input = i_stream_create_multiplex(orig_input, ANVIL_INBUF_SIZE);
i_stream_unref(&orig_input);
}
static void
anvil_client_start_multiplex_output(struct anvil_client *client)
{
- struct ostream *orig_output = client->output;
- client->output = o_stream_create_multiplex(orig_output, SIZE_MAX);
- o_stream_set_no_error_handling(client->output, TRUE);
+ struct ostream *orig_output = client->conn.output;
+ client->conn.output = o_stream_create_multiplex(orig_output, SIZE_MAX);
+ o_stream_set_no_error_handling(client->conn.output, TRUE);
o_stream_unref(&orig_output);
}
unsigned int count;
queries = array_get(&client->queries_arr, &count);
- while ((line = i_stream_read_next_line(client->input)) != NULL) {
+ while ((line = i_stream_read_next_line(client->conn.input)) != NULL) {
if (aqueue_count(client->queries) == 0) {
i_error("anvil: Unexpected input: %s", line);
continue;
i_free(query);
aqueue_delete_tail(client->queries);
}
- if (client->input->stream_errno != 0) {
- i_error("read(%s) failed: %s", client->path,
- i_stream_get_error(client->input));
+ if (client->conn.input->stream_errno != 0) {
+ i_error("read(%s) failed: %s", client->conn.base_name,
+ i_stream_get_error(client->conn.input));
anvil_reconnect(client);
- } else if (client->input->eof) {
- i_error("read(%s) failed: EOF", client->path);
+ } else if (client->conn.input->eof) {
+ i_error("read(%s) failed: EOF", client->conn.base_name);
anvil_reconnect(client);
} else if (client->to_query != NULL) {
if (aqueue_count(client->queries) == 0)
{
int fd;
- i_assert(client->fd == -1);
+ i_assert(client->conn.fd_in == -1);
- fd = retry ? net_connect_unix_with_retries(client->path, 5000) :
- net_connect_unix(client->path);
+ fd = retry ? net_connect_unix_with_retries(client->conn.base_name, 5000) :
+ net_connect_unix(client->conn.base_name);
if (fd == -1) {
if (errno != ENOENT ||
(client->flags & ANVIL_CLIENT_FLAG_HIDE_ENOENT) == 0) {
i_error("net_connect_unix(%s) failed: %m",
- client->path);
+ client->conn.base_name);
}
return -1;
}
timeout_remove(&client->to_reconnect);
- client->fd = fd;
- client->input = i_stream_create_fd(fd, ANVIL_INBUF_SIZE);
- client->output = o_stream_create_fd(fd, SIZE_MAX);
- client->io = io_add(fd, IO_READ, anvil_input, client);
+ client->conn.fd_in = fd;
+ client->conn.input = i_stream_create_fd(fd, ANVIL_INBUF_SIZE);
+ client->conn.output = o_stream_create_fd(fd, SIZE_MAX);
+ client->conn.io = io_add(fd, IO_READ, anvil_input, client);
const char *anvil_handshake =
t_strdup_printf(ANVIL_HANDSHAKE,
master_service_get_name(master_service),
my_pid);
- if (o_stream_send_str(client->output, anvil_handshake) < 0) {
- i_error("write(%s) failed: %s", client->path,
- o_stream_get_error(client->output));
+ if (o_stream_send_str(client->conn.output, anvil_handshake) < 0) {
+ i_error("write(%s) failed: %s", client->conn.base_name,
+ o_stream_get_error(client->conn.output));
anvil_reconnect(client);
return -1;
}
static void anvil_client_disconnect(struct anvil_client *client)
{
anvil_client_cancel_queries(client);
- if (client->fd != -1) {
- io_remove(&client->io);
- i_stream_destroy(&client->input);
- o_stream_destroy(&client->output);
- net_disconnect(client->fd);
- client->fd = -1;
+ if (client->conn.fd_in != -1) {
+ io_remove(&client->conn.io);
+ i_stream_destroy(&client->conn.input);
+ o_stream_destroy(&client->conn.output);
+ net_disconnect(client->conn.fd_in);
+ client->conn.fd_in = -1;
}
timeout_remove(&client->to_reconnect);
}
i_assert(aqueue_count(client->queries) > 0);
i_error("%s: Anvil queries timed out after %u secs - aborting queries",
- client->path, ANVIL_QUERY_TIMEOUT_MSECS/1000);
+ client->conn.base_name, ANVIL_QUERY_TIMEOUT_MSECS/1000);
/* perhaps reconnect helps */
anvil_reconnect(client);
}
{
struct const_iovec iov[2];
- if (client->fd == -1) {
+ if (client->conn.fd_in == -1) {
if (anvil_client_connect(client, FALSE) < 0)
return -1;
}
iov[0].iov_len = strlen(cmd);
iov[1].iov_base = "\n";
iov[1].iov_len = 1;
- if (o_stream_sendv(client->output, iov, 2) < 0) {
- i_error("write(%s) failed: %s", client->path,
- o_stream_get_error(client->output));
+ if (o_stream_sendv(client->conn.output, iov, 2) < 0) {
+ i_error("write(%s) failed: %s", client->conn.base_name,
+ o_stream_get_error(client->conn.output));
anvil_reconnect(client);
return -1;
}
bool anvil_client_is_connected(struct anvil_client *client)
{
- return client->fd != -1;
+ return client->conn.fd_in != -1;
}