]> git.ipfire.org Git - thirdparty/apache/httpd.git/commitdiff
mod_http2: fixing re-entrancy problems with new master event dispatching
authorStefan Eissing <icing@apache.org>
Fri, 20 May 2016 14:37:39 +0000 (14:37 +0000)
committerStefan Eissing <icing@apache.org>
Fri, 20 May 2016 14:37:39 +0000 (14:37 +0000)
git-svn-id: https://svn.apache.org/repos/asf/httpd/httpd/trunk@1744751 13f79535-47bb-0310-9956-ffa450edef68

modules/http2/h2_bucket_beam.c
modules/http2/h2_mplx.c
modules/http2/h2_util.c
modules/http2/h2_util.h

index 8d0892db7273747b33a9bf0db05640c5de6f0a30..d648b1d159db02b3ab93aec5b7b777cf7a2040a9 100644 (file)
@@ -706,13 +706,13 @@ apr_status_t h2_beam_send(h2_bucket_beam *beam,
             status = APR_ECONNABORTED;
         }
         else if (red_brigade) {
-            int not_emtpy = APR_BRIGADE_EMPTY(red_brigade); 
+            int force_report = !APR_BRIGADE_EMPTY(red_brigade); 
             while (!APR_BRIGADE_EMPTY(red_brigade)
                    && status == APR_SUCCESS) {
                 bred = APR_BRIGADE_FIRST(red_brigade);
                 status = append_bucket(beam, bred, block, beam->red_pool, &bl);
             }
-            report_production(beam, not_emtpy);
+            report_production(beam, force_report);
             if (beam->m_cond) {
                 apr_thread_cond_broadcast(beam->m_cond);
             }
@@ -771,8 +771,8 @@ transfer:
                         
             if (APR_BUCKET_IS_METADATA(bred)) {
                 if (APR_BUCKET_IS_EOS(bred)) {
-                    beam->close_sent = 1;
                     bgreen = apr_bucket_eos_create(bb->bucket_alloc);
+                    beam->close_sent = 1;
                 }
                 else if (APR_BUCKET_IS_FLUSH(bred)) {
                     bgreen = apr_bucket_flush_create(bb->bucket_alloc);
@@ -850,28 +850,24 @@ transfer:
             }
         }
 
-        if ((!beam->green || APR_BRIGADE_EMPTY(beam->green))
-            && H2_BLIST_EMPTY(&beam->red) 
-            && beam->closed && !beam->close_sent) {
-            apr_bucket *b = apr_bucket_eos_create(bb->bucket_alloc);
-            APR_BRIGADE_INSERT_TAIL(bb, b);
-            beam->close_sent = 1;
-            ++transferred;
-            status = APR_SUCCESS;
-        }
-        else if (transferred) {
-            status = APR_SUCCESS;
-        }
-        else if (beam->closed) {
+        if (beam->closed 
+            && (!beam->green || APR_BRIGADE_EMPTY(beam->green))
+            && H2_BLIST_EMPTY(&beam->red)) {
+            /* beam is closed and we have nothing more to receive */ 
             if (!beam->close_sent) {
                 apr_bucket *b = apr_bucket_eos_create(bb->bucket_alloc);
                 APR_BRIGADE_INSERT_TAIL(bb, b);
                 beam->close_sent = 1;
+                ++transferred;
                 status = APR_SUCCESS;
             }
-            else {
-                status = APR_EOF;
-            }
+        }
+        
+        if (transferred) {
+            status = APR_SUCCESS;
+        }
+        else if (beam->closed) {
+            status = APR_EOF;
         }
         else if (block == APR_BLOCK_READ && bl.mutex && beam->m_cond) {
             status = wait_cond(beam, bl.mutex);
index 4861c06550508a5fda6140262bffce865d627844..f7b30fffad51e201c74275b353095e071f15d628 100644 (file)
@@ -369,6 +369,7 @@ static void task_destroy(h2_mplx *m, h2_task *task, int called_from_master)
     if (task->output.beam) {
         m->tx_handles_reserved += 
         h2_beam_get_files_beamed(task->output.beam);
+        h2_beam_on_produced(task->output.beam, NULL, NULL);
     }
     
     slave = task->c;
@@ -502,6 +503,17 @@ static int task_abort_connection(void *ctx, void *val)
     return 1;
 }
 
+static int report_stream_iter(void *ctx, void *val) {
+    h2_mplx *m = ctx;
+    h2_stream *stream = val;
+    ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
+                  "h2_mplx(%ld-%d): exists, started=%d, scheduled=%d, "
+                  "submitted=%d, suspended=%d", 
+                  m->id, stream->id, stream->started, stream->scheduled,
+                  stream->submitted, stream->suspended);
+    return 1;
+}
+
 apr_status_t h2_mplx_release_and_join(h2_mplx *m, apr_thread_cond_t *wait)
 {
     apr_status_t status;
@@ -511,6 +523,17 @@ apr_status_t h2_mplx_release_and_join(h2_mplx *m, apr_thread_cond_t *wait)
     
     if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
         int i, wait_secs = 5;
+
+        if (!h2_ihash_empty(m->streams) && APLOGctrace1(m->c)) {
+            ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
+                          "h2_mplx(%ld): release_join with %d streams open, "
+                          "%d streams resume, %d streams ready, %d tasks", 
+                          m->id, (int)h2_ihash_count(m->streams),
+                          (int)h2_ihash_count(m->sresume), 
+                          (int)h2_ihash_count(m->sready), 
+                          (int)h2_ihash_count(m->tasks));
+            h2_ihash_iter(m->streams, report_stream_iter, m);
+        }
         
         /* disable WINDOW_UPDATE callbacks */
         h2_mplx_set_consumed_cb(m, NULL, NULL);
@@ -585,10 +608,9 @@ apr_status_t h2_mplx_release_and_join(h2_mplx *m, apr_thread_cond_t *wait)
             purge_streams(m);
         }
         AP_DEBUG_ASSERT(h2_ihash_empty(m->spurge));
