import unittest
-from itertools import accumulate, batched, chain, combinations_with_replacement, cycle, permutations
+from itertools import accumulate, batched, chain, combinations_with_replacement, cycle, permutations, zip_longest
from test.support import threading_helper
it = permutations(tuple(range(4)), 2)
threading_helper.run_concurrently(work_iterator, nthreads=6, args=[it])
+ @threading_helper.reap_threads
+ def test_zip_longest(self):
+ number_of_iterations = 10
+ for _ in range(number_of_iterations):
+ it = zip_longest(list(range(4)), list(range(8)), fillvalue=0)
+ threading_helper.run_concurrently(work_iterator, nthreads=10, args=[it])
+
if __name__ == "__main__":
unittest.main()
}
static PyObject *
-zip_longest_next(PyObject *op)
+zip_longest_next_lock_held(PyObject *op)
{
ziplongestobject *lz = ziplongestobject_CAST(op);
Py_ssize_t i;
return result;
}
+static PyObject *
+zip_longest_next(PyObject *op)
+{
+ PyObject *result;
+ Py_BEGIN_CRITICAL_SECTION(op);
+ result = zip_longest_next_lock_held(op);
+ Py_END_CRITICAL_SECTION()
+ return result;
+}
+
PyDoc_STRVAR(zip_longest_doc,
"zip_longest(*iterables, fillvalue=None)\n\
--\n\