]> git.ipfire.org Git - thirdparty/bind9.git/commitdiff
Add the reader-writer synchronization with modified C-RW-WP
authorOndřej Surý <ondrej@sury.org>
Wed, 24 Mar 2021 16:52:56 +0000 (17:52 +0100)
committerOndřej Surý <ondrej@isc.org>
Wed, 15 Feb 2023 08:30:04 +0000 (09:30 +0100)
This changes the internal isc_rwlock implementation to the one described in:

  Irina Calciu, Dave Dice, Yossi Lev, Victor Luchangco, Virendra
  J. Marathe, and Nir Shavit.  2013.  NUMA-aware reader-writer locks.
  SIGPLAN Not. 48, 8 (August 2013), 157–166.
  DOI:https://doi.org/10.1145/2517327.2442532

(The full article is available from:
  http://mcg.cs.tau.ac.il/papers/ppopp2013-rwlocks.pdf)

The implementation is based on the Writer-Preference Lock (C-RW-WP)
variant (see section 3.4 of the paper for the rationale).

The implemented algorithm has been modified for simplicity and for usage
patterns in rbtdb.c.

The changes compared to the original algorithm:

  * We haven't implemented the cohort locks because that would require
    knowledge of NUMA nodes; instead, a simple atomic_bool is used as
    the synchronization point for the writer lock.

  * The per-thread reader counters are not being used - this would
    require the internal thread id (isc_tid_v) to be always initialized,
    even in the utilities; the change has a slight performance penalty,
    so we might revisit this change in the future.  However, this change
    also saves a lot of memory, because cache-line aligned counters were
    used, so on a 32-core machine, the rwlock would be 4096+ bytes big.

  * The readers use a writers_barrier that is raised after a while when
    the read lock can't be acquired, to prevent reader starvation.

  * Separate ingress and egress reader counters are used to reduce both
    inter- and intra-thread contention.

22 files changed:
bin/dnssec/dnssec-signzone.c
lib/dns/acl.c
lib/dns/badcache.c
lib/dns/db.c
lib/dns/dlz.c
lib/dns/forward.c
lib/dns/keytable.c
lib/dns/nta.c
lib/dns/rbtdb.c
lib/dns/rpz.c
lib/dns/transport.c
lib/dns/tsig.c
lib/dns/view.c
lib/dns/zone.c
lib/dns/zt.c
lib/isc/include/isc/mutex.h
lib/isc/include/isc/rwlock.h
lib/isc/include/isc/util.h
lib/isc/log.c
lib/isc/managers.c
lib/isc/rwlock.c
lib/isc/tls.c

index 6f2328c72e9105163ea28190f404c99724878634..39ae3a35b90d25f03dc3fcb900c351bd9641f73a 100644 (file)
@@ -3778,7 +3778,7 @@ main(int argc, char *argv[]) {
         * of keys rather early.
         */
        ISC_LIST_INIT(keylist);
-       isc_rwlock_init(&keylist_lock, 0, 0);
+       isc_rwlock_init(&keylist_lock);
 
        /*
         * Fill keylist with:
index 09f39bf28fa9a6f8fdf904bf8f95ac70a2a98ce9..8678490906dac4848463067887e22baeb36d6667 100644 (file)
@@ -696,7 +696,7 @@ dns_aclenv_create(isc_mem_t *mctx, dns_aclenv_t **envp) {
 
        isc_mem_attach(mctx, &env->mctx);
        isc_refcount_init(&env->references, 1);
-       isc_rwlock_init(&env->rwlock, 0, 0);
+       isc_rwlock_init(&env->rwlock);
 
        result = dns_acl_create(mctx, 0, &env->localhost);
        if (result != ISC_R_SUCCESS) {
index 64363ea7939cabf17e00e25eb539becfe4598fef..cff5dcadf0375c0139519a78126d09ecdd660854 100644 (file)
@@ -83,7 +83,7 @@ dns_badcache_init(isc_mem_t *mctx, unsigned int size, dns_badcache_t **bcp) {
        };
 
        isc_mem_attach(mctx, &bc->mctx);
-       isc_rwlock_init(&bc->lock, 0, 0);
+       isc_rwlock_init(&bc->lock);
 
        bc->table = isc_mem_getx(bc->mctx, sizeof(bc->table[0]) * size,
                                 ISC_MEM_ZERO);
index e0a46c9856ca758309ad52c067b9d0ea92ff8433..87549bbcb45c3b89c82faaf2aa01e06332f16581 100644 (file)
@@ -70,7 +70,7 @@ static dns_dbimplementation_t rbtimp;
 
 static void
 initialize(void) {
-       isc_rwlock_init(&implock, 0, 0);
+       isc_rwlock_init(&implock);
 
        rbtimp.name = "rbt";
        rbtimp.create = dns_rbtdb_create;
index 75159439124d3f8976bb4fad95f473051bca5cd5..3cd390600c7c794f10913d42f161cfb942fe0cc6 100644 (file)
@@ -83,7 +83,7 @@ static isc_once_t once = ISC_ONCE_INIT;
 
 static void
 dlz_initialize(void) {
-       isc_rwlock_init(&dlz_implock, 0, 0);
+       isc_rwlock_init(&dlz_implock);
        ISC_LIST_INIT(dlz_implementations);
 }
 
index ef05529d832a274fa3d03d831134d587d1ff37a7..20cb70947246dbe3b3a418a544931c54eb826837 100644 (file)
@@ -54,7 +54,7 @@ dns_fwdtable_create(isc_mem_t *mctx, dns_fwdtable_t **fwdtablep) {
                goto cleanup_fwdtable;
        }
 
-       isc_rwlock_init(&fwdtable->rwlock, 0, 0);
+       isc_rwlock_init(&fwdtable->rwlock);
        fwdtable->mctx = NULL;
        isc_mem_attach(mctx, &fwdtable->mctx);
        fwdtable->magic = FWDTABLEMAGIC;
index 2e9922509b3861fe02c1c1974ee3719ad976b96a..deb6ca01de8384e7dcdfbbd04a76fa7179b6673c 100644 (file)
@@ -158,7 +158,7 @@ dns_keytable_create(isc_mem_t *mctx, dns_keytable_t **keytablep) {
                goto cleanup_keytable;
        }
 
-       isc_rwlock_init(&keytable->rwlock, 0, 0);
+       isc_rwlock_init(&keytable->rwlock);
        isc_refcount_init(&keytable->references, 1);
 
        keytable->mctx = NULL;
@@ -342,7 +342,7 @@ new_keynode(dns_rdata_ds_t *ds, dns_keytable_t *keytable, bool managed,
 
        dns_rdataset_init(&knode->dsset);
        isc_refcount_init(&knode->refcount, 1);
-       isc_rwlock_init(&knode->rwlock, 0, 0);
+       isc_rwlock_init(&knode->rwlock);
 
        /*
         * If a DS was supplied, initialize an rdatalist.
index 92fb14c535128c6f15b8ce97e049cd2bd453d73e..c913c7b2814305fee3a760f25cf265f13af2ca99 100644 (file)
@@ -136,7 +136,7 @@ dns_ntatable_create(dns_view_t *view, isc_taskmgr_t *taskmgr,
                goto cleanup_task;
        }
 
-       isc_rwlock_init(&ntatable->rwlock, 0, 0);
+       isc_rwlock_init(&ntatable->rwlock);
 
        isc_refcount_init(&ntatable->references, 1);
 
index b41f05d73d7c4cbe115a36217fe1902a1cded59d..a416ab66f7893e77d785fb48eb5fd0ef1d0f2390 100644 (file)
@@ -104,7 +104,7 @@ typedef uint32_t rbtdb_rdatatype_t;
        RBTDB_RDATATYPE_VALUE(dns_rdatatype_rrsig, dns_rdatatype_soa)
 #define RBTDB_RDATATYPE_NCACHEANY RBTDB_RDATATYPE_VALUE(0, dns_rdatatype_any)
 
-#define RBTDB_INITLOCK(l)    isc_rwlock_init((l), 0, 0)
+#define RBTDB_INITLOCK(l)    isc_rwlock_init((l))
 #define RBTDB_DESTROYLOCK(l) isc_rwlock_destroy(l)
 #define RBTDB_LOCK(l, t)     RWLOCK((l), (t))
 #define RBTDB_UNLOCK(l, t)   RWUNLOCK((l), (t))
@@ -117,7 +117,7 @@ typedef uint32_t rbtdb_rdatatype_t;
 
 typedef isc_rwlock_t nodelock_t;
 
-#define NODE_INITLOCK(l)    isc_rwlock_init((l), 0, 0)
+#define NODE_INITLOCK(l)    isc_rwlock_init((l))
 #define NODE_DESTROYLOCK(l) isc_rwlock_destroy(l)
 #define NODE_LOCK(l, t, tp)                                      \
        {                                                        \
@@ -161,7 +161,7 @@ typedef isc_rwlock_t nodelock_t;
 
 typedef isc_rwlock_t treelock_t;
 
-#define TREE_INITLOCK(l)    isc_rwlock_init(l, 0, 0)
+#define TREE_INITLOCK(l)    isc_rwlock_init(l)
 #define TREE_DESTROYLOCK(l) isc_rwlock_destroy(l)
 #define TREE_LOCK(l, t, tp)                                      \
        {                                                        \
@@ -1294,7 +1294,7 @@ allocate_version(isc_mem_t *mctx, rbtdb_serial_t serial,
        version->serial = serial;
 
        isc_refcount_init(&version->references, references);
-       isc_rwlock_init(&version->glue_rwlock, 0, 0);
+       isc_rwlock_init(&version->glue_rwlock);
 
        version->glue_table_bits = ISC_HASH_MIN_BITS;
        version->glue_table_nodecount = 0U;
@@ -1343,7 +1343,7 @@ newversion(dns_db_t *db, dns_dbversion_t **versionp) {
                version->salt_length = 0;
                memset(version->salt, 0, sizeof(version->salt));
        }
-       isc_rwlock_init(&version->rwlock, 0, 0);
+       isc_rwlock_init(&version->rwlock);
        RWLOCK(&rbtdb->current_version->rwlock, isc_rwlocktype_read);
        version->records = rbtdb->current_version->records;
        version->xfrsize = rbtdb->current_version->xfrsize;
@@ -2207,7 +2207,7 @@ prune_tree(void *arg) {
                node = parent;
        } while (node != NULL);
        NODE_UNLOCK(&rbtdb->node_locks[locknum].lock, &nlocktype);
-       RWUNLOCK(&rbtdb->tree_lock, tlocktype);
+       TREE_UNLOCK(&rbtdb->tree_lock, &tlocktype);
 
        detach((dns_db_t **)&rbtdb);
 }
@@ -8431,7 +8431,7 @@ dns_rbtdb_create(isc_mem_t *mctx, const dns_name_t *origin, dns_dbtype_t type,
        rbtdb->current_version->salt_length = 0;
        memset(rbtdb->current_version->salt, 0,
               sizeof(rbtdb->current_version->salt));
-       isc_rwlock_init(&rbtdb->current_version->rwlock, 0, 0);
+       isc_rwlock_init(&rbtdb->current_version->rwlock);
        rbtdb->current_version->records = 0;
        rbtdb->current_version->xfrsize = 0;
        rbtdb->future_version = NULL;
index 8dc62d29e29f91c2b843c518a21d84df7927c16f..75ee525f7da62007ba7f0fb0ce6243a137fd4de4 100644 (file)
@@ -1481,7 +1481,7 @@ dns_rpz_new_zones(isc_mem_t *mctx, isc_loopmgr_t *loopmgr, char *rps_cstr,
                .magic = DNS_RPZ_ZONES_MAGIC,
        };
 
-       isc_rwlock_init(&rpzs->search_lock, 0, 0);
+       isc_rwlock_init(&rpzs->search_lock);
        isc_mutex_init(&rpzs->maint_lock);
        isc_refcount_init(&rpzs->references, 1);
 
index 9c6a123081cf4103da60a683e8fab55ae3d0cb9b..d76f62f3d5d095c8c48d1de7afd4b8514df9487b 100644 (file)
@@ -652,7 +652,7 @@ dns_transport_list_new(isc_mem_t *mctx) {
 
        *list = (dns_transport_list_t){ 0 };
 
-       isc_rwlock_init(&list->lock, 0, 0);
+       isc_rwlock_init(&list->lock);
 
        isc_mem_attach(mctx, &list->mctx);
        isc_refcount_init(&list->references, 1);
index 3dca68930bc9fd3750842d70bed3bb8bc12f9ba6..1bf652a1596044a96728da3d27dd20a744b61617 100644 (file)
@@ -1812,7 +1812,7 @@ dns_tsigkeyring_create(isc_mem_t *mctx, dns_tsig_keyring_t **ringp) {
 
        ring = isc_mem_get(mctx, sizeof(dns_tsig_keyring_t));
 
-       isc_rwlock_init(&ring->lock, 0, 0);
+       isc_rwlock_init(&ring->lock);
        ring->keys = NULL;
        result = dns_rbt_create(mctx, free_tsignode, NULL, &ring->keys);
        if (result != ISC_R_SUCCESS) {
index e8a53aa108ebff94894105f23681f8cc970a979b..0c7de7446314da4e4651b8ad50615b2274449a8c 100644 (file)
@@ -133,7 +133,7 @@ dns_view_create(isc_mem_t *mctx, dns_rdataclass_t rdclass, const char *name,
 
        isc_mutex_init(&view->lock);
 
-       isc_rwlock_init(&view->sfd_lock, 0, 0);
+       isc_rwlock_init(&view->sfd_lock);
 
        view->zonetable = NULL;
        result = dns_zt_create(mctx, rdclass, &view->zonetable);
index 951bbfd34c7f4fd6eab9ac2d8cb942a1f4b82520..3a261d0b494d829ead533cbdbe4f34122ea271a8 100644 (file)
@@ -214,7 +214,7 @@ typedef struct dns_include dns_include_t;
        } while (0)
 #endif /* ifdef DNS_ZONE_CHECKLOCK */
 
-#define ZONEDB_INITLOCK(l)    isc_rwlock_init((l), 0, 0)
+#define ZONEDB_INITLOCK(l)    isc_rwlock_init((l))
 #define ZONEDB_DESTROYLOCK(l) isc_rwlock_destroy(l)
 #define ZONEDB_LOCK(l, t)     RWLOCK((l), (t))
 #define ZONEDB_UNLOCK(l, t)   RWUNLOCK((l), (t))
@@ -17942,7 +17942,7 @@ zonemgr_keymgmt_init(dns_zonemgr_t *zmgr) {
        };
 
        isc_mem_attach(zmgr->mctx, &mgmt->mctx);
-       isc_rwlock_init(&mgmt->lock, 0, 0);
+       isc_rwlock_init(&mgmt->lock);
        isc_hashmap_create(mgmt->mctx, DNS_KEYMGMT_HASH_BITS,
                           ISC_HASHMAP_CASE_SENSITIVE, &mgmt->table);
 
@@ -18074,10 +18074,10 @@ dns_zonemgr_create(isc_mem_t *mctx, isc_loopmgr_t *loopmgr,
        for (size_t i = 0; i < UNREACH_CACHE_SIZE; i++) {
                atomic_init(&zmgr->unreachable[i].expire, 0);
        }
-       isc_rwlock_init(&zmgr->rwlock, 0, 0);
+       isc_rwlock_init(&zmgr->rwlock);
 
        /* Unreachable lock. */
-       isc_rwlock_init(&zmgr->urlock, 0, 0);
+       isc_rwlock_init(&zmgr->urlock);
 
        isc_ratelimiter_create(loop, &zmgr->checkdsrl);
        isc_ratelimiter_create(loop, &zmgr->notifyrl);
index a0c300aaf0c9decab1e57bb62232e662beccdf2c..da4eac74ce545b82a8e795d913fa71963a13d0f2 100644 (file)
@@ -95,7 +95,7 @@ dns_zt_create(isc_mem_t *mctx, dns_rdataclass_t rdclass, dns_zt_t **ztp) {
                goto cleanup_zt;
        }
 
-       isc_rwlock_init(&zt->rwlock, 0, 0);
+       isc_rwlock_init(&zt->rwlock);
        zt->mctx = NULL;
        isc_mem_attach(mctx, &zt->mctx);
        isc_refcount_init(&zt->references, 1);
index df249223365eb5e44caba2d92d151be4fb1bd7c0..45256a6fac9e2fbfadb69402fe050ddbf383cdf1 100644 (file)
@@ -23,6 +23,8 @@
 #include <isc/result.h> /* for ISC_R_ codes */
 #include <isc/util.h>
 
+#define ISC_MUTEX_INITIALIZER PTHREAD_MUTEX_INITIALIZER
+
 ISC_LANG_BEGINDECLS
 
 /*
index 44a44a86ccc671858a3ab05f7dd71ee24037111d..9d5c54053e92a4da51137d7db110d42705539911 100644 (file)
@@ -18,8 +18,6 @@
 
 /*! \file isc/rwlock.h */
 
-#include <isc/atomic.h>
-#include <isc/condition.h>
 #include <isc/lang.h>
 #include <isc/types.h>
 #include <isc/util.h>
@@ -46,10 +44,10 @@ typedef enum {
 typedef pthread_rwlock_t *isc_rwlock_t;
 typedef pthread_rwlock_t  isc__rwlock_t;
 
-#define isc_rwlock_init(rwl, rq, wq)            \
-       {                                       \
-               *rwl = malloc(sizeof(**rwl));   \
-               isc__rwlock_init(*rwl, rq, wq); \
+#define isc_rwlock_init(rwl)                  \
+       {                                     \
+               *rwl = malloc(sizeof(**rwl)); \
+               isc__rwlock_init(*rwl);       \
        }
 #define isc_rwlock_lock(rwl, type)    isc__rwlock_lock(*rwl, type)
 #define isc_rwlock_trylock(rwl, type) isc__rwlock_trylock(*rwl, type)
@@ -66,7 +64,7 @@ typedef pthread_rwlock_t  isc__rwlock_t;
 typedef pthread_rwlock_t isc_rwlock_t;
 typedef pthread_rwlock_t isc__rwlock_t;
 
-#define isc_rwlock_init(rwl, rq, wq)  isc__rwlock_init(rwl, rq, wq)
+#define isc_rwlock_init(rwl)         isc__rwlock_init(rwl)
 #define isc_rwlock_lock(rwl, type)    isc__rwlock_lock(rwl, type)
 #define isc_rwlock_trylock(rwl, type) isc__rwlock_trylock(rwl, type)
 #define isc_rwlock_unlock(rwl, type)  isc__rwlock_unlock(rwl, type)
@@ -75,7 +73,7 @@ typedef pthread_rwlock_t isc__rwlock_t;
 
 #endif /* ISC_TRACK_PTHREADS_OBJECTS */
 
-#define isc__rwlock_init(rwl, read_quota, write_quote)             \
+#define isc__rwlock_init(rwl)                                      \
        {                                                          \
                int _ret = pthread_rwlock_init(rwl, NULL);         \
                PTHREADS_RUNTIME_CHECK(pthread_rwlock_init, _ret); \
@@ -158,72 +156,99 @@ typedef pthread_rwlock_t isc__rwlock_t;
                PTHREADS_RUNTIME_CHECK(pthread_rwlock_destroy, _ret); \
        }
 
+#define isc_rwlock_setworkers(workers)
+
 #else /* USE_PTHREAD_RWLOCK */
 
+#include <isc/align.h>
+#include <isc/atomic.h>
+#include <isc/os.h>
+
 struct isc_rwlock {
-       /* Unlocked. */
-       unsigned int        magic;
-       isc_mutex_t         lock;
-       atomic_int_fast32_t spins;
-
-       /*
-        * When some atomic instructions with hardware assistance are
-        * available, rwlock will use those so that concurrent readers do not
-        * interfere with each other through mutex as long as no writers
-        * appear, massively reducing the lock overhead in the typical case.
-        *
-        * The basic algorithm of this approach is the "simple
-        * writer-preference lock" shown in the following URL:
-        * http://www.cs.rochester.edu/u/scott/synchronization/pseudocode/rw.html
-        * but our implementation does not rely on the spin lock unlike the
-        * original algorithm to be more portable as a user space application.
-        */
-
-       /* Read or modified atomically. */
-       atomic_int_fast32_t write_requests;
-       atomic_int_fast32_t write_completions;
-       atomic_int_fast32_t cnt_and_flag;
-
-       /* Locked by lock. */
-       isc_condition_t readable;
-       isc_condition_t writeable;
-       unsigned int    readers_waiting;
-
-       /* Locked by rwlock itself. */
-       atomic_uint_fast32_t write_granted;
-
-       /* Unlocked. */
-       unsigned int write_quota;
+       alignas(ISC_OS_CACHELINE_SIZE) atomic_uint_fast32_t readers_ingress;
+       alignas(ISC_OS_CACHELINE_SIZE) atomic_uint_fast32_t readers_egress;
+       alignas(ISC_OS_CACHELINE_SIZE) atomic_int_fast32_t writers_barrier;
+       alignas(ISC_OS_CACHELINE_SIZE) atomic_bool writers_lock;
 };
 
 typedef struct isc_rwlock isc_rwlock_t;
-typedef struct isc_rwlock isc__rwlock_t;
 
-#define isc_rwlock_init(rwl, rq, wq)  isc__rwlock_init(rwl, rq, wq)
-#define isc_rwlock_lock(rwl, type)    isc__rwlock_lock(rwl, type)
-#define isc_rwlock_trylock(rwl, type) isc__rwlock_trylock(rwl, type)
-#define isc_rwlock_unlock(rwl, type)  isc__rwlock_unlock(rwl, type)
-#define isc_rwlock_tryupgrade(rwl)    isc__rwlock_tryupgrade(rwl)
-#define isc_rwlock_destroy(rwl)              isc__rwlock_destroy(rwl)
+void
+isc_rwlock_init(isc_rwlock_t *rwl);
 
 void
-isc__rwlock_init(isc__rwlock_t *rwl, unsigned int read_quota,
-                unsigned int write_quota);
+isc_rwlock_rdlock(isc_rwlock_t *rwl);
 
 void
-isc__rwlock_lock(isc__rwlock_t *rwl, isc_rwlocktype_t type);
+isc_rwlock_wrlock(isc_rwlock_t *rwl);
+
+isc_result_t
+isc_rwlock_tryrdlock(isc_rwlock_t *rwl);
 
 isc_result_t
-isc__rwlock_trylock(isc__rwlock_t *rwl, isc_rwlocktype_t type);
+isc_rwlock_trywrlock(isc_rwlock_t *rwl);
 
 void
-isc__rwlock_unlock(isc__rwlock_t *rwl, isc_rwlocktype_t type);
+isc_rwlock_rdunlock(isc_rwlock_t *rwl);
+
+void
+isc_rwlock_wrunlock(isc_rwlock_t *rwl);
 
 isc_result_t
-isc__rwlock_tryupgrade(isc__rwlock_t *rwl);
+isc_rwlock_tryupgrade(isc_rwlock_t *rwl);
+
+void
+isc_rwlock_downgrade(isc_rwlock_t *rwl);
 
 void
-isc__rwlock_destroy(isc__rwlock_t *rwl);
+isc_rwlock_destroy(isc_rwlock_t *rwl);
+
+void
+isc_rwlock_setworkers(uint16_t workers);
+
+#define isc_rwlock_lock(rwl, type)              \
+       {                                       \
+               switch (type) {                 \
+               case isc_rwlocktype_read:       \
+                       isc_rwlock_rdlock(rwl); \
+                       break;                  \
+               case isc_rwlocktype_write:      \
+                       isc_rwlock_wrlock(rwl); \
+                       break;                  \
+               default:                        \
+                       UNREACHABLE();          \
+               }                               \
+       }
+
+#define isc_rwlock_trylock(rwl, type)                         \
+       ({                                                    \
+               int __result;                                 \
+               switch (type) {                               \
+               case isc_rwlocktype_read:                     \
+                       __result = isc_rwlock_tryrdlock(rwl); \
+                       break;                                \
+               case isc_rwlocktype_write:                    \
+                       __result = isc_rwlock_trywrlock(rwl); \
+                       break;                                \
+               default:                                      \
+                       UNREACHABLE();                        \
+               }                                             \
+               __result;                                     \
+       })
+
+#define isc_rwlock_unlock(rwl, type)              \
+       {                                         \
+               switch (type) {                   \
+               case isc_rwlocktype_read:         \
+                       isc_rwlock_rdunlock(rwl); \
+                       break;                    \
+               case isc_rwlocktype_write:        \
+                       isc_rwlock_wrunlock(rwl); \
+                       break;                    \
+               default:                          \
+                       UNREACHABLE();            \
+               }                                 \
+       }
 
 #endif /* USE_PTHREAD_RWLOCK */
 
index f24a8da2d581f95c28e22837839c5105f38b152b..35d905015879bed145fd25fba6f57803ea65c76d 100644 (file)
 #define WRLOCK(lp)   RWLOCK(lp, isc_rwlocktype_write)
 #define WRUNLOCK(lp) RWUNLOCK(lp, isc_rwlocktype_write)
 
+#define UPGRADELOCK(lock, locktype)                                         \
+       {                                                                   \
+               if (locktype == isc_rwlocktype_read) {                      \
+                       if (isc_rwlock_tryupgrade(lock) == ISC_R_SUCCESS) { \
+                               locktype = isc_rwlocktype_write;            \
+                       } else {                                            \
+                               RWUNLOCK(lock, locktype);                   \
+                               locktype = isc_rwlocktype_write;            \
+                               RWLOCK(lock, locktype);                     \
+                       }                                                   \
+               }                                                           \
+               INSIST(locktype == isc_rwlocktype_write);                   \
+       }
+
 /*
  * List Macros.
  */
index 5547495f7f59061225a5f9bcf7c1d0a4645b9287..43e371a8133a87a787db32cbdc5139b71f408660 100644 (file)
@@ -264,7 +264,7 @@ isc_log_create(isc_mem_t *mctx, isc_log_t **lctxp, isc_logconfig_t **lcfgp) {
        ISC_LIST_INIT(lctx->messages);
 
        isc_mutex_init(&lctx->lock);
-       isc_rwlock_init(&lctx->lcfg_rwl, 0, 0);
+       isc_rwlock_init(&lctx->lcfg_rwl);
 
        /*
         * Normally setting the magic number is the last step done
index 2fa8177720144981ba91ca6d920d7bf4eb1d217a..9fcade3e6195a634ae25c2b53736e2f8701dd907 100644 (file)
@@ -12,7 +12,9 @@
  */
 
 #include <isc/managers.h>
+#include <isc/rwlock.h>
 #include <isc/util.h>
+#include <isc/uv.h>
 
 void
 isc_managers_create(isc_mem_t **mctxp, uint32_t workers,
@@ -33,6 +35,8 @@ isc_managers_create(isc_mem_t **mctxp, uint32_t workers,
        REQUIRE(taskmgrp != NULL && *taskmgrp == NULL);
        isc_taskmgr_create(*mctxp, *loopmgrp, taskmgrp);
        INSIST(*taskmgrp != NULL);
+
+       isc_rwlock_setworkers(workers);
 }
 
 void
index 62bdbf1a625fc1543866c14d1dcb99c4391336c1..ae4fc388a02d4ee89cca884962de46b76007edf1 100644 (file)
  * information regarding copyright ownership.
  */
 
+/*
+ * Modified C-RW-WP Implementation from NUMA-Aware Reader-Writer Locks paper:
+ * http://dl.acm.org/citation.cfm?id=2442532
+ *
+ * This work is based on C++ code available from
+ * https://github.com/pramalhe/ConcurrencyFreaks/
+ *
+ * Copyright (c) 2014-2016, Pedro Ramalhete, Andreia Correia
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ *     * Redistributions of source code must retain the above copyright
+ *       notice, this list of conditions and the following disclaimer.
+ *     * Redistributions in binary form must reproduce the above copyright
+ *       notice, this list of conditions and the following disclaimer in the
+ *       documentation and/or other materials provided with the distribution.
+ *     * Neither the name of Concurrency Freaks nor the
+ *       names of its contributors may be used to endorse or promote products
+ *       derived from this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
+ * IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
+ * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
+ * PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL <COPYRIGHT HOLDER>
+ * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
+ * THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
 /*! \file */
 
 #include <inttypes.h>
 #include <stdbool.h>
 #include <stddef.h>
+#include <stdlib.h>
+#include <unistd.h>
 
 #include <isc/atomic.h>
-#include <isc/magic.h>
+#include <isc/hash.h>
 #include <isc/pause.h>
-#include <isc/print.h>
 #include <isc/rwlock.h>
+#include <isc/thread.h>
+#include <isc/tid.h>
 #include <isc/util.h>
 
-#define RWLOCK_MAGIC     ISC_MAGIC('R', 'W', 'L', 'k')
-#define VALID_RWLOCK(rwl) ISC_MAGIC_VALID(rwl, RWLOCK_MAGIC)
-
-#ifndef RWLOCK_DEFAULT_READ_QUOTA
-#define RWLOCK_DEFAULT_READ_QUOTA 4
-#endif /* ifndef RWLOCK_DEFAULT_READ_QUOTA */
+static atomic_uint_fast16_t isc__crwlock_workers = 128;
 
-#ifndef RWLOCK_DEFAULT_WRITE_QUOTA
-#define RWLOCK_DEFAULT_WRITE_QUOTA 4
-#endif /* ifndef RWLOCK_DEFAULT_WRITE_QUOTA */
+#define ISC_RWLOCK_UNLOCKED false
+#define ISC_RWLOCK_LOCKED   true
 
-#ifndef RWLOCK_MAX_ADAPTIVE_COUNT
-#define RWLOCK_MAX_ADAPTIVE_COUNT 100
-#endif /* ifndef RWLOCK_MAX_ADAPTIVE_COUNT */
+/*
+ * See https://csce.ucmss.com/cr/books/2017/LFS/CSREA2017/FCS3701.pdf for
+ * guidance on patience level
+ */
+#ifndef RWLOCK_MAX_READER_PATIENCE
+#define RWLOCK_MAX_READER_PATIENCE 500
+#endif /* ifndef RWLOCK_MAX_READER_PATIENCE */
 
-#ifdef ISC_RWLOCK_TRACE
-#include <stdio.h> /* Required for fprintf/stderr. */
+static void
+read_indicator_wait_until_empty(isc_rwlock_t *rwl);
 
-#include <isc/thread.h> /* Required for isc_thread_self(). */
+#include <stdio.h>
 
 static void
-print_lock(const char *operation, isc__rwlock_t *rwl, isc_rwlocktype_t type) {
-       fprintf(stderr,
-               "rwlock %p thread %" PRIuPTR " %s(%s): "
-               "write_requests=%u, write_completions=%u, "
-               "cnt_and_flag=0x%x, readers_waiting=%u, "
-               "write_granted=%u, write_quota=%u\n",
-               rwl, isc_thread_self(), operation,
-               (type == isc_rwlocktype_read ? "read" : "write"),
-               atomic_load_acquire(&rwl->write_requests),
-               atomic_load_acquire(&rwl->write_completions),
-               atomic_load_acquire(&rwl->cnt_and_flag), rwl->readers_waiting,
-               atomic_load_acquire(&rwl->write_granted), rwl->write_quota);
+read_indicator_arrive(isc_rwlock_t *rwl) {
+       (void)atomic_fetch_add_release(&rwl->readers_ingress, 1);
 }
-#endif /* ISC_RWLOCK_TRACE */
-
-void
-isc__rwlock_init(isc__rwlock_t *rwl, unsigned int read_quota,
-                unsigned int write_quota) {
-       REQUIRE(rwl != NULL);
 
-       /*
-        * In case there's trouble initializing, we zero magic now.  If all
-        * goes well, we'll set it to RWLOCK_MAGIC.
-        */
-       rwl->magic = 0;
-
-       atomic_init(&rwl->spins, 0);
-       atomic_init(&rwl->write_requests, 0);
-       atomic_init(&rwl->write_completions, 0);
-       atomic_init(&rwl->cnt_and_flag, 0);
-       rwl->readers_waiting = 0;
-       atomic_init(&rwl->write_granted, 0);
-       if (read_quota != 0) {
-               UNEXPECTED_ERROR("read quota is not supported");
-       }
-       if (write_quota == 0) {
-               write_quota = RWLOCK_DEFAULT_WRITE_QUOTA;
-       }
-       rwl->write_quota = write_quota;
+static void
+read_indicator_depart(isc_rwlock_t *rwl) {
+       (void)atomic_fetch_add_release(&rwl->readers_egress, 1);
+}
 
-       isc_mutex_init(&rwl->lock);
+static bool
+read_indicator_isempty(isc_rwlock_t *rwl) {
+       return (atomic_load_acquire(&rwl->readers_egress) ==
+               atomic_load_acquire(&rwl->readers_ingress));
+}
 
-       isc_condition_init(&rwl->readable);
-       isc_condition_init(&rwl->writeable);
+static void
+writers_barrier_raise(isc_rwlock_t *rwl) {
+       (void)atomic_fetch_add_release(&rwl->writers_barrier, 1);
+}
 
-       rwl->magic = RWLOCK_MAGIC;
+static void
+writers_barrier_lower(isc_rwlock_t *rwl) {
+       (void)atomic_fetch_sub_release(&rwl->writers_barrier, 1);
 }
 
-void
-isc__rwlock_destroy(isc__rwlock_t *rwl) {
-       REQUIRE(VALID_RWLOCK(rwl));
-
-       REQUIRE(atomic_load_acquire(&rwl->write_requests) ==
-                       atomic_load_acquire(&rwl->write_completions) &&
-               atomic_load_acquire(&rwl->cnt_and_flag) == 0 &&
-               rwl->readers_waiting == 0);
-
-       rwl->magic = 0;
-       isc_condition_destroy(&rwl->readable);
-       isc_condition_destroy(&rwl->writeable);
-       isc_mutex_destroy(&rwl->lock);
+static bool
+writers_barrier_israised(isc_rwlock_t *rwl) {
+       return (atomic_load_acquire(&rwl->writers_barrier) > 0);
 }
 
-/*
- * When some architecture-dependent atomic operations are available,
- * rwlock can be more efficient than the generic algorithm defined below.
- * The basic algorithm is described in the following URL:
- *   http://www.cs.rochester.edu/u/scott/synchronization/pseudocode/rw.html
- *
- * The key is to use the following integer variables modified atomically:
- *   write_requests, write_completions, and cnt_and_flag.
- *
- * write_requests and write_completions act as a waiting queue for writers
- * in order to ensure the FIFO order.  Both variables begin with the initial
- * value of 0.  When a new writer tries to get a write lock, it increments
- * write_requests and gets the previous value of the variable as a "ticket".
- * When write_completions reaches the ticket number, the new writer can start
- * writing.  When the writer completes its work, it increments
- * write_completions so that another new writer can start working.  If the
- * write_requests is not equal to write_completions, it means a writer is now
- * working or waiting.  In this case, a new readers cannot start reading, or
- * in other words, this algorithm basically prefers writers.
- *
- * cnt_and_flag is a "lock" shared by all readers and writers.  This integer
- * variable is a kind of structure with two members: writer_flag (1 bit) and
- * reader_count (31 bits).  The writer_flag shows whether a writer is working,
- * and the reader_count shows the number of readers currently working or almost
- * ready for working.  A writer who has the current "ticket" tries to get the
- * lock by exclusively setting the writer_flag to 1, provided that the whole
- * 32-bit is 0 (meaning no readers or writers working).  On the other hand,
- * a new reader tries to increment the "reader_count" field provided that
- * the writer_flag is 0 (meaning there is no writer working).
- *
- * If some of the above operations fail, the reader or the writer sleeps
- * until the related condition changes.  When a working reader or writer
- * completes its work, some readers or writers are sleeping, and the condition
- * that suspended the reader or writer has changed, it wakes up the sleeping
- * readers or writers.
- *
- * As already noted, this algorithm basically prefers writers.  In order to
- * prevent readers from starving, however, the algorithm also introduces the
- * "writer quota" (Q).  When Q consecutive writers have completed their work,
- * suspending readers, the last writer will wake up the readers, even if a new
- * writer is waiting.
- *
- * Implementation specific note: due to the combination of atomic operations
- * and a mutex lock, ordering between the atomic operation and locks can be
- * very sensitive in some cases.  In particular, it is generally very important
- * to check the atomic variable that requires a reader or writer to sleep after
- * locking the mutex and before actually sleeping; otherwise, it could be very
- * likely to cause a deadlock.  For example, assume "var" is a variable
- * atomically modified, then the corresponding code would be:
- *     if (var == need_sleep) {
- *             LOCK(lock);
- *             if (var == need_sleep)
- *                     WAIT(cond, lock);
- *             UNLOCK(lock);
- *     }
- * The second check is important, since "var" is protected by the atomic
- * operation, not by the mutex, and can be changed just before sleeping.
- * (The first "if" could be omitted, but this is also important in order to
- * make the code efficient by avoiding the use of the mutex unless it is
- * really necessary.)
- */
+/*
+ * Report whether rwl->writers_lock currently reads as ISC_RWLOCK_LOCKED.
+ * The value is fetched with an acquire load; it is only a snapshot and may
+ * change right after the call returns.
+ */
+static bool
+writers_lock_islocked(isc_rwlock_t *rwl) {
+       bool state = atomic_load_acquire(&rwl->writers_lock);
+
+       return (state == ISC_RWLOCK_LOCKED);
+}
 
-#define WRITER_ACTIVE 0x1
-#define READER_INCR   0x2
+/*
+ * Make a single attempt to switch rwl->writers_lock from
+ * ISC_RWLOCK_UNLOCKED to ISC_RWLOCK_LOCKED and return whether the exchange
+ * succeeded.  The expected value is a compound literal, so the value
+ * observed on failure is discarded.
+ *
+ * NOTE(review): this is the "weak" compare-exchange; if it maps onto C11
+ * atomic_compare_exchange_weak it may fail spuriously, so a false return
+ * does not prove that the lock was held.  The callers visible here either
+ * retry in a loop or report ISC_R_LOCKBUSY.
+ */
+static bool
+writers_lock_acquire(isc_rwlock_t *rwl) {
+       return (atomic_compare_exchange_weak_acq_rel(
+               &rwl->writers_lock, &(bool){ ISC_RWLOCK_UNLOCKED },
+               ISC_RWLOCK_LOCKED));
+}
 
+/*
+ * Release writers_lock by swapping ISC_RWLOCK_LOCKED for
+ * ISC_RWLOCK_UNLOCKED; the REQUIRE fails if the lock was not in the locked
+ * state.
+ *
+ * NOTE(review): the compare-exchange is the argument of REQUIRE(), so the
+ * unlock happens only if REQUIRE() evaluates its argument - confirm that it
+ * is never compiled out in any build configuration.
+ */
 static void
-rwlock_lock(isc__rwlock_t *rwl, isc_rwlocktype_t type) {
-       int32_t cntflag;
-
-       REQUIRE(VALID_RWLOCK(rwl));
-
-#ifdef ISC_RWLOCK_TRACE
-       print_lock("prelock", rwl, type);
-#endif /* ifdef ISC_RWLOCK_TRACE */
-
-       if (type == isc_rwlocktype_read) {
-               if (atomic_load_acquire(&rwl->write_requests) !=
-                   atomic_load_acquire(&rwl->write_completions))
-               {
-                       /* there is a waiting or active writer */
-                       LOCK(&rwl->lock);
-                       if (atomic_load_acquire(&rwl->write_requests) !=
-                           atomic_load_acquire(&rwl->write_completions))
-                       {
-                               rwl->readers_waiting++;
-                               WAIT(&rwl->readable, &rwl->lock);
-                               rwl->readers_waiting--;
-                       }
-                       UNLOCK(&rwl->lock);
-               }
+writers_lock_release(isc_rwlock_t *rwl) {
+       REQUIRE(atomic_compare_exchange_strong_acq_rel(
+               &rwl->writers_lock, &(bool){ ISC_RWLOCK_LOCKED },
+               ISC_RWLOCK_UNLOCKED));
+}
 
-               cntflag = atomic_fetch_add_release(&rwl->cnt_and_flag,
-                                                  READER_INCR);
-               POST(cntflag);
-               while (1) {
-                       if ((atomic_load_acquire(&rwl->cnt_and_flag) &
-                            WRITER_ACTIVE) == 0)
-                       {
-                               break;
-                       }
+/*
+ * True once a reader's spin counter has reached RWLOCK_MAX_READER_PATIENCE.
+ * The argument is parenthesised and expanded exactly once, so an argument
+ * with a side effect (such as cnt++) is applied a single time.
+ */
+#define ran_out_of_patience(cnt) ((cnt) >= RWLOCK_MAX_READER_PATIENCE)
 
-                       /* A writer is still working */
-                       LOCK(&rwl->lock);
-                       rwl->readers_waiting++;
-                       if ((atomic_load_acquire(&rwl->cnt_and_flag) &
-                            WRITER_ACTIVE) != 0)
-                       {
-                               WAIT(&rwl->readable, &rwl->lock);
-                       }
-                       rwl->readers_waiting--;
-                       UNLOCK(&rwl->lock);
-
-                       /*
-                        * Typically, the reader should be able to get a lock
-                        * at this stage:
-                        *   (1) there should have been no pending writer when
-                        *       the reader was trying to increment the
-                        *       counter; otherwise, the writer should be in
-                        *       the waiting queue, preventing the reader from
-                        *       proceeding to this point.
-                        *   (2) once the reader increments the counter, no
-                        *       more writer can get a lock.
-                        * Still, it is possible another writer can work at
-                        * this point, e.g. in the following scenario:
-                        *   A previous writer unlocks the writer lock.
-                        *   This reader proceeds to point (1).
-                        *   A new writer appears, and gets a new lock before
-                        *   the reader increments the counter.
-                        *   The reader then increments the counter.
-                        *   The previous writer notices there is a waiting
-                        *   reader who is almost ready, and wakes it up.
-                        * So, the reader needs to confirm whether it can now
-                        * read explicitly (thus we loop).  Note that this is
-                        * not an infinite process, since the reader has
-                        * incremented the counter at this point.
-                        */
-               }
-
-               /*
-                * If we are temporarily preferred to writers due to the writer
-                * quota, reset the condition (race among readers doesn't
-                * matter).
-                */
-               atomic_store_release(&rwl->write_granted, 0);
-       } else {
-               int32_t prev_writer;
-
-               /* enter the waiting queue, and wait for our turn */
-               prev_writer = atomic_fetch_add_release(&rwl->write_requests, 1);
-               while (atomic_load_acquire(&rwl->write_completions) !=
-                      prev_writer)
-               {
-                       LOCK(&rwl->lock);
-                       if (atomic_load_acquire(&rwl->write_completions) !=
-                           prev_writer)
-                       {
-                               WAIT(&rwl->writeable, &rwl->lock);
-                               UNLOCK(&rwl->lock);
-                               continue;
-                       }
-                       UNLOCK(&rwl->lock);
+/*
+ * Acquire 'rwl' for reading, spinning until no writer holds writers_lock.
+ *
+ * Each attempt first registers the reader with read_indicator_arrive() and
+ * only then checks writers_lock.  If a writer holds it, the registration is
+ * withdrawn with read_indicator_depart() and the reader spins with
+ * isc_pause() until writers_lock is observed unlocked, then retries.
+ *
+ * Once the spin counter (which is never reset during the call) reaches
+ * RWLOCK_MAX_READER_PATIENCE, the reader raises the writers barrier, at
+ * most once per call, and lowers it again after the read lock has been
+ * obtained.  isc_rwlock_wrlock() waits while the barrier is raised and the
+ * try-variants return ISC_R_LOCKBUSY.
+ */
+void
+isc_rwlock_rdlock(isc_rwlock_t *rwl) {
+       uint32_t cnt = 0;
+       bool barrier_raised = false;
+
+       while (true) {
+               read_indicator_arrive(rwl);
+               if (!writers_lock_islocked(rwl)) {
+                       /* Acquired lock in read-only mode */
                         break;
                 }
 
-               while (!atomic_compare_exchange_weak_acq_rel(
-                       &rwl->cnt_and_flag, &(int_fast32_t){ 0 },
-                       WRITER_ACTIVE))
-               {
-                       /* Another active reader or writer is working. */
-                       LOCK(&rwl->lock);
-                       if (atomic_load_acquire(&rwl->cnt_and_flag) != 0) {
-                               WAIT(&rwl->writeable, &rwl->lock);
+               /* A writer holds the lock: withdraw our registration and wait */
+               read_indicator_depart(rwl);
+
+               while (writers_lock_islocked(rwl)) {
+                       isc_pause();
+                       if (ran_out_of_patience(cnt++) && !barrier_raised) {
+                               writers_barrier_raise(rwl);
+                               barrier_raised = true;
                         }
-                       UNLOCK(&rwl->lock);
                 }
+       }
+       if (barrier_raised) {
+               writers_barrier_lower(rwl);
+       }
+}
+
+/*
+ * Make one non-blocking attempt to acquire 'rwl' for reading.
+ *
+ * Returns ISC_R_SUCCESS when the read lock was obtained, or ISC_R_LOCKBUSY
+ * when writers_lock was observed locked; in the latter case the reader
+ * registration made by read_indicator_arrive() has already been undone.
+ */
+isc_result_t
+isc_rwlock_tryrdlock(isc_rwlock_t *rwl) {
+       read_indicator_arrive(rwl);
+       if (writers_lock_islocked(rwl)) {
+               /* Writer has acquired the lock, release the read lock */
+               read_indicator_depart(rwl);
 
-               INSIST((atomic_load_acquire(&rwl->cnt_and_flag) &
-                       WRITER_ACTIVE));
-               atomic_fetch_add_release(&rwl->write_granted, 1);
+               return (ISC_R_LOCKBUSY);
         }
 
-#ifdef ISC_RWLOCK_TRACE
-       print_lock("postlock", rwl, type);
-#endif /* ifdef ISC_RWLOCK_TRACE */
+       /* Acquired lock in read-only mode */
+       return (ISC_R_SUCCESS);
 }
 
+/*
+ * Release a read lock on 'rwl' by calling read_indicator_depart().
+ * NOTE(review): nothing here verifies that the caller actually holds a
+ * read lock - presumably that is the caller's responsibility; confirm.
+ */
 void
-isc__rwlock_lock(isc__rwlock_t *rwl, isc_rwlocktype_t type) {
-       int32_t cnt = 0;
-       int32_t spins = atomic_load_acquire(&rwl->spins) * 2 + 10;
-       int32_t max_cnt = ISC_MAX(spins, RWLOCK_MAX_ADAPTIVE_COUNT);
-
-       do {
-               if (cnt++ >= max_cnt) {
-                       rwlock_lock(rwl, type);
-                       break;
-               }
-               isc_pause();
-       } while (isc_rwlock_trylock(rwl, type) != ISC_R_SUCCESS);
-
-       atomic_fetch_add_release(&rwl->spins, (cnt - spins) / 8);
+isc_rwlock_rdunlock(isc_rwlock_t *rwl) {
+       read_indicator_depart(rwl);
 }
 
+/*
+ * Try, without blocking, to convert a read lock on 'rwl' into the write
+ * lock.  The caller is expected to hold a read lock: its reader
+ * registration is withdrawn here.
+ *
+ * Returns ISC_R_LOCKBUSY, with the read lock still in place, when the
+ * writers barrier is raised, when writers_lock cannot be acquired, or when
+ * other readers are still registered after the caller's own registration
+ * has been withdrawn.  In the last case the read registration is restored
+ * before writers_lock is released again.  Returns ISC_R_SUCCESS when the
+ * caller holds writers_lock and its read registration has been dropped.
+ */
 isc_result_t
-isc__rwlock_trylock(isc__rwlock_t *rwl, isc_rwlocktype_t type) {
-       int32_t cntflag;
+isc_rwlock_tryupgrade(isc_rwlock_t *rwl) {
+       /* The writers barrier has been raised */
+       if (writers_barrier_israised(rwl)) {
+               return (ISC_R_LOCKBUSY);
+       }
 
-       REQUIRE(VALID_RWLOCK(rwl));
+       /* Try to acquire the write-lock */
+       if (!writers_lock_acquire(rwl)) {
+               return (ISC_R_LOCKBUSY);
+       }
 
-#ifdef ISC_RWLOCK_TRACE
-       print_lock("prelock", rwl, type);
-#endif /* ifdef ISC_RWLOCK_TRACE */
+       /* Unlock the read-lock */
+       read_indicator_depart(rwl);
 
-       if (type == isc_rwlocktype_read) {
-               /* If a writer is waiting or working, we fail. */
-               if (atomic_load_acquire(&rwl->write_requests) !=
-                   atomic_load_acquire(&rwl->write_completions))
-               {
-                       return (ISC_R_LOCKBUSY);
-               }
+       if (!read_indicator_isempty(rwl)) {
+               /* Other readers remain: re-acquire the read-lock */
+               read_indicator_arrive(rwl);
 
-               /* Otherwise, be ready for reading. */
-               cntflag = atomic_fetch_add_release(&rwl->cnt_and_flag,
-                                                  READER_INCR);
-               if ((cntflag & WRITER_ACTIVE) != 0) {
-                       /*
-                        * A writer is working.  We lose, and cancel the read
-                        * request.
-                        */
-                       cntflag = atomic_fetch_sub_release(&rwl->cnt_and_flag,
-                                                          READER_INCR);
-                       /*
-                        * If no other readers are waiting and we've suspended
-                        * new writers in this short period, wake them up.
-                        */
-                       if (cntflag == READER_INCR &&
-                           atomic_load_acquire(&rwl->write_completions) !=
-                                   atomic_load_acquire(&rwl->write_requests))
-                       {
-                               LOCK(&rwl->lock);
-                               BROADCAST(&rwl->writeable);
-                               UNLOCK(&rwl->lock);
-                       }
+               /* Unlock the write-lock */
+               writers_lock_release(rwl);
+               return (ISC_R_LOCKBUSY);
+       }
+       return (ISC_R_SUCCESS);
+}
 
-                       return (ISC_R_LOCKBUSY);
-               }
-       } else {
-               /* Try locking without entering the waiting queue. */
-               int_fast32_t zero = 0;
-               if (!atomic_compare_exchange_strong_acq_rel(
-                           &rwl->cnt_and_flag, &zero, WRITER_ACTIVE))
-               {
-                       return (ISC_R_LOCKBUSY);
+/*
+ * Spin with isc_pause() until read_indicator_isempty() reports true for
+ * 'rwl'.  There is no timeout.  The only visible caller,
+ * isc_rwlock_wrlock(), invokes this after it has acquired writers_lock.
+ */
+static void
+read_indicator_wait_until_empty(isc_rwlock_t *rwl) {
+       /* Write-lock was acquired, now wait for running Readers to finish */
+       while (true) {
+               if (read_indicator_isempty(rwl)) {
+                       break;
                 }
+               isc_pause();
+       }
+}
 
-               /*
-                * XXXJT: jump into the queue, possibly breaking the writer
-                * order.
-                */
-               atomic_fetch_sub_release(&rwl->write_completions, 1);
-               atomic_fetch_add_release(&rwl->write_granted, 1);
+/*
+ * Acquire 'rwl' for writing, spinning with isc_pause() through three
+ * stages: wait while the writers barrier is raised, acquire writers_lock,
+ * then wait until no readers remain registered.
+ *
+ * The barrier is checked only before the first acquisition attempt, not on
+ * each retry of writers_lock_acquire().
+ */
+void
+isc_rwlock_wrlock(isc_rwlock_t *rwl) {
+       /* The writers barrier has been raised, wait */
+       while (writers_barrier_israised(rwl)) {
+               isc_pause();
         }
 
-#ifdef ISC_RWLOCK_TRACE
-       print_lock("postlock", rwl, type);
-#endif /* ifdef ISC_RWLOCK_TRACE */
+       /* Try to acquire the write-lock */
+       while (!writers_lock_acquire(rwl)) {
+               isc_pause();
+       }
 
-       return (ISC_R_SUCCESS);
+       read_indicator_wait_until_empty(rwl);
+}
+
+/*
+ * Release the write lock on 'rwl' via writers_lock_release(), which
+ * asserts that writers_lock was in the locked state.
+ */
+void
+isc_rwlock_wrunlock(isc_rwlock_t *rwl) {
+       writers_lock_release(rwl);
 }
 
 isc_result_t
-isc__rwlock_tryupgrade(isc__rwlock_t *rwl) {
-       REQUIRE(VALID_RWLOCK(rwl));
-
-       int_fast32_t reader_incr = READER_INCR;
-
-       /* Try to acquire write access. */
-       atomic_compare_exchange_strong_acq_rel(&rwl->cnt_and_flag, &reader_incr,
-                                              WRITER_ACTIVE);
-       /*
-        * There must have been no writer, and there must have
-        * been at least one reader.
-        */
-       INSIST((reader_incr & WRITER_ACTIVE) == 0 &&
-              (reader_incr & ~WRITER_ACTIVE) != 0);
-
-       if (reader_incr == READER_INCR) {
-               /*
-                * We are the only reader and have been upgraded.
-                * Now jump into the head of the writer waiting queue.
-                */
-               atomic_fetch_sub_release(&rwl->write_completions, 1);
-       } else {
+isc_rwlock_trywrlock(isc_rwlock_t *rwl) {
+       /* Write Barriers has been raised */
+       if (writers_barrier_israised(rwl)) {
+               return (ISC_R_LOCKBUSY);
+       }
+
+       /* Try to acquire the write-lock */
+       if (!writers_lock_acquire(rwl)) {
+               return (ISC_R_LOCKBUSY);
+       }
+
+       if (!read_indicator_isempty(rwl)) {
+               /* Unlock the write-lock */
+               writers_lock_release(rwl);
+
                return (ISC_R_LOCKBUSY);
        }
 
@@ -408,74 +259,30 @@ isc__rwlock_tryupgrade(isc__rwlock_t *rwl) {
 }
 
+/*
+ * Convert the write lock held on 'rwl' into a read lock.  The caller is
+ * registered as a reader first and writers_lock is released afterwards, so
+ * the caller is never without one of the two.
+ */
 void
-isc__rwlock_unlock(isc__rwlock_t *rwl, isc_rwlocktype_t type) {
-       int32_t prev_cnt;
-
-       REQUIRE(VALID_RWLOCK(rwl));
-
-#ifdef ISC_RWLOCK_TRACE
-       print_lock("preunlock", rwl, type);
-#endif /* ifdef ISC_RWLOCK_TRACE */
-
-       if (type == isc_rwlocktype_read) {
-               prev_cnt = atomic_fetch_sub_release(&rwl->cnt_and_flag,
-                                                   READER_INCR);
-               /*
-                * If we're the last reader and any writers are waiting, wake
-                * them up.  We need to wake up all of them to ensure the
-                * FIFO order.
-                */
-               if (prev_cnt == READER_INCR &&
-                   atomic_load_acquire(&rwl->write_completions) !=
-                           atomic_load_acquire(&rwl->write_requests))
-               {
-                       LOCK(&rwl->lock);
-                       BROADCAST(&rwl->writeable);
-                       UNLOCK(&rwl->lock);
-               }
-       } else {
-               bool wakeup_writers = true;
-
-               /*
-                * Reset the flag, and (implicitly) tell other writers
-                * we are done.
-                */
-               atomic_fetch_sub_release(&rwl->cnt_and_flag, WRITER_ACTIVE);
-               atomic_fetch_add_release(&rwl->write_completions, 1);
-
-               if ((atomic_load_acquire(&rwl->write_granted) >=
-                    rwl->write_quota) ||
-                   (atomic_load_acquire(&rwl->write_requests) ==
-                    atomic_load_acquire(&rwl->write_completions)) ||
-                   (atomic_load_acquire(&rwl->cnt_and_flag) & ~WRITER_ACTIVE))
-               {
-                       /*
-                        * We have passed the write quota, no writer is
-                        * waiting, or some readers are almost ready, pending
-                        * possible writers.  Note that the last case can
-                        * happen even if write_requests != write_completions
-                        * (which means a new writer in the queue), so we need
-                        * to catch the case explicitly.
-                        */
-                       LOCK(&rwl->lock);
-                       if (rwl->readers_waiting > 0) {
-                               wakeup_writers = false;
-                               BROADCAST(&rwl->readable);
-                       }
-                       UNLOCK(&rwl->lock);
-               }
+isc_rwlock_downgrade(isc_rwlock_t *rwl) {
+       read_indicator_arrive(rwl);
 
-               if ((atomic_load_acquire(&rwl->write_requests) !=
-                    atomic_load_acquire(&rwl->write_completions)) &&
-                   wakeup_writers)
-               {
-                       LOCK(&rwl->lock);
-                       BROADCAST(&rwl->writeable);
-                       UNLOCK(&rwl->lock);
-               }
-       }
+       writers_lock_release(rwl);
+}
+
+/*
+ * Initialize 'rwl' (must not be NULL) to the idle state: writers_lock
+ * unlocked, and the writers barrier and both reader counters set to zero.
+ */
+void
+isc_rwlock_init(isc_rwlock_t *rwl) {
+       REQUIRE(rwl != NULL);
 
+       atomic_init(&rwl->writers_lock, ISC_RWLOCK_UNLOCKED);
+       atomic_init(&rwl->writers_barrier, 0);
+       atomic_init(&rwl->readers_ingress, 0);
+       atomic_init(&rwl->readers_egress, 0);
+}
+
+/*
+ * Verify that 'rwl' is idle before it is discarded: the write lock must be
+ * unlocked and read_indicator_isempty() must hold.  Nothing is freed here.
+ */
+void
+isc_rwlock_destroy(isc_rwlock_t *rwl) {
+       /* Reject NULL up front, matching isc_rwlock_init() */
+       REQUIRE(rwl != NULL);
+
+       /* Check whether write lock has been unlocked */
+       REQUIRE(atomic_load(&rwl->writers_lock) == ISC_RWLOCK_UNLOCKED);
+       REQUIRE(read_indicator_isempty(rwl));
+}
+
+/*
+ * Store 'workers' into the atomic variable isc__crwlock_workers.
+ * NOTE(review): no reader of this value is visible in this part of the
+ * change - confirm where it is consumed.
+ */
+void
+isc_rwlock_setworkers(uint16_t workers) {
+       atomic_store(&isc__crwlock_workers, workers);
 }
index 8c4768c46b133adec769dcd7f86a10869e8affa4..569a041a8da826f7e4d3c55e1c3bfb255af6a979 100644 (file)
@@ -1186,7 +1186,7 @@ isc_tlsctx_cache_create(isc_mem_t *mctx, isc_tlsctx_cache_t **cachep) {
        isc_mem_attach(mctx, &nc->mctx);
 
        isc_ht_init(&nc->data, mctx, 5, ISC_HT_CASE_SENSITIVE);
-       isc_rwlock_init(&nc->rwlock, 0, 0);
+       isc_rwlock_init(&nc->rwlock);
 
        *cachep = nc;
 }