-        AP_DEBUG_ASSERT(h2_ihash_empty(m->tasks));
         
         if (!h2_ihash_empty(m->tasks)) {
-            ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, APLOGNO(03056)
+            ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c, APLOGNO(03056)
                           "h2_mplx(%ld): release_join -> destroy, "
                           "%d tasks still present", 
                           m->id, (int)h2_ihash_count(m->tasks));
@@ -793,26 +815,27 @@ apr_status_t h2_mplx_process(h2_mplx *m, struct h2_stream *stream,
         if (m->aborted) {
             status = APR_ECONNABORTED;
         }
-        else if (stream->response) {
-            /* already have a respone, schedule for submit */
-            h2_ihash_add(m->sready, stream);
-        }
         else {
-            h2_beam_create(&stream->input, stream->pool, stream->id, 
-                           "input", 0);
             h2_ihash_add(m->streams, stream);
-            
-            if (!m->need_registration) {
-                m->need_registration = h2_iq_empty(m->q);
+            if (stream->response) {
+                /* already have a respone, schedule for submit */
+                h2_ihash_add(m->sready, stream);
             }
-            if (m->workers_busy < m->workers_max) {
-                do_registration = m->need_registration;
+            else {
+                h2_beam_create(&stream->input, stream->pool, stream->id, 
+                               "input", 0);
+                if (!m->need_registration) {
+                    m->need_registration = h2_iq_empty(m->q);
+                }
+                if (m->workers_busy < m->workers_max) {
+                    do_registration = m->need_registration;
+                }
+                h2_iq_add(m->q, stream->id, cmp, ctx);
+                
+                ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, m->c,
+                              "h2_mplx(%ld-%d): process, body=%d", 
+                              m->c->id, stream->id, stream->request->body);
             }
-            h2_iq_add(m->q, stream->id, cmp, ctx);
-
-            ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, m->c,
-                          "h2_mplx(%ld-%d): process, body=%d", 
-                          m->c->id, stream->id, stream->request->body);
         }
         leave_mutex(m, acquired);
     }
@@ -958,7 +981,6 @@ static void task_done(h2_mplx *m, h2_task *task, h2_req_engine *ngn)
         task->done_at = apr_time_now();
         if (task->output.beam) {
             h2_beam_on_consumed(task->output.beam, NULL, NULL);
-            h2_beam_on_produced(task->output.beam, NULL, NULL);
             h2_beam_mutex_set(task->output.beam, NULL, NULL, NULL);
         }
         ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c,
@@ -1304,57 +1326,12 @@ void h2_mplx_req_engine_done(h2_req_engine *ngn, conn_rec *r_conn)
  * mplx master events dispatching
  ******************************************************************************/
 
-typedef struct {
-    h2_mplx *m;
-    stream_ev_callback *on_resume; 
-    stream_ev_callback *on_response; 
-    void *on_ctx;
-    apr_status_t status;
-} dispatch_ctx;
-
 static int update_window(void *ctx, void *val)
 {
     input_consumed_signal(ctx, val);
     return 1;
 }
 
