self.assertFalse(lock.locked())
self.assertRaises((ValueError, threading.ThreadError), lock.release)
+ @classmethod
+ def _test_lock_locked_2processes(cls, lock, event, res):
+ lock.acquire()
+ res.value = lock.locked()
+ event.set()
+
+ def test_lock_locked_2processes(self):
+ if self.TYPE != 'processes':
+ self.skipTest('test not appropriate for {}'.format(self.TYPE))
+
+ lock = self.Lock()
+ event = self.Event()
+ res = self.Value('b', 0)
+ p = self.Process(target=self._test_lock_locked_2processes,
+ args=(lock, event, res))
+ p.start()
+ event.wait()
+ self.assertTrue(lock.locked())
+ self.assertTrue(res.value)
+ p.join()
+
@staticmethod
def _acquire_release(lock, timeout, l=None, n=1):
for _ in range(n):
self.assertFalse(lock.locked())
self.assertRaises((AssertionError, RuntimeError), lock.release)
+ def test_rlock_locked_2processes(self):
+ if self.TYPE != 'processes':
+ self.skipTest('test not appropriate for {}'.format(self.TYPE))
+
+ rlock = self.RLock()
+ event = self.Event()
+ res = Value('b', 0)
+ # target is the same as for the test_lock_locked_2processes test.
+ p = self.Process(target=self._test_lock_locked_2processes,
+ args=(rlock, event, res))
+ p.start()
+ event.wait()
+ self.assertTrue(rlock.locked())
+ self.assertTrue(res.value)
+ p.join()
+
def test_lock_context(self):
with self.Lock() as locked:
self.assertTrue(locked)