conn_state_t pub;
/** chaining in defer_linger_chain */
struct event_conn_state_t *chain;
+ /** Is lingering close from defer_lingering_close()? */
+ int deferred_linger;
};
APR_RING_HEAD(timeout_head_t, event_conn_state_t);
return 1;
}
-static void abort_socket_nonblocking(apr_socket_t *csd)
+static void close_socket_nonblocking_(apr_socket_t *csd,
+ const char *from, int line)
{
apr_status_t rv;
+ apr_os_sock_t fd = -1;
+
+ /* Close csd with its timeout forced to 0. `from` and `line` identify the
+ * call site (the close_socket_nonblocking() wrapper passes __FUNCTION__
+ * and __LINE__) and are only used in the TRACE8 message below.
+ *
+ * close_worker_sockets() may have closed it already: when
+ * apr_os_sock_get() succeeds but the OS descriptor reads back as -1,
+ * the socket is treated as already closed and we return right after
+ * logging, without calling apr_socket_close() a second time.
+ */
+ rv = apr_os_sock_get(&fd, csd);
+ ap_log_error(APLOG_MARK, APLOG_TRACE8, 0, ap_server_conf,
+ "closing socket %i/%pp from %s:%i", (int)fd, csd, from, line);
+ if (rv == APR_SUCCESS && fd == -1) {
+ return;
+ }
+
apr_socket_timeout_set(csd, 0);
rv = apr_socket_close(csd);
if (rv != APR_SUCCESS) {
AP_DEBUG_ASSERT(0);
}
}
+#define close_socket_nonblocking(csd) \
+ close_socket_nonblocking_(csd, __FUNCTION__, __LINE__)
static void close_worker_sockets(void)
{
apr_socket_t *csd = worker_sockets[i];
if (csd) {
worker_sockets[i] = NULL;
- abort_socket_nonblocking(csd);
- }
- }
- for (;;) {
- event_conn_state_t *cs = defer_linger_chain;
- if (!cs) {
- break;
- }
- if (apr_atomic_casptr((void *)&defer_linger_chain, cs->chain,
- cs) != cs) {
- /* Race lost, try again */
- continue;
+ close_socket_nonblocking(csd);
}
- cs->chain = NULL;
- abort_socket_nonblocking(cs->pfd.desc.s);
}
}
static void wakeup_listener(void)
{
+ ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, ap_server_conf,
+ "wake up listener%s", listener_may_exit ? " again" : "");
+
listener_may_exit = 1;
disable_listensocks();
{
int is_last_connection;
event_conn_state_t *cs = cs_;
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE8, 0, cs->c,
+ "cleanup connection from state %i", (int)cs->pub.state);
switch (cs->pub.state) {
+ case CONN_STATE_LINGER:
case CONN_STATE_LINGER_NORMAL:
case CONN_STATE_LINGER_SHORT:
apr_atomic_dec32(&lingering_count);
|| (listeners_disabled() && !connections_above_limit(NULL)))) {
apr_pollset_wakeup(event_pollset);
}
+ if (dying) {
+ /* Help worker_thread_should_exit_early() */
+ ap_queue_interrupt_one(worker_queue);
+ }
return APR_SUCCESS;
}
ap_run_resume_connection(cs->c, cs->r);
}
+static void close_connection(event_conn_state_t *cs);
+
/*
* Close our side of the connection, flushing data to the client first.
* Pre-condition: cs is not in any timeout queue and not in the pollset,
{
apr_socket_t *csd = cs->pfd.desc.s;
+ /* defer_lingering_close() may have bumped lingering_count already */
+ if (!cs->deferred_linger) {
+ apr_atomic_inc32(&lingering_count);
+ }
+
+ apr_socket_timeout_set(csd, apr_time_from_sec(SECONDS_TO_LINGER));
if (ap_start_lingering_close(cs->c)) {
notify_suspend(cs);
- apr_socket_close(csd);
- ap_queue_info_push_pool(worker_queue_info, cs->p);
+ close_connection(cs);
return DONE;
}
-#ifdef AP_DEBUG
- {
- apr_status_t rv;
- rv = apr_socket_timeout_set(csd, 0);
- AP_DEBUG_ASSERT(rv == APR_SUCCESS);
- }
-#else
- apr_socket_timeout_set(csd, 0);
-#endif
-
cs->queue_timestamp = apr_time_now();
/*
* If some module requested a shortened waiting period, only wait for
else {
cs->pub.state = CONN_STATE_LINGER_NORMAL;
}
- apr_atomic_inc32(&lingering_count);
notify_suspend(cs);
-
return OK;
}
/*
* Defer flush and close of the connection by adding it to defer_linger_chain,
* for a worker to grab it and do the job (should that be blocking).
- * Pre-condition: cs is not in any timeout queue and not in the pollset,
- * timeout_mutex is not locked
- * return: 1 connection is alive (but aside and about to linger)
- * May be called by listener thread.
+ * Pre-condition: nonblocking, can be called from anywhere provided cs is not
+ * in any timeout queue or in the pollset.
*/
-static int start_lingering_close_nonblocking(event_conn_state_t *cs)
+static int defer_lingering_close(event_conn_state_t *cs)
{
- event_conn_state_t *chain;
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE6, 0, cs->c,
+ "deferring close from state %i", (int)cs->pub.state);
+
+ /* The connection is not shutdown() yet strictly speaking, but it's not
+ * in any queue nor handled by a worker either (will be very soon), so
+ * to account for it somewhere we bump lingering_count now (and set
+ * deferred_linger for process_lingering_close() to know).
+ */
+ cs->pub.state = CONN_STATE_LINGER;
+ apr_atomic_inc32(&lingering_count);
+ cs->deferred_linger = 1;
for (;;) {
- cs->chain = chain = defer_linger_chain;
+ event_conn_state_t *chain = cs->chain = defer_linger_chain;
if (apr_atomic_casptr((void *)&defer_linger_chain, cs,
chain) != chain) {
/* Race lost, try again */
}
}
-/*
- * forcibly close a lingering connection after the lingering period has
- * expired
- * Pre-condition: cs is not in any timeout queue and not in the pollset
- * return: irrelevant (need same prototype as start_lingering_close)
+/* Close the connection and release its resources (ptrans), either because an
+ * unrecoverable error occurred (queues or pollset add/remove) or more usually
+ * if lingering close timed out.
+ * The socket in cs->pfd.desc.s is closed through close_socket_nonblocking()
+ * and cs->p is then pushed back to worker_queue_info.
+ * NOTE(review): presumably cs itself lives in cs->p, so it should not be
+ * touched once this returns -- confirm against the allocation site.
+ * Pre-condition: nonblocking, can be called from anywhere provided cs is not
+ * in any timeout queue or in the pollset.
*/
-static int stop_lingering_close(event_conn_state_t *cs)
+static void close_connection(event_conn_state_t *cs)
{
- apr_socket_t *csd = ap_get_conn_socket(cs->c);
- ap_log_error(APLOG_MARK, APLOG_TRACE4, 0, ap_server_conf,
- "socket abort in state %i", (int)cs->pub.state);
- abort_socket_nonblocking(csd);
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE6, 0, cs->c,
+ "closing connection from state %i", (int)cs->pub.state);
+
+ close_socket_nonblocking(cs->pfd.desc.s);
ap_queue_info_push_pool(worker_queue_info, cs->p);
- if (dying)
- ap_queue_interrupt_one(worker_queue);
- return 0;
+}
+
+/* Shutdown the connection in case of timeout, error or resources shortage.
+ * This starts short lingering close if not already there, or directly closes
+ * the connection otherwise.
+ * "Not already there" means cs->pub.state < CONN_STATE_LINGER: the
+ * "short-lingering-close" note is set on the connection and the close is
+ * handed over to defer_lingering_close(). Any state at or above
+ * CONN_STATE_LINGER goes straight to close_connection().
+ * Pre-condition: nonblocking, can be called from anywhere provided cs is not
+ * in any timeout queue or in the pollset.
+ * Return: always 1. Direct callers in this file ignore the value; the int
+ * return type lets the function be passed as the callback argument of
+ * process_timeout_queue().
+ */
+static int shutdown_connection(event_conn_state_t *cs)
+{
+ if (cs->pub.state < CONN_STATE_LINGER) {
+ apr_table_setn(cs->c->notes, "short-lingering-close", "1");
+ defer_lingering_close(cs);
+ }
+ else {
+ close_connection(cs);
+ }
+ return 1;
}
/*
/* Forward declare */
static void process_lingering_close(event_conn_state_t *cs);
-static void update_reqevents_from_sense(event_conn_state_t *cs)
+static void update_reqevents_from_sense(event_conn_state_t *cs, int sense)
{
- if (cs->pub.sense == CONN_SENSE_WANT_READ) {
+ if (sense < 0) {
+ sense = cs->pub.sense;
+ }
+ if (sense == CONN_SENSE_WANT_READ) {
cs->pfd.reqevents = APR_POLLIN | APR_POLLHUP;
}
else {
apr_pool_cleanup_null);
ap_set_module_config(c->conn_config, &mpm_event_module, cs);
c->current_thread = thd;
+ c->cs = &cs->pub;
cs->c = c;
- c->cs = &(cs->pub);
cs->p = p;
cs->sc = ap_get_module_config(ap_server_conf->module_config,
&mpm_event_module);
cs->pfd.desc_type = APR_POLL_SOCKET;
- cs->pfd.reqevents = APR_POLLIN;
cs->pfd.desc.s = sock;
+ update_reqevents_from_sense(cs, CONN_SENSE_WANT_READ);
pt->type = PT_CSD;
pt->baton = cs;
cs->pfd.client_data = pt;
cs->queue_timestamp = apr_time_now();
notify_suspend(cs);
- update_reqevents_from_sense(cs);
+ update_reqevents_from_sense(cs, -1);
apr_thread_mutex_lock(timeout_mutex);
TO_QUEUE_APPEND(cs->sc->wc_q, cs);
rv = apr_pollset_add(event_pollset, &cs->pfd);
ap_log_error(APLOG_MARK, APLOG_ERR, rv, ap_server_conf, APLOGNO(03465)
"process_socket: apr_pollset_add failure for "
"write completion");
- apr_socket_close(cs->pfd.desc.s);
- ap_queue_info_push_pool(worker_queue_info, cs->p);
+ close_connection(cs);
+ signal_threads(ST_GRACEFUL);
}
else {
apr_thread_mutex_unlock(timeout_mutex);
}
if (pending != DECLINED
|| c->aborted
- || c->keepalive != AP_CONN_KEEPALIVE
- || listener_may_exit) {
+ || c->keepalive != AP_CONN_KEEPALIVE) {
cs->pub.state = CONN_STATE_LINGER;
}
else if (ap_run_input_pending(c) == OK) {
cs->pub.state = CONN_STATE_READ_REQUEST_LINE;
goto read_request;
}
- else {
+ else if (!listener_may_exit) {
cs->pub.state = CONN_STATE_CHECK_REQUEST_LINE_READABLE;
}
+ else {
+ cs->pub.state = CONN_STATE_LINGER;
+ }
}
if (cs->pub.state == CONN_STATE_CHECK_REQUEST_LINE_READABLE) {
notify_suspend(cs);
/* Add work to pollset. */
- cs->pfd.reqevents = APR_POLLIN;
+ update_reqevents_from_sense(cs, CONN_SENSE_WANT_READ);
apr_thread_mutex_lock(timeout_mutex);
TO_QUEUE_APPEND(cs->sc->ka_q, cs);
rv = apr_pollset_add(event_pollset, &cs->pfd);
ap_log_error(APLOG_MARK, APLOG_ERR, rv, ap_server_conf, APLOGNO(03093)
"process_socket: apr_pollset_add failure for "
"keep alive");
- apr_socket_close(cs->pfd.desc.s);
- ap_queue_info_push_pool(worker_queue_info, cs->p);
+ close_connection(cs);
+ signal_threads(ST_GRACEFUL);
}
else {
apr_thread_mutex_unlock(timeout_mutex);
cs->pub.state = CONN_STATE_WRITE_COMPLETION;
notify_suspend(cs);
- update_reqevents_from_sense(cs);
+ update_reqevents_from_sense(cs, -1);
apr_thread_mutex_lock(timeout_mutex);
TO_QUEUE_APPEND(cs->sc->wc_q, cs);
apr_pollset_add(event_pollset, &cs->pfd);
}
}
-static void close_listeners(int *closed)
+static int close_listeners(int *closed)
{
+ ap_log_error(APLOG_MARK, APLOG_TRACE6, 0, ap_server_conf,
+ "clos%s listeners (connection_count=%u)",
+ *closed ? "ed" : "ing", apr_atomic_read32(&connection_count));
if (!*closed) {
int i;
+
ap_close_listeners_ex(my_bucket->listeners);
- *closed = 1;
+ *closed = 1; /* once */
+
dying = 1;
ap_scoreboard_image->parent[ap_child_slot].quiescing = 1;
for (i = 0; i < threads_per_child; ++i) {
ap_queue_info_free_idle_pools(worker_queue_info);
ap_queue_interrupt_all(worker_queue);
+
+ return 1;
}
+ return 0;
}
static void unblock_signal(int sig)
/* trash the connection; we couldn't queue the connected
* socket to a worker
*/
- if (csd) {
- abort_socket_nonblocking(csd);
+ if (cs) {
+ shutdown_connection(cs);
}
- if (ptrans) {
- ap_queue_info_push_pool(worker_queue_info, ptrans);
+ else {
+ if (csd) {
+ close_socket_nonblocking(csd);
+ }
+ if (ptrans) {
+ ap_queue_info_push_pool(worker_queue_info, ptrans);
+ }
}
signal_threads(ST_GRACEFUL);
}
apr_status_t rv;
struct timeout_queue *q;
- /* socket is already in non-blocking state */
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE6, 0, cs->c,
+ "lingering close from state %i", (int)cs->pub.state);
+ AP_DEBUG_ASSERT(cs->pub.state >= CONN_STATE_LINGER);
+
+ apr_socket_timeout_set(csd, 0);
do {
nbytes = sizeof(dummybuf);
rv = apr_socket_recv(csd, dummybuf, &nbytes);
} while (rv == APR_SUCCESS);
if (!APR_STATUS_IS_EAGAIN(rv)) {
- rv = apr_socket_close(csd);
- AP_DEBUG_ASSERT(rv == APR_SUCCESS);
- ap_queue_info_push_pool(worker_queue_info, cs->p);
+ close_connection(cs);
return;
}
- /* Re-queue the connection to come back when readable */
- cs->pfd.reqevents = APR_POLLIN;
- cs->pub.sense = CONN_SENSE_DEFAULT;
+ /* (Re)queue the connection to come back when readable */
+ update_reqevents_from_sense(cs, CONN_SENSE_WANT_READ);
q = (cs->pub.state == CONN_STATE_LINGER_SHORT) ? short_linger_q : linger_q;
apr_thread_mutex_lock(timeout_mutex);
TO_QUEUE_APPEND(q, cs);
apr_thread_mutex_unlock(timeout_mutex);
ap_log_error(APLOG_MARK, APLOG_ERR, rv, ap_server_conf, APLOGNO(03092)
"process_lingering_close: apr_pollset_add failure");
- rv = apr_socket_close(cs->pfd.desc.s);
- AP_DEBUG_ASSERT(rv == APR_SUCCESS);
- ap_queue_info_push_pool(worker_queue_info, cs->p);
+ close_connection(cs);
+ signal_threads(ST_GRACEFUL);
return;
}
apr_thread_mutex_unlock(timeout_mutex);
/* If all workers are busy, we kill older keep-alive connections so
* that they may connect to another process.
*/
- if (!timeout_time) {
+ if (!timeout_time && *keepalive_q->total) {
ap_log_error(APLOG_MARK, APLOG_TRACE1, 0, ap_server_conf,
- "All workers are busy or dying, will close %u "
+ "All workers are busy or dying, will shutdown %u "
"keep-alive connections", *keepalive_q->total);
}
- process_timeout_queue(keepalive_q, timeout_time,
- start_lingering_close_nonblocking);
+ process_timeout_queue(keepalive_q, timeout_time, shutdown_connection);
}
static void * APR_THREAD_FUNC listener_thread(apr_thread_t * thd, void *dummy)
apr_int32_t num = 0;
apr_interval_time_t timeout_interval;
socket_callback_baton_t *user_chain;
- apr_time_t now, timeout_time;
+ apr_time_t now, timeout_time = -1;
int workers_were_busy = 0;
if (conns_this_child <= 0)
check_infinite_requests();
if (listener_may_exit) {
- close_listeners(&closed);
+ int first_close = close_listeners(&closed);
+
if (terminate_mode == ST_UNGRACEFUL
|| apr_atomic_read32(&connection_count) == 0)
break;
+
+ /* Don't wait in poll() for the first close (i.e. dying now), we
+ * want to maintain the queues and schedule defer_linger_chain ASAP
+ * to kill kept-alive connections and shut down the workers and child
+ * faster.
+ */
+ if (first_close) {
+ goto do_maintenance; /* with timeout_time == -1 */
+ }
}
if (APLOGtrace6(ap_server_conf)) {
}
#endif
- now = apr_time_now();
-
/* Start with an infinite poll() timeout and update it according to
* the next expiring timer or queue entry. If there are none, either
* the listener is wakeable and it can poll() indefinitely until a wake
* up occurs, otherwise periodic checks (maintenance, shutdown, ...)
* must be performed.
*/
+ now = apr_time_now();
timeout_interval = -1;
/* Push expired timers to a worker, the first remaining one determines
&& (timeout_interval < 0
|| timeout_time <= now
|| timeout_interval > timeout_time - now)) {
- timeout_interval = timeout_time > now ? timeout_time - now : 1;
+ timeout_interval = timeout_time > now ? timeout_time - now : 0;
}
/* When non-wakeable, don't wait more than 100 ms, in any case. */
|| timeout_interval > NON_WAKEABLE_POLL_TIMEOUT)) {
timeout_interval = NON_WAKEABLE_POLL_TIMEOUT;
}
+ else if (timeout_interval > 0) {
+ /* apr_pollset_poll() might round down the timeout to milliseconds,
+ * let's forcibly round up here to never return before the timeout.
+ */
+ timeout_interval = apr_time_from_msec(
+ apr_time_as_msec(timeout_interval + apr_time_from_msec(1) - 1)
+ );
+ }
+
+ ap_log_error(APLOG_MARK, APLOG_TRACE7, 0, ap_server_conf,
+ "polling with timeout=%" APR_TIME_T_FMT
+ " queues_timeout=%" APR_TIME_T_FMT
+ " timers_timeout=%" APR_TIME_T_FMT,
+ timeout_interval, queues_next_expiry - now,
+ timers_next_expiry - now);
rc = apr_pollset_poll(event_pollset, timeout_interval, &num, &out_pfd);
if (rc != APR_SUCCESS) {
- if (APR_STATUS_IS_EINTR(rc)) {
- /* Woken up, if we are exiting or listeners are disabled we
- * must fall through to kill kept-alive connections or test
- * whether listeners should be re-enabled. Otherwise we only
- * need to update timeouts (logic is above, so simply restart
- * the loop).
- */
- if (!listener_may_exit && !listeners_disabled()) {
- continue;
- }
- timeout_time = 0;
- }
- else if (!APR_STATUS_IS_TIMEUP(rc)) {
+ if (!APR_STATUS_IS_EINTR(rc) && !APR_STATUS_IS_TIMEUP(rc)) {
ap_log_error(APLOG_MARK, APLOG_CRIT, rc, ap_server_conf,
APLOGNO(03267)
"apr_pollset_poll failed. Attempting to "
num = 0;
}
- if (listener_may_exit) {
- close_listeners(&closed);
- if (terminate_mode == ST_UNGRACEFUL
- || apr_atomic_read32(&connection_count) == 0)
- break;
+ if (APLOGtrace7(ap_server_conf)) {
+ now = apr_time_now();
+ ap_log_error(APLOG_MARK, APLOG_TRACE7, rc, ap_server_conf,
+ "polled with num=%u exit=%d/%d conns=%d"
+ " queues_timeout=%" APR_TIME_T_FMT
+ " timers_timeout=%" APR_TIME_T_FMT,
+ num, listener_may_exit, dying,
+ apr_atomic_read32(&connection_count),
+ queues_next_expiry - now, timers_next_expiry - now);
}
for (user_chain = NULL; num; --num, ++out_pfd) {
AP_DEBUG_ASSERT(0);
ap_log_error(APLOG_MARK, APLOG_ERR, rc, ap_server_conf,
APLOGNO(03094) "pollset remove failed");
- start_lingering_close_nonblocking(cs);
+ close_connection(cs);
+ signal_threads(ST_GRACEFUL);
break;
}
/* If we don't get a worker immediately (nonblocking), we
* close the connection; the client can re-connect to a
* different process for keepalive, and for lingering close
- * the connection will be reset so the choice is to favor
+ * the connection will be shutdown so the choice is to favor
* incoming/alive connections.
*/
get_worker(&have_idle_worker, blocking,
&workers_were_busy);
if (!have_idle_worker) {
- if (remove_from_q == cs->sc->ka_q) {
- start_lingering_close_nonblocking(cs);
- }
- else {
- stop_lingering_close(cs);
- }
+ shutdown_connection(cs);
}
else if (push2worker(cs, NULL, NULL) == APR_SUCCESS) {
have_idle_worker = 0;
* during queues' processing, with the lock held. This works both
* with and without wake-ability.
*/
+ timeout_time = queues_next_expiry;
+do_maintenance:
if (timeout_time && timeout_time < (now = apr_time_now())) {
- /* handle timed out sockets */
+ ap_log_error(APLOG_MARK, APLOG_TRACE7, 0, ap_server_conf,
+ "queues maintenance with timeout=%" APR_TIME_T_FMT,
+ timeout_time > 0 ? timeout_time - now : -1);
apr_thread_mutex_lock(timeout_mutex);
/* Processing all the queues below will recompute this. */
}
/* Step 2: write completion timeouts */
process_timeout_queue(write_completion_q, now,
- start_lingering_close_nonblocking);
+ defer_lingering_close);
/* Step 3: (normal) lingering close completion timeouts */
- process_timeout_queue(linger_q, now,
- stop_lingering_close);
+ if (dying && linger_q->timeout > short_linger_q->timeout) {
+ /* Dying, force short timeout for normal lingering close */
+ linger_q->timeout = short_linger_q->timeout;
+ }
+ process_timeout_queue(linger_q, now, shutdown_connection);
/* Step 4: (short) lingering close completion timeouts */
- process_timeout_queue(short_linger_q, now,
- stop_lingering_close);
+ process_timeout_queue(short_linger_q, now, shutdown_connection);
apr_thread_mutex_unlock(timeout_mutex);
+ ap_log_error(APLOG_MARK, APLOG_TRACE7, 0, ap_server_conf,
+ "queues maintained with timeout=%" APR_TIME_T_FMT,
+ queues_next_expiry > now ? queues_next_expiry - now
+ : -1);
ps->keep_alive = *(volatile apr_uint32_t*)keepalive_q->total;
ps->write_completion = *(volatile apr_uint32_t*)write_completion_q->total;
}
} /* listener main loop */
- close_listeners(&closed);
ap_queue_term(worker_queue);
apr_thread_exit(thd, APR_SUCCESS);
continue;
}
cs->chain = NULL;
+ AP_DEBUG_ASSERT(cs->pub.state == CONN_STATE_LINGER);
worker_sockets[thread_slot] = csd = cs->pfd.desc.s;
-#ifdef AP_DEBUG
- rv = apr_socket_timeout_set(csd, SECONDS_TO_LINGER);
- AP_DEBUG_ASSERT(rv == APR_SUCCESS);
-#else
- apr_socket_timeout_set(csd, SECONDS_TO_LINGER);
-#endif
- cs->pub.state = CONN_STATE_LINGER;
process_socket(thd, cs->p, csd, cs, process_slot, thread_slot);
worker_sockets[thread_slot] = NULL;
}
AP_DEBUG_ASSERT(i < num_listensocks);
pfd = &listener_pollfd[i];
- pfd->reqevents = APR_POLLIN;
+ pfd->reqevents = APR_POLLIN | APR_POLLHUP | APR_POLLERR;
pfd->desc_type = APR_POLL_SOCKET;
pfd->desc.s = lr->sd;
*/
iter = 0;
- while (iter < 10 && !dying) {
+ while (!dying) {
+ apr_sleep(apr_time_from_msec(500));
+ if (dying || ++iter > 10) {
+ break;
+ }
/* listener has not stopped accepting yet */
- apr_sleep(apr_time_make(0, 500000));
+ ap_log_error(APLOG_MARK, APLOG_TRACE1, 0, ap_server_conf,
+ "listener has not stopped accepting yet (%d iter)", iter);
wakeup_listener();
- ++iter;
}
- if (iter >= 10) {
+ if (iter > 10) {
ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, ap_server_conf, APLOGNO(00475)
"the listener thread didn't stop accepting");
}
* If the worker hasn't exited, then this blocks until
* they have (then cleans up).
*/
+ ap_log_error(APLOG_MARK, APLOG_TRACE1, 0, ap_server_conf,
+ "%s termination received, joining workers",
+ rv == AP_MPM_PODX_GRACEFUL ? "graceful" : "ungraceful");
join_workers(ts->listener, threads);
+ ap_log_error(APLOG_MARK, APLOG_TRACE1, 0, ap_server_conf,
+ "%s termination, workers joined, exiting",
+ rv == AP_MPM_PODX_GRACEFUL ? "graceful" : "ungraceful");
}
free(threads);