]> git.ipfire.org Git - thirdparty/apache/httpd.git/commitdiff
mpm_event: poll callbacks fixes and improvements.
authorYann Ylavic <ylavic@apache.org>
Thu, 2 Jul 2020 00:04:57 +0000 (00:04 +0000)
committerYann Ylavic <ylavic@apache.org>
Thu, 2 Jul 2020 00:04:57 +0000 (00:04 +0000)
server/mpm_fdqueue.h;
    Rename "remove" field to "pfds" in timer_event_t.

server/mpm/event/event.c:
    update_reqevents_from_sense():
        New helper to update pfd->reqevents according to the given cs->sense
        for CONN_STATE_WRITE_COMPLETION, reusable in process_socket() and
        event_resume_suspended().

    event_resume_suspended():
        Process lingering close if given cs->state = CONN_STATE_LINGER.
        Call notify_suspend() before entering CONN_STATE_WRITE_COMPLETION.

    event_register_poll_callback_ex():
        Don't poll pfds with reqevents == 0.

    listener_thread():
        Run event_cleanup_poll_callback to both remove the registered pfds
        and leave pfds->pool in a consistent state.
        Process users callabacks after all PT_USER batons have been collected
        in the result pfds loop, otherwise we might race with the callbacks
        within the loop if multiple events/sockets concern the same baton, and
        crash if pfds->pool is cleared.

git-svn-id: https://svn.apache.org/repos/asf/httpd/httpd/trunk@1879417 13f79535-47bb-0310-9956-ffa450edef68

server/mpm/event/event.c
server/mpm_fdqueue.h

index 7f835afa955fe12fd7f5f40e1aa6bb40720a4f35..97bc695b21637f644f6c64c986e60a119b4a0210 100644 (file)
@@ -380,12 +380,13 @@ typedef struct
     void *baton;
 } listener_poll_type;
 
-typedef struct
+typedef struct socket_callback_baton
 {
     ap_mpm_callback_fn_t *cbfunc;
     void *user_baton;
     apr_array_header_t *pfds;
     timer_event_t *cancel_event; /* If a timeout was requested, a pointer to the timer event */
+    struct socket_callback_baton *next;
     unsigned int signaled :1;
 } socket_callback_baton_t;
 
@@ -984,6 +985,24 @@ static int event_post_read_request(request_rec *r)
 /* Forward declare */
 static void process_lingering_close(event_conn_state_t *cs);
 
+static void update_reqevents_from_sense(event_conn_state_t *cs)
+{
+    if (cs->pub.sense == CONN_SENSE_WANT_READ) {
+        cs->pfd.reqevents = APR_POLLIN | APR_POLLHUP;
+    }
+    else {
+        cs->pfd.reqevents = APR_POLLOUT;
+    }
+    /* POLLERR is usually returned event only, but some pollset
+     * backends may require it in reqevents to do the right thing,
+     * so it shouldn't hurt (ignored otherwise).
+     */
+    cs->pfd.reqevents |= APR_POLLERR;
+
+    /* Reset to default for the next round */
+    cs->pub.sense = CONN_SENSE_DEFAULT;
+}
+
 /*
  * process one connection in the worker
  */
@@ -1161,19 +1180,7 @@ read_request:
             cs->queue_timestamp = apr_time_now();
             notify_suspend(cs);
 
-            if (cs->pub.sense == CONN_SENSE_WANT_READ) {
-                cs->pfd.reqevents = APR_POLLIN;
-            }
-            else {
-                cs->pfd.reqevents = APR_POLLOUT;
-            }
-            /* POLLHUP/ERR are usually returned event only (ignored here), but
-             * some pollset backends may require them in reqevents to do the
-             * right thing, so it shouldn't hurt.
-             */
-            cs->pfd.reqevents |= APR_POLLHUP | APR_POLLERR;
-            cs->pub.sense = CONN_SENSE_DEFAULT;
-
+            update_reqevents_from_sense(cs);
             apr_thread_mutex_lock(timeout_mutex);
             TO_QUEUE_APPEND(cs->sc->wc_q, cs);
             rv = apr_pollset_add(event_pollset, &cs->pfd);
@@ -1274,15 +1281,24 @@ static apr_status_t event_resume_suspended (conn_rec *c)
     apr_atomic_dec32(&suspended_count);
     c->suspended_baton = NULL;
 
