git.ipfire.org Git - thirdparty/openssl.git/commitdiff
Rework and simplify RCU code
authorBernd Edlinger <bernd.edlinger@hotmail.de>
Sun, 9 Feb 2025 16:24:43 +0000 (17:24 +0100)
committerNeil Horman <nhorman@openssl.org>
Sat, 15 Feb 2025 16:26:02 +0000 (11:26 -0500)
Use __ATOMIC_RELAXED where possible.
Don't store additional values in the users field.

Reviewed-by: Neil Horman <nhorman@openssl.org>
Reviewed-by: Tomas Mraz <tomas@openssl.org>
(Merged from https://github.com/openssl/openssl/pull/26751)

crypto/threads_pthread.c
crypto/threads_win.c

index f645427c9c839fe198479af143e0d917acfb2f27..8af2a504a3af5ae4899b9e36c78ddc989c5bce92 100644 (file)
@@ -245,23 +245,6 @@ static ossl_inline uint64_t fallback_atomic_or_fetch(uint64_t *p, uint64_t m)
 #  define ATOMIC_OR_FETCH(p, v, o) fallback_atomic_or_fetch(p, v)
 # endif
 
-/*
- * users is broken up into 2 parts
- * bits 0-15 current readers
- * bit 32-63 - ID
- */
-# define READER_SHIFT 0
-# define ID_SHIFT 32
-# define READER_SIZE 16
-# define ID_SIZE 32
-
-# define READER_MASK     (((uint64_t)1 << READER_SIZE) - 1)
-# define ID_MASK         (((uint64_t)1 << ID_SIZE) - 1)
-# define READER_COUNT(x) (((uint64_t)(x) >> READER_SHIFT) & READER_MASK)
-# define ID_VAL(x)       (((uint64_t)(x) >> ID_SHIFT) & ID_MASK)
-# define VAL_READER      ((uint64_t)1 << READER_SHIFT)
-# define VAL_ID(x)       ((uint64_t)x << ID_SHIFT)
-
 /*
  * This is the core of an rcu lock. It tracks the readers and writers for the
  * current quiescence point for a given lock. Users is the 64 bit value that
@@ -360,28 +343,16 @@ static struct rcu_qp *get_hold_current_qp(struct rcu_lock_st *lock)
          */
         qp_idx = ATOMIC_LOAD_N(uint64_t, &lock->reader_idx, __ATOMIC_ACQUIRE);
 
-        /*
-         * Notes of use of __ATOMIC_RELEASE
-         * This counter is only read by the write side of the lock, and so we
-         * specify __ATOMIC_RELEASE here to ensure that the write side of the
-         * lock see this during the spin loop read of users, as it waits for the
-         * reader count to approach zero
-         */
-        ATOMIC_ADD_FETCH(&lock->qp_group[qp_idx].users, VAL_READER,
-                         __ATOMIC_RELEASE);
+        ATOMIC_ADD_FETCH(&lock->qp_group[qp_idx].users, (uint64_t)1,
+                         __ATOMIC_ACQUIRE);
 
         /* if the idx hasn't changed, we're good, else try again */
-        if (qp_idx == ATOMIC_LOAD_N(uint64_t, &lock->reader_idx, __ATOMIC_ACQUIRE))
+        if (qp_idx == ATOMIC_LOAD_N(uint64_t, &lock->reader_idx,
+                                    __ATOMIC_RELAXED))
             break;
 
-        /*
-         * Notes on use of __ATOMIC_RELEASE
-         * As with the add above, we want to ensure that this decrement is
-         * seen by the write side of the lock as soon as it happens to prevent
-         * undue spinning waiting for write side completion
-         */
-        ATOMIC_SUB_FETCH(&lock->qp_group[qp_idx].users, VAL_READER,
-                         __ATOMIC_RELEASE);
+        ATOMIC_SUB_FETCH(&lock->qp_group[qp_idx].users, (uint64_t)1,
+                         __ATOMIC_RELAXED);
     }
 
     return &lock->qp_group[qp_idx];
