Currently, :class:`Lock`, :class:`RLock`, :class:`Condition`,
:class:`Semaphore`, and :class:`BoundedSemaphore` objects may be used as
:keyword:`with` statement context managers.
+
+
+Iterator synchronization
+------------------------
+
+By default, Python iterators do not support concurrent access. Most iterators make
+no guarantees when accessed simultaneously from multiple threads. Generator
+iterators, for example, raise :exc:`ValueError` if one of their iterator methods
+is called while the generator is already executing. The tools in this section
+allow reliable concurrency support to be added to ordinary iterators and
+iterator-producing callables.
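+
+A generator's re-entrancy guard can be triggered even without threads. As a
+minimal standalone illustration (not part of the module), the sketch below
+re-enters a running generator and provokes the :exc:`ValueError`:

```python
def gen():
    # Re-entering the generator while it is already executing
    # triggers the generator re-entrancy guard.
    yield next(it)

it = gen()
try:
    next(it)
    error = None
except ValueError as exc:
    error = str(exc)

print(error)  # generator already executing
```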
+
+The :class:`serialize_iterator` wrapper lets multiple threads share a single iterator and
+take turns consuming from it. While one thread is running ``__next__()``, the
+others block until the iterator becomes available. Each value produced by the
+underlying iterator is delivered to exactly one caller.
+
+The :func:`concurrent_tee` function lets multiple threads each receive the full
+stream of values from one underlying iterator. It creates independent iterators
+that all draw from the same source. Values are buffered until consumed by all
+of the derived iterators.
+
+.. class:: serialize_iterator(iterable)
+
+ Return an iterator wrapper that serializes concurrent calls to
+ :meth:`~iterator.__next__` using a lock.
+
+ If the wrapped iterator also defines :meth:`~generator.send`,
+ :meth:`~generator.throw`, or :meth:`~generator.close`, those calls
+ are serialized as well.
+
+ This makes it possible to share a single iterator, including a generator
+ iterator, between multiple threads. A lock ensures that calls are handled
+ one at a time. No values are duplicated or skipped by the wrapper itself.
+ Each item from the underlying iterator is given to exactly one caller.
+
+ This wrapper does not copy or buffer values. Threads that call
+ :func:`next` while another thread is already advancing the iterator will
+ block until the active call completes.
+
+ Example:
+
+ .. code-block:: python
+
+ import threading
+
+ def squares(n):
+ for x in range(n):
+ yield x * x
+
+ def consume(name, iterable):
+ for item in iterable:
+ print(name, item)
+
+ source = threading.serialize_iterator(squares(5))
+
+ t1 = threading.Thread(target=consume, args=("left", source))
+ t2 = threading.Thread(target=consume, args=("right", source))
+ t1.start()
+ t2.start()
+ t1.join()
+ t2.join()
+
+ In this example, each number is printed exactly once, but the work is shared
+ between the two threads.
+
+ .. versionadded:: next
+
+
+.. function:: synchronized_iterator(func)
+
+ Wrap an iterator-producing callable so that each iterator it returns is
+ automatically passed through :class:`serialize_iterator`.
+
+ This is especially useful as a :term:`decorator` for generator functions,
+ allowing their generator-iterators to be consumed from multiple threads.
+
+ Example:
+
+ .. code-block:: python
+
+ import threading
+
+ @threading.synchronized_iterator
+ def squares(n):
+ for x in range(n):
+ yield x * x
+
+ def consume(name, iterable):
+ for item in iterable:
+ print(name, item)
+
+ source = squares(5)
+
+ t1 = threading.Thread(target=consume, args=("left", source))
+ t2 = threading.Thread(target=consume, args=("right", source))
+ t1.start()
+ t2.start()
+ t1.join()
+ t2.join()
+
+   The returned wrapper preserves the metadata of *func* via
+   :func:`functools.wraps`, including its name, docstring, and the
+   ``__wrapped__`` reference to the original function.
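+
+   Only the metadata behavior is illustrated below; this sketch uses a plain
+   :func:`functools.wraps` pass-through as a stand-in for the real decorator
+   (the names are illustrative, not the module's implementation):

```python
from functools import wraps

def synchronized_iterator_sketch(func):
    # Stand-in decorator: wraps() copies __name__, __doc__, etc.
    # and records the original callable as __wrapped__.
    @wraps(func)
    def inner(*args, **kwargs):
        # The real decorator would pass this result through serialize_iterator.
        return func(*args, **kwargs)
    return inner

@synchronized_iterator_sketch
def squares(n):
    """Yield the first n squares."""
    for x in range(n):
        yield x * x

print(squares.__name__)              # squares
print(squares.__wrapped__.__name__)  # squares
print(list(squares(3)))              # [0, 1, 4]
```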
+
+ .. versionadded:: next
+
+
+.. function:: concurrent_tee(iterable, n=2)
+
+ Return *n* independent iterators from a single input *iterable*, with
+ guaranteed behavior when the derived iterators are consumed concurrently.
+
+ This function is similar to :func:`itertools.tee`, but is intended for cases
+ where the source iterator may feed consumers running in different threads.
+ Each returned iterator yields every value from the underlying iterable, in
+ the same order.
+
+ Internally, values are buffered until every derived iterator has consumed
+ them.
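+
+   This buffering contract matches :func:`itertools.tee`; the following
+   single-threaded sketch (using :func:`itertools.tee` as a stand-in) shows
+   values being held until every derived iterator has consumed them:

```python
import itertools

source = iter(range(3))
left, right = itertools.tee(source)

# Draining one derived iterator buffers every value for the other.
left_values = list(left)
right_values = list(right)

print(left_values)   # [0, 1, 2]
print(right_values)  # [0, 1, 2]
```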
+
+ The returned iterators share the same underlying synchronization lock. Each
+ individual derived iterator is intended to be consumed by one thread at a
+ time. If a single derived iterator must itself be shared by multiple
+ threads, wrap it with :class:`serialize_iterator`.
+
+   If *n* is ``0``, an empty tuple is returned. If *n* is negative,
+   :exc:`ValueError` is raised.
+
+ Example:
+
+ .. code-block:: python
+
+ import threading
+
+ def squares(n):
+ for x in range(n):
+ yield x * x
+
+ def consume(name, iterable):
+ for item in iterable:
+ print(name, item)
+
+ source = squares(5)
+ left, right = threading.concurrent_tee(source)
+
+ t1 = threading.Thread(target=consume, args=("left", left))
+ t2 = threading.Thread(target=consume, args=("right", right))
+ t1.start()
+ t2.start()
+ t1.join()
+ t2.join()
+
+   In this example, both consumer threads see the full sequence of squares
+   from a single generator.
+
+ .. versionadded:: next
(Contributed by Christoph Walcher in :gh:`57911`.)
+threading
+---------
+
+* Added :class:`~threading.serialize_iterator`,
+ :func:`~threading.synchronized_iterator`,
+ and :func:`~threading.concurrent_tee` to support concurrent access to
+ generators and iterators.
+ (Contributed by Raymond Hettinger in :gh:`124397`.)
+
+
timeit
------
barriertype = staticmethod(threading.Barrier)
+## Test Synchronization tools for iterators ################
+
+class ThreadingIteratorToolsTests(BaseTestCase):
+ def test_serialize_serializes_concurrent_iteration(self):
+ limit = 10_000
+ workers_count = 10
+ result = 0
+ result_lock = threading.Lock()
+ start = threading.Event()
+
+ def producer(limit):
+ for x in range(limit):
+ yield x
+
+ def consumer(iterator):
+ nonlocal result
+ start.wait()
+ total = 0
+ for x in iterator:
+ total += x
+ with result_lock:
+ result += total
+
+ iterator = threading.serialize_iterator(producer(limit))
+ workers = [
+ threading.Thread(target=consumer, args=(iterator,))
+ for _ in range(workers_count)
+ ]
+ with threading_helper.wait_threads_exit():
+ for worker in workers:
+ worker.start()
+ for worker in workers:
+ # Wait for the worker thread to actually start.
+ while worker.ident is None:
+ time.sleep(0.1)
+ start.set()
+ for worker in workers:
+ worker.join()
+
+ self.assertEqual(result, limit * (limit - 1) // 2)
+
+ def test_serialize_generator_methods(self):
+ # A generator that yields and receives
+ def echo():
+ try:
+ while True:
+ val = yield "ready"
+ yield f"received {val}"
+ except ValueError:
+ yield "caught"
+
+ it = threading.serialize_iterator(echo())
+
+ # Test __next__
+ self.assertEqual(next(it), "ready")
+
+ # Test send()
+ self.assertEqual(it.send("hello"), "received hello")
+ self.assertEqual(next(it), "ready")
+
+ # Test throw()
+ self.assertEqual(it.throw(ValueError), "caught")
+
+ # Test close()
+ it.close()
+ with self.assertRaises(StopIteration):
+ next(it)
+
+ def test_serialize_methods_attribute_error(self):
+ # A standard iterator that does not have send/throw/close
+ # should raise AttributeError when called.
+ standard_it = threading.serialize_iterator([1, 2, 3])
+
+ with self.assertRaises(AttributeError):
+ standard_it.send("foo")
+
+ with self.assertRaises(AttributeError):
+ standard_it.throw(ValueError)
+
+ with self.assertRaises(AttributeError):
+ standard_it.close()
+
+ def test_serialize_generator_methods_locking(self):
+ # Verifies that generator methods also acquire the lock.
+ # We can test this by checking if the lock is held during the call.
+
+ class LockCheckingGenerator:
+ def __init__(self, lock):
+ self.lock = lock
+ def __iter__(self):
+ return self
+ def send(self, value):
+ if not self.lock.locked():
+ raise RuntimeError("Lock not held during send()")
+ return value
+ def throw(self, *args):
+ if not self.lock.locked():
+ raise RuntimeError("Lock not held during throw()")
+ def close(self):
+ if not self.lock.locked():
+ raise RuntimeError("Lock not held during close()")
+
+ # Manually create the serialize object to inspect the lock
+ it = threading.serialize_iterator([])
+ mock_gen = LockCheckingGenerator(it._lock)
+ it._iterator = mock_gen
+
+ # These should not raise RuntimeError
+ it.send(1)
+ it.throw(ValueError)
+ it.close()
+
+ def test_serialize_next_exception(self):
+        # Verify that exceptions propagate through calls to next()
+
+ def f():
+ raise RuntimeError
+ yield None
+
+ g = threading.serialize_iterator(f())
+ with self.assertRaises(RuntimeError):
+ next(g)
+
+ def test_synchronized_serializes_generator_instances(self):
+ unique = 10
+ repetitions = 5
+ limit = 100
+ start = threading.Event()
+
+ @threading.synchronized_iterator
+ def atomic_counter():
+ # The sleep widens the race window that would exist without
+ # synchronization between yielding a value and advancing state.
+ i = 0
+ while True:
+ yield i
+ time.sleep(0.0005)
+ i += 1
+
+ def consumer(counter):
+ start.wait()
+ for _ in range(limit):
+ next(counter)
+
+ unique_counters = [atomic_counter() for _ in range(unique)]
+ counters = unique_counters * repetitions
+ workers = [
+ threading.Thread(target=consumer, args=(counter,))
+ for counter in counters
+ ]
+ with threading_helper.wait_threads_exit():
+ for worker in workers:
+ worker.start()
+ start.set()
+ for worker in workers:
+ worker.join()
+
+ self.assertEqual(
+ {next(counter) for counter in unique_counters},
+ {limit * repetitions},
+ )
+
+ def test_synchronized_preserves_wrapped_metadata(self):
+ def gen():
+ yield 1
+
+ wrapped = threading.synchronized_iterator(gen)
+
+ self.assertEqual(wrapped.__name__, gen.__name__)
+ self.assertIs(wrapped.__wrapped__, gen)
+ self.assertEqual(list(wrapped()), [1])
+
+ def test_concurrent_tee_supports_concurrent_consumers(self):
+ limit = 5_000
+ num_threads = 25
+ successes = 0
+ failures = []
+ result_lock = threading.Lock()
+ start = threading.Event()
+ expected = list(range(limit))
+
+ def producer(limit):
+ for x in range(limit):
+ yield x
+
+ def consumer(iterator):
+ nonlocal successes
+ start.wait()
+ items = list(iterator)
+ with result_lock:
+ if items == expected:
+ successes += 1
+ else:
+ failures.append(items[:20])
+
+ tees = threading.concurrent_tee(producer(limit), n=num_threads)
+ workers = [
+ threading.Thread(target=consumer, args=(iterator,))
+ for iterator in tees
+ ]
+ with threading_helper.wait_threads_exit():
+ for worker in workers:
+ worker.start()
+ start.set()
+ for worker in workers:
+ worker.join()
+
+ self.assertEqual(failures, [])
+ self.assertEqual(successes, len(tees))
+
+ # Verify that locks are shared
+ self.assertEqual(len({id(t_obj.lock) for t_obj in tees}), 1)
+
+ def test_concurrent_tee_zero_iterators(self):
+ self.assertEqual(threading.concurrent_tee(range(10), n=0), ())
+
+ def test_concurrent_tee_negative_n(self):
+ with self.assertRaises(ValueError):
+ threading.concurrent_tee(range(10), n=-1)
+
+
+#################
+
+
+
class MiscTestCase(unittest.TestCase):
def test__all__(self):
restore_default_excepthook(self)
'Barrier', 'BrokenBarrierError', 'Timer', 'ThreadError',
'setprofile', 'settrace', 'local', 'stack_size',
'excepthook', 'ExceptHookArgs', 'gettrace', 'getprofile',
+ 'serialize_iterator', 'synchronized_iterator', 'concurrent_tee',
'setprofile_all_threads','settrace_all_threads']
# Rename some stuff so "from threading import *" is safe
pass
+## Synchronization tools for iterators #####################
+
+class serialize_iterator:
+ """Wrap a non-concurrent iterator with a lock to enforce sequential access.
+
+ Applies a non-reentrant lock around calls to __next__. If the
+ wrapped iterator also defines send(), throw(), or close(), those
+ calls are serialized as well.
+
+ Allows iterator and generator instances to be shared by multiple consumer
+ threads.
+
+ For example, itertools.count does not make thread-safe instances,
+ but that is easily fixed with:
+
+ atomic_counter = serialize_iterator(itertools.count())
+
+ """
+
+ __slots__ = ('_iterator', '_lock')
+
+ def __init__(self, iterable):
+ self._iterator = iter(iterable)
+ self._lock = Lock()
+
+ def __iter__(self):
+ return self
+
+ def __next__(self):
+ with self._lock:
+ return next(self._iterator)
+
+ def send(self, value, /):
+ """Send a value to a generator.
+
+ Raises AttributeError if not a generator.
+ """
+ with self._lock:
+ return self._iterator.send(value)
+
+ def throw(self, *args):
+ """Call throw() on a generator.
+
+ Raises AttributeError if not a generator.
+ """
+ with self._lock:
+ return self._iterator.throw(*args)
+
+ def close(self):
+ """Call close() on a generator.
+
+ Raises AttributeError if not a generator.
+ """
+ with self._lock:
+ return self._iterator.close()
+
+
+def synchronized_iterator(func):
+ """Wrap an iterator-returning callable to make its iterators thread-safe.
+
+ Existing itertools and more-itertools can be wrapped so that their
+ iterator instances are serialized.
+
+ For example, itertools.count does not make thread-safe instances,
+ but that is easily fixed with:
+
+ atomic_counter = synchronized_iterator(itertools.count)
+
+ Can also be used as a decorator for generator function definitions
+ so that the generator instances are serialized::
+
+ import time
+
+ @synchronized_iterator
+ def enumerate_and_timestamp(iterable):
+ for count, value in enumerate(iterable):
+ yield count, time.time_ns(), value
+
+ """
+
+ from functools import wraps
+
+ @wraps(func)
+ def inner(*args, **kwargs):
+ iterator = func(*args, **kwargs)
+ return serialize_iterator(iterator)
+
+ return inner
+
+
+def concurrent_tee(iterable, n=2):
+ """Variant of itertools.tee() but with guaranteed threading semantics.
+
+ Takes a non-threadsafe iterator as an input and creates concurrent
+ tee objects for other threads to have reliable independent copies of
+ the data stream.
+
+    The new iterators are not themselves thread-safe; each one is intended
+    to be consumed from a single thread. To share one of the new iterators
+    across multiple threads, wrap it with threading.serialize_iterator().
+ """
+
+ if n < 0:
+ raise ValueError("n must be a non-negative integer")
+ if n == 0:
+ return ()
+ iterator = _concurrent_tee(iterable)
+ result = [iterator]
+ for _ in range(n - 1):
+ result.append(_concurrent_tee(iterator))
+ return tuple(result)
+
+
+class _concurrent_tee:
+ __slots__ = ('iterator', 'link', 'lock')
+
+ def __init__(self, iterable):
+ if isinstance(iterable, _concurrent_tee):
+ self.iterator = iterable.iterator
+ self.link = iterable.link
+ self.lock = iterable.lock
+ else:
+ self.iterator = iter(iterable)
+ self.link = [None, None]
+ self.lock = Lock()
+
+ def __iter__(self):
+ return self
+
+ def __next__(self):
+ link = self.link
+ if link[1] is None:
+ with self.lock:
+ if link[1] is None:
+ link[0] = next(self.iterator)
+ link[1] = [None, None]
+ value, self.link = link
+ return value
+
+############################################################
+
+
# Helper to generate new thread names
_counter = _count(1).__next__
def _newname(name_template):
--- /dev/null
+Add :class:`threading.serialize_iterator`, :func:`threading.synchronized_iterator`,
+and :func:`threading.concurrent_tee` to support concurrent access to iterators
+and generators.