]> git.ipfire.org Git - thirdparty/Python/cpython.git/commitdiff
gh-124694: Add concurrent.futures.InterpreterPoolExecutor (gh-124548)
authorEric Snow <ericsnowcurrently@gmail.com>
Wed, 16 Oct 2024 22:50:46 +0000 (16:50 -0600)
committerGitHub <noreply@github.com>
Wed, 16 Oct 2024 22:50:46 +0000 (16:50 -0600)
This is an implementation of InterpreterPoolExecutor that builds on ThreadPoolExecutor.

(Note that this is not tied to PEP 734, which is strictly about adding a new stdlib module.)

Possible future improvements:

* support passing a script for the initializer or to submit()
* support passing (most) arbitrary functions without pickling
* support passing closures
* optionally exec functions against __main__ instead of the their original module

12 files changed:
Doc/library/asyncio-dev.rst
Doc/library/asyncio-eventloop.rst
Doc/library/asyncio-llapi-index.rst
Doc/library/concurrent.futures.rst
Doc/whatsnew/3.14.rst
Lib/concurrent/futures/__init__.py
Lib/concurrent/futures/interpreter.py [new file with mode: 0644]
Lib/concurrent/futures/thread.py
Lib/test/test_concurrent_futures/executor.py
Lib/test/test_concurrent_futures/test_interpreter_pool.py [new file with mode: 0644]
Lib/test/test_concurrent_futures/util.py
Misc/NEWS.d/next/Library/2024-09-27-15-42-55.gh-issue-124694.uUy32y.rst [new file with mode: 0644]

index a9c3a0183bb72d2be560ee2dff720cf36d022588..44b507a981111671a7b9067000a9049f579af32d 100644 (file)
@@ -103,7 +103,8 @@ To handle signals the event loop must be
 run in the main thread.
 
 The :meth:`loop.run_in_executor` method can be used with a
-:class:`concurrent.futures.ThreadPoolExecutor` to execute
+:class:`concurrent.futures.ThreadPoolExecutor` or
+:class:`~concurrent.futures.InterpreterPoolExecutor` to execute
 blocking code in a different OS thread without blocking the OS thread
 that the event loop runs in.
 
@@ -128,7 +129,8 @@ if a function performs a CPU-intensive calculation for 1 second,
 all concurrent asyncio Tasks and IO operations would be delayed
 by 1 second.
 
-An executor can be used to run a task in a different thread or even in
+An executor can be used to run a task in a different thread,
+including in a different interpreter, or even in
 a different process to avoid blocking the OS thread with the
 event loop.  See the :meth:`loop.run_in_executor` method for more
 details.
index 943683f6b8a7f6cb3d90908f82f8f6ddeb9b5a90..14fd153f640f05de75656abf958aacb6407f253a 100644 (file)
@@ -1305,6 +1305,12 @@ Executing code in thread or process pools
                   pool, cpu_bound)
               print('custom process pool', result)
 
+          # 4. Run in a custom interpreter pool:
+          with concurrent.futures.InterpreterPoolExecutor() as pool:
+              result = await loop.run_in_executor(
+                  pool, cpu_bound)
+              print('custom interpreter pool', result)
+
       if __name__ == '__main__':
           asyncio.run(main())
 
@@ -1329,7 +1335,8 @@ Executing code in thread or process pools
 
    Set *executor* as the default executor used by :meth:`run_in_executor`.
    *executor* must be an instance of
-   :class:`~concurrent.futures.ThreadPoolExecutor`.
+   :class:`~concurrent.futures.ThreadPoolExecutor`, which includes
+   :class:`~concurrent.futures.InterpreterPoolExecutor`.
 
    .. versionchanged:: 3.11
       *executor* must be an instance of
index 3e21054aa4fe9e644be3db6730863bf3171cb231..f5af888f31f18635ddf7c7c72ffcc6b8d0641c13 100644 (file)
@@ -96,7 +96,7 @@ See also the main documentation section about the
       - Invoke a callback *at* the given time.
 
 
-.. rubric:: Thread/Process Pool
+.. rubric:: Thread/Interpreter/Process Pool
 .. list-table::
     :widths: 50 50
     :class: full-width-table
index ce72127127c7a61bafc33549c82710590155ce90..45a73705f10e92ab29c3ce28ead1c516ea139ff3 100644 (file)
@@ -15,9 +15,10 @@ The :mod:`concurrent.futures` module provides a high-level interface for
 asynchronously executing callables.
 
 The asynchronous execution can be performed with threads, using
-:class:`ThreadPoolExecutor`, or separate processes, using
-:class:`ProcessPoolExecutor`.  Both implement the same interface, which is
-defined by the abstract :class:`Executor` class.
+:class:`ThreadPoolExecutor` or :class:`InterpreterPoolExecutor`,
+or separate processes, using :class:`ProcessPoolExecutor`.
+Each implements the same interface, which is defined
+by the abstract :class:`Executor` class.
 
 .. include:: ../includes/wasm-notavail.rst
 
@@ -63,7 +64,8 @@ Executor Objects
       setting *chunksize* to a positive integer.  For very long iterables,
       using a large value for *chunksize* can significantly improve
       performance compared to the default size of 1.  With
