]> git.ipfire.org Git - thirdparty/dovecot/core.git/commitdiff
lmtp: Simplify LMTP proxying by first reading the whole input to memory/disk.
authorTimo Sirainen <tss@iki.fi>
Sat, 10 Dec 2011 08:59:30 +0000 (10:59 +0200)
committerTimo Sirainen <tss@iki.fi>
Sat, 10 Dec 2011 08:59:30 +0000 (10:59 +0200)
This hopefully fixes problems related to LMTP proxying, at the cost of
having to write large mails to temp directory.

src/lmtp/commands.c
src/lmtp/lmtp-proxy.c
src/lmtp/lmtp-proxy.h

index 0c804033f96ccf98b22d5731c2e462e667f6c366..9a22841b494cdc76b4ec0b94dbfabc76bb691333 100644 (file)
@@ -717,18 +717,12 @@ static void client_input_data_finish(struct client *client)
                client_input_handle(client);
 }
 
-static void client_proxy_finish(bool timeout, void *context)
+static void client_proxy_finish(void *context)
 {
        struct client *client = context;
 
        lmtp_proxy_deinit(&client->proxy);
-       if (timeout) {
-               client_destroy(client,
-                       t_strdup_printf("421 4.4.2 %s", client->my_domain),
-                       "Disconnected for inactivity");
-       } else {
-               client_input_data_finish(client);
-       }
+       client_input_data_finish(client);
 }
 
 static const char *client_get_added_headers(struct client *client)
@@ -765,10 +759,12 @@ static bool client_input_data_write(struct client *client)
        struct istream *input;
        bool ret = TRUE;
 
+       io_remove(&client->io);
        i_stream_destroy(&client->dot_input);
 
        input = client_get_input(client);
-       client_input_data_write_local(client, input);
+       if (array_count(&client->state.rcpt_to) != 0)
+               client_input_data_write_local(client, input);
        if (client->proxy != NULL) {
                lmtp_proxy_start(client->proxy, input, NULL,
                                 client_proxy_finish, client);
@@ -896,18 +892,8 @@ int cmd_data(struct client *client, const char *args ATTR_UNUSED)
        client_send_line(client, "354 OK");
 
        io_remove(&client->io);
-       if (array_count(&client->state.rcpt_to) == 0) {
-               client->state.name = "DATA (proxy)";
-               timeout_remove(&client->to_idle);
-               lmtp_proxy_start(client->proxy, client->dot_input,
-                                client->state.added_headers,
-                                client_proxy_finish, client);
-               i_stream_unref(&client->dot_input);
-       } else {
-               client->state.name = "DATA";
-               client->io = io_add(client->fd_in, IO_READ,
-                                   client_input_data, client);
-               client_input_data_handle(client);
-       }
+       client->state.name = "DATA";
+       client->io = io_add(client->fd_in, IO_READ, client_input_data, client);
+       client_input_data_handle(client);
        return -1;
 }
index 4cfe986403db2d8a464f4ac7ced7842745f8ea42..e3ef76d8d05ba08dc2dfa323931dcc950d56a6f2 100644 (file)
@@ -4,13 +4,11 @@
 #include "array.h"
 #include "ioloop.h"
 #include "istream.h"
-#include "istream-tee.h"
 #include "ostream.h"
 #include "lmtp-client.h"
 #include "lmtp-proxy.h"
 
 #define LMTP_MAX_LINE_LEN 1024
