]> git.ipfire.org Git - thirdparty/gcc.git/blob - libphobos/libdruntime/core/sync/rwmutex.d
Add D front-end, libphobos library, and D2 testsuite.
[thirdparty/gcc.git] / libphobos / libdruntime / core / sync / rwmutex.d
1 /**
2 * The read/write mutex module provides a primitive for maintaining shared read
3 * access and mutually exclusive write access.
4 *
5 * Copyright: Copyright Sean Kelly 2005 - 2009.
6 * License: $(LINK2 http://www.boost.org/LICENSE_1_0.txt, Boost License 1.0)
7 * Authors: Sean Kelly
8 * Source: $(DRUNTIMESRC core/sync/_rwmutex.d)
9 */
10
11 /* Copyright Sean Kelly 2005 - 2009.
12 * Distributed under the Boost Software License, Version 1.0.
13 * (See accompanying file LICENSE or copy at
14 * http://www.boost.org/LICENSE_1_0.txt)
15 */
16 module core.sync.rwmutex;
17
18
19 public import core.sync.exception;
20 private import core.sync.condition;
21 private import core.sync.mutex;
22 private import core.memory;
23
24 version (Posix)
25 {
26 private import core.sys.posix.pthread;
27 }
28
29
30 ////////////////////////////////////////////////////////////////////////////////
31 // ReadWriteMutex
32 //
33 // Reader reader();
34 // Writer writer();
35 ////////////////////////////////////////////////////////////////////////////////
36
37
38 /**
39 * This class represents a mutex that allows any number of readers to enter,
40 * but when a writer enters, all other readers and writers are blocked.
41 *
42 * Please note that this mutex is not recursive and is intended to guard access
43 * to data only. Also, no deadlock checking is in place because doing so would
44 * require dynamic memory allocation, which would reduce performance by an
45 * unacceptable amount. As a result, any attempt to recursively acquire this
46 * mutex may well deadlock the caller, particularly if a write lock is acquired
47 * while holding a read lock, or vice-versa. In practice, this should not be
48 * an issue however, because it is uncommon to call deeply into unknown code
49 * while holding a lock that simply protects data.
50 */
51 class ReadWriteMutex
52 {
53 /**
54 * Defines the policy used by this mutex. Currently, two policies are
55 * defined.
56 *
57 * The first will queue writers until no readers hold the mutex, then
58 * pass the writers through one at a time. If a reader acquires the mutex
59 * while there are still writers queued, the reader will take precedence.
60 *
61 * The second will queue readers if there are any writers queued. Writers
62 * are passed through one at a time, and once there are no writers present,
63 * all queued readers will be alerted.
64 *
65 * Future policies may offer a more even balance between reader and writer
66 * precedence.
67 */
68 enum Policy
69 {
70 PREFER_READERS, /// Readers get preference. This may starve writers.
71 PREFER_WRITERS /// Writers get preference. This may starve readers.
72 }
73
74
75 ////////////////////////////////////////////////////////////////////////////
76 // Initialization
77 ////////////////////////////////////////////////////////////////////////////
78
79
80 /**
81 * Initializes a read/write mutex object with the supplied policy.
82 *
83 * Params:
84 * policy = The policy to use.
85 *
86 * Throws:
87 * SyncError on error.
88 */
89 this( Policy policy = Policy.PREFER_WRITERS )
90 {
91 m_commonMutex = new Mutex;
92 if ( !m_commonMutex )
93 throw new SyncError( "Unable to initialize mutex" );
94
95 m_readerQueue = new Condition( m_commonMutex );
96 if ( !m_readerQueue )
97 throw new SyncError( "Unable to initialize mutex" );
98
99 m_writerQueue = new Condition( m_commonMutex );
100 if ( !m_writerQueue )
101 throw new SyncError( "Unable to initialize mutex" );
102
103 m_policy = policy;
104 m_reader = new Reader;
105 m_writer = new Writer;
106 }
107
108 ////////////////////////////////////////////////////////////////////////////
109 // General Properties
110 ////////////////////////////////////////////////////////////////////////////
111
112
113 /**
114 * Gets the policy used by this mutex.
115 *
116 * Returns:
117 * The policy used by this mutex.
118 */
119 @property Policy policy()
120 {
121 return m_policy;
122 }
123
124
125 ////////////////////////////////////////////////////////////////////////////
126 // Reader/Writer Handles
127 ////////////////////////////////////////////////////////////////////////////
128
129
130 /**
131 * Gets an object representing the reader lock for the associated mutex.
132 *
133 * Returns:
134 * A reader sub-mutex.
135 */
136 @property Reader reader()
137 {
138 return m_reader;
139 }
140
141
142 /**
143 * Gets an object representing the writer lock for the associated mutex.
144 *
145 * Returns:
146 * A writer sub-mutex.
147 */
148 @property Writer writer()
149 {
150 return m_writer;
151 }
152
153
154 ////////////////////////////////////////////////////////////////////////////
155 // Reader
156 ////////////////////////////////////////////////////////////////////////////
157
158
159 /**
160 * This class can be considered a mutex in its own right, and is used to
161 * negotiate a read lock for the enclosing mutex.
162 */
163 class Reader :
164 Object.Monitor
165 {
166 /**
167 * Initializes a read/write mutex reader proxy object.
168 */
169 this()
170 {
171 m_proxy.link = this;
172 this.__monitor = &m_proxy;
173 }
174
175
176 /**
177 * Acquires a read lock on the enclosing mutex.
178 */
179 @trusted void lock()
180 {
181 synchronized( m_commonMutex )
182 {
183 ++m_numQueuedReaders;
184 scope(exit) --m_numQueuedReaders;
185
186 while ( shouldQueueReader )
187 m_readerQueue.wait();
188 ++m_numActiveReaders;
189 }
190 }
191
192
193 /**
194 * Releases a read lock on the enclosing mutex.
195 */
196 @trusted void unlock()
197 {
198 synchronized( m_commonMutex )
199 {
200 if ( --m_numActiveReaders < 1 )
201 {
202 if ( m_numQueuedWriters > 0 )
203 m_writerQueue.notify();
204 }
205 }
206 }
207
208
209 /**
210 * Attempts to acquire a read lock on the enclosing mutex. If one can
211 * be obtained without blocking, the lock is acquired and true is
212 * returned. If not, the lock is not acquired and false is returned.
213 *
214 * Returns:
215 * true if the lock was acquired and false if not.
216 */
217 bool tryLock()
218 {
219 synchronized( m_commonMutex )
220 {
221 if ( shouldQueueReader )
222 return false;
223 ++m_numActiveReaders;
224 return true;
225 }
226 }
227
228
229 private:
230 @property bool shouldQueueReader()
231 {
232 if ( m_numActiveWriters > 0 )
233 return true;
234
235 switch ( m_policy )
236 {
237 case Policy.PREFER_WRITERS:
238 return m_numQueuedWriters > 0;
239
240 case Policy.PREFER_READERS:
241 default:
242 break;
243 }
244
245 return false;
246 }
247
248 struct MonitorProxy
249 {
250 Object.Monitor link;
251 }
252
253 MonitorProxy m_proxy;
254 }
255
256
257 ////////////////////////////////////////////////////////////////////////////
258 // Writer
259 ////////////////////////////////////////////////////////////////////////////
260
261
262 /**
263 * This class can be considered a mutex in its own right, and is used to
264 * negotiate a write lock for the enclosing mutex.
265 */
266 class Writer :
267 Object.Monitor
268 {
269 /**
270 * Initializes a read/write mutex writer proxy object.
271 */
272 this()
273 {
274 m_proxy.link = this;
275 this.__monitor = &m_proxy;
276 }
277
278
279 /**
280 * Acquires a write lock on the enclosing mutex.
281 */
282 @trusted void lock()
283 {
284 synchronized( m_commonMutex )
285 {
286 ++m_numQueuedWriters;
287 scope(exit) --m_numQueuedWriters;
288
289 while ( shouldQueueWriter )
290 m_writerQueue.wait();
291 ++m_numActiveWriters;
292 }
293 }
294
295
296 /**
297 * Releases a write lock on the enclosing mutex.
298 */
299 @trusted void unlock()
300 {
301 synchronized( m_commonMutex )
302 {
303 if ( --m_numActiveWriters < 1 )
304 {
305 switch ( m_policy )
306 {
307 default:
308 case Policy.PREFER_READERS:
309 if ( m_numQueuedReaders > 0 )
310 m_readerQueue.notifyAll();
311 else if ( m_numQueuedWriters > 0 )
312 m_writerQueue.notify();
313 break;
314 case Policy.PREFER_WRITERS:
315 if ( m_numQueuedWriters > 0 )
316 m_writerQueue.notify();
317 else if ( m_numQueuedReaders > 0 )
318 m_readerQueue.notifyAll();
319 }
320 }
321 }
322 }
323
324
325 /**
326 * Attempts to acquire a write lock on the enclosing mutex. If one can
327 * be obtained without blocking, the lock is acquired and true is
328 * returned. If not, the lock is not acquired and false is returned.
329 *
330 * Returns:
331 * true if the lock was acquired and false if not.
332 */
333 bool tryLock()
334 {
335 synchronized( m_commonMutex )
336 {
337 if ( shouldQueueWriter )
338 return false;
339 ++m_numActiveWriters;
340 return true;
341 }
342 }
343
344
345 private:
346 @property bool shouldQueueWriter()
347 {
348 if ( m_numActiveWriters > 0 ||
349 m_numActiveReaders > 0 )
350 return true;
351 switch ( m_policy )
352 {
353 case Policy.PREFER_READERS:
354 return m_numQueuedReaders > 0;
355
356 case Policy.PREFER_WRITERS:
357 default:
358 break;
359 }
360
361 return false;
362 }
363
364 struct MonitorProxy
365 {
366 Object.Monitor link;
367 }
368
369 MonitorProxy m_proxy;
370 }
371
372
373 private:
374 Policy m_policy;
375 Reader m_reader;
376 Writer m_writer;
377
378 Mutex m_commonMutex;
379 Condition m_readerQueue;
380 Condition m_writerQueue;
381
382 int m_numQueuedReaders;
383 int m_numActiveReaders;
384 int m_numQueuedWriters;
385 int m_numActiveWriters;
386 }
387
388
389 ////////////////////////////////////////////////////////////////////////////////
390 // Unit Tests
391 ////////////////////////////////////////////////////////////////////////////////
392
393
394 unittest
395 {
396 import core.atomic, core.thread, core.sync.semaphore;
397
398 static void runTest(ReadWriteMutex.Policy policy)
399 {
400 scope mutex = new ReadWriteMutex(policy);
401 scope rdSemA = new Semaphore, rdSemB = new Semaphore,
402 wrSemA = new Semaphore, wrSemB = new Semaphore;
403 shared size_t numReaders, numWriters;
404
405 void readerFn()
406 {
407 synchronized (mutex.reader)
408 {
409 atomicOp!"+="(numReaders, 1);
410 rdSemA.notify();
411 rdSemB.wait();
412 atomicOp!"-="(numReaders, 1);
413 }
414 }
415
416 void writerFn()
417 {
418 synchronized (mutex.writer)
419 {
420 atomicOp!"+="(numWriters, 1);
421 wrSemA.notify();
422 wrSemB.wait();
423 atomicOp!"-="(numWriters, 1);
424 }
425 }
426
427 void waitQueued(size_t queuedReaders, size_t queuedWriters)
428 {
429 for (;;)
430 {
431 synchronized (mutex.m_commonMutex)
432 {
433 if (mutex.m_numQueuedReaders == queuedReaders &&
434 mutex.m_numQueuedWriters == queuedWriters)
435 break;
436 }
437 Thread.yield();
438 }
439 }
440
441 scope group = new ThreadGroup;
442
443 // 2 simultaneous readers
444 group.create(&readerFn); group.create(&readerFn);
445 rdSemA.wait(); rdSemA.wait();
446 assert(numReaders == 2);
447 rdSemB.notify(); rdSemB.notify();
448 group.joinAll();
449 assert(numReaders == 0);
450 foreach (t; group) group.remove(t);
451
452 // 1 writer at a time
453 group.create(&writerFn); group.create(&writerFn);
454 wrSemA.wait();
455 assert(!wrSemA.tryWait());
456 assert(numWriters == 1);
457 wrSemB.notify();
458 wrSemA.wait();
459 assert(numWriters == 1);
460 wrSemB.notify();
461 group.joinAll();
462 assert(numWriters == 0);
463 foreach (t; group) group.remove(t);
464
465 // reader and writer are mutually exclusive
466 group.create(&readerFn);
467 rdSemA.wait();
468 group.create(&writerFn);
469 waitQueued(0, 1);
470 assert(!wrSemA.tryWait());
471 assert(numReaders == 1 && numWriters == 0);
472 rdSemB.notify();
473 wrSemA.wait();
474 assert(numReaders == 0 && numWriters == 1);
475 wrSemB.notify();
476 group.joinAll();
477 assert(numReaders == 0 && numWriters == 0);
478 foreach (t; group) group.remove(t);
479
480 // writer and reader are mutually exclusive
481 group.create(&writerFn);
482 wrSemA.wait();
483 group.create(&readerFn);
484 waitQueued(1, 0);
485 assert(!rdSemA.tryWait());
486 assert(numReaders == 0 && numWriters == 1);
487 wrSemB.notify();
488 rdSemA.wait();
489 assert(numReaders == 1 && numWriters == 0);
490 rdSemB.notify();
491 group.joinAll();
492 assert(numReaders == 0 && numWriters == 0);
493 foreach (t; group) group.remove(t);
494
495 // policy determines whether queued reader or writers progress first
496 group.create(&writerFn);
497 wrSemA.wait();
498 group.create(&readerFn);
499 group.create(&writerFn);
500 waitQueued(1, 1);
501 assert(numReaders == 0 && numWriters == 1);
502 wrSemB.notify();
503
504 if (policy == ReadWriteMutex.Policy.PREFER_READERS)
505 {
506 rdSemA.wait();
507 assert(numReaders == 1 && numWriters == 0);
508 rdSemB.notify();
509 wrSemA.wait();
510 assert(numReaders == 0 && numWriters == 1);
511 wrSemB.notify();
512 }
513 else if (policy == ReadWriteMutex.Policy.PREFER_WRITERS)
514 {
515 wrSemA.wait();
516 assert(numReaders == 0 && numWriters == 1);
517 wrSemB.notify();
518 rdSemA.wait();
519 assert(numReaders == 1 && numWriters == 0);
520 rdSemB.notify();
521 }
522 group.joinAll();
523 assert(numReaders == 0 && numWriters == 0);
524 foreach (t; group) group.remove(t);
525 }
526 runTest(ReadWriteMutex.Policy.PREFER_READERS);
527 runTest(ReadWriteMutex.Policy.PREFER_WRITERS);
528 }