from test.support import import_helper
from test.support import threading_helper
+# queue module depends on threading primitives
+threading_helper.requires_working_threading(module=True)
py_queue = import_helper.import_fresh_module('queue', blocked=['_queue'])
c_queue = import_helper.import_fresh_module('queue', fresh=['_queue'])
self.fail("trigger thread ended but event never set")
-@threading_helper.requires_working_threading()
class BaseQueueTestMixin(BlockingTestMixin):
def setUp(self):
self.cum = 0
class FailingQueueException(Exception): pass
-@threading_helper.requires_working_threading()
class FailingQueueTest(BlockingTestMixin):
def setUp(self):
return
results.append(val)
- @threading_helper.requires_working_threading()
def run_threads(self, n_threads, q, inputs, feed_func, consume_func):
results = []
sentinel = None