]> git.ipfire.org Git - thirdparty/apache/httpd.git/commitdiff
Merge of 1891716-1891719,1891721,1891724,1891726-1891728 from trunk:
authorStefan Eissing <icing@apache.org>
Tue, 10 Aug 2021 08:08:12 +0000 (08:08 +0000)
committerStefan Eissing <icing@apache.org>
Tue, 10 Aug 2021 08:08:12 +0000 (08:08 +0000)
  *) mpm_event: Fix graceful stop/restart of children processes if connections
     are in lingering close for too long.  [Yann Ylavic]

git-svn-id: https://svn.apache.org/repos/asf/httpd/httpd/branches/2.4.x@1892159 13f79535-47bb-0310-9956-ffa450edef68

CHANGES
server/connection.c
server/mpm/event/event.c
server/mpm_fdqueue.c
server/mpm_fdqueue.h

diff --git a/CHANGES b/CHANGES
index 515189f2ea642d9d0513f7c1c0736854fece08d5..1322a4a8a45e8121c8f8b70f31b4f6f3720f3779 100644 (file)
--- a/CHANGES
+++ b/CHANGES
@@ -1,6 +1,9 @@
                                                          -*- coding: utf-8 -*-
 Changes with Apache 2.4.49
 
+  *) mpm_event: Fix graceful stop/restart of children processes if connections
+     are in lingering close for too long.  [Yann Ylavic]
+
   *) mod_md: fixed a potential null pointer dereference if ACME/OCSP
      server returned 2xx responses without content type. Reported by chuangwen.
      [chuangwen, Stefan Eissing]
index 19745376db8a1e363949b854c77c133b2e8755cf..b0093e16c9d053b570f33e4eccc1206f83674fd3 100644 (file)
@@ -139,18 +139,12 @@ AP_DECLARE(int) ap_start_lingering_close(conn_rec *c)
     ap_flush_conn(c);
 
 #ifdef NO_LINGCLOSE
-    apr_socket_close(csd);
     return 1;
 #else
     /* Shut down the socket for write, which will send a FIN
      * to the peer.
      */
-    if (c->aborted
-            || apr_socket_shutdown(csd, APR_SHUTDOWN_WRITE) != APR_SUCCESS) {
-        apr_socket_close(csd);
-        return 1;
-    }
-    return 0;
+    return (c->aborted || apr_socket_shutdown(csd, APR_SHUTDOWN_WRITE));
 #endif
 }
 
@@ -162,6 +156,7 @@ AP_DECLARE(void) ap_lingering_close(conn_rec *c)
     apr_socket_t *csd = ap_get_conn_socket(c);
 
     if (ap_start_lingering_close(c)) {
+        apr_socket_close(csd);
         return;
     }
 
index 8977e176dca84e6a67152f7d90a221a6a18385f8..955a91afcef09de60d8048b8cc953590aa6e2564 100644 (file)
@@ -248,6 +248,8 @@ struct event_conn_state_t {
     conn_state_t pub;
     /** chaining in defer_linger_chain */
     struct event_conn_state_t *chain;
+    /** Is lingering close from defer_lingering_close()? */
+    int deferred_linger;
 };
 
 APR_RING_HEAD(timeout_head_t, event_conn_state_t);