-      :class:`ThreadPoolExecutor`, *chunksize* has no effect.
+      :class:`ThreadPoolExecutor` and :class:`InterpreterPoolExecutor`,
+      *chunksize* has no effect.
 
       .. versionchanged:: 3.5
          Added the *chunksize* argument.
@@ -227,6 +229,111 @@ ThreadPoolExecutor Example
                print('%r page is %d bytes' % (url, len(data)))
 
 
+InterpreterPoolExecutor
+-----------------------
+
+The :class:`InterpreterPoolExecutor` class uses a pool of interpreters
+to execute calls asynchronously.  It is a :class:`ThreadPoolExecutor`
+subclass, which means each worker is running in its own thread.
+The difference here is that each worker has its own interpreter,
+and runs each task using that interpreter.
+
+The biggest benefit to using interpreters instead of only threads
+is true multi-core parallelism.  Each interpreter has its own
+:term:`Global Interpreter Lock <global interpreter lock>`, so code
+running in one interpreter can run on one CPU core, while code in
+another interpreter runs unblocked on a different core.
+
+The tradeoff is that writing concurrent code for use with multiple
+interpreters can take extra effort.  However, this is because it
+forces you to be deliberate about how and when interpreters interact,
+and to be explicit about what data is shared between interpreters.
+This results in several benefits that help balance the extra effort,
+including true multi-core parallelism,  For example, code written
+this way can make it easier to reason about concurrency.  Another
+major benefit is that you don't have to deal with several of the
+big pain points of using threads, like nrace conditions.
+
+Each worker's interpreter is isolated from all the other interpreters.
+"Isolated" means each interpreter has its own runtime state and
+operates completely independently.  For example, if you redirect
+:data:`sys.stdout` in one interpreter, it will not be automatically
+redirected any other interpreter.  If you import a module in one
+interpreter, it is not automatically imported in any other.  You
+would need to import the module separately in interpreter where
+you need it.  In fact, each module imported in an interpreter is
+a completely separate object from the same module in a different
+interpreter, including :mod:`sys`, :mod:`builtins`,
+and even ``__main__``.
+
+Isolation means a mutable object, or other data, cannot be used
+by more than one interpreter at the same time.  That effectively means
+interpreters cannot actually share such objects or data.  Instead,
+each interpreter must have its own copy, and you will have to
+synchronize any changes between the copies manually.  Immutable
+objects and data, like the builtin singletons, strings, and tuples
+of immutable objects, don't have these limitations.
+
+Communicating and synchronizing between interpreters is most effectively
+done using dedicated tools, like those proposed in :pep:`734`.  One less
+efficient alternative is to serialize with :mod:`pickle` and then send
+the bytes over a shared :mod:`socket <socket>` or
+:func:`pipe <os.pipe>`.
+
+.. class:: InterpreterPoolExecutor(max_workers=None, thread_name_prefix='', initializer=None, initargs=(), shared=None)
+
+   A :class:`ThreadPoolExecutor` subclass that executes calls asynchronously
+   using a pool of at most *max_workers* threads.  Each thread runs
+   tasks in its own interpreter.  The worker interpreters are isolated
+   from each other, which means each has its own runtime state and that
+   they can't share any mutable objects or other data.  Each interpreter
+   has its own :term:`Global Interpreter Lock <global interpreter lock>`,
+   which means code run with this executor has true multi-core parallelism.
+
+   The optional *initializer* and *initargs* arguments have the same
+   meaning as for :class:`!ThreadPoolExecutor`: the initializer is run
+   when each worker is created, though in this case it is run.in
+   the worker's interpreter.  The executor serializes the *initializer*
+   and *initargs* using :mod:`pickle` when sending them to the worker's
+   interpreter.
+
+   .. note::
+      Functions defined in the ``__main__`` module cannot be pickled
+      and thus cannot be used.
+
+   .. note::
+      The executor may replace uncaught exceptions from *initializer*
+      with :class:`~concurrent.futures.interpreter.ExecutionFailed`.
+
+   The optional *shared* argument is a :class:`dict` of objects that all
+   interpreters in the pool share.  The *shared* items are added to each
+   interpreter's ``__main__`` module.  Not all objects are shareable.
+   Shareable objects include the builtin singletons, :class:`str`
+   and :class:`bytes`, and :class:`memoryview`.  See :pep:`734`
+   for more info.
+
+   Other caveats from parent :class:`ThreadPoolExecutor` apply here.
+
+:meth:`~Executor.submit` and :meth:`~Executor.map` work like normal,
+except the worker serializes the callable and arguments using
+:mod:`pickle` when sending them to its interpreter.  The worker
+likewise serializes the return value when sending it back.
+
+.. note::
+   Functions defined in the ``__main__`` module cannot be pickled
+   and thus cannot be used.
+
+When a worker's current task raises an uncaught exception, the worker
+always tries to preserve the exception as-is.  If that is successful
+then it also sets the ``__cause__`` to a corresponding
+:class:`~concurrent.futures.interpreter.ExecutionFailed`
+instance, which contains a summary of the original exception.
+In the uncommon case that the worker is not able to preserve the
+original as-is then it directly preserves the corresponding
+:class:`~concurrent.futures.interpreter.ExecutionFailed`
+instance instead.
+
+
 ProcessPoolExecutor
 -------------------
 
