]> git.ipfire.org Git - thirdparty/gcc.git/commitdiff
libitm: Fixed conversion to C++11 atomics.
authorTorvald Riegel <triegel@redhat.com>
Sat, 24 Dec 2011 01:42:20 +0000 (01:42 +0000)
committerTorvald Riegel <torvald@gcc.gnu.org>
Sat, 24 Dec 2011 01:42:20 +0000 (01:42 +0000)
libitm/
* beginend.cc (GTM::gtm_thread::begin_transaction): Add comment.
(GTM::gtm_thread::try_commit): Changed memory order.
* config/linux/alpha/futex_bits.h (sys_futex0): Take atomic int
as parameter.
* config/linux/x86/futex_bits.h (sys_futex0): Same.
* config/linux/sh/futex_bits.h (sys_futex0): Same.
* config/linux/futex_bits.h (sys_futex0): Same.
* config/linux/futex.cc (futex_wait, futex_wake): Same.
* config/linux/futex.h (futex_wait, futex_wake): Same.
* config/linux/rwlock.h (gtm_rwlock::writers,
gtm_rwlock::writer_readers, gtm_rwlock::readers): Change to atomic
ints.
* config/linux/rwlock.cc (gtm_rwlock::read_lock,
gtm_rwlock::write_lock_generic, gtm_rwlock::read_unlock,
gtm_rwlock::write_unlock): Fix memory orders and fences.
* config/posix/rwlock.cc (gtm_rwlock::read_lock,
gtm_rwlock::write_lock_generic, gtm_rwlock::read_unlock,
gtm_rwlock::write_unlock): Same.
* config/linux/rwlock.h (gtm_rwlock::summary): Change to atomic int.
* method-gl.cc (gl_mg::init, gl_wt_dispatch::memtransfer_static,
gl_wt_dispatch::memset_static, gl_wt_dispatch::begin_or_restart):
Add comments.
(gl_wt_dispatch::pre_write, gl_wt_dispatch::validate,
gl_wt_dispatch::load, gl_wt_dispatch::store,
gl_wt_dispatch::try_commit, gl_wt_dispatch::rollback): Fix memory
orders and fences.  Add comments.

From-SVN: r182674

13 files changed:
libitm/ChangeLog
libitm/beginend.cc
libitm/config/linux/alpha/futex_bits.h
libitm/config/linux/futex.cc
libitm/config/linux/futex.h
libitm/config/linux/futex_bits.h
libitm/config/linux/rwlock.cc
libitm/config/linux/rwlock.h
libitm/config/linux/sh/futex_bits.h
libitm/config/linux/x86/futex_bits.h
libitm/config/posix/rwlock.cc
libitm/config/posix/rwlock.h
libitm/method-gl.cc

index 95969826dae357833bb56838ac6733b0ca9cfbf3..9a835e2860b20d296659496cfce5281290b722a5 100644 (file)
@@ -1,3 +1,32 @@
+2011-12-24  Torvald Riegel  <triegel@redhat.com>
+
+       * beginend.cc (GTM::gtm_thread::begin_transaction): Add comment.
+       (GTM::gtm_thread::try_commit): Changed memory order.
+       * config/linux/alpha/futex_bits.h (sys_futex0): Take atomic int
+       as parameter.
+       * config/linux/x86/futex_bits.h (sys_futex0): Same.
+       * config/linux/sh/futex_bits.h (sys_futex0): Same.
+       * config/linux/futex_bits.h (sys_futex0): Same.
+       * config/linux/futex.cc (futex_wait, futex_wake): Same.
+       * config/linux/futex.h (futex_wait, futex_wake): Same.
+       * config/linux/rwlock.h (gtm_rwlock::writers,
+       gtm_rwlock::writer_readers, gtm_rwlock::readers): Change to atomic
+       ints.
+       * config/linux/rwlock.cc (gtm_rwlock::read_lock,
+       gtm_rwlock::write_lock_generic, gtm_rwlock::read_unlock,
+       gtm_rwlock::write_unlock): Fix memory orders and fences.
+       * config/posix/rwlock.cc (gtm_rwlock::read_lock,
+       gtm_rwlock::write_lock_generic, gtm_rwlock::read_unlock,
+       gtm_rwlock::write_unlock): Same.
+       * config/linux/rwlock.h (gtm_rwlock::summary): Change to atomic int.
+       * method-gl.cc (gl_mg::init, gl_wt_dispatch::memtransfer_static,
+       gl_wt_dispatch::memset_static, gl_wt_dispatch::begin_or_restart):
+       Add comments.
+       (gl_wt_dispatch::pre_write, gl_wt_dispatch::validate,
+       gl_wt_dispatch::load, gl_wt_dispatch::store,
+       gl_wt_dispatch::try_commit, gl_wt_dispatch::rollback): Fix memory
+       orders and fences.  Add comments.
+
 2011-12-21  Jakub Jelinek  <jakub@redhat.com>
 
        * Makefile.am (AM_CXXFLAGS): Put $(XCFLAGS) first.
