def test_shutdown_immediate_all_methods_in_one_thread(self):
return self._shutdown_all_methods_in_one_thread(True)
- def _write_msg_thread(self, q, n, results, delay,
- i_when_exec_shutdown,
- event_start, event_end):
- event_start.wait()
- for i in range(1, n+1):
+ def _write_msg_thread(self, q, n, results,
+ i_when_exec_shutdown, event_shutdown,
+ barrier_start):
+ # All `write_msg_threads`
+ # put several items into the queue.
+ for i in range(0, i_when_exec_shutdown//2):
+ q.put((i, 'LOYD'))
+ # Wait for the barrier to be complete.
+ barrier_start.wait()
+
+ for i in range(i_when_exec_shutdown//2, n):
try:
q.put((i, "YDLO"))
- results.append(True)
except self.queue.ShutDown:
results.append(False)
- # triggers shutdown of queue
- if i == i_when_exec_shutdown:
- event_end.set()
- time.sleep(delay)
- # end of all puts
- q.join()
+ break
- def _read_msg_thread(self, q, nb, results, delay, event_start):
- event_start.wait()
- block = True
- while nb:
- time.sleep(delay)
+ # Trigger queue shutdown.
+ if i == i_when_exec_shutdown:
+ # Only one thread should call shutdown().
+ if not event_shutdown.is_set():
+ event_shutdown.set()
+ results.append(True)
+
+ def _read_msg_thread(self, q, results, barrier_start):
+ # Get at least one item.
+ q.get(True)
+ q.task_done()
+ # Wait for the barrier to be complete.
+ barrier_start.wait()
+ while True:
try:
- # Get at least one message
- q.get(block)
- block = False
+ q.get(False)
q.task_done()
- results.append(True)
- nb -= 1
except self.queue.ShutDown:
- results.append(False)
- nb -= 1
+ results.append(True)
+ break
except self.queue.Empty:
pass
- q.join()
- def _shutdown_thread(self, q, event_end, immediate):
+ def _shutdown_thread(self, q, results, event_end, immediate):
event_end.wait()
q.shutdown(immediate)
- q.join()
+ results.append(q.qsize() == 0)
- def _join_thread(self, q, delay, event_start):
- event_start.wait()
- time.sleep(delay)
+ def _join_thread(self, q, barrier_start):
+ # Wait for the barrier to be complete.
+ barrier_start.wait()
q.join()
def _shutdown_all_methods_in_many_threads(self, immediate):
+ # Run a 'multi-producers/consumers queue' use case,
+ # with enough items into the queue.
+ # When shutdown, all running threads will be joined.
q = self.type2test()
ps = []
- ev_start = threading.Event()
- ev_exec_shutdown = threading.Event()
res_puts = []
res_gets = []
- delay = 1e-4
- read_process = 4
- nb_msgs = read_process * 16
- nb_msgs_r = nb_msgs // read_process
- when_exec_shutdown = nb_msgs // 2
- lprocs = (
- (self._write_msg_thread, 1, (q, nb_msgs, res_puts, delay,
- when_exec_shutdown,
- ev_start, ev_exec_shutdown)),
- (self._read_msg_thread, read_process, (q, nb_msgs_r,
- res_gets, delay*2,
- ev_start)),
- (self._join_thread, 2, (q, delay*2, ev_start)),
- (self._shutdown_thread, 1, (q, ev_exec_shutdown, immediate)),
- )
- # start all threds
+ res_shutdown = []
+ write_threads = 4
+ read_threads = 6
+ join_threads = 2
+ nb_msgs = 1024*64
+ nb_msgs_w = nb_msgs // write_threads
+ when_exec_shutdown = nb_msgs_w // 2
+ # Use of a Barrier to ensure that
+ # - all write threads put all their items into the queue,
+ # - all read thread get at least one item from the queue,
+ # and keep on running until shutdown.
+ # The join thread is started only when shutdown is immediate.
+ nparties = write_threads + read_threads
+ if immediate:
+ nparties += join_threads
+ barrier_start = threading.Barrier(nparties)
+ ev_exec_shutdown = threading.Event()
+ lprocs = [
+ (self._write_msg_thread, write_threads, (q, nb_msgs_w, res_puts,
+ when_exec_shutdown, ev_exec_shutdown,
+ barrier_start)),
+ (self._read_msg_thread, read_threads, (q, res_gets, barrier_start)),
+ (self._shutdown_thread, 1, (q, res_shutdown, ev_exec_shutdown, immediate)),
+ ]
+ if immediate:
+ lprocs.append((self._join_thread, join_threads, (q, barrier_start)))
+ # start all threads.
for func, n, args in lprocs:
for i in range(n):
ps.append(threading.Thread(target=func, args=args))
ps[-1].start()
- # set event in order to run q.shutdown()
- ev_start.set()
-
- if not immediate:
- assert(len(res_gets) == len(res_puts))
- assert(res_gets.count(True) == res_puts.count(True))
- else:
- assert(len(res_gets) <= len(res_puts))
- assert(res_gets.count(True) <= res_puts.count(True))
-
- for thread in ps[1:]:
+ for thread in ps:
thread.join()
- @unittest.skip("test times out (gh-115258)")
+ self.assertTrue(True in res_puts)
+ self.assertEqual(res_gets.count(True), read_threads)
+ if immediate:
+ self.assertListEqual(res_shutdown, [True])
+ self.assertTrue(q.empty())
+
def test_shutdown_all_methods_in_many_threads(self):
return self._shutdown_all_methods_in_many_threads(False)
- @unittest.skip("test times out (gh-115258)")
def test_shutdown_immediate_all_methods_in_many_threads(self):
return self._shutdown_all_methods_in_many_threads(True)