]> git.ipfire.org Git - thirdparty/dovecot/core.git/commitdiff
lmtp proxy fixes.
authorTimo Sirainen <tss@iki.fi>
Mon, 7 Dec 2009 18:34:35 +0000 (13:34 -0500)
committerTimo Sirainen <tss@iki.fi>
Mon, 7 Dec 2009 18:34:35 +0000 (13:34 -0500)
--HG--
branch : HEAD

src/lib-lda/lmtp-client.c
src/lib-lda/lmtp-client.h
src/lmtp/lmtp-proxy.c

index 2435bb2762257b696567b3c0a4b02a82ff20c2e0..8819659ce4c4f3a19a59fd4a058cff62523d3e24 100644 (file)
@@ -49,6 +49,9 @@ struct lmtp_client {
        struct io *io;
        int fd;
 
+       void (*data_output_callback)(void *);
+       void *data_output_context;
+
        lmtp_finish_callback_t *finish_callback;
        void *finish_context;
 
@@ -228,8 +231,7 @@ lmtp_client_data_next(struct lmtp_client *client, const char *line)
 
                client->rcpt_next_data_idx = i + 1;
                rcpt[i].failed = line[0] != '2';
-               rcpt[i].data_callback(!rcpt[i].failed, line,
-                                     rcpt[i].context);
+               rcpt[i].data_callback(!rcpt[i].failed, line, rcpt[i].context);
                if (client->protocol == LMTP_CLIENT_PROTOCOL_LMTP)
                        break;
        }
@@ -245,6 +247,7 @@ static void lmtp_client_send_data(struct lmtp_client *client)
        const unsigned char *data;
        unsigned char add;
        size_t i, size;
+       bool sent_bytes;
        int ret;
 
        if (client->output_finished)
@@ -275,6 +278,7 @@ static void lmtp_client_send_data(struct lmtp_client *client)
                                break;
                        client->output_last = data[i-1];
                        i_stream_skip(client->data_input, i);
+                       sent_bytes = TRUE;
                }
 
                if (o_stream_get_buffer_used_size(client->output) >= 4096) {
@@ -293,6 +297,9 @@ static void lmtp_client_send_data(struct lmtp_client *client)
                        client->output_last = add;
                }
        }
+       if (sent_bytes && client->data_output_callback != NULL)
+               client->data_output_callback(client->data_output_context);
+
        if (ret == 0 || ret == -2) {
                /* -2 can happen with tee istreams */
                return;
@@ -542,3 +549,11 @@ void lmtp_client_send_more(struct lmtp_client *client)
        if (client->input_state == LMTP_INPUT_STATE_DATA)
                lmtp_client_send_data(client);
 }
+
+void lmtp_client_set_data_output_callback(struct lmtp_client *client,
+                                         void (*callback)(void *),
+                                         void *context)
+{
+       client->data_output_callback = callback;
+       client->data_output_context = context;
+}
index 60889b10fe2ea641a6e7cc8f53b18f73e1a6c81b..77d3e2a5eddbd14c88bdd10428494e97ea5b0b32 100644 (file)
@@ -46,5 +46,10 @@ void lmtp_client_send_more(struct lmtp_client *client);
 void lmtp_client_fail(struct lmtp_client *client, const char *line);
 /* Return the state (command reply) the client is currently waiting for. */
 const char *lmtp_client_state_to_string(struct lmtp_client *client);
+/* Call the given callback whenever client manages to send some more DATA
+   output to client. */
+void lmtp_client_set_data_output_callback(struct lmtp_client *client,
+                                         void (*callback)(void *),
+                                         void *context);
 
 #endif
index 27315f349fcc037dea74405a593453f6a82423f5..2dc240b41d9c85a509b970ca43feff8cd8db45e5 100644 (file)
@@ -27,6 +27,8 @@ struct lmtp_proxy_connection {
 
        struct lmtp_client *client;
        struct istream *data_input;
+
+       unsigned int finished:1;
        unsigned int failed:1;
 };
 