index bcc8516be05de2211bae45f03e1cb7a79eccc04b..d0ad5a7fc2a9901279cbd69b47c13be5c185887a 100644 (file)
@@ -259,6 +259,9 @@ GTM::gtm_thread::begin_transaction (uint32_t prop, const gtm_jmpbuf *jb)
   else
     {
 #ifdef HAVE_64BIT_SYNC_BUILTINS
+      // We don't really care which block of TIDs we get but only that we
+      // acquire one atomically; therefore, relaxed memory order is
+      // sufficient.
       tx->id = global_tid.fetch_add(tid_block_size, memory_order_relaxed);
       tx->local_tid = tx->id + 1;
 #else
@@ -471,17 +474,28 @@ GTM::gtm_thread::trycommit ()
       // Ensure privatization safety, if necessary.
       if (priv_time)
        {
+          // There must be a seq_cst fence between the following loads of the
+          // other transactions' shared_state and the dispatch-specific stores
+          // that signal updates by this transaction (e.g., lock
+          // acquisitions).  This ensures that if we read prior to other
+          // reader transactions setting their shared_state to 0, then those
+          // readers will observe our updates.  We can reuse the seq_cst fence
+          // in serial_lock.read_unlock() however, so we don't need another
+          // one here.
          // TODO Don't just spin but also block using cond vars / futexes
          // here. Should probably be integrated with the serial lock code.
-         // TODO For C++0x atomics, the loads of other threads' shared_state
-         // should have acquire semantics (together with releases for the
-         // respective updates). But is this unnecessary overhead because
-         // weaker barriers are sufficient?
          for (gtm_thread *it = gtm_thread::list_of_threads; it != 0;
              it = it->next_thread)
            {
              if (it == this) continue;
-             while (it->shared_state.load(memory_order_relaxed) < priv_time)
+             // We need to load other threads' shared_state using acquire
+             // semantics (matching the release semantics of the respective
+             // updates).  This is necessary to ensure that the other
+             // threads' memory accesses happen before our actions that
+             // assume privatization safety.
+             // TODO Are there any platform-specific optimizations (e.g.,
+             // merging barriers)?
+             while (it->shared_state.load(memory_order_acquire) < priv_time)
                cpu_relax();
            }
        }
index 997bf0b3cff55e6115fded717ebbc56fb63ab77a..ef5c10a5c1a66d9bf47d7eda8dcfea0f125c6bb6 100644 (file)
@@ -29,7 +29,7 @@
 #endif
 
 static inline long
-sys_futex0 (int *addr, long op, long val)
+sys_futex0 (std::atomic<int> *addr, long op, long val)
 {
   register long sc_0 __asm__("$0");
   register long sc_16 __asm__("$16");
index 0889ee5dfeceaf95d39b10de0a8df20c0320afd3..5707b40c26298e4a46cd860a17da85f0ff99c240 100644 (file)
@@ -41,7 +41,7 @@ static long int gtm_futex_wake = FUTEX_WAKE | FUTEX_PRIVATE_FLAG;
 
 
 void
-futex_wait (int *addr, int val)
+futex_wait (std::atomic<int> *addr, int val)
 {
   long res;
 
@@ -65,7 +65,7 @@ futex_wait (int *addr, int val)
 
 
 long
-futex_wake (int *addr, int count)
+futex_wake (std::atomic<int> *addr, int count)
 {
   long res = sys_futex0 (addr, gtm_futex_wake, count);
   if (__builtin_expect (res == -ENOSYS, 0))
index 00161b47431ce7145079450dcbbaef8b6305eed2..02bf5d0e323a2afcefe1ce7500510397c2d2304f 100644 (file)
 #ifndef GTM_FUTEX_H
 #define GTM_FUTEX_H 1
 
+#include "local_atomic"
+
 namespace GTM HIDDEN {
 
-extern void futex_wait (int *addr, int val);
-extern long futex_wake (int *addr, int count);
+extern void futex_wait (std::atomic<int> *addr, int val);
+extern long futex_wake (std::atomic<int> *addr, int count);
 
 }
 
index bbc336604e9202cc8f32621d9bcc33dc42c90924..9f6be72e3c4c1a2cd05e2d2fe6c8062352a402c6 100644 (file)
@@ -33,7 +33,7 @@
 #include <sys/syscall.h>
 
 static inline long
-sys_futex0 (int *addr, long op, long val)
+sys_futex0 (std::atomic<int> *addr, long op, long val)
 {
-  return syscall (SYS_futex, addr, op, val, 0);
+  return syscall (SYS_futex, (int*) addr, op, val, 0);
 }
index 3471049083e03e3e1bee18ad6fa56eaac0e0adad..f87be2e880a6ddb4046b56c8bd6367b84e3d09dc 100644 (file)
@@ -36,10 +36,11 @@ gtm_rwlock::read_lock (gtm_thread *tx)
   for (;;)
     {
       // Fast path: first announce our intent to read, then check for
-      // conflicting intents to write.  Note that direct assignment to
-      // an atomic object is memory_order_seq_cst.
-      tx->shared_state = 0;
-      if (likely(writers == 0))
+      // conflicting intents to write.  The fence ensures that this happens
+      // in exactly this order.
+      tx->shared_state.store (0, memory_order_relaxed);
+      atomic_thread_fence (memory_order_seq_cst);
+      if (likely (writers.load (memory_order_relaxed) == 0))
        return;
 
       // There seems to be an active, waiting, or confirmed writer, so enter
@@ -50,10 +51,11 @@ gtm_rwlock::read_lock (gtm_thread *tx)
       // We need the barrier here for the same reason that we need it in
       // read_unlock().
       // TODO Potentially too many wake-ups. See comments in read_unlock().
-      tx->shared_state = -1;
-      if (writer_readers > 0)
+      tx->shared_state.store (-1, memory_order_relaxed);
+      atomic_thread_fence (memory_order_seq_cst);
+      if (writer_readers.load (memory_order_relaxed) > 0)
        {
-         writer_readers = 0;
+         writer_readers.store (0, memory_order_relaxed);
          futex_wake(&writer_readers, 1);
        }
 
@@ -61,16 +63,16 @@ gtm_rwlock::read_lock (gtm_thread *tx)
       // writer anymore.
       // TODO Spin here on writers for a while. Consider whether we woke
       // any writers before?
-      while (writers)
+      while (writers.load (memory_order_relaxed))
        {
          // An active writer. Wait until it has finished. To avoid lost
          // wake-ups, we need to use Dekker-like synchronization.
          // Note that we cannot reset readers to zero when we see that there
          // are no writers anymore after the barrier because this pending
          // store could then lead to lost wake-ups at other readers.
-         readers = 1;
-         atomic_thread_fence(memory_order_acq_rel);
-         if (writers)
+         readers.store (1, memory_order_relaxed);
+         atomic_thread_fence (memory_order_seq_cst);
+         if (writers.load (memory_order_relaxed))
            futex_wait(&readers, 1);
        }
 
@@ -95,8 +97,8 @@ bool
 gtm_rwlock::write_lock_generic (gtm_thread *tx)
 {
   // Try to acquire the write lock.
-  unsigned int w;
-  if (unlikely((w = __sync_val_compare_and_swap(&writers, 0, 1)) != 0))
+  int w = 0;
+  if (unlikely (!writers.compare_exchange_strong (w, 1)))
     {
       // If this is an upgrade, we must not wait for other writers or
       // upgrades.
@@ -104,19 +106,14 @@ gtm_rwlock::write_lock_generic (gtm_thread *tx)
        return false;
 
       // There is already a writer. If there are no other waiting writers,
-      // switch to contended mode.
-      // Note that this is actually an atomic exchange, not a TAS. Also,
-      // it's only guaranteed to have acquire semantics, whereas we need a
-      // full barrier to make the Dekker-style synchronization work. However,
-      // we rely on the xchg being a full barrier on the architectures that we
-      // consider here.
-      // ??? Use C++0x atomics as soon as they are available.
+      // switch to contended mode.  We need seq_cst memory order to make the
+      // Dekker-style synchronization work.
       if (w != 2)
-       w = __sync_lock_test_and_set(&writers, 2);
+       w = writers.exchange (2);
       while (w != 0)
        {
          futex_wait(&writers, 2);
-         w = __sync_lock_test_and_set(&writers, 2);
+         w = writers.exchange (2);
        }
     }
 
@@ -130,13 +127,14 @@ gtm_rwlock::write_lock_generic (gtm_thread *tx)
   // TODO In the worst case, this requires one wait/wake pair for each
   // active reader. Reduce this!
   if (tx != 0)
-    tx->shared_state = ~(typeof tx->shared_state)0;
+    tx->shared_state.store (-1, memory_order_relaxed);
 
   for (gtm_thread *it = gtm_thread::list_of_threads; it != 0;
       it = it->next_thread)
     {
       // Use a loop here to check reader flags again after waiting.
-      while (it->shared_state != ~(typeof it->shared_state)0)
+      while (it->shared_state.load (memory_order_relaxed)
+          != ~(typeof it->shared_state)0)
        {
          // An active reader. Wait until it has finished. To avoid lost
          // wake-ups, we need to use Dekker-like synchronization.
@@ -144,12 +142,13 @@ gtm_rwlock::write_lock_generic (gtm_thread *tx)
          // the barrier that the reader has finished in the meantime;
          // however, this is only possible because we are the only writer.
          // TODO Spin for a while on this reader flag.
-         writer_readers = 1;
-         __sync_synchronize();
-         if (it->shared_state != ~(typeof it->shared_state)0)
+         writer_readers.store (1, memory_order_relaxed);
+         atomic_thread_fence (memory_order_seq_cst);
+         if (it->shared_state.load (memory_order_relaxed)
+             != ~(typeof it->shared_state)0)
            futex_wait(&writer_readers, 1);
          else
-           writer_readers = 0;
+           writer_readers.store (0, memory_order_relaxed);
        }
     }
 
@@ -181,19 +180,28 @@ gtm_rwlock::write_upgrade (gtm_thread *tx)
 void
 gtm_rwlock::read_unlock (gtm_thread *tx)
 {
-  tx->shared_state = ~(typeof tx->shared_state)0;
-
-  // If there is a writer waiting for readers, wake it up. We need the barrier
-  // to avoid lost wake-ups.
+  // We only need release memory order here because of privatization safety
+  // (this ensures that marking the transaction as inactive happens after
+  // any prior data accesses by this transaction, and that neither the
+  // compiler nor the hardware order this store earlier).
+  // ??? We might be able to avoid this release here if the compiler can't
+  // merge the release fence with the subsequent seq_cst fence.
+  tx->shared_state.store (-1, memory_order_release);
+
+  // If there is a writer waiting for readers, wake it up.  We need the fence
+  // to avoid lost wake-ups.  Furthermore, the privatization safety
+  // implementation in gtm_thread::try_commit() relies on the existence of
+  // this seq_cst fence.
   // ??? We might not be the last active reader, so the wake-up might happen
   // too early. How do we avoid this without slowing down readers too much?
   // Each reader could scan the list of txns for other active readers but
   // this can result in many cache misses. Use combining instead?
   // TODO Sends out one wake-up for each reader in the worst case.
-  __sync_synchronize();
-  if (unlikely(writer_readers > 0))
+  atomic_thread_fence (memory_order_seq_cst);
+  if (unlikely (writer_readers.load (memory_order_relaxed) > 0))
     {
-      writer_readers = 0;
+      // No additional barrier needed here (see write_unlock()).
+      writer_readers.store (0, memory_order_relaxed);
       futex_wake(&writer_readers, 1);
     }
 }
@@ -204,11 +212,11 @@ gtm_rwlock::read_unlock (gtm_thread *tx)
 void
 gtm_rwlock::write_unlock ()
 {
-  // This is supposed to be a full barrier.
-  if (__sync_fetch_and_sub(&writers, 1) == 2)
+  // This needs to have seq_cst memory order.
+  if (writers.fetch_sub (1) == 2)
     {
       // There might be waiting writers, so wake them.
-      writers = 0;
+      writers.store (0, memory_order_relaxed);
       if (futex_wake(&writers, 1) == 0)
        {
          // If we did not wake any waiting writers, we might indeed be the
@@ -223,9 +231,13 @@ gtm_rwlock::write_unlock ()
   // No waiting writers, so wake up all waiting readers.
   // Because the fetch_and_sub is a full barrier already, we don't need
   // another barrier here (as in read_unlock()).
-  if (readers > 0)
+  if (readers.load (memory_order_relaxed) > 0)
     {
-      readers = 0;
+      // No additional barrier needed here.  The previous load must be in
+      // modification order because of the coherency constraints.  Late stores
+      // by a reader are not a problem because readers do Dekker-style
+      // synchronization on writers.
+      readers.store (0, memory_order_relaxed);
       futex_wake(&readers, INT_MAX);
     }
 }
index 7e6229b73f2926b92da4c138383c545d07f16f14..e5a53c054f6095bdfde3c9346fe50aaef642fe7c 100644 (file)
@@ -25,6 +25,7 @@
 #ifndef GTM_RWLOCK_H
 #define GTM_RWLOCK_H
 
+#include "local_atomic"
 #include "common.h"
 
 namespace GTM HIDDEN {
@@ -42,9 +43,9 @@ struct gtm_thread;
 class gtm_rwlock
 {
   // TODO Put futexes on different cachelines?
-  int writers;          // Writers' futex.
-  int writer_readers;   // A confirmed writer waits here for readers.
-  int readers;          // Readers wait here for writers (iff true).
+  std::atomic<int> writers;       // Writers' futex.
+  std::atomic<int> writer_readers;// A confirmed writer waits here for readers.
+  std::atomic<int> readers;       // Readers wait here for writers (iff true).
 
  public:
   gtm_rwlock() : writers(0), writer_readers(0), readers(0) {};
index f9381ed6cda6990b85d8ce87b429ff4dbf9a517f..cc85367f72bd4df9a914f0f6700f57cb469d8f1d 100644 (file)
@@ -32,7 +32,7 @@
        trapa #0x14; or r0,r0; or r0,r0; or r0,r0; or r0,r0; or r0,r0"
 
 static inline long
-sys_futex0 (int *addr, long op, long val)
+sys_futex0 (std::atomic<int> *addr, long op, long val)
 {
   int __status;
   register long __r3 asm ("r3") = SYS_futex;
index 9a6b102cd7afab2b17f64538a028a496b7706338..732c4ea96b52a405009fb7ef77aea5cfd85a0037 100644 (file)
@@ -28,7 +28,7 @@
 # endif
 
 static inline long
-sys_futex0 (int *addr, long op, long val)
+sys_futex0 (std::atomic<int> *addr, long op, long val)
 {
   register long r10 __asm__("%r10") = 0;
   long res;
@@ -49,7 +49,7 @@ sys_futex0 (int *addr, long op, long val)
 # ifdef __PIC__
 
 static inline long
-sys_futex0 (int *addr, int op, int val)
+sys_futex0 (std::atomic<int> *addr, int op, int val)
 {
   long res;
 
@@ -66,7 +66,7 @@ sys_futex0 (int *addr, int op, int val)
 # else
 
 static inline long
-sys_futex0 (int *addr, int op, int val)
+sys_futex0 (std::atomic<int> *addr, int op, int val)
 {
   long res;
 
index 2464f041c5a10cbb1ad1b3617e2edbf7357f18bf..7ef39982ccf064b0fb0a785490d8fe3cf3789294 100644 (file)
@@ -53,10 +53,11 @@ void
 gtm_rwlock::read_lock (gtm_thread *tx)
 {
   // Fast path: first announce our intent to read, then check for conflicting
-  // intents to write.  Note that direct assignment to an atomic object
-  // is memory_order_seq_cst.
-  tx->shared_state = 0;
-  unsigned int sum = this->summary;
+  // intents to write.  The fence ensure that this happens in exactly this
+  // order.
+  tx->shared_state.store (0, memory_order_relaxed);
+  atomic_thread_fence (memory_order_seq_cst);
+  unsigned int sum = this->summary.load (memory_order_relaxed);
   if (likely(!(sum & (a_writer | w_writer))))
     return;
 
@@ -74,7 +75,7 @@ gtm_rwlock::read_lock (gtm_thread *tx)
 
   // Read summary again after acquiring the mutex because it might have
   // changed during waiting for the mutex to become free.
-  sum = this->summary;
+  sum = this->summary.load (memory_order_relaxed);
 
   // If there is a writer waiting for readers, wake it up. Only do that if we
   // might be the last reader that could do the wake-up, otherwise skip the
@@ -91,10 +92,10 @@ gtm_rwlock::read_lock (gtm_thread *tx)
   // If there is an active or waiting writer, we must wait.
   while (sum & (a_writer | w_writer))
     {
-      this->summary = sum | w_reader;
+      this->summary.store (sum | w_reader, memory_order_relaxed);
       this->w_readers++;
       pthread_cond_wait (&this->c_readers, &this->mutex);
-      sum = this->summary;
+      sum = this->summary.load (memory_order_relaxed);
       if (--this->w_readers == 0)
        sum &= ~w_reader;
     }
@@ -123,7 +124,7 @@ gtm_rwlock::write_lock_generic (gtm_thread *tx)
 {
   pthread_mutex_lock (&this->mutex);
 
-  unsigned int sum = this->summary;
+  unsigned int sum = this->summary.load (memory_order_relaxed);
 
   // If there is an active writer, wait.
   while (sum & a_writer)
@@ -136,23 +137,23 @@ gtm_rwlock::write_lock_generic (gtm_thread *tx)
          return false;
        }
 
-      this->summary = sum | w_writer;
+      this->summary.store (sum | w_writer, memory_order_relaxed);
       this->w_writers++;
       pthread_cond_wait (&this->c_writers, &this->mutex);
-      sum = this->summary;
+      sum = this->summary.load (memory_order_relaxed);
       if (--this->w_writers == 0)
        sum &= ~w_writer;
     }
 
   // Otherwise we can acquire the lock for write. As a writer, we have
   // priority, so we don't need to take this back.
-  this->summary = sum | a_writer;
+  this->summary.store (sum | a_writer, memory_order_relaxed);
 
   // We still need to wait for active readers to finish. The barrier makes
   // sure that we first set our write intent and check for active readers
   // after that, in strictly this order (similar to the barrier in the fast
   // path of read_lock()).
-  atomic_thread_fence(memory_order_acq_rel);
+  atomic_thread_fence(memory_order_seq_cst);
 
   // If this is an upgrade, we are not a reader anymore.
   if (tx != 0)
@@ -235,8 +236,18 @@ gtm_rwlock::write_upgrade (gtm_thread *tx)
 void
 gtm_rwlock::read_unlock (gtm_thread *tx)
 {
-  tx->shared_state = -1;
-  unsigned int sum = this->summary;
+  // We only need release memory order here because of privatization safety
+  // (this ensures that marking the transaction as inactive happens after
+  // any prior data accesses by this transaction, and that neither the
+  // compiler nor the hardware order this store earlier).
+  // ??? We might be able to avoid this release here if the compiler can't
+  // merge the release fence with the subsequent seq_cst fence.
+  tx->shared_state.store (-1, memory_order_release);
+  // We need this seq_cst fence here to avoid lost wake-ups.  Furthermore,
+  // the privatization safety implementation in gtm_thread::try_commit()
+  // relies on the existence of this seq_cst fence.
+  atomic_thread_fence (memory_order_seq_cst);
+  unsigned int sum = this->summary.load (memory_order_relaxed);
   if (likely(!(sum & (a_writer | w_writer))))
     return;
 
@@ -269,8 +280,8 @@ gtm_rwlock::write_unlock ()
 {
   pthread_mutex_lock (&this->mutex);
 
-  unsigned int sum = this->summary;
-  this->summary = sum & ~a_writer;
+  unsigned int sum = this->summary.load (memory_order_relaxed);
+  this->summary.store (sum & ~a_writer, memory_order_relaxed);
 
   // If there is a waiting writer, wake it.
   if (unlikely (sum & w_writer))
index f538bd059a2ce7b6ccd7d79e0a6b0cc8283431c4..359991ab45b852864e69cbca253291fd3a47c4ce 100644 (file)
@@ -26,6 +26,7 @@
 #define GTM_RWLOCK_H
 
 #include <pthread.h>
+#include "local_atomic"
 
 namespace GTM HIDDEN {
 
@@ -55,7 +56,7 @@ class gtm_rwlock
   static const unsigned w_writer  = 2; // The w_writers field != 0
   static const unsigned w_reader  = 4;  // The w_readers field != 0
 
-  unsigned int summary;                // Bitmask of the above.
+  std::atomic<unsigned int> summary;   // Bitmask of the above.
   unsigned int a_readers;      // Nr active readers as observed by a writer
   unsigned int w_readers;      // Nr waiting readers
   unsigned int w_writers;      // Nr waiting writers
index 81045d319fcf106c3a5afff91937c916dee757ec..e678da76b349c64872f03bcf4575ea77ec4fa02b 100644 (file)
@@ -45,11 +45,14 @@ struct gl_mg : public method_group
 
   virtual void init()
   {
+    // This store is only executed while holding the serial lock, so relaxed
+    // memory order is sufficient here.
     orec.store(0, memory_order_relaxed);
   }
   virtual void fini() { }
 };
 
+// TODO cacheline padding
 static gl_mg o_gl_mg;
 
 
@@ -85,23 +88,34 @@ protected:
   static void pre_write(const void *addr, size_t len)
   {
     gtm_thread *tx = gtm_thr();
-    gtm_word v = tx->shared_state.load(memory_order_acquire);
+    gtm_word v = tx->shared_state.load(memory_order_relaxed);
     if (unlikely(!gl_mg::is_locked(v)))
       {
        // Check for and handle version number overflow.
        if (unlikely(v >= gl_mg::VERSION_MAX))
          tx->restart(RESTART_INIT_METHOD_GROUP);
 
-       // CAS global orec from our snapshot time to the locked state.
        // This validates that we have a consistent snapshot, which is also
        // for making privatization safety work (see the class' comments).
+       // Note that this check here will be performed by the subsequent CAS
+       // again, so relaxed memory order is fine.
        gtm_word now = o_gl_mg.orec.load(memory_order_relaxed);
        if (now != v)
          tx->restart(RESTART_VALIDATE_WRITE);
+
+       // CAS global orec from our snapshot time to the locked state.
+       // We need acq_rel memory order here to synchronize with other loads
+       // and modifications of orec.
        if (!o_gl_mg.orec.compare_exchange_strong (now, gl_mg::set_locked(now),
-                                                  memory_order_acquire))
+                                                  memory_order_acq_rel))
          tx->restart(RESTART_LOCKED_WRITE);
 
+       // We use an explicit fence here to avoid having to use release
+       // memory order for all subsequent data stores.  This fence will
+       // synchronize with loads of the data with acquire memory order.  See
+       // validate() for why this is necessary.
+       atomic_thread_fence(memory_order_release);
+
        // Set shared_state to new value.
        tx->shared_state.store(gl_mg::set_locked(now), memory_order_release);
       }
@@ -112,11 +126,19 @@ protected:
 
   static void validate()
   {
-    // Check that snapshot is consistent. The barrier ensures that this
-    // happens after previous data loads.  Recall that load cannot itself
-    // have memory_order_release.
+    // Check that snapshot is consistent.  We expect the previous data load to
+    // have acquire memory order, or be atomic and followed by an acquire
+    // fence.
+    // As a result, the data load will synchronize with the release fence
+    // issued by the transactions whose data updates the data load has read
+    // from.  This forces the orec load to read from a visible sequence of side
+    // effects that starts with the other updating transaction's store that
+    // acquired the orec and set it to locked.
+    // We therefore either read a value with the locked bit set (and restart)
+    // or read an orec value that was written after the data had been written.
+    // Either will allow us to detect inconsistent reads because it will have
+    // a higher/different value.
     gtm_thread *tx = gtm_thr();
-    atomic_thread_fence(memory_order_release);
     gtm_word l = o_gl_mg.orec.load(memory_order_relaxed);
     if (l != tx->shared_state.load(memory_order_relaxed))
       tx->restart(RESTART_VALIDATE_READ);
@@ -131,9 +153,28 @@ protected:
        pre_write(addr, sizeof(V));
        return *addr;
       }
+    if (unlikely(mod == RaW))
+      return *addr;
+
+    // We do not have acquired the orec, so we need to load a value and then
+    // validate that this was consistent.
+    // This needs to have acquire memory order (see validate()).
+    // Alternatively, we can put an acquire fence after the data load but this
+    // is probably less efficient.
+    // FIXME We would need an atomic load with acquire memory order here but
+    // we can't just forge an atomic load for nonatomic data because this
+    // might not work on all implementations of atomics.  However, we need
+    // the acquire memory order and we can only establish this if we link
+    // it to the matching release using a reads-from relation between atomic
+    // loads.  Also, the compiler is allowed to optimize nonatomic accesses
+    // differently than atomic accesses (e.g., if the load would be moved to
+    // after the fence, we potentially don't synchronize properly anymore).
+    // Instead of the following, just use an ordinary load followed by an
+    // acquire fence, and hope that this is good enough for now:
+    // V v = atomic_load_explicit((atomic<V>*)addr, memory_order_acquire);
     V v = *addr;
-    if (likely(mod != RaW))
-      validate();
+    atomic_thread_fence(memory_order_acquire);
+    validate();
     return v;
   }
 
@@ -142,6 +183,15 @@ protected:
   {
     if (unlikely(mod != WaW))
       pre_write(addr, sizeof(V));
+    // FIXME We would need an atomic store here but we can't just forge an
+    // atomic load for nonatomic data because this might not work on all
+    // implementations of atomics.  However, we need this store to link the
+    // release fence in pre_write() to the acquire operation in load, which
+    // is only guaranteed if we have a reads-from relation between atomic
+    // accesses.  Also, the compiler is allowed to optimize nonatomic accesses
+    // differently than atomic accesses (e.g., if the store would be moved
+    // to before the release fence in pre_write(), things could go wrong).
+    // atomic_store_explicit((atomic<V>*)addr, value, memory_order_relaxed);
     *addr = value;
   }
 
@@ -153,6 +203,8 @@ public:
        && (dst_mod != NONTXNAL || src_mod == RfW))
       pre_write(dst, size);
 
+    // FIXME We should use atomics here (see store()).  Let's just hope that
+    // memcpy/memmove are good enough.
     if (!may_overlap)
       ::memcpy(dst, src, size);
     else
@@ -167,6 +219,8 @@ public:
   {
     if (mod != WaW)
       pre_write(dst, size);
+    // FIXME We should use atomics here (see store()).  Let's just hope that
+    // memset is good enough.
     ::memset(dst, c, size);
   }
 
@@ -183,6 +237,11 @@ public:
     gtm_word v;
     while (1)
       {
+        // We need acquire memory order here so that this load will
+        // synchronize with the store that releases the orec in trycommit().
+        // In turn, this makes sure that subsequent data loads will read from
+        // a visible sequence of side effects that starts with the most recent
+        // store to the data right before the release of the orec.
         v = o_gl_mg.orec.load(memory_order_acquire);
         if (!gl_mg::is_locked(v))
          break;
@@ -201,7 +260,7 @@ public:
     // smaller or equal (the serial lock will set shared_state to zero when
     // marking the transaction as active, and restarts enforce immediate
     // visibility of a smaller or equal value with a barrier (see
-    // release_orec()).
+    // rollback()).
     tx->shared_state.store(v, memory_order_relaxed);
     return NO_RESTART;
   }
@@ -209,7 +268,7 @@ public:
   virtual bool trycommit(gtm_word& priv_time)
   {
     gtm_thread* tx = gtm_thr();
-    gtm_word v = tx->shared_state.load(memory_order_acquire);
+    gtm_word v = tx->shared_state.load(memory_order_relaxed);
 
     // Special case: If shared_state is ~0, then we have acquired the
     // serial lock (tx->state is not updated yet). In this case, the previous
@@ -227,6 +286,7 @@ public:
     if (gl_mg::is_locked(v))
       {
        // Release the global orec, increasing its version number / timestamp.
+        // See begin_or_restart() for why we need release memory order here.
        v = gl_mg::clear_locked(v) + 1;
        o_gl_mg.orec.store(v, memory_order_release);
 
@@ -245,7 +305,7 @@ public:
       return;
 
     gtm_thread *tx = gtm_thr();
-    gtm_word v = tx->shared_state.load(memory_order_acquire);
+    gtm_word v = tx->shared_state.load(memory_order_relaxed);
     // Special case: If shared_state is ~0, then we have acquired the
     // serial lock (tx->state is not updated yet). In this case, the previous
     // value isn't available anymore, so grab it from the global lock, which
@@ -262,6 +322,7 @@ public:
     if (gl_mg::is_locked(v))
       {
        // Release the global orec, increasing its version number / timestamp.
+        // See begin_or_restart() for why we need release memory order here.
        v = gl_mg::clear_locked(v) + 1;
        o_gl_mg.orec.store(v, memory_order_release);
 
@@ -269,7 +330,7 @@ public:
        // Special case: Only do this if we are not a serial transaction
        // because otherwise, we would interfere with the serial lock.
        if (!is_serial)
-         tx->shared_state.store(v, memory_order_relaxed);
+         tx->shared_state.store(v, memory_order_release);
 
        // We need a store-load barrier after this store to prevent it
        // from becoming visible after later data loads because the
@@ -277,7 +338,8 @@ public:
        // snapshot time (the lock bit had been set), which could break
        // privatization safety. We do not need a barrier before this
        // store (see pre_write() for an explanation).
-       atomic_thread_fence(memory_order_acq_rel);
+       // ??? What is the precise reasoning in the C++11 model?
+       atomic_thread_fence(memory_order_seq_cst);
       }
 
   }