io_remove(&worker->io);
proxy_client_worker_msg_get_finish(worker);
}
+ timeout_reset(worker->to);
}
static void
if (worker->to_input != NULL)
timeout_remove(&worker->to_input);
- timeout_reset(worker->to);
if (worker->worker.input_callback != NULL) {
worker->worker.input_callback(worker->worker.input_context);
+ timeout_reset(worker->to);
return;
}
don't get back here. */
timeout_remove(&worker->to_input);
}
+ timeout_reset(worker->to);
}
-static int proxy_client_worker_output(struct proxy_client_dsync_worker *worker)
+static int
+proxy_client_worker_output_real(struct proxy_client_dsync_worker *worker)
{
int ret;
- timeout_reset(worker->to);
if ((ret = o_stream_flush(worker->output)) < 0)
return 1;
return ret;
}
+static int proxy_client_worker_output(struct proxy_client_dsync_worker *worker)
+{
+ int ret;
+
+ ret = proxy_client_worker_output_real(worker);
+ timeout_reset(worker->to);
+ return ret;
+}
+
static void proxy_client_worker_timeout(void *context ATTR_UNUSED)
{
i_error("proxy client timed out");
aqueue_append(worker->request_queue, &request);
}
-static void proxy_client_send_stream(struct proxy_client_dsync_worker *worker)
+static void
+proxy_client_send_stream_real(struct proxy_client_dsync_worker *worker)
{
dsync_worker_save_callback_t *callback;
const unsigned char *data;
callback(worker->save_context);
}
+static void proxy_client_send_stream(struct proxy_client_dsync_worker *worker)
+{
+ proxy_client_send_stream_real(worker);
+ timeout_reset(worker->to);
+}
+
static void
proxy_client_worker_msg_save(struct dsync_worker *_worker,
const struct dsync_message *msg,