-static int stream_ready_iter(void *data, void *val)
-{
-    dispatch_ctx *ctx = data;
-    h2_stream *stream = val;
-    h2_task *task = h2_ihash_get(ctx->m->tasks, stream->id);
-    
-    if (task) {
-        task->submitted = 1;
-        if (task->rst_error) {
-            h2_stream_rst(stream, task->rst_error);
-        }
-        else {
-            AP_DEBUG_ASSERT(task->response);
-            h2_stream_set_response(stream, task->response, task->output.beam);
-        }
-    }
-    else {
-        /* We have the stream ready without a task. This happens
-         * when we fail streams early. A response should already
-         * be present.  */
-        AP_DEBUG_ASSERT(stream->response || stream->rst_error);
-    }
-    
-    ctx->status = ctx->on_response(ctx->on_ctx, stream->id);
-    return 1;
-}
-
-static int stream_resume_iter(void *data, void *val)
-{
-    dispatch_ctx *ctx = data;
-    h2_stream *stream = val;
-
-    h2_stream_set_suspended(stream, 0);
-    ctx->status = ctx->on_resume(ctx->on_ctx, stream->id);
-    return 1;
-}
-
 apr_status_t h2_mplx_dispatch_master_events(h2_mplx *m, 
                                             stream_ev_callback *on_resume, 
                                             stream_ev_callback *on_response, 
@@ -1362,32 +1339,66 @@ apr_status_t h2_mplx_dispatch_master_events(h2_mplx *m,
 {
     apr_status_t status;
     int acquired;
+    int streams[32];
+    h2_stream *stream;
+    h2_task *task;
+    size_t i, n;
     
     AP_DEBUG_ASSERT(m);
     if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
-        dispatch_ctx ctx;
-        ctx.m = m;
-        ctx.on_resume = on_resume;
-        ctx.on_response = on_response;
-        ctx.on_ctx = on_ctx;
-        ctx.status = APR_SUCCESS;
-        ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c, 
+        ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, m->c, 
                       "h2_mplx(%ld): dispatch events", m->id);
+                      
         /* update input windows for streams */
         h2_ihash_iter(m->streams, update_window, m);
 
-        if (ctx.on_response) {
-            h2_ihash_iter(m->sready, stream_ready_iter, &ctx);
-            h2_ihash_clear(m->sready);
+        if (on_response && !h2_ihash_empty(m->sready)) {
+            n = h2_ihash_ishift(m->sready, streams, H2_ALEN(streams));
+            for (i = 0; i < n; ++i) {
+                stream = h2_ihash_get(m->streams, streams[i]);
+                if (!stream) {
+                    continue;
+                }
+                ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, m->c, 
+                              "h2_mplx(%ld-%d): on_response", 
+                              m->id, stream->id);
+                task = h2_ihash_get(m->tasks, stream->id);
+                if (task) {
+                    task->submitted = 1;
+                    if (task->rst_error) {
+                        h2_stream_rst(stream, task->rst_error);
+                    }
+                    else {
+                        AP_DEBUG_ASSERT(task->response);
+                        h2_stream_set_response(stream, task->response, task->output.beam);
+                    }
+                }
+                else {
+                    /* We have the stream ready without a task. This happens
+                     * when we fail streams early. A response should already
+                     * be present.  */
+                    AP_DEBUG_ASSERT(stream->response || stream->rst_error);
+                }
+                status = on_response(on_ctx, stream->id);
+            }
         }
 
-        if (ctx.on_resume) {
-            h2_ihash_iter(m->sresume, stream_resume_iter, &ctx);
-            h2_ihash_clear(m->sresume);
+        if (on_resume && !h2_ihash_empty(m->sresume)) {
+            n = h2_ihash_ishift(m->sresume, streams, H2_ALEN(streams));
+            for (i = 0; i < n; ++i) {
+                stream = h2_ihash_get(m->streams, streams[i]);
+                if (!stream) {
+                    continue;
+                }
+                ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, m->c, 
+                              "h2_mplx(%ld-%d): on_resume", 
+                              m->id, stream->id);
+                h2_stream_set_suspended(stream, 0);
+                status = on_resume(on_ctx, stream->id);
+            }
         }
         
         leave_mutex(m, acquired);
-        return ctx.status;
     }
     return status;
 }
@@ -1397,7 +1408,6 @@ static void output_produced(void *ctx, h2_bucket_beam *beam, apr_off_t bytes)
     h2_mplx *m = ctx;
     apr_status_t status;
     h2_stream *stream;
-    h2_task *task;
     int acquired;
     
     AP_DEBUG_ASSERT(m);
@@ -1405,10 +1415,7 @@ static void output_produced(void *ctx, h2_bucket_beam *beam, apr_off_t bytes)
         stream = h2_ihash_get(m->streams, beam->id);
         if (stream && h2_stream_is_suspended(stream)) {
             h2_ihash_add(m->sresume, stream);
-            task = h2_ihash_get(m->tasks, stream->id);
-            if (task && task->output.beam) {
-                h2_beam_on_produced(task->output.beam, NULL, NULL);
-            }
+            h2_beam_on_produced(beam, NULL, NULL);
             have_out_data_for(m, beam->id);
         }
         leave_mutex(m, acquired);
