self.assertEqual(p.exitcode, 0)
self.assertEqual(p.is_alive(), False)
self.assertNotIn(p, self.active_children())
+ close_queue(q)
@classmethod
def _sleep_some(cls):
gc.collect()
self.assertIs(wr(), None)
+ close_queue(q)
+
def test_many_processes(self):
if self.TYPE == 'threads':
self.skipTest('test not appropriate for {}'.format(self.TYPE))
p.join()
self.assertIs(wr(), None)
self.assertEqual(q.get(), 5)
+ close_queue(q)
@classmethod
def _test_child_fd_inflation(self, evt, q):
evt.set()
for p in procs:
p.join()
+ close_queue(q)
#
#
self.assertEqual(queue_full(queue, MAXSIZE), False)
proc.join()
+ close_queue(queue)
@classmethod
def _test_get(cls, queue, child_can_start, parent_can_continue):
self.assertTimingAlmostEqual(get.elapsed, TIMEOUT3)
proc.join()
+ close_queue(queue)
@classmethod
def _test_fork(cls, queue):
self.assertRaises(pyqueue.Empty, queue.get, False)
p.join()
+ close_queue(queue)
def test_qsize(self):
q = self.Queue()
for p in workers:
p.join()
+ close_queue(queue)
def test_no_import_lock_contention(self):
with test.support.temp_cwd():
# Tolerate a delta of 30 ms because of the bad clock resolution on
# Windows (usually 15.6 ms)
self.assertGreaterEqual(delta, 0.170)
+ close_queue(q)
def test_queue_feeder_donot_stop_onexc(self):
# bpo-30414: verify feeder handles exceptions correctly
self.run_threads(self._test_wait_return_f, (self.barrier, queue))
results = [queue.get() for i in range(self.N)]
self.assertEqual(results.count(0), 1)
+ close_queue(queue)
@classmethod
def _test_action_f(cls, barrier, results):
w.close()
self.assertEqual(conn.recv(), 'foobar'*2)
+ p.join()
+
#
#
#
except pyqueue.Empty:
pass
-def _test_process(q):
+def _test_process():
queue = multiprocessing.Queue()
subProc = multiprocessing.Process(target=_this_sub_process, args=(queue,))
subProc.daemon = True
class TestStdinBadfiledescriptor(unittest.TestCase):
def test_queue_in_process(self):
- queue = multiprocessing.Queue()
- proc = multiprocessing.Process(target=_test_process, args=(queue,))
+ proc = multiprocessing.Process(target=_test_process)
proc.start()
proc.join()