ap_rputs("</dl>", r);
if (is_async) {
- int write_completion = 0, lingering_close = 0, keep_alive = 0,
+ int read_line = 0, write_completion = 0, lingering_close = 0, keep_alive = 0,
connections = 0, stopping = 0, procs = 0;
/*
* These differ from 'busy' and 'ready' in how gracefully finishing
* threads are counted.
*/
"<th colspan=\"3\">Async connections</th></tr>\n"
"<tr><th>total</th><th>accepting</th>"
"<th>busy</th><th>idle</th>"
- "<th>writing</th><th>keep-alive</th><th>closing</th></tr>\n", r);
+ "<th>reading</th><th>writing</th><th>keep-alive</th><th>closing</th></tr>\n", r);
for (i = 0; i < server_limit; ++i) {
ps_record = ap_get_scoreboard_process(i);
if (ps_record->pid) {
connections += ps_record->connections;
+ read_line += ps_record->read_line;
write_completion += ps_record->write_completion;
keep_alive += ps_record->keep_alive;
lingering_close += ps_record->lingering_close;
"<td>%s%s</td>"
"<td>%u</td><td>%s</td>"
"<td>%u</td><td>%u</td>"
- "<td>%u</td><td>%u</td><td>%u</td>"
+ "<td>%u</td><td>%u</td><td>%u</td><td>%u</td>"
"</tr>\n",
i, ps_record->pid,
dying, old,
ps_record->not_accepting ? "no" : "yes",
thread_busy_buffer[i],
thread_idle_buffer[i],
+ ps_record->read_line,
ps_record->write_completion,
ps_record->keep_alive,
ps_record->lingering_close);
"<td>%d</td><td>%d</td>"
"<td>%d</td><td> </td>"
"<td>%d</td><td>%d</td>"
- "<td>%d</td><td>%d</td><td>%d</td>"
+ "<td>%d</td><td>%d</td><td>%d</td><td>%d</td>"
"</tr>\n</table>\n",
procs, stopping,
connections,
busy_workers, idle_workers,
- write_completion, keep_alive, lingering_close);
+ read_line, write_completion, keep_alive, lingering_close);
}
else {
ap_rprintf(r, "Processes: %d\n"
"BusyWorkers: %d\n"
"IdleWorkers: %d\n"
"ConnsTotal: %d\n"
+ "ConnsAsyncReading: %d\n"
"ConnsAsyncWriting: %d\n"
"ConnsAsyncKeepAlive: %d\n"
"ConnsAsyncClosing: %d\n",
procs, stopping,
busy_workers, idle_workers,
connections,
- write_completion, keep_alive, lingering_close);
+ read_line, write_completion, keep_alive, lingering_close);
}
}
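/*
 * Illustration only (made-up values): with ConnsAsyncReading added, the
 * machine-readable ?auto report produced by the else-branch above would
 * contain one extra line among the async counters, e.g.
 *
 *   Processes: 3
 *   Stopping: 0
 *   BusyWorkers: 5
 *   IdleWorkers: 70
 *   ConnsTotal: 9
 *   ConnsAsyncReading: 1
 *   ConnsAsyncWriting: 2
 *   ConnsAsyncKeepAlive: 4
 *   ConnsAsyncClosing: 2
 */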
#include "util_md5.h"
#include "util_mutex.h"
#include "ap_provider.h"
+#include "ap_mpm.h"
#include "http_config.h"
#include "mod_proxy.h" /* for proxy_hook_section_post_config() */
{
SSLConnRec *sslconn = myConnConfig(c);
+ int status = DECLINED;
+
if (sslconn && !sslconn->disabled) {
/* On an active SSL connection, let the input filters initialize
* themselves which triggers the handshake, which again triggers
*/
apr_bucket_brigade* temp;
+ int async_mpm = 0;
+
temp = apr_brigade_create(c->pool, c->bucket_alloc);
- ap_get_brigade(c->input_filters, temp,
- AP_MODE_INIT, APR_BLOCK_READ, 0);
+
+ if (ap_mpm_query(AP_MPMQ_IS_ASYNC, &async_mpm) != APR_SUCCESS) {
+ async_mpm = 0;
+ }
+
+ if (async_mpm) {
+
+ /* Take advantage of an async MPM. If we see an EAGAIN,
+ * loop round and don't block.
+ */
+ apr_status_t rv;
+
+ rv = ap_get_brigade(c->input_filters, temp,
+ AP_MODE_INIT, APR_NONBLOCK_READ, 0);
+
+ if (rv == APR_SUCCESS) {
+ /* great news, let's continue */
+ status = DECLINED;
+ }
+ else if (rv == APR_EAGAIN) {
+ /* we've been asked to come around again, don't block */
+ status = OK;
+ }
+ else {
+ /* we failed, give up */
+ status = DONE;
+
+ c->aborted = 1;
+ }
+ }
+ else {
+ ap_get_brigade(c->input_filters, temp,
+ AP_MODE_INIT, APR_BLOCK_READ, 0);
+ }
+
apr_brigade_destroy(temp);
}
-
- return DECLINED;
+
+ return status;
}
/*
* Several timeout queues that use different timeouts, so that we always can
* simply append to the end.
+ * read_line_q uses vhost's TimeOut (FIXME: we could use a shorter timeout here)
* write_completion_q uses vhost's TimeOut
* keepalive_q uses vhost's KeepAliveTimeOut
* linger_q uses MAX_SECS_TO_LINGER
* short_linger_q uses SECONDS_TO_LINGER
*/
-static struct timeout_queue *write_completion_q,
+static struct timeout_queue *read_line_q,
+ *write_completion_q,
*keepalive_q,
*linger_q,
*short_linger_q;
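/*
 * A minimal sketch of the invariant the comment above relies on, using
 * illustrative types and names rather than the real mpm_event structures:
 * every element appended to a queue shares that queue's single timeout and
 * is stamped with the current time on append, so expiry times increase
 * from head to tail and maintenance can stop at the first unexpired entry.
 */
#include "apr_time.h"

struct sketch_elem {
    apr_time_t queue_timestamp;      /* set to apr_time_now() on append */
    struct sketch_elem *next;
};

struct sketch_queue {
    apr_interval_time_t timeout;     /* one timeout for the whole queue */
    struct sketch_elem *head;
};

static int sketch_count_expired(struct sketch_queue *q, apr_time_t now)
{
    int n = 0;
    struct sketch_elem *e;
    /* Entries are in append (and therefore timestamp) order, so the first
     * entry that has not yet expired ends the scan. */
    for (e = q->head; e && e->queue_timestamp + q->timeout <= now; e = e->next) {
        ++n;
    }
    return n;
}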
static int max_spawn_rate_per_bucket = MAX_SPAWN_RATE / 1;
struct event_srv_cfg_s {
- struct timeout_queue *wc_q,
+ struct timeout_queue *rl_q,
+ *wc_q,
*ka_q;
};
*/
if (rc != OK || (cs->pub.state >= CONN_STATE_NUM)
|| (cs->pub.state < CONN_STATE_LINGER
+ && cs->pub.state != CONN_STATE_READ_REQUEST_LINE
&& cs->pub.state != CONN_STATE_WRITE_COMPLETION
&& cs->pub.state != CONN_STATE_CHECK_REQUEST_LINE_READABLE
&& cs->pub.state != CONN_STATE_SUSPENDED)) {
cs->pub.state = CONN_STATE_LINGER;
}
+ if (cs->pub.state == CONN_STATE_READ_REQUEST_LINE) {
+ ap_update_child_status(cs->sbh, SERVER_BUSY_READ, NULL);
+
+ /* It greatly simplifies the logic to use a single timeout value per q
+ * because the new element can just be added to the end of the list and
+ * it will stay sorted in expiration time sequence. If brand new
+ * sockets are sent to the event thread for a readability check, this
+ * will be a slight behavior change - they use the non-keepalive
+ * timeout today. With a normal client, the socket will be readable in
+ * a few milliseconds anyway.
+ */
+ cs->queue_timestamp = apr_time_now();
+ notify_suspend(cs);
+
+ /* Add work to pollset. */
+ update_reqevents_from_sense(cs, -1);
+ apr_thread_mutex_lock(timeout_mutex);
+ TO_QUEUE_APPEND(cs->sc->rl_q, cs);
+ rv = apr_pollset_add(event_pollset, &cs->pfd);
+ if (rv != APR_SUCCESS && !APR_STATUS_IS_EEXIST(rv)) {
+ AP_DEBUG_ASSERT(0);
+ TO_QUEUE_REMOVE(cs->sc->rl_q, cs);
+ apr_thread_mutex_unlock(timeout_mutex);
+ ap_log_error(APLOG_MARK, APLOG_ERR, rv, ap_server_conf, APLOGNO()
+ "process_socket: apr_pollset_add failure for "
+ "read request line");
+ close_connection(cs);
+ signal_threads(ST_GRACEFUL);
+ }
+ else {
+ apr_thread_mutex_unlock(timeout_mutex);
+ }
+ return;
+ }
+
if (cs->pub.state == CONN_STATE_WRITE_COMPLETION) {
int pending = DECLINED;
last_log = now;
apr_thread_mutex_lock(timeout_mutex);
ap_log_error(APLOG_MARK, APLOG_TRACE6, 0, ap_server_conf,
- "connections: %u (clogged: %u write-completion: %d "
+ "connections: %u (clogged: %u read-line: %d write-completion: %d "
"keep-alive: %d lingering: %d suspended: %u)",
apr_atomic_read32(&connection_count),
apr_atomic_read32(&clogged_count),
+ apr_atomic_read32(read_line_q->total),
apr_atomic_read32(write_completion_q->total),
apr_atomic_read32(keepalive_q->total),
apr_atomic_read32(&lingering_count),
int blocking = 0;
switch (cs->pub.state) {
+ case CONN_STATE_READ_REQUEST_LINE:
+ remove_from_q = cs->sc->rl_q;
+ blocking = 1;
+ break;
+
case CONN_STATE_WRITE_COMPLETION:
remove_from_q = cs->sc->wc_q;
blocking = 1;
else {
process_keepalive_queue(now);
}
- /* Step 2: write completion timeouts */
+ /* Step 2: read line timeouts */
+ process_timeout_queue(read_line_q, now,
+ defer_lingering_close);
+ /* Step 3: write completion timeouts */
process_timeout_queue(write_completion_q, now,
defer_lingering_close);
- /* Step 3: (normal) lingering close completion timeouts */
+ /* Step 4: (normal) lingering close completion timeouts */
if (dying && linger_q->timeout > short_linger_q->timeout) {
/* Dying, force short timeout for normal lingering close */
linger_q->timeout = short_linger_q->timeout;
}
process_timeout_queue(linger_q, now, shutdown_connection);
- /* Step 4: (short) lingering close completion timeouts */
+ /* Step 5: (short) lingering close completion timeouts */
process_timeout_queue(short_linger_q, now, shutdown_connection);
apr_thread_mutex_unlock(timeout_mutex);
: -1);
ps->keep_alive = apr_atomic_read32(keepalive_q->total);
+ ps->read_line = apr_atomic_read32(read_line_q->total);
ps->write_completion = apr_atomic_read32(write_completion_q->total);
ps->connections = apr_atomic_read32(&connection_count);
ps->suspended = apr_atomic_read32(&suspended_count);
struct {
struct timeout_queue *tail, *q;
apr_hash_t *hash;
- } wc, ka;
+ } rl, wc, ka;
/* Not needed in pre_config stage */
if (ap_state_query(AP_SQ_MAIN_STATE) == AP_SQ_MS_CREATE_PRE_CONFIG) {
return OK;
}
- wc.tail = ka.tail = NULL;
+ rl.tail = wc.tail = ka.tail = NULL;
+ rl.hash = apr_hash_make(ptemp);
wc.hash = apr_hash_make(ptemp);
ka.hash = apr_hash_make(ptemp);
ap_set_module_config(s->module_config, &mpm_event_module, sc);
if (!wc.tail) {
+
/* The main server uses the global queues */
+
+ rl.q = TO_QUEUE_MAKE(pconf, s->timeout, NULL);
+ apr_hash_set(rl.hash, &s->timeout, sizeof s->timeout, rl.q);
+ rl.tail = read_line_q = rl.q;
+
wc.q = TO_QUEUE_MAKE(pconf, s->timeout, NULL);
apr_hash_set(wc.hash, &s->timeout, sizeof s->timeout, wc.q);
wc.tail = write_completion_q = wc.q;
ka.tail = keepalive_q = ka.q;
}
else {
+
/* The vhosts use any existing queue with the same timeout,
* or their own queue(s) if there isn't */
+
+ rl.q = apr_hash_get(rl.hash, &s->timeout, sizeof s->timeout);
+ if (!rl.q) {
+ rl.q = TO_QUEUE_MAKE(pconf, s->timeout, rl.tail);
+ apr_hash_set(rl.hash, &s->timeout, sizeof s->timeout, rl.q);
+ rl.tail = rl.tail->next = rl.q;
+ }
+
wc.q = apr_hash_get(wc.hash, &s->timeout, sizeof s->timeout);
if (!wc.q) {
wc.q = TO_QUEUE_MAKE(pconf, s->timeout, wc.tail);
ka.tail = ka.tail->next = ka.q;
}
}
+ sc->rl_q = rl.q;
sc->wc_q = wc.q;
sc->ka_q = ka.q;
}