@@ -34,10 +36,10 @@ struct lmtp_proxy {
        pool_t pool;
        const char *mail_from, *my_hostname;
        ARRAY_DEFINE(connections, struct lmtp_proxy_connection *);
-       ARRAY_DEFINE(rcpt_to, struct lmtp_proxy_recipient);
+       ARRAY_DEFINE(rcpt_to, struct lmtp_proxy_recipient *);
        unsigned int next_data_reply_idx;
 
-       struct timeout *to, *to_data_idle;
+       struct timeout *to, *to_data_idle, *to_finish;
        struct io *io;
        struct istream *data_input, *orig_data_input;
        struct ostream *client_output;
@@ -50,6 +52,7 @@ struct lmtp_proxy {
 
        unsigned int finished:1;
        unsigned int input_timeout:1;
+       unsigned int handling_data_input:1;
 };
 
 static void lmtp_conn_finish(void *context);
@@ -80,7 +83,6 @@ static void lmtp_proxy_connections_deinit(struct lmtp_proxy *proxy)
        array_foreach(&proxy->connections, conns) {
                struct lmtp_proxy_connection *conn = *conns;
 
-               lmtp_client_fail(conn->client, "451 4.3.0 Aborting");
                lmtp_client_deinit(&conn->client);
        }
 }
@@ -147,16 +149,16 @@ lmtp_proxy_get_connection(struct lmtp_proxy *proxy,
 
 static bool lmtp_proxy_send_data_replies(struct lmtp_proxy *proxy)
 {
-       const struct lmtp_proxy_recipient *rcpt;
+       struct lmtp_proxy_recipient *const *rcpt;
        unsigned int i, count;
 
        o_stream_cork(proxy->client_output);
        rcpt = array_get(&proxy->rcpt_to, &count);
        for (i = proxy->next_data_reply_idx; i < count; i++) {
-               if (!(rcpt[i].rcpt_to_failed || rcpt[i].data_reply_received))
+               if (!(rcpt[i]->rcpt_to_failed || rcpt[i]->data_reply_received))
                        break;
                o_stream_send_str(proxy->client_output,
-                                 t_strconcat(rcpt[i].reply, "\r\n", NULL));
+                                 t_strconcat(rcpt[i]->reply, "\r\n", NULL));
        }
        o_stream_uncork(proxy->client_output);
        proxy->next_data_reply_idx = i;
@@ -164,14 +166,28 @@ static bool lmtp_proxy_send_data_replies(struct lmtp_proxy *proxy)
        return i == count;
 }
 
-static void lmtp_proxy_finish(struct lmtp_proxy *proxy)
+static void lmtp_proxy_finish_timeout(struct lmtp_proxy *proxy)
 {
        i_assert(!proxy->finished);
 
+       timeout_remove(&proxy->to_finish);
        proxy->finished = TRUE;
        proxy->finish_callback(proxy->input_timeout, proxy->finish_context);
 }
 
