from multiprocessing.queues import Queue as MPQueue
from multiprocessing import Manager as MM
proxy_queue = MM().Queue()
+ proxy_joinable_queue = MM().JoinableQueue()
qspec = config['queue']
- if not isinstance(qspec, (queue.Queue, MPQueue, type(proxy_queue))):
+ if not isinstance(qspec, (queue.Queue, MPQueue,
+ type(proxy_queue), type(proxy_joinable_queue))):
if isinstance(qspec, str):
q = self.resolve(qspec)
if not callable(q):
def test_multiprocessing_queues(self):
# See gh-119819
- import_helper.import_module('_multiprocessing') # will skip test if it's not available
+
+ # will skip test if it's not available
+ import_helper.import_module('_multiprocessing')
+
cd = copy.deepcopy(self.config_queue_handler)
from multiprocessing import Queue as MQ, Manager as MM
q1 = MQ() # this can't be pickled
q2 = MM().Queue() # a proxy queue for use when pickling is needed
- for qspec in (q1, q2):
+ q3 = MM().JoinableQueue() # a joinable proxy queue
+ for qspec in (q1, q2, q3):
fn = make_temp_file('.log', 'test_logging-cmpqh-')
cd['handlers']['h1']['filename'] = fn
cd['handlers']['ah']['queue'] = qspec