-    cs->queue_timestamp = apr_time_now();
-    cs->pfd.reqevents = (
-            cs->pub.sense == CONN_SENSE_WANT_READ ? APR_POLLIN :
-                    APR_POLLOUT) | APR_POLLHUP | APR_POLLERR;
-    cs->pub.sense = CONN_SENSE_DEFAULT;
-    apr_thread_mutex_lock(timeout_mutex);
-    TO_QUEUE_APPEND(cs->sc->wc_q, cs);
-    apr_pollset_add(event_pollset, &cs->pfd);
-    apr_thread_mutex_unlock(timeout_mutex);
+    if (cs->pub.state == CONN_STATE_LINGER) {
+        int rc = start_lingering_close_blocking(cs);
+        if (rc == OK && (cs->pub.state == CONN_STATE_LINGER_NORMAL ||
+                         cs->pub.state == CONN_STATE_LINGER_SHORT)) {
+            process_lingering_close(cs);
+        }
+    }
+    else {
+        cs->queue_timestamp = apr_time_now();
+        cs->pub.state = CONN_STATE_WRITE_COMPLETION;
+        notify_suspend(cs);
+
+        update_reqevents_from_sense(cs);
+        apr_thread_mutex_lock(timeout_mutex);
+        TO_QUEUE_APPEND(cs->sc->wc_q, cs);
+        apr_pollset_add(event_pollset, &cs->pfd);
+        apr_thread_mutex_unlock(timeout_mutex);
+    }
 
     return OK;
 }