@@ -574,6 +681,26 @@ Exception classes
 
    .. versionadded:: 3.7
 
+.. currentmodule:: concurrent.futures.interpreter
+
+.. exception:: BrokenInterpreterPool
+
+   Derived from :exc:`~concurrent.futures.thread.BrokenThreadPool`,
+   this exception class is raised when one of the workers
+   of a :class:`~concurrent.futures.InterpreterPoolExecutor`
+   has failed initializing.
+
+   .. versionadded:: next
+
+.. exception:: ExecutionFailed
+
+   Raised from :class:`~concurrent.futures.InterpreterPoolExecutor` when
+   the given initializer fails or from
+   :meth:`~concurrent.futures.Executor.submit` when there's an uncaught
+   exception from the submitted task.
+
+   .. versionadded:: next
+
 .. currentmodule:: concurrent.futures.process
 
 .. exception:: BrokenProcessPool
index b106578fe9e8b0119011656026f3a18800e82430..9543af3c7ca22599a086427523b7ec2a685d3a5a 100644 (file)
@@ -225,6 +225,14 @@ ast
 * The ``repr()`` output for AST nodes now includes more information.
   (Contributed by Tomas R in :gh:`116022`.)
 
+concurrent.futures
+------------------
+
+* Add :class:`~concurrent.futures.InterpreterPoolExecutor`,
+  which exposes "subinterpreters (multiple Python interpreters in the
+  same process) to Python code.  This is separate from the proposed API
+  in :pep:`734`.
+  (Contributed by Eric Snow in :gh:`124548`.)
 
 ctypes
 ------
index 72de617a5b6f613f5c5d37b4ddd4f93567312bfb..7ada7431c1ab8c2b4c20b73ea0224d7927849e6f 100644 (file)
@@ -29,6 +29,7 @@ __all__ = (
     'Executor',
     'wait',
     'as_completed',
+    'InterpreterPoolExecutor',
     'ProcessPoolExecutor',
     'ThreadPoolExecutor',
 )
@@ -39,7 +40,7 @@ def __dir__():
 
 
 def __getattr__(name):
-    global ProcessPoolExecutor, ThreadPoolExecutor
+    global ProcessPoolExecutor, ThreadPoolExecutor, InterpreterPoolExecutor
 
     if name == 'ProcessPoolExecutor':
         from .process import ProcessPoolExecutor as pe
@@ -51,4 +52,13 @@ def __getattr__(name):
         ThreadPoolExecutor = te
         return te
 
+    if name == 'InterpreterPoolExecutor':
+        try:
+            from .interpreter import InterpreterPoolExecutor as ie
+        except ModuleNotFoundError:
+            ie = InterpreterPoolExecutor = None
+        else:
+            InterpreterPoolExecutor = ie
+        return ie
+
     raise AttributeError(f"module {__name__!r} has no attribute {name!r}")