@@ -285,21 +287,21 @@ static volatile apr_time_t  queues_next_expiry;
  */
 static void TO_QUEUE_APPEND(struct timeout_queue *q, event_conn_state_t *el)
 {
-    apr_time_t q_expiry;
+    apr_time_t elem_expiry;
     apr_time_t next_expiry;
 
     APR_RING_INSERT_TAIL(&q->head, el, event_conn_state_t, timeout_list);
     ++*q->total;
     ++q->count;
 
-    /* Cheaply update the overall queues' next expiry according to the
-     * first entry of this queue (oldest), if necessary.
+    /* Cheaply update the global queues_next_expiry with the one of the
+     * first entry of this queue (oldest) if it expires before.
      */
     el = APR_RING_FIRST(&q->head);
-    q_expiry = el->queue_timestamp + q->timeout;
+    elem_expiry = el->queue_timestamp + q->timeout;
     next_expiry = queues_next_expiry;
-    if (!next_expiry || next_expiry > q_expiry + TIMEOUT_FUDGE_FACTOR) {
-        queues_next_expiry = q_expiry;
+    if (!next_expiry || next_expiry > elem_expiry + TIMEOUT_FUDGE_FACTOR) {
+        queues_next_expiry = elem_expiry;
         /* Unblock the poll()ing listener for it to update its timeout. */
         if (listener_is_wakeable) {
             apr_pollset_wakeup(event_pollset);
@@ -525,9 +527,20 @@ static APR_INLINE int connections_above_limit(int *busy)
     return 1;
 }
 
-static void abort_socket_nonblocking(apr_socket_t *csd)
+static void close_socket_nonblocking_(apr_socket_t *csd,
+                                      const char *from, int line)
 {
     apr_status_t rv;
+    apr_os_sock_t fd = -1;
+
+    /* close_worker_sockets() may have closed it already */
+    rv = apr_os_sock_get(&fd, csd);
+    ap_log_error(APLOG_MARK, APLOG_TRACE8, 0, ap_server_conf,
+                "closing socket %i/%pp from %s:%i", (int)fd, csd, from, line);
+    if (rv == APR_SUCCESS && fd == -1) {
+        return;
+    }
+
     apr_socket_timeout_set(csd, 0);
     rv = apr_socket_close(csd);
     if (rv != APR_SUCCESS) {
@@ -536,6 +549,8 @@ static void abort_socket_nonblocking(apr_socket_t *csd)
         AP_DEBUG_ASSERT(0);
     }
 }
+#define close_socket_nonblocking(csd) \
+    close_socket_nonblocking_(csd, __FUNCTION__, __LINE__)
 
 static void close_worker_sockets(void)
 {
@@ -544,26 +559,16 @@ static void close_worker_sockets(void)
         apr_socket_t *csd = worker_sockets[i];
         if (csd) {
             worker_sockets[i] = NULL;
-            abort_socket_nonblocking(csd);
-        }
-    }
-    for (;;) {
-        event_conn_state_t *cs = defer_linger_chain;
-        if (!cs) {
-            break;
-        }
-        if (apr_atomic_casptr((void *)&defer_linger_chain, cs->chain,
-                              cs) != cs) {
-            /* Race lost, try again */
-            continue;
+            close_socket_nonblocking(csd);
         }
-        cs->chain = NULL;
-        abort_socket_nonblocking(cs->pfd.desc.s);
     }
 }
 
 static void wakeup_listener(void)
 {
+    ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, ap_server_conf,
+                 "wake up listener%s", listener_may_exit ? " again" : "");
+
     listener_may_exit = 1;
     disable_listensocks();
 
@@ -741,7 +746,10 @@ static apr_status_t decrement_connection_count(void *cs_)
 {
     int is_last_connection;
     event_conn_state_t *cs = cs_;
+    ap_log_cerror(APLOG_MARK, APLOG_TRACE8, 0, cs->c,
+                  "cleanup connection from state %i", (int)cs->pub.state);
     switch (cs->pub.state) {
+        case CONN_STATE_LINGER:
         case CONN_STATE_LINGER_NORMAL:
         case CONN_STATE_LINGER_SHORT:
             apr_atomic_dec32(&lingering_count);
@@ -762,6 +770,10 @@ static apr_status_t decrement_connection_count(void *cs_)
                 || (listeners_disabled() && !connections_above_limit(NULL)))) {
         apr_pollset_wakeup(event_pollset);
     }
+    if (dying) {
+        /* Help worker_thread_should_exit_early() */
+        ap_queue_interrupt_one(worker_queue);
+    }
     return APR_SUCCESS;
 }
 
@@ -780,65 +792,26 @@ static void notify_resume(event_conn_state_t *cs, int cleanup)
 }
 
 /*
- * Close our side of the connection, flushing data to the client first.
- * Pre-condition: cs is not in any timeout queue and not in the pollset,
- *                timeout_mutex is not locked
- * return: 0 if connection is fully closed,
- *         1 if connection is lingering
- * May only be called by worker thread.
+ * Defer flush and close of the connection by adding it to defer_linger_chain,
+ * for a worker to grab it and do the job (should that be blocking).
+ * Pre-condition: nonblocking, can be called from anywhere provided cs is not
+ *                in any timeout queue or in the pollset.
  */
