]> git.ipfire.org Git - thirdparty/postfix.git/commitdiff
postfix-2.4-20070107
authorWietse Venema <wietse@porcupine.org>
Sun, 7 Jan 2007 05:00:00 +0000 (00:00 -0500)
committerViktor Dukhovni <viktor@dukhovni.org>
Tue, 5 Feb 2013 06:32:46 +0000 (06:32 +0000)
postfix/HISTORY
postfix/conf/post-install
postfix/src/global/mail_version.h
postfix/src/oqmgr/qmgr.c
postfix/src/oqmgr/qmgr.h
postfix/src/oqmgr/qmgr_transport.c
postfix/src/qmgr/qmgr.c
postfix/src/qmgr/qmgr.h
postfix/src/qmgr/qmgr_transport.c
postfix/src/util/sys_defs.h

index b4785a0952c5f430ce0f2ea86e175cb8f8f20f68..3cbfb9c95fc06bb8b82814c6912df040cdbff17b 100644 (file)
@@ -13076,6 +13076,24 @@ Apologies for any names omitted.
        that have only minor differences in dictionary open/access
        options.
 
+20070105
+
+       Performance: pipeline of pending delivery agent connections,
+       to improve Linux/Solaris mail delivery performance by another
+       10% while going down-hill with the wind from behind. Design
+       and implementation Victor and Wietse. Files: *qmgr/qmgr.c,
+       *qmgr/qmgr.h, *qmgr/qmgr_transport.c.
+
+20070106
+
+       Cleanup: eliminate the Linux/Solaris "wait for accept()"
+       stage from the queue manager to delivery agent protocol.
+       This alone achieves 99.99% of the Linux/Solaris speed up
+       from the preceding change. The pending connection pipeline
+       takes care of the rest.  Tested on Linux kernels dating
+       back to 2.0.27 (that's more than 10 years ago).  Files:
+       *qmgr/qmgr_transport.c.
+
 Wish list:
 
        Update BACKSCATTER_README to use PCRE because that's what I
index 755bb53ee5fd18248bade97d6295aaf3b37628c8..27d6ac2bde359bee8a3394a3471486e572fba1d4 100644 (file)
@@ -694,8 +694,9 @@ test -n "$first_install_reminder" && {
     Warning: you still need to edit myorigin/mydestination/mynetworks
     parameter settings in $config_directory/main.cf.
 
-    See also http://www.postfix.org/faq.html for information about
-    dialup sites or about sites inside a firewalled network.
+    See also http://www.postfix.org/STANDARD_CONFIGURATION_README.html
+    for information about dialup sites or about sites inside a
+    firewalled network.
 
     BTW: Check your $ALIASES file and be sure to set up aliases
     that send mail for root and postmaster to a real person, then
index 36f3128c634d4eed737e507327b93579796449c8..3c967c28c9fa0acb33093cfcb5bc7a2e3dfc989d 100644 (file)
@@ -20,7 +20,7 @@
   * Patches change both the patchlevel and the release date. Snapshots have no
   * patchlevel; they change the release date only.
   */
-#define MAIL_RELEASE_DATE      "20070104"
+#define MAIL_RELEASE_DATE      "20070107"
 #define MAIL_VERSION_NUMBER    "2.4"
 
 #ifdef SNAPSHOT
index 927f1be2f6fa77f2f8c07380b5602a2cd2137ed0..a61ad4064052d88625a9c18eb80ff22d2ae3eeed 100644 (file)
@@ -330,8 +330,11 @@ int     var_proc_limit;
 bool    var_verp_bounce_off;
 int     var_qmgr_clog_warn_time;
 
-static QMGR_SCAN *qmgr_incoming;
-static QMGR_SCAN *qmgr_deferred;
+static QMGR_SCAN *qmgr_scans[2];
+
+#define QMGR_SCAN_IDX_INCOMING 0
+#define QMGR_SCAN_IDX_DEFERRED 1
+#define QMGR_SCAN_IDX_COUNT (sizeof(qmgr_scans) / sizeof(qmgr_scans[0]))
 
 /* qmgr_deferred_run_event - queue manager heartbeat */
 
@@ -342,7 +345,7 @@ static void qmgr_deferred_run_event(int unused_event, char *dummy)
      * This routine runs when it is time for another deferred queue scan.
      * Make sure this routine gets called again in the future.
      */
-    qmgr_scan_request(qmgr_deferred, QMGR_SCAN_START);
+    qmgr_scan_request(qmgr_scans[QMGR_SCAN_IDX_DEFERRED], QMGR_SCAN_START);
     event_request_timer(qmgr_deferred_run_event, dummy, var_queue_run_delay);
 }
 
@@ -402,19 +405,22 @@ static void qmgr_trigger_event(char *buf, int len,
      * requested, the request takes effect immediately.
      */
     if (incoming_flag != 0)
-       qmgr_scan_request(qmgr_incoming, incoming_flag);
+       qmgr_scan_request(qmgr_scans[QMGR_SCAN_IDX_INCOMING], incoming_flag);
     if (deferred_flag != 0)
-       qmgr_scan_request(qmgr_deferred, deferred_flag);
+       qmgr_scan_request(qmgr_scans[QMGR_SCAN_IDX_DEFERRED], deferred_flag);
 }
 
 /* qmgr_loop - queue manager main loop */
 
 static int qmgr_loop(char *unused_name, char **unused_argv)
 {
-    char   *in_path = 0;
-    char   *df_path = 0;
+    char   *path;
     int     token_count;
-    int     in_feed = 0;
+    int     feed = 0;
+    int     scan_idx;                  /* Priority order scan index */
+    static int first_scan_idx = QMGR_SCAN_IDX_INCOMING;
+    int     last_scan_idx = QMGR_SCAN_IDX_COUNT - 1;
+    int     delay;
 
     /*
      * This routine runs as part of the event handling loop, after the event
@@ -436,17 +442,34 @@ static int qmgr_loop(char *unused_name, char **unused_argv)
     /*
      * Let some new blood into the active queue when the queue size is
      * smaller than some configurable limit, and when the number of in-core
-     * recipients does not exceed some configurable limit. When the system is
-     * under heavy load, favor new mail over old mail.
+     * recipients does not exceed some configurable limit.
+     * 
+     * We import one message per interrupt, to optimally tune the input count
+     * for the number of delivery agent protocol wait states, as explained in
+     * qmgr_transport.c.
+     */
+    delay = WAIT_FOR_EVENT;
+    for (scan_idx = 0; qmgr_message_count < var_qmgr_active_limit
+        && qmgr_recipient_count < var_qmgr_rcpt_limit
+        && scan_idx < QMGR_SCAN_IDX_COUNT; ++scan_idx) {
+       last_scan_idx = (scan_idx + first_scan_idx) % QMGR_SCAN_IDX_COUNT;
+       if ((path = qmgr_scan_next(qmgr_scans[last_scan_idx])) != 0) {
+           delay = DONT_WAIT;
+           if ((feed = qmgr_active_feed(qmgr_scans[last_scan_idx], path)) != 0)
+               break;
+       }
+    }
+
+    /*
+     * Round-robin the queue scans. When the active queue becomes full,
+     * prefer new mail over deferred mail.
      */
     if (qmgr_message_count < var_qmgr_active_limit
-       && qmgr_recipient_count < var_qmgr_rcpt_limit)
-       if ((in_path = qmgr_scan_next(qmgr_incoming)) != 0)
-           in_feed = qmgr_active_feed(qmgr_incoming, in_path);
-    if (qmgr_message_count < var_qmgr_active_limit
-       && qmgr_recipient_count < var_qmgr_rcpt_limit)
-       if ((df_path = qmgr_scan_next(qmgr_deferred)) != 0)
-           qmgr_active_feed(qmgr_deferred, df_path);
+       && qmgr_recipient_count < var_qmgr_rcpt_limit) {
+       first_scan_idx = (last_scan_idx + 1) % QMGR_SCAN_IDX_COUNT;
+    } else if (first_scan_idx != QMGR_SCAN_IDX_INCOMING) {
+       first_scan_idx = QMGR_SCAN_IDX_INCOMING;
+    }
 
     /*
      * Global flow control. If enabled, slow down receiving processes that
@@ -455,17 +478,15 @@ static int qmgr_loop(char *unused_name, char **unused_argv)
     if (var_in_flow_delay > 0) {
        token_count = mail_flow_count();
        if (token_count < var_proc_limit) {
-           if (in_feed != 0)
+           if (feed != 0 && last_scan_idx == QMGR_SCAN_IDX_INCOMING)
                mail_flow_put(1);
-           else if (qmgr_incoming->handle == 0)
+           else if (qmgr_scans[QMGR_SCAN_IDX_INCOMING]->handle == 0)
                mail_flow_put(var_proc_limit - token_count);
        } else if (token_count > var_proc_limit) {
            mail_flow_get(token_count - var_proc_limit);
        }
     }
-    if (in_path || df_path)
-       return (DONT_WAIT);
-    return (WAIT_FOR_EVENT);
+    return (delay);
 }
 
 /* pre_accept - see if tables have changed */
@@ -521,9 +542,9 @@ static void qmgr_post_init(char *unused_name, char **unused_argv)
     var_use_limit = 0;
     var_idle_limit = 0;
     qmgr_move(MAIL_QUEUE_ACTIVE, MAIL_QUEUE_INCOMING, event_time());
-    qmgr_incoming = qmgr_scan_create(MAIL_QUEUE_INCOMING);
-    qmgr_deferred = qmgr_scan_create(MAIL_QUEUE_DEFERRED);
-    qmgr_scan_request(qmgr_incoming, QMGR_SCAN_START);
+    qmgr_scans[QMGR_SCAN_IDX_INCOMING] = qmgr_scan_create(MAIL_QUEUE_INCOMING);
+    qmgr_scans[QMGR_SCAN_IDX_DEFERRED] = qmgr_scan_create(MAIL_QUEUE_DEFERRED);
+    qmgr_scan_request(qmgr_scans[QMGR_SCAN_IDX_INCOMING], QMGR_SCAN_START);
     qmgr_deferred_run_event(0, (char *) 0);
 }
 
