2 * The read/write mutex module provides a primitive for maintaining shared read
3 * access and mutually exclusive write access.
5 * Copyright: Copyright Sean Kelly 2005 - 2009.
6 * License: $(LINK2 http://www.boost.org/LICENSE_1_0.txt, Boost License 1.0)
8 * Source: $(DRUNTIMESRC core/sync/_rwmutex.d)
11 /* Copyright Sean Kelly 2005 - 2009.
12 * Distributed under the Boost Software License, Version 1.0.
13 * (See accompanying file LICENSE or copy at
14 * http://www.boost.org/LICENSE_1_0.txt)
16 module core.sync.rwmutex;
19 public import core.sync.exception;
20 private import core.sync.condition;
21 private import core.sync.mutex;
22 private import core.memory;
26 private import core.sys.posix.pthread;
30 ////////////////////////////////////////////////////////////////////////////////
35 ////////////////////////////////////////////////////////////////////////////////
39 * This class represents a mutex that allows any number of readers to enter,
40 * but when a writer enters, all other readers and writers are blocked.
42 * Please note that this mutex is not recursive and is intended to guard access
43 * to data only. Also, no deadlock checking is in place because doing so would
44 * require dynamic memory allocation, which would reduce performance by an
45 * unacceptable amount. As a result, any attempt to recursively acquire this
46 * mutex may well deadlock the caller, particularly if a write lock is acquired
47 * while holding a read lock, or vice-versa. In practice, this should not be
48 * an issue however, because it is uncommon to call deeply into unknown code
49 * while holding a lock that simply protects data.
54 * Defines the policy used by this mutex. Currently, two policies are
57 * The first will queue writers until no readers hold the mutex, then
58 * pass the writers through one at a time. If a reader acquires the mutex
59 * while there are still writers queued, the reader will take precedence.
61 * The second will queue readers if there are any writers queued. Writers
62 * are passed through one at a time, and once there are no writers present,
63 * all queued readers will be alerted.
65 * Future policies may offer a more even balance between reader and writer
70 PREFER_READERS, /// Readers get preference. This may starve writers.
71 PREFER_WRITERS /// Writers get preference. This may starve readers.
75 ////////////////////////////////////////////////////////////////////////////
77 ////////////////////////////////////////////////////////////////////////////
81 * Initializes a read/write mutex object with the supplied policy.
84 * policy = The policy to use.
89 this( Policy policy = Policy.PREFER_WRITERS )
91 m_commonMutex = new Mutex;
93 throw new SyncError( "Unable to initialize mutex" );
95 m_readerQueue = new Condition( m_commonMutex );
97 throw new SyncError( "Unable to initialize mutex" );
99 m_writerQueue = new Condition( m_commonMutex );
100 if ( !m_writerQueue )
101 throw new SyncError( "Unable to initialize mutex" );
104 m_reader = new Reader;
105 m_writer = new Writer;
108 ////////////////////////////////////////////////////////////////////////////
109 // General Properties
110 ////////////////////////////////////////////////////////////////////////////
114 * Gets the policy used by this mutex.
117 * The policy used by this mutex.
119 @property Policy policy()
125 ////////////////////////////////////////////////////////////////////////////
126 // Reader/Writer Handles
127 ////////////////////////////////////////////////////////////////////////////
131 * Gets an object representing the reader lock for the associated mutex.
134 * A reader sub-mutex.
136 @property Reader reader()
143 * Gets an object representing the writer lock for the associated mutex.
146 * A writer sub-mutex.
148 @property Writer writer()
154 ////////////////////////////////////////////////////////////////////////////
156 ////////////////////////////////////////////////////////////////////////////
160 * This class can be considered a mutex in its own right, and is used to
161 * negotiate a read lock for the enclosing mutex.
167 * Initializes a read/write mutex reader proxy object.
172 this.__monitor = &m_proxy;
177 * Acquires a read lock on the enclosing mutex.
181 synchronized( m_commonMutex )
183 ++m_numQueuedReaders;
184 scope(exit) --m_numQueuedReaders;
186 while ( shouldQueueReader )
187 m_readerQueue.wait();
188 ++m_numActiveReaders;
194 * Releases a read lock on the enclosing mutex.
196 @trusted void unlock()
198 synchronized( m_commonMutex )
200 if ( --m_numActiveReaders < 1 )
202 if ( m_numQueuedWriters > 0 )
203 m_writerQueue.notify();
210 * Attempts to acquire a read lock on the enclosing mutex. If one can
211 * be obtained without blocking, the lock is acquired and true is
212 * returned. If not, the lock is not acquired and false is returned.
215 * true if the lock was acquired and false if not.
219 synchronized( m_commonMutex )
221 if ( shouldQueueReader )
223 ++m_numActiveReaders;
230 @property bool shouldQueueReader()
232 if ( m_numActiveWriters > 0 )
237 case Policy.PREFER_WRITERS:
238 return m_numQueuedWriters > 0;
240 case Policy.PREFER_READERS:
253 MonitorProxy m_proxy;
257 ////////////////////////////////////////////////////////////////////////////
259 ////////////////////////////////////////////////////////////////////////////
263 * This class can be considered a mutex in its own right, and is used to
264 * negotiate a write lock for the enclosing mutex.
270 * Initializes a read/write mutex writer proxy object.
275 this.__monitor = &m_proxy;
280 * Acquires a write lock on the enclosing mutex.
284 synchronized( m_commonMutex )
286 ++m_numQueuedWriters;
287 scope(exit) --m_numQueuedWriters;
289 while ( shouldQueueWriter )
290 m_writerQueue.wait();
291 ++m_numActiveWriters;
297 * Releases a write lock on the enclosing mutex.
299 @trusted void unlock()
301 synchronized( m_commonMutex )
303 if ( --m_numActiveWriters < 1 )
308 case Policy.PREFER_READERS:
309 if ( m_numQueuedReaders > 0 )
310 m_readerQueue.notifyAll();
311 else if ( m_numQueuedWriters > 0 )
312 m_writerQueue.notify();
314 case Policy.PREFER_WRITERS:
315 if ( m_numQueuedWriters > 0 )
316 m_writerQueue.notify();
317 else if ( m_numQueuedReaders > 0 )
318 m_readerQueue.notifyAll();
326 * Attempts to acquire a write lock on the enclosing mutex. If one can
327 * be obtained without blocking, the lock is acquired and true is
328 * returned. If not, the lock is not acquired and false is returned.
331 * true if the lock was acquired and false if not.
335 synchronized( m_commonMutex )
337 if ( shouldQueueWriter )
339 ++m_numActiveWriters;
346 @property bool shouldQueueWriter()
348 if ( m_numActiveWriters > 0 ||
349 m_numActiveReaders > 0 )
353 case Policy.PREFER_READERS:
354 return m_numQueuedReaders > 0;
356 case Policy.PREFER_WRITERS:
369 MonitorProxy m_proxy;
379 Condition m_readerQueue;
380 Condition m_writerQueue;
382 int m_numQueuedReaders;
383 int m_numActiveReaders;
384 int m_numQueuedWriters;
385 int m_numActiveWriters;
389 ////////////////////////////////////////////////////////////////////////////////
391 ////////////////////////////////////////////////////////////////////////////////
396 import core.atomic, core.thread, core.sync.semaphore;
398 static void runTest(ReadWriteMutex.Policy policy)
400 scope mutex = new ReadWriteMutex(policy);
401 scope rdSemA = new Semaphore, rdSemB = new Semaphore,
402 wrSemA = new Semaphore, wrSemB = new Semaphore;
403 shared size_t numReaders, numWriters;
407 synchronized (mutex.reader)
409 atomicOp!"+="(numReaders, 1);
412 atomicOp!"-="(numReaders, 1);
418 synchronized (mutex.writer)
420 atomicOp!"+="(numWriters, 1);
423 atomicOp!"-="(numWriters, 1);
427 void waitQueued(size_t queuedReaders, size_t queuedWriters)
431 synchronized (mutex.m_commonMutex)
433 if (mutex.m_numQueuedReaders == queuedReaders &&
434 mutex.m_numQueuedWriters == queuedWriters)
441 scope group = new ThreadGroup;
443 // 2 simultaneous readers
444 group.create(&readerFn); group.create(&readerFn);
445 rdSemA.wait(); rdSemA.wait();
446 assert(numReaders == 2);
447 rdSemB.notify(); rdSemB.notify();
449 assert(numReaders == 0);
450 foreach (t; group) group.remove(t);
452 // 1 writer at a time
453 group.create(&writerFn); group.create(&writerFn);
455 assert(!wrSemA.tryWait());
456 assert(numWriters == 1);
459 assert(numWriters == 1);
462 assert(numWriters == 0);
463 foreach (t; group) group.remove(t);
465 // reader and writer are mutually exclusive
466 group.create(&readerFn);
468 group.create(&writerFn);
470 assert(!wrSemA.tryWait());
471 assert(numReaders == 1 && numWriters == 0);
474 assert(numReaders == 0 && numWriters == 1);
477 assert(numReaders == 0 && numWriters == 0);
478 foreach (t; group) group.remove(t);
480 // writer and reader are mutually exclusive
481 group.create(&writerFn);
483 group.create(&readerFn);
485 assert(!rdSemA.tryWait());
486 assert(numReaders == 0 && numWriters == 1);
489 assert(numReaders == 1 && numWriters == 0);
492 assert(numReaders == 0 && numWriters == 0);
493 foreach (t; group) group.remove(t);
495 // policy determines whether queued reader or writers progress first
496 group.create(&writerFn);
498 group.create(&readerFn);
499 group.create(&writerFn);
501 assert(numReaders == 0 && numWriters == 1);
504 if (policy == ReadWriteMutex.Policy.PREFER_READERS)
507 assert(numReaders == 1 && numWriters == 0);
510 assert(numReaders == 0 && numWriters == 1);
513 else if (policy == ReadWriteMutex.Policy.PREFER_WRITERS)
516 assert(numReaders == 0 && numWriters == 1);
519 assert(numReaders == 1 && numWriters == 0);
523 assert(numReaders == 0 && numWriters == 0);
524 foreach (t; group) group.remove(t);
526 runTest(ReadWriteMutex.Policy.PREFER_READERS);
527 runTest(ReadWriteMutex.Policy.PREFER_WRITERS);