conn_state_t pub;
/** chaining in defer_linger_chain */
struct event_conn_state_t *chain;
+ /** Is lingering close from defer_lingering_close()? */
+ int deferred_linger;
};
APR_RING_HEAD(timeout_head_t, event_conn_state_t);
*/
static void TO_QUEUE_APPEND(struct timeout_queue *q, event_conn_state_t *el)
{
- apr_time_t q_expiry;
+ apr_time_t elem_expiry;
apr_time_t next_expiry;
APR_RING_INSERT_TAIL(&q->head, el, event_conn_state_t, timeout_list);
++*q->total;
++q->count;
- /* Cheaply update the overall queues' next expiry according to the
- * first entry of this queue (oldest), if necessary.
+ /* Cheaply update the global queues_next_expiry with the one of the
+ * first entry of this queue (oldest) if it expires before.
*/
el = APR_RING_FIRST(&q->head);
- q_expiry = el->queue_timestamp + q->timeout;
+ elem_expiry = el->queue_timestamp + q->timeout;
next_expiry = queues_next_expiry;
- if (!next_expiry || next_expiry > q_expiry + TIMEOUT_FUDGE_FACTOR) {
- queues_next_expiry = q_expiry;
+ if (!next_expiry || next_expiry > elem_expiry + TIMEOUT_FUDGE_FACTOR) {
+ queues_next_expiry = elem_expiry;
/* Unblock the poll()ing listener for it to update its timeout. */
if (listener_is_wakeable) {
apr_pollset_wakeup(event_pollset);
return 1;
}
-static void abort_socket_nonblocking(apr_socket_t *csd)
+static void close_socket_nonblocking_(apr_socket_t *csd,
+ const char *from, int line)
{
apr_status_t rv;
+ apr_os_sock_t fd = -1;
+
+ /* close_worker_sockets() may have closed it already */
+ rv = apr_os_sock_get(&fd, csd);
+ ap_log_error(APLOG_MARK, APLOG_TRACE8, 0, ap_server_conf,
+ "closing socket %i/%pp from %s:%i", (int)fd, csd, from, line);
+ if (rv == APR_SUCCESS && fd == -1) {
+ return;
+ }
+
apr_socket_timeout_set(csd, 0);
rv = apr_socket_close(csd);
if (rv != APR_SUCCESS) {
AP_DEBUG_ASSERT(0);
}
}
+#define close_socket_nonblocking(csd) \
+ close_socket_nonblocking_(csd, __FUNCTION__, __LINE__)
static void close_worker_sockets(void)
{
apr_socket_t *csd = worker_sockets[i];
if (csd) {
worker_sockets[i] = NULL;
- abort_socket_nonblocking(csd);
- }
- }
- for (;;) {
- event_conn_state_t *cs = defer_linger_chain;
- if (!cs) {
- break;
- }
- if (apr_atomic_casptr((void *)&defer_linger_chain, cs->chain,
- cs) != cs) {
- /* Race lost, try again */
- continue;
+ close_socket_nonblocking(csd);
}
- cs->chain = NULL;
- abort_socket_nonblocking(cs->pfd.desc.s);
}
}
static void wakeup_listener(void)
{
+ ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, ap_server_conf,
+ "wake up listener%s", listener_may_exit ? " again" : "");
+
listener_may_exit = 1;
disable_listensocks();
{
int is_last_connection;
event_conn_state_t *cs = cs_;
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE8, 0, cs->c,
+ "cleanup connection from state %i", (int)cs->pub.state);
switch (cs->pub.state) {
+ case CONN_STATE_LINGER:
case CONN_STATE_LINGER_NORMAL:
case CONN_STATE_LINGER_SHORT:
apr_atomic_dec32(&lingering_count);
|| (listeners_disabled() && !connections_above_limit(NULL)))) {
apr_pollset_wakeup(event_pollset);
}
+ if (dying) {
+ /* Help worker_thread_should_exit_early() */
+ ap_queue_interrupt_one(worker_queue);
+ }
return APR_SUCCESS;
}
}
/*
- * Close our side of the connection, flushing data to the client first.
- * Pre-condition: cs is not in any timeout queue and not in the pollset,
- * timeout_mutex is not locked
- * return: 0 if connection is fully closed,
- * 1 if connection is lingering
- * May only be called by worker thread.
+ * Defer flush and close of the connection by adding it to defer_linger_chain,
+ * for a worker to grab it and do the job (should that be blocking).
+ * Pre-condition: nonblocking, can be called from anywhere provided cs is not
+ * in any timeout queue or in the pollset.
*/
-static int start_lingering_close_blocking(event_conn_state_t *cs)
+static int defer_lingering_close(event_conn_state_t *cs)
{
- apr_socket_t *csd = cs->pfd.desc.s;
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE6, 0, cs->c,
+ "deferring close from state %i", (int)cs->pub.state);
- if (ap_start_lingering_close(cs->c)) {
- notify_suspend(cs);
- apr_socket_close(csd);
- ap_queue_info_push_pool(worker_queue_info, cs->p);
- return DONE;
- }
-
-#ifdef AP_DEBUG
- {
- apr_status_t rv;
- rv = apr_socket_timeout_set(csd, 0);
- AP_DEBUG_ASSERT(rv == APR_SUCCESS);
- }
-#else
- apr_socket_timeout_set(csd, 0);
-#endif
-
- cs->queue_timestamp = apr_time_now();
- /*
- * If some module requested a shortened waiting period, only wait for
- * 2s (SECONDS_TO_LINGER). This is useful for mitigating certain
- * DoS attacks.
+ /* The connection is not shutdown() yet strictly speaking, but it's not
+ * in any queue nor handled by a worker either (will be very soon), so
+ * to account for it somewhere we bump lingering_count now (and set
+ * deferred_linger for process_lingering_close() to know).
*/
- if (apr_table_get(cs->c->notes, "short-lingering-close")) {
- cs->pub.state = CONN_STATE_LINGER_SHORT;
- }
- else {
- cs->pub.state = CONN_STATE_LINGER_NORMAL;
- }
+ cs->pub.state = CONN_STATE_LINGER;
apr_atomic_inc32(&lingering_count);
- notify_suspend(cs);
-
- return OK;
-}
-
-/*
- * Defer flush and close of the connection by adding it to defer_linger_chain,
- * for a worker to grab it and do the job (should that be blocking).
- * Pre-condition: cs is not in any timeout queue and not in the pollset,
- * timeout_mutex is not locked
- * return: 1 connection is alive (but aside and about to linger)
- * May be called by listener thread.
- */
-static int start_lingering_close_nonblocking(event_conn_state_t *cs)
-{
- event_conn_state_t *chain;
+ cs->deferred_linger = 1;
for (;;) {
- cs->chain = chain = defer_linger_chain;
+ event_conn_state_t *chain = cs->chain = defer_linger_chain;
if (apr_atomic_casptr((void *)&defer_linger_chain, cs,
chain) != chain) {
/* Race lost, try again */
}
}
-/*
- * forcibly close a lingering connection after the lingering period has
- * expired
- * Pre-condition: cs is not in any timeout queue and not in the pollset
- * return: irrelevant (need same prototype as start_lingering_close)
+/* Close the connection and release its resources (ptrans), either because an
+ * unrecoverable error occured (queues or pollset add/remove) or more usually
+ * if lingering close timed out.
+ * Pre-condition: nonblocking, can be called from anywhere provided cs is not
+ * in any timeout queue or in the pollset.
*/
-static int stop_lingering_close(event_conn_state_t *cs)
+static void close_connection(event_conn_state_t *cs)
{
- apr_socket_t *csd = ap_get_conn_socket(cs->c);
- ap_log_error(APLOG_MARK, APLOG_TRACE4, 0, ap_server_conf,
- "socket abort in state %i", (int)cs->pub.state);
- abort_socket_nonblocking(csd);
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE6, 0, cs->c,
+ "closing connection from state %i", (int)cs->pub.state);
+
+ close_socket_nonblocking(cs->pfd.desc.s);
ap_queue_info_push_pool(worker_queue_info, cs->p);
- if (dying)
- ap_queue_interrupt_one(worker_queue);
- return 0;
+}
+
+/* Shutdown the connection in case of timeout, error or resources shortage.
+ * This starts short lingering close if not already there, or directly closes
+ * the connection otherwise.
+ * Pre-condition: nonblocking, can be called from anywhere provided cs is not
+ * in any timeout queue or in the pollset.
+ */
+static int shutdown_connection(event_conn_state_t *cs)
+{
+ if (cs->pub.state < CONN_STATE_LINGER) {
+ apr_table_setn(cs->c->notes, "short-lingering-close", "1");
+ defer_lingering_close(cs);
+ }
+ else {
+ close_connection(cs);
+ }
+ return 1;
}
/*
/* Forward declare */
static void process_lingering_close(event_conn_state_t *cs);
+static void update_reqevents_from_sense(event_conn_state_t *cs, int sense)
+{
+ if (sense < 0) {
+ sense = cs->pub.sense;
+ }
+ if (sense == CONN_SENSE_WANT_READ) {
+ cs->pfd.reqevents = APR_POLLIN | APR_POLLHUP;
+ }
+ else {
+ cs->pfd.reqevents = APR_POLLOUT;
+ }
+ /* POLLERR is usually returned event only, but some pollset
+ * backends may require it in reqevents to do the right thing,
+ * so it shouldn't hurt (ignored otherwise).
+ */
+ cs->pfd.reqevents |= APR_POLLERR;
+
+ /* Reset to default for the next round */
+ cs->pub.sense = CONN_SENSE_DEFAULT;
+}
+
/*
* process one connection in the worker
*/
apr_pool_cleanup_null);
ap_set_module_config(c->conn_config, &mpm_event_module, cs);
c->current_thread = thd;
+ c->cs = &cs->pub;
cs->c = c;
- c->cs = &(cs->pub);
cs->p = p;
cs->sc = ap_get_module_config(ap_server_conf->module_config,
&mpm_event_module);
cs->pfd.desc_type = APR_POLL_SOCKET;
- cs->pfd.reqevents = APR_POLLIN;
cs->pfd.desc.s = sock;
+ update_reqevents_from_sense(cs, CONN_SENSE_WANT_READ);
pt->type = PT_CSD;
pt->baton = cs;
cs->pfd.client_data = pt;
cs->queue_timestamp = apr_time_now();
notify_suspend(cs);
- if (cs->pub.sense == CONN_SENSE_WANT_READ) {
- cs->pfd.reqevents = APR_POLLIN;
- }
- else {
- cs->pfd.reqevents = APR_POLLOUT;
- }
- /* POLLHUP/ERR are usually returned event only (ignored here), but
- * some pollset backends may require them in reqevents to do the
- * right thing, so it shouldn't hurt.
- */
- cs->pfd.reqevents |= APR_POLLHUP | APR_POLLERR;
- cs->pub.sense = CONN_SENSE_DEFAULT;
-
+ update_reqevents_from_sense(cs, -1);
apr_thread_mutex_lock(timeout_mutex);
TO_QUEUE_APPEND(cs->sc->wc_q, cs);
rv = apr_pollset_add(event_pollset, &cs->pfd);
ap_log_error(APLOG_MARK, APLOG_ERR, rv, ap_server_conf, APLOGNO(03465)
"process_socket: apr_pollset_add failure for "
"write completion");
- apr_socket_close(cs->pfd.desc.s);
- ap_queue_info_push_pool(worker_queue_info, cs->p);
+ close_connection(cs);
+ signal_threads(ST_GRACEFUL);
}
else {
apr_thread_mutex_unlock(timeout_mutex);
}
return;
}
- else if (c->keepalive != AP_CONN_KEEPALIVE || c->aborted ||
- listener_may_exit) {
+ else if (c->keepalive != AP_CONN_KEEPALIVE || c->aborted) {
cs->pub.state = CONN_STATE_LINGER;
}
else if (c->data_in_input_filters) {
cs->pub.state = CONN_STATE_READ_REQUEST_LINE;
goto read_request;
}
- else {
+ else if (!listener_may_exit) {
cs->pub.state = CONN_STATE_CHECK_REQUEST_LINE_READABLE;
}
+ else {
+ cs->pub.state = CONN_STATE_LINGER;
+ }
}
if (cs->pub.state == CONN_STATE_CHECK_REQUEST_LINE_READABLE) {
notify_suspend(cs);
/* Add work to pollset. */
- cs->pfd.reqevents = APR_POLLIN;
+ update_reqevents_from_sense(cs, CONN_SENSE_WANT_READ);
apr_thread_mutex_lock(timeout_mutex);
TO_QUEUE_APPEND(cs->sc->ka_q, cs);
rv = apr_pollset_add(event_pollset, &cs->pfd);
ap_log_error(APLOG_MARK, APLOG_ERR, rv, ap_server_conf, APLOGNO(03093)
"process_socket: apr_pollset_add failure for "
"keep alive");
- apr_socket_close(cs->pfd.desc.s);
- ap_queue_info_push_pool(worker_queue_info, cs->p);
+ close_connection(cs);
+ signal_threads(ST_GRACEFUL);
}
else {
apr_thread_mutex_unlock(timeout_mutex);
return;
}
- if (cs->pub.state == CONN_STATE_LINGER) {
- rc = start_lingering_close_blocking(cs);
- }
- if (rc == OK && (cs->pub.state == CONN_STATE_LINGER_NORMAL ||
- cs->pub.state == CONN_STATE_LINGER_SHORT)) {
+ /* CONN_STATE_LINGER[_*] fall through process_lingering_close() */
+ if (cs->pub.state >= CONN_STATE_LINGER) {
process_lingering_close(cs);
+ return;
}
}
}
}
-static void close_listeners(int *closed)
+static int close_listeners(int *closed)
{
+ ap_log_error(APLOG_MARK, APLOG_TRACE6, 0, ap_server_conf,
+ "clos%s listeners (connection_count=%u)",
+ *closed ? "ed" : "ing", apr_atomic_read32(&connection_count));
if (!*closed) {
int i;
+
ap_close_listeners_ex(my_bucket->listeners);
- *closed = 1;
+ *closed = 1; /* once */
+
dying = 1;
ap_scoreboard_image->parent[ap_child_slot].quiescing = 1;
for (i = 0; i < threads_per_child; ++i) {
ap_queue_info_free_idle_pools(worker_queue_info);
ap_queue_interrupt_all(worker_queue);
+
+ return 1;
}
+ return 0;
}
static void unblock_signal(int sig)
/* trash the connection; we couldn't queue the connected
* socket to a worker
*/
- if (csd) {
- abort_socket_nonblocking(csd);
+ if (cs) {
+ shutdown_connection(cs);
}
- if (ptrans) {
- ap_queue_info_push_pool(worker_queue_info, ptrans);
+ else {
+ if (csd) {
+ close_socket_nonblocking(csd);
+ }
+ if (ptrans) {
+ ap_queue_info_push_pool(worker_queue_info, ptrans);
+ }
}
signal_threads(ST_GRACEFUL);
}
/* Okay, add sorted by when.. */
apr_skiplist_insert(timer_skiplist, te);
- /* Cheaply update the overall timers' next expiry according to
- * this event, if necessary.
+ /* Cheaply update the global timers_next_expiry with this event's
+ * if it expires before.
*/
next_expiry = timers_next_expiry;
if (!next_expiry || next_expiry > te->when + EVENT_FUDGE_FACTOR) {
/*
- * Close socket and clean up if remote closed its end while we were in
- * lingering close. Only to be called in the worker thread, and since it's
- * in immediate call stack, we can afford a comfortable buffer size to
- * consume data quickly.
+ * Flush data and close our side of the connection, then drain incoming data.
+ * If the latter would block put the connection in one of the linger timeout
+ * queues to be called back when ready, and repeat until it's closed by peer.
+ * Only to be called in the worker thread, and since it's in immediate call
+ * stack, we can afford a comfortable buffer size to consume data quickly.
+ * Pre-condition: cs is not in any timeout queue and not in the pollset,
+ * timeout_mutex is not locked
*/
#define LINGERING_BUF_SIZE (32 * 1024)
static void process_lingering_close(event_conn_state_t *cs)
apr_status_t rv;
struct timeout_queue *q;
- /* socket is already in non-blocking state */
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE6, 0, cs->c,
+ "lingering close from state %i", (int)cs->pub.state);
+ AP_DEBUG_ASSERT(cs->pub.state >= CONN_STATE_LINGER);
+
+ if (cs->pub.state == CONN_STATE_LINGER) {
+ /* defer_lingering_close() may have bumped lingering_count already */
+ if (!cs->deferred_linger) {
+ apr_atomic_inc32(&lingering_count);
+ }
+
+ apr_socket_timeout_set(csd, apr_time_from_sec(SECONDS_TO_LINGER));
+ if (ap_start_lingering_close(cs->c)) {
+ notify_suspend(cs);
+ close_connection(cs);
+ return;
+ }
+
+ cs->queue_timestamp = apr_time_now();
+ /* Clear APR_INCOMPLETE_READ if it was ever set, we'll do the poll()
+ * at the listener only from now, if needed.
+ */
+ apr_socket_opt_set(csd, APR_INCOMPLETE_READ, 0);
+ /*
+ * If some module requested a shortened waiting period, only wait for
+ * 2s (SECONDS_TO_LINGER). This is useful for mitigating certain
+ * DoS attacks.
+ */
+ if (apr_table_get(cs->c->notes, "short-lingering-close")) {
+ cs->pub.state = CONN_STATE_LINGER_SHORT;
+ }
+ else {
+ cs->pub.state = CONN_STATE_LINGER_NORMAL;
+ }
+ notify_suspend(cs);
+ }
+
+ apr_socket_timeout_set(csd, 0);
do {
nbytes = sizeof(dummybuf);
rv = apr_socket_recv(csd, dummybuf, &nbytes);
} while (rv == APR_SUCCESS);
if (!APR_STATUS_IS_EAGAIN(rv)) {
- rv = apr_socket_close(csd);
- AP_DEBUG_ASSERT(rv == APR_SUCCESS);
- ap_queue_info_push_pool(worker_queue_info, cs->p);
+ close_connection(cs);
return;
}
- /* Re-queue the connection to come back when readable */
- cs->pfd.reqevents = APR_POLLIN;
- cs->pub.sense = CONN_SENSE_DEFAULT;
+ /* (Re)queue the connection to come back when readable */
+ update_reqevents_from_sense(cs, CONN_SENSE_WANT_READ);
q = (cs->pub.state == CONN_STATE_LINGER_SHORT) ? short_linger_q : linger_q;
apr_thread_mutex_lock(timeout_mutex);
TO_QUEUE_APPEND(q, cs);
apr_thread_mutex_unlock(timeout_mutex);
ap_log_error(APLOG_MARK, APLOG_ERR, rv, ap_server_conf, APLOGNO(03092)
"process_lingering_close: apr_pollset_add failure");
- rv = apr_socket_close(cs->pfd.desc.s);
- AP_DEBUG_ASSERT(rv == APR_SUCCESS);
- ap_queue_info_push_pool(worker_queue_info, cs->p);
+ close_connection(cs);
+ signal_threads(ST_GRACEFUL);
return;
}
apr_thread_mutex_unlock(timeout_mutex);
}
-/* call 'func' for all elements of 'q' with timeout less than 'timeout_time'.
+/* call 'func' for all elements of 'q' above 'expiry'.
* Pre-condition: timeout_mutex must already be locked
* Post-condition: timeout_mutex will be locked again
*/
-static void process_timeout_queue(struct timeout_queue *q,
- apr_time_t timeout_time,
+static void process_timeout_queue(struct timeout_queue *q, apr_time_t expiry,
int (*func)(event_conn_state_t *))
{
apr_uint32_t total = 0, count;
while (cs != APR_RING_SENTINEL(&qp->head, event_conn_state_t,
timeout_list)) {
/* Trash the entry if:
- * - no timeout_time was given (asked for all), or
+ * - no expiry was given (zero means all), or
* - it expired (according to the queue timeout), or
* - the system clock skewed in the past: no entry should be
- * registered above the given timeout_time (~now) + the queue
+ * registered above the given expiry (~now) + the queue
* timeout, we won't keep any here (eg. for centuries).
*
* Otherwise stop, no following entry will match thanks to the
* single timeout per queue (entries are added to the end!).
* This allows maintenance in O(1).
*/
- if (timeout_time
- && cs->queue_timestamp + qp->timeout > timeout_time
- && cs->queue_timestamp < timeout_time + qp->timeout) {
- /* Since this is the next expiring of this queue, update the
- * overall queues' next expiry if it's later than this one.
+ if (expiry && cs->queue_timestamp + qp->timeout > expiry
+ && cs->queue_timestamp < expiry + qp->timeout) {
+ /* Since this is the next expiring entry of this queue, update
+ * the global queues_next_expiry if it's later than this one.
*/
- apr_time_t q_expiry = cs->queue_timestamp + qp->timeout;
+ apr_time_t elem_expiry = cs->queue_timestamp + qp->timeout;
apr_time_t next_expiry = queues_next_expiry;
if (!next_expiry
- || next_expiry > q_expiry + TIMEOUT_FUDGE_FACTOR) {
- queues_next_expiry = q_expiry;
+ || next_expiry > elem_expiry + TIMEOUT_FUDGE_FACTOR) {
+ queues_next_expiry = elem_expiry;
}
break;
}
apr_thread_mutex_lock(timeout_mutex);
}
-static void process_keepalive_queue(apr_time_t timeout_time)
+static void process_keepalive_queue(apr_time_t expiry)
{
/* If all workers are busy, we kill older keep-alive connections so
* that they may connect to another process.
*/
- if (!timeout_time) {
+ if (!expiry && *keepalive_q->total) {
ap_log_error(APLOG_MARK, APLOG_TRACE1, 0, ap_server_conf,
- "All workers are busy or dying, will close %u "
+ "All workers are busy or dying, will shutdown %u "
"keep-alive connections", *keepalive_q->total);
}
- process_timeout_queue(keepalive_q, timeout_time,
- start_lingering_close_nonblocking);
+ process_timeout_queue(keepalive_q, expiry, shutdown_connection);
}
static void * APR_THREAD_FUNC listener_thread(apr_thread_t * thd, void *dummy)
timer_event_t *te;
const apr_pollfd_t *out_pfd;
apr_int32_t num = 0;
- apr_interval_time_t timeout_interval;
- apr_time_t now, timeout_time;
+ apr_interval_time_t timeout;
+ apr_time_t now, expiry = -1;
int workers_were_busy = 0;
if (conns_this_child <= 0)
check_infinite_requests();
if (listener_may_exit) {
- close_listeners(&closed);
+ int first_close = close_listeners(&closed);
+
if (terminate_mode == ST_UNGRACEFUL
|| apr_atomic_read32(&connection_count) == 0)
break;
+
+ /* Don't wait in poll() for the first close (i.e. dying now), we
+ * want to maintain the queues and schedule defer_linger_chain ASAP
+ * to kill kept-alive connection and shutdown the workers and child
+ * faster.
+ */
+ if (first_close) {
+ goto do_maintenance; /* with expiry == -1 */
+ }
}
now = apr_time_now();
"keep-alive: %d lingering: %d suspended: %u)",
apr_atomic_read32(&connection_count),
apr_atomic_read32(&clogged_count),
- *(volatile apr_uint32_t*)write_completion_q->total,
- *(volatile apr_uint32_t*)keepalive_q->total,
+ apr_atomic_read32(write_completion_q->total),
+ apr_atomic_read32(keepalive_q->total),
apr_atomic_read32(&lingering_count),
apr_atomic_read32(&suspended_count));
if (dying) {
* up occurs, otherwise periodic checks (maintenance, shutdown, ...)
* must be performed.
*/
- timeout_interval = -1;
+ now = apr_time_now();
+ timeout = -1;
/* Push expired timers to a worker, the first remaining one determines
* the maximum time to poll() below, if any.
*/
- timeout_time = timers_next_expiry;
- if (timeout_time && timeout_time < now + EVENT_FUDGE_FACTOR) {
+ expiry = timers_next_expiry;
+ if (expiry && expiry < now) {
apr_thread_mutex_lock(g_timer_skiplist_mtx);
while ((te = apr_skiplist_peek(timer_skiplist))) {
- if (te->when > now + EVENT_FUDGE_FACTOR) {
+ if (te->when > now) {
timers_next_expiry = te->when;
- timeout_interval = te->when - now;
+ timeout = te->when - now;
break;
}
apr_skiplist_pop(timer_skiplist, NULL);
}
/* Same for queues, use their next expiry, if any. */
- timeout_time = queues_next_expiry;
- if (timeout_time
- && (timeout_interval < 0
- || timeout_time <= now
- || timeout_interval > timeout_time - now)) {
- timeout_interval = timeout_time > now ? timeout_time - now : 1;
+ expiry = queues_next_expiry;
+ if (expiry
+ && (timeout < 0
+ || expiry <= now
+ || timeout > expiry - now)) {
+ timeout = expiry > now ? expiry - now : 0;
}
/* When non-wakeable, don't wait more than 100 ms, in any case. */
#define NON_WAKEABLE_POLL_TIMEOUT apr_time_from_msec(100)
if (!listener_is_wakeable
- && (timeout_interval < 0
- || timeout_interval > NON_WAKEABLE_POLL_TIMEOUT)) {
- timeout_interval = NON_WAKEABLE_POLL_TIMEOUT;
+ && (timeout < 0
+ || timeout > NON_WAKEABLE_POLL_TIMEOUT)) {
+ timeout = NON_WAKEABLE_POLL_TIMEOUT;
}
+ else if (timeout > 0) {
+ /* apr_pollset_poll() might round down the timeout to milliseconds,
+ * let's forcibly round up here to never return before the timeout.
+ */
+ timeout = apr_time_from_msec(
+ apr_time_as_msec(timeout + apr_time_from_msec(1) - 1)
+ );
+ }
+
+ ap_log_error(APLOG_MARK, APLOG_TRACE7, 0, ap_server_conf,
+ "polling with timeout=%" APR_TIME_T_FMT
+ " queues_timeout=%" APR_TIME_T_FMT
+ " timers_timeout=%" APR_TIME_T_FMT,
+ timeout, queues_next_expiry - now,
+ timers_next_expiry - now);
- rc = apr_pollset_poll(event_pollset, timeout_interval, &num, &out_pfd);
+ rc = apr_pollset_poll(event_pollset, timeout, &num, &out_pfd);
if (rc != APR_SUCCESS) {
- if (APR_STATUS_IS_EINTR(rc)) {
- /* Woken up, if we are exiting or listeners are disabled we
- * must fall through to kill kept-alive connections or test
- * whether listeners should be re-enabled. Otherwise we only
- * need to update timeouts (logic is above, so simply restart
- * the loop).
- */
- if (!listener_may_exit && !listeners_disabled()) {
- continue;
- }
- timeout_time = 0;
- }
- else if (!APR_STATUS_IS_TIMEUP(rc)) {
+ if (!APR_STATUS_IS_EINTR(rc) && !APR_STATUS_IS_TIMEUP(rc)) {
ap_log_error(APLOG_MARK, APLOG_CRIT, rc, ap_server_conf,
"apr_pollset_poll failed. Attempting to "
"shutdown process gracefully");
num = 0;
}
- if (listener_may_exit) {
- close_listeners(&closed);
- if (terminate_mode == ST_UNGRACEFUL
- || apr_atomic_read32(&connection_count) == 0)
- break;
+ if (APLOGtrace7(ap_server_conf)) {
+ now = apr_time_now();
+ ap_log_error(APLOG_MARK, APLOG_TRACE7, rc, ap_server_conf,
+ "polled with num=%u exit=%d/%d conns=%d"
+ " queues_timeout=%" APR_TIME_T_FMT
+ " timers_timeout=%" APR_TIME_T_FMT,
+ num, listener_may_exit, dying,
+ apr_atomic_read32(&connection_count),
+ queues_next_expiry - now, timers_next_expiry - now);
}
+ /* XXX possible optimization: stash the current time for use as
+ * r->request_time for new requests or queues maintenance
+ */
+
for (; num; --num, ++out_pfd) {
listener_poll_type *pt = (listener_poll_type *) out_pfd->client_data;
if (pt->type == PT_CSD) {
AP_DEBUG_ASSERT(0);
ap_log_error(APLOG_MARK, APLOG_ERR, rc, ap_server_conf,
APLOGNO(03094) "pollset remove failed");
- start_lingering_close_nonblocking(cs);
+ close_connection(cs);
+ signal_threads(ST_GRACEFUL);
break;
}
/* If we don't get a worker immediately (nonblocking), we
* close the connection; the client can re-connect to a
* different process for keepalive, and for lingering close
- * the connection will be reset so the choice is to favor
+ * the connection will be shutdown so the choice is to favor
* incoming/alive connections.
*/
get_worker(&have_idle_worker, blocking,
&workers_were_busy);
if (!have_idle_worker) {
- if (remove_from_q == cs->sc->ka_q) {
- start_lingering_close_nonblocking(cs);
- }
- else {
- stop_lingering_close(cs);
- }
+ shutdown_connection(cs);
}
else if (push2worker(cs, NULL, NULL) == APR_SUCCESS) {
have_idle_worker = 0;
} /* if:else on pt->type */
} /* for processing poll */
- /* XXX possible optimization: stash the current time for use as
- * r->request_time for new requests
- */
- /* We process the timeout queues here only when their overall next
- * expiry (read once above) is over. This happens accurately since
+ /* We process the timeout queues here only when the global
+ * queues_next_expiry is passed. This happens accurately since
* adding to the queues (in workers) can only decrease this expiry,
* while latest ones are only taken into account here (in listener)
* during queues' processing, with the lock held. This works both
* with and without wake-ability.
*/
- if (timeout_time && timeout_time < (now = apr_time_now())) {
- /* handle timed out sockets */
+ expiry = queues_next_expiry;
+do_maintenance:
+ if (expiry && expiry < (now = apr_time_now())) {
+ ap_log_error(APLOG_MARK, APLOG_TRACE7, 0, ap_server_conf,
+ "queues maintenance with timeout=%" APR_TIME_T_FMT,
+ expiry > 0 ? expiry - now : -1);
apr_thread_mutex_lock(timeout_mutex);
- /* Processing all the queues below will recompute this. */
+ /* Steps below will recompute this. */
queues_next_expiry = 0;
/* Step 1: keepalive timeouts */
}
/* Step 2: write completion timeouts */
process_timeout_queue(write_completion_q, now,
- start_lingering_close_nonblocking);
+ defer_lingering_close);
/* Step 3: (normal) lingering close completion timeouts */
- process_timeout_queue(linger_q, now,
- stop_lingering_close);
+ if (dying && linger_q->timeout > short_linger_q->timeout) {
+ /* Dying, force short timeout for normal lingering close */
+ linger_q->timeout = short_linger_q->timeout;
+ }
+ process_timeout_queue(linger_q, now, shutdown_connection);
/* Step 4: (short) lingering close completion timeouts */
- process_timeout_queue(short_linger_q, now,
- stop_lingering_close);
+ process_timeout_queue(short_linger_q, now, shutdown_connection);
apr_thread_mutex_unlock(timeout_mutex);
+ ap_log_error(APLOG_MARK, APLOG_TRACE7, 0, ap_server_conf,
+ "queues maintained with timeout=%" APR_TIME_T_FMT,
+ queues_next_expiry > now ? queues_next_expiry - now
+ : -1);
- ps->keep_alive = *(volatile apr_uint32_t*)keepalive_q->total;
- ps->write_completion = *(volatile apr_uint32_t*)write_completion_q->total;
+ ps->keep_alive = apr_atomic_read32(keepalive_q->total);
+ ps->write_completion = apr_atomic_read32(write_completion_q->total);
ps->connections = apr_atomic_read32(&connection_count);
ps->suspended = apr_atomic_read32(&suspended_count);
ps->lingering_close = apr_atomic_read32(&lingering_count);
}
else if ((workers_were_busy || dying)
- && *(volatile apr_uint32_t*)keepalive_q->total) {
+ && apr_atomic_read32(keepalive_q->total)) {
apr_thread_mutex_lock(timeout_mutex);
process_keepalive_queue(0); /* kill'em all \m/ */
apr_thread_mutex_unlock(timeout_mutex);
}
} /* listener main loop */
- close_listeners(&closed);
ap_queue_term(worker_queue);
apr_thread_exit(thd, APR_SUCCESS);
continue;
}
cs->chain = NULL;
+ AP_DEBUG_ASSERT(cs->pub.state == CONN_STATE_LINGER);
worker_sockets[thread_slot] = csd = cs->pfd.desc.s;
-#ifdef AP_DEBUG
- rv = apr_socket_timeout_set(csd, SECONDS_TO_LINGER);
- AP_DEBUG_ASSERT(rv == APR_SUCCESS);
-#else
- apr_socket_timeout_set(csd, SECONDS_TO_LINGER);
-#endif
- cs->pub.state = CONN_STATE_LINGER;
process_socket(thd, cs->p, csd, cs, process_slot, thread_slot);
worker_sockets[thread_slot] = NULL;
}
AP_DEBUG_ASSERT(i < num_listensocks);
pfd = &listener_pollfd[i];
- pfd->reqevents = APR_POLLIN;
+ pfd->reqevents = APR_POLLIN | APR_POLLHUP | APR_POLLERR;
pfd->desc_type = APR_POLL_SOCKET;
pfd->desc.s = lr->sd;
*/
iter = 0;
- while (iter < 10 && !dying) {
+ while (!dying) {
+ apr_sleep(apr_time_from_msec(500));
+ if (dying || ++iter > 10) {
+ break;
+ }
/* listener has not stopped accepting yet */
- apr_sleep(apr_time_make(0, 500000));
+ ap_log_error(APLOG_MARK, APLOG_TRACE1, 0, ap_server_conf,
+ "listener has not stopped accepting yet (%d iter)", iter);
wakeup_listener();
- ++iter;
}
- if (iter >= 10) {
+ if (iter > 10) {
ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, ap_server_conf, APLOGNO(00475)
"the listener thread didn't stop accepting");
}
* If the worker hasn't exited, then this blocks until
* they have (then cleans up).
*/
+ ap_log_error(APLOG_MARK, APLOG_TRACE1, 0, ap_server_conf,
+ "%s termination received, joining workers",
+ rv == AP_MPM_PODX_GRACEFUL ? "graceful" : "ungraceful");
join_workers(ts->listener, threads);
+ ap_log_error(APLOG_MARK, APLOG_TRACE1, 0, ap_server_conf,
+ "%s termination, workers joined, exiting",
+ rv == AP_MPM_PODX_GRACEFUL ? "graceful" : "ungraceful");
}
free(threads);