}
static void
-proxy_client_worker_msg_get_done(struct proxy_client_dsync_worker *worker)
+proxy_client_worker_msg_get_finish(struct proxy_client_dsync_worker *worker)
{
- struct istream *input = worker->msg_get_data.input;
- const unsigned char *data;
- size_t size;
-
- i_assert(worker->io == NULL);
-
worker->msg_get_data.input = NULL;
worker->io = io_add(worker->fd_in, IO_READ,
proxy_client_worker_input, worker);
- /* we'll need to read the input until EOF or we'll start treating the
- input as commands. make sure saving read everything. */
- while ((i_stream_read_data(input, &data, &size, 0)) > 0)
- i_stream_skip(input, size);
-
/* some input may already be buffered. note that we may be coming here
from the input function itself, in which case this timeout must not
be called (we'll remove it later) */
}
}
+static void
+proxy_client_worker_read_to_eof(struct proxy_client_dsync_worker *worker)
+{
+ struct istream *input = worker->msg_get_data.input;
+ const unsigned char *data;
+ size_t size;
+ int ret;
+
+ while ((ret = i_stream_read_data(input, &data, &size, 0)) > 0)
+ i_stream_skip(input, size);
+ if (ret == -1) {
+ i_stream_unref(&input);
+ io_remove(&worker->io);
+ proxy_client_worker_msg_get_finish(worker);
+ }
+}
+
+static void
+proxy_client_worker_msg_get_done(struct proxy_client_dsync_worker *worker)
+{
+ struct istream *input = worker->msg_get_data.input;
+
+ i_assert(worker->io == NULL);
+
+ if (input->eof)
+ proxy_client_worker_msg_get_finish(worker);
+ else {
+ /* saving read the message only partially. we'll need to read
+ the input until EOF or we'll start treating the input as
+ commands. */
+ worker->io = io_add(worker->fd_in, IO_READ,
+ proxy_client_worker_read_to_eof, worker);
+ worker->msg_get_data.input =
+ i_stream_create_dot(worker->input, FALSE);
+ }
+}
+
static bool
proxy_client_worker_next_copy(struct proxy_client_dsync_worker *worker,
const struct proxy_client_request *request,
}
request->callback.get(result, &worker->msg_get_data, request->context);
- return worker->io != NULL;
+ return worker->io != NULL && worker->msg_get_data.input == NULL;
}
static void