@@ -1503,7 +1519,7 @@ static timer_event_t * event_get_timer_event(apr_time_t t,
                                              ap_mpm_callback_fn_t *cbfn,
                                              void *baton,
                                              int insert, 
-                                             apr_array_header_t *remove)
+                                             apr_array_header_t *pfds)
 {
     timer_event_t *te;
     apr_time_t now = (t < 0) ? 0 : apr_time_now();
@@ -1525,7 +1541,7 @@ static timer_event_t * event_get_timer_event(apr_time_t t,
     te->baton = baton;
     te->canceled = 0;
     te->when = now + t;
-    te->remove = remove;
+    te->pfds = pfds;
 
     if (insert) { 
         apr_time_t next_expiry;
@@ -1553,9 +1569,9 @@ static timer_event_t * event_get_timer_event(apr_time_t t,
 static apr_status_t event_register_timed_callback_ex(apr_time_t t,
                                                   ap_mpm_callback_fn_t *cbfn,
                                                   void *baton, 
-                                                  apr_array_header_t *remove)
+                                                  apr_array_header_t *pfds)
 {
-    event_get_timer_event(t, cbfn, baton, 1, remove);
+    event_get_timer_event(t, cbfn, baton, 1, pfds);
     return APR_SUCCESS;
 }
 
@@ -1581,6 +1597,7 @@ static apr_status_t event_cleanup_poll_callback(void *data)
             if (rc != APR_SUCCESS && !APR_STATUS_IS_NOTFOUND(rc)) {
                 final_rc = rc;
             }
+            pfd->client_data = NULL;
         }
     }
 
@@ -1593,9 +1610,10 @@ static apr_status_t event_register_poll_callback_ex(apr_array_header_t *pfds,
                                                   void *baton,
                                                   apr_time_t timeout)
 {
-    socket_callback_baton_t *scb = apr_pcalloc(pfds->pool, sizeof(*scb));
-    listener_poll_type *pt = apr_palloc(pfds->pool, sizeof(*pt));
-    apr_status_t rc, final_rc= APR_SUCCESS;
+    apr_pool_t *p = pfds->pool;
+    socket_callback_baton_t *scb = apr_pcalloc(p, sizeof(*scb));
+    listener_poll_type *pt = apr_palloc(p, sizeof(*pt));
+    apr_status_t rc, final_rc = APR_SUCCESS;
     int i;
 
     pt->type = PT_USER;
@@ -1605,23 +1623,33 @@ static apr_status_t event_register_poll_callback_ex(apr_array_header_t *pfds,
     scb->user_baton = baton;
     scb->pfds = pfds;
 
-    apr_pool_pre_cleanup_register(pfds->pool, pfds, event_cleanup_poll_callback);
+    apr_pool_pre_cleanup_register(p, pfds, event_cleanup_poll_callback);
 
     for (i = 0; i < pfds->nelts; i++) {
         apr_pollfd_t *pfd = (apr_pollfd_t *)pfds->elts + i;
-        pfd->reqevents = (pfd->reqevents) | APR_POLLERR | APR_POLLHUP;
-        pfd->client_data = pt;
+        if (pfd->reqevents) {
+            if (pfd->reqevents & APR_POLLIN) {
+                pfd->reqevents |= APR_POLLHUP;
+            }
+            pfd->reqevents |= APR_POLLERR;
+            pfd->client_data = pt;
+        }
+        else {
+            pfd->client_data = NULL;
+        }
     }
 
     if (timeout > 0) { 
-        /* XXX:  This cancel timer event count fire before the pollset is updated */
+        /* XXX:  This cancel timer event can fire before the pollset is updated */
         scb->cancel_event = event_get_timer_event(timeout, tofn, baton, 1, pfds);
     }
     for (i = 0; i < pfds->nelts; i++) {
         apr_pollfd_t *pfd = (apr_pollfd_t *)pfds->elts + i;
-        rc = apr_pollset_add(event_pollset, pfd);
-        if (rc != APR_SUCCESS) {
-            final_rc = rc;
+        if (pfd->client_data) {
+            rc = apr_pollset_add(event_pollset, pfd);
+            if (rc != APR_SUCCESS) {
+                final_rc = rc;
+            }
         }
     }
     return final_rc;
@@ -1818,6 +1846,7 @@ static void * APR_THREAD_FUNC listener_thread(apr_thread_t * thd, void *dummy)
         const apr_pollfd_t *out_pfd;
         apr_int32_t num = 0;
         apr_interval_time_t timeout_interval;
+        socket_callback_baton_t *user_chain;
         apr_time_t now, timeout_time;
         int workers_were_busy = 0;
 
@@ -1887,13 +1916,10 @@ static void * APR_THREAD_FUNC listener_thread(apr_thread_t * thd, void *dummy)
                 }
                 apr_skiplist_pop(timer_skiplist, NULL);
                 if (!te->canceled) { 
-                    if (te->remove) {
-                        int i;
-                        for (i = 0; i < te->remove->nelts; i++) {
-                            apr_pollfd_t *pfd;
-                            pfd = (apr_pollfd_t *)te->remove->elts + i;
-                            apr_pollset_remove(event_pollset, pfd);
-                        }
+                    if (te->pfds) {
+                        /* remove all sockets from the pollset */
+                        apr_pool_cleanup_run(te->pfds->pool, te->pfds,
+                                             event_cleanup_poll_callback);
                     }
                     push_timer2worker(te);
                 }
@@ -1956,7 +1982,7 @@ static void * APR_THREAD_FUNC listener_thread(apr_thread_t * thd, void *dummy)
                 break;
         }
 
-        for (; num; --num, ++out_pfd) {
+        for (user_chain = NULL; num; --num, ++out_pfd) {
             listener_poll_type *pt = (listener_poll_type *) out_pfd->client_data;
             if (pt->type == PT_CSD) {
                 /* one of the sockets is readable */
@@ -2126,33 +2152,43 @@ static void * APR_THREAD_FUNC listener_thread(apr_thread_t * thd, void *dummy)
 
 #endif
             else if (pt->type == PT_USER) {
-                /* masquerade as a timer event that is firing */
-                int i = 0;
-                socket_callback_baton_t *baton = (socket_callback_baton_t *) pt->baton;
+                socket_callback_baton_t *baton = pt->baton;
                 if (baton->cancel_event) {
                     baton->cancel_event->canceled = 1;
                 }
 
-                /* We only signal once per N sockets with this baton */
-                if (!(baton->signaled)) { 
+                /* We only signal once per N sockets with this baton,
+                 * and after this loop to avoid any race/lifetime issue
+                 * with the user callback being called while we handle
+                 * the same baton multiple times here.
+                 */
+                if (!baton->signaled) { 
                     baton->signaled = 1;
-                    te = event_get_timer_event(-1 /* fake timer */, 
-                                               baton->cbfunc, 
-                                               baton->user_baton, 
-                                               0, /* don't insert it */
-                                               NULL /* no associated socket callback */);
-                    /* remove all sockets in my set */
-                    for (i = 0; i < baton->pfds->nelts; i++) {
-                        apr_pollfd_t *pfd = (apr_pollfd_t *)baton->pfds->elts + i;
-                        apr_pollset_remove(event_pollset, pfd);
-                        pfd->client_data = NULL;
-                    }
-
-                    push_timer2worker(te);
+                    baton->next = user_chain;
+                    user_chain = baton;
                 }
             }
         } /* for processing poll */
 
+        /* Time to handle user callbacks chained above */
+        while (user_chain) {
+            socket_callback_baton_t *baton = user_chain;
+            user_chain = user_chain->next;
+            baton->next = NULL;
+
+            /* remove all sockets from the pollset */
+            apr_pool_cleanup_run(baton->pfds->pool, baton->pfds,
+                                 event_cleanup_poll_callback);
+
+            /* masquerade as a timer event that is firing */
+            te = event_get_timer_event(-1 /* fake timer */, 
+                                       baton->cbfunc, 
+                                       baton->user_baton, 
+                                       0, /* don't insert it */
+                                       NULL /* no associated socket callback */);
+            push_timer2worker(te);
+        }
+
         /* XXX possible optimization: stash the current time for use as
          * r->request_time for new requests
          */
index 9aeedde30da675780a04c27182afa603bad9a78a..ef9b0ab75f3a191d56c258acfd6e5df4f5525cba 100644 (file)
@@ -69,7 +69,7 @@ struct timer_event_t
     ap_mpm_callback_fn_t *cbfunc;
     void *baton;
     int canceled;
-    apr_array_header_t *remove;
+    apr_array_header_t *pfds;
 };
 typedef struct timer_event_t timer_event_t;