# define ATOMIC_OR_FETCH(p, v, o) fallback_atomic_or_fetch(p, v)
# endif
-/*
- * users is broken up into 2 parts
- * bits 0-15 current readers
- * bit 32-63 ID
- */
-# define READER_SHIFT 0
-# define ID_SHIFT 32
-/* TODO: READER_SIZE 32 in threads_win.c */
-# define READER_SIZE 16
-# define ID_SIZE 32
-
-# define READER_MASK (((uint64_t)1 << READER_SIZE) - 1)
-# define ID_MASK (((uint64_t)1 << ID_SIZE) - 1)
-# define READER_COUNT(x) ((uint32_t)(((uint64_t)(x) >> READER_SHIFT) & \
- READER_MASK))
-# define ID_VAL(x) ((uint32_t)(((uint64_t)(x) >> ID_SHIFT) & ID_MASK))
-# define VAL_READER ((uint64_t)1 << READER_SHIFT)
-# define VAL_ID(x) ((uint64_t)x << ID_SHIFT)
-
/*
* This is the core of an rcu lock. It tracks the readers and writers for the
* current quiescence point for a given lock. Users is the 64 bit value that
+ * counts the readers currently holding a reference to this qp
 */
qp_idx = ATOMIC_LOAD_N(uint32_t, &lock->reader_idx, __ATOMIC_ACQUIRE);
- /*
- * Notes of use of __ATOMIC_RELEASE
- * This counter is only read by the write side of the lock, and so we
- * specify __ATOMIC_RELEASE here to ensure that the write side of the
- * lock see this during the spin loop read of users, as it waits for the
- * reader count to approach zero
- */
- ATOMIC_ADD_FETCH(&lock->qp_group[qp_idx].users, VAL_READER,
- __ATOMIC_RELEASE);
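+        /*
+         * Note the use of __ATOMIC_ACQUIRE here: it keeps the re-check of
+         * reader_idx below (and the loads in the read-side critical section)
+         * from being reordered ahead of this increment
+         */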
+ ATOMIC_ADD_FETCH(&lock->qp_group[qp_idx].users, (uint64_t)1,
+ __ATOMIC_ACQUIRE);
/* if the idx hasn't changed, we're good, else try again */
- if (qp_idx == ATOMIC_LOAD_N(uint32_t, &lock->reader_idx, __ATOMIC_ACQUIRE))
+ if (qp_idx == ATOMIC_LOAD_N(uint32_t, &lock->reader_idx,
+ __ATOMIC_RELAXED))
break;
- /*
- * Notes on use of __ATOMIC_RELEASE
- * As with the add above, we want to ensure that this decrement is
- * seen by the write side of the lock as soon as it happens to prevent
- * undue spinning waiting for write side completion
- */
- ATOMIC_SUB_FETCH(&lock->qp_group[qp_idx].users, VAL_READER,
- __ATOMIC_RELEASE);
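+        /*
+         * The reader index changed under us, so back out the speculative
+         * increment and retry; we never entered the critical section
+         * against this qp, so a relaxed decrement is sufficient
+         */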
+ ATOMIC_SUB_FETCH(&lock->qp_group[qp_idx].users, (uint64_t)1,
+ __ATOMIC_RELAXED);
}
return &lock->qp_group[qp_idx];
for (i = 0; i < MAX_QPS; i++) {
if (data->thread_qps[i].lock == lock) {
/*
- * As with read side acquisition, we use __ATOMIC_RELEASE here
- * to ensure that the decrement is published immediately
- * to any write side waiters
+ * we have to use __ATOMIC_RELEASE here
+ * to ensure that all preceding read instructions complete
+ * before the decrement is visible to ossl_synchronize_rcu
*/
data->thread_qps[i].depth--;
if (data->thread_qps[i].depth == 0) {
- ret = ATOMIC_SUB_FETCH(&data->thread_qps[i].qp->users, VAL_READER,
- __ATOMIC_RELEASE);
+ ret = ATOMIC_SUB_FETCH(&data->thread_qps[i].qp->users,
+ (uint64_t)1, __ATOMIC_RELEASE);
OPENSSL_assert(ret != UINT64_MAX);
data->thread_qps[i].qp = NULL;
data->thread_qps[i].lock = NULL;
* Write side allocation routine to get the current qp
* and replace it with a new one
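+ * The retirement id for the returned qp is handed back through curr_id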
*/
-static struct rcu_qp *update_qp(CRYPTO_RCU_LOCK *lock)
+static struct rcu_qp *update_qp(CRYPTO_RCU_LOCK *lock, uint32_t *curr_id)
{
- uint64_t new_id;
uint32_t current_idx;
pthread_mutex_lock(&lock->alloc_lock);
lock->current_alloc_idx =
(lock->current_alloc_idx + 1) % lock->group_count;
- /* get and insert a new id */
- new_id = VAL_ID(lock->id_ctr);
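+    /* get the next id and hand it back to the caller for ordered retirement */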
+ *curr_id = lock->id_ctr;
lock->id_ctr++;
- /*
- * Even though we are under a write side lock here
- * We need to use atomic instructions to ensure that the results
- * of this update are published to the read side prior to updating the
- * reader idx below
- */
- ATOMIC_AND_FETCH(&lock->qp_group[current_idx].users, ID_MASK,
- __ATOMIC_RELEASE);
- ATOMIC_OR_FETCH(&lock->qp_group[current_idx].users, new_id,
- __ATOMIC_RELEASE);
-
- /*
- * Update the reader index to be the prior qp.
- * Note the use of __ATOMIC_RELEASE here is based on the corresponding use
- * of __ATOMIC_ACQUIRE in get_hold_current_qp, as we want any publication
- * of this value to be seen on the read side immediately after it happens
- */
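+    /* publish the index of the new qp so subsequent readers use it */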
ATOMIC_STORE_N(uint32_t, &lock->reader_idx, lock->current_alloc_idx,
- __ATOMIC_RELEASE);
+ __ATOMIC_RELAXED);
/* wake up any waiters */
pthread_cond_signal(&lock->alloc_signal);
{
struct rcu_qp *qp;
uint64_t count;
+ uint32_t curr_id;
struct rcu_cb_item *cb_items, *tmpcb;
pthread_mutex_lock(&lock->write_lock);
lock->cb_items = NULL;
pthread_mutex_unlock(&lock->write_lock);
- qp = update_qp(lock);
+ qp = update_qp(lock, &curr_id);
+
+ /* retire in order */
+ pthread_mutex_lock(&lock->prior_lock);
+ while (lock->next_to_retire != curr_id)
+ pthread_cond_wait(&lock->prior_signal, &lock->prior_lock);
+ lock->next_to_retire++;
+ pthread_cond_broadcast(&lock->prior_signal);
+ pthread_mutex_unlock(&lock->prior_lock);
/*
* wait for the reader count to reach zero
* Note the use of __ATOMIC_ACQUIRE here to ensure that any
- * prior __ATOMIC_RELEASE write operation in get_hold_current_qp
+ * prior __ATOMIC_RELEASE write operation in ossl_rcu_read_unlock
* is visible prior to our read
+     * however, this is likely only necessary to silence a TSAN warning
*/
do {
count = ATOMIC_LOAD_N(uint64_t, &qp->users, __ATOMIC_ACQUIRE);
- } while (READER_COUNT(count) != 0);
-
- /* retire in order */
- pthread_mutex_lock(&lock->prior_lock);
- while (lock->next_to_retire != ID_VAL(count))
- pthread_cond_wait(&lock->prior_signal, &lock->prior_lock);
- lock->next_to_retire++;
- pthread_cond_broadcast(&lock->prior_signal);
- pthread_mutex_unlock(&lock->prior_lock);
+ } while (count != (uint64_t)0);
retire_qp(lock, qp);
} CRYPTO_win_rwlock;
# endif
-/*
- * users is broken up into 2 parts
- * bits 0-31 current readers
- * bit 32-63 ID
- */
-# define READER_SHIFT 0
-# define ID_SHIFT 32
-/* TODO: READER_SIZE 16 in threads_pthread.c */
-# define READER_SIZE 32
-# define ID_SIZE 32
-
-# define READER_MASK (((uint64_t)1 << READER_SIZE) - 1)
-# define ID_MASK (((uint64_t)1 << ID_SIZE) - 1)
-# define READER_COUNT(x) ((uint32_t)(((uint64_t)(x) >> READER_SHIFT) & \
- READER_MASK))
-# define ID_VAL(x) ((uint32_t)(((uint64_t)(x) >> ID_SHIFT) & ID_MASK))
-# define VAL_READER ((int64_t)1 << READER_SHIFT)
-# define VAL_ID(x) ((uint64_t)x << ID_SHIFT)
-
/*
* This defines a quescent point (qp)
* This is the barrier beyond which a writer
for (;;) {
CRYPTO_atomic_load_int((int *)&lock->reader_idx, (int *)&qp_idx,
lock->rw_lock);
- CRYPTO_atomic_add64(&lock->qp_group[qp_idx].users, VAL_READER, &tmp64,
+ CRYPTO_atomic_add64(&lock->qp_group[qp_idx].users, (uint64_t)1, &tmp64,
lock->rw_lock);
CRYPTO_atomic_load_int((int *)&lock->reader_idx, (int *)&tmp,
lock->rw_lock);
if (qp_idx == tmp)
break;
- CRYPTO_atomic_add64(&lock->qp_group[qp_idx].users, -VAL_READER, &tmp64,
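+        /* the index changed under us; undo the increment and retry */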
+ CRYPTO_atomic_add64(&lock->qp_group[qp_idx].users, (uint64_t)-1, &tmp64,
lock->rw_lock);
}
data->thread_qps[i].depth--;
if (data->thread_qps[i].depth == 0) {
CRYPTO_atomic_add64(&data->thread_qps[i].qp->users,
- -VAL_READER, (uint64_t *)&ret,
+ (uint64_t)-1, (uint64_t *)&ret,
lock->rw_lock);
OPENSSL_assert(ret >= 0);
data->thread_qps[i].qp = NULL;
* Write side allocation routine to get the current qp
* and replace it with a new one
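+ * The retirement id for the returned qp is handed back through curr_id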
*/
-static struct rcu_qp *update_qp(CRYPTO_RCU_LOCK *lock)
+static struct rcu_qp *update_qp(CRYPTO_RCU_LOCK *lock, uint32_t *curr_id)
{
- uint64_t new_id;
uint32_t current_idx;
uint32_t tmp;
- uint64_t tmp64;
ossl_crypto_mutex_lock(lock->alloc_lock);
/*
(lock->current_alloc_idx + 1) % lock->group_count;
-    /* get and insert a new id */
+    /* get the next id and hand it back to the caller for ordered retirement */
- new_id = VAL_ID(lock->id_ctr);
+ *curr_id = lock->id_ctr;
lock->id_ctr++;
- /*
- * Even though we are under a write side lock here
- * We need to use atomic instructions to ensure that the results
- * of this update are published to the read side prior to updating the
- * reader idx below
- */
- CRYPTO_atomic_and(&lock->qp_group[current_idx].users, ID_MASK, &tmp64,
- lock->rw_lock);
- CRYPTO_atomic_add64(&lock->qp_group[current_idx].users, new_id, &tmp64,
- lock->rw_lock);
-
/* update the reader index to be the prior qp */
tmp = lock->current_alloc_idx;
InterlockedExchange((LONG volatile *)&lock->reader_idx, tmp);
{
struct rcu_qp *qp;
uint64_t count;
+ uint32_t curr_id;
struct rcu_cb_item *cb_items, *tmpcb;
/* before we do anything else, lets grab the cb list */
cb_items = InterlockedExchangePointer((void * volatile *)&lock->cb_items, NULL);
- qp = update_qp(lock);
-
- /* wait for the reader count to reach zero */
- do {
- CRYPTO_atomic_load(&qp->users, &count, lock->rw_lock);
- } while (READER_COUNT(count) != 0);
+ qp = update_qp(lock, &curr_id);
/* retire in order */
ossl_crypto_mutex_lock(lock->prior_lock);
- while (lock->next_to_retire != ID_VAL(count))
+ while (lock->next_to_retire != curr_id)
ossl_crypto_condvar_wait(lock->prior_signal, lock->prior_lock);
lock->next_to_retire++;
ossl_crypto_condvar_broadcast(lock->prior_signal);
ossl_crypto_mutex_unlock(lock->prior_lock);
+ /* wait for the reader count to reach zero */
+ do {
+ CRYPTO_atomic_load(&qp->users, &count, lock->rw_lock);
+ } while (count != (uint64_t)0);
+
retire_qp(lock, qp);
/* handle any callbacks that we have */