import unittest
-from threading import Thread, Barrier
+from threading import Thread, Barrier, Event
from test.support import threading_helper
"""Test set contains operation combined with mutation."""
barrier = Barrier(2, timeout=2)
s = set()
- done = False
+ done = Event()
NUM_LOOPS = 1000
def read_set():
barrier.wait()
- while not done:
+ while not done.is_set():
for i in range(self.SET_SIZE):
item = i >> 1
result = item in s
def mutate_set():
- nonlocal done
barrier.wait()
for i in range(NUM_LOOPS):
s.clear()
s.discard(j)
# executes the set_swap_bodies() function
s.__iand__(set(k for k in range(10, 20)))
- done = True
+ done.set()
threads = [Thread(target=read_set), Thread(target=mutate_set)]
for t in threads:
def test_contains_frozenset(self):
barrier = Barrier(3, timeout=2)
- done = False
+ done = Event()
NUM_LOOPS = 1_000
def mutate_set():
barrier.wait()
- while not done:
+ while not done.is_set():
s.add(0)
s.add(1)
s.clear()
def read_set():
- nonlocal done
barrier.wait()
container = frozenset([frozenset([0])])
self.assertTrue(set([0]) in container)
for _ in range(NUM_LOOPS):
# Will return True when {0} is the key and False otherwise
result = s in container
- done = True
+ done.set()
threads = [
Thread(target=read_set),