From: Brian Pane Date: Mon, 12 Sep 2005 00:35:23 +0000 (+0000) Subject: Added asynchronous keep-alive support to the leader MPM... X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=926f1e0ab3cd143e078aaf9d00d28a2ac0892e21;p=thirdparty%2Fapache%2Fhttpd.git Added asynchronous keep-alive support to the leader MPM... the new io_multiplexer object is intended to provide a foundation for async processing of the other connection states. git-svn-id: https://svn.apache.org/repos/asf/httpd/httpd/branches/async-dev@280222 13f79535-47bb-0310-9956-ffa450edef68 --- diff --git a/server/mpm/experimental/leader/Makefile.in b/server/mpm/experimental/leader/Makefile.in index 03f1765edd8..2b5a71ec263 100644 --- a/server/mpm/experimental/leader/Makefile.in +++ b/server/mpm/experimental/leader/Makefile.in @@ -1,5 +1,5 @@ LTLIBRARY_NAME = libleader.la -LTLIBRARY_SOURCES = leader.c +LTLIBRARY_SOURCES = leader.c io_multiplexer.c include $(top_srcdir)/build/ltlib.mk diff --git a/server/mpm/experimental/leader/README b/server/mpm/experimental/leader/README index 1981a5beedb..a2bcccf9cdd 100644 --- a/server/mpm/experimental/leader/README +++ b/server/mpm/experimental/leader/README @@ -1,15 +1,24 @@ Leader MPM: -This is an experimental variant of the standard worker MPM. -It uses a Leader/Followers design pattern to coordinate work among threads: + +This is an experimental MPM that uses the Leader/Followers design +pattern to coordinate work among threads: http://deuce.doc.wustl.edu/doc/pspdfs/lf.pdf +As of httpd-2.3, the Leader MPM also incorporates a variant of the +Event MPM's asynchronous socket I/O for keepalive connections. The +management of pollsets and timeouts is encapsulated within the +io_multiplexer functions, with the aim of eventually supporting +asynchronous write completion. + To use the leader MPM, add "--with-mpm=leader" to the configure script's arguments when building the httpd. -This MPM depends on APR's atomic compare-and-swap operations for -thread synchronization. If you are compiling for an x86 target -and you don't need to support 386s, or you're compiling for a -SPARC and you don't need to run on pre-UltraSPARC chips, add -"--enable-nonportable-atomics=yes" to the configure script's -arguments. This will cause APR to implement atomic operations -using efficient opcodes not available in older CPUs. +IMPORTANT NOTES: + +* At the moment, with the async code under active development, the + Leader MPM is suitable for R&D use, but not for production use. + +* Like the Event MPM, the Leader MPM now requires a threadsafe + apr_pollset implementation, such as epoll on Linux or kqueue + on BSD. + diff --git a/server/mpm/experimental/leader/io_multiplexer.c b/server/mpm/experimental/leader/io_multiplexer.c new file mode 100644 index 00000000000..1f2d6346a3b --- /dev/null +++ b/server/mpm/experimental/leader/io_multiplexer.c @@ -0,0 +1,254 @@ +/* Copyright 2005 The Apache Software Foundation or its licensors, as + * applicable. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "apr_poll.h" +#include "apr_ring.h" +#include "apr_thread_cond.h" +#include "apr_thread_mutex.h" + +#include "io_multiplexer.h" + +extern server_rec *ap_server_conf; + +APR_RING_HEAD(timeout_ring_header_t, conn_state_t); + +struct io_multiplexer { + int stopped; + apr_thread_mutex_t *lock; + apr_thread_mutex_t *pollset_lock; + apr_pollset_t *pollset; + apr_int32_t num_pending_events; + const apr_pollfd_t *next_pending_event; + struct timeout_ring_header_t pending_timeouts; + struct timeout_ring_header_t expired_timeouts; + volatile int poll_sequence_num; +}; + +static apr_status_t io_multiplexer_remove_internal(io_multiplexer *iom, + multiplexable *m); + +apr_status_t io_multiplexer_create(io_multiplexer **iom, apr_pool_t *p, + apr_uint32_t max_descriptors) +{ + apr_status_t rv; + *iom = (io_multiplexer *)apr_palloc(p, sizeof(**iom)); + rv = apr_thread_mutex_create(&((*iom)->lock), APR_THREAD_MUTEX_DEFAULT, p); + if (rv != APR_SUCCESS) { + return rv; + } + rv = apr_thread_mutex_create(&((*iom)->pollset_lock), + APR_THREAD_MUTEX_DEFAULT, p); + if (rv != APR_SUCCESS) { + return rv; + } + rv = apr_pollset_create(&((*iom)->pollset), max_descriptors, p, + APR_POLLSET_THREADSAFE); + if (rv != APR_SUCCESS) { + return rv; + } + (*iom)->stopped = 0; + (*iom)->num_pending_events = 0; + (*iom)->next_pending_event = NULL; + APR_RING_INIT(&((*iom)->pending_timeouts), conn_state_t, timeout_list); + APR_RING_INIT(&((*iom)->expired_timeouts), conn_state_t, timeout_list); + (*iom)->poll_sequence_num = 0; + + return APR_SUCCESS; +} + +#define DEFAULT_POLL_TIMEOUT 1000000 + +apr_status_t io_multiplexer_get_event(io_multiplexer *iom, + apr_pollfd_t *event) +{ + apr_status_t rv; + rv = apr_thread_mutex_lock(iom->pollset_lock); + if (rv != APR_SUCCESS) { + return rv; + } + rv = apr_thread_mutex_lock(iom->lock); + if (rv != APR_SUCCESS) { + apr_thread_mutex_unlock(iom->pollset_lock); + return rv; + } + if (iom->stopped) { + apr_thread_mutex_unlock(iom->lock); + apr_thread_mutex_unlock(iom->pollset_lock); + return APR_EINVAL; + } + for (;;) { + /* Invariant: at the start of each iteration of this loop, the + * active thread holds iom->lock. + */ + if (!APR_RING_EMPTY(&(iom->expired_timeouts), conn_state_t, timeout_list)) { + /* There are some timeout notifications remaining + * from the last poll. Return the next one. + */ + conn_state_t *cs = APR_RING_FIRST(&(iom->expired_timeouts)); + APR_RING_REMOVE(cs, timeout_list); + *event = cs->pfd; + event->rtnevents |= IOM_POLL_TIMEOUT; + apr_thread_mutex_unlock(iom->lock); + return apr_thread_mutex_unlock(iom->pollset_lock); + } + else if (iom->num_pending_events > 0) { + /* There are some events remaining from the last + * poll. Return the next one. + */ + *event = *(iom->next_pending_event++); + apr_pollset_remove(iom->pollset, event); + iom->num_pending_events--; + apr_thread_mutex_unlock(iom->lock); + return apr_thread_mutex_unlock(iom->pollset_lock); + } + else { + /* No unprocessed events remain from the previous poll, + * so initiate a new poll. + */ + apr_int32_t num_pending_events = 0; + const apr_pollfd_t *next_pending_event; + apr_interval_time_t poll_timeout; + int i; + + if (APR_RING_EMPTY(&(iom->pending_timeouts), conn_state_t, + timeout_list)) { + poll_timeout = DEFAULT_POLL_TIMEOUT; + } + else { + /* If there are pending timeouts, check whether + * any of them have expired. 
If none have expired, + * use the expiration time on the first one to + * determine how long the poll should block. + */ + apr_time_t now = apr_time_now(); + conn_state_t *cs = APR_RING_FIRST(&(iom->pending_timeouts)); + if (cs->expiration_time <= now) { + do { + APR_RING_REMOVE(cs, timeout_list); + apr_pollset_remove(iom->pollset, &(cs->pfd)); + APR_RING_INSERT_TAIL(&(iom->expired_timeouts), cs, + conn_state_t, timeout_list); + if (APR_RING_EMPTY(&(iom->pending_timeouts), + conn_state_t, timeout_list)) { + break; + } + cs = APR_RING_FIRST(&(iom->pending_timeouts)); + } while (cs->expiration_time <= now); + continue; + } + else { + poll_timeout = cs->expiration_time - now; + } + } + + apr_thread_mutex_unlock(iom->lock); + + rv = apr_pollset_poll(iom->pollset, poll_timeout, + &num_pending_events, &next_pending_event); + + if ((rv != APR_SUCCESS) && !APR_STATUS_IS_TIMEUP(rv) && !APR_STATUS_IS_EINTR(rv)) { + apr_thread_mutex_unlock(iom->pollset_lock); + return rv; + } + apr_thread_mutex_lock(iom->lock); + + if (num_pending_events > 0) { + iom->num_pending_events = num_pending_events; + iom->next_pending_event = next_pending_event; + for (i = 0; i < num_pending_events; i++) { + multiplexable *m = (multiplexable *)next_pending_event[i].client_data; + if (m != NULL) { + io_multiplexer_remove_internal(iom, m); + } + } + } + } + } +} + +apr_status_t io_multiplexer_stop(io_multiplexer *iom, int graceful) { + iom->stopped = 1; + return APR_SUCCESS; +} + +apr_status_t io_multiplexer_add(io_multiplexer *iom, multiplexable *m, + long timeout_in_usec) +{ + apr_status_t rv; + apr_thread_mutex_lock(iom->lock); + if (iom->stopped) { + rv = APR_EINVAL; + } + else if (m->type == IOM_CONNECTION) { + APR_RING_REMOVE(m->c->cs, timeout_list); + m->c->cs->pfd.client_data = m; + rv = apr_pollset_add(iom->pollset, &(m->c->cs->pfd)); + if (timeout_in_usec >= 0) { + /* XXX: Keep the pending_timeouts list sorted */ + m->c->cs->expiration_time = apr_time_now() + timeout_in_usec; + APR_RING_INSERT_TAIL(&(iom->pending_timeouts), m->c->cs, + conn_state_t, timeout_list); + } + } + else if (m->type == IOM_LISTENER) { + apr_pollfd_t desc; + desc.desc_type = APR_POLL_SOCKET; + desc.desc.s = m->l->sd; + desc.reqevents = APR_POLLIN; + desc.client_data = m; + rv = apr_pollset_add(iom->pollset, &desc); + } + else { + rv = APR_EINVALSOCK; + } + apr_thread_mutex_unlock(iom->lock); + return rv; +} + +apr_status_t io_multiplexer_remove(io_multiplexer *iom, multiplexable *m) +{ + apr_status_t rv; + apr_thread_mutex_lock(iom->lock); + rv = io_multiplexer_remove_internal(iom, m); + apr_thread_mutex_unlock(iom->lock); + return rv; +} + +static apr_status_t io_multiplexer_remove_internal(io_multiplexer *iom, + multiplexable *m) +{ + apr_status_t rv; + if (iom->stopped) { + rv = APR_EINVAL; + } + else if (m->type == IOM_CONNECTION) { + APR_RING_REMOVE(m->c->cs, timeout_list); + rv = apr_pollset_remove(iom->pollset, &(m->c->cs->pfd)); + } + else if (m->type == IOM_LISTENER) { + apr_pollfd_t desc; + desc.desc_type = APR_POLL_SOCKET; + desc.desc.s = m->l->sd; + desc.reqevents = APR_POLLIN; + desc.client_data = NULL; + rv = apr_pollset_remove(iom->pollset, &desc); + } + else { + rv = APR_EINVALSOCK; + } + return rv; +} + diff --git a/server/mpm/experimental/leader/io_multiplexer.h b/server/mpm/experimental/leader/io_multiplexer.h new file mode 100644 index 00000000000..443b46c9691 --- /dev/null +++ b/server/mpm/experimental/leader/io_multiplexer.h @@ -0,0 +1,58 @@ +/* Copyright 2005 The Apache Software Foundation or its licensors, as + * 
applicable. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef APACHE_MPM_EVENT_IOMUX_H +#define APACHE_MPM_EVENT_IOMUX_H + +#include "apr_pools.h" +#include "apr_network_io.h" +#include "apr_poll.h" + +#include "ap_listen.h" +#include "httpd.h" + +typedef struct io_multiplexer io_multiplexer; + +typedef struct { + enum { IOM_LISTENER, IOM_CONNECTION } type; + union { + ap_listen_rec *l; + conn_rec *c; + }; +} multiplexable; + +/* Flag to set in apr_pollfd_t.rtnevents upon timeout + * XXX: Find a way to make sure this never collides with any value + * set by APR + */ +#define IOM_POLL_TIMEOUT 0x8000 + +apr_status_t io_multiplexer_create(io_multiplexer **iom, apr_pool_t *p, + apr_uint32_t max_descriptors); + +apr_status_t io_multiplexer_get_event(io_multiplexer *iom, apr_pollfd_t *event); + +apr_status_t io_multiplexer_stop(io_multiplexer *iom, int graceful); + +#define IOM_TIMEOUT_INFINITE -1 + +apr_status_t io_multiplexer_add(io_multiplexer *iom, multiplexable *m, + long timeout_in_usec); + +apr_status_t io_multiplexer_remove(io_multiplexer *iom, multiplexable *m); + +#endif /* APACHE_MPM_EVENT_IOMUX_H */ + diff --git a/server/mpm/experimental/leader/leader.c b/server/mpm/experimental/leader/leader.c index f9d6a2c32bf..2baf74427d7 100644 --- a/server/mpm/experimental/leader/leader.c +++ b/server/mpm/experimental/leader/leader.c @@ -21,8 +21,6 @@ #include "apr_thread_proc.h" #include "apr_signal.h" #include "apr_thread_cond.h" -#include "apr_thread_mutex.h" -#include "apr_proc_mutex.h" #define APR_WANT_STRFUNC #include "apr_want.h" @@ -52,6 +50,7 @@ #include "http_config.h" /* for read_config */ #include "http_core.h" /* for get_remote_host */ #include "http_connection.h" +#include "http_vhost.h" #include "ap_mpm.h" #include "mpm_common.h" #include "ap_listen.h" @@ -64,6 +63,8 @@ #include "apr_atomic.h" +#include "io_multiplexer.h" + /* Limit on the total --- clients will be locked out if more servers than * this are needed. It is intended solely to keep the server from crashing * when things get out of hand. @@ -187,165 +188,15 @@ static pid_t ap_my_pid; /* Linux getpid() doesn't work except in main thread. Use this instead */ static pid_t parent_pid; -/* Locks for accept serialization */ -static apr_proc_mutex_t *accept_mutex; - -#ifdef SINGLE_LISTEN_UNSERIALIZED_ACCEPT -#define SAFE_ACCEPT(stmt) (ap_listeners->next ? 
(stmt) : APR_SUCCESS) -#else -#define SAFE_ACCEPT(stmt) (stmt) -#endif - - -/* Structure used to wake up an idle worker thread +/* Max # of file descriptors that the io_multiplexer supports + * XXX should be configurable at runtime */ -struct worker_wakeup_info { - apr_uint32_t next; /* index into worker_wakeups array, - * used to build a linked list - */ - apr_thread_cond_t *cond; - apr_thread_mutex_t *mutex; -}; - -static worker_wakeup_info *worker_wakeup_create(apr_pool_t *pool) -{ - apr_status_t rv; - worker_wakeup_info *wakeup; - - wakeup = (worker_wakeup_info *)apr_palloc(pool, sizeof(*wakeup)); - if ((rv = apr_thread_cond_create(&wakeup->cond, pool)) != APR_SUCCESS) { - return NULL; - } - if ((rv = apr_thread_mutex_create(&wakeup->mutex, APR_THREAD_MUTEX_DEFAULT, - pool)) != APR_SUCCESS) { - return NULL; - } - /* The wakeup's mutex will be unlocked automatically when - * the worker blocks on the condition variable - */ - apr_thread_mutex_lock(wakeup->mutex); - return wakeup; -} - +#define MAX_IOM_DESCRIPTORS 1024 -/* Structure used to hold a stack of idle worker threads +/* Socket multiplexer used to watch large numbers of connections + * for readability/writeability */ -typedef struct { - /* 'state' consists of several fields concatenated into a - * single 32-bit int for use with the apr_atomic_cas32() API: - * state & STACK_FIRST is the thread ID of the first thread - * in a linked list of idle threads - * state & STACK_TERMINATED indicates whether the proc is shutting down - * state & STACK_NO_LISTENER indicates whether the process has - * no current listener thread - */ - apr_uint32_t state; -} worker_stack; - -#define STACK_FIRST 0xffff -#define STACK_LIST_END 0xffff -#define STACK_TERMINATED 0x10000 -#define STACK_NO_LISTENER 0x20000 - -static worker_wakeup_info **worker_wakeups = NULL; - -static worker_stack* worker_stack_create(apr_pool_t *pool, apr_size_t max) -{ - worker_stack *stack = (worker_stack *)apr_palloc(pool, sizeof(*stack)); - stack->state = STACK_NO_LISTENER | STACK_LIST_END; - return stack; -} - -static apr_status_t worker_stack_wait(worker_stack *stack, - apr_uint32_t worker_id) -{ - worker_wakeup_info *wakeup = worker_wakeups[worker_id]; - - while (1) { - apr_uint32_t state = stack->state; - if (state & (STACK_TERMINATED | STACK_NO_LISTENER)) { - if (state & STACK_TERMINATED) { - return APR_EINVAL; - } - if (apr_atomic_cas32(&(stack->state), STACK_LIST_END, state) != - state) { - continue; - } - else { - return APR_SUCCESS; - } - } - wakeup->next = state; - if (apr_atomic_cas32(&(stack->state), worker_id, state) != state) { - continue; - } - else { - return apr_thread_cond_wait(wakeup->cond, wakeup->mutex); - } - } -} - -static apr_status_t worker_stack_awaken_next(worker_stack *stack) -{ - - while (1) { - apr_uint32_t state = stack->state; - apr_uint32_t first = state & STACK_FIRST; - if (first == STACK_LIST_END) { - if (apr_atomic_cas32(&(stack->state), state | STACK_NO_LISTENER, - state) != state) { - continue; - } - else { - return APR_SUCCESS; - } - } - else { - worker_wakeup_info *wakeup = worker_wakeups[first]; - if (apr_atomic_cas32(&(stack->state), (state ^ first) | wakeup->next, - state) != state) { - continue; - } - else { - /* Acquire and release the idle worker's mutex to ensure - * that it's actually waiting on its condition variable - */ - apr_status_t rv; - if ((rv = apr_thread_mutex_lock(wakeup->mutex)) != - APR_SUCCESS) { - return rv; - } - if ((rv = apr_thread_mutex_unlock(wakeup->mutex)) != - APR_SUCCESS) { - return rv; - } - return 
apr_thread_cond_signal(wakeup->cond); - } - } - } -} - -static apr_status_t worker_stack_term(worker_stack *stack) -{ - int i; - apr_status_t rv; - - while (1) { - apr_uint32_t state = stack->state; - if (apr_atomic_cas32(&(stack->state), state | STACK_TERMINATED, - state) == state) { - break; - } - } - for (i = 0; i < ap_threads_per_child; i++) { - if ((rv = worker_stack_awaken_next(stack)) != APR_SUCCESS) { - return rv; - } - } - return APR_SUCCESS; -} - -static worker_stack *idle_worker_stack; +static io_multiplexer *iom; #define ST_INIT 0 #define ST_GRACEFUL 1 @@ -362,7 +213,7 @@ static void signal_threads(int mode) mpm_state = AP_MPMQ_STOPPING; workers_may_exit = 1; - worker_stack_term(idle_worker_stack); + io_multiplexer_stop(iom, 0); } AP_DECLARE(apr_status_t) ap_mpm_query(int query_code, int *result) @@ -377,6 +228,9 @@ AP_DECLARE(apr_status_t) ap_mpm_query(int query_code, int *result) case AP_MPMQ_IS_FORKED: *result = AP_MPMQ_DYNAMIC; return APR_SUCCESS; + case AP_MPMQ_IS_ASYNC: + *result = 1; + return APR_SUCCESS; case AP_MPMQ_HARD_LIMIT_DAEMONS: *result = server_limit; return APR_SUCCESS; @@ -587,27 +441,51 @@ int ap_graceful_stop_signalled(void) return workers_may_exit; } -/***************************************************************** - * Child process main loop. - */ - -static void process_socket(apr_pool_t *p, apr_socket_t *sock, int my_child_num, - int my_thread_num, apr_bucket_alloc_t *bucket_alloc) +static apr_status_t process_event_on_connection(const apr_pollfd_t *event, + io_multiplexer *iom, + int child_num, int thread_num) { - conn_rec *current_conn; - long conn_id = ID_FROM_CHILD_THREAD(my_child_num, my_thread_num); - int csd; - ap_sb_handle_t *sbh; - - ap_create_sb_handle(&sbh, p, my_child_num, my_thread_num); - apr_os_sock_get(&csd, sock); - - current_conn = ap_run_create_connection(p, ap_server_conf, sock, - conn_id, sbh, bucket_alloc); - if (current_conn) { - ap_process_connection(current_conn, sock); - ap_lingering_close(current_conn); + multiplexable *m = (multiplexable *)(event->client_data); + conn_rec *c = m->c; + conn_state_t *cs = c->cs; + + if (cs->state == CONN_STATE_CHECK_REQUEST_LINE_READABLE) { + if (event->rtnevents & IOM_POLL_TIMEOUT) { + cs->state = CONN_STATE_LINGER; + ap_log_error(APLOG_MARK, APLOG_CRIT, 0, ap_server_conf, "timeout on connection"); + } + else if (event->rtnevents & (APR_POLLIN | APR_POLLPRI)) { + cs->state = CONN_STATE_READ_REQUEST_LINE; + } + else if (event->rtnevents & APR_POLLHUP) { + /* close... */ + } + else if (event->rtnevents & (APR_POLLERR | APR_POLLNVAL)) { + /* error... */ + } } + + if (cs->state == CONN_STATE_READ_REQUEST_LINE) { + if (!c->aborted) { + ap_run_process_connection(c); + } + else { + cs->state = CONN_STATE_LINGER; + } + } + + if (cs->state == CONN_STATE_LINGER) { + ap_lingering_close(c); + apr_bucket_alloc_destroy(cs->bucket_alloc); + apr_pool_destroy(cs->p); + return APR_SUCCESS; + } + + if (cs->state == CONN_STATE_CHECK_REQUEST_LINE_READABLE) { + io_multiplexer_add(iom, m, ap_server_conf->keep_alive_timeout); + } + + return APR_SUCCESS; } /* requests_this_child has gone to zero or below. 
See if the admin coded @@ -647,23 +525,100 @@ static void unblock_signal(int sig) #endif } +static apr_status_t accept_new_connection(const apr_pollfd_t *event, + io_multiplexer *iom, + int child_num, int thread_num) +{ + apr_allocator_t *allocator = NULL; + apr_pool_t *p = NULL; + apr_bucket_alloc_t *bucket_alloc = NULL; + apr_socket_t *new_socket = NULL; + void *csd; + apr_status_t rv; + multiplexable *client_data = (multiplexable *)(event->client_data); + ap_listen_rec *listener = client_data->l; + + apr_allocator_create(&allocator); + apr_allocator_max_free_set(allocator, ap_max_mem_free); + apr_pool_create_ex(&p, NULL, NULL, allocator); + apr_allocator_owner_set(allocator, p); + bucket_alloc = apr_bucket_alloc_create_ex(allocator); + + rv = listener->accept_func(&csd, listener, p); + + + if (rv == APR_SUCCESS) { + /* Re-register the listener with the multiplexer */ + io_multiplexer_add(iom, client_data, -1); + + if (csd == NULL) { + rv = APR_EGENERAL; + } + else { + new_socket = (apr_socket_t *)csd; + conn_rec *connection; + long conn_id = ID_FROM_CHILD_THREAD(child_num, thread_num); + ap_sb_handle_t *sbh; + ap_create_sb_handle(&sbh, p, child_num, thread_num); + connection = ap_run_create_connection(p, ap_server_conf, + new_socket, conn_id, + sbh, bucket_alloc); + if (connection == NULL) { + rv = APR_EGENERAL; + } + else { + conn_state_t *cs = connection->cs; + multiplexable *m; + int rc; + + ap_update_vhost_given_ip(connection); + rc = ap_run_pre_connection(connection, csd); + if (rc != OK && rc != DONE) { + ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, ap_server_conf, + "accept_new_connection: connection aborted"); + connection->aborted = 1; + } + m = (multiplexable *)apr_palloc(p, sizeof(*m)); + m->type = IOM_CONNECTION; + m->c = connection; + cs->pfd.desc_type = APR_POLL_SOCKET; + cs->pfd.desc.s = new_socket; + cs->pfd.reqevents = APR_POLLIN; + cs->pfd.client_data = m; + + rv = io_multiplexer_add(iom, m, -1); + if (rv != APR_SUCCESS) { + ap_log_error(APLOG_MARK, APLOG_CRIT, rv, ap_server_conf, + "io_multiplexer_add() failed"); + } + } + } + } + if (rv != APR_SUCCESS) { + if (new_socket != NULL) { + apr_socket_close(new_socket); + } + if (bucket_alloc != NULL) { + apr_bucket_alloc_destroy(bucket_alloc); /* XXX unneeded? 
*/ + } + if (p != NULL) { + apr_pool_destroy(p); + } + ap_log_error(APLOG_MARK, APLOG_DEBUG, rv, ap_server_conf, "not restoring listener"); + } + + return rv; +} + static void *worker_thread(apr_thread_t *thd, void * dummy) { proc_info * ti = dummy; int process_slot = ti->pid; int thread_slot = ti->tid; - apr_uint32_t my_worker_num = (apr_uint32_t)(ti->tid); - apr_pool_t *tpool = apr_thread_pool_get(thd); - void *csd = NULL; apr_allocator_t *allocator; apr_pool_t *ptrans; /* Pool for per-transaction stuff */ apr_bucket_alloc_t *bucket_alloc; - int numdesc; - apr_pollset_t *pollset; apr_status_t rv; - ap_listen_rec *lr; - int is_listener; - int last_poll_idx = 0; ap_update_child_status_from_indexes(process_slot, thread_slot, SERVER_STARTING, NULL); @@ -676,172 +631,45 @@ static void *worker_thread(apr_thread_t *thd, void * dummy) apr_allocator_owner_set(allocator, ptrans); bucket_alloc = apr_bucket_alloc_create_ex(allocator); - apr_pollset_create(&pollset, num_listensocks, tpool, 0); - for (lr = ap_listeners ; lr != NULL ; lr = lr->next) { - apr_pollfd_t pfd = { 0 }; - - pfd.desc_type = APR_POLL_SOCKET; - pfd.desc.s = lr->sd; - pfd.reqevents = APR_POLLIN; - pfd.client_data = lr; - - /* ### check the status */ - (void) apr_pollset_add(pollset, &pfd); - } - - /* TODO: Switch to a system where threads reuse the results from earlier - poll calls - manoj */ - is_listener = 0; while (!workers_may_exit) { - + apr_pollfd_t event; ap_update_child_status_from_indexes(process_slot, thread_slot, SERVER_READY, NULL); - if (!is_listener) { - /* Wait until it's our turn to become the listener */ - if ((rv = worker_stack_wait(idle_worker_stack, my_worker_num)) != - APR_SUCCESS) { - if (rv != APR_EINVAL) { - ap_log_error(APLOG_MARK, APLOG_EMERG, rv, ap_server_conf, - "worker_stack_wait failed. Shutting down"); - } - break; - } - if (workers_may_exit) { - break; - } - is_listener = 1; - } - + rv = io_multiplexer_get_event(iom, &event); /* TODO: requests_this_child should be synchronized - aaron */ if (requests_this_child <= 0) { check_infinite_requests(); } - if (workers_may_exit) break; - - if ((rv = SAFE_ACCEPT(apr_proc_mutex_lock(accept_mutex))) - != APR_SUCCESS) { - int level = APLOG_EMERG; - - if (workers_may_exit) { - break; - } - if (ap_scoreboard_image->parent[process_slot].generation != - ap_scoreboard_image->global->running_generation) { - level = APLOG_DEBUG; /* common to get these at restart time */ - } - ap_log_error(APLOG_MARK, level, rv, ap_server_conf, - "apr_proc_mutex_lock failed. Attempting to shutdown " - "process gracefully."); - signal_threads(ST_GRACEFUL); - break; /* skip the lock release */ - } - - if (!ap_listeners->next) { - /* Only one listener, so skip the poll */ - lr = ap_listeners; - } - else { - while (!workers_may_exit) { - apr_status_t ret; - const apr_pollfd_t *pdesc; - - ret = apr_pollset_poll(pollset, -1, &numdesc, &pdesc); - if (ret != APR_SUCCESS) { - if (APR_STATUS_IS_EINTR(ret)) { - continue; - } - - /* apr_pollset_poll() will only return errors in catastrophic - * circumstances. Let's try exiting gracefully, for now. */ - ap_log_error(APLOG_MARK, APLOG_ERR, ret, (const server_rec *) - ap_server_conf, "apr_pollset_poll: (listen)"); - signal_threads(ST_GRACEFUL); - } - - if (workers_may_exit) break; - - /* We can always use pdesc[0], but sockets at position N - * could end up completely starved of attention in a very - * busy server. Therefore, we round-robin across the - * returned set of descriptors. 
While it is possible that - * the returned set of descriptors might flip around and - * continue to starve some sockets, we happen to know the - * internal pollset implementation retains ordering - * stability of the sockets. Thus, the round-robin should - * ensure that a socket will eventually be serviced. - */ - if (last_poll_idx >= numdesc) - last_poll_idx = 0; - - /* Grab a listener record from the client_data of the poll - * descriptor, and advance our saved index to round-robin - * the next fetch. - * - * ### hmm... this descriptor might have POLLERR rather - * ### than POLLIN - */ - lr = pdesc[last_poll_idx++].client_data; - goto got_fd; - } + if (workers_may_exit) { + break; } - got_fd: - if (!workers_may_exit) { - rv = lr->accept_func(&csd, lr, ptrans); - /* later we trash rv and rely on csd to indicate success/failure */ - AP_DEBUG_ASSERT(rv == APR_SUCCESS || !csd); - - if (rv == APR_EGENERAL) { - /* E[NM]FILE, ENOMEM, etc */ - resource_shortage = 1; - signal_threads(ST_GRACEFUL); + if (rv == APR_SUCCESS) { + multiplexable *m = (multiplexable *)event.client_data; + if (m->type == IOM_CONNECTION) { + rv = process_event_on_connection(&event, iom, process_slot, thread_slot); } - if ((rv = SAFE_ACCEPT(apr_proc_mutex_unlock(accept_mutex))) - != APR_SUCCESS) { - int level = APLOG_EMERG; - - if (workers_may_exit) { - break; + else if (m->type == IOM_LISTENER) { + rv = accept_new_connection(&event, iom, process_slot, thread_slot); + if (rv != APR_SUCCESS) { + ap_log_error(APLOG_MARK, APLOG_CRIT, rv, ap_server_conf, + "accept_new_connection() failed"); } - if (ap_scoreboard_image->parent[process_slot].generation != - ap_scoreboard_image->global->running_generation) { - level = APLOG_DEBUG; /* common to get these at restart time */ - } - ap_log_error(APLOG_MARK, level, rv, ap_server_conf, - "apr_proc_mutex_unlock failed. Attempting to " - "shutdown process gracefully."); - signal_threads(ST_GRACEFUL); - } - if (csd != NULL) { - is_listener = 0; - worker_stack_awaken_next(idle_worker_stack); - process_socket(ptrans, csd, process_slot, - thread_slot, bucket_alloc); - apr_pool_clear(ptrans); - requests_this_child--; } - if ((ap_mpm_pod_check(pod) == APR_SUCCESS) || - (ap_my_generation != - ap_scoreboard_image->global->running_generation)) { - signal_threads(ST_GRACEFUL); - break; + else { + ap_log_error(APLOG_MARK, APLOG_CRIT, 0, ap_server_conf, + "got event on polled object with unknown type %d", + m->type); } } else { - if ((rv = SAFE_ACCEPT(apr_proc_mutex_unlock(accept_mutex))) - != APR_SUCCESS) { - ap_log_error(APLOG_MARK, APLOG_EMERG, rv, ap_server_conf, - "apr_proc_mutex_unlock failed. Attempting to " - "shutdown process gracefully."); - signal_threads(ST_GRACEFUL); - } - break; + ap_log_error(APLOG_MARK, APLOG_CRIT, rv, ap_server_conf, "io_multiplexer_get_event failed"); } } dying = 1; ap_scoreboard_image->parent[process_slot].quiescing = 1; - worker_stack_term(idle_worker_stack); + io_multiplexer_stop(iom, 0); ap_update_child_status_from_indexes(process_slot, thread_slot, (dying) ? 
SERVER_DEAD : SERVER_GRACEFUL, (request_rec *) NULL); @@ -876,39 +704,43 @@ static void * APR_THREAD_FUNC start_threads(apr_thread_t *thd, void *dummy) int my_child_num = child_num_arg; proc_info *my_info; apr_status_t rv; + ap_listen_rec *listener; int i; int threads_created = 0; int loops; int prev_threads_created; - idle_worker_stack = worker_stack_create(pchild, ap_threads_per_child); - if (idle_worker_stack == NULL) { - ap_log_error(APLOG_MARK, APLOG_ALERT, 0, ap_server_conf, - "worker_stack_create() failed"); + rv = io_multiplexer_create(&iom, pchild, MAX_IOM_DESCRIPTORS); + if (rv != APR_SUCCESS) { + ap_log_error(APLOG_MARK, APLOG_CRIT, rv, ap_server_conf, + "io_multiplexer_create() failed"); clean_child_exit(APEXIT_CHILDFATAL); } - - worker_wakeups = (worker_wakeup_info **) - apr_palloc(pchild, sizeof(worker_wakeup_info *) * - ap_threads_per_child); + for (listener = ap_listeners; listener != NULL; listener = listener->next) { + apr_pollfd_t descriptor; + multiplexable *m = (multiplexable *)apr_palloc(pconf, sizeof(*m)); + m->type = IOM_LISTENER; + m->l = listener; + descriptor.desc_type = APR_POLL_SOCKET; + descriptor.desc.s = listener->sd; + descriptor.reqevents = APR_POLLIN; + descriptor.client_data = NULL; + rv = io_multiplexer_add(iom, m, -1); + if (rv != APR_SUCCESS) { + ap_log_error(APLOG_MARK, APLOG_CRIT, rv, ap_server_conf, + "could not add listener to io_multiplexer"); + } + } loops = prev_threads_created = 0; while (1) { for (i = 0; i < ap_threads_per_child; i++) { int status = ap_scoreboard_image->servers[child_num_arg][i].status; - worker_wakeup_info *wakeup; if (status != SERVER_GRACEFUL && status != SERVER_DEAD) { continue; } - wakeup = worker_wakeup_create(pchild); - if (wakeup == NULL) { - ap_log_error(APLOG_MARK, APLOG_ALERT|APLOG_NOERRNO, 0, - ap_server_conf, "worker_wakeup_create failed"); - clean_child_exit(APEXIT_CHILDFATAL); - } - worker_wakeups[threads_created] = wakeup; my_info = (proc_info *)malloc(sizeof(proc_info)); if (my_info == NULL) { ap_log_error(APLOG_MARK, APLOG_ALERT, errno, ap_server_conf, @@ -1022,14 +854,6 @@ static void child_main(int child_num_arg) /*stuff to do before we switch id's, so we have permissions.*/ ap_reopen_scoreboard(pchild, NULL, 0); - rv = SAFE_ACCEPT(apr_proc_mutex_child_init(&accept_mutex, ap_lock_fname, - pchild)); - if (rv != APR_SUCCESS) { - ap_log_error(APLOG_MARK, APLOG_EMERG, rv, ap_server_conf, - "Couldn't initialize cross-process lock in child"); - clean_child_exit(APEXIT_CHILDFATAL); - } - if (unixd_setup_child()) { clean_child_exit(APEXIT_CHILDFATAL); } @@ -1446,7 +1270,6 @@ static void server_main_loop(int remaining_children_to_start) int ap_mpm_run(apr_pool_t *_pconf, apr_pool_t *plog, server_rec *s) { int remaining_children_to_start; - apr_status_t rv; ap_log_pid(pconf, ap_pid_fname); @@ -1458,36 +1281,6 @@ int ap_mpm_run(apr_pool_t *_pconf, apr_pool_t *plog, server_rec *s) "ignored during restart"); changed_limit_at_restart = 0; } - - /* Initialize cross-process accept lock */ - ap_lock_fname = apr_psprintf(_pconf, "%s.%" APR_PID_T_FMT, - ap_server_root_relative(_pconf, ap_lock_fname), - ap_my_pid); - - rv = apr_proc_mutex_create(&accept_mutex, ap_lock_fname, - ap_accept_lock_mech, _pconf); - if (rv != APR_SUCCESS) { - ap_log_error(APLOG_MARK, APLOG_EMERG, rv, s, - "Couldn't create accept lock"); - mpm_state = AP_MPMQ_STOPPING; - return 1; - } - -#if APR_USE_SYSVSEM_SERIALIZE - if (ap_accept_lock_mech == APR_LOCK_DEFAULT || - ap_accept_lock_mech == APR_LOCK_SYSVSEM) { -#else - if (ap_accept_lock_mech == 
APR_LOCK_SYSVSEM) { -#endif - rv = unixd_set_proc_mutex_perms(accept_mutex); - if (rv != APR_SUCCESS) { - ap_log_error(APLOG_MARK, APLOG_EMERG, rv, s, - "Couldn't set permissions on cross-process lock; " - "check User and Group directives"); - mpm_state = AP_MPMQ_STOPPING; - return 1; - } - } if (!is_graceful) { if (ap_run_pre_mpm(s->process->pool, SB_SHARED) != OK) { @@ -1532,12 +1325,6 @@ int ap_mpm_run(apr_pool_t *_pconf, apr_pool_t *plog, server_rec *s) ap_get_server_version()); ap_log_error(APLOG_MARK, APLOG_INFO, 0, ap_server_conf, "Server built: %s", ap_get_server_built()); -#ifdef AP_MPM_WANT_SET_ACCEPT_LOCK_MECH - ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, ap_server_conf, - "AcceptMutex: %s (default: %s)", - apr_proc_mutex_name(accept_mutex), - apr_proc_mutex_defname()); -#endif restart_pending = shutdown_pending = 0; mpm_state = AP_MPMQ_RUNNING;
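
For reference, the sketch below (not part of the patch) condenses how leader.c is expected to drive the new io_multiplexer API: a child process creates one multiplexer, registers every listening socket, and each worker thread then loops on io_multiplexer_get_event(), handling whichever listener or keepalive connection becomes ready. The io_multiplexer_* calls, the multiplexable struct, IOM_POLL_TIMEOUT, and IOM_TIMEOUT_INFINITE are taken from this commit; setup_multiplexer() and worker_loop() are illustrative helpers only, and compiling them would require the httpd and APR headers from this branch.

/*
 * Illustrative sketch only -- not part of the patch.
 */
#include "apr_pools.h"
#include "apr_poll.h"
#include "httpd.h"
#include "ap_listen.h"
#include "io_multiplexer.h"

#define MAX_IOM_DESCRIPTORS 1024     /* mirrors the default in leader.c */

/* Create the multiplexer and register every listening socket, roughly as
 * start_threads() does in the patch.  Listeners are added with no timeout. */
static apr_status_t setup_multiplexer(io_multiplexer **iom, apr_pool_t *p)
{
    ap_listen_rec *lr;
    apr_status_t rv = io_multiplexer_create(iom, p, MAX_IOM_DESCRIPTORS);
    if (rv != APR_SUCCESS) {
        return rv;
    }
    for (lr = ap_listeners; lr != NULL; lr = lr->next) {
        multiplexable *m = apr_palloc(p, sizeof(*m));
        m->type = IOM_LISTENER;
        m->l = lr;
        rv = io_multiplexer_add(*iom, m, IOM_TIMEOUT_INFINITE);
        if (rv != APR_SUCCESS) {
            return rv;
        }
    }
    return APR_SUCCESS;
}

/* Simplified worker loop: each successful io_multiplexer_get_event() hands
 * back exactly one ready (or timed-out) descriptor, and the calling thread
 * deals with it alone. */
static void worker_loop(io_multiplexer *iom)
{
    apr_pollfd_t event;

    while (io_multiplexer_get_event(iom, &event) == APR_SUCCESS) {
        multiplexable *m = (multiplexable *)event.client_data;

        if (m->type == IOM_LISTENER) {
            /* accept_new_connection(): accept the socket, run the
             * pre_connection hooks, wrap the conn_rec in an IOM_CONNECTION
             * multiplexable, and io_multiplexer_add() both the new
             * connection and the listener back into the pollset. */
        }
        else if (event.rtnevents & IOM_POLL_TIMEOUT) {
            /* Keepalive timer expired before another request arrived:
             * process_event_on_connection() moves the connection to
             * CONN_STATE_LINGER and closes it. */
        }
        else if (event.rtnevents & (APR_POLLIN | APR_POLLPRI)) {
            /* Request data is readable: run ap_run_process_connection();
             * if the connection ends up back in
             * CONN_STATE_CHECK_REQUEST_LINE_READABLE, it is re-added with
             * keep_alive_timeout so the next request (or a timeout) wakes
             * a worker again. */
        }
    }
}

Note that io_multiplexer_get_event() removes a descriptor from the pollset (and a connection from the timeout ring) before handing it back, so only one worker thread ever owns a given connection at a time; the connection re-enters the pollset only when the worker re-adds it after reaching the keepalive state.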