]> git.ipfire.org Git - thirdparty/dovecot/core.git/commitdiff
pop3c: Added full support for running commands asynchronously (with PIPELINING)
authorTimo Sirainen <timo.sirainen@dovecot.fi>
Tue, 26 Jan 2016 13:40:09 +0000 (15:40 +0200)
committerTimo Sirainen <timo.sirainen@dovecot.fi>
Tue, 26 Jan 2016 13:40:09 +0000 (15:40 +0200)
src/lib-storage/index/pop3c/pop3c-client.c
src/lib-storage/index/pop3c/pop3c-client.h
src/lib-storage/index/pop3c/pop3c-mail.c
src/lib-storage/index/pop3c/pop3c-storage.c
src/lib-storage/index/pop3c/pop3c-sync.c

index 1b5ab902d7e437b04b36eb2aab9ea4e90b5f7f1d..8eba22e3532d83ebfd01a620c5e3b10dd481f145 100644 (file)
@@ -1,9 +1,11 @@
 /* Copyright (c) 2011-2016 Dovecot authors, see the included COPYING file */
 
 #include "lib.h"
+#include "array.h"
 #include "ioloop.h"
 #include "net.h"
 #include "istream.h"
+#include "istream-chain.h"
 #include "istream-dot.h"
 #include "istream-seekable.h"
 #include "ostream.h"
@@ -38,6 +40,20 @@ enum pop3c_client_state {
        POP3C_CLIENT_STATE_DONE
 };
 
+struct pop3c_client_sync_cmd_ctx {
+       enum pop3c_command_state state;
+       char *reply;
+};
+
+struct pop3c_client_cmd {
+       struct istream *input;
+       struct istream_chain *chain;
+       bool reading_dot;
+
+       pop3c_cmd_callback_t *callback;
+       void *context;
+};
+
 struct pop3c_client {
        pool_t pool;
        struct pop3c_client_settings set;
@@ -59,7 +75,7 @@ struct pop3c_client {
        pop3c_login_callback_t *login_callback;
        void *login_context;
 
-       unsigned int async_commands;
+       ARRAY(struct pop3c_client_cmd) commands;
        const char *input_line;
        struct istream *dot_input;
 
@@ -71,6 +87,7 @@ pop3c_dns_callback(const struct dns_lookup_result *result,
                   struct pop3c_client *client);
 static void pop3c_client_connect_ip(struct pop3c_client *client);
 static int pop3c_client_ssl_init(struct pop3c_client *client);
+static void pop3c_client_input(struct pop3c_client *client);
 
 struct pop3c_client *
 pop3c_client_init(const struct pop3c_client_settings *set)
@@ -84,6 +101,7 @@ pop3c_client_init(const struct pop3c_client_settings *set)
        client = p_new(pool, struct pop3c_client, 1);
        client->pool = pool;
        client->fd = -1;
+       p_array_init(&client->commands, pool, 16);
 
        client->set.debug = set->debug;
        client->set.host = p_strdup(pool, set->host);
@@ -131,10 +149,50 @@ client_login_callback(struct pop3c_client *client,
        }
 }
 
+static void
+pop3c_client_async_callback(struct pop3c_client *client,
+                           enum pop3c_command_state state, const char *reply)
+{
+       struct pop3c_client_cmd *cmd, cmd_copy;
+       bool running = client->running;
+
+       i_assert(reply != NULL);
+       i_assert(array_count(&client->commands) > 0);
+
+       cmd = array_idx_modifiable(&client->commands, 0);
+       if (cmd->input != NULL && state == POP3C_COMMAND_STATE_OK &&
+           !cmd->reading_dot) {
+               /* read the full input into seekable-istream before calling
+                  the callback */
+               i_assert(client->dot_input == NULL);
+               i_stream_chain_append(cmd->chain, client->input);
+               client->dot_input = cmd->input;
+               cmd->reading_dot = TRUE;
+               return;
+       }
+       cmd_copy = *cmd;
+       array_delete(&client->commands, 0, 1);
+
+       if (cmd_copy.input != NULL) {
+               i_stream_seek(cmd_copy.input, 0);
+               i_stream_unref(&cmd_copy.input);
+       }
+       if (cmd_copy.callback != NULL)
+               cmd_copy.callback(state, reply, cmd_copy.context);
+       if (running)
+               io_loop_stop(current_ioloop);
+}
+
+static void
+pop3c_client_async_callback_disconnected(struct pop3c_client *client)
+{
+       pop3c_client_async_callback(client, POP3C_COMMAND_STATE_DISCONNECTED,
+                                   "Disconnected");
+}
+
 static void pop3c_client_disconnect(struct pop3c_client *client)
 {
        client->state = POP3C_CLIENT_STATE_DISCONNECTED;
-       client->async_commands = 0;
 
        if (client->running)
                io_loop_stop(current_ioloop);
@@ -156,6 +214,8 @@ static void pop3c_client_disconnect(struct pop3c_client *client)
                        i_error("close(pop3c) failed: %m");
                client->fd = -1;
        }
+       while (array_count(&client->commands) > 0)
+               pop3c_client_async_callback_disconnected(client);
        client_login_callback(client, POP3C_COMMAND_STATE_DISCONNECTED,
                              "Disconnected");
 }
