struct proxy_client_request {
enum proxy_client_request_type type;
+ uint32_t uid;
union {
dsync_worker_msg_callback_t *get;
dsync_worker_copy_callback_t *copy;
}
static bool
-proxy_client_worker_next_copy(const struct proxy_client_request *request,
+proxy_client_worker_next_copy(struct proxy_client_dsync_worker *worker,
+ const struct proxy_client_request *request,
const char *line)
{
- request->callback.copy(*line == '1', request->context);
+ uint32_t uid;
+ bool success;
+
+ if (line[0] == '1' && line[1] == '\t')
+ success = TRUE;
+ else if (line[0] == '0' && line[1] == '\t')
+ success = FALSE;
+ else {
+ i_error("msg-copy returned invalid input: %s", line);
+ proxy_client_fail(worker);
+ return FALSE;
+ }
+ uid = strtoul(line + 2, NULL, 10);
+ if (uid != request->uid) {
+ i_error("msg-copy returned invalid uid: %u != %u",
+ uid, request->uid);
+ proxy_client_fail(worker);
+ return FALSE;
+ }
+
+ request->callback.copy(success, request->context);
return TRUE;
}
uid = strtoul(t_strcut(line, '\t'), NULL, 10);
line = p + 1;
+ if (uid != request->uid) {
+ i_error("msg-get returned invalid uid: %u != %u",
+ uid, request->uid);
+ proxy_client_fail(worker);
+ return FALSE;
+ }
+
if (dsync_proxy_msg_static_import(worker->msg_get_pool,
line, &worker->msg_get_data,
&error) < 0) {
switch (request.type) {
case PROXY_CLIENT_REQUEST_TYPE_COPY:
- ret = proxy_client_worker_next_copy(&request, line);
+ ret = proxy_client_worker_next_copy(worker, &request, line);
break;
case PROXY_CLIENT_REQUEST_TYPE_GET:
ret = proxy_client_worker_next_msg_get(worker, &request, line);
request.type = PROXY_CLIENT_REQUEST_TYPE_COPY;
request.callback.copy = callback;
request.context = context;
+ request.uid = src_uid;
aqueue_append(worker->request_queue, &request);
}
request.type = PROXY_CLIENT_REQUEST_TYPE_GET;
request.callback.get = callback;
request.context = context;
+ request.uid = uid;
aqueue_append(worker->request_queue, &request);
}
static void copy_callback(bool success, void *context)
{
struct dsync_proxy_server *server = context;
+ const char *reply;
+
+ i_assert(server->copy_uid != 0);
- o_stream_send(server->output, success ? "1\n" : "0\n", 2);
+ reply = t_strdup_printf("%d\t%u\n", success ? 1 : 0, server->copy_uid);
+ o_stream_send_str(server->output, reply);
}
static int
args + 2, &msg, &error) < 0)
i_error("Invalid message input: %s", error);
+ server->copy_uid = src_uid;
dsync_worker_msg_copy(server->worker, &src_mailbox_guid, src_uid, &msg,
copy_callback, server);
+ server->copy_uid = 0;
return 1;
}
struct dsync_proxy_server *server = context;
string_t *str;
+ i_assert(server->get_uid != 0);
+
switch (result) {
case DSYNC_MSG_GET_RESULT_SUCCESS:
break;
dsync_worker_msg_get(server->worker, &mailbox_guid, uid,
cmd_msg_get_callback, server);
}
- return server->get_input == NULL ? 1 : 0;
+ if (server->get_input != NULL)
+ return 0;
+ server->get_uid = 0;
+ return 1;
}
static void cmd_finish_callback(bool success, void *context)