From: Hugo Landau Date: Wed, 24 Apr 2024 11:24:22 +0000 (+0100) Subject: QUIC REACTOR: Inter-thread notification X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=c46dd3fc7747392bb4aa64768c118920114ebf04;p=thirdparty%2Fopenssl.git QUIC REACTOR: Inter-thread notification Reviewed-by: Matt Caswell Reviewed-by: Neil Horman (Merged from https://github.com/openssl/openssl/pull/24971) --- diff --git a/include/internal/quic_reactor.h b/include/internal/quic_reactor.h index 95216766550..aa55d378647 100644 --- a/include/internal/quic_reactor.h +++ b/include/internal/quic_reactor.h @@ -100,9 +100,21 @@ struct quic_reactor_st { void (*tick_cb)(QUIC_TICK_RESULT *res, void *arg, uint32_t flags); void *tick_cb_arg; - /* Used to notify other threads. */ + /* Used to notify other threads. Valid only if have_notifier is set. */ RIO_NOTIFIER notifier; + /* + * Condvar to assist synchronising use of the notifier. Valid only if + * have_notifier is set. + */ + CRYPTO_CONDVAR *notifier_cv; + + /* + * Count of the current number of blocking waiters. Like everything else, + * this is protected by the caller's mutex (i.e., the engine mutex). + */ + size_t cur_blocking_waiters; + /* * These are true if we would like to know when we can read or write from * the network respectively. @@ -119,6 +131,9 @@ struct quic_reactor_st { /* 1 if notifier is present and initialised. */ unsigned int have_notifier : 1; + + /* 1 if a block_until_pred call has put the notifier in the signalled state. */ + unsigned int signalled_notifier : 1; }; /* Create an OS notifier? */ @@ -173,12 +188,17 @@ RIO_NOTIFIER *ossl_quic_reactor_get0_notifier(QUIC_REACTOR *rtor); * * The blocking I/O adaptation layer implements blocking I/O on top of our * asynchronous core. + */ + +/* + * ossl_quic_reactor_block_until_pred + * ---------------------------------- * - * The core mechanism is block_until_pred(), which does not return until pred() - * returns a value other than 0. 
The blocker uses OS I/O synchronisation - * primitives (e.g. poll(2)) and ticks the reactor until the predicate is - * satisfied. The blocker is not required to call pred() more than once between - * tick calls. + * The core mechanism of the Blocking I/O Adaption Layer is block_until_pred(), + * which does not return until pred() returns a value other than 0. The blocker + * uses OS I/O synchronisation primitives (e.g. poll(2)) and ticks the reactor + * until the predicate is satisfied. The blocker is not required to call pred() + * more than once between tick calls. * * When pred returns a non-zero value, that value is returned by this function. * This can be used to allow pred() to indicate error conditions and short @@ -209,6 +229,23 @@ int ossl_quic_reactor_block_until_pred(QUIC_REACTOR *rtor, uint32_t flags, CRYPTO_MUTEX *mutex); +/* + * ossl_quic_reactor_notify_other_threads + * -------------------------------------- + * + * Notify other threads currently blocking in + * ossl_quic_reactor_block_until_pred() calls that a predicate they are using + * might now be met due to state changes. + * + * This function must be called after state changes which might cause a + * predicate in another thread to now be met (i.e., ticking). It is a no-op if + * inter-thread notification is not being used. + * + * mutex is required and must be held. 
+ */ +void ossl_quic_reactor_notify_other_threads(QUIC_REACTOR *rtor, + CRYPTO_MUTEX *mutex); + # endif #endif diff --git a/ssl/quic/quic_reactor.c b/ssl/quic/quic_reactor.c index 6d1f8bc5cf2..cdc90fefd88 100644 --- a/ssl/quic/quic_reactor.c +++ b/ssl/quic/quic_reactor.c @@ -9,6 +9,7 @@ #include "internal/quic_reactor.h" #include "internal/common.h" #include "internal/thread_arch.h" +#include <assert.h> /* * Core I/O Reactor Framework @@ -32,10 +33,17 @@ int ossl_quic_reactor_init(QUIC_REACTOR *rtor, rtor->tick_cb = tick_cb; rtor->tick_cb_arg = tick_cb_arg; + rtor->cur_blocking_waiters = 0; + if ((flags & QUIC_REACTOR_FLAG_USE_NOTIFIER) != 0) { if (!ossl_rio_notifier_init(&rtor->notifier)) return 0; + if ((rtor->notifier_cv = ossl_crypto_condvar_new()) == NULL) { + ossl_rio_notifier_cleanup(&rtor->notifier); + return 0; + } + rtor->have_notifier = 1; } else { rtor->have_notifier = 0; @@ -52,6 +60,8 @@ void ossl_quic_reactor_cleanup(QUIC_REACTOR *rtor) if (rtor->have_notifier) { ossl_rio_notifier_cleanup(&rtor->notifier); rtor->have_notifier = 0; + + ossl_crypto_condvar_free(&rtor->notifier_cv); } } @@ -191,6 +201,7 @@ RIO_NOTIFIER *ossl_quic_reactor_get0_notifier(QUIC_REACTOR *rtor) */ static int poll_two_fds(int rfd, int rfd_want_read, int wfd, int wfd_want_write, + int notify_rfd, OSSL_TIME deadline, CRYPTO_MUTEX *mutex) { @@ -224,9 +235,17 @@ static int poll_two_fds(int rfd, int rfd_want_read, if (wfd != -1) openssl_fdset(wfd, &efd_set); + /* Check for notifier FD readability. 
*/ + if (notify_rfd != -1) { + openssl_fdset(notify_rfd, &rfd_set); + openssl_fdset(notify_rfd, &efd_set); + } + maxfd = rfd; if (wfd > maxfd) maxfd = wfd; + if (notify_rfd > maxfd) + maxfd = notify_rfd; if (!ossl_assert(rfd != -1 || wfd != -1 || !ossl_time_is_infinite(deadline))) @@ -269,7 +288,7 @@ static int poll_two_fds(int rfd, int rfd_want_read, #else int pres, timeout_ms; OSSL_TIME now, timeout; - struct pollfd pfds[2] = {0}; + struct pollfd pfds[3] = {0}; size_t npfd = 0; if (rfd == wfd) { @@ -290,6 +309,12 @@ static int poll_two_fds(int rfd, int rfd_want_read, ++npfd; } + if (notify_rfd >= 0) { + pfds[npfd].fd = notify_rfd; + pfds[npfd].events = POLLIN; + ++npfd; + } + if (!ossl_assert(npfd != 0 || !ossl_time_is_infinite(deadline))) /* Do not block forever; should not happen. */ return 0; @@ -336,8 +361,8 @@ static int poll_descriptor_to_fd(const BIO_POLL_DESCRIPTOR *d, int *fd) } /* - * Poll up to two abstract poll descriptors. Currently we only support - * poll descriptors which represent FDs. + * Poll up to two abstract poll descriptors, as well as an optional notify FD. + * Currently we only support poll descriptors which represent FDs. * * If mutex is non-NULL, it is assumed be a lock currently held for write and is * unlocked for the duration of any wait. 
@@ -348,6 +373,7 @@ static int poll_descriptor_to_fd(const BIO_POLL_DESCRIPTOR *d, int *fd) */ static int poll_two_descriptors(const BIO_POLL_DESCRIPTOR *r, int r_want_read, const BIO_POLL_DESCRIPTOR *w, int w_want_write, + int notify_rfd, OSSL_TIME deadline, CRYPTO_MUTEX *mutex) { @@ -357,7 +383,45 @@ static int poll_two_descriptors(const BIO_POLL_DESCRIPTOR *r, int r_want_read, || !poll_descriptor_to_fd(w, &wfd)) return 0; - return poll_two_fds(rfd, r_want_read, wfd, w_want_write, deadline, mutex); + return poll_two_fds(rfd, r_want_read, wfd, w_want_write, + notify_rfd, deadline, mutex); +} + +void ossl_quic_reactor_notify_other_threads(QUIC_REACTOR *rtor, + CRYPTO_MUTEX *mutex) +{ + if (!rtor->have_notifier) + return; + + /* + * This function is called when we have done anything on this thread which + * might allow a predicate for a block_until_pred call on another thread to + * now be met. + * + * When this happens, we need to wake those threads using the notifier. + * However, we do not want to wake *this* thread (if/when it subsequently + * enters block_until_pred) due to the notifier FD becoming readable. + * Therefore, signal the notifier, and use a CV to detect when all other + * threads have woken. + */ + + if (rtor->cur_blocking_waiters == 0) + /* Nothing to do in this case. */ + return; + + /* Signal the notifier to wake up all threads. */ + if (!rtor->signalled_notifier) { + ossl_rio_notifier_signal(&rtor->notifier); + rtor->signalled_notifier = 1; + } + + /* + * Wait on the CV until all threads have finished the first phase of the + * wakeup process and the last thread out has taken responsibility for + * unsignalling the notifier. 
+ */ + while (rtor->signalled_notifier) + ossl_crypto_condvar_wait(rtor->notifier_cv, mutex); } /* @@ -376,9 +440,12 @@ int ossl_quic_reactor_block_until_pred(QUIC_REACTOR *rtor, uint32_t flags, CRYPTO_MUTEX *mutex) { - int res, net_read_desired, net_write_desired; + int res, net_read_desired, net_write_desired, notifier_fd; OSSL_TIME tick_deadline; + notifier_fd + = (rtor->have_notifier ? ossl_rio_notifier_as_fd(&rtor->notifier) : -1); + for (;;) { if ((flags & SKIP_FIRST_TICK) != 0) flags &= ~SKIP_FIRST_TICK; @@ -397,12 +464,77 @@ int ossl_quic_reactor_block_until_pred(QUIC_REACTOR *rtor, /* Can't wait if there is nothing to wait for. */ return 0; - if (!poll_two_descriptors(ossl_quic_reactor_get_poll_r(rtor), - net_read_desired, - ossl_quic_reactor_get_poll_w(rtor), - net_write_desired, - tick_deadline, - mutex)) + ++rtor->cur_blocking_waiters; + + res = poll_two_descriptors(ossl_quic_reactor_get_poll_r(rtor), + net_read_desired, + ossl_quic_reactor_get_poll_w(rtor), + net_write_desired, + notifier_fd, + tick_deadline, + mutex); + + assert(rtor->cur_blocking_waiters > 0); + --rtor->cur_blocking_waiters; + + /* + * We have now exited the OS poller call. We may have + * (rtor->signalled_notifier), and other threads may still be blocking. + * This means that cur_blocking_waiters may still be non-zero. As such, + * we cannot unsignal the notifier until all threads have had an + * opportunity to wake up. + * + * At the same time, we cannot unsignal in the case where + * cur_blocking_waiters is now zero because this condition may not occur + * reliably. 
Consider the following scenario: + * + * T1 enters block_until_pred, cur_blocking_waiters -> 1 + * T2 enters block_until_pred, cur_blocking_waiters -> 2 + * T3 enters block_until_pred, cur_blocking_waiters -> 3 + * + * T4 enters block_until_pred, does not block, ticks, + * sees that cur_blocking_waiters > 0 and signals the notifier + * + * T3 wakes, cur_blocking_waiters -> 2 + * T3 predicate is not satisfied, cur_blocking_waiters -> 3, block again + * + * Notifier is still signalled, so T3 immediately wakes again + * and is stuck repeating the above steps. + * + * T1, T2 are also woken by the notifier but never see + * cur_blocking_waiters drop to 0, so never unsignal the notifier. + * + * As such, a two phase approach is chosen when designalling the + * notifier: + * + * First, all of the poll_two_descriptors calls on all threads are + * allowed to exit due to the notifier being signalled. + * + * Second, the thread which happened to be the one which decremented + * cur_blocking_waiters to 0 unsignals the notifier and is then + * responsible for broadcasting to a CV to indicate to the other + * threads that the synchronised wakeup has been completed. Other + * threads wait for this CV to be signalled. + * + */ + if (rtor->have_notifier && rtor->signalled_notifier) { + if (rtor->cur_blocking_waiters == 0) { + ossl_rio_notifier_unsignal(&rtor->notifier); + rtor->signalled_notifier = 0; + + /* + * Release the other threads which have woken up (and possibly + * ossl_quic_reactor_notify_other_threads as well). + */ + ossl_crypto_condvar_broadcast(rtor->notifier_cv); + } else { + /* We are not the last waiter out - so wait for that one. 
*/ + while (rtor->signalled_notifier) + ossl_crypto_condvar_wait(rtor->notifier_cv, mutex); + } + } + + if (!res) /* * We don't actually care why the call succeeded (timeout, FD * readiness), we just call reactor_tick and start trying to do I/O @@ -420,4 +552,6 @@ int ossl_quic_reactor_block_until_pred(QUIC_REACTOR *rtor, */ return 0; } + + return res; }