]> git.ipfire.org Git - thirdparty/apache/httpd.git/commitdiff
improvements in http2 connection and worker shutdown, hopefully addressing high load...
authorStefan Eissing <icing@apache.org>
Fri, 23 Oct 2015 11:54:41 +0000 (11:54 +0000)
committerStefan Eissing <icing@apache.org>
Fri, 23 Oct 2015 11:54:41 +0000 (11:54 +0000)
git-svn-id: https://svn.apache.org/repos/asf/httpd/httpd/trunk@1710185 13f79535-47bb-0310-9956-ffa450edef68

CHANGES
modules/http2/h2_conn.c
modules/http2/h2_conn_io.c
modules/http2/h2_mplx.c
modules/http2/h2_session.c
modules/http2/h2_worker.c
modules/http2/h2_workers.c
modules/http2/h2_workers.h

diff --git a/CHANGES b/CHANGES
index 6f46298db8135740f2f5c439b0496f5f08a918ea..37050ca6aea7d787adff7b7e22e2d60ce7c729bf 100644 (file)
--- a/CHANGES
+++ b/CHANGES
@@ -1,6 +1,11 @@
                                                          -*- coding: utf-8 -*-
 Changes with Apache 2.5.0
 
+  *) mod_http2: reworked deallocation on connection shutdown and worker
+     abort. Separate parent pool for all workers. worker threads are joined
+     on planned worker shutdown.
+     [Yann Ylavic, Stefan Eissing]
+     
   *) mod_ssl: when receiving requests for other virtual hosts than the handshake
      server, the SSL parameters are checked for equality. With equal 
      configuration, requests are passed for processing. Any change will trigger
index 31029d6a1050b462f5f238b972608340f1b3ccc9..00decc723c290b2bfba21c3afcfdf7d3bfd58610 100644 (file)
@@ -254,7 +254,7 @@ apr_status_t h2_session_process(h2_session *session)
             have_written = 1;
             wait_micros = 0;
         }
-        else if (status == APR_EAGAIN) {
+        else if (APR_STATUS_IS_EAGAIN(status)) {
             /* nop */
         }
         else if (status == APR_TIMEUP) {
index 08d00a4f3a6924e903d853e4d54f111a918ac5e1..3a094218f4a593714c81027d4700780a7e3dfb00 100644 (file)
@@ -180,6 +180,10 @@ static apr_status_t flush_out(apr_bucket_brigade *bb, void *ctx)
     apr_status_t status;
     apr_off_t bblen;
     
+    if (APR_BRIGADE_EMPTY(bb)) {
+        return APR_SUCCESS;
+    }
+    
     ap_update_child_status(io->connection->sbh, SERVER_BUSY_WRITE, NULL);
     status = apr_brigade_length(bb, 1, &bblen);
     if (status == APR_SUCCESS) {
@@ -193,6 +197,9 @@ static apr_status_t flush_out(apr_bucket_brigade *bb, void *ctx)
     return status;
 }
 
+/* Bring the current buffer content into the output brigade, appropriately
+ * chunked.
+ */
 static apr_status_t bucketeer_buffer(h2_conn_io *io) {
     const char *data = io->buffer;
     apr_size_t remaining = io->buflen;
@@ -282,30 +289,22 @@ apr_status_t h2_conn_io_flush(h2_conn_io *io)
     if (io->unflushed) {
         apr_status_t status; 
         if (io->buflen > 0) {
+            /* something in the buffer, put it in the output brigade */
             ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, io->connection,
                           "h2_conn_io: flush, flushing %ld bytes", (long)io->buflen);
             bucketeer_buffer(io);
             io->buflen = 0;
         }
-        /* Append flush.
-         */
-        APR_BRIGADE_INSERT_TAIL(io->output,
-                                apr_bucket_flush_create(io->output->bucket_alloc));
-        
-        /* Send it out through installed filters (TLS) to the client */
+        /* Send it out */
         status = flush_out(io->output, io);
         
-        if (status == APR_SUCCESS) {
-            /* These are all fine and no reason for concern. Everything else
-             * is interesting. */
-            io->unflushed = 0;
-        }
-        else {
+        if (status != APR_SUCCESS) {
             ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, io->connection,
-                          "h2_conn_io: flush error");
+                          "h2_conn_io: flush");
+            return status;
         }
-        
-        return status;
+
+        io->unflushed = 0;
     }
     return APR_SUCCESS;
 }
