client_input_handle(client);
}
-static void client_proxy_finish(bool timeout, void *context)
+static void client_proxy_finish(void *context)
{
struct client *client = context;
lmtp_proxy_deinit(&client->proxy);
- if (timeout) {
- client_destroy(client,
- t_strdup_printf("421 4.4.2 %s", client->my_domain),
- "Disconnected for inactivity");
- } else {
- client_input_data_finish(client);
- }
+ client_input_data_finish(client);
}
static const char *client_get_added_headers(struct client *client)
struct istream *input;
bool ret = TRUE;
+ io_remove(&client->io);
i_stream_destroy(&client->dot_input);
input = client_get_input(client);
- client_input_data_write_local(client, input);
+ if (array_count(&client->state.rcpt_to) != 0)
+ client_input_data_write_local(client, input);
if (client->proxy != NULL) {
lmtp_proxy_start(client->proxy, input, NULL,
client_proxy_finish, client);
client_send_line(client, "354 OK");
io_remove(&client->io);
- if (array_count(&client->state.rcpt_to) == 0) {
- client->state.name = "DATA (proxy)";
- timeout_remove(&client->to_idle);
- lmtp_proxy_start(client->proxy, client->dot_input,
- client->state.added_headers,
- client_proxy_finish, client);
- i_stream_unref(&client->dot_input);
- } else {
- client->state.name = "DATA";
- client->io = io_add(client->fd_in, IO_READ,
- client_input_data, client);
- client_input_data_handle(client);
- }
+ client->state.name = "DATA";
+ client->io = io_add(client->fd_in, IO_READ, client_input_data, client);
+ client_input_data_handle(client);
return -1;
}
#include "array.h"
#include "ioloop.h"
#include "istream.h"
-#include "istream-tee.h"
#include "ostream.h"
#include "lmtp-client.h"
#include "lmtp-proxy.h"
#define LMTP_MAX_LINE_LEN 1024
-#define LMTP_PROXY_DATA_INPUT_TIMEOUT_MSECS (1000*60)
struct lmtp_proxy_recipient {
struct lmtp_proxy_connection *conn;
struct lmtp_client *client;
struct istream *data_input;
+ struct timeout *to;
unsigned int finished:1;
unsigned int failed:1;
ARRAY_DEFINE(rcpt_to, struct lmtp_proxy_recipient *);
unsigned int next_data_reply_idx;
- struct timeout *to, *to_data_idle, *to_finish;
- struct io *io;
- struct istream *data_input, *orig_data_input;
+ struct timeout *to_finish;
+ struct istream *data_input;
struct ostream *client_output;
- struct tee_istream *tee_data_input;
unsigned int max_timeout_msecs;
void *finish_context;
unsigned int finished:1;
- unsigned int input_timeout:1;
- unsigned int handling_data_input:1;
};
static void lmtp_conn_finish(void *context);
-static void lmtp_proxy_data_input(struct lmtp_proxy *proxy);
struct lmtp_proxy *
lmtp_proxy_init(const char *my_hostname, const char *dns_client_socket_path,
i_stream_unref(&proxy->data_input);
if (proxy->client_output != NULL)
o_stream_unref(&proxy->client_output);
- if (proxy->to_data_idle != NULL)
- timeout_remove(&proxy->to_data_idle);
if (proxy->to_finish != NULL)
timeout_remove(&proxy->to_finish);
- if (proxy->to != NULL)
- timeout_remove(&proxy->to);
- if (proxy->io != NULL)
- io_remove(&proxy->io);
array_free(&proxy->rcpt_to);
array_free(&proxy->connections);
pool_unref(&proxy->pool);
timeout_remove(&proxy->to_finish);
proxy->finished = TRUE;
- proxy->finish_callback(proxy->input_timeout, proxy->finish_context);
+ proxy->finish_callback(proxy->finish_context);
}
-static void lmtp_proxy_finish(struct lmtp_proxy *proxy)
+static void lmtp_proxy_try_finish(struct lmtp_proxy *proxy)
{
+ if (proxy->finish_callback == NULL) {
+ /* DATA command hasn't been sent yet */
+ return;
+ }
+ if (!lmtp_proxy_send_data_replies(proxy)) {
+ /* we can't received reply from all clients yet */
+ return;
+ }
/* do the actual finishing in a timeout handler, since the finish
callback causes the proxy to be destroyed and the code leading up
to this function can be called from many different places. it's
}
}
-static void lmtp_proxy_try_finish(struct lmtp_proxy *proxy)
-{
- if (proxy->finish_callback == NULL) {
- /* DATA command hasn't been sent yet */
- return;
- }
- if (lmtp_proxy_send_data_replies(proxy) &&
- (proxy->data_input == NULL ||
- proxy->data_input->eof ||
- proxy->data_input->stream_errno != 0 ||
- proxy->input_timeout))
- lmtp_proxy_finish(proxy);
-}
-
static void lmtp_conn_finish(void *context)
{
struct lmtp_proxy_connection *conn = context;
conn->finished = TRUE;
+ if (conn->to != NULL)
+ timeout_remove(&conn->to);
if (conn->data_input != NULL)
i_stream_unref(&conn->data_input);
lmtp_proxy_try_finish(conn->proxy);
}
-static void lmtp_proxy_fail_all(struct lmtp_proxy *proxy, const char *reason)
-{
- struct lmtp_proxy_connection *const *conns;
- unsigned int i, count;
- const char *line;
-
- conns = array_get(&proxy->connections, &count);
- for (i = 0; i < count; i++) {
- line = t_strdup_printf(ERRSTR_TEMP_REMOTE_FAILURE
- " (%s while waiting for reply to %s)", reason,
- lmtp_client_state_to_string(conns[i]->client));
- lmtp_client_fail(conns[i]->client, line);
- }
-
- if (proxy->to_finish == NULL) {
- /* we still have some DATA input to read */
- if (proxy->io == NULL) {
- proxy->io = io_add(i_stream_get_fd(proxy->data_input),
- IO_READ,
- lmtp_proxy_data_input, proxy);
- }
- }
-}
-
-static void lmtp_proxy_data_input_timeout(struct lmtp_proxy *proxy)
-{
- struct lmtp_proxy_connection *const *conns;
- unsigned int i, count;
-
- proxy->input_timeout = TRUE;
- i_stream_close(proxy->orig_data_input);
-
- conns = array_get(&proxy->connections, &count);
- for (i = 0; i < count; i++) {
- lmtp_client_fail(conns[i]->client, ERRSTR_TEMP_REMOTE_FAILURE
- " (timeout in DATA input)");
- }
- if (proxy->to_finish == NULL) {
- /* we had earlier failed all clients already and were just
- waiting for DATA input to finish, but DATA input also failed
- with a timeout. */
- lmtp_proxy_finish(proxy);
- }
-}
-
static void
lmtp_proxy_conn_rcpt_to(bool success, const char *reply, void *context)
{
return 0;
}
-static uoff_t lmtp_proxy_find_lowest_offset(struct lmtp_proxy *proxy)
-{
- struct lmtp_proxy_connection *const *conns;
- uoff_t min_offset = (uoff_t)-1;
-
- array_foreach(&proxy->connections, conns) {
- struct lmtp_proxy_connection *conn = *conns;
-
- if (conn->data_input != NULL &&
- min_offset > conn->data_input->v_offset &&
- i_stream_have_bytes_left(conn->data_input))
- min_offset = conn->data_input->v_offset;
- }
- return min_offset;
-}
-
-static bool lmtp_proxy_disconnect_hanging_output(struct lmtp_proxy *proxy)
-{
- struct lmtp_proxy_connection *const *conns;
- uoff_t min_offset;
- size_t size;
- const char *errstr;
-
- min_offset = lmtp_proxy_find_lowest_offset(proxy);
- if (min_offset == (uoff_t)-1)
- return FALSE;
-
- /* disconnect all connections that are keeping us from reading
- more input. */
- array_foreach(&proxy->connections, conns) {
- struct lmtp_proxy_connection *conn = *conns;
-
- if (conn->data_input != NULL &&
- conn->data_input->v_offset == min_offset) {
- (void)i_stream_get_data(conn->data_input, &size);
- errstr = t_strdup_printf(ERRSTR_TEMP_REMOTE_FAILURE
- " (DATA output stalled for %u secs, "
- "%"PRIuUOFF_T"B sent, %"PRIuSIZE_T"B buffered)",
- proxy->max_timeout_msecs/1000,
- min_offset, size);
- lmtp_client_fail(conn->client, errstr);
- }
- }
- return TRUE;
-}
-
-static void lmtp_proxy_output_timeout(struct lmtp_proxy *proxy)
-{
- timeout_remove(&proxy->to);
-
- /* drop the connection with the most unread data */
- if (lmtp_proxy_disconnect_hanging_output(proxy))
- lmtp_proxy_data_input(proxy);
- else {
- /* no such connection, so we've already sent everything but
- some servers aren't replying to us. disconnect all of
- them. */
- i_assert(proxy->data_input->eof);
- lmtp_proxy_fail_all(proxy, "timeout");
- }
-}
-
-static void lmtp_proxy_wait_for_output(struct lmtp_proxy *proxy)
-{
- if (proxy->io != NULL)
- io_remove(&proxy->io);
- if (proxy->to == NULL) {
- proxy->to = timeout_add(proxy->max_timeout_msecs,
- lmtp_proxy_output_timeout, proxy);
- }
-}
-
-static void proxy_send_more(struct lmtp_proxy *proxy)
-{
- struct lmtp_proxy_connection *const *conns;
-
- array_foreach(&proxy->connections, conns)
- lmtp_client_send_more((*conns)->client);
-}
-
-static bool lmtp_proxy_data_read(struct lmtp_proxy *proxy)
-{
- size_t size;
-
- timeout_reset(proxy->to_data_idle);
-
- switch (i_stream_read(proxy->data_input)) {
- case 0:
- if (!tee_i_stream_child_is_waiting(proxy->data_input)) {
- /* nothing new read */
- if (proxy->io != NULL)
- return FALSE;
- proxy->io = io_add(i_stream_get_fd(proxy->data_input),
- IO_READ,
- lmtp_proxy_data_input, proxy);
- return FALSE;
- }
- /* fall through */
- case -2:
- /* buffer full. someone's stalling. */
- lmtp_proxy_wait_for_output(proxy);
- return FALSE;
- case -1:
- if (proxy->data_input->stream_errno != 0)
- lmtp_proxy_fail_all(proxy, "disconnect");
- else {
- /* make sure LMTP clients see the EOF */
- proxy_send_more(proxy);
- /* finished reading data input. now we'll just have to
- wait for replies. */
- lmtp_proxy_wait_for_output(proxy);
- /* if all RCPT TOs failed, we can finish now */
- lmtp_proxy_try_finish(proxy);
- }
- return FALSE;
- default:
- /* something was read */
- if (proxy->to != NULL)
- timeout_remove(&proxy->to);
- (void)i_stream_get_data(proxy->data_input, &size);
- i_stream_skip(proxy->data_input, size);
- return TRUE;
- }
-}
-
-static void lmtp_proxy_data_input(struct lmtp_proxy *proxy)
+static void lmtp_proxy_more_data_sent(void *context)
{
- i_assert(!proxy->handling_data_input);
+ struct lmtp_proxy_connection *conn = context;
- proxy->handling_data_input = TRUE;
- do {
- proxy_send_more(proxy);
- } while (lmtp_proxy_data_read(proxy));
- proxy->handling_data_input = FALSE;
+ lmtp_client_send_more(conn->client);
}
-static void lmtp_proxy_more_data_sent(void *context)
+static void lmtp_proxy_conn_timeout(struct lmtp_proxy_connection *conn)
{
- struct lmtp_proxy *proxy = context;
+ const char *line;
- if (proxy->to != NULL && !proxy->handling_data_input) {
- /* some tee child is blocking others. it might have been this
- one, so see if we can continue. */
- lmtp_proxy_data_input(proxy);
- }
+ line = t_strdup_printf(ERRSTR_TEMP_REMOTE_FAILURE
+ " (timeout while waiting for reply to %s)",
+ lmtp_client_state_to_string(conn->client));
+ lmtp_client_fail(conn->client, line);
}
void lmtp_proxy_start(struct lmtp_proxy *proxy, struct istream *data_input,
{
struct lmtp_proxy_connection *const *conns;
+ i_assert(data_input->seekable);
+
proxy->finish_callback = callback;
proxy->finish_context = context;
- proxy->orig_data_input = data_input;
- proxy->tee_data_input = tee_i_stream_create(data_input);
- proxy->data_input = tee_i_stream_create_child(proxy->tee_data_input);
- proxy->to_data_idle = timeout_add(LMTP_PROXY_DATA_INPUT_TIMEOUT_MSECS,
- lmtp_proxy_data_input_timeout, proxy);
+ proxy->data_input = data_input;
+ i_stream_ref(proxy->data_input);
array_foreach(&proxy->connections, conns) {
struct lmtp_proxy_connection *conn = *conns;
continue;
}
+ conn->to = timeout_add(proxy->max_timeout_msecs,
+ lmtp_proxy_conn_timeout, conn);
lmtp_client_set_data_output_callback(conn->client,
lmtp_proxy_more_data_sent,
- proxy);
+ conn);
- conn->data_input =
- tee_i_stream_create_child(proxy->tee_data_input);
+ conn->data_input = i_stream_create_limit(data_input, (uoff_t)-1);
lmtp_client_set_data_header(conn->client, header);
lmtp_client_send(conn->client, conn->data_input);
+ lmtp_client_send_more(conn->client);
}
-
- lmtp_proxy_data_input(proxy);
}