@@ -233,13 +293,21 @@ static int pop3c_client_dns_lookup(struct pop3c_client *client)
        return 0;
 }
 
-void pop3c_client_run(struct pop3c_client *client)
+void pop3c_client_wait_one(struct pop3c_client *client)
 {
        struct ioloop *ioloop, *prev_ioloop = current_ioloop;
        bool timeout_added = FALSE, failed = FALSE;
 
+       if (client->state == POP3C_CLIENT_STATE_DISCONNECTED &&
+           array_count(&client->commands) > 0) {
+               while (array_count(&client->commands) > 0)
+                       pop3c_client_async_callback_disconnected(client);
+       }
+
        i_assert(client->fd != -1 ||
                 client->state == POP3C_CLIENT_STATE_CONNECTING);
+       i_assert(array_count(&client->commands) > 0 ||
+                client->state == POP3C_CLIENT_STATE_CONNECTING);
 
        ioloop = io_loop_create();
        pop3c_client_ioloop_changed(client);
@@ -328,6 +396,8 @@ pop3c_client_get_sasl_plain_request(struct pop3c_client *client)
 static void pop3c_client_login_finished(struct pop3c_client *client)
 {
        io_remove(&client->io);
+       client->io = io_add(client->fd, IO_READ, pop3c_client_input, client);
+
        timeout_remove(&client->to);
        client->state = POP3C_CLIENT_STATE_DONE;
 
@@ -635,110 +705,139 @@ pop3c_client_get_capabilities(struct pop3c_client *client)
        return client->capabilities;
 }
 
-static void pop3c_client_input_reply(struct pop3c_client *client)
+static int pop3c_client_dot_input(struct pop3c_client *client)
 {
-       i_assert(client->state == POP3C_CLIENT_STATE_DONE);
+       ssize_t ret;
 
-       if (client->to != NULL)
-               timeout_reset(client->to);
-       client->input_line = i_stream_read_next_line(client->input);
-       if (client->input_line != NULL)
-               io_loop_stop(current_ioloop);
-       else if (client->input->closed || client->input->eof ||
-                client->input->stream_errno != 0) {
-               /* disconnected */
-               i_error("pop3c(%s): Server disconnected unexpectedly",
-                       client->set.host);
-               pop3c_client_disconnect(client);
-               io_loop_stop(current_ioloop);
+       while ((ret = i_stream_read(client->dot_input)) > 0 || ret == -2) {
+               i_stream_skip(client->dot_input,
+                             i_stream_get_data_size(client->dot_input));
        }
-}
+       if (ret == 0)
+               return 0;
+       i_assert(ret == -1);
 
-static int
-pop3c_client_read_line(struct pop3c_client *client,
-                      const char **line_r, const char **error_r)
-{
-       i_assert(client->io == NULL);
-       i_assert(client->input_line == NULL);
+       if (client->dot_input->stream_errno == 0)
+               ret = 1;
+       client->dot_input = NULL;
 
-       client->io = io_add(client->fd, IO_READ,
-                           pop3c_client_input_reply, client);
-       pop3c_client_input_reply(client);
-       if (client->input_line == NULL && client->input != NULL)
-               pop3c_client_run(client);
-
-       if (client->input_line == NULL) {
-               i_assert(client->io == NULL);
-               *error_r = "Disconnected";
+       if (ret > 0) {
+               /* currently we don't actually care about preserving the
+                  +OK reply line for multi-line replies, so just return
+                  it as empty */
+               pop3c_client_async_callback(client, POP3C_COMMAND_STATE_OK, "");
+               return 1;
+       } else {
+               pop3c_client_async_callback_disconnected(client);
                return -1;
        }
-
-       io_remove(&client->io);
-       *line_r = t_strdup(client->input_line);
-       client->input_line = NULL;
-       return 0;
 }
 
 static int
-pop3c_client_flush_asyncs(struct pop3c_client *client, const char **error_r)
+pop3c_client_input_next_reply(struct pop3c_client *client)
 {
        const char *line;
+       enum pop3c_client_state state;
 
-       if (client->state != POP3C_CLIENT_STATE_DONE) {
-               i_assert(client->state == POP3C_CLIENT_STATE_DISCONNECTED);
-               *error_r = "Disconnected";
-               return -1;
-       }
+       line = i_stream_read_next_line(client->input);
+       if (line == NULL)
+               return client->input->eof ? -1 : 0;
 
-       while (client->async_commands > 0) {
-               if (pop3c_client_read_line(client, &line, error_r) < 0)
-                       return -1;
-               client->async_commands--;
+       if (strncasecmp(line, "+OK", 3) == 0) {
+               line += 3;
+               state = POP3C_COMMAND_STATE_OK;
+       } else if (strncasecmp(line, "-ERR", 4) == 0) {
+               line += 4;
+               state = POP3C_COMMAND_STATE_ERR;
+       } else {
+               i_error("pop3c(%s): Server sent unrecognized line: %s",
+                       client->set.host, line);
+               state = POP3C_COMMAND_STATE_ERR;
+       }
+       if (line[0] == ' ')
+               line++;
+       if (array_count(&client->commands) == 0) {
+               i_error("pop3c(%s): Server sent line when no command was running: %s",
+                       client->set.host, line);
+       } else {
+               pop3c_client_async_callback(client, state, line);
        }
-       return 0;
+       return 1;
 }
 
-int pop3c_client_cmd_line(struct pop3c_client *client, const char *cmd,
-                         const char **reply_r)
+static void pop3c_client_input(struct pop3c_client *client)
 {
-       const char *line;
        int ret;
 
-       if (pop3c_client_flush_asyncs(client, reply_r) < 0)
-               return -1;
-       o_stream_nsend_str(client->output, cmd);
-       if (pop3c_client_read_line(client, &line, reply_r) < 0)
-               return -1;
-       if (strncasecmp(line, "+OK", 3) == 0) {
-               *reply_r = line + 3;
-               ret = 0;
-       } else if (strncasecmp(line, "-ERR", 4) == 0) {
-               *reply_r = line + 4;
-               ret = -1;
-       } else {
-               *reply_r = line;
-               ret = -1;
+       if (client->to != NULL)
+               timeout_reset(client->to);
+       do {
+               if (client->dot_input != NULL) {
+                       /* continue reading the current multiline reply */
+                       if ((ret = pop3c_client_dot_input(client)) == 0)
+                               return;
+               } else {
+                       ret = pop3c_client_input_next_reply(client);
+               }
+       } while (ret > 0);
+
+       if (ret < 0) {
+               i_error("pop3c(%s): Server disconnected unexpectedly",
+                       client->set.host);
+               pop3c_client_disconnect(client);
        }
-       if (**reply_r == ' ')
-               *reply_r += 1;
-       return ret;
 }
 
-void pop3c_client_cmd_line_async(struct pop3c_client *client, const char *cmd)
+static void pop3c_client_cmd_reply(enum pop3c_command_state state,
+                                  const char *reply, void *context)
 {
-       const char *error;
+       struct pop3c_client_sync_cmd_ctx *ctx = context;
 
-       if (client->state != POP3C_CLIENT_STATE_DONE) {
-               i_assert(client->state == POP3C_CLIENT_STATE_DISCONNECTED);
-               return;
-       }
+       i_assert(ctx->reply == NULL);
+
+       ctx->state = state;
+       ctx->reply = i_strdup(reply);
+}
+
+int pop3c_client_cmd_line(struct pop3c_client *client, const char *cmdline,
+                         const char **reply_r)
+{
+       struct pop3c_client_sync_cmd_ctx ctx;
+
+       memset(&ctx, 0, sizeof(ctx));
+       pop3c_client_cmd_line_async(client, cmdline, pop3c_client_cmd_reply, &ctx);
+       while (ctx.reply == NULL)
+               pop3c_client_wait_one(client);
+       *reply_r = t_strdup(ctx.reply);
+       i_free(ctx.reply);
+       return ctx.state == POP3C_COMMAND_STATE_OK ? 0 : -1;
+}
+
+struct pop3c_client_cmd *
+pop3c_client_cmd_line_async(struct pop3c_client *client, const char *cmdline,
+                           pop3c_cmd_callback_t *callback, void *context)
+{
+       struct pop3c_client_cmd *cmd;
 
        if ((client->capabilities & POP3C_CAPABILITY_PIPELINING) == 0) {
-               if (pop3c_client_flush_asyncs(client, &error) < 0)
-                       return;
-       }
-       o_stream_nsend_str(client->output, cmd);
-       client->async_commands++;
+               while (array_count(&client->commands) > 0)
+                       pop3c_client_wait_one(client);
+       }
+       i_assert(client->state == POP3C_CLIENT_STATE_DISCONNECTED ||
+                client->state == POP3C_CLIENT_STATE_DONE);
+       if (client->state == POP3C_CLIENT_STATE_DONE)
+               o_stream_nsend_str(client->output, cmdline);
+
+       cmd = array_append_space(&client->commands);
+       cmd->callback = callback;
+       cmd->context = context;
+       return cmd;
+}
+
+void pop3c_client_cmd_line_async_nocb(struct pop3c_client *client,
+                                     const char *cmdline)
+{
+       pop3c_client_cmd_line_async(client, cmdline, NULL, NULL);
 }
 
 static int seekable_fd_callback(const char **path_r, void *context)
@@ -766,67 +865,44 @@ static int seekable_fd_callback(const char **path_r, void *context)
        return fd;
 }
 
-static void pop3c_client_dot_input(struct pop3c_client *client)
+int pop3c_client_cmd_stream(struct pop3c_client *client, const char *cmdline,
+                           struct istream **input_r, const char **error_r)
 {
-       ssize_t ret;
+       struct pop3c_client_sync_cmd_ctx ctx;
+       const char *reply;
 
-       if (client->to != NULL)
-               timeout_reset(client->to);
-       while ((ret = i_stream_read(client->dot_input)) > 0 || ret == -2) {
-               i_stream_skip(client->dot_input,
-                             i_stream_get_data_size(client->dot_input));
-       }
-       if (ret != 0) {
-               i_assert(ret == -1);
-               if (client->dot_input->stream_errno != 0) {
-                       i_error("pop3c(%s): Server disconnected unexpectedly",
-                               client->set.host);
-                       pop3c_client_disconnect(client);
-               }
-               if (client->running)
-                       io_loop_stop(current_ioloop);
-       }
+       memset(&ctx, 0, sizeof(ctx));
+       *input_r = pop3c_client_cmd_stream_async(client, cmdline,
+                                                pop3c_client_cmd_reply, &ctx);
+       while (ctx.reply == NULL)
+               pop3c_client_wait_one(client);
+       reply = t_strdup(ctx.reply);
+       i_free(ctx.reply);
+
+       if (ctx.state == POP3C_COMMAND_STATE_OK)
+               return 0;
+       i_stream_unref(input_r);
+       *error_r = reply;
+       return -1;
 }
 
-int pop3c_client_cmd_stream(struct pop3c_client *client, const char *cmd,
-                           struct istream **input_r, const char **error_r)
+struct istream *
+pop3c_client_cmd_stream_async(struct pop3c_client *client, const char *cmdline,
+                             pop3c_cmd_callback_t *callback, void *context)
 {
-       struct istream *inputs[2];
+       struct istream *input, *inputs[2];
+       struct pop3c_client_cmd *cmd;
 
-       *input_r = NULL;
+       cmd = pop3c_client_cmd_line_async(client, cmdline, callback, context);
 
-       /* read the +OK / -ERR */
-       if (pop3c_client_cmd_line(client, cmd, error_r) < 0)
-               return -1;
-       /* read the stream */
-       inputs[0] = i_stream_create_dot(client->input, TRUE);
+       input = i_stream_create_chain(&cmd->chain);
+       inputs[0] = i_stream_create_dot(input, TRUE);
        inputs[1] = NULL;
-       client->dot_input =
-               i_stream_create_seekable(inputs, POP3C_MAX_INBUF_SIZE,
-                                        seekable_fd_callback, client);
+       cmd->input = i_stream_create_seekable(inputs, POP3C_MAX_INBUF_SIZE,
+                                             seekable_fd_callback, client);
+       i_stream_unref(&input);
        i_stream_unref(&inputs[0]);
 
-       i_assert(client->io == NULL);
-       client->io = io_add(client->fd, IO_READ,
-                           pop3c_client_dot_input, client);
-       /* read any pending data from the stream */
-       pop3c_client_dot_input(client);
-       if (!client->dot_input->eof)
-               pop3c_client_run(client);
-
-       if (client->input == NULL) {
-               i_assert(client->io == NULL);
-               i_stream_destroy(&client->dot_input);
-               *error_r = "Disconnected";
-               return -1;
-       }
-       io_remove(&client->io);
-       i_stream_seek(client->dot_input, 0);
-       /* if this stream is used by some filter stream, make the filter
-          stream blocking */
-       client->dot_input->blocking = TRUE;
-
-       *input_r = client->dot_input;
-       client->dot_input = NULL;
-       return 0;
+       i_stream_ref(cmd->input);
+       return cmd->input;
 }
index edffa41df49a67684f0d4abf86c1ac6d922fef5a..84365c2621414b14d7d9ed58f9f8cf7631ab7893 100644 (file)
@@ -43,13 +43,13 @@ struct pop3c_client_settings {
 
 typedef void pop3c_login_callback_t(enum pop3c_command_state state,
                                    const char *reply, void *context);
+typedef void pop3c_cmd_callback_t(enum pop3c_command_state state,
+                                 const char *reply, void *context);
 
 struct pop3c_client *
 pop3c_client_init(const struct pop3c_client_settings *set);
 void pop3c_client_deinit(struct pop3c_client **client);
 
-void pop3c_client_run(struct pop3c_client *client);
-
 void pop3c_client_login(struct pop3c_client *client,
                        pop3c_login_callback_t *callback, void *context);
 
@@ -59,13 +59,25 @@ pop3c_client_get_capabilities(struct pop3c_client *client);
 
 /* Returns 0 if received +OK reply, reply contains the text without the +OK.
    Returns -1 if received -ERR reply or disconnected. */
-int pop3c_client_cmd_line(struct pop3c_client *client, const char *cmd,
+int pop3c_client_cmd_line(struct pop3c_client *client, const char *cmdline,
                          const char **reply_r);
+/* Start the command asynchronously. Call the callback when finished. */
+struct pop3c_client_cmd *
+pop3c_client_cmd_line_async(struct pop3c_client *client, const char *cmdline,
+                           pop3c_cmd_callback_t *callback, void *context);
 /* Send a command, don't care if it succeeds or not. */
-void pop3c_client_cmd_line_async(struct pop3c_client *client, const char *cmd);
+void pop3c_client_cmd_line_async_nocb(struct pop3c_client *client,
+                                     const char *cmdline);
 /* Returns 0 and stream if succeeded, -1 and error if received -ERR reply or
    disconnected. */
-int pop3c_client_cmd_stream(struct pop3c_client *client, const char *cmd,
+int pop3c_client_cmd_stream(struct pop3c_client *client, const char *cmdline,
                            struct istream **input_r, const char **error_r);
+/* Start the command asynchronously. Call the callback when finished. */
+struct istream *
+pop3c_client_cmd_stream_async(struct pop3c_client *client, const char *cmdline,
+                             pop3c_cmd_callback_t *callback, void *context);
+/* Wait for the next async command to finish. It's an error to call this when
+   there are no pending async commands. */
+void pop3c_client_wait_one(struct pop3c_client *client);
 
 #endif
index 53ee963b9de9a93d579569c552fbb848c6b1377f..7769d38f78eb05bc3350a20a653244d18fbe4682 100644 (file)
@@ -151,6 +151,9 @@ pop3c_mail_get_stream(struct mail *_mail, bool get_body,
                if (get_body)
                        pop3c_mail_cache_size(mail);
        }
+       /* if this stream is used by some filter stream, make the
+          filter stream blocking */
+       mail->data.stream->blocking = TRUE;
        return index_mail_init_stream(mail, hdr_size, body_size, stream_r);
 }
 
index 4392e33a6eb66c79a9e863564a5c7a711a5f453d..cb50f5321b0b2aab6f7ed5cacade7d150aecaca5 100644 (file)
@@ -176,7 +176,7 @@ static int pop3c_mailbox_open(struct mailbox *box)
        mbox->client = pop3c_client_create_from_set(box->storage,
                                                    mbox->storage->set);
        pop3c_client_login(mbox->client, pop3c_login_callback, mbox);
-       pop3c_client_run(mbox->client);
+       pop3c_client_wait_one(mbox->client);
        return mbox->logged_in ? 0 : -1;
 }
 
index 56bb84699f603d593e8f25f9d8b4ee0a79166c13..8acbc79532b00cd3a74f10852bf1dafa8b9e674c 100644 (file)
@@ -319,7 +319,7 @@ int pop3c_sync(struct pop3c_mailbox *mbox)
 
                        str_truncate(str, 0);
                        str_printfa(str, "DELE %u\r\n", idx+1);
-                       pop3c_client_cmd_line_async(mbox->client, str_c(str));
+                       pop3c_client_cmd_line_async_nocb(mbox->client, str_c(str));
                        deletions = TRUE;
                }
        }