-static int start_lingering_close_blocking(event_conn_state_t *cs)
+static int defer_lingering_close(event_conn_state_t *cs)
 {
-    apr_socket_t *csd = cs->pfd.desc.s;
+    ap_log_cerror(APLOG_MARK, APLOG_TRACE6, 0, cs->c,
+                  "deferring close from state %i", (int)cs->pub.state);
 
-    if (ap_start_lingering_close(cs->c)) {
-        notify_suspend(cs);
-        apr_socket_close(csd);
-        ap_queue_info_push_pool(worker_queue_info, cs->p);
-        return DONE;
-    }
-
-#ifdef AP_DEBUG
-    {
-        apr_status_t rv;
-        rv = apr_socket_timeout_set(csd, 0);
-        AP_DEBUG_ASSERT(rv == APR_SUCCESS);
-    }
-#else
-    apr_socket_timeout_set(csd, 0);
-#endif
-
-    cs->queue_timestamp = apr_time_now();
-    /*
-     * If some module requested a shortened waiting period, only wait for
-     * 2s (SECONDS_TO_LINGER). This is useful for mitigating certain
-     * DoS attacks.
+    /* The connection is not shutdown() yet strictly speaking, but it's not
+     * in any queue nor handled by a worker either (will be very soon), so
+     * to account for it somewhere we bump lingering_count now (and set
+     * deferred_linger for process_lingering_close() to know).
      */
-    if (apr_table_get(cs->c->notes, "short-lingering-close")) {
-        cs->pub.state = CONN_STATE_LINGER_SHORT;
-    }
-    else {
-        cs->pub.state = CONN_STATE_LINGER_NORMAL;
-    }
+    cs->pub.state = CONN_STATE_LINGER;
     apr_atomic_inc32(&lingering_count);
-    notify_suspend(cs);
-
-    return OK;
-}
-
-/*
- * Defer flush and close of the connection by adding it to defer_linger_chain,
- * for a worker to grab it and do the job (should that be blocking).
- * Pre-condition: cs is not in any timeout queue and not in the pollset,
- *                timeout_mutex is not locked
- * return: 1 connection is alive (but aside and about to linger)
- * May be called by listener thread.
- */
-static int start_lingering_close_nonblocking(event_conn_state_t *cs)
-{
-    event_conn_state_t *chain;
+    cs->deferred_linger = 1;
     for (;;) {
-        cs->chain = chain = defer_linger_chain;
+        event_conn_state_t *chain = cs->chain = defer_linger_chain;
         if (apr_atomic_casptr((void *)&defer_linger_chain, cs,
                               chain) != chain) {
             /* Race lost, try again */
@@ -848,22 +821,37 @@ static int start_lingering_close_nonblocking(event_conn_state_t *cs)
     }
 }
 
-/*
- * forcibly close a lingering connection after the lingering period has
- * expired
- * Pre-condition: cs is not in any timeout queue and not in the pollset
- * return: irrelevant (need same prototype as start_lingering_close)
+/* Close the connection and release its resources (ptrans), either because an
+ * unrecoverable error occured (queues or pollset add/remove) or more usually
+ * if lingering close timed out.
+ * Pre-condition: nonblocking, can be called from anywhere provided cs is not
+ *                in any timeout queue or in the pollset.
  */
-static int stop_lingering_close(event_conn_state_t *cs)
+static void close_connection(event_conn_state_t *cs)
 {
-    apr_socket_t *csd = ap_get_conn_socket(cs->c);
-    ap_log_error(APLOG_MARK, APLOG_TRACE4, 0, ap_server_conf,
-                 "socket abort in state %i", (int)cs->pub.state);
-    abort_socket_nonblocking(csd);
+    ap_log_cerror(APLOG_MARK, APLOG_TRACE6, 0, cs->c,
+                  "closing connection from state %i", (int)cs->pub.state);
+
+    close_socket_nonblocking(cs->pfd.desc.s);
     ap_queue_info_push_pool(worker_queue_info, cs->p);
-    if (dying)
-        ap_queue_interrupt_one(worker_queue);
-    return 0;
+}
+
+/* Shutdown the connection in case of timeout, error or resources shortage.
+ * This starts short lingering close if not already there, or directly closes
+ * the connection otherwise.
+ * Pre-condition: nonblocking, can be called from anywhere provided cs is not
+ *                in any timeout queue or in the pollset.
+ */
+static int shutdown_connection(event_conn_state_t *cs)
+{
+    if (cs->pub.state < CONN_STATE_LINGER) {
+        apr_table_setn(cs->c->notes, "short-lingering-close", "1");
+        defer_lingering_close(cs);
+    }
+    else {
+        close_connection(cs);
+    }
+    return 1;
 }
 
 /*
@@ -935,6 +923,27 @@ static int event_post_read_request(request_rec *r)
 /* Forward declare */
 static void process_lingering_close(event_conn_state_t *cs);
 
+static void update_reqevents_from_sense(event_conn_state_t *cs, int sense)
+{
+    if (sense < 0) {
+        sense = cs->pub.sense;
+    }
+    if (sense == CONN_SENSE_WANT_READ) {
+        cs->pfd.reqevents = APR_POLLIN | APR_POLLHUP;
+    }
+    else {
+        cs->pfd.reqevents = APR_POLLOUT;
+    }
+    /* POLLERR is usually returned event only, but some pollset
+     * backends may require it in reqevents to do the right thing,
+     * so it shouldn't hurt (ignored otherwise).
+     */
+    cs->pfd.reqevents |= APR_POLLERR;
+
+    /* Reset to default for the next round */
+    cs->pub.sense = CONN_SENSE_DEFAULT;
+}
+
 /*
  * process one connection in the worker
  */
@@ -964,14 +973,14 @@ static void process_socket(apr_thread_t *thd, apr_pool_t * p, apr_socket_t * soc
                                   apr_pool_cleanup_null);
         ap_set_module_config(c->conn_config, &mpm_event_module, cs);
         c->current_thread = thd;
+        c->cs = &cs->pub;
         cs->c = c;
-        c->cs = &(cs->pub);
         cs->p = p;
         cs->sc = ap_get_module_config(ap_server_conf->module_config,
                                       &mpm_event_module);
         cs->pfd.desc_type = APR_POLL_SOCKET;
-        cs->pfd.reqevents = APR_POLLIN;
         cs->pfd.desc.s = sock;
+        update_reqevents_from_sense(cs, CONN_SENSE_WANT_READ);
         pt->type = PT_CSD;
         pt->baton = cs;
         cs->pfd.client_data = pt;
@@ -1110,19 +1119,7 @@ read_request:
             cs->queue_timestamp = apr_time_now();
             notify_suspend(cs);
 
-            if (cs->pub.sense == CONN_SENSE_WANT_READ) {
-                cs->pfd.reqevents = APR_POLLIN;
-            }
-            else {
-                cs->pfd.reqevents = APR_POLLOUT;
-            }
-            /* POLLHUP/ERR are usually returned event only (ignored here), but
-             * some pollset backends may require them in reqevents to do the
-             * right thing, so it shouldn't hurt.
-             */
-            cs->pfd.reqevents |= APR_POLLHUP | APR_POLLERR;
-            cs->pub.sense = CONN_SENSE_DEFAULT;
-
+            update_reqevents_from_sense(cs, -1);
             apr_thread_mutex_lock(timeout_mutex);
             TO_QUEUE_APPEND(cs->sc->wc_q, cs);
             rv = apr_pollset_add(event_pollset, &cs->pfd);
@@ -1133,25 +1130,27 @@ read_request:
                 ap_log_error(APLOG_MARK, APLOG_ERR, rv, ap_server_conf, APLOGNO(03465)
                              "process_socket: apr_pollset_add failure for "
                              "write completion");
-                apr_socket_close(cs->pfd.desc.s);
-                ap_queue_info_push_pool(worker_queue_info, cs->p);
+                close_connection(cs);
+                signal_threads(ST_GRACEFUL);
             }
             else {
                 apr_thread_mutex_unlock(timeout_mutex);
             }
             return;
         }
