]> git.ipfire.org Git - thirdparty/dovecot/core.git/commitdiff
dsync: Fixed potential assert crashes with remote dsyncing.
authorTimo Sirainen <tss@iki.fi>
Mon, 19 Jul 2010 14:42:22 +0000 (15:42 +0100)
committerTimo Sirainen <tss@iki.fi>
Mon, 19 Jul 2010 14:42:22 +0000 (15:42 +0100)
src/dsync/dsync-proxy-client.c

index aa073fff08915f433a1ca70754aaf1384c37ee1a..6a461b238ca1bf57d223af9f4a7ccdea320f5e45 100644 (file)
@@ -66,6 +66,7 @@ struct proxy_client_dsync_worker {
        struct dsync_msg_static_data msg_get_data;
        ARRAY_DEFINE(request_array, struct proxy_client_request);
        struct aqueue *request_queue;
+       string_t *pending_commands;
 
        unsigned int handshake_received:1;
        unsigned int finishing:1;
@@ -363,6 +364,7 @@ struct dsync_worker *dsync_worker_init_proxy_client(int fd_in, int fd_out)
        fd_set_nonblock(fd_in, TRUE);
        fd_set_nonblock(fd_out, TRUE);
 
+       worker->pending_commands = str_new(default_pool, 1024);
        worker->msg_get_pool = pool_alloconly_create("dsync proxy msg", 128);
        i_array_init(&worker->request_array, 64);
        worker->request_queue = aqueue_init(&worker->request_array.arr);
@@ -391,6 +393,7 @@ static void proxy_client_worker_deinit(struct dsync_worker *_worker)
        aqueue_deinit(&worker->request_queue);
        array_free(&worker->request_array);
        pool_unref(&worker->msg_get_pool);
+       str_free(&worker->pending_commands);
        i_free(worker);
 }
 
@@ -417,6 +420,7 @@ static int proxy_client_worker_output_flush(struct dsync_worker *_worker)
 {
        struct proxy_client_dsync_worker *worker =
                (struct proxy_client_dsync_worker *)_worker;
+       int ret = 1;
 
        if (o_stream_flush(worker->output) < 0)
                return -1;
@@ -424,8 +428,13 @@ static int proxy_client_worker_output_flush(struct dsync_worker *_worker)
        o_stream_uncork(worker->output);
        if (o_stream_get_buffer_used_size(worker->output) > 0)
                return 0;
+
+       if (o_stream_send(worker->output, str_data(worker->pending_commands),
+                         str_len(worker->pending_commands)) < 0)
+               ret = -1;
+       str_truncate(worker->pending_commands, 0);
        o_stream_cork(worker->output);
-       return 1;
+       return ret;
 }
 
 static struct dsync_worker_mailbox_iter *
@@ -439,7 +448,7 @@ proxy_client_worker_mailbox_iter_init(struct dsync_worker *_worker)
        iter->iter.worker = _worker;
        iter->pool = pool_alloconly_create("proxy mailbox iter", 1024);
        o_stream_send_str(worker->output, "BOX-LIST\n");
-       proxy_client_worker_output_flush(_worker);
+       (void)proxy_client_worker_output_flush(_worker);
        return &iter->iter;
 }
 
@@ -502,7 +511,7 @@ proxy_client_worker_subs_iter_init(struct dsync_worker *_worker)
        iter->iter.worker = _worker;
        iter->pool = pool_alloconly_create("proxy subscription iter", 1024);
        o_stream_send_str(worker->output, "SUBS-LIST\n");
-       proxy_client_worker_output_flush(_worker);
+       (void)proxy_client_worker_output_flush(_worker);
        return &iter->iter;
 }
 
@@ -650,7 +659,7 @@ proxy_client_worker_msg_iter_init(struct dsync_worker *_worker,
        o_stream_send(worker->output, str_data(str), str_len(str));
        p_clear(iter->pool);
 
-       proxy_client_worker_output_flush(_worker);
+       (void)proxy_client_worker_output_flush(_worker);
        return &iter->iter;
 }
 
@@ -718,6 +727,15 @@ proxy_client_worker_msg_iter_deinit(struct dsync_worker_msg_iter *_iter)
        return ret;
 }
 
+static void
+proxy_client_worker_cmd(struct proxy_client_dsync_worker *worker, string_t *str)
+{
+       if (worker->save_input == NULL)
+               o_stream_send(worker->output, str_data(str), str_len(str));
+       else
+               str_append_str(worker->pending_commands, str);
+}
+
 static void
 proxy_client_worker_create_mailbox(struct dsync_worker *_worker,
                                   const struct dsync_mailbox *dsync_box)