diff --git a/Lib/concurrent/futures/interpreter.py b/Lib/concurrent/futures/interpreter.py
new file mode 100644 (file)
index 0000000..fd7941a
--- /dev/null
@@ -0,0 +1,241 @@
+"""Implements InterpreterPoolExecutor."""
+
+import contextlib
+import pickle
+import textwrap
+from . import thread as _thread
+import _interpreters
+import _interpqueues
+
+
+class ExecutionFailed(_interpreters.InterpreterError):
+    """An unhandled exception happened during execution."""
+
+    def __init__(self, excinfo):
+        msg = excinfo.formatted
+        if not msg:
+            if excinfo.type and excinfo.msg:
+                msg = f'{excinfo.type.__name__}: {excinfo.msg}'
+            else:
+                msg = excinfo.type.__name__ or excinfo.msg
+        super().__init__(msg)
+        self.excinfo = excinfo
+
+    def __str__(self):
+        try:
+            formatted = self.excinfo.errdisplay
+        except Exception:
+            return super().__str__()
+        else:
+            return textwrap.dedent(f"""
+{super().__str__()}
+
+Uncaught in the interpreter:
+
+{formatted}
+                """.strip())
+
+
+UNBOUND = 2  # error; this should not happen.
+
+
+class WorkerContext(_thread.WorkerContext):
+
+    @classmethod
+    def prepare(cls, initializer, initargs, shared):
+        def resolve_task(fn, args, kwargs):
+            if isinstance(fn, str):
+                # XXX Circle back to this later.
+                raise TypeError('scripts not supported')
+                if args or kwargs:
+                    raise ValueError(f'a script does not take args or kwargs, got {args!r} and {kwargs!r}')
+                data = textwrap.dedent(fn)
+                kind = 'script'
+                # Make sure the script compiles.
+                # Ideally we wouldn't throw away the resulting code
+                # object.  However, there isn't much to be done until
+                # code objects are shareable and/or we do a better job
+                # of supporting code objects in _interpreters.exec().
+                compile(data, '<string>', 'exec')
+            else:
+                # Functions defined in the __main__ module can't be pickled,
+                # so they can't be used here.  In the future, we could possibly
+                # borrow from multiprocessing to work around this.
+                data = pickle.dumps((fn, args, kwargs))
+                kind = 'function'
+            return (data, kind)
+
+        if initializer is not None:
+            try:
+                initdata = resolve_task(initializer, initargs, {})
+            except ValueError:
+                if isinstance(initializer, str) and initargs:
+                    raise ValueError(f'an initializer script does not take args, got {initargs!r}')
+                raise  # re-raise
+        else:
+            initdata = None
+        def create_context():
+            return cls(initdata, shared)
+        return create_context, resolve_task
+
+    @classmethod
+    @contextlib.contextmanager
+    def _capture_exc(cls, resultsid):
+        try:
+            yield
+        except BaseException as exc:
+            # Send the captured exception out on the results queue,
+            # but still leave it unhandled for the interpreter to handle.
+            err = pickle.dumps(exc)
+            _interpqueues.put(resultsid, (None, err), 1, UNBOUND)
+            raise  # re-raise
+
+    @classmethod
+    def _send_script_result(cls, resultsid):
+        _interpqueues.put(resultsid, (None, None), 0, UNBOUND)
+
+    @classmethod
+    def _call(cls, func, args, kwargs, resultsid):
+        with cls._capture_exc(resultsid):
+            res = func(*args or (), **kwargs or {})
+        # Send the result back.
+        try:
+            _interpqueues.put(resultsid, (res, None), 0, UNBOUND)
+        except _interpreters.NotShareableError:
+            res = pickle.dumps(res)
+            _interpqueues.put(resultsid, (res, None), 1, UNBOUND)
+
+    @classmethod
+    def _call_pickled(cls, pickled, resultsid):
+        fn, args, kwargs = pickle.loads(pickled)
+        cls._call(fn, args, kwargs, resultsid)
+
+    def __init__(self, initdata, shared=None):
+        self.initdata = initdata
+        self.shared = dict(shared) if shared else None
+        self.interpid = None
+        self.resultsid = None
+
+    def __del__(self):
+        if self.interpid is not None:
+            self.finalize()
+
+    def _exec(self, script):
+        assert self.interpid is not None
+        excinfo = _interpreters.exec(self.interpid, script, restrict=True)
+        if excinfo is not None:
+            raise ExecutionFailed(excinfo)
+
+    def initialize(self):
+        assert self.interpid is None, self.interpid
+        self.interpid = _interpreters.create(reqrefs=True)
+        try:
+            _interpreters.incref(self.interpid)
+
+            maxsize = 0
+            fmt = 0
+            self.resultsid = _interpqueues.create(maxsize, fmt, UNBOUND)
+
+            self._exec(f'from {__name__} import WorkerContext')
+
+            if self.shared:
+                _interpreters.set___main___attrs(
+                                    self.interpid, self.shared, restrict=True)
+
+            if self.initdata:
+                self.run(self.initdata)
+        except BaseException:
+            self.finalize()
+            raise  # re-raise
+
+    def finalize(self):
+        interpid = self.interpid
+        resultsid = self.resultsid
+        self.resultsid = None
+        self.interpid = None
+        if resultsid is not None:
+            try:
+                _interpqueues.destroy(resultsid)
+            except _interpqueues.QueueNotFoundError:
+                pass
+        if interpid is not None:
+            try:
+                _interpreters.decref(interpid)
+            except _interpreters.InterpreterNotFoundError:
+                pass
+
+    def run(self, task):
+        data, kind = task
+        if kind == 'script':
+            raise NotImplementedError('script kind disabled')
+            script = f"""
+with WorkerContext._capture_exc({self.resultsid}):
+{textwrap.indent(data, '    ')}
+WorkerContext._send_script_result({self.resultsid})"""
+        elif kind == 'function':
+            script = f'WorkerContext._call_pickled({data!r}, {self.resultsid})'
+        else:
+            raise NotImplementedError(kind)
+
+        try:
+            self._exec(script)
+        except ExecutionFailed as exc:
+            exc_wrapper = exc
+        else:
+            exc_wrapper = None
+
+        # Return the result, or raise the exception.
+        while True:
+            try:
+                obj = _interpqueues.get(self.resultsid)
+            except _interpqueues.QueueNotFoundError:
+                raise  # re-raise
+            except _interpqueues.QueueError:
+                continue
+            except ModuleNotFoundError:
+                # interpreters.queues doesn't exist, which means
+                # QueueEmpty doesn't.  Act as though it does.
+                continue
+            else:
+                break
+        (res, excdata), pickled, unboundop = obj
+        assert unboundop is None, unboundop
+        if excdata is not None:
+            assert res is None, res
+            assert pickled
+            assert exc_wrapper is not None
+            exc = pickle.loads(excdata)
+            raise exc from exc_wrapper
+        return pickle.loads(res) if pickled else res
+
+
+class BrokenInterpreterPool(_thread.BrokenThreadPool):
+    """
+    Raised when a worker thread in an InterpreterPoolExecutor failed initializing.
+    """
+
+
+class InterpreterPoolExecutor(_thread.ThreadPoolExecutor):
+
+    BROKEN = BrokenInterpreterPool
+
+    @classmethod
+    def prepare_context(cls, initializer, initargs, shared):
+        return WorkerContext.prepare(initializer, initargs, shared)
+
+    def __init__(self, max_workers=None, thread_name_prefix='',
+                 initializer=None, initargs=(), shared=None):
+        """Initializes a new InterpreterPoolExecutor instance.
+
+        Args:
+            max_workers: The maximum number of interpreters that can be used to
+                execute the given calls.
+            thread_name_prefix: An optional name prefix to give our threads.
+            initializer: A callable or script used to initialize
+                each worker interpreter.
+            initargs: A tuple of arguments to pass to the initializer.
+            shared: A mapping of shareabled objects to be inserted into
+                each worker interpreter.
+        """
+        super().__init__(max_workers, thread_name_prefix,
+                         initializer, initargs, shared=shared)
index a024033f35fb548f07d3a2a849834a8d6a781368..16cc5533d429ef1ef12d7fd37ed533e46ce5d7b0 100644 (file)
@@ -43,19 +43,46 @@ if hasattr(os, 'register_at_fork'):
                         after_in_parent=_global_shutdown_lock.release)
 
 
