]> git.ipfire.org Git - thirdparty/openssl.git/commitdiff
QUIC REACTOR: Inter-thread notification
authorHugo Landau <hlandau@openssl.org>
Wed, 24 Apr 2024 11:24:22 +0000 (12:24 +0100)
committerNeil Horman <nhorman@openssl.org>
Mon, 17 Feb 2025 16:27:32 +0000 (11:27 -0500)
Reviewed-by: Matt Caswell <matt@openssl.org>
Reviewed-by: Neil Horman <nhorman@openssl.org>
(Merged from https://github.com/openssl/openssl/pull/24971)

include/internal/quic_reactor.h
ssl/quic/quic_reactor.c

index 952167665506bf4e6d4e8d4c335137cfa3f04a54..aa55d378647af3c15d5e0514008e3b6e4c0911af 100644 (file)
@@ -100,9 +100,21 @@ struct quic_reactor_st {
     void (*tick_cb)(QUIC_TICK_RESULT *res, void *arg, uint32_t flags);
     void *tick_cb_arg;
 
-    /* Used to notify other threads. */
+    /* Used to notify other threads. Valid only if have_notifier is set. */
     RIO_NOTIFIER notifier;
 
+    /*
+     * Condvar to assist synchronising use of the notifier. Valid only if
+     * have_notifier is set.
+     */
+    CRYPTO_CONDVAR *notifier_cv;
+
+    /*
+     * Count of the current number of blocking waiters. Like everything else,
+     * this is protected the caller's mutex (i.e., the engine mutex).
+     */
+    size_t cur_blocking_waiters;
+
     /*
      * These are true if we would like to know when we can read or write from
      * the network respectively.
@@ -119,6 +131,9 @@ struct quic_reactor_st {
 
     /* 1 if notifier is present and initialised. */
     unsigned int have_notifier : 1;
+
+    /* 1 if a block_until_pred call has put the notifier in the signalled state. */
+    unsigned int signalled_notifier : 1;
 };
 
 /* Create an OS notifier? */
@@ -173,12 +188,17 @@ RIO_NOTIFIER *ossl_quic_reactor_get0_notifier(QUIC_REACTOR *rtor);
  *
  * The blocking I/O adaptation layer implements blocking I/O on top of our
  * asynchronous core.
+ */
+
+/*
+ * ossl_quic_reactor_block_until_pred
+ * ----------------------------------
  *
- * The core mechanism is block_until_pred(), which does not return until pred()
- * returns a value other than 0. The blocker uses OS I/O synchronisation
- * primitives (e.g. poll(2)) and ticks the reactor until the predicate is
- * satisfied. The blocker is not required to call pred() more than once between
- * tick calls.
+ * The core mechanism of the Blocking I/O Adaption Layer is block_until_pred(),
+ * which does not return until pred() returns a value other than 0. The blocker
+ * uses OS I/O synchronisation primitives (e.g. poll(2)) and ticks the reactor
+ * until the predicate is satisfied. The blocker is not required to call pred()
+ * more than once between tick calls.
  *
  * When pred returns a non-zero value, that value is returned by this function.
  * This can be used to allow pred() to indicate error conditions and short
@@ -209,6 +229,23 @@ int ossl_quic_reactor_block_until_pred(QUIC_REACTOR *rtor,
                                        uint32_t flags,
                                        CRYPTO_MUTEX *mutex);
 
+/*
+ * ossl_quic_reactor_notify_other_threads
+ * --------------------------------------
+ *
+ * Notify other threads currently blocking in
+ * ossl_quic_reactor_block_until_pred() calls that a predicate they are using
+ * might now be met due to state changes.
+ *
+ * This function must be called after state changes which might cause a
+ * predicate in another thread to now be met (i.e., ticking). It is a no-op if
+ * inter-thread notification is not being used.
+ *
+ * mutex is required and must be held.
+ */
+void ossl_quic_reactor_notify_other_threads(QUIC_REACTOR *rtor,
+                                            CRYPTO_MUTEX *mutex);
+
 # endif
 
 #endif
index 6d1f8bc5cf2908757446ed82604d862c95033c08..cdc90fefd888afaf355ead4553c7e847ada0f391 100644 (file)
@@ -9,6 +9,7 @@
 #include "internal/quic_reactor.h"
 #include "internal/common.h"
 #include "internal/thread_arch.h"
+#include <assert.h>
 
 /*
  * Core I/O Reactor Framework
@@ -32,10 +33,17 @@ int ossl_quic_reactor_init(QUIC_REACTOR *rtor,
     rtor->tick_cb           = tick_cb;
     rtor->tick_cb_arg       = tick_cb_arg;
 
+    rtor->cur_blocking_waiters = 0;
+
     if ((flags & QUIC_REACTOR_FLAG_USE_NOTIFIER) != 0) {
         if (!ossl_rio_notifier_init(&rtor->notifier))
             return 0;
 
+        if ((rtor->notifier_cv = ossl_crypto_condvar_new()) == NULL) {
+            ossl_rio_notifier_cleanup(&rtor->notifier);
+            return 0;
+        }
+
         rtor->have_notifier = 1;
     } else {
         rtor->have_notifier = 0;
@@ -52,6 +60,8 @@ void ossl_quic_reactor_cleanup(QUIC_REACTOR *rtor)
     if (rtor->have_notifier) {
         ossl_rio_notifier_cleanup(&rtor->notifier);
         rtor->have_notifier = 0;
+
+        ossl_crypto_condvar_free(&rtor->notifier_cv);
     }
 }
 
@@ -191,6 +201,7 @@ RIO_NOTIFIER *ossl_quic_reactor_get0_notifier(QUIC_REACTOR *rtor)
  */
 static int poll_two_fds(int rfd, int rfd_want_read,
                         int wfd, int wfd_want_write,
+                        int notify_rfd,
                         OSSL_TIME deadline,
                         CRYPTO_MUTEX *mutex)
 {
@@ -224,9 +235,17 @@ static int poll_two_fds(int rfd, int rfd_want_read,
     if (wfd != -1)
         openssl_fdset(wfd, &efd_set);
 
+    /* Check for notifier FD readability. */
+    if (notify_rfd != -1) {
+        openssl_fdset(notify_rfd, &rfd_set);
+        openssl_fdset(notify_rfd, &efd_set);
+    }
+
     maxfd = rfd;
     if (wfd > maxfd)
         maxfd = wfd;
+    if (notify_rfd > maxfd)
+        maxfd = notify_rfd;
 
     if (!ossl_assert(rfd != -1 || wfd != -1
                      || !ossl_time_is_infinite(deadline)))
@@ -269,7 +288,7 @@ static int poll_two_fds(int rfd, int rfd_want_read,
 #else
     int pres, timeout_ms;
     OSSL_TIME now, timeout;
-    struct pollfd pfds[2] = {0};
+    struct pollfd pfds[3] = {0};
     size_t npfd = 0;
 
     if (rfd == wfd) {
@@ -290,6 +309,12 @@ static int poll_two_fds(int rfd, int rfd_want_read,
             ++npfd;
     }
 
+    if (notify_rfd >= 0) {
+        pfds[npfd].fd       = notify_rfd;
+        pfds[npfd].events   = POLLIN;
+        ++npfd;
+    }
+
     if (!ossl_assert(npfd != 0 || !ossl_time_is_infinite(deadline)))
         /* Do not block forever; should not happen. */
         return 0;
@@ -336,8 +361,8 @@ static int poll_descriptor_to_fd(const BIO_POLL_DESCRIPTOR *d, int *fd)
 }
 
 /*
- * Poll up to two abstract poll descriptors. Currently we only support
- * poll descriptors which represent FDs.
+ * Poll up to two abstract poll descriptors, as well as an optional notify FD.
+ * Currently we only support poll descriptors which represent FDs.
  *
  * If mutex is non-NULL, it is assumed be a lock currently held for write and is
  * unlocked for the duration of any wait.
@@ -348,6 +373,7 @@ static int poll_descriptor_to_fd(const BIO_POLL_DESCRIPTOR *d, int *fd)
  */
 static int poll_two_descriptors(const BIO_POLL_DESCRIPTOR *r, int r_want_read,
                                 const BIO_POLL_DESCRIPTOR *w, int w_want_write,
+                                int notify_rfd,
                                 OSSL_TIME deadline,
                                 CRYPTO_MUTEX *mutex)
 {
@@ -357,7 +383,45 @@ static int poll_two_descriptors(const BIO_POLL_DESCRIPTOR *r, int r_want_read,
         || !poll_descriptor_to_fd(w, &wfd))
         return 0;
 
-    return poll_two_fds(rfd, r_want_read, wfd, w_want_write, deadline, mutex);
+    return poll_two_fds(rfd, r_want_read, wfd, w_want_write,
+                        notify_rfd, deadline, mutex);
+}
+
+void ossl_quic_reactor_notify_other_threads(QUIC_REACTOR *rtor,
+                                            CRYPTO_MUTEX *mutex)
+{
+    if (!rtor->have_notifier)
+        return;
+
+    /*
+     * This function is called when we have done anything on this thread which
+     * might allow a predicate for a block_until_pred call on another thread to
+     * now be met.
+     *
+     * When this happens, we need to wake those threads using the notifier.
+     * However, we do not want to wake *this* thread (if/when it subsequently
+     * enters block_until_pred) due to the notifier FD becoming readable.
+     * Therefore, signal the notifier, and use a CV to detect when all other
+     * threads have woken.
+     */
+
+   if (rtor->cur_blocking_waiters == 0)
+       /* Nothing to do in this case. */
+       return;
+
+   /* Signal the notifier to wake up all threads. */
+   if (!rtor->signalled_notifier) {
+       ossl_rio_notifier_signal(&rtor->notifier);
+       rtor->signalled_notifier = 1;
+   }
+
+   /*
+    * Wait on the CV until all threads have finished the first phase of the
+    * wakeup process and the last thread out has taken responsibility for
+    * unsignalling the notifier.
+    */
+    while (rtor->signalled_notifier)
+        ossl_crypto_condvar_wait(rtor->notifier_cv, mutex);
 }
 
 /*
@@ -376,9 +440,12 @@ int ossl_quic_reactor_block_until_pred(QUIC_REACTOR *rtor,
                                        uint32_t flags,
                                        CRYPTO_MUTEX *mutex)
 {
-    int res, net_read_desired, net_write_desired;
+    int res, net_read_desired, net_write_desired, notifier_fd;
     OSSL_TIME tick_deadline;
 
+    notifier_fd
+        = (rtor->have_notifier ? ossl_rio_notifier_as_fd(&rtor->notifier) : -1);
+
     for (;;) {
         if ((flags & SKIP_FIRST_TICK) != 0)
             flags &= ~SKIP_FIRST_TICK;
@@ -397,12 +464,77 @@ int ossl_quic_reactor_block_until_pred(QUIC_REACTOR *rtor,
             /* Can't wait if there is nothing to wait for. */
             return 0;
 
-        if (!poll_two_descriptors(ossl_quic_reactor_get_poll_r(rtor),
-                                  net_read_desired,
-                                  ossl_quic_reactor_get_poll_w(rtor),
-                                  net_write_desired,
-                                  tick_deadline,
-                                  mutex))
+        ++rtor->cur_blocking_waiters;
+
+        res = poll_two_descriptors(ossl_quic_reactor_get_poll_r(rtor),
+                                   net_read_desired,
+                                   ossl_quic_reactor_get_poll_w(rtor),
+                                   net_write_desired,
+                                   notifier_fd,
+                                   tick_deadline,
+                                   mutex);
+
+        assert(rtor->cur_blocking_waiters > 0);
+        --rtor->cur_blocking_waiters;
+
+        /*
+         * We have now exited the OS poller call. We may have
+         * (rtor->signalled_notifier), and other threads may still be blocking.
+         * This means that cur_blocking_waiters may still be non-zero. As such,
+         * we cannot unsignal the notifier until all threads have had an
+         * opportunity to wake up.
+         *
+         * At the same time, we cannot unsignal in the case where
+         * cur_blocking_waiters is now zero because this condition may not occur
+         * reliably. Consider the following scenario:
+         *
+         *   T1 enters block_until_pred, cur_blocking_waiters -> 1
+         *   T2 enters block_until_pred, cur_blocking_waiters -> 2
+         *   T3 enters block_until_pred, cur_blocking_waiters -> 3
+         *
+         *   T4 enters block_until_pred, does not block, ticks,
+         *     sees that cur_blocking_waiters > 0 and signals the notifier
+         *
+         *   T3 wakes, cur_blocking_waiters -> 2
+         *   T3 predicate is not satisfied, cur_blocking_waiters -> 3, block again
+         *
+         *   Notifier is still signalled, so T3 immediately wakes again
+         *   and is stuck repeating the above steps.
+         *
+         *   T1, T2 are also woken by the notifier but never see
+         *   cur_blocking_waiters drop to 0, so never unsignal the notifier.
+         *
+         * As such, a two phase approach is chosen when designalling the
+         * notifier:
+         *
+         *   First, all of the poll_two_descriptor calls on all threads are
+         *   allowed to exit due to the notifier being signalled.
+         *
+         *   Second, the thread which happened to be the one which decremented
+         *   cur_blocking_waiters to 0 unsignals the notifier and is then
+         *   responsible for broadcasting to a CV to indicate to the other
+         *   threads that the synchronised wakeup has been cmpleted. Other
+         *   threads wait for this CV to be signalled.
+         *
+         */
+        if (rtor->have_notifier && rtor->signalled_notifier) {
+            if (rtor->cur_blocking_waiters == 0) {
+                ossl_rio_notifier_unsignal(&rtor->notifier);
+                rtor->signalled_notifier = 0;
+
+                /*
+                 * Release the other threads which have woken up (and possibly
+                 * rtor_notify_other_threads as well).
+                 */
+                ossl_crypto_condvar_broadcast(rtor->notifier_cv);
+            } else {
+                /* We are not the last waiter out - so wait for that one. */
+                while (rtor->signalled_notifier)
+                    ossl_crypto_condvar_wait(rtor->notifier_cv, mutex);
+            }
+        }
+
+        if (!res)
             /*
              * We don't actually care why the call succeeded (timeout, FD
              * readiness), we just call reactor_tick and start trying to do I/O
@@ -420,4 +552,6 @@ int ossl_quic_reactor_block_until_pred(QUIC_REACTOR *rtor,
              */
             return 0;
     }
+
+    return res;
 }