enum item_type {
ITEM_NONE,
+ ITEM_DONE,
ITEM_HANDSHAKE,
ITEM_MAILBOX_STATE,
const char *optional_keys;
} items[ITEM_END_OF_LIST+1] = {
{ NULL, '\0', NULL, NULL },
+ { .name = "done",
+ .chr = 'X',
+ .optional_keys = ""
+ },
{ .name = "handshake",
.chr = 'H',
.optional_keys = "sync_ns_prefix sync_box sync_type debug sync_all_namespaces "
unsigned int version_received:1;
unsigned int handshake_received:1;
unsigned int has_pending_data:1;
+ unsigned int stopped:1;
};
static void dsync_ibc_stream_stop(struct dsync_ibc_stream *ibc)
{
+ ibc->stopped = TRUE;
i_stream_close(ibc->input);
o_stream_close(ibc->output);
io_loop_stop(current_ioloop);
o_stream_nsend_str(ibc->output, DSYNC_HANDSHAKE_VERSION);
/* initialize serializers and send their headers to remote */
- for (i = 1; i < ITEM_END_OF_LIST; i++) T_BEGIN {
+ for (i = ITEM_DONE + 1; i < ITEM_END_OF_LIST; i++) T_BEGIN {
const char *keys;
keys = items[i].required_keys == NULL ? items[i].optional_keys :
dsync_deserializer_decode_finish(&ibc->cur_decoder);
if (ibc->mail_output != NULL)
i_stream_unref(&ibc->mail_output);
+ else {
+ /* notify remote that we're closing. this is mainly to avoid
+ "read() failed: EOF" errors on failing dsyncs */
+ o_stream_nsend_str(ibc->output,
+ t_strdup_printf("%c\n", items[ITEM_DONE].chr));
+ o_stream_nfinish(ibc->output);
+ }
timeout_remove(&ibc->to);
if (ibc->io != NULL)
*line_r = line;
return 1;
}
-
/* try reading some */
if (i_stream_read(ibc->input) == -1) {
+ if (ibc->stopped)
+ return -1;
if (ibc->input->stream_errno != 0) {
errno = ibc->input->stream_errno;
i_error("read(%s) failed: %m", ibc->name);
unsigned int i;
int ret = 0;
- for (i = 1; i < ITEM_END_OF_LIST; i++) {
+ for (i = ITEM_DONE + 1; i < ITEM_END_OF_LIST; i++) {
if (ibc->deserializers[i] == NULL &&
(items[i].required_keys != NULL ||
items[i].optional_keys != NULL)) {
/* end of this list */
return DSYNC_IBC_RECV_RET_FINISHED;
}
+ if (line[0] == items[ITEM_DONE].chr) {
+ /* remote cleanly closed the connection, possibly because of
+ some failure (which it should have logged). we don't want to
+ log any stream errors anyway after this. */
+ dsync_ibc_stream_stop(ibc);
+ return DSYNC_IBC_RECV_RET_TRYAGAIN;
+
+ }
for (i = 1; i < ITEM_END_OF_LIST; i++) {
if (*line == items[i].chr) {
line_item = i;