+class WorkerContext:
+
+    @classmethod
+    def prepare(cls, initializer, initargs):
+        if initializer is not None:
+            if not callable(initializer):
+                raise TypeError("initializer must be a callable")
+        def create_context():
+            return cls(initializer, initargs)
+        def resolve_task(fn, args, kwargs):
+            return (fn, args, kwargs)
+        return create_context, resolve_task
+
+    def __init__(self, initializer, initargs):
+        self.initializer = initializer
+        self.initargs = initargs
+
+    def initialize(self):
+        if self.initializer is not None:
+            self.initializer(*self.initargs)
+
+    def finalize(self):
+        pass
+
+    def run(self, task):
+        fn, args, kwargs = task
+        return fn(*args, **kwargs)
+
+
 class _WorkItem:
-    def __init__(self, future, fn, args, kwargs):
+    def __init__(self, future, task):
         self.future = future
-        self.fn = fn
-        self.args = args
-        self.kwargs = kwargs
+        self.task = task
 
-    def run(self):
+    def run(self, ctx):
         if not self.future.set_running_or_notify_cancel():
             return
 
         try:
-            result = self.fn(*self.args, **self.kwargs)
+            result = ctx.run(self.task)
         except BaseException as exc:
             self.future.set_exception(exc)
             # Break a reference cycle with the exception 'exc'
@@ -66,16 +93,15 @@ class _WorkItem:
     __class_getitem__ = classmethod(types.GenericAlias)
 
 
-def _worker(executor_reference, work_queue, initializer, initargs):
-    if initializer is not None:
-        try:
-            initializer(*initargs)
-        except BaseException:
-            _base.LOGGER.critical('Exception in initializer:', exc_info=True)
-            executor = executor_reference()
-            if executor is not None:
-                executor._initializer_failed()
-            return
+def _worker(executor_reference, ctx, work_queue):
+    try:
+        ctx.initialize()
+    except BaseException:
+        _base.LOGGER.critical('Exception in initializer:', exc_info=True)
+        executor = executor_reference()
+        if executor is not None:
+            executor._initializer_failed()
+        return
     try:
         while True:
             try:
@@ -89,7 +115,7 @@ def _worker(executor_reference, work_queue, initializer, initargs):
                 work_item = work_queue.get(block=True)
 
             if work_item is not None:
-                work_item.run()
+                work_item.run(ctx)
                 # Delete references to object. See GH-60488
                 del work_item
                 continue
@@ -110,6 +136,8 @@ def _worker(executor_reference, work_queue, initializer, initargs):
             del executor
     except BaseException:
         _base.LOGGER.critical('Exception in worker', exc_info=True)
+    finally:
+        ctx.finalize()
 
 
 class BrokenThreadPool(_base.BrokenExecutor):
@@ -120,11 +148,17 @@ class BrokenThreadPool(_base.BrokenExecutor):
 
 class ThreadPoolExecutor(_base.Executor):
 
+    BROKEN = BrokenThreadPool
+
     # Used to assign unique thread names when thread_name_prefix is not supplied.
     _counter = itertools.count().__next__
 
+    @classmethod
+    def prepare_context(cls, initializer, initargs):
+        return WorkerContext.prepare(initializer, initargs)
+
     def __init__(self, max_workers=None, thread_name_prefix='',
-                 initializer=None, initargs=()):
+                 initializer=None, initargs=(), **ctxkwargs):
         """Initializes a new ThreadPoolExecutor instance.
 
         Args:
@@ -133,6 +167,7 @@ class ThreadPoolExecutor(_base.Executor):
             thread_name_prefix: An optional name prefix to give our threads.
             initializer: A callable used to initialize worker threads.
             initargs: A tuple of arguments to pass to the initializer.
+            ctxkwargs: Additional arguments to cls.prepare_context().
         """
         if max_workers is None:
             # ThreadPoolExecutor is often used to:
@@ -146,8 +181,9 @@ class ThreadPoolExecutor(_base.Executor):
         if max_workers <= 0:
             raise ValueError("max_workers must be greater than 0")
 
-        if initializer is not None and not callable(initializer):
-            raise TypeError("initializer must be a callable")
+        (self._create_worker_context,
+         self._resolve_work_item_task,
+         ) = type(self).prepare_context(initializer, initargs, **ctxkwargs)
 
         self._max_workers = max_workers
         self._work_queue = queue.SimpleQueue()