+static void lmtp_proxy_finish(struct lmtp_proxy *proxy)
+{
+       /* do the actual finishing in a timeout handler, since the finish
+          callback causes the proxy to be destroyed and the code leading up
+          to this function can be called from many different places. it's
+          easier this way rather than having all the callers check if the
+          proxy was already destroyed. */
+       if (proxy->to_finish == NULL) {
+               proxy->to_finish = timeout_add(0, lmtp_proxy_finish_timeout,
+                                              proxy);
+       }
+}
+
 static void lmtp_proxy_try_finish(struct lmtp_proxy *proxy)
 {
        if (lmtp_proxy_send_data_replies(proxy) &&
@@ -184,6 +200,7 @@ static void lmtp_conn_finish(void *context)
 {
        struct lmtp_proxy_connection *conn = context;
 
+       conn->finished = TRUE;
        if (conn->data_input != NULL)
                i_stream_unref(&conn->data_input);
        lmtp_proxy_try_finish(conn->proxy);
@@ -195,20 +212,22 @@ static void lmtp_proxy_fail_all(struct lmtp_proxy *proxy, const char *reason)
        unsigned int i, count;
        const char *line;
 
-       pool_ref(proxy->pool);
        conns = array_get(&proxy->connections, &count);
        for (i = 0; i < count; i++) {
                line = t_strdup_printf(ERRSTR_TEMP_REMOTE_FAILURE
                                " (%s while waiting for reply to %s)", reason,
                                lmtp_client_state_to_string(conns[i]->client));
                lmtp_client_fail(conns[i]->client, line);
+       }
 
-               if (!array_is_created(&proxy->connections))
-                       break;
+       if (proxy->to_finish == NULL) {
+               /* we still have some DATA input to read */
+               if (proxy->io == NULL) {
+                       proxy->io = io_add(i_stream_get_fd(proxy->data_input),
+                                          IO_READ,
+                                          lmtp_proxy_data_input, proxy);
+               }
        }
-       pool_unref(&proxy->pool);
-       /* either the whole proxy is destroyed now, or we still have some
-          DATA input to read. */
 }
 
 static void lmtp_proxy_data_input_timeout(struct lmtp_proxy *proxy)
@@ -219,18 +238,17 @@ static void lmtp_proxy_data_input_timeout(struct lmtp_proxy *proxy)
        proxy->input_timeout = TRUE;
        i_stream_close(proxy->orig_data_input);
 
-       pool_ref(proxy->pool);
        conns = array_get(&proxy->connections, &count);
        for (i = 0; i < count; i++) {
                lmtp_client_fail(conns[i]->client, ERRSTR_TEMP_REMOTE_FAILURE
                                 " (timeout in DATA input)");
-               if (!array_is_created(&proxy->connections)) {
-                       pool_unref(&proxy->pool);
-                       return;
-               }
        }
-       /* last client failure should have caused the proxy to be destroyed */
-       i_unreached();
+       if (proxy->to_finish == NULL) {
+               /* we had earlier failed all clients already and were just
+                  waiting for DATA input to finish, but DATA input also failed
+                  with a timeout. */
+               lmtp_proxy_finish(proxy);
+       }
 }
 
 static void
@@ -268,51 +286,48 @@ int lmtp_proxy_add_rcpt(struct lmtp_proxy *proxy, const char *address,
        if (conn->failed)
                return -1;
 
-       rcpt = array_append_space(&proxy->rcpt_to);
+       rcpt = p_new(proxy->pool, struct lmtp_proxy_recipient, 1);
        rcpt->conn = conn;
        rcpt->address = p_strdup(proxy->pool, address);
+       array_append(&proxy->rcpt_to, &rcpt, 1);
 
        lmtp_client_add_rcpt(conn->client, address, lmtp_proxy_conn_rcpt_to,
                             lmtp_proxy_conn_data, rcpt);
        return 0;
 }
 
-static size_t lmtp_proxy_find_max_data_input_size(struct lmtp_proxy *proxy)
+static size_t lmtp_proxy_find_lowest_offset(struct lmtp_proxy *proxy)
 {
        struct lmtp_proxy_connection *const *conns;
-       unsigned int i, count;
-       size_t size, max_size = 0;
+       uoff_t min_offset = (uoff_t)-1;
 
-       conns = array_get(&proxy->connections, &count);
-       for (i = 0; i < count; i++) {
-               if (conns[i]->data_input == NULL)
-                       continue;
-               (void)i_stream_get_data(conns[i]->data_input, &size);
-               if (max_size < size)
-                       max_size = size;
+       array_foreach(&proxy->connections, conns) {
+               struct lmtp_proxy_connection *conn = *conns;
+
+               if (conn->data_input != NULL &&
+                   min_offset > conn->data_input->v_offset)
+                       min_offset = conn->data_input->v_offset;
        }
-       return max_size;
+       return min_offset;
 }
 
 static bool lmtp_proxy_disconnect_hanging_output(struct lmtp_proxy *proxy)
 {
        struct lmtp_proxy_connection *const *conns;
-       unsigned int i, count;
-       size_t size, max_size;
+       uoff_t min_offset;
 
-       max_size = lmtp_proxy_find_max_data_input_size(proxy);
-       if (max_size == 0)
+       min_offset = lmtp_proxy_find_lowest_offset(proxy);
+       if (min_offset == (uoff_t)-1)
                return FALSE;
 
        /* disconnect all connections that are keeping us from reading
           more input. */
-       conns = array_get(&proxy->connections, &count);
-       for (i = 0; i < count; i++) {
-               if (conns[i]->data_input == NULL)
-                       continue;
-               (void)i_stream_get_data(conns[i]->data_input, &size);
-               if (size == max_size) {
-                       lmtp_client_fail(conns[i]->client,
+       array_foreach(&proxy->connections, conns) {
+               struct lmtp_proxy_connection *conn = *conns;
+
+               if (conn->data_input != NULL &&
+                   conn->data_input->v_offset == min_offset) {
+                       lmtp_client_fail(conn->client,
                                         ERRSTR_TEMP_REMOTE_FAILURE
                                         " (DATA output timeout)");
                }
@@ -331,18 +346,19 @@ static void lmtp_proxy_output_timeout(struct lmtp_proxy *proxy)
                /* no such connection, so we've already sent everything but
                   some servers aren't replying to us. disconnect all of
                   them. */
+               i_assert(proxy->data_input->eof);
                lmtp_proxy_fail_all(proxy, "timeout");
        }
 }
 
 static void lmtp_proxy_wait_for_output(struct lmtp_proxy *proxy)
 {
-       i_assert(proxy->to == NULL);
-
        if (proxy->io != NULL)
                io_remove(&proxy->io);
-       proxy->to = timeout_add(proxy->max_timeout_msecs,
-                               lmtp_proxy_output_timeout, proxy);
+       if (proxy->to == NULL) {
+               proxy->to = timeout_add(proxy->max_timeout_msecs,
+                                       lmtp_proxy_output_timeout, proxy);
+       }
 }
 
 static bool lmtp_proxy_data_read(struct lmtp_proxy *proxy)
@@ -380,6 +396,8 @@ static bool lmtp_proxy_data_read(struct lmtp_proxy *proxy)
                return FALSE;
        default:
                /* something was read */
+               if (proxy->to != NULL)
+                       timeout_remove(&proxy->to);
                (void)i_stream_get_data(proxy->data_input, &size);
                i_stream_skip(proxy->data_input, size);
                return TRUE;
@@ -390,10 +408,25 @@ static void lmtp_proxy_data_input(struct lmtp_proxy *proxy)
 {
        struct lmtp_proxy_connection *const *conns;
 
+       i_assert(!proxy->handling_data_input);
+
+       proxy->handling_data_input = TRUE;
        do {
-               array_foreach(&proxy->connections, conns);
+               array_foreach(&proxy->connections, conns)
                        lmtp_client_send_more((*conns)->client);
        } while (lmtp_proxy_data_read(proxy));
+       proxy->handling_data_input = FALSE;
+}
+
+static void lmtp_proxy_more_data_sent(void *context)
+{
+       struct lmtp_proxy *proxy = context;
+
+       if (proxy->to != NULL && !proxy->handling_data_input) {
+               /* some tee child is blocking others. it might have been this
+                  one, so see if we can continue. */
+               lmtp_proxy_data_input(proxy);
+       }
 }
 
 void lmtp_proxy_start(struct lmtp_proxy *proxy, struct istream *data_input,
@@ -413,6 +446,15 @@ void lmtp_proxy_start(struct lmtp_proxy *proxy, struct istream *data_input,
        array_foreach(&proxy->connections, conns) {
                struct lmtp_proxy_connection *conn = *conns;
 
+               if (conn->finished) {
+                       /* this connection had already failed */
+                       continue;
+               }
+
+               lmtp_client_set_data_output_callback(conn->client,
+                                                    lmtp_proxy_more_data_sent,
+                                                    proxy);
+
                conn->data_input =
                        tee_i_stream_create_child(proxy->tee_data_input);
                lmtp_client_set_data_header(conn->client, header);