void (*tick_cb)(QUIC_TICK_RESULT *res, void *arg, uint32_t flags);
void *tick_cb_arg;
- /* Used to notify other threads. */
+ /* Used to notify other threads. Valid only if have_notifier is set. */
RIO_NOTIFIER notifier;
+ /*
+ * Condvar to assist synchronising use of the notifier. Valid only if
+ * have_notifier is set.
+ */
+ CRYPTO_CONDVAR *notifier_cv;
+
+ /*
+ * Count of the current number of blocking waiters. Like everything else,
+ * this is protected the caller's mutex (i.e., the engine mutex).
+ */
+ size_t cur_blocking_waiters;
+
/*
* These are true if we would like to know when we can read or write from
* the network respectively.
/* 1 if notifier is present and initialised. */
unsigned int have_notifier : 1;
+
+ /* 1 if a block_until_pred call has put the notifier in the signalled state. */
+ unsigned int signalled_notifier : 1;
};
/* Create an OS notifier? */
*
* The blocking I/O adaptation layer implements blocking I/O on top of our
* asynchronous core.
+ */
+
+/*
+ * ossl_quic_reactor_block_until_pred
+ * ----------------------------------
*
- * The core mechanism is block_until_pred(), which does not return until pred()
- * returns a value other than 0. The blocker uses OS I/O synchronisation
- * primitives (e.g. poll(2)) and ticks the reactor until the predicate is
- * satisfied. The blocker is not required to call pred() more than once between
- * tick calls.
+ * The core mechanism of the Blocking I/O Adaption Layer is block_until_pred(),
+ * which does not return until pred() returns a value other than 0. The blocker
+ * uses OS I/O synchronisation primitives (e.g. poll(2)) and ticks the reactor
+ * until the predicate is satisfied. The blocker is not required to call pred()
+ * more than once between tick calls.
*
* When pred returns a non-zero value, that value is returned by this function.
* This can be used to allow pred() to indicate error conditions and short
uint32_t flags,
CRYPTO_MUTEX *mutex);
+/*
+ * ossl_quic_reactor_notify_other_threads
+ * --------------------------------------
+ *
+ * Notify other threads currently blocking in
+ * ossl_quic_reactor_block_until_pred() calls that a predicate they are using
+ * might now be met due to state changes.
+ *
+ * This function must be called after state changes which might cause a
+ * predicate in another thread to now be met (i.e., ticking). It is a no-op if
+ * inter-thread notification is not being used.
+ *
+ * mutex is required and must be held.
+ */
+void ossl_quic_reactor_notify_other_threads(QUIC_REACTOR *rtor,
+ CRYPTO_MUTEX *mutex);
+
# endif
#endif
#include "internal/quic_reactor.h"
#include "internal/common.h"
#include "internal/thread_arch.h"
+#include <assert.h>
/*
* Core I/O Reactor Framework
rtor->tick_cb = tick_cb;
rtor->tick_cb_arg = tick_cb_arg;
+ rtor->cur_blocking_waiters = 0;
+
if ((flags & QUIC_REACTOR_FLAG_USE_NOTIFIER) != 0) {
if (!ossl_rio_notifier_init(&rtor->notifier))
return 0;
+ if ((rtor->notifier_cv = ossl_crypto_condvar_new()) == NULL) {
+ ossl_rio_notifier_cleanup(&rtor->notifier);
+ return 0;
+ }
+
rtor->have_notifier = 1;
} else {
rtor->have_notifier = 0;
if (rtor->have_notifier) {
ossl_rio_notifier_cleanup(&rtor->notifier);
rtor->have_notifier = 0;
+
+ ossl_crypto_condvar_free(&rtor->notifier_cv);
}
}
*/
static int poll_two_fds(int rfd, int rfd_want_read,
int wfd, int wfd_want_write,
+ int notify_rfd,
OSSL_TIME deadline,
CRYPTO_MUTEX *mutex)
{
if (wfd != -1)
openssl_fdset(wfd, &efd_set);
+ /* Check for notifier FD readability. */
+ if (notify_rfd != -1) {
+ openssl_fdset(notify_rfd, &rfd_set);
+ openssl_fdset(notify_rfd, &efd_set);
+ }
+
maxfd = rfd;
if (wfd > maxfd)
maxfd = wfd;
+ if (notify_rfd > maxfd)
+ maxfd = notify_rfd;
if (!ossl_assert(rfd != -1 || wfd != -1
|| !ossl_time_is_infinite(deadline)))
#else
int pres, timeout_ms;
OSSL_TIME now, timeout;
- struct pollfd pfds[2] = {0};
+ struct pollfd pfds[3] = {0};
size_t npfd = 0;
if (rfd == wfd) {
++npfd;
}
+ if (notify_rfd >= 0) {
+ pfds[npfd].fd = notify_rfd;
+ pfds[npfd].events = POLLIN;
+ ++npfd;
+ }
+
if (!ossl_assert(npfd != 0 || !ossl_time_is_infinite(deadline)))
/* Do not block forever; should not happen. */
return 0;
}
/*
- * Poll up to two abstract poll descriptors. Currently we only support
- * poll descriptors which represent FDs.
+ * Poll up to two abstract poll descriptors, as well as an optional notify FD.
+ * Currently we only support poll descriptors which represent FDs.
*
* If mutex is non-NULL, it is assumed be a lock currently held for write and is
* unlocked for the duration of any wait.
*/
static int poll_two_descriptors(const BIO_POLL_DESCRIPTOR *r, int r_want_read,
const BIO_POLL_DESCRIPTOR *w, int w_want_write,
+ int notify_rfd,
OSSL_TIME deadline,
CRYPTO_MUTEX *mutex)
{
|| !poll_descriptor_to_fd(w, &wfd))
return 0;
- return poll_two_fds(rfd, r_want_read, wfd, w_want_write, deadline, mutex);
+ return poll_two_fds(rfd, r_want_read, wfd, w_want_write,
+ notify_rfd, deadline, mutex);
+}
+
+void ossl_quic_reactor_notify_other_threads(QUIC_REACTOR *rtor,
+ CRYPTO_MUTEX *mutex)
+{
+ if (!rtor->have_notifier)
+ return;
+
+ /*
+ * This function is called when we have done anything on this thread which
+ * might allow a predicate for a block_until_pred call on another thread to
+ * now be met.
+ *
+ * When this happens, we need to wake those threads using the notifier.
+ * However, we do not want to wake *this* thread (if/when it subsequently
+ * enters block_until_pred) due to the notifier FD becoming readable.
+ * Therefore, signal the notifier, and use a CV to detect when all other
+ * threads have woken.
+ */
+
+ if (rtor->cur_blocking_waiters == 0)
+ /* Nothing to do in this case. */
+ return;
+
+ /* Signal the notifier to wake up all threads. */
+ if (!rtor->signalled_notifier) {
+ ossl_rio_notifier_signal(&rtor->notifier);
+ rtor->signalled_notifier = 1;
+ }
+
+ /*
+ * Wait on the CV until all threads have finished the first phase of the
+ * wakeup process and the last thread out has taken responsibility for
+ * unsignalling the notifier.
+ */
+ while (rtor->signalled_notifier)
+ ossl_crypto_condvar_wait(rtor->notifier_cv, mutex);
}
/*
uint32_t flags,
CRYPTO_MUTEX *mutex)
{
- int res, net_read_desired, net_write_desired;
+ int res, net_read_desired, net_write_desired, notifier_fd;
OSSL_TIME tick_deadline;
+ notifier_fd
+ = (rtor->have_notifier ? ossl_rio_notifier_as_fd(&rtor->notifier) : -1);
+
for (;;) {
if ((flags & SKIP_FIRST_TICK) != 0)
flags &= ~SKIP_FIRST_TICK;
/* Can't wait if there is nothing to wait for. */
return 0;
- if (!poll_two_descriptors(ossl_quic_reactor_get_poll_r(rtor),
- net_read_desired,
- ossl_quic_reactor_get_poll_w(rtor),
- net_write_desired,
- tick_deadline,
- mutex))
+ ++rtor->cur_blocking_waiters;
+
+ res = poll_two_descriptors(ossl_quic_reactor_get_poll_r(rtor),
+ net_read_desired,
+ ossl_quic_reactor_get_poll_w(rtor),
+ net_write_desired,
+ notifier_fd,
+ tick_deadline,
+ mutex);
+
+ assert(rtor->cur_blocking_waiters > 0);
+ --rtor->cur_blocking_waiters;
+
+ /*
+ * We have now exited the OS poller call. We may have
+ * (rtor->signalled_notifier), and other threads may still be blocking.
+ * This means that cur_blocking_waiters may still be non-zero. As such,
+ * we cannot unsignal the notifier until all threads have had an
+ * opportunity to wake up.
+ *
+ * At the same time, we cannot unsignal in the case where
+ * cur_blocking_waiters is now zero because this condition may not occur
+ * reliably. Consider the following scenario:
+ *
+ * T1 enters block_until_pred, cur_blocking_waiters -> 1
+ * T2 enters block_until_pred, cur_blocking_waiters -> 2
+ * T3 enters block_until_pred, cur_blocking_waiters -> 3
+ *
+ * T4 enters block_until_pred, does not block, ticks,
+ * sees that cur_blocking_waiters > 0 and signals the notifier
+ *
+ * T3 wakes, cur_blocking_waiters -> 2
+ * T3 predicate is not satisfied, cur_blocking_waiters -> 3, block again
+ *
+ * Notifier is still signalled, so T3 immediately wakes again
+ * and is stuck repeating the above steps.
+ *
+ * T1, T2 are also woken by the notifier but never see
+ * cur_blocking_waiters drop to 0, so never unsignal the notifier.
+ *
+ * As such, a two phase approach is chosen when designalling the
+ * notifier:
+ *
+ * First, all of the poll_two_descriptor calls on all threads are
+ * allowed to exit due to the notifier being signalled.
+ *
+ * Second, the thread which happened to be the one which decremented
+ * cur_blocking_waiters to 0 unsignals the notifier and is then
+ * responsible for broadcasting to a CV to indicate to the other
+ * threads that the synchronised wakeup has been cmpleted. Other
+ * threads wait for this CV to be signalled.
+ *
+ */
+ if (rtor->have_notifier && rtor->signalled_notifier) {
+ if (rtor->cur_blocking_waiters == 0) {
+ ossl_rio_notifier_unsignal(&rtor->notifier);
+ rtor->signalled_notifier = 0;
+
+ /*
+ * Release the other threads which have woken up (and possibly
+ * rtor_notify_other_threads as well).
+ */
+ ossl_crypto_condvar_broadcast(rtor->notifier_cv);
+ } else {
+ /* We are not the last waiter out - so wait for that one. */
+ while (rtor->signalled_notifier)
+ ossl_crypto_condvar_wait(rtor->notifier_cv, mutex);
+ }
+ }
+
+ if (!res)
/*
* We don't actually care why the call succeeded (timeout, FD
* readiness), we just call reactor_tick and start trying to do I/O
*/
return 0;
}
+
+ return res;
}