]> git.ipfire.org Git - thirdparty/apache/httpd.git/commitdiff
Add a "queue_info" structure to the worker MPM. This is used to prevent
authorAaron Bannert <aaron@apache.org>
Sun, 28 Apr 2002 01:45:00 +0000 (01:45 +0000)
committerAaron Bannert <aaron@apache.org>
Sun, 28 Apr 2002 01:45:00 +0000 (01:45 +0000)
the listener thread from accept()ing more connections than there are
available workers. This prevents long-running requests from starving
connections that have been accepted but not yet processed.

The queue_info is a simple counter, mutex, and condition variable. Only
the listener thread blocks on the condition, and only when there are no
idle workers. In the fast path there is a mutex lock, integer decrement,
and and unlock (among a few conditionals). The worker threads each notify
the queue_info when they are about to block on the normal worker_queue
waiting for some connection to process, which wakes up any sleeping
listener thread to go perform another accept() in parallel.

git-svn-id: https://svn.apache.org/repos/asf/httpd/httpd/trunk@94824 13f79535-47bb-0310-9956-ffa450edef68

server/mpm/worker/fdqueue.c
server/mpm/worker/fdqueue.h
server/mpm/worker/worker.c

index cc9acb3eab0b607e87e97e815ed9043ce32ae06a..e5d20747fba025eaecf0d79f95a7a0026a9a470d 100644 (file)
 
 #include "fdqueue.h"
 
+struct fd_queue_info_t {
+    int idlers;
+    apr_thread_mutex_t *idlers_mutex;
+    apr_thread_cond_t *wait_for_idler;
+    int terminated;
+};
+
+static apr_status_t queue_info_cleanup(void *data_)
+{
+    fd_queue_info_t *qi = data_;
+    apr_thread_cond_destroy(qi->wait_for_idler);
+    apr_thread_mutex_destroy(qi->idlers_mutex);
+    return APR_SUCCESS;
+}
+
+apr_status_t ap_queue_info_create(fd_queue_info_t **queue_info,
+                                  apr_pool_t *pool)
+{
+    apr_status_t rv;
+    fd_queue_info_t *qi;
+
+    qi = apr_palloc(pool, sizeof(*qi));
+    memset(qi, 0, sizeof(*qi));
+
+    rv = apr_thread_mutex_create(&qi->idlers_mutex, APR_THREAD_MUTEX_DEFAULT,
+                                 pool);
+    if (rv != APR_SUCCESS) {
+        return rv;
+    }
+    rv = apr_thread_cond_create(&qi->wait_for_idler, pool);
+    if (rv != APR_SUCCESS) {
+        return rv;
+    }
+    apr_pool_cleanup_register(pool, qi, queue_info_cleanup,
+                              apr_pool_cleanup_null);
+
+    *queue_info = qi;
+
+    return APR_SUCCESS;
+}
+
+apr_status_t ap_queue_info_set_idle(fd_queue_info_t *queue_info)
+{
+    apr_status_t rv;
+    rv = apr_thread_mutex_lock(queue_info->idlers_mutex);
+    if (rv != APR_SUCCESS) {
+        return rv;
+    }
+    AP_DEBUG_ASSERT(queue_info->idlers >= 0);
+    if (queue_info->idlers++ == 0) {
+        /* Only signal if we had no idlers before. */
+        apr_thread_cond_signal(queue_info->wait_for_idler);
+    }
+    rv = apr_thread_mutex_unlock(queue_info->idlers_mutex);
+    if (rv != APR_SUCCESS) {
+        return rv;
+    }
+    return APR_SUCCESS;
+}
+
+apr_status_t ap_queue_info_wait_for_idler(fd_queue_info_t *queue_info)
+{
+    apr_status_t rv;
+    rv = apr_thread_mutex_lock(queue_info->idlers_mutex);
+    if (rv != APR_SUCCESS) {
+        return rv;
+    }
+    AP_DEBUG_ASSERT(queue_info->idlers >= 0);
+    while ((queue_info->idlers == 0) && (!queue_info->terminated)) {
+        rv = apr_thread_cond_wait(queue_info->wait_for_idler,
+                                  queue_info->idlers_mutex);
+        if (rv != APR_SUCCESS) {
+            rv = apr_thread_mutex_unlock(queue_info->idlers_mutex);
+            if (rv != APR_SUCCESS) {
+                return rv;
+            }
+            return rv;
+        }
+    }
+    queue_info->idlers--; /* Oh, and idler? Let's take 'em! */
+    rv = apr_thread_mutex_unlock(queue_info->idlers_mutex);
+    if (rv != APR_SUCCESS) {
+        return rv;
+    }
+    else if (queue_info->terminated) {
+        return APR_EOF;
+    }
+    else {
+        return APR_SUCCESS;
+    }
+}
+
+apr_status_t ap_queue_info_term(fd_queue_info_t *queue_info)
+{
+    apr_status_t rv;
+    rv = apr_thread_mutex_lock(queue_info->idlers_mutex);
+    if (rv != APR_SUCCESS) {
+        return rv;
+    }
+    queue_info->terminated = 1;
+    apr_thread_cond_broadcast(queue_info->wait_for_idler);
+    rv = apr_thread_mutex_unlock(queue_info->idlers_mutex);
+    if (rv != APR_SUCCESS) {
+        return rv;
+    }
+    return APR_SUCCESS;
+}
+
 /**
  * Detects when the fd_queue_t is full. This utility function is expected
  * to be called from within critical sections, and is not threadsafe.
index cdd306100a615ca148a7535d47751dd4fb248644..c7d5cab1c741a528bb12b4496befee17110f95f9 100644 (file)
 #endif
 #include <apr_errno.h>
 
+typedef struct fd_queue_info_t fd_queue_info_t;
+
+apr_status_t ap_queue_info_create(fd_queue_info_t **queue_info,
+                                  apr_pool_t *pool);
+apr_status_t ap_queue_info_set_idle(fd_queue_info_t *queue_info);
+apr_status_t ap_queue_info_wait_for_idler(fd_queue_info_t *queue_info);
+apr_status_t ap_queue_info_term(fd_queue_info_t *queue_info);
+
 struct fd_queue_elem_t {
     apr_socket_t      *sd;
     apr_pool_t        *p;
index 8d77ad68d46e3d9da41305a8c9365f6e1427c5c7..378d8a56da78039f6a30d7b682f871f55f8950d7 100644 (file)
@@ -173,6 +173,7 @@ static int requests_this_child;
 static int num_listensocks = 0;
 static int resource_shortage = 0;
 static fd_queue_t *worker_queue;
+static fd_queue_info_t *worker_queue_info;
 
 /* The structure used to pass unique initialization info to each thread */
 typedef struct {
@@ -299,6 +300,7 @@ static void signal_threads(int mode)
     if (mode == ST_UNGRACEFUL) {
         workers_may_exit = 1;
         ap_queue_interrupt_all(worker_queue);
+        ap_queue_info_term(worker_queue_info);
     }
 }
 
