From: Aaron Bannert
Date: Sun, 28 Apr 2002 01:45:00 +0000 (+0000)
Subject: Add a "queue_info" structure to the worker MPM. This is used to prevent
X-Git-Tag: 2.0.36~61
X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=23fa964736b5434cc2450447ce526323fd9f3323;p=thirdparty%2Fapache%2Fhttpd.git

Add a "queue_info" structure to the worker MPM. This is used to prevent
the listener thread from accept()ing more connections than there are
available workers. This prevents long-running requests from starving
connections that have been accepted but not yet processed.

The queue_info is a simple counter, mutex, and condition variable. Only
the listener thread blocks on the condition, and only when there are no
idle workers. In the fast path there is a mutex lock, integer decrement,
and an unlock (among a few conditionals). The worker threads each notify
the queue_info when they are about to block on the normal worker_queue
waiting for some connection to process, which wakes up any sleeping
listener thread to go perform another accept() in parallel.

git-svn-id: https://svn.apache.org/repos/asf/httpd/httpd/trunk@94824 13f79535-47bb-0310-9956-ffa450edef68
---

diff --git a/server/mpm/worker/fdqueue.c b/server/mpm/worker/fdqueue.c
index cc9acb3eab0..e5d20747fba 100644
--- a/server/mpm/worker/fdqueue.c
+++ b/server/mpm/worker/fdqueue.c
@@ -58,6 +58,114 @@
 #include "fdqueue.h"
 
+struct fd_queue_info_t {
+    int idlers;
+    apr_thread_mutex_t *idlers_mutex;
+    apr_thread_cond_t *wait_for_idler;
+    int terminated;
+};
+
+static apr_status_t queue_info_cleanup(void *data_)
+{
+    fd_queue_info_t *qi = data_;
+    apr_thread_cond_destroy(qi->wait_for_idler);
+    apr_thread_mutex_destroy(qi->idlers_mutex);
+    return APR_SUCCESS;
+}
+
+apr_status_t ap_queue_info_create(fd_queue_info_t **queue_info,
+                                  apr_pool_t *pool)
+{
+    apr_status_t rv;
+    fd_queue_info_t *qi;
+
+    qi = apr_palloc(pool, sizeof(*qi));
+    memset(qi, 0, sizeof(*qi));
+
+    rv = apr_thread_mutex_create(&qi->idlers_mutex, APR_THREAD_MUTEX_DEFAULT,
+                                 pool);
+    if (rv != APR_SUCCESS) {
+        return rv;
+    }
+    rv = apr_thread_cond_create(&qi->wait_for_idler, pool);
+    if (rv != APR_SUCCESS) {
+        return rv;
+    }
+    apr_pool_cleanup_register(pool, qi, queue_info_cleanup,
+                              apr_pool_cleanup_null);
+
+    *queue_info = qi;
+
+    return APR_SUCCESS;
+}
+
+apr_status_t ap_queue_info_set_idle(fd_queue_info_t *queue_info)
+{
+    apr_status_t rv;
+    rv = apr_thread_mutex_lock(queue_info->idlers_mutex);
+    if (rv != APR_SUCCESS) {
+        return rv;
+    }
+    AP_DEBUG_ASSERT(queue_info->idlers >= 0);
+    if (queue_info->idlers++ == 0) {
+        /* Only signal if we had no idlers before. */
+        apr_thread_cond_signal(queue_info->wait_for_idler);
+    }
+    rv = apr_thread_mutex_unlock(queue_info->idlers_mutex);
+    if (rv != APR_SUCCESS) {
+        return rv;
+    }
+    return APR_SUCCESS;
+}
+
+apr_status_t ap_queue_info_wait_for_idler(fd_queue_info_t *queue_info)
+{
+    apr_status_t rv;
+    rv = apr_thread_mutex_lock(queue_info->idlers_mutex);
+    if (rv != APR_SUCCESS) {
+        return rv;
+    }
+    AP_DEBUG_ASSERT(queue_info->idlers >= 0);
+    while ((queue_info->idlers == 0) && (!queue_info->terminated)) {
+        rv = apr_thread_cond_wait(queue_info->wait_for_idler,
+                                  queue_info->idlers_mutex);
+        if (rv != APR_SUCCESS) {
+            rv = apr_thread_mutex_unlock(queue_info->idlers_mutex);
+            if (rv != APR_SUCCESS) {
+                return rv;
+            }
+            return rv;
+        }
+    }
+    queue_info->idlers--; /* Oh, and idler? Let's take 'em! */
+    rv = apr_thread_mutex_unlock(queue_info->idlers_mutex);
+    if (rv != APR_SUCCESS) {
+        return rv;
+    }
+    else if (queue_info->terminated) {
+        return APR_EOF;
+    }
+    else {
+        return APR_SUCCESS;
+    }
+}
+
+apr_status_t ap_queue_info_term(fd_queue_info_t *queue_info)
+{
+    apr_status_t rv;
+    rv = apr_thread_mutex_lock(queue_info->idlers_mutex);
+    if (rv != APR_SUCCESS) {
+        return rv;
+    }
+    queue_info->terminated = 1;
+    apr_thread_cond_broadcast(queue_info->wait_for_idler);
+    rv = apr_thread_mutex_unlock(queue_info->idlers_mutex);
+    if (rv != APR_SUCCESS) {
+        return rv;
+    }
+    return APR_SUCCESS;
+}
+
 /**
  * Detects when the fd_queue_t is full. This utility function is expected
  * to be called from within critical sections, and is not threadsafe.
diff --git a/server/mpm/worker/fdqueue.h b/server/mpm/worker/fdqueue.h
index cdd306100a6..c7d5cab1c74 100644
--- a/server/mpm/worker/fdqueue.h
+++ b/server/mpm/worker/fdqueue.h
@@ -71,6 +71,14 @@
 #endif
 #include 
 
+typedef struct fd_queue_info_t fd_queue_info_t;
+
+apr_status_t ap_queue_info_create(fd_queue_info_t **queue_info,
+                                  apr_pool_t *pool);
+apr_status_t ap_queue_info_set_idle(fd_queue_info_t *queue_info);
+apr_status_t ap_queue_info_wait_for_idler(fd_queue_info_t *queue_info);
+apr_status_t ap_queue_info_term(fd_queue_info_t *queue_info);
+
 struct fd_queue_elem_t {
     apr_socket_t *sd;
     apr_pool_t *p;
diff --git a/server/mpm/worker/worker.c b/server/mpm/worker/worker.c
index 8d77ad68d46..378d8a56da7 100644
--- a/server/mpm/worker/worker.c
+++ b/server/mpm/worker/worker.c
@@ -173,6 +173,7 @@ static int requests_this_child;
 static int num_listensocks = 0;
 static int resource_shortage = 0;
 static fd_queue_t *worker_queue;
+static fd_queue_info_t *worker_queue_info;
 
 /* The structure used to pass unique initialization info to each thread */
 typedef struct {
@@ -299,6 +300,7 @@ static void signal_threads(int mode)
     if (mode == ST_UNGRACEFUL) {
         workers_may_exit = 1;
         ap_queue_interrupt_all(worker_queue);
+        ap_queue_info_term(worker_queue_info);
     }
 }
 
@@ -693,6 +695,20 @@ static void *listener_thread(apr_thread_t *thd, void * dummy)
         }
         if (listener_may_exit) break;
 
+        rv = ap_queue_info_wait_for_idler(worker_queue_info);
+        if (APR_STATUS_IS_EOF(rv)) {
+            break; /* we've been signaled to die now */
+        }
+        else if (rv != APR_SUCCESS) {
+            ap_log_error(APLOG_MARK, APLOG_EMERG, rv, ap_server_conf,
+                         "apr_queue_info_wait failed. Attempting to shutdown "
+                         "process gracefully.");
+            signal_threads(ST_GRACEFUL);
+            break;
+        }
+        /* We've already decremented the idle worker count inside
+         * ap_queue_info_wait_for_idler. */
+
         if ((rv = SAFE_ACCEPT(apr_proc_mutex_lock(accept_mutex)))
             != APR_SUCCESS) {
             int level = APLOG_EMERG;
@@ -851,6 +867,15 @@ static void * APR_THREAD_FUNC worker_thread(apr_thread_t *thd, void * dummy)
     bucket_alloc = apr_bucket_alloc_create(apr_thread_pool_get(thd));
 
     while (!workers_may_exit) {
+        rv = ap_queue_info_set_idle(worker_queue_info);
+        if (rv != APR_SUCCESS) {
+            ap_log_error(APLOG_MARK, APLOG_EMERG, rv, ap_server_conf,
+                         "ap_queue_info_set_idle failed. Attempting to "
+                         "shutdown process gracefully.");
+            signal_threads(ST_GRACEFUL);
+            break;
+        }
+
         ap_update_child_status_from_indexes(process_slot, thread_slot,
                                             SERVER_READY, NULL);
         rv = ap_queue_pop(worker_queue, &csd, &ptrans, last_ptrans);
         last_ptrans = NULL;
@@ -961,6 +986,13 @@ static void * APR_THREAD_FUNC start_threads(apr_thread_t *thd, void *dummy)
         clean_child_exit(APEXIT_CHILDFATAL);
     }
 
+    rv = ap_queue_info_create(&worker_queue_info, pchild);
+    if (rv != APR_SUCCESS) {
+        ap_log_error(APLOG_MARK, APLOG_ALERT, rv, ap_server_conf,
+                     "ap_queue_info_create() failed");
+        clean_child_exit(APEXIT_CHILDFATAL);
+    }
+
     loops = prev_threads_created = 0;
     while (1) {
         /* ap_threads_per_child does not include the listener thread */
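
The synchronization pattern above is small enough to model on its own. The following standalone sketch is illustrative only: it uses plain pthreads rather than APR, and the names (idle_info, mark_idle, wait_for_idler, and the toy listener/worker threads) are invented for this example rather than taken from httpd. It shows the same handshake the commit message describes: workers bump an idle counter, signaling only on the zero-to-one transition, right before they would block on the connection queue, and the listener claims one idler under the mutex before it would call accept().

/* Hypothetical standalone sketch (not httpd code): models the idle-worker
 * handshake from this commit with plain pthreads instead of APR. */
#include <pthread.h>
#include <stdio.h>
#include <unistd.h>

typedef struct {
    int idlers;                  /* number of workers ready for a connection */
    int terminated;
    pthread_mutex_t lock;
    pthread_cond_t  wait_for_idler;
} idle_info;

static idle_info info = { 0, 0, PTHREAD_MUTEX_INITIALIZER,
                          PTHREAD_COND_INITIALIZER };

/* Worker side: called just before blocking on the connection queue.
 * Signals only on the 0 -> 1 transition, like ap_queue_info_set_idle. */
static void mark_idle(void)
{
    pthread_mutex_lock(&info.lock);
    if (info.idlers++ == 0) {
        pthread_cond_signal(&info.wait_for_idler);
    }
    pthread_mutex_unlock(&info.lock);
}

/* Listener side: blocks until at least one worker is idle, then claims it.
 * Returns 0 normally, -1 once termination has been requested. */
static int wait_for_idler(void)
{
    int rc = 0;
    pthread_mutex_lock(&info.lock);
    while (info.idlers == 0 && !info.terminated) {
        pthread_cond_wait(&info.wait_for_idler, &info.lock);
    }
    if (info.terminated) {
        rc = -1;
    }
    else {
        info.idlers--;           /* claim one idle worker before accept() */
    }
    pthread_mutex_unlock(&info.lock);
    return rc;
}

static void *listener(void *arg)
{
    (void)arg;
    for (int i = 0; i < 3; i++) {
        if (wait_for_idler() < 0) {
            break;
        }
        printf("listener: worker available, would call accept() now\n");
    }
    return NULL;
}

static void *worker(void *arg)
{
    (void)arg;
    for (int i = 0; i < 3; i++) {
        mark_idle();             /* tell the listener we can take a connection */
        usleep(1000);            /* stand-in for popping and handling a request */
    }
    return NULL;
}

int main(void)
{
    pthread_t lt, wt;
    pthread_create(&wt, NULL, worker, NULL);
    pthread_create(&lt, NULL, listener, NULL);
    pthread_join(wt, NULL);
    pthread_join(lt, NULL);
    return 0;
}

Signaling only when the counter goes from zero to one keeps the common case down to a lock, an increment, and an unlock, which is the fast path the commit message refers to. The terminated flag, together with the broadcast in ap_queue_info_term, covers shutdown, when the listener must wake up even though no worker is going idle.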