event: Avoid possible blocking in the listener thread when shutting down
connections. PR 60956.

author    Yann Ylavic <ylavic@apache.org>
          Mon, 24 Jul 2017 23:19:06 +0000 (23:19 +0000)
committer Yann Ylavic <ylavic@apache.org>
          Mon, 24 Jul 2017 23:19:06 +0000 (23:19 +0000)

start_lingering_close_nonblocking() now puts connections in defer_linger_chain,
which is emptied by any worker thread after its usual work (all updates are
atomic), so any potentially blocking flush and lingering close run outside the
listener.
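
For reference, a simplified, self-contained sketch of the lock-free chain
pattern the hunks below implement (node_t, chain_push() and chain_pop() are
illustrative names only; the real code works on event_conn_state_t and
defer_linger_chain in event.c):

    #include <apr_atomic.h>

    typedef struct node_t {
        struct node_t *chain;
        /* ... connection state ... */
    } node_t;

    static node_t *volatile chain_head;

    /* Listener side: push one node (cf. start_lingering_close_nonblocking) */
    static void chain_push(node_t *cs)
    {
        for (;;) {
            node_t *chain = chain_head;
            cs->chain = chain;
            if (apr_atomic_casptr((void *)&chain_head, cs, chain) == chain) {
                return; /* cs is the new head */
            }
            /* Race lost against a concurrent push/pop, try again */
        }
    }

    /* Worker side: pop one node, or NULL if the chain is empty (cf. worker_thread) */
    static node_t *chain_pop(void)
    {
        for (;;) {
            node_t *cs = chain_head;
            if (!cs) {
                return NULL;
            }
            if (apr_atomic_casptr((void *)&chain_head, cs->chain, cs) != cs) {
                continue; /* head changed under us, try again */
            }
            cs->chain = NULL;
            return cs;
        }
    }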

The listener may dedicate a worker to the chain, when it adds connections to
defer_linger_chain or while the chain is not empty, by calling push2worker()
with a NULL cs.

The state machine in process_socket() is slightly modified so that it can be
entered with CONN_STATE_LINGER directly, without clogging_input_filters
interfering.
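
In outline (condensed from the process_socket() hunk below, not the full
function), the reworked dispatch is:

    if (cs->pub.state == CONN_STATE_LINGER) {
        /* entered directly in LINGER (aborted connection, or a worker
         * draining defer_linger_chain): skip straight to lingering close */
    }
    else if (c->clogging_input_filters) {
        /* blocking read from the input filters, Worker-MPM style */
    }
    else if (cs->pub.state == CONN_STATE_READ_REQUEST_LINE) {
        ap_run_process_connection(c);
    }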

The new abort_socket_nonblocking() allows resetting connections when
nonblocking behavior is required and we can't do much about the connection
anymore, nor do we want the system to linger on its own after close().
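
As a standalone illustration of that reset trick (plain POSIX sockets rather
than the APR wrappers used in the patch; abort_fd_nonblocking() is a
hypothetical helper): a zero-timeout SO_LINGER turns close() into an immediate
RST, so nothing lingers in the kernel and the peer gets ECONNRESET if it keeps
sending.

    #include <sys/socket.h>
    #include <unistd.h>

    static void abort_fd_nonblocking(int fd)
    {
        struct linger opt;
        opt.l_onoff  = 1;
        opt.l_linger = 0;   /* zero timeout means RST on close() */
        setsockopt(fd, SOL_SOCKET, SO_LINGER, &opt, sizeof(opt));
        close(fd);          /* returns immediately, connection is reset */
    }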

Many thanks to Stefan Priebe for his heavy testing on many event's changes!

git-svn-id: https://svn.apache.org/repos/asf/httpd/httpd/trunk@1802875 13f79535-47bb-0310-9956-ffa450edef68

CHANGES
server/mpm/event/event.c

diff --git a/CHANGES b/CHANGES
index 86c250e53c31d01eb04df17eec2ddaf679f8de0c..930504d62f1f2fbaa0e72597b8b8f513cac6589d 100644 (file)
--- a/CHANGES
+++ b/CHANGES
@@ -1,6 +1,9 @@
                                                          -*- coding: utf-8 -*-
 Changes with Apache 2.5.0
 
+  *) event: Avoid possible blocking in the listener thread when shutting down
+     connections. PR 60956.  [Yann Ylavic]
+
   *) mod_proxy_fcgi: Add the support for mod_proxy's flushpackets and flushwait
      parameters. [Luca Toscano, Ruediger Pluem, Yann Ylavic]
 
diff --git a/server/mpm/event/event.c b/server/mpm/event/event.c
index c7980435ed5da3a744cb676521b2dcfff33fa116..d15beefacb849dc8868fe9ae46e6db14496262fb 100644 (file)
--- a/server/mpm/event/event.c
+++ b/server/mpm/event/event.c
@@ -219,6 +219,12 @@ static apr_pollfd_t *listener_pollfd;
  */
 static apr_pollset_t *event_pollset;
 
+/*
+ * The chain of connections to be shutdown by a worker thread (deferred),
+ * linked list updated atomically.
+ */
+static event_conn_state_t *volatile defer_linger_chain;
+
 struct event_conn_state_t {
     /** APR_RING of expiration timeouts */
     APR_RING_ENTRY(event_conn_state_t) timeout_list;
@@ -245,7 +251,10 @@ struct event_conn_state_t {
     apr_pollfd_t pfd;
     /** public parts of the connection state */
     conn_state_t pub;
+    /** chaining in defer_linger_chain */
+    struct event_conn_state_t *chain;
 };
+
 APR_RING_HEAD(timeout_head_t, event_conn_state_t);
 
 struct timeout_queue {
@@ -505,14 +514,55 @@ static void enable_listensocks(int process_slot)
     ap_scoreboard_image->parent[process_slot].not_accepting = 0;
 }
 
+static void abort_socket_nonblocking(apr_socket_t *csd)
+{
+    apr_status_t rv;
+    apr_socket_timeout_set(csd, 0);
+#if defined(SOL_SOCKET) && defined(SO_LINGER)
+    /* This socket is over now, and we don't want to block nor linger
+     * anymore, so reset it. A normal close could still linger in the
+     * system, while RST is fast, nonblocking, and what the peer will
+     * get if it sends us further data anyway.
+     */
+    {
+        apr_os_sock_t osd = -1;
+        struct linger opt;
+        opt.l_onoff = 1;
+        opt.l_linger = 0; /* zero timeout is RST */
+        apr_os_sock_get(&osd, csd);
+        setsockopt(osd, SOL_SOCKET, SO_LINGER, (void *)&opt, sizeof opt);
+    }
+#endif
+    rv = apr_socket_close(csd);
+    if (rv != APR_SUCCESS) {
+        ap_log_error(APLOG_MARK, APLOG_ERR, rv, ap_server_conf, APLOGNO(00468)
+                     "error closing socket");
+        AP_DEBUG_ASSERT(0);
+    }
+}
+
 static void close_worker_sockets(void)
 {
     int i;
     for (i = 0; i < threads_per_child; i++) {
-        if (worker_sockets[i]) {
-            apr_socket_close(worker_sockets[i]);
+        apr_socket_t *csd = worker_sockets[i];
+        if (csd) {
             worker_sockets[i] = NULL;
+            abort_socket_nonblocking(csd);
+        }
+    }
+    for (;;) {
+        event_conn_state_t *cs = defer_linger_chain;
+        if (!cs) {
+            break;
+        }
+        if (apr_atomic_casptr((void *)&defer_linger_chain, cs->chain,
+                              cs) != cs) {
+            /* Race lost, try again */
+            continue;
         }
+        cs->chain = NULL;
+        abort_socket_nonblocking(cs->pfd.desc.s);
     }
 }
 
@@ -733,11 +783,27 @@ static void notify_resume(event_conn_state_t *cs, int cleanup)
     ap_run_resume_connection(cs->c, cs->r);
 }
 
-static int start_lingering_close_common(event_conn_state_t *cs, int in_worker)
+/*
+ * Close our side of the connection, flushing data to the client first.
+ * Pre-condition: cs is not in any timeout queue and not in the pollset,
+ *                timeout_mutex is not locked
+ * return: 0 if connection is fully closed,
+ *         1 if connection is lingering
+ * May only be called by worker thread.
+ */
+static int start_lingering_close_blocking(event_conn_state_t *cs)
 {
     apr_status_t rv;
     struct timeout_queue *q;
     apr_socket_t *csd = cs->pfd.desc.s;
+
+    if (ap_start_lingering_close(cs->c)) {
+        notify_suspend(cs);
+        apr_socket_close(csd);
+        ap_push_pool(worker_queue_info, cs->p);
+        return 0;
+    }
+
 #ifdef AP_DEBUG
     {
         rv = apr_socket_timeout_set(csd, 0);
@@ -746,6 +812,7 @@ static int start_lingering_close_common(event_conn_state_t *cs, int in_worker)
 #else
     apr_socket_timeout_set(csd, 0);
 #endif
+
     cs->queue_timestamp = apr_time_now();
     /*
      * If some module requested a shortened waiting period, only wait for
@@ -761,12 +828,8 @@ static int start_lingering_close_common(event_conn_state_t *cs, int in_worker)
         cs->pub.state = CONN_STATE_LINGER_NORMAL;
     }
     apr_atomic_inc32(&lingering_count);
-    if (in_worker) { 
-        notify_suspend(cs);
-    }
-    else {
-        cs->c->sbh = NULL;
-    }
+    notify_suspend(cs);
+
     cs->pfd.reqevents = (
             cs->pub.sense == CONN_SENSE_WANT_WRITE ? APR_POLLOUT :
                     APR_POLLIN) | APR_POLLHUP | APR_POLLERR;
@@ -789,49 +852,25 @@ static int start_lingering_close_common(event_conn_state_t *cs, int in_worker)
 }
 
 /*
- * Close our side of the connection, flushing data to the client first.
- * Pre-condition: cs is not in any timeout queue and not in the pollset,
- *                timeout_mutex is not locked
- * return: 0 if connection is fully closed,
- *         1 if connection is lingering
- * May only be called by worker thread.
- */
-static int start_lingering_close_blocking(event_conn_state_t *cs)
-{
-    if (ap_start_lingering_close(cs->c)) {
-        notify_suspend(cs);
-        ap_push_pool(worker_queue_info, cs->p);
-        return 0;
-    }
-    return start_lingering_close_common(cs, 1);
-}
-
-/*
- * Close our side of the connection, NOT flushing data to the client.
- * This should only be called if there has been an error or if we know
- * that our send buffers are empty.
+ * Defer flush and close of the connection by adding it to defer_linger_chain,
+ * for a worker to grab it and do the job (should that be blocking).
  * Pre-condition: cs is not in any timeout queue and not in the pollset,
  *                timeout_mutex is not locked
- * return: 0 if connection is fully closed,
- *         1 if connection is lingering
- * may be called by listener thread
+ * return: 1 connection is alive (but aside and about to linger)
+ * May be called by listener thread.
  */
 static int start_lingering_close_nonblocking(event_conn_state_t *cs)
 {
-    conn_rec *c = cs->c;
-    apr_socket_t *csd = cs->pfd.desc.s;
-
-    if (ap_prep_lingering_close(c)
-        || c->aborted
-        || ap_shutdown_conn(c, 0) != APR_SUCCESS || c->aborted
-        || apr_socket_shutdown(csd, APR_SHUTDOWN_WRITE) != APR_SUCCESS) {
-        apr_socket_close(csd);
-        ap_push_pool(worker_queue_info, cs->p);
-        if (dying)
-            ap_queue_interrupt_one(worker_queue);
-        return 0;
+    event_conn_state_t *chain;
+    for (;;) {
+        cs->chain = chain = defer_linger_chain;
+        if (apr_atomic_casptr((void *)&defer_linger_chain, cs,
+                              chain) != chain) {
+            /* Race lost, try again */
+            continue;
+        }
+        return 1;
     }
-    return start_lingering_close_common(cs, 0);
 }
 
 /*
@@ -842,15 +881,10 @@ static int start_lingering_close_nonblocking(event_conn_state_t *cs)
  */
 static int stop_lingering_close(event_conn_state_t *cs)
 {
-    apr_status_t rv;
     apr_socket_t *csd = ap_get_conn_socket(cs->c);
     ap_log_error(APLOG_MARK, APLOG_TRACE4, 0, ap_server_conf,
                  "socket reached timeout in lingering-close state");
-    rv = apr_socket_close(csd);
-    if (rv != APR_SUCCESS) {
-        ap_log_error(APLOG_MARK, APLOG_ERR, rv, ap_server_conf, APLOGNO(00468) "error closing socket");
-        AP_DEBUG_ASSERT(0);
-    }
+    abort_socket_nonblocking(csd);
     ap_push_pool(worker_queue_info, cs->p);
     if (dying)
         ap_queue_interrupt_one(worker_queue);
@@ -997,9 +1031,16 @@ static void process_socket(apr_thread_t *thd, apr_pool_t * p, apr_socket_t * soc
         c->current_thread = thd;
         /* Subsequent request on a conn, and thread number is part of ID */
         c->id = conn_id;
+
+        if (c->aborted) {
+            cs->pub.state = CONN_STATE_LINGER;
+        }
     }
 
-    if (c->clogging_input_filters && !c->aborted) {
+    if (cs->pub.state == CONN_STATE_LINGER) {
+        /* do lingering close below */
+    }
+    else if (c->clogging_input_filters) {
         /* Since we have an input filter which 'clogs' the input stream,
          * like mod_ssl used to, lets just do the normal read from input
          * filters, like the Worker MPM does. Filters that need to write
@@ -1013,20 +1054,14 @@ static void process_socket(apr_thread_t *thd, apr_pool_t * p, apr_socket_t * soc
         }
         apr_atomic_dec32(&clogged_count);
     }
-
+    else if (cs->pub.state == CONN_STATE_READ_REQUEST_LINE) {
 read_request:
-    if (cs->pub.state == CONN_STATE_READ_REQUEST_LINE) {
-        if (!c->aborted) {
-            ap_run_process_connection(c);
+        ap_run_process_connection(c);
 
-            /* state will be updated upon return
-             * fall thru to either wait for readability/timeout or
-             * do lingering close
-             */
-        }
-        else {
-            cs->pub.state = CONN_STATE_LINGER;
-        }
+        /* state will be updated upon return
+         * fall thru to either wait for readability/timeout or
+         * do lingering close
+         */
     }
 
     if (cs->pub.state == CONN_STATE_WRITE_COMPLETION) {
@@ -1067,7 +1102,7 @@ read_request:
             return;
         }
         else if (c->keepalive != AP_CONN_KEEPALIVE || c->aborted ||
-            listener_may_exit) {
+                 listener_may_exit) {
             cs->pub.state = CONN_STATE_LINGER;
         }
         else if (c->data_in_input_filters || ap_run_input_pending(c) == OK) {
@@ -1287,26 +1322,32 @@ static apr_status_t push_timer2worker(timer_event_t* te)
 }
 
 /*
- * Pre-condition: pfd->cs is neither in pollset nor timeout queue
+ * Pre-condition: cs is neither in event_pollset nor a timeout queue
  * this function may only be called by the listener
  */
-static apr_status_t push2worker(const apr_pollfd_t * pfd,
-                                apr_pollset_t * pollset)
+static apr_status_t push2worker(event_conn_state_t *cs, apr_socket_t *csd,
+                                apr_pool_t *ptrans)
 {
-    listener_poll_type *pt = (listener_poll_type *) pfd->client_data;
-    event_conn_state_t *cs = (event_conn_state_t *) pt->baton;
     apr_status_t rc;
 
-    rc = ap_queue_push(worker_queue, cs->pfd.desc.s, cs, cs->p);
+    if (cs) {
+        csd = cs->pfd.desc.s;
+        ptrans = cs->p;
+    }
+    rc = ap_queue_push(worker_queue, csd, cs, ptrans);
     if (rc != APR_SUCCESS) {
+        ap_log_error(APLOG_MARK, APLOG_CRIT, rc, ap_server_conf, APLOGNO(00471)
+                     "push2worker: ap_queue_push failed");
         /* trash the connection; we couldn't queue the connected
          * socket to a worker
          */
-        apr_bucket_alloc_destroy(cs->bucket_alloc);
-        apr_socket_close(cs->pfd.desc.s);
-        ap_log_error(APLOG_MARK, APLOG_CRIT, rc,
-                     ap_server_conf, APLOGNO(00471) "push2worker: ap_queue_push failed");
-        ap_push_pool(worker_queue_info, cs->p);
+        if (csd) {
+            abort_socket_nonblocking(csd);
+        }
+        if (ptrans) {
+            ap_push_pool(worker_queue_info, ptrans);
+        }
+        signal_threads(ST_GRACEFUL);
     }
 
     return rc;
@@ -1861,6 +1902,8 @@ static void * APR_THREAD_FUNC listener_thread(apr_thread_t * thd, void *dummy)
                     TO_QUEUE_REMOVE(remove_from_q, cs);
                     rc = apr_pollset_remove(event_pollset, &cs->pfd);
                     apr_thread_mutex_unlock(timeout_mutex);
+                    TO_QUEUE_ELEM_INIT(cs);
+
                     /*
                      * Some of the pollset backends, like KQueue or Epoll
                      * automagically remove the FD if the socket is closed,
@@ -1874,29 +1917,23 @@ static void * APR_THREAD_FUNC listener_thread(apr_thread_t * thd, void *dummy)
                         break;
                     }
 
-                    TO_QUEUE_ELEM_INIT(cs);
                     /* If we didn't get a worker immediately for a keep-alive
                      * request, we close the connection, so that the client can
                      * re-connect to a different process.
                      */
                     if (!have_idle_worker) {
                         start_lingering_close_nonblocking(cs);
-                        break;
-                    }
-                    rc = push2worker(out_pfd, event_pollset);
-                    if (rc != APR_SUCCESS) {
-                        ap_log_error(APLOG_MARK, APLOG_CRIT, rc,
-                                     ap_server_conf, APLOGNO(03095)
-                                     "push2worker failed");
                     }
-                    else {
+                    else if (push2worker(cs, NULL, NULL) == APR_SUCCESS) {
                         have_idle_worker = 0;
                     }
                     break;
+
                 case CONN_STATE_LINGER_NORMAL:
                 case CONN_STATE_LINGER_SHORT:
                     process_lingering_close(cs, out_pfd);
                     break;
+
                 default:
                     ap_log_error(APLOG_MARK, APLOG_CRIT, rc,
                                  ap_server_conf, APLOGNO(03096)
@@ -1980,18 +2017,7 @@ static void * APR_THREAD_FUNC listener_thread(apr_thread_t * thd, void *dummy)
 
                     if (csd != NULL) {
                         conns_this_child--;
-                        rc = ap_queue_push(worker_queue, csd, NULL, ptrans);
-                        if (rc != APR_SUCCESS) {
-                            /* trash the connection; we couldn't queue the connected
-                             * socket to a worker
-                             */
-                            apr_socket_close(csd);
-                            ap_log_error(APLOG_MARK, APLOG_CRIT, rc,
-                                         ap_server_conf, APLOGNO(03098)
-                                         "ap_queue_push failed");
-                            ap_push_pool(worker_queue_info, ptrans);
-                        }
-                        else {
+                        if (push2worker(NULL, csd, ptrans) == APR_SUCCESS) {
                             have_idle_worker = 0;
                         }
                     }
@@ -2090,6 +2116,24 @@ static void * APR_THREAD_FUNC listener_thread(apr_thread_t * thd, void *dummy)
             ps->keep_alive = 0;
         }
 
+        /* If there are some lingering closes to defer (to a worker), schedule
+         * them now. We might wakeup a worker spuriously if another one empties
+         * defer_linger_chain in the meantime, but there also may be no active
+         * or all busy workers for an undefined time.  In any case a deferred
+         * lingering close can't starve if we do that here since the chain is
+         * filled only above in the listener and it's emptied only in the
+         * worker(s); thus a NULL here means it will stay so while the listener
+         * waits (possibly indefinitely) in poll().
+         */
+        if (defer_linger_chain) {
+            get_worker(&have_idle_worker, 0, &workers_were_busy);
+            if (have_idle_worker
+                    && defer_linger_chain /* re-test */
+                    && push2worker(NULL, NULL, NULL) == APR_SUCCESS) {
+                have_idle_worker = 0;
+            }
+        }
+
         if (listeners_disabled && !workers_were_busy
             && ((c_count = apr_atomic_read32(&connection_count))
                     >= (l_count = apr_atomic_read32(&lingering_count))
@@ -2235,8 +2279,35 @@ static void *APR_THREAD_FUNC worker_thread(apr_thread_t * thd, void *dummy)
         }
         else {
             is_idle = 0;
-            worker_sockets[thread_slot] = csd;
-            process_socket(thd, ptrans, csd, cs, process_slot, thread_slot);
+            if (csd != NULL) {
+                worker_sockets[thread_slot] = csd;
+                process_socket(thd, ptrans, csd, cs, process_slot, thread_slot);
+                worker_sockets[thread_slot] = NULL;
+            }
+        }
+
+        /* If there are deferred lingering closes, handle them now. */
+        while (!workers_may_exit) {
+            cs = defer_linger_chain;
+            if (!cs) {
+                break;
+            }
+            if (apr_atomic_casptr((void *)&defer_linger_chain, cs->chain,
+                                  cs) != cs) {
+                /* Race lost, try again */
+                continue;
+            }
+            cs->chain = NULL;
+
+            worker_sockets[thread_slot] = csd = cs->pfd.desc.s;
+#ifdef AP_DEBUG
+            rv = apr_socket_timeout_set(csd, SECONDS_TO_LINGER);
+            AP_DEBUG_ASSERT(rv == APR_SUCCESS);
+#else
+            apr_socket_timeout_set(csd, SECONDS_TO_LINGER);
+#endif
+            cs->pub.state = CONN_STATE_LINGER;
+            process_socket(thd, cs->p, csd, cs, process_slot, thread_slot);
             worker_sockets[thread_slot] = NULL;
         }
     }
@@ -3497,6 +3568,7 @@ static int event_pre_config(apr_pool_t * pconf, apr_pool_t * plog,
     active_daemons_limit = server_limit;
     threads_per_child = DEFAULT_THREADS_PER_CHILD;
     max_workers = active_daemons_limit * threads_per_child;
+    defer_linger_chain = NULL;
     had_healthy_child = 0;
     ap_extended_status = 0;