@@ -725,15 +743,13 @@ proxy_client_worker_create_mailbox(struct dsync_worker *_worker,
        struct proxy_client_dsync_worker *worker =
                (struct proxy_client_dsync_worker *)_worker;
 
-       i_assert(worker->save_input == NULL);
-
        T_BEGIN {
                string_t *str = t_str_new(128);
 
                str_append(str, "BOX-CREATE\t");
                dsync_proxy_mailbox_export(str, dsync_box);
                str_append_c(str, '\n');
-               o_stream_send(worker->output, str_data(str), str_len(str));
+               proxy_client_worker_cmd(worker, str);
        } T_END;
 }
 
@@ -744,15 +760,13 @@ proxy_client_worker_delete_mailbox(struct dsync_worker *_worker,
        struct proxy_client_dsync_worker *worker =
                (struct proxy_client_dsync_worker *)_worker;
 
-       i_assert(worker->save_input == NULL);
-
        T_BEGIN {
                string_t *str = t_str_new(128);
 
                str_append(str, "BOX-DELETE\t");
                dsync_proxy_mailbox_guid_export(str, &dsync_box->mailbox_guid);
                str_printfa(str, "\t%s\n", dec2str(dsync_box->last_change));
-               o_stream_send(worker->output, str_data(str), str_len(str));
+               proxy_client_worker_cmd(worker, str);
        } T_END;
 }
 
@@ -763,15 +777,13 @@ proxy_client_worker_delete_dir(struct dsync_worker *_worker,
        struct proxy_client_dsync_worker *worker =
                (struct proxy_client_dsync_worker *)_worker;
 
-       i_assert(worker->save_input == NULL);
-
        T_BEGIN {
                string_t *str = t_str_new(128);
 
                str_append(str, "DIR-DELETE\t");
                str_tabescape_write(str, dsync_box->name);
                str_printfa(str, "\t%s\n", dec2str(dsync_box->last_change));
-               o_stream_send(worker->output, str_data(str), str_len(str));
+               proxy_client_worker_cmd(worker, str);
        } T_END;
 }
 
@@ -784,8 +796,6 @@ proxy_client_worker_rename_mailbox(struct dsync_worker *_worker,
                (struct proxy_client_dsync_worker *)_worker;
        char sep[2];
 
-       i_assert(worker->save_input == NULL);
-
        T_BEGIN {
                string_t *str = t_str_new(128);
 
@@ -797,7 +807,7 @@ proxy_client_worker_rename_mailbox(struct dsync_worker *_worker,
                sep[0] = dsync_box->name_sep; sep[1] = '\0';
                str_tabescape_write(str, sep);
                str_append_c(str, '\n');
-               o_stream_send(worker->output, str_data(str), str_len(str));
+               proxy_client_worker_cmd(worker, str);
        } T_END;
 }
 
@@ -808,15 +818,13 @@ proxy_client_worker_update_mailbox(struct dsync_worker *_worker,
        struct proxy_client_dsync_worker *worker =
                (struct proxy_client_dsync_worker *)_worker;
 
-       i_assert(worker->save_input == NULL);
-
        T_BEGIN {
                string_t *str = t_str_new(128);
 
                str_append(str, "BOX-UPDATE\t");
                dsync_proxy_mailbox_export(str, dsync_box);
                str_append_c(str, '\n');
-               o_stream_send(worker->output, str_data(str), str_len(str));
+               proxy_client_worker_cmd(worker, str);
        } T_END;
 }
 
@@ -828,8 +836,6 @@ proxy_client_worker_select_mailbox(struct dsync_worker *_worker,
        struct proxy_client_dsync_worker *worker =
                (struct proxy_client_dsync_worker *)_worker;
 
-       i_assert(worker->save_input == NULL);
-
        if (dsync_guid_equals(&worker->selected_box_guid, mailbox))
                return;
        worker->selected_box_guid = *mailbox;
@@ -842,7 +848,7 @@ proxy_client_worker_select_mailbox(struct dsync_worker *_worker,
                if (cache_fields != NULL)
                        dsync_proxy_strings_export(str, cache_fields);
                str_append_c(str, '\n');
-               o_stream_send(worker->output, str_data(str), str_len(str));
+               proxy_client_worker_cmd(worker, str);
        } T_END;
 }
 
@@ -853,8 +859,6 @@ proxy_client_worker_msg_update_metadata(struct dsync_worker *_worker,
        struct proxy_client_dsync_worker *worker =
                (struct proxy_client_dsync_worker *)_worker;
 
-       i_assert(worker->save_input == NULL);
-
        T_BEGIN {
                string_t *str = t_str_new(128);
 
@@ -862,7 +866,7 @@ proxy_client_worker_msg_update_metadata(struct dsync_worker *_worker,
                            (unsigned long long)msg->modseq);
                imap_write_flags(str, msg->flags, msg->keywords);
                str_append_c(str, '\n');
-               o_stream_send(worker->output, str_data(str), str_len(str));
+               proxy_client_worker_cmd(worker, str);
        } T_END;
 }
 
@@ -873,12 +877,10 @@ proxy_client_worker_msg_update_uid(struct dsync_worker *_worker,
        struct proxy_client_dsync_worker *worker =
                (struct proxy_client_dsync_worker *)_worker;
 
-       i_assert(worker->save_input == NULL);
-
        T_BEGIN {
-               o_stream_send_str(worker->output,
-                       t_strdup_printf("MSG-UID-CHANGE\t%u\t%u\n",
-                                       old_uid, new_uid));
+               string_t *str = t_str_new(64);
+               str_printfa(str, "MSG-UID-CHANGE\t%u\t%u\n", old_uid, new_uid);
+               proxy_client_worker_cmd(worker, str);
        } T_END;
 }
 
@@ -888,11 +890,10 @@ proxy_client_worker_msg_expunge(struct dsync_worker *_worker, uint32_t uid)
        struct proxy_client_dsync_worker *worker =
                (struct proxy_client_dsync_worker *)_worker;
 
-       i_assert(worker->save_input == NULL);
-
        T_BEGIN {
-               o_stream_send_str(worker->output,
-                       t_strdup_printf("MSG-EXPUNGE\t%u\n", uid));
+               string_t *str = t_str_new(64);
+               str_printfa(str, "MSG-EXPUNGE\t%u\n", uid);
+               proxy_client_worker_cmd(worker, str);
        } T_END;
 }
 
