from multiprocessing.managers import SharedMemoryManager
except ImportError:
self.skipTest("Test requires multiprocessing")
- from threading import Thread
+ from threading import Thread, Event
- n = 100
+ start = Event()
with SharedMemoryManager() as smm:
obj = smm.ShareableList(range(100))
- threads = []
- for _ in range(n):
- # Issue gh-127085, the `ShareableList.count` is just a convenient way to mess the `exports`
- # counter of `memoryview`, this issue has no direct relation with `ShareableList`.
- threads.append(Thread(target=obj.count, args=(1,)))
-
+ def test():
+ # Issue gh-127085, the `ShareableList.count` is just a
+ # convenient way to mess the `exports` counter of `memoryview`,
+ # this issue has no direct relation with `ShareableList`.
+ start.wait(support.SHORT_TIMEOUT)
+ for i in range(10):
+ obj.count(1)
+ threads = [Thread(target=test) for _ in range(10)]
with threading_helper.start_threads(threads):
- pass
+ start.set()
del obj