@@ -158,13 +194,11 @@ class ThreadPoolExecutor(_base.Executor):
         self._shutdown_lock = threading.Lock()
         self._thread_name_prefix = (thread_name_prefix or
                                     ("ThreadPoolExecutor-%d" % self._counter()))
-        self._initializer = initializer
-        self._initargs = initargs
 
     def submit(self, fn, /, *args, **kwargs):
         with self._shutdown_lock, _global_shutdown_lock:
             if self._broken:
-                raise BrokenThreadPool(self._broken)
+                raise self.BROKEN(self._broken)
 
             if self._shutdown:
                 raise RuntimeError('cannot schedule new futures after shutdown')
@@ -173,7 +207,8 @@ class ThreadPoolExecutor(_base.Executor):
                                    'interpreter shutdown')
 
             f = _base.Future()
-            w = _WorkItem(f, fn, args, kwargs)
+            task = self._resolve_work_item_task(fn, args, kwargs)
+            w = _WorkItem(f, task)
 
             self._work_queue.put(w)
             self._adjust_thread_count()
@@ -196,9 +231,8 @@ class ThreadPoolExecutor(_base.Executor):
                                      num_threads)
             t = threading.Thread(name=thread_name, target=_worker,
                                  args=(weakref.ref(self, weakref_cb),
-                                       self._work_queue,
-                                       self._initializer,
-                                       self._initargs))
+                                       self._create_worker_context(),
+                                       self._work_queue))
             t.start()
             self._threads.add(t)
             _threads_queues[t] = self._work_queue
@@ -214,7 +248,7 @@ class ThreadPoolExecutor(_base.Executor):
                 except queue.Empty:
                     break
                 if work_item is not None:
-                    work_item.future.set_exception(BrokenThreadPool(self._broken))
+                    work_item.future.set_exception(self.BROKEN(self._broken))
 
     def shutdown(self, wait=True, *, cancel_futures=False):
         with self._shutdown_lock:
index 4160656cb133abaf310485d770b7c95129fa9c90..b97d9ffd94b1f8829978018e1c022fb779d8558c 100644 (file)
@@ -23,6 +23,7 @@ def make_dummy_object(_):
 
 
 class ExecutorTest:
+
     # Executor.shutdown() and context manager usage is tested by
     # ExecutorShutdownTest.
     def test_submit(self):
@@ -52,7 +53,8 @@ class ExecutorTest:
         i = self.executor.map(divmod, [1, 1, 1, 1], [2, 3, 0, 5])
         self.assertEqual(i.__next__(), (0, 1))
         self.assertEqual(i.__next__(), (0, 1))
-        self.assertRaises(ZeroDivisionError, i.__next__)
+        with self.assertRaises(ZeroDivisionError):
+            i.__next__()
 
     @support.requires_resource('walltime')
     def test_map_timeout(self):