@@ -1425,17 +1432,15 @@ apr_status_t h2_mplx_suspend_stream(h2_mplx *m, int stream_id)
     AP_DEBUG_ASSERT(m);
     if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
         stream = h2_ihash_get(m->streams, stream_id);
-        if (stream && !h2_stream_is_suspended(stream)) {
+        if (stream) {
             h2_stream_set_suspended(stream, 1);
             task = h2_ihash_get(m->tasks, stream->id);
-            if (task && task->output.beam && h2_beam_empty(task->output.beam)) {
-                /* register callback so that we can resume on new output */
-                h2_beam_on_produced(task->output.beam, output_produced, m);
+            if (stream->started && (!task || task->worker_done)) {
+                h2_ihash_add(m->sresume, stream);
             }
             else {
-                /* if the beam got data in the meantime, add this to the to-be
-                 * resumed streams right away. */
-                h2_ihash_add(m->sresume, stream);
+                /* register callback so that we can resume on new output */
+                h2_beam_on_produced(task->output.beam, output_produced, m);
             }
         }
         leave_mutex(m, acquired);
index a0f82ac057c4deb2e1649261688640865f16fdfb..f8575fa7e10a4b31c5ac7fc295a6ad8adc7a7f2f 100644 (file)
@@ -325,11 +325,84 @@ void h2_ihash_remove(h2_ihash_t *ih, int id)
     apr_hash_set(ih->hash, &id, sizeof(id), NULL);
 }
 
+void h2_ihash_remove_val(h2_ihash_t *ih, void *val)
+{
+    int id = *((int*)((char *)val + ih->ioff));
+    apr_hash_set(ih->hash, &id, sizeof(id), NULL);
+}
+
+
 void h2_ihash_clear(h2_ihash_t *ih)
 {
     apr_hash_clear(ih->hash);
 }
 
+typedef struct {
+    h2_ihash_t *ih;
+    void **buffer;
+    size_t max;
+    size_t len;
+} collect_ctx;
+
+static int collect_iter(void *x, void *val)
+{
+    collect_ctx *ctx = x;
+    if (ctx->len < ctx->max) {
+        ctx->buffer[ctx->len++] = val;
+        return 1;
+    }
+    return 0;
+}
+
+size_t h2_ihash_shift(h2_ihash_t *ih, void **buffer, size_t max)
+{
+    collect_ctx ctx;
+    size_t i;
+    
+    ctx.ih = ih;
+    ctx.buffer = buffer;
+    ctx.max = max;
+    ctx.len = 0;
+    h2_ihash_iter(ih, collect_iter, &ctx);
+    for (i = 0; i < ctx.len; ++i) {
+        h2_ihash_remove_val(ih, buffer[i]);
+    }
+    return ctx.len;
+}
+
+typedef struct {
+    h2_ihash_t *ih;
+    int *buffer;
+    size_t max;
+    size_t len;
+} icollect_ctx;
+
+static int icollect_iter(void *x, void *val)
+{
+    icollect_ctx *ctx = x;
+    if (ctx->len < ctx->max) {
+        ctx->buffer[ctx->len++] = *((int*)((char *)val + ctx->ih->ioff));
+        return 1;
+    }
+    return 0;
+}
+
+size_t h2_ihash_ishift(h2_ihash_t *ih, int *buffer, size_t max)
+{
+    icollect_ctx ctx;
+    size_t i;
+    
+    ctx.ih = ih;
+    ctx.buffer = buffer;
+    ctx.max = max;
+    ctx.len = 0;
+    h2_ihash_iter(ih, icollect_iter, &ctx);
+    for (i = 0; i < ctx.len; ++i) {
+        h2_ihash_remove(ih, buffer[i]);
+    }
+    return ctx.len;
+}
+
 /*******************************************************************************
  * ilist - sorted list for structs with int identifier
  ******************************************************************************/
index c200729c20b3a854eeb0a265b63991417cb75570..99724d7a5d75ce25213456fc12ad64b699b0b0e4 100644 (file)
@@ -64,8 +64,12 @@ int h2_ihash_iter(h2_ihash_t *ih, h2_ihash_iter_t *fn, void *ctx);
 
 void h2_ihash_add(h2_ihash_t *ih, void *val);
 void h2_ihash_remove(h2_ihash_t *ih, int id);
+void h2_ihash_remove_val(h2_ihash_t *ih, void *val);
 void h2_ihash_clear(h2_ihash_t *ih);
 
+size_t h2_ihash_shift(h2_ihash_t *ih, void **buffer, size_t max);
+size_t h2_ihash_ishift(h2_ihash_t *ih, int *buffer, size_t max);
+
 /*******************************************************************************
  * ilist - sorted list for structs with int identifier as first member
  ******************************************************************************/