-        else if (c->keepalive != AP_CONN_KEEPALIVE || c->aborted ||
-                 listener_may_exit) {
+        else if (c->keepalive != AP_CONN_KEEPALIVE || c->aborted) {
             cs->pub.state = CONN_STATE_LINGER;
         }
         else if (c->data_in_input_filters) {
             cs->pub.state = CONN_STATE_READ_REQUEST_LINE;
             goto read_request;
         }
-        else {
+        else if (!listener_may_exit) {
             cs->pub.state = CONN_STATE_CHECK_REQUEST_LINE_READABLE;
         }
+        else {
+            cs->pub.state = CONN_STATE_LINGER;
+        }
     }
 
     if (cs->pub.state == CONN_STATE_CHECK_REQUEST_LINE_READABLE) {
@@ -1169,7 +1168,7 @@ read_request:
         notify_suspend(cs);
 
         /* Add work to pollset. */
-        cs->pfd.reqevents = APR_POLLIN;
+        update_reqevents_from_sense(cs, CONN_SENSE_WANT_READ);
         apr_thread_mutex_lock(timeout_mutex);
         TO_QUEUE_APPEND(cs->sc->ka_q, cs);
         rv = apr_pollset_add(event_pollset, &cs->pfd);
@@ -1180,8 +1179,8 @@ read_request:
             ap_log_error(APLOG_MARK, APLOG_ERR, rv, ap_server_conf, APLOGNO(03093)
                          "process_socket: apr_pollset_add failure for "
                          "keep alive");
-            apr_socket_close(cs->pfd.desc.s);
-            ap_queue_info_push_pool(worker_queue_info, cs->p);
+            close_connection(cs);
+            signal_threads(ST_GRACEFUL);
         }
         else {
             apr_thread_mutex_unlock(timeout_mutex);
@@ -1195,12 +1194,10 @@ read_request:
         return;
     }
 
-    if (cs->pub.state == CONN_STATE_LINGER) {
-        rc = start_lingering_close_blocking(cs);
-    }
-    if (rc == OK && (cs->pub.state == CONN_STATE_LINGER_NORMAL ||
-                     cs->pub.state == CONN_STATE_LINGER_SHORT)) {
+    /* CONN_STATE_LINGER[_*] fall through process_lingering_close() */
+    if (cs->pub.state >= CONN_STATE_LINGER) {
         process_lingering_close(cs);
+        return;
     }
 }
 
@@ -1220,12 +1217,17 @@ static void check_infinite_requests(void)
     }
 }
 