diff --git a/Lib/test/test_concurrent_futures/test_interpreter_pool.py b/Lib/test/test_concurrent_futures/test_interpreter_pool.py
new file mode 100644 (file)
index 0000000..0de03c0
--- /dev/null
@@ -0,0 +1,346 @@
+import asyncio
+import contextlib
+import io
+import os
+import pickle
+import sys
+import time
+import unittest
+from concurrent.futures.interpreter import (
+    ExecutionFailed, BrokenInterpreterPool,
+)
+import _interpreters
+from test import support
+import test.test_asyncio.utils as testasyncio_utils
+from test.support.interpreters import queues
+
+from .executor import ExecutorTest, mul
+from .util import BaseTestCase, InterpreterPoolMixin, setup_module
+
+
+def noop():
+    pass
+
+
+def write_msg(fd, msg):
+    os.write(fd, msg + b'\0')
+
+
+def read_msg(fd):
+    msg = b''
+    while ch := os.read(fd, 1):
+        if ch == b'\0':
+            return msg
+        msg += ch
+
+
+def get_current_name():
+    return __name__
+
+
+def fail(exctype, msg=None):
+    raise exctype(msg)
+
+
+def get_current_interpid(*extra):
+    interpid, _ = _interpreters.get_current()
+    return (interpid, *extra)
+
+
+class InterpretersMixin(InterpreterPoolMixin):
+
+    def pipe(self):
+        r, w = os.pipe()
+        self.addCleanup(lambda: os.close(r))
+        self.addCleanup(lambda: os.close(w))
+        return r, w
+
+
+class InterpreterPoolExecutorTest(
+            InterpretersMixin, ExecutorTest, BaseTestCase):
+
+    @unittest.expectedFailure
+    def test_init_script(self):
+        msg1 = b'step: init'
+        msg2 = b'step: run'
+        r, w = self.pipe()
+        initscript = f"""
+            import os
+            msg = {msg2!r}
+            os.write({w}, {msg1!r} + b'\\0')
+            """
+        script = f"""
+            os.write({w}, msg + b'\\0')
+            """
+        os.write(w, b'\0')
+
+        executor = self.executor_type(initializer=initscript)
+        before_init = os.read(r, 100)
+        fut = executor.submit(script)
+        after_init = read_msg(r)
+        fut.result()
+        after_run = read_msg(r)
+
+        self.assertEqual(before_init, b'\0')
+        self.assertEqual(after_init, msg1)
+        self.assertEqual(after_run, msg2)
+
+    @unittest.expectedFailure
+    def test_init_script_args(self):
+        with self.assertRaises(ValueError):
+            self.executor_type(initializer='pass', initargs=('spam',))
+
+    def test_init_func(self):
+        msg = b'step: init'
+        r, w = self.pipe()
+        os.write(w, b'\0')
+
+        executor = self.executor_type(
+                initializer=write_msg, initargs=(w, msg))
+        before = os.read(r, 100)
+        executor.submit(mul, 10, 10)
+        after = read_msg(r)
+
+        self.assertEqual(before, b'\0')
+        self.assertEqual(after, msg)
+
+    def test_init_closure(self):
+        count = 0
+        def init1():
+            assert count == 0, count
+        def init2():
+            nonlocal count
+            count += 1
+
+        with self.assertRaises(pickle.PicklingError):
+            self.executor_type(initializer=init1)
+        with self.assertRaises(pickle.PicklingError):
+            self.executor_type(initializer=init2)
+
+    def test_init_instance_method(self):
+        class Spam:
+            def initializer(self):
+                raise NotImplementedError
+        spam = Spam()
+
+        with self.assertRaises(pickle.PicklingError):
+            self.executor_type(initializer=spam.initializer)
+
+    def test_init_shared(self):
+        msg = b'eggs'
+        r, w = self.pipe()
+        script = f"""if True:
+            import os
+            if __name__ != '__main__':
+                import __main__
+                spam = __main__.spam
+            os.write({w}, spam + b'\\0')
+            """
+
+        executor = self.executor_type(shared={'spam': msg})
+        fut = executor.submit(exec, script)
+        fut.result()
+        after = read_msg(r)
+
+        self.assertEqual(after, msg)
+
+    @unittest.expectedFailure
+    def test_init_exception_in_script(self):
+        executor = self.executor_type(initializer='raise Exception("spam")')
+        with executor:
+            with contextlib.redirect_stderr(io.StringIO()) as stderr:
+                fut = executor.submit('pass')
+                with self.assertRaises(BrokenInterpreterPool):
+                    fut.result()
+        stderr = stderr.getvalue()
+        self.assertIn('ExecutionFailed: Exception: spam', stderr)
+        self.assertIn('Uncaught in the interpreter:', stderr)
+        self.assertIn('The above exception was the direct cause of the following exception:',
+                      stderr)
+
+    def test_init_exception_in_func(self):
+        executor = self.executor_type(initializer=fail,
+                                      initargs=(Exception, 'spam'))
+        with executor:
+            with contextlib.redirect_stderr(io.StringIO()) as stderr:
+                fut = executor.submit(noop)
+                with self.assertRaises(BrokenInterpreterPool):
+                    fut.result()
+        stderr = stderr.getvalue()
+        self.assertIn('ExecutionFailed: Exception: spam', stderr)
+        self.assertIn('Uncaught in the interpreter:', stderr)
+        self.assertIn('The above exception was the direct cause of the following exception:',
+                      stderr)
+
+    @unittest.expectedFailure
+    def test_submit_script(self):
+        msg = b'spam'
+        r, w = self.pipe()
+        script = f"""
+            import os
+            os.write({w}, __name__.encode('utf-8') + b'\\0')
+            """
+        executor = self.executor_type()
+
+        fut = executor.submit(script)
+        res = fut.result()
+        after = read_msg(r)
+
+        self.assertEqual(after, b'__main__')
+        self.assertIs(res, None)
+
+    def test_submit_closure(self):
+        spam = True
+        def task1():
+            return spam
+        def task2():
+            nonlocal spam
+            spam += 1
+            return spam
+
+        executor = self.executor_type()
+        with self.assertRaises(pickle.PicklingError):
+            executor.submit(task1)
+        with self.assertRaises(pickle.PicklingError):
+            executor.submit(task2)
+
+    def test_submit_local_instance(self):
+        class Spam:
+            def __init__(self):
+                self.value = True
+
+        executor = self.executor_type()
+        with self.assertRaises(pickle.PicklingError):
+            executor.submit(Spam)
+
+    def test_submit_instance_method(self):
+        class Spam:
+            def run(self):
+                return True
+        spam = Spam()
+
+        executor = self.executor_type()
+        with self.assertRaises(pickle.PicklingError):
+            executor.submit(spam.run)
+
+    def test_submit_func_globals(self):
+        executor = self.executor_type()
+        fut = executor.submit(get_current_name)
+        name = fut.result()
+
+        self.assertEqual(name, __name__)
+        self.assertNotEqual(name, '__main__')
+
+    @unittest.expectedFailure
+    def test_submit_exception_in_script(self):
+        fut = self.executor.submit('raise Exception("spam")')
+        with self.assertRaises(Exception) as captured:
+            fut.result()
+        self.assertIs(type(captured.exception), Exception)
+        self.assertEqual(str(captured.exception), 'spam')
+        cause = captured.exception.__cause__
+        self.assertIs(type(cause), ExecutionFailed)
+        for attr in ('__name__', '__qualname__', '__module__'):
+            self.assertEqual(getattr(cause.excinfo.type, attr),
+                             getattr(Exception, attr))
+        self.assertEqual(cause.excinfo.msg, 'spam')
+
+    def test_submit_exception_in_func(self):
+        fut = self.executor.submit(fail, Exception, 'spam')
+        with self.assertRaises(Exception) as captured:
+            fut.result()
+        self.assertIs(type(captured.exception), Exception)
+        self.assertEqual(str(captured.exception), 'spam')
+        cause = captured.exception.__cause__
+        self.assertIs(type(cause), ExecutionFailed)
+        for attr in ('__name__', '__qualname__', '__module__'):
+            self.assertEqual(getattr(cause.excinfo.type, attr),
+                             getattr(Exception, attr))
+        self.assertEqual(cause.excinfo.msg, 'spam')
+
+    def test_saturation(self):
+        blocker = queues.create()
+        executor = self.executor_type(4, shared=dict(blocker=blocker))
+
+        for i in range(15 * executor._max_workers):
+            executor.submit(exec, 'import __main__; __main__.blocker.get()')
+            #executor.submit('blocker.get()')
+        self.assertEqual(len(executor._threads), executor._max_workers)
+        for i in range(15 * executor._max_workers):
+            blocker.put_nowait(None)
+        executor.shutdown(wait=True)
+
+    @support.requires_gil_enabled("gh-117344: test is flaky without the GIL")
+    def test_idle_thread_reuse(self):
+        executor = self.executor_type()
+        executor.submit(mul, 21, 2).result()
+        executor.submit(mul, 6, 7).result()
+        executor.submit(mul, 3, 14).result()
+        self.assertEqual(len(executor._threads), 1)
+        executor.shutdown(wait=True)
+
+
+class AsyncioTest(InterpretersMixin, testasyncio_utils.TestCase):
+
+    def setUp(self):
+        super().setUp()
+        self.loop = asyncio.new_event_loop()
+        self.set_event_loop(self.loop)
+
+        self.executor = self.executor_type()
+        self.addCleanup(lambda: self.executor.shutdown())
+
+    def tearDown(self):
+        if not self.loop.is_closed():
+            testasyncio_utils.run_briefly(self.loop)
+
+        self.doCleanups()
+        support.gc_collect()
+        super().tearDown()
+
+    def test_run_in_executor(self):
+        unexpected, _ = _interpreters.get_current()
+
+        func = get_current_interpid
+        fut = self.loop.run_in_executor(self.executor, func, 'yo')
+        interpid, res = self.loop.run_until_complete(fut)
+
+        self.assertEqual(res, 'yo')
+        self.assertNotEqual(interpid, unexpected)
+
+    def test_run_in_executor_cancel(self):
+        executor = self.executor_type()
+
+        called = False
+
+        def patched_call_soon(*args):
+            nonlocal called
+            called = True
+
+        func = time.sleep
+        fut = self.loop.run_in_executor(self.executor, func, 0.05)
+        fut.cancel()
+        self.loop.run_until_complete(
+                self.loop.shutdown_default_executor())
+        self.loop.close()
+        self.loop.call_soon = patched_call_soon
+        self.loop.call_soon_threadsafe = patched_call_soon
+        time.sleep(0.4)
+        self.assertFalse(called)
+
+    def test_default_executor(self):
+        unexpected, _ = _interpreters.get_current()
+
+        self.loop.set_default_executor(self.executor)
+        fut = self.loop.run_in_executor(None, get_current_interpid)
+        interpid, = self.loop.run_until_complete(fut)
+
+        self.assertNotEqual(interpid, unexpected)
+
+
+def setUpModule():
+    setup_module()
+
+
+if __name__ == "__main__":
+    unittest.main()
index 3b8ec3e205d5aa77bcd6e33e9ee2f657b271f94f..52baab51340fc9fecf7e23aabde9c4c0e9229029 100644 (file)
@@ -74,6 +74,10 @@ class ThreadPoolMixin(ExecutorMixin):
     executor_type = futures.ThreadPoolExecutor
 
 
