struct notify_connection {
struct notify_connection *prev, *next;
+ struct event *event;
int refcount;
int fd;
}
static int
-notify_input_line(struct notify_connection *conn, const char *line)
+notify_input_line(struct notify_connection *conn, const char *line,
+ const char **error_r)
{
const char *const *args;
enum replication_priority priority;
/* <username> \t <priority> */
args = t_strsplit_tabescaped(line);
if (str_array_length(args) < 2) {
- i_error("Client sent invalid input");
+ *error_r = "Client sent invalid input";
return -1;
}
if (replication_priority_parse(args[1], &priority) < 0) {
- i_error("Client sent invalid priority: %s", args[1]);
+ *error_r = t_strdup_printf(
+ "Client sent invalid priority: %s", args[1]);
return -1;
}
if (priority != REPLICATION_PRIORITY_SYNC)
{
const char *line;
int ret;
+ const char *error;
switch (i_stream_read(conn->input)) {
case -2:
/* buffer full */
- i_error("Client sent too long line");
+ e_error(conn->event, "Client sent too long line");
(void)notify_input_error(conn);
return;
case -1:
while ((line = i_stream_next_line(conn->input)) != NULL) {
T_BEGIN {
- ret = notify_input_line(conn, line);
+ ret = notify_input_line(conn, line, &error);
+ if (ret < 0)
+ e_error(conn->event, "%s", error);
} T_END;
if (ret < 0) {
if (!notify_input_error(conn))
conn->fd = fd;
conn->io = io_add(fd, IO_READ, notify_input, conn);
conn->input = i_stream_create_fd(fd, MAX_INBUF_SIZE);
+ conn->event = event_create(NULL);
if (!fifo) {
conn->output = o_stream_create_fd(fd, SIZE_MAX);
o_stream_set_no_error_handling(conn->output, TRUE);
i_stream_destroy(&conn->input);
o_stream_destroy(&conn->output);
+ event_unref(&conn->event);
i_free(conn);
}
struct replicator_connection {
char *path;
+ struct event *event;
struct ip_addr *ips;
unsigned int ips_count, ip_idx;
in_port_t port;
/* <+|-> \t <id> */
if ((line[0] != '+' && line[0] != '-') || line[1] != '\t' ||
str_to_uint(line+2, &id) < 0 || id == 0) {
- i_error("Replicator sent invalid input: %s", line);
+ e_error(conn->event, "Replicator sent invalid input: %s", line);
return -1;
}
context = hash_table_lookup(conn->requests, POINTER_CAST(id));
if (context == NULL) {
- i_error("Replicator sent invalid ID: %u", id);
+ e_error(conn->event, "Replicator sent invalid ID: %u", id);
return -1;
}
hash_table_remove(conn->requests, POINTER_CAST(id));
switch (i_stream_read(conn->input)) {
case -2:
/* buffer full */
- i_error("Replicator sent too long line");
+ e_error(conn->event, "Replicator sent too long line");
replicator_connection_disconnect(conn);
return;
case -1:
if (conn->port == 0) {
fd = net_connect_unix(conn->path);
if (fd == -1)
- i_error("net_connect_unix(%s) failed: %m", conn->path);
+ e_error(conn->event, "net_connect_unix(%s) failed: %m",
+ conn->path);
} else {
for (n = 0; n < conn->ips_count; n++) {
unsigned int idx = conn->ip_idx;
fd = net_connect_ip(&conn->ips[idx], conn->port, NULL);
if (fd != -1)
break;
- i_error("connect(%s, %u) failed: %m",
+ e_error(conn->event, "connect(%s, %u) failed: %m",
net_ip2addr(&conn->ips[idx]), conn->port);
}
}
conn = i_new(struct replicator_connection, 1);
conn->fd = -1;
+ conn->event = event_create(NULL);
hash_table_create_direct(&conn->requests, default_pool, 0);
for (i = REPLICATION_PRIORITY_LOW; i <= REPLICATION_PRIORITY_SYNC; i++)
conn->queue[i] = buffer_create_dynamic(default_pool, 1024);
timeout_remove(&conn->to);
hash_table_destroy(&conn->requests);
+ event_unref(&conn->event);
i_free(conn->ips);
i_free(conn->path);
i_free(conn);