From: Timo Sirainen Date: Mon, 31 Aug 2009 15:38:42 +0000 (-0400) Subject: lmtp: Added initial support for proxying mails to other LMTP/SMTP servers. X-Git-Tag: 2.0.alpha1~220 X-Git-Url: http://git.ipfire.org/gitweb.cgi?a=commitdiff_plain;h=c2ebc8f28b5504f280cd5d4adfe57ed70f9a7d83;p=thirdparty%2Fdovecot%2Fcore.git lmtp: Added initial support for proxying mails to other LMTP/SMTP servers. --HG-- branch : HEAD --- diff --git a/src/lmtp/Makefile.am b/src/lmtp/Makefile.am index 0081f6fc21..d8628b32b4 100644 --- a/src/lmtp/Makefile.am +++ b/src/lmtp/Makefile.am @@ -5,6 +5,7 @@ pkglibexec_PROGRAMS = lmtp AM_CPPFLAGS = \ -I$(top_srcdir)/src/lib \ -I$(top_srcdir)/src/lib-settings \ + -I$(top_srcdir)/src/lib-auth \ -I$(top_srcdir)/src/lib-mail \ -I$(top_srcdir)/src/lib-imap \ -I$(top_srcdir)/src/lib-index \ @@ -28,9 +29,11 @@ lmtp_DEPENDENCIES = $(libs) lmtp_SOURCES = \ main.c \ client.c \ - commands.c + commands.c \ + lmtp-proxy.c noinst_HEADERS = \ main.h \ client.h \ - commands.h + commands.h \ + lmtp-proxy.h diff --git a/src/lmtp/client.c b/src/lmtp/client.c index 6ee08a64d0..70b96addf2 100644 --- a/src/lmtp/client.c +++ b/src/lmtp/client.c @@ -12,6 +12,7 @@ #include "mail-namespace.h" #include "mail-storage.h" #include "main.h" +#include "lmtp-proxy.h" #include "commands.h" #include "client.h" @@ -161,6 +162,7 @@ struct client *client_create(int fd_in, int fd_out) client->my_domain = my_hostname; client->state_pool = pool_alloconly_create("client state", 4096); client->state.mail_data_fd = -1; + client->try_proxying = TRUE; // FIXME: setting! DLLIST_PREPEND(&clients, client); clients_count++; @@ -180,6 +182,8 @@ void client_destroy(struct client *client, const char *prefix, DLLIST_REMOVE(&clients, client); mail_user_unref(&client->raw_mail_user); + if (client->proxy != NULL) + lmtp_proxy_deinit(&client->proxy); if (client->io != NULL) io_remove(&client->io); timeout_remove(&client->to_idle); diff --git a/src/lmtp/client.h b/src/lmtp/client.h index a60681d863..0722cfb863 100644 --- a/src/lmtp/client.h +++ b/src/lmtp/client.h @@ -34,6 +34,7 @@ struct client_state { struct client { struct client *prev, *next; + const struct lda_settings *set; int fd_in, fd_out; struct io *io; struct istream *input; @@ -51,8 +52,12 @@ struct client { pool_t state_pool; struct client_state state; struct istream *dot_input; + struct lmtp_proxy *proxy; unsigned int disconnected:1; + unsigned int try_proxying:1; + unsigned int mail_body_7bit:1; + unsigned int mail_body_8bitmime:1; }; extern unsigned int clients_count; diff --git a/src/lmtp/commands.c b/src/lmtp/commands.c index a982cd126e..136a7a41e2 100644 --- a/src/lmtp/commands.c +++ b/src/lmtp/commands.c @@ -8,6 +8,8 @@ #include "ostream.h" #include "istream-dot.h" #include "safe-mkstemp.h" +#include "master-service.h" +#include "auth-master.h" #include "mail-storage-service.h" #include "index/raw/raw-storage.h" #include "lda-settings.h" @@ -15,8 +17,12 @@ #include "main.h" #include "client.h" #include "commands.h" +#include "lmtp-proxy.h" -#define ERRSTR_MAILBOX_TEMP_FAIL "451 4.2.0 <%s> Temporary internal error" +#include + +#define ERRSTR_TEMP_MAILBOX_FAIL "451 4.3.0 <%s> Temporary internal error" +#define ERRSTR_TEMP_USERDB_FAIL "451 4.3.0 <%s> Temporary user lookup failure" int cmd_lhlo(struct client *client, const char *args ATTR_UNUSED) { @@ -52,10 +58,11 @@ int cmd_mail(struct client *client, const char *args) } for (; *argv != NULL; argv++) { - if (strcasecmp(*argv, "BODY=7BIT") == 0 || - strcasecmp(*argv, "BODY=8BITMIME") == 0) { - /* just skip these */ - } else { + if (strcasecmp(*argv, "BODY=7BIT") == 0) + client->mail_body_7bit = TRUE; + else if (strcasecmp(*argv, "BODY=8BITMIME") == 0) + client->mail_body_8bitmime = TRUE; + else { client_send_line(client, "501 5.5.4 Unsupported options"); return 0; @@ -82,13 +89,133 @@ static bool rcpt_is_duplicate(struct client *client, const char *name) return FALSE; } +static bool +client_proxy_rcpt_parse_fields(struct lmtp_proxy_settings *set, + const char *const *args, const char **address) +{ + const char *p, *key, *value; + bool proxying = FALSE; + + for (; *args != NULL; args++) { + p = strchr(*args, '='); + if (p == NULL) { + key = *args; + value = ""; + } else { + key = t_strdup_until(*args, p); + value = p + 1; + } + + if (strcmp(key, "proxy") == 0) + proxying = TRUE; + else if (strcmp(key, "host") == 0) + set->host = value; + else if (strcmp(key, "port") == 0) + set->port = atoi(value); + else if (strcmp(key, "user") == 0) { + /* changing the username */ + *address = value; + } else { + /* just ignore it */ + } + } + if (proxying && set->host == NULL) { + i_error("proxy: host not given"); + return FALSE; + } + return proxying; +} + +static bool +client_proxy_is_ourself(const struct client *client, + const struct lmtp_proxy_settings *set) +{ + struct ip_addr ip; + + if (set->port != client->local_port) + return FALSE; + + if (net_addr2ip(set->host, &ip) < 0) + return FALSE; + if (!net_ip_compare(&ip, &client->local_ip)) + return FALSE; + return TRUE; +} + +static bool client_proxy_rcpt(struct client *client, const char *address) +{ + struct auth_master_connection *auth_conn; + struct lmtp_proxy_settings set; + struct auth_user_info info; + const char *args, *const *fields, *orig_address = address; + pool_t pool; + int ret; + + memset(&info, 0, sizeof(info)); + info.service = master_service_get_name(master_service); + info.local_ip = client->local_ip; + info.remote_ip = client->remote_ip; + info.local_port = client->local_port; + info.remote_port = client->remote_port; + + pool = pool_alloconly_create("auth lookup", 1024); + auth_conn = mail_storage_service_multi_get_auth_conn(multi_service); + ret = auth_master_pass_lookup(auth_conn, address, &info, + pool, &fields); + if (ret <= 0) { + pool_unref(&pool); + if (ret < 0) { + client_send_line(client, ERRSTR_TEMP_USERDB_FAIL, + address); + return TRUE; + } else { + /* user not found from passdb. try userdb also. */ + return FALSE; + } + } + + memset(&set, 0, sizeof(set)); + set.port = client->local_port; + if (!client_proxy_rcpt_parse_fields(&set, fields, &address)) { + /* not proxying this user */ + pool_unref(&pool); + return FALSE; + } + if (strcmp(address, orig_address) == 0 && + client_proxy_is_ourself(client, &set)) { + i_error("Proxying to <%s> loops to itself", address); + client_send_line(client, "554 5.4.6 Proxying loops to itself"); + pool_unref(&pool); + return FALSE; + } + + if (client->proxy == NULL) { + client->proxy = lmtp_proxy_init(client->set->hostname, + client->output); + if (client->mail_body_8bitmime) + args = " BODY=8BITMIME"; + else if (client->mail_body_7bit) + args = " BODY=7BIT"; + else + args = ""; + lmtp_proxy_mail_from(client->proxy, t_strdup_printf( + "<%s>%s", client->state.mail_from, args)); + } + if (lmtp_proxy_add_rcpt(client->proxy, address, &set) < 0) + client_send_line(client, ERRSTR_TEMP_REMOTE_FAILURE); + else + client_send_line(client, "250 2.1.5 OK"); + pool_unref(&pool); + return TRUE; +} + int cmd_rcpt(struct client *client, const char *args) { struct mail_recipient rcpt; struct mail_storage_service_input input; - const char *name, *error, *addr, *const *argv; + const char *name, *error = NULL, *addr, *const *argv; unsigned int len; - int ret; + int ret = 0; if (client->state.mail_from == NULL) { client_send_line(client, "503 5.5.1 MAIL needed first"); @@ -121,6 +248,11 @@ int cmd_rcpt(struct client *client, const char *args) return 0; } + if (client->try_proxying) { + if (client_proxy_rcpt(client, name)) + return 0; + } + memset(&input, 0, sizeof(input)); input.username = name; input.local_ip = client->local_ip; @@ -129,10 +261,10 @@ int cmd_rcpt(struct client *client, const char *args) ret = mail_storage_service_multi_lookup(multi_service, &input, client->state_pool, &rcpt.multi_user, &error); + if (ret < 0) { i_error("User lookup failed: %s", error); - client_send_line(client, - "451 4.3.0 Temporary user lookup failure"); + client_send_line(client, ERRSTR_TEMP_USERDB_FAIL, name); return 0; } if (ret == 0) { @@ -189,7 +321,7 @@ client_deliver(struct client *client, const struct mail_recipient *rcpt, &client->state.dest_user, &error) < 0) { i_error("%s", error); - client_send_line(client, ERRSTR_MAILBOX_TEMP_FAIL, rcpt->name); + client_send_line(client, ERRSTR_TEMP_MAILBOX_FAIL, rcpt->name); return -1; } sets = mail_storage_service_multi_user_get_set(rcpt->multi_user); @@ -215,7 +347,7 @@ client_deliver(struct client *client, const struct mail_recipient *rcpt, } else if (storage == NULL) { /* This shouldn't happen */ i_error("BUG: Saving failed to unknown storage"); - client_send_line(client, ERRSTR_MAILBOX_TEMP_FAIL, + client_send_line(client, ERRSTR_TEMP_MAILBOX_FAIL, rcpt->name); ret = -1; } else { @@ -264,23 +396,14 @@ static void client_rcpt_fail_all(struct client *client) rcpts = array_get(&client->state.rcpt_to, &count); for (i = 0; i < count; i++) { - client_send_line(client, ERRSTR_MAILBOX_TEMP_FAIL, + client_send_line(client, ERRSTR_TEMP_MAILBOX_FAIL, rcpts[i].name); } } -static int client_open_raw_mail(struct client *client) +static struct istream *client_get_input(struct client *client) { - static const char *wanted_headers[] = { - "From", "To", "Message-ID", "Subject", "Return-Path", - NULL - }; - struct mailbox_list *raw_list = client->raw_mail_user->namespaces->list; - struct mailbox *box; - struct raw_mailbox *raw_box; - struct mailbox_header_lookup_ctx *headers_ctx; struct istream *input; - enum mail_error error; if (client->state.mail_data_output != NULL) { o_stream_unref(&client->state.mail_data_output); @@ -291,10 +414,24 @@ static int client_open_raw_mail(struct client *client) input = i_stream_create_from_data(client->state.mail_data->data, client->state.mail_data->used); } + return input; +} + +static int client_open_raw_mail(struct client *client, struct istream *input) +{ + static const char *wanted_headers[] = { + "From", "To", "Message-ID", "Subject", "Return-Path", + NULL + }; + struct mailbox_list *raw_list = client->raw_mail_user->namespaces->list; + struct mailbox *box; + struct raw_mailbox *raw_box; + struct mailbox_header_lookup_ctx *headers_ctx; + enum mail_error error; + client->state.raw_box = box = mailbox_alloc(raw_list, "Dovecot Delivery Mail", input, MAILBOX_FLAG_NO_INDEX_FILES); - i_stream_unref(&input); if (mailbox_open(box) < 0 || mailbox_sync(box, 0, 0, NULL) < 0) { i_error("Can't open delivery mail as raw: %s", @@ -316,15 +453,12 @@ static int client_open_raw_mail(struct client *client) return 0; } -static void client_input_data_finish(struct client *client) +static void +client_input_data_write_local(struct client *client, struct istream *input) { struct mail *src_mail; - i_stream_destroy(&client->dot_input); - io_remove(&client->io); - client->io = io_add(client->fd_in, IO_READ, client_input, client); - - if (client_open_raw_mail(client) < 0) + if (client_open_raw_mail(client, input) < 0) return; /* save the message to the first recipient's mailbox */ @@ -355,6 +489,43 @@ static void client_input_data_finish(struct client *client) } } +static void client_input_data_finish(struct client *client) +{ + if (client->io != NULL) + io_remove(&client->io); + client->io = io_add(client->fd_in, IO_READ, client_input, client); + + client_state_reset(client); + if (i_stream_have_bytes_left(client->input)) + client_input_handle(client); +} + +static void client_proxy_finish(void *context) +{ + struct client *client = context; + + lmtp_proxy_deinit(&client->proxy); + client_input_data_finish(client); +} + +static bool client_input_data_write(struct client *client) +{ + struct istream *input; + bool ret = TRUE; + + i_stream_destroy(&client->dot_input); + + input = client_get_input(client); + client_input_data_write_local(client, input); + if (client->proxy != NULL) { + lmtp_proxy_start(client->proxy, input, + client_proxy_finish, client); + ret = FALSE; + } + i_stream_unref(&input); + return ret; +} + static int client_input_add_file(struct client *client, const unsigned char *data, size_t size) { @@ -424,10 +595,8 @@ static void client_input_data_handle(struct client *client) if (ret == 0) return; - client_input_data_finish(client); - client_state_reset(client); - if (i_stream_have_bytes_left(client->input)) - client_input_handle(client); + if (client_input_data_write(client)) + client_input_data_finish(client); } static void client_input_data(struct client *client) @@ -444,7 +613,7 @@ int cmd_data(struct client *client, const char *args ATTR_UNUSED) client_send_line(client, "503 5.5.1 MAIL needed first"); return 0; } - if (array_count(&client->state.rcpt_to) == 0) { + if (array_count(&client->state.rcpt_to) == 0 && client->proxy == NULL) { client_send_line(client, "554 5.5.1 No valid recipients"); return 0; } @@ -454,10 +623,17 @@ int cmd_data(struct client *client, const char *args ATTR_UNUSED) i_assert(client->dot_input == NULL); client->dot_input = i_stream_create_dot(client->input, TRUE); - io_remove(&client->io); - client->io = io_add(client->fd_in, IO_READ, client_input_data, client); client_send_line(client, "354 OK"); - client_input_data_handle(client); + io_remove(&client->io); + if (array_count(&client->state.rcpt_to) == 0) { + lmtp_proxy_start(client->proxy, client->dot_input, + client_proxy_finish, client); + i_stream_unref(&client->dot_input); + } else { + client->io = io_add(client->fd_in, IO_READ, + client_input_data, client); + client_input_data_handle(client); + } return -1; } diff --git a/src/lmtp/lmtp-proxy.c b/src/lmtp/lmtp-proxy.c new file mode 100644 index 0000000000..8af3dba88a --- /dev/null +++ b/src/lmtp/lmtp-proxy.c @@ -0,0 +1,370 @@ +/* Copyright (c) 2009 Dovecot authors, see the included COPYING file */ + +#include "lib.h" +#include "array.h" +#include "ioloop.h" +#include "istream.h" +#include "istream-tee.h" +#include "ostream.h" +#include "lmtp-client.h" +#include "lmtp-proxy.h" + +#define LMTP_MAX_LINE_LEN 1024 +#define LMTP_PROXY_OUTPUT_TIMEOUT_MSECS 1000 + +struct lmtp_proxy_recipient { + struct lmtp_proxy_connection *conn; + const char *address; + const char *reply; + + unsigned int rcpt_to_failed:1; + unsigned int data_reply_received:1; +}; + +struct lmtp_proxy_connection { + struct lmtp_proxy *proxy; + struct lmtp_proxy_settings set; + + /* points to proxy->rcpt_to array. */ + unsigned int rcpt_next_reply_low_idx; + unsigned int data_next_reply_low_idx; + + struct lmtp_client *client; + struct istream *data_input; + unsigned int failed:1; +}; + +struct lmtp_proxy { + pool_t pool; + const char *mail_from, *my_hostname; + ARRAY_DEFINE(connections, struct lmtp_proxy_connection *); + ARRAY_DEFINE(rcpt_to, struct lmtp_proxy_recipient); + unsigned int rcpt_next_reply_idx; + + struct timeout *to; + struct io *io; + struct istream *data_input; + struct ostream *client_output; + struct tee_istream *tee_data_input; + + void (*finish_callback)(void *); + void *finish_context; + + unsigned int finished:1; +}; + +static void lmtp_proxy_conn_deinit(struct lmtp_proxy_connection *conn); +static void lmtp_proxy_data_input(struct lmtp_proxy *proxy); + +struct lmtp_proxy * +lmtp_proxy_init(const char *my_hostname, struct ostream *client_output) +{ + struct lmtp_proxy *proxy; + pool_t pool; + + o_stream_ref(client_output); + + pool = pool_alloconly_create("lmtp proxy", 1024); + proxy = p_new(pool, struct lmtp_proxy, 1); + proxy->pool = pool; + proxy->my_hostname = p_strdup(pool, my_hostname); + proxy->client_output = client_output; + i_array_init(&proxy->rcpt_to, 32); + i_array_init(&proxy->connections, 32); + return proxy; +} + +static void lmtp_proxy_connections_deinit(struct lmtp_proxy *proxy) +{ + struct lmtp_proxy_connection *const *conns; + unsigned int i, count; + + conns = array_get(&proxy->connections, &count); + for (i = 0; i < count; i++) + lmtp_proxy_conn_deinit(conns[i]); +} + +void lmtp_proxy_deinit(struct lmtp_proxy **_proxy) +{ + struct lmtp_proxy *proxy = *_proxy; + + *_proxy = NULL; + + lmtp_proxy_connections_deinit(proxy); + if (proxy->data_input != NULL) + i_stream_unref(&proxy->data_input); + if (proxy->client_output != NULL) + o_stream_unref(&proxy->client_output); + if (proxy->to != NULL) + timeout_remove(&proxy->to); + if (proxy->io != NULL) + io_remove(&proxy->io); + array_free(&proxy->rcpt_to); + array_free(&proxy->connections); + pool_unref(&proxy->pool); +} + +void lmtp_proxy_mail_from(struct lmtp_proxy *proxy, const char *value) +{ + proxy->mail_from = p_strdup(proxy->pool, value); +} + +static struct lmtp_proxy_connection * +lmtp_proxy_get_connection(struct lmtp_proxy *proxy, + const struct lmtp_proxy_settings *set) +{ + struct lmtp_proxy_connection *const *conns, *conn; + unsigned int i, count; + + conns = array_get(&proxy->connections, &count); + for (i = 0; i < count; i++) { + if (conns[i]->set.port == set->port && + strcmp(conns[i]->set.host, set->host) == 0) + return conns[i]; + } + + conn = p_new(proxy->pool, struct lmtp_proxy_connection, 1); + conn->proxy = proxy; + conn->set.host = p_strdup(proxy->pool, set->host); + conn->set.port = set->port; + conn->set.timeout_msecs = set->timeout_msecs; + array_append(&proxy->connections, &conn, 1); + conn->client = lmtp_client_init(proxy->mail_from, proxy->my_hostname); + if (lmtp_client_connect_tcp(conn->client, LMTP_CLIENT_PROTOCOL_LMTP, + conn->set.host, conn->set.port) < 0) + conn->failed = TRUE; + return conn; +} + +static void lmtp_proxy_conn_deinit(struct lmtp_proxy_connection *conn) +{ + struct lmtp_proxy_recipient *rcpt; + unsigned int i, count; + + /* set failure replies to all recipients in this connection */ + rcpt = array_get_modifiable(&conn->proxy->rcpt_to, &count); + for (i = 0; i < count; i++) { + if (rcpt[i].conn == conn && !rcpt[i].rcpt_to_failed) + rcpt[i].reply = ERRSTR_TEMP_REMOTE_FAILURE; + } + + if (conn->client != NULL) + lmtp_client_deinit(&conn->client); + if (conn->data_input != NULL) + i_stream_unref(&conn->data_input); + conn->failed = TRUE; +} + +static bool lmtp_proxy_send_replies(struct lmtp_proxy *proxy) +{ + const struct lmtp_proxy_recipient *rcpt; + unsigned int i, count; + + o_stream_cork(proxy->client_output); + rcpt = array_get(&proxy->rcpt_to, &count); + for (i = proxy->rcpt_next_reply_idx; i < count; i++) { + if (!(rcpt[i].rcpt_to_failed || rcpt[i].data_reply_received)) + break; + o_stream_send_str(proxy->client_output, + t_strconcat(rcpt[i].reply, "\r\n", NULL)); + } + o_stream_uncork(proxy->client_output); + proxy->rcpt_next_reply_idx = i; + + return i == count; +} + +static void lmtp_proxy_finish(struct lmtp_proxy *proxy) +{ + struct lmtp_proxy_recipient *rcpt; + unsigned int i, count; + bool ret; + + i_assert(!proxy->finished); + + /* if we haven't sent something yet, they're failures */ + rcpt = array_get_modifiable(&proxy->rcpt_to, &count); + for (i = proxy->rcpt_next_reply_idx; i < count; i++) { + if (!rcpt[i].rcpt_to_failed) { + i_assert(!rcpt[i].data_reply_received); + rcpt[i].reply = ERRSTR_TEMP_REMOTE_FAILURE; + rcpt[i].data_reply_received = TRUE; + } + } + + ret = lmtp_proxy_send_replies(proxy); + i_assert(ret); + + proxy->finished = TRUE; + proxy->finish_callback(proxy->finish_context); +} + +static void lmtp_proxy_try_finish(struct lmtp_proxy *proxy) +{ + if (lmtp_proxy_send_replies(proxy)) + lmtp_proxy_finish(proxy); +} + +static void +lmtp_proxy_conn_rcpt_to(bool success, const char *reply, void *context) +{ + struct lmtp_proxy_connection *conn = context; + struct lmtp_proxy_recipient *rcpt; + unsigned int i, count; + + rcpt = array_get_modifiable(&conn->proxy->rcpt_to, &count); + for (i = conn->rcpt_next_reply_low_idx; i < count; i++) { + if (rcpt[i].conn == conn) { + i_assert(rcpt[i].reply == NULL); + rcpt[i].reply = p_strdup(conn->proxy->pool, reply); + rcpt[i].rcpt_to_failed = !success; + conn->rcpt_next_reply_low_idx = i + 1; + break; + } + } + i_assert(i != count); + + /* send replies only if we've already sent DATA. */ + if (conn->proxy->data_input != NULL) + lmtp_proxy_try_finish(conn->proxy); +} + +static void +lmtp_proxy_conn_data(bool success ATTR_UNUSED, const char *reply, void *context) +{ + struct lmtp_proxy_connection *conn = context; + struct lmtp_proxy_recipient *rcpt; + unsigned int i, count; + + i_assert(conn->proxy->data_input != NULL); + + if (reply == NULL) + reply = ERRSTR_TEMP_REMOTE_FAILURE; + + rcpt = array_get_modifiable(&conn->proxy->rcpt_to, &count); + for (i = conn->data_next_reply_low_idx; i < count; i++) { + if (rcpt[i].conn == conn && !rcpt[i].rcpt_to_failed) { + i_assert(rcpt[i].reply != NULL); + rcpt[i].reply = p_strdup(conn->proxy->pool, reply); + rcpt[i].data_reply_received = TRUE; + conn->data_next_reply_low_idx = i + 1; + break; + } + } + i_assert(i != count); + lmtp_proxy_try_finish(conn->proxy); +} + +int lmtp_proxy_add_rcpt(struct lmtp_proxy *proxy, const char *address, + const struct lmtp_proxy_settings *set) +{ + struct lmtp_proxy_connection *conn; + struct lmtp_proxy_recipient *rcpt; + + conn = lmtp_proxy_get_connection(proxy, set); + if (conn->failed) + return -1; + + rcpt = array_append_space(&proxy->rcpt_to); + rcpt->conn = conn; + rcpt->address = p_strdup(proxy->pool, address); + + lmtp_client_add_rcpt(conn->client, address, lmtp_proxy_conn_rcpt_to, + lmtp_proxy_conn_data, conn); + return 0; +} + +static void lmtp_proxy_output_timeout(struct lmtp_proxy *proxy) +{ + struct lmtp_proxy_connection *const *conns, *max_conn = NULL; + unsigned int i, count; + size_t size, max_size = 0; + + timeout_remove(&proxy->to); + + /* drop the connection with the most unread data */ + conns = array_get(&proxy->connections, &count); + for (i = 0; i < count; i++) { + (void)i_stream_get_data(conns[i]->data_input, &size); + if (max_size < size) { + max_size = size; + max_conn = conns[i]; + } + } + i_assert(max_conn != NULL); + + lmtp_proxy_conn_deinit(max_conn); +} + +static void lmtp_proxy_wait_for_output(struct lmtp_proxy *proxy) +{ + i_assert(proxy->to == NULL); + + if (proxy->io != NULL) + io_remove(&proxy->io); + if (array_count(&proxy->connections) > 1) { + proxy->to = timeout_add(LMTP_PROXY_OUTPUT_TIMEOUT_MSECS, + lmtp_proxy_output_timeout, proxy); + } +} + +static bool lmtp_proxy_read_data(struct lmtp_proxy *proxy) +{ + size_t size; + + switch (i_stream_read(proxy->data_input)) { + case -2: + /* buffer full. someone's stalling. */ + lmtp_proxy_wait_for_output(proxy); + return FALSE; + case -1: + /* disconnected */ + lmtp_proxy_finish(proxy); + return FALSE; + case 0: + /* nothing new read */ + if (proxy->io == NULL) { + proxy->io = io_add(i_stream_get_fd(proxy->data_input), + IO_READ, + lmtp_proxy_data_input, proxy); + } + return FALSE; + default: + /* something was read */ + (void)i_stream_get_data(proxy->data_input, &size); + i_stream_skip(proxy->data_input, size); + return TRUE; + } +} + +static void lmtp_proxy_data_input(struct lmtp_proxy *proxy) +{ + struct lmtp_proxy_connection *const *conns; + unsigned int i, count; + + do { + conns = array_get(&proxy->connections, &count); + for (i = 0; i < count; i++) + lmtp_client_send_more(conns[i]->client); + } while (lmtp_proxy_read_data(proxy)); +} + +void lmtp_proxy_start(struct lmtp_proxy *proxy, struct istream *data_input, + void (*finish_callback)(void *), void *context) +{ + struct lmtp_proxy_connection *const *conns; + unsigned int i, count; + + proxy->finish_callback = finish_callback; + proxy->finish_context = context; + proxy->tee_data_input = tee_i_stream_create(data_input); + proxy->data_input = tee_i_stream_create_child(proxy->tee_data_input); + + conns = array_get(&proxy->connections, &count); + for (i = 0; i < count; i++) { + conns[i]->data_input = + tee_i_stream_create_child(proxy->tee_data_input); + } + + lmtp_proxy_data_input(proxy); +} diff --git a/src/lmtp/lmtp-proxy.h b/src/lmtp/lmtp-proxy.h new file mode 100644 index 0000000000..200827bca7 --- /dev/null +++ b/src/lmtp/lmtp-proxy.h @@ -0,0 +1,28 @@ +#ifndef LMTP_PROXY_H +#define LMTP_PROXY_H + +#include "network.h" + +#define ERRSTR_TEMP_REMOTE_FAILURE "451 4.4.0 Remote server not answering" + +struct lmtp_proxy_settings { + const char *host; + unsigned int port; + unsigned int timeout_msecs; +}; + +struct lmtp_proxy * +lmtp_proxy_init(const char *my_hostname, struct ostream *client_output); +void lmtp_proxy_deinit(struct lmtp_proxy **proxy); + +/* Set the "MAIL FROM:" line, including <> and options */ +void lmtp_proxy_mail_from(struct lmtp_proxy *proxy, const char *value); +/* Add a new recipient. Returns -1 if we already know that the destination + host can't be reached. */ +int lmtp_proxy_add_rcpt(struct lmtp_proxy *proxy, const char *address, + const struct lmtp_proxy_settings *set); +/* Start proxying */ +void lmtp_proxy_start(struct lmtp_proxy *proxy, struct istream *data_input, + void (*finish_callback)(void *), void *context); + +#endif diff --git a/src/lmtp/main.c b/src/lmtp/main.c index 4ae05971e4..bf46902cf7 100644 --- a/src/lmtp/main.c +++ b/src/lmtp/main.c @@ -30,6 +30,7 @@ static void client_connected(const struct master_service_connection *conn) client = client_create(conn->fd, conn->fd); client->remote_ip = conn->remote_ip; client->remote_port = conn->remote_port; + client->set = mail_storage_service_get_settings(master_service); (void)net_getsockname(conn->fd, &client->local_ip, &client->local_port); }