index c28430c00efd59adc83abc9c157caaf40ad6d49f..73744c5d8fe0a9434c058256f558b0fa71551a49 100644 (file)
@@ -114,6 +114,7 @@ struct QMGR_QUEUE_LIST {
 
 struct QMGR_TRANSPORT {
     int     flags;                     /* blocked, etc. */
+    int     pending;                   /* incomplete DA connections */
     char   *name;                      /* transport name */
     int     dest_concurrency_limit;    /* concurrency per domain */
     int     init_dest_concurrency;     /* init. per-domain concurrency */
@@ -125,7 +126,6 @@ struct QMGR_TRANSPORT {
 };
 
 #define QMGR_TRANSPORT_STAT_DEAD       (1<<1)
-#define QMGR_TRANSPORT_STAT_BUSY       (1<<2)
 
 typedef void (*QMGR_TRANSPORT_ALLOC_NOTIFY) (QMGR_TRANSPORT *, VSTREAM *);
 extern QMGR_TRANSPORT *qmgr_transport_select(void);
@@ -135,7 +135,7 @@ extern void qmgr_transport_unthrottle(QMGR_TRANSPORT *);
 extern QMGR_TRANSPORT *qmgr_transport_create(const char *);
 extern QMGR_TRANSPORT *qmgr_transport_find(const char *);
 
-#define QMGR_TRANSPORT_THROTTLED(t) ((t)->flags & QMGR_TRANSPORT_STAT_DEAD)
+#define QMGR_TRANSPORT_THROTTLED(t)    ((t)->flags & QMGR_TRANSPORT_STAT_DEAD)
 
  /*
   * Each next hop (e.g., a domain name) has its own queue of pending message
index 65ae935b39bc710ee84b53cd4c894bda9d3876f9..55a192e09874523069e92f6dc74bb4f5dcc045d1 100644 (file)
@@ -105,6 +105,49 @@ struct QMGR_TRANSPORT_ALLOC {
     QMGR_TRANSPORT_ALLOC_NOTIFY notify;        /* application call-back routine */
 };
 
+ /*
+  * Connections to delivery agents are managed asynchronously. Each delivery
+  * agent connection goes through multiple wait states:
+  * 
+  * - With Linux/Solaris and old queue manager implementations only, wait for
+  * the server to invoke accept().
+  * 
+  * - Wait for the delivery agent's announcement that it is ready to receive a
+  * delivery request.
+  * 
+  * - Wait for the delivery request completion status.
+  * 
+  * Older queue manager implementations had only one pending delivery agent
+  * connection per transport. With low-latency destinations, the output rates
+  * were reduced on Linux/Solaris systems that had the extra wait state.
+  * 
+  * To maximize delivery agent output rates with low-latency destinations, the
+  * following changes were made to the queue manager by the end of the 2.4
+  * development cycle:
+  * 
+  * - The Linux/Solaris accept() wait state was eliminated.
+  * 
+  * - A pipeline was implemented for pending delivery agent connections. The
+  * number of pending delivery agent connections was increased from one to
+  * two: the number of before-delivery wait states, plus one extra pipeline
+  * slot to prevent the pipeline from stalling easily. Increasing the
+  * pipeline much further actually hurt performance.
+  * 
+  * - To reduce queue manager disk competition with delivery agents, the queue
+  * scanning algorithm was modified to import only one message per interrupt.
+  * The incoming and deferred queue scans now happen on alternate interrupts.
+  * 
+  * Simplistically reasoned, a non-zero (incoming + active) queue length is
+  * equivalent to a time shift for mail deliveries; this is undesirable when
+  * delivery agents are not fully utilized.
+  * 
+  * On the other hand a non-empty active queue is what allows us to do clever
+  * things such as queue file prefetch, concurrency windows, and connection
+  * caching; the idea is that such "thinking time" is affordable only after
+  * the output channels are maxed out.
+  */
+#define QMGR_TRANSPORT_MAX_PEND        2
+
 /* qmgr_transport_unthrottle_wrapper - in case (char *) != (struct *) */
 
 static void qmgr_transport_unthrottle_wrapper(int unused_event, char *context)
@@ -191,11 +234,15 @@ static void qmgr_transport_event(int unused_event, char *context)
     event_cancel_timer(qmgr_transport_abort, context);
 
     /*
-     * Disable further read events that end up calling this function.
+     * Disable further read events that end up calling this function, turn
+     * off the Linux connect() workaround, and free up this pending
+     * connection pipeline slot.
      */
-    if (alloc->stream)
+    if (alloc->stream) {
        event_disable_readwrite(vstream_fileno(alloc->stream));
-    alloc->transport->flags &= ~QMGR_TRANSPORT_STAT_BUSY;
+       non_blocking(vstream_fileno(alloc->stream), BLOCKING);
+    }
+    alloc->transport->pending -= 1;
 
     /*
      * Notify the requestor.
@@ -204,46 +251,34 @@ static void qmgr_transport_event(int unused_event, char *context)
     myfree((char *) alloc);
 }
 
-#ifdef UNIX_DOMAIN_CONNECT_BLOCKS_FOR_ACCEPT
-
-/* qmgr_transport_connect - handle connection request completion */
-
-static void qmgr_transport_connect(int unused_event, char *context)
-{
-    QMGR_TRANSPORT_ALLOC *alloc = (QMGR_TRANSPORT_ALLOC *) context;
-
-    /*
-     * This code is necessary for some versions of LINUX, where connect(2)
-     * blocks until the application performs an accept(2). Reportedly, the
-     * same can happen on Solaris 2.5.1.
-     */
-    event_disable_readwrite(vstream_fileno(alloc->stream));
-    non_blocking(vstream_fileno(alloc->stream), BLOCKING);
-    event_enable_read(vstream_fileno(alloc->stream),
-                     qmgr_transport_event, (char *) alloc);
-}
-
-#endif
-
 /* qmgr_transport_select - select transport for allocation */
 
 QMGR_TRANSPORT *qmgr_transport_select(void)
 {
     QMGR_TRANSPORT *xport;
     QMGR_QUEUE *queue;
+    int     need;
 
     /*
      * If we find a suitable transport, rotate the list of transports to
      * effectuate round-robin selection. See similar selection code in
      * qmgr_queue_select().
+     * 
+     * This function is called repeatedly until all transports have maxed out
+     * the number of pending delivery agent connections, until all delivery
+     * agent concurrency windows are maxed out, or until we run out of "todo"
+     * queue entries.
      */
-#define STAY_AWAY (QMGR_TRANSPORT_STAT_BUSY | QMGR_TRANSPORT_STAT_DEAD)
+#define MIN5af51743e4eef(x, y) ((x) < (y) ? (x) : (y))
 
     for (xport = qmgr_transport_list.next; xport; xport = xport->peers.next) {
-       if (xport->flags & STAY_AWAY)
+       if ((xport->flags & QMGR_TRANSPORT_STAT_DEAD) != 0
+           || xport->pending >= QMGR_TRANSPORT_MAX_PEND)
            continue;
+       need = xport->pending + 1;
        for (queue = xport->queue_list.next; queue; queue = queue->peers.next) {
-           if (queue->window > queue->busy_refcount && queue->todo.next != 0) {
+           if ((need -= MIN5af51743e4eef(queue->window - queue->busy_refcount,
+                                         queue->todo_refcount)) <= 0) {
                QMGR_LIST_ROTATE(qmgr_transport_list, xport);
                if (msg_verbose)
                    msg_info("qmgr_transport_select: %s", xport->name);
@@ -259,56 +294,46 @@ QMGR_TRANSPORT *qmgr_transport_select(void)
 void    qmgr_transport_alloc(QMGR_TRANSPORT *transport, QMGR_TRANSPORT_ALLOC_NOTIFY notify)
 {
     QMGR_TRANSPORT_ALLOC *alloc;
-    VSTREAM *stream;
 
     /*
      * Sanity checks.
      */
     if (transport->flags & QMGR_TRANSPORT_STAT_DEAD)
        msg_panic("qmgr_transport: dead transport: %s", transport->name);
-    if (transport->flags & QMGR_TRANSPORT_STAT_BUSY)
-       msg_panic("qmgr_transport: nested allocation: %s", transport->name);
+    if (transport->pending >= QMGR_TRANSPORT_MAX_PEND)
+       msg_panic("qmgr_transport: excess allocation: %s", transport->name);
 
     /*
      * Connect to the well-known port for this delivery service, and wake up
-     * when a process announces its availability. In the mean time, block out
-     * other delivery process allocation attempts for this transport. In case
-     * of problems, back off. Do not hose the system when it is in trouble
+     * when a process announces its availability. Allow only a limited number
+     * of delivery process allocation attempts for this transport. In case of
+     * problems, back off. Do not hose the system when it is in trouble
      * already.
-     */
-#ifdef UNIX_DOMAIN_CONNECT_BLOCKS_FOR_ACCEPT
-#define BLOCK_MODE     NON_BLOCKING
-#define ENABLE_EVENTS  event_enable_write
-#define EVENT_HANDLER  qmgr_transport_connect
-#else
-#define BLOCK_MODE     BLOCKING
-#define ENABLE_EVENTS  event_enable_read
-#define EVENT_HANDLER  qmgr_transport_event
-#endif
-
-    /*
-     * When the connection to the delivery agent cannot be completed, notify
-     * the event handler so that it can throttle the transport and defer the
-     * todo queues, just like it does when communication fails *after*
-     * connection completion.
      * 
-     * Before Postfix 2.4, the event handler was not invoked, and mail was not
-     * deferred. Because of this, mail would be stuck in the active queue
-     * after triggering a "connection refused" condition.
+     * Use non-blocking connect(), so that Linux won't block the queue manager
+     * until the delivery agent calls accept().
+     * 
+     * When the connection to delivery agent cannot be completed, notify the
+     * event handler so that it can throttle the transport and defer the todo
+     * queues, just like it does when communication fails *after* connection
+     * completion.
+     * 
+     * Before Postfix 2.4, the event handler was not invoked after connect()
+     * error, and mail was not deferred. Because of this, mail would be stuck
+     * in the active queue after triggering a "connection refused" condition.
      */
-    if ((stream = mail_connect(MAIL_CLASS_PRIVATE, transport->name, BLOCK_MODE)) == 0) {
-       msg_warn("connect to transport %s: %m", transport->name);
-    }
     alloc = (QMGR_TRANSPORT_ALLOC *) mymalloc(sizeof(*alloc));
-    alloc->stream = stream;
     alloc->transport = transport;
     alloc->notify = notify;
-    transport->flags |= QMGR_TRANSPORT_STAT_BUSY;
-    if (alloc->stream == 0) {
+    transport->pending += 1;
+    if ((alloc->stream = mail_connect(MAIL_CLASS_PRIVATE, transport->name,
+                                     NON_BLOCKING)) == 0) {
+       msg_warn("connect to transport %s: %m", transport->name);
        event_request_timer(qmgr_transport_event, (char *) alloc, 0);
        return;
     }
-    ENABLE_EVENTS(vstream_fileno(alloc->stream), EVENT_HANDLER, (char *) alloc);
+    event_enable_read(vstream_fileno(alloc->stream), qmgr_transport_event,
+                     (char *) alloc);
 
     /*
      * Guard against broken systems.
@@ -327,6 +352,7 @@ QMGR_TRANSPORT *qmgr_transport_create(const char *name)
        msg_panic("qmgr_transport_create: transport exists: %s", name);
     transport = (QMGR_TRANSPORT *) mymalloc(sizeof(QMGR_TRANSPORT));
     transport->flags = 0;
+    transport->pending = 0;
     transport->name = mystrdup(name);
 
     /*
index 4641f4cb8c6a11dbbd47e14f8e7d9061911de500..8a2349784ea31aa5a94fed6ba73320a7527e2acb 100644 (file)
@@ -390,8 +390,11 @@ int     var_proc_limit;
 bool    var_verp_bounce_off;
 int     var_qmgr_clog_warn_time;
 
-static QMGR_SCAN *qmgr_incoming;
-static QMGR_SCAN *qmgr_deferred;
+static QMGR_SCAN *qmgr_scans[2];
+
+#define QMGR_SCAN_IDX_INCOMING 0
+#define QMGR_SCAN_IDX_DEFERRED 1
+#define QMGR_SCAN_IDX_COUNT (sizeof(qmgr_scans) / sizeof(qmgr_scans[0]))
 
 /* qmgr_deferred_run_event - queue manager heartbeat */
 
@@ -402,7 +405,7 @@ static void qmgr_deferred_run_event(int unused_event, char *dummy)
      * This routine runs when it is time for another deferred queue scan.
      * Make sure this routine gets called again in the future.
      */
-    qmgr_scan_request(qmgr_deferred, QMGR_SCAN_START);
+    qmgr_scan_request(qmgr_scans[QMGR_SCAN_IDX_DEFERRED], QMGR_SCAN_START);
     event_request_timer(qmgr_deferred_run_event, dummy, var_queue_run_delay);
 }
 
@@ -462,19 +465,22 @@ static void qmgr_trigger_event(char *buf, int len,
      * requested, the request takes effect immediately.
      */
     if (incoming_flag != 0)
-       qmgr_scan_request(qmgr_incoming, incoming_flag);
+       qmgr_scan_request(qmgr_scans[QMGR_SCAN_IDX_INCOMING], incoming_flag);
     if (deferred_flag != 0)
-       qmgr_scan_request(qmgr_deferred, deferred_flag);
+       qmgr_scan_request(qmgr_scans[QMGR_SCAN_IDX_DEFERRED], deferred_flag);
 }
 
 /* qmgr_loop - queue manager main loop */
 
 static int qmgr_loop(char *unused_name, char **unused_argv)
 {
-    char   *in_path = 0;
-    char   *df_path = 0;
+    char   *path;
     int     token_count;
-    int     in_feed = 0;
+    int     feed = 0;
+    int     scan_idx;                  /* Priority order scan index */
+    static int first_scan_idx = QMGR_SCAN_IDX_INCOMING;
+    int     last_scan_idx = QMGR_SCAN_IDX_COUNT - 1;
+    int     delay;
 
     /*
      * This routine runs as part of the event handling loop, after the event
@@ -495,15 +501,32 @@ static int qmgr_loop(char *unused_name, char **unused_argv)
 
     /*
      * Let some new blood into the active queue when the queue size is
-     * smaller than some configurable limit. When the system is under heavy
-     * load, favor new mail over old mail.
+     * smaller than some configurable limit.
+     * 
+     * We import one message per interrupt, to optimally tune the input count
+     * for the number of delivery agent protocol wait states, as explained in
+     * qmgr_transport.c.
      */
-    if (qmgr_message_count < var_qmgr_active_limit)
-       if ((in_path = qmgr_scan_next(qmgr_incoming)) != 0)
-           in_feed = qmgr_active_feed(qmgr_incoming, in_path);
-    if (qmgr_message_count < var_qmgr_active_limit)
-       if ((df_path = qmgr_scan_next(qmgr_deferred)) != 0)
-           qmgr_active_feed(qmgr_deferred, df_path);
+    delay = WAIT_FOR_EVENT;
+    for (scan_idx = 0; qmgr_message_count < var_qmgr_active_limit
+        && scan_idx < QMGR_SCAN_IDX_COUNT; ++scan_idx) {
+       last_scan_idx = (scan_idx + first_scan_idx) % QMGR_SCAN_IDX_COUNT;
+       if ((path = qmgr_scan_next(qmgr_scans[last_scan_idx])) != 0) {
+           delay = DONT_WAIT;
+           if ((feed = qmgr_active_feed(qmgr_scans[last_scan_idx], path)) != 0)
+               break;
+       }
+    }
+
+    /*
+     * Round-robin the queue scans. When the active queue becomes full,
+     * prefer new mail over deferred mail.
+     */
+    if (qmgr_message_count < var_qmgr_active_limit) {
+       first_scan_idx = (last_scan_idx + 1) % QMGR_SCAN_IDX_COUNT;
+    } else if (first_scan_idx != QMGR_SCAN_IDX_INCOMING) {
+       first_scan_idx = QMGR_SCAN_IDX_INCOMING;
+    }
 
     /*
      * Global flow control. If enabled, slow down receiving processes that
@@ -512,17 +535,15 @@ static int qmgr_loop(char *unused_name, char **unused_argv)
     if (var_in_flow_delay > 0) {
        token_count = mail_flow_count();
        if (token_count < var_proc_limit) {
-           if (in_feed != 0)
+           if (feed != 0 && last_scan_idx == QMGR_SCAN_IDX_INCOMING)
                mail_flow_put(1);
-           else if (qmgr_incoming->handle == 0)
+           else if (qmgr_scans[QMGR_SCAN_IDX_INCOMING]->handle == 0)
                mail_flow_put(var_proc_limit - token_count);
        } else if (token_count > var_proc_limit) {
            mail_flow_get(token_count - var_proc_limit);
        }
     }
-    if (in_path || df_path)
-       return (DONT_WAIT);
-    return (WAIT_FOR_EVENT);
+    return (delay);
 }
 
 /* pre_accept - see if tables have changed */
@@ -588,9 +609,9 @@ static void qmgr_post_init(char *name, char **unused_argv)
     var_use_limit = 0;
     var_idle_limit = 0;
     qmgr_move(MAIL_QUEUE_ACTIVE, MAIL_QUEUE_INCOMING, event_time());
-    qmgr_incoming = qmgr_scan_create(MAIL_QUEUE_INCOMING);
-    qmgr_deferred = qmgr_scan_create(MAIL_QUEUE_DEFERRED);
-    qmgr_scan_request(qmgr_incoming, QMGR_SCAN_START);
+    qmgr_scans[QMGR_SCAN_IDX_INCOMING] = qmgr_scan_create(MAIL_QUEUE_INCOMING);
+    qmgr_scans[QMGR_SCAN_IDX_DEFERRED] = qmgr_scan_create(MAIL_QUEUE_DEFERRED);
+    qmgr_scan_request(qmgr_scans[QMGR_SCAN_IDX_INCOMING], QMGR_SCAN_START);
     qmgr_deferred_run_event(0, (char *) 0);
 }
 
index d791a9ef059569644665e903d2fb980cc9e02ad9..5582ef81a18dc511ce50760f91debebb3996fdee 100644 (file)
@@ -131,6 +131,7 @@ struct QMGR_JOB_LIST {
 
 struct QMGR_TRANSPORT {
     int     flags;                     /* blocked, etc. */
+    int     pending;                   /* incomplete DA connections */
     char   *name;                      /* transport name */
     int     dest_concurrency_limit;    /* concurrency per domain */
     int     init_dest_concurrency;     /* init. per-domain concurrency */
@@ -165,7 +166,6 @@ struct QMGR_TRANSPORT {
 };
 
 #define QMGR_TRANSPORT_STAT_DEAD       (1<<1)
-#define QMGR_TRANSPORT_STAT_BUSY       (1<<2)
 
 typedef void (*QMGR_TRANSPORT_ALLOC_NOTIFY) (QMGR_TRANSPORT *, VSTREAM *);
 extern QMGR_TRANSPORT *qmgr_transport_select(void);
@@ -175,7 +175,7 @@ extern void qmgr_transport_unthrottle(QMGR_TRANSPORT *);
 extern QMGR_TRANSPORT *qmgr_transport_create(const char *);
 extern QMGR_TRANSPORT *qmgr_transport_find(const char *);
 
-#define QMGR_TRANSPORT_THROTTLED(t) ((t)->flags & QMGR_TRANSPORT_STAT_DEAD)
+#define QMGR_TRANSPORT_THROTTLED(t)    ((t)->flags & QMGR_TRANSPORT_STAT_DEAD)
 
  /*
   * Each next hop (e.g., a domain name) has its own queue of pending message
index d7e54f11630e9656e1b07b3f8de862cf4373c792..aa67a1c87f5ad685037ee5e51b835c1492529969 100644 (file)
@@ -110,6 +110,49 @@ struct QMGR_TRANSPORT_ALLOC {
     QMGR_TRANSPORT_ALLOC_NOTIFY notify;        /* application call-back routine */
 };
 
+ /*
+  * Connections to delivery agents are managed asynchronously. Each delivery
+  * agent connection goes through multiple wait states:
+  * 
+  * - With Linux/Solaris and old queue manager implementations only, wait for
+  * the server to invoke accept().
+  * 
+  * - Wait for the delivery agent's announcement that it is ready to receive a
+  * delivery request.
+  * 
+  * - Wait for the delivery request completion status.
+  * 
+  * Older queue manager implementations had only one pending delivery agent
+  * connection per transport. With low-latency destinations, the output rates
+  * were reduced on Linux/Solaris systems that had the extra wait state.
+  * 
+  * To maximize delivery agent output rates with low-latency destinations, the
+  * following changes were made to the queue manager by the end of the 2.4
+  * development cycle:
+  * 
+  * - The Linux/Solaris accept() wait state was eliminated.
+  * 
+  * - A pipeline was implemented for pending delivery agent connections. The
+  * number of pending delivery agent connections was increased from one to
+  * two: the number of before-delivery wait states, plus one extra pipeline
+  * slot to prevent the pipeline from stalling easily. Increasing the
+  * pipeline much further actually hurt performance.
+  * 
+  * - To reduce queue manager disk competition with delivery agents, the queue
+  * scanning algorithm was modified to import only one message per interrupt.
+  * The incoming and deferred queue scans now happen on alternate interrupts.
+  * 
+  * Simplistically reasoned, a non-zero (incoming + active) queue length is
+  * equivalent to a time shift for mail deliveries; this is undesirable when
+  * delivery agents are not fully utilized.
+  * 
+  * On the other hand a non-empty active queue is what allows us to do clever
+  * things such as queue file prefetch, concurrency windows, and connection
+  * caching; the idea is that such "thinking time" is affordable only after
+  * the output channels are maxed out.
+  */
+#define QMGR_TRANSPORT_MAX_PEND        2
+
 /* qmgr_transport_unthrottle_wrapper - in case (char *) != (struct *) */
 
 static void qmgr_transport_unthrottle_wrapper(int unused_event, char *context)
@@ -196,11 +239,15 @@ static void qmgr_transport_event(int unused_event, char *context)
     event_cancel_timer(qmgr_transport_abort, context);
 
     /*
-     * Disable further read events that end up calling this function.
+     * Disable further read events that end up calling this function, turn
+     * off the Linux connect() workaround, and free up this pending
+     * connection pipeline slot.
      */
-    if (alloc->stream)
+    if (alloc->stream) {
        event_disable_readwrite(vstream_fileno(alloc->stream));
-    alloc->transport->flags &= ~QMGR_TRANSPORT_STAT_BUSY;
+       non_blocking(vstream_fileno(alloc->stream), BLOCKING);
+    }
+    alloc->transport->pending -= 1;
 
     /*
      * Notify the requestor.
@@ -209,46 +256,34 @@ static void qmgr_transport_event(int unused_event, char *context)
     myfree((char *) alloc);
 }
 
-#ifdef UNIX_DOMAIN_CONNECT_BLOCKS_FOR_ACCEPT
-
-/* qmgr_transport_connect - handle connection request completion */
-
-static void qmgr_transport_connect(int unused_event, char *context)
-{
-    QMGR_TRANSPORT_ALLOC *alloc = (QMGR_TRANSPORT_ALLOC *) context;
-
-    /*
-     * This code is necessary for some versions of LINUX, where connect(2)
-     * blocks until the application performs an accept(2). Reportedly, the
-     * same can happen on Solaris 2.5.1.
-     */
-    event_disable_readwrite(vstream_fileno(alloc->stream));
-    non_blocking(vstream_fileno(alloc->stream), BLOCKING);
-    event_enable_read(vstream_fileno(alloc->stream),
-                     qmgr_transport_event, (char *) alloc);
-}
-
-#endif
-
 /* qmgr_transport_select - select transport for allocation */
 
 QMGR_TRANSPORT *qmgr_transport_select(void)
 {
     QMGR_TRANSPORT *xport;
     QMGR_QUEUE *queue;
+    int     need;
 
     /*
      * If we find a suitable transport, rotate the list of transports to
      * effectuate round-robin selection. See similar selection code in
      * qmgr_peer_select().
+     * 
+     * This function is called repeatedly until all transports have maxed out
+     * the number of pending delivery agent connections, until all delivery
+     * agent concurrency windows are maxed out, or until we run out of "todo"
+     * queue entries.
      */
-#define STAY_AWAY (QMGR_TRANSPORT_STAT_BUSY | QMGR_TRANSPORT_STAT_DEAD)
+#define MIN5af51743e4eef(x, y) ((x) < (y) ? (x) : (y))
 
     for (xport = qmgr_transport_list.next; xport; xport = xport->peers.next) {
-       if (xport->flags & STAY_AWAY)
+       if ((xport->flags & QMGR_TRANSPORT_STAT_DEAD) != 0
+           || xport->pending >= QMGR_TRANSPORT_MAX_PEND)
            continue;
+       need = xport->pending + 1;
        for (queue = xport->queue_list.next; queue; queue = queue->peers.next) {
-           if (queue->window > queue->busy_refcount && queue->todo.next != 0) {
+           if ((need -= MIN5af51743e4eef(queue->window - queue->busy_refcount,
+                                         queue->todo_refcount)) <= 0) {
                QMGR_LIST_ROTATE(qmgr_transport_list, xport, peers);
                if (msg_verbose)
                    msg_info("qmgr_transport_select: %s", xport->name);
@@ -264,56 +299,46 @@ QMGR_TRANSPORT *qmgr_transport_select(void)
 void    qmgr_transport_alloc(QMGR_TRANSPORT *transport, QMGR_TRANSPORT_ALLOC_NOTIFY notify)
 {
     QMGR_TRANSPORT_ALLOC *alloc;
-    VSTREAM *stream;
 
     /*
      * Sanity checks.
      */
     if (transport->flags & QMGR_TRANSPORT_STAT_DEAD)
        msg_panic("qmgr_transport: dead transport: %s", transport->name);
-    if (transport->flags & QMGR_TRANSPORT_STAT_BUSY)
-       msg_panic("qmgr_transport: nested allocation: %s", transport->name);
+    if (transport->pending >= QMGR_TRANSPORT_MAX_PEND)
+       msg_panic("qmgr_transport: excess allocation: %s", transport->name);
 
     /*
      * Connect to the well-known port for this delivery service, and wake up
-     * when a process announces its availability. In the mean time, block out
-     * other delivery process allocation attempts for this transport. In case
-     * of problems, back off. Do not hose the system when it is in trouble
+     * when a process announces its availability. Allow only a limited number
+     * of delivery process allocation attempts for this transport. In case of
+     * problems, back off. Do not hose the system when it is in trouble
      * already.
-     */
-#ifdef UNIX_DOMAIN_CONNECT_BLOCKS_FOR_ACCEPT
-#define BLOCK_MODE     NON_BLOCKING
-#define ENABLE_EVENTS  event_enable_write
-#define EVENT_HANDLER  qmgr_transport_connect
-#else
-#define BLOCK_MODE     BLOCKING
-#define ENABLE_EVENTS  event_enable_read
-#define EVENT_HANDLER  qmgr_transport_event
-#endif
-
-    /*
-     * When the connection to the delivery agent cannot be completed, notify
-     * the event handler so that it can throttle the transport and defer the
-     * todo queues, just like it does when communication fails *after*
-     * connection completion.
      * 
-     * Before Postfix 2.4, the event handler was not invoked, and mail was not
-     * deferred. Because of this, mail would be stuck in the active queue
-     * after triggering a "connection refused" condition.
+     * Use non-blocking connect(), so that Linux won't block the queue manager
+     * until the delivery agent calls accept().
+     * 
+     * When the connection to delivery agent cannot be completed, notify the
+     * event handler so that it can throttle the transport and defer the todo
+     * queues, just like it does when communication fails *after* connection
+     * completion.
+     * 
+     * Before Postfix 2.4, the event handler was not invoked after connect()
+     * error, and mail was not deferred. Because of this, mail would be stuck
+     * in the active queue after triggering a "connection refused" condition.
      */
-    if ((stream = mail_connect(MAIL_CLASS_PRIVATE, transport->name, BLOCK_MODE)) == 0) {
-       msg_warn("connect to transport %s: %m", transport->name);
-    }
     alloc = (QMGR_TRANSPORT_ALLOC *) mymalloc(sizeof(*alloc));
-    alloc->stream = stream;
     alloc->transport = transport;
     alloc->notify = notify;
-    transport->flags |= QMGR_TRANSPORT_STAT_BUSY;
-    if (alloc->stream == 0) {
+    transport->pending += 1;
+    if ((alloc->stream = mail_connect(MAIL_CLASS_PRIVATE, transport->name,
+                                     NON_BLOCKING)) == 0) {
+       msg_warn("connect to transport %s: %m", transport->name);
        event_request_timer(qmgr_transport_event, (char *) alloc, 0);
        return;
     }
-    ENABLE_EVENTS(vstream_fileno(alloc->stream), EVENT_HANDLER, (char *) alloc);
+    event_enable_read(vstream_fileno(alloc->stream), qmgr_transport_event,
+                     (char *) alloc);
 
     /*
      * Guard against broken systems.
@@ -332,6 +357,7 @@ QMGR_TRANSPORT *qmgr_transport_create(const char *name)
        msg_panic("qmgr_transport_create: transport exists: %s", name);
     transport = (QMGR_TRANSPORT *) mymalloc(sizeof(QMGR_TRANSPORT));
     transport->flags = 0;
+    transport->pending = 0;
     transport->name = mystrdup(name);
 
     /*
@@ -364,9 +390,9 @@ QMGR_TRANSPORT *qmgr_transport_create(const char *name)
     transport->rcpt_per_stack = get_mail_conf_int2(name, _STACK_RCPT_LIMIT,
                                                var_stack_rcpt_limit, 0, 0);
     transport->refill_limit = get_mail_conf_int2(name, _XPORT_REFILL_LIMIT,
-                                               var_xport_refill_limit, 1, 0);
+                                             var_xport_refill_limit, 1, 0);
     transport->refill_delay = get_mail_conf_time2(name, _XPORT_REFILL_DELAY,
-                                               var_xport_refill_delay, 's', 1, 0);
+                                        var_xport_refill_delay, 's', 1, 0);
 
     transport->queue_byname = htable_create(0);
     QMGR_LIST_INIT(transport->queue_list);
index 25c0b40fb37b44c4cec8ed3b64b1d738fd33d7e9..c8ca6267277ed102bc702ba5b779b537ce6404da 100644 (file)
@@ -432,7 +432,6 @@ extern int opterr;
 #define DBM_NO_TRAILING_NULL
 #define USE_STATVFS
 #define STATVFS_IN_SYS_STATVFS_H
-#define UNIX_DOMAIN_CONNECT_BLOCKS_FOR_ACCEPT
 #define STRCASECMP_IN_STRINGS_H
 #define SET_H_ERRNO(err) (set_h_errno(err))
 #endif
@@ -463,7 +462,6 @@ extern int opterr;
 #define DBM_NO_TRAILING_NULL
 #define USE_STATVFS
 #define STATVFS_IN_SYS_STATVFS_H
-#define UNIX_DOMAIN_CONNECT_BLOCKS_FOR_ACCEPT
 #endif
 
  /*
@@ -679,7 +677,6 @@ extern int initgroups(const char *, int);
 #define FIONREAD_IN_TERMIOS_H
 #define USE_STATFS
 #define STATFS_IN_SYS_VFS_H
-#define UNIX_DOMAIN_CONNECT_BLOCKS_FOR_ACCEPT
 #define PREPEND_PLUS_TO_OPTSTRING
 #define HAS_POSIX_REGEXP
 #define NATIVE_SENDMAIL_PATH "/usr/sbin/sendmail"
@@ -697,7 +694,10 @@ extern int initgroups(const char *, int);
 # define _PATH_PROCNET_IFINET6 "/proc/net/if_inet6"
 #endif
 #include <linux/version.h>
-#if !defined(KERNEL_VERSION) || (LINUX_VERSION_CODE < KERNEL_VERSION(2,2,0)) \
+#if !defined(KERNEL_VERSION) 
+# define KERNEL_VERSION(a,b,c) (LINUX_VERSION_CODE + 1)
+#endif
+#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,2,0)) \
        || (__GLIBC__ < 2)
 # define CANT_USE_SEND_RECV_MSG
 # define DEF_SMTP_CACHE_DEMAND 0
@@ -727,7 +727,6 @@ extern int initgroups(const char *, int);
 #define FIONREAD_IN_TERMIOS_H          /* maybe unnecessary */
 #define USE_STATFS
 #define STATFS_IN_SYS_VFS_H
-#define UNIX_DOMAIN_CONNECT_BLOCKS_FOR_ACCEPT  /* unverified */
 #define PREPEND_PLUS_TO_OPTSTRING
 #define HAS_POSIX_REGEXP
 #define NATIVE_SENDMAIL_PATH "/usr/sbin/sendmail"
@@ -1018,7 +1017,6 @@ extern int opterr;                        /* XXX use <getopt.h> */
 #define DBM_NO_TRAILING_NULL
 #define USE_STATVFS
 #define STATVFS_IN_SYS_STATVFS_H
-#define UNIX_DOMAIN_CONNECT_BLOCKS_FOR_ACCEPT
 #ifndef S_ISSOCK
 #define S_ISSOCK(mode) ((mode&0xF000) == 0xC000)
 #endif
@@ -1049,7 +1047,6 @@ extern int h_errno;
 #define ROOT_PATH      "/bin:/etc:/usr/bin:/tcb/bin"
 #define USE_STATVFS
 #define STATVFS_IN_SYS_STATVFS_H
-#define UNIX_DOMAIN_CONNECT_BLOCKS_FOR_ACCEPT
 #define MISSING_SETENV
 #define STRCASECMP_IN_STRINGS_H
 /* SCO5 misses just S_ISSOCK, the others are there