for i in range(n):
self.assertIn(f'<RLock(MainProcess|T{i+1}, {i+1})>', l)
-
- t = threading.Thread(target=self._acquire_release,
- args=(lock, 0.2),
- name=f'T1')
+ rlock = self.RLock()
+ t = threading.Thread(target=rlock.acquire)
t.start()
- time.sleep(0.1)
- self.assertEqual('<RLock(SomeOtherThread, nonzero)>', repr(lock))
- time.sleep(0.2)
+ t.join()
+ self.assertEqual('<RLock(SomeOtherThread, nonzero)>', repr(rlock))
pname = 'P1'
l = multiprocessing.Manager().list()
p.join()
self.assertEqual(f'<RLock({pname}, 1)>', l[0])
- event = self.Event()
- lock = self.RLock()
- p = self.Process(target=self._acquire_event,
- args=(lock, event))
+ rlock = self.RLock()
+ p = self.Process(target=self._acquire, args=(rlock,))
p.start()
- event.wait()
- self.assertEqual('<RLock(SomeOtherProcess, nonzero)>', repr(lock))
p.join()
+ self.assertEqual('<RLock(SomeOtherProcess, nonzero)>', repr(rlock))
def test_rlock(self):
lock = self.RLock()