-static void close_listeners(int *closed)
+static int close_listeners(int *closed)
 {
+    ap_log_error(APLOG_MARK, APLOG_TRACE6, 0, ap_server_conf,
+                 "clos%s listeners (connection_count=%u)",
+                 *closed ? "ed" : "ing", apr_atomic_read32(&connection_count));
     if (!*closed) {
         int i;
+
         ap_close_listeners_ex(my_bucket->listeners);
-        *closed = 1;
+        *closed = 1; /* once */
+
         dying = 1;
         ap_scoreboard_image->parent[ap_child_slot].quiescing = 1;
         for (i = 0; i < threads_per_child; ++i) {
@@ -1237,7 +1239,10 @@ static void close_listeners(int *closed)
 
         ap_queue_info_free_idle_pools(worker_queue_info);
         ap_queue_interrupt_all(worker_queue);
+
+        return 1;
     }
+    return 0;
 }
 
 static void unblock_signal(int sig)
@@ -1286,11 +1291,16 @@ static apr_status_t push2worker(event_conn_state_t *cs, apr_socket_t *csd,
         /* trash the connection; we couldn't queue the connected
          * socket to a worker
          */
-        if (csd) {
-            abort_socket_nonblocking(csd);
+        if (cs) {
+            shutdown_connection(cs);
         }
-        if (ptrans) {
-            ap_queue_info_push_pool(worker_queue_info, ptrans);
+        else {
+            if (csd) {
+                close_socket_nonblocking(csd);
+            }
+            if (ptrans) {
+                ap_queue_info_push_pool(worker_queue_info, ptrans);
+            }
         }
         signal_threads(ST_GRACEFUL);
     }
@@ -1400,8 +1410,8 @@ static apr_status_t event_register_timed_callback(apr_time_t t,
         /* Okay, add sorted by when.. */
         apr_skiplist_insert(timer_skiplist, te);
 
-        /* Cheaply update the overall timers' next expiry according to
-         * this event, if necessary.
+        /* Cheaply update the global timers_next_expiry with this event's
+         * if it expires before.
          */
         next_expiry = timers_next_expiry;
         if (!next_expiry || next_expiry > te->when + EVENT_FUDGE_FACTOR) {
@@ -1420,10 +1430,13 @@ static apr_status_t event_register_timed_callback(apr_time_t t,
 
 
 /*
- * Close socket and clean up if remote closed its end while we were in
- * lingering close. Only to be called in the worker thread, and since it's
- * in immediate call stack, we can afford a comfortable buffer size to
- * consume data quickly.
+ * Flush data and close our side of the connection, then drain incoming data.
+ * If the latter would block put the connection in one of the linger timeout
+ * queues to be called back when ready, and repeat until it's closed by peer.
+ * Only to be called in the worker thread, and since it's in immediate call
+ * stack, we can afford a comfortable buffer size to consume data quickly.
+ * Pre-condition: cs is not in any timeout queue and not in the pollset,
+ *                timeout_mutex is not locked
  */
 #define LINGERING_BUF_SIZE (32 * 1024)
 static void process_lingering_close(event_conn_state_t *cs)
@@ -1434,22 +1447,55 @@ static void process_lingering_close(event_conn_state_t *cs)
     apr_status_t rv;
     struct timeout_queue *q;
 
-    /* socket is already in non-blocking state */
+    ap_log_cerror(APLOG_MARK, APLOG_TRACE6, 0, cs->c,
+                  "lingering close from state %i", (int)cs->pub.state);
+    AP_DEBUG_ASSERT(cs->pub.state >= CONN_STATE_LINGER);
+
+    if (cs->pub.state == CONN_STATE_LINGER) {
+        /* defer_lingering_close() may have bumped lingering_count already */
+        if (!cs->deferred_linger) {
+            apr_atomic_inc32(&lingering_count);
+        }
+
+        apr_socket_timeout_set(csd, apr_time_from_sec(SECONDS_TO_LINGER));
+        if (ap_start_lingering_close(cs->c)) {
+            notify_suspend(cs);
+            close_connection(cs);
+            return;
+        }
+
+        cs->queue_timestamp = apr_time_now();
+        /* Clear APR_INCOMPLETE_READ if it was ever set, we'll do the poll()
+         * at the listener only from now, if needed.
+         */
+        apr_socket_opt_set(csd, APR_INCOMPLETE_READ, 0);
+        /*
+         * If some module requested a shortened waiting period, only wait for
+         * 2s (SECONDS_TO_LINGER). This is useful for mitigating certain
+         * DoS attacks.
+         */
+        if (apr_table_get(cs->c->notes, "short-lingering-close")) {
+            cs->pub.state = CONN_STATE_LINGER_SHORT;
+        }
+        else {
+            cs->pub.state = CONN_STATE_LINGER_NORMAL;
+        }
+        notify_suspend(cs);
+    }
+
+    apr_socket_timeout_set(csd, 0);
     do {
         nbytes = sizeof(dummybuf);
         rv = apr_socket_recv(csd, dummybuf, &nbytes);
     } while (rv == APR_SUCCESS);
 
     if (!APR_STATUS_IS_EAGAIN(rv)) {
-        rv = apr_socket_close(csd);
-        AP_DEBUG_ASSERT(rv == APR_SUCCESS);
-        ap_queue_info_push_pool(worker_queue_info, cs->p);
+        close_connection(cs);
         return;
     }
 
-    /* Re-queue the connection to come back when readable */
-    cs->pfd.reqevents = APR_POLLIN;
-    cs->pub.sense = CONN_SENSE_DEFAULT;
+    /* (Re)queue the connection to come back when readable */
+    update_reqevents_from_sense(cs, CONN_SENSE_WANT_READ);
     q = (cs->pub.state == CONN_STATE_LINGER_SHORT) ? short_linger_q : linger_q;
     apr_thread_mutex_lock(timeout_mutex);
     TO_QUEUE_APPEND(q, cs);
@@ -1460,20 +1506,18 @@ static void process_lingering_close(event_conn_state_t *cs)
         apr_thread_mutex_unlock(timeout_mutex);
         ap_log_error(APLOG_MARK, APLOG_ERR, rv, ap_server_conf, APLOGNO(03092)
                      "process_lingering_close: apr_pollset_add failure");
-        rv = apr_socket_close(cs->pfd.desc.s);
-        AP_DEBUG_ASSERT(rv == APR_SUCCESS);
-        ap_queue_info_push_pool(worker_queue_info, cs->p);
+        close_connection(cs);
+        signal_threads(ST_GRACEFUL);
         return;
     }
     apr_thread_mutex_unlock(timeout_mutex);
 }
 
-/* call 'func' for all elements of 'q' with timeout less than 'timeout_time'.
+/* call 'func' for all elements of 'q' above 'expiry'.
  * Pre-condition: timeout_mutex must already be locked
  * Post-condition: timeout_mutex will be locked again
  */
-static void process_timeout_queue(struct timeout_queue *q,
-                                  apr_time_t timeout_time,
+static void process_timeout_queue(struct timeout_queue *q, apr_time_t expiry,
                                   int (*func)(event_conn_state_t *))
 {
     apr_uint32_t total = 0, count;
@@ -1493,27 +1537,26 @@ static void process_timeout_queue(struct timeout_queue *q,
         while (cs != APR_RING_SENTINEL(&qp->head, event_conn_state_t,
                                        timeout_list)) {
             /* Trash the entry if:
-             * - no timeout_time was given (asked for all), or
+             * - no expiry was given (zero means all), or
              * - it expired (according to the queue timeout), or
              * - the system clock skewed in the past: no entry should be
-             *   registered above the given timeout_time (~now) + the queue
+             *   registered above the given expiry (~now) + the queue
              *   timeout, we won't keep any here (eg. for centuries).
              *
              * Otherwise stop, no following entry will match thanks to the
              * single timeout per queue (entries are added to the end!).
              * This allows maintenance in O(1).
              */
-            if (timeout_time
-                    && cs->queue_timestamp + qp->timeout > timeout_time
-                    && cs->queue_timestamp < timeout_time + qp->timeout) {
-                /* Since this is the next expiring of this queue, update the
-                 * overall queues' next expiry if it's later than this one.
+            if (expiry && cs->queue_timestamp + qp->timeout > expiry
+                       && cs->queue_timestamp < expiry + qp->timeout) {
+                /* Since this is the next expiring entry of this queue, update
+                 * the global queues_next_expiry if it's later than this one.
                  */
-                apr_time_t q_expiry = cs->queue_timestamp + qp->timeout;
+                apr_time_t elem_expiry = cs->queue_timestamp + qp->timeout;
                 apr_time_t next_expiry = queues_next_expiry;
                 if (!next_expiry
-                        || next_expiry > q_expiry + TIMEOUT_FUDGE_FACTOR) {
-                    queues_next_expiry = q_expiry;
+                        || next_expiry > elem_expiry + TIMEOUT_FUDGE_FACTOR) {
+                    queues_next_expiry = elem_expiry;
                 }
                 break;
             }
@@ -1553,18 +1596,17 @@ static void process_timeout_queue(struct timeout_queue *q,
     apr_thread_mutex_lock(timeout_mutex);
 }
 
-static void process_keepalive_queue(apr_time_t timeout_time)
+static void process_keepalive_queue(apr_time_t expiry)
 {
     /* If all workers are busy, we kill older keep-alive connections so
      * that they may connect to another process.
      */
-    if (!timeout_time) {
+    if (!expiry && *keepalive_q->total) {
         ap_log_error(APLOG_MARK, APLOG_TRACE1, 0, ap_server_conf,
-                     "All workers are busy or dying, will close %u "
+                     "All workers are busy or dying, will shutdown %u "
                      "keep-alive connections", *keepalive_q->total);
     }
-    process_timeout_queue(keepalive_q, timeout_time,
-                          start_lingering_close_nonblocking);
+    process_timeout_queue(keepalive_q, expiry, shutdown_connection);
 }
 
 static void * APR_THREAD_FUNC listener_thread(apr_thread_t * thd, void *dummy)
@@ -1590,18 +1632,28 @@ static void * APR_THREAD_FUNC listener_thread(apr_thread_t * thd, void *dummy)
         timer_event_t *te;
         const apr_pollfd_t *out_pfd;
         apr_int32_t num = 0;
-        apr_interval_time_t timeout_interval;
-        apr_time_t now, timeout_time;
+        apr_interval_time_t timeout;
+        apr_time_t now, expiry = -1;
         int workers_were_busy = 0;
 
         if (conns_this_child <= 0)
             check_infinite_requests();
 
         if (listener_may_exit) {
-            close_listeners(&closed);
+            int first_close = close_listeners(&closed);
+
             if (terminate_mode == ST_UNGRACEFUL
                 || apr_atomic_read32(&connection_count) == 0)
                 break;
+
+            /* Don't wait in poll() for the first close (i.e. dying now), we
+             * want to maintain the queues and schedule defer_linger_chain ASAP
+             * to kill kept-alive connection and shutdown the workers and child
+             * faster.
+             */
+            if (first_close) {
+                goto do_maintenance; /* with expiry == -1 */
+            }
         }
 
         now = apr_time_now();
@@ -1615,8 +1667,8 @@ static void * APR_THREAD_FUNC listener_thread(apr_thread_t * thd, void *dummy)
                              "keep-alive: %d lingering: %d suspended: %u)",
                              apr_atomic_read32(&connection_count),
                              apr_atomic_read32(&clogged_count),
-                             *(volatile apr_uint32_t*)write_completion_q->total,
-                             *(volatile apr_uint32_t*)keepalive_q->total,
+                             apr_atomic_read32(write_completion_q->total),
+                             apr_atomic_read32(keepalive_q->total),
                              apr_atomic_read32(&lingering_count),
                              apr_atomic_read32(&suspended_count));
                 if (dying) {
@@ -1635,18 +1687,19 @@ static void * APR_THREAD_FUNC listener_thread(apr_thread_t * thd, void *dummy)
          * up occurs, otherwise periodic checks (maintenance, shutdown, ...)
          * must be performed.
          */
-        timeout_interval = -1;
+        now = apr_time_now();
+        timeout = -1;
 
         /* Push expired timers to a worker, the first remaining one determines
          * the maximum time to poll() below, if any.
          */
-        timeout_time = timers_next_expiry;
-        if (timeout_time && timeout_time < now + EVENT_FUDGE_FACTOR) {
+        expiry = timers_next_expiry;
+        if (expiry && expiry < now) {
             apr_thread_mutex_lock(g_timer_skiplist_mtx);
             while ((te = apr_skiplist_peek(timer_skiplist))) {
-                if (te->when > now + EVENT_FUDGE_FACTOR) {
+                if (te->when > now) {
                     timers_next_expiry = te->when;
-                    timeout_interval = te->when - now;
+                    timeout = te->when - now;
                     break;
                 }
                 apr_skiplist_pop(timer_skiplist, NULL);
@@ -1659,37 +1712,40 @@ static void * APR_THREAD_FUNC listener_thread(apr_thread_t * thd, void *dummy)
         }
 
         /* Same for queues, use their next expiry, if any. */
-        timeout_time = queues_next_expiry;
-        if (timeout_time
-                && (timeout_interval < 0
-                    || timeout_time <= now
-                    || timeout_interval > timeout_time - now)) {
-            timeout_interval = timeout_time > now ? timeout_time - now : 1;
+        expiry = queues_next_expiry;
+        if (expiry
+                && (timeout < 0
+                    || expiry <= now
+                    || timeout > expiry - now)) {
+            timeout = expiry > now ? expiry - now : 0;
         }
 
         /* When non-wakeable, don't wait more than 100 ms, in any case. */
 #define NON_WAKEABLE_POLL_TIMEOUT apr_time_from_msec(100)
         if (!listener_is_wakeable
-                && (timeout_interval < 0
-                    || timeout_interval > NON_WAKEABLE_POLL_TIMEOUT)) {
-            timeout_interval = NON_WAKEABLE_POLL_TIMEOUT;
+                && (timeout < 0
+                    || timeout > NON_WAKEABLE_POLL_TIMEOUT)) {
+            timeout = NON_WAKEABLE_POLL_TIMEOUT;
         }
+        else if (timeout > 0) {
+            /* apr_pollset_poll() might round down the timeout to milliseconds,
+             * let's forcibly round up here to never return before the timeout.
+             */
+            timeout = apr_time_from_msec(
+                apr_time_as_msec(timeout + apr_time_from_msec(1) - 1)
+            );
+        }
+
+        ap_log_error(APLOG_MARK, APLOG_TRACE7, 0, ap_server_conf,
+                     "polling with timeout=%" APR_TIME_T_FMT
+                     " queues_timeout=%" APR_TIME_T_FMT
+                     " timers_timeout=%" APR_TIME_T_FMT,
+                     timeout, queues_next_expiry - now,
+                     timers_next_expiry - now);
 
-        rc = apr_pollset_poll(event_pollset, timeout_interval, &num, &out_pfd);
+        rc = apr_pollset_poll(event_pollset, timeout, &num, &out_pfd);
         if (rc != APR_SUCCESS) {
-            if (APR_STATUS_IS_EINTR(rc)) {
-                /* Woken up, if we are exiting or listeners are disabled we
-                 * must fall through to kill kept-alive connections or test
-                 * whether listeners should be re-enabled. Otherwise we only
-                 * need to update timeouts (logic is above, so simply restart
-                 * the loop).
-                 */
-                if (!listener_may_exit && !listeners_disabled()) {
-                    continue;
-                }
-                timeout_time = 0;
-            }
-            else if (!APR_STATUS_IS_TIMEUP(rc)) {
+            if (!APR_STATUS_IS_EINTR(rc) && !APR_STATUS_IS_TIMEUP(rc)) {
                 ap_log_error(APLOG_MARK, APLOG_CRIT, rc, ap_server_conf,
                              "apr_pollset_poll failed.  Attempting to "
                              "shutdown process gracefully");
@@ -1698,13 +1754,21 @@ static void * APR_THREAD_FUNC listener_thread(apr_thread_t * thd, void *dummy)
             num = 0;
         }
 
-        if (listener_may_exit) {
-            close_listeners(&closed);
-            if (terminate_mode == ST_UNGRACEFUL
-                || apr_atomic_read32(&connection_count) == 0)
-                break;
+        if (APLOGtrace7(ap_server_conf)) {
+            now = apr_time_now();
+            ap_log_error(APLOG_MARK, APLOG_TRACE7, rc, ap_server_conf,
+                         "polled with num=%u exit=%d/%d conns=%d"
+                         " queues_timeout=%" APR_TIME_T_FMT
+                         " timers_timeout=%" APR_TIME_T_FMT,
+                         num, listener_may_exit, dying,
+                         apr_atomic_read32(&connection_count),
+                         queues_next_expiry - now, timers_next_expiry - now);
         }
 
+        /* XXX possible optimization: stash the current time for use as
+         * r->request_time for new requests or queues maintenance
+         */
+
         for (; num; --num, ++out_pfd) {
             listener_poll_type *pt = (listener_poll_type *) out_pfd->client_data;
             if (pt->type == PT_CSD) {
@@ -1757,25 +1821,21 @@ static void * APR_THREAD_FUNC listener_thread(apr_thread_t * thd, void *dummy)
                         AP_DEBUG_ASSERT(0);
                         ap_log_error(APLOG_MARK, APLOG_ERR, rc, ap_server_conf,
                                      APLOGNO(03094) "pollset remove failed");
-                        start_lingering_close_nonblocking(cs);
+                        close_connection(cs);
+                        signal_threads(ST_GRACEFUL);
                         break;
                     }
 
                     /* If we don't get a worker immediately (nonblocking), we
                      * close the connection; the client can re-connect to a
                      * different process for keepalive, and for lingering close
-                     * the connection will be reset so the choice is to favor
+                     * the connection will be shutdown so the choice is to favor
                      * incoming/alive connections.
                      */
                     get_worker(&have_idle_worker, blocking,
                                &workers_were_busy);
                     if (!have_idle_worker) {
-                        if (remove_from_q == cs->sc->ka_q) {
-                            start_lingering_close_nonblocking(cs);
-                        }
-                        else {
-                            stop_lingering_close(cs);
-                        }
+                        shutdown_connection(cs);
                     }
                     else if (push2worker(cs, NULL, NULL) == APR_SUCCESS) {
                         have_idle_worker = 0;
@@ -1861,21 +1921,22 @@ static void * APR_THREAD_FUNC listener_thread(apr_thread_t * thd, void *dummy)
             }               /* if:else on pt->type */
         } /* for processing poll */
 
-        /* XXX possible optimization: stash the current time for use as
-         * r->request_time for new requests
-         */
-        /* We process the timeout queues here only when their overall next
-         * expiry (read once above) is over. This happens accurately since
+        /* We process the timeout queues here only when the global
+         * queues_next_expiry is passed. This happens accurately since
          * adding to the queues (in workers) can only decrease this expiry,
          * while latest ones are only taken into account here (in listener)
          * during queues' processing, with the lock held. This works both
          * with and without wake-ability.
          */
-        if (timeout_time && timeout_time < (now = apr_time_now())) {
-            /* handle timed out sockets */
+        expiry = queues_next_expiry;
+do_maintenance:
+        if (expiry && expiry < (now = apr_time_now())) {
+            ap_log_error(APLOG_MARK, APLOG_TRACE7, 0, ap_server_conf,
+                         "queues maintenance with timeout=%" APR_TIME_T_FMT,
+                         expiry > 0 ? expiry - now : -1);
             apr_thread_mutex_lock(timeout_mutex);
 
-            /* Processing all the queues below will recompute this. */
+            /* Steps below will recompute this. */
             queues_next_expiry = 0;
 
             /* Step 1: keepalive timeouts */
@@ -1887,24 +1948,30 @@ static void * APR_THREAD_FUNC listener_thread(apr_thread_t * thd, void *dummy)
             }
             /* Step 2: write completion timeouts */
             process_timeout_queue(write_completion_q, now,
-                                  start_lingering_close_nonblocking);
+                                  defer_lingering_close);
             /* Step 3: (normal) lingering close completion timeouts */
-            process_timeout_queue(linger_q, now,
-                                  stop_lingering_close);
+            if (dying && linger_q->timeout > short_linger_q->timeout) {
+                /* Dying, force short timeout for normal lingering close */
+                linger_q->timeout = short_linger_q->timeout;
+            }
+            process_timeout_queue(linger_q, now, shutdown_connection);
             /* Step 4: (short) lingering close completion timeouts */
-            process_timeout_queue(short_linger_q, now,
-                                  stop_lingering_close);
+            process_timeout_queue(short_linger_q, now, shutdown_connection);
 
             apr_thread_mutex_unlock(timeout_mutex);
+            ap_log_error(APLOG_MARK, APLOG_TRACE7, 0, ap_server_conf,
+                         "queues maintained with timeout=%" APR_TIME_T_FMT,
+                         queues_next_expiry > now ? queues_next_expiry - now
+                                                  : -1);
 
-            ps->keep_alive = *(volatile apr_uint32_t*)keepalive_q->total;
-            ps->write_completion = *(volatile apr_uint32_t*)write_completion_q->total;
+            ps->keep_alive = apr_atomic_read32(keepalive_q->total);
+            ps->write_completion = apr_atomic_read32(write_completion_q->total);
             ps->connections = apr_atomic_read32(&connection_count);
             ps->suspended = apr_atomic_read32(&suspended_count);
             ps->lingering_close = apr_atomic_read32(&lingering_count);
         }
         else if ((workers_were_busy || dying)
-                 && *(volatile apr_uint32_t*)keepalive_q->total) {
+                 && apr_atomic_read32(keepalive_q->total)) {
             apr_thread_mutex_lock(timeout_mutex);
             process_keepalive_queue(0); /* kill'em all \m/ */
             apr_thread_mutex_unlock(timeout_mutex);
@@ -1936,7 +2003,6 @@ static void * APR_THREAD_FUNC listener_thread(apr_thread_t * thd, void *dummy)
         }
     } /* listener main loop */
 
-    close_listeners(&closed);
     ap_queue_term(worker_queue);
 
     apr_thread_exit(thd, APR_SUCCESS);
@@ -2084,15 +2150,9 @@ static void *APR_THREAD_FUNC worker_thread(apr_thread_t * thd, void *dummy)
                 continue;
             }
             cs->chain = NULL;
+            AP_DEBUG_ASSERT(cs->pub.state == CONN_STATE_LINGER);
 
             worker_sockets[thread_slot] = csd = cs->pfd.desc.s;
-#ifdef AP_DEBUG
-            rv = apr_socket_timeout_set(csd, SECONDS_TO_LINGER);
-            AP_DEBUG_ASSERT(rv == APR_SUCCESS);
-#else
-            apr_socket_timeout_set(csd, SECONDS_TO_LINGER);
-#endif
-            cs->pub.state = CONN_STATE_LINGER;
             process_socket(thd, cs->p, csd, cs, process_slot, thread_slot);
             worker_sockets[thread_slot] = NULL;
         }
@@ -2258,7 +2318,7 @@ static void setup_threads_runtime(void)
         AP_DEBUG_ASSERT(i < num_listensocks);
         pfd = &listener_pollfd[i];
 
-        pfd->reqevents = APR_POLLIN;
+        pfd->reqevents = APR_POLLIN | APR_POLLHUP | APR_POLLERR;
         pfd->desc_type = APR_POLL_SOCKET;
         pfd->desc.s = lr->sd;
 
@@ -2389,13 +2449,17 @@ static void join_workers(apr_thread_t * listener, apr_thread_t ** threads)
          */
 
         iter = 0;
-        while (iter < 10 && !dying) {
+        while (!dying) {
+            apr_sleep(apr_time_from_msec(500));
+            if (dying || ++iter > 10) {
+                break;
+            }
             /* listener has not stopped accepting yet */
-            apr_sleep(apr_time_make(0, 500000));
+            ap_log_error(APLOG_MARK, APLOG_TRACE1, 0, ap_server_conf,
+                         "listener has not stopped accepting yet (%d iter)", iter);
             wakeup_listener();
-            ++iter;
         }
-        if (iter >= 10) {
+        if (iter > 10) {
             ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, ap_server_conf, APLOGNO(00475)
                          "the listener thread didn't stop accepting");
         }
@@ -2604,7 +2668,13 @@ static void child_main(int child_num_arg, int child_bucket)
          *   If the worker hasn't exited, then this blocks until
          *   they have (then cleans up).
          */
+        ap_log_error(APLOG_MARK, APLOG_TRACE1, 0, ap_server_conf,
+                     "%s termination received, joining workers",
+                     rv == AP_MPM_PODX_GRACEFUL ? "graceful" : "ungraceful");
         join_workers(ts->listener, threads);
+        ap_log_error(APLOG_MARK, APLOG_TRACE1, 0, ap_server_conf,
+                     "%s termination, workers joined, exiting",
+                     rv == AP_MPM_PODX_GRACEFUL ? "graceful" : "ungraceful");
     }
 
     free(threads);
index c81245078ab509b85eb771ca75a437774860cd79..3697ca722f621ab081acaea7e3927e6a9d9e8359 100644 (file)
@@ -493,6 +493,10 @@ static apr_status_t queue_interrupt(fd_queue_t *queue, int all, int term)
 {
     apr_status_t rv;
 
+    if (queue->terminated) {
+        return APR_EOF;
+    }
+
     if ((rv = apr_thread_mutex_lock(queue->one_big_mutex)) != APR_SUCCESS) {
         return rv;
     }
index 9aeedde30da675780a04c27182afa603bad9a78a..1047f889f0a8af5c577f885644d3839aea1cae55 100644 (file)
@@ -83,7 +83,7 @@ struct fd_queue_t
     unsigned int out;
     apr_thread_mutex_t *one_big_mutex;
     apr_thread_cond_t *not_empty;
-    int terminated;
+    volatile int terminated;
 };
 typedef struct fd_queue_t fd_queue_t;