@@ -908,8 +909,6 @@ proxy_client_worker_msg_copy(struct dsync_worker *_worker,
                (struct proxy_client_dsync_worker *)_worker;
        struct proxy_client_request request;
 
-       i_assert(worker->save_input == NULL);
-
        T_BEGIN {
                string_t *str = t_str_new(128);
 
@@ -918,7 +917,7 @@ proxy_client_worker_msg_copy(struct dsync_worker *_worker,
                str_printfa(str, "\t%u\t", src_uid);
                dsync_proxy_msg_export(str, dest_msg);
                str_append_c(str, '\n');
-               o_stream_send(worker->output, str_data(str), str_len(str));
+               proxy_client_worker_cmd(worker, str);
        } T_END;
 
        memset(&request, 0, sizeof(request));
@@ -976,6 +975,7 @@ static void proxy_client_send_stream(struct proxy_client_dsync_worker *worker)
        callback = worker->save_callback;
        worker->save_callback = NULL;
        i_stream_unref(&worker->save_input);
+       (void)proxy_client_worker_output_flush(&worker->worker);
 
        callback(worker->save_context);
 }
@@ -990,8 +990,6 @@ proxy_client_worker_msg_save(struct dsync_worker *_worker,
        struct proxy_client_dsync_worker *worker =
                (struct proxy_client_dsync_worker *)_worker;
 
-       i_assert(worker->save_input == NULL);
-
        T_BEGIN {
                string_t *str = t_str_new(128);
 
@@ -1000,7 +998,7 @@ proxy_client_worker_msg_save(struct dsync_worker *_worker,
                str_append_c(str, '\t');
                dsync_proxy_msg_export(str, msg);
                str_append_c(str, '\n');
-               o_stream_send(worker->output, str_data(str), str_len(str));
+               proxy_client_worker_cmd(worker, str);
        } T_END;
 
        i_assert(worker->save_io == NULL);
@@ -1034,15 +1032,13 @@ proxy_client_worker_msg_get(struct dsync_worker *_worker,
                (struct proxy_client_dsync_worker *)_worker;
        struct proxy_client_request request;
 
-       i_assert(worker->save_input == NULL);
-
        T_BEGIN {
                string_t *str = t_str_new(128);
 
                str_append(str, "MSG-GET\t");
                dsync_proxy_mailbox_guid_export(str, mailbox);
                str_printfa(str, "\t%u\n", uid);
-               o_stream_send(worker->output, str_data(str), str_len(str));
+               proxy_client_worker_cmd(worker, str);
        } T_END;
 
        memset(&request, 0, sizeof(request));