--- /dev/null
+/* Copyright 2005 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "apr_poll.h"
+#include "apr_ring.h"
+#include "apr_thread_cond.h"
+#include "apr_thread_mutex.h"
+
+#include "io_multiplexer.h"
+
+extern server_rec *ap_server_conf;
+
+APR_RING_HEAD(timeout_ring_header_t, conn_state_t);
+
+struct io_multiplexer {
+ int stopped;
+ apr_thread_mutex_t *lock;
+ apr_thread_mutex_t *pollset_lock;
+ apr_pollset_t *pollset;
+ apr_int32_t num_pending_events;
+ const apr_pollfd_t *next_pending_event;
+ struct timeout_ring_header_t pending_timeouts;
+ struct timeout_ring_header_t expired_timeouts;
+ volatile int poll_sequence_num;
+};
+
+static apr_status_t io_multiplexer_remove_internal(io_multiplexer *iom,
+ multiplexable *m);
+
+apr_status_t io_multiplexer_create(io_multiplexer **iom, apr_pool_t *p,
+ apr_uint32_t max_descriptors)
+{
+ apr_status_t rv;
+ *iom = (io_multiplexer *)apr_palloc(p, sizeof(**iom));
+ rv = apr_thread_mutex_create(&((*iom)->lock), APR_THREAD_MUTEX_DEFAULT, p);
+ if (rv != APR_SUCCESS) {
+ return rv;
+ }
+ rv = apr_thread_mutex_create(&((*iom)->pollset_lock),
+ APR_THREAD_MUTEX_DEFAULT, p);
+ if (rv != APR_SUCCESS) {
+ return rv;
+ }
+ rv = apr_pollset_create(&((*iom)->pollset), max_descriptors, p,
+ APR_POLLSET_THREADSAFE);
+ if (rv != APR_SUCCESS) {
+ return rv;
+ }
+ (*iom)->stopped = 0;
+ (*iom)->num_pending_events = 0;
+ (*iom)->next_pending_event = NULL;
+ APR_RING_INIT(&((*iom)->pending_timeouts), conn_state_t, timeout_list);
+ APR_RING_INIT(&((*iom)->expired_timeouts), conn_state_t, timeout_list);
+ (*iom)->poll_sequence_num = 0;
+
+ return APR_SUCCESS;
+}
+
+#define DEFAULT_POLL_TIMEOUT 1000000
+
+apr_status_t io_multiplexer_get_event(io_multiplexer *iom,
+ apr_pollfd_t *event)
+{
+ apr_status_t rv;
+ rv = apr_thread_mutex_lock(iom->pollset_lock);
+ if (rv != APR_SUCCESS) {
+ return rv;
+ }
+ rv = apr_thread_mutex_lock(iom->lock);
+ if (rv != APR_SUCCESS) {
+ apr_thread_mutex_unlock(iom->pollset_lock);
+ return rv;
+ }
+ if (iom->stopped) {
+ apr_thread_mutex_unlock(iom->lock);
+ apr_thread_mutex_unlock(iom->pollset_lock);
+ return APR_EINVAL;
+ }
+ for (;;) {
+ /* Invariant: at the start of each iteration of this loop, the
+ * active thread holds iom->lock.
+ */
+ if (!APR_RING_EMPTY(&(iom->expired_timeouts), conn_state_t, timeout_list)) {
+ /* There are some timeout notifications remaining
+ * from the last poll. Return the next one.
+ */
+ conn_state_t *cs = APR_RING_FIRST(&(iom->expired_timeouts));
+ APR_RING_REMOVE(cs, timeout_list);
+ *event = cs->pfd;
+ event->rtnevents |= IOM_POLL_TIMEOUT;
+ apr_thread_mutex_unlock(iom->lock);
+ return apr_thread_mutex_unlock(iom->pollset_lock);
+ }
+ else if (iom->num_pending_events > 0) {
+ /* There are some events remaining from the last
+ * poll. Return the next one.
+ */
+ *event = *(iom->next_pending_event++);
+ apr_pollset_remove(iom->pollset, event);
+ iom->num_pending_events--;
+ apr_thread_mutex_unlock(iom->lock);
+ return apr_thread_mutex_unlock(iom->pollset_lock);
+ }
+ else {
+ /* No unprocessed events remain from the previous poll,
+ * so initiate a new poll.
+ */
+ apr_int32_t num_pending_events = 0;
+ const apr_pollfd_t *next_pending_event;
+ apr_interval_time_t poll_timeout;
+ int i;
+
+ if (APR_RING_EMPTY(&(iom->pending_timeouts), conn_state_t,
+ timeout_list)) {
+ poll_timeout = DEFAULT_POLL_TIMEOUT;
+ }
+ else {
+ /* If there are pending timeouts, check whether
+ * any of them have expired. If none have expired,
+ * use the expiration time on the first one to
+ * determine how long the poll should block.
+ */
+ apr_time_t now = apr_time_now();
+ conn_state_t *cs = APR_RING_FIRST(&(iom->pending_timeouts));
+ if (cs->expiration_time <= now) {
+ do {
+ APR_RING_REMOVE(cs, timeout_list);
+ apr_pollset_remove(iom->pollset, &(cs->pfd));
+ APR_RING_INSERT_TAIL(&(iom->expired_timeouts), cs,
+ conn_state_t, timeout_list);
+ if (APR_RING_EMPTY(&(iom->pending_timeouts),
+ conn_state_t, timeout_list)) {
+ break;
+ }
+ cs = APR_RING_FIRST(&(iom->pending_timeouts));
+ } while (cs->expiration_time <= now);
+ continue;
+ }
+ else {
+ poll_timeout = cs->expiration_time - now;
+ }
+ }
+
+ apr_thread_mutex_unlock(iom->lock);
+
+ rv = apr_pollset_poll(iom->pollset, poll_timeout,
+ &num_pending_events, &next_pending_event);
+
+ if ((rv != APR_SUCCESS) && !APR_STATUS_IS_TIMEUP(rv) && !APR_STATUS_IS_EINTR(rv)) {
+ apr_thread_mutex_unlock(iom->pollset_lock);
+ return rv;
+ }
+ apr_thread_mutex_lock(iom->lock);
+
+ if (num_pending_events > 0) {
+ iom->num_pending_events = num_pending_events;
+ iom->next_pending_event = next_pending_event;
+ for (i = 0; i < num_pending_events; i++) {
+ multiplexable *m = (multiplexable *)next_pending_event[i].client_data;
+ if (m != NULL) {
+ io_multiplexer_remove_internal(iom, m);
+ }
+ }
+ }
+ }
+ }
+}
+
+apr_status_t io_multiplexer_stop(io_multiplexer *iom, int graceful) {
+ iom->stopped = 1;
+ return APR_SUCCESS;
+}
+
+apr_status_t io_multiplexer_add(io_multiplexer *iom, multiplexable *m,
+ long timeout_in_usec)
+{
+ apr_status_t rv;
+ apr_thread_mutex_lock(iom->lock);
+ if (iom->stopped) {
+ rv = APR_EINVAL;
+ }
+ else if (m->type == IOM_CONNECTION) {
+ APR_RING_REMOVE(m->c->cs, timeout_list);
+ m->c->cs->pfd.client_data = m;
+ rv = apr_pollset_add(iom->pollset, &(m->c->cs->pfd));
+ if (timeout_in_usec >= 0) {
+ /* XXX: Keep the pending_timeouts list sorted */
+ m->c->cs->expiration_time = apr_time_now() + timeout_in_usec;
+ APR_RING_INSERT_TAIL(&(iom->pending_timeouts), m->c->cs,
+ conn_state_t, timeout_list);
+ }
+ }
+ else if (m->type == IOM_LISTENER) {
+ apr_pollfd_t desc;
+ desc.desc_type = APR_POLL_SOCKET;
+ desc.desc.s = m->l->sd;
+ desc.reqevents = APR_POLLIN;
+ desc.client_data = m;
+ rv = apr_pollset_add(iom->pollset, &desc);
+ }
+ else {
+ rv = APR_EINVALSOCK;
+ }
+ apr_thread_mutex_unlock(iom->lock);
+ return rv;
+}
+
+apr_status_t io_multiplexer_remove(io_multiplexer *iom, multiplexable *m)
+{
+ apr_status_t rv;
+ apr_thread_mutex_lock(iom->lock);
+ rv = io_multiplexer_remove_internal(iom, m);
+ apr_thread_mutex_unlock(iom->lock);
+ return rv;
+}
+
+static apr_status_t io_multiplexer_remove_internal(io_multiplexer *iom,
+ multiplexable *m)
+{
+ apr_status_t rv;
+ if (iom->stopped) {
+ rv = APR_EINVAL;
+ }
+ else if (m->type == IOM_CONNECTION) {
+ APR_RING_REMOVE(m->c->cs, timeout_list);
+ rv = apr_pollset_remove(iom->pollset, &(m->c->cs->pfd));
+ }
+ else if (m->type == IOM_LISTENER) {
+ apr_pollfd_t desc;
+ desc.desc_type = APR_POLL_SOCKET;
+ desc.desc.s = m->l->sd;
+ desc.reqevents = APR_POLLIN;
+ desc.client_data = NULL;
+ rv = apr_pollset_remove(iom->pollset, &desc);
+ }
+ else {
+ rv = APR_EINVALSOCK;
+ }
+ return rv;
+}
+
#include "apr_thread_proc.h"
#include "apr_signal.h"
#include "apr_thread_cond.h"
-#include "apr_thread_mutex.h"
-#include "apr_proc_mutex.h"
#define APR_WANT_STRFUNC
#include "apr_want.h"
#include "http_config.h" /* for read_config */
#include "http_core.h" /* for get_remote_host */
#include "http_connection.h"
+#include "http_vhost.h"
#include "ap_mpm.h"
#include "mpm_common.h"
#include "ap_listen.h"
#include "apr_atomic.h"
+#include "io_multiplexer.h"
+
/* Limit on the total --- clients will be locked out if more servers than
* this are needed. It is intended solely to keep the server from crashing
* when things get out of hand.
thread. Use this instead */
static pid_t parent_pid;
-/* Locks for accept serialization */
-static apr_proc_mutex_t *accept_mutex;
-
-#ifdef SINGLE_LISTEN_UNSERIALIZED_ACCEPT
-#define SAFE_ACCEPT(stmt) (ap_listeners->next ? (stmt) : APR_SUCCESS)
-#else
-#define SAFE_ACCEPT(stmt) (stmt)
-#endif
-
-
-/* Structure used to wake up an idle worker thread
+/* Max # of file descriptors that the io_multiplexer supports
+ * XXX should be configurable at runtime
*/
-struct worker_wakeup_info {
- apr_uint32_t next; /* index into worker_wakeups array,
- * used to build a linked list
- */
- apr_thread_cond_t *cond;
- apr_thread_mutex_t *mutex;
-};
-
-static worker_wakeup_info *worker_wakeup_create(apr_pool_t *pool)
-{
- apr_status_t rv;
- worker_wakeup_info *wakeup;
-
- wakeup = (worker_wakeup_info *)apr_palloc(pool, sizeof(*wakeup));
- if ((rv = apr_thread_cond_create(&wakeup->cond, pool)) != APR_SUCCESS) {
- return NULL;
- }
- if ((rv = apr_thread_mutex_create(&wakeup->mutex, APR_THREAD_MUTEX_DEFAULT,
- pool)) != APR_SUCCESS) {
- return NULL;
- }
- /* The wakeup's mutex will be unlocked automatically when
- * the worker blocks on the condition variable
- */
- apr_thread_mutex_lock(wakeup->mutex);
- return wakeup;
-}
-
+#define MAX_IOM_DESCRIPTORS 1024
-/* Structure used to hold a stack of idle worker threads
+/* Socket multiplexer used to watch large numbers of connections
+ * for readability/writeability
*/
-typedef struct {
- /* 'state' consists of several fields concatenated into a
- * single 32-bit int for use with the apr_atomic_cas32() API:
- * state & STACK_FIRST is the thread ID of the first thread
- * in a linked list of idle threads
- * state & STACK_TERMINATED indicates whether the proc is shutting down
- * state & STACK_NO_LISTENER indicates whether the process has
- * no current listener thread
- */
- apr_uint32_t state;
-} worker_stack;
-
-#define STACK_FIRST 0xffff
-#define STACK_LIST_END 0xffff
-#define STACK_TERMINATED 0x10000
-#define STACK_NO_LISTENER 0x20000
-
-static worker_wakeup_info **worker_wakeups = NULL;
-
-static worker_stack* worker_stack_create(apr_pool_t *pool, apr_size_t max)
-{
- worker_stack *stack = (worker_stack *)apr_palloc(pool, sizeof(*stack));
- stack->state = STACK_NO_LISTENER | STACK_LIST_END;
- return stack;
-}
-
-static apr_status_t worker_stack_wait(worker_stack *stack,
- apr_uint32_t worker_id)
-{
- worker_wakeup_info *wakeup = worker_wakeups[worker_id];
-
- while (1) {
- apr_uint32_t state = stack->state;
- if (state & (STACK_TERMINATED | STACK_NO_LISTENER)) {
- if (state & STACK_TERMINATED) {
- return APR_EINVAL;
- }
- if (apr_atomic_cas32(&(stack->state), STACK_LIST_END, state) !=
- state) {
- continue;
- }
- else {
- return APR_SUCCESS;
- }
- }
- wakeup->next = state;
- if (apr_atomic_cas32(&(stack->state), worker_id, state) != state) {
- continue;
- }
- else {
- return apr_thread_cond_wait(wakeup->cond, wakeup->mutex);
- }
- }
-}
-
-static apr_status_t worker_stack_awaken_next(worker_stack *stack)
-{
-
- while (1) {
- apr_uint32_t state = stack->state;
- apr_uint32_t first = state & STACK_FIRST;
- if (first == STACK_LIST_END) {
- if (apr_atomic_cas32(&(stack->state), state | STACK_NO_LISTENER,
- state) != state) {
- continue;
- }
- else {
- return APR_SUCCESS;
- }
- }
- else {
- worker_wakeup_info *wakeup = worker_wakeups[first];
- if (apr_atomic_cas32(&(stack->state), (state ^ first) | wakeup->next,
- state) != state) {
- continue;
- }
- else {
- /* Acquire and release the idle worker's mutex to ensure
- * that it's actually waiting on its condition variable
- */
- apr_status_t rv;
- if ((rv = apr_thread_mutex_lock(wakeup->mutex)) !=
- APR_SUCCESS) {
- return rv;
- }
- if ((rv = apr_thread_mutex_unlock(wakeup->mutex)) !=
- APR_SUCCESS) {
- return rv;
- }
- return apr_thread_cond_signal(wakeup->cond);
- }
- }
- }
-}
-
-static apr_status_t worker_stack_term(worker_stack *stack)
-{
- int i;
- apr_status_t rv;
-
- while (1) {
- apr_uint32_t state = stack->state;
- if (apr_atomic_cas32(&(stack->state), state | STACK_TERMINATED,
- state) == state) {
- break;
- }
- }
- for (i = 0; i < ap_threads_per_child; i++) {
- if ((rv = worker_stack_awaken_next(stack)) != APR_SUCCESS) {
- return rv;
- }
- }
- return APR_SUCCESS;
-}
-
-static worker_stack *idle_worker_stack;
+static io_multiplexer *iom;
#define ST_INIT 0
#define ST_GRACEFUL 1
mpm_state = AP_MPMQ_STOPPING;
workers_may_exit = 1;
- worker_stack_term(idle_worker_stack);
+ io_multiplexer_stop(iom, 0);
}
AP_DECLARE(apr_status_t) ap_mpm_query(int query_code, int *result)
case AP_MPMQ_IS_FORKED:
*result = AP_MPMQ_DYNAMIC;
return APR_SUCCESS;
+ case AP_MPMQ_IS_ASYNC:
+ *result = 1;
+ return APR_SUCCESS;
case AP_MPMQ_HARD_LIMIT_DAEMONS:
*result = server_limit;
return APR_SUCCESS;
return workers_may_exit;
}
-/*****************************************************************
- * Child process main loop.
- */
-
-static void process_socket(apr_pool_t *p, apr_socket_t *sock, int my_child_num,
- int my_thread_num, apr_bucket_alloc_t *bucket_alloc)
+static apr_status_t process_event_on_connection(const apr_pollfd_t *event,
+ io_multiplexer *iom,
+ int child_num, int thread_num)
{
- conn_rec *current_conn;
- long conn_id = ID_FROM_CHILD_THREAD(my_child_num, my_thread_num);
- int csd;
- ap_sb_handle_t *sbh;
-
- ap_create_sb_handle(&sbh, p, my_child_num, my_thread_num);
- apr_os_sock_get(&csd, sock);
-
- current_conn = ap_run_create_connection(p, ap_server_conf, sock,
- conn_id, sbh, bucket_alloc);
- if (current_conn) {
- ap_process_connection(current_conn, sock);
- ap_lingering_close(current_conn);
+ multiplexable *m = (multiplexable *)(event->client_data);
+ conn_rec *c = m->c;
+ conn_state_t *cs = c->cs;
+
+ if (cs->state == CONN_STATE_CHECK_REQUEST_LINE_READABLE) {
+ if (event->rtnevents & IOM_POLL_TIMEOUT) {
+ cs->state = CONN_STATE_LINGER;
+ ap_log_error(APLOG_MARK, APLOG_CRIT, 0, ap_server_conf, "timeout on connection");
+ }
+ else if (event->rtnevents & (APR_POLLIN | APR_POLLPRI)) {
+ cs->state = CONN_STATE_READ_REQUEST_LINE;
+ }
+ else if (event->rtnevents & APR_POLLHUP) {
+ /* close... */
+ }
+ else if (event->rtnevents & (APR_POLLERR | APR_POLLNVAL)) {
+ /* error... */
+ }
}
+
+ if (cs->state == CONN_STATE_READ_REQUEST_LINE) {
+ if (!c->aborted) {
+ ap_run_process_connection(c);
+ }
+ else {
+ cs->state = CONN_STATE_LINGER;
+ }
+ }
+
+ if (cs->state == CONN_STATE_LINGER) {
+ ap_lingering_close(c);
+ apr_bucket_alloc_destroy(cs->bucket_alloc);
+ apr_pool_destroy(cs->p);
+ return APR_SUCCESS;
+ }
+
+ if (cs->state == CONN_STATE_CHECK_REQUEST_LINE_READABLE) {
+ io_multiplexer_add(iom, m, ap_server_conf->keep_alive_timeout);
+ }
+
+ return APR_SUCCESS;
}
/* requests_this_child has gone to zero or below. See if the admin coded
#endif
}
+static apr_status_t accept_new_connection(const apr_pollfd_t *event,
+ io_multiplexer *iom,
+ int child_num, int thread_num)
+{
+ apr_allocator_t *allocator = NULL;
+ apr_pool_t *p = NULL;
+ apr_bucket_alloc_t *bucket_alloc = NULL;
+ apr_socket_t *new_socket = NULL;
+ void *csd;
+ apr_status_t rv;
+ multiplexable *client_data = (multiplexable *)(event->client_data);
+ ap_listen_rec *listener = client_data->l;
+
+ apr_allocator_create(&allocator);
+ apr_allocator_max_free_set(allocator, ap_max_mem_free);
+ apr_pool_create_ex(&p, NULL, NULL, allocator);
+ apr_allocator_owner_set(allocator, p);
+ bucket_alloc = apr_bucket_alloc_create_ex(allocator);
+
+ rv = listener->accept_func(&csd, listener, p);
+
+
+ if (rv == APR_SUCCESS) {
+ /* Re-register the listener with the multiplexer */
+ io_multiplexer_add(iom, client_data, -1);
+
+ if (csd == NULL) {
+ rv = APR_EGENERAL;
+ }
+ else {
+ new_socket = (apr_socket_t *)csd;
+ conn_rec *connection;
+ long conn_id = ID_FROM_CHILD_THREAD(child_num, thread_num);
+ ap_sb_handle_t *sbh;
+ ap_create_sb_handle(&sbh, p, child_num, thread_num);
+ connection = ap_run_create_connection(p, ap_server_conf,
+ new_socket, conn_id,
+ sbh, bucket_alloc);
+ if (connection == NULL) {
+ rv = APR_EGENERAL;
+ }
+ else {
+ conn_state_t *cs = connection->cs;
+ multiplexable *m;
+ int rc;
+
+ ap_update_vhost_given_ip(connection);
+ rc = ap_run_pre_connection(connection, csd);
+ if (rc != OK && rc != DONE) {
+ ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, ap_server_conf,
+ "accept_new_connection: connection aborted");
+ connection->aborted = 1;
+ }
+ m = (multiplexable *)apr_palloc(p, sizeof(*m));
+ m->type = IOM_CONNECTION;
+ m->c = connection;
+ cs->pfd.desc_type = APR_POLL_SOCKET;
+ cs->pfd.desc.s = new_socket;
+ cs->pfd.reqevents = APR_POLLIN;
+ cs->pfd.client_data = m;
+
+ rv = io_multiplexer_add(iom, m, -1);
+ if (rv != APR_SUCCESS) {
+ ap_log_error(APLOG_MARK, APLOG_CRIT, rv, ap_server_conf,
+ "io_multiplexer_add() failed");
+ }
+ }
+ }
+ }
+ if (rv != APR_SUCCESS) {
+ if (new_socket != NULL) {
+ apr_socket_close(new_socket);
+ }
+ if (bucket_alloc != NULL) {
+ apr_bucket_alloc_destroy(bucket_alloc); /* XXX unneeded? */
+ }
+ if (p != NULL) {
+ apr_pool_destroy(p);
+ }
+ ap_log_error(APLOG_MARK, APLOG_DEBUG, rv, ap_server_conf, "not restoring listener");
+ }
+
+ return rv;
+}
+
static void *worker_thread(apr_thread_t *thd, void * dummy)
{
proc_info * ti = dummy;
int process_slot = ti->pid;
int thread_slot = ti->tid;
- apr_uint32_t my_worker_num = (apr_uint32_t)(ti->tid);
- apr_pool_t *tpool = apr_thread_pool_get(thd);
- void *csd = NULL;
apr_allocator_t *allocator;
apr_pool_t *ptrans; /* Pool for per-transaction stuff */
apr_bucket_alloc_t *bucket_alloc;
- int numdesc;
- apr_pollset_t *pollset;
apr_status_t rv;
- ap_listen_rec *lr;
- int is_listener;
- int last_poll_idx = 0;
ap_update_child_status_from_indexes(process_slot, thread_slot, SERVER_STARTING, NULL);
apr_allocator_owner_set(allocator, ptrans);
bucket_alloc = apr_bucket_alloc_create_ex(allocator);
- apr_pollset_create(&pollset, num_listensocks, tpool, 0);
- for (lr = ap_listeners ; lr != NULL ; lr = lr->next) {
- apr_pollfd_t pfd = { 0 };
-
- pfd.desc_type = APR_POLL_SOCKET;
- pfd.desc.s = lr->sd;
- pfd.reqevents = APR_POLLIN;
- pfd.client_data = lr;
-
- /* ### check the status */
- (void) apr_pollset_add(pollset, &pfd);
- }
-
- /* TODO: Switch to a system where threads reuse the results from earlier
- poll calls - manoj */
- is_listener = 0;
while (!workers_may_exit) {
-
+ apr_pollfd_t event;
ap_update_child_status_from_indexes(process_slot, thread_slot,
SERVER_READY, NULL);
- if (!is_listener) {
- /* Wait until it's our turn to become the listener */
- if ((rv = worker_stack_wait(idle_worker_stack, my_worker_num)) !=
- APR_SUCCESS) {
- if (rv != APR_EINVAL) {
- ap_log_error(APLOG_MARK, APLOG_EMERG, rv, ap_server_conf,
- "worker_stack_wait failed. Shutting down");
- }
- break;
- }
- if (workers_may_exit) {
- break;
- }
- is_listener = 1;
- }
-
+ rv = io_multiplexer_get_event(iom, &event);
/* TODO: requests_this_child should be synchronized - aaron */
if (requests_this_child <= 0) {
check_infinite_requests();
}
- if (workers_may_exit) break;
-
- if ((rv = SAFE_ACCEPT(apr_proc_mutex_lock(accept_mutex)))
- != APR_SUCCESS) {
- int level = APLOG_EMERG;
-
- if (workers_may_exit) {
- break;
- }
- if (ap_scoreboard_image->parent[process_slot].generation !=
- ap_scoreboard_image->global->running_generation) {
- level = APLOG_DEBUG; /* common to get these at restart time */
- }
- ap_log_error(APLOG_MARK, level, rv, ap_server_conf,
- "apr_proc_mutex_lock failed. Attempting to shutdown "
- "process gracefully.");
- signal_threads(ST_GRACEFUL);
- break; /* skip the lock release */
- }
-
- if (!ap_listeners->next) {
- /* Only one listener, so skip the poll */
- lr = ap_listeners;
- }
- else {
- while (!workers_may_exit) {
- apr_status_t ret;
- const apr_pollfd_t *pdesc;
-
- ret = apr_pollset_poll(pollset, -1, &numdesc, &pdesc);
- if (ret != APR_SUCCESS) {
- if (APR_STATUS_IS_EINTR(ret)) {
- continue;
- }
-
- /* apr_pollset_poll() will only return errors in catastrophic
- * circumstances. Let's try exiting gracefully, for now. */
- ap_log_error(APLOG_MARK, APLOG_ERR, ret, (const server_rec *)
- ap_server_conf, "apr_pollset_poll: (listen)");
- signal_threads(ST_GRACEFUL);
- }
-
- if (workers_may_exit) break;
-
- /* We can always use pdesc[0], but sockets at position N
- * could end up completely starved of attention in a very
- * busy server. Therefore, we round-robin across the
- * returned set of descriptors. While it is possible that
- * the returned set of descriptors might flip around and
- * continue to starve some sockets, we happen to know the
- * internal pollset implementation retains ordering
- * stability of the sockets. Thus, the round-robin should
- * ensure that a socket will eventually be serviced.
- */
- if (last_poll_idx >= numdesc)
- last_poll_idx = 0;
-
- /* Grab a listener record from the client_data of the poll
- * descriptor, and advance our saved index to round-robin
- * the next fetch.
- *
- * ### hmm... this descriptor might have POLLERR rather
- * ### than POLLIN
- */
- lr = pdesc[last_poll_idx++].client_data;
- goto got_fd;
- }
+ if (workers_may_exit) {
+ break;
}
- got_fd:
- if (!workers_may_exit) {
- rv = lr->accept_func(&csd, lr, ptrans);
- /* later we trash rv and rely on csd to indicate success/failure */
- AP_DEBUG_ASSERT(rv == APR_SUCCESS || !csd);
-
- if (rv == APR_EGENERAL) {
- /* E[NM]FILE, ENOMEM, etc */
- resource_shortage = 1;
- signal_threads(ST_GRACEFUL);
+ if (rv == APR_SUCCESS) {
+ multiplexable *m = (multiplexable *)event.client_data;
+ if (m->type == IOM_CONNECTION) {
+ rv = process_event_on_connection(&event, iom, process_slot, thread_slot);
}
- if ((rv = SAFE_ACCEPT(apr_proc_mutex_unlock(accept_mutex)))
- != APR_SUCCESS) {
- int level = APLOG_EMERG;
-
- if (workers_may_exit) {
- break;
+ else if (m->type == IOM_LISTENER) {
+ rv = accept_new_connection(&event, iom, process_slot, thread_slot);
+ if (rv != APR_SUCCESS) {
+ ap_log_error(APLOG_MARK, APLOG_CRIT, rv, ap_server_conf,
+ "accept_new_connection() failed");
}
- if (ap_scoreboard_image->parent[process_slot].generation !=
- ap_scoreboard_image->global->running_generation) {
- level = APLOG_DEBUG; /* common to get these at restart time */
- }
- ap_log_error(APLOG_MARK, level, rv, ap_server_conf,
- "apr_proc_mutex_unlock failed. Attempting to "
- "shutdown process gracefully.");
- signal_threads(ST_GRACEFUL);
- }
- if (csd != NULL) {
- is_listener = 0;
- worker_stack_awaken_next(idle_worker_stack);
- process_socket(ptrans, csd, process_slot,
- thread_slot, bucket_alloc);
- apr_pool_clear(ptrans);
- requests_this_child--;
}
- if ((ap_mpm_pod_check(pod) == APR_SUCCESS) ||
- (ap_my_generation !=
- ap_scoreboard_image->global->running_generation)) {
- signal_threads(ST_GRACEFUL);
- break;
+ else {
+ ap_log_error(APLOG_MARK, APLOG_CRIT, 0, ap_server_conf,
+ "got event on polled object with unknown type %d",
+ m->type);
}
}
else {
- if ((rv = SAFE_ACCEPT(apr_proc_mutex_unlock(accept_mutex)))
- != APR_SUCCESS) {
- ap_log_error(APLOG_MARK, APLOG_EMERG, rv, ap_server_conf,
- "apr_proc_mutex_unlock failed. Attempting to "
- "shutdown process gracefully.");
- signal_threads(ST_GRACEFUL);
- }
- break;
+ ap_log_error(APLOG_MARK, APLOG_CRIT, rv, ap_server_conf, "io_multiplexer_get_event failed");
}
}
dying = 1;
ap_scoreboard_image->parent[process_slot].quiescing = 1;
- worker_stack_term(idle_worker_stack);
+ io_multiplexer_stop(iom, 0);
ap_update_child_status_from_indexes(process_slot, thread_slot,
(dying) ? SERVER_DEAD : SERVER_GRACEFUL, (request_rec *) NULL);
int my_child_num = child_num_arg;
proc_info *my_info;
apr_status_t rv;
+ ap_listen_rec *listener;
int i;
int threads_created = 0;
int loops;
int prev_threads_created;
- idle_worker_stack = worker_stack_create(pchild, ap_threads_per_child);
- if (idle_worker_stack == NULL) {
- ap_log_error(APLOG_MARK, APLOG_ALERT, 0, ap_server_conf,
- "worker_stack_create() failed");
+ rv = io_multiplexer_create(&iom, pchild, MAX_IOM_DESCRIPTORS);
+ if (rv != APR_SUCCESS) {
+ ap_log_error(APLOG_MARK, APLOG_CRIT, rv, ap_server_conf,
+ "io_multiplexer_create() failed");
clean_child_exit(APEXIT_CHILDFATAL);
}
-
- worker_wakeups = (worker_wakeup_info **)
- apr_palloc(pchild, sizeof(worker_wakeup_info *) *
- ap_threads_per_child);
+ for (listener = ap_listeners; listener != NULL; listener = listener->next) {
+ apr_pollfd_t descriptor;
+ multiplexable *m = (multiplexable *)apr_palloc(pconf, sizeof(*m));
+ m->type = IOM_LISTENER;
+ m->l = listener;
+ descriptor.desc_type = APR_POLL_SOCKET;
+ descriptor.desc.s = listener->sd;
+ descriptor.reqevents = APR_POLLIN;
+ descriptor.client_data = NULL;
+ rv = io_multiplexer_add(iom, m, -1);
+ if (rv != APR_SUCCESS) {
+ ap_log_error(APLOG_MARK, APLOG_CRIT, rv, ap_server_conf,
+ "could not add listener to io_multiplexer");
+ }
+ }
loops = prev_threads_created = 0;
while (1) {
for (i = 0; i < ap_threads_per_child; i++) {
int status = ap_scoreboard_image->servers[child_num_arg][i].status;
- worker_wakeup_info *wakeup;
if (status != SERVER_GRACEFUL && status != SERVER_DEAD) {
continue;
}
- wakeup = worker_wakeup_create(pchild);
- if (wakeup == NULL) {
- ap_log_error(APLOG_MARK, APLOG_ALERT|APLOG_NOERRNO, 0,
- ap_server_conf, "worker_wakeup_create failed");
- clean_child_exit(APEXIT_CHILDFATAL);
- }
- worker_wakeups[threads_created] = wakeup;
my_info = (proc_info *)malloc(sizeof(proc_info));
if (my_info == NULL) {
ap_log_error(APLOG_MARK, APLOG_ALERT, errno, ap_server_conf,
/*stuff to do before we switch id's, so we have permissions.*/
ap_reopen_scoreboard(pchild, NULL, 0);
- rv = SAFE_ACCEPT(apr_proc_mutex_child_init(&accept_mutex, ap_lock_fname,
- pchild));
- if (rv != APR_SUCCESS) {
- ap_log_error(APLOG_MARK, APLOG_EMERG, rv, ap_server_conf,
- "Couldn't initialize cross-process lock in child");
- clean_child_exit(APEXIT_CHILDFATAL);
- }
-
if (unixd_setup_child()) {
clean_child_exit(APEXIT_CHILDFATAL);
}
int ap_mpm_run(apr_pool_t *_pconf, apr_pool_t *plog, server_rec *s)
{
int remaining_children_to_start;
- apr_status_t rv;
ap_log_pid(pconf, ap_pid_fname);
"ignored during restart");
changed_limit_at_restart = 0;
}
-
- /* Initialize cross-process accept lock */
- ap_lock_fname = apr_psprintf(_pconf, "%s.%" APR_PID_T_FMT,
- ap_server_root_relative(_pconf, ap_lock_fname),
- ap_my_pid);
-
- rv = apr_proc_mutex_create(&accept_mutex, ap_lock_fname,
- ap_accept_lock_mech, _pconf);
- if (rv != APR_SUCCESS) {
- ap_log_error(APLOG_MARK, APLOG_EMERG, rv, s,
- "Couldn't create accept lock");
- mpm_state = AP_MPMQ_STOPPING;
- return 1;
- }
-
-#if APR_USE_SYSVSEM_SERIALIZE
- if (ap_accept_lock_mech == APR_LOCK_DEFAULT ||
- ap_accept_lock_mech == APR_LOCK_SYSVSEM) {
-#else
- if (ap_accept_lock_mech == APR_LOCK_SYSVSEM) {
-#endif
- rv = unixd_set_proc_mutex_perms(accept_mutex);
- if (rv != APR_SUCCESS) {
- ap_log_error(APLOG_MARK, APLOG_EMERG, rv, s,
- "Couldn't set permissions on cross-process lock; "
- "check User and Group directives");
- mpm_state = AP_MPMQ_STOPPING;
- return 1;
- }
- }
if (!is_graceful) {
if (ap_run_pre_mpm(s->process->pool, SB_SHARED) != OK) {
ap_get_server_version());
ap_log_error(APLOG_MARK, APLOG_INFO, 0, ap_server_conf,
"Server built: %s", ap_get_server_built());
-#ifdef AP_MPM_WANT_SET_ACCEPT_LOCK_MECH
- ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, ap_server_conf,
- "AcceptMutex: %s (default: %s)",
- apr_proc_mutex_name(accept_mutex),
- apr_proc_mutex_defname());
-#endif
restart_pending = shutdown_pending = 0;
mpm_state = AP_MPMQ_RUNNING;