# define ATOMIC_OR_FETCH(p, v, o) fallback_atomic_or_fetch(p, v)
# endif
-/*
- * users is broken up into 2 parts
- * bits 0-15 current readers
- * bit 32-63 - ID
- */
-# define READER_SHIFT 0
-# define ID_SHIFT 32
-# define READER_SIZE 16
-# define ID_SIZE 32
-
-# define READER_MASK (((uint64_t)1 << READER_SIZE) - 1)
-# define ID_MASK (((uint64_t)1 << ID_SIZE) - 1)
-# define READER_COUNT(x) (((uint64_t)(x) >> READER_SHIFT) & READER_MASK)
-# define ID_VAL(x) (((uint64_t)(x) >> ID_SHIFT) & ID_MASK)
-# define VAL_READER ((uint64_t)1 << READER_SHIFT)
-# define VAL_ID(x) ((uint64_t)x << ID_SHIFT)
-
/*
* This is the core of an rcu lock. It tracks the readers and writers for the
* current quiescence point for a given lock. Users is the 64 bit value that
- * stores the READERS/ID as defined above
+ * stores the count of readers currently holding this qp
*/
qp_idx = ATOMIC_LOAD_N(uint64_t, &lock->reader_idx, __ATOMIC_ACQUIRE);
- /*
- * Notes of use of __ATOMIC_RELEASE
- * This counter is only read by the write side of the lock, and so we
- * specify __ATOMIC_RELEASE here to ensure that the write side of the
- * lock see this during the spin loop read of users, as it waits for the
- * reader count to approach zero
- */
- ATOMIC_ADD_FETCH(&lock->qp_group[qp_idx].users, VAL_READER,
- __ATOMIC_RELEASE);
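+ /*
+ * Notes on use of __ATOMIC_ACQUIRE
+ * the acquire ordering keeps the re-load of reader_idx below from
+ * being hoisted above this increment, so we only keep the hold if
+ * the index is unchanged after we are counted on this qp
+ */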
+ ATOMIC_ADD_FETCH(&lock->qp_group[qp_idx].users, (uint64_t)1,
+ __ATOMIC_ACQUIRE);
/* if the idx hasn't changed, we're good, else try again */
- if (qp_idx == ATOMIC_LOAD_N(uint64_t, &lock->reader_idx, __ATOMIC_ACQUIRE))
+ if (qp_idx == ATOMIC_LOAD_N(uint64_t, &lock->reader_idx,
+ __ATOMIC_RELAXED))
break;
- /*
- * Notes on use of __ATOMIC_RELEASE
- * As with the add above, we want to ensure that this decrement is
- * seen by the write side of the lock as soon as it happens to prevent
- * undue spinning waiting for write side completion
- */
- ATOMIC_SUB_FETCH(&lock->qp_group[qp_idx].users, VAL_READER,
- __ATOMIC_RELEASE);
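+ /*
+ * the index moved under us, so back out the increment and retry;
+ * relaxed ordering should be enough here as we never entered a
+ * read side critical section on this qp
+ */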
+ ATOMIC_SUB_FETCH(&lock->qp_group[qp_idx].users, (uint64_t)1,
+ __ATOMIC_RELAXED);
}
return &lock->qp_group[qp_idx];
for (i = 0; i < MAX_QPS; i++) {
if (data->thread_qps[i].lock == lock) {
/*
- * As with read side acquisition, we use __ATOMIC_RELEASE here
- * to ensure that the decrement is published immediately
- * to any write side waiters
+ * we have to use __ATOMIC_RELEASE here
+ * to ensure that all preceding read instructions complete
+ * before the decrement is visible to ossl_synchronize_rcu
*/
data->thread_qps[i].depth--;
if (data->thread_qps[i].depth == 0) {
- ret = ATOMIC_SUB_FETCH(&data->thread_qps[i].qp->users, VAL_READER,
- __ATOMIC_RELEASE);
+ ret = ATOMIC_SUB_FETCH(&data->thread_qps[i].qp->users,
+ (uint64_t)1, __ATOMIC_RELEASE);
OPENSSL_assert(ret != UINT64_MAX);
data->thread_qps[i].qp = NULL;
data->thread_qps[i].lock = NULL;
* Write side allocation routine to get the current qp
* and replace it with a new one
*/
-static struct rcu_qp *update_qp(CRYPTO_RCU_LOCK *lock)
+static struct rcu_qp *update_qp(CRYPTO_RCU_LOCK *lock, uint32_t *curr_id)
{
- uint64_t new_id;
uint64_t current_idx;
pthread_mutex_lock(&lock->alloc_lock);
lock->current_alloc_idx =
(lock->current_alloc_idx + 1) % lock->group_count;
- /* get and insert a new id */
- new_id = lock->id_ctr;
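+ /* get a new id and hand it back to the caller for ordered retirement */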
+ *curr_id = lock->id_ctr;
lock->id_ctr++;
- new_id = VAL_ID(new_id);
- /*
- * Even though we are under a write side lock here
- * We need to use atomic instructions to ensure that the results
- * of this update are published to the read side prior to updating the
- * reader idx below
- */
- ATOMIC_AND_FETCH(&lock->qp_group[current_idx].users, ID_MASK,
- __ATOMIC_RELEASE);
- ATOMIC_OR_FETCH(&lock->qp_group[current_idx].users, new_id,
- __ATOMIC_RELEASE);
-
- /*
- * Update the reader index to be the prior qp.
- * Note the use of __ATOMIC_RELEASE here is based on the corresponding use
- * of __ATOMIC_ACQUIRE in get_hold_current_qp, as we want any publication
- * of this value to be seen on the read side immediately after it happens
- */
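+ /*
+ * Update the reader index to the newly allocated qp.
+ * A relaxed store is presumably sufficient here: readers re-check
+ * reader_idx after incrementing users, and ossl_synchronize_rcu still
+ * waits for the prior qp's users count to drain before retiring it
+ */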
ATOMIC_STORE_N(uint64_t, &lock->reader_idx, lock->current_alloc_idx,
- __ATOMIC_RELEASE);
+ __ATOMIC_RELAXED);
/* wake up any waiters */
pthread_cond_signal(&lock->alloc_signal);
{
struct rcu_qp *qp;
uint64_t count;
+ uint32_t curr_id;
struct rcu_cb_item *cb_items, *tmpcb;
/*
cb_items = ATOMIC_EXCHANGE_N(prcu_cb_item, &lock->cb_items, NULL,
__ATOMIC_ACQ_REL);
- qp = update_qp(lock);
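+ /*
+ * swap readers over to a new qp; the returned qp is the prior one
+ * we must drain, and curr_id orders our retirement against any
+ * other writers
+ */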
+ qp = update_qp(lock, &curr_id);
+
+ /*
+ * retire in order
+ * next_to_retire makes writers pass this point in the order their
+ * qps were allocated (curr_id), so an older qp is always drained and
+ * retired before a newer one
+ */
+ pthread_mutex_lock(&lock->prior_lock);
+ while (lock->next_to_retire != curr_id)
+ pthread_cond_wait(&lock->prior_signal, &lock->prior_lock);
+ lock->next_to_retire++;
+ pthread_cond_broadcast(&lock->prior_signal);
+ pthread_mutex_unlock(&lock->prior_lock);
/*
* wait for the reader count to reach zero
* Note the use of __ATOMIC_ACQUIRE here to ensure that any
- * prior __ATOMIC_RELEASE write operation in get_hold_current_qp
+ * prior __ATOMIC_RELEASE write operation in ossl_rcu_read_unlock
* is visible prior to our read
+ * however this is likely just necessary to silence a tsan warning
*/
do {
count = ATOMIC_LOAD_N(uint64_t, &qp->users, __ATOMIC_ACQUIRE);
- } while (READER_COUNT(count) != 0);
-
- /* retire in order */
- pthread_mutex_lock(&lock->prior_lock);
- while (lock->next_to_retire != ID_VAL(count))
- pthread_cond_wait(&lock->prior_signal, &lock->prior_lock);
- lock->next_to_retire++;
- pthread_cond_broadcast(&lock->prior_signal);
- pthread_mutex_unlock(&lock->prior_lock);
+ } while (count != (uint64_t)0);
retire_qp(lock, qp);
} CRYPTO_win_rwlock;
# endif
-# define READER_SHIFT 0
-# define ID_SHIFT 32
-# define READER_SIZE 32
-# define ID_SIZE 32
-
-# define READER_MASK (((LONG64)1 << READER_SIZE)-1)
-# define ID_MASK (((LONG64)1 << ID_SIZE)-1)
-# define READER_COUNT(x) (((LONG64)(x) >> READER_SHIFT) & READER_MASK)
-# define ID_VAL(x) (((LONG64)(x) >> ID_SHIFT) & ID_MASK)
-# define VAL_READER ((LONG64)1 << READER_SHIFT)
-# define VAL_ID(x) ((LONG64)x << ID_SHIFT)
-
/*
* This defines a quiescent point (qp)
* This is the barrier beyond which a writer
/* get the current qp index */
for (;;) {
qp_idx = InterlockedOr(&lock->reader_idx, 0);
- InterlockedAdd64(&lock->qp_group[qp_idx].users, VAL_READER);
+ InterlockedAdd64(&lock->qp_group[qp_idx].users, (LONG64)1);
if (qp_idx == InterlockedOr(&lock->reader_idx, 0))
break;
- InterlockedAdd64(&lock->qp_group[qp_idx].users, -VAL_READER);
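+ /* the index moved under us; back out the increment and retry */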
+ InterlockedAdd64(&lock->qp_group[qp_idx].users, (LONG64)-1);
}
return &lock->qp_group[qp_idx];
if (data->thread_qps[i].lock == lock) {
data->thread_qps[i].depth--;
if (data->thread_qps[i].depth == 0) {
- ret = InterlockedAdd64(&data->thread_qps[i].qp->users, -VAL_READER);
+ ret = InterlockedAdd64(&data->thread_qps[i].qp->users, (LONG64)-1);
OPENSSL_assert(ret >= 0);
data->thread_qps[i].qp = NULL;
data->thread_qps[i].lock = NULL;
}
}
-static struct rcu_qp *update_qp(CRYPTO_RCU_LOCK *lock)
+static struct rcu_qp *update_qp(CRYPTO_RCU_LOCK *lock, uint32_t *curr_id)
{
- uint64_t new_id;
uint32_t current_idx;
uint32_t tmp;
(lock->current_alloc_idx + 1) % lock->group_count;
- /* get and insert a new id */
+ /* get a new id for the caller to retire in order */
- new_id = lock->id_ctr;
+ *curr_id = lock->id_ctr;
lock->id_ctr++;
- new_id = VAL_ID(new_id);
- InterlockedAnd64(&lock->qp_group[current_idx].users, ID_MASK);
- InterlockedAdd64(&lock->qp_group[current_idx].users, new_id);
-
/* update the reader index to be the prior qp */
tmp = lock->current_alloc_idx;
InterlockedExchange(&lock->reader_idx, tmp);
{
struct rcu_qp *qp;
uint64_t count;
+ uint32_t curr_id;
struct rcu_cb_item *cb_items, *tmpcb;
/* before we do anything else, lets grab the cb list */
cb_items = InterlockedExchangePointer((void * volatile *)&lock->cb_items, NULL);
- qp = update_qp(lock);
-
- /* wait for the reader count to reach zero */
- do {
- count = InterlockedOr64(&qp->users, 0);
- } while (READER_COUNT(count) != 0);
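+ /* swap readers to a new qp; curr_id orders our retirement below */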
+ qp = update_qp(lock, &curr_id);
/* retire in order */
ossl_crypto_mutex_lock(lock->prior_lock);
- while (lock->next_to_retire != ID_VAL(count))
+ while (lock->next_to_retire != curr_id)
ossl_crypto_condvar_wait(lock->prior_signal, lock->prior_lock);
lock->next_to_retire++;
ossl_crypto_condvar_broadcast(lock->prior_signal);
ossl_crypto_mutex_unlock(lock->prior_lock);
+ /* wait for the reader count to reach zero */
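+ /* note that InterlockedOr64 with 0 acts as a fully fenced atomic read */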
+ do {
+ count = InterlockedOr64(&qp->users, 0);
+ } while (count != (uint64_t)0);
+
retire_qp(lock, qp);
/* handle any callbacks that we have */