import unittest
from threading import Thread, Barrier
-from itertools import batched, cycle
+from itertools import batched, chain, cycle
from test.support import threading_helper
barrier.wait()
while True:
try:
- _ = next(it)
+ next(it)
except StopIteration:
break
barrier.reset()
+ @threading_helper.reap_threads
+ def test_chain(self):
+ number_of_threads = 6
+ number_of_iterations = 20
+
+ barrier = Barrier(number_of_threads)
+ def work(it):
+ barrier.wait()
+ while True:
+ try:
+ next(it)
+ except StopIteration:
+ break
+
+ data = [(1, )] * 200
+ for it in range(number_of_iterations):
+ chain_iterator = chain(*data)
+ worker_threads = []
+ for ii in range(number_of_threads):
+ worker_threads.append(
+ Thread(target=work, args=[chain_iterator]))
+
+ with threading_helper.start_threads(worker_threads):
+ pass
+
+ barrier.reset()
+
+
if __name__ == "__main__":
unittest.main()
return 0;
}
-static PyObject *
-chain_next(PyObject *op)
+static inline PyObject *
+chain_next_lock_held(PyObject *op)
{
chainobject *lz = chainobject_CAST(op);
PyObject *item;
return NULL;
}
+static PyObject *
+chain_next(PyObject *op)
+{
+ PyObject *result;
+ Py_BEGIN_CRITICAL_SECTION(op);
+ result = chain_next_lock_held(op);
+ Py_END_CRITICAL_SECTION()
+ return result;
+}
+
PyDoc_STRVAR(chain_doc,
"chain(*iterables)\n\
--\n\