+class InterpreterPoolMixin(ExecutorMixin):
+    executor_type = futures.InterpreterPoolExecutor
+
+
 class ProcessPoolForkMixin(ExecutorMixin):
     executor_type = futures.ProcessPoolExecutor
     ctx = "fork"
@@ -120,6 +124,7 @@ class ProcessPoolForkserverMixin(ExecutorMixin):
 
 def create_executor_tests(remote_globals, mixin, bases=(BaseTestCase,),
                           executor_mixins=(ThreadPoolMixin,
+                                           InterpreterPoolMixin,
                                            ProcessPoolForkMixin,
                                            ProcessPoolForkserverMixin,
                                            ProcessPoolSpawnMixin)):
diff --git a/Misc/NEWS.d/next/Library/2024-09-27-15-42-55.gh-issue-124694.uUy32y.rst b/Misc/NEWS.d/next/Library/2024-09-27-15-42-55.gh-issue-124694.uUy32y.rst
new file mode 100644 (file)
index 0000000..1aa1a46
--- /dev/null
@@ -0,0 +1,6 @@
+We've added :class:`concurrent.futures.InterpreterPoolExecutor`, which
+allows you to run code in multiple isolated interpreters.  This allows you
+to circumvent the limitations of CPU-bound threads (due to the GIL). Patch
+by Eric Snow.
+
+This addition is unrelated to :pep:`734`.