@@ -693,6 +695,20 @@ static void *listener_thread(apr_thread_t *thd, void * dummy)
         }
         if (listener_may_exit) break;
 
+        rv = ap_queue_info_wait_for_idler(worker_queue_info);
+        if (APR_STATUS_IS_EOF(rv)) {
+            break; /* we've been signaled to die now */
+        }
+        else if (rv != APR_SUCCESS) {
+            ap_log_error(APLOG_MARK, APLOG_EMERG, rv, ap_server_conf,
+                         "apr_queue_info_wait failed. Attempting to shutdown "
+                         "process gracefully.");
+            signal_threads(ST_GRACEFUL);
+            break;
+        }
+        /* We've already decremented the idle worker count inside
+         * ap_queue_info_wait_for_idler. */
+
         if ((rv = SAFE_ACCEPT(apr_proc_mutex_lock(accept_mutex)))
             != APR_SUCCESS) {
             int level = APLOG_EMERG;
@@ -851,6 +867,15 @@ static void * APR_THREAD_FUNC worker_thread(apr_thread_t *thd, void * dummy)
     bucket_alloc = apr_bucket_alloc_create(apr_thread_pool_get(thd));
 
     while (!workers_may_exit) {
+        rv = ap_queue_info_set_idle(worker_queue_info);
+        if (rv != APR_SUCCESS) {
+            ap_log_error(APLOG_MARK, APLOG_EMERG, rv, ap_server_conf,
+                         "ap_queue_info_set_idle failed. Attempting to "
+                         "shutdown process gracefully.");
+            signal_threads(ST_GRACEFUL);
+            break;
+        }
+
         ap_update_child_status_from_indexes(process_slot, thread_slot, SERVER_READY, NULL);
         rv = ap_queue_pop(worker_queue, &csd, &ptrans, last_ptrans);
         last_ptrans = NULL;
@@ -961,6 +986,13 @@ static void * APR_THREAD_FUNC start_threads(apr_thread_t *thd, void *dummy)
         clean_child_exit(APEXIT_CHILDFATAL);
     }
 
+    rv = ap_queue_info_create(&worker_queue_info, pchild);
+    if (rv != APR_SUCCESS) {
+        ap_log_error(APLOG_MARK, APLOG_ALERT, rv, ap_server_conf,
+                     "ap_queue_info_create() failed");
+        clean_child_exit(APEXIT_CHILDFATAL);
+    }
+
     loops = prev_threads_created = 0;
     while (1) {
         /* ap_threads_per_child does not include the listener thread */