Rework and simplify RCU code
author    Bernd Edlinger <bernd.edlinger@hotmail.de>
          Sun, 9 Feb 2025 16:24:43 +0000 (17:24 +0100)
committer Tomas Mraz <tomas@openssl.org>
          Thu, 13 Feb 2025 19:23:48 +0000 (20:23 +0100)
Use __ATOMIC_RELAXED where possible.
Don't store additional values in the users field.

Reviewed-by: Neil Horman <nhorman@openssl.org>
Reviewed-by: Tomas Mraz <tomas@openssl.org>
(Merged from https://github.com/openssl/openssl/pull/26690)

crypto/threads_pthread.c
crypto/threads_win.c

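For context, here is a rough sketch of the write-side pattern that ossl_synchronize_rcu() serves, using the internal API from include/internal/rcu.h (the widget type and replace_widget() are invented for illustration; this is not code from the commit):

    #include <openssl/crypto.h>
    #include "internal/rcu.h"

    struct widget { int value; };          /* hypothetical shared object */
    static struct widget *shared;          /* readers: ossl_rcu_deref(&shared) */

    static void replace_widget(CRYPTO_RCU_LOCK *lock, struct widget *replacement)
    {
        struct widget *old;

        ossl_rcu_write_lock(lock);
        old = ossl_rcu_deref(&shared);               /* current version */
        ossl_rcu_assign_ptr(&shared, &replacement);  /* publish the new one */
        ossl_rcu_write_unlock(lock);

        ossl_synchronize_rcu(lock);   /* wait until no reader can hold "old" */
        OPENSSL_free(old);
    }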
crypto/threads_pthread.c
index 4e8df06207bb1273df704efe9961472854b44e30..116603652a15b1b78dd3f6e0bab2a41baa4b156f 100644
@@ -270,25 +270,6 @@ static ossl_inline uint64_t fallback_atomic_or_fetch(uint64_t *p, uint64_t m)
 #  define ATOMIC_OR_FETCH(p, v, o) fallback_atomic_or_fetch(p, v)
 # endif
 
-/*
- * users is broken up into 2 parts
- * bits 0-15 current readers
- * bit 32-63 ID
- */
-# define READER_SHIFT 0
-# define ID_SHIFT 32
-/* TODO: READER_SIZE 32 in threads_win.c */
-# define READER_SIZE 16
-# define ID_SIZE 32
-
-# define READER_MASK     (((uint64_t)1 << READER_SIZE) - 1)
-# define ID_MASK         (((uint64_t)1 << ID_SIZE) - 1)
-# define READER_COUNT(x) ((uint32_t)(((uint64_t)(x) >> READER_SHIFT) & \
-                                     READER_MASK))
-# define ID_VAL(x)       ((uint32_t)(((uint64_t)(x) >> ID_SHIFT) & ID_MASK))
-# define VAL_READER      ((uint64_t)1 << READER_SHIFT)
-# define VAL_ID(x)       ((uint64_t)x << ID_SHIFT)
-
 /*
  * This is the core of an rcu lock. It tracks the readers and writers for the
  * current quiescence point for a given lock. Users is the 64 bit value that
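For reference, the deleted pthread-side macros decoded the packed users word as below (a standalone restatement of the removed definitions):

    #include <stdint.h>

    /* Restatement of the macros deleted above (pthread values): bits 0-15
     * held the live reader count, bits 32-63 the qp generation ID. */
    #define READER_SIZE 16
    #define ID_SHIFT    32
    #define READER_MASK (((uint64_t)1 << READER_SIZE) - 1)

    static uint32_t reader_count(uint64_t users)
    {
        return (uint32_t)(users & READER_MASK);
    }

    static uint32_t id_val(uint64_t users)
    {
        return (uint32_t)(users >> ID_SHIFT);
    }

After the change, users is the reader count itself: the increment becomes a plain +1 and update_qp() hands the ID back through *curr_id instead.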
@@ -388,29 +369,16 @@ static struct rcu_qp *get_hold_current_qp(struct rcu_lock_st *lock)
          */
         qp_idx = ATOMIC_LOAD_N(uint32_t, &lock->reader_idx, __ATOMIC_ACQUIRE);
 
-        /*
-         * Notes of use of __ATOMIC_RELEASE
-         * This counter is only read by the write side of the lock, and so we
-         * specify __ATOMIC_RELEASE here to ensure that the write side of the
-         * lock see this during the spin loop read of users, as it waits for the
-         * reader count to approach zero
-         */
-        ATOMIC_ADD_FETCH(&lock->qp_group[qp_idx].users, VAL_READER,
-                         __ATOMIC_RELEASE);
+        ATOMIC_ADD_FETCH(&lock->qp_group[qp_idx].users, (uint64_t)1,
+                         __ATOMIC_ACQUIRE);
 
         /* if the idx hasn't changed, we're good, else try again */
         if (qp_idx == ATOMIC_LOAD_N(uint32_t, &lock->reader_idx,
-                                    __ATOMIC_ACQUIRE))
+                                    __ATOMIC_RELAXED))
             break;
 
-        /*
-         * Notes on use of __ATOMIC_RELEASE
-         * As with the add above, we want to ensure that this decrement is
-         * seen by the write side of the lock as soon as it happens to prevent
-         * undue spinning waiting for write side completion
-         */
-        ATOMIC_SUB_FETCH(&lock->qp_group[qp_idx].users, VAL_READER,
-                         __ATOMIC_RELEASE);
+        ATOMIC_SUB_FETCH(&lock->qp_group[qp_idx].users, (uint64_t)1,
+                         __ATOMIC_RELAXED);
     }
 
     return &lock->qp_group[qp_idx];
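In plain C11 atomics, the reader acquisition loop after this change looks roughly like the following sketch (names simplified; the real code goes through OpenSSL's ATOMIC_* wrappers). The acquire on the increment keeps the critical section's reads from being hoisted above it, while the relaxed recheck only detects a concurrent qp swap:

    #include <stdatomic.h>
    #include <stdint.h>

    struct qp { _Atomic uint64_t users; };

    static struct qp *hold_current_qp(_Atomic uint32_t *reader_idx,
                                      struct qp *group)
    {
        uint32_t idx;

        for (;;) {
            idx = atomic_load_explicit(reader_idx, memory_order_acquire);
            atomic_fetch_add_explicit(&group[idx].users, 1,
                                      memory_order_acquire);
            if (idx == atomic_load_explicit(reader_idx, memory_order_relaxed))
                break;
            /* lost a race with update_qp(): back out and retry */
            atomic_fetch_sub_explicit(&group[idx].users, 1,
                                      memory_order_relaxed);
        }
        return &group[idx];
    }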
@@ -477,14 +445,14 @@ void ossl_rcu_read_unlock(CRYPTO_RCU_LOCK *lock)
     for (i = 0; i < MAX_QPS; i++) {
         if (data->thread_qps[i].lock == lock) {
             /*
-             * As with read side acquisition, we use __ATOMIC_RELEASE here
-             * to ensure that the decrement is published immediately
-             * to any write side waiters
+             * we have to use __ATOMIC_RELEASE here
+             * to ensure that all preceding read instructions complete
+             * before the decrement is visible to ossl_synchronize_rcu
              */
             data->thread_qps[i].depth--;
             if (data->thread_qps[i].depth == 0) {
                 ret = ATOMIC_SUB_FETCH(&data->thread_qps[i].qp->users,
-                                       VAL_READER, __ATOMIC_RELEASE);
+                                       (uint64_t)1, __ATOMIC_RELEASE);
                 OPENSSL_assert(ret != UINT64_MAX);
                 data->thread_qps[i].qp = NULL;
                 data->thread_qps[i].lock = NULL;
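The matching unlock side, again as a C11 sketch: the release decrement orders every read in the critical section before the point where the writer's acquire load in ossl_synchronize_rcu() can see the count fall:

    #include <assert.h>
    #include <stdatomic.h>
    #include <stdint.h>

    static void release_qp(_Atomic uint64_t *users)
    {
        uint64_t newval = atomic_fetch_sub_explicit(users, 1,
                                                    memory_order_release) - 1;
        assert(newval != UINT64_MAX);  /* would mean an unbalanced unlock */
    }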
@@ -503,9 +471,8 @@ void ossl_rcu_read_unlock(CRYPTO_RCU_LOCK *lock)
  * Write side allocation routine to get the current qp
  * and replace it with a new one
  */
-static struct rcu_qp *update_qp(CRYPTO_RCU_LOCK *lock)
+static struct rcu_qp *update_qp(CRYPTO_RCU_LOCK *lock, uint32_t *curr_id)
 {
-    uint64_t new_id;
     uint32_t current_idx;
 
     pthread_mutex_lock(&lock->alloc_lock);
@@ -528,29 +495,11 @@ static struct rcu_qp *update_qp(CRYPTO_RCU_LOCK *lock)
     lock->current_alloc_idx =
         (lock->current_alloc_idx + 1) % lock->group_count;
 
-    /* get and insert a new id */
-    new_id = VAL_ID(lock->id_ctr);
+    *curr_id = lock->id_ctr;
     lock->id_ctr++;
 
-    /*
-     * Even though we are under a write side lock here
-     * We need to use atomic instructions to ensure that the results
-     * of this update are published to the read side prior to updating the
-     * reader idx below
-     */
-    ATOMIC_AND_FETCH(&lock->qp_group[current_idx].users, ID_MASK,
-                     __ATOMIC_RELEASE);
-    ATOMIC_OR_FETCH(&lock->qp_group[current_idx].users, new_id,
-                    __ATOMIC_RELEASE);
-
-    /*
-     * Update the reader index to be the prior qp.
-     * Note the use of __ATOMIC_RELEASE here is based on the corresponding use
-     * of __ATOMIC_ACQUIRE in get_hold_current_qp, as we want any publication
-     * of this value to be seen on the read side immediately after it happens
-     */
     ATOMIC_STORE_N(uint32_t, &lock->reader_idx, lock->current_alloc_idx,
-                   __ATOMIC_RELEASE);
+                   __ATOMIC_RELAXED);
 
     /* wake up any waiters */
     pthread_cond_signal(&lock->alloc_signal);
@@ -594,6 +543,7 @@ void ossl_synchronize_rcu(CRYPTO_RCU_LOCK *lock)
 {
     struct rcu_qp *qp;
     uint64_t count;
+    uint32_t curr_id;
     struct rcu_cb_item *cb_items, *tmpcb;
 
     pthread_mutex_lock(&lock->write_lock);
@@ -601,25 +551,26 @@ void ossl_synchronize_rcu(CRYPTO_RCU_LOCK *lock)
     lock->cb_items = NULL;
     pthread_mutex_unlock(&lock->write_lock);
 
-    qp = update_qp(lock);
+    qp = update_qp(lock, &curr_id);
+
+    /* retire in order */
+    pthread_mutex_lock(&lock->prior_lock);
+    while (lock->next_to_retire != curr_id)
+        pthread_cond_wait(&lock->prior_signal, &lock->prior_lock);
+    lock->next_to_retire++;
+    pthread_cond_broadcast(&lock->prior_signal);
+    pthread_mutex_unlock(&lock->prior_lock);
 
     /*
      * wait for the reader count to reach zero
      * Note the use of __ATOMIC_ACQUIRE here to ensure that any
-     * prior __ATOMIC_RELEASE write operation in get_hold_current_qp
+     * prior __ATOMIC_RELEASE write operation in ossl_rcu_read_unlock
      * is visible prior to our read
+     * however this is likely just necessary to silence a tsan warning
      */
     do {
         count = ATOMIC_LOAD_N(uint64_t, &qp->users, __ATOMIC_ACQUIRE);
-    } while (READER_COUNT(count) != 0);
-
-    /* retire in order */
-    pthread_mutex_lock(&lock->prior_lock);
-    while (lock->next_to_retire != ID_VAL(count))
-        pthread_cond_wait(&lock->prior_signal, &lock->prior_lock);
-    lock->next_to_retire++;
-    pthread_cond_broadcast(&lock->prior_signal);
-    pthread_mutex_unlock(&lock->prior_lock);
+    } while (count != (uint64_t)0);
 
     retire_qp(lock, qp);
 
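The "retire in order" block, which now runs before the spin because the ticket comes from update_qp() rather than from ID bits recovered out of count, is a condvar ticket gate. A generic sketch of the same pattern:

    #include <pthread.h>
    #include <stdint.h>

    struct order_gate {
        pthread_mutex_t m;
        pthread_cond_t  cv;
        uint32_t        next_to_retire;    /* next ticket allowed through */
    };

    /* Each writer takes a ticket under a lock, then passes through here
     * strictly in ticket order. */
    static void gate_sync(struct order_gate *g, uint32_t ticket)
    {
        pthread_mutex_lock(&g->m);
        while (g->next_to_retire != ticket)
            pthread_cond_wait(&g->cv, &g->m);
        g->next_to_retire++;
        pthread_cond_broadcast(&g->cv);
        pthread_mutex_unlock(&g->m);
    }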
crypto/threads_win.c
index 03a22fd2c92a66db7984f0a13e0ddfaa0f9d5c03..258b7a59c7746a2ac1c106a0a20a4e8be962c33c 100644
@@ -43,25 +43,6 @@ typedef struct {
 } CRYPTO_win_rwlock;
 # endif
 
-/*
- * users is broken up into 2 parts
- * bits 0-31 current readers
- * bit 32-63 ID
- */
-# define READER_SHIFT 0
-# define ID_SHIFT 32
-/* TODO: READER_SIZE 16 in threads_pthread.c */
-# define READER_SIZE 32
-# define ID_SIZE 32
-
-# define READER_MASK     (((uint64_t)1 << READER_SIZE) - 1)
-# define ID_MASK         (((uint64_t)1 << ID_SIZE) - 1)
-# define READER_COUNT(x) ((uint32_t)(((uint64_t)(x) >> READER_SHIFT) & \
-                                     READER_MASK))
-# define ID_VAL(x)       ((uint32_t)(((uint64_t)(x) >> ID_SHIFT) & ID_MASK))
-# define VAL_READER      ((int64_t)1 << READER_SHIFT)
-# define VAL_ID(x)       ((uint64_t)x << ID_SHIFT)
-
 /*
  * This defines a quiescent point (qp)
  * This is the barrier beyond which a writer
@@ -229,13 +210,13 @@ static ossl_inline struct rcu_qp *get_hold_current_qp(CRYPTO_RCU_LOCK *lock)
     for (;;) {
         CRYPTO_atomic_load_int((int *)&lock->reader_idx, (int *)&qp_idx,
                                lock->rw_lock);
-        CRYPTO_atomic_add64(&lock->qp_group[qp_idx].users, VAL_READER, &tmp64,
+        CRYPTO_atomic_add64(&lock->qp_group[qp_idx].users, (uint64_t)1, &tmp64,
                             lock->rw_lock);
         CRYPTO_atomic_load_int((int *)&lock->reader_idx, (int *)&tmp,
                                lock->rw_lock);
         if (qp_idx == tmp)
             break;
-        CRYPTO_atomic_add64(&lock->qp_group[qp_idx].users, -VAL_READER, &tmp64,
+        CRYPTO_atomic_add64(&lock->qp_group[qp_idx].users, (uint64_t)-1, &tmp64,
                             lock->rw_lock);
     }
 
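On the Windows path the counter moves through OpenSSL's portable 64-bit atomics instead of __atomic builtins. A usage sketch, assuming the CRYPTO_atomic_add64() signature from <openssl/crypto.h> (the decrement above is the same call with (uint64_t)-1, relying on unsigned wraparound):

    #include <openssl/crypto.h>
    #include <stdint.h>

    /* Sketch: bump a per-qp reader count portably; CRYPTO_atomic_add64()
     * takes fallback_lock only if native 64-bit atomics are unavailable. */
    static int bump_users(uint64_t *users, CRYPTO_RWLOCK *fallback_lock)
    {
        uint64_t after;

        return CRYPTO_atomic_add64(users, (uint64_t)1, &after, fallback_lock);
    }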
@@ -313,7 +294,7 @@ void ossl_rcu_read_unlock(CRYPTO_RCU_LOCK *lock)
             data->thread_qps[i].depth--;
             if (data->thread_qps[i].depth == 0) {
                 CRYPTO_atomic_add64(&data->thread_qps[i].qp->users,
-                                    -VAL_READER, (uint64_t *)&ret,
+                                    (uint64_t)-1, (uint64_t *)&ret,
                                     lock->rw_lock);
                 OPENSSL_assert(ret >= 0);
                 data->thread_qps[i].qp = NULL;
@@ -328,12 +309,10 @@ void ossl_rcu_read_unlock(CRYPTO_RCU_LOCK *lock)
  * Write side allocation routine to get the current qp
  * and replace it with a new one
  */
-static struct rcu_qp *update_qp(CRYPTO_RCU_LOCK *lock)
+static struct rcu_qp *update_qp(CRYPTO_RCU_LOCK *lock, uint32_t *curr_id)
 {
-    uint64_t new_id;
     uint32_t current_idx;
     uint32_t tmp;
-    uint64_t tmp64;
 
     ossl_crypto_mutex_lock(lock->alloc_lock);
     /*
@@ -355,20 +334,9 @@ static struct rcu_qp *update_qp(CRYPTO_RCU_LOCK *lock)
         (lock->current_alloc_idx + 1) % lock->group_count;
 
     /* get and insert a new id */
-    new_id = VAL_ID(lock->id_ctr);
+    *curr_id = lock->id_ctr;
     lock->id_ctr++;
 
-    /*
-     * Even though we are under a write side lock here
-     * We need to use atomic instructions to ensure that the results
-     * of this update are published to the read side prior to updating the
-     * reader idx below
-     */
-    CRYPTO_atomic_and(&lock->qp_group[current_idx].users, ID_MASK, &tmp64,
-                      lock->rw_lock);
-    CRYPTO_atomic_add64(&lock->qp_group[current_idx].users, new_id, &tmp64,
-                        lock->rw_lock);
-
     /* update the reader index to be the prior qp */
     tmp = lock->current_alloc_idx;
     InterlockedExchange((LONG volatile *)&lock->reader_idx, tmp);
@@ -393,28 +361,29 @@ void ossl_synchronize_rcu(CRYPTO_RCU_LOCK *lock)
 {
     struct rcu_qp *qp;
     uint64_t count;
+    uint32_t curr_id;
     struct rcu_cb_item *cb_items, *tmpcb;
 
     /* before we do anything else, lets grab the cb list */
     cb_items = InterlockedExchangePointer((void * volatile *)&lock->cb_items,
                                           NULL);
 
-    qp = update_qp(lock);
-
-    /* wait for the reader count to reach zero */
-    do {
-        CRYPTO_atomic_load(&qp->users, &count, lock->rw_lock);
-    } while (READER_COUNT(count) != 0);
+    qp = update_qp(lock, &curr_id);
 
     /* retire in order */
     ossl_crypto_mutex_lock(lock->prior_lock);
-    while (lock->next_to_retire != ID_VAL(count))
+    while (lock->next_to_retire != curr_id)
         ossl_crypto_condvar_wait(lock->prior_signal, lock->prior_lock);
 
     lock->next_to_retire++;
     ossl_crypto_condvar_broadcast(lock->prior_signal);
     ossl_crypto_mutex_unlock(lock->prior_lock);
 
+    /* wait for the reader count to reach zero */
+    do {
+        CRYPTO_atomic_load(&qp->users, &count, lock->rw_lock);
+    } while (count != (uint64_t)0);
+
     retire_qp(lock, qp);
 
     /* handle any callbacks that we have */
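Relatedly, writers that prefer not to block can defer cleanup through the callback list drained here; a sketch assuming the internal ossl_rcu_call() API from include/internal/rcu.h (free_old_widget() and schedule_free() are invented for illustration):

    #include <openssl/crypto.h>
    #include "internal/rcu.h"

    struct widget { int value; };           /* invented for illustration */

    static void free_old_widget(void *data)
    {
        OPENSSL_free(data);
    }

    static void schedule_free(CRYPTO_RCU_LOCK *lock, struct widget *old)
    {
        /* runs once a grace period has passed and a writer synchronizes */
        ossl_rcu_call(lock, free_old_widget, old);
    }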