-#define LMTP_PROXY_DATA_INPUT_TIMEOUT_MSECS (1000*60)
 
 struct lmtp_proxy_recipient {
        struct lmtp_proxy_connection *conn;
@@ -27,6 +25,7 @@ struct lmtp_proxy_connection {
 
        struct lmtp_client *client;
        struct istream *data_input;
+       struct timeout *to;
 
        unsigned int finished:1;
        unsigned int failed:1;
@@ -41,11 +40,9 @@ struct lmtp_proxy {
        ARRAY_DEFINE(rcpt_to, struct lmtp_proxy_recipient *);
        unsigned int next_data_reply_idx;
 
-       struct timeout *to, *to_data_idle, *to_finish;
-       struct io *io;
-       struct istream *data_input, *orig_data_input;
+       struct timeout *to_finish;
+       struct istream *data_input;
        struct ostream *client_output;
-       struct tee_istream *tee_data_input;
 
        unsigned int max_timeout_msecs;
 
@@ -53,12 +50,9 @@ struct lmtp_proxy {
        void *finish_context;
 
        unsigned int finished:1;
-       unsigned int input_timeout:1;
-       unsigned int handling_data_input:1;
 };
 
 static void lmtp_conn_finish(void *context);
-static void lmtp_proxy_data_input(struct lmtp_proxy *proxy);
 
 struct lmtp_proxy *
 lmtp_proxy_init(const char *my_hostname, const char *dns_client_socket_path,
@@ -102,14 +96,8 @@ void lmtp_proxy_deinit(struct lmtp_proxy **_proxy)
                i_stream_unref(&proxy->data_input);
        if (proxy->client_output != NULL)
                o_stream_unref(&proxy->client_output);
-       if (proxy->to_data_idle != NULL)
-               timeout_remove(&proxy->to_data_idle);
        if (proxy->to_finish != NULL)
                timeout_remove(&proxy->to_finish);
-       if (proxy->to != NULL)
-               timeout_remove(&proxy->to);
-       if (proxy->io != NULL)
-               io_remove(&proxy->io);
        array_free(&proxy->rcpt_to);
        array_free(&proxy->connections);
        pool_unref(&proxy->pool);
@@ -184,11 +172,19 @@ static void lmtp_proxy_finish_timeout(struct lmtp_proxy *proxy)
 
        timeout_remove(&proxy->to_finish);
        proxy->finished = TRUE;
-       proxy->finish_callback(proxy->input_timeout, proxy->finish_context);
+       proxy->finish_callback(proxy->finish_context);
 }
 
-static void lmtp_proxy_finish(struct lmtp_proxy *proxy)
+static void lmtp_proxy_try_finish(struct lmtp_proxy *proxy)
 {
+       if (proxy->finish_callback == NULL) {
+               /* DATA command hasn't been sent yet */
+               return;
+       }
+       if (!lmtp_proxy_send_data_replies(proxy)) {
+               /* we can't received reply from all clients yet */
+               return;
+       }
        /* do the actual finishing in a timeout handler, since the finish
           callback causes the proxy to be destroyed and the code leading up
           to this function can be called from many different places. it's
@@ -200,75 +196,18 @@ static void lmtp_proxy_finish(struct lmtp_proxy *proxy)
        }
 }
 
-static void lmtp_proxy_try_finish(struct lmtp_proxy *proxy)
-{
-       if (proxy->finish_callback == NULL) {
-               /* DATA command hasn't been sent yet */
-               return;
-       }
-       if (lmtp_proxy_send_data_replies(proxy) &&
-           (proxy->data_input == NULL ||
-            proxy->data_input->eof ||
-            proxy->data_input->stream_errno != 0 ||
-            proxy->input_timeout))
-               lmtp_proxy_finish(proxy);
-}
-
 static void lmtp_conn_finish(void *context)
 {
        struct lmtp_proxy_connection *conn = context;
 
        conn->finished = TRUE;
+       if (conn->to != NULL)
+               timeout_remove(&conn->to);
        if (conn->data_input != NULL)
                i_stream_unref(&conn->data_input);
        lmtp_proxy_try_finish(conn->proxy);
 }
 
-static void lmtp_proxy_fail_all(struct lmtp_proxy *proxy, const char *reason)
-{
-       struct lmtp_proxy_connection *const *conns;
-       unsigned int i, count;
-       const char *line;
-
-       conns = array_get(&proxy->connections, &count);
-       for (i = 0; i < count; i++) {
-               line = t_strdup_printf(ERRSTR_TEMP_REMOTE_FAILURE
-                               " (%s while waiting for reply to %s)", reason,
-                               lmtp_client_state_to_string(conns[i]->client));
-               lmtp_client_fail(conns[i]->client, line);
-       }
-
-       if (proxy->to_finish == NULL) {
-               /* we still have some DATA input to read */
-               if (proxy->io == NULL) {
-                       proxy->io = io_add(i_stream_get_fd(proxy->data_input),
-                                          IO_READ,
-                                          lmtp_proxy_data_input, proxy);
-               }
-       }
-}
-
-static void lmtp_proxy_data_input_timeout(struct lmtp_proxy *proxy)
-{
-       struct lmtp_proxy_connection *const *conns;
-       unsigned int i, count;
-
-       proxy->input_timeout = TRUE;
-       i_stream_close(proxy->orig_data_input);
-
-       conns = array_get(&proxy->connections, &count);
-       for (i = 0; i < count; i++) {
-               lmtp_client_fail(conns[i]->client, ERRSTR_TEMP_REMOTE_FAILURE
-                                " (timeout in DATA input)");
-       }
-       if (proxy->to_finish == NULL) {
-               /* we had earlier failed all clients already and were just
-                  waiting for DATA input to finish, but DATA input also failed
-                  with a timeout. */
-               lmtp_proxy_finish(proxy);
-       }
-}
-
 static void
 lmtp_proxy_conn_rcpt_to(bool success, const char *reply, void *context)
 {
@@ -316,151 +255,21 @@ int lmtp_proxy_add_rcpt(struct lmtp_proxy *proxy, const char *address,
        return 0;
 }
 
-static uoff_t lmtp_proxy_find_lowest_offset(struct lmtp_proxy *proxy)
-{
-       struct lmtp_proxy_connection *const *conns;
-       uoff_t min_offset = (uoff_t)-1;
-
-       array_foreach(&proxy->connections, conns) {
-               struct lmtp_proxy_connection *conn = *conns;
-
-               if (conn->data_input != NULL &&
-                   min_offset > conn->data_input->v_offset &&
-                   i_stream_have_bytes_left(conn->data_input))
-                       min_offset = conn->data_input->v_offset;
-       }
-       return min_offset;
-}
-
-static bool lmtp_proxy_disconnect_hanging_output(struct lmtp_proxy *proxy)
-{
-       struct lmtp_proxy_connection *const *conns;
-       uoff_t min_offset;
-       size_t size;
-       const char *errstr;
-
-       min_offset = lmtp_proxy_find_lowest_offset(proxy);
-       if (min_offset == (uoff_t)-1)
-               return FALSE;
-
-       /* disconnect all connections that are keeping us from reading
-          more input. */
-       array_foreach(&proxy->connections, conns) {
-               struct lmtp_proxy_connection *conn = *conns;
-
-               if (conn->data_input != NULL &&
-                   conn->data_input->v_offset == min_offset) {
-                       (void)i_stream_get_data(conn->data_input, &size);
-                       errstr = t_strdup_printf(ERRSTR_TEMP_REMOTE_FAILURE
-                               " (DATA output stalled for %u secs, "
-                               "%"PRIuUOFF_T"B sent, %"PRIuSIZE_T"B buffered)",
-                               proxy->max_timeout_msecs/1000,
-                               min_offset, size);
-                       lmtp_client_fail(conn->client, errstr);
-               }
-       }
-       return TRUE;
-}
-
-static void lmtp_proxy_output_timeout(struct lmtp_proxy *proxy)
-{
-       timeout_remove(&proxy->to);
-
-       /* drop the connection with the most unread data */
-       if (lmtp_proxy_disconnect_hanging_output(proxy))
-               lmtp_proxy_data_input(proxy);
-       else {
-               /* no such connection, so we've already sent everything but
-                  some servers aren't replying to us. disconnect all of
-                  them. */
-               i_assert(proxy->data_input->eof);
-               lmtp_proxy_fail_all(proxy, "timeout");
-       }
-}
-
-static void lmtp_proxy_wait_for_output(struct lmtp_proxy *proxy)
-{
-       if (proxy->io != NULL)
-               io_remove(&proxy->io);
-       if (proxy->to == NULL) {
-               proxy->to = timeout_add(proxy->max_timeout_msecs,
-                                       lmtp_proxy_output_timeout, proxy);
-       }
-}
-
-static void proxy_send_more(struct lmtp_proxy *proxy)
-{
-       struct lmtp_proxy_connection *const *conns;
-
-       array_foreach(&proxy->connections, conns)
-               lmtp_client_send_more((*conns)->client);
-}
-
-static bool lmtp_proxy_data_read(struct lmtp_proxy *proxy)
-{
-       size_t size;
-
-       timeout_reset(proxy->to_data_idle);
-
-       switch (i_stream_read(proxy->data_input)) {
-       case 0:
-               if (!tee_i_stream_child_is_waiting(proxy->data_input)) {
-                       /* nothing new read */
-                       if (proxy->io != NULL)
-                               return FALSE;
-                       proxy->io = io_add(i_stream_get_fd(proxy->data_input),
-                                          IO_READ,
-                                          lmtp_proxy_data_input, proxy);
-                       return FALSE;
-               }
-               /* fall through */
-       case -2:
-               /* buffer full. someone's stalling. */
-               lmtp_proxy_wait_for_output(proxy);
-               return FALSE;
-       case -1:
-               if (proxy->data_input->stream_errno != 0)
-                       lmtp_proxy_fail_all(proxy, "disconnect");
-               else {
-                       /* make sure LMTP clients see the EOF */
-                       proxy_send_more(proxy);
-                       /* finished reading data input. now we'll just have to
-                          wait for replies. */
-                       lmtp_proxy_wait_for_output(proxy);
-                       /* if all RCPT TOs failed, we can finish now */
-                       lmtp_proxy_try_finish(proxy);
-               }
-               return FALSE;
-       default:
-               /* something was read */
-               if (proxy->to != NULL)
-                       timeout_remove(&proxy->to);
-               (void)i_stream_get_data(proxy->data_input, &size);
-               i_stream_skip(proxy->data_input, size);
-               return TRUE;
-       }
-}
-
-static void lmtp_proxy_data_input(struct lmtp_proxy *proxy)
+static void lmtp_proxy_more_data_sent(void *context)
 {
-       i_assert(!proxy->handling_data_input);
+       struct lmtp_proxy_connection *conn = context;
 
-       proxy->handling_data_input = TRUE;
-       do {
-               proxy_send_more(proxy);
-       } while (lmtp_proxy_data_read(proxy));
-       proxy->handling_data_input = FALSE;
+       lmtp_client_send_more(conn->client);
 }
 
-static void lmtp_proxy_more_data_sent(void *context)
+static void lmtp_proxy_conn_timeout(struct lmtp_proxy_connection *conn)
 {
-       struct lmtp_proxy *proxy = context;
+       const char *line;
 
-       if (proxy->to != NULL && !proxy->handling_data_input) {
-               /* some tee child is blocking others. it might have been this
-                  one, so see if we can continue. */
-               lmtp_proxy_data_input(proxy);
-       }
+       line = t_strdup_printf(ERRSTR_TEMP_REMOTE_FAILURE
+                              " (timeout while waiting for reply to %s)",
+                              lmtp_client_state_to_string(conn->client));
+       lmtp_client_fail(conn->client, line);
 }
 
 void lmtp_proxy_start(struct lmtp_proxy *proxy, struct istream *data_input,
@@ -469,13 +278,12 @@ void lmtp_proxy_start(struct lmtp_proxy *proxy, struct istream *data_input,
 {
        struct lmtp_proxy_connection *const *conns;
 
+       i_assert(data_input->seekable);
+
        proxy->finish_callback = callback;
        proxy->finish_context = context;
-       proxy->orig_data_input = data_input;
-       proxy->tee_data_input = tee_i_stream_create(data_input);
-       proxy->data_input = tee_i_stream_create_child(proxy->tee_data_input);
-       proxy->to_data_idle = timeout_add(LMTP_PROXY_DATA_INPUT_TIMEOUT_MSECS,
-                                         lmtp_proxy_data_input_timeout, proxy);
+       proxy->data_input = data_input;
+       i_stream_ref(proxy->data_input);
 
        array_foreach(&proxy->connections, conns) {
                struct lmtp_proxy_connection *conn = *conns;
@@ -485,15 +293,15 @@ void lmtp_proxy_start(struct lmtp_proxy *proxy, struct istream *data_input,
                        continue;
                }
 
+               conn->to = timeout_add(proxy->max_timeout_msecs,
+                                      lmtp_proxy_conn_timeout, conn);
                lmtp_client_set_data_output_callback(conn->client,
                                                     lmtp_proxy_more_data_sent,
-                                                    proxy);
+                                                    conn);
 
-               conn->data_input =
-                       tee_i_stream_create_child(proxy->tee_data_input);
+               conn->data_input = i_stream_create_limit(data_input, (uoff_t)-1);
                lmtp_client_set_data_header(conn->client, header);
                lmtp_client_send(conn->client, conn->data_input);
+               lmtp_client_send_more(conn->client);
        }
-
-       lmtp_proxy_data_input(proxy);
 }
index babc12c9908716a592abfecfd6daeb10b2bc7d88..34f124546d849b535d1af0dcb6202f3b257c56cd 100644 (file)
@@ -11,7 +11,7 @@ struct lmtp_proxy_settings {
        enum lmtp_client_protocol protocol;
 };
 
-typedef void lmtp_proxy_finish_callback_t(bool timeout, void *context);
+typedef void lmtp_proxy_finish_callback_t(void *context);
 
 struct lmtp_proxy *
 lmtp_proxy_init(const char *my_hostname, const char *dns_client_socket_path,