@@ -448,14 +419,14 @@ void ossl_rcu_read_unlock(CRYPTO_RCU_LOCK *lock)
     for (i = 0; i < MAX_QPS; i++) {
         if (data->thread_qps[i].lock == lock) {
             /*
-             * As with read side acquisition, we use __ATOMIC_RELEASE here
-             * to ensure that the decrement is published immediately
-             * to any write side waiters
+             * we have to use __ATOMIC_RELEASE here
+             * to ensure that all preceding read instructions complete
+             * before the decrement is visible to ossl_synchronize_rcu
              */
             data->thread_qps[i].depth--;
             if (data->thread_qps[i].depth == 0) {
-                ret = ATOMIC_SUB_FETCH(&data->thread_qps[i].qp->users, VAL_READER,
-                                       __ATOMIC_RELEASE);
+                ret = ATOMIC_SUB_FETCH(&data->thread_qps[i].qp->users,
+                                       (uint64_t)1, __ATOMIC_RELEASE);
                 OPENSSL_assert(ret != UINT64_MAX);
                 data->thread_qps[i].qp = NULL;
                 data->thread_qps[i].lock = NULL;
@@ -474,9 +445,8 @@ void ossl_rcu_read_unlock(CRYPTO_RCU_LOCK *lock)
  * Write side allocation routine to get the current qp
  * and replace it with a new one
  */
-static struct rcu_qp *update_qp(CRYPTO_RCU_LOCK *lock)
+static struct rcu_qp *update_qp(CRYPTO_RCU_LOCK *lock, uint32_t *curr_id)
 {
-    uint64_t new_id;
     uint64_t current_idx;
 
     pthread_mutex_lock(&lock->alloc_lock);
@@ -499,30 +469,11 @@ static struct rcu_qp *update_qp(CRYPTO_RCU_LOCK *lock)
     lock->current_alloc_idx =
         (lock->current_alloc_idx + 1) % lock->group_count;
 
-    /* get and insert a new id */
-    new_id = lock->id_ctr;
+    *curr_id = lock->id_ctr;
     lock->id_ctr++;
 
-    new_id = VAL_ID(new_id);
-    /*
-     * Even though we are under a write side lock here
-     * We need to use atomic instructions to ensure that the results
-     * of this update are published to the read side prior to updating the
-     * reader idx below
-     */
-    ATOMIC_AND_FETCH(&lock->qp_group[current_idx].users, ID_MASK,
-                     __ATOMIC_RELEASE);
-    ATOMIC_OR_FETCH(&lock->qp_group[current_idx].users, new_id,
-                    __ATOMIC_RELEASE);
-
-    /*
-     * Update the reader index to be the prior qp.
-     * Note the use of __ATOMIC_RELEASE here is based on the corresponding use
-     * of __ATOMIC_ACQUIRE in get_hold_current_qp, as we want any publication
-     * of this value to be seen on the read side immediately after it happens
-     */
     ATOMIC_STORE_N(uint64_t, &lock->reader_idx, lock->current_alloc_idx,
-                   __ATOMIC_RELEASE);
+                   __ATOMIC_RELAXED);
 
     /* wake up any waiters */
     pthread_cond_signal(&lock->alloc_signal);
@@ -562,6 +513,7 @@ void ossl_synchronize_rcu(CRYPTO_RCU_LOCK *lock)
 {
     struct rcu_qp *qp;
     uint64_t count;
+    uint32_t curr_id;
     struct rcu_cb_item *cb_items, *tmpcb;
 
     /*
@@ -571,25 +523,26 @@ void ossl_synchronize_rcu(CRYPTO_RCU_LOCK *lock)
     cb_items = ATOMIC_EXCHANGE_N(prcu_cb_item, &lock->cb_items, NULL,
                                  __ATOMIC_ACQ_REL);
 
-    qp = update_qp(lock);
+    qp = update_qp(lock, &curr_id);
+
+    /* retire in order */
+    pthread_mutex_lock(&lock->prior_lock);
+    while (lock->next_to_retire != curr_id)
+        pthread_cond_wait(&lock->prior_signal, &lock->prior_lock);
+    lock->next_to_retire++;
+    pthread_cond_broadcast(&lock->prior_signal);
+    pthread_mutex_unlock(&lock->prior_lock);
 
     /*
      * wait for the reader count to reach zero
      * Note the use of __ATOMIC_ACQUIRE here to ensure that any
-     * prior __ATOMIC_RELEASE write operation in get_hold_current_qp
+     * prior __ATOMIC_RELEASE write operation in ossl_rcu_read_unlock
      * is visible prior to our read
+     * however this is likely just necessary to silence a tsan warning
      */
     do {
         count = ATOMIC_LOAD_N(uint64_t, &qp->users, __ATOMIC_ACQUIRE);
-    } while (READER_COUNT(count) != 0);
-
-    /* retire in order */
-    pthread_mutex_lock(&lock->prior_lock);
-    while (lock->next_to_retire != ID_VAL(count))
-        pthread_cond_wait(&lock->prior_signal, &lock->prior_lock);
-    lock->next_to_retire++;
-    pthread_cond_broadcast(&lock->prior_signal);
-    pthread_mutex_unlock(&lock->prior_lock);
+    } while (count != (uint64_t)0);
 
     retire_qp(lock, qp);
 
index 59540b6b52b5ddf811efe100088a0b5923fd0a0d..81fc4e80a005867f785b977acc09d535447a97dd 100644 (file)
@@ -43,18 +43,6 @@ typedef struct {
 } CRYPTO_win_rwlock;
 # endif
 
-# define READER_SHIFT 0
-# define ID_SHIFT 32 
-# define READER_SIZE 32 
-# define ID_SIZE 32 
-
-# define READER_MASK     (((LONG64)1 << READER_SIZE)-1)
-# define ID_MASK         (((LONG64)1 << ID_SIZE)-1)
-# define READER_COUNT(x) (((LONG64)(x) >> READER_SHIFT) & READER_MASK)
-# define ID_VAL(x)       (((LONG64)(x) >> ID_SHIFT) & ID_MASK)
-# define VAL_READER      ((LONG64)1 << READER_SHIFT)
-# define VAL_ID(x)       ((LONG64)x << ID_SHIFT)
-
 /*
  * This defines a quescent point (qp)
  * This is the barrier beyond which a writer
@@ -181,10 +169,10 @@ static ossl_inline struct rcu_qp *get_hold_current_qp(CRYPTO_RCU_LOCK *lock)
     /* get the current qp index */
     for (;;) {
         qp_idx = InterlockedOr(&lock->reader_idx, 0);
-        InterlockedAdd64(&lock->qp_group[qp_idx].users, VAL_READER);
+        InterlockedAdd64(&lock->qp_group[qp_idx].users, (LONG64)1);
         if (qp_idx == InterlockedOr(&lock->reader_idx, 0))
             break;
-        InterlockedAdd64(&lock->qp_group[qp_idx].users, -VAL_READER);
+        InterlockedAdd64(&lock->qp_group[qp_idx].users, (LONG64)-1);
     }
 
     return &lock->qp_group[qp_idx];
@@ -260,7 +248,7 @@ void ossl_rcu_read_unlock(CRYPTO_RCU_LOCK *lock)
         if (data->thread_qps[i].lock == lock) {
             data->thread_qps[i].depth--;
             if (data->thread_qps[i].depth == 0) {
-                ret = InterlockedAdd64(&data->thread_qps[i].qp->users, -VAL_READER);
+                ret = InterlockedAdd64(&data->thread_qps[i].qp->users, (LONG64)-1);
                 OPENSSL_assert(ret >= 0);
                 data->thread_qps[i].qp = NULL;
                 data->thread_qps[i].lock = NULL;
@@ -270,9 +258,8 @@ void ossl_rcu_read_unlock(CRYPTO_RCU_LOCK *lock)
     }
 }
 
-static struct rcu_qp *update_qp(CRYPTO_RCU_LOCK *lock)
+static struct rcu_qp *update_qp(CRYPTO_RCU_LOCK *lock, uint32_t *curr_id)
 {
-    uint64_t new_id;
     uint32_t current_idx;
     uint32_t tmp;
 
@@ -294,13 +281,9 @@ static struct rcu_qp *update_qp(CRYPTO_RCU_LOCK *lock)
         (lock->current_alloc_idx + 1) % lock->group_count;
 
     /* get and insert a new id */
-    new_id = lock->id_ctr;
+    *curr_id = lock->id_ctr;
     lock->id_ctr++;
 
-    new_id = VAL_ID(new_id);
-    InterlockedAnd64(&lock->qp_group[current_idx].users, ID_MASK);
-    InterlockedAdd64(&lock->qp_group[current_idx].users, new_id);
-
     /* update the reader index to be the prior qp */
     tmp = lock->current_alloc_idx;
     InterlockedExchange(&lock->reader_idx, tmp);
@@ -325,27 +308,28 @@ void ossl_synchronize_rcu(CRYPTO_RCU_LOCK *lock)
 {
     struct rcu_qp *qp;
     uint64_t count;
+    uint32_t curr_id;
     struct rcu_cb_item *cb_items, *tmpcb;
 
     /* before we do anything else, lets grab the cb list */
     cb_items = InterlockedExchangePointer((void * volatile *)&lock->cb_items, NULL);
 
-    qp = update_qp(lock);
-
-    /* wait for the reader count to reach zero */
-    do {
-        count = InterlockedOr64(&qp->users, 0);
-    } while (READER_COUNT(count) != 0);
+    qp = update_qp(lock, &curr_id);
 
     /* retire in order */
     ossl_crypto_mutex_lock(lock->prior_lock);
-    while (lock->next_to_retire != ID_VAL(count))
+    while (lock->next_to_retire != curr_id)
         ossl_crypto_condvar_wait(lock->prior_signal, lock->prior_lock);
 
     lock->next_to_retire++;
     ossl_crypto_condvar_broadcast(lock->prior_signal);
     ossl_crypto_mutex_unlock(lock->prior_lock);
 
+    /* wait for the reader count to reach zero */
+    do {
+        count = InterlockedOr64(&qp->users, 0);
+    } while (count != (uint64_t)0);
+
     retire_qp(lock, qp);
 
     /* handle any callbacks that we have */