struct lmtp_client *client;
struct istream *data_input;
+
+ unsigned int finished:1;
unsigned int failed:1;
};
pool_t pool;
const char *mail_from, *my_hostname;
ARRAY_DEFINE(connections, struct lmtp_proxy_connection *);
- ARRAY_DEFINE(rcpt_to, struct lmtp_proxy_recipient);
+ ARRAY_DEFINE(rcpt_to, struct lmtp_proxy_recipient *);
unsigned int next_data_reply_idx;
- struct timeout *to, *to_data_idle;
+ struct timeout *to, *to_data_idle, *to_finish;
struct io *io;
struct istream *data_input, *orig_data_input;
struct ostream *client_output;
unsigned int finished:1;
unsigned int input_timeout:1;
+ unsigned int handling_data_input:1;
};
static void lmtp_conn_finish(void *context);
array_foreach(&proxy->connections, conns) {
struct lmtp_proxy_connection *conn = *conns;
- lmtp_client_fail(conn->client, "451 4.3.0 Aborting");
lmtp_client_deinit(&conn->client);
}
}
static bool lmtp_proxy_send_data_replies(struct lmtp_proxy *proxy)
{
- const struct lmtp_proxy_recipient *rcpt;
+ struct lmtp_proxy_recipient *const *rcpt;
unsigned int i, count;
o_stream_cork(proxy->client_output);
rcpt = array_get(&proxy->rcpt_to, &count);
for (i = proxy->next_data_reply_idx; i < count; i++) {
- if (!(rcpt[i].rcpt_to_failed || rcpt[i].data_reply_received))
+ if (!(rcpt[i]->rcpt_to_failed || rcpt[i]->data_reply_received))
break;
o_stream_send_str(proxy->client_output,
- t_strconcat(rcpt[i].reply, "\r\n", NULL));
+ t_strconcat(rcpt[i]->reply, "\r\n", NULL));
}
o_stream_uncork(proxy->client_output);
proxy->next_data_reply_idx = i;
return i == count;
}
-static void lmtp_proxy_finish(struct lmtp_proxy *proxy)
+static void lmtp_proxy_finish_timeout(struct lmtp_proxy *proxy)
{
i_assert(!proxy->finished);
+ timeout_remove(&proxy->to_finish);
proxy->finished = TRUE;
proxy->finish_callback(proxy->input_timeout, proxy->finish_context);
}
+static void lmtp_proxy_finish(struct lmtp_proxy *proxy)
+{
+ /* do the actual finishing in a timeout handler, since the finish
+ callback causes the proxy to be destroyed and the code leading up
+ to this function can be called from many different places. it's
+ easier this way rather than having all the callers check if the
+ proxy was already destroyed. */
+ if (proxy->to_finish == NULL) {
+ proxy->to_finish = timeout_add(0, lmtp_proxy_finish_timeout,
+ proxy);
+ }
+}
+
static void lmtp_proxy_try_finish(struct lmtp_proxy *proxy)
{
if (lmtp_proxy_send_data_replies(proxy) &&
{
struct lmtp_proxy_connection *conn = context;
+ conn->finished = TRUE;
if (conn->data_input != NULL)
i_stream_unref(&conn->data_input);
lmtp_proxy_try_finish(conn->proxy);
unsigned int i, count;
const char *line;
- pool_ref(proxy->pool);
conns = array_get(&proxy->connections, &count);
for (i = 0; i < count; i++) {
line = t_strdup_printf(ERRSTR_TEMP_REMOTE_FAILURE
" (%s while waiting for reply to %s)", reason,
lmtp_client_state_to_string(conns[i]->client));
lmtp_client_fail(conns[i]->client, line);
+ }
- if (!array_is_created(&proxy->connections))
- break;
+ if (proxy->to_finish == NULL) {
+ /* we still have some DATA input to read */
+ if (proxy->io == NULL) {
+ proxy->io = io_add(i_stream_get_fd(proxy->data_input),
+ IO_READ,
+ lmtp_proxy_data_input, proxy);
+ }
}
- pool_unref(&proxy->pool);
- /* either the whole proxy is destroyed now, or we still have some
- DATA input to read. */
}
static void lmtp_proxy_data_input_timeout(struct lmtp_proxy *proxy)
proxy->input_timeout = TRUE;
i_stream_close(proxy->orig_data_input);
- pool_ref(proxy->pool);
conns = array_get(&proxy->connections, &count);
for (i = 0; i < count; i++) {
lmtp_client_fail(conns[i]->client, ERRSTR_TEMP_REMOTE_FAILURE
" (timeout in DATA input)");
- if (!array_is_created(&proxy->connections)) {
- pool_unref(&proxy->pool);
- return;
- }
}
- /* last client failure should have caused the proxy to be destroyed */
- i_unreached();
+ if (proxy->to_finish == NULL) {
+ /* we had earlier failed all clients already and were just
+ waiting for DATA input to finish, but DATA input also failed
+ with a timeout. */
+ lmtp_proxy_finish(proxy);
+ }
}
static void
if (conn->failed)
return -1;
- rcpt = array_append_space(&proxy->rcpt_to);
+ rcpt = p_new(proxy->pool, struct lmtp_proxy_recipient, 1);
rcpt->conn = conn;
rcpt->address = p_strdup(proxy->pool, address);
+ array_append(&proxy->rcpt_to, &rcpt, 1);
lmtp_client_add_rcpt(conn->client, address, lmtp_proxy_conn_rcpt_to,
lmtp_proxy_conn_data, rcpt);
return 0;
}
-static size_t lmtp_proxy_find_max_data_input_size(struct lmtp_proxy *proxy)
+static size_t lmtp_proxy_find_lowest_offset(struct lmtp_proxy *proxy)
{
struct lmtp_proxy_connection *const *conns;
- unsigned int i, count;
- size_t size, max_size = 0;
+ uoff_t min_offset = (uoff_t)-1;
- conns = array_get(&proxy->connections, &count);
- for (i = 0; i < count; i++) {
- if (conns[i]->data_input == NULL)
- continue;
- (void)i_stream_get_data(conns[i]->data_input, &size);
- if (max_size < size)
- max_size = size;
+ array_foreach(&proxy->connections, conns) {
+ struct lmtp_proxy_connection *conn = *conns;
+
+ if (conn->data_input != NULL &&
+ min_offset > conn->data_input->v_offset)
+ min_offset = conn->data_input->v_offset;
}
- return max_size;
+ return min_offset;
}
static bool lmtp_proxy_disconnect_hanging_output(struct lmtp_proxy *proxy)
{
struct lmtp_proxy_connection *const *conns;
- unsigned int i, count;
- size_t size, max_size;
+ uoff_t min_offset;
- max_size = lmtp_proxy_find_max_data_input_size(proxy);
- if (max_size == 0)
+ min_offset = lmtp_proxy_find_lowest_offset(proxy);
+ if (min_offset == (uoff_t)-1)
return FALSE;
/* disconnect all connections that are keeping us from reading
more input. */
- conns = array_get(&proxy->connections, &count);
- for (i = 0; i < count; i++) {
- if (conns[i]->data_input == NULL)
- continue;
- (void)i_stream_get_data(conns[i]->data_input, &size);
- if (size == max_size) {
- lmtp_client_fail(conns[i]->client,
+ array_foreach(&proxy->connections, conns) {
+ struct lmtp_proxy_connection *conn = *conns;
+
+ if (conn->data_input != NULL &&
+ conn->data_input->v_offset == min_offset) {
+ lmtp_client_fail(conn->client,
ERRSTR_TEMP_REMOTE_FAILURE
" (DATA output timeout)");
}
/* no such connection, so we've already sent everything but
some servers aren't replying to us. disconnect all of
them. */
+ i_assert(proxy->data_input->eof);
lmtp_proxy_fail_all(proxy, "timeout");
}
}
static void lmtp_proxy_wait_for_output(struct lmtp_proxy *proxy)
{
- i_assert(proxy->to == NULL);
-
if (proxy->io != NULL)
io_remove(&proxy->io);
- proxy->to = timeout_add(proxy->max_timeout_msecs,
- lmtp_proxy_output_timeout, proxy);
+ if (proxy->to == NULL) {
+ proxy->to = timeout_add(proxy->max_timeout_msecs,
+ lmtp_proxy_output_timeout, proxy);
+ }
}
static bool lmtp_proxy_data_read(struct lmtp_proxy *proxy)
return FALSE;
default:
/* something was read */
+ if (proxy->to != NULL)
+ timeout_remove(&proxy->to);
(void)i_stream_get_data(proxy->data_input, &size);
i_stream_skip(proxy->data_input, size);
return TRUE;
{
struct lmtp_proxy_connection *const *conns;
+ i_assert(!proxy->handling_data_input);
+
+ proxy->handling_data_input = TRUE;
do {
- array_foreach(&proxy->connections, conns);
+ array_foreach(&proxy->connections, conns)
lmtp_client_send_more((*conns)->client);
} while (lmtp_proxy_data_read(proxy));
+ proxy->handling_data_input = FALSE;
+}
+
+static void lmtp_proxy_more_data_sent(void *context)
+{
+ struct lmtp_proxy *proxy = context;
+
+ if (proxy->to != NULL && !proxy->handling_data_input) {
+ /* some tee child is blocking others. it might have been this
+ one, so see if we can continue. */
+ lmtp_proxy_data_input(proxy);
+ }
}
void lmtp_proxy_start(struct lmtp_proxy *proxy, struct istream *data_input,
array_foreach(&proxy->connections, conns) {
struct lmtp_proxy_connection *conn = *conns;
+ if (conn->finished) {
+ /* this connection had already failed */
+ continue;
+ }
+
+ lmtp_client_set_data_output_callback(conn->client,
+ lmtp_proxy_more_data_sent,
+ proxy);
+
conn->data_input =
tee_i_stream_create_child(proxy->tee_data_input);
lmtp_client_set_data_header(conn->client, header);