index aec9f3606c6eaf68e058b0e9f29854b1e955c522..68d307a0fdd339d9e311925b0eb4ea3cac9210e6 100644 (file)
@@ -144,12 +144,18 @@ static void reference(h2_mplx *m)
     apr_atomic_inc32(&m->refs);
 }
 
-static void release(h2_mplx *m)
+static void release(h2_mplx *m, int lock)
 {
     if (!apr_atomic_dec32(&m->refs)) {
+        if (lock) {
+            apr_thread_mutex_lock(m->lock);
+        }
         if (m->join_wait) {
             apr_thread_cond_signal(m->join_wait);
         }
+        if (lock) {
+            apr_thread_mutex_unlock(m->lock);
+        }
     }
 }
 
@@ -159,7 +165,7 @@ void h2_mplx_reference(h2_mplx *m)
 }
 void h2_mplx_release(h2_mplx *m)
 {
-    release(m);
+    release(m, 1);
 }
 
 static void workers_register(h2_mplx *m) {
@@ -188,29 +194,17 @@ apr_status_t h2_mplx_release_and_join(h2_mplx *m, apr_thread_cond_t *wait)
 
     status = apr_thread_mutex_lock(m->lock);
     if (APR_SUCCESS == status) {
-        int attempts = 0;
-        
-        release(m);
+        release(m, 0);
         while (apr_atomic_read32(&m->refs) > 0) {
             m->join_wait = wait;
-            ap_log_cerror(APLOG_MARK, (attempts? APLOG_INFO : APLOG_DEBUG), 
-                          0, m->c,
+            ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c,
                           "h2_mplx(%ld): release_join, refs=%d, waiting...", 
                           m->id, m->refs);
-            apr_thread_cond_timedwait(wait, m->lock, apr_time_from_sec(10));
-            if (++attempts >= 6) {
-                ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c,
-                              APLOGNO(02952) 
-                              "h2_mplx(%ld): join attempts exhausted, refs=%d", 
-                              m->id, m->refs);
-                break;
-            }
-        }
-        if (m->join_wait) {
-            ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c,
-                          "h2_mplx(%ld): release_join -> destroy", m->id);
+            apr_thread_cond_wait(wait, m->lock);
         }
         m->join_wait = NULL;
+        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c,
+                      "h2_mplx(%ld): release_join -> destroy", m->id);
         apr_thread_mutex_unlock(m->lock);
         h2_mplx_destroy(m);
     }
