]> git.ipfire.org Git - thirdparty/Python/cpython.git/commitdiff
gh-124397: Add free-threading support for iterators. (gh-148894)
author: Raymond Hettinger <rhettinger@users.noreply.github.com>
Fri, 1 May 2026 21:31:00 +0000 (16:31 -0500)
committer: GitHub <noreply@github.com>
Fri, 1 May 2026 21:31:00 +0000 (16:31 -0500)
Doc/library/threading.rst
Doc/whatsnew/3.15.rst
Lib/test/test_threading.py
Lib/threading.py
Misc/NEWS.d/next/Library/2026-04-22-20-49-49.gh-issue-124397.plMglV.rst [new file with mode: 0644]

index 19cc4f191dff8d9f28c7cc1cf16f5b472ba57990..fbe3951e034d072aa0cf88c22dc44f4d56652d24 100644 (file)
@@ -1436,3 +1436,159 @@ is equivalent to::
 Currently, :class:`Lock`, :class:`RLock`, :class:`Condition`,
 :class:`Semaphore`, and :class:`BoundedSemaphore` objects may be used as
 :keyword:`with` statement context managers.
+
+
+Iterator synchronization
+------------------------
+
+By default, Python iterators do not support concurrent access. Most iterators make
+no guarantees when accessed simultaneously from multiple threads. Generator
+iterators, for example, raise :exc:`ValueError` if one of their iterator methods
+is called while the generator is already executing. The tools in this section
+allow reliable concurrency support to be added to ordinary iterators and
+iterator-producing callables.
+
+The :class:`serialize_iterator` wrapper lets multiple threads share a single iterator and
+take turns consuming from it. While one thread is running ``__next__()``, the
+others block until the iterator becomes available. Each value produced by the
+underlying iterator is delivered to exactly one caller.
+
+The :func:`concurrent_tee` function lets multiple threads each receive the full
+stream of values from one underlying iterator. It creates independent iterators
+that all draw from the same source. Values are buffered until consumed by all
+of the derived iterators.
+
+.. class:: serialize_iterator(iterable)
+
+   Return an iterator wrapper that serializes concurrent calls to
+   :meth:`~iterator.__next__` using a lock.
+
+   If the wrapped iterator also defines :meth:`~generator.send`,
+   :meth:`~generator.throw`, or :meth:`~generator.close`, those calls
+   are serialized as well.
+
+   This makes it possible to share a single iterator, including a generator
+   iterator, between multiple threads. A lock ensures that calls are handled
+   one at a time. No values are duplicated or skipped by the wrapper itself.
+   Each item from the underlying iterator is given to exactly one caller.
+
+   This wrapper does not copy or buffer values. Threads that call
+   :func:`next` while another thread is already advancing the iterator will
+   block until the active call completes.
+
+   Example:
+
+   .. code-block:: python
+
+      import threading
+
+      def squares(n):
+          for x in range(n):
+              yield x * x
+
+      def consume(name, iterable):
+          for item in iterable:
+              print(name, item)
+
+      source = threading.serialize_iterator(squares(5))
+
+      t1 = threading.Thread(target=consume, args=("left", source))
+      t2 = threading.Thread(target=consume, args=("right", source))
+      t1.start()
+      t2.start()
+      t1.join()
+      t2.join()
+
+   In this example, each number is printed exactly once, but the work is shared
+   between the two threads.
+
+   .. versionadded:: next
+
+
+.. function:: synchronized_iterator(func)
+
+   Wrap an iterator-producing callable so that each iterator it returns is
+   automatically passed through :class:`serialize_iterator`.
+
+   This is especially useful as a :term:`decorator` for generator functions,
+   allowing their generator-iterators to be consumed from multiple threads.
+
+   Example:
+
+   .. code-block:: python
+
+      import threading
+
+      @threading.synchronized_iterator
+      def squares(n):
+          for x in range(n):
+              yield x * x
+
+      def consume(name, iterable):
+          for item in iterable:
+              print(name, item)
+
+      source = squares(5)
+
+      t1 = threading.Thread(target=consume, args=("left", source))
+      t2 = threading.Thread(target=consume, args=("right", source))
+      t1.start()
+      t2.start()
+      t1.join()
+      t2.join()
+
+   The returned wrapper preserves the metadata of *func*, such as its name and
+   wrapped function reference.
+
+   .. versionadded:: next
+
+
+.. function:: concurrent_tee(iterable, n=2)
+
+   Return *n* independent iterators from a single input *iterable*, with
+   guaranteed behavior when the derived iterators are consumed concurrently.
+
+   This function is similar to :func:`itertools.tee`, but is intended for cases
+   where the source iterator may feed consumers running in different threads.
+   Each returned iterator yields every value from the underlying iterable, in
+   the same order.
+
+   Internally, values are buffered until every derived iterator has consumed
+   them.
+
+   The returned iterators share the same underlying synchronization lock. Each
+   individual derived iterator is intended to be consumed by one thread at a
+   time. If a single derived iterator must itself be shared by multiple
+   threads, wrap it with :class:`serialize_iterator`.
+
+   If *n* is ``0``, return an empty tuple. If *n* is negative, raise
+   :exc:`ValueError`.
+
+   Example:
+
+   .. code-block:: python
+
+      import threading
+
+      def squares(n):
+          for x in range(n):
+              yield x * x
+
+      def consume(name, iterable):
+          for item in iterable:
+              print(name, item)
+
+      source = squares(5)
+      left, right = threading.concurrent_tee(source)
+
+      t1 = threading.Thread(target=consume, args=("left", left))
+      t2 = threading.Thread(target=consume, args=("right", right))
+      t1.start()
+      t2.start()
+      t1.join()
+      t2.join()
+
+   In this example, both consumer threads see the full sequence of squares
+   produced by a single generator.
+
+   .. versionadded:: next
index b075441fdeaa3a2d53e2bd0b8a617aeb85bfb0ab..b63e7a4790e9af60a36c8009c4990e34fb091dcb 100644 (file)
@@ -1279,6 +1279,16 @@ tarfile
   (Contributed by Christoph Walcher in :gh:`57911`.)
 
 
