int h2_window_size; /* stream window size (http2) */
int min_workers; /* min # of worker threads/child */
int max_workers; /* max # of worker threads/child */
- int max_worker_idle_secs; /* max # of idle seconds for worker */
+ apr_interval_time_t idle_limit; /* max duration for idle workers */
int stream_max_mem_size; /* max # bytes held in memory/stream */
int h2_direct; /* if mod_h2 is active directly */
int modern_tls_only; /* Accept only modern TLS in HTTP/2 connections */
H2_INITIAL_WINDOW_SIZE, /* window_size */
-1, /* min workers */
-1, /* max workers */
- 10 * 60, /* max workers idle secs */
+ apr_time_from_sec(10 * 60), /* workers idle limit */
32 * 1024, /* stream max mem size */
-1, /* h2 direct mode */
1, /* modern TLS only */
conf->h2_window_size = DEF_VAL;
conf->min_workers = DEF_VAL;
conf->max_workers = DEF_VAL;
- conf->max_worker_idle_secs = DEF_VAL;
+ conf->idle_limit = DEF_VAL;
conf->stream_max_mem_size = DEF_VAL;
conf->h2_direct = DEF_VAL;
conf->modern_tls_only = DEF_VAL;
conf->padding_bits = DEF_VAL;
conf->padding_always = DEF_VAL;
conf->output_buffered = DEF_VAL;
conf->stream_timeout = DEF_VAL;
return conf;
}
n->h2_window_size = H2_CONFIG_GET(add, base, h2_window_size);
n->min_workers = H2_CONFIG_GET(add, base, min_workers);
n->max_workers = H2_CONFIG_GET(add, base, max_workers);
- n->max_worker_idle_secs = H2_CONFIG_GET(add, base, max_worker_idle_secs);
+ n->idle_limit = H2_CONFIG_GET(add, base, idle_limit);
n->stream_max_mem_size = H2_CONFIG_GET(add, base, stream_max_mem_size);
n->h2_direct = H2_CONFIG_GET(add, base, h2_direct);
n->modern_tls_only = H2_CONFIG_GET(add, base, modern_tls_only);
n->early_hints = H2_CONFIG_GET(add, base, early_hints);
n->padding_bits = H2_CONFIG_GET(add, base, padding_bits);
n->padding_always = H2_CONFIG_GET(add, base, padding_always);
n->stream_timeout = H2_CONFIG_GET(add, base, stream_timeout);
return n;
}
return H2_CONFIG_GET(conf, &defconf, min_workers);
case H2_CONF_MAX_WORKERS:
return H2_CONFIG_GET(conf, &defconf, max_workers);
- case H2_CONF_MAX_WORKER_IDLE_SECS:
- return H2_CONFIG_GET(conf, &defconf, max_worker_idle_secs);
+ case H2_CONF_MAX_WORKER_IDLE_LIMIT:
+ return H2_CONFIG_GET(conf, &defconf, idle_limit);
case H2_CONF_STREAM_MAX_MEM:
return H2_CONFIG_GET(conf, &defconf, stream_max_mem_size);
case H2_CONF_MODERN_TLS_ONLY:
case H2_CONF_MAX_WORKERS:
H2_CONFIG_SET(conf, max_workers, val);
break;
- case H2_CONF_MAX_WORKER_IDLE_SECS:
- H2_CONFIG_SET(conf, max_worker_idle_secs, val);
- break;
case H2_CONF_STREAM_MAX_MEM:
H2_CONFIG_SET(conf, stream_max_mem_size, val);
break;
case H2_CONF_STREAM_TIMEOUT:
H2_CONFIG_SET(conf, stream_timeout, val);
break;
+ case H2_CONF_MAX_WORKER_IDLE_LIMIT:
+ H2_CONFIG_SET(conf, idle_limit, val);
+ break;
default:
h2_srv_config_seti(conf, var, (int)val);
break;
return NULL;
}
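+/* Note: ap_timeout_parameter_parse() reads a bare number in the default
+ * unit given ("s" below) and also accepts suffixes such as "ms", "min"
+ * or "h", so e.g. "H2MaxWorkerIdleSeconds 600" and
+ * "H2MaxWorkerIdleSeconds 10min" configure the same limit. */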
-static const char *h2_conf_set_max_worker_idle_secs(cmd_parms *cmd,
- void *dirconf, const char *value)
+static const char *h2_conf_set_max_worker_idle_limit(cmd_parms *cmd,
+ void *dirconf, const char *value)
{
- int val = (int)apr_atoi64(value);
- if (val < 1) {
- return "value must be > 0";
+ apr_interval_time_t timeout;
+ apr_status_t rv = ap_timeout_parameter_parse(value, &timeout, "s");
+ if (rv != APR_SUCCESS) {
+ return "Invalid idle limit value";
}
- CONFIG_CMD_SET(cmd, dirconf, H2_CONF_MAX_WORKER_IDLE_SECS, val);
+ CONFIG_CMD_SET64(cmd, dirconf, H2_CONF_MAX_WORKER_IDLE_LIMIT, timeout);
return NULL;
}
return NULL;
}
-void h2_get_num_workers(server_rec *s, int *minw, int *maxw)
+void h2_get_workers_config(server_rec *s, int *pminw, int *pmaxw,
+ apr_time_t *pidle_limit)
{
int threads_per_child = 0;
- *minw = h2_config_sgeti(s, H2_CONF_MIN_WORKERS);
- *maxw = h2_config_sgeti(s, H2_CONF_MAX_WORKERS);
- ap_mpm_query(AP_MPMQ_MAX_THREADS, &threads_per_child);
+ *pminw = h2_config_sgeti(s, H2_CONF_MIN_WORKERS);
+ *pmaxw = h2_config_sgeti(s, H2_CONF_MAX_WORKERS);
- if (*minw <= 0) {
- *minw = threads_per_child;
- }
- if (*maxw <= 0) {
- /* As a default, this seems to work quite well under mpm_event.
- * For people enabling http2 under mpm_prefork, start 4 threads unless
- * configured otherwise. People get unhappy if their http2 requests are
- * blocking each other. */
- *maxw = 3 * (*minw) / 2;
- if (*maxw < 4) {
- *maxw = 4;
- }
+ ap_mpm_query(AP_MPMQ_MAX_THREADS, &threads_per_child);
+ if (*pminw <= 0) {
+ *pminw = threads_per_child;
+ }
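+ /* This default works quite well under mpm_event. For people
+ * enabling http2 under mpm_prefork, start at least 4 threads unless
+ * configured otherwise, so h2 requests do not block each other. */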
+ if (*pmaxw <= 0) {
+ *pmaxw = H2MAX(4, 3 * (*pminw) / 2);
}
+ *pidle_limit = h2_config_sgeti64(s, H2_CONF_MAX_WORKER_IDLE_LIMIT);
}
#define AP_END_CMD AP_INIT_TAKE1(NULL, NULL, NULL, RSRC_CONF, NULL)
RSRC_CONF, "minimum number of worker threads per child"),
AP_INIT_TAKE1("H2MaxWorkers", h2_conf_set_max_workers, NULL,
RSRC_CONF, "maximum number of worker threads per child"),
- AP_INIT_TAKE1("H2MaxWorkerIdleSeconds", h2_conf_set_max_worker_idle_secs, NULL,
+ AP_INIT_TAKE1("H2MaxWorkerIdleSeconds", h2_conf_set_max_worker_idle_limit, NULL,
RSRC_CONF, "maximum number of idle seconds before a worker shuts down"),
AP_INIT_TAKE1("H2StreamMaxMemSize", h2_conf_set_stream_max_mem_size, NULL,
RSRC_CONF, "maximum number of bytes buffered in memory for a stream"),
*/
#include <assert.h>
-#include <apr_atomic.h>
+#include <apr_ring.h>
#include <apr_thread_mutex.h>
#include <apr_thread_cond.h>
#include "h2_workers.h"
#include "h2_util.h"
+typedef enum {
+ PROD_IDLE,
+ PROD_ACTIVE,
+ PROD_JOINED,
+} prod_state_t;
+
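+/* A producer (e.g. an h2 session) feeds connections to the workers.
+ * fn_next() hands out the next connection, if any, and reports whether
+ * more are pending; fn_done() is invoked after a connection has been
+ * processed. The state cycles between IDLE and ACTIVE in normal
+ * operation; JOINED is final, set in h2_workers_join(). */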
+struct ap_conn_producer_t {
+ APR_RING_ENTRY(ap_conn_producer_t) link;
+ const char *name;
+ void *baton;
+ ap_conn_producer_next *fn_next;
+ ap_conn_producer_done *fn_done;
+ volatile prod_state_t state;
+ volatile int conns_active;
+};
+
+
+typedef enum {
+ H2_SLOT_FREE,
+ H2_SLOT_RUN,
+ H2_SLOT_ZOMBIE,
+} h2_slot_state_t;
+
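+/* A slot is FREE (no thread), RUN (thread processing or waiting for
+ * work) or ZOMBIE (thread terminated, to be joined in join_zombies()). */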
typedef struct h2_slot h2_slot;
struct h2_slot {
+ APR_RING_ENTRY(h2_slot) link;
int id;
- h2_slot *next;
+ apr_pool_t *pool;
+ h2_slot_state_t state;
+ volatile int should_shutdown;
+ volatile int is_idle;
h2_workers *workers;
- conn_rec *connection;
+ ap_conn_producer_t *prod;
apr_thread_t *thread;
- apr_thread_mutex_t *lock;
- apr_thread_cond_t *not_idle;
- /* atomic */ apr_uint32_t timed_out;
+ struct apr_thread_cond_t *more_work;
+ int activations;
};
-static h2_slot *pop_slot(h2_slot *volatile *phead)
-{
- /* Atomically pop a slot from the list */
- for (;;) {
- h2_slot *first = *phead;
- if (first == NULL) {
- return NULL;
- }
- if (apr_atomic_casptr((void*)phead, first->next, first) == first) {
- first->next = NULL;
- return first;
- }
- }
-}
+struct h2_workers {
+ server_rec *s;
+ apr_pool_t *pool;
+
+ apr_uint32_t max_slots;
+ apr_uint32_t min_active;
+ volatile apr_time_t idle_limit;
+ volatile int aborted;
+ volatile int shutdown;
+ int dynamic;
+
+ volatile apr_uint32_t active_slots;
+ volatile apr_uint32_t idle_slots;
+
+ apr_threadattr_t *thread_attr;
+ h2_slot *slots;
+
+ APR_RING_HEAD(h2_slots_free, h2_slot) free;
+ APR_RING_HEAD(h2_slots_idle, h2_slot) idle;
+ APR_RING_HEAD(h2_slots_busy, h2_slot) busy;
+ APR_RING_HEAD(h2_slots_zombie, h2_slot) zombie;
+
+ APR_RING_HEAD(ap_conn_producer_active, ap_conn_producer_t) prod_active;
+ APR_RING_HEAD(ap_conn_producer_idle, ap_conn_producer_t) prod_idle;
+
+ struct apr_thread_mutex_t *lock;
+ struct apr_thread_cond_t *prod_done;
+ struct apr_thread_cond_t *all_done;
+};
-static void push_slot(h2_slot *volatile *phead, h2_slot *slot)
-{
- /* Atomically push a slot to the list */
- ap_assert(!slot->next);
- for (;;) {
- h2_slot *next = slot->next = *phead;
- if (apr_atomic_casptr((void*)phead, slot, next) == next) {
- return;
- }
- }
-}
static void* APR_THREAD_FUNC slot_run(apr_thread_t *thread, void *wctx);
-static void slot_done(h2_slot *slot);
-static apr_status_t activate_slot(h2_workers *workers, h2_slot *slot)
+static apr_status_t activate_slot(h2_workers *workers)
{
+ h2_slot *slot;
+ apr_pool_t *pool;
apr_status_t rv;
-
- slot->workers = workers;
- slot->connection = NULL;
- apr_thread_mutex_lock(workers->lock);
- if (!slot->lock) {
- rv = apr_thread_mutex_create(&slot->lock,
- APR_THREAD_MUTEX_DEFAULT,
- workers->pool);
- if (rv != APR_SUCCESS) goto cleanup;
+ if (APR_RING_EMPTY(&workers->free, h2_slot, link)) {
+ return APR_EAGAIN;
}
+ slot = APR_RING_FIRST(&workers->free);
+ ap_assert(slot->state == H2_SLOT_FREE);
+ APR_RING_REMOVE(slot, link);
- if (!slot->not_idle) {
- rv = apr_thread_cond_create(&slot->not_idle, workers->pool);
- if (rv != APR_SUCCESS) goto cleanup;
- }
-
ap_log_error(APLOG_MARK, APLOG_TRACE3, 0, workers->s,
- "h2_workers: new thread for slot %d", slot->id);
+ "h2_workers: activate slot %d", slot->id);
+
+ slot->state = H2_SLOT_RUN;
+ slot->should_shutdown = 0;
+ slot->is_idle = 0;
+ slot->pool = NULL;
+ ++workers->active_slots;
+ rv = apr_pool_create(&pool, workers->pool);
+ if (APR_SUCCESS != rv) goto cleanup;
+ apr_pool_tag(pool, "h2_worker_slot");
+ slot->pool = pool;
- /* thread will either immediately start work or add itself
- * to the idle queue */
- apr_atomic_inc32(&workers->worker_count);
- apr_atomic_set32(&slot->timed_out, 0);
rv = ap_thread_create(&slot->thread, workers->thread_attr,
- slot_run, slot, workers->pool);
- if (rv != APR_SUCCESS) {
- apr_atomic_dec32(&workers->worker_count);
- }
+ slot_run, slot, slot->pool);
cleanup:
- apr_thread_mutex_unlock(workers->lock);
if (rv != APR_SUCCESS) {
- push_slot(&workers->free, slot);
- }
- return rv;
-}
-
-static apr_status_t add_worker(h2_workers *workers)
-{
- h2_slot *slot = pop_slot(&workers->free);
- if (slot) {
- return activate_slot(workers, slot);
- }
- return APR_EAGAIN;
-}
-
-static void wake_idle_worker(h2_workers *workers)
-{
- h2_slot *slot;;
- for (;;) {
- slot = pop_slot(&workers->idle);
- if (!slot) {
- if (workers->dynamic && apr_atomic_read32(&workers->shutdown) == 0) {
- add_worker(workers);
- }
- return;
- }
- if (!apr_atomic_read32(&slot->timed_out)) {
- apr_thread_mutex_lock(slot->lock);
- apr_thread_cond_signal(slot->not_idle);
- apr_thread_mutex_unlock(slot->lock);
- return;
+ AP_DEBUG_ASSERT(0);
+ slot->state = H2_SLOT_FREE;
+ if (slot->pool) {
+ apr_pool_destroy(slot->pool);
+ slot->pool = NULL;
}
- slot_done(slot);
+ APR_RING_INSERT_TAIL(&workers->free, slot, h2_slot, link);
+ --workers->active_slots;
}
+ return rv;
}
static void join_zombies(h2_workers *workers)
{
h2_slot *slot;
- while ((slot = pop_slot(&workers->zombies))) {
- apr_status_t status;
+ apr_status_t status;
+
+ while (!APR_RING_EMPTY(&workers->zombie, h2_slot, link)) {
+ slot = APR_RING_FIRST(&workers->zombie);
+ APR_RING_REMOVE(slot, link);
+ ap_assert(slot->state == H2_SLOT_ZOMBIE);
ap_assert(slot->thread != NULL);
+
+ apr_thread_mutex_unlock(workers->lock);
apr_thread_join(&status, slot->thread);
- slot->thread = NULL;
+ apr_thread_mutex_lock(workers->lock);
- push_slot(&workers->free, slot);
+ slot->thread = NULL;
+ slot->state = H2_SLOT_FREE;
+ if (slot->pool) {
+ apr_pool_destroy(slot->pool);
+ slot->pool = NULL;
+ }
+ APR_RING_INSERT_TAIL(&workers->free, slot, h2_slot, link);
}
}
-static apr_status_t slot_pull_c2(h2_slot *slot, h2_mplx *m)
+static void wake_idle_worker(h2_workers *workers, ap_conn_producer_t *prod)
{
- apr_status_t rv;
-
- rv = h2_mplx_worker_pop_c2(m, &slot->connection);
- if (slot->connection) {
- return rv;
+ if (!APR_RING_EMPTY(&workers->idle, h2_slot, link)) {
+ h2_slot *slot;
+ for (slot = APR_RING_FIRST(&workers->idle);
+ slot != APR_RING_SENTINEL(&workers->idle, h2_slot, link);
+ slot = APR_RING_NEXT(slot, link)) {
+ if (slot->is_idle && !slot->should_shutdown) {
+ apr_thread_cond_signal(slot->more_work);
+ slot->is_idle = 0;
+ return;
+ }
+ }
+ }
+ if (workers->dynamic && !workers->shutdown
+ && (workers->active_slots < workers->max_slots)) {
+ activate_slot(workers);
}
- return APR_EOF;
-}
-
-static h2_fifo_op_t mplx_peek(void *head, void *ctx)
-{
- h2_mplx *m = head;
- h2_slot *slot = ctx;
-
- if (slot_pull_c2(slot, m) == APR_EAGAIN) {
- wake_idle_worker(slot->workers);
- return H2_FIFO_OP_REPUSH;
- }
- return H2_FIFO_OP_PULL;
}
/**
- * Get the next c2 for the given worker. Will block until a c2 arrives
- * or the max_wait timer expires and more than min workers exist.
+ * Get the next connection to work on.
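+ * Takes the first producer from the active ring; if it reports more
+ * pending connections, it is re-queued and another idle worker woken,
+ * otherwise it is parked on the idle ring until h2_workers_activate()
+ * is called for it again.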
*/
-static int get_next(h2_slot *slot)
+static conn_rec *get_next(h2_slot *slot)
{
h2_workers *workers = slot->workers;
- int non_essential = slot->id >= workers->min_workers;
- apr_status_t rv;
-
- while (apr_atomic_read32(&workers->aborted) == 0
- && apr_atomic_read32(&slot->timed_out) == 0) {
- ap_assert(slot->connection == NULL);
- if (non_essential && apr_atomic_read32(&workers->shutdown)) {
- /* Terminate non-essential worker on shutdown */
- break;
+ conn_rec *c = NULL;
+ ap_conn_producer_t *prod;
+ int has_more;
+
+ slot->prod = NULL;
+ if (!APR_RING_EMPTY(&workers->prod_active, ap_conn_producer_t, link)) {
+ slot->prod = prod = APR_RING_FIRST(&workers->prod_active);
+ APR_RING_REMOVE(prod, link);
+ AP_DEBUG_ASSERT(PROD_ACTIVE == prod->state);
+
+ c = prod->fn_next(prod->baton, &has_more);
+ if (c && has_more) {
+ APR_RING_INSERT_TAIL(&workers->prod_active, prod, ap_conn_producer_t, link);
+ wake_idle_worker(workers, slot->prod);
}
- if (h2_fifo_try_peek(workers->mplxs, mplx_peek, slot) == APR_EOF) {
- /* The queue is terminated with the MPM child being cleaned up,
- * just leave. */
- break;
+ else {
+ prod->state = PROD_IDLE;
+ APR_RING_INSERT_TAIL(&workers->prod_idle, prod, ap_conn_producer_t, link);
}
- if (slot->connection) {
- return 1;
+ if (c) {
+ ++prod->conns_active;
}
-
- join_zombies(workers);
-
- apr_thread_mutex_lock(slot->lock);
- if (apr_atomic_read32(&workers->aborted) == 0) {
- apr_uint32_t idle_secs;
-
- push_slot(&workers->idle, slot);
- if (non_essential
- && (idle_secs = apr_atomic_read32(&workers->max_idle_secs))) {
- rv = apr_thread_cond_timedwait(slot->not_idle, slot->lock,
- apr_time_from_sec(idle_secs));
- if (APR_TIMEUP == rv) {
- apr_atomic_set32(&slot->timed_out, 1);
- }
- }
- else {
- apr_thread_cond_wait(slot->not_idle, slot->lock);
- }
- }
- apr_thread_mutex_unlock(slot->lock);
}
- return 0;
+ return c;
}
-static void slot_done(h2_slot *slot)
+static void* APR_THREAD_FUNC slot_run(apr_thread_t *thread, void *wctx)
{
+ h2_slot *slot = wctx;
h2_workers *workers = slot->workers;
+ conn_rec *c;
+ apr_status_t rv;
- push_slot(&workers->zombies, slot);
+ apr_thread_mutex_lock(workers->lock);
+ slot->state = H2_SLOT_RUN;
+ ++slot->activations;
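+ /* APR_RING_ELEM_INIT links the slot to itself; that is how ring
+ * membership is detected at the top of the loop below. */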
+ APR_RING_ELEM_INIT(slot, link);
+ for(;;) {
+ if (APR_RING_NEXT(slot, link) != slot) {
+ /* slot is part of the idle ring from the last loop */
+ APR_RING_REMOVE(slot, link);
+ --workers->idle_slots;
+ }
+ slot->is_idle = 0;
+
+ if (!workers->aborted && !slot->should_shutdown) {
+ APR_RING_INSERT_TAIL(&workers->busy, slot, h2_slot, link);
+ do {
+ c = get_next(slot);
+ if (!c) {
+ break;
+ }
+ apr_thread_mutex_unlock(workers->lock);
+ /* See the discussion at <https://github.com/icing/mod_h2/issues/195>
+ *
+ * Each conn_rec->id is supposed to be unique at a point in time. Since
+ * some modules (and maybe external code) use this id as an identifier
+ * for the request_rec they handle, it needs to be unique for secondary
+ * connections also.
+ *
+ * The MPM module assigns the connection ids, and mod_unique_id uses
+ * them to generate identifiers for requests. While the implementation
+ * works for HTTP/1.x, the parallel execution of several requests per
+ * connection generates duplicate identifiers under load.
+ *
+ * The original implementation for secondary connection identifiers used
+ * to shift the master connection id up and assign the stream id to the
+ * lower bits. This was cramped on 32 bit systems, but on 64bit there was
+ * enough space.
+ *
+ * As issue 195 showed, mod_unique_id only uses the lower 32 bits of the
+ * connection id, even on 64-bit systems, which leads to collisions in request ids.
+ *
+ * The way master connection ids are generated, there is some space "at the
+ * top" of the lower 32 bits on allmost all systems. If you have a setup
+ * with 64k threads per child and 255 child processes, you live on the edge.
+ *
+ * The new implementation shifts the master id by 8 bits and XORs in
+ * the worker id. This can still collide with > 256 h2 workers under
+ * heavy load, but there seems to be no way for mod_h2 alone to solve
+ * this in all possible configurations.
+ */
+ if (c->master) {
+ c->id = (c->master->id << 8)^slot->id;
+ }
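+ /* e.g. a master id of 0xbeef and slot id 5 yield the
+ * secondary id (0xbeef << 8) ^ 5 = 0xbeef05 */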
+ c->current_thread = thread;
+ AP_DEBUG_ASSERT(slot->prod);
- /* If this worker is the last one exiting and the MPM child is stopping,
- * unblock workers_pool_cleanup().
- */
- if (!apr_atomic_dec32(&workers->worker_count)
- && apr_atomic_read32(&workers->aborted)) {
- apr_thread_mutex_lock(workers->lock);
- apr_thread_cond_signal(workers->all_done);
- apr_thread_mutex_unlock(workers->lock);
- }
-}
+ ap_process_connection(c, ap_get_conn_socket(c));
+ slot->prod->fn_done(slot->prod->baton, c);
+ apr_thread_mutex_lock(workers->lock);
+ if (--slot->prod->conns_active <= 0) {
+ apr_thread_cond_broadcast(workers->prod_done);
+ }
+ if (slot->prod->state == PROD_IDLE) {
+ APR_RING_REMOVE(slot->prod, link);
+ slot->prod->state = PROD_ACTIVE;
+ APR_RING_INSERT_TAIL(&workers->prod_active, slot->prod, ap_conn_producer_t, link);
+ }
-static void* APR_THREAD_FUNC slot_run(apr_thread_t *thread, void *wctx)
-{
- h2_slot *slot = wctx;
- conn_rec *c;
-
- /* Get the next c2 from mplx to process. */
- while (get_next(slot)) {
- /* See the discussion at <https://github.com/icing/mod_h2/issues/195>
- *
- * Each conn_rec->id is supposed to be unique at a point in time. Since
- * some modules (and maybe external code) uses this id as an identifier
- * for the request_rec they handle, it needs to be unique for secondary
- * connections also.
- *
- * The MPM module assigns the connection ids and mod_unique_id is using
- * that one to generate identifier for requests. While the implementation
- * works for HTTP/1.x, the parallel execution of several requests per
- * connection will generate duplicate identifiers on load.
- *
- * The original implementation for secondary connection identifiers used
- * to shift the master connection id up and assign the stream id to the
- * lower bits. This was cramped on 32 bit systems, but on 64bit there was
- * enough space.
- *
- * As issue 195 showed, mod_unique_id only uses the lower 32 bit of the
- * connection id, even on 64bit systems. Therefore collisions in request ids.
- *
- * The way master connection ids are generated, there is some space "at the
- * top" of the lower 32 bits on allmost all systems. If you have a setup
- * with 64k threads per child and 255 child processes, you live on the edge.
- *
- * The new implementation shifts 8 bits and XORs in the worker
- * id. This will experience collisions with > 256 h2 workers and heavy
- * load still. There seems to be no way to solve this in all possible
- * configurations by mod_h2 alone.
- */
- AP_DEBUG_ASSERT(slot->connection != NULL);
- c = slot->connection;
- slot->connection = NULL;
- c->id = (c->master->id << 8)^slot->id;
- c->current_thread = thread;
+ } while (!workers->aborted && !slot->should_shutdown);
+ APR_RING_REMOVE(slot, link); /* no longer busy */
+ }
- ap_process_connection(c, ap_get_conn_socket(c));
+ if (workers->aborted || slot->should_shutdown) {
+ break;
+ }
- h2_mplx_worker_c2_done(c);
+ join_zombies(workers);
+
+ /* we are idle */
+ APR_RING_INSERT_TAIL(&workers->idle, slot, h2_slot, link);
+ ++workers->idle_slots;
+ slot->is_idle = 1;
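+ /* Slots beyond min_active are expendable: they wait at most
+ * idle_limit for new work and exit on timeout. The first
+ * min_active slots wait indefinitely for a signal. */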
+ if (slot->id >= workers->min_active && workers->idle_limit) {
+ rv = apr_thread_cond_timedwait(slot->more_work, workers->lock,
+ workers->idle_limit);
+ if (APR_TIMEUP == rv) {
+ APR_RING_REMOVE(slot, link);
+ --workers->idle_slots;
+ ap_log_error(APLOG_MARK, APLOG_TRACE3, 0, workers->s,
+ "h2_workers: idle timeout slot %d in state %d (%d activations)",
+ slot->id, slot->state, slot->activations);
+ break;
+ }
+ }
+ else {
+ apr_thread_cond_wait(slot->more_work, workers->lock);
+ }
}
- if (apr_atomic_read32(&slot->timed_out) == 0) {
- slot_done(slot);
+ ap_log_error(APLOG_MARK, APLOG_TRACE3, 0, workers->s,
+ "h2_workers: terminate slot %d in state %d (%d activations)",
+ slot->id, slot->state, slot->activations);
+ slot->is_idle = 0;
+ slot->state = H2_SLOT_ZOMBIE;
+ slot->should_shutdown = 0;
+ APR_RING_INSERT_TAIL(&workers->zombie, slot, h2_slot, link);
+ --workers->active_slots;
+ if (workers->active_slots <= 0) {
+ apr_thread_cond_broadcast(workers->all_done);
}
+ apr_thread_mutex_unlock(workers->lock);
apr_thread_exit(thread, APR_SUCCESS);
return NULL;
}
-static void wake_non_essential_workers(h2_workers *workers)
+static void wake_all_idles(h2_workers *workers)
{
h2_slot *slot;
- /* pop all idle, signal the non essentials and add the others again */
- if ((slot = pop_slot(&workers->idle))) {
- wake_non_essential_workers(workers);
- if (slot->id > workers->min_workers) {
- apr_thread_mutex_lock(slot->lock);
- apr_thread_cond_signal(slot->not_idle);
- apr_thread_mutex_unlock(slot->lock);
- }
- else {
- push_slot(&workers->idle, slot);
- }
- }
-}
-
-static void workers_abort_idle(h2_workers *workers)
-{
- h2_slot *slot;
-
- apr_atomic_set32(&workers->shutdown, 1);
- apr_atomic_set32(&workers->aborted, 1);
- h2_fifo_term(workers->mplxs);
-
- /* abort all idle slots */
- while ((slot = pop_slot(&workers->idle))) {
- apr_thread_mutex_lock(slot->lock);
- apr_thread_cond_signal(slot->not_idle);
- apr_thread_mutex_unlock(slot->lock);
+ for (slot = APR_RING_FIRST(&workers->idle);
+ slot != APR_RING_SENTINEL(&workers->idle, h2_slot, link);
+ slot = APR_RING_NEXT(slot, link))
+ {
+ apr_thread_cond_signal(slot->more_work);
}
}
int n, wait_sec = 5;
ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, workers->s,
- "h2_workers: cleanup %d workers idling",
- (int)apr_atomic_read32(&workers->worker_count));
- workers_abort_idle(workers);
+ "h2_workers: cleanup %d workers (%d idle)",
+ workers->active_slots, workers->idle_slots);
+ apr_thread_mutex_lock(workers->lock);
+ workers->shutdown = 1;
+ workers->aborted = 1;
+ wake_all_idles(workers);
+ apr_thread_mutex_unlock(workers->lock);
/* wait for all the workers to become zombies and join them.
* this gets called after the mpm shuts down and all connections
* have either been handled (graceful) or we are forced exiting
* (ungrateful). Either way, we show limited patience. */
- apr_thread_mutex_lock(workers->lock);
end = apr_time_now() + apr_time_from_sec(wait_sec);
- while ((n = apr_atomic_read32(&workers->worker_count)) > 0
- && apr_time_now() < end) {
+ while (apr_time_now() < end) {
+ apr_thread_mutex_lock(workers->lock);
+ if (!(n = workers->active_slots)) {
+ apr_thread_mutex_unlock(workers->lock);
+ break;
+ }
+ wake_all_idles(workers);
rv = apr_thread_cond_timedwait(workers->all_done, workers->lock, timeout);
+ apr_thread_mutex_unlock(workers->lock);
+
if (APR_TIMEUP == rv) {
ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, workers->s,
- APLOGNO(10290) "h2_workers: waiting for idle workers to close, "
- "still seeing %d workers living",
- apr_atomic_read32(&workers->worker_count));
- continue;
+ APLOGNO(10290) "h2_workers: waiting for workers to close, "
+ "still seeing %d workers (%d idle) living",
+ workers->active_slots, workers->idle_slots);
}
}
if (n) {
ap_log_error(APLOG_MARK, APLOG_WARNING, 0, workers->s,
- APLOGNO(10291) "h2_workers: cleanup, %d idle workers "
+ APLOGNO(10291) "h2_workers: cleanup, %d workers (%d idle) "
"did not exit after %d seconds.",
- n, wait_sec);
+ n, workers->idle_slots, wait_sec);
}
- apr_thread_mutex_unlock(workers->lock);
ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, workers->s,
"h2_workers: cleanup all workers terminated");
+ apr_thread_mutex_lock(workers->lock);
join_zombies(workers);
+ apr_thread_mutex_unlock(workers->lock);
ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, workers->s,
"h2_workers: cleanup zombie workers joined");
}
h2_workers *h2_workers_create(server_rec *s, apr_pool_t *pchild,
- int min_workers, int max_workers,
- int idle_secs)
+ int max_slots, int min_active, apr_time_t idle_limit)
{
apr_status_t rv;
h2_workers *workers;
apr_pool_t *pool;
- int i, n;
+ apr_allocator_t *allocator;
+ int i, locked = 0;
ap_assert(s);
ap_assert(pchild);
* guarded by our lock. Without this pool, all subpool creations would
* happen on the pool handed to us, which we do not guard.
*/
- apr_pool_create(&pool, pchild);
+ rv = apr_allocator_create(&allocator);
+ if (rv != APR_SUCCESS) {
+ goto cleanup;
+ }
+ rv = apr_pool_create_ex(&pool, pchild, NULL, allocator);
+ if (rv != APR_SUCCESS) {
+ apr_allocator_destroy(allocator);
+ goto cleanup;
+ }
+ apr_allocator_owner_set(allocator, pool);
apr_pool_tag(pool, "h2_workers");
workers = apr_pcalloc(pool, sizeof(h2_workers));
if (!workers) {
workers->s = s;
workers->pool = pool;
- workers->min_workers = min_workers;
- workers->max_workers = max_workers;
- workers->max_idle_secs = (idle_secs > 0)? idle_secs : 10;
+ workers->min_active = min_active;
+ workers->max_slots = max_slots;
+ workers->idle_limit = (idle_limit > 0)? idle_limit : apr_time_from_sec(10);
+ workers->dynamic = (workers->min_active < workers->max_slots);
- ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, workers->s,
- "h2_workers: created with min=%d max=%d idle_timeout=%d sec",
- workers->min_workers, workers->max_workers,
- (int)workers->max_idle_secs);
- /* FIXME: the fifo set we use here has limited capacity. Once the
- * set is full, connections with new requests do a wait.
- */
- rv = h2_fifo_set_create(&workers->mplxs, pool, 16 * 1024);
- if (rv != APR_SUCCESS) goto cleanup;
+ ap_log_error(APLOG_MARK, APLOG_INFO, 0, workers->s,
+ "h2_workers: created with min=%d max=%d idle_ms=%d",
+ workers->min_active, workers->max_slots,
+ (int)apr_time_as_msec(idle_limit));
+
+ APR_RING_INIT(&workers->idle, h2_slot, link);
+ APR_RING_INIT(&workers->busy, h2_slot, link);
+ APR_RING_INIT(&workers->free, h2_slot, link);
+ APR_RING_INIT(&workers->zombie, h2_slot, link);
+
+ APR_RING_INIT(&workers->prod_active, ap_conn_producer_t, link);
+ APR_RING_INIT(&workers->prod_idle, ap_conn_producer_t, link);
rv = apr_threadattr_create(&workers->thread_attr, workers->pool);
if (rv != APR_SUCCESS) goto cleanup;
if (rv != APR_SUCCESS) goto cleanup;
rv = apr_thread_cond_create(&workers->all_done, workers->pool);
if (rv != APR_SUCCESS) goto cleanup;
+ rv = apr_thread_cond_create(&workers->prod_done, workers->pool);
+ if (rv != APR_SUCCESS) goto cleanup;
- n = workers->nslots = workers->max_workers;
- workers->slots = apr_pcalloc(workers->pool, n * sizeof(h2_slot));
- if (workers->slots == NULL) {
- n = workers->nslots = 0;
- rv = APR_ENOMEM;
- goto cleanup;
- }
- for (i = 0; i < n; ++i) {
+ apr_thread_mutex_lock(workers->lock);
+ locked = 1;
+
+ /* create the slots and put them on the free list */
+ workers->slots = apr_pcalloc(workers->pool, workers->max_slots * sizeof(h2_slot));
+
+ for (i = 0; i < workers->max_slots; ++i) {
workers->slots[i].id = i;
- }
- /* we activate all for now, TODO: support min_workers again.
- * do this in reverse for vanity reasons so slot 0 will most
- * likely be at head of idle queue. */
- n = workers->min_workers;
- for (i = n-1; i >= 0; --i) {
- rv = activate_slot(workers, &workers->slots[i]);
+ workers->slots[i].state = H2_SLOT_FREE;
+ workers->slots[i].workers = workers;
+ APR_RING_ELEM_INIT(&workers->slots[i], link);
+ APR_RING_INSERT_TAIL(&workers->free, &workers->slots[i], h2_slot, link);
+ rv = apr_thread_cond_create(&workers->slots[i].more_work, workers->pool);
if (rv != APR_SUCCESS) goto cleanup;
}
- /* the rest of the slots go on the free list */
- for(i = n; i < workers->nslots; ++i) {
- push_slot(&workers->free, &workers->slots[i]);
+
+ /* activate the min amount of workers */
+ for (i = 0; i < workers->min_active; ++i) {
+ rv = activate_slot(workers);
+ if (rv != APR_SUCCESS) goto cleanup;
}
- workers->dynamic = (workers->worker_count < workers->max_workers);
cleanup:
+ if (locked) {
+ apr_thread_mutex_unlock(workers->lock);
+ }
if (rv == APR_SUCCESS) {
/* Stop/join the workers threads when the MPM child exits (pchild is
* destroyed), and as a pre_cleanup of pchild thus before the threads
apr_pool_pre_cleanup_register(pchild, workers, workers_pool_cleanup);
return workers;
}
+ ap_log_error(APLOG_MARK, APLOG_DEBUG, rv, workers->s,
+ "h2_workers: errors initializing");
return NULL;
}
-apr_status_t h2_workers_register(h2_workers *workers, struct h2_mplx *m)
+apr_size_t h2_workers_get_max_workers(h2_workers *workers)
{
- apr_status_t status = h2_fifo_push(workers->mplxs, m);
- wake_idle_worker(workers);
- return status;
+ return workers->max_slots;
}
-apr_status_t h2_workers_unregister(h2_workers *workers, struct h2_mplx *m)
+void h2_workers_graceful_shutdown(h2_workers *workers)
{
- return h2_fifo_remove(workers->mplxs, m);
+ apr_thread_mutex_lock(workers->lock);
+ workers->shutdown = 1;
+ workers->idle_limit = apr_time_from_sec(1);
+ wake_all_idles(workers);
+ apr_thread_mutex_unlock(workers->lock);
}
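+
+/* Register a producer with the workers. It starts out idle; call
+ * h2_workers_activate() once it has connections to process. */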
-void h2_workers_graceful_shutdown(h2_workers *workers)
+ap_conn_producer_t *h2_workers_register(h2_workers *workers,
+ apr_pool_t *producer_pool,
+ const char *name,
+ ap_conn_producer_next *fn_next,
+ ap_conn_producer_done *fn_done,
+ void *baton)
{
- apr_atomic_set32(&workers->shutdown, 1);
- apr_atomic_set32(&workers->max_idle_secs, 1);
- wake_non_essential_workers(workers);
+ ap_conn_producer_t *prod;
+
+ prod = apr_pcalloc(producer_pool, sizeof(*prod));
+ APR_RING_ELEM_INIT(prod, link);
+ prod->name = name;
+ prod->fn_next = fn_next;
+ prod->fn_done = fn_done;
+ prod->baton = baton;
+
+ apr_thread_mutex_lock(workers->lock);
+ prod->state = PROD_IDLE;
+ APR_RING_INSERT_TAIL(&workers->prod_idle, prod, ap_conn_producer_t, link);
+ apr_thread_mutex_unlock(workers->lock);
+
+ return prod;
+}
+
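+/* Take the producer out of circulation and block until all of its
+ * connections currently assigned to workers have finished. Afterwards,
+ * neither fn_next() nor fn_done() will be called for it again. */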
+apr_status_t h2_workers_join(h2_workers *workers, ap_conn_producer_t *prod)
+{
+ apr_status_t rv = APR_SUCCESS;
+
+ apr_thread_mutex_lock(workers->lock);
+ if (PROD_JOINED == prod->state) {
+ AP_DEBUG_ASSERT(APR_RING_NEXT(prod, link) == prod); /* should be in no ring */
+ rv = APR_EINVAL;
+ }
+ else {
+ AP_DEBUG_ASSERT(PROD_ACTIVE == prod->state || PROD_IDLE == prod->state);
+ APR_RING_REMOVE(prod, link);
+ prod->state = PROD_JOINED; /* prevent further activations */
+ while (prod->conns_active > 0) {
+ apr_thread_cond_wait(workers->prod_done, workers->lock);
+ }
+ APR_RING_ELEM_INIT(prod, link); /* make it link to itself */
+ }
+ apr_thread_mutex_unlock(workers->lock);
+ return rv;
+}
+
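+/* Put an idle producer on the active ring and wake (or start) a
+ * worker to poll it via fn_next(). Activating an already active
+ * producer is a no-op; a joined one gives APR_EINVAL. */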
+apr_status_t h2_workers_activate(h2_workers *workers, ap_conn_producer_t *prod)
+{
+ apr_status_t rv = APR_SUCCESS;
+ apr_thread_mutex_lock(workers->lock);
+ if (PROD_IDLE == prod->state) {
+ APR_RING_REMOVE(prod, link);
+ prod->state = PROD_ACTIVE;
+ APR_RING_INSERT_TAIL(&workers->prod_active, prod, ap_conn_producer_t, link);
+ wake_idle_worker(workers, prod);
+ }
+ else if (PROD_JOINED == prod->state) {
+ rv = APR_EINVAL;
+ }
+ apr_thread_mutex_unlock(workers->lock);
+ return rv;
}