@@ -353,7 +347,7 @@ apr_status_t h2_mplx_in_read(h2_mplx *m, apr_read_type_e block,
         if (io) {
             io->input_arrived = iowait;
             status = h2_io_in_read(io, bb, 0);
-            while (status == APR_EAGAIN 
+            while (APR_STATUS_IS_EAGAIN(status) 
                    && !is_aborted(m, &status)
                    && block == APR_BLOCK_READ) {
                 apr_thread_cond_wait(io->input_arrived, m->lock);
index b8fa885a2bf699f8de29d76ca521fc6b962a1af9..d8a8735afac45c583905a20b6af5637259334a41 100644 (file)
@@ -41,17 +41,16 @@ static int frame_print(const nghttp2_frame *frame, char *buffer, size_t maxlen);
 
 static int h2_session_status_from_apr_status(apr_status_t rv)
 {
-    switch (rv) {
-        case APR_SUCCESS:
-            return NGHTTP2_NO_ERROR;
-        case APR_EAGAIN:
-        case APR_TIMEUP:
-            return NGHTTP2_ERR_WOULDBLOCK;
-        case APR_EOF:
+    if (rv == APR_SUCCESS) {
+        return NGHTTP2_NO_ERROR;
+    }
+    else if (APR_STATUS_IS_EAGAIN(rv)) {
+        return NGHTTP2_ERR_WOULDBLOCK;
+    }
+    else if (APR_STATUS_IS_EOF(rv)) {
             return NGHTTP2_ERR_EOF;
-        default:
-            return NGHTTP2_ERR_PROTO;
     }
+    return NGHTTP2_ERR_PROTO;
 }
 
 static int stream_open(h2_session *session, int stream_id)
@@ -107,7 +106,7 @@ static ssize_t send_cb(nghttp2_session *ngh2,
     if (status == APR_SUCCESS) {
         return length;
     }
-    if (status == APR_EAGAIN || status == APR_TIMEUP) {
+    if (APR_STATUS_IS_EAGAIN(status)) {
         return NGHTTP2_ERR_WOULDBLOCK;
     }
     ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, session->c,
@@ -509,12 +508,11 @@ static int on_send_data_cb(nghttp2_session *ngh2,
     if (status == APR_SUCCESS) {
         return 0;
     }
-    else if (status != APR_EOF) {
-        ap_log_cerror(APLOG_MARK, APLOG_ERR, status, session->c,
+    else {
+        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, session->c,
                       APLOGNO(02925) 
                       "h2_stream(%ld-%d): failed send_data_cb",
                       session->id, (int)stream_id);
-        return NGHTTP2_ERR_CALLBACK_FAILURE;
     }
     
     return h2_session_status_from_apr_status(status);
@@ -732,10 +730,12 @@ apr_status_t h2_session_abort(h2_session *session, apr_status_t reason, int rv)
                 rv = 0;                            /* ...gracefully shut down */
                 break;
             case APR_EBADF:        /* connection unusable, terminate silently */
-            case APR_ECONNABORTED:
-                rv = NGHTTP2_ERR_EOF;
-                break;
             default:
+                if (APR_STATUS_IS_ECONNABORTED(reason)
+                    || APR_STATUS_IS_ECONNRESET(reason)
+                    || APR_STATUS_IS_EBADF(reason)) {
+                    rv = NGHTTP2_ERR_EOF;
+                }
                 break;
         }
     }
@@ -911,7 +911,7 @@ apr_status_t h2_session_write(h2_session *session, apr_interval_time_t timeout)
     if (status == APR_SUCCESS) {
         flush_output = 1;
     }
-    else if (status != APR_EAGAIN) {
+    else if (!APR_STATUS_IS_EAGAIN(status)) {
         return status;
     }
     
index 8145b7aaa5f72bdb5d5683651696970768f9f114..297b4b21fe6be1198ecbf2a8a153eb85c9d3bfd7 100644 (file)
@@ -71,6 +71,19 @@ static void* APR_THREAD_FUNC execute(apr_thread_t *thread, void *wctx)
     return NULL;
 }
 
+static apr_status_t cleanup_join_thread(void *ctx)
+{
+    h2_worker *w = ctx;
+    /* do the join only when the worker is aborted. Otherwise,
+     * we are probably in a process shutdown.
+     */
+    if (w->thread && w->aborted) {
+        apr_status_t rv;
+        apr_thread_join(&rv, w->thread);
+    }
+    return APR_SUCCESS;
+}
+
 h2_worker *h2_worker_create(int id,
                             apr_pool_t *parent_pool,
                             apr_threadattr_t *attr,
@@ -110,6 +123,7 @@ h2_worker *h2_worker_create(int id,
             return NULL;
         }
         
+        apr_pool_pre_cleanup_register(pool, w, cleanup_join_thread);
         apr_thread_create(&w->thread, attr, execute, w, pool);
     }
     return w;
index cf3009585b7088c549aacc31ddcaeeb78818855d..18f39d136c9b64dd5f76bd7bf58cc3080175ab3e 100644 (file)
@@ -42,6 +42,22 @@ static int in_list(h2_workers *workers, h2_mplx *m)
     return 0;
 }
 
+static void cleanup_zombies(h2_workers *workers, int lock) {
+    if (lock) {
+        apr_thread_mutex_lock(workers->lock);
+    }
+    while (!H2_WORKER_LIST_EMPTY(&workers->zombies)) {
+        h2_worker *zombie = H2_WORKER_LIST_FIRST(&workers->zombies);
+        H2_WORKER_REMOVE(zombie);
+        ap_log_error(APLOG_MARK, APLOG_TRACE1, 0, workers->s,
+                      "h2_workers: cleanup zombie %d", zombie->id);
+        h2_worker_destroy(zombie);
+    }
+    if (lock) {
+        apr_thread_mutex_unlock(workers->lock);
+    }
+}
+
 
 /**
  * Get the next task for the given worker. Will block until a task arrives
@@ -123,23 +139,12 @@ static apr_status_t get_mplx_next(h2_worker *worker, h2_mplx **pm,
             if (!task) {
                 /* Need to wait for either a new mplx to arrive.
                  */
+                cleanup_zombies(workers, 0);
+                
                 if (workers->worker_count > workers->min_size) {
                     apr_time_t now = apr_time_now();
                     if (now >= (start_wait + max_wait)) {
                         /* waited long enough without getting a task. */
-                        status = APR_TIMEUP;
-                    }
-                    else {
-                        ap_log_error(APLOG_MARK, APLOG_TRACE1, 0, workers->s,
-                                     "h2_worker(%d): waiting signal, "
-                                     "worker_count=%d", worker->id, 
-                                     (int)workers->worker_count);
-                        status = apr_thread_cond_timedwait(workers->mplx_added,
-                                                           workers->lock, max_wait);
-                    }
-                    
-                    if (status == APR_TIMEUP) {
-                        /* waited long enough */
                         if (workers->worker_count > workers->min_size) {
                             ap_log_error(APLOG_MARK, APLOG_TRACE1, 0, 
                                          workers->s,
@@ -148,6 +153,12 @@ static apr_status_t get_mplx_next(h2_worker *worker, h2_mplx **pm,
                             break;
                         }
                     }
+                    ap_log_error(APLOG_MARK, APLOG_TRACE1, 0, workers->s,
+                                 "h2_worker(%d): waiting signal, "
+                                 "worker_count=%d", worker->id, 
+                                 (int)workers->worker_count);
+                    apr_thread_cond_timedwait(workers->mplx_added,
+                                              workers->lock, max_wait);
                 }
                 else {
                     ap_log_error(APLOG_MARK, APLOG_TRACE1, 0, workers->s,
@@ -194,11 +205,11 @@ static void worker_done(h2_worker *worker, void *ctx)
     h2_workers *workers = (h2_workers *)ctx;
     apr_status_t status = apr_thread_mutex_lock(workers->lock);
     if (status == APR_SUCCESS) {
-        ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, workers->s,
+        ap_log_error(APLOG_MARK, APLOG_TRACE1, 0, workers->s,
                      "h2_worker(%d): done", h2_worker_get_id(worker));
         H2_WORKER_REMOVE(worker);
         --workers->worker_count;
-        h2_worker_destroy(worker);
+        H2_WORKER_LIST_INSERT_TAIL(&workers->zombies, worker);
         
         apr_thread_mutex_unlock(workers->lock);
     }
@@ -213,7 +224,7 @@ static apr_status_t add_worker(h2_workers *workers)
     if (!w) {
         return APR_ENOMEM;
     }
-    ap_log_error(APLOG_MARK, APLOG_TRACE2, 0, workers->s,
+    ap_log_error(APLOG_MARK, APLOG_TRACE1, 0, workers->s,
                  "h2_workers: adding worker(%d)", h2_worker_get_id(w));
     ++workers->worker_count;
     H2_WORKER_LIST_INSERT_TAIL(&workers->workers, w);
@@ -235,15 +246,22 @@ static apr_status_t h2_workers_start(h2_workers *workers) {
     return status;
 }
 
-h2_workers *h2_workers_create(server_rec *s, apr_pool_t *pool,
+h2_workers *h2_workers_create(server_rec *s, apr_pool_t *server_pool,
                               int min_size, int max_size)
 {
     apr_status_t status;
     h2_workers *workers;
+    apr_pool_t *pool;
+
     AP_DEBUG_ASSERT(s);
-    AP_DEBUG_ASSERT(pool);
-    status = APR_SUCCESS;
+    AP_DEBUG_ASSERT(server_pool);
 
+    /* let's have our own pool that will be parent to all h2_worker
+     * instances we create. This happens in various threads, but always
+     * guarded by our lock. Without this pool, all subpool creations would
+     * happen on the pool handed to us, which we do not guard.
+     */
+    apr_pool_create(&pool, server_pool);
     workers = apr_pcalloc(pool, sizeof(h2_workers));
     if (workers) {
         workers->s = s;
@@ -255,6 +273,7 @@ h2_workers *h2_workers_create(server_rec *s, apr_pool_t *pool,
         apr_threadattr_create(&workers->thread_attr, workers->pool);
         
         APR_RING_INIT(&workers->workers, h2_worker, link);
+        APR_RING_INIT(&workers->zombies, h2_worker, link);
         APR_RING_INIT(&workers->mplxs, h2_mplx, link);
         
         status = apr_thread_mutex_create(&workers->lock,
@@ -278,6 +297,9 @@ h2_workers *h2_workers_create(server_rec *s, apr_pool_t *pool,
 
 void h2_workers_destroy(h2_workers *workers)
 {
+    /* before we go, cleanup any zombie workers that may have accumulated */
+    cleanup_zombies(workers, 1);
+    
     if (workers->mplx_added) {
         apr_thread_cond_destroy(workers->mplx_added);
         workers->mplx_added = NULL;
@@ -294,6 +316,10 @@ void h2_workers_destroy(h2_workers *workers)
         h2_worker *w = H2_WORKER_LIST_FIRST(&workers->workers);
         H2_WORKER_REMOVE(w);
     }
+    if (workers->pool) {
+        apr_pool_destroy(workers->pool);
+        /* workers is gone */
+    }
 }
 
 apr_status_t h2_workers_register(h2_workers *workers, struct h2_mplx *m)
@@ -320,6 +346,9 @@ apr_status_t h2_workers_register(h2_workers *workers, struct h2_mplx *m)
             add_worker(workers);
         }
         
+        /* cleanup any zombie workers that may have accumulated */
+        cleanup_zombies(workers, 0);
+        
         apr_thread_mutex_unlock(workers->lock);
     }
     return status;
@@ -334,6 +363,9 @@ apr_status_t h2_workers_unregister(h2_workers *workers, struct h2_mplx *m)
             H2_MPLX_REMOVE(m);
             status = APR_SUCCESS;
         }
+        /* cleanup any zombie workers that may have accumulated */
+        cleanup_zombies(workers, 0);
+        
         apr_thread_mutex_unlock(workers->lock);
     }
     return status;
index 50fd6b8ad58db58771cf21f950a48890aaecfabc..99aa1f4daf73a7c837cfcc5b90be914151b13291 100644 (file)
@@ -42,6 +42,7 @@ struct h2_workers {
     apr_threadattr_t *thread_attr;
     
     APR_RING_HEAD(h2_worker_list, h2_worker) workers;
+    APR_RING_HEAD(h2_worker_zombies, h2_worker) zombies;
     APR_RING_HEAD(h2_mplx_list, h2_mplx) mplxs;
     
     int worker_count;