+threading
+---------
+
+* Added :class:`~threading.serialize_iterator`,
+  :func:`~threading.synchronized_iterator`,
+  and :func:`~threading.concurrent_tee` to support concurrent access to
+  generators and iterators.
+  (Contributed by Raymond Hettinger in :gh:`124397`.)
+
+
 timeit
 ------
 
index 0ca91ce0d7899d211eee1caadbcea6ebd949b989..3d01804513bde982dd5265974393f19b4e8661f5 100644 (file)
@@ -2368,6 +2368,231 @@ class BarrierTests(lock_tests.BarrierTests):
     barriertype = staticmethod(threading.Barrier)
 
 
+## Test Synchronization tools for iterators ################
+
class ThreadingIteratorToolsTests(BaseTestCase):
    """Tests for the threading iterator synchronization tools:
    serialize_iterator(), synchronized_iterator(), and concurrent_tee().
    """

    def test_serialize_serializes_concurrent_iteration(self):
        # Ten threads share one generator.  Each value must be delivered
        # to exactly one consumer, so the grand total of all partial sums
        # must equal sum(range(limit)).
        limit = 10_000
        workers_count = 10
        result = 0
        result_lock = threading.Lock()
        start = threading.Event()

        def producer(limit):
            for x in range(limit):
                yield x

        def consumer(iterator):
            nonlocal result
            start.wait()
            total = 0
            for x in iterator:
                total += x
            with result_lock:
                result += total

        iterator = threading.serialize_iterator(producer(limit))
        workers = [
            threading.Thread(target=consumer, args=(iterator,))
            for _ in range(workers_count)
        ]
        with threading_helper.wait_threads_exit():
            for worker in workers:
                worker.start()
            for worker in workers:
                # Wait for the worker thread to actually start.
                while worker.ident is None:
                    time.sleep(0.1)
            start.set()
            for worker in workers:
                worker.join()

        # Arithmetic-series total: every value was consumed exactly once.
        self.assertEqual(result, limit * (limit - 1) // 2)

    def test_serialize_generator_methods(self):
        # A generator that yields and receives
        def echo():
            try:
                while True:
                    val = yield "ready"
                    yield f"received {val}"
            except ValueError:
                yield "caught"

        it = threading.serialize_iterator(echo())

        # Test __next__
        self.assertEqual(next(it), "ready")

        # Test send()
        self.assertEqual(it.send("hello"), "received hello")
        self.assertEqual(next(it), "ready")

        # Test throw()
        self.assertEqual(it.throw(ValueError), "caught")

        # Test close()
        it.close()
        with self.assertRaises(StopIteration):
            next(it)

    def test_serialize_methods_attribute_error(self):
        # A standard iterator that does not have send/throw/close
        # should raise AttributeError when called.
        standard_it = threading.serialize_iterator([1, 2, 3])

        with self.assertRaises(AttributeError):
            standard_it.send("foo")

        with self.assertRaises(AttributeError):
            standard_it.throw(ValueError)

        with self.assertRaises(AttributeError):
            standard_it.close()

    def test_serialize_generator_methods_locking(self):
        # Verifies that generator methods also acquire the lock.
        # We can test this by checking if the lock is held during the call.

        class LockCheckingGenerator:
            def __init__(self, lock):
                self.lock = lock
            def __iter__(self):
                return self
            def send(self, value):
                if not self.lock.locked():
                    raise RuntimeError("Lock not held during send()")
                return value
            def throw(self, *args):
                if not self.lock.locked():
                    raise RuntimeError("Lock not held during throw()")
            def close(self):
                if not self.lock.locked():
                    raise RuntimeError("Lock not held during close()")

        # Manually create the serialize object to inspect the lock
        it = threading.serialize_iterator([])
        mock_gen = LockCheckingGenerator(it._lock)
        it._iterator = mock_gen

        # These should not raise RuntimeError
        it.send(1)
        it.throw(ValueError)
        it.close()

    def test_serialize_next_exception(self):
        # Verify exception pass through for calls to next()

        def f():
            raise RuntimeError
            yield None

        g = threading.serialize_iterator(f())
        with self.assertRaises(RuntimeError):
            next(g)

    def test_synchronized_serializes_generator_instances(self):
        unique = 10
        repetitions = 5
        limit = 100
        start = threading.Event()

        @threading.synchronized_iterator
        def atomic_counter():
            # The sleep widens the race window that would exist without
            # synchronization between yielding a value and advancing state.
            i = 0
            while True:
                yield i
                time.sleep(0.0005)
                i += 1

        def consumer(counter):
            start.wait()
            for _ in range(limit):
                next(counter)

        # Each unique counter is shared by `repetitions` workers, each of
        # which advances it `limit` times, so every counter should end at
        # exactly limit * repetitions.
        unique_counters = [atomic_counter() for _ in range(unique)]
        counters = unique_counters * repetitions
        workers = [
            threading.Thread(target=consumer, args=(counter,))
            for counter in counters
        ]
        with threading_helper.wait_threads_exit():
            for worker in workers:
                worker.start()
            start.set()
            for worker in workers:
                worker.join()

        self.assertEqual(
            {next(counter) for counter in unique_counters},
            {limit * repetitions},
        )

    def test_synchronized_preserves_wrapped_metadata(self):
        def gen():
            yield 1

        wrapped = threading.synchronized_iterator(gen)

        # functools.wraps() metadata must survive the wrapping.
        self.assertEqual(wrapped.__name__, gen.__name__)
        self.assertIs(wrapped.__wrapped__, gen)
        self.assertEqual(list(wrapped()), [1])

    def test_concurrent_tee_supports_concurrent_consumers(self):
        # Every derived iterator, consumed in its own thread, must see the
        # complete stream in order.
        limit = 5_000
        num_threads = 25
        successes = 0
        failures = []
        result_lock = threading.Lock()
        start = threading.Event()
        expected = list(range(limit))

        def producer(limit):
            for x in range(limit):
                yield x

        def consumer(iterator):
            nonlocal successes
            start.wait()
            items = list(iterator)
            with result_lock:
                if items == expected:
                    successes += 1
                else:
                    failures.append(items[:20])

        tees = threading.concurrent_tee(producer(limit), n=num_threads)
        workers = [
            threading.Thread(target=consumer, args=(iterator,))
            for iterator in tees
        ]
        with threading_helper.wait_threads_exit():
            for worker in workers:
                worker.start()
            start.set()
            for worker in workers:
                worker.join()

        self.assertEqual(failures, [])
        self.assertEqual(successes, len(tees))

        # Verify that locks are shared
        self.assertEqual(len({id(t_obj.lock) for t_obj in tees}), 1)

    def test_concurrent_tee_zero_iterators(self):
        self.assertEqual(threading.concurrent_tee(range(10), n=0), ())

    def test_concurrent_tee_negative_n(self):
        with self.assertRaises(ValueError):
            threading.concurrent_tee(range(10), n=-1)
+
+
+#################
+
+
+
 class MiscTestCase(unittest.TestCase):
     def test__all__(self):
         restore_default_excepthook(self)
index 4ebceae70298700a257613ec988ab3e8272ef457..abac31e25886fae34188b80f8c9c3a4c1efe2c76 100644 (file)
@@ -29,6 +29,7 @@ __all__ = ['get_ident', 'active_count', 'Condition', 'current_thread',
            'Barrier', 'BrokenBarrierError', 'Timer', 'ThreadError',
            'setprofile', 'settrace', 'local', 'stack_size',
            'excepthook', 'ExceptHookArgs', 'gettrace', 'getprofile',
+           'serialize_iterator', 'synchronized_iterator', 'concurrent_tee',
            'setprofile_all_threads','settrace_all_threads']
 
 # Rename some stuff so "from threading import *" is safe
@@ -842,6 +843,148 @@ class BrokenBarrierError(RuntimeError):
     pass
 
 
+## Synchronization tools for iterators #####################
+
class serialize_iterator:
    """Make a non-concurrent iterator safe to share between threads.

    Every advancing call -- __next__(), and send(), throw(), or close()
    when the underlying object defines them -- is guarded by one
    non-reentrant lock, so only a single thread at a time can drive the
    wrapped iterator.  Each produced value is delivered to exactly one
    caller; nothing is duplicated, skipped, or buffered.

    For example, itertools.count does not make thread-safe instances,
    but that is easily fixed with:

        atomic_counter = serialize_iterator(itertools.count())

    """

    __slots__ = ('_iterator', '_lock')

    def __init__(self, iterable):
        # Accept any iterable; all subsequent access goes through the
        # stored iterator under the lock.
        self._lock = Lock()
        self._iterator = iter(iterable)

    def __iter__(self):
        # The wrapper is itself an iterator.
        return self

    def __next__(self):
        lock = self._lock
        lock.acquire()
        try:
            return next(self._iterator)
        finally:
            lock.release()

    def send(self, value, /):
        """Serialized pass-through to a generator's send().

        Raises AttributeError if not a generator.
        """
        lock = self._lock
        lock.acquire()
        try:
            return self._iterator.send(value)
        finally:
            lock.release()

    def throw(self, *args):
        """Serialized pass-through to a generator's throw().

        Raises AttributeError if not a generator.
        """
        lock = self._lock
        lock.acquire()
        try:
            return self._iterator.throw(*args)
        finally:
            lock.release()

    def close(self):
        """Serialized pass-through to a generator's close().

        Raises AttributeError if not a generator.
        """
        lock = self._lock
        lock.acquire()
        try:
            return self._iterator.close()
        finally:
            lock.release()
+
+
def synchronized_iterator(func):
    """Make the iterators returned by *func* safe to share across threads.

    Wraps an iterator-returning callable so that each iterator it
    produces is automatically passed through serialize_iterator().

    Existing itertools and more-itertools can be wrapped so that their
    iterator instances are serialized.

    For example, itertools.count does not make thread-safe instances,
    but that is easily fixed with:

        atomic_counter = synchronized_iterator(itertools.count)

    Can also be used as a decorator for generator function definitions
    so that the generator instances are serialized::

        import time

        @synchronized_iterator
        def enumerate_and_timestamp(iterable):
            for count, value in enumerate(iterable):
                yield count, time.time_ns(), value

    """

    from functools import wraps

    @wraps(func)
    def wrapper(*args, **kwargs):
        # Each call returns a fresh, independently locked iterator.
        return serialize_iterator(func(*args, **kwargs))

    return wrapper
+
+
def concurrent_tee(iterable, n=2):
    """Variant of itertools.tee() with guaranteed threading semantics.

    Splits one (possibly non-threadsafe) source iterator into *n*
    independent iterators, each yielding the full value stream in order,
    for reliable consumption from separate threads.

    The new iterators are only thread-safe if consumed within a single
    thread.  To share just one of the new iterators across multiple
    threads, wrap it with threading.serialize_iterator().
    """

    if n < 0:
        raise ValueError("n must be a non-negative integer")
    if n == 0:
        return ()
    # The first tee wraps the source; the remaining n-1 are siblings
    # sharing its iterator, buffer link, and lock.
    first = _concurrent_tee(iterable)
    return (first, *(_concurrent_tee(first) for _ in range(n - 1)))
+
+
class _concurrent_tee:
    """Iterator implementing concurrent_tee(); not part of the public API.

    Sibling instances share one source iterator, one lock, and a singly
    linked buffer of produced values.
    """

    # iterator -- shared source; advanced only while holding `lock`.
    # link     -- this sibling's current buffer cell: [value, next_cell],
    #             where next_cell is None until the value is produced.
    # lock     -- shared by all siblings created from the same source.
    __slots__ = ('iterator', 'link', 'lock')

    def __init__(self, iterable):
        if isinstance(iterable, _concurrent_tee):
            # Sibling copy: share state so all tees see the same stream.
            self.iterator = iterable.iterator
            self.link = iterable.link
            self.lock = iterable.lock
        else:
            self.iterator = iter(iterable)
            self.link = [None, None]
            self.lock = Lock()

    def __iter__(self):
        return self

    def __next__(self):
        link = self.link
        if link[1] is None:
            # Double-checked: re-test under the lock in case another
            # sibling filled this cell while we were waiting.
            with self.lock:
                if link[1] is None:
                    # Order matters: store the value before publishing the
                    # next cell, so a thread that observes a non-None
                    # link[1] on the unlocked fast path also sees link[0].
                    # NOTE(review): the unlocked read relies on per-item
                    # list accesses being atomic in CPython -- confirm
                    # this holds for the free-threaded build.
                    link[0] = next(self.iterator)
                    link[1] = [None, None]
        value, self.link = link
        return value
+
+############################################################
+
+
 # Helper to generate new thread names
 _counter = _count(1).__next__
 def _newname(name_template):
diff --git a/Misc/NEWS.d/next/Library/2026-04-22-20-49-49.gh-issue-124397.plMglV.rst b/Misc/NEWS.d/next/Library/2026-04-22-20-49-49.gh-issue-124397.plMglV.rst
new file mode 100644 (file)
index 0000000..431448a
--- /dev/null
@@ -0,0 +1,3 @@
+Tooling was added to the :mod:`threading` module to support concurrent iterator access:
+:class:`threading.serialize_iterator`, :func:`threading.synchronized_iterator`,
+and :func:`threading.concurrent_tee`.