]> git.ipfire.org Git - thirdparty/apache/httpd.git/commitdiff
*) mod_http2: new implementation of h2 worker pool.
authorStefan Eissing <icing@apache.org>
Fri, 17 Jun 2022 09:24:57 +0000 (09:24 +0000)
committerStefan Eissing <icing@apache.org>
Fri, 17 Jun 2022 09:24:57 +0000 (09:24 +0000)
     - O(1) cost at registration of connection processing producers
     - no limit on registered producers
     - join of ongoing work on unregister
     - callbacks to unlink dependencies into other h2 code
     - memory cleanup on workers deactivation (on idle timeouts)
     - idle_limit as apr_time_t instead of seconds

git-svn-id: https://svn.apache.org/repos/asf/httpd/httpd/trunk@1902005 13f79535-47bb-0310-9956-ffa450edef68

modules/http2/h2_c1.c
modules/http2/h2_config.c
modules/http2/h2_config.h
modules/http2/h2_mplx.c
modules/http2/h2_mplx.h
modules/http2/h2_workers.c
modules/http2/h2_workers.h
modules/http2/mod_http2.c

index 1dc0de7c601e260ca3d559e9739e00eb1dd9a29a..7662a0e4fec7ebca12e9d9ea17de825428092f56 100644 (file)
@@ -56,11 +56,8 @@ apr_status_t h2_c1_child_init(apr_pool_t *pool, server_rec *s)
 {
     apr_status_t status = APR_SUCCESS;
     int minw, maxw;
-    int max_threads_per_child = 0;
-    int idle_secs = 0;
+    apr_time_t idle_limit;
 
-    ap_mpm_query(AP_MPMQ_MAX_THREADS, &max_threads_per_child);
-    
     status = ap_mpm_query(AP_MPMQ_IS_ASYNC, &async_mpm);
     if (status != APR_SUCCESS) {
         /* some MPMs do not implemnent this */
@@ -70,12 +67,8 @@ apr_status_t h2_c1_child_init(apr_pool_t *pool, server_rec *s)
 
     h2_config_init(pool);
 
-    h2_get_num_workers(s, &minw, &maxw);
-    idle_secs = h2_config_sgeti(s, H2_CONF_MAX_WORKER_IDLE_SECS);
-    ap_log_error(APLOG_MARK, APLOG_TRACE3, 0, s,
-                 "h2_workers: min=%d max=%d, mthrpchild=%d, idle_secs=%d", 
-                 minw, maxw, max_threads_per_child, idle_secs);
-    workers = h2_workers_create(s, pool, minw, maxw, idle_secs);
+    h2_get_workers_config(s, &minw, &maxw, &idle_limit);
+    workers = h2_workers_create(s, pool, maxw, minw, idle_limit);
  
     h2_c_logio_add_bytes_in = APR_RETRIEVE_OPTIONAL_FN(ap_logio_add_bytes_in);
     h2_c_logio_add_bytes_out = APR_RETRIEVE_OPTIONAL_FN(ap_logio_add_bytes_out);
index 4df058d95d38b0bf658623ccf7829da51033c3d2..da1cf79a0714e0afea693e0adea1fb6e4a1780cb 100644 (file)
@@ -57,7 +57,7 @@ typedef struct h2_config {
     int h2_window_size;              /* stream window size (http2) */
     int min_workers;                 /* min # of worker threads/child */
     int max_workers;                 /* max # of worker threads/child */
-    int max_worker_idle_secs;        /* max # of idle seconds for worker */
+    apr_interval_time_t idle_limit;  /* max duration for idle workers */
     int stream_max_mem_size;         /* max # bytes held in memory/stream */
     int h2_direct;                   /* if mod_h2 is active directly */
     int modern_tls_only;             /* Accept only modern TLS in HTTP/2 connections */  
@@ -93,7 +93,7 @@ static h2_config defconf = {
     H2_INITIAL_WINDOW_SIZE, /* window_size */
     -1,                     /* min workers */
     -1,                     /* max workers */
-    10 * 60,                /* max workers idle secs */
+    apr_time_from_sec(10 * 60), /* workers idle limit */
     32 * 1024,              /* stream max mem size */
     -1,                     /* h2 direct mode */
     1,                      /* modern TLS only */
@@ -136,7 +136,7 @@ void *h2_config_create_svr(apr_pool_t *pool, server_rec *s)
     conf->h2_window_size       = DEF_VAL;
     conf->min_workers          = DEF_VAL;
     conf->max_workers          = DEF_VAL;
-    conf->max_worker_idle_secs = DEF_VAL;
+    conf->idle_limit           = DEF_VAL;
     conf->stream_max_mem_size  = DEF_VAL;
     conf->h2_direct            = DEF_VAL;
     conf->modern_tls_only      = DEF_VAL;
@@ -152,7 +152,7 @@ void *h2_config_create_svr(apr_pool_t *pool, server_rec *s)
     conf->padding_bits         = DEF_VAL;
     conf->padding_always       = DEF_VAL;
     conf->output_buffered      = DEF_VAL;
-    conf->stream_timeout         = DEF_VAL;
+    conf->stream_timeout       = DEF_VAL;
     return conf;
 }
 
@@ -168,7 +168,7 @@ static void *h2_config_merge(apr_pool_t *pool, void *basev, void *addv)
     n->h2_window_size       = H2_CONFIG_GET(add, base, h2_window_size);
     n->min_workers          = H2_CONFIG_GET(add, base, min_workers);
     n->max_workers          = H2_CONFIG_GET(add, base, max_workers);
-    n->max_worker_idle_secs = H2_CONFIG_GET(add, base, max_worker_idle_secs);
+    n->idle_limit           = H2_CONFIG_GET(add, base, idle_limit);
     n->stream_max_mem_size  = H2_CONFIG_GET(add, base, stream_max_mem_size);
     n->h2_direct            = H2_CONFIG_GET(add, base, h2_direct);
     n->modern_tls_only      = H2_CONFIG_GET(add, base, modern_tls_only);
@@ -194,7 +194,7 @@ static void *h2_config_merge(apr_pool_t *pool, void *basev, void *addv)
     n->early_hints          = H2_CONFIG_GET(add, base, early_hints);
     n->padding_bits         = H2_CONFIG_GET(add, base, padding_bits);
     n->padding_always       = H2_CONFIG_GET(add, base, padding_always);
-    n->stream_timeout         = H2_CONFIG_GET(add, base, stream_timeout);
+    n->stream_timeout       = H2_CONFIG_GET(add, base, stream_timeout);
     return n;
 }
 
@@ -248,8 +248,8 @@ static apr_int64_t h2_srv_config_geti64(const h2_config *conf, h2_config_var_t v
             return H2_CONFIG_GET(conf, &defconf, min_workers);
         case H2_CONF_MAX_WORKERS:
             return H2_CONFIG_GET(conf, &defconf, max_workers);
-        case H2_CONF_MAX_WORKER_IDLE_SECS:
-            return H2_CONFIG_GET(conf, &defconf, max_worker_idle_secs);
+        case H2_CONF_MAX_WORKER_IDLE_LIMIT:
+            return H2_CONFIG_GET(conf, &defconf, idle_limit);
         case H2_CONF_STREAM_MAX_MEM:
             return H2_CONFIG_GET(conf, &defconf, stream_max_mem_size);
         case H2_CONF_MODERN_TLS_ONLY:
@@ -298,9 +298,6 @@ static void h2_srv_config_seti(h2_config *conf, h2_config_var_t var, int val)
         case H2_CONF_MAX_WORKERS:
             H2_CONFIG_SET(conf, max_workers, val);
             break;
-        case H2_CONF_MAX_WORKER_IDLE_SECS:
-            H2_CONFIG_SET(conf, max_worker_idle_secs, val);
-            break;
         case H2_CONF_STREAM_MAX_MEM:
             H2_CONFIG_SET(conf, stream_max_mem_size, val);
             break;
@@ -354,6 +351,9 @@ static void h2_srv_config_seti64(h2_config *conf, h2_config_var_t var, apr_int64
         case H2_CONF_STREAM_TIMEOUT:
             H2_CONFIG_SET(conf, stream_timeout, val);
             break;
+        case H2_CONF_MAX_WORKER_IDLE_LIMIT:
+            H2_CONFIG_SET(conf, idle_limit, val);
+            break;
         default:
             h2_srv_config_seti(conf, var, (int)val);
             break;
@@ -557,14 +557,15 @@ static const char *h2_conf_set_max_workers(cmd_parms *cmd,
     return NULL;
 }
 
-static const char *h2_conf_set_max_worker_idle_secs(cmd_parms *cmd,
-                                                    void *dirconf, const char *value)
+static const char *h2_conf_set_max_worker_idle_limit(cmd_parms *cmd,
+                                                     void *dirconf, const char *value)
 {
-    int val = (int)apr_atoi64(value);
-    if (val < 1) {
-        return "value must be > 0";
+    apr_interval_time_t timeout;
+    apr_status_t rv = ap_timeout_parameter_parse(value, &timeout, "s");
+    if (rv != APR_SUCCESS) {
+        return "Invalid idle limit value";
     }
-    CONFIG_CMD_SET(cmd, dirconf, H2_CONF_MAX_WORKER_IDLE_SECS, val);
+    CONFIG_CMD_SET64(cmd, dirconf, H2_CONF_MAX_WORKER_IDLE_LIMIT, timeout);
     return NULL;
 }
 
@@ -868,27 +869,22 @@ static const char *h2_conf_set_stream_timeout(cmd_parms *cmd,
     return NULL;
 }
 
-void h2_get_num_workers(server_rec *s, int *minw, int *maxw)
+void h2_get_workers_config(server_rec *s, int *pminw, int *pmaxw,
+                           apr_time_t *pidle_limit)
 {
     int threads_per_child = 0;
 
-    *minw = h2_config_sgeti(s, H2_CONF_MIN_WORKERS);
-    *maxw = h2_config_sgeti(s, H2_CONF_MAX_WORKERS);    
-    ap_mpm_query(AP_MPMQ_MAX_THREADS, &threads_per_child);
+    *pminw = h2_config_sgeti(s, H2_CONF_MIN_WORKERS);
+    *pmaxw = h2_config_sgeti(s, H2_CONF_MAX_WORKERS);
 
-    if (*minw <= 0) {
-        *minw = threads_per_child;
-    }
-    if (*maxw <= 0) {
-        /* As a default, this seems to work quite well under mpm_event. 
-         * For people enabling http2 under mpm_prefork, start 4 threads unless 
-         * configured otherwise. People get unhappy if their http2 requests are 
-         * blocking each other. */
-        *maxw = 3 * (*minw) / 2;
-        if (*maxw < 4) {
-            *maxw = 4;
-        }
+    ap_mpm_query(AP_MPMQ_MAX_THREADS, &threads_per_child);
+    if (*pminw <= 0) {
+        *pminw = threads_per_child;
+    }
+    if (*pmaxw <= 0) {
+        *pmaxw = H2MAX(4, 3 * (*pminw) / 2);
     }
+    *pidle_limit = h2_config_sgeti64(s, H2_CONF_MAX_WORKER_IDLE_LIMIT);
 }
 
 #define AP_END_CMD     AP_INIT_TAKE1(NULL, NULL, NULL, RSRC_CONF, NULL)
@@ -902,7 +898,7 @@ const command_rec h2_cmds[] = {
                   RSRC_CONF, "minimum number of worker threads per child"),
     AP_INIT_TAKE1("H2MaxWorkers", h2_conf_set_max_workers, NULL,
                   RSRC_CONF, "maximum number of worker threads per child"),
-    AP_INIT_TAKE1("H2MaxWorkerIdleSeconds", h2_conf_set_max_worker_idle_secs, NULL,
+    AP_INIT_TAKE1("H2MaxWorkerIdleSeconds", h2_conf_set_max_worker_idle_limit, NULL,
                   RSRC_CONF, "maximum number of idle seconds before a worker shuts down"),
     AP_INIT_TAKE1("H2StreamMaxMemSize", h2_conf_set_stream_max_mem_size, NULL,
                   RSRC_CONF, "maximum number of bytes buffered in memory for a stream"),
index c150fe21d84e90a001a6f838f6fef02e636f0060..6d2e65f926a5683b6e61c3011f9e2f056192bb18 100644 (file)
@@ -28,7 +28,7 @@ typedef enum {
     H2_CONF_WIN_SIZE,
     H2_CONF_MIN_WORKERS,
     H2_CONF_MAX_WORKERS,
-    H2_CONF_MAX_WORKER_IDLE_SECS,
+    H2_CONF_MAX_WORKER_IDLE_LIMIT,
     H2_CONF_STREAM_MAX_MEM,
     H2_CONF_DIRECT,
     H2_CONF_MODERN_TLS_ONLY,
@@ -88,7 +88,8 @@ apr_int64_t h2_config_rgeti64(request_rec *r, h2_config_var_t var);
 apr_array_header_t *h2_config_push_list(request_rec *r);
 
 
-void h2_get_num_workers(server_rec *s, int *minw, int *maxw);
+void h2_get_workers_config(server_rec *s, int *pminw, int *pmaxw,
+                           apr_time_t *pidle_limit);
 void h2_config_init(apr_pool_t *pool);
 
 const struct h2_priority *h2_cconfig_get_priority(conn_rec *c, const char *content_type);
index 205d19f020a64ed52dc25588bf44c3e019fb40b0..bb6b324115a17365c3442e65ec353a3b7b89a588 100644 (file)
@@ -58,6 +58,9 @@ typedef struct {
     apr_size_t count;
 } stream_iter_ctx;
 
+static conn_rec *c2_prod_next(void *baton, int *phas_more);
+static void c2_prod_done(void *baton, conn_rec *c2);
+
 static void s_mplx_be_happy(h2_mplx *m, conn_rec *c, h2_conn_ctx_t *conn_ctx);
 static void m_be_annoyed(h2_mplx *m);
 
@@ -303,7 +306,7 @@ h2_mplx *h2_mplx_c1_create(int child_num, apr_uint32_t id, h2_stream *stream0,
     m->q = h2_iq_create(m->pool, m->max_streams);
 
     m->workers = workers;
-    m->processing_max = m->max_streams;
+    m->processing_max = H2MIN(h2_workers_get_max_workers(workers), m->max_streams);
     m->processing_limit = 6; /* the original h1 max parallel connections */
     m->last_mood_change = apr_time_now();
     m->mood_update_interval = apr_time_from_msec(100);
@@ -332,6 +335,9 @@ h2_mplx *h2_mplx_c1_create(int child_num, apr_uint32_t id, h2_stream *stream0,
     m->max_spare_transits = 3;
     m->c2_transits = apr_array_make(m->pool, m->max_spare_transits, sizeof(h2_c2_transit*));
 
+    m->producer = h2_workers_register(workers, m->pool,
+                                      apr_psprintf(m->pool, "h2-%d", (int)m->id),
+                                      c2_prod_next, c2_prod_done, m);
     return m;
 
 failure:
@@ -440,8 +446,7 @@ void h2_mplx_c1_destroy(h2_mplx *m)
     /* How to shut down a h2 connection:
      * 0. abort and tell the workers that no more work will come from us */
     m->aborted = 1;
-    h2_workers_unregister(m->workers, m);
-    
+
     H2_MPLX_ENTER_ALWAYS(m);
 
     /* While really terminating any c2 connections, treat the master
@@ -485,6 +490,10 @@ void h2_mplx_c1_destroy(h2_mplx *m)
         }
     }
 
+    H2_MPLX_LEAVE(m);
+    h2_workers_join(m->workers, m->producer);
+    H2_MPLX_ENTER_ALWAYS(m);
+
     /* 4. With all workers done, all streams should be in spurge */
     ap_assert(m->processing_count == 0);
     if (!h2_ihash_empty(m->shold)) {
@@ -687,15 +696,13 @@ void h2_mplx_c1_process(h2_mplx *m,
                           H2_MPLX_MSG(m, "stream %d not found to process"), sid);
         }
     }
-    if (!m->is_registered && !h2_iq_empty(m->q)) {
-        m->is_registered = 1;
+    if ((m->processing_count < m->processing_limit) && !h2_iq_empty(m->q)) {
         H2_MPLX_LEAVE(m);
-        rv = h2_workers_register(m->workers, m);
+        rv = h2_workers_activate(m->workers, m->producer);
         H2_MPLX_ENTER_ALWAYS(m);
         if (rv != APR_SUCCESS) {
-            m->is_registered = 0;
             ap_log_cerror(APLOG_MARK, APLOG_ERR, rv, m->c1, APLOGNO(10021)
-                          H2_MPLX_MSG(m, "register at workers"));
+                          H2_MPLX_MSG(m, "activate at workers"));
         }
     }
     *pstream_count = (int)h2_ihash_count(m->streams);
@@ -863,24 +870,18 @@ cleanup:
     return c2;
 }
 
-apr_status_t h2_mplx_worker_pop_c2(h2_mplx *m, conn_rec **out_c)
+static conn_rec *c2_prod_next(void *baton, int *phas_more)
 {
-    apr_status_t rv;
+    h2_mplx *m = baton;
+    conn_rec *c = NULL;
 
     H2_MPLX_ENTER_ALWAYS(m);
-    if (m->aborted) {
-        *out_c = NULL;
-        rv = APR_EOF;
-    }
-    else {
-        *out_c = s_next_c2(m);
-        rv = (*out_c != NULL && !h2_iq_empty(m->q))? APR_EAGAIN : APR_SUCCESS;
-    }
-    if (APR_EAGAIN != rv) {
-        m->is_registered = 0; /* h2_workers will discard this mplx */
+    if (!m->aborted) {
+        c = s_next_c2(m);
+        *phas_more = (c != NULL && !h2_iq_empty(m->q));
     }
     H2_MPLX_LEAVE(m);
-    return rv;
+    return c;
 }
 
 static void s_c2_done(h2_mplx *m, conn_rec *c2, h2_conn_ctx_t *conn_ctx)
@@ -947,34 +948,18 @@ static void s_c2_done(h2_mplx *m, conn_rec *c2, h2_conn_ctx_t *conn_ctx)
     }
 }
 
-void h2_mplx_worker_c2_done(conn_rec *c2)
+static void c2_prod_done(void *baton, conn_rec *c2)
 {
+    h2_mplx *m = baton;
     h2_conn_ctx_t *conn_ctx = h2_conn_ctx_get(c2);
-    h2_mplx *m;
 
     AP_DEBUG_ASSERT(conn_ctx);
-    m = conn_ctx->mplx;
     H2_MPLX_ENTER_ALWAYS(m);
 
     --m->processing_count;
     s_c2_done(m, c2, conn_ctx);
     if (m->join_wait) apr_thread_cond_signal(m->join_wait);
 
-    if (!m->aborted && !m->is_registered
-        && (m->processing_count < m->processing_limit)
-        && !h2_iq_empty(m->q)) {
-        /* We have a limit on the amount of c2s we process at a time. When
-         * this is reached, we do no longer have things to do for h2 workers
-         * and they remove such an mplx from its queue.
-         * When a c2 is done, there might then be room for more processing
-         * and we need then to register this mplx at h2 workers again.
-         */
-        m->is_registered = 1;
-        H2_MPLX_LEAVE(m);
-        h2_workers_register(m->workers, m);
-        return;
-    }
-
     H2_MPLX_LEAVE(m);
 }
 
index 42faf051ad7aac4246530db139ec254c9d05f0bb..e056acacdd10adf56013cc99e9c1462ad1ee8cc2 100644 (file)
@@ -44,6 +44,8 @@ struct h2_iqueue;
 
 #include <apr_queue.h>
 
+#include "h2_workers.h"
+
 typedef struct h2_c2_transit h2_c2_transit;
 
 struct h2_c2_transit {
@@ -63,7 +65,7 @@ struct h2_mplx {
 
     int aborted;
     int polling;                    /* is waiting/processing pollset events */
-    int is_registered;              /* is registered at h2_workers */
+    ap_conn_producer_t *producer;   /* registered producer at h2_workers */
 
     struct h2_ihash_t *streams;     /* all streams active */
     struct h2_ihash_t *shold;       /* all streams done with c2 processing ongoing */
@@ -218,12 +220,6 @@ const struct h2_stream *h2_mplx_c2_stream_get(h2_mplx *m, int stream_id);
  */
 apr_status_t h2_mplx_worker_pop_c2(h2_mplx *m, conn_rec **out_c2);
 
-/**
- * A h2 worker reports a secondary connection processing done.
- * @param c2 the secondary connection finished processing
- */
-void h2_mplx_worker_c2_done(conn_rec *c2);
-
 #define H2_MPLX_MSG(m, msg)     \
     "h2_mplx(%d-%lu): "msg, m->child_num, (unsigned long)m->id
 
index 22c31f4f83bdaa4365b2b736fb6c71321043f2f6..c8796aeeacc162260502fedfb23d5d2f7521c3d6 100644 (file)
@@ -15,7 +15,7 @@
  */
 
 #include <assert.h>
-#include <apr_atomic.h>
+#include <apr_ring.h>
 #include <apr_thread_mutex.h>
 #include <apr_thread_cond.h>
 
 #include "h2_workers.h"
 #include "h2_util.h"
 
+typedef enum {
+    PROD_IDLE,
+    PROD_ACTIVE,
+    PROD_JOINED,
+} prod_state_t;
+
+struct ap_conn_producer_t {
+    APR_RING_ENTRY(ap_conn_producer_t) link;
+    const char *name;
+    void *baton;
+    ap_conn_producer_next *fn_next;
+    ap_conn_producer_done *fn_done;
+    volatile prod_state_t state;
+    volatile int conns_active;
+};
+
+
+typedef enum {
+    H2_SLOT_FREE,
+    H2_SLOT_RUN,
+    H2_SLOT_ZOMBIE,
+} h2_slot_state_t;
+
 typedef struct h2_slot h2_slot;
 struct h2_slot {
+    APR_RING_ENTRY(h2_slot) link;
     int id;
-    h2_slot *next;
+    apr_pool_t *pool;
+    h2_slot_state_t state;
+    volatile int should_shutdown;
+    volatile int is_idle;
     h2_workers *workers;
-    conn_rec *connection;
+    ap_conn_producer_t *prod;
     apr_thread_t *thread;
-    apr_thread_mutex_t *lock;
-    apr_thread_cond_t *not_idle;
-    /* atomic */ apr_uint32_t timed_out;
+    struct apr_thread_cond_t *more_work;
+    int activations;
 };
 
-static h2_slot *pop_slot(h2_slot *volatile *phead) 
-{
-    /* Atomically pop a slot from the list */
-    for (;;) {
-        h2_slot *first = *phead;
-        if (first == NULL) {
-            return NULL;
-        }
-        if (apr_atomic_casptr((void*)phead, first->next, first) == first) {
-            first->next = NULL;
-            return first;
-        }
-    }
-}
+struct h2_workers {
+    server_rec *s;
+    apr_pool_t *pool;
+
+    apr_uint32_t max_slots;
+    apr_uint32_t min_active;
+    volatile int idle_limit;
+    volatile int aborted;
+    volatile int shutdown;
+    int dynamic;
+
+    volatile apr_uint32_t active_slots;
+    volatile apr_uint32_t idle_slots;
+
+    apr_threadattr_t *thread_attr;
+    h2_slot *slots;
+
+    APR_RING_HEAD(h2_slots_free, h2_slot) free;
+    APR_RING_HEAD(h2_slots_idle, h2_slot) idle;
+    APR_RING_HEAD(h2_slots_busy, h2_slot) busy;
+    APR_RING_HEAD(h2_slots_zombie, h2_slot) zombie;
+
+    APR_RING_HEAD(ap_conn_producer_active, ap_conn_producer_t) prod_active;
+    APR_RING_HEAD(ap_conn_producer_idle, ap_conn_producer_t) prod_idle;
+
+    struct apr_thread_mutex_t *lock;
+    struct apr_thread_cond_t *prod_done;
+    struct apr_thread_cond_t *all_done;
+};
 
-static void push_slot(h2_slot *volatile *phead, h2_slot *slot)
-{
-    /* Atomically push a slot to the list */
-    ap_assert(!slot->next);
-    for (;;) {
-        h2_slot *next = slot->next = *phead;
-        if (apr_atomic_casptr((void*)phead, slot, next) == next) {
-            return;
-        }
-    }
-}
 
 static void* APR_THREAD_FUNC slot_run(apr_thread_t *thread, void *wctx);
-static void slot_done(h2_slot *slot);
 
-static apr_status_t activate_slot(h2_workers *workers, h2_slot *slot) 
+static apr_status_t activate_slot(h2_workers *workers)
 {
+    h2_slot *slot;
+    apr_pool_t *pool;
     apr_status_t rv;
-    
-    slot->workers = workers;
-    slot->connection = NULL;
 
-    apr_thread_mutex_lock(workers->lock);
-    if (!slot->lock) {
-        rv = apr_thread_mutex_create(&slot->lock,
-                                         APR_THREAD_MUTEX_DEFAULT,
-                                         workers->pool);
-        if (rv != APR_SUCCESS) goto cleanup;
+    if (APR_RING_EMPTY(&workers->free, h2_slot, link)) {
+        return APR_EAGAIN;
     }
+    slot = APR_RING_FIRST(&workers->free);
+    ap_assert(slot->state == H2_SLOT_FREE);
+    APR_RING_REMOVE(slot, link);
 
-    if (!slot->not_idle) {
-        rv = apr_thread_cond_create(&slot->not_idle, workers->pool);
-        if (rv != APR_SUCCESS) goto cleanup;
-    }
-    
     ap_log_error(APLOG_MARK, APLOG_TRACE3, 0, workers->s,
-                 "h2_workers: new thread for slot %d", slot->id); 
+                 "h2_workers: activate slot %d", slot->id);
+
+    slot->state = H2_SLOT_RUN;
+    slot->should_shutdown = 0;
+    slot->is_idle = 0;
+    slot->pool = NULL;
+    ++workers->active_slots;
+    rv = apr_pool_create(&pool, workers->pool);
+    if (APR_SUCCESS != rv) goto cleanup;
+    apr_pool_tag(pool, "h2_worker_slot");
+    slot->pool = pool;
 
-    /* thread will either immediately start work or add itself
-     * to the idle queue */
-    apr_atomic_inc32(&workers->worker_count);
-    apr_atomic_set32(&slot->timed_out, 0);
     rv = ap_thread_create(&slot->thread, workers->thread_attr,
-                          slot_run, slot, workers->pool);
-    if (rv != APR_SUCCESS) {
-        apr_atomic_dec32(&workers->worker_count);
-    }
+                          slot_run, slot, slot->pool);
 
 cleanup:
-    apr_thread_mutex_unlock(workers->lock);
     if (rv != APR_SUCCESS) {
-        push_slot(&workers->free, slot);
-    }
-    return rv;
-}
-
-static apr_status_t add_worker(h2_workers *workers)
-{
-    h2_slot *slot = pop_slot(&workers->free);
-    if (slot) {
-        return activate_slot(workers, slot);
-    }
-    return APR_EAGAIN;
-}
-
-static void wake_idle_worker(h2_workers *workers) 
-{
-    h2_slot *slot;;
-    for (;;) {
-        slot = pop_slot(&workers->idle);
-        if (!slot) {
-            if (workers->dynamic && apr_atomic_read32(&workers->shutdown) == 0) {
-                add_worker(workers);
-            }
-            return;
-        }
-        if (!apr_atomic_read32(&slot->timed_out)) {
-            apr_thread_mutex_lock(slot->lock);
-            apr_thread_cond_signal(slot->not_idle);
-            apr_thread_mutex_unlock(slot->lock);
-            return;
+        AP_DEBUG_ASSERT(0);
+        slot->state = H2_SLOT_FREE;
+        if (slot->pool) {
+            apr_pool_destroy(slot->pool);
+            slot->pool = NULL;
         }
-        slot_done(slot);
+        APR_RING_INSERT_TAIL(&workers->free, slot, h2_slot, link);
+        --workers->active_slots;
     }
+    return rv;
 }
 
 static void join_zombies(h2_workers *workers)
 {
     h2_slot *slot;
-    while ((slot = pop_slot(&workers->zombies))) {
-        apr_status_t status;
+    apr_status_t status;
+
+    while (!APR_RING_EMPTY(&workers->zombie, h2_slot, link)) {
+        slot = APR_RING_FIRST(&workers->zombie);
+        APR_RING_REMOVE(slot, link);
+        ap_assert(slot->state == H2_SLOT_ZOMBIE);
         ap_assert(slot->thread != NULL);
+
+        apr_thread_mutex_unlock(workers->lock);
         apr_thread_join(&status, slot->thread);
-        slot->thread = NULL;
+        apr_thread_mutex_lock(workers->lock);
 
-        push_slot(&workers->free, slot);
+        slot->thread = NULL;
+        slot->state = H2_SLOT_FREE;
+        if (slot->pool) {
+            apr_pool_destroy(slot->pool);
+            slot->pool = NULL;
+        }
+        APR_RING_INSERT_TAIL(&workers->free, slot, h2_slot, link);
     }
 }
 
-static apr_status_t slot_pull_c2(h2_slot *slot, h2_mplx *m)
+static void wake_idle_worker(h2_workers *workers, ap_conn_producer_t *prod)
 {
-    apr_status_t rv;
-    
-    rv = h2_mplx_worker_pop_c2(m, &slot->connection);
-    if (slot->connection) {
-        return rv;
+    if (!APR_RING_EMPTY(&workers->idle, h2_slot, link)) {
+        h2_slot *slot;
+        for (slot = APR_RING_FIRST(&workers->idle);
+             slot != APR_RING_SENTINEL(&workers->idle, h2_slot, link);
+             slot = APR_RING_NEXT(slot, link)) {
+             if (slot->is_idle && !slot->should_shutdown) {
+                apr_thread_cond_signal(slot->more_work);
+                slot->is_idle = 0;
+                return;
+             }
+        }
+    }
+    if (workers->dynamic && !workers->shutdown
+        && (workers->active_slots < workers->max_slots)) {
+        activate_slot(workers);
     }
-    return APR_EOF;
-}
-
-static h2_fifo_op_t mplx_peek(void *head, void *ctx)
-{
-    h2_mplx *m = head;
-    h2_slot *slot = ctx;
-    
-    if (slot_pull_c2(slot, m) == APR_EAGAIN) {
-        wake_idle_worker(slot->workers);
-        return H2_FIFO_OP_REPUSH;
-    } 
-    return H2_FIFO_OP_PULL;
 }
 
 /**
- * Get the next c2 for the given worker. Will block until a c2 arrives
- * or the max_wait timer expires and more than min workers exist.
+ * Get the next connection to work on.
  */
-static int get_next(h2_slot *slot)
+static conn_rec *get_next(h2_slot *slot)
 {
     h2_workers *workers = slot->workers;
-    int non_essential = slot->id >= workers->min_workers;
-    apr_status_t rv;
-
-    while (apr_atomic_read32(&workers->aborted) == 0
-        && apr_atomic_read32(&slot->timed_out) == 0) {
-        ap_assert(slot->connection == NULL);
-        if (non_essential && apr_atomic_read32(&workers->shutdown)) {
-            /* Terminate non-essential worker on shutdown */
-            break;
+    conn_rec *c = NULL;
+    ap_conn_producer_t *prod;
+    int has_more;
+
+    slot->prod = NULL;
+    if (!APR_RING_EMPTY(&workers->prod_active, ap_conn_producer_t, link)) {
+        slot->prod = prod = APR_RING_FIRST(&workers->prod_active);
+        APR_RING_REMOVE(prod, link);
+        AP_DEBUG_ASSERT(PROD_ACTIVE == prod->state);
+
+        c = prod->fn_next(prod->baton, &has_more);
+        if (c && has_more) {
+            APR_RING_INSERT_TAIL(&workers->prod_active, prod, ap_conn_producer_t, link);
+            wake_idle_worker(workers, slot->prod);
         }
-        if (h2_fifo_try_peek(workers->mplxs, mplx_peek, slot) == APR_EOF) {
-            /* The queue is terminated with the MPM child being cleaned up,
-             * just leave. */
-            break;
+        else {
+            prod->state = PROD_IDLE;
+            APR_RING_INSERT_TAIL(&workers->prod_idle, prod, ap_conn_producer_t, link);
         }
-        if (slot->connection) {
-            return 1;
+        if (c) {
+            ++prod->conns_active;
         }
-        
-        join_zombies(workers);
-
-        apr_thread_mutex_lock(slot->lock);
-        if (apr_atomic_read32(&workers->aborted) == 0) {
-            apr_uint32_t idle_secs;
-
-            push_slot(&workers->idle, slot);
-            if (non_essential
-                && (idle_secs = apr_atomic_read32(&workers->max_idle_secs))) {
-                rv = apr_thread_cond_timedwait(slot->not_idle, slot->lock,
-                                               apr_time_from_sec(idle_secs));
-                if (APR_TIMEUP == rv) {
-                    apr_atomic_set32(&slot->timed_out, 1);
-                }
-            }
-            else {
-                apr_thread_cond_wait(slot->not_idle, slot->lock);
-            }
-        }
-        apr_thread_mutex_unlock(slot->lock);
     }
 
-    return 0;
+    return c;
 }
 
-static void slot_done(h2_slot *slot)
+static void* APR_THREAD_FUNC slot_run(apr_thread_t *thread, void *wctx)
 {
+    h2_slot *slot = wctx;
     h2_workers *workers = slot->workers;
+    conn_rec *c;
+    apr_status_t rv;
 
-    push_slot(&workers->zombies, slot);
+    apr_thread_mutex_lock(workers->lock);
+    slot->state = H2_SLOT_RUN;
+    ++slot->activations;
+    APR_RING_ELEM_INIT(slot, link);
+    for(;;) {
+        if (APR_RING_NEXT(slot, link) != slot) {
+            /* slot is part of the idle ring from the last loop */
+            APR_RING_REMOVE(slot, link);
+            --workers->idle_slots;
+        }
+        slot->is_idle = 0;
+
+        if (!workers->aborted && !slot->should_shutdown) {
+            APR_RING_INSERT_TAIL(&workers->busy, slot, h2_slot, link);
+            do {
+                c = get_next(slot);
+                if (!c) {
+                    break;
+                }
+                apr_thread_mutex_unlock(workers->lock);
+                /* See the discussion at <https://github.com/icing/mod_h2/issues/195>
+                 *
+                 * Each conn_rec->id is supposed to be unique at a point in time. Since
+                 * some modules (and maybe external code) uses this id as an identifier
+                 * for the request_rec they handle, it needs to be unique for secondary
+                 * connections also.
+                 *
+                 * The MPM module assigns the connection ids and mod_unique_id is using
+                 * that one to generate identifier for requests. While the implementation
+                 * works for HTTP/1.x, the parallel execution of several requests per
+                 * connection will generate duplicate identifiers on load.
+                 *
+                 * The original implementation for secondary connection identifiers used
+                 * to shift the master connection id up and assign the stream id to the
+                 * lower bits. This was cramped on 32 bit systems, but on 64bit there was
+                 * enough space.
+                 *
+                 * As issue 195 showed, mod_unique_id only uses the lower 32 bit of the
+                 * connection id, even on 64bit systems. Therefore collisions in request ids.
+                 *
+                 * The way master connection ids are generated, there is some space "at the
+                 * top" of the lower 32 bits on allmost all systems. If you have a setup
+                 * with 64k threads per child and 255 child processes, you live on the edge.
+                 *
+                 * The new implementation shifts 8 bits and XORs in the worker
+                 * id. This will experience collisions with > 256 h2 workers and heavy
+                 * load still. There seems to be no way to solve this in all possible
+                 * configurations by mod_h2 alone.
+                 */
+                if (c->master) {
+                    c->id = (c->master->id << 8)^slot->id;
+                }
+                c->current_thread = thread;
+                AP_DEBUG_ASSERT(slot->prod);
 
-    /* If this worker is the last one exiting and the MPM child is stopping,
-     * unblock workers_pool_cleanup().
-     */
-    if (!apr_atomic_dec32(&workers->worker_count)
-        && apr_atomic_read32(&workers->aborted)) {
-        apr_thread_mutex_lock(workers->lock);
-        apr_thread_cond_signal(workers->all_done);
-        apr_thread_mutex_unlock(workers->lock);
-    }
-}
+                ap_process_connection(c, ap_get_conn_socket(c));
+                slot->prod->fn_done(slot->prod->baton, c);
 
+                apr_thread_mutex_lock(workers->lock);
+                if (--slot->prod->conns_active <= 0) {
+                    apr_thread_cond_broadcast(workers->prod_done);
+                }
+                if (slot->prod->state == PROD_IDLE) {
+                    APR_RING_REMOVE(slot->prod, link);
+                    slot->prod->state = PROD_ACTIVE;
+                    APR_RING_INSERT_TAIL(&workers->prod_active, slot->prod, ap_conn_producer_t, link);
+                }
 
-static void* APR_THREAD_FUNC slot_run(apr_thread_t *thread, void *wctx)
-{
-    h2_slot *slot = wctx;
-    conn_rec *c;
-    
-    /* Get the next c2 from mplx to process. */
-    while (get_next(slot)) {
-        /* See the discussion at <https://github.com/icing/mod_h2/issues/195>
-         *
-         * Each conn_rec->id is supposed to be unique at a point in time. Since
-         * some modules (and maybe external code) uses this id as an identifier
-         * for the request_rec they handle, it needs to be unique for secondary
-         * connections also.
-         *
-         * The MPM module assigns the connection ids and mod_unique_id is using
-         * that one to generate identifier for requests. While the implementation
-         * works for HTTP/1.x, the parallel execution of several requests per
-         * connection will generate duplicate identifiers on load.
-         *
-         * The original implementation for secondary connection identifiers used
-         * to shift the master connection id up and assign the stream id to the
-         * lower bits. This was cramped on 32 bit systems, but on 64bit there was
-         * enough space.
-         *
-         * As issue 195 showed, mod_unique_id only uses the lower 32 bit of the
-         * connection id, even on 64bit systems. Therefore collisions in request ids.
-         *
-         * The way master connection ids are generated, there is some space "at the
-         * top" of the lower 32 bits on allmost all systems. If you have a setup
-         * with 64k threads per child and 255 child processes, you live on the edge.
-         *
-         * The new implementation shifts 8 bits and XORs in the worker
-         * id. This will experience collisions with > 256 h2 workers and heavy
-         * load still. There seems to be no way to solve this in all possible
-         * configurations by mod_h2 alone.
-         */
-        AP_DEBUG_ASSERT(slot->connection != NULL);
-        c = slot->connection;
-        slot->connection = NULL;
-        c->id = (c->master->id << 8)^slot->id;
-        c->current_thread = thread;
+            } while (!workers->aborted && !slot->should_shutdown);
+            APR_RING_REMOVE(slot, link); /* no longer busy */
+        }
 
-        ap_process_connection(c, ap_get_conn_socket(c));
+        if (workers->aborted || slot->should_shutdown) {
+            break;
+        }
 
-        h2_mplx_worker_c2_done(c);
+        join_zombies(workers);
+
+        /* we are idle */
+        APR_RING_INSERT_TAIL(&workers->idle, slot, h2_slot, link);
+        ++workers->idle_slots;
+        slot->is_idle = 1;
+        if (slot->id >= workers->min_active && workers->idle_limit) {
+            rv = apr_thread_cond_timedwait(slot->more_work, workers->lock,
+                                           workers->idle_limit);
+            if (APR_TIMEUP == rv) {
+                APR_RING_REMOVE(slot, link);
+                --workers->idle_slots;
+                ap_log_error(APLOG_MARK, APLOG_ERR, 0, workers->s,
+                             "h2_workers: idle timeout slot %d in state %d (%d activations)",
+                             slot->id, slot->state, slot->activations);
+                break;
+            }
+        }
+        else {
+            apr_thread_cond_wait(slot->more_work, workers->lock);
+        }
     }
 
-    if (apr_atomic_read32(&slot->timed_out) == 0) {
-        slot_done(slot);
+    ap_log_error(APLOG_MARK, APLOG_TRACE3, 0, workers->s,
+                 "h2_workers: terminate slot %d in state %d (%d activations)",
+                 slot->id, slot->state, slot->activations);
+    slot->is_idle = 0;
+    slot->state = H2_SLOT_ZOMBIE;
+    slot->should_shutdown = 0;
+    APR_RING_INSERT_TAIL(&workers->zombie, slot, h2_slot, link);
+    --workers->active_slots;
+    if (workers->active_slots <= 0) {
+        apr_thread_cond_broadcast(workers->all_done);
     }
+    apr_thread_mutex_unlock(workers->lock);
 
     apr_thread_exit(thread, APR_SUCCESS);
     return NULL;
 }
 
-static void wake_non_essential_workers(h2_workers *workers)
+static void wake_all_idles(h2_workers *workers)
 {
     h2_slot *slot;
-    /* pop all idle, signal the non essentials and add the others again */
-    if ((slot = pop_slot(&workers->idle))) {
-        wake_non_essential_workers(workers);
-        if (slot->id > workers->min_workers) {
-            apr_thread_mutex_lock(slot->lock);
-            apr_thread_cond_signal(slot->not_idle);
-            apr_thread_mutex_unlock(slot->lock);
-        }
-        else {
-            push_slot(&workers->idle, slot);
-        }
-    }
-}
-
-static void workers_abort_idle(h2_workers *workers)
-{
-    h2_slot *slot;
-
-    apr_atomic_set32(&workers->shutdown, 1);
-    apr_atomic_set32(&workers->aborted, 1);
-    h2_fifo_term(workers->mplxs);
-
-    /* abort all idle slots */
-    while ((slot = pop_slot(&workers->idle))) {
-        apr_thread_mutex_lock(slot->lock);
-        apr_thread_cond_signal(slot->not_idle);
-        apr_thread_mutex_unlock(slot->lock);
+    for (slot = APR_RING_FIRST(&workers->idle);
+         slot != APR_RING_SENTINEL(&workers->idle, h2_slot, link);
+         slot = APR_RING_NEXT(slot, link))
+    {
+        apr_thread_cond_signal(slot->more_work);
     }
 }
 
@@ -347,37 +367,47 @@ static apr_status_t workers_pool_cleanup(void *data)
     int n, wait_sec = 5;
 
     ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, workers->s,
-                 "h2_workers: cleanup %d workers idling",
-                 (int)apr_atomic_read32(&workers->worker_count));
-    workers_abort_idle(workers);
+                 "h2_workers: cleanup %d workers (%d idle)",
+                 workers->active_slots, workers->idle_slots);
+    apr_thread_mutex_lock(workers->lock);
+    workers->shutdown = 1;
+    workers->aborted = 1;
+    wake_all_idles(workers);
+    apr_thread_mutex_unlock(workers->lock);
 
     /* wait for all the workers to become zombies and join them.
      * this gets called after the mpm shuts down and all connections
      * have either been handled (graceful) or we are forced exiting
      * (ungrateful). Either way, we show limited patience. */
-    apr_thread_mutex_lock(workers->lock);
     end = apr_time_now() + apr_time_from_sec(wait_sec);
-    while ((n = apr_atomic_read32(&workers->worker_count)) > 0
-           && apr_time_now() < end) {
+    while (apr_time_now() < end) {
+        apr_thread_mutex_lock(workers->lock);
+        if (!(n = workers->active_slots)) {
+            apr_thread_mutex_unlock(workers->lock);
+            break;
+        }
+        wake_all_idles(workers);
         rv = apr_thread_cond_timedwait(workers->all_done, workers->lock, timeout);
+        apr_thread_mutex_unlock(workers->lock);
+
         if (APR_TIMEUP == rv) {
             ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, workers->s,
-                         APLOGNO(10290) "h2_workers: waiting for idle workers to close, "
-                         "still seeing %d workers living",
-                         apr_atomic_read32(&workers->worker_count));
-            continue;
+                         APLOGNO(10290) "h2_workers: waiting for workers to close, "
+                         "still seeing %d workers (%d idle) living",
+                         workers->active_slots, workers->idle_slots);
         }
     }
     if (n) {
         ap_log_error(APLOG_MARK, APLOG_WARNING, 0, workers->s,
-                     APLOGNO(10291) "h2_workers: cleanup, %d idle workers "
+                     APLOGNO(10291) "h2_workers: cleanup, %d workers (%d idle) "
                      "did not exit after %d seconds.",
-                     n, wait_sec);
+                     n, workers->idle_slots, wait_sec);
     }
-    apr_thread_mutex_unlock(workers->lock);
     ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, workers->s,
                  "h2_workers: cleanup all workers terminated");
+    apr_thread_mutex_lock(workers->lock);
     join_zombies(workers);
+    apr_thread_mutex_unlock(workers->lock);
     ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, workers->s,
                  "h2_workers: cleanup zombie workers joined");
 
@@ -385,13 +415,13 @@ static apr_status_t workers_pool_cleanup(void *data)
 }
 
 h2_workers *h2_workers_create(server_rec *s, apr_pool_t *pchild,
-                              int min_workers, int max_workers,
-                              int idle_secs)
+                              int max_slots, int min_active, apr_time_t idle_limit)
 {
     apr_status_t rv;
     h2_workers *workers;
     apr_pool_t *pool;
-    int i, n;
+    apr_allocator_t *allocator;
+    int i, locked = 0;
 
     ap_assert(s);
     ap_assert(pchild);
@@ -401,7 +431,16 @@ h2_workers *h2_workers_create(server_rec *s, apr_pool_t *pchild,
      * guarded by our lock. Without this pool, all subpool creations would
      * happen on the pool handed to us, which we do not guard.
      */
-    apr_pool_create(&pool, pchild);
+    rv = apr_allocator_create(&allocator);
+    if (rv != APR_SUCCESS) {
+        goto cleanup;
+    }
+    rv = apr_pool_create_ex(&pool, pchild, NULL, allocator);
+    if (rv != APR_SUCCESS) {
+        apr_allocator_destroy(allocator);
+        goto cleanup;
+    }
+    apr_allocator_owner_set(allocator, pool);
     apr_pool_tag(pool, "h2_workers");
     workers = apr_pcalloc(pool, sizeof(h2_workers));
     if (!workers) {
@@ -410,19 +449,23 @@ h2_workers *h2_workers_create(server_rec *s, apr_pool_t *pchild,
     
     workers->s = s;
     workers->pool = pool;
-    workers->min_workers = min_workers;
-    workers->max_workers = max_workers;
-    workers->max_idle_secs = (idle_secs > 0)? idle_secs : 10;
+    workers->min_active = min_active;
+    workers->max_slots = max_slots;
+    workers->idle_limit = (idle_limit > 0)? idle_limit : apr_time_from_sec(10);
+    workers->dynamic = (workers->min_active < workers->max_slots);
 
-    ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, workers->s,
-                 "h2_workers: created with min=%d max=%d idle_timeout=%d sec",
-                 workers->min_workers, workers->max_workers,
-                 (int)workers->max_idle_secs);
-    /* FIXME: the fifo set we use here has limited capacity. Once the
-     * set is full, connections with new requests do a wait.
-     */
-    rv = h2_fifo_set_create(&workers->mplxs, pool, 16 * 1024);
-    if (rv != APR_SUCCESS) goto cleanup;
+    ap_log_error(APLOG_MARK, APLOG_INFO, 0, workers->s,
+                 "h2_workers: created with min=%d max=%d idle_ms=%d",
+                 workers->min_active, workers->max_slots,
+                 (int)apr_time_as_msec(idle_limit));
+
+    APR_RING_INIT(&workers->idle, h2_slot, link);
+    APR_RING_INIT(&workers->busy, h2_slot, link);
+    APR_RING_INIT(&workers->free, h2_slot, link);
+    APR_RING_INIT(&workers->zombie, h2_slot, link);
+
+    APR_RING_INIT(&workers->prod_active, ap_conn_producer_t, link);
+    APR_RING_INIT(&workers->prod_idle, ap_conn_producer_t, link);
 
     rv = apr_threadattr_create(&workers->thread_attr, workers->pool);
     if (rv != APR_SUCCESS) goto cleanup;
@@ -441,32 +484,35 @@ h2_workers *h2_workers_create(server_rec *s, apr_pool_t *pchild,
     if (rv != APR_SUCCESS) goto cleanup;
     rv = apr_thread_cond_create(&workers->all_done, workers->pool);
     if (rv != APR_SUCCESS) goto cleanup;
+    rv = apr_thread_cond_create(&workers->prod_done, workers->pool);
+    if (rv != APR_SUCCESS) goto cleanup;
 
-    n = workers->nslots = workers->max_workers;
-    workers->slots = apr_pcalloc(workers->pool, n * sizeof(h2_slot));
-    if (workers->slots == NULL) {
-        n = workers->nslots = 0;
-        rv = APR_ENOMEM;
-        goto cleanup;
-    }
-    for (i = 0; i < n; ++i) {
+    apr_thread_mutex_lock(workers->lock);
+    locked = 1;
+
+    /* create the slots and put them on the free list */
+    workers->slots = apr_pcalloc(workers->pool, workers->max_slots * sizeof(h2_slot));
+
+    for (i = 0; i < workers->max_slots; ++i) {
         workers->slots[i].id = i;
-    }
-    /* we activate all for now, TODO: support min_workers again.
-     * do this in reverse for vanity reasons so slot 0 will most
-     * likely be at head of idle queue. */
-    n = workers->min_workers;
-    for (i = n-1; i >= 0; --i) {
-        rv = activate_slot(workers, &workers->slots[i]);
+        workers->slots[i].state = H2_SLOT_FREE;
+        workers->slots[i].workers = workers;
+        APR_RING_ELEM_INIT(&workers->slots[i], link);
+        APR_RING_INSERT_TAIL(&workers->free, &workers->slots[i], h2_slot, link);
+        rv = apr_thread_cond_create(&workers->slots[i].more_work, workers->pool);
         if (rv != APR_SUCCESS) goto cleanup;
     }
-    /* the rest of the slots go on the free list */
-    for(i = n; i < workers->nslots; ++i) {
-        push_slot(&workers->free, &workers->slots[i]);
+
+    /* activate the min amount of workers */
+    for (i = 0; i < workers->min_active; ++i) {
+        rv = activate_slot(workers);
+        if (rv != APR_SUCCESS) goto cleanup;
     }
-    workers->dynamic = (workers->worker_count < workers->max_workers);
 
 cleanup:
+    if (locked) {
+        apr_thread_mutex_unlock(workers->lock);
+    }
     if (rv == APR_SUCCESS) {
         /* Stop/join the workers threads when the MPM child exits (pchild is
          * destroyed), and as a pre_cleanup of pchild thus before the threads
@@ -476,24 +522,84 @@ cleanup:
         apr_pool_pre_cleanup_register(pchild, workers, workers_pool_cleanup);    
         return workers;
     }
+    ap_log_error(APLOG_MARK, APLOG_DEBUG, rv, workers->s,
+                 "h2_workers: errors initializing");
     return NULL;
 }
 
-apr_status_t h2_workers_register(h2_workers *workers, struct h2_mplx *m)
+apr_size_t h2_workers_get_max_workers(h2_workers *workers)
 {
-    apr_status_t status = h2_fifo_push(workers->mplxs, m);
-    wake_idle_worker(workers);
-    return status;
+    return workers->max_slots;
 }
 
-apr_status_t h2_workers_unregister(h2_workers *workers, struct h2_mplx *m)
+void h2_workers_graceful_shutdown(h2_workers *workers)
 {
-    return h2_fifo_remove(workers->mplxs, m);
+    apr_thread_mutex_lock(workers->lock);
+    workers->shutdown = 1;
+    workers->idle_limit = apr_time_from_sec(1);
+    wake_all_idles(workers);
+    apr_thread_mutex_unlock(workers->lock);
 }
 
-void h2_workers_graceful_shutdown(h2_workers *workers)
+ap_conn_producer_t *h2_workers_register(h2_workers *workers,
+                                        apr_pool_t *producer_pool,
+                                        const char *name,
+                                        ap_conn_producer_next *fn_next,
+                                        ap_conn_producer_done *fn_done,
+                                        void *baton)
 {
-    apr_atomic_set32(&workers->shutdown, 1);
-    apr_atomic_set32(&workers->max_idle_secs, 1);
-    wake_non_essential_workers(workers);
+    ap_conn_producer_t *prod;
+
+    prod = apr_pcalloc(producer_pool, sizeof(*prod));
+    APR_RING_ELEM_INIT(prod, link);
+    prod->name = name;
+    prod->fn_next = fn_next;
+    prod->fn_done = fn_done;
+    prod->baton = baton;
+
+    apr_thread_mutex_lock(workers->lock);
+    prod->state = PROD_IDLE;
+    APR_RING_INSERT_TAIL(&workers->prod_idle, prod, ap_conn_producer_t, link);
+    apr_thread_mutex_unlock(workers->lock);
+
+    return prod;
+}
+
+apr_status_t h2_workers_join(h2_workers *workers, ap_conn_producer_t *prod)
+{
+    apr_status_t rv = APR_SUCCESS;
+
+    apr_thread_mutex_lock(workers->lock);
+    if (PROD_JOINED == prod->state) {
+        AP_DEBUG_ASSERT(APR_RING_NEXT(prod, link) == prod); /* should be in no ring */
+        rv = APR_EINVAL;
+    }
+    else {
+        AP_DEBUG_ASSERT(PROD_ACTIVE == prod->state || PROD_IDLE == prod->state);
+        APR_RING_REMOVE(prod, link);
+        prod->state = PROD_JOINED; /* prevent further activations */
+        while (prod->conns_active > 0) {
+            apr_thread_cond_wait(workers->prod_done, workers->lock);
+        }
+        APR_RING_ELEM_INIT(prod, link); /* make it link to itself */
+    }
+    apr_thread_mutex_unlock(workers->lock);
+    return rv;
+}
+
+apr_status_t h2_workers_activate(h2_workers *workers, ap_conn_producer_t *prod)
+{
+    apr_status_t rv = APR_SUCCESS;
+    apr_thread_mutex_lock(workers->lock);
+    if (PROD_IDLE == prod->state) {
+        APR_RING_REMOVE(prod, link);
+        prod->state = PROD_ACTIVE;
+        APR_RING_INSERT_TAIL(&workers->prod_active, prod, ap_conn_producer_t, link);
+        wake_idle_worker(workers, prod);
+    }
+    else if (PROD_JOINED == prod->state) {
+        rv = APR_EINVAL;
+    }
+    apr_thread_mutex_unlock(workers->lock);
+    return rv;
 }
index 0de30406761d17d0fafdaa6cf4154360d93ccf4c..20169a0d50dd375f20a9283de32cac9e4b9059c1 100644 (file)
@@ -28,59 +28,94 @@ struct h2_mplx;
 struct h2_request;
 struct h2_fifo;
 
-struct h2_slot;
-
 typedef struct h2_workers h2_workers;
 
-struct h2_workers {
-    server_rec *s;
-    apr_pool_t *pool;
-    
-    int next_worker_id;
-    apr_uint32_t max_workers;
-    apr_uint32_t min_workers;
-    /* atomic */ apr_uint32_t worker_count;
-    /* atomic */ apr_uint32_t max_idle_secs;
-    /* atomic */ apr_uint32_t aborted;
-    /* atomic */ apr_uint32_t shutdown;
-    int dynamic;
 
-    apr_threadattr_t *thread_attr;
-    int nslots;
-    struct h2_slot *slots;
+/**
+ * Create a worker set with a maximum number of 'slots', e.g. worker
+ * threads to run. Always keep `min_active` workers running. Shutdown
+ * any additional workers after `idle_secs` seconds of doing nothing.
+ *
+ * @oaram s the base server
+ * @param pool for allocations
+ * @param min_active minimum number of workers to run
+ * @param max_slots maximum number of worker slots
+ * @param idle_limit upper duration of idle after a non-minimal slots shuts down
+ */
+h2_workers *h2_workers_create(server_rec *s, apr_pool_t *pool,
+                              int max_slots, int min_active, apr_time_t idle_limit);
 
-    struct h2_slot *free;
-    struct h2_slot *idle;
-    struct h2_slot *zombies;
-    
-    struct h2_fifo *mplxs;
-    
-    struct apr_thread_mutex_t *lock;
-    struct apr_thread_cond_t *all_done;
-};
+/**
+ *  Shut down processing gracefully by terminating all idle workers.
+ */
+void h2_workers_graceful_shutdown(h2_workers *workers);
 
+/**
+ * Get the maximum number of workers.
+ */
+apr_size_t h2_workers_get_max_workers(h2_workers *workers);
 
-/* Create a worker pool with the given minimum and maximum number of
- * threads.
+/**
+ * ap_conn_producer_t is the source of connections (conn_rec*) to run.
+ *
+ * Active producers are queried by idle workers for connections.
+ * If they do not hand one back, they become inactive and are not
+ * queried further. `h2_workers_activate()` places them on the active
+ * list again.
+ *
+ * A producer finishing MUST call `h2_workers_join()` which removes
+ * it completely from workers processing and waits for all ongoing
+ * work for this producer to be done.
  */
-h2_workers *h2_workers_create(server_rec *s, apr_pool_t *pool,
-                              int min_size, int max_size, int idle_secs);
+typedef struct ap_conn_producer_t ap_conn_producer_t;
 
 /**
- * Registers a h2_mplx for scheduling. If this h2_mplx runs
- * out of work, it will be automatically be unregistered. Should
- * new work arrive, it needs to be registered again.
+ * Ask a producer for the next connection to process.
+ * @param baton value from producer registration
+ * @param pconn holds the connection to process on return
+ * @param pmore if the producer has more connections that may be retrieved
+ * @return APR_SUCCESS for a connection to process, APR_EAGAIN for no
+ *         connection being available at the time.
  */
-apr_status_t h2_workers_register(h2_workers *workers, struct h2_mplx *m);
+typedef conn_rec *ap_conn_producer_next(void *baton, int *pmore);
 
 /**
- * Remove a h2_mplx from the worker registry.
+ * Tell the producer that processing the connection is done.
+ * @param baton value from producer registration
+ * @param conn the connection that has been processed.
  */
-apr_status_t h2_workers_unregister(h2_workers *workers, struct h2_mplx *m);
+typedef void ap_conn_producer_done(void *baton, conn_rec *conn);
 
 /**
- *  Shut down processing gracefully by terminating all idle workers.
+ * Register a new producer with the given `baton` and callback functions.
+ * Will allocate internal structures from the given pool (but make no use
+ * of the pool after registration).
+ * Producers are inactive on registration. See `h2_workers_activate()`.
+ * @param producer_pool to allocate the producer from
+ * @param name descriptive name of the producer, must not be unique
+ * @param fn_next callback for retrieving connections to process
+ * @param fn_done callback for processed connections
+ * @param baton provided value passed on in callbacks
+ * @return the producer instance created
  */
-void h2_workers_graceful_shutdown(h2_workers *workers);
+ap_conn_producer_t *h2_workers_register(h2_workers *workers,
+                                        apr_pool_t *producer_pool,
+                                        const char *name,
+                                        ap_conn_producer_next *fn_next,
+                                        ap_conn_producer_done *fn_done,
+                                        void *baton);
+
+/**
+ * Stop retrieving more connection from the producer and wait
+ * for all ongoing for from that producer to be done.
+ */
+apr_status_t h2_workers_join(h2_workers *workers, ap_conn_producer_t *producer);
+
+/**
+ * Activate a producer. A worker will query the producer for a connection
+ * to process, once a worker is available.
+ * This may be called, irregardless of the producers active/inactive.
+ */
+apr_status_t h2_workers_activate(h2_workers *workers, ap_conn_producer_t *producer);
 
 #endif /* defined(__mod_h2__h2_workers__) */
index 36acc432f9342b18b79f8d6b21aaeb1200f4748e..a4800c148b32f5e508a9073c71e5338c30655bf8 100644 (file)
@@ -150,7 +150,9 @@ static int http2_is_h2(conn_rec *);
 
 static void http2_get_num_workers(server_rec *s, int *minw, int *maxw)
 {
-    h2_get_num_workers(s, minw, maxw);
+    apr_time_t tdummy;
+
+    h2_get_workers_config(s, minw, maxw, &tdummy);
 }
 
 /* Runs once per created child process. Perform any process