that have only minor differences in dictionary open/access
options.
+20070105
+
+ Performance: pipeline of pending delivery agent connections,
+ to improve Linux/Solaris mail delivery performance by another
+ 10% while going down-hill with the wind from behind. Design
+ and implementation Victor and Wietse. Files: *qmgr/qmgr.c,
+ *qmgr/qmgr.h, *qmgr/qmgr_transport.c.
+
+20070106
+
+ Cleanup: eliminate the Linux/Solaris "wait for accept()"
+ stage from the queue manager to delivery agent protocol.
+ This alone achieves 99.99% of the Linux/Solaris speed up
+ from the preceding change. The pending connection pipeline
+ takes care of the rest. Tested on Linux kernels dating
+ back to 2.0.27 (that's more than 10 years ago). Files:
+ *qmgr/qmgr_transport.c.
+
Wish list:
Update BACKSCATTER_README to use PCRE because that's what I
Warning: you still need to edit myorigin/mydestination/mynetworks
parameter settings in $config_directory/main.cf.
- See also http://www.postfix.org/faq.html for information about
- dialup sites or about sites inside a firewalled network.
+ See also http://www.postfix.org/STANDARD_CONFIGURATION_README.html
+ for information about dialup sites or about sites inside a
+ firewalled network.
BTW: Check your $ALIASES file and be sure to set up aliases
that send mail for root and postmaster to a real person, then
* Patches change both the patchlevel and the release date. Snapshots have no
* patchlevel; they change the release date only.
*/
-#define MAIL_RELEASE_DATE "20070104"
+#define MAIL_RELEASE_DATE "20070107"
#define MAIL_VERSION_NUMBER "2.4"
#ifdef SNAPSHOT
bool var_verp_bounce_off;
int var_qmgr_clog_warn_time;
-static QMGR_SCAN *qmgr_incoming;
-static QMGR_SCAN *qmgr_deferred;
+static QMGR_SCAN *qmgr_scans[2];
+
+#define QMGR_SCAN_IDX_INCOMING 0
+#define QMGR_SCAN_IDX_DEFERRED 1
+#define QMGR_SCAN_IDX_COUNT (sizeof(qmgr_scans) / sizeof(qmgr_scans[0]))
/* qmgr_deferred_run_event - queue manager heartbeat */
* This routine runs when it is time for another deferred queue scan.
* Make sure this routine gets called again in the future.
*/
- qmgr_scan_request(qmgr_deferred, QMGR_SCAN_START);
+ qmgr_scan_request(qmgr_scans[QMGR_SCAN_IDX_DEFERRED], QMGR_SCAN_START);
event_request_timer(qmgr_deferred_run_event, dummy, var_queue_run_delay);
}
* requested, the request takes effect immediately.
*/
if (incoming_flag != 0)
- qmgr_scan_request(qmgr_incoming, incoming_flag);
+ qmgr_scan_request(qmgr_scans[QMGR_SCAN_IDX_INCOMING], incoming_flag);
if (deferred_flag != 0)
- qmgr_scan_request(qmgr_deferred, deferred_flag);
+ qmgr_scan_request(qmgr_scans[QMGR_SCAN_IDX_DEFERRED], deferred_flag);
}
/* qmgr_loop - queue manager main loop */
static int qmgr_loop(char *unused_name, char **unused_argv)
{
- char *in_path = 0;
- char *df_path = 0;
+ char *path;
int token_count;
- int in_feed = 0;
+ int feed = 0;
+ int scan_idx; /* Priority order scan index */
+ static int first_scan_idx = QMGR_SCAN_IDX_INCOMING;
+ int last_scan_idx = QMGR_SCAN_IDX_COUNT - 1;
+ int delay;
/*
* This routine runs as part of the event handling loop, after the event
/*
* Let some new blood into the active queue when the queue size is
* smaller than some configurable limit, and when the number of in-core
- * recipients does not exceed some configurable limit. When the system is
- * under heavy load, favor new mail over old mail.
+ * recipients does not exceed some configurable limit.
+ *
+ * We import one message per interrupt, to optimally tune the input count
+ * for the number of delivery agent protocol wait states, as explained in
+ * qmgr_transport.c.
+ */
+ delay = WAIT_FOR_EVENT;
+ for (scan_idx = 0; qmgr_message_count < var_qmgr_active_limit
+ && qmgr_recipient_count < var_qmgr_rcpt_limit
+ && scan_idx < QMGR_SCAN_IDX_COUNT; ++scan_idx) {
+ last_scan_idx = (scan_idx + first_scan_idx) % QMGR_SCAN_IDX_COUNT;
+ if ((path = qmgr_scan_next(qmgr_scans[last_scan_idx])) != 0) {
+ delay = DONT_WAIT;
+ if ((feed = qmgr_active_feed(qmgr_scans[last_scan_idx], path)) != 0)
+ break;
+ }
+ }
+
+ /*
+ * Round-robin the queue scans. When the active queue becomes full,
+ * prefer new mail over deferred mail.
*/
if (qmgr_message_count < var_qmgr_active_limit
- && qmgr_recipient_count < var_qmgr_rcpt_limit)
- if ((in_path = qmgr_scan_next(qmgr_incoming)) != 0)
- in_feed = qmgr_active_feed(qmgr_incoming, in_path);
- if (qmgr_message_count < var_qmgr_active_limit
- && qmgr_recipient_count < var_qmgr_rcpt_limit)
- if ((df_path = qmgr_scan_next(qmgr_deferred)) != 0)
- qmgr_active_feed(qmgr_deferred, df_path);
+ && qmgr_recipient_count < var_qmgr_rcpt_limit) {
+ first_scan_idx = (last_scan_idx + 1) % QMGR_SCAN_IDX_COUNT;
+ } else if (first_scan_idx != QMGR_SCAN_IDX_INCOMING) {
+ first_scan_idx = QMGR_SCAN_IDX_INCOMING;
+ }
/*
* Global flow control. If enabled, slow down receiving processes that
if (var_in_flow_delay > 0) {
token_count = mail_flow_count();
if (token_count < var_proc_limit) {
- if (in_feed != 0)
+ if (feed != 0 && last_scan_idx == QMGR_SCAN_IDX_INCOMING)
mail_flow_put(1);
- else if (qmgr_incoming->handle == 0)
+ else if (qmgr_scans[QMGR_SCAN_IDX_INCOMING]->handle == 0)
mail_flow_put(var_proc_limit - token_count);
} else if (token_count > var_proc_limit) {
mail_flow_get(token_count - var_proc_limit);
}
}
- if (in_path || df_path)
- return (DONT_WAIT);
- return (WAIT_FOR_EVENT);
+ return (delay);
}
/* pre_accept - see if tables have changed */
var_use_limit = 0;
var_idle_limit = 0;
qmgr_move(MAIL_QUEUE_ACTIVE, MAIL_QUEUE_INCOMING, event_time());
- qmgr_incoming = qmgr_scan_create(MAIL_QUEUE_INCOMING);
- qmgr_deferred = qmgr_scan_create(MAIL_QUEUE_DEFERRED);
- qmgr_scan_request(qmgr_incoming, QMGR_SCAN_START);
+ qmgr_scans[QMGR_SCAN_IDX_INCOMING] = qmgr_scan_create(MAIL_QUEUE_INCOMING);
+ qmgr_scans[QMGR_SCAN_IDX_DEFERRED] = qmgr_scan_create(MAIL_QUEUE_DEFERRED);
+ qmgr_scan_request(qmgr_scans[QMGR_SCAN_IDX_INCOMING], QMGR_SCAN_START);
qmgr_deferred_run_event(0, (char *) 0);
}
struct QMGR_TRANSPORT {
int flags; /* blocked, etc. */
+ int pending; /* incomplete DA connections */
char *name; /* transport name */
int dest_concurrency_limit; /* concurrency per domain */
int init_dest_concurrency; /* init. per-domain concurrency */
};
#define QMGR_TRANSPORT_STAT_DEAD (1<<1)
-#define QMGR_TRANSPORT_STAT_BUSY (1<<2)
typedef void (*QMGR_TRANSPORT_ALLOC_NOTIFY) (QMGR_TRANSPORT *, VSTREAM *);
extern QMGR_TRANSPORT *qmgr_transport_select(void);
extern QMGR_TRANSPORT *qmgr_transport_create(const char *);
extern QMGR_TRANSPORT *qmgr_transport_find(const char *);
-#define QMGR_TRANSPORT_THROTTLED(t) ((t)->flags & QMGR_TRANSPORT_STAT_DEAD)
+#define QMGR_TRANSPORT_THROTTLED(t) ((t)->flags & QMGR_TRANSPORT_STAT_DEAD)
/*
* Each next hop (e.g., a domain name) has its own queue of pending message
QMGR_TRANSPORT_ALLOC_NOTIFY notify; /* application call-back routine */
};
+ /*
+ * Connections to delivery agents are managed asynchronously. Each delivery
+ * agent connection goes through multiple wait states:
+ *
+ * - With Linux/Solaris and old queue manager implementations only, wait for
+ * the server to invoke accept().
+ *
+ * - Wait for the delivery agent's announcement that it is ready to receive a
+ * delivery request.
+ *
+ * - Wait for the delivery request completion status.
+ *
+ * Older queue manager implementations had only one pending delivery agent
+ * connection per transport. With low-latency destinations, the output rates
+ * were reduced on Linux/Solaris systems that had the extra wait state.
+ *
+ * To maximize delivery agent output rates with low-latency destinations, the
+ * following changes were made to the queue manager by the end of the 2.4
+ * development cycle:
+ *
+ * - The Linux/Solaris accept() wait state was eliminated.
+ *
+ * - A pipeline was implemented for pending delivery agent connections. The
+ * number of pending delivery agent connections was increased from one to
+ * two: the number of before-delivery wait states, plus one extra pipeline
+ * slot to prevent the pipeline from stalling easily. Increasing the
+ * pipeline much further actually hurt performance.
+ *
+ * - To reduce queue manager disk competition with delivery agents, the queue
+ * scanning algorithm was modified to import only one message per interrupt.
+ * The incoming and deferred queue scans now happen on alternate interrupts.
+ *
+ * Simplistically reasoned, a non-zero (incoming + active) queue length is
+ * equivalent to a time shift for mail deliveries; this is undesirable when
+ * delivery agents are not fully utilized.
+ *
+ * On the other hand a non-empty active queue is what allows us to do clever
+ * things such as queue file prefetch, concurrency windows, and connection
+ * caching; the idea is that such "thinking time" is affordable only after
+ * the output channels are maxed out.
+ */
+#define QMGR_TRANSPORT_MAX_PEND 2
+
/* qmgr_transport_unthrottle_wrapper - in case (char *) != (struct *) */
static void qmgr_transport_unthrottle_wrapper(int unused_event, char *context)
event_cancel_timer(qmgr_transport_abort, context);
/*
- * Disable further read events that end up calling this function.
+ * Disable further read events that end up calling this function, turn
+ * off the Linux connect() workaround, and free up this pending
+ * connection pipeline slot.
*/
- if (alloc->stream)
+ if (alloc->stream) {
event_disable_readwrite(vstream_fileno(alloc->stream));
- alloc->transport->flags &= ~QMGR_TRANSPORT_STAT_BUSY;
+ non_blocking(vstream_fileno(alloc->stream), BLOCKING);
+ }
+ alloc->transport->pending -= 1;
/*
* Notify the requestor.
myfree((char *) alloc);
}
-#ifdef UNIX_DOMAIN_CONNECT_BLOCKS_FOR_ACCEPT
-
-/* qmgr_transport_connect - handle connection request completion */
-
-static void qmgr_transport_connect(int unused_event, char *context)
-{
- QMGR_TRANSPORT_ALLOC *alloc = (QMGR_TRANSPORT_ALLOC *) context;
-
- /*
- * This code is necessary for some versions of LINUX, where connect(2)
- * blocks until the application performs an accept(2). Reportedly, the
- * same can happen on Solaris 2.5.1.
- */
- event_disable_readwrite(vstream_fileno(alloc->stream));
- non_blocking(vstream_fileno(alloc->stream), BLOCKING);
- event_enable_read(vstream_fileno(alloc->stream),
- qmgr_transport_event, (char *) alloc);
-}
-
-#endif
-
/* qmgr_transport_select - select transport for allocation */
QMGR_TRANSPORT *qmgr_transport_select(void)
{
QMGR_TRANSPORT *xport;
QMGR_QUEUE *queue;
+ int need;
/*
* If we find a suitable transport, rotate the list of transports to
* effectuate round-robin selection. See similar selection code in
* qmgr_queue_select().
+ *
+ * This function is called repeatedly until all transports have maxed out
+ * the number of pending delivery agent connections, until all delivery
+ * agent concurrency windows are maxed out, or until we run out of "todo"
+ * queue entries.
*/
-#define STAY_AWAY (QMGR_TRANSPORT_STAT_BUSY | QMGR_TRANSPORT_STAT_DEAD)
+#define MIN5af51743e4eef(x, y) ((x) < (y) ? (x) : (y))
for (xport = qmgr_transport_list.next; xport; xport = xport->peers.next) {
- if (xport->flags & STAY_AWAY)
+ if ((xport->flags & QMGR_TRANSPORT_STAT_DEAD) != 0
+ || xport->pending >= QMGR_TRANSPORT_MAX_PEND)
continue;
+ need = xport->pending + 1;
for (queue = xport->queue_list.next; queue; queue = queue->peers.next) {
- if (queue->window > queue->busy_refcount && queue->todo.next != 0) {
+ if ((need -= MIN5af51743e4eef(queue->window - queue->busy_refcount,
+ queue->todo_refcount)) <= 0) {
QMGR_LIST_ROTATE(qmgr_transport_list, xport);
if (msg_verbose)
msg_info("qmgr_transport_select: %s", xport->name);
void qmgr_transport_alloc(QMGR_TRANSPORT *transport, QMGR_TRANSPORT_ALLOC_NOTIFY notify)
{
QMGR_TRANSPORT_ALLOC *alloc;
- VSTREAM *stream;
/*
* Sanity checks.
*/
if (transport->flags & QMGR_TRANSPORT_STAT_DEAD)
msg_panic("qmgr_transport: dead transport: %s", transport->name);
- if (transport->flags & QMGR_TRANSPORT_STAT_BUSY)
- msg_panic("qmgr_transport: nested allocation: %s", transport->name);
+ if (transport->pending >= QMGR_TRANSPORT_MAX_PEND)
+ msg_panic("qmgr_transport: excess allocation: %s", transport->name);
/*
* Connect to the well-known port for this delivery service, and wake up
- * when a process announces its availability. In the mean time, block out
- * other delivery process allocation attempts for this transport. In case
- * of problems, back off. Do not hose the system when it is in trouble
+ * when a process announces its availability. Allow only a limited number
+ * of delivery process allocation attempts for this transport. In case of
+ * problems, back off. Do not hose the system when it is in trouble
* already.
- */
-#ifdef UNIX_DOMAIN_CONNECT_BLOCKS_FOR_ACCEPT
-#define BLOCK_MODE NON_BLOCKING
-#define ENABLE_EVENTS event_enable_write
-#define EVENT_HANDLER qmgr_transport_connect
-#else
-#define BLOCK_MODE BLOCKING
-#define ENABLE_EVENTS event_enable_read
-#define EVENT_HANDLER qmgr_transport_event
-#endif
-
- /*
- * When the connection to the delivery agent cannot be completed, notify
- * the event handler so that it can throttle the transport and defer the
- * todo queues, just like it does when communication fails *after*
- * connection completion.
*
- * Before Postfix 2.4, the event handler was not invoked, and mail was not
- * deferred. Because of this, mail would be stuck in the active queue
- * after triggering a "connection refused" condition.
+ * Use non-blocking connect(), so that Linux won't block the queue manager
+ * until the delivery agent calls accept().
+ *
+ * When the connection to the delivery agent cannot be completed, notify the
+ * event handler so that it can throttle the transport and defer the todo
+ * queues, just like it does when communication fails *after* connection
+ * completion.
+ *
+ * Before Postfix 2.4, the event handler was not invoked after connect()
+ * error, and mail was not deferred. Because of this, mail would be stuck
+ * in the active queue after triggering a "connection refused" condition.
*/
- if ((stream = mail_connect(MAIL_CLASS_PRIVATE, transport->name, BLOCK_MODE)) == 0) {
- msg_warn("connect to transport %s: %m", transport->name);
- }
alloc = (QMGR_TRANSPORT_ALLOC *) mymalloc(sizeof(*alloc));
- alloc->stream = stream;
alloc->transport = transport;
alloc->notify = notify;
- transport->flags |= QMGR_TRANSPORT_STAT_BUSY;
- if (alloc->stream == 0) {
+ transport->pending += 1;
+ if ((alloc->stream = mail_connect(MAIL_CLASS_PRIVATE, transport->name,
+ NON_BLOCKING)) == 0) {
+ msg_warn("connect to transport %s: %m", transport->name);
event_request_timer(qmgr_transport_event, (char *) alloc, 0);
return;
}
- ENABLE_EVENTS(vstream_fileno(alloc->stream), EVENT_HANDLER, (char *) alloc);
+ event_enable_read(vstream_fileno(alloc->stream), qmgr_transport_event,
+ (char *) alloc);
/*
* Guard against broken systems.
msg_panic("qmgr_transport_create: transport exists: %s", name);
transport = (QMGR_TRANSPORT *) mymalloc(sizeof(QMGR_TRANSPORT));
transport->flags = 0;
+ transport->pending = 0;
transport->name = mystrdup(name);
/*
bool var_verp_bounce_off;
int var_qmgr_clog_warn_time;
-static QMGR_SCAN *qmgr_incoming;
-static QMGR_SCAN *qmgr_deferred;
+static QMGR_SCAN *qmgr_scans[2];
+
+#define QMGR_SCAN_IDX_INCOMING 0
+#define QMGR_SCAN_IDX_DEFERRED 1
+#define QMGR_SCAN_IDX_COUNT (sizeof(qmgr_scans) / sizeof(qmgr_scans[0]))
/* qmgr_deferred_run_event - queue manager heartbeat */
* This routine runs when it is time for another deferred queue scan.
* Make sure this routine gets called again in the future.
*/
- qmgr_scan_request(qmgr_deferred, QMGR_SCAN_START);
+ qmgr_scan_request(qmgr_scans[QMGR_SCAN_IDX_DEFERRED], QMGR_SCAN_START);
event_request_timer(qmgr_deferred_run_event, dummy, var_queue_run_delay);
}
* requested, the request takes effect immediately.
*/
if (incoming_flag != 0)
- qmgr_scan_request(qmgr_incoming, incoming_flag);
+ qmgr_scan_request(qmgr_scans[QMGR_SCAN_IDX_INCOMING], incoming_flag);
if (deferred_flag != 0)
- qmgr_scan_request(qmgr_deferred, deferred_flag);
+ qmgr_scan_request(qmgr_scans[QMGR_SCAN_IDX_DEFERRED], deferred_flag);
}
/* qmgr_loop - queue manager main loop */
static int qmgr_loop(char *unused_name, char **unused_argv)
{
- char *in_path = 0;
- char *df_path = 0;
+ char *path;
int token_count;
- int in_feed = 0;
+ int feed = 0;
+ int scan_idx; /* Priority order scan index */
+ static int first_scan_idx = QMGR_SCAN_IDX_INCOMING;
+ int last_scan_idx = QMGR_SCAN_IDX_COUNT - 1;
+ int delay;
/*
* This routine runs as part of the event handling loop, after the event
/*
* Let some new blood into the active queue when the queue size is
- * smaller than some configurable limit. When the system is under heavy
- * load, favor new mail over old mail.
+ * smaller than some configurable limit.
+ *
+ * We import one message per interrupt, to optimally tune the input count
+ * for the number of delivery agent protocol wait states, as explained in
+ * qmgr_transport.c.
*/
- if (qmgr_message_count < var_qmgr_active_limit)
- if ((in_path = qmgr_scan_next(qmgr_incoming)) != 0)
- in_feed = qmgr_active_feed(qmgr_incoming, in_path);
- if (qmgr_message_count < var_qmgr_active_limit)
- if ((df_path = qmgr_scan_next(qmgr_deferred)) != 0)
- qmgr_active_feed(qmgr_deferred, df_path);
+ delay = WAIT_FOR_EVENT;
+ for (scan_idx = 0; qmgr_message_count < var_qmgr_active_limit
+ && scan_idx < QMGR_SCAN_IDX_COUNT; ++scan_idx) {
+ last_scan_idx = (scan_idx + first_scan_idx) % QMGR_SCAN_IDX_COUNT;
+ if ((path = qmgr_scan_next(qmgr_scans[last_scan_idx])) != 0) {
+ delay = DONT_WAIT;
+ if ((feed = qmgr_active_feed(qmgr_scans[last_scan_idx], path)) != 0)
+ break;
+ }
+ }
+
+ /*
+ * Round-robin the queue scans. When the active queue becomes full,
+ * prefer new mail over deferred mail.
+ */
+ if (qmgr_message_count < var_qmgr_active_limit) {
+ first_scan_idx = (last_scan_idx + 1) % QMGR_SCAN_IDX_COUNT;
+ } else if (first_scan_idx != QMGR_SCAN_IDX_INCOMING) {
+ first_scan_idx = QMGR_SCAN_IDX_INCOMING;
+ }
/*
* Global flow control. If enabled, slow down receiving processes that
if (var_in_flow_delay > 0) {
token_count = mail_flow_count();
if (token_count < var_proc_limit) {
- if (in_feed != 0)
+ if (feed != 0 && last_scan_idx == QMGR_SCAN_IDX_INCOMING)
mail_flow_put(1);
- else if (qmgr_incoming->handle == 0)
+ else if (qmgr_scans[QMGR_SCAN_IDX_INCOMING]->handle == 0)
mail_flow_put(var_proc_limit - token_count);
} else if (token_count > var_proc_limit) {
mail_flow_get(token_count - var_proc_limit);
}
}
- if (in_path || df_path)
- return (DONT_WAIT);
- return (WAIT_FOR_EVENT);
+ return (delay);
}
/* pre_accept - see if tables have changed */
var_use_limit = 0;
var_idle_limit = 0;
qmgr_move(MAIL_QUEUE_ACTIVE, MAIL_QUEUE_INCOMING, event_time());
- qmgr_incoming = qmgr_scan_create(MAIL_QUEUE_INCOMING);
- qmgr_deferred = qmgr_scan_create(MAIL_QUEUE_DEFERRED);
- qmgr_scan_request(qmgr_incoming, QMGR_SCAN_START);
+ qmgr_scans[QMGR_SCAN_IDX_INCOMING] = qmgr_scan_create(MAIL_QUEUE_INCOMING);
+ qmgr_scans[QMGR_SCAN_IDX_DEFERRED] = qmgr_scan_create(MAIL_QUEUE_DEFERRED);
+ qmgr_scan_request(qmgr_scans[QMGR_SCAN_IDX_INCOMING], QMGR_SCAN_START);
qmgr_deferred_run_event(0, (char *) 0);
}
struct QMGR_TRANSPORT {
int flags; /* blocked, etc. */
+ int pending; /* incomplete DA connections */
char *name; /* transport name */
int dest_concurrency_limit; /* concurrency per domain */
int init_dest_concurrency; /* init. per-domain concurrency */
};
#define QMGR_TRANSPORT_STAT_DEAD (1<<1)
-#define QMGR_TRANSPORT_STAT_BUSY (1<<2)
typedef void (*QMGR_TRANSPORT_ALLOC_NOTIFY) (QMGR_TRANSPORT *, VSTREAM *);
extern QMGR_TRANSPORT *qmgr_transport_select(void);
extern QMGR_TRANSPORT *qmgr_transport_create(const char *);
extern QMGR_TRANSPORT *qmgr_transport_find(const char *);
-#define QMGR_TRANSPORT_THROTTLED(t) ((t)->flags & QMGR_TRANSPORT_STAT_DEAD)
+#define QMGR_TRANSPORT_THROTTLED(t) ((t)->flags & QMGR_TRANSPORT_STAT_DEAD)
/*
* Each next hop (e.g., a domain name) has its own queue of pending message
QMGR_TRANSPORT_ALLOC_NOTIFY notify; /* application call-back routine */
};
+ /*
+ * Connections to delivery agents are managed asynchronously. Each delivery
+ * agent connection goes through multiple wait states:
+ *
+ * - With Linux/Solaris and old queue manager implementations only, wait for
+ * the server to invoke accept().
+ *
+ * - Wait for the delivery agent's announcement that it is ready to receive a
+ * delivery request.
+ *
+ * - Wait for the delivery request completion status.
+ *
+ * Older queue manager implementations had only one pending delivery agent
+ * connection per transport. With low-latency destinations, the output rates
+ * were reduced on Linux/Solaris systems that had the extra wait state.
+ *
+ * To maximize delivery agent output rates with low-latency destinations, the
+ * following changes were made to the queue manager by the end of the 2.4
+ * development cycle:
+ *
+ * - The Linux/Solaris accept() wait state was eliminated.
+ *
+ * - A pipeline was implemented for pending delivery agent connections. The
+ * number of pending delivery agent connections was increased from one to
+ * two: the number of before-delivery wait states, plus one extra pipeline
+ * slot to prevent the pipeline from stalling easily. Increasing the
+ * pipeline much further actually hurt performance.
+ *
+ * - To reduce queue manager disk competition with delivery agents, the queue
+ * scanning algorithm was modified to import only one message per interrupt.
+ * The incoming and deferred queue scans now happen on alternate interrupts.
+ *
+ * Simplistically reasoned, a non-zero (incoming + active) queue length is
+ * equivalent to a time shift for mail deliveries; this is undesirable when
+ * delivery agents are not fully utilized.
+ *
+ * On the other hand a non-empty active queue is what allows us to do clever
+ * things such as queue file prefetch, concurrency windows, and connection
+ * caching; the idea is that such "thinking time" is affordable only after
+ * the output channels are maxed out.
+ */
+#define QMGR_TRANSPORT_MAX_PEND 2
+
/* qmgr_transport_unthrottle_wrapper - in case (char *) != (struct *) */
static void qmgr_transport_unthrottle_wrapper(int unused_event, char *context)
event_cancel_timer(qmgr_transport_abort, context);
/*
- * Disable further read events that end up calling this function.
+ * Disable further read events that end up calling this function, turn
+ * off the Linux connect() workaround, and free up this pending
+ * connection pipeline slot.
*/
- if (alloc->stream)
+ if (alloc->stream) {
event_disable_readwrite(vstream_fileno(alloc->stream));
- alloc->transport->flags &= ~QMGR_TRANSPORT_STAT_BUSY;
+ non_blocking(vstream_fileno(alloc->stream), BLOCKING);
+ }
+ alloc->transport->pending -= 1;
/*
* Notify the requestor.
myfree((char *) alloc);
}
-#ifdef UNIX_DOMAIN_CONNECT_BLOCKS_FOR_ACCEPT
-
-/* qmgr_transport_connect - handle connection request completion */
-
-static void qmgr_transport_connect(int unused_event, char *context)
-{
- QMGR_TRANSPORT_ALLOC *alloc = (QMGR_TRANSPORT_ALLOC *) context;
-
- /*
- * This code is necessary for some versions of LINUX, where connect(2)
- * blocks until the application performs an accept(2). Reportedly, the
- * same can happen on Solaris 2.5.1.
- */
- event_disable_readwrite(vstream_fileno(alloc->stream));
- non_blocking(vstream_fileno(alloc->stream), BLOCKING);
- event_enable_read(vstream_fileno(alloc->stream),
- qmgr_transport_event, (char *) alloc);
-}
-
-#endif
-
/* qmgr_transport_select - select transport for allocation */
QMGR_TRANSPORT *qmgr_transport_select(void)
{
QMGR_TRANSPORT *xport;
QMGR_QUEUE *queue;
+ int need;
/*
* If we find a suitable transport, rotate the list of transports to
* effectuate round-robin selection. See similar selection code in
* qmgr_peer_select().
+ *
+ * This function is called repeatedly until all transports have maxed out
+ * the number of pending delivery agent connections, until all delivery
+ * agent concurrency windows are maxed out, or until we run out of "todo"
+ * queue entries.
*/
-#define STAY_AWAY (QMGR_TRANSPORT_STAT_BUSY | QMGR_TRANSPORT_STAT_DEAD)
+#define MIN5af51743e4eef(x, y) ((x) < (y) ? (x) : (y))
for (xport = qmgr_transport_list.next; xport; xport = xport->peers.next) {
- if (xport->flags & STAY_AWAY)
+ if ((xport->flags & QMGR_TRANSPORT_STAT_DEAD) != 0
+ || xport->pending >= QMGR_TRANSPORT_MAX_PEND)
continue;
+ need = xport->pending + 1;
for (queue = xport->queue_list.next; queue; queue = queue->peers.next) {
- if (queue->window > queue->busy_refcount && queue->todo.next != 0) {
+ if ((need -= MIN5af51743e4eef(queue->window - queue->busy_refcount,
+ queue->todo_refcount)) <= 0) {
QMGR_LIST_ROTATE(qmgr_transport_list, xport, peers);
if (msg_verbose)
msg_info("qmgr_transport_select: %s", xport->name);
void qmgr_transport_alloc(QMGR_TRANSPORT *transport, QMGR_TRANSPORT_ALLOC_NOTIFY notify)
{
QMGR_TRANSPORT_ALLOC *alloc;
- VSTREAM *stream;
/*
* Sanity checks.
*/
if (transport->flags & QMGR_TRANSPORT_STAT_DEAD)
msg_panic("qmgr_transport: dead transport: %s", transport->name);
- if (transport->flags & QMGR_TRANSPORT_STAT_BUSY)
- msg_panic("qmgr_transport: nested allocation: %s", transport->name);
+ if (transport->pending >= QMGR_TRANSPORT_MAX_PEND)
+ msg_panic("qmgr_transport: excess allocation: %s", transport->name);
/*
* Connect to the well-known port for this delivery service, and wake up
- * when a process announces its availability. In the mean time, block out
- * other delivery process allocation attempts for this transport. In case
- * of problems, back off. Do not hose the system when it is in trouble
+ * when a process announces its availability. Allow only a limited number
+ * of delivery process allocation attempts for this transport. In case of
+ * problems, back off. Do not hose the system when it is in trouble
* already.
- */
-#ifdef UNIX_DOMAIN_CONNECT_BLOCKS_FOR_ACCEPT
-#define BLOCK_MODE NON_BLOCKING
-#define ENABLE_EVENTS event_enable_write
-#define EVENT_HANDLER qmgr_transport_connect
-#else
-#define BLOCK_MODE BLOCKING
-#define ENABLE_EVENTS event_enable_read
-#define EVENT_HANDLER qmgr_transport_event
-#endif
-
- /*
- * When the connection to the delivery agent cannot be completed, notify
- * the event handler so that it can throttle the transport and defer the
- * todo queues, just like it does when communication fails *after*
- * connection completion.
*
- * Before Postfix 2.4, the event handler was not invoked, and mail was not
- * deferred. Because of this, mail would be stuck in the active queue
- * after triggering a "connection refused" condition.
+ * Use non-blocking connect(), so that Linux won't block the queue manager
+ * until the delivery agent calls accept().
+ *
+ * When the connection to the delivery agent cannot be completed, notify the
+ * event handler so that it can throttle the transport and defer the todo
+ * queues, just like it does when communication fails *after* connection
+ * completion.
+ *
+ * Before Postfix 2.4, the event handler was not invoked after connect()
+ * error, and mail was not deferred. Because of this, mail would be stuck
+ * in the active queue after triggering a "connection refused" condition.
*/
- if ((stream = mail_connect(MAIL_CLASS_PRIVATE, transport->name, BLOCK_MODE)) == 0) {
- msg_warn("connect to transport %s: %m", transport->name);
- }
alloc = (QMGR_TRANSPORT_ALLOC *) mymalloc(sizeof(*alloc));
- alloc->stream = stream;
alloc->transport = transport;
alloc->notify = notify;
- transport->flags |= QMGR_TRANSPORT_STAT_BUSY;
- if (alloc->stream == 0) {
+ transport->pending += 1;
+ if ((alloc->stream = mail_connect(MAIL_CLASS_PRIVATE, transport->name,
+ NON_BLOCKING)) == 0) {
+ msg_warn("connect to transport %s: %m", transport->name);
event_request_timer(qmgr_transport_event, (char *) alloc, 0);
return;
}
- ENABLE_EVENTS(vstream_fileno(alloc->stream), EVENT_HANDLER, (char *) alloc);
+ event_enable_read(vstream_fileno(alloc->stream), qmgr_transport_event,
+ (char *) alloc);
/*
* Guard against broken systems.
msg_panic("qmgr_transport_create: transport exists: %s", name);
transport = (QMGR_TRANSPORT *) mymalloc(sizeof(QMGR_TRANSPORT));
transport->flags = 0;
+ transport->pending = 0;
transport->name = mystrdup(name);
/*
transport->rcpt_per_stack = get_mail_conf_int2(name, _STACK_RCPT_LIMIT,
var_stack_rcpt_limit, 0, 0);
transport->refill_limit = get_mail_conf_int2(name, _XPORT_REFILL_LIMIT,
- var_xport_refill_limit, 1, 0);
+ var_xport_refill_limit, 1, 0);
transport->refill_delay = get_mail_conf_time2(name, _XPORT_REFILL_DELAY,
- var_xport_refill_delay, 's', 1, 0);
+ var_xport_refill_delay, 's', 1, 0);
transport->queue_byname = htable_create(0);
QMGR_LIST_INIT(transport->queue_list);
#define DBM_NO_TRAILING_NULL
#define USE_STATVFS
#define STATVFS_IN_SYS_STATVFS_H
-#define UNIX_DOMAIN_CONNECT_BLOCKS_FOR_ACCEPT
#define STRCASECMP_IN_STRINGS_H
#define SET_H_ERRNO(err) (set_h_errno(err))
#endif
#define DBM_NO_TRAILING_NULL
#define USE_STATVFS
#define STATVFS_IN_SYS_STATVFS_H
-#define UNIX_DOMAIN_CONNECT_BLOCKS_FOR_ACCEPT
#endif
/*
#define FIONREAD_IN_TERMIOS_H
#define USE_STATFS
#define STATFS_IN_SYS_VFS_H
-#define UNIX_DOMAIN_CONNECT_BLOCKS_FOR_ACCEPT
#define PREPEND_PLUS_TO_OPTSTRING
#define HAS_POSIX_REGEXP
#define NATIVE_SENDMAIL_PATH "/usr/sbin/sendmail"
# define _PATH_PROCNET_IFINET6 "/proc/net/if_inet6"
#endif
#include <linux/version.h>
-#if !defined(KERNEL_VERSION) || (LINUX_VERSION_CODE < KERNEL_VERSION(2,2,0)) \
+#if !defined(KERNEL_VERSION)
+# define KERNEL_VERSION(a,b,c) (LINUX_VERSION_CODE + 1)
+#endif
+#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,2,0)) \
|| (__GLIBC__ < 2)
# define CANT_USE_SEND_RECV_MSG
# define DEF_SMTP_CACHE_DEMAND 0
#define FIONREAD_IN_TERMIOS_H /* maybe unnecessary */
#define USE_STATFS
#define STATFS_IN_SYS_VFS_H
-#define UNIX_DOMAIN_CONNECT_BLOCKS_FOR_ACCEPT /* unverified */
#define PREPEND_PLUS_TO_OPTSTRING
#define HAS_POSIX_REGEXP
#define NATIVE_SENDMAIL_PATH "/usr/sbin/sendmail"
#define DBM_NO_TRAILING_NULL
#define USE_STATVFS
#define STATVFS_IN_SYS_STATVFS_H
-#define UNIX_DOMAIN_CONNECT_BLOCKS_FOR_ACCEPT
#ifndef S_ISSOCK
#define S_ISSOCK(mode) ((mode&0xF000) == 0xC000)
#endif
#define ROOT_PATH "/bin:/etc:/usr/bin:/tcb/bin"
#define USE_STATVFS
#define STATVFS_IN_SYS_STATVFS_H
-#define UNIX_DOMAIN_CONNECT_BLOCKS_FOR_ACCEPT
#define MISSING_SETENV
#define STRCASECMP_IN_STRINGS_H
/* SCO5 misses just S_ISSOCK, the others are there