From d0e1a0ae701cfaca7f3dd3bf28a3f934a6408813 Mon Sep 17 00:00:00 2001
From: Neil Horman <nhorman@openssl.org>
Date: Fri, 12 Jan 2024 10:39:56 -0500
Subject: [PATCH] RCU lock implementation

Introduce an RCU lock implementation as an alternative locking mechanism
to OpenSSL.  The API is documented in the ossl_rcu.pod file.

Read side implementation is comparable to that of RWLOCKS:

    ossl_rcu_read_lock(lock);
    <critical section in which data can be accessed via ossl_rcu_deref>
    ossl_rcu_read_unlock(lock);

Write side implementation is:

    ossl_rcu_write_lock(lock);
    <critical section in which data can be updated via ossl_rcu_assign_ptr
     and stale data can optionally be scheduled for removal via
     ossl_rcu_call>
    ossl_rcu_write_unlock(lock);
    ...
    ossl_synchronize_rcu(lock);

Reviewed-by: Hugo Landau <hlandau@openssl.org>
Reviewed-by: Matt Caswell <matt@openssl.org>
(Merged from https://github.com/openssl/openssl/pull/22729)
---
 crypto/rcu_internal.h                   |  22 +
 crypto/threads_none.c                   |  78 ++++
 crypto/threads_pthread.c                | 572 +++++++++++++++++++++++-
 crypto/threads_win.c                    | 360 ++++++++++++++-
 doc/internal/man3/ossl_rcu_lock_new.pod | 258 +++++++++++
 include/internal/rcu.h                  |  31 ++
 test/threadstest.c                      | 376 ++++++++++++++++
 7 files changed, 1693 insertions(+), 4 deletions(-)
 create mode 100644 crypto/rcu_internal.h
 create mode 100644 doc/internal/man3/ossl_rcu_lock_new.pod
 create mode 100644 include/internal/rcu.h

diff --git a/crypto/rcu_internal.h b/crypto/rcu_internal.h
new file mode 100644
index 00000000000..206f6ed5778
--- /dev/null
+++ b/crypto/rcu_internal.h
@@ -0,0 +1,22 @@
+/*
+ * Copyright 2023 The OpenSSL Project Authors. All Rights Reserved.
+ *
+ * Licensed under the Apache License 2.0 (the "License").  You may not use
+ * this file except in compliance with the License.  You can obtain a copy
+ * in the file LICENSE in the source distribution or at
+ * https://www.openssl.org/source/license.html
+ */
+
+#ifndef OPENSSL_RCU_INTERNAL_H
+# define OPENSSL_RCU_INTERNAL_H
+# pragma once
+
+struct rcu_qp;
+
+struct rcu_cb_item {
+    rcu_cb_fn fn;
+    void *data;
+    struct rcu_cb_item *next;
+};
+
+#endif
diff --git a/crypto/threads_none.c b/crypto/threads_none.c
index 580e5345d20..eb7b036fce1 100644
--- a/crypto/threads_none.c
+++ b/crypto/threads_none.c
@@ -9,6 +9,8 @@
 #include <openssl/crypto.h>
 #include "internal/cryptlib.h"
+#include "internal/rcu.h"
+#include "rcu_internal.h"
 
 #if !defined(OPENSSL_THREADS) || defined(CRYPTO_TDEBUG)
 
@@ -17,6 +19,82 @@
 # include
 # endif
 
+struct rcu_lock_st {
+    struct rcu_cb_item *cb_items;
+};
+
+CRYPTO_RCU_LOCK *ossl_rcu_lock_new(int num_writers)
+{
+    struct rcu_lock_st *lock;
+
+    lock = OPENSSL_zalloc(sizeof(*lock));
+    return lock;
+}
+
+void ossl_rcu_lock_free(CRYPTO_RCU_LOCK *lock)
+{
+    OPENSSL_free(lock);
+}
+
+void ossl_rcu_read_lock(CRYPTO_RCU_LOCK *lock)
+{
+    return;
+}
+
+void ossl_rcu_write_lock(CRYPTO_RCU_LOCK *lock)
+{
+    return;
+}
+
+void ossl_rcu_write_unlock(CRYPTO_RCU_LOCK *lock)
+{
+    return;
+}
+
+void ossl_rcu_read_unlock(CRYPTO_RCU_LOCK *lock)
+{
+    return;
+}
+
+void ossl_synchronize_rcu(CRYPTO_RCU_LOCK *lock)
+{
+    struct rcu_cb_item *items = lock->cb_items;
+    struct rcu_cb_item *tmp;
+
+    lock->cb_items = NULL;
+
+    while (items != NULL) {
+        tmp = items->next;
+        items->fn(items->data);
+        OPENSSL_free(items);
+        items = tmp;
+    }
+}
+
+int ossl_rcu_call(CRYPTO_RCU_LOCK *lock, rcu_cb_fn cb, void *data)
+{
+    struct rcu_cb_item *new = OPENSSL_zalloc(sizeof(*new));
+
+    if (new == NULL)
+        return 0;
+
+    new->fn = cb;
+    new->data = data;
+    new->next = lock->cb_items;
+    lock->cb_items = new;
+    return 1;
+}
+
+void *ossl_rcu_uptr_deref(void **p)
+{
+    return (void *)*p;
+}
+
+void ossl_rcu_assign_uptr(void **p, void **v)
+{
+    *(void **)p = *(void **)v;
+}
+
 CRYPTO_RWLOCK *CRYPTO_THREAD_lock_new(void)
 {
     CRYPTO_RWLOCK *lock;
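
The no-threads stubs above already show the intended calling pattern:
ossl_rcu_call() queues a deferred cleanup and ossl_synchronize_rcu() drains
the queue.  A minimal usage sketch follows (illustrative only, not part of
the patch; struct conf, its field and the function names are hypothetical):

    #include <openssl/crypto.h>
    #include "internal/rcu.h"

    struct conf {
        int value;                      /* hypothetical payload */
    };

    static struct conf *active_conf;    /* shared pointer protected by RCU */

    static void conf_free_cb(void *data)    /* matches the rcu_cb_fn typedef */
    {
        OPENSSL_free(data);
    }

    static int conf_bump(CRYPTO_RCU_LOCK *lock)
    {
        struct conf *newc, *oldc;

        if ((newc = OPENSSL_zalloc(sizeof(*newc))) == NULL)
            return 0;

        ossl_rcu_write_lock(lock);
        oldc = ossl_rcu_deref(&active_conf);
        if (oldc != NULL)
            *newc = *oldc;                      /* copy the old object ... */
        newc->value++;                          /* ... then adjust it */
        ossl_rcu_assign_ptr(&active_conf, &newc);
        ossl_rcu_call(lock, conf_free_cb, oldc);    /* defer the free */
        ossl_rcu_write_unlock(lock);

        ossl_synchronize_rcu(lock);  /* old data is freed no later than here */
        return 1;
    }
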
diff --git a/crypto/threads_pthread.c b/crypto/threads_pthread.c
index 59ddcdbff83..ad6c259d92a 100644
--- a/crypto/threads_pthread.c
+++ b/crypto/threads_pthread.c
@@ -11,7 +11,10 @@
 #define OPENSSL_SUPPRESS_DEPRECATED
 
 #include
+#include
 #include "internal/cryptlib.h"
+#include "internal/rcu.h"
+#include "rcu_internal.h"
 
 #if defined(__sun)
 # include
@@ -42,12 +45,577 @@
 #  define USE_RWLOCK
 # endif
 
+# if defined(__GNUC__) && defined(__ATOMIC_ACQUIRE) && !defined(BROKEN_CLANG_ATOMICS)
+# define ATOMIC_LOAD_N(p,o) __atomic_load_n(p, o)
+# define ATOMIC_STORE_N(p, v, o) __atomic_store_n(p, v, o)
+# define ATOMIC_STORE(p, v, o) __atomic_store(p, v, o)
+# define ATOMIC_EXCHANGE_N(p, v, o) __atomic_exchange_n(p, v, o)
+# define ATOMIC_ADD_FETCH(p, v, o) __atomic_add_fetch(p, v, o)
+# define ATOMIC_FETCH_ADD(p, v, o) __atomic_fetch_add(p, v, o)
+# define ATOMIC_SUB_FETCH(p, v, o) __atomic_sub_fetch(p, v, o)
+# define ATOMIC_AND_FETCH(p, m, o) __atomic_and_fetch(p, m, o)
+# define ATOMIC_OR_FETCH(p, m, o) __atomic_or_fetch(p, m, o)
+#else
+static pthread_mutex_t atomic_sim_lock = PTHREAD_MUTEX_INITIALIZER;
+
+static inline void *fallback_atomic_load_n(void **p)
+{
+    void *ret;
+
+    pthread_mutex_lock(&atomic_sim_lock);
+    ret = *(void **)p;
+    pthread_mutex_unlock(&atomic_sim_lock);
+    return ret;
+}
+
+# define ATOMIC_LOAD_N(p, o) fallback_atomic_load_n((void **)p)
+
+static inline void *fallback_atomic_store_n(void **p, void *v)
+{
+    void *ret;
+
+    pthread_mutex_lock(&atomic_sim_lock);
+    ret = *p;
+    *p = v;
+    pthread_mutex_unlock(&atomic_sim_lock);
+    return ret;
+}
+
+# define ATOMIC_STORE_N(p, v, o) fallback_atomic_store_n((void **)p, (void *)v)
+
+static inline void fallback_atomic_store(void **p, void **v)
+{
+    pthread_mutex_lock(&atomic_sim_lock);
+    *p = *v;
+    pthread_mutex_unlock(&atomic_sim_lock);
+}
+
+# define ATOMIC_STORE(p, v, o) fallback_atomic_store((void **)p, (void **)v)
+
+static inline void *fallback_atomic_exchange_n(void **p, void *v)
+{
+    void *ret;
+
+    pthread_mutex_lock(&atomic_sim_lock);
+    ret = *p;
+    *p = v;
+    pthread_mutex_unlock(&atomic_sim_lock);
+    return ret;
+}
+
+# define ATOMIC_EXCHANGE_N(p, v, o) fallback_atomic_exchange_n((void **)p, (void *)v)
+
+static inline uint64_t fallback_atomic_add_fetch(uint64_t *p, uint64_t v)
+{
+    uint64_t ret;
+
+    pthread_mutex_lock(&atomic_sim_lock);
+    *p += v;
+    ret = *p;
+    pthread_mutex_unlock(&atomic_sim_lock);
+    return ret;
+}
+
+# define ATOMIC_ADD_FETCH(p, v, o) fallback_atomic_add_fetch(p, v)
+
+static inline uint64_t fallback_atomic_fetch_add(uint64_t *p, uint64_t v)
+{
+    uint64_t ret;
+
+    pthread_mutex_lock(&atomic_sim_lock);
+    ret = *p;
+    *p += v;
+    pthread_mutex_unlock(&atomic_sim_lock);
+    return ret;
+}
+
+# define ATOMIC_FETCH_ADD(p, v, o) fallback_atomic_fetch_add(p, v)
+
+static inline uint64_t fallback_atomic_sub_fetch(uint64_t *p, uint64_t v)
+{
+    uint64_t ret;
+
+    pthread_mutex_lock(&atomic_sim_lock);
+    *p -= v;
+    ret = *p;
+    pthread_mutex_unlock(&atomic_sim_lock);
+    return ret;
+}
+
+# define ATOMIC_SUB_FETCH(p, v, o) fallback_atomic_sub_fetch(p, v)
+
+static inline uint64_t fallback_atomic_and_fetch(uint64_t *p, uint64_t m)
+{
+    uint64_t ret;
+
+    pthread_mutex_lock(&atomic_sim_lock);
+    *p &= m;
+    ret = *p;
+    pthread_mutex_unlock(&atomic_sim_lock);
+    return ret;
+}
+
+# define ATOMIC_AND_FETCH(p, v, o) fallback_atomic_and_fetch(p, v)
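
Note that the two addition helpers deliberately differ in return value,
mirroring the GCC builtins they stand in for: ATOMIC_ADD_FETCH yields the
post-add value and ATOMIC_FETCH_ADD the pre-add value.  A standalone sketch
against the builtins themselves (editorial, not part of the patch):

    /* Demonstrates __atomic_add_fetch vs __atomic_fetch_add semantics. */
    #include <assert.h>
    #include <stdint.h>

    int main(void)
    {
        uint64_t v = 5;

        /* add_fetch returns the value after the addition */
        assert(__atomic_add_fetch(&v, 1, __ATOMIC_SEQ_CST) == 6);
        /* fetch_add returns the value before the addition */
        assert(__atomic_fetch_add(&v, 1, __ATOMIC_SEQ_CST) == 6);
        assert(v == 7);
        return 0;
    }
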
+
+static inline uint64_t fallback_atomic_or_fetch(uint64_t *p, uint64_t m)
+{
+    uint64_t ret;
+
+    pthread_mutex_lock(&atomic_sim_lock);
+    *p |= m;
+    ret = *p;
+    pthread_mutex_unlock(&atomic_sim_lock);
+    return ret;
+}
+
+# define ATOMIC_OR_FETCH(p, v, o) fallback_atomic_or_fetch(p, v)
+#endif
+
+static CRYPTO_THREAD_LOCAL rcu_thr_key;
+
+/*
+ * users is broken up into 2 parts
+ * bits 0-15 - current readers
+ * bits 32-63 - ID
+ */
+# define READER_SHIFT 0
+# define ID_SHIFT 32
+# define READER_SIZE 16
+# define ID_SIZE 32
+
+# define READER_MASK (((uint64_t)1 << READER_SIZE) - 1)
+# define ID_MASK (((uint64_t)1 << ID_SIZE) - 1)
+# define READER_COUNT(x) (((uint64_t)(x) >> READER_SHIFT) & READER_MASK)
+# define ID_VAL(x) (((uint64_t)(x) >> ID_SHIFT) & ID_MASK)
+# define VAL_READER ((uint64_t)1 << READER_SHIFT)
+# define VAL_ID(x) ((uint64_t)x << ID_SHIFT)
+
+/*
+ * This is the core of an rcu lock.  It tracks the readers and writers for
+ * the current quiescence point for a given lock.  users is the 64 bit value
+ * that stores the READERS/ID as defined above
+ */
+struct rcu_qp {
+    uint64_t users;
+};
+
+struct thread_qp {
+    struct rcu_qp *qp;
+    unsigned int depth;
+    CRYPTO_RCU_LOCK *lock;
+};
+
+# define MAX_QPS 10
+/*
+ * This is the per thread tracking data
+ * that is assigned to each thread participating
+ * in an rcu qp
+ *
+ * qp points to the qp that it last acquired
+ */
+struct rcu_thr_data {
+    struct thread_qp thread_qps[MAX_QPS];
+};
+
+/*
+ * This is the internal version of a CRYPTO_RCU_LOCK
+ * it is cast from CRYPTO_RCU_LOCK
+ */
+struct rcu_lock_st {
+    /* Callbacks to call for the next ossl_synchronize_rcu */
+    struct rcu_cb_item *cb_items;
+
+    /* rcu generation counter for in-order retirement */
+    uint32_t id_ctr;
+
+    /* Array of quiescent points for synchronization */
+    struct rcu_qp *qp_group;
+
+    /* Number of elements in qp_group array */
+    size_t group_count;
+
+    /* Index of the current qp in the qp_group array */
+    uint64_t reader_idx;
+
+    /* value of the next id_ctr value to be retired */
+    uint32_t next_to_retire;
+
+    /* index of the next free rcu_qp in the qp_group */
+    uint64_t current_alloc_idx;
+
+    /* number of qps in the qp_group array currently being retired */
+    uint32_t writers_alloced;
+
+    /* lock protecting write side operations */
+    pthread_mutex_t write_lock;
+
+    /* lock protecting updates to writers_alloced/current_alloc_idx */
+    pthread_mutex_t alloc_lock;
+
+    /* signal to wake threads waiting on alloc_lock */
+    pthread_cond_t alloc_signal;
+
+    /* lock to enforce in-order retirement */
+    pthread_mutex_t prior_lock;
+
+    /* signal to wake threads waiting on prior_lock */
+    pthread_cond_t prior_signal;
+};
+
+/*
+ * Called on thread exit to free the thread-local rcu data
+ * associated with this thread, if any
+ */
+static void free_rcu_thr_data(void *ptr)
+{
+    struct rcu_thr_data *data =
+        (struct rcu_thr_data *)CRYPTO_THREAD_get_local(&rcu_thr_key);
+
+    OPENSSL_free(data);
+    CRYPTO_THREAD_set_local(&rcu_thr_key, NULL);
+}
+
+static void ossl_rcu_init(void)
+{
+    CRYPTO_THREAD_init_local(&rcu_thr_key, NULL);
+}
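
To make the bit layout concrete: with generation ID 7 and three active
readers, the users word is (7 << 32) | 3.  A standalone sketch using copies
of the macros above (editorial, not part of the patch):

    /* Decoding a sample users word with the READER/ID macros. */
    #include <assert.h>
    #include <stdint.h>

    #define READER_SHIFT 0
    #define ID_SHIFT 32
    #define READER_SIZE 16
    #define ID_SIZE 32
    #define READER_MASK (((uint64_t)1 << READER_SIZE) - 1)
    #define ID_MASK (((uint64_t)1 << ID_SIZE) - 1)
    #define READER_COUNT(x) (((uint64_t)(x) >> READER_SHIFT) & READER_MASK)
    #define ID_VAL(x) (((uint64_t)(x) >> ID_SHIFT) & ID_MASK)
    #define VAL_READER ((uint64_t)1 << READER_SHIFT)
    #define VAL_ID(x) ((uint64_t)(x) << ID_SHIFT)

    int main(void)
    {
        uint64_t users = VAL_ID(7) + 3 * VAL_READER;   /* 0x700000003 */

        assert(READER_COUNT(users) == 3);   /* bits 0-15: reader count */
        assert(ID_VAL(users) == 7);         /* bits 32-63: generation id */
        return 0;
    }
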
+
+/* Read side acquisition of the current qp */
+static struct rcu_qp *get_hold_current_qp(struct rcu_lock_st *lock)
+{
+    uint64_t qp_idx;
+
+    /* get the current qp index */
+    for (;;) {
+        /*
+         * Notes on use of __ATOMIC_ACQUIRE
+         * We need to ensure the following:
+         * 1) That subsequent operations aren't optimized by hoisting them
+         *    above this operation.  Specifically, we don't want the below
+         *    re-load of qp_idx to get optimized away
+         * 2) We want to ensure that any updating of reader_idx on the write
+         *    side of the lock is flushed from a local cpu cache so that we
+         *    see any updates prior to the load.  This is a non-issue on
+         *    cache coherent systems like x86, but is relevant on other
+         *    arches
+         * Note: This applies to the reload below as well
+         */
+        qp_idx = (uint64_t)ATOMIC_LOAD_N(&lock->reader_idx, __ATOMIC_ACQUIRE);
+
+        /*
+         * Notes on use of __ATOMIC_RELEASE
+         * This counter is only read by the write side of the lock, and so we
+         * specify __ATOMIC_RELEASE here to ensure that the write side of the
+         * lock sees this during the spin loop read of users, as it waits for
+         * the reader count to approach zero
+         */
+        ATOMIC_ADD_FETCH(&lock->qp_group[qp_idx].users, VAL_READER,
+                         __ATOMIC_RELEASE);
+
+        /* if the idx hasn't changed, we're good, else try again */
+        if (qp_idx == (uint64_t)ATOMIC_LOAD_N(&lock->reader_idx,
+                                              __ATOMIC_ACQUIRE))
+            break;
+
+        /*
+         * Notes on use of __ATOMIC_RELEASE
+         * As with the add above, we want to ensure that this decrement is
+         * seen by the write side of the lock as soon as it happens to
+         * prevent undue spinning waiting for write side completion
+         */
+        ATOMIC_SUB_FETCH(&lock->qp_group[qp_idx].users, VAL_READER,
+                         __ATOMIC_RELEASE);
+    }
+
+    return &lock->qp_group[qp_idx];
+}
+
+void ossl_rcu_read_lock(CRYPTO_RCU_LOCK *lock)
+{
+    struct rcu_thr_data *data;
+    int i, available_qp = -1;
+
+    /*
+     * we're going to access current_qp here so ask the
+     * processor to fetch it
+     */
+    data = CRYPTO_THREAD_get_local(&rcu_thr_key);
+
+    if (data == NULL) {
+        data = OPENSSL_zalloc(sizeof(*data));
+        OPENSSL_assert(data != NULL);
+        CRYPTO_THREAD_set_local(&rcu_thr_key, data);
+        ossl_init_thread_start(NULL, NULL, free_rcu_thr_data);
+    }
+
+    for (i = 0; i < MAX_QPS; i++) {
+        if (data->thread_qps[i].qp == NULL && available_qp == -1)
+            available_qp = i;
+        /* If we have a hold on this lock already, we're good */
+        if (data->thread_qps[i].lock == lock) {
+            data->thread_qps[i].depth++;
+            return;
+        }
+    }
+
+    /*
+     * if we get here, then we don't have a hold on this lock yet
+     */
+    assert(available_qp != -1);
+
+    data->thread_qps[available_qp].qp = get_hold_current_qp(lock);
+    data->thread_qps[available_qp].depth = 1;
+    data->thread_qps[available_qp].lock = lock;
+}
+
+void ossl_rcu_read_unlock(CRYPTO_RCU_LOCK *lock)
+{
+    int i;
+    struct rcu_thr_data *data = CRYPTO_THREAD_get_local(&rcu_thr_key);
+    uint64_t ret;
+
+    assert(data != NULL);
+
+    for (i = 0; i < MAX_QPS; i++) {
+        if (data->thread_qps[i].lock == lock) {
+            /*
+             * As with read side acquisition, we use __ATOMIC_RELEASE here
+             * to ensure that the decrement is published immediately
+             * to any write side waiters
+             */
+            data->thread_qps[i].depth--;
+            if (data->thread_qps[i].depth == 0) {
+                ret = ATOMIC_SUB_FETCH(&data->thread_qps[i].qp->users,
+                                       VAL_READER, __ATOMIC_RELEASE);
+                OPENSSL_assert(ret != UINT64_MAX);
+                data->thread_qps[i].qp = NULL;
+                data->thread_qps[i].lock = NULL;
+            }
+            return;
+        }
+    }
+    /*
+     * if we get here, we're trying to unlock a lock that we never acquired -
+     * that's fatal
+     */
+    assert(0);
+}
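
Because read_lock records a per-lock depth, read-side critical sections nest
cheaply on a single thread; only the outermost unlock drops the thread's
hold on the qp.  A short sketch using only the API above (editorial, not
part of the patch):

    #include "internal/rcu.h"

    static void nested_reader(CRYPTO_RCU_LOCK *lock)
    {
        ossl_rcu_read_lock(lock);       /* depth = 1, qp acquired */
        ossl_rcu_read_lock(lock);       /* depth = 2, no new qp hold */
        ossl_rcu_read_unlock(lock);     /* depth = 1, hold retained */
        ossl_rcu_read_unlock(lock);     /* depth = 0, reader count dropped */
    }
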
+
+/*
+ * Write side allocation routine to get the current qp
+ * and replace it with a new one
+ */
+static struct rcu_qp *update_qp(CRYPTO_RCU_LOCK *lock)
+{
+    uint64_t new_id;
+    uint64_t current_idx;
+
+    pthread_mutex_lock(&lock->alloc_lock);
+
+    /*
+     * we need at least one qp to be available with one
+     * left over, so that readers can start working on
+     * one that isn't yet being waited on
+     */
+    while (lock->group_count - lock->writers_alloced < 2)
+        /* we have to wait for one to be free */
+        pthread_cond_wait(&lock->alloc_signal, &lock->alloc_lock);
+
+    current_idx = lock->current_alloc_idx;
+
+    /* Allocate the qp */
+    lock->writers_alloced++;
+
+    /* increment the allocation index */
+    lock->current_alloc_idx =
+        (lock->current_alloc_idx + 1) % lock->group_count;
+
+    /* get and insert a new id */
+    new_id = lock->id_ctr;
+    lock->id_ctr++;
+
+    new_id = VAL_ID(new_id);
+    /*
+     * Even though we are under a write side lock here
+     * we need to use atomic instructions to ensure that the results
+     * of this update are published to the read side prior to updating the
+     * reader idx below
+     */
+    ATOMIC_AND_FETCH(&lock->qp_group[current_idx].users, ID_MASK,
+                     __ATOMIC_RELEASE);
+    ATOMIC_OR_FETCH(&lock->qp_group[current_idx].users, new_id,
+                    __ATOMIC_RELEASE);
+
+    /*
+     * update the reader index to be the prior qp
+     * Note the use of __ATOMIC_RELEASE here is based on the corresponding
+     * use of __ATOMIC_ACQUIRE in get_hold_current_qp, as we want any
+     * publication of this value to be seen on the read side immediately
+     * after it happens
+     */
+    ATOMIC_STORE_N(&lock->reader_idx, lock->current_alloc_idx,
+                   __ATOMIC_RELEASE);
+
+    /* wake up any waiters */
+    pthread_cond_signal(&lock->alloc_signal);
+    pthread_mutex_unlock(&lock->alloc_lock);
+    return &lock->qp_group[current_idx];
+}
+
+static void retire_qp(CRYPTO_RCU_LOCK *lock, struct rcu_qp *qp)
+{
+    pthread_mutex_lock(&lock->alloc_lock);
+    lock->writers_alloced--;
+    pthread_cond_signal(&lock->alloc_signal);
+    pthread_mutex_unlock(&lock->alloc_lock);
+}
+
+static struct rcu_qp *allocate_new_qp_group(CRYPTO_RCU_LOCK *lock,
+                                            int count)
+{
+    struct rcu_qp *new =
+        OPENSSL_zalloc(sizeof(*new) * count);
+
+    lock->group_count = count;
+    return new;
+}
+
+void ossl_rcu_write_lock(CRYPTO_RCU_LOCK *lock)
+{
+    pthread_mutex_lock(&lock->write_lock);
+}
+
+void ossl_rcu_write_unlock(CRYPTO_RCU_LOCK *lock)
+{
+    pthread_mutex_unlock(&lock->write_lock);
+}
+
+void ossl_synchronize_rcu(CRYPTO_RCU_LOCK *lock)
+{
+    struct rcu_qp *qp;
+    uint64_t count;
+    struct rcu_cb_item *cb_items, *tmpcb;
+
+    /*
+     * __ATOMIC_ACQ_REL is used here to ensure that we get any prior
+     * published writes before we read, and publish our write immediately
+     */
+    cb_items = ATOMIC_EXCHANGE_N(&lock->cb_items, NULL, __ATOMIC_ACQ_REL);
+
+    qp = update_qp(lock);
+
+    /*
+     * wait for the reader count to reach zero
+     * Note the use of __ATOMIC_ACQUIRE here to ensure that any
+     * prior __ATOMIC_RELEASE write operation in get_hold_current_qp
+     * is visible prior to our read
+     */
+    do {
+        count = (uint64_t)ATOMIC_LOAD_N(&qp->users, __ATOMIC_ACQUIRE);
+    } while (READER_COUNT(count) != 0);
+
+    /* retire in order */
+    pthread_mutex_lock(&lock->prior_lock);
+    while (lock->next_to_retire != ID_VAL(count))
+        pthread_cond_wait(&lock->prior_signal, &lock->prior_lock);
+    lock->next_to_retire++;
+    pthread_cond_broadcast(&lock->prior_signal);
+    pthread_mutex_unlock(&lock->prior_lock);
+
+    retire_qp(lock, qp);
+
+    /* handle any callbacks that we have */
+    while (cb_items != NULL) {
+        tmpcb = cb_items;
+        cb_items = cb_items->next;
+        tmpcb->fn(tmpcb->data);
+        OPENSSL_free(tmpcb);
+    }
+}
+
+int ossl_rcu_call(CRYPTO_RCU_LOCK *lock, rcu_cb_fn cb, void *data)
+{
+    struct rcu_cb_item *new =
+        OPENSSL_zalloc(sizeof(*new));
+
+    if (new == NULL)
+        return 0;
+
+    new->data = data;
+    new->fn = cb;
+    /*
+     * Use __ATOMIC_ACQ_REL here to indicate that any prior writes to this
+     * list are visible to us prior to reading, and publish the new value
+     * immediately
+     */
+    new->next = ATOMIC_EXCHANGE_N(&lock->cb_items, new, __ATOMIC_ACQ_REL);
+
+    return 1;
+}
+
+void *ossl_rcu_uptr_deref(void **p)
+{
+    return (void *)ATOMIC_LOAD_N(p, __ATOMIC_ACQUIRE);
+}
+
+void ossl_rcu_assign_uptr(void **p, void **v)
+{
+    ATOMIC_STORE(p, v, __ATOMIC_RELEASE);
+}
+
+static CRYPTO_ONCE rcu_init_once = CRYPTO_ONCE_STATIC_INIT;
+
+CRYPTO_RCU_LOCK *ossl_rcu_lock_new(int num_writers)
+{
+    struct rcu_lock_st *new;
+
+    if (!CRYPTO_THREAD_run_once(&rcu_init_once, ossl_rcu_init))
+        return NULL;
+
+    if (num_writers < 1)
+        num_writers = 1;
+
+    new = OPENSSL_zalloc(sizeof(*new));
+    if (new == NULL)
+        return NULL;
+
+    pthread_mutex_init(&new->write_lock, NULL);
+    pthread_mutex_init(&new->prior_lock, NULL);
+    pthread_mutex_init(&new->alloc_lock, NULL);
+    pthread_cond_init(&new->prior_signal, NULL);
+    pthread_cond_init(&new->alloc_signal, NULL);
+    new->qp_group = allocate_new_qp_group(new, num_writers + 1);
+    if (new->qp_group == NULL) {
+        OPENSSL_free(new);
+        new = NULL;
+    }
+    return new;
+}
+
+void ossl_rcu_lock_free(CRYPTO_RCU_LOCK *lock)
+{
+    struct rcu_lock_st *rlock = (struct rcu_lock_st *)lock;
+
+    if (lock == NULL)
+        return;
+
+    /* make sure we're synchronized */
+    ossl_synchronize_rcu(rlock);
+
+    OPENSSL_free(rlock->qp_group);
+    /* There should only be a single qp left now */
+    OPENSSL_free(rlock);
+}
+
 CRYPTO_RWLOCK *CRYPTO_THREAD_lock_new(void)
 {
 # ifdef USE_RWLOCK
     CRYPTO_RWLOCK *lock;
 
-    if ((lock = CRYPTO_zalloc(sizeof(pthread_rwlock_t), NULL, 0)) == NULL)
+    if ((lock = OPENSSL_zalloc(sizeof(pthread_rwlock_t))) == NULL)
         /* Don't set error, to avoid recursion blowup. */
         return NULL;
 
@@ -59,7 +627,7 @@ CRYPTO_RWLOCK *CRYPTO_THREAD_lock_new(void)
     pthread_mutexattr_t attr;
     CRYPTO_RWLOCK *lock;
 
-    if ((lock = CRYPTO_zalloc(sizeof(pthread_mutex_t), NULL, 0)) == NULL)
+    if ((lock = OPENSSL_zalloc(sizeof(pthread_mutex_t))) == NULL)
         /* Don't set error, to avoid recursion blowup. */
         return NULL;
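
Before moving to the Windows backend, it is worth spelling out what the
ACQUIRE/RELEASE pairing in ossl_rcu_uptr_deref()/ossl_rcu_assign_uptr()
buys: a reader that observes the new pointer is guaranteed to also observe
the stores that initialized the object behind it.  An editorial sketch with
a hypothetical two-field struct (not part of the patch):

    #include <openssl/crypto.h>
    #include "internal/rcu.h"

    struct pair {
        int a, b;                   /* hypothetical payload */
    };

    static struct pair *shared;     /* RCU-protected pointer */

    static int publisher(void)
    {
        struct pair *p = OPENSSL_zalloc(sizeof(*p));

        if (p == NULL)
            return 0;
        p->a = 1;                           /* ordinary stores ... */
        p->b = 2;
        ossl_rcu_assign_ptr(&shared, &p);   /* ... ordered before this
                                               RELEASE store */
        return 1;
    }

    static int consumer(CRYPTO_RCU_LOCK *lock)
    {
        struct pair *p;
        int sum = 0;

        ossl_rcu_read_lock(lock);
        p = ossl_rcu_deref(&shared);        /* ACQUIRE load */
        if (p != NULL)
            sum = p->a + p->b;              /* guaranteed to see 1 and 2 */
        ossl_rcu_read_unlock(lock);
        return sum;
    }
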
 
diff --git a/crypto/threads_win.c b/crypto/threads_win.c
index 4cdc62339de..fd51c735f8d 100644
--- a/crypto/threads_win.c
+++ b/crypto/threads_win.c
@@ -13,6 +13,7 @@
 #  define USE_RWLOCK
 # endif
 #endif
+#include
 
 /*
  * VC++ 2008 or earlier x86 compilers do not have an inline implementation
@@ -27,6 +28,11 @@
 #endif
 
 #include
+#include
+#include "internal/common.h"
+#include "internal/thread_arch.h"
+#include "internal/rcu.h"
+#include "rcu_internal.h"
 
 #if defined(OPENSSL_THREADS) && !defined(CRYPTO_TDEBUG) && defined(OPENSSL_SYS_WINDOWS)
 
@@ -37,20 +43,370 @@ typedef struct {
 } CRYPTO_win_rwlock;
 # endif
 
+static CRYPTO_THREAD_LOCAL rcu_thr_key;
+
+# define READER_SHIFT 0
+# define ID_SHIFT 32
+# define READER_SIZE 32
+# define ID_SIZE 32
+
+# define READER_MASK (((LONG64)1 << READER_SIZE) - 1)
+# define ID_MASK (((LONG64)1 << ID_SIZE) - 1)
+# define READER_COUNT(x) (((LONG64)(x) >> READER_SHIFT) & READER_MASK)
+# define ID_VAL(x) (((LONG64)(x) >> ID_SHIFT) & ID_MASK)
+# define VAL_READER ((LONG64)1 << READER_SHIFT)
+# define VAL_ID(x) ((LONG64)x << ID_SHIFT)
+
+/*
+ * This defines a quiescent point (qp)
+ * This is the barrier beyond which a writer
+ * must wait before freeing data that was
+ * atomically updated
+ */
+struct rcu_qp {
+    volatile LONG64 users;
+};
+
+struct thread_qp {
+    struct rcu_qp *qp;
+    unsigned int depth;
+    CRYPTO_RCU_LOCK *lock;
+};
+
+# define MAX_QPS 10
+/*
+ * This is the per thread tracking data
+ * that is assigned to each thread participating
+ * in an rcu qp
+ *
+ * qp points to the qp that it last acquired
+ */
+struct rcu_thr_data {
+    struct thread_qp thread_qps[MAX_QPS];
+};
+
+/*
+ * This is the internal version of a CRYPTO_RCU_LOCK
+ * it is cast from CRYPTO_RCU_LOCK
+ */
+struct rcu_lock_st {
+    struct rcu_cb_item *cb_items;
+    uint32_t id_ctr;
+    struct rcu_qp *qp_group;
+    size_t group_count;
+    uint32_t next_to_retire;
+    volatile long int reader_idx;
+    uint32_t current_alloc_idx;
+    uint32_t writers_alloced;
+    CRYPTO_MUTEX *write_lock;
+    CRYPTO_MUTEX *alloc_lock;
+    CRYPTO_CONDVAR *alloc_signal;
+    CRYPTO_MUTEX *prior_lock;
+    CRYPTO_CONDVAR *prior_signal;
+};
+
+/*
+ * Called on thread exit to free the thread-local rcu data
+ * associated with this thread, if any
+ */
+static void free_rcu_thr_data(void *ptr)
+{
+    struct rcu_thr_data *data =
+        (struct rcu_thr_data *)CRYPTO_THREAD_get_local(&rcu_thr_key);
+
+    OPENSSL_free(data);
+    CRYPTO_THREAD_set_local(&rcu_thr_key, NULL);
+}
+
+static void ossl_rcu_init(void)
+{
+    CRYPTO_THREAD_init_local(&rcu_thr_key, NULL);
+    ossl_init_thread_start(NULL, NULL, free_rcu_thr_data);
+}
+
+static struct rcu_qp *allocate_new_qp_group(struct rcu_lock_st *lock,
+                                            int count)
+{
+    struct rcu_qp *new =
+        OPENSSL_zalloc(sizeof(*new) * count);
+
+    lock->group_count = count;
+    return new;
+}
+
+static CRYPTO_ONCE rcu_init_once = CRYPTO_ONCE_STATIC_INIT;
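
The constructor below relies on the standard CRYPTO_THREAD_run_once idiom to
run ossl_rcu_init() exactly once per process.  A minimal standalone sketch
of that idiom using only the public CRYPTO_THREAD API (editorial, not part
of the patch; the example_* names are hypothetical):

    #include <openssl/crypto.h>

    static CRYPTO_ONCE example_once = CRYPTO_ONCE_STATIC_INIT;
    static CRYPTO_THREAD_LOCAL example_key;

    static void example_init(void)
    {
        /* runs at most once, no matter how many threads race here */
        CRYPTO_THREAD_init_local(&example_key, NULL);
    }

    static int example_get_ready(void)
    {
        /* returns 1 on success; subsequent calls are cheap no-ops */
        return CRYPTO_THREAD_run_once(&example_once, example_init);
    }
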
+
+CRYPTO_RCU_LOCK *ossl_rcu_lock_new(int num_writers)
+{
+    struct rcu_lock_st *new;
+
+    if (!CRYPTO_THREAD_run_once(&rcu_init_once, ossl_rcu_init))
+        return NULL;
+
+    if (num_writers < 1)
+        num_writers = 1;
+
+    new = OPENSSL_zalloc(sizeof(*new));
+    if (new == NULL)
+        return NULL;
+
+    new->write_lock = ossl_crypto_mutex_new();
+    new->alloc_signal = ossl_crypto_condvar_new();
+    new->prior_signal = ossl_crypto_condvar_new();
+    new->alloc_lock = ossl_crypto_mutex_new();
+    new->prior_lock = ossl_crypto_mutex_new();
+    new->qp_group = allocate_new_qp_group(new, num_writers + 1);
+    if (new->qp_group == NULL
+        || new->alloc_signal == NULL
+        || new->prior_signal == NULL
+        || new->write_lock == NULL
+        || new->alloc_lock == NULL
+        || new->prior_lock == NULL) {
+        OPENSSL_free(new->qp_group);
+        ossl_crypto_condvar_free(&new->alloc_signal);
+        ossl_crypto_condvar_free(&new->prior_signal);
+        ossl_crypto_mutex_free(&new->alloc_lock);
+        ossl_crypto_mutex_free(&new->prior_lock);
+        ossl_crypto_mutex_free(&new->write_lock);
+        OPENSSL_free(new);
+        new = NULL;
+    }
+    return new;
+}
+
+void ossl_rcu_lock_free(CRYPTO_RCU_LOCK *lock)
+{
+    OPENSSL_free(lock->qp_group);
+    ossl_crypto_condvar_free(&lock->alloc_signal);
+    ossl_crypto_condvar_free(&lock->prior_signal);
+    ossl_crypto_mutex_free(&lock->alloc_lock);
+    ossl_crypto_mutex_free(&lock->prior_lock);
+    ossl_crypto_mutex_free(&lock->write_lock);
+    OPENSSL_free(lock);
+}
+
+static inline struct rcu_qp *get_hold_current_qp(CRYPTO_RCU_LOCK *lock)
+{
+    uint32_t qp_idx;
+
+    /* get the current qp index */
+    for (;;) {
+        qp_idx = InterlockedOr(&lock->reader_idx, 0);
+        InterlockedAdd64(&lock->qp_group[qp_idx].users, VAL_READER);
+        if (qp_idx == InterlockedOr(&lock->reader_idx, 0))
+            break;
+        InterlockedAdd64(&lock->qp_group[qp_idx].users, -VAL_READER);
+    }
+
+    return &lock->qp_group[qp_idx];
+}
+
+void ossl_rcu_read_lock(CRYPTO_RCU_LOCK *lock)
+{
+    struct rcu_thr_data *data;
+    int i;
+    int available_qp = -1;
+
+    /*
+     * we're going to access current_qp here so ask the
+     * processor to fetch it
+     */
+    data = CRYPTO_THREAD_get_local(&rcu_thr_key);
+
+    if (data == NULL) {
+        data = OPENSSL_zalloc(sizeof(*data));
+        OPENSSL_assert(data != NULL);
+        CRYPTO_THREAD_set_local(&rcu_thr_key, data);
+    }
+
+    for (i = 0; i < MAX_QPS; i++) {
+        if (data->thread_qps[i].qp == NULL && available_qp == -1)
+            available_qp = i;
+        /* If we have a hold on this lock already, we're good */
+        if (data->thread_qps[i].lock == lock) {
+            data->thread_qps[i].depth++;
+            return;
+        }
+    }
+
+    /*
+     * if we get here, then we don't have a hold on this lock yet
+     */
+    assert(available_qp != -1);
+
+    data->thread_qps[available_qp].qp = get_hold_current_qp(lock);
+    data->thread_qps[available_qp].depth = 1;
+    data->thread_qps[available_qp].lock = lock;
+}
+
+void ossl_rcu_write_lock(CRYPTO_RCU_LOCK *lock)
+{
+    ossl_crypto_mutex_lock(lock->write_lock);
+}
+
+void ossl_rcu_write_unlock(CRYPTO_RCU_LOCK *lock)
+{
+    ossl_crypto_mutex_unlock(lock->write_lock);
+}
+
+void ossl_rcu_read_unlock(CRYPTO_RCU_LOCK *lock)
+{
+    struct rcu_thr_data *data = CRYPTO_THREAD_get_local(&rcu_thr_key);
+    int i;
+    LONG64 ret;
+
+    assert(data != NULL);
+
+    for (i = 0; i < MAX_QPS; i++) {
+        if (data->thread_qps[i].lock == lock) {
+            data->thread_qps[i].depth--;
+            if (data->thread_qps[i].depth == 0) {
+                ret = InterlockedAdd64(&data->thread_qps[i].qp->users,
+                                       -VAL_READER);
+                OPENSSL_assert(ret >= 0);
+                data->thread_qps[i].qp = NULL;
+                data->thread_qps[i].lock = NULL;
+            }
+            return;
+        }
+    }
+}
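
The reader side above loads reader_idx with InterlockedOr(..., 0): OR-ing in
zero leaves the value unchanged but returns the previous contents with
full-barrier semantics, which serves as an atomic load on Windows.  An
editorial standalone sketch (not part of the patch):

    /* Atomic read of a LONG on Windows via InterlockedOr with 0. */
    #include <windows.h>

    static volatile LONG shared_idx;

    static LONG atomic_read_idx(void)
    {
        /* InterlockedOr returns the value present before the OR */
        return InterlockedOr(&shared_idx, 0);
    }
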
+
+static struct rcu_qp *update_qp(CRYPTO_RCU_LOCK *lock)
+{
+    uint64_t new_id;
+    uint32_t current_idx;
+    uint32_t tmp;
+
+    ossl_crypto_mutex_lock(lock->alloc_lock);
+    /*
+     * we need at least one qp to be available with one
+     * left over, so that readers can start working on
+     * one that isn't yet being waited on
+     */
+    while (lock->group_count - lock->writers_alloced < 2)
+        ossl_crypto_condvar_wait(lock->alloc_signal, lock->alloc_lock);
+
+    current_idx = lock->current_alloc_idx;
+    /* Allocate the qp */
+    lock->writers_alloced++;
+
+    /* increment the allocation index */
+    lock->current_alloc_idx =
+        (lock->current_alloc_idx + 1) % lock->group_count;
+
+    /* get and insert a new id */
+    new_id = lock->id_ctr;
+    lock->id_ctr++;
+
+    new_id = VAL_ID(new_id);
+    InterlockedAnd64(&lock->qp_group[current_idx].users, ID_MASK);
+    InterlockedAdd64(&lock->qp_group[current_idx].users, new_id);
+
+    /* update the reader index to be the prior qp */
+    tmp = lock->current_alloc_idx;
+    InterlockedExchange(&lock->reader_idx, tmp);
+
+    /* wake up any waiters */
+    ossl_crypto_condvar_broadcast(lock->alloc_signal);
+    ossl_crypto_mutex_unlock(lock->alloc_lock);
+    return &lock->qp_group[current_idx];
+}
+
+static void retire_qp(CRYPTO_RCU_LOCK *lock,
+                      struct rcu_qp *qp)
+{
+    ossl_crypto_mutex_lock(lock->alloc_lock);
+    lock->writers_alloced--;
+    ossl_crypto_condvar_broadcast(lock->alloc_signal);
+    ossl_crypto_mutex_unlock(lock->alloc_lock);
+}
+
+void ossl_synchronize_rcu(CRYPTO_RCU_LOCK *lock)
+{
+    struct rcu_qp *qp;
+    uint64_t count;
+    struct rcu_cb_item *cb_items, *tmpcb;
+
+    /* before we do anything else, let's grab the cb list */
+    cb_items = InterlockedExchangePointer((void * volatile *)&lock->cb_items,
+                                          NULL);
+
+    qp = update_qp(lock);
+
+    /* wait for the reader count to reach zero */
+    do {
+        count = InterlockedOr64(&qp->users, 0);
+    } while (READER_COUNT(count) != 0);
+
+    /* retire in order */
+    ossl_crypto_mutex_lock(lock->prior_lock);
+    while (lock->next_to_retire != ID_VAL(count))
+        ossl_crypto_condvar_wait(lock->prior_signal, lock->prior_lock);
+
+    lock->next_to_retire++;
+    ossl_crypto_condvar_broadcast(lock->prior_signal);
+    ossl_crypto_mutex_unlock(lock->prior_lock);
+
+    retire_qp(lock, qp);
+
+    /* handle any callbacks that we have */
+    while (cb_items != NULL) {
+        tmpcb = cb_items;
+        cb_items = cb_items->next;
+        tmpcb->fn(tmpcb->data);
+        OPENSSL_free(tmpcb);
+    }
+}
+
+int ossl_rcu_call(CRYPTO_RCU_LOCK *lock, rcu_cb_fn cb, void *data)
+{
+    struct rcu_cb_item *new;
+    struct rcu_cb_item *prev;
+
+    new = OPENSSL_zalloc(sizeof(struct rcu_cb_item));
+    if (new == NULL)
+        return 0;
+    new->data = data;
+    new->fn = cb;
+
+    /* push ourselves onto the head of the list, linking in the old head */
+    prev = InterlockedExchangePointer((void * volatile *)&lock->cb_items, new);
+    new->next = prev;
+    return 1;
+}
+
+void *ossl_rcu_uptr_deref(void **p)
+{
+    return (void *)*p;
+}
+
+void ossl_rcu_assign_uptr(void **p, void **v)
+{
+    InterlockedExchangePointer((void * volatile *)p, (void *)*v);
+}
+
 CRYPTO_RWLOCK *CRYPTO_THREAD_lock_new(void)
 {
     CRYPTO_RWLOCK *lock;
 # ifdef USE_RWLOCK
     CRYPTO_win_rwlock *rwlock;
 
-    if ((lock = CRYPTO_zalloc(sizeof(CRYPTO_win_rwlock), NULL, 0)) == NULL)
+    if ((lock = OPENSSL_zalloc(sizeof(CRYPTO_win_rwlock))) == NULL)
         /* Don't set error, to avoid recursion blowup. */
         return NULL;
     rwlock = lock;
     InitializeSRWLock(&rwlock->lock);
 # else
 
-    if ((lock = CRYPTO_zalloc(sizeof(CRITICAL_SECTION), NULL, 0)) == NULL)
+    if ((lock = OPENSSL_zalloc(sizeof(CRITICAL_SECTION))) == NULL)
         /* Don't set error, to avoid recursion blowup. */
         return NULL;
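
Both backends push deferred callbacks the same way: atomically exchange the
list head and link the old head behind the new item.  A generic C11 sketch
of the pattern (editorial, not part of the patch):

    #include <stdatomic.h>
    #include <stdlib.h>

    struct cb_item {
        void (*fn)(void *);
        void *data;
        struct cb_item *next;
    };

    static _Atomic(struct cb_item *) cb_head;

    static int push_cb(void (*fn)(void *), void *data)
    {
        struct cb_item *new = calloc(1, sizeof(*new));

        if (new == NULL)
            return 0;
        new->fn = fn;
        new->data = data;
        /* swap ourselves in as the new head; the old head becomes next */
        new->next = atomic_exchange(&cb_head, new);
        return 1;
    }

The brief window between the exchange and the next-pointer assignment is
tolerable here because, per the documented API, ossl_rcu_call() runs on the
write side of the lock.
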
 
diff --git a/doc/internal/man3/ossl_rcu_lock_new.pod b/doc/internal/man3/ossl_rcu_lock_new.pod
new file mode 100644
index 00000000000..e92bf29165b
--- /dev/null
+++ b/doc/internal/man3/ossl_rcu_lock_new.pod
@@ -0,0 +1,258 @@
+=pod
+
+=head1 NAME
+
+ossl_rcu_lock_new,
+ossl_rcu_lock_free, ossl_rcu_read_lock,
+ossl_rcu_read_unlock, ossl_rcu_write_lock,
+ossl_rcu_write_unlock, ossl_synchronize_rcu,
+ossl_rcu_call, ossl_rcu_deref,
+ossl_rcu_assign_ptr, ossl_rcu_uptr_deref,
+ossl_rcu_assign_uptr
+- perform read-copy-update locking
+
+=head1 SYNOPSIS
+
+ CRYPTO_RCU_LOCK *ossl_rcu_lock_new(int num_writers);
+ void ossl_rcu_read_lock(CRYPTO_RCU_LOCK *lock);
+ void ossl_rcu_write_lock(CRYPTO_RCU_LOCK *lock);
+ void ossl_rcu_write_unlock(CRYPTO_RCU_LOCK *lock);
+ void ossl_rcu_read_unlock(CRYPTO_RCU_LOCK *lock);
+ void ossl_synchronize_rcu(CRYPTO_RCU_LOCK *lock);
+ int ossl_rcu_call(CRYPTO_RCU_LOCK *lock, rcu_cb_fn cb, void *data);
+ void *ossl_rcu_deref(void **p);
+ void *ossl_rcu_uptr_deref(void **p);
+ void ossl_rcu_assign_ptr(void **p, void **v);
+ void ossl_rcu_assign_uptr(void **p, void **v);
+ void ossl_rcu_lock_free(CRYPTO_RCU_LOCK *lock);
+
+=head1 DESCRIPTION
+
+OpenSSL can be safely used in multi-threaded applications provided that
+support for the underlying OS threading API is built-in.  Currently, OpenSSL
+supports the pthread and Windows APIs.  OpenSSL can also be built without
+any multi-threading support, for example on platforms that don't provide
+any threading support or that provide a threading API that is not yet
+supported by OpenSSL.
+
+In addition to more traditional Read/Write locks, OpenSSL provides
+Read-Copy-Update (RCU) locks, which allow for always-nonblocking read paths.
+
+The following multi-threading functions are provided:
+
+=over 2
+
+=item *
+
+ossl_rcu_assign_uptr() assigns the value pointed to by v to the
+location pointed to by p.  This function should typically not be used;
+rely instead on the ossl_rcu_assign_ptr() macro.
+
+=item *
+
+ossl_rcu_uptr_deref() returns the value stored at the
+location pointed to by p.  This function should typically not be used;
+rely instead on the ossl_rcu_deref() macro.
+
+=item *
+
+ossl_rcu_assign_ptr() assigns the value pointed to by v to the
+location pointed to by p.
+
+=item *
+
+ossl_rcu_lock_new() allocates a new RCU lock.  The I<num_writers> param
+indicates the number of write side threads which may execute
+ossl_synchronize_rcu() in parallel.  The value must be at least 1, but may
+be larger to obtain increased write side throughput at the cost of
+additional internal memory usage.  A value of 1 is generally recommended.
+
+=item *
+
+ossl_rcu_read_lock() acquires a read side hold on data protected by
+the lock.
+
+=item *
+
+ossl_rcu_read_unlock() releases a read side hold on data protected by
+the lock.
+
+=item *
+
+ossl_rcu_write_lock() acquires a write side hold on data protected by
+the lock.  Note only one writer per lock is permitted, as with read/write
+locks.
+
+=item *
+
+ossl_rcu_write_unlock() releases a write side hold on data protected
+by the lock.
+
+=item *
+
+ossl_synchronize_rcu() blocks the calling thread until all read side
+holds on the lock have been released, guaranteeing that any old data
+updated by the write side thread is safe to free.
+
+=item *
+
+ossl_rcu_call() enqueues a callback function to the lock, to be called
+when the next synchronization completes.
+Note: It is not guaranteed that the
+thread which enqueued the callback will be the thread which executes the
+callback.
+
+=item *
+
+ossl_rcu_deref(p) atomically reads a pointer under an RCU lock's
+protection
+
+=item *
+
+ossl_rcu_assign_ptr(p,v) atomically writes to a pointer under an
+RCU lock's protection
+
+=item *
+
+ossl_rcu_lock_free() frees an allocated RCU lock
+
+=back
+
+=head1 RETURN VALUES
+
+ossl_rcu_lock_new() returns a pointer to a newly created RCU lock structure.
+
+ossl_rcu_deref() and ossl_rcu_uptr_deref() return the value pointed
+to by the passed-in pointer p.
+
+All other functions return no value.
+
+=head1 EXAMPLES
+
+You can find out if OpenSSL was configured with thread support:
+
+ #include <openssl/opensslconf.h>
+ #if defined(OPENSSL_THREADS)
+   /* thread support enabled */
+ #else
+   /* no thread support */
+ #endif
+
+This example safely initializes and uses a lock.
+
+ #include "internal/rcu.h"
+
+ struct foo {
+     int aval;
+     char *name;
+ };
+
+ static CRYPTO_ONCE once = CRYPTO_ONCE_STATIC_INIT;
+ static CRYPTO_RCU_LOCK *lock;
+ static struct foo *fooptr = NULL;
+
+ static void myinit(void)
+ {
+     lock = ossl_rcu_lock_new(1);
+ }
+
+ static int initlock(void)
+ {
+     if (!RUN_ONCE(&once, myinit) || lock == NULL)
+         return 0;
+     return 1;
+ }
+
+ static void writer_thread()
+ {
+     struct foo *newfoo;
+     struct foo *oldfoo;
+
+     initlock();
+
+     /*
+      * update steps in an rcu model
+      */
+
+     /*
+      * 1) create a new shared object
+      */
+     newfoo = OPENSSL_zalloc(sizeof(struct foo));
+
+     /*
+      * acquire the write side lock
+      */
+     ossl_rcu_write_lock(lock);
+
+     /*
+      * 2) read the old pointer
+      */
+     oldfoo = ossl_rcu_deref(&fooptr);
+
+     /*
+      * 3) Copy the old pointer to the new object, and
+      *    make any needed adjustments
+      */
+     memcpy(newfoo, oldfoo, sizeof(struct foo));
+     newfoo->aval++;
+
+     /*
+      * 4) Update the shared pointer to the new value
+      */
+     ossl_rcu_assign_ptr(&fooptr, &newfoo);
+
+     /*
+      * 5) Release the write side lock
+      */
+     ossl_rcu_write_unlock(lock);
+
+     /*
+      * 6) wait for any read side holds on the old data
+      *    to be released
+      */
+     ossl_synchronize_rcu(lock);
+
+     /*
+      * 7) free the old pointer, now that there are no
+      *    further readers
+      */
+     OPENSSL_free(oldfoo);
+ }
+
+ static void reader_thread()
+ {
+     struct foo *myfoo = NULL;
+     int a;
+
+     /*
+      * 1) Acquire a read side hold on the shared data
+      */
+     ossl_rcu_read_lock(lock);
+
+     /*
+      * 2) Access the shared data pointer
+      */
+     myfoo = ossl_rcu_deref(&fooptr);
+
+     /*
+      * 3) Read the data from the pointer
+      */
+     a = myfoo->aval;
+
+     /*
+      * 4) Indicate our hold on the shared data is complete
+      */
+     ossl_rcu_read_unlock(lock);
+ }
+
+=head1 SEE ALSO
+
+L<crypto(7)>, L<openssl-threads(7)>.
+
+=head1 COPYRIGHT
+
+Copyright 2023 The OpenSSL Project Authors. All Rights Reserved.
+
+Licensed under the Apache License 2.0 (the "License").  You may not use
+this file except in compliance with the License.  You can obtain a copy
+in the file LICENSE in the source distribution or at
+L<https://www.openssl.org/source/license.html>.
+
+=cut
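
As a complement to steps 6 and 7 of the writer example in the pod above, a
writer that must not block can schedule the old object for deferred freeing
with ossl_rcu_call() instead; the callback runs when the next
synchronization completes, possibly on another thread.  An editorial sketch
reusing the hypothetical names from that example (not part of the patch):

    static void free_foo_cb(void *data)
    {
        OPENSSL_free(data);
    }

    static void writer_thread_deferred(void)
    {
        struct foo *newfoo, *oldfoo;

        initlock();
        newfoo = OPENSSL_zalloc(sizeof(struct foo));

        ossl_rcu_write_lock(lock);
        oldfoo = ossl_rcu_deref(&fooptr);
        memcpy(newfoo, oldfoo, sizeof(struct foo));
        newfoo->aval++;
        ossl_rcu_assign_ptr(&fooptr, &newfoo);

        /* defer the free rather than synchronizing here */
        ossl_rcu_call(lock, free_foo_cb, oldfoo);
        ossl_rcu_write_unlock(lock);
    }
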
diff --git a/include/internal/rcu.h b/include/internal/rcu.h
new file mode 100644
index 00000000000..31a270222d2
--- /dev/null
+++ b/include/internal/rcu.h
@@ -0,0 +1,31 @@
+/*
+ * Copyright 2023 The OpenSSL Project Authors. All Rights Reserved.
+ *
+ * Licensed under the Apache License 2.0 (the "License").  You may not use
+ * this file except in compliance with the License.  You can obtain a copy
+ * in the file LICENSE in the source distribution or at
+ * https://www.openssl.org/source/license.html
+ */
+
+#ifndef OPENSSL_RCU_H
+# define OPENSSL_RCU_H
+# pragma once
+
+typedef void (*rcu_cb_fn)(void *data);
+
+typedef struct rcu_lock_st CRYPTO_RCU_LOCK;
+
+CRYPTO_RCU_LOCK *ossl_rcu_lock_new(int num_writers);
+void ossl_rcu_lock_free(CRYPTO_RCU_LOCK *lock);
+void ossl_rcu_read_lock(CRYPTO_RCU_LOCK *lock);
+void ossl_rcu_write_lock(CRYPTO_RCU_LOCK *lock);
+void ossl_rcu_write_unlock(CRYPTO_RCU_LOCK *lock);
+void ossl_rcu_read_unlock(CRYPTO_RCU_LOCK *lock);
+void ossl_synchronize_rcu(CRYPTO_RCU_LOCK *lock);
+int ossl_rcu_call(CRYPTO_RCU_LOCK *lock, rcu_cb_fn cb, void *data);
+void *ossl_rcu_uptr_deref(void **p);
+void ossl_rcu_assign_uptr(void **p, void **v);
+#define ossl_rcu_deref(p) ossl_rcu_uptr_deref((void **)p)
+#define ossl_rcu_assign_ptr(p,v) ossl_rcu_assign_uptr((void **)p, (void **)v)
+
+#endif
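
One subtlety of the ossl_rcu_deref()/ossl_rcu_assign_ptr() macros above is
that both take the address of the shared pointer, casting T ** down to
void ** so a single pair of helpers serves every pointer type while the
caller keeps typed variables.  An editorial sketch (struct widget is
hypothetical, not part of the patch):

    #include "internal/rcu.h"

    struct widget { int n; };          /* hypothetical type */

    static struct widget *w_shared;    /* the RCU-protected pointer itself */

    static void example(struct widget *neww)
    {
        struct widget *cur;

        cur = ossl_rcu_deref(&w_shared);        /* atomic read of w_shared */
        (void)cur;
        ossl_rcu_assign_ptr(&w_shared, &neww);  /* atomic write of w_shared */
    }
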
diff --git a/test/threadstest.c b/test/threadstest.c
index 317b637a07e..e0ecfd78149 100644
--- a/test/threadstest.c
+++ b/test/threadstest.c
@@ -29,9 +29,18 @@
 #include
 #include "internal/tsan_assist.h"
 #include "internal/nelem.h"
+#include "internal/time.h"
+#include "internal/rcu.h"
 #include "testutil.h"
 #include "threadstest.h"
 
+#ifdef __SANITIZE_THREAD__
+#include <sanitizer/tsan_interface.h>
+#define TSAN_ACQUIRE(s) __tsan_acquire(s)
+#else
+#define TSAN_ACQUIRE(s)
+#endif
+
 /* Limit the maximum number of threads */
 #define MAXIMUM_THREADS 10
 
@@ -91,6 +100,367 @@ static int test_lock(void)
     return res;
 }
 
+#if defined(OPENSSL_THREADS)
+static int contention = 0;
+static int rwwriter1_done = 0;
+static int rwwriter2_done = 0;
+static int rwreader1_iterations = 0;
+static int rwreader2_iterations = 0;
+static int rwwriter1_iterations = 0;
+static int rwwriter2_iterations = 0;
+static int *rwwriter_ptr = NULL;
+static int rw_torture_result = 1;
+static CRYPTO_RWLOCK *rwtorturelock = NULL;
+
+static void rwwriter_fn(int id, int *iterations)
+{
+    int count;
+    int *old, *new;
+    OSSL_TIME t1, t2;
+
+    t1 = ossl_time_now();
+
+    for (count = 0; ; count++) {
+        new = CRYPTO_zalloc(sizeof(int), NULL, 0);
+        if (contention == 0)
+            OSSL_sleep(1000);
+        if (!CRYPTO_THREAD_write_lock(rwtorturelock))
+            abort();
+        if (rwwriter_ptr != NULL) {
+            *new = *rwwriter_ptr + 1;
+        } else {
+            *new = 0;
+        }
+        old = rwwriter_ptr;
+        rwwriter_ptr = new;
+        if (!CRYPTO_THREAD_unlock(rwtorturelock))
+            abort();
+        if (old != NULL)
+            CRYPTO_free(old, __FILE__, __LINE__);
+        t2 = ossl_time_now();
+        if ((ossl_time2seconds(t2) - ossl_time2seconds(t1)) >= 4)
+            break;
+    }
+    *iterations = count;
+    return;
+}
+
+static void rwwriter1_fn(void)
+{
+    int local;
+
+    TEST_info("Starting writer1");
+    rwwriter_fn(1, &rwwriter1_iterations);
+    CRYPTO_atomic_add(&rwwriter1_done, 1, &local, NULL);
+}
+
+static void rwwriter2_fn(void)
+{
+    int local;
+
+    TEST_info("Starting writer 2");
+    rwwriter_fn(2, &rwwriter2_iterations);
+    CRYPTO_atomic_add(&rwwriter2_done, 1, &local, NULL);
+}
+
+static void rwreader_fn(int *iterations)
+{
+    unsigned int count = 0;
+
+    int old = 0;
+    int lw1 = 0;
+    int lw2 = 0;
+
+    if (CRYPTO_THREAD_read_lock(rwtorturelock) == 0)
+        abort();
+
+    while (lw1 != 1 || lw2 != 1) {
+        CRYPTO_atomic_add(&rwwriter1_done, 0, &lw1, NULL);
+        CRYPTO_atomic_add(&rwwriter2_done, 0, &lw2, NULL);
+
+        count++;
+        if (rwwriter_ptr != NULL && old > *rwwriter_ptr) {
+            TEST_info("rwwriter pointer went backwards\n");
+            rw_torture_result = 0;
+        }
+        if (CRYPTO_THREAD_unlock(rwtorturelock) == 0)
+            abort();
+        *iterations = count;
+        if (rw_torture_result == 0) {
+            *iterations = count;
+            return;
+        }
+        if (CRYPTO_THREAD_read_lock(rwtorturelock) == 0)
+            abort();
+    }
+    *iterations = count;
+    if (CRYPTO_THREAD_unlock(rwtorturelock) == 0)
+        abort();
+}
+
+static void rwreader1_fn(void)
+{
+    TEST_info("Starting reader 1");
+    rwreader_fn(&rwreader1_iterations);
+}
+
+static void rwreader2_fn(void)
+{
+    TEST_info("Starting reader 2");
+    rwreader_fn(&rwreader2_iterations);
+}
+
+static thread_t rwwriter1;
+static thread_t rwwriter2;
+static thread_t rwreader1;
+static thread_t rwreader2;
+
+static int _torture_rw(void)
+{
+    double tottime = 0;
+    int ret = 0;
+    double avr, avw;
+    OSSL_TIME t1, t2;
+    struct timeval dtime;
+
+    rwtorturelock = CRYPTO_THREAD_lock_new();
+    rwwriter1_iterations = 0;
+    rwwriter2_iterations = 0;
+    rwreader1_iterations = 0;
+    rwreader2_iterations = 0;
+    rwwriter1_done = 0;
+    rwwriter2_done = 0;
+    rw_torture_result = 1;
+
+    memset(&rwwriter1, 0, sizeof(thread_t));
+    memset(&rwwriter2, 0, sizeof(thread_t));
+    memset(&rwreader1, 0, sizeof(thread_t));
+    memset(&rwreader2, 0, sizeof(thread_t));
+
+    TEST_info("Starting rw torture");
+    t1 = ossl_time_now();
+    if (!TEST_true(run_thread(&rwreader1, rwreader1_fn))
+        || !TEST_true(run_thread(&rwreader2, rwreader2_fn))
+        || !TEST_true(run_thread(&rwwriter1, rwwriter1_fn))
+        || !TEST_true(run_thread(&rwwriter2, rwwriter2_fn))
+        || !TEST_true(wait_for_thread(rwwriter1))
+        || !TEST_true(wait_for_thread(rwwriter2))
+        || !TEST_true(wait_for_thread(rwreader1))
+        || !TEST_true(wait_for_thread(rwreader2)))
+        goto out;
+
+    t2 = ossl_time_now();
+    dtime = ossl_time_to_timeval(ossl_time_subtract(t2, t1));
+    tottime = dtime.tv_sec + (dtime.tv_usec / 1e6);
+    TEST_info("rw_torture_result is %d\n", rw_torture_result);
+    TEST_info("performed %d reads and %d writes over 2 read and 2 write threads in %e seconds",
+              rwreader1_iterations + rwreader2_iterations,
+              rwwriter1_iterations + rwwriter2_iterations, tottime);
+    avr = tottime / (rwreader1_iterations + rwreader2_iterations);
+    avw = tottime / (rwwriter1_iterations + rwwriter2_iterations);
+    TEST_info("Average read time %e/read", avr);
+    TEST_info("Average write time %e/write", avw);
+
+    if (TEST_int_eq(rw_torture_result, 1))
+        ret = 1;
+out:
+    CRYPTO_THREAD_lock_free(rwtorturelock);
+    rwtorturelock = NULL;
+    return ret;
+}
+
+static int torture_rw_low(void)
+{
+    contention = 0;
+    return _torture_rw();
+}
+
+static int torture_rw_high(void)
+{
+    contention = 1;
+    return _torture_rw();
+}
+
+static CRYPTO_RCU_LOCK *rcu_lock = NULL;
+
+static int writer1_done = 0;
+static int writer2_done = 0;
+static int reader1_iterations = 0;
+static int reader2_iterations = 0;
+static int writer1_iterations = 0;
+static int writer2_iterations = 0;
+static unsigned int *writer_ptr = NULL;
+static unsigned int global_ctr = 0;
+static int rcu_torture_result = 1;
+
+static void free_old_rcu_data(void *data)
+{
+    CRYPTO_free(data, NULL, 0);
+}
+
+static void writer_fn(int id, int *iterations)
+{
+    int count;
+    OSSL_TIME t1, t2;
+    unsigned int *old, *new;
+
+    t1 = ossl_time_now();
+
+    for (count = 0; ; count++) {
+        new = CRYPTO_zalloc(sizeof(int), NULL, 0);
+        if (contention == 0)
+            OSSL_sleep(1000);
+        ossl_rcu_write_lock(rcu_lock);
+        old = ossl_rcu_deref(&writer_ptr);
+        TSAN_ACQUIRE(&writer_ptr);
+        *new = global_ctr++;
+        ossl_rcu_assign_ptr(&writer_ptr, &new);
+        if (contention == 0)
+            ossl_rcu_call(rcu_lock, free_old_rcu_data, old);
+        ossl_rcu_write_unlock(rcu_lock);
+        if (contention != 0) {
+            ossl_synchronize_rcu(rcu_lock);
+            CRYPTO_free(old, NULL, 0);
+        }
+        t2 = ossl_time_now();
+        if ((ossl_time2seconds(t2) - ossl_time2seconds(t1)) >= 4)
+            break;
+    }
+    *iterations = count;
+    return;
+}
+
+static void writer1_fn(void)
+{
+    int local;
+
+    TEST_info("Starting writer1");
+    writer_fn(1, &writer1_iterations);
+    CRYPTO_atomic_add(&writer1_done, 1, &local, NULL);
+}
+
+static void writer2_fn(void)
+{
+    int local;
+
+    TEST_info("Starting writer2");
+    writer_fn(2, &writer2_iterations);
+    CRYPTO_atomic_add(&writer2_done, 1, &local, NULL);
+}
+
+static void reader_fn(int *iterations)
+{
+    unsigned int count = 0;
+    unsigned int *valp;
+    unsigned int val;
+    unsigned int oldval = 0;
+    int lw1 = 0;
+    int lw2 = 0;
+
+    while (lw1 != 1 || lw2 != 1) {
+        CRYPTO_atomic_add(&writer1_done, 0, &lw1, NULL);
+        CRYPTO_atomic_add(&writer2_done, 0, &lw2, NULL);
+        count++;
+        ossl_rcu_read_lock(rcu_lock);
+        valp = ossl_rcu_deref(&writer_ptr);
+        /* just try to deref the pointer */
+        val = (valp == NULL) ? 0 : *valp;
+        if (oldval > val) {
+            TEST_info("rcu torture value went backwards! (%p) %x : %x\n",
+                      (void *)valp, oldval, val);
+            rcu_torture_result = 0;
+        }
+        oldval = val;
+        ossl_rcu_read_unlock(rcu_lock);
+        if (rcu_torture_result == 0) {
+            *iterations = count;
+            return;
+        }
+    }
+    *iterations = count;
+}
+
+static void reader1_fn(void)
+{
+    TEST_info("Starting reader 1");
+    reader_fn(&reader1_iterations);
+}
+
+static void reader2_fn(void)
+{
+    TEST_info("Starting reader 2");
+    reader_fn(&reader2_iterations);
+}
+
+static thread_t writer1;
+static thread_t writer2;
+static thread_t reader1;
+static thread_t reader2;
+
+static int _torture_rcu(void)
+{
+    OSSL_TIME t1, t2;
+    struct timeval dtime;
+    double tottime;
+    double avr, avw;
+
+    memset(&writer1, 0, sizeof(thread_t));
+    memset(&writer2, 0, sizeof(thread_t));
+    memset(&reader1, 0, sizeof(thread_t));
+    memset(&reader2, 0, sizeof(thread_t));
+
+    writer1_iterations = 0;
+    writer2_iterations = 0;
+    reader1_iterations = 0;
+    reader2_iterations = 0;
+    writer1_done = 0;
+    writer2_done = 0;
+    rcu_torture_result = 1;
+
+    rcu_lock = ossl_rcu_lock_new(1);
+
+    TEST_info("Starting rcu torture");
+    t1 = ossl_time_now();
+    if (!TEST_true(run_thread(&reader1, reader1_fn))
+        || !TEST_true(run_thread(&reader2, reader2_fn))
+        || !TEST_true(run_thread(&writer1, writer1_fn))
+        || !TEST_true(run_thread(&writer2, writer2_fn))
+        || !TEST_true(wait_for_thread(writer1))
+        || !TEST_true(wait_for_thread(writer2))
+        || !TEST_true(wait_for_thread(reader1))
+        || !TEST_true(wait_for_thread(reader2)))
+        return 0;
+
+    t2 = ossl_time_now();
+    dtime = ossl_time_to_timeval(ossl_time_subtract(t2, t1));
+    tottime = dtime.tv_sec + (dtime.tv_usec / 1e6);
+    TEST_info("rcu_torture_result is %d\n", rcu_torture_result);
+    TEST_info("performed %d reads and %d writes over 2 read and 2 write threads in %e seconds",
+              reader1_iterations + reader2_iterations,
+              writer1_iterations + writer2_iterations, tottime);
+    avr = tottime / (reader1_iterations + reader2_iterations);
+    avw = tottime / (writer1_iterations + writer2_iterations);
+    TEST_info("Average read time %e/read", avr);
+    TEST_info("Average write time %e/write", avw);
+
+    ossl_rcu_lock_free(rcu_lock);
+    if (!TEST_int_eq(rcu_torture_result, 1))
+        return 0;
+
+    return 1;
+}
+
+static int torture_rcu_low(void)
+{
+    contention = 0;
+    return _torture_rcu();
+}
+
+static int torture_rcu_high(void)
+{
+    contention = 1;
+    return _torture_rcu();
+}
+#endif
+
 static CRYPTO_ONCE once_run = CRYPTO_ONCE_STATIC_INIT;
 static unsigned once_run_count = 0;
 
@@ -850,6 +1220,12 @@ int setup_tests(void)
     ADD_TEST(test_multi_default);
ADD_TEST(test_lock); +#if defined(OPENSSL_THREADS) + ADD_TEST(torture_rw_low); + ADD_TEST(torture_rw_high); + ADD_TEST(torture_rcu_low); + ADD_TEST(torture_rcu_high); +#endif ADD_TEST(test_once); ADD_TEST(test_thread_local); ADD_TEST(test_atomic); -- 2.47.2