struct io *io;
struct istream *input;
struct ostream *output;
+ struct timeout *to;
mailbox_guid_t selected_box_guid;
const char *line;
int ret;
+ timeout_reset(worker->to);
if (worker->worker.input_callback != NULL) {
worker->worker.input_callback(worker->worker.input_context);
return;
{
int ret;
+ timeout_reset(worker->to);
if ((ret = o_stream_flush(worker->output)) < 0)
return 1;
return ret;
}
+static void proxy_client_worker_timeout(void *context ATTR_UNUSED)
+{
+ i_error("proxy client timed out");
+ master_service_stop(master_service);
+}
+
struct dsync_worker *dsync_worker_init_proxy_client(int fd_in, int fd_out)
{
struct proxy_client_dsync_worker *worker;
worker->worker.v = proxy_client_dsync_worker;
worker->fd_in = fd_in;
worker->fd_out = fd_out;
+ worker->to = timeout_add(DSYNC_PROXY_TIMEOUT_MSECS,
+ proxy_client_worker_timeout, NULL);
worker->io = io_add(fd_in, IO_READ, proxy_client_worker_input, worker);
worker->input = i_stream_create_fd(fd_in, (size_t)-1, FALSE);
worker->output = o_stream_create_fd(fd_out, (size_t)-1, FALSE);
struct proxy_client_dsync_worker *worker =
(struct proxy_client_dsync_worker *)_worker;
+ timeout_remove(&worker->to);
if (worker->io != NULL)
io_remove(&worker->io);
i_stream_destroy(&worker->input);
return;
}
+ timeout_reset(server->to);
o_stream_cork(server->output);
while (proxy_server_read_line(server, &line) > 0) {
T_BEGIN {
struct ostream *output = server->output;
int ret;
+ timeout_reset(server->to);
if ((ret = o_stream_flush(output)) < 0)
ret = 1;
else if (server->cur_cmd != NULL) {
return ret;
}
+static void dsync_proxy_server_timeout(void *context ATTR_UNUSED)
+{
+ i_error("proxy server timed out");
+ master_service_stop(master_service);
+}
+
struct dsync_proxy_server *
dsync_proxy_server_init(int fd_in, int fd_out, struct dsync_worker *worker)
{
server->io = io_add(fd_in, IO_READ, proxy_server_input, server);
server->input = i_stream_create_fd(fd_in, (size_t)-1, FALSE);
server->output = o_stream_create_fd(fd_out, (size_t)-1, FALSE);
+ server->to = timeout_add(DSYNC_PROXY_TIMEOUT_MSECS,
+ dsync_proxy_server_timeout, NULL);
o_stream_set_flush_callback(server->output, proxy_server_output,
server);
fd_set_nonblock(fd_in, TRUE);
if (server->get_input != NULL)
i_stream_unref(&server->get_input);
pool_unref(&server->cmd_pool);
+ timeout_remove(&server->to);
io_remove(&server->io);
i_stream_destroy(&server->input);
o_stream_destroy(&server->output);